datafusion_functions_aggregate_common/aggregate/count_distinct/
bytes.rs1use arrow::array::{ArrayRef, OffsetSizeTrait};
21use datafusion_common::ScalarValue;
22use datafusion_common::cast::as_list_array;
23use datafusion_common::utils::SingleRowListArrayBuilder;
24use datafusion_expr_common::accumulator::Accumulator;
25use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType};
26use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet;
27use std::fmt::Debug;
28use std::mem::size_of_val;
29
30#[derive(Debug)]
39pub struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
40
41impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
42 pub fn new(output_type: OutputType) -> Self {
43 Self(ArrowBytesSet::new(output_type))
44 }
45}
46
47impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
48 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
49 let set = self.0.take();
50 let arr = set.into_state();
51 Ok(vec![
52 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
53 ])
54 }
55
56 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
57 if values.is_empty() {
58 return Ok(());
59 }
60
61 self.0.insert(&values[0]);
62
63 Ok(())
64 }
65
66 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
67 if states.is_empty() {
68 return Ok(());
69 }
70 assert_eq!(
71 states.len(),
72 1,
73 "count_distinct states must be single array"
74 );
75
76 let arr = as_list_array(&states[0])?;
77 arr.iter().try_for_each(|maybe_list| {
78 if let Some(list) = maybe_list {
79 self.0.insert(&list);
80 };
81 Ok(())
82 })
83 }
84
85 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
86 Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
87 }
88
89 fn size(&self) -> usize {
90 size_of_val(self) + self.0.size()
91 }
92}
93
94#[derive(Debug)]
100pub struct BytesViewDistinctCountAccumulator(ArrowBytesViewSet);
101
102impl BytesViewDistinctCountAccumulator {
103 pub fn new(output_type: OutputType) -> Self {
104 Self(ArrowBytesViewSet::new(output_type))
105 }
106}
107
108impl Accumulator for BytesViewDistinctCountAccumulator {
109 fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
110 let set = self.0.take();
111 let arr = set.into_state();
112 Ok(vec![
113 SingleRowListArrayBuilder::new(arr).build_list_scalar(),
114 ])
115 }
116
117 fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
118 if values.is_empty() {
119 return Ok(());
120 }
121
122 self.0.insert(&values[0]);
123
124 Ok(())
125 }
126
127 fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
128 if states.is_empty() {
129 return Ok(());
130 }
131 assert_eq!(
132 states.len(),
133 1,
134 "count_distinct states must be single array"
135 );
136
137 let arr = as_list_array(&states[0])?;
138 arr.iter().try_for_each(|maybe_list| {
139 if let Some(list) = maybe_list {
140 self.0.insert(&list);
141 };
142 Ok(())
143 })
144 }
145
146 fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
147 Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
148 }
149
150 fn size(&self) -> usize {
151 size_of_val(self) + self.0.size()
152 }
153}