datafusion_spark/function/math/
unhex.rs1use arrow::array::{Array, ArrayRef, BinaryBuilder};
19use arrow::datatypes::DataType;
20use datafusion_common::cast::{
21 as_large_string_array, as_string_array, as_string_view_array,
22};
23use datafusion_common::types::logical_string;
24use datafusion_common::utils::take_function_args;
25use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err};
26use datafusion_expr::{
27 Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
28 TypeSignatureClass, Volatility,
29};
30use std::sync::Arc;
31
32#[derive(Debug, PartialEq, Eq, Hash)]
34pub struct SparkUnhex {
35 signature: Signature,
36}
37
38impl Default for SparkUnhex {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl SparkUnhex {
45 pub fn new() -> Self {
46 let string = Coercion::new_exact(TypeSignatureClass::Native(logical_string()));
47
48 Self {
49 signature: Signature::coercible(vec![string], Volatility::Immutable),
50 }
51 }
52}
53
54impl ScalarUDFImpl for SparkUnhex {
55 fn name(&self) -> &str {
56 "unhex"
57 }
58
59 fn signature(&self) -> &Signature {
60 &self.signature
61 }
62
63 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
64 Ok(DataType::Binary)
65 }
66
67 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
68 spark_unhex(&args.args)
69 }
70}
71
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) to its value `0..=15`.
///
/// Returns `None` for any other byte.
#[inline]
fn hex_nibble(c: u8) -> Option<u8> {
    match c {
        b'0'..=b'9' => Some(c - b'0'),
        b'a'..=b'f' => Some(c - b'a' + 10),
        b'A'..=b'F' => Some(c - b'A' + 10),
        _ => None,
    }
}
81
82fn unhex_common(bytes: &[u8], out: &mut Vec<u8>) -> bool {
85 if bytes.is_empty() {
86 return true;
87 }
88
89 let mut i = 0usize;
90
91 if (bytes.len() & 1) == 1 {
93 match hex_nibble(bytes[0]) {
94 Some(lo) => out.push(lo),
96 None => return false,
97 }
98 i = 1;
99 }
100
101 while i + 1 < bytes.len() {
102 match (hex_nibble(bytes[i]), hex_nibble(bytes[i + 1])) {
103 (Some(hi), Some(lo)) => out.push((hi << 4) | lo),
104 _ => return false,
105 }
106 i += 2;
107 }
108
109 true
110}
111
112fn unhex_array<I, T>(
114 iter: I,
115 len: usize,
116 capacity: usize,
117) -> Result<ArrayRef, DataFusionError>
118where
119 I: Iterator<Item = Option<T>>,
120 T: AsRef<str>,
121{
122 let mut builder = BinaryBuilder::with_capacity(len, capacity);
123 let mut buffer = Vec::new();
124
125 for v in iter {
126 if let Some(s) = v {
127 buffer.clear();
128 buffer.reserve(s.as_ref().len().div_ceil(2));
129 if unhex_common(s.as_ref().as_bytes(), &mut buffer) {
130 builder.append_value(&buffer);
131 } else {
132 builder.append_null();
133 }
134 } else {
135 builder.append_null();
136 }
137 }
138
139 Ok(Arc::new(builder.finish()))
140}
141
142fn unhex_scalar(s: &str) -> Option<Vec<u8>> {
144 let mut buffer = Vec::with_capacity(s.len().div_ceil(2));
145 if unhex_common(s.as_bytes(), &mut buffer) {
146 Some(buffer)
147 } else {
148 None
149 }
150}
151
152fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
153 let [args] = take_function_args("unhex", args)?;
154
155 match args {
156 ColumnarValue::Array(array) => match array.data_type() {
157 DataType::Utf8 => {
158 let array = as_string_array(array)?;
159 let capacity = array.values().len().div_ceil(2);
160 Ok(ColumnarValue::Array(unhex_array(
161 array.iter(),
162 array.len(),
163 capacity,
164 )?))
165 }
166 DataType::Utf8View => {
167 let array = as_string_view_array(array)?;
168 let capacity = array.len() * 32;
170 Ok(ColumnarValue::Array(unhex_array(
171 array.iter(),
172 array.len(),
173 capacity,
174 )?))
175 }
176 DataType::LargeUtf8 => {
177 let array = as_large_string_array(array)?;
178 let capacity = array.values().len().div_ceil(2);
179 Ok(ColumnarValue::Array(unhex_array(
180 array.iter(),
181 array.len(),
182 capacity,
183 )?))
184 }
185 _ => exec_err!(
186 "unhex only supports string argument, but got: {}",
187 array.data_type()
188 ),
189 },
190 ColumnarValue::Scalar(sv) => match sv {
191 ScalarValue::Utf8(None)
192 | ScalarValue::Utf8View(None)
193 | ScalarValue::LargeUtf8(None) => {
194 Ok(ColumnarValue::Scalar(ScalarValue::Binary(None)))
195 }
196 ScalarValue::Utf8(Some(s))
197 | ScalarValue::Utf8View(Some(s))
198 | ScalarValue::LargeUtf8(Some(s)) => {
199 Ok(ColumnarValue::Scalar(ScalarValue::Binary(unhex_scalar(s))))
200 }
201 _ => {
202 exec_err!(
203 "unhex only supports string argument, but got: {}",
204 sv.data_type()
205 )
206 }
207 },
208 }
209}