1use arrow::{
21 array::{
22 Array, ArrayRef, AsArray, BinaryArrayType, GenericBinaryArray,
23 GenericStringArray, OffsetSizeTrait,
24 },
25 datatypes::DataType,
26};
27use arrow_buffer::{Buffer, OffsetBufferBuilder};
28use base64::{
29 Engine as _,
30 engine::{DecodePaddingMode, GeneralPurpose, GeneralPurposeConfig},
31};
32use datafusion_common::{
33 DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err,
34 not_impl_err, plan_err,
35 types::{NativeType, logical_string},
36 utils::take_function_args,
37};
38use datafusion_expr::{
39 Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
40 TypeSignatureClass, Volatility,
41};
42use datafusion_macros::user_doc;
43use std::any::Any;
44use std::fmt;
45use std::sync::Arc;
46
47const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new(
49 &base64::alphabet::STANDARD,
50 GeneralPurposeConfig::new()
51 .with_encode_padding(false)
52 .with_decode_padding_mode(DecodePaddingMode::Indifferent),
53);
54
55const BASE64_ENGINE_PADDED: GeneralPurpose = GeneralPurpose::new(
57 &base64::alphabet::STANDARD,
58 GeneralPurposeConfig::new().with_encode_padding(true),
59);
60
61#[user_doc(
62 doc_section(label = "Binary String Functions"),
63 description = "Encode binary data into a textual representation.",
64 syntax_example = "encode(expression, format)",
65 argument(
66 name = "expression",
67 description = "Expression containing string or binary data"
68 ),
69 argument(
70 name = "format",
71 description = "Supported formats are: `base64`, `base64pad`, `hex`"
72 ),
73 related_udf(name = "decode")
74)]
75#[derive(Debug, PartialEq, Eq, Hash)]
76pub struct EncodeFunc {
77 signature: Signature,
78}
79
80impl Default for EncodeFunc {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl EncodeFunc {
87 pub fn new() -> Self {
88 Self {
89 signature: Signature::coercible(
90 vec![
91 Coercion::new_implicit(
92 TypeSignatureClass::Binary,
93 vec![TypeSignatureClass::Native(logical_string())],
94 NativeType::Binary,
95 ),
96 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
97 ],
98 Volatility::Immutable,
99 ),
100 }
101 }
102}
103
104impl ScalarUDFImpl for EncodeFunc {
105 fn as_any(&self) -> &dyn Any {
106 self
107 }
108
109 fn name(&self) -> &str {
110 "encode"
111 }
112
113 fn signature(&self) -> &Signature {
114 &self.signature
115 }
116
117 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
118 match &arg_types[0] {
119 DataType::LargeBinary => Ok(DataType::LargeUtf8),
120 _ => Ok(DataType::Utf8),
121 }
122 }
123
124 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
125 let [expression, encoding] = take_function_args("encode", &args.args)?;
126 let encoding = Encoding::try_from(encoding)?;
127 match expression {
128 _ if expression.data_type().is_null() => {
129 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
130 }
131 ColumnarValue::Array(array) => encode_array(array, encoding),
132 ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
133 }
134 }
135
136 fn documentation(&self) -> Option<&Documentation> {
137 self.doc()
138 }
139}
140
141#[user_doc(
142 doc_section(label = "Binary String Functions"),
143 description = "Decode binary data from textual representation in string.",
144 syntax_example = "decode(expression, format)",
145 argument(
146 name = "expression",
147 description = "Expression containing encoded string data"
148 ),
149 argument(name = "format", description = "Same arguments as [encode](#encode)"),
150 related_udf(name = "encode")
151)]
152#[derive(Debug, PartialEq, Eq, Hash)]
153pub struct DecodeFunc {
154 signature: Signature,
155}
156
157impl Default for DecodeFunc {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163impl DecodeFunc {
164 pub fn new() -> Self {
165 Self {
166 signature: Signature::coercible(
167 vec![
168 Coercion::new_implicit(
169 TypeSignatureClass::Binary,
170 vec![TypeSignatureClass::Native(logical_string())],
171 NativeType::Binary,
172 ),
173 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
174 ],
175 Volatility::Immutable,
176 ),
177 }
178 }
179}
180
181impl ScalarUDFImpl for DecodeFunc {
182 fn as_any(&self) -> &dyn Any {
183 self
184 }
185
186 fn name(&self) -> &str {
187 "decode"
188 }
189
190 fn signature(&self) -> &Signature {
191 &self.signature
192 }
193
194 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
195 match &arg_types[0] {
196 DataType::LargeBinary => Ok(DataType::LargeBinary),
197 _ => Ok(DataType::Binary),
198 }
199 }
200
201 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
202 let [expression, encoding] = take_function_args("decode", &args.args)?;
203 let encoding = Encoding::try_from(encoding)?;
204 match expression {
205 _ if expression.data_type().is_null() => {
206 Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
207 }
208 ColumnarValue::Array(array) => decode_array(array, encoding),
209 ColumnarValue::Scalar(scalar) => decode_scalar(scalar, encoding),
210 }
211 }
212
213 fn documentation(&self) -> Option<&Documentation> {
214 self.doc()
215 }
216}
217
218fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
219 match value {
220 ScalarValue::Binary(maybe_bytes)
221 | ScalarValue::BinaryView(maybe_bytes)
222 | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
223 Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
224 maybe_bytes
225 .as_ref()
226 .map(|bytes| encoding.encode_bytes(bytes)),
227 )))
228 }
229 ScalarValue::LargeBinary(maybe_bytes) => {
230 Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
231 maybe_bytes
232 .as_ref()
233 .map(|bytes| encoding.encode_bytes(bytes)),
234 )))
235 }
236 v => internal_err!("Unexpected value for encode: {v}"),
237 }
238}
239
240fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
241 let array = match array.data_type() {
242 DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
243 DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
244 DataType::LargeBinary => {
245 encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
246 }
247 DataType::FixedSizeBinary(_) => {
248 encoding.encode_array::<_, i32>(&array.as_fixed_size_binary())
249 }
250 dt => {
251 internal_err!("Unexpected data type for encode: {dt}")
252 }
253 };
254 array.map(ColumnarValue::Array)
255}
256
257fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
258 match value {
259 ScalarValue::Binary(maybe_bytes)
260 | ScalarValue::BinaryView(maybe_bytes)
261 | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
262 Ok(ColumnarValue::Scalar(ScalarValue::Binary(
263 maybe_bytes
264 .as_ref()
265 .map(|x| encoding.decode_bytes(x))
266 .transpose()?,
267 )))
268 }
269 ScalarValue::LargeBinary(maybe_bytes) => {
270 Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
271 maybe_bytes
272 .as_ref()
273 .map(|x| encoding.decode_bytes(x))
274 .transpose()?,
275 )))
276 }
277 v => internal_err!("Unexpected value for decode: {v}"),
278 }
279}
280
281fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
288 let offsets = array.value_offsets();
289 let start = *offsets.first().unwrap();
291 let end = *offsets.last().unwrap();
292 let data_size = end - start;
293 data_size.as_usize()
294}
295
296fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
297 let array = match array.data_type() {
298 DataType::Binary => {
299 let array = array.as_binary::<i32>();
300 encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
301 }
302 DataType::BinaryView => {
303 let array = array.as_binary_view();
304 encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
307 }
308 DataType::LargeBinary => {
309 let array = array.as_binary::<i64>();
310 encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
311 }
312 DataType::FixedSizeBinary(size) => {
313 let array = array.as_fixed_size_binary();
314 let estimate = array.len().saturating_mul(*size as usize);
316 encoding.decode_array::<_, i32>(&array, estimate)
317 }
318 dt => {
319 internal_err!("Unexpected data type for decode: {dt}")
320 }
321 };
322 array.map(ColumnarValue::Array)
323}
324
/// Textual formats understood by `encode` and `decode`.
#[derive(Debug, Copy, Clone)]
enum Encoding {
    /// Standard-alphabet base64 without `=` padding on output.
    Base64,
    /// Standard-alphabet base64 with `=` padding on output.
    Base64Padded,
    /// Hexadecimal.
    Hex,
}

