datafusion_functions/crypto/
basic.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! "crypto" DataFusion functions
19
20use arrow::array::{
21    Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType, StringViewArray,
22};
23use arrow::datatypes::DataType;
24use blake2::{Blake2b512, Blake2s256, Digest};
25use blake3::Hasher as Blake3;
26use datafusion_common::cast::as_binary_array;
27
28use arrow::compute::StringArrayType;
29use datafusion_common::{
30    DataFusionError, Result, ScalarValue, exec_err, internal_err, plan_err,
31    utils::take_function_args,
32};
33use datafusion_expr::ColumnarValue;
34use md5::Md5;
35use sha2::{Sha224, Sha256, Sha384, Sha512};
36use std::fmt;
37use std::str::FromStr;
38use std::sync::Arc;
39
40macro_rules! define_digest_function {
41    ($NAME: ident, $METHOD: ident, $DOC: expr) => {
42        #[doc = $DOC]
43        pub fn $NAME(args: &[ColumnarValue]) -> Result<ColumnarValue> {
44            let [data] = take_function_args(&DigestAlgorithm::$METHOD.to_string(), args)?;
45            digest_process(data, DigestAlgorithm::$METHOD)
46        }
47    };
48}
49define_digest_function!(
50    sha224,
51    Sha224,
52    "computes sha224 hash digest of the given input"
53);
54define_digest_function!(
55    sha256,
56    Sha256,
57    "computes sha256 hash digest of the given input"
58);
59define_digest_function!(
60    sha384,
61    Sha384,
62    "computes sha384 hash digest of the given input"
63);
64define_digest_function!(
65    sha512,
66    Sha512,
67    "computes sha512 hash digest of the given input"
68);
69define_digest_function!(
70    blake2b,
71    Blake2b,
72    "computes blake2b hash digest of the given input"
73);
74define_digest_function!(
75    blake2s,
76    Blake2s,
77    "computes blake2s hash digest of the given input"
78);
79define_digest_function!(
80    blake3,
81    Blake3,
82    "computes blake3 hash digest of the given input"
83);
84
/// Digest algorithms supported by the crypto functions in this module.
///
/// The lowercase form of each variant name is the algorithm's textual name,
/// as accepted by the `FromStr` implementation.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum DigestAlgorithm {
    /// MD5.
    Md5,
    /// SHA-224.
    Sha224,
    /// SHA-256.
    Sha256,
    /// SHA-384.
    Sha384,
    /// SHA-512.
    Sha512,
    /// BLAKE2s.
    Blake2s,
    /// BLAKE2b.
    Blake2b,
    /// BLAKE3.
    Blake3,
}
96
97impl FromStr for DigestAlgorithm {
98    type Err = DataFusionError;
99    fn from_str(name: &str) -> Result<DigestAlgorithm> {
100        Ok(match name {
101            "md5" => Self::Md5,
102            "sha224" => Self::Sha224,
103            "sha256" => Self::Sha256,
104            "sha384" => Self::Sha384,
105            "sha512" => Self::Sha512,
106            "blake2b" => Self::Blake2b,
107            "blake2s" => Self::Blake2s,
108            "blake3" => Self::Blake3,
109            _ => {
110                let options = [
111                    Self::Md5,
112                    Self::Sha224,
113                    Self::Sha256,
114                    Self::Sha384,
115                    Self::Sha512,
116                    Self::Blake2s,
117                    Self::Blake2b,
118                    Self::Blake3,
119                ]
120                .iter()
121                .map(|i| i.to_string())
122                .collect::<Vec<_>>()
123                .join(", ");
124                return plan_err!(
125                    "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
126                );
127            }
128        })
129    }
130}
131
132impl fmt::Display for DigestAlgorithm {
133    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
134        write!(f, "{}", format!("{self:?}").to_lowercase())
135    }
136}
137
138/// computes md5 hash digest of the given input
139pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
140    let [data] = take_function_args("md5", args)?;
141    let value = digest_process(data, DigestAlgorithm::Md5)?;
142
143    // md5 requires special handling because of its unique utf8view return type
144    Ok(match value {
145        ColumnarValue::Array(array) => {
146            let binary_array = as_binary_array(&array)?;
147            let string_array: StringViewArray = binary_array
148                .iter()
149                .map(|opt| opt.map(hex_encode::<_>))
150                .collect();
151            ColumnarValue::Array(Arc::new(string_array))
152        }
153        ColumnarValue::Scalar(ScalarValue::Binary(opt)) => {
154            ColumnarValue::Scalar(ScalarValue::Utf8View(opt.map(hex_encode::<_>)))
155        }
156        _ => return internal_err!("Impossibly got invalid results from digest"),
157    })
158}
159
/// Lowercase hexadecimal digits, indexed by nibble value (0..=15).
const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef";

