Skip to main content

grafeo_core/execution/operators/
apply.rs

//! Apply operator (lateral join / correlated subquery).
//!
//! For each row from the outer input, the inner subplan is reset and executed.
//! By default, results are the concatenation of outer columns with inner
//! columns; the EXISTS modes emit only the outer columns (optionally followed
//! by a boolean flag column) instead.
//!
//! This operator is the backend for:
//! - Cypher: `CALL { subquery }`
//! - GQL: `VALUE { subquery }`
//! - Pattern comprehensions (with a Collect aggregate wrapper)
10
11use std::sync::Arc;
12
13use grafeo_common::types::{LogicalType, Value};
14
15use super::parameter_scan::ParameterState;
16use super::{DataChunk, Operator, OperatorResult};
17use crate::execution::vector::ValueVector;
18
19/// Apply (lateral join) operator.
20///
21/// Evaluates `inner` once for each row of `outer`. The result schema is
22/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
23/// for a given outer row, that outer row is omitted (inner join semantics).
24///
25/// When `param_state` is set, outer row values for the specified column indices
26/// are injected into the shared [`ParameterState`] before each inner execution,
27/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
28pub struct ApplyOperator {
29    outer: Box<dyn Operator>,
30    inner: Box<dyn Operator>,
31    /// Shared parameter state for correlated subqueries.
32    param_state: Option<Arc<ParameterState>>,
33    /// Indices of outer columns to inject into the inner plan.
34    param_col_indices: Vec<usize>,
35    /// When true, outer rows with no inner results emit NULLs (left-join).
36    optional: bool,
37    /// Number of columns the inner plan produces (needed for NULL-padding).
38    inner_column_count: usize,
39    /// EXISTS mode: Some(true) = semi-join (keep if inner has rows),
40    /// Some(false) = anti-join (keep if inner has NO rows).
41    /// Inner columns are NOT appended in EXISTS mode.
42    exists_mode: Option<bool>,
43    /// EXISTS flag mode: instead of filtering rows, append a boolean column
44    /// indicating whether the inner plan produced results. All outer rows
45    /// are preserved. Used for EXISTS inside OR predicates.
46    exists_flag: bool,
47    /// Buffered outer rows waiting to be combined with inner results.
48    state: ApplyState,
49}
50
51enum ApplyState {
52    /// Pull next outer chunk, process row-by-row.
53    Init,
54    /// Processing a chunk of outer rows. `outer_chunk` is the current batch,
55    /// `outer_row` is the next row index to process.
56    Processing {
57        outer_chunk: DataChunk,
58        outer_row: usize,
59        /// Accumulated output rows (combined outer + inner).
60        output: Vec<Vec<Value>>,
61    },
62    /// All outer input exhausted.
63    Done,
64}
65
66impl ApplyOperator {
67    /// Creates a new Apply operator (uncorrelated: no parameter injection).
68    pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
69        Self {
70            outer,
71            inner,
72            param_state: None,
73            param_col_indices: Vec::new(),
74            optional: false,
75            inner_column_count: 0,
76            exists_mode: None,
77            exists_flag: false,
78            state: ApplyState::Init,
79        }
80    }
81
82    /// Creates a correlated Apply operator that injects outer row values.
83    ///
84    /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
85    /// `param_col_indices` specifies which outer columns to inject (by index).
86    pub fn new_correlated(
87        outer: Box<dyn Operator>,
88        inner: Box<dyn Operator>,
89        param_state: Arc<ParameterState>,
90        param_col_indices: Vec<usize>,
91    ) -> Self {
92        Self {
93            outer,
94            inner,
95            param_state: Some(param_state),
96            param_col_indices,
97            optional: false,
98            inner_column_count: 0,
99            exists_mode: None,
100            exists_flag: false,
101            state: ApplyState::Init,
102        }
103    }
104
105    /// Enables optional (left-join) semantics with the given inner column count.
106    ///
107    /// When enabled, outer rows that produce no inner results will be emitted
108    /// with NULL values for the inner columns instead of being dropped.
109    pub fn with_optional(mut self, inner_column_count: usize) -> Self {
110        self.optional = true;
111        self.inner_column_count = inner_column_count;
112        self
113    }
114
115    /// Enables EXISTS mode: semi-join (`keep_matches=true`) or anti-join
116    /// (`keep_matches=false`). Inner columns are NOT appended to the output.
117    pub fn with_exists_mode(mut self, keep_matches: bool) -> Self {
118        self.exists_mode = Some(keep_matches);
119        self
120    }
121
122    /// Enables EXISTS flag mode: instead of filtering, appends a boolean
123    /// column indicating whether the inner plan produced results. All outer
124    /// rows are preserved. Used for EXISTS inside OR predicates where
125    /// semi-join filtering would be incorrect.
126    pub fn with_exists_flag(mut self) -> Self {
127        self.exists_flag = true;
128        self
129    }
130
131    /// Extracts all values from a single row of a DataChunk.
132    fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
133        let mut values = Vec::with_capacity(chunk.num_columns());
134        for col_idx in 0..chunk.num_columns() {
135            let val = chunk
136                .column(col_idx)
137                .and_then(|col| col.get_value(row))
138                .unwrap_or(Value::Null);
139            values.push(val);
140        }
141        values
142    }
143
144    /// Builds a DataChunk from accumulated rows.
145    fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
146        if rows.is_empty() {
147            return DataChunk::empty();
148        }
149        let num_cols = rows[0].len();
150        let mut columns: Vec<ValueVector> = (0..num_cols)
151            .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
152            .collect();
153
154        for row in rows {
155            for (col_idx, val) in row.iter().enumerate() {
156                if col_idx < columns.len() {
157                    columns[col_idx].push_value(val.clone());
158                }
159            }
160        }
161        DataChunk::new(columns)
162    }
163}
164
165impl Operator for ApplyOperator {
166    fn next(&mut self) -> OperatorResult {
167        loop {
168            match &mut self.state {
169                ApplyState::Init => match self.outer.next()? {
170                    Some(chunk) => {
171                        self.state = ApplyState::Processing {
172                            outer_chunk: chunk,
173                            outer_row: 0,
174                            output: Vec::new(),
175                        };
176                    }
177                    None => {
178                        self.state = ApplyState::Done;
179                        return Ok(None);
180                    }
181                },
182                ApplyState::Processing {
183                    outer_chunk,
184                    outer_row,
185                    output,
186                } => {
187                    let selected: Vec<usize> = outer_chunk.selected_indices().collect();
188                    while *outer_row < selected.len() {
189                        let row = selected[*outer_row];
190                        let outer_values = Self::extract_row(outer_chunk, row);
191
192                        // Inject outer values into the inner plan's parameter state
193                        if let Some(ref param_state) = self.param_state {
194                            let injected: Vec<Value> = self
195                                .param_col_indices
196                                .iter()
197                                .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
198                                .collect();
199                            param_state.set_values(injected);
200                        }
201
202                        // Reset and run inner plan for this outer row
203                        self.inner.reset();
204
205                        // EXISTS flag mode: append boolean column, keep all rows
206                        if self.exists_flag {
207                            let has_results = self.inner.next()?.is_some();
208                            let mut combined = outer_values;
209                            combined.push(Value::Bool(has_results));
210                            output.push(combined);
211                        }
212                        // EXISTS mode: check for row existence without appending inner columns
213                        else if let Some(keep_matches) = self.exists_mode {
214                            let has_results = self.inner.next()?.is_some();
215                            if has_results == keep_matches {
216                                output.push(outer_values);
217                            }
218                        } else {
219                            let pre_len = output.len();
220                            while let Some(inner_chunk) = self.inner.next()? {
221                                for inner_row in inner_chunk.selected_indices() {
222                                    let inner_values = Self::extract_row(&inner_chunk, inner_row);
223                                    let mut combined = outer_values.clone();
224                                    combined.extend(inner_values);
225                                    output.push(combined);
226                                }
227                            }
228
229                            // OPTIONAL: emit outer row with NULLs when inner produced nothing
230                            if self.optional && output.len() == pre_len {
231                                let mut combined = outer_values;
232                                combined.extend(std::iter::repeat_n(
233                                    Value::Null,
234                                    self.inner_column_count,
235                                ));
236                                output.push(combined);
237                            }
238                        }
239
240                        *outer_row += 1;
241
242                        // Flush when we have enough rows
243                        if output.len() >= 1024 {
244                            let chunk = Self::build_chunk(output);
245                            output.clear();
246                            return Ok(Some(chunk));
247                        }
248                    }
249
250                    // Finished this outer chunk; flush any remaining output
251                    if !output.is_empty() {
252                        let chunk = Self::build_chunk(output);
253                        output.clear();
254                        self.state = ApplyState::Init;
255                        return Ok(Some(chunk));
256                    }
257
258                    // Move to next outer chunk
259                    self.state = ApplyState::Init;
260                }
261                ApplyState::Done => return Ok(None),
262            }
263        }
264    }
265
266    fn reset(&mut self) {
267        self.outer.reset();
268        self.inner.reset();
269        self.state = ApplyState::Init;
270    }
271
272    fn name(&self) -> &'static str {
273        "Apply"
274    }
275}