Skip to main content

grafeo_core/execution/operators/
distinct.rs

//! Distinct operator for removing duplicate rows.
//!
//! This module provides:
//! - `DistinctOperator`: removes duplicate rows, comparing either all columns or a
//!   chosen subset of key columns; every column of a kept row is emitted.
5
6use std::collections::HashSet;
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14/// A row key for duplicate detection.
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20    Null,
21    Bool(bool),
22    Int64(i64),
23    String(String),
24}
25
26impl RowKey {
27    /// Creates a row key from specified columns.
28    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29        let parts: Vec<KeyPart> = columns
30            .iter()
31            .map(|&col_idx| {
32                chunk
33                    .column(col_idx)
34                    .and_then(|col| col.get_value(row))
35                    .map(|v| match v {
36                        Value::Null => KeyPart::Null,
37                        Value::Bool(b) => KeyPart::Bool(b),
38                        Value::Int64(i) => KeyPart::Int64(i),
39                        Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
40                        Value::String(s) => KeyPart::String(s.to_string()),
41                        _ => KeyPart::String(format!("{v:?}")),
42                    })
43                    .unwrap_or(KeyPart::Null)
44            })
45            .collect();
46        RowKey(parts)
47    }
48
49    /// Creates a row key from all columns.
50    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
51        let columns: Vec<usize> = (0..chunk.column_count()).collect();
52        Self::from_row(chunk, row, &columns)
53    }
54}
55
56/// Distinct operator.
57///
58/// Removes duplicate rows from the input. Can operate on all columns or a subset.
59pub struct DistinctOperator {
60    /// Child operator.
61    child: Box<dyn Operator>,
62    /// Columns to consider for uniqueness (None = all columns).
63    distinct_columns: Option<Vec<usize>>,
64    /// Output schema.
65    output_schema: Vec<LogicalType>,
66    /// Set of seen row keys.
67    seen: HashSet<RowKey>,
68}
69
70impl DistinctOperator {
71    /// Creates a new distinct operator that considers all columns.
72    pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
73        Self {
74            child,
75            distinct_columns: None,
76            output_schema,
77            seen: HashSet::new(),
78        }
79    }
80
81    /// Creates a distinct operator that considers only specified columns.
82    pub fn on_columns(
83        child: Box<dyn Operator>,
84        columns: Vec<usize>,
85        output_schema: Vec<LogicalType>,
86    ) -> Self {
87        Self {
88            child,
89            distinct_columns: Some(columns),
90            output_schema,
91            seen: HashSet::new(),
92        }
93    }
94}
95
96impl Operator for DistinctOperator {
97    fn next(&mut self) -> OperatorResult {
98        loop {
99            let Some(chunk) = self.child.next()? else {
100                return Ok(None);
101            };
102
103            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
104
105            for row in chunk.selected_indices() {
106                let key = match &self.distinct_columns {
107                    Some(cols) => RowKey::from_row(&chunk, row, cols),
108                    None => RowKey::from_all_columns(&chunk, row),
109                };
110
111                if self.seen.insert(key) {
112                    // New unique row - copy it
113                    for col_idx in 0..chunk.column_count() {
114                        if let (Some(src_col), Some(dst_col)) =
115                            (chunk.column(col_idx), builder.column_mut(col_idx))
116                        {
117                            if let Some(value) = src_col.get_value(row) {
118                                dst_col.push_value(value);
119                            } else {
120                                dst_col.push_value(Value::Null);
121                            }
122                        }
123                    }
124                    builder.advance_row();
125
126                    if builder.is_full() {
127                        return Ok(Some(builder.finish()));
128                    }
129                }
130            }
131
132            if builder.row_count() > 0 {
133                return Ok(Some(builder.finish()));
134            }
135            // If no unique rows in this chunk, continue to next
136        }
137    }
138
139    fn reset(&mut self) {
140        self.child.reset();
141        self.seen.clear();
142    }
143
144    fn name(&self) -> &'static str {
145        "Distinct"
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that yields a fixed list of chunks in order, then `None`.
    ///
    /// Each chunk is moved out (replaced by `DataChunk::empty()`) as it is
    /// yielded, so after `reset` the mock replays empty chunks, not the data.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds an `(Int64, String)` chunk with one row per `(num, text)` pair.
    fn pair_chunk(rows: &[(i64, &str)]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        for &(num, text) in rows {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds a single-column `Int64` chunk with one row per value.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    fn create_chunk_with_duplicates() -> DataChunk {
        pair_chunk(&[
            (1, "a"),
            (2, "b"),
            (1, "a"), // Duplicate
            (3, "c"),
            (2, "b"), // Duplicate
            (1, "a"), // Duplicate
        ])
    }

    /// Reads the `Int64` values of column `col` from the selected rows of `chunk`.
    fn int64_values(chunk: &DataChunk, col: usize) -> Vec<i64> {
        let mut values = Vec::new();
        for row in chunk.selected_indices() {
            values.push(chunk.column(col).unwrap().get_int64(row).unwrap());
        }
        values
    }

    /// Reads `(Int64, String)` pairs from columns 0 and 1 of the selected rows.
    fn pair_values(chunk: &DataChunk) -> Vec<(i64, String)> {
        let mut values = Vec::new();
        for row in chunk.selected_indices() {
            let num = chunk.column(0).unwrap().get_int64(row).unwrap();
            let text = chunk
                .column(1)
                .unwrap()
                .get_string(row)
                .unwrap()
                .to_string();
            values.push((num, text));
        }
        values
    }

    /// Drains `op`, concatenating the `Int64` values of column `col` of every chunk.
    fn drain_int64(op: &mut dyn Operator, col: usize) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            values.extend(int64_values(&chunk, col));
        }
        values
    }

    /// Drains `op`, concatenating the `(Int64, String)` pairs of every chunk.
    fn drain_pairs(op: &mut dyn Operator) -> Vec<(i64, String)> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            values.extend(pair_values(&chunk));
        }
        values
    }

    #[test]
    fn test_distinct_all_columns() {
        let mock = MockOperator::new(vec![create_chunk_with_duplicates()]);

        let mut distinct = DistinctOperator::new(
            Box::new(mock),
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut results = drain_pairs(&mut distinct);

        // Should have 3 unique rows
        assert_eq!(results.len(), 3);

        // Sort for consistent comparison
        results.sort();
        assert_eq!(
            results,
            vec![
                (1, "a".to_string()),
                (2, "b".to_string()),
                (3, "c".to_string()),
            ]
        );
    }

    #[test]
    fn test_distinct_single_column() {
        let mock = MockOperator::new(vec![create_chunk_with_duplicates()]);

        let mut distinct = DistinctOperator::on_columns(
            Box::new(mock),
            vec![0], // Only consider first column
            vec![LogicalType::Int64, LogicalType::String],
        );

        // Should have 3 unique values in column 0
        let mut results = drain_int64(&mut distinct, 0);
        results.sort();
        assert_eq!(results, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        // Two chunks with overlapping values
        let mock = MockOperator::new(vec![int64_chunk(&[1, 2, 3]), int64_chunk(&[2, 3, 4])]);

        let mut distinct = DistinctOperator::new(Box::new(mock), vec![LogicalType::Int64]);

        // Should have 4 unique values: 1, 2, 3, 4
        let mut results = drain_int64(&mut distinct, 0);
        results.sort();
        assert_eq!(results, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_distinct_on_columns_keeps_first_occurrence() {
        let mock = MockOperator::new(vec![pair_chunk(&[
            (1, "a"),
            (1, "b"),
            (2, "c"),
            (2, "d"),
        ])]);

        let mut distinct = DistinctOperator::on_columns(
            Box::new(mock),
            vec![0],
            vec![LogicalType::Int64, LogicalType::String],
        );

        // Non-key columns come from the first row seen for each key, in input order.
        assert_eq!(
            drain_pairs(&mut distinct),
            vec![(1, "a".to_string()), (2, "c".to_string())]
        );
    }

    #[test]
    fn test_distinct_empty_input() {
        let mock = MockOperator::new(Vec::new());
        let mut distinct = DistinctOperator::new(Box::new(mock), vec![LogicalType::Int64]);

        assert!(distinct.next().unwrap().is_none());
    }

    #[test]
    fn test_distinct_skips_chunks_without_new_rows() {
        let mock = MockOperator::new(vec![
            int64_chunk(&[1, 2]),
            int64_chunk(&[2, 1]), // Entirely duplicates
            int64_chunk(&[3]),
        ]);

        let mut distinct = DistinctOperator::new(Box::new(mock), vec![LogicalType::Int64]);

        // The all-duplicate chunk must not surface as an empty output chunk.
        let mut per_chunk = Vec::new();
        while let Some(chunk) = distinct.next().unwrap() {
            per_chunk.push(int64_values(&chunk, 0));
        }
        assert_eq!(per_chunk, vec![vec![1, 2], vec![3]]);
    }
}