grafeo_core/execution/operators/push/
distinct.rs1use crate::execution::chunk::DataChunk;
4use crate::execution::operators::OperatorError;
5use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
6use crate::execution::selection::SelectionVector;
7use crate::execution::vector::ValueVector;
8use grafeo_common::types::Value;
9use std::collections::HashSet;
10
/// Deduplication key for a single row: one 64-bit hash per participating
/// column, kept in column order.
///
/// Two rows count as duplicates when their keys compare equal, i.e. when every
/// per-column hash matches. Only the hashes are stored, never the values, so
/// two different values whose hashes collide cannot be told apart by this key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct RowKey(Vec<u64>);
14
15impl RowKey {
16 fn from_row(chunk: &DataChunk, row: usize, columns: &[usize]) -> Self {
17 let hashes: Vec<u64> = columns
18 .iter()
19 .map(|&col| {
20 chunk
21 .column(col)
22 .and_then(|c| c.get_value(row))
23 .map_or(0, |v| hash_value(&v))
24 })
25 .collect();
26 Self(hashes)
27 }
28
29 fn from_all_columns(chunk: &DataChunk, row: usize) -> Self {
30 let hashes: Vec<u64> = (0..chunk.column_count())
31 .map(|col| {
32 chunk
33 .column(col)
34 .and_then(|c| c.get_value(row))
35 .map_or(0, |v| hash_value(&v))
36 })
37 .collect();
38 Self(hashes)
39 }
40}
41
42fn hash_value(value: &Value) -> u64 {
43 use std::collections::hash_map::DefaultHasher;
44 use std::hash::{Hash, Hasher};
45
46 let mut hasher = DefaultHasher::new();
47 match value {
48 Value::Null => 0u8.hash(&mut hasher),
49 Value::Bool(b) => b.hash(&mut hasher),
50 Value::Int64(i) => i.hash(&mut hasher),
51 Value::Float64(f) => f.to_bits().hash(&mut hasher),
52 Value::String(s) => s.hash(&mut hasher),
53 _ => 0u8.hash(&mut hasher),
54 }
55 hasher.finish()
56}
57
58pub struct DistinctPushOperator {
64 columns: Option<Vec<usize>>,
66 seen: HashSet<RowKey>,
68}
69
70impl DistinctPushOperator {
71 pub fn new() -> Self {
73 Self {
74 columns: None,
75 seen: HashSet::new(),
76 }
77 }
78
79 pub fn on_columns(columns: Vec<usize>) -> Self {
81 Self {
82 columns: Some(columns),
83 seen: HashSet::new(),
84 }
85 }
86
87 pub fn unique_count(&self) -> usize {
89 self.seen.len()
90 }
91}
92
93impl Default for DistinctPushOperator {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl PushOperator for DistinctPushOperator {
100 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
101 if chunk.is_empty() {
102 return Ok(true);
103 }
104
105 let mut new_indices = Vec::new();
107
108 for row in chunk.selected_indices() {
109 let key = match &self.columns {
110 Some(cols) => RowKey::from_row(&chunk, row, cols),
111 None => RowKey::from_all_columns(&chunk, row),
112 };
113
114 if self.seen.insert(key) {
115 new_indices.push(row);
116 }
117 }
118
119 if new_indices.is_empty() {
120 return Ok(true);
121 }
122
123 let selection = SelectionVector::from_predicate(chunk.len(), |i| new_indices.contains(&i));
125 let filtered = chunk.filter(&selection);
126
127 sink.consume(filtered)
128 }
129
130 fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
131 Ok(())
133 }
134
135 fn preferred_chunk_size(&self) -> ChunkSizeHint {
136 ChunkSizeHint::Default
137 }
138
139 fn name(&self) -> &'static str {
140 "DistinctPush"
141 }
142}
143
144pub struct DistinctMaterializingOperator {
150 columns: Option<Vec<usize>>,
152 rows: Vec<Vec<Value>>,
154 seen: HashSet<RowKey>,
156 num_columns: Option<usize>,
158}
159
160impl DistinctMaterializingOperator {
161 pub fn new() -> Self {
163 Self {
164 columns: None,
165 rows: Vec::new(),
166 seen: HashSet::new(),
167 num_columns: None,
168 }
169 }
170
171 pub fn on_columns(columns: Vec<usize>) -> Self {
173 Self {
174 columns: Some(columns),
175 rows: Vec::new(),
176 seen: HashSet::new(),
177 num_columns: None,
178 }
179 }
180}
181
182impl Default for DistinctMaterializingOperator {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188impl PushOperator for DistinctMaterializingOperator {
189 fn push(&mut self, chunk: DataChunk, _sink: &mut dyn Sink) -> Result<bool, OperatorError> {
190 if chunk.is_empty() {
191 return Ok(true);
192 }
193
194 if self.num_columns.is_none() {
195 self.num_columns = Some(chunk.column_count());
196 }
197
198 let num_cols = chunk.column_count();
199
200 for row in chunk.selected_indices() {
201 let key = match &self.columns {
202 Some(cols) => RowKey::from_row(&chunk, row, cols),
203 None => RowKey::from_all_columns(&chunk, row),
204 };
205
206 if self.seen.insert(key) {
207 let row_values: Vec<Value> = (0..num_cols)
209 .map(|col| {
210 chunk
211 .column(col)
212 .and_then(|c| c.get_value(row))
213 .unwrap_or(Value::Null)
214 })
215 .collect();
216 self.rows.push(row_values);
217 }
218 }
219
220 Ok(true)
221 }
222
223 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
224 if self.rows.is_empty() {
225 return Ok(());
226 }
227
228 let num_cols = self.num_columns.unwrap_or(0);
229 let mut columns: Vec<ValueVector> = (0..num_cols).map(|_| ValueVector::new()).collect();
230
231 for row in &self.rows {
232 for (col_idx, col) in columns.iter_mut().enumerate() {
233 let val = row.get(col_idx).cloned().unwrap_or(Value::Null);
234 col.push(val);
235 }
236 }
237
238 let chunk = DataChunk::new(columns);
239 sink.consume(chunk)?;
240
241 Ok(())
242 }
243
244 fn preferred_chunk_size(&self) -> ChunkSizeHint {
245 ChunkSizeHint::Default
246 }
247
248 fn name(&self) -> &'static str {
249 "DistinctMaterializing"
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;

    /// Wraps `values` into a single-column chunk of `Int64` values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let cells: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&cells)])
    }

    /// Pushes each batch through a fresh all-column `DistinctPushOperator`,
    /// finalizes it, and returns `(rows collected by the sink, unique_count)`.
    fn run_push_distinct(batches: &[&[i64]]) -> (usize, usize) {
        let mut distinct = DistinctPushOperator::new();
        let mut sink = CollectorSink::new();

        for &batch in batches {
            distinct.push(create_test_chunk(batch), &mut sink).unwrap();
        }
        distinct.finalize(&mut sink).unwrap();

        (sink.row_count(), distinct.unique_count())
    }

    #[test]
    fn test_distinct_all_unique() {
        let (rows, unique) = run_push_distinct(&[&[1, 2, 3, 4, 5]]);

        assert_eq!(rows, 5);
        assert_eq!(unique, 5);
    }

    #[test]
    fn test_distinct_with_duplicates() {
        let (rows, unique) = run_push_distinct(&[&[1, 2, 1, 3, 2, 1, 4]]);

        // Distinct values: 1, 2, 3, 4.
        assert_eq!(rows, 4);
        assert_eq!(unique, 4);
    }

    #[test]
    fn test_distinct_all_same() {
        let (rows, unique) = run_push_distinct(&[&[5, 5, 5, 5, 5]]);

        assert_eq!(rows, 1);
        assert_eq!(unique, 1);
    }

    #[test]
    fn test_distinct_multiple_chunks() {
        let (rows, _) = run_push_distinct(&[&[1, 2, 3], &[2, 3, 4], &[3, 4, 5]]);

        // Distinct values across all three chunks: 1, 2, 3, 4, 5.
        assert_eq!(rows, 5);
    }

    #[test]
    fn test_distinct_materializing() {
        let mut distinct = DistinctMaterializingOperator::new();
        let mut sink = CollectorSink::new();

        let input = create_test_chunk(&[3, 1, 4, 1, 5, 9, 2, 6]);
        distinct.push(input, &mut sink).unwrap();
        distinct.finalize(&mut sink).unwrap();

        // Everything is emitted as one chunk; the repeated `1` is dropped.
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].len(), 7);
    }
}