datafusion_functions/encoding/
inner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encoding expressions
19
20use arrow::{
21    array::{
22        Array, ArrayRef, AsArray, BinaryArrayType, FixedSizeBinaryArray,
23        GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
24    },
25    datatypes::DataType,
26};
27use arrow_buffer::{Buffer, OffsetBufferBuilder};
28use base64::{
29    Engine as _,
30    engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
31};
32use datafusion_common::{
33    DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
34    not_impl_err, plan_err,
35    types::{NativeType, logical_string},
36    utils::take_function_args,
37};
38use datafusion_expr::{
39    Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
40    TypeSignatureClass, Volatility,
41};
42use datafusion_macros::user_doc;
43use std::any::Any;
44use std::fmt;
45use std::sync::Arc;
46
47// Allow padding characters, but don't require them, and don't generate them.
48const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
49    &base64::alphabet::STANDARD,
50    GeneralPurposeConfig::new()
51        .with_encode_padding(false)
52        .with_decode_padding_mode(DecodePaddingMode::Indifferent),
53);
54
55#[user_doc(
56    doc_section(label = "Binary String Functions"),
57    description = "Encode binary data into a textual representation.",
58    syntax_example = "encode(expression, format)",
59    argument(
60        name = "expression",
61        description = "Expression containing string or binary data"
62    ),
63    argument(
64        name = "format",
65        description = "Supported formats are: `base64`, `hex`"
66    ),
67    related_udf(name = "decode")
68)]
69#[derive(Debug, PartialEq, Eq, Hash)]
70pub struct EncodeFunc {
71    signature: Signature,
72}
73
74impl Default for EncodeFunc {
75    fn default() -> Self {
76        Self::new()
77    }
78}
79
80impl EncodeFunc {
81    pub fn new() -> Self {
82        Self {
83            signature: Signature::coercible(
84                vec![
85                    Coercion::new_implicit(
86                        TypeSignatureClass::Binary,
87                        vec![TypeSignatureClass::Native(logical_string())],
88                        NativeType::Binary,
89                    ),
90                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
91                ],
92                Volatility::Immutable,
93            ),
94        }
95    }
96}
97
98impl ScalarUDFImpl for EncodeFunc {
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102
103    fn name(&self) -> &str {
104        "encode"
105    }
106
107    fn signature(&self) -> &Signature {
108        &self.signature
109    }
110
111    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112        match &arg_types[0] {
113            DataType::LargeBinary => Ok(DataType::LargeUtf8),
114            _ => Ok(DataType::Utf8),
115        }
116    }
117
118    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
119        let [expression, encoding] = take_function_args("encode", &args.args)?;
120        let encoding = Encoding::try_from(encoding)?;
121        match expression {
122            _ if expression.data_type().is_null() => {
123                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
124            }
125            ColumnarValue::Array(array) => encode_array(array, encoding),
126            ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
127        }
128    }
129
130    fn documentation(&self) -> Option<&Documentation> {
131        self.doc()
132    }
133}
134
135#[user_doc(
136    doc_section(label = "Binary String Functions"),
137    description = "Decode binary data from textual representation in string.",
138    syntax_example = "decode(expression, format)",
139    argument(
140        name = "expression",
141        description = "Expression containing encoded string data"
142    ),
143    argument(name = "format", description = "Same arguments as [encode](#encode)"),
144    related_udf(name = "encode")
145)]
146#[derive(Debug, PartialEq, Eq, Hash)]
147pub struct DecodeFunc {
148    signature: Signature,
149}
150
151impl Default for DecodeFunc {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157impl DecodeFunc {
158    pub fn new() -> Self {
159        Self {
160            signature: Signature::coercible(
161                vec![
162                    Coercion::new_implicit(
163                        TypeSignatureClass::Binary,
164                        vec![TypeSignatureClass::Native(logical_string())],
165                        NativeType::Binary,
166                    ),
167                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
168                ],
169                Volatility::Immutable,
170            ),
171        }
172    }
173}
174
175impl ScalarUDFImpl for DecodeFunc {
176    fn as_any(&self) -> &dyn Any {
177        self
178    }
179
180    fn name(&self) -> &str {
181        "decode"
182    }
183
184    fn signature(&self) -> &Signature {
185        &self.signature
186    }
187
188    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
189        match &arg_types[0] {
190            DataType::LargeBinary => Ok(DataType::LargeBinary),
191            _ => Ok(DataType::Binary),
192        }
193    }
194
195    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
196        let [expression, encoding] = take_function_args("decode", &args.args)?;
197        let encoding = Encoding::try_from(encoding)?;
198        match expression {
199            _ if expression.data_type().is_null() => {
200                Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
201            }
202            ColumnarValue::Array(array) => decode_array(array, encoding),
203            ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
204        }
205    }
206
207    fn documentation(&self) -> Option<&Documentation> {
208        self.doc()
209    }
210}
211
212fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
213    match value {
214        ScalarValue::Binary(maybe_bytes)
215        | ScalarValue::BinaryView(maybe_bytes)
216        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
217            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
218                maybe_bytes
219                    .as_ref()
220                    .map(|bytes| encoding.encode_bytes(bytes)),
221            )))
222        }
223        ScalarValue::LargeBinary(maybe_bytes) => {
224            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
225                maybe_bytes
226                    .as_ref()
227                    .map(|bytes| encoding.encode_bytes(bytes)),
228            )))
229        }
230        v => internal_err!("Unexpected value for encode: {v}"),
231    }
232}
233
234fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
235    let array = match array.data_type() {
236        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
237        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
238        DataType::LargeBinary => {
239            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
240        }
241        DataType::FixedSizeBinary(_) => {
242            encoding.encode_fsb_array(array.as_fixed_size_binary())
243        }
244        dt => {
245            internal_err!("Unexpected data type for encode: {dt}")
246        }
247    };
248    array.map(ColumnarValue::Array)
249}
250
251fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
252    match value {
253        ScalarValue::Binary(maybe_bytes)
254        | ScalarValue::BinaryView(maybe_bytes)
255        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
256            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
257                maybe_bytes
258                    .as_ref()
259                    .map(|x| encoding.decode_bytes(x))
260                    .transpose()?,
261            )))
262        }
263        ScalarValue::LargeBinary(maybe_bytes) => {
264            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
265                maybe_bytes
266                    .as_ref()
267                    .map(|x| encoding.decode_bytes(x))
268                    .transpose()?,
269            )))
270        }
271        v => internal_err!("Unexpected value for decode: {v}"),
272    }
273}
274
275/// Estimate how many bytes are actually represented by the array; in case the
276/// the array slices it's internal buffer, this returns the byte size of that slice
277/// but not the byte size of the entire buffer.
278///
279/// This is an estimation only as it can estimate higher if null slots are non-zero
280/// sized.
281fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
282    let offsets = array.value_offsets();
283    // Unwraps are safe as should always have 1 element in offset buffer
284    let start = *offsets.first().unwrap();
285    let end = *offsets.last().unwrap();
286    let data_size = end - start;
287    data_size.as_usize()
288}
289
290fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
291    let array = match array.data_type() {
292        DataType::Binary => {
293            let array = array.as_binary::<i32>();
294            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
295        }
296        DataType::BinaryView => {
297            let array = array.as_binary_view();
298            // Don't know if there is a more strict upper bound we can infer
299            // for view arrays byte data size.
300            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
301        }
302        DataType::LargeBinary => {
303            let array = array.as_binary::<i64>();
304            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
305        }
306        DataType::FixedSizeBinary(size) => {
307            let array = array.as_fixed_size_binary();
308            // TODO: could we be more conservative by accounting for nulls?
309            let estimate = array.len().saturating_mul(*size as usize);
310            encoding.decode_fsb_array(array, estimate)
311        }
312        dt => {
313            internal_err!("Unexpected data type for decode: {dt}")
314        }
315    };
316    array.map(ColumnarValue::Array)
317}
318
/// Textual encodings understood by `encode` and `decode`.
#[derive(Debug, Copy, Clone)]
enum Encoding {
    Base64,
    Hex,
}

