Skip to main content

grafeo_core/execution/operators/
set_ops.rs

1//! Set operations: EXCEPT, INTERSECT, and OTHERWISE.
2//!
3//! These operators implement the GQL composite query operations for
4//! combining result sets with set semantics.
5
6use std::collections::HashSet;
7
8use grafeo_common::types::{HashableValue, LogicalType, Value};
9
10use super::{DataChunk, Operator, OperatorError, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13/// A hashable row key: one `HashableValue` per column.
14type RowKey = Vec<HashableValue>;
15
16/// Extracts a hashable row key from a `DataChunk`.
17fn row_key(chunk: &DataChunk, row: usize) -> RowKey {
18    let mut key = Vec::with_capacity(chunk.num_columns());
19    for col_idx in 0..chunk.num_columns() {
20        let val = chunk
21            .column(col_idx)
22            .and_then(|col| col.get_value(row))
23            .unwrap_or(Value::Null);
24        key.push(HashableValue(val));
25    }
26    key
27}
28
29/// Extracts the plain Values from a row key (for chunk reconstruction).
30fn row_values(key: &RowKey) -> Vec<Value> {
31    key.iter().map(|hv| hv.0.clone()).collect()
32}
33
34/// Materializes all rows from an operator into a vector of row keys.
35fn materialize(op: &mut dyn Operator) -> Result<Vec<RowKey>, OperatorError> {
36    let mut rows = Vec::new();
37    while let Some(chunk) = op.next()? {
38        for row in chunk.selected_indices() {
39            rows.push(row_key(&chunk, row));
40        }
41    }
42    Ok(rows)
43}
44
45/// Rebuilds a `DataChunk` from a set of row keys.
46fn rows_to_chunk(rows: &[RowKey], schema: &[LogicalType]) -> DataChunk {
47    if rows.is_empty() {
48        return DataChunk::empty();
49    }
50    let mut builder = DataChunkBuilder::new(schema);
51    for row in rows {
52        let values = row_values(row);
53        for (col_idx, val) in values.into_iter().enumerate() {
54            if let Some(col) = builder.column_mut(col_idx) {
55                col.push_value(val);
56            }
57        }
58        builder.advance_row();
59    }
60    builder.finish()
61}
62
63/// EXCEPT operator: rows in left that are not in right.
64pub struct ExceptOperator {
65    left: Box<dyn Operator>,
66    right: Box<dyn Operator>,
67    all: bool,
68    output_schema: Vec<LogicalType>,
69    result: Option<Vec<RowKey>>,
70    position: usize,
71}
72
73impl ExceptOperator {
74    /// Creates a new EXCEPT operator.
75    pub fn new(
76        left: Box<dyn Operator>,
77        right: Box<dyn Operator>,
78        all: bool,
79        output_schema: Vec<LogicalType>,
80    ) -> Self {
81        Self {
82            left,
83            right,
84            all,
85            output_schema,
86            result: None,
87            position: 0,
88        }
89    }
90
91    fn compute(&mut self) -> Result<(), OperatorError> {
92        let left_rows = materialize(self.left.as_mut())?;
93        let right_rows = materialize(self.right.as_mut())?;
94
95        if self.all {
96            // EXCEPT ALL: for each right row, remove one matching left row
97            let mut result = left_rows;
98            for right_row in &right_rows {
99                if let Some(pos) = result.iter().position(|r| r == right_row) {
100                    result.remove(pos);
101                }
102            }
103            self.result = Some(result);
104        } else {
105            // EXCEPT DISTINCT: remove all matching rows
106            let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
107            let mut seen = HashSet::new();
108            let result: Vec<RowKey> = left_rows
109                .into_iter()
110                .filter(|row| !right_set.contains(row) && seen.insert(row.clone()))
111                .collect();
112            self.result = Some(result);
113        }
114        Ok(())
115    }
116}
117
118impl Operator for ExceptOperator {
119    fn next(&mut self) -> OperatorResult {
120        if self.result.is_none() {
121            self.compute()?;
122        }
123        let rows = self
124            .result
125            .as_ref()
126            .expect("result is Some: compute() called above");
127        if self.position >= rows.len() {
128            return Ok(None);
129        }
130        // Emit up to 1024 rows per chunk
131        let end = (self.position + 1024).min(rows.len());
132        let batch = &rows[self.position..end];
133        self.position = end;
134        if batch.is_empty() {
135            Ok(None)
136        } else {
137            Ok(Some(rows_to_chunk(batch, &self.output_schema)))
138        }
139    }
140
141    fn reset(&mut self) {
142        self.left.reset();
143        self.right.reset();
144        self.result = None;
145        self.position = 0;
146    }
147
148    fn name(&self) -> &'static str {
149        "Except"
150    }
151
152    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
153        self
154    }
155}
156
157/// INTERSECT operator: rows common to both inputs.
158pub struct IntersectOperator {
159    left: Box<dyn Operator>,
160    right: Box<dyn Operator>,
161    all: bool,
162    output_schema: Vec<LogicalType>,
163    result: Option<Vec<RowKey>>,
164    position: usize,
165}
166
167impl IntersectOperator {
168    /// Creates a new INTERSECT operator.
169    pub fn new(
170        left: Box<dyn Operator>,
171        right: Box<dyn Operator>,
172        all: bool,
173        output_schema: Vec<LogicalType>,
174    ) -> Self {
175        Self {
176            left,
177            right,
178            all,
179            output_schema,
180            result: None,
181            position: 0,
182        }
183    }
184
185    fn compute(&mut self) -> Result<(), OperatorError> {
186        let left_rows = materialize(self.left.as_mut())?;
187        let right_rows = materialize(self.right.as_mut())?;
188
189        if self.all {
190            // INTERSECT ALL: each right row matches at most one left row
191            let mut remaining_right = right_rows;
192            let mut result = Vec::new();
193            for left_row in &left_rows {
194                if let Some(pos) = remaining_right.iter().position(|r| r == left_row) {
195                    result.push(left_row.clone());
196                    remaining_right.remove(pos);
197                }
198            }
199            self.result = Some(result);
200        } else {
201            // INTERSECT DISTINCT: rows present in both, deduplicated
202            let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
203            let mut seen = HashSet::new();
204            let result: Vec<RowKey> = left_rows
205                .into_iter()
206                .filter(|row| right_set.contains(row) && seen.insert(row.clone()))
207                .collect();
208            self.result = Some(result);
209        }
210        Ok(())
211    }
212}
213
214impl Operator for IntersectOperator {
215    fn next(&mut self) -> OperatorResult {
216        if self.result.is_none() {
217            self.compute()?;
218        }
219        let rows = self
220            .result
221            .as_ref()
222            .expect("result is Some: compute() called above");
223        if self.position >= rows.len() {
224            return Ok(None);
225        }
226        let end = (self.position + 1024).min(rows.len());
227        let batch = &rows[self.position..end];
228        self.position = end;
229        if batch.is_empty() {
230            Ok(None)
231        } else {
232            Ok(Some(rows_to_chunk(batch, &self.output_schema)))
233        }
234    }
235
236    fn reset(&mut self) {
237        self.left.reset();
238        self.right.reset();
239        self.result = None;
240        self.position = 0;
241    }
242
243    fn name(&self) -> &'static str {
244        "Intersect"
245    }
246
247    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
248        self
249    }
250}
251
252/// OTHERWISE operator: use left result if non-empty, otherwise use right.
253pub struct OtherwiseOperator {
254    left: Box<dyn Operator>,
255    right: Box<dyn Operator>,
256    /// Which input we are currently streaming from.
257    state: OtherwiseState,
258}
259
260enum OtherwiseState {
261    /// Haven't started yet, need to probe left.
262    Init,
263    /// Left produced rows: buffer first chunk, then stream rest of left.
264    StreamingLeft(Option<DataChunk>),
265    /// Left was empty: stream right.
266    StreamingRight,
267    /// Done.
268    Done,
269}
270
271impl OtherwiseOperator {
272    /// Creates a new OTHERWISE operator.
273    pub fn new(left: Box<dyn Operator>, right: Box<dyn Operator>) -> Self {
274        Self {
275            left,
276            right,
277            state: OtherwiseState::Init,
278        }
279    }
280}
281
282impl Operator for OtherwiseOperator {
283    fn next(&mut self) -> OperatorResult {
284        loop {
285            match &mut self.state {
286                OtherwiseState::Init => {
287                    // Probe left for first chunk
288                    if let Some(chunk) = self.left.next()? {
289                        self.state = OtherwiseState::StreamingLeft(Some(chunk));
290                    } else {
291                        // Left is empty, switch to right
292                        self.state = OtherwiseState::StreamingRight;
293                    }
294                }
295                OtherwiseState::StreamingLeft(buffered) => {
296                    if let Some(chunk) = buffered.take() {
297                        return Ok(Some(chunk));
298                    }
299                    // Continue streaming from left
300                    match self.left.next()? {
301                        Some(chunk) => return Ok(Some(chunk)),
302                        None => {
303                            self.state = OtherwiseState::Done;
304                            return Ok(None);
305                        }
306                    }
307                }
308                OtherwiseState::StreamingRight => match self.right.next()? {
309                    Some(chunk) => return Ok(Some(chunk)),
310                    None => {
311                        self.state = OtherwiseState::Done;
312                        return Ok(None);
313                    }
314                },
315                OtherwiseState::Done => return Ok(None),
316            }
317        }
318    }
319
320    fn reset(&mut self) {
321        self.left.reset();
322        self.right.reset();
323        self.state = OtherwiseState::Init;
324    }
325
326    fn name(&self) -> &'static str {
327        "Otherwise"
328    }
329
330    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
331        self
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    // Move the chunk out, leaving an empty placeholder behind.
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column Int64 chunk holding `values`.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns every Int64 found in column 0.
    fn collect_ints(op: &mut dyn Operator) -> Vec<i64> {
        let mut found = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                if let Some(v) = chunk.column(0).and_then(|c| c.get_int64(row)) {
                    found.push(v);
                }
            }
        }
        found
    }

    /// Like `collect_ints`, but sorted so assertions ignore output order.
    fn sorted_ints(op: &mut dyn Operator) -> Vec<i64> {
        let mut found = collect_ints(op);
        found.sort_unstable();
        found
    }

    /// Boxed mock input producing one chunk with `values`.
    fn int_source(values: &[i64]) -> Box<dyn Operator> {
        Box::new(MockOperator::new(vec![create_int_chunk(values)]))
    }

    /// Boxed mock input producing no chunks at all.
    fn empty_source() -> Box<dyn Operator> {
        Box::new(MockOperator::new(vec![]))
    }

    /// Schema of the single-column Int64 test inputs.
    fn int64_schema() -> Vec<LogicalType> {
        vec![LogicalType::Int64]
    }

    #[test]
    fn test_except_distinct() {
        let mut op = ExceptOperator::new(
            int_source(&[1, 2, 3, 2]),
            int_source(&[2, 4]),
            false,
            int64_schema(),
        );

        assert_eq!(sorted_ints(&mut op), vec![1, 3]);
    }

    #[test]
    fn test_except_all() {
        let mut op = ExceptOperator::new(
            int_source(&[1, 2, 2, 3]),
            int_source(&[2]),
            true,
            int64_schema(),
        );

        // EXCEPT ALL removes one occurrence of 2
        assert_eq!(sorted_ints(&mut op), vec![1, 2, 3]);
    }

    #[test]
    fn test_except_empty_right() {
        let mut op =
            ExceptOperator::new(int_source(&[1, 2]), empty_source(), false, int64_schema());

        assert_eq!(sorted_ints(&mut op), vec![1, 2]);
    }

    #[test]
    fn test_intersect_distinct() {
        let mut op = IntersectOperator::new(
            int_source(&[1, 2, 3, 2]),
            int_source(&[2, 3, 4]),
            false,
            int64_schema(),
        );

        assert_eq!(sorted_ints(&mut op), vec![2, 3]);
    }

    #[test]
    fn test_intersect_all() {
        let mut op = IntersectOperator::new(
            int_source(&[1, 2, 2, 3]),
            int_source(&[2, 2, 4]),
            true,
            int64_schema(),
        );

        assert_eq!(sorted_ints(&mut op), vec![2, 2]);
    }

    #[test]
    fn test_intersect_no_overlap() {
        let mut op = IntersectOperator::new(
            int_source(&[1, 2]),
            int_source(&[3, 4]),
            false,
            int64_schema(),
        );

        assert!(collect_ints(&mut op).is_empty());
    }

    #[test]
    fn test_otherwise_left_nonempty() {
        let mut op = OtherwiseOperator::new(int_source(&[1, 2]), int_source(&[10, 20]));

        assert_eq!(collect_ints(&mut op), vec![1, 2]);
    }

    #[test]
    fn test_otherwise_left_empty() {
        let mut op = OtherwiseOperator::new(empty_source(), int_source(&[10, 20]));

        assert_eq!(collect_ints(&mut op), vec![10, 20]);
    }

    #[test]
    fn test_otherwise_both_empty() {
        let mut op = OtherwiseOperator::new(empty_source(), empty_source());

        assert!(collect_ints(&mut op).is_empty());
    }

    #[test]
    fn test_operator_names() {
        let except = ExceptOperator::new(empty_source(), empty_source(), false, vec![]);
        assert_eq!(except.name(), "Except");

        let intersect = IntersectOperator::new(empty_source(), empty_source(), false, vec![]);
        assert_eq!(intersect.name(), "Intersect");

        let otherwise = OtherwiseOperator::new(empty_source(), empty_source());
        assert_eq!(otherwise.name(), "Otherwise");
    }

    #[test]
    fn test_into_any() {
        let except: Box<dyn Operator> = Box::new(ExceptOperator::new(
            empty_source(),
            empty_source(),
            false,
            vec![],
        ));
        assert!(except.into_any().downcast::<ExceptOperator>().is_ok());

        let intersect: Box<dyn Operator> = Box::new(IntersectOperator::new(
            empty_source(),
            empty_source(),
            false,
            vec![],
        ));
        assert!(intersect.into_any().downcast::<IntersectOperator>().is_ok());

        let otherwise: Box<dyn Operator> =
            Box::new(OtherwiseOperator::new(empty_source(), empty_source()));
        assert!(otherwise.into_any().downcast::<OtherwiseOperator>().is_ok());
    }
}