// datafusion_spark/function/hash/sha1.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::Write;
use std::sync::Arc;

use arrow::array::{ArrayRef, StringArray};
use arrow::datatypes::DataType;
use datafusion_common::cast::{
    as_binary_array, as_binary_view_array, as_large_binary_array,
};
use datafusion_common::{exec_err, internal_err, Result};
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use datafusion_functions::utils::make_scalar_function;
use sha1::{Digest, Sha1};
33
34/// <https://spark.apache.org/docs/latest/api/sql/index.html#sha1>
35#[derive(Debug, PartialEq, Eq, Hash)]
36pub struct SparkSha1 {
37    signature: Signature,
38    aliases: Vec<String>,
39}
40
41impl Default for SparkSha1 {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl SparkSha1 {
48    pub fn new() -> Self {
49        Self {
50            signature: Signature::user_defined(Volatility::Immutable),
51            aliases: vec!["sha".to_string()],
52        }
53    }
54}
55
56impl ScalarUDFImpl for SparkSha1 {
57    fn as_any(&self) -> &dyn Any {
58        self
59    }
60
61    fn name(&self) -> &str {
62        "sha1"
63    }
64
65    fn aliases(&self) -> &[String] {
66        &self.aliases
67    }
68
69    fn signature(&self) -> &Signature {
70        &self.signature
71    }
72
73    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
74        Ok(DataType::Utf8)
75    }
76
77    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
78        make_scalar_function(spark_sha1, vec![])(&args.args)
79    }
80
81    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
82        if arg_types.len() != 1 {
83            return exec_err!(
84                "`sha1` function requires 1 argument, got {}",
85                arg_types.len()
86            );
87        }
88        match arg_types[0] {
89            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
90                Ok(vec![arg_types[0].clone()])
91            }
92            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
93            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
94            DataType::Null => Ok(vec![DataType::Binary]),
95            _ => exec_err!("`sha1` function does not support type {}", arg_types[0]),
96        }
97    }
98}
99
100fn spark_sha1_digest(value: &[u8]) -> String {
101    let result = Sha1::digest(value);
102    let mut s = String::with_capacity(result.len() * 2);
103    for b in result.as_slice() {
104        #[allow(clippy::unwrap_used)]
105        write!(&mut s, "{b:02x}").unwrap();
106    }
107    s
108}
109
110fn spark_sha1_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
111    let result = input
112        .map(|value| value.map(spark_sha1_digest))
113        .collect::<StringArray>();
114    Arc::new(result)
115}
116
117fn spark_sha1(args: &[ArrayRef]) -> Result<ArrayRef> {
118    let [input] = args else {
119        return internal_err!(
120            "Spark `sha1` function requires 1 argument, got {}",
121            args.len()
122        );
123    };
124
125    match input.data_type() {
126        DataType::Binary => {
127            let input = as_binary_array(input)?;
128            Ok(spark_sha1_impl(input.iter()))
129        }
130        DataType::LargeBinary => {
131            let input = as_large_binary_array(input)?;
132            Ok(spark_sha1_impl(input.iter()))
133        }
134        DataType::BinaryView => {
135            let input = as_binary_view_array(input)?;
136            Ok(spark_sha1_impl(input.iter()))
137        }
138        _ => {
139            exec_err!(
140                "Spark `sha1` function: argument must be binary or large binary, got {:?}",
141                input.data_type()
142            )
143        }
144    }
145}