graphos_core/execution/operators/
distinct.rs1use std::collections::HashSet;
7
8use graphos_common::types::{LogicalType, Value};
9
10use super::{Operator, OperatorResult};
11use crate::execution::DataChunk;
12use crate::execution::chunk::DataChunkBuilder;
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16struct RowKey(Vec<KeyPart>);
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash)]
19enum KeyPart {
20 Null,
21 Bool(bool),
22 Int64(i64),
23 String(String),
24}
25
26impl RowKey {
27 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
29 let parts: Vec<KeyPart> = columns
30 .iter()
31 .map(|&col_idx| {
32 chunk
33 .column(col_idx)
34 .and_then(|col| col.get_value(row))
35 .map(|v| match v {
36 Value::Null => KeyPart::Null,
37 Value::Bool(b) => KeyPart::Bool(b),
38 Value::Int64(i) => KeyPart::Int64(i),
39 Value::Float64(f) => KeyPart::Int64(f.to_bits() as i64),
40 Value::String(s) => KeyPart::String(s.to_string()),
41 _ => KeyPart::String(format!("{v:?}")),
42 })
43 .unwrap_or(KeyPart::Null)
44 })
45 .collect();
46 RowKey(parts)
47 }
48
49 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
51 let columns: Vec<usize> = (0..chunk.column_count()).collect();
52 Self::from_row(chunk, row, &columns)
53 }
54}
55
56pub struct DistinctOperator {
60 child: Box<dyn Operator>,
62 distinct_columns: Option<Vec<usize>>,
64 output_schema: Vec<LogicalType>,
66 seen: HashSet<RowKey>,
68}
69
70impl DistinctOperator {
71 pub fn new(child: Box<dyn Operator>, output_schema: Vec<LogicalType>) -> Self {
73 Self {
74 child,
75 distinct_columns: None,
76 output_schema,
77 seen: HashSet::new(),
78 }
79 }
80
81 pub fn on_columns(
83 child: Box<dyn Operator>,
84 columns: Vec<usize>,
85 output_schema: Vec<LogicalType>,
86 ) -> Self {
87 Self {
88 child,
89 distinct_columns: Some(columns),
90 output_schema,
91 seen: HashSet::new(),
92 }
93 }
94}
95
96impl Operator for DistinctOperator {
97 fn next(&mut self) -> OperatorResult {
98 loop {
99 let chunk = match self.child.next()? {
100 Some(c) => c,
101 None => return Ok(None),
102 };
103
104 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
105
106 for row in chunk.selected_indices() {
107 let key = match &self.distinct_columns {
108 Some(cols) => RowKey::from_row(&chunk, row, cols),
109 None => RowKey::from_all_columns(&chunk, row),
110 };
111
112 if self.seen.insert(key) {
113 for col_idx in 0..chunk.column_count() {
115 if let (Some(src_col), Some(dst_col)) =
116 (chunk.column(col_idx), builder.column_mut(col_idx))
117 {
118 if let Some(value) = src_col.get_value(row) {
119 dst_col.push_value(value);
120 } else {
121 dst_col.push_value(Value::Null);
122 }
123 }
124 }
125 builder.advance_row();
126
127 if builder.is_full() {
128 return Ok(Some(builder.finish()));
129 }
130 }
131 }
132
133 if builder.row_count() > 0 {
134 return Ok(Some(builder.finish()));
135 }
136 }
138 }
139
140 fn reset(&mut self) {
141 self.child.reset();
142 self.seen.clear();
143 }
144
145 fn name(&self) -> &'static str {
146 "Distinct"
147 }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            MockOperator {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            // Each chunk is moved out once, leaving an empty placeholder behind.
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            // Only rewinds the cursor; chunks already handed out stay empty.
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Pulls every chunk from `op` and maps each selected row through `read_row`.
    fn drain<T>(
        op: &mut dyn Operator,
        mut read_row: impl FnMut(&DataChunk, usize) -> T,
    ) -> Vec<T> {
        let mut rows = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                rows.push(read_row(&chunk, row));
            }
        }
        rows
    }

    /// Reads the Int64 value in column 0 of `row`.
    fn int64_at(chunk: &DataChunk, row: usize) -> i64 {
        chunk.column(0).unwrap().get_int64(row).unwrap()
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Six (Int64, String) rows containing three distinct pairs.
    fn create_chunk_with_duplicates() -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        let rows = [
            (1i64, "a"),
            (2, "b"),
            (1, "a"),
            (3, "c"),
            (2, "b"),
            (1, "a"),
        ];
        for (num, text) in rows {
            builder.column_mut(0).unwrap().push_int64(num);
            builder.column_mut(1).unwrap().push_string(text);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_distinct_all_columns() {
        let mut distinct = DistinctOperator::new(
            Box::new(MockOperator::new(vec![create_chunk_with_duplicates()])),
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut results = drain(&mut distinct, |chunk, row| {
            let num = int64_at(chunk, row);
            let text = chunk.column(1).unwrap().get_string(row).unwrap().to_string();
            (num, text)
        });
        assert_eq!(results.len(), 3);

        // Output order is not part of the contract, so compare sorted.
        results.sort();
        assert_eq!(
            results,
            vec![
                (1, "a".to_string()),
                (2, "b".to_string()),
                (3, "c".to_string()),
            ]
        );
    }

    #[test]
    fn test_distinct_single_column() {
        let mut distinct = DistinctOperator::on_columns(
            Box::new(MockOperator::new(vec![create_chunk_with_duplicates()])),
            vec![0],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let mut nums = drain(&mut distinct, int64_at);
        nums.sort();
        assert_eq!(nums, vec![1, 2, 3]);
    }

    #[test]
    fn test_distinct_across_chunks() {
        let mock = MockOperator::new(vec![int64_chunk(&[1, 2, 3]), int64_chunk(&[2, 3, 4])]);
        let mut distinct = DistinctOperator::new(Box::new(mock), vec![LogicalType::Int64]);

        let mut nums = drain(&mut distinct, int64_at);
        nums.sort();
        assert_eq!(nums, vec![1, 2, 3, 4]);
    }
}