datafusion_spark/function/hash/sha1.rs

use std::any::Any;
use std::fmt::Write;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::DataType;
use datafusion_common::cast::{
    as_binary_array, as_binary_view_array, as_large_binary_array,
};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::utils::make_scalar_function;
use sha1::{Digest, Sha1};

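/// Spark-compatible `sha1` scalar function: computes the SHA-1 digest of its
/// argument and returns it as a lowercase hex-encoded string. Also available
/// under the alias `sha`, mirroring Spark SQL.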
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkSha1 {
    signature: Signature,
    aliases: Vec<String>,
}

impl Default for SparkSha1 {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkSha1 {
    pub fn new() -> Self {
        Self {
            signature: Signature::user_defined(Volatility::Immutable),
            aliases: vec!["sha".to_string()],
        }
    }
}

impl ScalarUDFImpl for SparkSha1 {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "sha1"
    }

    fn aliases(&self) -> &[String] {
        &self.aliases
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

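    // The result is always a (non-large) UTF-8 string: the 40-character
    // lowercase hex encoding of the 20-byte digest.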
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Utf8)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        make_scalar_function(spark_sha1, vec![])(&args.args)
    }

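    // Strings are coerced to the matching binary type so the kernel only has
    // to handle binary input; like Spark, this hashes the raw UTF-8 bytes of
    // string arguments.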
    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
        if arg_types.len() != 1 {
            return exec_err!(
                "`sha1` function requires 1 argument, got {}",
                arg_types.len()
            );
        }
        match arg_types[0] {
            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
                Ok(vec![arg_types[0].clone()])
            }
            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
            DataType::Null => Ok(vec![DataType::Binary]),
            _ => exec_err!("`sha1` function does not support type {}", arg_types[0]),
        }
    }
}

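/// Hashes `value` with SHA-1 and encodes the 20-byte digest as a
/// 40-character lowercase hex string, matching Spark's output format.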
fn spark_sha1_digest(value: &[u8]) -> String {
    let result = Sha1::digest(value);
    let mut s = String::with_capacity(result.len() * 2);
    for b in result.as_slice() {
        #[allow(clippy::unwrap_used)]
        write!(&mut s, "{b:02x}").unwrap();
    }
    s
}

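/// Maps the digest over an iterator of optional byte slices, propagating
/// nulls: a NULL input row yields a NULL output row, as in Spark.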
fn spark_sha1_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
    let result = input
        .map(|value| value.map(spark_sha1_digest))
        .collect::<StringArray>();
    Arc::new(result)
}

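/// Kernel invoked via `invoke_with_args`. By the time it runs, `coerce_types`
/// has rewritten string and null arguments to a binary type, so only the
/// three binary variants need handling here.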
fn spark_sha1(args: &[ArrayRef]) -> Result<ArrayRef> {
    let [input] = args else {
        return internal_err!(
            "Spark `sha1` function requires 1 argument, got {}",
            args.len()
        );
    };

    match input.data_type() {
        DataType::Binary => {
            let input = as_binary_array(input)?;
            Ok(spark_sha1_impl(input.iter()))
        }
        DataType::LargeBinary => {
            let input = as_large_binary_array(input)?;
            Ok(spark_sha1_impl(input.iter()))
        }
        DataType::BinaryView => {
            let input = as_binary_view_array(input)?;
            Ok(spark_sha1_impl(input.iter()))
        }
        _ => {
            exec_err!(
                "Spark `sha1` function: argument must be a binary type, got {:?}",
                input.data_type()
            )
        }
    }
}
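
// A minimal sketch of unit tests (hypothetical, not part of the original
// file): it checks the digest against the well-known FIPS SHA-1 test vectors
// for "abc" and the empty string, and exercises null propagation through the
// kernel.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::BinaryArray;

    #[test]
    fn digest_matches_known_vectors() {
        assert_eq!(
            spark_sha1_digest(b"abc"),
            "a9993e364706816aba3e25717850c26c9cd0d89d"
        );
        assert_eq!(
            spark_sha1_digest(b""),
            "da39a3ee5e6b4b0d3255bfef95601890afd80709"
        );
    }

    #[test]
    fn kernel_propagates_nulls() {
        let input: ArrayRef =
            Arc::new(BinaryArray::from(vec![Some(b"abc".as_ref()), None]));
        let result = spark_sha1(&[input]).unwrap();
        let result = result
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("sha1 kernel returns a StringArray");
        assert_eq!(result.value(0), "a9993e364706816aba3e25717850c26c9cd0d89d");
        assert!(result.is_null(1));
    }
}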