Skip to main content

datafusion_functions_aggregate_common/aggregate/count_distinct/native.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Specialized implementations of `COUNT DISTINCT` for "native" primitive
//! arrays such as [`Int64Array`] and [`Float64Array`], plus dense
//! lookup-table accumulators for 8-bit and 16-bit integer inputs.
//!
//! [`Int64Array`]: arrow::array::Int64Array
//! [`Float64Array`]: arrow::array::Float64Array
23use std::collections::HashSet;
24use std::fmt::Debug;
25use std::hash::Hash;
26use std::mem::size_of_val;
27use std::sync::Arc;
28
29use arrow::array::ArrayRef;
30use arrow::array::PrimitiveArray;
31use arrow::array::types::ArrowPrimitiveType;
32use arrow::datatypes::DataType;
33use datafusion_common::hash_utils::RandomState;
34
35use datafusion_common::ScalarValue;
36use datafusion_common::cast::{as_list_array, as_primitive_array};
37use datafusion_common::utils::SingleRowListArrayBuilder;
38use datafusion_common::utils::memory::estimate_memory_size;
39use datafusion_expr_common::accumulator::Accumulator;
40
41use crate::utils::GenericDistinctBuffer;
42
43#[derive(Debug)]
44pub struct PrimitiveDistinctCountAccumulator<T>
45where
46    T: ArrowPrimitiveType + Send,
47    T::Native: Eq + Hash,
48{
49    values: HashSet<T::Native, RandomState>,
50    data_type: DataType,
51}
52
53impl<T> PrimitiveDistinctCountAccumulator<T>
54where
55    T: ArrowPrimitiveType + Send,
56    T::Native: Eq + Hash,
57{
58    pub fn new(data_type: &DataType) -> Self {
59        Self {
60            values: HashSet::default(),
61            data_type: data_type.clone(),
62        }
63    }
64}
65
66impl<T> Accumulator for PrimitiveDistinctCountAccumulator<T>
67where
68    T: ArrowPrimitiveType + Send + Debug,
69    T::Native: Eq + Hash,
70{
71    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
72        let arr = Arc::new(
73            PrimitiveArray::<T>::from_iter_values(self.values.iter().cloned())
74                .with_data_type(self.data_type.clone()),
75        );
76        Ok(vec![
77            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
78        ])
79    }
80
81    #[inline(never)]
82    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
83        if values.is_empty() {
84            return Ok(());
85        }
86
87        let arr = as_primitive_array::<T>(&values[0])?;
88        arr.iter().for_each(|value| {
89            if let Some(value) = value {
90                self.values.insert(value);
91            }
92        });
93
94        Ok(())
95    }
96
97    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
98        if states.is_empty() {
99            return Ok(());
100        }
101        assert_eq!(
102            states.len(),
103            1,
104            "count_distinct states must be single array"
105        );
106
107        let arr = as_list_array(&states[0])?;
108        arr.iter().try_for_each(|maybe_list| {
109            if let Some(list) = maybe_list {
110                let list = as_primitive_array::<T>(&list)?;
111                self.values.extend(list.values())
112            };
113            Ok(())
114        })
115    }
116
117    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
118        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
119    }
120
121    fn size(&self) -> usize {
122        let num_elements = self.values.len();
123        let fixed_size = size_of_val(self) + size_of_val(&self.values);
124
125        estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
126    }
127}
128
129#[derive(Debug)]
130pub struct FloatDistinctCountAccumulator<T: ArrowPrimitiveType> {
131    values: GenericDistinctBuffer<T>,
132}
133
134impl<T: ArrowPrimitiveType> FloatDistinctCountAccumulator<T> {
135    pub fn new() -> Self {
136        Self {
137            values: GenericDistinctBuffer::new(T::DATA_TYPE),
138        }
139    }
140}
141
142impl<T: ArrowPrimitiveType> Default for FloatDistinctCountAccumulator<T> {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulator<T> {
149    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
150        self.values.state()
151    }
152
153    #[inline(never)]
154    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
155        self.values.update_batch(values)
156    }
157
158    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
159        self.values.merge_batch(states)
160    }
161
162    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
163        Ok(ScalarValue::Int64(Some(self.values.values.len() as i64)))
164    }
165
166    fn size(&self) -> usize {
167        size_of_val(self) + self.values.size()
168    }
169}
170
/// `COUNT DISTINCT` accumulator specialised for `u8` input.
///
/// A 256-entry `bool` table (256 bytes, stored inline) has one slot per
/// possible `u8` value, so recording a value is a single indexed store and
/// no hashing is needed.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulator {
    seen: [bool; 256],
}

