Skip to main content

datafusion_functions/encoding/
inner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Encoding expressions
19
20use arrow::{
21    array::{
22        Array, ArrayRef, AsArray, BinaryArrayType, GenericBinaryArray,
23        GenericStringArray, OffsetSizeTrait,
24    },
25    datatypes::DataType,
26};
27use arrow_buffer::{Buffer, OffsetBufferBuilder};
28use base64::{
29    Engine as _,
30    engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
31};
32use datafusion_common::{
33    DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
34    not_impl_err, plan_err,
35    types::{NativeType, logical_string},
36    utils::take_function_args,
37};
38use datafusion_expr::{
39    Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
40    TypeSignatureClass, Volatility,
41};
42use datafusion_macros::user_doc;
43use std::fmt;
44use std::sync::Arc;
45
// Engine used for the unpadded `base64` format and for decoding both base64 variants.
// Allow padding characters, but don't require them, and don't generate them.
const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
    &base64::alphabet::STANDARD,
    GeneralPurposeConfig::new()
        .with_encode_padding(false)
        .with_decode_padding_mode(DecodePaddingMode::Indifferent),
);
53
// Generate padding characters when encoding.
// Only the `base64pad` encode paths use this engine; decoding of both base64
// variants goes through `BASE64_ENGINE`.
const BASE64_ENGINE_PADDED: GeneralPurpose = GeneralPurpose::new(
    &base64::alphabet::STANDARD,
    GeneralPurposeConfig::new().with_encode_padding(true),
);
59
// Scalar function `encode(expression, format)`.
// The `user_doc` attribute carries the user-facing description, syntax example
// and per-argument documentation for the function.
#[user_doc(
    doc_section(label = "Binary String Functions"),
    description = "Encode binary data into a textual representation.",
    syntax_example = "encode(expression, format)",
    argument(
        name = "expression",
        description = "Expression containing string or binary data"
    ),
    argument(
        name = "format",
        description = "Supported formats are: `base64`, `base64pad`, `hex`"
    ),
    related_udf(name = "decode")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct EncodeFunc {
    // Accepted argument types: (binary-or-string expression, string format).
    signature: Signature,
}
78
79impl Default for EncodeFunc {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl EncodeFunc {
86    pub fn new() -> Self {
87        Self {
88            signature: Signature::coercible(
89                vec![
90                    Coercion::new_implicit(
91                        TypeSignatureClass::Binary,
92                        vec![TypeSignatureClass::Native(logical_string())],
93                        NativeType::Binary,
94                    ),
95                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
96                ],
97                Volatility::Immutable,
98            ),
99        }
100    }
101}
102
103impl ScalarUDFImpl for EncodeFunc {
104    fn name(&self) -> &str {
105        "encode"
106    }
107
108    fn signature(&self) -> &Signature {
109        &self.signature
110    }
111
112    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
113        match &arg_types[0] {
114            DataType::LargeBinary => Ok(DataType::LargeUtf8),
115            _ => Ok(DataType::Utf8),
116        }
117    }
118
119    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
120        let [expression, encoding] = take_function_args("encode", &args.args)?;
121        let encoding = Encoding::try_from(encoding)?;
122        match expression {
123            _ if expression.data_type().is_null() => {
124                Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
125            }
126            ColumnarValue::Array(array) => encode_array(array, encoding),
127            ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
128        }
129    }
130
131    fn documentation(&self) -> Option<&Documentation> {
132        self.doc()
133    }
134}
135
// Scalar function `decode(expression, format)`.
// The `user_doc` attribute carries the user-facing description, syntax example
// and per-argument documentation for the function.
#[user_doc(
    doc_section(label = "Binary String Functions"),
    description = "Decode binary data from textual representation in string.",
    syntax_example = "decode(expression, format)",
    argument(
        name = "expression",
        description = "Expression containing encoded string data"
    ),
    argument(name = "format", description = "Same arguments as [encode](#encode)"),
    related_udf(name = "encode")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DecodeFunc {
    // Accepted argument types: (binary-or-string expression, string format).
    signature: Signature,
}
151
152impl Default for DecodeFunc {
153    fn default() -> Self {
154        Self::new()
155    }
156}
157
158impl DecodeFunc {
159    pub fn new() -> Self {
160        Self {
161            signature: Signature::coercible(
162                vec![
163                    Coercion::new_implicit(
164                        TypeSignatureClass::Binary,
165                        vec![TypeSignatureClass::Native(logical_string())],
166                        NativeType::Binary,
167                    ),
168                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
169                ],
170                Volatility::Immutable,
171            ),
172        }
173    }
174}
175
176impl ScalarUDFImpl for DecodeFunc {
177    fn name(&self) -> &str {
178        "decode"
179    }
180
181    fn signature(&self) -> &Signature {
182        &self.signature
183    }
184
185    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
186        match &arg_types[0] {
187            DataType::LargeBinary => Ok(DataType::LargeBinary),
188            _ => Ok(DataType::Binary),
189        }
190    }
191
192    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
193        let [expression, encoding] = take_function_args("decode", &args.args)?;
194        let encoding = Encoding::try_from(encoding)?;
195        match expression {
196            _ if expression.data_type().is_null() => {
197                Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
198            }
199            ColumnarValue::Array(array) => decode_array(array, encoding),
200            ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
201        }
202    }
203
204    fn documentation(&self) -> Option<&Documentation> {
205        self.doc()
206    }
207}
208
209fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
210    match value {
211        ScalarValue::Binary(maybe_bytes)
212        | ScalarValue::BinaryView(maybe_bytes)
213        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
214            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
215                maybe_bytes
216                    .as_ref()
217                    .map(|bytes| encoding.encode_bytes(bytes)),
218            )))
219        }
220        ScalarValue::LargeBinary(maybe_bytes) => {
221            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
222                maybe_bytes
223                    .as_ref()
224                    .map(|bytes| encoding.encode_bytes(bytes)),
225            )))
226        }
227        v => internal_err!("Unexpected value for encode: {v}"),
228    }
229}
230
231fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
232    let array = match array.data_type() {
233        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
234        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
235        DataType::LargeBinary => {
236            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
237        }
238        DataType::FixedSizeBinary(_) => {
239            encoding.encode_array::<_, i32>(&array.as_fixed_size_binary())
240        }
241        dt => {
242            internal_err!("Unexpected data type for encode: {dt}")
243        }
244    };
245    array.map(ColumnarValue::Array)
246}
247
248fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
249    match value {
250        ScalarValue::Binary(maybe_bytes)
251        | ScalarValue::BinaryView(maybe_bytes)
252        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
253            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
254                maybe_bytes
255                    .as_ref()
256                    .map(|x| encoding.decode_bytes(x))
257                    .transpose()?,
258            )))
259        }
260        ScalarValue::LargeBinary(maybe_bytes) => {
261            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
262                maybe_bytes
263                    .as_ref()
264                    .map(|x| encoding.decode_bytes(x))
265                    .transpose()?,
266            )))
267        }
268        v => internal_err!("Unexpected value for decode: {v}"),
269    }
270}
271
272/// Estimate how many bytes are actually represented by the array; in case the
273/// the array slices it's internal buffer, this returns the byte size of that slice
274/// but not the byte size of the entire buffer.
275///
276/// This is an estimation only as it can estimate higher if null slots are non-zero
277/// sized.
278fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
279    let offsets = array.value_offsets();
280    // Unwraps are safe as should always have 1 element in offset buffer
281    let start = *offsets.first().unwrap();
282    let end = *offsets.last().unwrap();
283    let data_size = end - start;
284    data_size.as_usize()
285}
286
287fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
288    let array = match array.data_type() {
289        DataType::Binary => {
290            let array = array.as_binary::<i32>();
291            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
292        }
293        DataType::BinaryView => {
294            let array = array.as_binary_view();
295            // Don't know if there is a more strict upper bound we can infer
296            // for view arrays byte data size.
297            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
298        }
299        DataType::LargeBinary => {
300            let array = array.as_binary::<i64>();
301            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
302        }
303        DataType::FixedSizeBinary(size) => {
304            let array = array.as_fixed_size_binary();
305            // TODO: could we be more conservative by accounting for nulls?
306            let estimate = array.len().saturating_mul(*size as usize);
307            encoding.decode_array::<_, i32>(&array, estimate)
308        }
309        dt => {
310            internal_err!("Unexpected data type for decode: {dt}")
311        }
312    };
313    array.map(ColumnarValue::Array)
314}
315
/// Textual encodings supported by `encode` and `decode`.
#[derive(Debug, Copy, Clone)]
enum Encoding {
    /// `base64`: standard alphabet, encoded without padding characters.
    Base64,
    /// `base64pad`: standard alphabet, encoded with padding characters.
    Base64Padded,
    /// `hex`: hexadecimal text.
    Hex,
}
322
323impl fmt::Display for Encoding {
324    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325        let name = match self {
326            Self::Base64 => "base64",
327            Self::Base64Padded => "base64pad",
328            Self::Hex => "hex",
329        };
330        write!(f, "{name}")
331    }
332}
333
334impl TryFrom<&ColumnarValue> for Encoding {
335    type Error = DataFusionError;
336
337    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
338        let encoding = match encoding {
339            ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
340                Some(encoding) => encoding,
341                _ => return exec_err!("Encoding must be a non-null string"),
342            },
343            ColumnarValue::Array(_) => {
344                return not_impl_err!(
345                    "Encoding must be a scalar; array specified encoding is not yet supported"
346                );
347            }
348        };
349        match encoding {
350            "base64" => Ok(Self::Base64),
351            "base64pad" => Ok(Self::Base64Padded),
352            "hex" => Ok(Self::Hex),
353            _ => {
354                let options = [Self::Base64, Self::Base64Padded, Self::Hex]
355                    .iter()
356                    .map(|i| i.to_string())
357                    .collect::<Vec<_>>()
358                    .join(", ");
359                plan_err!(
360                    "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
361                )
362            }
363        }
364    }
365}
366
367impl Encoding {
368    fn encode_bytes(self, value: &[u8]) -> String {
369        match self {
370            Self::Base64 => BASE64_ENGINE.encode(value),
371            Self::Base64Padded => BASE64_ENGINE_PADDED.encode(value),
372            Self::Hex => hex::encode(value),
373        }
374    }
375
376    fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
377        match self {
378            Self::Base64 | Self::Base64Padded => {
379                BASE64_ENGINE.decode(value).map_err(|e| {
380                    exec_datafusion_err!("Failed to decode value using {self}: {e}")
381                })
382            }
383            Self::Hex => hex::decode(value).map_err(|e| {
384                exec_datafusion_err!("Failed to decode value using hex: {e}")
385            }),
386        }
387    }
388
389    // OutputOffset important to ensure Large types output Large arrays
390    fn encode_array<'a, InputBinaryArray, OutputOffset>(
391        self,
392        array: &InputBinaryArray,
393    ) -> Result<ArrayRef>
394    where
395        InputBinaryArray: BinaryArrayType<'a>,
396        OutputOffset: OffsetSizeTrait,
397    {
398        match self {
399            Self::Base64 => {
400                let array: GenericStringArray<OutputOffset> = array
401                    .iter()
402                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
403                    .collect();
404                Ok(Arc::new(array))
405            }
406            Self::Base64Padded => {
407                let array: GenericStringArray<OutputOffset> = array
408                    .iter()
409                    .map(|x| x.map(|x| BASE64_ENGINE_PADDED.encode(x)))
410                    .collect();
411                Ok(Arc::new(array))
412            }
413            Self::Hex => {
414                let array: GenericStringArray<OutputOffset> =
415                    array.iter().map(|x| x.map(hex::encode)).collect();
416                Ok(Arc::new(array))
417            }
418        }
419    }
420
421    // OutputOffset important to ensure Large types output Large arrays
422    fn decode_array<'a, InputBinaryArray, OutputOffset>(
423        self,
424        value: &InputBinaryArray,
425        approx_data_size: usize,
426    ) -> Result<ArrayRef>
427    where
428        InputBinaryArray: BinaryArrayType<'a>,
429        OutputOffset: OffsetSizeTrait,
430    {
431        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
432            // only write input / 2 bytes to buf
433            let out_len = input.len() / 2;
434            let buf = &mut buf[..out_len];
435            hex::decode_to_slice(input, buf)
436                .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
437            Ok(out_len)
438        }
439
440        fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
441            BASE64_ENGINE
442                .decode_slice(input, buf)
443                .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
444        }
445
446        match self {
447            Self::Base64 | Self::Base64Padded => {
448                let upper_bound = base64::decoded_len_estimate(approx_data_size);
449                delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
450            }
451            Self::Hex => {
452                // Calculate the upper bound for decoded byte size
453                // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
454                // So the upper bound is half the length of the input values.
455                let upper_bound = approx_data_size / 2;
456                delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
457            }
458        }
459    }
460}
461
462fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
463    decode: DecodeFunction,
464    input: &InputBinaryArray,
465    conservative_upper_bound_size: usize,
466) -> Result<ArrayRef>
467where
468    DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
469    InputBinaryArray: BinaryArrayType<'a>,
470    OutputOffset: OffsetSizeTrait,
471{
472    let mut values = vec![0; conservative_upper_bound_size];
473    let mut offsets = OffsetBufferBuilder::new(input.len());
474    let mut total_bytes_decoded = 0;
475    for v in input.iter() {
476        if let Some(v) = v {
477            let cursor = &mut values[total_bytes_decoded..];
478            let decoded = decode(v, cursor)?;
479            total_bytes_decoded += decoded;
480            offsets.push_length(decoded);
481        } else {
482            offsets.push_length(0);
483        }
484    }
485    // We reserved an upper bound size for the values buffer, but we only use the actual size
486    values.truncate(total_bytes_decoded);
487    let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
488        offsets.finish(),
489        Buffer::from_vec(values),
490        input.nulls().cloned(),
491    )?;
492    Ok(Arc::new(binary_array))
493}
494
#[cfg(test)]
mod tests {
    use arrow::array::BinaryArray;
    use arrow_buffer::OffsetBuffer;

    use super::*;

    // Checks that the estimate is the span between the first and last offset,
    // independent of the size of the underlying data buffer.
    #[test]
    fn test_estimate_byte_data_size() {
        // Offsets starting at 0, but don't count entire data buffer size
        let array = BinaryArray::new(
            OffsetBuffer::new(vec![0, 5, 10, 15].into()),
            vec![0; 100].into(),
            None,
        );
        let size = estimate_byte_data_size(&array);
        assert_eq!(size, 15);

        // Offsets not starting at 0 and with null slots: only the span between
        // the first and last offset (81 - 50) is counted
        let array = BinaryArray::new(
            OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()),
            vec![0; 100].into(),
            Some(vec![true, false, false, true, true].into()),
        );
        let size = estimate_byte_data_size(&array);
        assert_eq!(size, 31);
    }
}