Skip to main content

graphos_core/execution/operators/
distinct.rs

1//! Distinct operator for removing duplicate rows.
2//!
3//! This module provides:
4//! - `DistinctOperator`: Removes duplicate rows based on all or specified columns
5
6use std::collections::HashSet;
7
8use graphos_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14/// A row key for duplicate detection.
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20    Null,
21    Bool(bool),
22    Int64(i64),
23    String(String),
24}
25
26impl RowKey {
27    /// Creates a row key from specified columns.
28    fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29        let parts: Vec<KeyPart> = columns
30            .iter()
31            .map(|&col_idx| {
32                chunk
33                    .column(col_idx)
34                    .and_then(|col| col.get_value(row))
35                    .map(|v| match v {
36                        Value::Null => KeyPart::Null,
37                        Value::Bool(b) => KeyPart::Bool(b),
38                        Value::Int64(i) => KeyPart::Int64(i),
39                        Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
40                        Value::String(s) => KeyPart::String(s.to_string()),
41                        _ => KeyPart::String(format!("{v:?}")),
42                    })
43                    .unwrap_or(KeyPart::Null)
44            })
45            .collect();
46        RowKey(parts)
47    }
48
49    /// Creates a row key from all columns.
50    fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
51        let columns: Vec<usize> = (0..chunk.column_count()).collect();
52        Self::from_row(chunk, row, &columns)
53    }
54}
55
56/// Distinct operator.
57///
58/// Removes duplicate rows from the input. Can operate on all columns or a subset.
59pub struct DistinctOperator {
60    /// Child operator.
61    child: Box<dyn Operator>,
62    /// Columns to consider for uniqueness (None = all columns).
63    distinct_columns: Option<Vec<usize>>,
64    /// Output schema.
65    output_schema: Vec<LogicalType>,
66    /// Set of seen row keys.
67    seen: HashSet<RowKey>,
68}
69
70impl DistinctOperator {
71    /// Creates a new distinct operator that considers all columns.
72    pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
73        Self {
74            child,
75            distinct_columns: None,
76            output_schema,
77            seen: HashSet::new(),
78        }
79    }
80
81    /// Creates a distinct operator that considers only specified columns.
82    pub fn on_columns(
83        child: Box<dyn Operator>,
84        columns: Vec<usize>,
85        output_schema: Vec<LogicalType>,
86    ) -> Self {
87        Self {
88            child,
89            distinct_columns: Some(columns),
90            output_schema,
91            seen: HashSet::new(),
92        }
93    }
94}
95
96impl Operator for DistinctOperator {
97    fn next(&mut self) -> OperatorResult {
98        loop {
99            let chunk = match self.child.next()? {
100                Some(c) => c,
101                None => return Ok(None),
102            };
103
104            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
105
106            for row in chunk.selected_indices() {
107                let key = match &self.distinct_columns {
108                    Some(cols) => RowKey::from_row(&chunk, row, cols),
109                    None => RowKey::from_all_columns(&chunk, row),
110                };
111
112                if self.seen.insert(key) {
113                    // New unique row - copy it
114                    for col_idx in 0..chunk.column_count() {
115                        if let (Some(src_col), Some(dst_col)) =
116                            (chunk.column(col_idx), builder.column_mut(col_idx))
117                        {
118                            if let Some(value) = src_col.get_value(row) {
119                                dst_col.push_value(value);
120                            } else {
121                                dst_col.push_value(Value::Null);
122                            }
123                        }
124                    }
125                    builder.advance_row();
126
127                    if builder.is_full() {
128                        return Ok(Some(builder.finish()));
129                    }
130                }
131            }
132
133            if builder.row_count() > 0 {
134                return Ok(Some(builder.finish()));
135            }
136            // If no unique rows in this chunk, continue to next
137        }
138    }
139
140    fn reset(&mut self) {
141        self.child.reset();
142        self.seen.clear();
143    }
144
145    fn name(&self) -> &'static str {
146        "Distinct"
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, each one at most once.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self { chunks, position: 0 }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position >= self.chunks.len() {
                return Ok(None);
            }
            // Move the chunk out, leaving an empty placeholder in its slot.
            let slot = &mut self.chunks[self.position];
            self.position += 1;
            Ok(Some(std::mem::replace(slot, DataChunk::empty())))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Pulls every chunk out of `op` until it reports exhaustion.
    fn drain(op: &mut dyn Operator) -> Vec<DataChunk> {
        let mut produced = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            produced.push(chunk);
        }
        produced
    }

    /// Reads column 0 of every selected row, across all `chunks`, as `i64`.
    fn first_column_int64(chunks: &[DataChunk]) -> Vec<i64> {
        let mut values = Vec::new();
        for chunk in chunks {
            for row in chunk.selected_indices() {
                values.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        values
    }

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Six `(Int64, String)` rows of which only three are unique.
    fn create_chunk_with_duplicates() -> DataChunk {
        let rows: [(i64, &str); 6] = [
            (1, "a"),
            (2, "b"),
            (1, "a"), // duplicate
            (3, "c"),
            (2, "b"), // duplicate
            (1, "a"), // duplicate
        ];

        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        for &(num, text) in rows.iter() {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_distinct_all_columns() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut distinct = DistinctOperator::new(
            Box::new(source),
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut pairs: Vec<(i64, String)> = Vec::new();
        for chunk in drain(&mut distinct) {
            for row in chunk.selected_indices() {
                let number = chunk.column(0).unwrap().get_int64(row).unwrap();
                let label = chunk
                    .column(1)
                    .unwrap()
                    .get_string(row)
                    .unwrap()
                    .to_string();
                pairs.push((number, label));
            }
        }

        // Exactly three unique rows survive.
        assert_eq!(pairs.len(), 3);

        // Output order is not part of the contract; compare sorted.
        pairs.sort();
        let expected: Vec<(i64, String)> = [(1, "a"), (2, "b"), (3, "c")]
            .iter()
            .map(|&(n, s)| (n, s.to_string()))
            .collect();
        assert_eq!(pairs, expected);
    }

    #[test]
    fn test_distinct_single_column() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut distinct = DistinctOperator::on_columns(
            Box::new(source),
            vec![0], // key on the first column only
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut numbers = first_column_int64(&drain(&mut distinct));

        // Three unique values in column 0.
        numbers.sort();
        assert_eq!(numbers, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        // Two chunks whose values overlap in 2 and 3.
        let source = MockOperator::new(vec![int64_chunk(&[1, 2, 3]), int64_chunk(&[2, 3, 4])]);
        let mut distinct = DistinctOperator::new(Box::new(source), vec![LogicalType::Int64]);

        let mut numbers = first_column_int64(&drain(&mut distinct));

        // Four unique values overall: 1, 2, 3, 4.
        numbers.sort();
        assert_eq!(numbers, vec![1, 2, 3, 4]);
    }
}