datafusion_spark/function/bitmap/
bitmap_bucket_number.rs1use arrow::array::{ArrayRef, AsArray, Int64Array};
19use arrow::datatypes::Field;
20use arrow::datatypes::{DataType, FieldRef, Int8Type, Int16Type, Int32Type, Int64Type};
21use datafusion_common::utils::take_function_args;
22use datafusion_common::{Result, internal_err};
23use datafusion_expr::{
24 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
25 Volatility,
26};
27use datafusion_functions::utils::make_scalar_function;
28use std::sync::Arc;
29
30#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct BitmapBucketNumber {
34 signature: Signature,
35}
36
37impl Default for BitmapBucketNumber {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl BitmapBucketNumber {
44 pub fn new() -> Self {
45 Self {
46 signature: Signature::one_of(
47 vec![
48 TypeSignature::Exact(vec![DataType::Int8]),
49 TypeSignature::Exact(vec![DataType::Int16]),
50 TypeSignature::Exact(vec![DataType::Int32]),
51 TypeSignature::Exact(vec![DataType::Int64]),
52 ],
53 Volatility::Immutable,
54 ),
55 }
56 }
57}
58
59impl ScalarUDFImpl for BitmapBucketNumber {
60 fn name(&self) -> &str {
61 "bitmap_bucket_number"
62 }
63
64 fn signature(&self) -> &Signature {
65 &self.signature
66 }
67
68 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
69 internal_err!("return_field_from_args should be used instead")
70 }
71
72 fn return_field_from_args(
73 &self,
74 args: datafusion_expr::ReturnFieldArgs,
75 ) -> Result<FieldRef> {
76 Ok(Arc::new(Field::new(
77 self.name(),
78 DataType::Int64,
79 args.arg_fields[0].is_nullable(),
80 )))
81 }
82
83 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
84 make_scalar_function(bitmap_bucket_number_inner, vec![])(&args.args)
85 }
86}
87
88pub fn bitmap_bucket_number_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
89 let [array] = take_function_args("bitmap_bucket_number", arg)?;
90 match &array.data_type() {
91 DataType::Int8 => {
92 let result: Int64Array = array
93 .as_primitive::<Int8Type>()
94 .iter()
95 .map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
96 .collect();
97 Ok(Arc::new(result))
98 }
99 DataType::Int16 => {
100 let result: Int64Array = array
101 .as_primitive::<Int16Type>()
102 .iter()
103 .map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
104 .collect();
105 Ok(Arc::new(result))
106 }
107 DataType::Int32 => {
108 let result: Int64Array = array
109 .as_primitive::<Int32Type>()
110 .iter()
111 .map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
112 .collect();
113 Ok(Arc::new(result))
114 }
115 DataType::Int64 => {
116 let result: Int64Array = array
117 .as_primitive::<Int64Type>()
118 .iter()
119 .map(|opt| opt.map(bitmap_bucket_number))
120 .collect();
121 Ok(Arc::new(result))
122 }
123 data_type => {
124 internal_err!("bitmap_bucket_number does not support {data_type}")
125 }
126 }
127}
128
/// Byte count from which the bucket width is derived (4 KiB).
const NUM_BYTES: i64 = 4 * 1024;
/// Number of consecutive values covered by one bucket: eight per byte,
/// i.e. 32 768.
const NUM_BITS: i64 = 8 * NUM_BYTES;

/// Maps `value` to its bucket number.
///
/// Positive values are bucketed 1-based: `1..=NUM_BITS` is bucket 1,
/// the next `NUM_BITS` values are bucket 2, and so on. Zero and negative
/// values use plain truncating division by `NUM_BITS`, so everything in
/// `-(NUM_BITS - 1)..=0` maps to bucket 0.
fn bitmap_bucket_number(value: i64) -> i64 {
    if value <= 0 {
        // Integer division truncates toward zero for negative inputs.
        return value / NUM_BITS;
    }
    // Shift to a 0-based offset before dividing, then back to 1-based.
    (value - 1) / NUM_BITS + 1
}