impl fmt::Display for Encoding {
    /// Writes the SQL-facing format name (`base64`, `base64pad` or `hex`).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(match self {
            Self::Base64 => "base64",
            Self::Base64Padded => "base64pad",
            Self::Hex => "hex",
        })
    }
}
342
343impl TryFrom<&ColumnarValue> for Encoding {
344 type Error = DataFusionError;
345
346 fn try_from(encoding: &ColumnarValue) -> Result<Self> {
347 let encoding = match encoding {
348 ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
349 Some(encoding) => encoding,
350 _ => return exec_err!("Encoding must be a non-null string"),
351 },
352 ColumnarValue::Array(_) => {
353 return not_impl_err!(
354 "Encoding must be a scalar; array specified encoding is not yet supported"
355 );
356 }
357 };
358 match encoding {
359 "base64" => Ok(Self::Base64),
360 "base64pad" => Ok(Self::Base64Padded),
361 "hex" => Ok(Self::Hex),
362 _ => {
363 let options = [Self::Base64, Self::Base64Padded, Self::Hex]
364 .iter()
365 .map(|i| i.to_string())
366 .collect::<Vec<_>>()
367 .join(", ");
368 plan_err!(
369 "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
370 )
371 }
372 }
373 }
374}
375
376impl Encoding {
377 fn encode_bytes(self, value: &[u8]) -> String {
378 match self {
379 Self::Base64 => BASE64_ENGINE.encode(value),
380 Self::Base64Padded => BASE64_ENGINE_PADDED.encode(value),
381 Self::Hex => hex::encode(value),
382 }
383 }
384
385 fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
386 match self {
387 Self::Base64 | Self::Base64Padded => {
388 BASE64_ENGINE.decode(value).map_err(|e| {
389 exec_datafusion_err!("Failed to decode value using {self}: {e}")
390 })
391 }
392 Self::Hex => hex::decode(value).map_err(|e| {
393 exec_datafusion_err!("Failed to decode value using hex: {e}")
394 }),
395 }
396 }
397
398 fn encode_array<'a, InputBinaryArray, OutputOffset>(
400 self,
401 array: &InputBinaryArray,
402 ) -> Result<ArrayRef>
403 where
404 InputBinaryArray: BinaryArrayType<'a>,
405 OutputOffset: OffsetSizeTrait,
406 {
407 match self {
408 Self::Base64 => {
409 let array: GenericStringArray<OutputOffset> = array
410 .iter()
411 .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
412 .collect();
413 Ok(Arc::new(array))
414 }
415 Self::Base64Padded => {
416 let array: GenericStringArray<OutputOffset> = array
417 .iter()
418 .map(|x| x.map(|x| BASE64_ENGINE_PADDED.encode(x)))
419 .collect();
420 Ok(Arc::new(array))
421 }
422 Self::Hex => {
423 let array: GenericStringArray<OutputOffset> =
424 array.iter().map(|x| x.map(hex::encode)).collect();
425 Ok(Arc::new(array))
426 }
427 }
428 }
429
430 fn decode_array<'a, InputBinaryArray, OutputOffset>(
432 self,
433 value: &InputBinaryArray,
434 approx_data_size: usize,
435 ) -> Result<ArrayRef>
436 where
437 InputBinaryArray: BinaryArrayType<'a>,
438 OutputOffset: OffsetSizeTrait,
439 {
440 fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
441 let out_len = input.len() / 2;
443 let buf = &mut buf[..out_len];
444 hex::decode_to_slice(input, buf)
445 .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
446 Ok(out_len)
447 }
448
449 fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
450 BASE64_ENGINE
451 .decode_slice(input, buf)
452 .map_err(|e| exec_datafusion_err!("Failed to decode from base64: {e}"))
453 }
454
455 match self {
456 Self::Base64 | Self::Base64Padded => {
457 let upper_bound = base64::decoded_len_estimate(approx_data_size);
458 delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
459 }
460 Self::Hex => {
461 let upper_bound = approx_data_size / 2;
465 delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
466 }
467 }
468 }
469}
470
471fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
472 decode: DecodeFunction,
473 input: &InputBinaryArray,
474 conservative_upper_bound_size: usize,
475) -> Result<ArrayRef>
476where
477 DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
478 InputBinaryArray: BinaryArrayType<'a>,
479 OutputOffset: OffsetSizeTrait,
480{
481 let mut values = vec![0; conservative_upper_bound_size];
482 let mut offsets = OffsetBufferBuilder::new(input.len());
483 let mut total_bytes_decoded = 0;
484 for v in input.iter() {
485 if let Some(v) = v {
486 let cursor = &mut values[total_bytes_decoded..];
487 let decoded = decode(v, cursor)?;
488 total_bytes_decoded += decoded;
489 offsets.push_length(decoded);
490 } else {
491 offsets.push_length(0);
492 }
493 }
494 values.truncate(total_bytes_decoded);
496 let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
497 offsets.finish(),
498 Buffer::from_vec(values),
499 input.nulls().cloned(),
500 )?;
501 Ok(Arc::new(binary_array))
502}
503
#[cfg(test)]
mod tests {
    use arrow::array::{BinaryArray, LargeBinaryArray};
    use arrow_buffer::OffsetBuffer;

