// datafusion_functions/crypto/basic.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! "crypto" DataFusion functions
20use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType};
21use arrow::datatypes::DataType;
22use blake2::{Blake2b512, Blake2s256, Digest};
23use blake3::Hasher as Blake3;
24
25use arrow::compute::StringArrayType;
26use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err, plan_err};
27use datafusion_expr::ColumnarValue;
28use md5::Md5;
29use sha2::{Sha224, Sha256, Sha384, Sha512};
30use std::fmt;
31use std::str::FromStr;
32use std::sync::Arc;
33
/// Digest algorithms supported by the `digest`-family crypto functions.
///
/// Parsed from a user-supplied lowercase name via [`FromStr`] and rendered
/// back in lowercase via the [`fmt::Display`] impl below.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub(crate) enum DigestAlgorithm {
    Md5,
    Sha224,
    Sha256,
    Sha384,
    Sha512,
    Blake2s,
    Blake2b,
    Blake3,
}
45
46impl FromStr for DigestAlgorithm {
47    type Err = DataFusionError;
48    fn from_str(name: &str) -> Result<DigestAlgorithm> {
49        Ok(match name {
50            "md5" => Self::Md5,
51            "sha224" => Self::Sha224,
52            "sha256" => Self::Sha256,
53            "sha384" => Self::Sha384,
54            "sha512" => Self::Sha512,
55            "blake2b" => Self::Blake2b,
56            "blake2s" => Self::Blake2s,
57            "blake3" => Self::Blake3,
58            _ => {
59                let options = [
60                    Self::Md5,
61                    Self::Sha224,
62                    Self::Sha256,
63                    Self::Sha384,
64                    Self::Sha512,
65                    Self::Blake2s,
66                    Self::Blake2b,
67                    Self::Blake3,
68                ]
69                .iter()
70                .map(|i| i.to_string())
71                .collect::<Vec<_>>()
72                .join(", ");
73                return plan_err!(
74                    "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
75                );
76            }
77        })
78    }
79}
80
81impl fmt::Display for DigestAlgorithm {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        write!(f, "{}", format!("{self:?}").to_lowercase())
84    }
85}
86
/// Hashes every element of an iterable of `Option<bytes>` with the given
/// RustCrypto `Digest` implementor, collecting the results into a
/// `BinaryArray` wrapped in an `Arc`. Null inputs stay null.
macro_rules! digest_to_array {
    ($METHOD:ident, $INPUT:expr) => {{
        let hashed: BinaryArray = $INPUT
            .iter()
            .map(|maybe_bytes| maybe_bytes.map(|bytes| $METHOD::digest(bytes)))
            .collect();
        Arc::new(hashed)
    }};
}
96
/// Hashes an `Option<&[u8]>` with the given RustCrypto `Digest` implementor
/// and wraps the digest bytes in a `ScalarValue::Binary`. `None` stays `None`
/// (a NULL binary scalar).
macro_rules! digest_to_scalar {
    ($METHOD: ident, $INPUT:expr) => {{
        let digest_bytes = $INPUT.map(|bytes| $METHOD::digest(bytes).as_slice().to_vec());
        ScalarValue::Binary(digest_bytes)
    }};
}
100
101impl DigestAlgorithm {
102    /// digest an optional string to its hash value, null values are returned as is
103    fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
104        ColumnarValue::Scalar(match self {
105            Self::Md5 => digest_to_scalar!(Md5, value),
106            Self::Sha224 => digest_to_scalar!(Sha224, value),
107            Self::Sha256 => digest_to_scalar!(Sha256, value),
108            Self::Sha384 => digest_to_scalar!(Sha384, value),
109            Self::Sha512 => digest_to_scalar!(Sha512, value),
110            Self::Blake2b => digest_to_scalar!(Blake2b512, value),
111            Self::Blake2s => digest_to_scalar!(Blake2s256, value),
112            Self::Blake3 => ScalarValue::Binary(value.map(|v| {
113                let mut digest = Blake3::default();
114                digest.update(v);
115                Blake3::finalize(&digest).as_bytes().to_vec()
116            })),
117        })
118    }
119
120    fn digest_utf8_array_impl<'a, StringArrType>(
121        self,
122        input_value: &StringArrType,
123    ) -> ArrayRef
124    where
125        StringArrType: StringArrayType<'a>,
126    {
127        match self {
128            Self::Md5 => digest_to_array!(Md5, input_value),
129            Self::Sha224 => digest_to_array!(Sha224, input_value),
130            Self::Sha256 => digest_to_array!(Sha256, input_value),
131            Self::Sha384 => digest_to_array!(Sha384, input_value),
132            Self::Sha512 => digest_to_array!(Sha512, input_value),
133            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
134            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
135            Self::Blake3 => {
136                let binary_array: BinaryArray = input_value
137                    .iter()
138                    .map(|opt| {
139                        opt.map(|x| {
140                            let mut digest = Blake3::default();
141                            digest.update(x.as_bytes());
142                            Blake3::finalize(&digest).as_bytes().to_vec()
143                        })
144                    })
145                    .collect();
146                Arc::new(binary_array)
147            }
148        }
149    }
150
151    fn digest_binary_array_impl<'a, BinaryArrType>(
152        self,
153        input_value: &BinaryArrType,
154    ) -> ArrayRef
155    where
156        BinaryArrType: BinaryArrayType<'a>,
157    {
158        match self {
159            Self::Md5 => digest_to_array!(Md5, input_value),
160            Self::Sha224 => digest_to_array!(Sha224, input_value),
161            Self::Sha256 => digest_to_array!(Sha256, input_value),
162            Self::Sha384 => digest_to_array!(Sha384, input_value),
163            Self::Sha512 => digest_to_array!(Sha512, input_value),
164            Self::Blake2b => digest_to_array!(Blake2b512, input_value),
165            Self::Blake2s => digest_to_array!(Blake2s256, input_value),
166            Self::Blake3 => {
167                let binary_array: BinaryArray = input_value
168                    .iter()
169                    .map(|opt| {
170                        opt.map(|x| {
171                            let mut digest = Blake3::default();
172                            digest.update(x);
173                            Blake3::finalize(&digest).as_bytes().to_vec()
174                        })
175                    })
176                    .collect();
177                Arc::new(binary_array)
178            }
179        }
180    }
181}
182
183pub(crate) fn digest_process(
184    value: &ColumnarValue,
185    digest_algorithm: DigestAlgorithm,
186) -> Result<ColumnarValue> {
187    match value {
188        ColumnarValue::Array(a) => {
189            let output = match a.data_type() {
190                DataType::Utf8View => {
191                    digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
192                }
193                DataType::Utf8 => {
194                    digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
195                }
196                DataType::LargeUtf8 => {
197                    digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
198                }
199                DataType::Binary => {
200                    digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
201                }
202                DataType::LargeBinary => {
203                    digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
204                }
205                DataType::BinaryView => {
206                    digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
207                }
208                other => {
209                    return exec_err!(
210                        "Unsupported data type {other:?} for function {digest_algorithm}"
211                    );
212                }
213            };
214            Ok(ColumnarValue::Array(output))
215        }
216        ColumnarValue::Scalar(scalar) => {
217            match scalar {
218                ScalarValue::Utf8View(a)
219                | ScalarValue::Utf8(a)
220                | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
221                    .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
222                ScalarValue::Binary(a)
223                | ScalarValue::LargeBinary(a)
224                | ScalarValue::BinaryView(a) => Ok(digest_algorithm
225                    .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
226                other => exec_err!(
227                    "Unsupported data type {other:?} for function {digest_algorithm}"
228                ),
229            }
230        }
231    }
232}