grafeo_core/execution/operators/apply.rs
1//! Apply operator (lateral join / correlated subquery).
2//!
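//! For each selected row from the outer input, the inner subplan is reset and executed.
//! Results are the concatenation of outer columns with inner columns. In EXISTS mode only
//! the outer columns are emitted, and in optional mode an outer row without inner results
//! is emitted once with NULLs in place of the inner columns.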
5//!
6//! This operator is the backend for:
7//! - Cypher: `CALL { subquery }`
8//! - GQL: `VALUE { subquery }`
9//! - Pattern comprehensions (with a Collect aggregate wrapper)
10
11use std::sync::Arc;
12
13use grafeo_common::types::{LogicalType, Value};
14
15use super::parameter_scan::ParameterState;
16use super::{DataChunk, Operator, OperatorResult};
17use crate::execution::vector::ValueVector;
18
19/// Apply (lateral join) operator.
20///
21/// Evaluates `inner` once for each row of `outer`. The result schema is
22/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
23/// for a given outer row, that outer row is omitted (inner join semantics).
24///
25/// When `param_state` is set, outer row values for the specified column indices
26/// are injected into the shared [`ParameterState`] before each inner execution,
27/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
28pub struct ApplyOperator {
29 outer: Box<dyn Operator>,
30 inner: Box<dyn Operator>,
31 /// Shared parameter state for correlated subqueries.
32 param_state: Option<Arc<ParameterState>>,
33 /// Indices of outer columns to inject into the inner plan.
34 param_col_indices: Vec<usize>,
35 /// When true, outer rows with no inner results emit NULLs (left-join).
36 optional: bool,
37 /// Number of columns the inner plan produces (needed for NULL-padding).
38 inner_column_count: usize,
39 /// EXISTS mode: Some(true) = semi-join (keep if inner has rows),
40 /// Some(false) = anti-join (keep if inner has NO rows).
41 /// Inner columns are NOT appended in EXISTS mode.
42 exists_mode: Option<bool>,
43 /// Buffered outer rows waiting to be combined with inner results.
44 state: ApplyState,
45}
46
47enum ApplyState {
48 /// Pull next outer chunk, process row-by-row.
49 Init,
50 /// Processing a chunk of outer rows. `outer_chunk` is the current batch,
51 /// `outer_row` is the next row index to process.
52 Processing {
53 outer_chunk: DataChunk,
54 outer_row: usize,
55 /// Accumulated output rows (combined outer + inner).
56 output: Vec<Vec<Value>>,
57 },
58 /// All outer input exhausted.
59 Done,
60}
61
62impl ApplyOperator {
63 /// Creates a new Apply operator (uncorrelated: no parameter injection).
64 pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
65 Self {
66 outer,
67 inner,
68 param_state: None,
69 param_col_indices: Vec::new(),
70 optional: false,
71 inner_column_count: 0,
72 exists_mode: None,
73 state: ApplyState::Init,
74 }
75 }
76
77 /// Creates a correlated Apply operator that injects outer row values.
78 ///
79 /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
80 /// `param_col_indices` specifies which outer columns to inject (by index).
81 pub fn new_correlated(
82 outer: Box<dyn Operator>,
83 inner: Box<dyn Operator>,
84 param_state: Arc<ParameterState>,
85 param_col_indices: Vec<usize>,
86 ) -> Self {
87 Self {
88 outer,
89 inner,
90 param_state: Some(param_state),
91 param_col_indices,
92 optional: false,
93 inner_column_count: 0,
94 exists_mode: None,
95 state: ApplyState::Init,
96 }
97 }
98
99 /// Enables optional (left-join) semantics with the given inner column count.
100 ///
101 /// When enabled, outer rows that produce no inner results will be emitted
102 /// with NULL values for the inner columns instead of being dropped.
103 pub fn with_optional(mut self, inner_column_count: usize) -> Self {
104 self.optional = true;
105 self.inner_column_count = inner_column_count;
106 self
107 }
108
109 /// Enables EXISTS mode: semi-join (`keep_matches=true`) or anti-join
110 /// (`keep_matches=false`). Inner columns are NOT appended to the output.
111 pub fn with_exists_mode(mut self, keep_matches: bool) -> Self {
112 self.exists_mode = Some(keep_matches);
113 self
114 }
115
116 /// Extracts all values from a single row of a DataChunk.
117 fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
118 let mut values = Vec::with_capacity(chunk.num_columns());
119 for col_idx in 0..chunk.num_columns() {
120 let val = chunk
121 .column(col_idx)
122 .and_then(|col| col.get_value(row))
123 .unwrap_or(Value::Null);
124 values.push(val);
125 }
126 values
127 }
128
129 /// Builds a DataChunk from accumulated rows.
130 fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
131 if rows.is_empty() {
132 return DataChunk::empty();
133 }
134 let num_cols = rows[0].len();
135 let mut columns: Vec<ValueVector> = (0..num_cols)
136 .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
137 .collect();
138
139 for row in rows {
140 for (col_idx, val) in row.iter().enumerate() {
141 if col_idx < columns.len() {
142 columns[col_idx].push_value(val.clone());
143 }
144 }
145 }
146 DataChunk::new(columns)
147 }
148}
149
150impl Operator for ApplyOperator {
151 fn next(&mut self) -> OperatorResult {
152 loop {
153 match &mut self.state {
154 ApplyState::Init => match self.outer.next()? {
155 Some(chunk) => {
156 self.state = ApplyState::Processing {
157 outer_chunk: chunk,
158 outer_row: 0,
159 output: Vec::new(),
160 };
161 }
162 None => {
163 self.state = ApplyState::Done;
164 return Ok(None);
165 }
166 },
167 ApplyState::Processing {
168 outer_chunk,
169 outer_row,
170 output,
171 } => {
172 let selected: Vec<usize> = outer_chunk.selected_indices().collect();
173 while *outer_row < selected.len() {
174 let row = selected[*outer_row];
175 let outer_values = Self::extract_row(outer_chunk, row);
176
177 // Inject outer values into the inner plan's parameter state
178 if let Some(ref param_state) = self.param_state {
179 let injected: Vec<Value> = self
180 .param_col_indices
181 .iter()
182 .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
183 .collect();
184 param_state.set_values(injected);
185 }
186
187 // Reset and run inner plan for this outer row
188 self.inner.reset();
189
190 // EXISTS mode: check for row existence without appending inner columns
191 if let Some(keep_matches) = self.exists_mode {
192 let has_results = self.inner.next()?.is_some();
193 if has_results == keep_matches {
194 output.push(outer_values);
195 }
196 } else {
197 let pre_len = output.len();
198 while let Some(inner_chunk) = self.inner.next()? {
199 for inner_row in inner_chunk.selected_indices() {
200 let inner_values = Self::extract_row(&inner_chunk, inner_row);
201 let mut combined = outer_values.clone();
202 combined.extend(inner_values);
203 output.push(combined);
204 }
205 }
206
207 // OPTIONAL: emit outer row with NULLs when inner produced nothing
208 if self.optional && output.len() == pre_len {
209 let mut combined = outer_values;
210 combined.extend(std::iter::repeat_n(
211 Value::Null,
212 self.inner_column_count,
213 ));
214 output.push(combined);
215 }
216 }
217
218 *outer_row += 1;
219
220 // Flush when we have enough rows
221 if output.len() >= 1024 {
222 let chunk = Self::build_chunk(output);
223 output.clear();
224 return Ok(Some(chunk));
225 }
226 }
227
228 // Finished this outer chunk; flush any remaining output
229 if !output.is_empty() {
230 let chunk = Self::build_chunk(output);
231 output.clear();
232 self.state = ApplyState::Init;
233 return Ok(Some(chunk));
234 }
235
236 // Move to next outer chunk
237 self.state = ApplyState::Init;
238 }
239 ApplyState::Done => return Ok(None),
240 }
241 }
242 }
243
244 fn reset(&mut self) {
245 self.outer.reset();
246 self.inner.reset();
247 self.state = ApplyState::Init;
248 }
249
250 fn name(&self) -> &'static str {
251 "Apply"
252 }
253}