datafusion_functions/crypto/
basic.rs1use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, BinaryArrayType};
21use arrow::datatypes::DataType;
22use blake2::{Blake2b512, Blake2s256, Digest};
23use blake3::Hasher as Blake3;
24
25use arrow::compute::StringArrayType;
26use datafusion_common::{DataFusionError, Result, ScalarValue, exec_err, plan_err};
27use datafusion_expr::ColumnarValue;
28use md5::Md5;
29use sha2::{Sha224, Sha256, Sha384, Sha512};
30use std::fmt;
31use std::str::FromStr;
32use std::sync::Arc;
33
/// Hash functions this module knows how to compute.
///
/// Values are plain tags (`Copy`) and carry no hashing state; the actual
/// hasher is picked by matching on the variant at the point of use.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) enum DigestAlgorithm {
    /// MD5.
    Md5,
    /// SHA-224.
    Sha224,
    /// SHA-256.
    Sha256,
    /// SHA-384.
    Sha384,
    /// SHA-512.
    Sha512,
    /// BLAKE2s.
    Blake2s,
    /// BLAKE2b.
    Blake2b,
    /// BLAKE3.
    Blake3,
}
45
46impl FromStr for DigestAlgorithm {
47 type Err = DataFusionError;
48 fn from_str(name: &str) -> Result<DigestAlgorithm> {
49 Ok(match name {
50 "md5" => Self::Md5,
51 "sha224" => Self::Sha224,
52 "sha256" => Self::Sha256,
53 "sha384" => Self::Sha384,
54 "sha512" => Self::Sha512,
55 "blake2b" => Self::Blake2b,
56 "blake2s" => Self::Blake2s,
57 "blake3" => Self::Blake3,
58 _ => {
59 let options = [
60 Self::Md5,
61 Self::Sha224,
62 Self::Sha256,
63 Self::Sha384,
64 Self::Sha512,
65 Self::Blake2s,
66 Self::Blake2b,
67 Self::Blake3,
68 ]
69 .iter()
70 .map(|i| i.to_string())
71 .collect::<Vec<_>>()
72 .join(", ");
73 return plan_err!(
74 "There is no built-in digest algorithm named '{name}', currently supported algorithms are: {options}"
75 );
76 }
77 })
78 }
79}
80
81impl fmt::Display for DigestAlgorithm {
82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83 write!(f, "{}", format!("{self:?}").to_lowercase())
84 }
85}
86
// Hashes every non-null element yielded by `$VALUES.iter()` with the hasher
// type `$HASHER` and collects the digests into an `Arc<BinaryArray>`.
// Null elements stay null in the output.
macro_rules! digest_to_array {
    ($HASHER:ident, $VALUES:expr) => {{
        let hashed: BinaryArray = $VALUES
            .iter()
            .map(|element| element.map(|bytes| $HASHER::digest(bytes)))
            .collect();
        Arc::new(hashed)
    }};
}
96
// Hashes an optional byte slice with the hasher type `$HASHER` and wraps the
// digest in `ScalarValue::Binary`; a `None` input yields `Binary(None)`.
macro_rules! digest_to_scalar {
    ($HASHER:ident, $VALUE:expr) => {{
        ScalarValue::Binary(
            $VALUE.map(|bytes| Vec::from($HASHER::digest(bytes).as_slice())),
        )
    }};
}
100
101impl DigestAlgorithm {
102 fn digest_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
104 ColumnarValue::Scalar(match self {
105 Self::Md5 => digest_to_scalar!(Md5, value),
106 Self::Sha224 => digest_to_scalar!(Sha224, value),
107 Self::Sha256 => digest_to_scalar!(Sha256, value),
108 Self::Sha384 => digest_to_scalar!(Sha384, value),
109 Self::Sha512 => digest_to_scalar!(Sha512, value),
110 Self::Blake2b => digest_to_scalar!(Blake2b512, value),
111 Self::Blake2s => digest_to_scalar!(Blake2s256, value),
112 Self::Blake3 => ScalarValue::Binary(value.map(|v| {
113 let mut digest = Blake3::default();
114 digest.update(v);
115 Blake3::finalize(&digest).as_bytes().to_vec()
116 })),
117 })
118 }
119
120 fn digest_utf8_array_impl<'a, StringArrType>(
121 self,
122 input_value: &StringArrType,
123 ) -> ArrayRef
124 where
125 StringArrType: StringArrayType<'a>,
126 {
127 match self {
128 Self::Md5 => digest_to_array!(Md5, input_value),
129 Self::Sha224 => digest_to_array!(Sha224, input_value),
130 Self::Sha256 => digest_to_array!(Sha256, input_value),
131 Self::Sha384 => digest_to_array!(Sha384, input_value),
132 Self::Sha512 => digest_to_array!(Sha512, input_value),
133 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
134 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
135 Self::Blake3 => {
136 let binary_array: BinaryArray = input_value
137 .iter()
138 .map(|opt| {
139 opt.map(|x| {
140 let mut digest = Blake3::default();
141 digest.update(x.as_bytes());
142 Blake3::finalize(&digest).as_bytes().to_vec()
143 })
144 })
145 .collect();
146 Arc::new(binary_array)
147 }
148 }
149 }
150
151 fn digest_binary_array_impl<'a, BinaryArrType>(
152 self,
153 input_value: &BinaryArrType,
154 ) -> ArrayRef
155 where
156 BinaryArrType: BinaryArrayType<'a>,
157 {
158 match self {
159 Self::Md5 => digest_to_array!(Md5, input_value),
160 Self::Sha224 => digest_to_array!(Sha224, input_value),
161 Self::Sha256 => digest_to_array!(Sha256, input_value),
162 Self::Sha384 => digest_to_array!(Sha384, input_value),
163 Self::Sha512 => digest_to_array!(Sha512, input_value),
164 Self::Blake2b => digest_to_array!(Blake2b512, input_value),
165 Self::Blake2s => digest_to_array!(Blake2s256, input_value),
166 Self::Blake3 => {
167 let binary_array: BinaryArray = input_value
168 .iter()
169 .map(|opt| {
170 opt.map(|x| {
171 let mut digest = Blake3::default();
172 digest.update(x);
173 Blake3::finalize(&digest).as_bytes().to_vec()
174 })
175 })
176 .collect();
177 Arc::new(binary_array)
178 }
179 }
180 }
181}
182
183pub(crate) fn digest_process(
184 value: &ColumnarValue,
185 digest_algorithm: DigestAlgorithm,
186) -> Result<ColumnarValue> {
187 match value {
188 ColumnarValue::Array(a) => {
189 let output = match a.data_type() {
190 DataType::Utf8View => {
191 digest_algorithm.digest_utf8_array_impl(&a.as_string_view())
192 }
193 DataType::Utf8 => {
194 digest_algorithm.digest_utf8_array_impl(&a.as_string::<i32>())
195 }
196 DataType::LargeUtf8 => {
197 digest_algorithm.digest_utf8_array_impl(&a.as_string::<i64>())
198 }
199 DataType::Binary => {
200 digest_algorithm.digest_binary_array_impl(&a.as_binary::<i32>())
201 }
202 DataType::LargeBinary => {
203 digest_algorithm.digest_binary_array_impl(&a.as_binary::<i64>())
204 }
205 DataType::BinaryView => {
206 digest_algorithm.digest_binary_array_impl(&a.as_binary_view())
207 }
208 other => {
209 return exec_err!(
210 "Unsupported data type {other:?} for function {digest_algorithm}"
211 );
212 }
213 };
214 Ok(ColumnarValue::Array(output))
215 }
216 ColumnarValue::Scalar(scalar) => {
217 match scalar {
218 ScalarValue::Utf8View(a)
219 | ScalarValue::Utf8(a)
220 | ScalarValue::LargeUtf8(a) => Ok(digest_algorithm
221 .digest_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
222 ScalarValue::Binary(a)
223 | ScalarValue::LargeBinary(a)
224 | ScalarValue::BinaryView(a) => Ok(digest_algorithm
225 .digest_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
226 other => exec_err!(
227 "Unsupported data type {other:?} for function {digest_algorithm}"
228 ),
229 }
230 }
231 }
232}