Skip to main content

grafeo_core/execution/operators/
set_ops.rs

1//! Set operations: EXCEPT, INTERSECT, and OTHERWISE.
2//!
3//! These operators implement the GQL composite query operations for
4//! combining result sets with set semantics.
5
6use std::collections::HashSet;
7
8use grafeo_common::types::{HashableValue, LogicalType, Value};
9
10use super::{DataChunk, Operator, OperatorError, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13/// A hashable row key: one `HashableValue` per column.
14type RowKey = Vec<HashableValue>;
15
16/// Extracts a hashable row key from a `DataChunk`.
17fn row_key(chunk: &DataChunk, row: usize) -> RowKey {
18    let mut key = Vec::with_capacity(chunk.num_columns());
19    for col_idx in 0..chunk.num_columns() {
20        let val = chunk
21            .column(col_idx)
22            .and_then(|col| col.get_value(row))
23            .unwrap_or(Value::Null);
24        key.push(HashableValue(val));
25    }
26    key
27}
28
29/// Extracts the plain Values from a row key (for chunk reconstruction).
30fn row_values(key: &RowKey) -> Vec<Value> {
31    key.iter().map(|hv| hv.0.clone()).collect()
32}
33
34/// Materializes all rows from an operator into a vector of row keys.
35fn materialize(op: &mut dyn Operator) -> Result<Vec<RowKey>, OperatorError> {
36    let mut rows = Vec::new();
37    while let Some(chunk) = op.next()? {
38        for row in chunk.selected_indices() {
39            rows.push(row_key(&chunk, row));
40        }
41    }
42    Ok(rows)
43}
44
45/// Rebuilds a `DataChunk` from a set of row keys.
46fn rows_to_chunk(rows: &[RowKey], schema: &[LogicalType]) -> DataChunk {
47    if rows.is_empty() {
48        return DataChunk::empty();
49    }
50    let mut builder = DataChunkBuilder::new(schema);
51    for row in rows {
52        let values = row_values(row);
53        for (col_idx, val) in values.into_iter().enumerate() {
54            if let Some(col) = builder.column_mut(col_idx) {
55                col.push_value(val);
56            }
57        }
58        builder.advance_row();
59    }
60    builder.finish()
61}
62
63/// EXCEPT operator: rows in left that are not in right.
64pub struct ExceptOperator {
65    left: Box<dyn Operator>,
66    right: Box<dyn Operator>,
67    all: bool,
68    output_schema: Vec<LogicalType>,
69    result: Option<Vec<RowKey>>,
70    position: usize,
71}
72
73impl ExceptOperator {
74    /// Creates a new EXCEPT operator.
75    pub fn new(
76        left: Box<dyn Operator>,
77        right: Box<dyn Operator>,
78        all: bool,
79        output_schema: Vec<LogicalType>,
80    ) -> Self {
81        Self {
82            left,
83            right,
84            all,
85            output_schema,
86            result: None,
87            position: 0,
88        }
89    }
90
91    fn compute(&mut self) -> Result<(), OperatorError> {
92        let left_rows = materialize(self.left.as_mut())?;
93        let right_rows = materialize(self.right.as_mut())?;
94
95        if self.all {
96            // EXCEPT ALL: for each right row, remove one matching left row
97            let mut result = left_rows;
98            for right_row in &right_rows {
99                if let Some(pos) = result.iter().position(|r| r == right_row) {
100                    result.remove(pos);
101                }
102            }
103            self.result = Some(result);
104        } else {
105            // EXCEPT DISTINCT: remove all matching rows
106            let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
107            let mut seen = HashSet::new();
108            let result: Vec<RowKey> = left_rows
109                .into_iter()
110                .filter(|row| !right_set.contains(row) && seen.insert(row.clone()))
111                .collect();
112            self.result = Some(result);
113        }
114        Ok(())
115    }
116}
117
118impl Operator for ExceptOperator {
119    fn next(&mut self) -> OperatorResult {
120        if self.result.is_none() {
121            self.compute()?;
122        }
123        let rows = self
124            .result
125            .as_ref()
126            .expect("result is Some: compute() called above");
127        if self.position >= rows.len() {
128            return Ok(None);
129        }
130        // Emit up to 1024 rows per chunk
131        let end = (self.position + 1024).min(rows.len());
132        let batch = &rows[self.position..end];
133        self.position = end;
134        if batch.is_empty() {
135            Ok(None)
136        } else {
137            Ok(Some(rows_to_chunk(batch, &self.output_schema)))
138        }
139    }
140
141    fn reset(&mut self) {
142        self.left.reset();
143        self.right.reset();
144        self.result = None;
145        self.position = 0;
146    }
147
148    fn name(&self) -> &'static str {
149        "Except"
150    }
151}
152
153/// INTERSECT operator: rows common to both inputs.
154pub struct IntersectOperator {
155    left: Box<dyn Operator>,
156    right: Box<dyn Operator>,
157    all: bool,
158    output_schema: Vec<LogicalType>,
159    result: Option<Vec<RowKey>>,
160    position: usize,
161}
162
163impl IntersectOperator {
164    /// Creates a new INTERSECT operator.
165    pub fn new(
166        left: Box<dyn Operator>,
167        right: Box<dyn Operator>,
168        all: bool,
169        output_schema: Vec<LogicalType>,
170    ) -> Self {
171        Self {
172            left,
173            right,
174            all,
175            output_schema,
176            result: None,
177            position: 0,
178        }
179    }
180
181    fn compute(&mut self) -> Result<(), OperatorError> {
182        let left_rows = materialize(self.left.as_mut())?;
183        let right_rows = materialize(self.right.as_mut())?;
184
185        if self.all {
186            // INTERSECT ALL: each right row matches at most one left row
187            let mut remaining_right = right_rows;
188            let mut result = Vec::new();
189            for left_row in &left_rows {
190                if let Some(pos) = remaining_right.iter().position(|r| r == left_row) {
191                    result.push(left_row.clone());
192                    remaining_right.remove(pos);
193                }
194            }
195            self.result = Some(result);
196        } else {
197            // INTERSECT DISTINCT: rows present in both, deduplicated
198            let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
199            let mut seen = HashSet::new();
200            let result: Vec<RowKey> = left_rows
201                .into_iter()
202                .filter(|row| right_set.contains(row) && seen.insert(row.clone()))
203                .collect();
204            self.result = Some(result);
205        }
206        Ok(())
207    }
208}
209
210impl Operator for IntersectOperator {
211    fn next(&mut self) -> OperatorResult {
212        if self.result.is_none() {
213            self.compute()?;
214        }
215        let rows = self
216            .result
217            .as_ref()
218            .expect("result is Some: compute() called above");
219        if self.position >= rows.len() {
220            return Ok(None);
221        }
222        let end = (self.position + 1024).min(rows.len());
223        let batch = &rows[self.position..end];
224        self.position = end;
225        if batch.is_empty() {
226            Ok(None)
227        } else {
228            Ok(Some(rows_to_chunk(batch, &self.output_schema)))
229        }
230    }
231
232    fn reset(&mut self) {
233        self.left.reset();
234        self.right.reset();
235        self.result = None;
236        self.position = 0;
237    }
238
239    fn name(&self) -> &'static str {
240        "Intersect"
241    }
242}
243
244/// OTHERWISE operator: use left result if non-empty, otherwise use right.
245pub struct OtherwiseOperator {
246    left: Box<dyn Operator>,
247    right: Box<dyn Operator>,
248    /// Which input we are currently streaming from.
249    state: OtherwiseState,
250}
251
252enum OtherwiseState {
253    /// Haven't started yet, need to probe left.
254    Init,
255    /// Left produced rows: buffer first chunk, then stream rest of left.
256    StreamingLeft(Option<DataChunk>),
257    /// Left was empty: stream right.
258    StreamingRight,
259    /// Done.
260    Done,
261}
262
263impl OtherwiseOperator {
264    /// Creates a new OTHERWISE operator.
265    pub fn new(left: Box<dyn Operator>, right: Box<dyn Operator>) -> Self {
266        Self {
267            left,
268            right,
269            state: OtherwiseState::Init,
270        }
271    }
272}
273
274impl Operator for OtherwiseOperator {
275    fn next(&mut self) -> OperatorResult {
276        loop {
277            match &mut self.state {
278                OtherwiseState::Init => {
279                    // Probe left for first chunk
280                    if let Some(chunk) = self.left.next()? {
281                        self.state = OtherwiseState::StreamingLeft(Some(chunk));
282                    } else {
283                        // Left is empty, switch to right
284                        self.state = OtherwiseState::StreamingRight;
285                    }
286                }
287                OtherwiseState::StreamingLeft(buffered) => {
288                    if let Some(chunk) = buffered.take() {
289                        return Ok(Some(chunk));
290                    }
291                    // Continue streaming from left
292                    match self.left.next()? {
293                        Some(chunk) => return Ok(Some(chunk)),
294                        None => {
295                            self.state = OtherwiseState::Done;
296                            return Ok(None);
297                        }
298                    }
299                }
300                OtherwiseState::StreamingRight => match self.right.next()? {
301                    Some(chunk) => return Ok(Some(chunk)),
302                    None => {
303                        self.state = OtherwiseState::Done;
304                        return Ok(None);
305                    }
306                },
307                OtherwiseState::Done => return Ok(None),
308            }
309        }
310    }
311
312    fn reset(&mut self) {
313        self.left.reset();
314        self.right.reset();
315        self.state = OtherwiseState::Init;
316    }
317
318    fn name(&self) -> &'static str {
319        "Otherwise"
320    }
321}