1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7use crate::execution::vector::ValueVector;
8use grafeo_common::types::Value;
9use std::collections::HashSet;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13struct RowKey(Vec<u64>);
14
15impl RowKey {
16 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
17 let hashes: Vec<u64> = columns
18 .iter()
19 .map(|&col| {
20 chunk
21 .column(col)
22 .and_then(|c| c.get_value(row))
23 .map_or(0, |v| hash_value(&v))
24 })
25 .collect();
26 Self(hashes)
27 }
28
29 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
30 let hashes: Vec<u64> = (0..chunk.column_count())
31 .map(|col| {
32 chunk
33 .column(col)
34 .and_then(|c| c.get_value(row))
35 .map_or(0, |v| hash_value(&v))
36 })
37 .collect();
38 Self(hashes)
39 }
40}
41
42fn hash_value(value: &Value) -> u64 {
43 use std::collections::hash_map::DefaultHasher;
44 use std::hash::Hasher;
45
46 let mut hasher = DefaultHasher::new();
47 hash_value_into(value, &mut hasher);
48 hasher.finish()
49}
50
51fn hash_value_into(value: &Value, hasher: &mut impl std::hash::Hasher) {
55 use std::hash::Hash;
56
57 std::mem::discriminant(value).hash(hasher);
58 match value {
59 Value::Null => {}
60 Value::Bool(b) => b.hash(hasher),
61 Value::Int64(i) => i.hash(hasher),
62 Value::Float64(f) => f.to_bits().hash(hasher),
63 Value::String(s) => s.hash(hasher),
64 Value::Bytes(b) => b.hash(hasher),
65 Value::List(items) => {
66 items.len().hash(hasher);
67 for item in items.iter() {
68 hash_value_into(item, hasher);
69 }
70 }
71 Value::Map(map) => {
72 map.len().hash(hasher);
73 for (k, v) in map.iter() {
75 k.as_str().hash(hasher);
76 hash_value_into(v, hasher);
77 }
78 }
79 Value::Vector(vec) => {
80 vec.len().hash(hasher);
81 for f in vec.iter() {
82 f.to_bits().hash(hasher);
83 }
84 }
85 Value::Path { nodes, edges } => {
86 nodes.len().hash(hasher);
87 for n in nodes.iter() {
88 hash_value_into(n, hasher);
89 }
90 edges.len().hash(hasher);
91 for e in edges.iter() {
92 hash_value_into(e, hasher);
93 }
94 }
95 _ => format!("{value}").hash(hasher),
98 }
99}
100
101pub struct DistinctPushOperator {
107 columns: Option<Vec<usize>>,
109 seen: HashSet<RowKey>,
111}
112
113impl DistinctPushOperator {
114 pub fn new() -> Self {
116 Self {
117 columns: None,
118 seen: HashSet::new(),
119 }
120 }
121
122 pub fn on_columns(columns: Vec<usize>) -> Self {
124 Self {
125 columns: Some(columns),
126 seen: HashSet::new(),
127 }
128 }
129
130 pub fn unique_count(&self) -> usize {
132 self.seen.len()
133 }
134}
135
136impl Default for DistinctPushOperator {
137 fn default() -> Self {
138 Self::new()
139 }
140}
141
142impl PushOperator for DistinctPushOperator {
143 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
144 if chunk.is_empty() {
145 return Ok(true);
146 }
147
148 let mut new_indices = Vec::new();
150
151 for row in chunk.selected_indices() {
152 let key = match &self.columns {
153 Some(cols) => RowKey::from_row(&chunk, row, cols),
154 None => RowKey::from_all_columns(&chunk, row),
155 };
156
157 if self.seen.insert(key) {
158 new_indices.push(row);
159 }
160 }
161
162 if new_indices.is_empty() {
163 return Ok(true);
164 }
165
166 let selection = SelectionVector::from_predicate(chunk.len(), |i| new_indices.contains(&i));
168 let filtered = chunk.filter(&selection);
169
170 sink.consume(filtered)
171 }
172
173 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
174 Ok(())
176 }
177
178 fn preferred_chunk_size(&self) -> ChunkSizeHint {
179 ChunkSizeHint::Default
180 }
181
182 fn name(&self) -> &'static str {
183 "DistinctPush"
184 }
185}
186
187pub struct DistinctMaterializingOperator {
193 columns: Option<Vec<usize>>,
195 rows: Vec<Vec<Value>>,
197 seen: HashSet<RowKey>,
199 num_columns: Option<usize>,
201}
202
203impl DistinctMaterializingOperator {
204 pub fn new() -> Self {
206 Self {
207 columns: None,
208 rows: Vec::new(),
209 seen: HashSet::new(),
210 num_columns: None,
211 }
212 }
213
214 pub fn on_columns(columns: Vec<usize>) -> Self {
216 Self {
217 columns: Some(columns),
218 rows: Vec::new(),
219 seen: HashSet::new(),
220 num_columns: None,
221 }
222 }
223}
224
225impl Default for DistinctMaterializingOperator {
226 fn default() -> Self {
227 Self::new()
228 }
229}
230
231impl PushOperator for DistinctMaterializingOperator {
232 fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
233 if chunk.is_empty() {
234 return Ok(true);
235 }
236
237 if self.num_columns.is_none() {
238 self.num_columns = Some(chunk.column_count());
239 }
240
241 let num_cols = chunk.column_count();
242
243 for row in chunk.selected_indices() {
244 let key = match &self.columns {
245 Some(cols) => RowKey::from_row(&chunk, row, cols),
246 None => RowKey::from_all_columns(&chunk, row),
247 };
248
249 if self.seen.insert(key) {
250 let row_values: Vec<Value> = (0..num_cols)
252 .map(|col| {
253 chunk
254 .column(col)
255 .and_then(|c| c.get_value(row))
256 .unwrap_or(Value::Null)
257 })
258 .collect();
259 self.rows.push(row_values);
260 }
261 }
262
263 Ok(true)
264 }
265
266 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
267 if self.rows.is_empty() {
268 return Ok(());
269 }
270
271 let num_cols = self.num_columns.unwrap_or(0);
272 let mut columns: Vec<ValueVector> = (0..num_cols).map(|_| ValueVector::new()).collect();
273
274 for row in &self.rows {
275 for (col_idx, col) in columns.iter_mut().enumerate() {
276 let val = row.get(col_idx).cloned().unwrap_or(Value::Null);
277 col.push(val);
278 }
279 }
280
281 let chunk = DataChunk::new(columns);
282 sink.consume(chunk)?;
283
284 Ok(())
285 }
286
287 fn preferred_chunk_size(&self) -> ChunkSizeHint {
288 ChunkSizeHint::Default
289 }
290
291 fn name(&self) -> &'static str {
292 "DistinctMaterializing"
293 }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column chunk of `Int64` values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        let vector = ValueVector::from_values(&v);
        DataChunk::new(vec![vector])
    }

    /// Builds a single-column chunk from arbitrary values.
    fn create_mixed_chunk(values: &[Value]) -> DataChunk {
        let vector = ValueVector::from_values(values);
        DataChunk::new(vec![vector])
    }

    /// Builds a two-column chunk: an `Int64` key column and a string payload column.
    fn create_keyed_chunk(rows: &[(i64, &str)]) -> DataChunk {
        let keys: Vec<Value> = rows.iter().map(|&(k, _)| Value::Int64(k)).collect();
        let payloads: Vec<Value> = rows.iter().map(|&(_, p)| Value::from(p)).collect();
        DataChunk::new(vec![
            ValueVector::from_values(&keys),
            ValueVector::from_values(&payloads),
        ])
    }

    #[test]
    fn test_distinct_all_unique() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3, 4, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 5);
        assert_eq!(distinct.unique_count(), 5);
    }

    #[test]
    fn test_distinct_with_duplicates() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 1, 3, 2, 1, 4]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 4);
        assert_eq!(distinct.unique_count(), 4);
    }

    #[test]
    fn test_distinct_all_same() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[5, 5, 5, 5, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 1);
        assert_eq!(distinct.unique_count(), 1);
    }

    #[test]
    fn test_distinct_multiple_chunks() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[2, 3, 4]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[3, 4, 5]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        // Duplicates across chunk boundaries are suppressed too.
        assert_eq!(sink.row_count(), 5);
        assert_eq!(distinct.unique_count(), 5);
    }

    #[test]
    fn test_distinct_empty_chunk() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        distinct.push(create_test_chunk(&[]), &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 0);
        assert_eq!(distinct.unique_count(), 0);
    }

    #[test]
    fn test_distinct_on_columns() {
        let mut distinct = DistinctPushOperator::on_columns(vec![0]);
        let mut sink = CollectorSink::new();

        // Only column 0 is the key, so (1, "b") duplicates (1, "a").
        let chunk = create_keyed_chunk(&[(1, "a"), (1, "b"), (2, "c")]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 2);
        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_all_columns_multi_column() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        // Rows differing only in the second column are distinct.
        let chunk = create_keyed_chunk(&[(1, "a"), (1, "a"), (1, "b")]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(sink.row_count(), 2);
        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_materializing() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 7);
    }

    #[test]
    fn test_distinct_materializing_multiple_chunks() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct
            .push(create_test_chunk(&[1, 2, 3]), &mut sink)
            .unwrap();
        distinct
            .push(create_test_chunk(&[2, 3, 4]), &mut sink)
            .unwrap();
        distinct.finalize(&mut sink).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 4);
    }

    #[test]
    fn test_distinct_materializing_on_columns() {
        let mut distinct = DistinctMaterializingOperator::on_columns(vec![0]);
        let mut sink = CollectorSink::new();

        let chunk = create_keyed_chunk(&[(1, "a"), (1, "b"), (2, "c")]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        // Deduplicated on column 0, but every column is emitted.
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 2);
        assert_eq!(chunks[0].column_count(), 2);
    }

    #[test]
    fn test_distinct_materializing_no_input() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        distinct.finalize(&mut sink).unwrap();

        assert!(sink.into_chunks().is_empty());
    }

    #[test]
    fn test_distinct_null_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[Value::Null, Value::Null, Value::Int64(1)]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        // All NULLs fall into a single group.
        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_bool_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[Value::Bool(true), Value::Bool(false), Value::Bool(true)]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_float_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[
            Value::Float64(1.0),
            Value::Float64(2.0),
            Value::Float64(1.0),
            Value::Float64(f64::NAN),
        ]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        // 1.0, 2.0 and NaN.
        assert_eq!(distinct.unique_count(), 3);
    }

    #[test]
    fn test_distinct_string_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk =
            create_mixed_chunk(&[Value::from("Alix"), Value::from("Gus"), Value::from("Alix")]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_bytes_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[
            Value::Bytes(vec![1u8, 2, 3].into()),
            Value::Bytes(vec![4u8, 5, 6].into()),
            Value::Bytes(vec![1u8, 2, 3].into()),
        ]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_list_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[
            Value::List(vec![Value::Int64(1), Value::Int64(2)].into()),
            Value::List(vec![Value::Int64(3), Value::Int64(4)].into()),
            Value::List(vec![Value::Int64(1), Value::Int64(2)].into()),
        ]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_map_values() {
        use std::collections::BTreeMap;

        let mut map1 = BTreeMap::new();
        map1.insert("a".into(), Value::Int64(1));
        let mut map2 = BTreeMap::new();
        map2.insert("b".into(), Value::Int64(2));

        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[
            Value::Map(map1.clone().into()),
            Value::Map(map2.into()),
            Value::Map(map1.into()),
        ]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_vector_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let chunk = create_mixed_chunk(&[
            Value::Vector(vec![1.0_f32, 2.0].into()),
            Value::Vector(vec![3.0_f32, 4.0].into()),
            Value::Vector(vec![1.0_f32, 2.0].into()),
        ]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_path_values() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        let path1 = Value::Path {
            nodes: vec![Value::Int64(1), Value::Int64(2)].into(),
            edges: vec![Value::Int64(10)].into(),
        };
        let path2 = Value::Path {
            nodes: vec![Value::Int64(3), Value::Int64(4)].into(),
            edges: vec![Value::Int64(20)].into(),
        };

        let chunk = create_mixed_chunk(&[path1.clone(), path2, path1]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 2);
    }

    #[test]
    fn test_distinct_mixed_types_are_distinct() {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        // Same "1-ish" value in four different variants: all distinct.
        let chunk = create_mixed_chunk(&[
            Value::Int64(1),
            Value::Float64(1.0),
            Value::from("1"),
            Value::Bool(true),
        ]);
        distinct.push(chunk, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        assert_eq!(distinct.unique_count(), 4);
    }

    #[test]
    fn test_hash_value_deterministic() {
        let v1 = Value::from("test");
        let v2 = Value::from("test");
        assert_eq!(hash_value(&v1), hash_value(&v2));

        let v3 = Value::from("other");
        assert_ne!(hash_value(&v1), hash_value(&v3));
    }
}