1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
//! Apply operator (lateral join / correlated subquery).
//!
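//! For each row from the outer input, the inner subplan is reset and executed.
//! By default, results are the concatenation of outer columns with inner columns;
//! the EXISTS modes instead filter outer rows (semi-/anti-join) or append a single
//! boolean column.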
//!
//! This operator is the backend for:
//! - Cypher: `CALL { subquery }`
//! - GQL: `VALUE { subquery }`
//! - Pattern comprehensions (with a Collect aggregate wrapper)
use std::sync::Arc;
use grafeo_common::types::{LogicalType, Value};
use super::parameter_scan::ParameterState;
use super::{DataChunk, Operator, OperatorResult};
use crate::execution::vector::ValueVector;
/// Apply (lateral join) operator.
///
/// Evaluates `inner` once for each selected row of `outer`, resetting it before
/// every evaluation. How outer and inner results are combined depends on the
/// configured mode:
///
/// - **Default**: the output schema is `outer_columns ++ inner_columns`, with one
///   output row per inner row. If the inner plan produces zero rows for a given
///   outer row, that outer row is omitted (inner-join semantics).
/// - **Optional** ([`ApplyOperator::with_optional`]): as the default, but an outer
///   row with no inner results is emitted once with NULLs for the inner columns
///   (left-join semantics).
/// - **EXISTS** ([`ApplyOperator::with_exists_mode`]): outer rows are kept or
///   dropped depending on whether the inner plan produced any rows (semi-join or
///   anti-join); inner columns are not appended.
/// - **EXISTS flag** ([`ApplyOperator::with_exists_flag`]): every outer row is
///   kept and one boolean column is appended. This takes precedence over EXISTS
///   mode when both are set. Both EXISTS variants ignore the optional setting.
///
/// When `param_state` is set, outer row values for the specified column indices
/// are injected into the shared [`ParameterState`] before each inner execution,
/// allowing the inner plan's [`ParameterScanOperator`](super::ParameterScanOperator) to read them.
pub struct ApplyOperator {
    /// Input whose rows drive the evaluation of `inner`.
    outer: Box<dyn Operator>,
    /// Subplan that is reset and re-executed for every outer row.
    inner: Box<dyn Operator>,
    /// Shared parameter state for correlated subqueries.
    param_state: Option<Arc<ParameterState>>,
    /// Indices of outer columns to inject into the inner plan.
    param_col_indices: Vec<usize>,
    /// When true, outer rows with no inner results emit NULLs (left-join).
    optional: bool,
    /// Number of columns the inner plan produces (needed for NULL-padding).
    /// Only consulted when `optional` is true.
    inner_column_count: usize,
    /// EXISTS mode: Some(true) = semi-join (keep if inner has rows),
    /// Some(false) = anti-join (keep if inner has NO rows).
    /// Inner columns are NOT appended in EXISTS mode.
    exists_mode: Option<bool>,
    /// EXISTS flag mode: instead of filtering rows, append a boolean column
    /// indicating whether the inner plan produced results. All outer rows
    /// are preserved. Used for EXISTS inside OR predicates.
    exists_flag: bool,
    /// Execution state machine: the outer chunk in progress, the cursor into
    /// it, and output rows buffered but not yet emitted.
    state: ApplyState,
}
/// Internal state machine driving `ApplyOperator::next`.
enum ApplyState {
    /// No outer chunk is in progress; the next call pulls one from `outer`.
    Init,
    /// Processing a chunk of outer rows.
    Processing {
        /// The outer chunk currently being processed.
        outer_chunk: DataChunk,
        /// Position within `outer_chunk`'s list of selected row indices (not a
        /// raw row index) of the next outer row to process. It survives early
        /// returns, so a flush can resume mid-chunk on the next call.
        outer_row: usize,
        /// Output rows accumulated but not yet emitted as a chunk.
        output: Vec<Vec<Value>>,
    },
    /// Outer input exhausted; `next` keeps returning `Ok(None)` until reset.
    Done,
}
impl ApplyOperator {
    /// Creates a new Apply operator (uncorrelated: no parameter injection).
    ///
    /// The operator starts in default mode: inner-join semantics with the
    /// inner columns appended to each outer row.
    pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
        Self {
            outer,
            inner,
            param_state: None,
            param_col_indices: Vec::new(),
            optional: false,
            inner_column_count: 0,
            exists_mode: None,
            exists_flag: false,
            state: ApplyState::Init,
        }
    }

    /// Creates a correlated Apply operator that injects outer row values.
    ///
    /// `param_state` is shared with a [`ParameterScanOperator`](super::ParameterScanOperator) in the inner plan.
    /// `param_col_indices` specifies which outer columns to inject (by index);
    /// an index outside the outer row is injected as `Value::Null`.
    pub fn new_correlated(
        outer: Box<dyn Operator>,
        inner: Box<dyn Operator>,
        param_state: Arc<ParameterState>,
        param_col_indices: Vec<usize>,
    ) -> Self {
        // Same defaults as `new`; only the correlation fields differ.
        Self {
            param_state: Some(param_state),
            param_col_indices,
            ..Self::new(outer, inner)
        }
    }

    /// Enables optional (left-join) semantics with the given inner column count.
    ///
    /// When enabled, outer rows that produce no inner results will be emitted
    /// with NULL values for the inner columns instead of being dropped.
    pub fn with_optional(mut self, inner_column_count: usize) -> Self {
        self.optional = true;
        self.inner_column_count = inner_column_count;
        self
    }

    /// Enables EXISTS mode: semi-join (`keep_matches=true`) or anti-join
    /// (`keep_matches=false`). Inner columns are NOT appended to the output.
    pub fn with_exists_mode(mut self, keep_matches: bool) -> Self {
        self.exists_mode = Some(keep_matches);
        self
    }

    /// Enables EXISTS flag mode: instead of filtering, appends a boolean
    /// column indicating whether the inner plan produced results. All outer
    /// rows are preserved. Used for EXISTS inside OR predicates where
    /// semi-join filtering would be incorrect.
    pub fn with_exists_flag(mut self) -> Self {
        self.exists_flag = true;
        self
    }

    /// Extracts all values from a single row of a DataChunk.
    ///
    /// A missing column or value is reported as `Value::Null`.
    fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
        (0..chunk.num_columns())
            .map(|col_idx| {
                chunk
                    .column(col_idx)
                    .and_then(|col| col.get_value(row))
                    .unwrap_or(Value::Null)
            })
            .collect()
    }

    /// Builds a DataChunk from accumulated rows.
    ///
    /// All columns are typed `LogicalType::Any`. The column count is the width
    /// of the widest row. Shorter rows are padded with `Value::Null`, so every
    /// column ends up with exactly `rows.len()` entries. Returns an empty chunk
    /// when `rows` is empty.
    fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
        let Some(num_cols) = rows.iter().map(Vec::len).max() else {
            return DataChunk::empty();
        };
        let mut columns: Vec<ValueVector> = (0..num_cols)
            .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
            .collect();
        for row in rows {
            for (col_idx, column) in columns.iter_mut().enumerate() {
                // Pad narrow rows so the column lengths stay in lock-step. For
                // example, an OPTIONAL row is padded with `inner_column_count`
                // NULLs, which may differ from the inner plan's actual width.
                column.push_value(row.get(col_idx).cloned().unwrap_or(Value::Null));
            }
        }
        DataChunk::new(columns)
    }
}
impl Operator for ApplyOperator {
    /// Produces the next output chunk, or `Ok(None)` once the outer input is
    /// exhausted. Errors from either child propagate unchanged.
    fn next(&mut self) -> OperatorResult {
        // Buffered output rows that trigger an early flush mid-chunk.
        const FLUSH_THRESHOLD: usize = 1024;

        loop {
            match &mut self.state {
                ApplyState::Init => match self.outer.next()? {
                    Some(chunk) => {
                        self.state = ApplyState::Processing {
                            outer_chunk: chunk,
                            outer_row: 0,
                            output: Vec::new(),
                        };
                    }
                    None => {
                        self.state = ApplyState::Done;
                        return Ok(None);
                    }
                },
                ApplyState::Processing {
                    outer_chunk,
                    outer_row,
                    output,
                } => {
                    // Re-collected on every call because a flush can return
                    // mid-chunk; `outer_row` indexes into this list.
                    let selected: Vec<usize> = outer_chunk.selected_indices().collect();
                    while *outer_row < selected.len() {
                        let row = selected[*outer_row];
                        let outer_values = Self::extract_row(outer_chunk, row);
                        // Inject outer values into the inner plan's parameter state
                        if let Some(ref param_state) = self.param_state {
                            let injected: Vec<Value> = self
                                .param_col_indices
                                .iter()
                                .map(|&idx| outer_values.get(idx).cloned().unwrap_or(Value::Null))
                                .collect();
                            param_state.set_values(injected);
                        }
                        // Reset and run inner plan for this outer row
                        self.inner.reset();
                        if self.exists_flag || self.exists_mode.is_some() {
                            // Only existence matters: stop at the first chunk
                            // that has a selected row. Chunks with no selected
                            // rows are skipped, matching the default path below,
                            // which produces nothing for them.
                            let has_results = loop {
                                match self.inner.next()? {
                                    Some(chunk) => {
                                        if chunk.selected_indices().next().is_some() {
                                            break true;
                                        }
                                    }
                                    None => break false,
                                }
                            };
                            if self.exists_flag {
                                // EXISTS flag mode: append boolean column, keep all rows
                                let mut combined = outer_values;
                                combined.push(Value::Bool(has_results));
                                output.push(combined);
                            } else if self.exists_mode == Some(has_results) {
                                // EXISTS mode: keep the outer row (without inner
                                // columns) when existence matches the requested sense.
                                output.push(outer_values);
                            }
                        } else {
                            let pre_len = output.len();
                            while let Some(inner_chunk) = self.inner.next()? {
                                for inner_row in inner_chunk.selected_indices() {
                                    let inner_values = Self::extract_row(&inner_chunk, inner_row);
                                    let mut combined = outer_values.clone();
                                    combined.extend(inner_values);
                                    output.push(combined);
                                }
                            }
                            // OPTIONAL: emit outer row with NULLs when inner produced nothing
                            if self.optional && output.len() == pre_len {
                                let mut combined = outer_values;
                                combined.extend(std::iter::repeat_n(
                                    Value::Null,
                                    self.inner_column_count,
                                ));
                                output.push(combined);
                            }
                        }
                        *outer_row += 1;
                        // Flush when we have enough rows; the cursor resumes here next call.
                        if output.len() >= FLUSH_THRESHOLD {
                            let chunk = Self::build_chunk(output);
                            output.clear();
                            return Ok(Some(chunk));
                        }
                    }
                    // Finished this outer chunk; flush any remaining output
                    if !output.is_empty() {
                        let chunk = Self::build_chunk(output);
                        output.clear();
                        self.state = ApplyState::Init;
                        return Ok(Some(chunk));
                    }
                    // Move to next outer chunk
                    self.state = ApplyState::Init;
                }
                ApplyState::Done => return Ok(None),
            }
        }
    }

    /// Resets both children and restarts from the first outer chunk.
    ///
    /// `param_state` is not cleared here; `next` overwrites it before each
    /// inner execution.
    fn reset(&mut self) {
        self.outer.reset();
        self.inner.reset();
        self.state = ApplyState::Init;
    }

    fn name(&self) -> &'static str {
        "Apply"
    }
}