Skip to main content

grafeo_core/execution/operators/
apply.rs

1//! Apply operator (lateral join / correlated subquery).
2//!
3//! For each row from the outer input, the inner subplan is reset and executed.
4//! Results are the concatenation of outer columns with inner columns.
5//!
6//! This operator is the backend for:
7//! - Cypher: `CALL { subquery }`
8//! - GQL: `VALUE { subquery }`
9//! - Pattern comprehensions (with a Collect aggregate wrapper)
10
11use grafeo_common::types::{LogicalType, Value};
12
13use super::{DataChunk, Operator, OperatorResult};
14use crate::execution::vector::ValueVector;
15
/// Apply (lateral join) operator.
///
/// Evaluates `inner` once for each row of `outer`. The result schema is
/// `outer_columns ++ inner_columns`. If the inner plan produces zero rows
/// for a given outer row, that outer row is omitted (inner join semantics).
pub struct ApplyOperator {
    /// Source of outer rows; pulled one chunk at a time by `next`.
    outer: Box<dyn Operator>,
    /// Subplan that is reset and then drained once per selected outer row.
    inner: Box<dyn Operator>,
    /// Execution state carried between `next` calls: the outer chunk being
    /// processed, the position within it, and combined rows not yet emitted.
    state: ApplyState,
}
27
/// Execution state of an `ApplyOperator` between calls to `next`.
enum ApplyState {
    /// No outer chunk is loaded; the next step pulls one from the outer input.
    /// This is also the state the operator returns to after `reset`.
    Init,
    /// Processing a chunk of outer rows.
    Processing {
        /// The outer batch currently being iterated.
        outer_chunk: DataChunk,
        /// Position, within the chunk's selected indices, of the next outer
        /// row to process (an offset into the selection, not a raw row id).
        outer_row: usize,
        /// Accumulated output rows (combined outer + inner) not yet emitted.
        output: Vec<Vec<Value>>,
    },
    /// All outer input exhausted; `next` keeps returning `Ok(None)`.
    Done,
}
42
43impl ApplyOperator {
44    /// Creates a new Apply operator.
45    pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
46        Self {
47            outer,
48            inner,
49            state: ApplyState::Init,
50        }
51    }
52
53    /// Extracts all values from a single row of a DataChunk.
54    fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
55        let mut values = Vec::with_capacity(chunk.num_columns());
56        for col_idx in 0..chunk.num_columns() {
57            let val = chunk
58                .column(col_idx)
59                .and_then(|col| col.get_value(row))
60                .unwrap_or(Value::Null);
61            values.push(val);
62        }
63        values
64    }
65
66    /// Builds a DataChunk from accumulated rows.
67    fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
68        if rows.is_empty() {
69            return DataChunk::empty();
70        }
71        let num_cols = rows[0].len();
72        let mut columns: Vec<ValueVector> = (0..num_cols)
73            .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
74            .collect();
75
76        for row in rows {
77            for (col_idx, val) in row.iter().enumerate() {
78                if col_idx < columns.len() {
79                    columns[col_idx].push_value(val.clone());
80                }
81            }
82        }
83        DataChunk::new(columns)
84    }
85}
86
87impl Operator for ApplyOperator {
88    fn next(&mut self) -> OperatorResult {
89        loop {
90            match &mut self.state {
91                ApplyState::Init => match self.outer.next()? {
92                    Some(chunk) => {
93                        self.state = ApplyState::Processing {
94                            outer_chunk: chunk,
95                            outer_row: 0,
96                            output: Vec::new(),
97                        };
98                    }
99                    None => {
100                        self.state = ApplyState::Done;
101                        return Ok(None);
102                    }
103                },
104                ApplyState::Processing {
105                    outer_chunk,
106                    outer_row,
107                    output,
108                } => {
109                    let selected: Vec<usize> = outer_chunk.selected_indices().collect();
110                    while *outer_row < selected.len() {
111                        let row = selected[*outer_row];
112                        let outer_values = Self::extract_row(outer_chunk, row);
113
114                        // Reset and run inner plan for this outer row
115                        self.inner.reset();
116                        while let Some(inner_chunk) = self.inner.next()? {
117                            for inner_row in inner_chunk.selected_indices() {
118                                let inner_values = Self::extract_row(&inner_chunk, inner_row);
119                                let mut combined = outer_values.clone();
120                                combined.extend(inner_values);
121                                output.push(combined);
122                            }
123                        }
124
125                        *outer_row += 1;
126
127                        // Flush when we have enough rows
128                        if output.len() >= 1024 {
129                            let chunk = Self::build_chunk(output);
130                            output.clear();
131                            return Ok(Some(chunk));
132                        }
133                    }
134
135                    // Finished this outer chunk; flush any remaining output
136                    if !output.is_empty() {
137                        let chunk = Self::build_chunk(output);
138                        output.clear();
139                        self.state = ApplyState::Init;
140                        return Ok(Some(chunk));
141                    }
142
143                    // Move to next outer chunk
144                    self.state = ApplyState::Init;
145                }
146                ApplyState::Done => return Ok(None),
147            }
148        }
149    }
150
151    fn reset(&mut self) {
152        self.outer.reset();
153        self.inner.reset();
154        self.state = ApplyState::Init;
155    }
156
157    fn name(&self) -> &'static str {
158        "Apply"
159    }
160}