Skip to main content

grafeo_core/execution/operators/
apply.rs

1//! Apply operator (lateral join / correlated subquery).
2//!
3//! For each row from the outer input, the inner subplan is reset and executed.
4//! Results are the concatenation of outer columns with inner columns.
5//!
6//! This operator is the backend for:
7//! - Cypher: `CALL { subquery }`
8//! - GQL: `VALUE { subquery }`
9//! - Pattern comprehensions (with a Collect aggregate wrapper)
10
11use std::sync::Arc;
12
13use grafeo_common::types::{LogicalType, Value};
14
15use super::parameter_scan::ParameterState;
16use super::{DataChunk, Operator, OperatorResult};
17use crate::execution::vector::ValueVector;
18
19/// Apply (lateral join) operator.
20///
21/// Evaluates `inner` once for each row of `outer`. The result schema is
22/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
23/// for a given outer row, that outer row is omitted (inner join semantics).
24///
25/// When `param_state` is set, outer row values for the specified column indices
26/// are injected into the shared [`ParameterState`] before each inner execution,
27/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
28pub struct ApplyOperator {
29    outer: Box<dyn Operator>,
30    inner: Box<dyn Operator>,
31    /// Shared parameter state for correlated subqueries.
32    param_state: Option<Arc<ParameterState>>,
33    /// Indices of outer columns to inject into the inner plan.
34    param_col_indices: Vec<usize>,
35    /// When true, outer rows with no inner results emit NULLs (left-join).
36    optional: bool,
37    /// Number of columns the inner plan produces (needed for NULL-padding).
38    inner_column_count: usize,
39    /// EXISTS mode: Some(true) = semi-join (keep if inner has rows),
40    /// Some(false) = anti-join (keep if inner has NO rows).
41    /// Inner columns are NOT appended in EXISTS mode.
42    exists_mode: Option<bool>,
43    /// Buffered outer rows waiting to be combined with inner results.
44    state: ApplyState,
45}
46
47enum ApplyState {
48    /// Pull next outer chunk, process row-by-row.
49    Init,
50    /// Processing a chunk of outer rows. `outer_chunk` is the current batch,
51    /// `outer_row` is the next row index to process.
52    Processing {
53        outer_chunk: DataChunk,
54        outer_row: usize,
55        /// Accumulated output rows (combined outer + inner).
56        output: Vec<Vec<Value>>,
57    },
58    /// All outer input exhausted.
59    Done,
60}
61
62impl ApplyOperator {
63    /// Creates a new Apply operator (uncorrelated: no parameter injection).
64    pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
65        Self {
66            outer,
67            inner,
68            param_state: None,
69            param_col_indices: Vec::new(),
70            optional: false,
71            inner_column_count: 0,
72            exists_mode: None,
73            state: ApplyState::Init,
74        }
75    }
76
77    /// Creates a correlated Apply operator that injects outer row values.
78    ///
79    /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
80    /// `param_col_indices` specifies which outer columns to inject (by index).
81    pub fn new_correlated(
82        outer: Box<dyn Operator>,
83        inner: Box<dyn Operator>,
84        param_state: Arc<ParameterState>,
85        param_col_indices: Vec<usize>,
86    ) -> Self {
87        Self {
88            outer,
89            inner,
90            param_state: Some(param_state),
91            param_col_indices,
92            optional: false,
93            inner_column_count: 0,
94            exists_mode: None,
95            state: ApplyState::Init,
96        }
97    }
98
99    /// Enables optional (left-join) semantics with the given inner column count.
100    ///
101    /// When enabled, outer rows that produce no inner results will be emitted
102    /// with NULL values for the inner columns instead of being dropped.
103    pub fn with_optional(mut self, inner_column_count: usize) -> Self {
104        self.optional = true;
105        self.inner_column_count = inner_column_count;
106        self
107    }
108
109    /// Enables EXISTS mode: semi-join (`keep_matches=true`) or anti-join
110    /// (`keep_matches=false`). Inner columns are NOT appended to the output.
111    pub fn with_exists_mode(mut self, keep_matches: bool) -> Self {
112        self.exists_mode = Some(keep_matches);
113        self
114    }
115
116    /// Extracts all values from a single row of a DataChunk.
117    fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
118        let mut values = Vec::with_capacity(chunk.num_columns());
119        for col_idx in 0..chunk.num_columns() {
120            let val = chunk
121                .column(col_idx)
122                .and_then(|col| col.get_value(row))
123                .unwrap_or(Value::Null);
124            values.push(val);
125        }
126        values
127    }
128
129    /// Builds a DataChunk from accumulated rows.
130    fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
131        if rows.is_empty() {
132            return DataChunk::empty();
133        }
134        let num_cols = rows[0].len();
135        let mut columns: Vec<ValueVector> = (0..num_cols)
136            .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
137            .collect();
138
139        for row in rows {
140            for (col_idx, val) in row.iter().enumerate() {
141                if col_idx < columns.len() {
142                    columns[col_idx].push_value(val.clone());
143                }
144            }
145        }
146        DataChunk::new(columns)
147    }
148}
149
150impl Operator for ApplyOperator {
151    fn next(&mut self) -> OperatorResult {
152        loop {
153            match &mut self.state {
154                ApplyState::Init => match self.outer.next()? {
155                    Some(chunk) => {
156                        self.state = ApplyState::Processing {
157                            outer_chunk: chunk,
158                            outer_row: 0,
159                            output: Vec::new(),
160                        };
161                    }
162                    None => {
163                        self.state = ApplyState::Done;
164                        return Ok(None);
165                    }
166                },
167                ApplyState::Processing {
168                    outer_chunk,
169                    outer_row,
170                    output,
171                } => {
172                    let selected: Vec<usize> = outer_chunk.selected_indices().collect();
173                    while *outer_row < selected.len() {
174                        let row = selected[*outer_row];
175                        let outer_values = Self::extract_row(outer_chunk, row);
176
177                        // Inject outer values into the inner plan's parameter state
178                        if let Some(ref param_state) = self.param_state {
179                            let injected: Vec<Value> = self
180                                .param_col_indices
181                                .iter()
182                                .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
183                                .collect();
184                            param_state.set_values(injected);
185                        }
186
187                        // Reset and run inner plan for this outer row
188                        self.inner.reset();
189
190                        // EXISTS mode: check for row existence without appending inner columns
191                        if let Some(keep_matches) = self.exists_mode {
192                            let has_results = self.inner.next()?.is_some();
193                            if has_results == keep_matches {
194                                output.push(outer_values);
195                            }
196                        } else {
197                            let pre_len = output.len();
198                            while let Some(inner_chunk) = self.inner.next()? {
199                                for inner_row in inner_chunk.selected_indices() {
200                                    let inner_values = Self::extract_row(&inner_chunk, inner_row);
201                                    let mut combined = outer_values.clone();
202                                    combined.extend(inner_values);
203                                    output.push(combined);
204                                }
205                            }
206
207                            // OPTIONAL: emit outer row with NULLs when inner produced nothing
208                            if self.optional && output.len() == pre_len {
209                                let mut combined = outer_values;
210                                combined.extend(std::iter::repeat_n(
211                                    Value::Null,
212                                    self.inner_column_count,
213                                ));
214                                output.push(combined);
215                            }
216                        }
217
218                        *outer_row += 1;
219
220                        // Flush when we have enough rows
221                        if output.len() >= 1024 {
222                            let chunk = Self::build_chunk(output);
223                            output.clear();
224                            return Ok(Some(chunk));
225                        }
226                    }
227
228                    // Finished this outer chunk; flush any remaining output
229                    if !output.is_empty() {
230                        let chunk = Self::build_chunk(output);
231                        output.clear();
232                        self.state = ApplyState::Init;
233                        return Ok(Some(chunk));
234                    }
235
236                    // Move to next outer chunk
237                    self.state = ApplyState::Init;
238                }
239                ApplyState::Done => return Ok(None),
240            }
241        }
242    }
243
244    fn reset(&mut self) {
245        self.outer.reset();
246        self.inner.reset();
247        self.state = ApplyState::Init;
248    }
249
250    fn name(&self) -> &'static str {
251        "Apply"
252    }
253}