Skip to main content

datafusion_functions/encoding/
inner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encoding expressions
19
20use arrow::{
21    array::{
22        Array, ArrayRef, AsArray, BinaryArrayType, GenericBinaryArray,
23        GenericStringArray, OffsetSizeTrait,
24    },
25    datatypes::DataType,
26};
27use arrow_buffer::{Buffer, OffsetBufferBuilder};
28use base64::{
29    Engine as _,
30    engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
31};
32use datafusion_common::{
33    DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
34    not_impl_err, plan_err,
35    types::{NativeType, logical_string},
36    utils::take_function_args,
37};
38use datafusion_expr::{
39    Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
40    TypeSignatureClass, Volatility,
41};
42use datafusion_macros::user_doc;
43use std::any::Any;
44use std::fmt;
45use std::sync::Arc;
46
47// Allow padding characters, but don't require them, and don't generate them.
48const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
49    &base64::alphabet::STANDARD,
50    GeneralPurposeConfig::new()
51        .with_encode_padding(false)
52        .with_decode_padding_mode(DecodePaddingMode::Indifferent),
53);
54
55// Generate padding characters when encoding
56const BASE64_ENGINE_PADDED: GeneralPurpose = GeneralPurpose::new(
57    &base64::alphabet::STANDARD,
58    GeneralPurposeConfig::new().with_encode_padding(true),
59);
60
61#[user_doc(
62    doc_section(label = "Binary String Functions"),
63    description = "Encode binary data into a textual representation.",
64    syntax_example = "encode(expression, format)",
65    argument(
66        name = "expression",
67        description = "Expression containing string or binary data"
68    ),
69    argument(
70        name = "format",
71        description = "Supported formats are: `base64`, `base64pad`, `hex`"
72    ),
73    related_udf(name = "decode")
74)]
75#[derive(Debug, PartialEq, Eq, Hash)]
76pub struct EncodeFunc {
77    signature: Signature,
78}
79
80impl Default for EncodeFunc {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl EncodeFunc {
87    pub fn new() -> Self {
88        Self {
89            signature: Signature::coercible(
90                vec![
91                    Coercion::new_implicit(
92                        TypeSignatureClass::Binary,
93                        vec![TypeSignatureClass::Native(logical_string())],
94                        NativeType::Binary,
95                    ),
96                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
97                ],
98                Volatility::Immutable,
99            ),
100        }
101    }
102}
103
104impl ScalarUDFImpl for EncodeFunc {
105    fn as_any(&self) -> &dyn Any {
106        self
107    }
108
109    fn name(&self) -> &str {
110        "encode"
111    }
112
113    fn signature(&self) -> &Signature {
114        &self.signature
115    }
116
117    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
118        match &arg_types[0] {
119            DataType::LargeBinary => Ok(DataType::LargeUtf8),
120            _ => Ok(DataType::Utf8),
121        }
122    }
123
124    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
125        let [expression, encoding] = take_function_args("encode", &args.args)?;
126        let encoding = Encoding::try_from(encoding)?;
127        match expression {
128            _ if expression.data_type().is_null() => {
129                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
130            }
131            ColumnarValue::Array(array) => encode_array(array, encoding),
132            ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
133        }
134    }
135
136    fn documentation(&self) -> Option<&Documentation> {
137        self.doc()
138    }
139}
140
141#[user_doc(
142    doc_section(label = "Binary String Functions"),
143    description = "Decode binary data from textual representation in string.",
144    syntax_example = "decode(expression, format)",
145    argument(
146        name = "expression",
147        description = "Expression containing encoded string data"
148    ),
149    argument(name = "format", description = "Same arguments as [encode](#encode)"),
150    related_udf(name = "encode")
151)]
152#[derive(Debug, PartialEq, Eq, Hash)]
153pub struct DecodeFunc {
154    signature: Signature,
155}
156
157impl Default for DecodeFunc {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163impl DecodeFunc {
164    pub fn new() -> Self {
165        Self {
166            signature: Signature::coercible(
167                vec![
168                    Coercion::new_implicit(
169                        TypeSignatureClass::Binary,
170                        vec![TypeSignatureClass::Native(logical_string())],
171                        NativeType::Binary,
172                    ),
173                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
174                ],
175                Volatility::Immutable,
176            ),
177        }
178    }
179}
180
181impl ScalarUDFImpl for DecodeFunc {
182    fn as_any(&self) -> &dyn Any {
183        self
184    }
185
186    fn name(&self) -> &str {
187        "decode"
188    }
189
190    fn signature(&self) -> &Signature {
191        &self.signature
192    }
193
194    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
195        match &arg_types[0] {
196            DataType::LargeBinary => Ok(DataType::LargeBinary),
197            _ => Ok(DataType::Binary),
198        }
199    }
200
201    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
202        let [expression, encoding] = take_function_args("decode", &args.args)?;
203        let encoding = Encoding::try_from(encoding)?;
204        match expression {
205            _ if expression.data_type().is_null() => {
206                Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
207            }
208            ColumnarValue::Array(array) => decode_array(array, encoding),
209            ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
210        }
211    }
212
213    fn documentation(&self) -> Option<&Documentation> {
214        self.doc()
215    }
216}
217
218fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
219    match value {
220        ScalarValue::Binary(maybe_bytes)
221        | ScalarValue::BinaryView(maybe_bytes)
222        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
223            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
224                maybe_bytes
225                    .as_ref()
226                    .map(|bytes| encoding.encode_bytes(bytes)),
227            )))
228        }
229        ScalarValue::LargeBinary(maybe_bytes) => {
230            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
231                maybe_bytes
232                    .as_ref()
233                    .map(|bytes| encoding.encode_bytes(bytes)),
234            )))
235        }
236        v => internal_err!("Unexpected value for encode: {v}"),
237    }
238}
239
240fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
241    let array = match array.data_type() {
242        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
243        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
244        DataType::LargeBinary => {
245            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
246        }
247        DataType::FixedSizeBinary(_) => {
248            encoding.encode_array::<_, i32>(&array.as_fixed_size_binary())
249        }
250        dt => {
251            internal_err!("Unexpected data type for encode: {dt}")
252        }
253    };
254    array.map(ColumnarValue::Array)
255}
256
257fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
258    match value {
259        ScalarValue::Binary(maybe_bytes)
260        | ScalarValue::BinaryView(maybe_bytes)
261        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
262            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
263                maybe_bytes
264                    .as_ref()
265                    .map(|x| encoding.decode_bytes(x))
266                    .transpose()?,
267            )))
268        }
269        ScalarValue::LargeBinary(maybe_bytes) => {
270            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
271                maybe_bytes
272                    .as_ref()
273                    .map(|x| encoding.decode_bytes(x))
274                    .transpose()?,
275            )))
276        }
277        v => internal_err!("Unexpected value for decode: {v}"),
278    }
279}
280
281/// Estimate how many bytes are actually represented by the array; in case the
282/// the array slices it's internal buffer, this returns the byte size of that slice
283/// but not the byte size of the entire buffer.
284///
285/// This is an estimation only as it can estimate higher if null slots are non-zero
286/// sized.
287fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
288    let offsets = array.value_offsets();
289    // Unwraps are safe as should always have 1 element in offset buffer
290    let start = *offsets.first().unwrap();
291    let end = *offsets.last().unwrap();
292    let data_size = end - start;
293    data_size.as_usize()
294}
295
296fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
297    let array = match array.data_type() {
298        DataType::Binary => {
299            let array = array.as_binary::<i32>();
300            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
301        }
302        DataType::BinaryView => {
303            let array = array.as_binary_view();
304            // Don't know if there is a more strict upper bound we can infer
305            // for view arrays byte data size.
306            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
307        }
308        DataType::LargeBinary => {
309            let array = array.as_binary::<i64>();
310            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
311        }
312        DataType::FixedSizeBinary(size) => {
313            let array = array.as_fixed_size_binary();
314            // TODO: could we be more conservative by accounting for nulls?
315            let estimate = array.len().saturating_mul(*size as usize);
316            encoding.decode_array::<_, i32>(&array, estimate)
317        }
318        dt => {
319            internal_err!("Unexpected data type for decode: {dt}")
320        }
321    };
322    array.map(ColumnarValue::Array)
323}
324
/// The textual encodings supported by `encode` and `decode`.
#[derive(Debug, Copy, Clone)]
enum Encoding {
    /// Displayed as `base64`.
    Base64,
    /// Displayed as `base64pad`.
    Base64Padded,
    /// Displayed as `hex`.
    Hex,
}

