grafeo_core/execution/operators/push/
distinct.rs1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7use crate::execution::vector::ValueVector;
8use grafeo_common::types::Value;
9use std::collections::HashSet;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
13struct RowKey(Vec<u64>);
14
15impl RowKey {
16 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
17 let hashes: Vec<u64> = columns
18 .iter()
19 .map(|&col| {
20 chunk
21 .column(col)
22 .and_then(|c| c.get_value(row))
23 .map(|v| hash_value(&v))
24 .unwrap_or(0)
25 })
26 .collect();
27 Self(hashes)
28 }
29
30 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
31 let hashes: Vec<u64> = (0..chunk.column_count())
32 .map(|col| {
33 chunk
34 .column(col)
35 .and_then(|c| c.get_value(row))
36 .map(|v| hash_value(&v))
37 .unwrap_or(0)
38 })
39 .collect();
40 Self(hashes)
41 }
42}
43
44fn hash_value(value: &Value) -> u64 {
45 use std::collections::hash_map::DefaultHasher;
46 use std::hash::{Hash, Hasher};
47
48 let mut hasher = DefaultHasher::new();
49 match value {
50 Value::Null => 0u8.hash(&mut hasher),
51 Value::Bool(b) => b.hash(&mut hasher),
52 Value::Int64(i) => i.hash(&mut hasher),
53 Value::Float64(f) => f.to_bits().hash(&mut hasher),
54 Value::String(s) => s.hash(&mut hasher),
55 _ => 0u8.hash(&mut hasher),
56 }
57 hasher.finish()
58}
59
60pub struct DistinctPushOperator {
66 columns: Option<Vec<usize>>,
68 seen: HashSet<RowKey>,
70}
71
72impl DistinctPushOperator {
73 pub fn new() -> Self {
75 Self {
76 columns: None,
77 seen: HashSet::new(),
78 }
79 }
80
81 pub fn on_columns(columns: Vec<usize>) -> Self {
83 Self {
84 columns: Some(columns),
85 seen: HashSet::new(),
86 }
87 }
88
89 pub fn unique_count(&self) -> usize {
91 self.seen.len()
92 }
93}
94
95impl Default for DistinctPushOperator {
96 fn default() -> Self {
97 Self::new()
98 }
99}
100
101impl PushOperator for DistinctPushOperator {
102 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
103 if chunk.is_empty() {
104 return Ok(true);
105 }
106
107 let mut new_indices = Vec::new();
109
110 for row in chunk.selected_indices() {
111 let key = match &self.columns {
112 Some(cols) => RowKey::from_row(&chunk, row, cols),
113 None => RowKey::from_all_columns(&chunk, row),
114 };
115
116 if self.seen.insert(key) {
117 new_indices.push(row);
118 }
119 }
120
121 if new_indices.is_empty() {
122 return Ok(true);
123 }
124
125 let selection = SelectionVector::from_predicate(chunk.len(), |i| new_indices.contains(&i));
127 let filtered = chunk.filter(&selection);
128
129 sink.consume(filtered)
130 }
131
132 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
133 Ok(())
135 }
136
137 fn preferred_chunk_size(&self) -> ChunkSizeHint {
138 ChunkSizeHint::Default
139 }
140
141 fn name(&self) -> &'static str {
142 "DistinctPush"
143 }
144}
145
146pub struct DistinctMaterializingOperator {
152 columns: Option<Vec<usize>>,
154 rows: Vec<Vec<Value>>,
156 seen: HashSet<RowKey>,
158 num_columns: Option<usize>,
160}
161
162impl DistinctMaterializingOperator {
163 pub fn new() -> Self {
165 Self {
166 columns: None,
167 rows: Vec::new(),
168 seen: HashSet::new(),
169 num_columns: None,
170 }
171 }
172
173 pub fn on_columns(columns: Vec<usize>) -> Self {
175 Self {
176 columns: Some(columns),
177 rows: Vec::new(),
178 seen: HashSet::new(),
179 num_columns: None,
180 }
181 }
182}
183
184impl Default for DistinctMaterializingOperator {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190impl PushOperator for DistinctMaterializingOperator {
191 fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
192 if chunk.is_empty() {
193 return Ok(true);
194 }
195
196 if self.num_columns.is_none() {
197 self.num_columns = Some(chunk.column_count());
198 }
199
200 let num_cols = chunk.column_count();
201
202 for row in chunk.selected_indices() {
203 let key = match &self.columns {
204 Some(cols) => RowKey::from_row(&chunk, row, cols),
205 None => RowKey::from_all_columns(&chunk, row),
206 };
207
208 if self.seen.insert(key) {
209 let row_values: Vec<Value> = (0..num_cols)
211 .map(|col| {
212 chunk
213 .column(col)
214 .and_then(|c| c.get_value(row))
215 .unwrap_or(Value::Null)
216 })
217 .collect();
218 self.rows.push(row_values);
219 }
220 }
221
222 Ok(true)
223 }
224
225 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
226 if self.rows.is_empty() {
227 return Ok(());
228 }
229
230 let num_cols = self.num_columns.unwrap_or(0);
231 let mut columns: Vec<ValueVector> = (0..num_cols).map(|_| ValueVector::new()).collect();
232
233 for row in &self.rows {
234 for (col_idx, col) in columns.iter_mut().enumerate() {
235 let val = row.get(col_idx).cloned().unwrap_or(Value::Null);
236 col.push(val);
237 }
238 }
239
240 let chunk = DataChunk::new(columns);
241 sink.consume(chunk)?;
242
243 Ok(())
244 }
245
246 fn preferred_chunk_size(&self) -> ChunkSizeHint {
247 ChunkSizeHint::Default
248 }
249
250 fn name(&self) -> &'static str {
251 "DistinctMaterializing"
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Builds a single-column chunk holding the given integers as `Int64`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let cells: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&cells)])
    }

    /// Pushes each batch through a fresh `DistinctPushOperator`, finalizes it,
    /// and hands back the sink and the operator for inspection.
    fn run_streaming(batches: &[Vec<i64>]) -> (CollectorSink, DistinctPushOperator) {
        let mut operator = DistinctPushOperator::new();
        let mut collector = CollectorSink::new();

        for batch in batches {
            operator
                .push(create_test_chunk(batch), &mut collector)
                .unwrap();
        }
        operator.finalize(&mut collector).unwrap();

        (collector, operator)
    }

    #[test]
    fn test_distinct_all_unique() {
        let (sink, distinct) = run_streaming(&[vec![1, 2, 3, 4, 5]]);

        assert_eq!(sink.row_count(), 5);
        assert_eq!(distinct.unique_count(), 5);
    }

    #[test]
    fn test_distinct_with_duplicates() {
        let (sink, distinct) = run_streaming(&[vec![1, 2, 1, 3, 2, 1, 4]]);

        // Unique values are 1, 2, 3 and 4.
        assert_eq!(sink.row_count(), 4);
        assert_eq!(distinct.unique_count(), 4);
    }

    #[test]
    fn test_distinct_all_same() {
        let (sink, distinct) = run_streaming(&[vec![5, 5, 5, 5, 5]]);

        assert_eq!(sink.row_count(), 1);
        assert_eq!(distinct.unique_count(), 1);
    }

    #[test]
    fn test_distinct_multiple_chunks() {
        let (sink, _distinct) =
            run_streaming(&[vec![1, 2, 3], vec![2, 3, 4], vec![3, 4, 5]]);

        // Unique values across all chunks are 1 through 5.
        assert_eq!(sink.row_count(), 5);
    }

    #[test]
    fn test_distinct_materializing() {
        let mut operator = DistinctMaterializingOperator::new();
        let mut collector = CollectorSink::new();

        operator
            .push(create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]), &mut collector)
            .unwrap();
        operator.finalize(&mut collector).unwrap();

        let chunks = collector.into_chunks();
        // Everything is emitted as one chunk; the repeated 1 is dropped.
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 7);
    }
}