datafusion_functions_aggregate_common/aggregate/count_distinct/
bytes.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`BytesDistinctCountAccumulator`] for Utf8/LargeUtf8/Binary/LargeBinary values
//! and [`BytesViewDistinctCountAccumulator`] for `StringViewArray`/`BinaryViewArray` values
19
20use arrow::array::{ArrayRef, OffsetSizeTrait};
21use datafusion_common::ScalarValue;
22use datafusion_common::cast::as_list_array;
23use datafusion_common::utils::SingleRowListArrayBuilder;
24use datafusion_expr_common::accumulator::Accumulator;
25use datafusion_physical_expr_common::binary_map::{ArrowBytesSet, OutputType};
26use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewSet;
27use std::fmt::Debug;
28use std::mem::size_of_val;
29
30/// Specialized implementation of
31/// `COUNT DISTINCT` for [`StringArray`] [`LargeStringArray`],
32/// [`BinaryArray`] and [`LargeBinaryArray`].
33///
34/// [`StringArray`]: arrow::array::StringArray
35/// [`LargeStringArray`]: arrow::array::LargeStringArray
36/// [`BinaryArray`]: arrow::array::BinaryArray
37/// [`LargeBinaryArray`]: arrow::array::LargeBinaryArray
38#[derive(Debug)]
39pub struct BytesDistinctCountAccumulator<O: OffsetSizeTrait>(ArrowBytesSet<O>);
40
41impl<O: OffsetSizeTrait> BytesDistinctCountAccumulator<O> {
42    pub fn new(output_type: OutputType) -> Self {
43        Self(ArrowBytesSet::new(output_type))
44    }
45}
46
47impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
48    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
49        let set = self.0.take();
50        let arr = set.into_state();
51        Ok(vec![
52            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
53        ])
54    }
55
56    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
57        if values.is_empty() {
58            return Ok(());
59        }
60
61        self.0.insert(&values[0]);
62
63        Ok(())
64    }
65
66    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
67        if states.is_empty() {
68            return Ok(());
69        }
70        assert_eq!(
71            states.len(),
72            1,
73            "count_distinct states must be single array"
74        );
75
76        let arr = as_list_array(&states[0])?;
77        arr.iter().try_for_each(|maybe_list| {
78            if let Some(list) = maybe_list {
79                self.0.insert(&list);
80            };
81            Ok(())
82        })
83    }
84
85    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
86        Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
87    }
88
89    fn size(&self) -> usize {
90        size_of_val(self) + self.0.size()
91    }
92}
93
94/// Specialized implementation of
95/// `COUNT DISTINCT` for [`StringViewArray`] and [`BinaryViewArray`].
96///
97/// [`StringViewArray`]: arrow::array::StringViewArray
98/// [`BinaryViewArray`]: arrow::array::BinaryViewArray
99#[derive(Debug)]
100pub struct BytesViewDistinctCountAccumulator(ArrowBytesViewSet);
101
102impl BytesViewDistinctCountAccumulator {
103    pub fn new(output_type: OutputType) -> Self {
104        Self(ArrowBytesViewSet::new(output_type))
105    }
106}
107
108impl Accumulator for BytesViewDistinctCountAccumulator {
109    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
110        let set = self.0.take();
111        let arr = set.into_state();
112        Ok(vec![
113            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
114        ])
115    }
116
117    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
118        if values.is_empty() {
119            return Ok(());
120        }
121
122        self.0.insert(&values[0]);
123
124        Ok(())
125    }
126
127    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
128        if states.is_empty() {
129            return Ok(());
130        }
131        assert_eq!(
132            states.len(),
133            1,
134            "count_distinct states must be single array"
135        );
136
137        let arr = as_list_array(&states[0])?;
138        arr.iter().try_for_each(|maybe_list| {
139            if let Some(list) = maybe_list {
140                self.0.insert(&list);
141            };
142            Ok(())
143        })
144    }
145
146    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
147        Ok(ScalarValue::Int64(Some(self.0.non_null_len() as i64)))
148    }
149
150    fn size(&self) -> usize {
151        size_of_val(self) + self.0.size()
152    }
153}