Skip to main content

grafeo_core/execution/operators/
distinct.rs

1//! Distinct operator for removing duplicate rows.
2//!
3//! This module provides:
4//! - `DistinctOperator`: Removes duplicate rows based on all or specified columns
5
6use std::collections::HashSet;
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14/// A row key for duplicate detection.
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20    Null,
21    Bool(bool),
22    Int64(i64),
23    String(String),
24}
25
26impl RowKey {
27    /// Creates a row key from specified columns.
28    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29        let parts: Vec<KeyPart> = columns
30            .iter()
31            .map(|&col_idx| {
32                chunk
33                    .column(col_idx)
34                    .and_then(|col| col.get_value(row))
35                    .map_or(KeyPart::Null, |v| match v {
36                        Value::Null => KeyPart::Null,
37                        Value::Bool(b) => KeyPart::Bool(b),
38                        Value::Int64(i) => KeyPart::Int64(i),
39                        // reason: intentional bit-level reinterpretation for equality comparison
40                        #[allow(clippy::cast_possible_wrap)]
41                        Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
42                        Value::String(s) => KeyPart::String(s.to_string()),
43                        _ => KeyPart::String(format!("{v:?}")),
44                    })
45            })
46            .collect();
47        RowKey(parts)
48    }
49
50    /// Creates a row key from all columns.
51    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
52        let columns: Vec<usize> = (0..chunk.column_count()).collect();
53        Self::from_row(chunk, row, &columns)
54    }
55}
56
57/// Distinct operator.
58///
59/// Removes duplicate rows from the input. Can operate on all columns or a subset.
60pub struct DistinctOperator {
61    /// Child operator.
62    child: Box<dyn Operator>,
63    /// Columns to consider for uniqueness (None = all columns).
64    distinct_columns: Option<Vec<usize>>,
65    /// Output schema.
66    output_schema: Vec<LogicalType>,
67    /// Set of seen row keys.
68    seen: HashSet<RowKey>,
69}
70
71impl DistinctOperator {
72    /// Creates a new distinct operator that considers all columns.
73    pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
74        Self {
75            child,
76            distinct_columns: None,
77            output_schema,
78            seen: HashSet::new(),
79        }
80    }
81
82    /// Decomposes this operator for push-based conversion.
83    pub fn into_parts(self) -> (Box<dyn Operator>, Option<Vec<usize>>) {
84        (self.child, self.distinct_columns)
85    }
86
87    /// Creates a distinct operator that considers only specified columns.
88    pub fn on_columns(
89        child: Box<dyn Operator>,
90        columns: Vec<usize>,
91        output_schema: Vec<LogicalType>,
92    ) -> Self {
93        Self {
94            child,
95            distinct_columns: Some(columns),
96            output_schema,
97            seen: HashSet::new(),
98        }
99    }
100}
101
102impl Operator for DistinctOperator {
103    fn next(&mut self) -> OperatorResult {
104        loop {
105            let Some(chunk) = self.child.next()? else {
106                return Ok(None);
107            };
108
109            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
110
111            for row in chunk.selected_indices() {
112                let key = match &self.distinct_columns {
113                    Some(cols) => RowKey::from_row(&chunk, row, cols),
114                    None => RowKey::from_all_columns(&chunk, row),
115                };
116
117                if self.seen.insert(key) {
118                    // New unique row - copy it
119                    for col_idx in 0..chunk.column_count() {
120                        if let (Some(src_col), Some(dst_col)) =
121                            (chunk.column(col_idx), builder.column_mut(col_idx))
122                        {
123                            if let Some(value) = src_col.get_value(row) {
124                                dst_col.push_value(value);
125                            } else {
126                                dst_col.push_value(Value::Null);
127                            }
128                        }
129                    }
130                    builder.advance_row();
131
132                    if builder.is_full() {
133                        return Ok(Some(builder.finish()));
134                    }
135                }
136            }
137
138            if builder.row_count() > 0 {
139                return Ok(Some(builder.finish()));
140            }
141            // If no unique rows in this chunk, continue to next
142        }
143    }
144
145    fn reset(&mut self) {
146        self.child.reset();
147        self.seen.clear();
148    }
149
150    fn name(&self) -> &'static str {
151        "Distinct"
152    }
153
154    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
155        self
156    }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that replays a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Hand out the stored chunk and leave an empty one in its place.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a two-column (Int64, String) chunk with three distinct rows
    /// spread over six rows.
    fn create_chunk_with_duplicates() -> DataChunk {
        let rows = [
            (1i64, "a"),
            (2, "b"),
            (1, "a"), // Duplicate
            (3, "c"),
            (2, "b"), // Duplicate
            (1, "a"), // Duplicate
        ];

        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        for (num, text) in rows {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and gathers the Int64 values found in column `col`.
    fn collect_int64(op: &mut DistinctOperator, col: usize) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                values.push(chunk.column(col).unwrap().get_int64(row).unwrap());
            }
        }
        values
    }

    #[test]
    fn test_distinct_all_columns() {
        let schema = vec![LogicalType::Int64, LogicalType::String];
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut distinct = DistinctOperator::new(Box::new(source), schema);

        let mut rows = Vec::new();
        while let Some(chunk) = distinct.next().unwrap() {
            for row in chunk.selected_indices() {
                let num = chunk.column(0).unwrap().get_int64(row).unwrap();
                let text = chunk
                    .column(1)
                    .unwrap()
                    .get_string(row)
                    .unwrap()
                    .to_string();
                rows.push((num, text));
            }
        }

        // Three unique (number, text) pairs are expected.
        assert_eq!(rows.len(), 3);

        // Output order is not part of the contract, so compare sorted.
        rows.sort();
        let expected = vec![
            (1, "a".to_string()),
            (2, "b".to_string()),
            (3, "c".to_string()),
        ];
        assert_eq!(rows, expected);
    }

    #[test]
    fn test_distinct_single_column() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut distinct = DistinctOperator::on_columns(
            Box::new(source),
            vec![0], // Uniqueness is decided by the first column only
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut numbers = collect_int64(&mut distinct, 0);

        // Column 0 holds three unique values.
        numbers.sort_unstable();
        assert_eq!(numbers, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        // Two chunks whose values overlap on 2 and 3.
        let source = MockOperator::new(vec![int64_chunk(&[1, 2, 3]), int64_chunk(&[2, 3, 4])]);
        let mut distinct = DistinctOperator::new(Box::new(source), vec![LogicalType::Int64]);

        let mut numbers = collect_int64(&mut distinct, 0);

        // Four unique values overall: 1, 2, 3, 4.
        numbers.sort_unstable();
        assert_eq!(numbers, vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_distinct_into_any() {
        let op = DistinctOperator::new(
            Box::new(MockOperator::new(vec![])),
            vec![LogicalType::Int64],
        );
        let erased = Box::new(op).into_any();
        assert!(erased.downcast::<DistinctOperator>().is_ok());
    }

    #[test]
    fn test_distinct_into_parts() {
        let schema = vec![LogicalType::Int64, LogicalType::String, LogicalType::Int64];
        let op = DistinctOperator::on_columns(Box::new(MockOperator::new(vec![])), vec![0, 2], schema);

        let (mut child, distinct_columns) = op.into_parts();
        assert_eq!(distinct_columns, Some(vec![0, 2]));
        assert!(child.next().unwrap().is_none());
    }

    #[test]
    fn test_distinct_into_parts_all_columns() {
        let op = DistinctOperator::new(
            Box::new(MockOperator::new(vec![])),
            vec![LogicalType::Int64],
        );
        let (_child, distinct_columns) = op.into_parts();
        assert!(distinct_columns.is_none());
    }
}