datafusion_functions_aggregate_common/aggregate/count_distinct/
native.rs1use std::collections::HashSet;
24use std::fmt::Debug;
25use std::hash::Hash;
26use std::mem::size_of_val;
27use std::sync::Arc;
28
29use arrow::array::ArrayRef;
30use arrow::array::PrimitiveArray;
31use arrow::array::types::ArrowPrimitiveType;
32use arrow::datatypes::DataType;
33use datafusion_common::hash_utils::RandomState;
34
35use datafusion_common::ScalarValue;
36use datafusion_common::cast::{as_list_array, as_primitive_array};
37use datafusion_common::utils::SingleRowListArrayBuilder;
38use datafusion_common::utils::memory::estimate_memory_size;
39use datafusion_expr_common::accumulator::Accumulator;
40
41use crate::utils::GenericDistinctBuffer;
42
43#[derive(Debug)]
44pub struct PrimitiveDistinctCountAccumulator<T>
45where
46 T: ArrowPrimitiveType + Send,
47 T::Native: Eq + Hash,
48{
49 values: HashSet<T::Native, RandomState>,
50 data_type: DataType,
51}
52
53impl<T> PrimitiveDistinctCountAccumulator<T>
54where
55 T: ArrowPrimitiveType + Send,
56 T::Native: Eq + Hash,
57{
58 pub fn new(data_type: &DataType) -> Self {
59 Self {
60 values: HashSet::default(),
61 data_type: data_type.clone(),
62 }
63 }
64}
65
66impl<T> Accumulator for PrimitiveDistinctCountAccumulator<T>
67where
68 T: ArrowPrimitiveType + Send + Debug,
69 T::Native: Eq + Hash,
70{
71 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
72 let arr = Arc::new(
73 PrimitiveArray::<T>::from_iter_values(self.values.iter().cloned())
74 .with_data_type(self.data_type.clone()),
75 );
76 Ok(vec![
77 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
78 ])
79 }
80
81 #[inline(never)]
82 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
83 if values.is_empty() {
84 return Ok(());
85 }
86
87 let arr = as_primitive_array::<T>(&values[0])?;
88 arr.iter().for_each(|value| {
89 if let Some(value) = value {
90 self.values.insert(value);
91 }
92 });
93
94 Ok(())
95 }
96
97 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
98 if states.is_empty() {
99 return Ok(());
100 }
101 assert_eq!(
102 states.len(),
103 1,
104 "count_distinct states must be single array"
105 );
106
107 let arr = as_list_array(&states[0])?;
108 arr.iter().try_for_each(|maybe_list| {
109 if let Some(list) = maybe_list {
110 let list = as_primitive_array::<T>(&list)?;
111 self.values.extend(list.values())
112 };
113 Ok(())
114 })
115 }
116
117 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
118 Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
119 }
120
121 fn size(&self) -> usize {
122 let num_elements = self.values.len();
123 let fixed_size = size_of_val(self) + size_of_val(&self.values);
124
125 estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
126 }
127}
128
129#[derive(Debug)]
130pub struct FloatDistinctCountAccumulator<T: ArrowPrimitiveType> {
131 values: GenericDistinctBuffer<T>,
132}
133
134impl<T: ArrowPrimitiveType> FloatDistinctCountAccumulator<T> {
135 pub fn new() -> Self {
136 Self {
137 values: GenericDistinctBuffer::new(T::DATA_TYPE),
138 }
139 }
140}
141
142impl<T: ArrowPrimitiveType> Default for FloatDistinctCountAccumulator<T> {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulator<T> {
149 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
150 self.values.state()
151 }
152
153 #[inline(never)]
154 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
155 self.values.update_batch(values)
156 }
157
158 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
159 self.values.merge_batch(states)
160 }
161
162 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
163 Ok(ScalarValue::Int64(Some(self.values.values.len() as i64)))
164 }
165
166 fn size(&self) -> usize {
167 size_of_val(self) + self.values.size()
168 }
169}
170
/// Distinct-count accumulator that records seen values in a fixed table of
/// 256 flags, one per possible `u8` value.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulator {
    /// `seen[v]` is `true` once the value `v` has been observed.
    seen: [bool; 256],
}

impl BoolArray256DistinctCountAccumulator {
    /// Creates an accumulator with no values marked as seen.
    pub fn new() -> Self {
        let seen = [false; 256];
        Self { seen }
    }

