datafusion_functions/crypto/
basic.rs1use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType};
21use arrow::datatypes::DataType;
22use blake2::{Blake2b512, Blake2s256};
23use blake3::Hasher as Blake3;
24
25use arrow::compute::StringArrayType;
26use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err, plan_err};
27use datafusion_expr::ColumnarValue;
28use md5::Md5;
29use sha2::{Sha224, Sha256, Sha384, Sha512};
30use std::fmt;
31use std::str::FromStr;
32use std::sync::Arc;
33
/// A hash algorithm that the crypto scalar functions can apply to string or binary input.
///
/// The declaration order of the variants is significant for the derived traits, so new
/// variants should be appended rather than inserted.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) enum DigestAlgorithm {
    /// MD5.
    Md5,
    /// SHA-2 with a 224-bit digest.
    Sha224,
    /// SHA-2 with a 256-bit digest.
    Sha256,
    /// SHA-2 with a 384-bit digest.
    Sha384,
    /// SHA-2 with a 512-bit digest.
    Sha512,
    /// BLAKE2s (hashed with the `Blake2s256` implementation).
    Blake2s,
    /// BLAKE2b (hashed with the `Blake2b512` implementation).
    Blake2b,
    /// BLAKE3.
    Blake3,
}
45
46impl FromStr for DigestAlgorithm {
47 type Err = DataFusionError;
48 fn from_str(name: &str) -> Result<DigestAlgorithm> {
49 Ok(match name {
50 "md5" => Self::Md5,
51 "sha224" => Self::Sha224,
52 "sha256" => Self::Sha256,
53 "sha384" => Self::Sha384,
54 "sha512" => Self::Sha512,
55 "blake2b" => Self::Blake2b,
56 "blake2s" => Self::Blake2s,
57 "blake3" => Self::Blake3,
58 _ => {
59 let options = [
60 Self::Md5,
61 Self::Sha224,
62 Self::Sha256,
63 Self::Sha384,
64 Self::Sha512,
65 Self::Blake2s,
66 Self::Blake2b,
67 Self::Blake3,
68 ]
69 .iter()
70 .map(|i| i.to_string())
71 .collect::<Vec<_>>()
72 .join(", ");
73 return plan_err!(
74 "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
75 );
76 }
77 })
78 }
79}
80
81impl fmt::Display for DigestAlgorithm {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 write!(f, "{}", format!("{self:?}").to_lowercase())
84 }
85}
86
// Hashes every slot of an array-like `$INPUT` (anything with an `iter()` yielding
// `Option<_>` byte-like values) with the hasher type `$METHOD`, using the `Digest` trait
// from crate/module `$MODULE`. The digests are collected into a `BinaryArray` and returned
// inside an `Arc`; `None` slots stay `None`.
macro_rules! digest_to_array {
    ($MODULE:ident, $METHOD:ident, $INPUT:expr) => {{
        use $MODULE::Digest;
        let hashed = $INPUT
            .iter()
            .map(|slot| slot.map(|bytes| $METHOD::digest(bytes)))
            .collect::<BinaryArray>();
        Arc::new(hashed)
    }};
}
97
// Hashes an optional byte slice `$INPUT` with the hasher type `$METHOD`, using the `Digest`
// trait from crate/module `$MODULE`, and wraps the owned digest bytes in
// `ScalarValue::Binary`. A `None` input yields `ScalarValue::Binary(None)`.
macro_rules! digest_to_scalar {
    ($MODULE:ident, $METHOD:ident, $INPUT:expr) => {{
        use $MODULE::Digest;
        let hashed = $INPUT.map(|bytes| {
            let output = $METHOD::digest(bytes);
            output.as_slice().to_vec()
        });
        ScalarValue::Binary(hashed)
    }};
}
104
105impl DigestAlgorithm {
106 fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
108 ColumnarValue::Scalar(match self {
109 Self::Md5 => digest_to_scalar!(md5, Md5, value),
110 Self::Sha224 => digest_to_scalar!(sha2, Sha224, value),
111 Self::Sha256 => digest_to_scalar!(sha2, Sha256, value),
112 Self::Sha384 => digest_to_scalar!(sha2, Sha384, value),
113 Self::Sha512 => digest_to_scalar!(sha2, Sha512, value),
114 Self::Blake2b => digest_to_scalar!(blake2, Blake2b512, value),
115 Self::Blake2s => digest_to_scalar!(blake2, Blake2s256, value),
116 Self::Blake3 => ScalarValue::Binary(value.map(|v| {
117 let mut digest = Blake3::default();
118 digest.update(v);
119 Blake3::finalize(&digest).as_bytes().to_vec()
120 })),
121 })
122 }
123
124 fn digest_utf8_array_impl<'a, StringArrType>(
125 self,
126 input_value: &StringArrType,
127 ) -> ArrayRef
128 where
129 StringArrType: StringArrayType<'a>,
130 {
131 match self {
132 Self::Md5 => digest_to_array!(md5, Md5, input_value),
133 Self::Sha224 => digest_to_array!(sha2, Sha224, input_value),
134 Self::Sha256 => digest_to_array!(sha2, Sha256, input_value),
135 Self::Sha384 => digest_to_array!(sha2, Sha384, input_value),
136 Self::Sha512 => digest_to_array!(sha2, Sha512, input_value),
137 Self::Blake2b => digest_to_array!(blake2, Blake2b512, input_value),
138 Self::Blake2s => digest_to_array!(blake2, Blake2s256, input_value),
139 Self::Blake3 => {
140 let binary_array: BinaryArray = input_value
141 .iter()
142 .map(|opt| {
143 opt.map(|x| {
144 let mut digest = Blake3::default();
145 digest.update(x.as_bytes());
146 Blake3::finalize(&digest).as_bytes().to_vec()
147 })
148 })
149 .collect();
150 Arc::new(binary_array)
151 }
152 }
153 }
154
155 fn digest_binary_array_impl<'a, BinaryArrType>(
156 self,
157 input_value: &BinaryArrType,
158 ) -> ArrayRef
159 where
160 BinaryArrType: BinaryArrayType<'a>,
161 {
162 match self {
163 Self::Md5 => digest_to_array!(md5, Md5, input_value),
164 Self::Sha224 => digest_to_array!(sha2, Sha224, input_value),
165 Self::Sha256 => digest_to_array!(sha2, Sha256, input_value),
166 Self::Sha384 => digest_to_array!(sha2, Sha384, input_value),
167 Self::Sha512 => digest_to_array!(sha2, Sha512, input_value),
168 Self::Blake2b => digest_to_array!(blake2, Blake2b512, input_value),
169 Self::Blake2s => digest_to_array!(blake2, Blake2s256, input_value),
170 Self::Blake3 => {
171 let binary_array: BinaryArray = input_value
172 .iter()
173 .map(|opt| {
174 opt.map(|x| {
175 let mut digest = Blake3::default();
176 digest.update(x);
177 Blake3::finalize(&digest).as_bytes().to_vec()
178 })
179 })
180 .collect();
181 Arc::new(binary_array)
182 }
183 }
184 }
185}
186
187pub(crate) fn digest_process(
188 value: &ColumnarValue,
189 digest_algorithm: DigestAlgorithm,
190) -> Result<ColumnarValue> {
191 match value {
192 ColumnarValue::Array(a) => {
193 let output = match a.data_type() {
194 DataType::Utf8View => {
195 digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
196 }
197 DataType::Utf8 => {
198 digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
199 }
200 DataType::LargeUtf8 => {
201 digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
202 }
203 DataType::Binary => {
204 digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
205 }
206 DataType::LargeBinary => {
207 digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
208 }
209 DataType::BinaryView => {
210 digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
211 }
212 other => {
213 return exec_err!(
214 "Unsupported data type {other:?} for function {digest_algorithm}"
215 );
216 }
217 };
218 Ok(ColumnarValue::Array(output))
219 }
220 ColumnarValue::Scalar(scalar) => {
221 match scalar {
222 ScalarValue::Utf8View(a)
223 | ScalarValue::Utf8(a)
224 | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
225 .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
226 ScalarValue::Binary(a)
227 | ScalarValue::LargeBinary(a)
228 | ScalarValue::BinaryView(a) => Ok(digest_algorithm
229 .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
230 other => exec_err!(
231 "Unsupported data type {other:?} for function {digest_algorithm}"
232 ),
233 }
234 }
235 }
236}