impl BoolArray256DistinctCountAccumulator {
    /// Creates an accumulator in which no value has been seen yet.
    pub fn new() -> Self {
        let seen = [false; 256];
        Self { seen }
    }

    /// Returns how many distinct values have been recorded.
    #[inline]
    fn count(&self) -> i64 {
        self.seen.iter().map(|&flag| i64::from(flag)).sum()
    }
}

impl Default for BoolArray256DistinctCountAccumulator {
    /// Equivalent to [`BoolArray256DistinctCountAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
194
195impl Accumulator for BoolArray256DistinctCountAccumulator {
196    #[inline(never)]
197    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
198        if values.is_empty() {
199            return Ok(());
200        }
201
202        let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
203        for value in arr.iter().flatten() {
204            self.seen[value as usize] = true;
205        }
206        Ok(())
207    }
208
209    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
210        if states.is_empty() {
211            return Ok(());
212        }
213
214        let arr = as_list_array(&states[0])?;
215        arr.iter().try_for_each(|maybe_list| {
216            if let Some(list) = maybe_list {
217                let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
218                for value in list.values().iter() {
219                    self.seen[*value as usize] = true;
220                }
221            };
222            Ok(())
223        })
224    }
225
226    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
227        let values: Vec<u8> = self
228            .seen
229            .iter()
230            .enumerate()
231            .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
232            .collect();
233
234        let arr = Arc::new(
235            PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values),
236        );
237        Ok(vec![
238            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
239        ])
240    }
241
242    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
243        Ok(ScalarValue::Int64(Some(self.count())))
244    }
245
246    fn size(&self) -> usize {
247        size_of_val(self) + 256
248    }
249}
250
/// `COUNT DISTINCT` accumulator specialised for `i8` input.
///
/// A 256-entry `bool` table (256 bytes, stored inline) has one slot per
/// possible `i8` value. Values are mapped onto slots by reinterpreting their
/// bits as `u8`, so `-1` occupies slot 255.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulatorI8 {
    seen: [bool; 256],
}

impl BoolArray256DistinctCountAccumulatorI8 {
    /// Creates an accumulator in which no value has been seen yet.
    pub fn new() -> Self {
        let seen = [false; 256];
        Self { seen }
    }

    /// Returns how many distinct values have been recorded.
    #[inline]
    fn count(&self) -> i64 {
        self.seen.iter().map(|&flag| i64::from(flag)).sum()
    }
}

impl Default for BoolArray256DistinctCountAccumulatorI8 {
    /// Equivalent to [`BoolArray256DistinctCountAccumulatorI8::new`].
    fn default() -> Self {
        Self::new()
    }
}
274
275impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
276    #[inline(never)]
277    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
278        if values.is_empty() {
279            return Ok(());
280        }
281
282        let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
283        for value in arr.iter().flatten() {
284            self.seen[value as u8 as usize] = true;
285        }
286        Ok(())
287    }
288
289    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
290        if states.is_empty() {
291            return Ok(());
292        }
293
294        let arr = as_list_array(&states[0])?;
295        arr.iter().try_for_each(|maybe_list| {
296            if let Some(list) = maybe_list {
297                let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
298                for value in list.values().iter() {
299                    self.seen[*value as u8 as usize] = true;
300                }
301            };
302            Ok(())
303        })
304    }
305
306    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
307        let values: Vec<i8> = self
308            .seen
309            .iter()
310            .enumerate()
311            .filter_map(
312                |(idx, &seen)| {
313                    if seen { Some(idx as u8 as i8) } else { None }
314                },
315            )
316            .collect();
317
318        let arr = Arc::new(
319            PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values),
320        );
321        Ok(vec![
322            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
323        ])
324    }
325
326    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
327        Ok(ScalarValue::Int64(Some(self.count())))
328    }
329
330    fn size(&self) -> usize {
331        size_of_val(self) + 256
332    }
333}
334
/// `COUNT DISTINCT` accumulator specialised for `u16` input.
///
/// Each of the 65 536 possible values owns one bit of a heap-allocated
/// bitmap of 1024 `u64` words (8 KiB). Value `v` lives in word `v / 64`,
/// bit `v % 64`.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulator {
    bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulator {
    /// Creates an accumulator with every bit cleared.
    pub fn new() -> Self {
        let bitmap = Box::new([0u64; 1024]);
        Self { bitmap }
    }

    /// Records `value` as seen.
    #[inline]
    fn set_bit(&mut self, value: u16) {
        // `>> 6` and `& 63` are the word index and bit offset (÷64, %64).
        let word = &mut self.bitmap[usize::from(value >> 6)];
        *word |= 1u64 << (value & 63);
    }

    /// Returns how many distinct values have been recorded.
    #[inline]
    fn count(&self) -> i64 {
        self.bitmap
            .iter()
            .fold(0i64, |total, word| total + i64::from(word.count_ones()))
    }
}

