Skip to main content

grafeo_core/execution/operators/
set_ops.rs

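//! Set operations: EXCEPT, INTERSECT, and OTHERWISE.
//!
//! These operators implement the GQL composite query operations that combine
//! the results of two inputs. EXCEPT and INTERSECT support both DISTINCT (set)
//! and ALL (multiset) semantics; OTHERWISE yields its left input's rows, or its
//! right input's rows when the left input is empty.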
5
6use std::collections::HashSet;
7
8use grafeo_common::types::{HashableValue, LogicalType, Value};
9
10use super::{DataChunk, Operator, OperatorError, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
13/// A hashable row key: one `HashableValue` per column.
14type RowKey = Vec<HashableValue>;
15
16/// Extracts a hashable row key from a `DataChunk`.
17fn row_key(chunk: &DataChunk, row: usize) -> RowKey {
18    let mut key = Vec::with_capacity(chunk.num_columns());
19    for col_idx in 0..chunk.num_columns() {
20        let val = chunk
21            .column(col_idx)
22            .and_then(|col| col.get_value(row))
23            .unwrap_or(Value::Null);
24        key.push(HashableValue(val));
25    }
26    key
27}
28
29/// Extracts the plain Values from a row key (for chunk reconstruction).
30fn row_values(key: &RowKey) -> Vec<Value> {
31    key.iter().map(|hv| hv.0.clone()).collect()
32}
33
34/// Materializes all rows from an operator into a vector of row keys.
35fn materialize(op: &mut dyn Operator) -> Result<Vec<RowKey>, OperatorError> {
36    let mut rows = Vec::new();
37    while let Some(chunk) = op.next()? {
38        for row in chunk.selected_indices() {
39            rows.push(row_key(&chunk, row));
40        }
41    }
42    Ok(rows)
43}
44
45/// Rebuilds a `DataChunk` from a set of row keys.
46fn rows_to_chunk(rows: &[RowKey], schema: &[LogicalType]) -> DataChunk {
47    if rows.is_empty() {
48        return DataChunk::empty();
49    }
50    let mut builder = DataChunkBuilder::new(schema);
51    for row in rows {
52        let values = row_values(row);
53        for (col_idx, val) in values.into_iter().enumerate() {
54            if let Some(col) = builder.column_mut(col_idx) {
55                col.push_value(val);
56            }
57        }
58        builder.advance_row();
59    }
60    builder.finish()
61}
62
63/// EXCEPT operator: rows in left that are not in right.
64pub struct ExceptOperator {
65    left: Box<dyn Operator>,
66    right: Box<dyn Operator>,
67    all: bool,
68    output_schema: Vec<LogicalType>,
69    result: Option<Vec<RowKey>>,
70    position: usize,
71}
72
73impl ExceptOperator {
74    /// Creates a new EXCEPT operator.
75    pub fn new(
76        left: Box<dyn Operator>,
77        right: Box<dyn Operator>,
78        all: bool,
79        output_schema: Vec<LogicalType>,
80    ) -> Self {
81        Self {
82            left,
83            right,
84            all,
85            output_schema,
86            result: None,
87            position: 0,
88        }
89    }
90
91    fn compute(&mut self) -> Result<(), OperatorError> {
92        let left_rows = materialize(self.left.as_mut())?;
93        let right_rows = materialize(self.right.as_mut())?;
94
95        if self.all {
96            // EXCEPT ALL: for each right row, remove one matching left row
97            let mut result = left_rows;
98            for right_row in &right_rows {
99                if let Some(pos) = result.iter().position(|r| r == right_row) {
100                    result.remove(pos);
101                }
102            }
103            self.result = Some(result);
104        } else {
105            // EXCEPT DISTINCT: remove all matching rows
106            let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
107            let mut seen = HashSet::new();
108            let result: Vec<RowKey> = left_rows
109                .into_iter()
110                .filter(|row| !right_set.contains(row) && seen.insert(row.clone()))
111                .collect();
112            self.result = Some(result);
113        }
114        Ok(())
115    }
116}
117
118impl Operator for ExceptOperator {
119    fn next(&mut self) -> OperatorResult {
120        if self.result.is_none() {
121            self.compute()?;
122        }
123        let rows = self.result.as_ref().unwrap();
124        if self.position >= rows.len() {
125            return Ok(None);
126        }
127        // Emit up to 1024 rows per chunk
128        let end = (self.position + 1024).min(rows.len());
129        let batch = &rows[self.position..end];
130        self.position = end;
131        if batch.is_empty() {
132            Ok(None)
133        } else {
134            Ok(Some(rows_to_chunk(batch, &self.output_schema)))
135        }
136    }
137
138    fn reset(&mut self) {
139        self.left.reset();
140        self.right.reset();
141        self.result = None;
142        self.position = 0;
143    }
144
145    fn name(&self) -> &'static str {
146        "Except"
147    }
148}
149
150/// INTERSECT operator: rows common to both inputs.
151pub struct IntersectOperator {
152    left: Box<dyn Operator>,
153    right: Box<dyn Operator>,
154    all: bool,
155    output_schema: Vec<LogicalType>,
156    result: Option<Vec<RowKey>>,
157    position: usize,
158}
159
160impl IntersectOperator {
161    /// Creates a new INTERSECT operator.
162    pub fn new(
163        left: Box<dyn Operator>,
164        right: Box<dyn Operator>,
165        all: bool,
166        output_schema: Vec<LogicalType>,
167    ) -> Self {
168        Self {
169            left,
170            right,
171            all,
172            output_schema,
173            result: None,
174            position: 0,
175        }
176    }
177
178    fn compute(&mut self) -> Result<(), OperatorError> {
179        let left_rows = materialize(self.left.as_mut())?;
180        let right_rows = materialize(self.right.as_mut())?;
181
182        if self.all {
183            // INTERSECT ALL: each right row matches at most one left row
184            let mut remaining_right = right_rows;
185            let mut result = Vec::new();
186            for left_row in &left_rows {
187                if let Some(pos) = remaining_right.iter().position(|r| r == left_row) {
188                    result.push(left_row.clone());
189                    remaining_right.remove(pos);
190                }
191            }
192            self.result = Some(result);
193        } else {
194            // INTERSECT DISTINCT: rows present in both, deduplicated
195            let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
196            let mut seen = HashSet::new();
197            let result: Vec<RowKey> = left_rows
198                .into_iter()
199                .filter(|row| right_set.contains(row) && seen.insert(row.clone()))
200                .collect();
201            self.result = Some(result);
202        }
203        Ok(())
204    }
205}
206
207impl Operator for IntersectOperator {
208    fn next(&mut self) -> OperatorResult {
209        if self.result.is_none() {
210            self.compute()?;
211        }
212        let rows = self.result.as_ref().unwrap();
213        if self.position >= rows.len() {
214            return Ok(None);
215        }
216        let end = (self.position + 1024).min(rows.len());
217        let batch = &rows[self.position..end];
218        self.position = end;
219        if batch.is_empty() {
220            Ok(None)
221        } else {
222            Ok(Some(rows_to_chunk(batch, &self.output_schema)))
223        }
224    }
225
226    fn reset(&mut self) {
227        self.left.reset();
228        self.right.reset();
229        self.result = None;
230        self.position = 0;
231    }
232
233    fn name(&self) -> &'static str {
234        "Intersect"
235    }
236}
237
238/// OTHERWISE operator: use left result if non-empty, otherwise use right.
239pub struct OtherwiseOperator {
240    left: Box<dyn Operator>,
241    right: Box<dyn Operator>,
242    /// Which input we are currently streaming from.
243    state: OtherwiseState,
244}
245
246enum OtherwiseState {
247    /// Haven't started yet, need to probe left.
248    Init,
249    /// Left produced rows: buffer first chunk, then stream rest of left.
250    StreamingLeft(Option<DataChunk>),
251    /// Left was empty: stream right.
252    StreamingRight,
253    /// Done.
254    Done,
255}
256
257impl OtherwiseOperator {
258    /// Creates a new OTHERWISE operator.
259    pub fn new(left: Box<dyn Operator>, right: Box<dyn Operator>) -> Self {
260        Self {
261            left,
262            right,
263            state: OtherwiseState::Init,
264        }
265    }
266}
267
268impl Operator for OtherwiseOperator {
269    fn next(&mut self) -> OperatorResult {
270        loop {
271            match &mut self.state {
272                OtherwiseState::Init => {
273                    // Probe left for first chunk
274                    if let Some(chunk) = self.left.next()? {
275                        self.state = OtherwiseState::StreamingLeft(Some(chunk));
276                    } else {
277                        // Left is empty, switch to right
278                        self.state = OtherwiseState::StreamingRight;
279                    }
280                }
281                OtherwiseState::StreamingLeft(buffered) => {
282                    if let Some(chunk) = buffered.take() {
283                        return Ok(Some(chunk));
284                    }
285                    // Continue streaming from left
286                    match self.left.next()? {
287                        Some(chunk) => return Ok(Some(chunk)),
288                        None => {
289                            self.state = OtherwiseState::Done;
290                            return Ok(None);
291                        }
292                    }
293                }
294                OtherwiseState::StreamingRight => match self.right.next()? {
295                    Some(chunk) => return Ok(Some(chunk)),
296                    None => {
297                        self.state = OtherwiseState::Done;
298                        return Ok(None);
299                    }
300                },
301                OtherwiseState::Done => return Ok(None),
302            }
303        }
304    }
305
306    fn reset(&mut self) {
307        self.left.reset();
308        self.right.reset();
309        self.state = OtherwiseState::Init;
310    }
311
312    fn name(&self) -> &'static str {
313        "Otherwise"
314    }
315}