impl fmt::Display for Encoding {
    /// Writes the lowercase name of the encoding, as accepted in SQL.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            Self::Base64 => "base64",
            Self::Hex => "hex",
        };
        f.write_str(name)
    }
}
330
331impl TryFrom<&ColumnarValue> for Encoding {
332    type Error = DataFusionError;
333
334    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
335        let encoding = match encoding {
336            ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
337                Some(encoding) => encoding,
338                _ => return exec_err!("Encoding must be a non-null string"),
339            },
340            ColumnarValue::Array(_) => {
341                return not_impl_err!(
342                    "Encoding must be a scalar; array specified encoding is not yet supported"
343                );
344            }
345        };
346        match encoding {
347            "base64" => Ok(Self::Base64),
348            "hex" => Ok(Self::Hex),
349            _ => {
350                let options = [Self::Base64, Self::Hex]
351                    .iter()
352                    .map(|i| i.to_string())
353                    .collect::<Vec<_>>()
354                    .join(", ");
355                plan_err!(
356                    "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
357                )
358            }
359        }
360    }
361}
362
363impl Encoding {
364    fn encode_bytes(self, value: &[u8]) -> String {
365        match self {
366            Self::Base64 => BASE64_ENGINE.encode(value),
367            Self::Hex => hex::encode(value),
368        }
369    }
370
371    fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
372        match self {
373            Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
374                exec_datafusion_err!("Failed to decode value using base64: {e}")
375            }),
376            Self::Hex => hex::decode(value).map_err(|e| {
377                exec_datafusion_err!("Failed to decode value using hex: {e}")
378            }),
379        }
380    }
381
382    // OutputOffset important to ensure Large types output Large arrays
383    fn encode_array<'a, InputBinaryArray, OutputOffset>(
384        self,
385        array: &InputBinaryArray,
386    ) -> Result<ArrayRef>
387    where
388        InputBinaryArray: BinaryArrayType<'a>,
389        OutputOffset: OffsetSizeTrait,
390    {
391        match self {
392            Self::Base64 => {
393                let array: GenericStringArray<OutputOffset> = array
394                    .iter()
395                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
396                    .collect();
397                Ok(Arc::new(array))
398            }
399            Self::Hex => {
400                let array: GenericStringArray<OutputOffset> =
401                    array.iter().map(|x| x.map(hex::encode)).collect();
402                Ok(Arc::new(array))
403            }
404        }
405    }
406
407    // TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
408    fn encode_fsb_array(self, array: &FixedSizeBinaryArray) -> Result<ArrayRef> {
409        match self {
410            Self::Base64 => {
411                let array: GenericStringArray<i32> = array
412                    .iter()
413                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
414                    .collect();
415                Ok(Arc::new(array))
416            }
417            Self::Hex => {
418                let array: GenericStringArray<i32> =
419                    array.iter().map(|x| x.map(hex::encode)).collect();
420                Ok(Arc::new(array))
421            }
422        }
423    }
424
425    // OutputOffset important to ensure Large types output Large arrays
426    fn decode_array<'a, InputBinaryArray, OutputOffset>(
427        self,
428        value: &InputBinaryArray,
429        approx_data_size: usize,
430    ) -> Result<ArrayRef>
431    where
432        InputBinaryArray: BinaryArrayType<'a>,
433        OutputOffset: OffsetSizeTrait,
434    {
435        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
436            // only write input / 2 bytes to buf
437            let out_len = input.len() / 2;
438            let buf = &mut buf[..out_len];
439            hex::decode_to_slice(input, buf)
440                .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
441            Ok(out_len)
442        }
443
444        fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
445            BASE64_ENGINE
446                .decode_slice(input, buf)
447                .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
448        }
449
450        match self {
451            Self::Base64 => {
452                let upper_bound = base64::decoded_len_estimate(approx_data_size);
453                delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
454            }
455            Self::Hex => {
456                // Calculate the upper bound for decoded byte size
457                // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
458                // So the upper bound is half the length of the input values.
459                let upper_bound = approx_data_size / 2;
460                delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
461            }
462        }
463    }
464
465    // TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
466    fn decode_fsb_array(
467        self,
468        value: &FixedSizeBinaryArray,
469        approx_data_size: usize,
470    ) -> Result<ArrayRef> {
471        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
472            // only write input / 2 bytes to buf
473            let out_len = input.len() / 2;
474            let buf = &mut buf[..out_len];
475            hex::decode_to_slice(input, buf)
476                .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
477            Ok(out_len)
478        }
479
480        fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
481            BASE64_ENGINE
482                .decode_slice(input, buf)
483                .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
484        }
485
486        fn delegated_decode<DecodeFunction>(
487            decode: DecodeFunction,
488            input: &FixedSizeBinaryArray,
489            conservative_upper_bound_size: usize,
490        ) -> Result<ArrayRef>
491        where
492            DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
493        {
494            let mut values = vec![0; conservative_upper_bound_size];
495            let mut offsets = OffsetBufferBuilder::new(input.len());
496            let mut total_bytes_decoded = 0;
497            for v in input.iter() {
498                if let Some(v) = v {
499                    let cursor = &mut values[total_bytes_decoded..];
500                    let decoded = decode(v, cursor)?;
501                    total_bytes_decoded += decoded;
502                    offsets.push_length(decoded);
503                } else {
504                    offsets.push_length(0);
505                }
506            }
507            // We reserved an upper bound size for the values buffer, but we only use the actual size
508            values.truncate(total_bytes_decoded);
509            let binary_array = GenericBinaryArray::<i32>::try_new(
510                offsets.finish(),
511                Buffer::from_vec(values),
512                input.nulls().cloned(),
513            )?;
514            Ok(Arc::new(binary_array))
515        }
516
517        match self {
518            Self::Base64 => {
519                let upper_bound = base64::decoded_len_estimate(approx_data_size);
520                delegated_decode(base64_decode, value, upper_bound)
521            }
522            Self::Hex => {
523                // Calculate the upper bound for decoded byte size
524                // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
525                // So the upper bound is half the length of the input values.
526                let upper_bound = approx_data_size / 2;
527                delegated_decode(hex_decode, value, upper_bound)
528            }
529        }
530    }
531}
532
533fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
534    decode: DecodeFunction,
535    input: &InputBinaryArray,
536    conservative_upper_bound_size: usize,
537) -> Result<ArrayRef>
538where
539    DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
540    InputBinaryArray: BinaryArrayType<'a>,
541    OutputOffset: OffsetSizeTrait,
542{
543    let mut values = vec![0; conservative_upper_bound_size];
544    let mut offsets = OffsetBufferBuilder::new(input.len());
545    let mut total_bytes_decoded = 0;
546    for v in input.iter() {
547        if let Some(v) = v {
548            let cursor = &mut values[total_bytes_decoded..];
549            let decoded = decode(v, cursor)?;
550            total_bytes_decoded += decoded;
551            offsets.push_length(decoded);
552        } else {
553            offsets.push_length(0);
554        }
555    }
556    // We reserved an upper bound size for the values buffer, but we only use the actual size
557    values.truncate(total_bytes_decoded);
558    let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
559        offsets.finish(),
560        Buffer::from_vec(values),
561        input.nulls().cloned(),
562    )?;
563    Ok(Arc::new(binary_array))
564}
565
#[cfg(test)]
mod tests {
    use arrow::array::BinaryArray;
    use arrow_buffer::OffsetBuffer;

    use super::*;

    #[test]
    fn test_estimate_byte_data_size() {
        // Offsets start at 0 and cover only a prefix of the data buffer:
        // the estimate must not count the whole buffer.
        let from_zero = BinaryArray::new(
            OffsetBuffer::new(vec![0, 5, 10, 15].into()),
            vec![0; 100].into(),
            None,
        );
        assert_eq!(estimate_byte_data_size(&from_zero), 15);

        // Offsets start at 50, in the middle of the data buffer. The null
        // slot spanning 51..60 is still counted, so the estimate is 81 - 50.
        let validity = vec![true, false, false, true, true];
        let mid_buffer = BinaryArray::new(
            OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()),
            vec![0; 100].into(),
            Some(validity.into()),
        );
        assert_eq!(estimate_byte_data_size(&mid_buffer), 31);
    }
}
593}