1use arrow::{
21 array::{
22 Array, ArrayRef, AsArray, BinaryArrayType, GenericBinaryArray,
23 GenericStringArray, OffsetSizeTrait,
24 },
25 datatypes::DataType,
26};
27use arrow_buffer::{Buffer, OffsetBufferBuilder};
28use base64::{
29 Engine as _,
30 engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
31};
32use datafusion_common::{
33 DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
34 not_impl_err, plan_err,
35 types::{NativeType, logical_string},
36 utils::take_function_args,
37};
38use datafusion_expr::{
39 Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
40 TypeSignatureClass, Volatility,
41};
42use datafusion_macros::user_doc;
43use std::fmt;
44use std::sync::Arc;
45
46const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
48 &base64::alphabet::STANDARD,
49 GeneralPurposeConfig::new()
50 .with_encode_padding(false)
51 .with_decode_padding_mode(DecodePaddingMode::Indifferent),
52);
53
54const BASE64_ENGINE_PADDED: GeneralPurpose = GeneralPurpose::new(
56 &base64::alphabet::STANDARD,
57 GeneralPurposeConfig::new().with_encode_padding(true),
58);
59
60#[user_doc(
61 doc_section(label = "Binary String Functions"),
62 description = "Encode binary data into a textual representation.",
63 syntax_example = "encode(expression, format)",
64 argument(
65 name = "expression",
66 description = "Expression containing string or binary data"
67 ),
68 argument(
69 name = "format",
70 description = "Supported formats are: `base64`, `base64pad`, `hex`"
71 ),
72 related_udf(name = "decode")
73)]
74#[derive(Debug, PartialEq, Eq, Hash)]
75pub struct EncodeFunc {
76 signature: Signature,
77}
78
79impl Default for EncodeFunc {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl EncodeFunc {
86 pub fn new() -> Self {
87 Self {
88 signature: Signature::coercible(
89 vec![
90 Coercion::new_implicit(
91 TypeSignatureClass::Binary,
92 vec![TypeSignatureClass::Native(logical_string())],
93 NativeType::Binary,
94 ),
95 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
96 ],
97 Volatility::Immutable,
98 ),
99 }
100 }
101}
102
103impl ScalarUDFImpl for EncodeFunc {
104 fn name(&self) -> &str {
105 "encode"
106 }
107
108 fn signature(&self) -> &Signature {
109 &self.signature
110 }
111
112 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
113 match &arg_types[0] {
114 DataType::LargeBinary => Ok(DataType::LargeUtf8),
115 _ => Ok(DataType::Utf8),
116 }
117 }
118
119 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
120 let [expression, encoding] = take_function_args("encode", &args.args)?;
121 let encoding = Encoding::try_from(encoding)?;
122 match expression {
123 _ if expression.data_type().is_null() => {
124 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
125 }
126 ColumnarValue::Array(array) => encode_array(array, encoding),
127 ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
128 }
129 }
130
131 fn documentation(&self) -> Option<&Documentation> {
132 self.doc()
133 }
134}
135
136#[user_doc(
137 doc_section(label = "Binary String Functions"),
138 description = "Decode binary data from textual representation in string.",
139 syntax_example = "decode(expression, format)",
140 argument(
141 name = "expression",
142 description = "Expression containing encoded string data"
143 ),
144 argument(name = "format", description = "Same arguments as [encode](#encode)"),
145 related_udf(name = "encode")
146)]
147#[derive(Debug, PartialEq, Eq, Hash)]
148pub struct DecodeFunc {
149 signature: Signature,
150}
151
152impl Default for DecodeFunc {
153 fn default() -> Self {
154 Self::new()
155 }
156}
157
158impl DecodeFunc {
159 pub fn new() -> Self {
160 Self {
161 signature: Signature::coercible(
162 vec![
163 Coercion::new_implicit(
164 TypeSignatureClass::Binary,
165 vec![TypeSignatureClass::Native(logical_string())],
166 NativeType::Binary,
167 ),
168 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
169 ],
170 Volatility::Immutable,
171 ),
172 }
173 }
174}
175
176impl ScalarUDFImpl for DecodeFunc {
177 fn name(&self) -> &str {
178 "decode"
179 }
180
181 fn signature(&self) -> &Signature {
182 &self.signature
183 }
184
185 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
186 match &arg_types[0] {
187 DataType::LargeBinary => Ok(DataType::LargeBinary),
188 _ => Ok(DataType::Binary),
189 }
190 }
191
192 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
193 let [expression, encoding] = take_function_args("decode", &args.args)?;
194 let encoding = Encoding::try_from(encoding)?;
195 match expression {
196 _ if expression.data_type().is_null() => {
197 Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
198 }
199 ColumnarValue::Array(array) => decode_array(array, encoding),
200 ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
201 }
202 }
203
204 fn documentation(&self) -> Option<&Documentation> {
205 self.doc()
206 }
207}
208
209fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
210 match value {
211 ScalarValue::Binary(maybe_bytes)
212 | ScalarValue::BinaryView(maybe_bytes)
213 | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
214 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
215 maybe_bytes
216 .as_ref()
217 .map(|bytes| encoding.encode_bytes(bytes)),
218 )))
219 }
220 ScalarValue::LargeBinary(maybe_bytes) => {
221 Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
222 maybe_bytes
223 .as_ref()
224 .map(|bytes| encoding.encode_bytes(bytes)),
225 )))
226 }
227 v => internal_err!("Unexpected value for encode: {v}"),
228 }
229}
230
231fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
232 let array = match array.data_type() {
233 DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
234 DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
235 DataType::LargeBinary => {
236 encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
237 }
238 DataType::FixedSizeBinary(_) => {
239 encoding.encode_array::<_, i32>(&array.as_fixed_size_binary())
240 }
241 dt => {
242 internal_err!("Unexpected data type for encode: {dt}")
243 }
244 };
245 array.map(ColumnarValue::Array)
246}
247
248fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
249 match value {
250 ScalarValue::Binary(maybe_bytes)
251 | ScalarValue::BinaryView(maybe_bytes)
252 | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
253 Ok(ColumnarValue::Scalar(ScalarValue::Binary(
254 maybe_bytes
255 .as_ref()
256 .map(|x| encoding.decode_bytes(x))
257 .transpose()?,
258 )))
259 }
260 ScalarValue::LargeBinary(maybe_bytes) => {
261 Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
262 maybe_bytes
263 .as_ref()
264 .map(|x| encoding.decode_bytes(x))
265 .transpose()?,
266 )))
267 }
268 v => internal_err!("Unexpected value for decode: {v}"),
269 }
270}
271
272fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
279 let offsets = array.value_offsets();
280 let start = *offsets.first().unwrap();
282 let end = *offsets.last().unwrap();
283 let data_size = end - start;
284 data_size.as_usize()
285}
286
287fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
288 let array = match array.data_type() {
289 DataType::Binary => {
290 let array = array.as_binary::<i32>();
291 encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
292 }
293 DataType::BinaryView => {
294 let array = array.as_binary_view();
295 encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
298 }
299 DataType::LargeBinary => {
300 let array = array.as_binary::<i64>();
301 encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
302 }
303 DataType::FixedSizeBinary(size) => {
304 let array = array.as_fixed_size_binary();
305 let estimate = array.len().saturating_mul(*size as usize);
307 encoding.decode_array::<_, i32>(&array, estimate)
308 }
309 dt => {
310 internal_err!("Unexpected data type for decode: {dt}")
311 }
312 };
313 array.map(ColumnarValue::Array)
314}
315
/// Textual encodings understood by `encode` and `decode`.
#[derive(Clone, Copy, Debug)]
enum Encoding {
    /// Base64, selected by the format name `base64`.
    Base64,
    /// Base64 with padding, selected by the format name `base64pad`.
    Base64Padded,
    /// Hexadecimal, selected by the format name `hex`.
    Hex,
}
322
323impl fmt::Display for Encoding {
324 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
325 let name = match self {
326 Self::Base64 => "base64",
327 Self::Base64Padded => "base64pad",
328 Self::Hex => "hex",
329 };
330 write!(f, "{name}")
331 }
332}
333
334impl TryFrom<&ColumnarValue> for Encoding {
335 type Error = DataFusionError;
336
337 fn try_from(encoding: &ColumnarValue) -> Result<Self> {
338 let encoding = match encoding {
339 ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
340 Some(encoding) => encoding,
341 _ => return exec_err!("Encoding must be a non-null string"),
342 },
343 ColumnarValue::Array(_) => {
344 return not_impl_err!(
345 "Encoding must be a scalar; array specified encoding is not yet supported"
346 );
347 }
348 };
349 match encoding {
350 "base64" => Ok(Self::Base64),
351 "base64pad" => Ok(Self::Base64Padded),
352 "hex" => Ok(Self::Hex),
353 _ => {
354 let options = [Self::Base64, Self::Base64Padded, Self::Hex]
355 .iter()
356 .map(|i| i.to_string())
357 .collect::<Vec<_>>()
358 .join(", ");
359 plan_err!(
360 "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
361 )
362 }
363 }
364 }
365}
366
367impl Encoding {
368 fn encode_bytes(self, value: &[u8]) -> String {
369 match self {
370 Self::Base64 => BASE64_ENGINE.encode(value),
371 Self::Base64Padded => BASE64_ENGINE_PADDED.encode(value),
372 Self::Hex => hex::encode(value),
373 }
374 }
375
376 fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
377 match self {
378 Self::Base64 | Self::Base64Padded => {
379 BASE64_ENGINE.decode(value).map_err(|e| {
380 exec_datafusion_err!("Failed to decode value using {self}: {e}")
381 })
382 }
383 Self::Hex => hex::decode(value).map_err(|e| {
384 exec_datafusion_err!("Failed to decode value using hex: {e}")
385 }),
386 }
387 }
388
389 fn encode_array<'a, InputBinaryArray, OutputOffset>(
391 self,
392 array: &InputBinaryArray,
393 ) -> Result<ArrayRef>
394 where
395 InputBinaryArray: BinaryArrayType<'a>,
396 OutputOffset: OffsetSizeTrait,
397 {
398 match self {
399 Self::Base64 => {
400 let array: GenericStringArray<OutputOffset> = array
401 .iter()
402 .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
403 .collect();
404 Ok(Arc::new(array))
405 }
406 Self::Base64Padded => {
407 let array: GenericStringArray<OutputOffset> = array
408 .iter()
409 .map(|x| x.map(|x| BASE64_ENGINE_PADDED.encode(x)))
410 .collect();
411 Ok(Arc::new(array))
412 }
413 Self::Hex => {
414 let array: GenericStringArray<OutputOffset> =
415 array.iter().map(|x| x.map(hex::encode)).collect();
416 Ok(Arc::new(array))
417 }
418 }
419 }
420
421 fn decode_array<'a, InputBinaryArray, OutputOffset>(
423 self,
424 value: &InputBinaryArray,
425 approx_data_size: usize,
426 ) -> Result<ArrayRef>
427 where
428 InputBinaryArray: BinaryArrayType<'a>,
429 OutputOffset: OffsetSizeTrait,
430 {
431 fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
432 let out_len = input.len() / 2;
434 let buf = &mut buf[..out_len];
435 hex::decode_to_slice(input, buf)
436 .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
437 Ok(out_len)
438 }
439
440 fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
441 BASE64_ENGINE
442 .decode_slice(input, buf)
443 .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
444 }
445
446 match self {
447 Self::Base64 | Self::Base64Padded => {
448 let upper_bound = base64::decoded_len_estimate(approx_data_size);
449 delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
450 }
451 Self::Hex => {
452 let upper_bound = approx_data_size / 2;
456 delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
457 }
458 }
459 }
460}
461
462fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
463 decode: DecodeFunction,
464 input: &InputBinaryArray,
465 conservative_upper_bound_size: usize,
466) -> Result<ArrayRef>
467where
468 DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
469 InputBinaryArray: BinaryArrayType<'a>,
470 OutputOffset: OffsetSizeTrait,
471{
472 let mut values = vec![0; conservative_upper_bound_size];
473 let mut offsets = OffsetBufferBuilder::new(input.len());
474 let mut total_bytes_decoded = 0;
475 for v in input.iter() {
476 if let Some(v) = v {
477 let cursor = &mut values[total_bytes_decoded..];
478 let decoded = decode(v, cursor)?;
479 total_bytes_decoded += decoded;
480 offsets.push_length(decoded);
481 } else {
482 offsets.push_length(0);
483 }
484 }
485 values.truncate(total_bytes_decoded);
487 let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
488 offsets.finish(),
489 Buffer::from_vec(values),
490 input.nulls().cloned(),
491 )?;
492 Ok(Arc::new(binary_array))
493}
494
#[cfg(test)]
mod tests {
    use arrow::array::BinaryArray;
    use arrow_buffer::OffsetBuffer;

    use super::*;

    #[test]
    fn test_estimate_byte_data_size() {
        // Offsets start at zero: the estimate is simply the last offset,
        // regardless of the 100 `i32` values backing the data buffer.
        let from_zero = BinaryArray::new(
            OffsetBuffer::new(vec![0, 5, 10, 15].into()),
            vec![0; 100].into(),
            None,
        );
        assert_eq!(estimate_byte_data_size(&from_zero), 15);

        // Offsets start part-way into the buffer and two slots are null:
        // the estimate is still `last - first` = 81 - 50.
        let from_middle = BinaryArray::new(
            OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()),
            vec![0; 100].into(),
            Some(vec![true, false, false, true, true].into()),
        );
        assert_eq!(estimate_byte_data_size(&from_middle), 31);
    }
}