    /// Number of flags currently set, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        self.seen.iter().map(|&flag| i64::from(flag)).sum()
    }
}

impl Default for BoolArray256DistinctCountAccumulator {
    /// Equivalent to [`BoolArray256DistinctCountAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
194
195impl Accumulator for BoolArray256DistinctCountAccumulator {
196 #[inline(never)]
197 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
198 if values.is_empty() {
199 return Ok(());
200 }
201
202 let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
203 for value in arr.iter().flatten() {
204 self.seen[value as usize] = true;
205 }
206 Ok(())
207 }
208
209 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
210 if states.is_empty() {
211 return Ok(());
212 }
213
214 let arr = as_list_array(&states[0])?;
215 arr.iter().try_for_each(|maybe_list| {
216 if let Some(list) = maybe_list {
217 let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
218 for value in list.values().iter() {
219 self.seen[*value as usize] = true;
220 }
221 };
222 Ok(())
223 })
224 }
225
226 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
227 let values: Vec<u8> = self
228 .seen
229 .iter()
230 .enumerate()
231 .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
232 .collect();
233
234 let arr = Arc::new(
235 PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values),
236 );
237 Ok(vec![
238 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
239 ])
240 }
241
242 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
243 Ok(ScalarValue::Int64(Some(self.count())))
244 }
245
246 fn size(&self) -> usize {
247 size_of_val(self) + 256
248 }
249}
250
/// Distinct-count accumulator that records seen values in a fixed table of
/// 256 flags, one per possible 8-bit pattern.
///
/// Signed inputs are mapped to a table slot by reinterpreting the `i8` as a
/// `u8`.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulatorI8 {
    /// `seen[slot]` is `true` once the value mapping to `slot` was observed.
    seen: [bool; 256],
}

impl BoolArray256DistinctCountAccumulatorI8 {
    /// Creates an accumulator with no values marked as seen.
    pub fn new() -> Self {
        let seen = [false; 256];
        Self { seen }
    }

    /// Number of flags currently set, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        self.seen.iter().map(|&flag| i64::from(flag)).sum()
    }
}

impl Default for BoolArray256DistinctCountAccumulatorI8 {
    /// Equivalent to [`BoolArray256DistinctCountAccumulatorI8::new`].
    fn default() -> Self {
        Self::new()
    }
}
274
275impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
276 #[inline(never)]
277 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
278 if values.is_empty() {
279 return Ok(());
280 }
281
282 let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
283 for value in arr.iter().flatten() {
284 self.seen[value as u8 as usize] = true;
285 }
286 Ok(())
287 }
288
289 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
290 if states.is_empty() {
291 return Ok(());
292 }
293
294 let arr = as_list_array(&states[0])?;
295 arr.iter().try_for_each(|maybe_list| {
296 if let Some(list) = maybe_list {
297 let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
298 for value in list.values().iter() {
299 self.seen[*value as u8 as usize] = true;
300 }
301 };
302 Ok(())
303 })
304 }
305
306 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
307 let values: Vec<i8> = self
308 .seen
309 .iter()
310 .enumerate()
311 .filter_map(
312 |(idx, &seen)| {
313 if seen { Some(idx as u8 as i8) } else { None }
314 },
315 )
316 .collect();
317
318 let arr = Arc::new(
319 PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values),
320 );
321 Ok(vec![
322 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
323 ])
324 }
325
326 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
327 Ok(ScalarValue::Int64(Some(self.count())))
328 }
329
330 fn size(&self) -> usize {
331 size_of_val(self) + 256
332 }
333}
334
/// Distinct-count accumulator that records seen `u16` values in a
/// heap-allocated bitmap of 1024 `u64` words (65536 bits).
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulator {
    /// Bit `v % 64` of word `v / 64` is set once value `v` was observed.
    bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulator {
    /// Creates an accumulator with an all-zero bitmap.
    pub fn new() -> Self {
        let bitmap = Box::new([0; 1024]);
        Self { bitmap }
    }

    /// Sets the bit that corresponds to `value`.
    #[inline]
    fn set_bit(&mut self, value: u16) {
        // Upper 10 bits select the word, lower 6 bits select the bit.
        let word = usize::from(value >> 6);
        let mask = 1u64 << (value & 63);
        self.bitmap[word] |= mask;
    }

    /// Total number of set bits, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        self.bitmap
            .iter()
            .fold(0i64, |total, word| total + i64::from(word.count_ones()))
    }
}

