Skip to main content

grafeo_core/execution/operators/push/
distinct.rs

1//! Push-based distinct operator.
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7use crate::execution::vector::ValueVector;
8use grafeo_common::types::Value;
9use std::collections::HashSet;
10
11/// Hash key for distinct tracking.
12#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13struct RowKey(Vec<u64>);
14
15impl RowKey {
16    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
17        let hashes: Vec<u64> = columns
18            .iter()
19            .map(|&col| {
20                chunk
21                    .column(col)
22                    .and_then(|c| c.get_value(row))
23                    .map_or(0, |v| hash_value(&v))
24            })
25            .collect();
26        Self(hashes)
27    }
28
29    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
30        let hashes: Vec<u64> = (0..chunk.column_count())
31            .map(|col| {
32                chunk
33                    .column(col)
34                    .and_then(|c| c.get_value(row))
35                    .map_or(0, |v| hash_value(&v))
36            })
37            .collect();
38        Self(hashes)
39    }
40}
41
42fn hash_value(value: &Value) -> u64 {
43    use std::collections::hash_map::DefaultHasher;
44    use std::hash::Hasher;
45
46    let mut hasher = DefaultHasher::new();
47    hash_value_into(value, &mut hasher);
48    hasher.finish()
49}
50
51/// Recursively hashes a Value into a Hasher without relying on Debug output.
52///
53/// Each variant is prefixed with a discriminant tag to prevent cross-type collisions.
54fn hash_value_into(value: &Value, hasher: &mut impl std::hash::Hasher) {
55    use std::hash::Hash;
56
57    std::mem::discriminant(value).hash(hasher);
58    match value {
59        Value::Null => {}
60        Value::Bool(b) => b.hash(hasher),
61        Value::Int64(i) => i.hash(hasher),
62        Value::Float64(f) => f.to_bits().hash(hasher),
63        Value::String(s) => s.hash(hasher),
64        Value::Bytes(b) => b.hash(hasher),
65        Value::List(items) => {
66            items.len().hash(hasher);
67            for item in items.iter() {
68                hash_value_into(item, hasher);
69            }
70        }
71        Value::Map(map) => {
72            map.len().hash(hasher);
73            // BTreeMap iterates in key order: deterministic
74            for (k, v) in map.iter() {
75                k.as_str().hash(hasher);
76                hash_value_into(v, hasher);
77            }
78        }
79        Value::Vector(vec) => {
80            vec.len().hash(hasher);
81            for f in vec.iter() {
82                f.to_bits().hash(hasher);
83            }
84        }
85        Value::Path { nodes, edges } => {
86            nodes.len().hash(hasher);
87            for n in nodes.iter() {
88                hash_value_into(n, hasher);
89            }
90            edges.len().hash(hasher);
91            for e in edges.iter() {
92                hash_value_into(e, hasher);
93            }
94        }
95        // Temporal and other scalar types: use their Display representation
96        // which is stable and semantically meaningful (ISO 8601 for dates, etc.)
97        _ => format!("{value}").hash(hasher),
98    }
99}
100
101/// Push-based distinct operator.
102///
103/// Filters out duplicate rows based on all columns or specified columns.
104/// This operator maintains state (seen values) but can produce output
105/// incrementally as new unique rows arrive.
106pub struct DistinctPushOperator {
107    /// Columns to check for distinctness (None = all columns).
108    columns: Option<Vec<usize>>,
109    /// Set of seen row hashes.
110    seen: HashSet<RowKey>,
111}
112
113impl DistinctPushOperator {
114    /// Create a distinct operator on all columns.
115    pub fn new() -> Self {
116        Self {
117            columns: None,
118            seen: HashSet::new(),
119        }
120    }
121
122    /// Create a distinct operator on specific columns.
123    pub fn on_columns(columns: Vec<usize>) -> Self {
124        Self {
125            columns: Some(columns),
126            seen: HashSet::new(),
127        }
128    }
129
130    /// Get the number of unique rows seen.
131    pub fn unique_count(&self) -> usize {
132        self.seen.len()
133    }
134}
135
136impl Default for DistinctPushOperator {
137    fn default() -> Self {
138        Self::new()
139    }
140}
141
142impl PushOperator for DistinctPushOperator {
143    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
144        if chunk.is_empty() {
145            return Ok(true);
146        }
147
148        // Find rows that are new (not seen before)
149        let mut new_indices = Vec::new();
150
151        for row in chunk.selected_indices() {
152            let key = match &self.columns {
153                Some(cols) => RowKey::from_row(&chunk, row, cols),
154                None => RowKey::from_all_columns(&chunk, row),
155            };
156
157            if self.seen.insert(key) {
158                new_indices.push(row);
159            }
160        }
161
162        if new_indices.is_empty() {
163            return Ok(true);
164        }
165
166        // Create filtered chunk with only new rows
167        let selection = SelectionVector::from_predicate(chunk.len(), |i| new_indices.contains(&i));
168        let filtered = chunk.filter(&selection);
169
170        sink.consume(filtered)
171    }
172
173    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
174        // Nothing to finalize - all output was produced incrementally
175        Ok(())
176    }
177
178    fn preferred_chunk_size(&self) -> ChunkSizeHint {
179        ChunkSizeHint::Default
180    }
181
182    fn name(&self) -> &'static str {
183        "DistinctPush"
184    }
185}
186
187/// Push-based distinct operator that materializes all input first.
188///
189/// This is a true pipeline breaker that buffers all rows and produces
190/// distinct output in the finalize phase. Use this when you need
191/// deterministic ordering of output.
192pub struct DistinctMaterializingOperator {
193    /// Columns to check for distinctness.
194    columns: Option<Vec<usize>>,
195    /// Buffered unique rows.
196    rows: Vec<Vec<Value>>,
197    /// Set of seen row hashes.
198    seen: HashSet<RowKey>,
199    /// Number of columns.
200    num_columns: Option<usize>,
201}
202
203impl DistinctMaterializingOperator {
204    /// Create a distinct operator on all columns.
205    pub fn new() -> Self {
206        Self {
207            columns: None,
208            rows: Vec::new(),
209            seen: HashSet::new(),
210            num_columns: None,
211        }
212    }
213
214    /// Create a distinct operator on specific columns.
215    pub fn on_columns(columns: Vec<usize>) -> Self {
216        Self {
217            columns: Some(columns),
218            rows: Vec::new(),
219            seen: HashSet::new(),
220            num_columns: None,
221        }
222    }
223}
224
225impl Default for DistinctMaterializingOperator {
226    fn default() -> Self {
227        Self::new()
228    }
229}
230
231impl PushOperator for DistinctMaterializingOperator {
232    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
233        if chunk.is_empty() {
234            return Ok(true);
235        }
236
237        if self.num_columns.is_none() {
238            self.num_columns = Some(chunk.column_count());
239        }
240
241        let num_cols = chunk.column_count();
242
243        for row in chunk.selected_indices() {
244            let key = match &self.columns {
245                Some(cols) => RowKey::from_row(&chunk, row, cols),
246                None => RowKey::from_all_columns(&chunk, row),
247            };
248
249            if self.seen.insert(key) {
250                // Store the full row
251                let row_values: Vec<Value> = (0..num_cols)
252                    .map(|col| {
253                        chunk
254                            .column(col)
255                            .and_then(|c| c.get_value(row))
256                            .unwrap_or(Value::Null)
257                    })
258                    .collect();
259                self.rows.push(row_values);
260            }
261        }
262
263        Ok(true)
264    }
265
266    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
267        if self.rows.is_empty() {
268            return Ok(());
269        }
270
271        let num_cols = self.num_columns.unwrap_or(0);
272        let mut columns: Vec<ValueVector> = (0..num_cols).map(|_| ValueVector::new()).collect();
273
274        for row in &self.rows {
275            for (col_idx, col) in columns.iter_mut().enumerate() {
276                let val = row.get(col_idx).cloned().unwrap_or(Value::Null);
277                col.push(val);
278            }
279        }
280
281        let chunk = DataChunk::new(columns);
282        sink.consume(chunk)?;
283
284        Ok(())
285    }
286
287    fn preferred_chunk_size(&self) -> ChunkSizeHint {
288        ChunkSizeHint::Default
289    }
290
291    fn name(&self) -> &'static str {
292        "DistinctMaterializing"
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column chunk of `Int64` values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let column: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&column)])
    }

    /// Builds a single-column chunk from arbitrary values.
    fn create_mixed_chunk(values: &[Value]) -> DataChunk {
        DataChunk::new(vec![ValueVector::from_values(values)])
    }

    /// Pushes `chunks` in order through a fresh all-columns
    /// `DistinctPushOperator`, finalizes it, and hands back the operator and
    /// the sink for inspection.
    fn run_distinct(chunks: Vec<DataChunk>) -> (DistinctPushOperator, CollectorSink) {
        let mut operator = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        for chunk in chunks {
            operator.push(chunk, &mut sink).unwrap();
        }
        operator.finalize(&mut sink).unwrap();

        (operator, sink)
    }

    /// Runs `values` as one single-column chunk and returns the number of
    /// distinct keys the operator recorded.
    fn distinct_count_of(values: &[Value]) -> usize {
        let (operator, _sink) = run_distinct(vec![create_mixed_chunk(values)]);
        operator.unique_count()
    }

    #[test]
    fn test_distinct_all_unique() {
        let (distinct, sink) = run_distinct(vec![create_test_chunk(&[1, 2, 3, 4, 5])]);

        assert_eq!(sink.row_count(), 5);
        assert_eq!(distinct.unique_count(), 5);
    }

    #[test]
    fn test_distinct_with_duplicates() {
        let (distinct, sink) = run_distinct(vec![create_test_chunk(&[1, 2, 1, 3, 2, 1, 4])]);

        assert_eq!(sink.row_count(), 4); // 1, 2, 3, 4
        assert_eq!(distinct.unique_count(), 4);
    }

    #[test]
    fn test_distinct_all_same() {
        let (distinct, sink) = run_distinct(vec![create_test_chunk(&[5, 5, 5, 5, 5])]);

        assert_eq!(sink.row_count(), 1);
        assert_eq!(distinct.unique_count(), 1);
    }

    #[test]
    fn test_distinct_multiple_chunks() {
        let (_distinct, sink) = run_distinct(vec![
            create_test_chunk(&[1, 2, 3]),
            create_test_chunk(&[2, 3, 4]),
            create_test_chunk(&[3, 4, 5]),
        ]);

        assert_eq!(sink.row_count(), 5); // 1, 2, 3, 4, 5
    }

    #[test]
    fn test_distinct_materializing() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        let input = create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]);
        distinct.push(input, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        // All output comes in finalize
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 7); // 7 unique values
    }

    #[test]
    fn test_distinct_null_values() {
        let unique = distinct_count_of(&[Value::Null, Value::Null, Value::Int64(1)]);
        assert_eq!(unique, 2); // Null + 1
    }

    #[test]
    fn test_distinct_bool_values() {
        let unique =
            distinct_count_of(&[Value::Bool(true), Value::Bool(false), Value::Bool(true)]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_float_values() {
        let unique = distinct_count_of(&[
            Value::Float64(1.0),
            Value::Float64(2.0),
            Value::Float64(1.0),
            Value::Float64(f64::NAN),
        ]);
        assert_eq!(unique, 3); // 1.0, 2.0, NaN
    }

    #[test]
    fn test_distinct_string_values() {
        let unique =
            distinct_count_of(&[Value::from("Alix"), Value::from("Gus"), Value::from("Alix")]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_bytes_values() {
        let unique = distinct_count_of(&[
            Value::Bytes(vec![1u8, 2, 3].into()),
            Value::Bytes(vec![4u8, 5, 6].into()),
            Value::Bytes(vec![1u8, 2, 3].into()),
        ]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_list_values() {
        let unique = distinct_count_of(&[
            Value::List(vec![Value::Int64(1), Value::Int64(2)].into()),
            Value::List(vec![Value::Int64(3), Value::Int64(4)].into()),
            Value::List(vec![Value::Int64(1), Value::Int64(2)].into()),
        ]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_map_values() {
        use std::collections::BTreeMap;

        let mut map1 = BTreeMap::new();
        map1.insert("a".into(), Value::Int64(1));
        let mut map2 = BTreeMap::new();
        map2.insert("b".into(), Value::Int64(2));

        let unique = distinct_count_of(&[
            Value::Map(map1.clone().into()),
            Value::Map(map2.into()),
            Value::Map(map1.into()),
        ]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_vector_values() {
        let unique = distinct_count_of(&[
            Value::Vector(vec![1.0_f32, 2.0].into()),
            Value::Vector(vec![3.0_f32, 4.0].into()),
            Value::Vector(vec![1.0_f32, 2.0].into()),
        ]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_path_values() {
        let path1 = Value::Path {
            nodes: vec![Value::Int64(1), Value::Int64(2)].into(),
            edges: vec![Value::Int64(10)].into(),
        };
        let path2 = Value::Path {
            nodes: vec![Value::Int64(3), Value::Int64(4)].into(),
            edges: vec![Value::Int64(20)].into(),
        };

        let unique = distinct_count_of(&[path1.clone(), path2, path1]);
        assert_eq!(unique, 2);
    }

    #[test]
    fn test_distinct_mixed_types_are_distinct() {
        // Different types with "similar" content should be distinct
        let unique = distinct_count_of(&[
            Value::Int64(1),
            Value::Float64(1.0),
            Value::from("1"),
            Value::Bool(true),
        ]);
        assert_eq!(unique, 4);
    }

    #[test]
    fn test_hash_value_deterministic() {
        // Same value should always produce the same hash
        let first = Value::from("test");
        let second = Value::from("test");
        assert_eq!(hash_value(&first), hash_value(&second));

        // Different values should (almost certainly) produce different hashes
        let different = Value::from("other");
        assert_ne!(hash_value(&first), hash_value(&different));
    }
}