Skip to main content

datafusion_spark/function/bitmap/
bitmap_bucket_number.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::array::{ArrayRef, AsArray, Int64Array};
19use arrow::datatypes::Field;
20use arrow::datatypes::{DataType, FieldRef, Int8Type, Int16Type, Int32Type, Int64Type};
21use datafusion_common::utils::take_function_args;
22use datafusion_common::{Result, internal_err};
23use datafusion_expr::{
24    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
25    Volatility,
26};
27use datafusion_functions::utils::make_scalar_function;
28use std::sync::Arc;
29
30/// Spark-compatible `bitmap_bucket_number` expression
31/// <https://spark.apache.org/docs/latest/api/sql/index.html#bitmap_bucket_number>
32#[derive(Debug, PartialEq, Eq, Hash)]
33pub struct BitmapBucketNumber {
34    signature: Signature,
35}
36
37impl Default for BitmapBucketNumber {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl BitmapBucketNumber {
44    pub fn new() -> Self {
45        Self {
46            signature: Signature::one_of(
47                vec![
48                    TypeSignature::Exact(vec![DataType::Int8]),
49                    TypeSignature::Exact(vec![DataType::Int16]),
50                    TypeSignature::Exact(vec![DataType::Int32]),
51                    TypeSignature::Exact(vec![DataType::Int64]),
52                ],
53                Volatility::Immutable,
54            ),
55        }
56    }
57}
58
59impl ScalarUDFImpl for BitmapBucketNumber {
60    fn name(&self) -> &str {
61        "bitmap_bucket_number"
62    }
63
64    fn signature(&self) -> &Signature {
65        &self.signature
66    }
67
68    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
69        internal_err!("return_field_from_args should be used instead")
70    }
71
72    fn return_field_from_args(
73        &self,
74        args: datafusion_expr::ReturnFieldArgs,
75    ) -> Result<FieldRef> {
76        Ok(Arc::new(Field::new(
77            self.name(),
78            DataType::Int64,
79            args.arg_fields[0].is_nullable(),
80        )))
81    }
82
83    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
84        make_scalar_function(bitmap_bucket_number_inner, vec![])(&args.args)
85    }
86}
87
88pub fn bitmap_bucket_number_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
89    let [array] = take_function_args("bitmap_bucket_number", arg)?;
90    match &array.data_type() {
91        DataType::Int8 => {
92            let result: Int64Array = array
93                .as_primitive::<Int8Type>()
94                .iter()
95                .map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
96                .collect();
97            Ok(Arc::new(result))
98        }
99        DataType::Int16 => {
100            let result: Int64Array = array
101                .as_primitive::<Int16Type>()
102                .iter()
103                .map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
104                .collect();
105            Ok(Arc::new(result))
106        }
107        DataType::Int32 => {
108            let result: Int64Array = array
109                .as_primitive::<Int32Type>()
110                .iter()
111                .map(|opt| opt.map(|value| bitmap_bucket_number(value.into())))
112                .collect();
113            Ok(Arc::new(result))
114        }
115        DataType::Int64 => {
116            let result: Int64Array = array
117                .as_primitive::<Int64Type>()
118                .iter()
119                .map(|opt| opt.map(bitmap_bucket_number))
120                .collect();
121            Ok(Arc::new(result))
122        }
123        data_type => {
124            internal_err!("bitmap_bucket_number does not support {data_type}")
125        }
126    }
127}
128
/// Byte count from which the bucket width is derived.
const NUM_BYTES: i64 = 4 * 1024;
/// Number of consecutive values covered by one bucket (eight per byte).
const NUM_BITS: i64 = NUM_BYTES * 8;

/// Returns the bucket number that `value` falls into.
///
/// Positive values are grouped into 1-based buckets of `NUM_BITS` values, so
/// `1..=NUM_BITS` maps to bucket 1. Zero and negative values use a plain
/// truncating division by `NUM_BITS`.
fn bitmap_bucket_number(value: i64) -> i64 {
    match value {
        positive if positive > 0 => (positive - 1) / NUM_BITS + 1,
        non_positive => non_positive / NUM_BITS,
    }
}