Skip to main content

datafusion_spark/function/string/
base64.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::datatypes::DataType;
22use datafusion_common::arrow::datatypes::{Field, FieldRef};
23use datafusion_common::types::{NativeType, logical_string};
24use datafusion_common::utils::take_function_args;
25use datafusion_common::{Result, exec_err, internal_err};
26use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext};
27use datafusion_expr::{Coercion, Expr, ReturnFieldArgs, TypeSignatureClass, lit};
28use datafusion_expr::{
29    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions::expr_fn::{decode, encode};
32
33/// Apache Spark base64 uses padded base64 encoding.
34/// <https://spark.apache.org/docs/latest/api/sql/index.html#base64>
35#[derive(Debug, PartialEq, Eq, Hash)]
36pub struct SparkBase64 {
37    signature: Signature,
38}
39
40impl Default for SparkBase64 {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl SparkBase64 {
47    pub fn new() -> Self {
48        Self {
49            signature: Signature::coercible(
50                vec![Coercion::new_implicit(
51                    TypeSignatureClass::Binary,
52                    vec![TypeSignatureClass::Native(logical_string())],
53                    NativeType::Binary,
54                )],
55                Volatility::Immutable,
56            ),
57        }
58    }
59}
60
61impl ScalarUDFImpl for SparkBase64 {
62    fn as_any(&self) -> &dyn Any {
63        self
64    }
65
66    fn name(&self) -> &str {
67        "base64"
68    }
69
70    fn signature(&self) -> &Signature {
71        &self.signature
72    }
73
74    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
75        internal_err!("return_type should not be called for {}", self.name())
76    }
77
78    fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
79        let [bin] = take_function_args(self.name(), args.arg_fields)?;
80        let return_type = match bin.data_type() {
81            DataType::LargeBinary => DataType::LargeUtf8,
82            _ => DataType::Utf8,
83        };
84        Ok(Arc::new(Field::new(
85            self.name(),
86            return_type,
87            bin.is_nullable(),
88        )))
89    }
90
91    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
92        exec_err!(
93            "invoke should not be called on a simplified {} function",
94            self.name()
95        )
96    }
97
98    fn simplify(
99        &self,
100        args: Vec<Expr>,
101        _info: &SimplifyContext,
102    ) -> Result<ExprSimplifyResult> {
103        let [bin] = take_function_args(self.name(), args)?;
104        Ok(ExprSimplifyResult::Simplified(encode(
105            bin,
106            lit("base64pad"),
107        )))
108    }
109}
110
111/// <https://spark.apache.org/docs/latest/api/sql/index.html#unbase64>
112#[derive(Debug, PartialEq, Eq, Hash)]
113pub struct SparkUnBase64 {
114    signature: Signature,
115}
116
117impl Default for SparkUnBase64 {
118    fn default() -> Self {
119        Self::new()
120    }
121}
122
123impl SparkUnBase64 {
124    pub fn new() -> Self {
125        Self {
126            signature: Signature::coercible(
127                vec![Coercion::new_implicit(
128                    TypeSignatureClass::Binary,
129                    vec![TypeSignatureClass::Native(logical_string())],
130                    NativeType::Binary,
131                )],
132                Volatility::Immutable,
133            ),
134        }
135    }
136}
137
138impl ScalarUDFImpl for SparkUnBase64 {
139    fn as_any(&self) -> &dyn Any {
140        self
141    }
142
143    fn name(&self) -> &str {
144        "unbase64"
145    }
146
147    fn signature(&self) -> &Signature {
148        &self.signature
149    }
150
151    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
152        internal_err!("return_type should not be called for {}", self.name())
153    }
154
155    fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> {
156        let [str] = take_function_args(self.name(), args.arg_fields)?;
157        let return_type = match str.data_type() {
158            DataType::LargeBinary => DataType::LargeBinary,
159            _ => DataType::Binary,
160        };
161        Ok(Arc::new(Field::new(
162            self.name(),
163            return_type,
164            str.is_nullable(),
165        )))
166    }
167
168    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
169        exec_err!("{} should have been simplified", self.name())
170    }
171
172    fn simplify(
173        &self,
174        args: Vec<Expr>,
175        _info: &SimplifyContext,
176    ) -> Result<ExprSimplifyResult> {
177        let [bin] = take_function_args(self.name(), args)?;
178        Ok(ExprSimplifyResult::Simplified(decode(
179            bin,
180            lit("base64pad"),
181        )))
182    }
183}