datafusion_spark/function/bitmap/
bitmap_count.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{
22 as_dictionary_array, Array, ArrayRef, BinaryArray, BinaryViewArray,
23 FixedSizeBinaryArray, Int64Array, LargeBinaryArray,
24};
25use arrow::datatypes::DataType::{
26 Binary, BinaryView, Dictionary, FixedSizeBinary, LargeBinary,
27};
28use arrow::datatypes::{DataType, Int16Type, Int32Type, Int64Type, Int8Type};
29use datafusion_common::utils::take_function_args;
30use datafusion_common::{internal_err, Result};
31use datafusion_expr::{
32 Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
33 TypeSignatureClass, Volatility,
34};
35use datafusion_functions::downcast_arg;
36use datafusion_functions::utils::make_scalar_function;
37
38#[derive(Debug, PartialEq, Eq, Hash)]
39pub struct BitmapCount {
40 signature: Signature,
41}
42
43impl Default for BitmapCount {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl BitmapCount {
50 pub fn new() -> Self {
51 Self {
52 signature: Signature::coercible(
53 vec![Coercion::new_exact(TypeSignatureClass::Binary)],
54 Volatility::Immutable,
55 ),
56 }
57 }
58}
59
60impl ScalarUDFImpl for BitmapCount {
61 fn as_any(&self) -> &dyn Any {
62 self
63 }
64
65 fn name(&self) -> &str {
66 "bitmap_count"
67 }
68
69 fn signature(&self) -> &Signature {
70 &self.signature
71 }
72
73 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
74 Ok(DataType::Int64)
75 }
76
77 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
78 make_scalar_function(bitmap_count_inner, vec![])(&args.args)
79 }
80}
81
/// Returns the total number of 1 bits across all bytes of `opt`.
///
/// `None` (a null input) maps to `None`; an empty slice yields `Some(0)`.
fn binary_count_ones(opt: Option<&[u8]>) -> Option<i64> {
    let bytes = opt?;
    let total: i64 = bytes
        .iter()
        .map(|&byte| i64::from(byte.count_ones()))
        .sum();
    Some(total)
}
85
/// Downcasts `$input_array` to the concrete `$array_type` and maps every
/// element through `binary_count_ones`, evaluating to a `Result<Int64Array>`.
///
/// The type check is delegated to `downcast_arg!`.
macro_rules! downcast_and_count_ones {
    ($input_array:expr, $array_type:ident) => {{
        let typed_array = downcast_arg!($input_array, $array_type);
        let counts: Int64Array = typed_array.iter().map(binary_count_ones).collect();
        Ok(counts)
    }};
}
92
/// Views `$input_dict` as a dictionary array keyed by `$key_array_type` with
/// `BinaryArray` values, and counts the set bits of every logical row
/// (null keys produce null counts). Evaluates to a `Result<Int64Array>`.
///
/// `as_dictionary_array` is not fallible, so callers must already have matched
/// the dictionary key type. If the dictionary values are not `Binary`, an
/// internal error is returned rather than panicking.
macro_rules! downcast_dict_and_count_ones {
    ($input_dict:expr, $key_array_type:ident) => {{
        let dict_array = as_dictionary_array::<$key_array_type>($input_dict);
        match dict_array.downcast_dict::<BinaryArray>() {
            Some(typed_dict) => Ok(typed_dict
                .into_iter()
                .map(binary_count_ones)
                .collect::<Int64Array>()),
            None => {
                let value_type = dict_array.values().data_type();
                internal_err!(
                    "bitmap_count expected Binary dictionary values, got {value_type}"
                )
            }
        }
    }};
}
103
104pub fn bitmap_count_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
105 let [input_array] = take_function_args("bitmap_count", arg)?;
106
107 let res: Result<Int64Array> = match &input_array.data_type() {
108 Binary => downcast_and_count_ones!(input_array, BinaryArray),
109 BinaryView => downcast_and_count_ones!(input_array, BinaryViewArray),
110 LargeBinary => downcast_and_count_ones!(input_array, LargeBinaryArray),
111 FixedSizeBinary(_size) => {
112 downcast_and_count_ones!(input_array, FixedSizeBinaryArray)
113 }
114 Dictionary(k, v) if v.as_ref() == &Binary => match k.as_ref() {
115 DataType::Int8 => downcast_dict_and_count_ones!(input_array, Int8Type),
116 DataType::Int16 => downcast_dict_and_count_ones!(input_array, Int16Type),
117 DataType::Int32 => downcast_dict_and_count_ones!(input_array, Int32Type),
118 DataType::Int64 => downcast_dict_and_count_ones!(input_array, Int64Type),
119 data_type => {
120 internal_err!(
121 "bitmap_count does not support Dictionary({data_type}, Binary)"
122 )
123 }
124 },
125 data_type => {
126 internal_err!("bitmap_count does not support {data_type}")
127 }
128 };
129
130 Ok(Arc::new(res?))
131}
132
#[cfg(test)]
mod tests {
    use crate::function::bitmap::bitmap_count::BitmapCount;
    use crate::function::utils::test::test_scalar_function;
    use arrow::array::{Array, Int64Array};
    use arrow::datatypes::DataType::Int64;
    use arrow::datatypes::{DataType, Field};
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_expr::ColumnarValue::Scalar;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
    use std::sync::Arc;

