grafeo_core/execution/operators/apply.rs
1//! Apply operator (lateral join / correlated subquery).
2//!
3//! For each row from the outer input, the inner subplan is reset and executed.
4//! Results are the concatenation of outer columns with inner columns.
5//!
6//! This operator is the backend for:
7//! - Cypher: `CALL { subquery }`
8//! - GQL: `VALUE { subquery }`
9//! - Pattern comprehensions (with a Collect aggregate wrapper)
10
11use std::sync::Arc;
12
13use grafeo_common::types::{LogicalType, Value};
14
15use super::parameter_scan::ParameterState;
16use super::{DataChunk, Operator, OperatorResult};
17use crate::execution::vector::ValueVector;
18
19/// Apply (lateral join) operator.
20///
21/// Evaluates `inner` once for each row of `outer`. The result schema is
22/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
23/// for a given outer row, that outer row is omitted (inner join semantics).
24///
25/// When `param_state` is set, outer row values for the specified column indices
26/// are injected into the shared [`ParameterState`] before each inner execution,
27/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
28pub struct ApplyOperator {
29 outer: Box<dyn Operator>,
30 inner: Box<dyn Operator>,
31 /// Shared parameter state for correlated subqueries.
32 param_state: Option<Arc<ParameterState>>,
33 /// Indices of outer columns to inject into the inner plan.
34 param_col_indices: Vec<usize>,
35 /// When true, outer rows with no inner results emit NULLs (left-join).
36 optional: bool,
37 /// Number of columns the inner plan produces (needed for NULL-padding).
38 inner_column_count: usize,
39 /// EXISTS mode: Some(true) = semi-join (keep if inner has rows),
40 /// Some(false) = anti-join (keep if inner has NO rows).
41 /// Inner columns are NOT appended in EXISTS mode.
42 exists_mode: Option<bool>,
43 /// EXISTS flag mode: instead of filtering rows, append a boolean column
44 /// indicating whether the inner plan produced results. All outer rows
45 /// are preserved. Used for EXISTS inside OR predicates.
46 exists_flag: bool,
47 /// Buffered outer rows waiting to be combined with inner results.
48 state: ApplyState,
49}
50
51enum ApplyState {
52 /// Pull next outer chunk, process row-by-row.
53 Init,
54 /// Processing a chunk of outer rows. `outer_chunk` is the current batch,
55 /// `outer_row` is the next row index to process.
56 Processing {
57 outer_chunk: DataChunk,
58 outer_row: usize,
59 /// Accumulated output rows (combined outer + inner).
60 output: Vec<Vec<Value>>,
61 },
62 /// All outer input exhausted.
63 Done,
64}
65
66impl ApplyOperator {
67 /// Creates a new Apply operator (uncorrelated: no parameter injection).
68 pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
69 Self {
70 outer,
71 inner,
72 param_state: None,
73 param_col_indices: Vec::new(),
74 optional: false,
75 inner_column_count: 0,
76 exists_mode: None,
77 exists_flag: false,
78 state: ApplyState::Init,
79 }
80 }
81
82 /// Creates a correlated Apply operator that injects outer row values.
83 ///
84 /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
85 /// `param_col_indices` specifies which outer columns to inject (by index).
86 pub fn new_correlated(
87 outer: Box<dyn Operator>,
88 inner: Box<dyn Operator>,
89 param_state: Arc<ParameterState>,
90 param_col_indices: Vec<usize>,
91 ) -> Self {
92 Self {
93 outer,
94 inner,
95 param_state: Some(param_state),
96 param_col_indices,
97 optional: false,
98 inner_column_count: 0,
99 exists_mode: None,
100 exists_flag: false,
101 state: ApplyState::Init,
102 }
103 }
104
105 /// Enables optional (left-join) semantics with the given inner column count.
106 ///
107 /// When enabled, outer rows that produce no inner results will be emitted
108 /// with NULL values for the inner columns instead of being dropped.
109 pub fn with_optional(mut self, inner_column_count: usize) -> Self {
110 self.optional = true;
111 self.inner_column_count = inner_column_count;
112 self
113 }
114
115 /// Enables EXISTS mode: semi-join (`keep_matches=true`) or anti-join
116 /// (`keep_matches=false`). Inner columns are NOT appended to the output.
117 pub fn with_exists_mode(mut self, keep_matches: bool) -> Self {
118 self.exists_mode = Some(keep_matches);
119 self
120 }
121
122 /// Enables EXISTS flag mode: instead of filtering, appends a boolean
123 /// column indicating whether the inner plan produced results. All outer
124 /// rows are preserved. Used for EXISTS inside OR predicates where
125 /// semi-join filtering would be incorrect.
126 pub fn with_exists_flag(mut self) -> Self {
127 self.exists_flag = true;
128 self
129 }
130
131 /// Extracts all values from a single row of a DataChunk.
132 fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
133 let mut values = Vec::with_capacity(chunk.num_columns());
134 for col_idx in 0..chunk.num_columns() {
135 let val = chunk
136 .column(col_idx)
137 .and_then(|col| col.get_value(row))
138 .unwrap_or(Value::Null);
139 values.push(val);
140 }
141 values
142 }
143
144 /// Builds a DataChunk from accumulated rows.
145 fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
146 if rows.is_empty() {
147 return DataChunk::empty();
148 }
149 let num_cols = rows[0].len();
150 let mut columns: Vec<ValueVector> = (0..num_cols)
151 .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
152 .collect();
153
154 for row in rows {
155 for (col_idx, val) in row.iter().enumerate() {
156 if col_idx < columns.len() {
157 columns[col_idx].push_value(val.clone());
158 }
159 }
160 }
161 DataChunk::new(columns)
162 }
163}
164
165impl Operator for ApplyOperator {
166 fn next(&mut self) -> OperatorResult {
167 loop {
168 match &mut self.state {
169 ApplyState::Init => match self.outer.next()? {
170 Some(chunk) => {
171 self.state = ApplyState::Processing {
172 outer_chunk: chunk,
173 outer_row: 0,
174 output: Vec::new(),
175 };
176 }
177 None => {
178 self.state = ApplyState::Done;
179 return Ok(None);
180 }
181 },
182 ApplyState::Processing {
183 outer_chunk,
184 outer_row,
185 output,
186 } => {
187 let selected: Vec<usize> = outer_chunk.selected_indices().collect();
188 while *outer_row < selected.len() {
189 let row = selected[*outer_row];
190 let outer_values = Self::extract_row(outer_chunk, row);
191
192 // Inject outer values into the inner plan's parameter state
193 if let Some(ref param_state) = self.param_state {
194 let injected: Vec<Value> = self
195 .param_col_indices
196 .iter()
197 .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
198 .collect();
199 param_state.set_values(injected);
200 }
201
202 // Reset and run inner plan for this outer row
203 self.inner.reset();
204
205 // EXISTS flag mode: append boolean column, keep all rows
206 if self.exists_flag {
207 let has_results = self.inner.next()?.is_some();
208 let mut combined = outer_values;
209 combined.push(Value::Bool(has_results));
210 output.push(combined);
211 }
212 // EXISTS mode: check for row existence without appending inner columns
213 else if let Some(keep_matches) = self.exists_mode {
214 let has_results = self.inner.next()?.is_some();
215 if has_results == keep_matches {
216 output.push(outer_values);
217 }
218 } else {
219 let pre_len = output.len();
220 while let Some(inner_chunk) = self.inner.next()? {
221 for inner_row in inner_chunk.selected_indices() {
222 let inner_values = Self::extract_row(&inner_chunk, inner_row);
223 let mut combined = outer_values.clone();
224 combined.extend(inner_values);
225 output.push(combined);
226 }
227 }
228
229 // OPTIONAL: emit outer row with NULLs when inner produced nothing
230 if self.optional && output.len() == pre_len {
231 let mut combined = outer_values;
232 combined.extend(std::iter::repeat_n(
233 Value::Null,
234 self.inner_column_count,
235 ));
236 output.push(combined);
237 }
238 }
239
240 *outer_row += 1;
241
242 // Flush when we have enough rows
243 if output.len() >= 1024 {
244 let chunk = Self::build_chunk(output);
245 output.clear();
246 return Ok(Some(chunk));
247 }
248 }
249
250 // Finished this outer chunk; flush any remaining output
251 if !output.is_empty() {
252 let chunk = Self::build_chunk(output);
253 output.clear();
254 self.state = ApplyState::Init;
255 return Ok(Some(chunk));
256 }
257
258 // Move to next outer chunk
259 self.state = ApplyState::Init;
260 }
261 ApplyState::Done => return Ok(None),
262 }
263 }
264 }
265
266 fn reset(&mut self) {
267 self.outer.reset();
268 self.inner.reset();
269 self.state = ApplyState::Init;
270 }
271
272 fn name(&self) -> &'static str {
273 "Apply"
274 }
275}