impl fmt::Display for Encoding {
    /// Writes the name by which the encoding is selected in SQL.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match *self {
            Self::Base64 => "base64",
            Self::Base64Padded => "base64pad",
            Self::Hex => "hex",
        })
    }
}
342
343impl TryFrom<&ColumnarValue> for Encoding {
344    type Error = DataFusionError;
345
346    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
347        let encoding = match encoding {
348            ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
349                Some(encoding) => encoding,
350                _ => return exec_err!("Encoding must be a non-null string"),
351            },
352            ColumnarValue::Array(_) => {
353                return not_impl_err!(
354                    "Encoding must be a scalar; array specified encoding is not yet supported"
355                );
356            }
357        };
358        match encoding {
359            "base64" => Ok(Self::Base64),
360            "base64pad" => Ok(Self::Base64Padded),
361            "hex" => Ok(Self::Hex),
362            _ => {
363                let options = [Self::Base64, Self::Base64Padded, Self::Hex]
364                    .iter()
365                    .map(|i| i.to_string())
366                    .collect::<Vec<_>>()
367                    .join(", ");
368                plan_err!(
369                    "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
370                )
371            }
372        }
373    }
374}
375
376impl Encoding {
377    fn encode_bytes(self, value: &[u8]) -> String {
378        match self {
379            Self::Base64 => BASE64_ENGINE.encode(value),
380            Self::Base64Padded => BASE64_ENGINE_PADDED.encode(value),
381            Self::Hex => hex::encode(value),
382        }
383    }
384
385    fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
386        match self {
387            Self::Base64 | Self::Base64Padded => {
388                BASE64_ENGINE.decode(value).map_err(|e| {
389                    exec_datafusion_err!("Failed to decode value using {self}: {e}")
390                })
391            }
392            Self::Hex => hex::decode(value).map_err(|e| {
393                exec_datafusion_err!("Failed to decode value using hex: {e}")
394            }),
395        }
396    }
397
398    // OutputOffset important to ensure Large types output Large arrays
399    fn encode_array<'a, InputBinaryArray, OutputOffset>(
400        self,
401        array: &InputBinaryArray,
402    ) -> Result<ArrayRef>
403    where
404        InputBinaryArray: BinaryArrayType<'a>,
405        OutputOffset: OffsetSizeTrait,
406    {
407        match self {
408            Self::Base64 => {
409                let array: GenericStringArray<OutputOffset> = array
410                    .iter()
411                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
412                    .collect();
413                Ok(Arc::new(array))
414            }
415            Self::Base64Padded => {
416                let array: GenericStringArray<OutputOffset> = array
417                    .iter()
418                    .map(|x| x.map(|x| BASE64_ENGINE_PADDED.encode(x)))
419                    .collect();
420                Ok(Arc::new(array))
421            }
422            Self::Hex => {
423                let array: GenericStringArray<OutputOffset> =
424                    array.iter().map(|x| x.map(hex::encode)).collect();
425                Ok(Arc::new(array))
426            }
427        }
428    }
429
430    // OutputOffset important to ensure Large types output Large arrays
431    fn decode_array<'a, InputBinaryArray, OutputOffset>(
432        self,
433        value: &InputBinaryArray,
434        approx_data_size: usize,
435    ) -> Result<ArrayRef>
436    where
437        InputBinaryArray: BinaryArrayType<'a>,
438        OutputOffset: OffsetSizeTrait,
439    {
440        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
441            // only write input / 2 bytes to buf
442            let out_len = input.len() / 2;
443            let buf = &mut buf[..out_len];
444            hex::decode_to_slice(input, buf)
445                .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
446            Ok(out_len)
447        }
448
449        fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
450            BASE64_ENGINE
451                .decode_slice(input, buf)
452                .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
453        }
454
455        match self {
456            Self::Base64 | Self::Base64Padded => {
457                let upper_bound = base64::decoded_len_estimate(approx_data_size);
458                delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
459            }
460            Self::Hex => {
461                // Calculate the upper bound for decoded byte size
462                // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
463                // So the upper bound is half the length of the input values.
464                let upper_bound = approx_data_size / 2;
465                delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
466            }
467        }
468    }
469}
470
471fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
472    decode: DecodeFunction,
473    input: &InputBinaryArray,
474    conservative_upper_bound_size: usize,
475) -> Result<ArrayRef>
476where
477    DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
478    InputBinaryArray: BinaryArrayType<'a>,
479    OutputOffset: OffsetSizeTrait,
480{
481    let mut values = vec![0; conservative_upper_bound_size];
482    let mut offsets = OffsetBufferBuilder::new(input.len());
483    let mut total_bytes_decoded = 0;
484    for v in input.iter() {
485        if let Some(v) = v {
486            let cursor = &mut values[total_bytes_decoded..];
487            let decoded = decode(v, cursor)?;
488            total_bytes_decoded += decoded;
489            offsets.push_length(decoded);
490        } else {
491            offsets.push_length(0);
492        }
493    }
494    // We reserved an upper bound size for the values buffer, but we only use the actual size
495    values.truncate(total_bytes_decoded);
496    let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
497        offsets.finish(),
498        Buffer::from_vec(values),
499        input.nulls().cloned(),
500    )?;
501    Ok(Arc::new(binary_array))
502}
503
#[cfg(test)]
mod tests {
    use arrow::array::BinaryArray;
    use arrow_buffer::OffsetBuffer;

    use super::*;

    #[test]
    fn test_estimate_byte_data_size() {
        // Offsets start at 0 but cover only the first 15 bytes of the 100 byte
        // data buffer, so the entire buffer size must not be counted
        let from_start = BinaryArray::new(
            OffsetBuffer::new(vec![0, 5, 10, 15].into()),
            vec![0; 100].into(),
            None,
        );
        assert_eq!(estimate_byte_data_size(&from_start), 15);

        // Offsets start at 50, as for a sliced array; the null slot spanning
        // 51..60 is non-zero sized and is still included in the estimate
        let with_nulls = BinaryArray::new(
            OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()),
            vec![0; 100].into(),
            Some(vec![true, false, false, true, true].into()),
        );
        assert_eq!(estimate_byte_data_size(&with_nulls), 31);
    }
}