    // Runs `BitmapCount` on the same byte payload wrapped as each scalar binary
    // variant (Binary, LargeBinary, BinaryView, FixedSizeBinary) and checks each
    // result against `$EXPECTED` as an Int64 output. `$INPUT` is re-evaluated
    // at every use, so each invocation gets its own copy of the payload.
    macro_rules! test_bitmap_count_binary_invoke {
        ($INPUT:expr, $EXPECTED:expr) => {
            test_scalar_function!(
                BitmapCount::new(),
                vec![ColumnarValue::Scalar(ScalarValue::Binary($INPUT))],
                $EXPECTED,
                i64,
                Int64,
                Int64Array
            );

            test_scalar_function!(
                BitmapCount::new(),
                vec![ColumnarValue::Scalar(ScalarValue::LargeBinary($INPUT))],
                $EXPECTED,
                i64,
                Int64,
                Int64Array
            );

            test_scalar_function!(
                BitmapCount::new(),
                vec![ColumnarValue::Scalar(ScalarValue::BinaryView($INPUT))],
                $EXPECTED,
                i64,
                Int64,
                Int64Array
            );

            // FixedSizeBinary needs an explicit width: the payload length, or
            // 0 when the input is None.
            test_scalar_function!(
                BitmapCount::new(),
                vec![ColumnarValue::Scalar(ScalarValue::FixedSizeBinary(
                    $INPUT.map(|a| a.len()).unwrap_or(0) as i32,
                    $INPUT
                ))],
                $EXPECTED,
                i64,
                Int64,
                Int64Array
            );
        };
    }

    // Null input yields a null count; otherwise the result is the popcount
    // summed over all bytes: 0x0A -> 2, 0xFF 0xFF -> 16,
    // 0x0A 0xB0 0xCD -> 2 + 3 + 5 = 10.
    #[test]
    fn test_bitmap_count_invoke() -> Result<()> {
        test_bitmap_count_binary_invoke!(None::<Vec<u8>>, Ok(None));
        test_bitmap_count_binary_invoke!(Some(vec![0x0Au8]), Ok(Some(2)));
        test_bitmap_count_binary_invoke!(Some(vec![0xFFu8, 0xFFu8]), Ok(Some(16)));
        test_bitmap_count_binary_invoke!(
            Some(vec![0x0Au8, 0xB0u8, 0xCDu8]),
            Ok(Some(10))
        );
        Ok(())
    }

    // An Int32-keyed dictionary scalar holding [0xFF, 0xFF] is counted through
    // `invoke_with_args` directly and must yield 16.
    #[test]
    fn test_dictionary_encoded_bitmap_count_invoke() -> Result<()> {
        let dict = Scalar(ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Binary(Some(vec![0xFFu8, 0xFFu8]))),
        ));

        let arg_fields = vec![Field::new(
            "a",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)),
            true,
        )
        .into()];
        let args = ScalarFunctionArgs {
            args: vec![dict.clone()],
            arg_fields,
            number_rows: 1,
            return_field: Field::new("f", Int64, true).into(),
            config_options: Arc::new(ConfigOptions::default()),
        };
        let udf = BitmapCount::new();
        let actual = udf.invoke_with_args(args)?;
        let expect = Scalar(ScalarValue::Int64(Some(16)));
        // Compare as one-row arrays so the result's shape (scalar vs array)
        // does not matter.
        assert_eq!(*actual.into_array(1)?, *expect.into_array(1)?);
        Ok(())
    }
}