impl Default for Bitmap65536DistinctCountAccumulator {
    /// Equivalent to [`Bitmap65536DistinctCountAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
367
368impl Accumulator for Bitmap65536DistinctCountAccumulator {
369 #[inline(never)]
370 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
371 if values.is_empty() {
372 return Ok(());
373 }
374
375 let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
376 for value in arr.iter().flatten() {
377 self.set_bit(value);
378 }
379 Ok(())
380 }
381
382 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
383 if states.is_empty() {
384 return Ok(());
385 }
386
387 let arr = as_list_array(&states[0])?;
388 arr.iter().try_for_each(|maybe_list| {
389 if let Some(list) = maybe_list {
390 let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
391 for value in list.values().iter() {
392 self.set_bit(*value);
393 }
394 };
395 Ok(())
396 })
397 }
398
399 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
400 let mut values = Vec::new();
401 for (word_idx, &word) in self.bitmap.iter().enumerate() {
402 if word != 0 {
403 for bit in 0..64 {
404 if (word & (1u64 << bit)) != 0 {
405 values.push((word_idx as u16) * 64 + bit);
406 }
407 }
408 }
409 }
410
411 let arr = Arc::new(
412 PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values),
413 );
414 Ok(vec![
415 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
416 ])
417 }
418
419 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
420 Ok(ScalarValue::Int64(Some(self.count())))
421 }
422
423 fn size(&self) -> usize {
424 size_of_val(self) + 8192
425 }
426}
427
/// Distinct-count accumulator that records seen `i16` values in a
/// heap-allocated bitmap of 1024 `u64` words (65536 bits).
///
/// Signed inputs are mapped to a bit position by reinterpreting the `i16`
/// as a `u16`.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulatorI16 {
    /// Bit `p % 64` of word `p / 64` is set once the value mapping to
    /// position `p` was observed.
    bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulatorI16 {
    /// Creates an accumulator with an all-zero bitmap.
    pub fn new() -> Self {
        let bitmap = Box::new([0; 1024]);
        Self { bitmap }
    }

    /// Sets the bit that corresponds to `value`.
    #[inline]
    fn set_bit(&mut self, value: i16) {
        // Same-width cast: negative values land in positions 32768..=65535.
        let position = value as u16;
        let word = usize::from(position >> 6);
        let mask = 1u64 << (position & 63);
        self.bitmap[word] |= mask;
    }

    /// Total number of set bits, i.e. the distinct count.
    #[inline]
    fn count(&self) -> i64 {
        self.bitmap
            .iter()
            .fold(0i64, |total, word| total + i64::from(word.count_ones()))
    }
}

impl Default for Bitmap65536DistinctCountAccumulatorI16 {
    /// Equivalent to [`Bitmap65536DistinctCountAccumulatorI16::new`].
    fn default() -> Self {
        Self::new()
    }
}
461
462impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
463 #[inline(never)]
464 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
465 if values.is_empty() {
466 return Ok(());
467 }
468
469 let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
470 for value in arr.iter().flatten() {
471 self.set_bit(value);
472 }
473 Ok(())
474 }
475
476 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
477 if states.is_empty() {
478 return Ok(());
479 }
480
481 let arr = as_list_array(&states[0])?;
482 arr.iter().try_for_each(|maybe_list| {
483 if let Some(list) = maybe_list {
484 let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
485 for value in list.values().iter() {
486 self.set_bit(*value);
487 }
488 };
489 Ok(())
490 })
491 }
492
493 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
494 let mut values = Vec::new();
495 for (word_idx, &word) in self.bitmap.iter().enumerate() {
496 if word != 0 {
497 for bit in 0..64 {
498 if (word & (1u64 << bit)) != 0 {
499 values.push(((word_idx as u16) * 64 + bit) as i16);
500 }
501 }
502 }
503 }
504
505 let arr = Arc::new(
506 PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values),
507 );
508 Ok(vec![
509 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
510 ])
511 }
512
513 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
514 Ok(ScalarValue::Int64(Some(self.count())))
515 }
516
517 fn size(&self) -> usize {
518 size_of_val(self) + 8192
519 }
520}