grafeo_core/execution/operators/
distinct.rs1use std::collections::HashSet;
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20 Null,
21 Bool(bool),
22 Int64(i64),
23 String(String),
24}
25
26impl RowKey {
27 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29 let parts: Vec<KeyPart> = columns
30 .iter()
31 .map(|&col_idx| {
32 chunk
33 .column(col_idx)
34 .and_then(|col| col.get_value(row))
35 .map_or(KeyPart::Null, |v| match v {
36 Value::Null => KeyPart::Null,
37 Value::Bool(b) => KeyPart::Bool(b),
38 Value::Int64(i) => KeyPart::Int64(i),
39 #[allow(clippy::cast_possible_wrap)]
41 Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
42 Value::String(s) => KeyPart::String(s.to_string()),
43 _ => KeyPart::String(format!("{v:?}")),
44 })
45 })
46 .collect();
47 RowKey(parts)
48 }
49
50 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
52 let columns: Vec<usize> = (0..chunk.column_count()).collect();
53 Self::from_row(chunk, row, &columns)
54 }
55}
56
57pub struct DistinctOperator {
61 child: Box<dyn Operator>,
63 distinct_columns: Option<Vec<usize>>,
65 output_schema: Vec<LogicalType>,
67 seen: HashSet<RowKey>,
69}
70
71impl DistinctOperator {
72 pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
74 Self {
75 child,
76 distinct_columns: None,
77 output_schema,
78 seen: HashSet::new(),
79 }
80 }
81
82 pub fn into_parts(self) -> (Box<dyn Operator>, Option<Vec<usize>>) {
84 (self.child, self.distinct_columns)
85 }
86
87 pub fn on_columns(
89 child: Box<dyn Operator>,
90 columns: Vec<usize>,
91 output_schema: Vec<LogicalType>,
92 ) -> Self {
93 Self {
94 child,
95 distinct_columns: Some(columns),
96 output_schema,
97 seen: HashSet::new(),
98 }
99 }
100}
101
102impl Operator for DistinctOperator {
103 fn next(&mut self) -> OperatorResult {
104 loop {
105 let Some(chunk) = self.child.next()? else {
106 return Ok(None);
107 };
108
109 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
110
111 for row in chunk.selected_indices() {
112 let key = match &self.distinct_columns {
113 Some(cols) => RowKey::from_row(&chunk, row, cols),
114 None => RowKey::from_all_columns(&chunk, row),
115 };
116
117 if self.seen.insert(key) {
118 for col_idx in 0..chunk.column_count() {
120 if let (Some(src_col), Some(dst_col)) =
121 (chunk.column(col_idx), builder.column_mut(col_idx))
122 {
123 if let Some(value) = src_col.get_value(row) {
124 dst_col.push_value(value);
125 } else {
126 dst_col.push_value(Value::Null);
127 }
128 }
129 }
130 builder.advance_row();
131
132 if builder.is_full() {
133 return Ok(Some(builder.finish()));
134 }
135 }
136 }
137
138 if builder.row_count() > 0 {
139 return Ok(Some(builder.finish()));
140 }
141 }
143 }
144
145 fn reset(&mut self) {
146 self.child.reset();
147 self.seen.clear();
148 }
149
150 fn name(&self) -> &'static str {
151 "Distinct"
152 }
153
154 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
155 self
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Child-operator stub that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self { chunks, position: 0 }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder in its slot.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        /// Rewinds the cursor only; chunks already handed out remain replaced by
        /// empty placeholders.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Six (Int64, String) rows of which only three are distinct.
    fn create_chunk_with_duplicates() -> DataChunk {
        let rows: [(i64, &str); 6] = [(1, "a"), (2, "b"), (1, "a"), (3, "c"), (2, "b"), (1, "a")];
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        for &(num, text) in &rows {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Single-column Int64 chunk containing `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns the Int64 values of its first column, sorted.
    fn drain_first_column_sorted(op: &mut DistinctOperator) -> Vec<i64> {
        let mut values = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                values.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        values.sort_unstable();
        values
    }

    #[test]
    fn test_distinct_all_columns() {
        let mut distinct = DistinctOperator::new(
            Box::new(MockOperator::new(vec![create_chunk_with_duplicates()])),
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut pairs: Vec<(i64, String)> = Vec::new();
        while let Some(chunk) = distinct.next().unwrap() {
            for row in chunk.selected_indices() {
                let num = chunk.column(0).unwrap().get_int64(row).unwrap();
                let text = chunk.column(1).unwrap().get_string(row).unwrap().to_string();
                pairs.push((num, text));
            }
        }

        // Three unique rows survive; output order is not part of the contract.
        assert_eq!(pairs.len(), 3);
        pairs.sort();
        let expected: Vec<(i64, String)> = [(1, "a"), (2, "b"), (3, "c")]
            .iter()
            .map(|&(num, text)| (num, text.to_string()))
            .collect();
        assert_eq!(pairs, expected);
    }

    #[test]
    fn test_distinct_single_column() {
        // Only column 0 participates in the key.
        let mut distinct = DistinctOperator::on_columns(
            Box::new(MockOperator::new(vec![create_chunk_with_duplicates()])),
            vec![0],
            vec![LogicalType::Int64, LogicalType::String],
        );
        assert_eq!(drain_first_column_sorted(&mut distinct), vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        // 2 and 3 reappear in the second chunk and must be filtered there.
        let mock = MockOperator::new(vec![int64_chunk(&[1, 2, 3]), int64_chunk(&[2, 3, 4])]);
        let mut distinct = DistinctOperator::new(Box::new(mock), vec![LogicalType::Int64]);
        assert_eq!(drain_first_column_sorted(&mut distinct), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_distinct_into_any() {
        let op = DistinctOperator::new(
            Box::new(MockOperator::new(Vec::new())),
            vec![LogicalType::Int64],
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<DistinctOperator>().is_ok());
    }

    #[test]
    fn test_distinct_into_parts() {
        let op = DistinctOperator::on_columns(
            Box::new(MockOperator::new(Vec::new())),
            vec![0, 2],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Int64],
        );
        let (mut child, distinct_columns) = op.into_parts();
        assert_eq!(distinct_columns, Some(vec![0, 2]));
        // The returned child is still usable (and empty here).
        assert!(child.next().unwrap().is_none());
    }

    #[test]
    fn test_distinct_into_parts_all_columns() {
        let op = DistinctOperator::new(
            Box::new(MockOperator::new(Vec::new())),
            vec![LogicalType::Int64],
        );
        let (_child, distinct_columns) = op.into_parts();
        assert_eq!(distinct_columns, None);
    }
}