impl Default for Bitmap65536DistinctCountAccumulator {
    /// Equivalent to [`Bitmap65536DistinctCountAccumulator::new`].
    fn default() -> Self {
        Self::new()
    }
}
367
368impl Accumulator for Bitmap65536DistinctCountAccumulator {
369    #[inline(never)]
370    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
371        if values.is_empty() {
372            return Ok(());
373        }
374
375        let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
376        for value in arr.iter().flatten() {
377            self.set_bit(value);
378        }
379        Ok(())
380    }
381
382    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
383        if states.is_empty() {
384            return Ok(());
385        }
386
387        let arr = as_list_array(&states[0])?;
388        arr.iter().try_for_each(|maybe_list| {
389            if let Some(list) = maybe_list {
390                let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
391                for value in list.values().iter() {
392                    self.set_bit(*value);
393                }
394            };
395            Ok(())
396        })
397    }
398
399    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
400        let mut values = Vec::new();
401        for (word_idx, &word) in self.bitmap.iter().enumerate() {
402            if word != 0 {
403                for bit in 0..64 {
404                    if (word & (1u64 << bit)) != 0 {
405                        values.push((word_idx as u16) * 64 + bit);
406                    }
407                }
408            }
409        }
410
411        let arr = Arc::new(
412            PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values),
413        );
414        Ok(vec![
415            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
416        ])
417    }
418
419    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
420        Ok(ScalarValue::Int64(Some(self.count())))
421    }
422
423    fn size(&self) -> usize {
424        size_of_val(self) + 8192
425    }
426}
427
/// `COUNT DISTINCT` accumulator specialised for `i16` input.
///
/// Each of the 65 536 possible values owns one bit of a heap-allocated
/// bitmap of 1024 `u64` words (8 KiB). An `i16` is mapped onto its bit by
/// reinterpreting its two's-complement bits as `u16`, so `-1` is bit 65535.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulatorI16 {
    bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulatorI16 {
    /// Creates an accumulator with every bit cleared.
    pub fn new() -> Self {
        let bitmap = Box::new([0u64; 1024]);
        Self { bitmap }
    }

    /// Records `value` as seen.
    #[inline]
    fn set_bit(&mut self, value: i16) {
        // Same-width `as` cast reinterprets the bits; `>> 6` / `& 63` give the
        // word index and bit offset.
        let slot = value as u16;
        let word = &mut self.bitmap[usize::from(slot >> 6)];
        *word |= 1u64 << (slot & 63);
    }

    /// Returns how many distinct values have been recorded.
    #[inline]
    fn count(&self) -> i64 {
        self.bitmap
            .iter()
            .fold(0i64, |total, word| total + i64::from(word.count_ones()))
    }
}

impl Default for Bitmap65536DistinctCountAccumulatorI16 {
    /// Equivalent to [`Bitmap65536DistinctCountAccumulatorI16::new`].
    fn default() -> Self {
        Self::new()
    }
}
461
462impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
463    #[inline(never)]
464    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
465        if values.is_empty() {
466            return Ok(());
467        }
468
469        let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
470        for value in arr.iter().flatten() {
471            self.set_bit(value);
472        }
473        Ok(())
474    }
475
476    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
477        if states.is_empty() {
478            return Ok(());
479        }
480
481        let arr = as_list_array(&states[0])?;
482        arr.iter().try_for_each(|maybe_list| {
483            if let Some(list) = maybe_list {
484                let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
485                for value in list.values().iter() {
486                    self.set_bit(*value);
487                }
488            };
489            Ok(())
490        })
491    }
492
493    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
494        let mut values = Vec::new();
495        for (word_idx, &word) in self.bitmap.iter().enumerate() {
496            if word != 0 {
497                for bit in 0..64 {
498                    if (word & (1u64 << bit)) != 0 {
499                        values.push(((word_idx as u16) * 64 + bit) as i16);
500                    }
501                }
502            }
503        }
504
505        let arr = Arc::new(
506            PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values),
507        );
508        Ok(vec![
509            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
510        ])
511    }
512
513    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
514        Ok(ScalarValue::Int64(Some(self.count())))
515    }
516
517    fn size(&self) -> usize {
518        size_of_val(self) + 8192
519    }
520}