/// Encodes `data` as a lowercase hexadecimal string, two characters per byte
/// with the high nibble first.
///
/// Each nibble is looked up in [`HEX_CHARS_LOWER`] rather than formatted with
/// `{:02x}`, and the output buffer is sized up front.
#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
    let input = data.as_ref();
    let mut encoded = String::with_capacity(input.len() * 2);
    encoded.extend(input.iter().flat_map(|&byte| {
        let high = HEX_CHARS_LOWER[usize::from(byte >> 4)];
        let low = HEX_CHARS_LOWER[usize::from(byte & 0x0f)];
        [char::from(high), char::from(low)]
    }));
    encoded
}
175
// Hashes every non-null element of `$INPUT` with `$METHOD::digest` and
// collects the results into an `Arc<BinaryArray>`; null elements stay null.
//
// `$INPUT` only needs an `iter()` that yields `Option`s of byte-like values.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let hashed = $INPUT
            .iter()
            .map(|element| element.map(|bytes| $METHOD::digest(bytes)));
        Arc::new(hashed.collect::<BinaryArray>())
    }};
}
185
// Hashes an optional byte slice with `$METHOD::digest` and wraps the owned
// digest bytes in `ScalarValue::Binary`; `None` maps to a null binary scalar.
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        let hashed = $INPUT.map(|bytes| {
            let output = $METHOD::digest(bytes);
            output.as_slice().to_vec()
        });
        ScalarValue::Binary(hashed)
    }};
}
189
190impl DigestAlgorithm {
191    /// digest an optional string to its hash value, null values are returned as is
192    fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
193        ColumnarValue::Scalar(match self {
194            Self::Md5 => digest_to_scalar!(Md5, value),
195            Self::Sha224 => digest_to_scalar!(Sha224, value),
196            Self::Sha256 => digest_to_scalar!(Sha256, value),
197            Self::Sha384 => digest_to_scalar!(Sha384, value),
198            Self::Sha512 => digest_to_scalar!(Sha512, value),
199            Self::Blake2b => digest_to_scalar!(Blake2b512, value),
200            Self::Blake2s => digest_to_scalar!(Blake2s256, value),
201            Self::Blake3 => ScalarValue::Binary(value.map(|v| {
202                let mut digest = Blake3::default();
203                digest.update(v);
204                Blake3::finalize(&digest).as_bytes().to_vec()
205            })),
206        })
207    }
208
209    fn digest_utf8_array_impl<'a, StringArrType>(
210        self,
211        input_value: &StringArrType,
212    ) -> ArrayRef
213    where
214        StringArrType: StringArrayType<'a>,
215    {
216        match self {
217            Self::Md5 => digest_to_array!(Md5, input_value),
218            Self::Sha224 => digest_to_array!(Sha224, input_value),
219            Self::Sha256 => digest_to_array!(Sha256, input_value),
220            Self::Sha384 => digest_to_array!(Sha384, input_value),
221            Self::Sha512 => digest_to_array!(Sha512, input_value),
222            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
223            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
224            Self::Blake3 => {
225                let binary_array: BinaryArray = input_value
226                    .iter()
227                    .map(|opt| {
228                        opt.map(|x| {
229                            let mut digest = Blake3::default();
230                            digest.update(x.as_bytes());
231                            Blake3::finalize(&digest).as_bytes().to_vec()
232                        })
233                    })
234                    .collect();
235                Arc::new(binary_array)
236            }
237        }
238    }
239
240    fn digest_binary_array_impl<'a, BinaryArrType>(
241        self,
242        input_value: &BinaryArrType,
243    ) -> ArrayRef
244    where
245        BinaryArrType: BinaryArrayType<'a>,
246    {
247        match self {
248            Self::Md5 => digest_to_array!(Md5, input_value),
249            Self::Sha224 => digest_to_array!(Sha224, input_value),
250            Self::Sha256 => digest_to_array!(Sha256, input_value),
251            Self::Sha384 => digest_to_array!(Sha384, input_value),
252            Self::Sha512 => digest_to_array!(Sha512, input_value),
253            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
254            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
255            Self::Blake3 => {
256                let binary_array: BinaryArray = input_value
257                    .iter()
258                    .map(|opt| {
259                        opt.map(|x| {
260                            let mut digest = Blake3::default();
261                            digest.update(x);
262                            Blake3::finalize(&digest).as_bytes().to_vec()
263                        })
264                    })
265                    .collect();
266                Arc::new(binary_array)
267            }
268        }
269    }
270}
271
272pub fn digest_process(
273    value: &ColumnarValue,
274    digest_algorithm: DigestAlgorithm,
275) -> Result<ColumnarValue> {
276    match value {
277        ColumnarValue::Array(a) => {
278            let output = match a.data_type() {
279                DataType::Utf8View => {
280                    digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
281                }
282                DataType::Utf8 => {
283                    digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
284                }
285                DataType::LargeUtf8 => {
286                    digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
287                }
288                DataType::Binary => {
289                    digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
290                }
291                DataType::LargeBinary => {
292                    digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
293                }
294                DataType::BinaryView => {
295                    digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
296                }
297                other => {
298                    return exec_err!(
299                        "Unsupported data type {other:?} for function {digest_algorithm}"
300                    );
301                }
302            };
303            Ok(ColumnarValue::Array(output))
304        }
305        ColumnarValue::Scalar(scalar) => {
306            match scalar {
307                ScalarValue::Utf8View(a)
308                | ScalarValue::Utf8(a)
309                | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
310                    .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
311                ScalarValue::Binary(a)
312                | ScalarValue::LargeBinary(a)
313                | ScalarValue::BinaryView(a) => Ok(digest_algorithm
314                    .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
315                other => exec_err!(
316                    "Unsupported data type {other:?} for function {digest_algorithm}"
317                ),
318            }
319        }
320    }
321}