datafusion_spark/function/string/
base64.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::DataType;
22use datafusion_common::arrow::datatypes::{Field, FieldRef};
23use datafusion_common::types::{NativeType, logical_string};
24use datafusion_common::utils::take_function_args;
25use datafusion_common::{Result, exec_err, internal_err};
26use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
27use datafusion_expr::{Coercion, Expr, ReturnFieldArgs, TypeSignatureClass, lit};
28use datafusion_expr::{
29 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions::expr_fn::{decode, encode};
32
33#[derive(Debug, PartialEq, Eq, Hash)]
36pub struct SparkBase64 {
37 signature: Signature,
38}
39
40impl Default for SparkBase64 {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl SparkBase64 {
47 pub fn new() -> Self {
48 Self {
49 signature: Signature::coercible(
50 vec![Coercion::new_implicit(
51 TypeSignatureClass::Binary,
52 vec![TypeSignatureClass::Native(logical_string())],
53 NativeType::Binary,
54 )],
55 Volatility::Immutable,
56 ),
57 }
58 }
59}
60
61impl ScalarUDFImpl for SparkBase64 {
62 fn as_any(&self) -> &dyn Any {
63 self
64 }
65
66 fn name(&self) -> &str {
67 "base64"
68 }
69
70 fn signature(&self) -> &Signature {
71 &self.signature
72 }
73
74 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
75 internal_err!("return_type should not be called for {}", self.name())
76 }
77
78 fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
79 let [bin] = take_function_args(self.name(), args.arg_fields)?;
80 let return_type = match bin.data_type() {
81 DataType::LargeBinary => DataType::LargeUtf8,
82 _ => DataType::Utf8,
83 };
84 Ok(Arc::new(Field::new(
85 self.name(),
86 return_type,
87 bin.is_nullable(),
88 )))
89 }
90
91 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
92 exec_err!(
93 "invoke should not be called on a simplified {} function",
94 self.name()
95 )
96 }
97
98 fn simplify(
99 &self,
100 args: Vec<Expr>,
101 _info: &SimplifyContext,
102 ) -> Result<ExprSimplifyResult> {
103 let [bin] = take_function_args(self.name(), args)?;
104 Ok(ExprSimplifyResult::Simplified(encode(
105 bin,
106 lit("base64pad"),
107 )))
108 }
109}
110
111#[derive(Debug, PartialEq, Eq, Hash)]
113pub struct SparkUnBase64 {
114 signature: Signature,
115}
116
117impl Default for SparkUnBase64 {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl SparkUnBase64 {
124 pub fn new() -> Self {
125 Self {
126 signature: Signature::coercible(
127 vec![Coercion::new_implicit(
128 TypeSignatureClass::Binary,
129 vec![TypeSignatureClass::Native(logical_string())],
130 NativeType::Binary,
131 )],
132 Volatility::Immutable,
133 ),
134 }
135 }
136}
137
138impl ScalarUDFImpl for SparkUnBase64 {
139 fn as_any(&self) -> &dyn Any {
140 self
141 }
142
143 fn name(&self) -> &str {
144 "unbase64"
145 }
146
147 fn signature(&self) -> &Signature {
148 &self.signature
149 }
150
151 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
152 internal_err!("return_type should not be called for {}", self.name())
153 }
154
155 fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
156 let [str] = take_function_args(self.name(), args.arg_fields)?;
157 let return_type = match str.data_type() {
158 DataType::LargeBinary => DataType::LargeBinary,
159 _ => DataType::Binary,
160 };
161 Ok(Arc::new(Field::new(
162 self.name(),
163 return_type,
164 str.is_nullable(),
165 )))
166 }
167
168 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
169 exec_err!("{} should have been simplified", self.name())
170 }
171
172 fn simplify(
173 &self,
174 args: Vec<Expr>,
175 _info: &SimplifyContext,
176 ) -> Result<ExprSimplifyResult> {
177 let [bin] = take_function_args(self.name(), args)?;
178 Ok(ExprSimplifyResult::Simplified(decode(
179 bin,
180 lit("base64pad"),
181 )))
182 }
183}