    use super::*;

    #[test]
    fn test_estimate_byte_data_size() {
        // Offsets starting at zero: the size is the last offset.
        let array = BinaryArray::new(
            OffsetBuffer::new(vec![0, 5, 10, 15].into()),
            vec![0; 100].into(),
            None,
        );
        let size = estimate_byte_data_size(&array);
        assert_eq!(size, 15);

        // Offsets starting past zero, with null slots: the size is the span
        // from the first to the last offset, including bytes under nulls.
        let array = BinaryArray::new(
            OffsetBuffer::new(vec![50, 51, 51, 60, 80, 81].into()),
            vec![0; 100].into(),
            Some(vec![true, false, false, true, true].into()),
        );
        let size = estimate_byte_data_size(&array);
        assert_eq!(size, 31);
    }

    #[test]
    fn test_estimate_byte_data_size_edge_cases() {
        // An empty array has a single zero offset and no data.
        let array =
            BinaryArray::new(OffsetBuffer::new_empty(), Vec::<u8>::new().into(), None);
        assert_eq!(estimate_byte_data_size(&array), 0);

        // 64-bit offsets are measured the same way.
        let array = LargeBinaryArray::new(
            OffsetBuffer::new(vec![0i64, 3, 7].into()),
            vec![0u8; 10].into(),
            None,
        );
        assert_eq!(estimate_byte_data_size(&array), 7);

        // A sliced array only spans the values inside the slice ("cde", "f").
        let array = BinaryArray::from_vec(vec![&b"ab"[..], &b"cde"[..], &b"f"[..]]);
        assert_eq!(estimate_byte_data_size(&array.slice(1, 2)), 4);
    }
}