grafeo_core/execution/operators/
distinct.rs1use std::collections::HashSet;
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20 Null,
21 Bool(bool),
22 Int64(i64),
23 String(String),
24}
25
26impl RowKey {
27 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29 let parts: Vec<KeyPart> = columns
30 .iter()
31 .map(|&col_idx| {
32 chunk
33 .column(col_idx)
34 .and_then(|col| col.get_value(row))
35 .map(|v| match v {
36 Value::Null => KeyPart::Null,
37 Value::Bool(b) => KeyPart::Bool(b),
38 Value::Int64(i) => KeyPart::Int64(i),
39 Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
40 Value::String(s) => KeyPart::String(s.to_string()),
41 _ => KeyPart::String(format!("{v:?}")),
42 })
43 .unwrap_or(KeyPart::Null)
44 })
45 .collect();
46 RowKey(parts)
47 }
48
49 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
51 let columns: Vec<usize> = (0..chunk.column_count()).collect();
52 Self::from_row(chunk, row, &columns)
53 }
54}
55
56pub struct DistinctOperator {
60 child: Box<dyn Operator>,
62 distinct_columns: Option<Vec<usize>>,
64 output_schema: Vec<LogicalType>,
66 seen: HashSet<RowKey>,
68}
69
70impl DistinctOperator {
71 pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
73 Self {
74 child,
75 distinct_columns: None,
76 output_schema,
77 seen: HashSet::new(),
78 }
79 }
80
81 pub fn on_columns(
83 child: Box<dyn Operator>,
84 columns: Vec<usize>,
85 output_schema: Vec<LogicalType>,
86 ) -> Self {
87 Self {
88 child,
89 distinct_columns: Some(columns),
90 output_schema,
91 seen: HashSet::new(),
92 }
93 }
94}
95
96impl Operator for DistinctOperator {
97 fn next(&mut self) -> OperatorResult {
98 loop {
99 let Some(chunk) = self.child.next()? else {
100 return Ok(None);
101 };
102
103 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
104
105 for row in chunk.selected_indices() {
106 let key = match &self.distinct_columns {
107 Some(cols) => RowKey::from_row(&chunk, row, cols),
108 None => RowKey::from_all_columns(&chunk, row),
109 };
110
111 if self.seen.insert(key) {
112 for col_idx in 0..chunk.column_count() {
114 if let (Some(src_col), Some(dst_col)) =
115 (chunk.column(col_idx), builder.column_mut(col_idx))
116 {
117 if let Some(value) = src_col.get_value(row) {
118 dst_col.push_value(value);
119 } else {
120 dst_col.push_value(Value::Null);
121 }
122 }
123 }
124 builder.advance_row();
125
126 if builder.is_full() {
127 return Ok(Some(builder.finish()));
128 }
129 }
130 }
131
132 if builder.row_count() > 0 {
133 return Ok(Some(builder.finish()));
134 }
135 }
137 }
138
139 fn reset(&mut self) {
140 self.child.reset();
141 self.seen.clear();
142 }
143
144 fn name(&self) -> &'static str {
145 "Distinct"
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Child operator that hands out a fixed list of chunks, one per call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self { chunks, position: 0 }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            self.position += 1;
            Ok(Some(std::mem::replace(slot, DataChunk::empty())))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// (Int64, String) chunk with six rows, of which three are distinct:
    /// (1, "a"), (2, "b") and (3, "c").
    fn create_chunk_with_duplicates() -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);

        let rows: [(i64, &str); 6] = [
            (1, "a"),
            (2, "b"),
            (1, "a"),
            (3, "c"),
            (2, "b"),
            (1, "a"),
        ];

        for &(num, text) in &rows {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }

        builder.finish()
    }

    /// Single Int64 column chunk containing `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Runs `op` to completion and collects column 0 of every emitted row.
    fn drain_first_column(op: &mut DistinctOperator) -> Vec<i64> {
        let mut collected = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                collected.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        collected
    }

    #[test]
    fn test_distinct_all_columns() {
        let child = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut op = DistinctOperator::new(
            Box::new(child),
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut pairs: Vec<(i64, String)> = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                let num = chunk.column(0).unwrap().get_int64(row).unwrap();
                let text = chunk.column(1).unwrap().get_string(row).unwrap().to_string();
                pairs.push((num, text));
            }
        }

        assert_eq!(pairs.len(), 3);

        pairs.sort();
        let expected: Vec<(i64, String)> = [(1, "a"), (2, "b"), (3, "c")]
            .iter()
            .map(|&(num, text)| (num, text.to_string()))
            .collect();
        assert_eq!(pairs, expected);
    }

    #[test]
    fn test_distinct_single_column() {
        let child = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut op = DistinctOperator::on_columns(
            Box::new(child),
            vec![0],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut keys = drain_first_column(&mut op);
        keys.sort();
        assert_eq!(keys, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        let child = MockOperator::new(vec![int64_chunk(&[1, 2, 3]), int64_chunk(&[2, 3, 4])]);
        let mut op = DistinctOperator::new(Box::new(child), vec![LogicalType::Int64]);

        let mut values = drain_first_column(&mut op);
        values.sort();
        assert_eq!(values, vec![1, 2, 3, 4]);
    }
}