datafusion_functions_aggregate_common/aggregate/count_distinct/
dict.rs1use arrow::array::{ArrayRef, BooleanArray};
19use arrow::downcast_dictionary_array;
20use datafusion_common::{arrow_datafusion_err, ScalarValue};
21use datafusion_common::{internal_err, DataFusionError};
22use datafusion_expr_common::accumulator::Accumulator;
23
24#[derive(Debug)]
25pub struct DictionaryCountAccumulator {
26    inner: Box<dyn Accumulator>,
27}
28
29impl DictionaryCountAccumulator {
30    pub fn new(inner: Box<dyn Accumulator>) -> Self {
31        Self { inner }
32    }
33}
34
35impl Accumulator for DictionaryCountAccumulator {
36    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
37        let values: Vec<_> = values
38            .iter()
39            .map(|dict| {
40                downcast_dictionary_array! {
41                    dict => {
42                        let buff: BooleanArray = dict.occupancy().into();
43                        arrow::compute::filter(
44                            dict.values(),
45                            &buff
46                        ).map_err(|e| arrow_datafusion_err!(e))
47                    },
48                    _ => internal_err!("DictionaryCountAccumulator only supports dictionary arrays")
49                }
50            })
51            .collect::<Result<Vec<_>, _>>()?;
52        self.inner.update_batch(values.as_slice())
53    }
54
55    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
56        self.inner.evaluate()
57    }
58
59    fn size(&self) -> usize {
60        self.inner.size()
61    }
62
63    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
64        self.inner.state()
65    }
66
67    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
68        self.inner.merge_batch(states)
69    }
70}