// datafusion_functions_aggregate_common/aggregate/sum_distinct/numeric.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the accumulator for `SUM DISTINCT` over primitive numeric types.

20use std::fmt::Debug;
21use std::mem::size_of_val;
22
23use arrow::array::ArrayRef;
24use arrow::array::ArrowNativeTypeOp;
25use arrow::array::ArrowPrimitiveType;
26use arrow::datatypes::ArrowNativeType;
27use arrow::datatypes::DataType;
28
29use datafusion_common::Result;
30use datafusion_common::ScalarValue;
31use datafusion_expr_common::accumulator::Accumulator;
32
33use crate::utils::GenericDistinctBuffer;
34
35/// Accumulator for computing SUM(DISTINCT expr)
36#[derive(Debug)]
37pub struct DistinctSumAccumulator<T: ArrowPrimitiveType> {
38    values: GenericDistinctBuffer<T>,
39    data_type: DataType,
40}
41
42impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> {
43    pub fn new(data_type: &DataType) -> Self {
44        Self {
45            values: GenericDistinctBuffer::new(data_type.clone()),
46            data_type: data_type.clone(),
47        }
48    }
49
50    pub fn distinct_count(&self) -> usize {
51        self.values.values.len()
52    }
53}
54
55impl<T: ArrowPrimitiveType + Debug> Accumulator for DistinctSumAccumulator<T> {
56    fn state(&mut self) -> Result<Vec<ScalarValue>> {
57        self.values.state()
58    }
59
60    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
61        self.values.update_batch(values)
62    }
63
64    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
65        self.values.merge_batch(states)
66    }
67
68    fn evaluate(&mut self) -> Result<ScalarValue> {
69        if self.distinct_count() == 0 {
70            ScalarValue::new_primitive::<T>(None, &self.data_type)
71        } else {
72            let mut acc = T::Native::usize_as(0);
73            for distinct_value in self.values.values.iter() {
74                acc = acc.add_wrapping(distinct_value.0)
75            }
76            ScalarValue::new_primitive::<T>(Some(acc), &self.data_type)
77        }
78    }
79
80    fn size(&self) -> usize {
81        size_of_val(self) + self.values.size()
82    }
83}