// grafeo_core/execution/operators/apply.rs

//! Apply operator (lateral join / correlated subquery).
//!
//! For each row from the outer input, the inner subplan is reset and executed.
//! Results are the concatenation of outer columns with inner columns.
//!
//! This operator is the backend for:
//! - Cypher: `CALL { subquery }`
//! - GQL: `VALUE { subquery }`
//! - Pattern comprehensions (with a Collect aggregate wrapper)

use std::sync::Arc;

use grafeo_common::types::{LogicalType, Value};

use super::parameter_scan::ParameterState;
use super::{DataChunk, Operator, OperatorResult};
use crate::execution::vector::ValueVector;

19/// Apply (lateral join) operator.
20///
21/// Evaluates `inner` once for each row of `outer`. The result schema is
22/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
23/// for a given outer row, that outer row is omitted (inner join semantics).
24///
25/// When `param_state` is set, outer row values for the specified column indices
26/// are injected into the shared [`ParameterState`] before each inner execution,
27/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
28pub struct ApplyOperator {
29    outer: Box<dyn Operator>,
30    inner: Box<dyn Operator>,
31    /// Shared parameter state for correlated subqueries.
32    param_state: Option<Arc<ParameterState>>,
33    /// Indices of outer columns to inject into the inner plan.
34    param_col_indices: Vec<usize>,
35    /// Buffered outer rows waiting to be combined with inner results.
36    state: ApplyState,
37}
38
39enum ApplyState {
40    /// Pull next outer chunk, process row-by-row.
41    Init,
42    /// Processing a chunk of outer rows. `outer_chunk` is the current batch,
43    /// `outer_row` is the next row index to process.
44    Processing {
45        outer_chunk: DataChunk,
46        outer_row: usize,
47        /// Accumulated output rows (combined outer + inner).
48        output: Vec<Vec<Value>>,
49    },
50    /// All outer input exhausted.
51    Done,
52}
53
54impl ApplyOperator {
55    /// Creates a new Apply operator (uncorrelated: no parameter injection).
56    pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
57        Self {
58            outer,
59            inner,
60            param_state: None,
61            param_col_indices: Vec::new(),
62            state: ApplyState::Init,
63        }
64    }
65
66    /// Creates a correlated Apply operator that injects outer row values.
67    ///
68    /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
69    /// `param_col_indices` specifies which outer columns to inject (by index).
70    pub fn new_correlated(
71        outer: Box<dyn Operator>,
72        inner: Box<dyn Operator>,
73        param_state: Arc<ParameterState>,
74        param_col_indices: Vec<usize>,
75    ) -> Self {
76        Self {
77            outer,
78            inner,
79            param_state: Some(param_state),
80            param_col_indices,
81            state: ApplyState::Init,
82        }
83    }
84
85    /// Extracts all values from a single row of a DataChunk.
86    fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
87        let mut values = Vec::with_capacity(chunk.num_columns());
88        for col_idx in 0..chunk.num_columns() {
89            let val = chunk
90                .column(col_idx)
91                .and_then(|col| col.get_value(row))
92                .unwrap_or(Value::Null);
93            values.push(val);
94        }
95        values
96    }
97
98    /// Builds a DataChunk from accumulated rows.
99    fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
100        if rows.is_empty() {
101            return DataChunk::empty();
102        }
103        let num_cols = rows[0].len();
104        let mut columns: Vec<ValueVector> = (0..num_cols)
105            .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
106            .collect();
107
108        for row in rows {
109            for (col_idx, val) in row.iter().enumerate() {
110                if col_idx < columns.len() {
111                    columns[col_idx].push_value(val.clone());
112                }
113            }
114        }
115        DataChunk::new(columns)
116    }
117}
118
119impl Operator for ApplyOperator {
120    fn next(&mut self) -> OperatorResult {
121        loop {
122            match &mut self.state {
123                ApplyState::Init => match self.outer.next()? {
124                    Some(chunk) => {
125                        self.state = ApplyState::Processing {
126                            outer_chunk: chunk,
127                            outer_row: 0,
128                            output: Vec::new(),
129                        };
130                    }
131                    None => {
132                        self.state = ApplyState::Done;
133                        return Ok(None);
134                    }
135                },
136                ApplyState::Processing {
137                    outer_chunk,
138                    outer_row,
139                    output,
140                } => {
141                    let selected: Vec<usize> = outer_chunk.selected_indices().collect();
142                    while *outer_row < selected.len() {
143                        let row = selected[*outer_row];
144                        let outer_values = Self::extract_row(outer_chunk, row);
145
146                        // Inject outer values into the inner plan's parameter state
147                        if let Some(ref param_state) = self.param_state {
148                            let injected: Vec<Value> = self
149                                .param_col_indices
150                                .iter()
151                                .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
152                                .collect();
153                            param_state.set_values(injected);
154                        }
155
156                        // Reset and run inner plan for this outer row
157                        self.inner.reset();
158                        while let Some(inner_chunk) = self.inner.next()? {
159                            for inner_row in inner_chunk.selected_indices() {
160                                let inner_values = Self::extract_row(&inner_chunk, inner_row);
161                                let mut combined = outer_values.clone();
162                                combined.extend(inner_values);
163                                output.push(combined);
164                            }
165                        }
166
167                        *outer_row += 1;
168
169                        // Flush when we have enough rows
170                        if output.len() >= 1024 {
171                            let chunk = Self::build_chunk(output);
172                            output.clear();
173                            return Ok(Some(chunk));
174                        }
175                    }
176
177                    // Finished this outer chunk; flush any remaining output
178                    if !output.is_empty() {
179                        let chunk = Self::build_chunk(output);
180                        output.clear();
181                        self.state = ApplyState::Init;
182                        return Ok(Some(chunk));
183                    }
184
185                    // Move to next outer chunk
186                    self.state = ApplyState::Init;
187                }
188                ApplyState::Done => return Ok(None),
189            }
190        }
191    }
192
193    fn reset(&mut self) {
194        self.outer.reset();
195        self.inner.reset();
196        self.state = ApplyState::Init;
197    }
198
199    fn name(&self) -> &'static str {
200        "Apply"
201    }
202}