// datafusion_spark/function/math/bin.rs
use arrow::array::{ArrayRef, AsArray, StringArray};
use arrow::datatypes::{DataType, Field, FieldRef, Int64Type};
use datafusion_common::types::{NativeType, logical_int64};
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, internal_err};
use datafusion_expr::{
    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
    TypeSignatureClass, Volatility,
};
use datafusion_functions::utils::make_scalar_function;
use std::any::Any;
use std::sync::Arc;

31#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkBin {
35 signature: Signature,
36}
37
38impl Default for SparkBin {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl SparkBin {
45 pub fn new() -> Self {
46 Self {
47 signature: Signature::one_of(
48 vec![TypeSignature::Coercible(vec![Coercion::new_implicit(
49 TypeSignatureClass::Native(logical_int64()),
50 vec![TypeSignatureClass::Numeric],
51 NativeType::Int64,
52 )])],
53 Volatility::Immutable,
54 ),
55 }
56 }
57}
58
59impl ScalarUDFImpl for SparkBin {
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 fn name(&self) -> &str {
65 "bin"
66 }
67
68 fn signature(&self) -> &Signature {
69 &self.signature
70 }
71
72 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
73 internal_err!("return_field_from_args should be used instead")
74 }
75
76 fn return_field_from_args(
77 &self,
78 args: datafusion_expr::ReturnFieldArgs,
79 ) -> Result<FieldRef> {
80 Ok(Arc::new(Field::new(
81 self.name(),
82 DataType::Utf8,
83 args.arg_fields[0].is_nullable(),
84 )))
85 }
86
87 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
88 make_scalar_function(spark_bin_inner, vec![])(&args.args)
89 }
90}
91
92fn spark_bin_inner(arg: &[ArrayRef]) -> Result<ArrayRef> {
93 let [array] = take_function_args("bin", arg)?;
94 match &array.data_type() {
95 DataType::Int64 => {
96 let result: StringArray = array
97 .as_primitive::<Int64Type>()
98 .iter()
99 .map(|opt| opt.map(spark_bin))
100 .collect();
101 Ok(Arc::new(result))
102 }
103 data_type => {
104 internal_err!("bin does not support: {data_type}")
105 }
106 }
107}
108
/// Formats a 64-bit integer as its base-2 string representation.
///
/// Negative values render as their full 64-bit two's-complement bit
/// pattern (e.g. `-1` -> sixty-four `1`s), which is how Rust's `{:b}`
/// formats `i64` — presumably mirroring Java's `Long.toBinaryString`
/// used by Spark; confirm against Spark semantics.
fn spark_bin(value: i64) -> String {
    format!("{:b}", value)
}