datafusion_functions/crypto/
basic.rs1use arrow::array::{
21 Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType, StringViewArray,
22};
23use arrow::datatypes::DataType;
24use blake2::{Blake2b512, Blake2s256, Digest};
25use blake3::Hasher as Blake3;
26use datafusion_common::cast::as_binary_array;
27
28use arrow::compute::StringArrayType;
29use datafusion_common::{
30 DataFusionError, Result, ScalarValue, exec_err, internal_err, plan_err,
31 utils::take_function_args,
32};
33use datafusion_expr::ColumnarValue;
34use md5::Md5;
35use sha2::{Sha224, Sha256, Sha384, Sha512};
36use std::fmt;
37use std::str::FromStr;
38use std::sync::Arc;
39
/// Generates a public scalar-function entry point `$NAME` documented with
/// `$DOC`.
///
/// The generated function expects exactly one argument, which it hashes with
/// `DigestAlgorithm::$METHOD`. The algorithm's display name is used as the
/// function name in argument-count errors.
macro_rules! define_digest_function {
    ($NAME: ident, $METHOD: ident, $DOC: expr) => {
        #[doc = $DOC]
        pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
            let algorithm = DigestAlgorithm::$METHOD;
            let function_name = algorithm.to_string();
            let [data] = take_function_args(&function_name, args)?;
            digest_process(data, algorithm)
        }
    };
}
49define_digest_function!(
50 sha224,
51 Sha224,
52 "computes sha224 hash digest of the given input"
53);
54define_digest_function!(
55 sha256,
56 Sha256,
57 "computes sha256 hash digest of the given input"
58);
59define_digest_function!(
60 sha384,
61 Sha384,
62 "computes sha384 hash digest of the given input"
63);
64define_digest_function!(
65 sha512,
66 Sha512,
67 "computes sha512 hash digest of the given input"
68);
69define_digest_function!(
70 blake2b,
71 Blake2b,
72 "computes blake2b hash digest of the given input"
73);
74define_digest_function!(
75 blake2s,
76 Blake2s,
77 "computes blake2s hash digest of the given input"
78);
79define_digest_function!(
80 blake3,
81 Blake3,
82 "computes blake3 hash digest of the given input"
83);
84
/// The hash algorithms understood by the digest family of functions.
///
/// Variant order is kept stable: it determines the discriminants that the
/// derived `Hash` implementation feeds to the hasher.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DigestAlgorithm {
    /// MD5.
    Md5,
    /// SHA-2 with a 224-bit output.
    Sha224,
    /// SHA-2 with a 256-bit output.
    Sha256,
    /// SHA-2 with a 384-bit output.
    Sha384,
    /// SHA-2 with a 512-bit output.
    Sha512,
    /// BLAKE2s, computed with `Blake2s256`.
    Blake2s,
    /// BLAKE2b, computed with `Blake2b512`.
    Blake2b,
    /// BLAKE3 with its default output length.
    Blake3,
}
96
97impl FromStr for DigestAlgorithm {
98 type Err = DataFusionError;
99 fn from_str(name: &str) -> Result<DigestAlgorithm> {
100 Ok(match name {
101 "md5" => Self::Md5,
102 "sha224" => Self::Sha224,
103 "sha256" => Self::Sha256,
104 "sha384" => Self::Sha384,
105 "sha512" => Self::Sha512,
106 "blake2b" => Self::Blake2b,
107 "blake2s" => Self::Blake2s,
108 "blake3" => Self::Blake3,
109 _ => {
110 let options = [
111 Self::Md5,
112 Self::Sha224,
113 Self::Sha256,
114 Self::Sha384,
115 Self::Sha512,
116 Self::Blake2s,
117 Self::Blake2b,
118 Self::Blake3,
119 ]
120 .iter()
121 .map(|i| i.to_string())
122 .collect::<Vec<_>>()
123 .join(", ");
124 return plan_err!(
125 "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
126 );
127 }
128 })
129 }
130}
131
132impl fmt::Display for DigestAlgorithm {
133 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
134 write!(f, "{}", format!("{self:?}").to_lowercase())
135 }
136}
137
138pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
140 let [data] = take_function_args("md5", args)?;
141 let value = digest_process(data, DigestAlgorithm::Md5)?;
142
143 Ok(match value {
145 ColumnarValue::Array(array) => {
146 let binary_array = as_binary_array(&array)?;
147 let string_array: StringViewArray = binary_array
148 .iter()
149 .map(|opt| opt.map(hex_encode::<_>))
150 .collect();
151 ColumnarValue::Array(Arc::new(string_array))
152 }
153 ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
154 ColumnarValue::Scalar(ScalarValue::Utf8View(opt.map(hex_encode::<_>)))
155 }
156 _ => return internal_err!("Impossibly got invalid results from digest"),
157 })
158}
159
/// Lowercase hexadecimal alphabet, indexed by nibble value.
const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef";

