kyu_executor/operators/
distinct.rs1use hashbrown::HashSet;
4use kyu_common::KyuResult;
5use kyu_types::TypedValue;
6
7use crate::context::ExecutionContext;
8use crate::data_chunk::DataChunk;
9use crate::physical_plan::PhysicalOperator;
10
11pub struct DistinctOp {
12 pub child: Box<PhysicalOperator>,
13 seen: HashSet<Vec<TypedValue>>,
14}
15
16impl DistinctOp {
17 pub fn new(child: PhysicalOperator) -> Self {
18 Self {
19 child: Box::new(child),
20 seen: HashSet::new(),
21 }
22 }
23
24 pub fn next(&mut self, ctx: &ExecutionContext<'_>) -> KyuResult<Option<DataChunk>> {
25 loop {
26 let chunk = match self.child.next(ctx)? {
27 Some(c) => c,
28 None => return Ok(None),
29 };
30
31 let num_cols = chunk.num_columns();
32 let mut result = DataChunk::with_capacity(num_cols, chunk.num_rows());
33
34 for row_idx in 0..chunk.num_rows() {
35 let row = chunk.get_row(row_idx);
36 if self.seen.insert(row) {
37 result.append_row_from_chunk(&chunk, row_idx);
38 }
39 }
40
41 if !result.is_empty() {
42 return Ok(Some(result));
43 }
44 }
45 }
46}
47
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::MockStorage;
    use kyu_types::TypedValue;

    /// Five single-column rows holding the values 1, 2, 1, 3, 2 contain three
    /// distinct values, so the first chunk produced must have three rows.
    #[test]
    fn distinct_dedup() {
        use crate::operators::scan::ScanNodeOp;
        use kyu_common::id::TableId;

        let rows: Vec<Vec<TypedValue>> = [1i64, 2, 1, 3, 2]
            .iter()
            .map(|&value| vec![TypedValue::Int64(value)])
            .collect();

        let mut storage = MockStorage::new();
        storage.insert_table(TableId(0), rows);
        let ctx = ExecutionContext::new(kyu_catalog::CatalogContent::new(), &storage);

        let scan = PhysicalOperator::ScanNode(ScanNodeOp::new(TableId(0)));
        let mut distinct = DistinctOp::new(scan);

        let produced = distinct.next(&ctx).unwrap();
        let chunk = produced.unwrap();
        assert_eq!(chunk.num_rows(), 3);
    }
}