1use arrow::{
21 array::{
22 Array, ArrayRef, AsArray, BinaryArrayType, FixedSizeBinaryArray,
23 GenericBinaryArray, GenericStringArray, OffsetSizeTrait,
24 },
25 datatypes::DataType,
26};
27use arrow_buffer::{Buffer, OffsetBufferBuilder};
28use base64::{
29 Engine as _,
30 engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
31};
32use datafusion_common::{
33 DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
34 not_impl_err, plan_err,
35 types::{NativeType, logical_string},
36 utils::take_function_args,
37};
38use datafusion_expr::{
39 Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
40 TypeSignatureClass, Volatility,
41};
42use datafusion_macros::user_doc;
43use std::any::Any;
44use std::fmt;
45use std::sync::Arc;
46
/// Base64 engine shared by `encode` and `decode`.
///
/// Uses the standard alphabet. Encoding emits no `=` padding
/// (`with_encode_padding(false)`), while decoding accepts input with or
/// without padding (`DecodePaddingMode::Indifferent`).
const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
    &base64::alphabet::STANDARD,
    GeneralPurposeConfig::new()
        .with_encode_padding(false)
        .with_decode_padding_mode(DecodePaddingMode::Indifferent),
);
54
/// Scalar UDF implementing `encode(expression, format)`: converts binary
/// (or string) data into a textual `base64` or `hex` representation.
#[user_doc(
    doc_section(label = "Binary String Functions"),
    description = "Encode binary data into a textual representation.",
    syntax_example = "encode(expression, format)",
    argument(
        name = "expression",
        description = "Expression containing string or binary data"
    ),
    argument(
        name = "format",
        description = "Supported formats are: `base64`, `hex`"
    ),
    related_udf(name = "decode")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct EncodeFunc {
    /// Accepted argument types; built in `EncodeFunc::new`.
    signature: Signature,
}
73
74impl Default for EncodeFunc {
75 fn default() -> Self {
76 Self::new()
77 }
78}
79
80impl EncodeFunc {
81 pub fn new() -> Self {
82 Self {
83 signature: Signature::coercible(
84 vec![
85 Coercion::new_implicit(
86 TypeSignatureClass::Binary,
87 vec![TypeSignatureClass::Native(logical_string())],
88 NativeType::Binary,
89 ),
90 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
91 ],
92 Volatility::Immutable,
93 ),
94 }
95 }
96}
97
98impl ScalarUDFImpl for EncodeFunc {
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102
103 fn name(&self) -> &str {
104 "encode"
105 }
106
107 fn signature(&self) -> &Signature {
108 &self.signature
109 }
110
111 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
112 match &arg_types[0] {
113 DataType::LargeBinary => Ok(DataType::LargeUtf8),
114 _ => Ok(DataType::Utf8),
115 }
116 }
117
118 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
119 let [expression, encoding] = take_function_args("encode", &args.args)?;
120 let encoding = Encoding::try_from(encoding)?;
121 match expression {
122 _ if expression.data_type().is_null() => {
123 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
124 }
125 ColumnarValue::Array(array) => encode_array(array, encoding),
126 ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
127 }
128 }
129
130 fn documentation(&self) -> Option<&Documentation> {
131 self.doc()
132 }
133}
134
/// Scalar UDF implementing `decode(expression, format)`: parses textual
/// `base64` or `hex` data back into binary.
#[user_doc(
    doc_section(label = "Binary String Functions"),
    description = "Decode binary data from textual representation in string.",
    syntax_example = "decode(expression, format)",
    argument(
        name = "expression",
        description = "Expression containing encoded string data"
    ),
    argument(name = "format", description = "Same arguments as [encode](#encode)"),
    related_udf(name = "encode")
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DecodeFunc {
    /// Accepted argument types; built in `DecodeFunc::new`.
    signature: Signature,
}
150
151impl Default for DecodeFunc {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157impl DecodeFunc {
158 pub fn new() -> Self {
159 Self {
160 signature: Signature::coercible(
161 vec![
162 Coercion::new_implicit(
163 TypeSignatureClass::Binary,
164 vec![TypeSignatureClass::Native(logical_string())],
165 NativeType::Binary,
166 ),
167 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
168 ],
169 Volatility::Immutable,
170 ),
171 }
172 }
173}
174
175impl ScalarUDFImpl for DecodeFunc {
176 fn as_any(&self) -> &dyn Any {
177 self
178 }
179
180 fn name(&self) -> &str {
181 "decode"
182 }
183
184 fn signature(&self) -> &Signature {
185 &self.signature
186 }
187
188 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
189 match &arg_types[0] {
190 DataType::LargeBinary => Ok(DataType::LargeBinary),
191 _ => Ok(DataType::Binary),
192 }
193 }
194
195 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
196 let [expression, encoding] = take_function_args("decode", &args.args)?;
197 let encoding = Encoding::try_from(encoding)?;
198 match expression {
199 _ if expression.data_type().is_null() => {
200 Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
201 }
202 ColumnarValue::Array(array) => decode_array(array, encoding),
203 ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
204 }
205 }
206
207 fn documentation(&self) -> Option<&Documentation> {
208 self.doc()
209 }
210}
211
212fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
213 match value {
214 ScalarValue::Binary(maybe_bytes)
215 | ScalarValue::BinaryView(maybe_bytes)
216 | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
217 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
218 maybe_bytes
219 .as_ref()
220 .map(|bytes| encoding.encode_bytes(bytes)),
221 )))
222 }
223 ScalarValue::LargeBinary(maybe_bytes) => {
224 Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
225 maybe_bytes
226 .as_ref()
227 .map(|bytes| encoding.encode_bytes(bytes)),
228 )))
229 }
230 v => internal_err!("Unexpected value for encode: {v}"),
231 }
232}
233
234fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
235 let array = match array.data_type() {
236 DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
237 DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
238 DataType::LargeBinary => {
239 encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
240 }
241 DataType::FixedSizeBinary(_) => {
242 encoding.encode_fsb_array(array.as_fixed_size_binary())
243 }
244 dt => {
245 internal_err!("Unexpected data type for encode: {dt}")
246 }
247 };
248 array.map(ColumnarValue::Array)
249}
250
251fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
252 match value {
253 ScalarValue::Binary(maybe_bytes)
254 | ScalarValue::BinaryView(maybe_bytes)
255 | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
256 Ok(ColumnarValue::Scalar(ScalarValue::Binary(
257 maybe_bytes
258 .as_ref()
259 .map(|x| encoding.decode_bytes(x))
260 .transpose()?,
261 )))
262 }
263 ScalarValue::LargeBinary(maybe_bytes) => {
264 Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
265 maybe_bytes
266 .as_ref()
267 .map(|x| encoding.decode_bytes(x))
268 .transpose()?,
269 )))
270 }
271 v => internal_err!("Unexpected value for decode: {v}"),
272 }
273}
274
275fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
282 let offsets = array.value_offsets();
283 let start = *offsets.first().unwrap();
285 let end = *offsets.last().unwrap();
286 let data_size = end - start;
287 data_size.as_usize()
288}
289
290fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
291 let array = match array.data_type() {
292 DataType::Binary => {
293 let array = array.as_binary::<i32>();
294 encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
295 }
296 DataType::BinaryView => {
297 let array = array.as_binary_view();
298 encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
301 }
302 DataType::LargeBinary => {
303 let array = array.as_binary::<i64>();
304 encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
305 }
306 DataType::FixedSizeBinary(size) => {
307 let array = array.as_fixed_size_binary();
308 let estimate = array.len().saturating_mul(*size as usize);
310 encoding.decode_fsb_array(array, estimate)
311 }
312 dt => {
313 internal_err!("Unexpected data type for decode: {dt}")
314 }
315 };
316 array.map(ColumnarValue::Array)
317}
318
/// Textual encodings supported by the `encode` / `decode` functions.
#[derive(Debug, Copy, Clone)]
enum Encoding {
    /// Base64 using `BASE64_ENGINE` (standard alphabet, unpadded output,
    /// padding optional on input).
    Base64,
    /// Hexadecimal text via the `hex` crate: two characters per byte.
    Hex,
}
324
325impl fmt::Display for Encoding {
326 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
327 write!(f, "{}", format!("{self:?}").to_lowercase())
328 }
329}
330
331impl TryFrom<&ColumnarValue> for Encoding {
332 type Error = DataFusionError;
333
334 fn try_from(encoding: &ColumnarValue) -> Result<Self> {
335 let encoding = match encoding {
336 ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
337 Some(encoding) => encoding,
338 _ => return exec_err!("Encoding must be a non-null string"),
339 },
340 ColumnarValue::Array(_) => {
341 return not_impl_err!(
342 "Encoding must be a scalar; array specified encoding is not yet supported"
343 );
344 }
345 };
346 match encoding {
347 "base64" => Ok(Self::Base64),
348 "hex" => Ok(Self::Hex),
349 _ => {
350 let options = [Self::Base64, Self::Hex]
351 .iter()
352 .map(|i| i.to_string())
353 .collect::<Vec<_>>()
354 .join(", ");
355 plan_err!(
356 "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
357 )
358 }
359 }
360 }
361}
362
363impl Encoding {
364 fn encode_bytes(self, value: &[u8]) -> String {
365 match self {
366 Self::Base64 => BASE64_ENGINE.encode(value),
367 Self::Hex => hex::encode(value),
368 }
369 }
370
371 fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
372 match self {
373 Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
374 exec_datafusion_err!("Failed to decode value using base64: {e}")
375 }),
376 Self::Hex => hex::decode(value).map_err(|e| {
377 exec_datafusion_err!("Failed to decode value using hex: {e}")
378 }),
379 }
380 }
381
382 fn encode_array<'a, InputBinaryArray, OutputOffset>(
384 self,
385 array: &InputBinaryArray,
386 ) -> Result<ArrayRef>
387 where
388 InputBinaryArray: BinaryArrayType<'a>,
389 OutputOffset: OffsetSizeTrait,
390 {
391 match self {
392 Self::Base64 => {
393 let array: GenericStringArray<OutputOffset> = array
394 .iter()
395 .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
396 .collect();
397 Ok(Arc::new(array))
398 }
399 Self::Hex => {
400 let array: GenericStringArray<OutputOffset> =
401 array.iter().map(|x| x.map(hex::encode)).collect();
402 Ok(Arc::new(array))
403 }
404 }
405 }
406
407 fn encode_fsb_array(self, array: &FixedSizeBinaryArray) -> Result<ArrayRef> {
409 match self {
410 Self::Base64 => {
411 let array: GenericStringArray<i32> = array
412 .iter()
413 .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
414 .collect();
415 Ok(Arc::new(array))
416 }
417 Self::Hex => {
418 let array: GenericStringArray<i32> =
419 array.iter().map(|x| x.map(hex::encode)).collect();
420 Ok(Arc::new(array))
421 }
422 }
423 }
424
425 fn decode_array<'a, InputBinaryArray, OutputOffset>(
427 self,
428 value: &InputBinaryArray,
429 approx_data_size: usize,
430 ) -> Result<ArrayRef>
431 where
432 InputBinaryArray: BinaryArrayType<'a>,
433 OutputOffset: OffsetSizeTrait,
434 {
435 fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
436 let out_len = input.len() / 2;
438 let buf = &mut buf[..out_len];
439 hex::decode_to_slice(input, buf)
440 .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
441 Ok(out_len)
442 }
443
444 fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
445 BASE64_ENGINE
446 .decode_slice(input, buf)
447 .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
448 }
449
450 match self {
451 Self::Base64 => {
452 let upper_bound = base64::decoded_len_estimate(approx_data_size);
453 delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
454 }
455 Self::Hex => {
456 let upper_bound = approx_data_size / 2;
460 delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
461 }
462 }
463 }
464
465 fn decode_fsb_array(
467 self,
468 value: &FixedSizeBinaryArray,
469 approx_data_size: usize,
470 ) -> Result<ArrayRef> {
471 fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
472 let out_len = input.len() / 2;
474 let buf = &mut buf[..out_len];
475 hex::decode_to_slice(input, buf)
476 .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
477 Ok(out_len)
478 }
479
480 fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
481 BASE64_ENGINE
482 .decode_slice(input, buf)
483 .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
484 }
485
486 fn delegated_decode<DecodeFunction>(
487 decode: DecodeFunction,
488 input: &FixedSizeBinaryArray,
489 conservative_upper_bound_size: usize,
490 ) -> Result<ArrayRef>
491 where
492 DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
493 {
494 let mut values = vec![0; conservative_upper_bound_size];
495 let mut offsets = OffsetBufferBuilder::new(input.len());
496 let mut total_bytes_decoded = 0;
497 for v in input.iter() {
498 if let Some(v) = v {
499 let cursor = &mut values[total_bytes_decoded..];
500 let decoded = decode(v, cursor)?;
501 total_bytes_decoded += decoded;
502 offsets.push_length(decoded);
503 } else {
504 offsets.push_length(0);
505 }
506 }
507 values.truncate(total_bytes_decoded);
509 let binary_array = GenericBinaryArray::<i32>::try_new(
510 offsets.finish(),
511 Buffer::from_vec(values),
512 input.nulls().cloned(),
513 )?;
514 Ok(Arc::new(binary_array))
515 }
516
517 match self {
518 Self::Base64 => {
519 let upper_bound = base64::decoded_len_estimate(approx_data_size);
520 delegated_decode(base64_decode, value, upper_bound)
521 }
522 Self::Hex => {
523 let upper_bound = approx_data_size / 2;
527 delegated_decode(hex_decode, value, upper_bound)
528 }
529 }
530 }
531}
532
533fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
534 decode: DecodeFunction,
535 input: &InputBinaryArray,
536 conservative_upper_bound_size: usize,
537) -> Result<ArrayRef>
538where
539 DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
540 InputBinaryArray: BinaryArrayType<'a>,
541 OutputOffset: OffsetSizeTrait,
542{
543 let mut values = vec![0; conservative_upper_bound_size];
544 let mut offsets = OffsetBufferBuilder::new(input.len());
545 let mut total_bytes_decoded = 0;
546 for v in input.iter() {
547 if let Some(v) = v {
548 let cursor = &mut values[total_bytes_decoded..];
549 let decoded = decode(v, cursor)?;
550 total_bytes_decoded += decoded;
551 offsets.push_length(decoded);
552 } else {
553 offsets.push_length(0);
554 }
555 }
556 values.truncate(total_bytes_decoded);
558 let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
559 offsets.finish(),
560 Buffer::from_vec(values),
561 input.nulls().cloned(),
562 )?;
563 Ok(Arc::new(binary_array))
564}
565
#[cfg(test)]
mod tests {
    use arrow::array::BinaryArray;
    use arrow_buffer::OffsetBuffer;

    use super::*;

    /// Builds a `BinaryArray` over a zero-filled 100-element value buffer
    /// with the given offsets and optional validity.
    fn binary_array(offsets: Vec<i32>, validity: Option<Vec<bool>>) -> BinaryArray {
        BinaryArray::new(
            OffsetBuffer::new(offsets.into()),
            vec![0; 100].into(),
            validity.map(Into::into),
        )
    }

    #[test]
    fn test_estimate_byte_data_size() {
        // Offsets starting at zero: the size is the final offset.
        let from_start = binary_array(vec![0, 5, 10, 15], None);
        assert_eq!(estimate_byte_data_size(&from_start), 15);

        // Offsets starting mid-buffer, with nulls: only the 50..81 window counts.
        let mid_buffer = binary_array(
            vec![50, 51, 51, 60, 80, 81],
            Some(vec![true, false, false, true, true]),
        );
        assert_eq!(estimate_byte_data_size(&mid_buffer), 31);
    }
}