/// Encodes `data` as a lowercase hexadecimal string.
///
/// Each byte becomes two characters, high nibble first; empty input gives an
/// empty string.
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    let input = data.as_ref();
    let mut encoded = String::with_capacity(input.len() * 2);
    encoded.extend(input.iter().flat_map(|&byte| {
        let high = HEX_CHARS_LOWER[usize::from(byte >> 4)];
        let low = HEX_CHARS_LOWER[usize::from(byte & 0x0f)];
        [char::from(high), char::from(low)]
    }));
    encoded
}
175
/// Expands to an `Arc<BinaryArray>` containing the `$METHOD` digest of every
/// element yielded by `$INPUT.iter()`.
///
/// NULL elements remain NULL in the output.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let digests = $INPUT
            .iter()
            .map(|element| element.map(|bytes| $METHOD::digest(bytes)));
        Arc::new(digests.collect::<BinaryArray>())
    }};
}
185
/// Expands to a `ScalarValue::Binary` holding the `$METHOD` digest of the
/// optional byte input `$INPUT`.
///
/// Produces `ScalarValue::Binary(None)` when the input is `None`.
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        let digest = $INPUT.map(|bytes| $METHOD::digest(bytes).as_slice().to_owned());
        ScalarValue::Binary(digest)
    }};
}
189
190impl DigestAlgorithm {
191 fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
193 ColumnarValue::Scalar(match self {
194 Self::Md5 => digest_to_scalar!(Md5, value),
195 Self::Sha224 => digest_to_scalar!(Sha224, value),
196 Self::Sha256 => digest_to_scalar!(Sha256, value),
197 Self::Sha384 => digest_to_scalar!(Sha384, value),
198 Self::Sha512 => digest_to_scalar!(Sha512, value),
199 Self::Blake2b => digest_to_scalar!(Blake2b512, value),
200 Self::Blake2s => digest_to_scalar!(Blake2s256, value),
201 Self::Blake3 => ScalarValue::Binary(value.map(|v| {
202 let mut digest = Blake3::default();
203 digest.update(v);
204 Blake3::finalize(&digest).as_bytes().to_vec()
205 })),
206 })
207 }
208
209 fn digest_utf8_array_impl<'a, StringArrType>(
210 self,
211 input_value: &StringArrType,
212 ) -> ArrayRef
213 where
214 StringArrType: StringArrayType<'a>,
215 {
216 match self {
217 Self::Md5 => digest_to_array!(Md5, input_value),
218 Self::Sha224 => digest_to_array!(Sha224, input_value),
219 Self::Sha256 => digest_to_array!(Sha256, input_value),
220 Self::Sha384 => digest_to_array!(Sha384, input_value),
221 Self::Sha512 => digest_to_array!(Sha512, input_value),
222 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
223 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
224 Self::Blake3 => {
225 let binary_array: BinaryArray = input_value
226 .iter()
227 .map(|opt| {
228 opt.map(|x| {
229 let mut digest = Blake3::default();
230 digest.update(x.as_bytes());
231 Blake3::finalize(&digest).as_bytes().to_vec()
232 })
233 })
234 .collect();
235 Arc::new(binary_array)
236 }
237 }
238 }
239
240 fn digest_binary_array_impl<'a, BinaryArrType>(
241 self,
242 input_value: &BinaryArrType,
243 ) -> ArrayRef
244 where
245 BinaryArrType: BinaryArrayType<'a>,
246 {
247 match self {
248 Self::Md5 => digest_to_array!(Md5, input_value),
249 Self::Sha224 => digest_to_array!(Sha224, input_value),
250 Self::Sha256 => digest_to_array!(Sha256, input_value),
251 Self::Sha384 => digest_to_array!(Sha384, input_value),
252 Self::Sha512 => digest_to_array!(Sha512, input_value),
253 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
254 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
255 Self::Blake3 => {
256 let binary_array: BinaryArray = input_value
257 .iter()
258 .map(|opt| {
259 opt.map(|x| {
260 let mut digest = Blake3::default();
261 digest.update(x);
262 Blake3::finalize(&digest).as_bytes().to_vec()
263 })
264 })
265 .collect();
266 Arc::new(binary_array)
267 }
268 }
269 }
270}
271
272pub fn digest_process(
273 value: &ColumnarValue,
274 digest_algorithm: DigestAlgorithm,
275) -> Result<ColumnarValue> {
276 match value {
277 ColumnarValue::Array(a) => {
278 let output = match a.data_type() {
279 DataType::Utf8View => {
280 digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
281 }
282 DataType::Utf8 => {
283 digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
284 }
285 DataType::LargeUtf8 => {
286 digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
287 }
288 DataType::Binary => {
289 digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
290 }
291 DataType::LargeBinary => {
292 digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
293 }
294 DataType::BinaryView => {
295 digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
296 }
297 other => {
298 return exec_err!(
299 "Unsupported data type {other:?} for function {digest_algorithm}"
300 );
301 }
302 };
303 Ok(ColumnarValue::Array(output))
304 }
305 ColumnarValue::Scalar(scalar) => {
306 match scalar {
307 ScalarValue::Utf8View(a)
308 | ScalarValue::Utf8(a)
309 | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
310 .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
311 ScalarValue::Binary(a)
312 | ScalarValue::LargeBinary(a)
313 | ScalarValue::BinaryView(a) => Ok(digest_algorithm
314 .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
315 other => exec_err!(
316 "Unsupported data type {other:?} for function {digest_algorithm}"
317 ),
318 }
319 }
320 }
321}