datafusion_functions_aggregate_common/aggregate/count_distinct/
bytes.rs1use arrow::array::{ArrayRef, OffsetSizeTrait};
21use datafusion_common::cast::as_list_array;
22use datafusion_common::utils::SingleRowListArrayBuilder;
23use datafusion_common::ScalarValue;
24use datafusion_expr_common::accumulator::Accumulator;
25use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType};
26use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet;
27use std::fmt::Debug;
28use std::mem::size_of_val;
29
30#[derive(Debug)]
39pub struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
40
41impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
42    pub fn new(output_type: OutputType) -> Self {
43        Self(ArrowBytesSet::new(output_type))
44    }
45}
46
47impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
48    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
49        let set = self.0.take();
50        let arr = set.into_state();
51        Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
52    }
53
54    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
55        if values.is_empty() {
56            return Ok(());
57        }
58
59        self.0.insert(&values[0]);
60
61        Ok(())
62    }
63
64    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
65        if states.is_empty() {
66            return Ok(());
67        }
68        assert_eq!(
69            states.len(),
70            1,
71            "count_distinct states must be single array"
72        );
73
74        let arr = as_list_array(&states[0])?;
75        arr.iter().try_for_each(|maybe_list| {
76            if let Some(list) = maybe_list {
77                self.0.insert(&list);
78            };
79            Ok(())
80        })
81    }
82
83    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
84        Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
85    }
86
87    fn size(&self) -> usize {
88        size_of_val(self) + self.0.size()
89    }
90}
91
92#[derive(Debug)]
98pub struct BytesViewDistinctCountAccumulator(ArrowBytesViewSet);
99
100impl BytesViewDistinctCountAccumulator {
101    pub fn new(output_type: OutputType) -> Self {
102        Self(ArrowBytesViewSet::new(output_type))
103    }
104}
105
106impl Accumulator for BytesViewDistinctCountAccumulator {
107    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
108        let set = self.0.take();
109        let arr = set.into_state();
110        Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
111    }
112
113    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
114        if values.is_empty() {
115            return Ok(());
116        }
117
118        self.0.insert(&values[0]);
119
120        Ok(())
121    }
122
123    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
124        if states.is_empty() {
125            return Ok(());
126        }
127        assert_eq!(
128            states.len(),
129            1,
130            "count_distinct states must be single array"
131        );
132
133        let arr = as_list_array(&states[0])?;
134        arr.iter().try_for_each(|maybe_list| {
135            if let Some(list) = maybe_list {
136                self.0.insert(&list);
137            };
138            Ok(())
139        })
140    }
141
142    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
143        Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
144    }
145
146    fn size(&self) -> usize {
147        size_of_val(self) + self.0.size()
148    }
149}