Skip to main content

datafusion_functions/crypto/
basic.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! "crypto" DataFusion functions

20use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType};
21use arrow::datatypes::DataType;
22use blake2::{Blake2b512, Blake2s256};
23use blake3::Hasher as Blake3;
24
25use arrow::compute::StringArrayType;
26use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err, plan_err};
27use datafusion_expr::ColumnarValue;
28use md5::Md5;
29use sha2::{Sha224, Sha256, Sha384, Sha512};
30use std::fmt;
31use std::str::FromStr;
32use std::sync::Arc;
33
/// Hash algorithms understood by the crypto functions in this module.
///
/// Each variant names the concrete hasher used to compute its digest.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) enum DigestAlgorithm {
    /// MD5, computed with `md5::Md5`.
    Md5,
    /// SHA-224, computed with `sha2::Sha224`.
    Sha224,
    /// SHA-256, computed with `sha2::Sha256`.
    Sha256,
    /// SHA-384, computed with `sha2::Sha384`.
    Sha384,
    /// SHA-512, computed with `sha2::Sha512`.
    Sha512,
    /// BLAKE2s, computed with `blake2::Blake2s256`.
    Blake2s,
    /// BLAKE2b, computed with `blake2::Blake2b512`.
    Blake2b,
    /// BLAKE3, computed with `blake3::Hasher`.
    Blake3,
}
45
46impl FromStr for DigestAlgorithm {
47    type Err = DataFusionError;
48    fn from_str(name: &str) -> Result<DigestAlgorithm> {
49        Ok(match name {
50            "md5" => Self::Md5,
51            "sha224" => Self::Sha224,
52            "sha256" => Self::Sha256,
53            "sha384" => Self::Sha384,
54            "sha512" => Self::Sha512,
55            "blake2b" => Self::Blake2b,
56            "blake2s" => Self::Blake2s,
57            "blake3" => Self::Blake3,
58            _ => {
59                let options = [
60                    Self::Md5,
61                    Self::Sha224,
62                    Self::Sha256,
63                    Self::Sha384,
64                    Self::Sha512,
65                    Self::Blake2s,
66                    Self::Blake2b,
67                    Self::Blake3,
68                ]
69                .iter()
70                .map(|i| i.to_string())
71                .collect::<Vec<_>>()
72                .join(", ");
73                return plan_err!(
74                    "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
75                );
76            }
77        })
78    }
79}
80
81impl fmt::Display for DigestAlgorithm {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        write!(f, "{}", format!("{self:?}").to_lowercase())
84    }
85}
86
/// Hashes every element of `$INPUT` (anything with an `.iter()` yielding
/// `Option<_>` byte-like values) using `$METHOD::digest`, where the `Digest`
/// trait is imported from `$MODULE`.
///
/// Evaluates to an `Arc<BinaryArray>`; null elements stay null.
macro_rules! digest_to_array {
    ($MODULE:ident, $METHOD:ident, $INPUT:expr) => {{
        use $MODULE::Digest;
        let digests = $INPUT
            .iter()
            .map(|element| element.map(|bytes| $METHOD::digest(bytes)));
        Arc::new(digests.collect::<BinaryArray>())
    }};
}
97
/// Hashes an `Option` of bytes with `$METHOD::digest`, where the `Digest`
/// trait is imported from `$MODULE`.
///
/// Evaluates to `ScalarValue::Binary`, holding `None` when the input is `None`.
macro_rules! digest_to_scalar {
    ($MODULE:ident, $METHOD:ident, $INPUT:expr) => {{
        use $MODULE::Digest;
        let digest_bytes = $INPUT.map(|bytes| {
            let hashed = $METHOD::digest(bytes);
            hashed.as_slice().to_vec()
        });
        ScalarValue::Binary(digest_bytes)
    }};
}
104
105impl DigestAlgorithm {
106    /// digest an optional string to its hash value, null values are returned as is
107    fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
108        ColumnarValue::Scalar(match self {
109            Self::Md5 => digest_to_scalar!(md5, Md5, value),
110            Self::Sha224 => digest_to_scalar!(sha2, Sha224, value),
111            Self::Sha256 => digest_to_scalar!(sha2, Sha256, value),
112            Self::Sha384 => digest_to_scalar!(sha2, Sha384, value),
113            Self::Sha512 => digest_to_scalar!(sha2, Sha512, value),
114            Self::Blake2b => digest_to_scalar!(blake2, Blake2b512, value),
115            Self::Blake2s => digest_to_scalar!(blake2, Blake2s256, value),
116            Self::Blake3 => ScalarValue::Binary(value.map(|v| {
117                let mut digest = Blake3::default();
118                digest.update(v);
119                Blake3::finalize(&digest).as_bytes().to_vec()
120            })),
121        })
122    }
123
124    fn digest_utf8_array_impl<'a, StringArrType>(
125        self,
126        input_value: &StringArrType,
127    ) -> ArrayRef
128    where
129        StringArrType: StringArrayType<'a>,
130    {
131        match self {
132            Self::Md5 => digest_to_array!(md5, Md5, input_value),
133            Self::Sha224 => digest_to_array!(sha2, Sha224, input_value),
134            Self::Sha256 => digest_to_array!(sha2, Sha256, input_value),
135            Self::Sha384 => digest_to_array!(sha2, Sha384, input_value),
136            Self::Sha512 => digest_to_array!(sha2, Sha512, input_value),
137            Self::Blake2b => digest_to_array!(blake2, Blake2b512, input_value),
138            Self::Blake2s => digest_to_array!(blake2, Blake2s256, input_value),
139            Self::Blake3 => {
140                let binary_array: BinaryArray = input_value
141                    .iter()
142                    .map(|opt| {
143                        opt.map(|x| {
144                            let mut digest = Blake3::default();
145                            digest.update(x.as_bytes());
146                            Blake3::finalize(&digest).as_bytes().to_vec()
147                        })
148                    })
149                    .collect();
150                Arc::new(binary_array)
151            }
152        }
153    }
154
155    fn digest_binary_array_impl<'a, BinaryArrType>(
156        self,
157        input_value: &BinaryArrType,
158    ) -> ArrayRef
159    where
160        BinaryArrType: BinaryArrayType<'a>,
161    {
162        match self {
163            Self::Md5 => digest_to_array!(md5, Md5, input_value),
164            Self::Sha224 => digest_to_array!(sha2, Sha224, input_value),
165            Self::Sha256 => digest_to_array!(sha2, Sha256, input_value),
166            Self::Sha384 => digest_to_array!(sha2, Sha384, input_value),
167            Self::Sha512 => digest_to_array!(sha2, Sha512, input_value),
168            Self::Blake2b => digest_to_array!(blake2, Blake2b512, input_value),
169            Self::Blake2s => digest_to_array!(blake2, Blake2s256, input_value),
170            Self::Blake3 => {
171                let binary_array: BinaryArray = input_value
172                    .iter()
173                    .map(|opt| {
174                        opt.map(|x| {
175                            let mut digest = Blake3::default();
176                            digest.update(x);
177                            Blake3::finalize(&digest).as_bytes().to_vec()
178                        })
179                    })
180                    .collect();
181                Arc::new(binary_array)
182            }
183        }
184    }
185}
186
187pub(crate) fn digest_process(
188    value: &ColumnarValue,
189    digest_algorithm: DigestAlgorithm,
190) -> Result<ColumnarValue> {
191    match value {
192        ColumnarValue::Array(a) => {
193            let output = match a.data_type() {
194                DataType::Utf8View => {
195                    digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
196                }
197                DataType::Utf8 => {
198                    digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
199                }
200                DataType::LargeUtf8 => {
201                    digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
202                }
203                DataType::Binary => {
204                    digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
205                }
206                DataType::LargeBinary => {
207                    digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
208                }
209                DataType::BinaryView => {
210                    digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
211                }
212                other => {
213                    return exec_err!(
214                        "Unsupported data type {other:?} for function {digest_algorithm}"
215                    );
216                }
217            };
218            Ok(ColumnarValue::Array(output))
219        }
220        ColumnarValue::Scalar(scalar) => {
221            match scalar {
222                ScalarValue::Utf8View(a)
223                | ScalarValue::Utf8(a)
224                | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
225                    .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
226                ScalarValue::Binary(a)
227                | ScalarValue::LargeBinary(a)
228                | ScalarValue::BinaryView(a) => Ok(digest_algorithm
229                    .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
230                other => exec_err!(
231                    "Unsupported data type {other:?} for function {digest_algorithm}"
232                ),
233            }
234        }
235    }
236}