Skip to main content

grafeo_core/execution/operators/
distinct.rs

1//! Distinct operator for removing duplicate rows.
2//!
3//! This module provides:
4//! - `DistinctOperator`: Removes duplicate rows based on all or specified columns
5
6use std::collections::HashSet;
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14/// A row key for duplicate detection.
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20    Null,
21    Bool(bool),
22    Int64(i64),
23    String(String),
24}
25
26impl RowKey {
27    /// Creates a row key from specified columns.
28    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29        let parts: Vec<KeyPart> = columns
30            .iter()
31            .map(|&col_idx| {
32                chunk
33                    .column(col_idx)
34                    .and_then(|col| col.get_value(row))
35                    .map_or(KeyPart::Null, |v| match v {
36                        Value::Null => KeyPart::Null,
37                        Value::Bool(b) => KeyPart::Bool(b),
38                        Value::Int64(i) => KeyPart::Int64(i),
39                        Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
40                        Value::String(s) => KeyPart::String(s.to_string()),
41                        _ => KeyPart::String(format!("{v:?}")),
42                    })
43            })
44            .collect();
45        RowKey(parts)
46    }
47
48    /// Creates a row key from all columns.
49    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
50        let columns: Vec<usize> = (0..chunk.column_count()).collect();
51        Self::from_row(chunk, row, &columns)
52    }
53}
54
55/// Distinct operator.
56///
57/// Removes duplicate rows from the input. Can operate on all columns or a subset.
58pub struct DistinctOperator {
59    /// Child operator.
60    child: Box<dyn Operator>,
61    /// Columns to consider for uniqueness (None = all columns).
62    distinct_columns: Option<Vec<usize>>,
63    /// Output schema.
64    output_schema: Vec<LogicalType>,
65    /// Set of seen row keys.
66    seen: HashSet<RowKey>,
67}
68
69impl DistinctOperator {
70    /// Creates a new distinct operator that considers all columns.
71    pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
72        Self {
73            child,
74            distinct_columns: None,
75            output_schema,
76            seen: HashSet::new(),
77        }
78    }
79
80    /// Creates a distinct operator that considers only specified columns.
81    pub fn on_columns(
82        child: Box<dyn Operator>,
83        columns: Vec<usize>,
84        output_schema: Vec<LogicalType>,
85    ) -> Self {
86        Self {
87            child,
88            distinct_columns: Some(columns),
89            output_schema,
90            seen: HashSet::new(),
91        }
92    }
93}
94
95impl Operator for DistinctOperator {
96    fn next(&mut self) -> OperatorResult {
97        loop {
98            let Some(chunk) = self.child.next()? else {
99                return Ok(None);
100            };
101
102            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
103
104            for row in chunk.selected_indices() {
105                let key = match &self.distinct_columns {
106                    Some(cols) => RowKey::from_row(&chunk, row, cols),
107                    None => RowKey::from_all_columns(&chunk, row),
108                };
109
110                if self.seen.insert(key) {
111                    // New unique row - copy it
112                    for col_idx in 0..chunk.column_count() {
113                        if let (Some(src_col), Some(dst_col)) =
114                            (chunk.column(col_idx), builder.column_mut(col_idx))
115                        {
116                            if let Some(value) = src_col.get_value(row) {
117                                dst_col.push_value(value);
118                            } else {
119                                dst_col.push_value(Value::Null);
120                            }
121                        }
122                    }
123                    builder.advance_row();
124
125                    if builder.is_full() {
126                        return Ok(Some(builder.finish()));
127                    }
128                }
129            }
130
131            if builder.row_count() > 0 {
132                return Ok(Some(builder.finish()));
133            }
134            // If no unique rows in this chunk, continue to next
135        }
136    }
137
138    fn reset(&mut self) {
139        self.child.reset();
140        self.seen.clear();
141    }
142
143    fn name(&self) -> &'static str {
144        "Distinct"
145    }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out pre-built chunks one at a time.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder in its slot.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a two-column (Int64, String) chunk in which three of the six
    /// rows repeat an earlier row.
    fn create_chunk_with_duplicates() -> DataChunk {
        let rows: [(i64, &str); 6] = [
            (1, "a"),
            (2, "b"),
            (1, "a"), // Duplicate
            (3, "c"),
            (2, "b"), // Duplicate
            (1, "a"), // Duplicate
        ];

        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        for (num, text) in rows {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns every Int64 found in column 0, sorted ascending.
    fn sorted_first_column(op: &mut DistinctOperator) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                values.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        values.sort_unstable();
        values
    }

    #[test]
    fn test_distinct_all_columns() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let schema = vec![LogicalType::Int64, LogicalType::String];
        let mut distinct = DistinctOperator::new(Box::new(source), schema);

        let mut rows: Vec<(i64, String)> = Vec::new();
        while let Some(chunk) = distinct.next().unwrap() {
            for row in chunk.selected_indices() {
                let num = chunk.column(0).unwrap().get_int64(row).unwrap();
                let text = chunk.column(1).unwrap().get_string(row).unwrap().to_string();
                rows.push((num, text));
            }
        }

        // Should have 3 unique rows
        assert_eq!(rows.len(), 3);

        // Sort for consistent comparison
        rows.sort();
        let expected = vec![
            (1, "a".to_string()),
            (2, "b".to_string()),
            (3, "c".to_string()),
        ];
        assert_eq!(rows, expected);
    }

    #[test]
    fn test_distinct_single_column() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let schema = vec![LogicalType::Int64, LogicalType::String];

        // Only consider first column
        let mut distinct = DistinctOperator::on_columns(Box::new(source), vec![0], schema);

        // Should have 3 unique values in column 0
        assert_eq!(sorted_first_column(&mut distinct), vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        // Create two chunks with overlapping values
        let first = int64_chunk(&[1, 2, 3]);
        let second = int64_chunk(&[2, 3, 4]);

        let source = MockOperator::new(vec![first, second]);
        let mut distinct = DistinctOperator::new(Box::new(source), vec![LogicalType::Int64]);

        // Should have 4 unique values: 1, 2, 3, 4
        assert_eq!(sorted_first_column(&mut distinct), vec![1, 2, 3, 4]);
    }
}