grafeo_core/execution/operators/
distinct.rs1use std::collections::HashSet;
7
8use grafeo_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20 Null,
21 Bool(bool),
22 Int64(i64),
23 String(String),
24}
25
26impl RowKey {
27 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29 let parts: Vec<KeyPart> = columns
30 .iter()
31 .map(|&col_idx| {
32 chunk
33 .column(col_idx)
34 .and_then(|col| col.get_value(row))
35 .map_or(KeyPart::Null, |v| match v {
36 Value::Null => KeyPart::Null,
37 Value::Bool(b) => KeyPart::Bool(b),
38 Value::Int64(i) => KeyPart::Int64(i),
39 Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
40 Value::String(s) => KeyPart::String(s.to_string()),
41 _ => KeyPart::String(format!("{v:?}")),
42 })
43 })
44 .collect();
45 RowKey(parts)
46 }
47
48 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
50 let columns: Vec<usize> = (0..chunk.column_count()).collect();
51 Self::from_row(chunk, row, &columns)
52 }
53}
54
55pub struct DistinctOperator {
59 child: Box<dyn Operator>,
61 distinct_columns: Option<Vec<usize>>,
63 output_schema: Vec<LogicalType>,
65 seen: HashSet<RowKey>,
67}
68
69impl DistinctOperator {
70 pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
72 Self {
73 child,
74 distinct_columns: None,
75 output_schema,
76 seen: HashSet::new(),
77 }
78 }
79
80 pub fn on_columns(
82 child: Box<dyn Operator>,
83 columns: Vec<usize>,
84 output_schema: Vec<LogicalType>,
85 ) -> Self {
86 Self {
87 child,
88 distinct_columns: Some(columns),
89 output_schema,
90 seen: HashSet::new(),
91 }
92 }
93}
94
95impl Operator for DistinctOperator {
96 fn next(&mut self) -> OperatorResult {
97 loop {
98 let Some(chunk) = self.child.next()? else {
99 return Ok(None);
100 };
101
102 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
103
104 for row in chunk.selected_indices() {
105 let key = match &self.distinct_columns {
106 Some(cols) => RowKey::from_row(&chunk, row, cols),
107 None => RowKey::from_all_columns(&chunk, row),
108 };
109
110 if self.seen.insert(key) {
111 for col_idx in 0..chunk.column_count() {
113 if let (Some(src_col), Some(dst_col)) =
114 (chunk.column(col_idx), builder.column_mut(col_idx))
115 {
116 if let Some(value) = src_col.get_value(row) {
117 dst_col.push_value(value);
118 } else {
119 dst_col.push_value(Value::Null);
120 }
121 }
122 }
123 builder.advance_row();
124
125 if builder.is_full() {
126 return Ok(Some(builder.finish()));
127 }
128 }
129 }
130
131 if builder.row_count() > 0 {
132 return Ok(Some(builder.finish()));
133 }
134 }
136 }
137
138 fn reset(&mut self) {
139 self.child.reset();
140 self.seen.clear();
141 }
142
143 fn name(&self) -> &'static str {
144 "Distinct"
145 }
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Two-column (Int64, String) chunk in which (1, "a") appears three times
    /// and (2, "b") twice.
    fn create_chunk_with_duplicates() -> DataChunk {
        const ROWS: [(i64, &str); 6] = [
            (1, "a"),
            (2, "b"),
            (1, "a"),
            (3, "c"),
            (2, "b"),
            (1, "a"),
        ];

        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        for (num, text) in ROWS {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Single-column Int64 chunk holding `values` in order.
    fn int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns column 0's Int64 values across every chunk.
    fn collect_first_column(op: &mut DistinctOperator) -> Vec<i64> {
        let mut out = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            let col = chunk.column(0).unwrap();
            for row in chunk.selected_indices() {
                out.push(col.get_int64(row).unwrap());
            }
        }
        out
    }

    #[test]
    fn test_distinct_all_columns() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut distinct = DistinctOperator::new(
            Box::new(source),
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut pairs = Vec::new();
        while let Some(chunk) = distinct.next().unwrap() {
            let nums = chunk.column(0).unwrap();
            let texts = chunk.column(1).unwrap();
            for row in chunk.selected_indices() {
                let num = nums.get_int64(row).unwrap();
                let text = texts.get_string(row).unwrap().to_string();
                pairs.push((num, text));
            }
        }

        assert_eq!(pairs.len(), 3);

        pairs.sort();
        let expected = vec![
            (1, "a".to_string()),
            (2, "b".to_string()),
            (3, "c".to_string()),
        ];
        assert_eq!(pairs, expected);
    }

    #[test]
    fn test_distinct_single_column() {
        let source = MockOperator::new(vec![create_chunk_with_duplicates()]);
        let mut distinct = DistinctOperator::on_columns(
            Box::new(source),
            vec![0],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut nums = collect_first_column(&mut distinct);
        nums.sort_unstable();

        assert_eq!(nums, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        let source = MockOperator::new(vec![int_chunk(&[1, 2, 3]), int_chunk(&[2, 3, 4])]);
        let mut distinct = DistinctOperator::new(Box::new(source), vec![LogicalType::Int64]);

        let mut nums = collect_first_column(&mut distinct);
        nums.sort_unstable();

        assert_eq!(nums, vec![1, 2, 3, 4]);
    }
}