Skip to main content

kyu_executor/operators/
distinct.rs

1//! Distinct operator — hash-based deduplication.
2
3use hashbrown::HashSet;
4use kyu_common::KyuResult;
5use kyu_types::TypedValue;
6
7use crate::context::ExecutionContext;
8use crate::data_chunk::DataChunk;
9use crate::physical_plan::PhysicalOperator;
10
11pub struct DistinctOp {
12    pub child: Box<PhysicalOperator>,
13    seen: HashSet<Vec<TypedValue>>,
14}
15
16impl DistinctOp {
17    pub fn new(child: PhysicalOperator) -> Self {
18        Self {
19            child: Box::new(child),
20            seen: HashSet::new(),
21        }
22    }
23
24    pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
25        loop {
26            let chunk = match self.child.next(ctx)? {
27                Some(c) => c,
28                None => return Ok(None),
29            };
30
31            let num_cols = chunk.num_columns();
32            let mut result = DataChunk::with_capacity(num_cols, chunk.num_rows());
33
34            for row_idx in 0..chunk.num_rows() {
35                let row = chunk.get_row(row_idx);
36                if self.seen.insert(row) {
37                    result.append_row_from_chunk(&chunk, row_idx);
38                }
39            }
40
41            if !result.is_empty() {
42                return Ok(Some(result));
43            }
44        }
45    }
46}
47
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;

    /// Five single-column rows holding the values 1, 2, 1, 3, 2 must collapse
    /// to three distinct rows in the first chunk returned.
    #[test]
    fn distinct_dedup() {
        use crate::operators::scan::ScanNodeOp;
        use kyu_common::id::TableId;

        let rows: Vec<Vec<TypedValue>> = [1, 2, 1, 3, 2]
            .iter()
            .map(|&value| vec![TypedValue::Int64(value)])
            .collect();

        let mut storage = MockStorage::new();
        storage.insert_table(TableId(0), rows);
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);

        let source = PhysicalOperator::ScanNode(ScanNodeOp::new(TableId(0)));
        let mut op = DistinctOp::new(source);

        let first = op.next(&ctx).unwrap().unwrap();
        assert_eq!(first.num_rows(), 3);
    }
}