datafusion_functions_aggregate_common/aggregate/count_distinct/
bytes.rs1use arrow::array::{ArrayRef, OffsetSizeTrait};
21use datafusion_common::cast::as_list_array;
22use datafusion_common::utils::SingleRowListArrayBuilder;
23use datafusion_common::ScalarValue;
24use datafusion_expr_common::accumulator::Accumulator;
25use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType};
26use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet;
27use std::fmt::Debug;
28use std::mem::size_of_val;
29
30#[derive(Debug)]
39pub struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
40
41impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
42 pub fn new(output_type: OutputType) -> Self {
43 Self(ArrowBytesSet::new(output_type))
44 }
45}
46
47impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
48 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
49 let set = self.0.take();
50 let arr = set.into_state();
51 Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
52 }
53
54 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
55 if values.is_empty() {
56 return Ok(());
57 }
58
59 self.0.insert(&values[0]);
60
61 Ok(())
62 }
63
64 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
65 if states.is_empty() {
66 return Ok(());
67 }
68 assert_eq!(
69 states.len(),
70 1,
71 "count_distinct states must be single array"
72 );
73
74 let arr = as_list_array(&states[0])?;
75 arr.iter().try_for_each(|maybe_list| {
76 if let Some(list) = maybe_list {
77 self.0.insert(&list);
78 };
79 Ok(())
80 })
81 }
82
83 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
84 Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
85 }
86
87 fn size(&self) -> usize {
88 size_of_val(self) + self.0.size()
89 }
90}
91
92#[derive(Debug)]
98pub struct BytesViewDistinctCountAccumulator(ArrowBytesViewSet);
99
100impl BytesViewDistinctCountAccumulator {
101 pub fn new(output_type: OutputType) -> Self {
102 Self(ArrowBytesViewSet::new(output_type))
103 }
104}
105
106impl Accumulator for BytesViewDistinctCountAccumulator {
107 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
108 let set = self.0.take();
109 let arr = set.into_state();
110 Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
111 }
112
113 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
114 if values.is_empty() {
115 return Ok(());
116 }
117
118 self.0.insert(&values[0]);
119
120 Ok(())
121 }
122
123 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
124 if states.is_empty() {
125 return Ok(());
126 }
127 assert_eq!(
128 states.len(),
129 1,
130 "count_distinct states must be single array"
131 );
132
133 let arr = as_list_array(&states[0])?;
134 arr.iter().try_for_each(|maybe_list| {
135 if let Some(list) = maybe_list {
136 self.0.insert(&list);
137 };
138 Ok(())
139 })
140 }
141
142 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
143 Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
144 }
145
146 fn size(&self) -> usize {
147 size_of_val(self) + self.0.size()
148 }
149}