Skip to main content

grafeo_core/execution/operators/push/
distinct.rs

1//! Push-based distinct operator.
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7use crate::execution::vector::ValueVector;
8use grafeo_common::types::Value;
9use std::collections::HashSet;
10
11/// Hash key for distinct tracking.
12#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13struct RowKey(Vec<u64>);
14
15impl RowKey {
16    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
17        let hashes: Vec<u64> = columns
18            .iter()
19            .map(|&col| {
20                chunk
21                    .column(col)
22                    .and_then(|c| c.get_value(row))
23                    .map(|v| hash_value(&v))
24                    .unwrap_or(0)
25            })
26            .collect();
27        Self(hashes)
28    }
29
30    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
31        let hashes: Vec<u64> = (0..chunk.column_count())
32            .map(|col| {
33                chunk
34                    .column(col)
35                    .and_then(|c| c.get_value(row))
36                    .map(|v| hash_value(&v))
37                    .unwrap_or(0)
38            })
39            .collect();
40        Self(hashes)
41    }
42}
43
44fn hash_value(value: &Value) -> u64 {
45    use std::collections::hash_map::DefaultHasher;
46    use std::hash::{Hash, Hasher};
47
48    let mut hasher = DefaultHasher::new();
49    match value {
50        Value::Null => 0u8.hash(&mut hasher),
51        Value::Bool(b) => b.hash(&mut hasher),
52        Value::Int64(i) => i.hash(&mut hasher),
53        Value::Float64(f) => f.to_bits().hash(&mut hasher),
54        Value::String(s) => s.hash(&mut hasher),
55        _ => 0u8.hash(&mut hasher),
56    }
57    hasher.finish()
58}
59
60/// Push-based distinct operator.
61///
62/// Filters out duplicate rows based on all columns or specified columns.
63/// This operator maintains state (seen values) but can produce output
64/// incrementally as new unique rows arrive.
65pub struct DistinctPushOperator {
66    /// Columns to check for distinctness (None = all columns).
67    columns: Option<Vec<usize>>,
68    /// Set of seen row hashes.
69    seen: HashSet<RowKey>,
70}
71
72impl DistinctPushOperator {
73    /// Create a distinct operator on all columns.
74    pub fn new() -> Self {
75        Self {
76            columns: None,
77            seen: HashSet::new(),
78        }
79    }
80
81    /// Create a distinct operator on specific columns.
82    pub fn on_columns(columns: Vec<usize>) -> Self {
83        Self {
84            columns: Some(columns),
85            seen: HashSet::new(),
86        }
87    }
88
89    /// Get the number of unique rows seen.
90    pub fn unique_count(&self) -> usize {
91        self.seen.len()
92    }
93}
94
95impl Default for DistinctPushOperator {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101impl PushOperator for DistinctPushOperator {
102    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
103        if chunk.is_empty() {
104            return Ok(true);
105        }
106
107        // Find rows that are new (not seen before)
108        let mut new_indices = Vec::new();
109
110        for row in chunk.selected_indices() {
111            let key = match &self.columns {
112                Some(cols) => RowKey::from_row(&chunk, row, cols),
113                None => RowKey::from_all_columns(&chunk, row),
114            };
115
116            if self.seen.insert(key) {
117                new_indices.push(row);
118            }
119        }
120
121        if new_indices.is_empty() {
122            return Ok(true);
123        }
124
125        // Create filtered chunk with only new rows
126        let selection = SelectionVector::from_predicate(chunk.len(), |i| new_indices.contains(&i));
127        let filtered = chunk.filter(&selection);
128
129        sink.consume(filtered)
130    }
131
132    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
133        // Nothing to finalize - all output was produced incrementally
134        Ok(())
135    }
136
137    fn preferred_chunk_size(&self) -> ChunkSizeHint {
138        ChunkSizeHint::Default
139    }
140
141    fn name(&self) -> &'static str {
142        "DistinctPush"
143    }
144}
145
146/// Push-based distinct operator that materializes all input first.
147///
148/// This is a true pipeline breaker that buffers all rows and produces
149/// distinct output in the finalize phase. Use this when you need
150/// deterministic ordering of output.
151pub struct DistinctMaterializingOperator {
152    /// Columns to check for distinctness.
153    columns: Option<Vec<usize>>,
154    /// Buffered unique rows.
155    rows: Vec<Vec<Value>>,
156    /// Set of seen row hashes.
157    seen: HashSet<RowKey>,
158    /// Number of columns.
159    num_columns: Option<usize>,
160}
161
162impl DistinctMaterializingOperator {
163    /// Create a distinct operator on all columns.
164    pub fn new() -> Self {
165        Self {
166            columns: None,
167            rows: Vec::new(),
168            seen: HashSet::new(),
169            num_columns: None,
170        }
171    }
172
173    /// Create a distinct operator on specific columns.
174    pub fn on_columns(columns: Vec<usize>) -> Self {
175        Self {
176            columns: Some(columns),
177            rows: Vec::new(),
178            seen: HashSet::new(),
179            num_columns: None,
180        }
181    }
182}
183
184impl Default for DistinctMaterializingOperator {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl PushOperator for DistinctMaterializingOperator {
191    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
192        if chunk.is_empty() {
193            return Ok(true);
194        }
195
196        if self.num_columns.is_none() {
197            self.num_columns = Some(chunk.column_count());
198        }
199
200        let num_cols = chunk.column_count();
201
202        for row in chunk.selected_indices() {
203            let key = match &self.columns {
204                Some(cols) => RowKey::from_row(&chunk, row, cols),
205                None => RowKey::from_all_columns(&chunk, row),
206            };
207
208            if self.seen.insert(key) {
209                // Store the full row
210                let row_values: Vec<Value> = (0..num_cols)
211                    .map(|col| {
212                        chunk
213                            .column(col)
214                            .and_then(|c| c.get_value(row))
215                            .unwrap_or(Value::Null)
216                    })
217                    .collect();
218                self.rows.push(row_values);
219            }
220        }
221
222        Ok(true)
223    }
224
225    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
226        if self.rows.is_empty() {
227            return Ok(());
228        }
229
230        let num_cols = self.num_columns.unwrap_or(0);
231        let mut columns: Vec<ValueVector> = (0..num_cols).map(|_| ValueVector::new()).collect();
232
233        for row in &self.rows {
234            for (col_idx, col) in columns.iter_mut().enumerate() {
235                let val = row.get(col_idx).cloned().unwrap_or(Value::Null);
236                col.push(val);
237            }
238        }
239
240        let chunk = DataChunk::new(columns);
241        sink.consume(chunk)?;
242
243        Ok(())
244    }
245
246    fn preferred_chunk_size(&self) -> ChunkSizeHint {
247        ChunkSizeHint::Default
248    }
249
250    fn name(&self) -> &'static str {
251        "DistinctMaterializing"
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column chunk of `Int64` values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        DataChunk::new(vec![int_vector(values)])
    }

    /// Builds a two-column chunk: `keys` in column 0, `payload` in column 1.
    fn create_two_column_chunk(keys: &[i64], payload: &[i64]) -> DataChunk {
        DataChunk::new(vec![int_vector(keys), int_vector(payload)])
    }

    fn int_vector(values: &[i64]) -> ValueVector {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        ValueVector::from_values(&v)
    }

    #[test]
    fn test_distinct_all_unique() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5);
        assert_eq!(distinct.unique_count(), 5);
    }

    #[test]
    fn test_distinct_with_duplicates() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 1, 3, 2, 1, 4]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 4); // 1, 2, 3, 4
        assert_eq!(distinct.unique_count(), 4);
    }

    #[test]
    fn test_distinct_all_same() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[5, 5, 5, 5, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 1);
        assert_eq!(distinct.unique_count(), 1);
    }

    #[test]
    fn test_distinct_multiple_chunks() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[2, 3, 4]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[3, 4, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5); // 1, 2, 3, 4, 5
    }

    #[test]
    fn test_distinct_empty_input() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct.push(create_test_chunk(&[]), &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 0);
        assert_eq!(distinct.unique_count(), 0);
    }

    #[test]
    fn test_distinct_on_key_column_only() {
        let mut distinct = DistinctPushOperator::on_columns(vec![0]);
        let mut sink = CollectorSink::new();

        distinct
            .push(
                create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 10, 30]),
                &mut sink,
            )
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 2); // keys 1 and 2
        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_all_columns_multi_column() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(
                create_two_column_chunk(&[1, 1, 2, 1], &[10, 10, 10, 20]),
                &mut sink,
            )
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 3); // (1,10), (2,10), (1,20)
        assert_eq!(distinct.unique_count(), 3);
    }

    #[test]
    fn test_distinct_materializing() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        // All output comes in finalize
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 7); // 7 unique values
    }

    #[test]
    fn test_distinct_materializing_multiple_chunks() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[2, 3, 4]), &mut sink)
            .unwrap();
        // Pipeline breaker: nothing is emitted before finalize.
        assert_eq!(sink.row_count(), 0);
        distinct.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 4); // 1, 2, 3, 4
    }

    #[test]
    fn test_distinct_materializing_keeps_full_rows() {
        let mut distinct = DistinctMaterializingOperator::on_columns(vec![0]);
        let mut sink = CollectorSink::new();

        distinct
            .push(create_two_column_chunk(&[1, 1, 2], &[10, 20, 30]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 2); // keys 1 and 2
        assert_eq!(chunks[0].column_count(), 2); // payload column retained
    }

    #[test]
    fn test_distinct_materializing_empty_input() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct.push(create_test_chunk(&[]), &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert!(sink.into_chunks().is_empty());
    }
}