grafeo_core/execution/operators/apply.rs
1//! Apply operator (lateral join / correlated subquery).
2//!
3//! For each row from the outer input, the inner subplan is reset and executed.
4//! Results are the concatenation of outer columns with inner columns.
5//!
6//! This operator is the backend for:
7//! - Cypher: `CALL { subquery }`
8//! - GQL: `VALUE { subquery }`
9//! - Pattern comprehensions (with a Collect aggregate wrapper)
10
11use std::sync::Arc;
12
13use grafeo_common::types::{LogicalType, Value};
14
15use super::parameter_scan::ParameterState;
16use super::{DataChunk, Operator, OperatorResult};
17use crate::execution::vector::ValueVector;
18
/// Apply (lateral join) operator.
///
/// Evaluates `inner` once for each row of `outer`. The result schema is
/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
/// for a given outer row, that outer row is omitted (inner join semantics).
///
/// When `param_state` is set, outer row values for the specified column indices
/// are injected into the shared [`ParameterState`] before each inner execution,
/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
pub struct ApplyOperator {
    /// Outer input. Pulled one chunk at a time; every selected row of a chunk
    /// triggers one execution of `inner`.
    outer: Box<dyn Operator>,
    /// Inner subplan. Reset and then drained once per outer row.
    inner: Box<dyn Operator>,
    /// Shared parameter state for correlated subqueries.
    /// `None` for an uncorrelated apply, in which case nothing is injected.
    param_state: Option<Arc<ParameterState>>,
    /// Indices of outer columns to inject into the inner plan.
    /// An index past the end of the outer row injects `Value::Null`.
    param_col_indices: Vec<usize>,
    /// Current position of the pull state machine (see `ApplyState`): the
    /// outer chunk being processed, the next row to handle, and the output
    /// rows buffered so far.
    state: ApplyState,
}
38
/// Execution state of an `ApplyOperator` between calls to `next`.
enum ApplyState {
    /// No outer chunk is held; the next step pulls one from the outer input
    /// and then processes it row-by-row.
    Init,
    /// Processing a chunk of outer rows. `outer_chunk` is the current batch,
    /// `outer_row` is the next row to process, expressed as a position in the
    /// chunk's list of selected indices (not a physical row index).
    Processing {
        outer_chunk: DataChunk,
        outer_row: usize,
        /// Accumulated output rows (outer values followed by inner values).
        /// Emitted once at least 1024 rows are buffered after finishing an
        /// outer row, or when the outer chunk is finished.
        output: Vec<Vec<Value>>,
    },
    /// All outer input exhausted; `next` keeps returning `None` until the
    /// operator is reset.
    Done,
}
53
54impl ApplyOperator {
55 /// Creates a new Apply operator (uncorrelated: no parameter injection).
56 pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
57 Self {
58 outer,
59 inner,
60 param_state: None,
61 param_col_indices: Vec::new(),
62 state: ApplyState::Init,
63 }
64 }
65
66 /// Creates a correlated Apply operator that injects outer row values.
67 ///
68 /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
69 /// `param_col_indices` specifies which outer columns to inject (by index).
70 pub fn new_correlated(
71 outer: Box<dyn Operator>,
72 inner: Box<dyn Operator>,
73 param_state: Arc<ParameterState>,
74 param_col_indices: Vec<usize>,
75 ) -> Self {
76 Self {
77 outer,
78 inner,
79 param_state: Some(param_state),
80 param_col_indices,
81 state: ApplyState::Init,
82 }
83 }
84
85 /// Extracts all values from a single row of a DataChunk.
86 fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
87 let mut values = Vec::with_capacity(chunk.num_columns());
88 for col_idx in 0..chunk.num_columns() {
89 let val = chunk
90 .column(col_idx)
91 .and_then(|col| col.get_value(row))
92 .unwrap_or(Value::Null);
93 values.push(val);
94 }
95 values
96 }
97
98 /// Builds a DataChunk from accumulated rows.
99 fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
100 if rows.is_empty() {
101 return DataChunk::empty();
102 }
103 let num_cols = rows[0].len();
104 let mut columns: Vec<ValueVector> = (0..num_cols)
105 .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
106 .collect();
107
108 for row in rows {
109 for (col_idx, val) in row.iter().enumerate() {
110 if col_idx < columns.len() {
111 columns[col_idx].push_value(val.clone());
112 }
113 }
114 }
115 DataChunk::new(columns)
116 }
117}
118
119impl Operator for ApplyOperator {
120 fn next(&mut self) -> OperatorResult {
121 loop {
122 match &mut self.state {
123 ApplyState::Init => match self.outer.next()? {
124 Some(chunk) => {
125 self.state = ApplyState::Processing {
126 outer_chunk: chunk,
127 outer_row: 0,
128 output: Vec::new(),
129 };
130 }
131 None => {
132 self.state = ApplyState::Done;
133 return Ok(None);
134 }
135 },
136 ApplyState::Processing {
137 outer_chunk,
138 outer_row,
139 output,
140 } => {
141 let selected: Vec<usize> = outer_chunk.selected_indices().collect();
142 while *outer_row < selected.len() {
143 let row = selected[*outer_row];
144 let outer_values = Self::extract_row(outer_chunk, row);
145
146 // Inject outer values into the inner plan's parameter state
147 if let Some(ref param_state) = self.param_state {
148 let injected: Vec<Value> = self
149 .param_col_indices
150 .iter()
151 .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
152 .collect();
153 param_state.set_values(injected);
154 }
155
156 // Reset and run inner plan for this outer row
157 self.inner.reset();
158 while let Some(inner_chunk) = self.inner.next()? {
159 for inner_row in inner_chunk.selected_indices() {
160 let inner_values = Self::extract_row(&inner_chunk, inner_row);
161 let mut combined = outer_values.clone();
162 combined.extend(inner_values);
163 output.push(combined);
164 }
165 }
166
167 *outer_row += 1;
168
169 // Flush when we have enough rows
170 if output.len() >= 1024 {
171 let chunk = Self::build_chunk(output);
172 output.clear();
173 return Ok(Some(chunk));
174 }
175 }
176
177 // Finished this outer chunk; flush any remaining output
178 if !output.is_empty() {
179 let chunk = Self::build_chunk(output);
180 output.clear();
181 self.state = ApplyState::Init;
182 return Ok(Some(chunk));
183 }
184
185 // Move to next outer chunk
186 self.state = ApplyState::Init;
187 }
188 ApplyState::Done => return Ok(None),
189 }
190 }
191 }
192
193 fn reset(&mut self) {
194 self.outer.reset();
195 self.inner.reset();
196 self.state = ApplyState::Init;
197 }
198
199 fn name(&self) -> &'static str {
200 "Apply"
201 }
202}