datafusion_functions_aggregate_common/aggregate/sum_distinct/
numeric.rs1use std::fmt::Debug;
21use std::mem::size_of_val;
22
23use arrow::array::ArrayRef;
24use arrow::array::ArrowNativeTypeOp;
25use arrow::array::ArrowPrimitiveType;
26use arrow::datatypes::ArrowNativeType;
27use arrow::datatypes::DataType;
28
29use datafusion_common::Result;
30use datafusion_common::ScalarValue;
31use datafusion_expr_common::accumulator::Accumulator;
32
33use crate::utils::GenericDistinctBuffer;
34
35#[derive(Debug)]
37pub struct DistinctSumAccumulator<T: ArrowPrimitiveType> {
38 values: GenericDistinctBuffer<T>,
39 data_type: DataType,
40}
41
42impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> {
43 pub fn new(data_type: &DataType) -> Self {
44 Self {
45 values: GenericDistinctBuffer::new(data_type.clone()),
46 data_type: data_type.clone(),
47 }
48 }
49
50 pub fn distinct_count(&self) -> usize {
51 self.values.values.len()
52 }
53}
54
55impl<T: ArrowPrimitiveType + Debug> Accumulator for DistinctSumAccumulator<T> {
56 fn state(&mut self) -> Result<Vec<ScalarValue>> {
57 self.values.state()
58 }
59
60 fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
61 self.values.update_batch(values)
62 }
63
64 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
65 self.values.merge_batch(states)
66 }
67
68 fn evaluate(&mut self) -> Result<ScalarValue> {
69 if self.distinct_count() == 0 {
70 ScalarValue::new_primitive::<T>(None, &self.data_type)
71 } else {
72 let mut acc = T::Native::usize_as(0);
73 for distinct_value in self.values.values.iter() {
74 acc = acc.add_wrapping(distinct_value.0)
75 }
76 ScalarValue::new_primitive::<T>(Some(acc), &self.data_type)
77 }
78 }
79
80 fn size(&self) -> usize {
81 size_of_val(self) + self.values.size()
82 }
83}