grafeo_core/execution/operators/
set_ops.rs1use std::collections::HashSet;
7
8use grafeo_common::types::{HashableValue, LogicalType, Value};
9
10use super::{DataChunk, Operator, OperatorError, OperatorResult};
11use crate::execution::chunk::DataChunkBuilder;
12
/// Hashable form of a single row: one [`HashableValue`] per column, in column order.
///
/// Used as the element type for materialized rows and as the key in the
/// hash-based membership checks of the set operators below.
type RowKey = Vec<HashableValue>;
15
16fn row_key(chunk: &DataChunk, row: usize) -> RowKey {
18 let mut key = Vec::with_capacity(chunk.num_columns());
19 for col_idx in 0..chunk.num_columns() {
20 let val = chunk
21 .column(col_idx)
22 .and_then(|col| col.get_value(row))
23 .unwrap_or(Value::Null);
24 key.push(HashableValue(val));
25 }
26 key
27}
28
29fn row_values(key: &RowKey) -> Vec<Value> {
31 key.iter().map(|hv| hv.0.clone()).collect()
32}
33
34fn materialize(op: &mut dyn Operator) -> Result<Vec<RowKey>, OperatorError> {
36 let mut rows = Vec::new();
37 while let Some(chunk) = op.next()? {
38 for row in chunk.selected_indices() {
39 rows.push(row_key(&chunk, row));
40 }
41 }
42 Ok(rows)
43}
44
45fn rows_to_chunk(rows: &[RowKey], schema: &[LogicalType]) -> DataChunk {
47 if rows.is_empty() {
48 return DataChunk::empty();
49 }
50 let mut builder = DataChunkBuilder::new(schema);
51 for row in rows {
52 let values = row_values(row);
53 for (col_idx, val) in values.into_iter().enumerate() {
54 if let Some(col) = builder.column_mut(col_idx) {
55 col.push_value(val);
56 }
57 }
58 builder.advance_row();
59 }
60 builder.finish()
61}
62
/// Set-difference operator: emits rows of `left` that are not cancelled by
/// rows of `right`.
///
/// Both inputs are fully materialized the first time `next` is called; the
/// result is then emitted in batches.
pub struct ExceptOperator {
    /// Input whose rows are candidates for the output.
    left: Box<dyn Operator>,
    /// Input whose rows are subtracted from `left`.
    right: Box<dyn Operator>,
    /// `true`: duplicates are kept and each right row cancels at most one
    /// equal left row. `false`: any left row present in `right` is dropped and
    /// the output is de-duplicated.
    all: bool,
    /// Column types passed to `rows_to_chunk` when building output chunks.
    output_schema: Vec<LogicalType>,
    /// Computed result rows; `None` until the first `next` call and again
    /// after `reset`.
    result: Option<Vec<RowKey>>,
    /// Index into `result` of the next row to emit.
    position: usize,
}
72
73impl ExceptOperator {
74 pub fn new(
76 left: Box<dyn Operator>,
77 right: Box<dyn Operator>,
78 all: bool,
79 output_schema: Vec<LogicalType>,
80 ) -> Self {
81 Self {
82 left,
83 right,
84 all,
85 output_schema,
86 result: None,
87 position: 0,
88 }
89 }
90
91 fn compute(&mut self) -> Result<(), OperatorError> {
92 let left_rows = materialize(self.left.as_mut())?;
93 let right_rows = materialize(self.right.as_mut())?;
94
95 if self.all {
96 let mut result = left_rows;
98 for right_row in &right_rows {
99 if let Some(pos) = result.iter().position(|r| r == right_row) {
100 result.remove(pos);
101 }
102 }
103 self.result = Some(result);
104 } else {
105 let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
107 let mut seen = HashSet::new();
108 let result: Vec<RowKey> = left_rows
109 .into_iter()
110 .filter(|row| !right_set.contains(row) && seen.insert(row.clone()))
111 .collect();
112 self.result = Some(result);
113 }
114 Ok(())
115 }
116}
117
118impl Operator for ExceptOperator {
119 fn next(&mut self) -> OperatorResult {
120 if self.result.is_none() {
121 self.compute()?;
122 }
123 let rows = self
124 .result
125 .as_ref()
126 .expect("result is Some: compute() called above");
127 if self.position >= rows.len() {
128 return Ok(None);
129 }
130 let end = (self.position + 1024).min(rows.len());
132 let batch = &rows[self.position..end];
133 self.position = end;
134 if batch.is_empty() {
135 Ok(None)
136 } else {
137 Ok(Some(rows_to_chunk(batch, &self.output_schema)))
138 }
139 }
140
141 fn reset(&mut self) {
142 self.left.reset();
143 self.right.reset();
144 self.result = None;
145 self.position = 0;
146 }
147
148 fn name(&self) -> &'static str {
149 "Except"
150 }
151
152 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
153 self
154 }
155}
156
/// Set-intersection operator: emits rows of `left` that also occur in `right`.
///
/// Both inputs are fully materialized the first time `next` is called; the
/// result is then emitted in batches.
pub struct IntersectOperator {
    /// Input whose rows are candidates for the output.
    left: Box<dyn Operator>,
    /// Input that left rows must match to be emitted.
    right: Box<dyn Operator>,
    /// `true`: duplicates are kept and each right row can match at most one
    /// left row. `false`: the output is de-duplicated.
    all: bool,
    /// Column types passed to `rows_to_chunk` when building output chunks.
    output_schema: Vec<LogicalType>,
    /// Computed result rows; `None` until the first `next` call and again
    /// after `reset`.
    result: Option<Vec<RowKey>>,
    /// Index into `result` of the next row to emit.
    position: usize,
}
166
167impl IntersectOperator {
168 pub fn new(
170 left: Box<dyn Operator>,
171 right: Box<dyn Operator>,
172 all: bool,
173 output_schema: Vec<LogicalType>,
174 ) -> Self {
175 Self {
176 left,
177 right,
178 all,
179 output_schema,
180 result: None,
181 position: 0,
182 }
183 }
184
185 fn compute(&mut self) -> Result<(), OperatorError> {
186 let left_rows = materialize(self.left.as_mut())?;
187 let right_rows = materialize(self.right.as_mut())?;
188
189 if self.all {
190 let mut remaining_right = right_rows;
192 let mut result = Vec::new();
193 for left_row in &left_rows {
194 if let Some(pos) = remaining_right.iter().position(|r| r == left_row) {
195 result.push(left_row.clone());
196 remaining_right.remove(pos);
197 }
198 }
199 self.result = Some(result);
200 } else {
201 let right_set: HashSet<RowKey> = right_rows.into_iter().collect();
203 let mut seen = HashSet::new();
204 let result: Vec<RowKey> = left_rows
205 .into_iter()
206 .filter(|row| right_set.contains(row) && seen.insert(row.clone()))
207 .collect();
208 self.result = Some(result);
209 }
210 Ok(())
211 }
212}
213
214impl Operator for IntersectOperator {
215 fn next(&mut self) -> OperatorResult {
216 if self.result.is_none() {
217 self.compute()?;
218 }
219 let rows = self
220 .result
221 .as_ref()
222 .expect("result is Some: compute() called above");
223 if self.position >= rows.len() {
224 return Ok(None);
225 }
226 let end = (self.position + 1024).min(rows.len());
227 let batch = &rows[self.position..end];
228 self.position = end;
229 if batch.is_empty() {
230 Ok(None)
231 } else {
232 Ok(Some(rows_to_chunk(batch, &self.output_schema)))
233 }
234 }
235
236 fn reset(&mut self) {
237 self.left.reset();
238 self.right.reset();
239 self.result = None;
240 self.position = 0;
241 }
242
243 fn name(&self) -> &'static str {
244 "Intersect"
245 }
246
247 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
248 self
249 }
250}
251
/// Fallback operator: streams `left` if it yields at least one chunk,
/// otherwise streams `right`.
pub struct OtherwiseOperator {
    /// Preferred input; probed first.
    left: Box<dyn Operator>,
    /// Fallback input; only pulled from when `left` yields no chunk at all.
    right: Box<dyn Operator>,
    /// Current phase of the left/right decision.
    state: OtherwiseState,
}
259
/// Phases of [`OtherwiseOperator`].
enum OtherwiseState {
    /// Nothing has been pulled yet; the next call probes `left`.
    Init,
    /// `left` yielded a chunk when probed. Holds that first chunk until it is
    /// handed out (`None` afterwards); further chunks come straight from `left`.
    StreamingLeft(Option<DataChunk>),
    /// `left` yielded nothing when probed; chunks are forwarded from `right`.
    StreamingRight,
    /// The chosen input is exhausted; every further call returns `None`.
    Done,
}
270
271impl OtherwiseOperator {
272 pub fn new(left: Box<dyn Operator>, right: Box<dyn Operator>) -> Self {
274 Self {
275 left,
276 right,
277 state: OtherwiseState::Init,
278 }
279 }
280}
281
282impl Operator for OtherwiseOperator {
283 fn next(&mut self) -> OperatorResult {
284 loop {
285 match &mut self.state {
286 OtherwiseState::Init => {
287 if let Some(chunk) = self.left.next()? {
289 self.state = OtherwiseState::StreamingLeft(Some(chunk));
290 } else {
291 self.state = OtherwiseState::StreamingRight;
293 }
294 }
295 OtherwiseState::StreamingLeft(buffered) => {
296 if let Some(chunk) = buffered.take() {
297 return Ok(Some(chunk));
298 }
299 match self.left.next()? {
301 Some(chunk) => return Ok(Some(chunk)),
302 None => {
303 self.state = OtherwiseState::Done;
304 return Ok(None);
305 }
306 }
307 }
308 OtherwiseState::StreamingRight => match self.right.next()? {
309 Some(chunk) => return Ok(Some(chunk)),
310 None => {
311 self.state = OtherwiseState::Done;
312 return Ok(None);
313 }
314 },
315 OtherwiseState::Done => return Ok(None),
316 }
317 }
318 }
319
320 fn reset(&mut self) {
321 self.left.reset();
322 self.right.reset();
323 self.state = OtherwiseState::Init;
324 }
325
326 fn name(&self) -> &'static str {
327 "Otherwise"
328 }
329
330 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
331 self
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    // The chunk is moved out and an empty one is left in its place.
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        // NOTE(review): only the cursor is rewound. Chunks already handed out
        // were replaced by empty ones in `next`, so a replay after `reset`
        // yields empty chunks rather than the original data.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column Int64 chunk containing `values`.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for value in values.iter().copied() {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns every Int64 found in column 0 of the selected rows.
    fn collect_ints(op: &mut dyn Operator) -> Vec<i64> {
        let mut ints = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                ints.extend(chunk.column(0).and_then(|column| column.get_int64(row)));
            }
        }
        ints
    }

    /// Same as `collect_ints`, with the output sorted for order-independent checks.
    fn sorted_ints(op: &mut dyn Operator) -> Vec<i64> {
        let mut ints = collect_ints(op);
        ints.sort_unstable();
        ints
    }

    /// Boxed mock that yields one chunk holding `values`.
    fn source(values: &[i64]) -> Box<dyn Operator> {
        Box::new(MockOperator::new(vec![create_int_chunk(values)]))
    }

    /// Boxed mock that yields no chunks at all.
    fn empty_source() -> Box<dyn Operator> {
        Box::new(MockOperator::new(Vec::new()))
    }

    #[test]
    fn test_except_distinct() {
        let mut op = ExceptOperator::new(
            source(&[1, 2, 3, 2]),
            source(&[2, 4]),
            false,
            vec![LogicalType::Int64],
        );

        assert_eq!(sorted_ints(&mut op), vec![1, 3]);
    }

    #[test]
    fn test_except_all() {
        let mut op = ExceptOperator::new(
            source(&[1, 2, 2, 3]),
            source(&[2]),
            true,
            vec![LogicalType::Int64],
        );

        // Only one of the two 2s on the left is cancelled by the single 2 on the right.
        assert_eq!(sorted_ints(&mut op), vec![1, 2, 3]);
    }

    #[test]
    fn test_except_empty_right() {
        let mut op = ExceptOperator::new(
            source(&[1, 2]),
            empty_source(),
            false,
            vec![LogicalType::Int64],
        );

        assert_eq!(sorted_ints(&mut op), vec![1, 2]);
    }

    #[test]
    fn test_intersect_distinct() {
        let mut op = IntersectOperator::new(
            source(&[1, 2, 3, 2]),
            source(&[2, 3, 4]),
            false,
            vec![LogicalType::Int64],
        );

        assert_eq!(sorted_ints(&mut op), vec![2, 3]);
    }

    #[test]
    fn test_intersect_all() {
        let mut op = IntersectOperator::new(
            source(&[1, 2, 2, 3]),
            source(&[2, 2, 4]),
            true,
            vec![LogicalType::Int64],
        );

        assert_eq!(sorted_ints(&mut op), vec![2, 2]);
    }

    #[test]
    fn test_intersect_no_overlap() {
        let mut op = IntersectOperator::new(
            source(&[1, 2]),
            source(&[3, 4]),
            false,
            vec![LogicalType::Int64],
        );

        assert!(collect_ints(&mut op).is_empty());
    }

    #[test]
    fn test_otherwise_left_nonempty() {
        let mut op = OtherwiseOperator::new(source(&[1, 2]), source(&[10, 20]));

        assert_eq!(collect_ints(&mut op), vec![1, 2]);
    }

    #[test]
    fn test_otherwise_left_empty() {
        let mut op = OtherwiseOperator::new(empty_source(), source(&[10, 20]));

        assert_eq!(collect_ints(&mut op), vec![10, 20]);
    }

    #[test]
    fn test_otherwise_both_empty() {
        let mut op = OtherwiseOperator::new(empty_source(), empty_source());

        assert!(collect_ints(&mut op).is_empty());
    }

    #[test]
    fn test_operator_names() {
        let except = ExceptOperator::new(empty_source(), empty_source(), false, vec![]);
        assert_eq!(except.name(), "Except");

        let intersect = IntersectOperator::new(empty_source(), empty_source(), false, vec![]);
        assert_eq!(intersect.name(), "Intersect");

        let otherwise = OtherwiseOperator::new(empty_source(), empty_source());
        assert_eq!(otherwise.name(), "Otherwise");
    }

    #[test]
    fn test_into_any() {
        let except: Box<dyn Operator> = Box::new(ExceptOperator::new(
            empty_source(),
            empty_source(),
            false,
            vec![],
        ));
        assert!(except.into_any().downcast::<ExceptOperator>().is_ok());

        let intersect: Box<dyn Operator> = Box::new(IntersectOperator::new(
            empty_source(),
            empty_source(),
            false,
            vec![],
        ));
        assert!(intersect.into_any().downcast::<IntersectOperator>().is_ok());

        let otherwise: Box<dyn Operator> =
            Box::new(OtherwiseOperator::new(empty_source(), empty_source()));
        assert!(otherwise.into_any().downcast::<OtherwiseOperator>().is_ok());
    }
}