Skip to main content

grafeo_core/execution/operators/push/
distinct.rs

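//! Push-based distinct operators: a streaming variant that forwards new rows
//! as they arrive, and a materializing variant that emits all distinct rows
//! at finalize time.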
2
3use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7use crate::execution::vector::ValueVector;
8use grafeo_common::types::Value;
9use std::collections::HashSet;
10
11/// Hash key for distinct tracking.
12#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13struct RowKey(Vec<u64>);
14
15impl RowKey {
16    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
17        let hashes: Vec<u64> = columns
18            .iter()
19            .map(|&col| {
20                chunk
21                    .column(col)
22                    .and_then(|c| c.get_value(row))
23                    .map_or(0, |v| hash_value(&v))
24            })
25            .collect();
26        Self(hashes)
27    }
28
29    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
30        let hashes: Vec<u64> = (0..chunk.column_count())
31            .map(|col| {
32                chunk
33                    .column(col)
34                    .and_then(|c| c.get_value(row))
35                    .map_or(0, |v| hash_value(&v))
36            })
37            .collect();
38        Self(hashes)
39    }
40}
41
42fn hash_value(value: &Value) -> u64 {
43    use std::collections::hash_map::DefaultHasher;
44    use std::hash::{Hash, Hasher};
45
46    let mut hasher = DefaultHasher::new();
47    match value {
48        Value::Null => 0u8.hash(&mut hasher),
49        Value::Bool(b) => b.hash(&mut hasher),
50        Value::Int64(i) => i.hash(&mut hasher),
51        Value::Float64(f) => f.to_bits().hash(&mut hasher),
52        Value::String(s) => s.hash(&mut hasher),
53        _ => 0u8.hash(&mut hasher),
54    }
55    hasher.finish()
56}
57
58/// Push-based distinct operator.
59///
60/// Filters out duplicate rows based on all columns or specified columns.
61/// This operator maintains state (seen values) but can produce output
62/// incrementally as new unique rows arrive.
63pub struct DistinctPushOperator {
64    /// Columns to check for distinctness (None = all columns).
65    columns: Option<Vec<usize>>,
66    /// Set of seen row hashes.
67    seen: HashSet<RowKey>,
68}
69
70impl DistinctPushOperator {
71    /// Create a distinct operator on all columns.
72    pub fn new() -> Self {
73        Self {
74            columns: None,
75            seen: HashSet::new(),
76        }
77    }
78
79    /// Create a distinct operator on specific columns.
80    pub fn on_columns(columns: Vec<usize>) -> Self {
81        Self {
82            columns: Some(columns),
83            seen: HashSet::new(),
84        }
85    }
86
87    /// Get the number of unique rows seen.
88    pub fn unique_count(&self) -> usize {
89        self.seen.len()
90    }
91}
92
93impl Default for DistinctPushOperator {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl PushOperator for DistinctPushOperator {
100    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
101        if chunk.is_empty() {
102            return Ok(true);
103        }
104
105        // Find rows that are new (not seen before)
106        let mut new_indices = Vec::new();
107
108        for row in chunk.selected_indices() {
109            let key = match &self.columns {
110                Some(cols) => RowKey::from_row(&chunk, row, cols),
111                None => RowKey::from_all_columns(&chunk, row),
112            };
113
114            if self.seen.insert(key) {
115                new_indices.push(row);
116            }
117        }
118
119        if new_indices.is_empty() {
120            return Ok(true);
121        }
122
123        // Create filtered chunk with only new rows
124        let selection = SelectionVector::from_predicate(chunk.len(), |i| new_indices.contains(&i));
125        let filtered = chunk.filter(&selection);
126
127        sink.consume(filtered)
128    }
129
130    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
131        // Nothing to finalize - all output was produced incrementally
132        Ok(())
133    }
134
135    fn preferred_chunk_size(&self) -> ChunkSizeHint {
136        ChunkSizeHint::Default
137    }
138
139    fn name(&self) -> &'static str {
140        "DistinctPush"
141    }
142}
143
144/// Push-based distinct operator that materializes all input first.
145///
146/// This is a true pipeline breaker that buffers all rows and produces
147/// distinct output in the finalize phase. Use this when you need
148/// deterministic ordering of output.
149pub struct DistinctMaterializingOperator {
150    /// Columns to check for distinctness.
151    columns: Option<Vec<usize>>,
152    /// Buffered unique rows.
153    rows: Vec<Vec<Value>>,
154    /// Set of seen row hashes.
155    seen: HashSet<RowKey>,
156    /// Number of columns.
157    num_columns: Option<usize>,
158}
159
160impl DistinctMaterializingOperator {
161    /// Create a distinct operator on all columns.
162    pub fn new() -> Self {
163        Self {
164            columns: None,
165            rows: Vec::new(),
166            seen: HashSet::new(),
167            num_columns: None,
168        }
169    }
170
171    /// Create a distinct operator on specific columns.
172    pub fn on_columns(columns: Vec<usize>) -> Self {
173        Self {
174            columns: Some(columns),
175            rows: Vec::new(),
176            seen: HashSet::new(),
177            num_columns: None,
178        }
179    }
180}
181
182impl Default for DistinctMaterializingOperator {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188impl PushOperator for DistinctMaterializingOperator {
189    fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
190        if chunk.is_empty() {
191            return Ok(true);
192        }
193
194        if self.num_columns.is_none() {
195            self.num_columns = Some(chunk.column_count());
196        }
197
198        let num_cols = chunk.column_count();
199
200        for row in chunk.selected_indices() {
201            let key = match &self.columns {
202                Some(cols) => RowKey::from_row(&chunk, row, cols),
203                None => RowKey::from_all_columns(&chunk, row),
204            };
205
206            if self.seen.insert(key) {
207                // Store the full row
208                let row_values: Vec<Value> = (0..num_cols)
209                    .map(|col| {
210                        chunk
211                            .column(col)
212                            .and_then(|c| c.get_value(row))
213                            .unwrap_or(Value::Null)
214                    })
215                    .collect();
216                self.rows.push(row_values);
217            }
218        }
219
220        Ok(true)
221    }
222
223    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
224        if self.rows.is_empty() {
225            return Ok(());
226        }
227
228        let num_cols = self.num_columns.unwrap_or(0);
229        let mut columns: Vec<ValueVector> = (0..num_cols).map(|_| ValueVector::new()).collect();
230
231        for row in &self.rows {
232            for (col_idx, col) in columns.iter_mut().enumerate() {
233                let val = row.get(col_idx).cloned().unwrap_or(Value::Null);
234                col.push(val);
235            }
236        }
237
238        let chunk = DataChunk::new(columns);
239        sink.consume(chunk)?;
240
241        Ok(())
242    }
243
244    fn preferred_chunk_size(&self) -> ChunkSizeHint {
245        ChunkSizeHint::Default
246    }
247
248    fn name(&self) -> &'static str {
249        "DistinctMaterializing"
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column chunk of `Int64` values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        let vector = ValueVector::from_values(&v);
        DataChunk::new(vec![vector])
    }

    /// Builds a two-column chunk of `Int64` values from equal-length slices.
    fn create_two_column_chunk(left: &[i64], right: &[i64]) -> DataChunk {
        let to_vector = |values: &[i64]| {
            let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
            ValueVector::from_values(&v)
        };
        DataChunk::new(vec![to_vector(left), to_vector(right)])
    }

    #[test]
    fn test_distinct_all_unique() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5);
        assert_eq!(distinct.unique_count(), 5);
    }

    #[test]
    fn test_distinct_with_duplicates() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 1, 3, 2, 1, 4]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 4); // 1, 2, 3, 4
        assert_eq!(distinct.unique_count(), 4);
    }

    #[test]
    fn test_distinct_all_same() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[5, 5, 5, 5, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 1);
        assert_eq!(distinct.unique_count(), 1);
    }

    #[test]
    fn test_distinct_multiple_chunks() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[2, 3, 4]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[3, 4, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5); // 1, 2, 3, 4, 5
    }

    #[test]
    fn test_distinct_empty_chunk() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        assert!(distinct.push(create_test_chunk(&[]), &mut sink).unwrap());
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 0);
        assert_eq!(distinct.unique_count(), 0);
    }

    #[test]
    fn test_distinct_on_columns() {
        // Distinct on column 0 only; column 1 differs on every row.
        let mut distinct = DistinctPushOperator::on_columns(vec![0]);
        let mut sink = CollectorSink::new();

        distinct
            .push(
                create_two_column_chunk(&[1, 1, 2, 2], &[10, 20, 30, 40]),
                &mut sink,
            )
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 2);
        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_materializing() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        // All output comes in finalize
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 7); // 7 unique values
    }

    #[test]
    fn test_distinct_materializing_first_seen_order_across_chunks() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[3, 1, 3]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[2, 1, 4]), &mut sink)
            .unwrap();
        // Nothing is emitted before finalize.
        assert_eq!(sink.row_count(), 0);
        distinct.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        let expected = [3i64, 1, 2, 4];
        assert_eq!(chunks[0].len(), expected.len());
        let column = chunks[0].column(0).expect("output has one column");
        for (row, want) in expected.iter().enumerate() {
            assert!(
                matches!(column.get_value(row), Some(Value::Int64(v)) if v == *want),
                "unexpected value at row {row}"
            );
        }
    }

    #[test]
    fn test_distinct_materializing_on_columns_keeps_all_columns() {
        // Distinct on column 1 only; output still carries both columns.
        let mut distinct = DistinctMaterializingOperator::on_columns(vec![1]);
        let mut sink = CollectorSink::new();

        distinct
            .push(create_two_column_chunk(&[1, 2, 3], &[7, 7, 8]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 2);
        assert_eq!(chunks[0].column_count(), 2);
    }

    #[test]
    fn test_distinct_materializing_no_input_emits_nothing() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct.finalize(&mut sink).unwrap();

        assert!(sink.into_chunks().is_empty());
    }
}