datafusion_spark/function/hash/sha1.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{ArrayRef, StringArray};
22use arrow::datatypes::{DataType, Field, FieldRef};
23use datafusion_common::cast::{
24    as_binary_array, as_binary_view_array, as_fixed_size_binary_array,
25    as_large_binary_array,
26};
27use datafusion_common::types::{NativeType, logical_string};
28use datafusion_common::utils::take_function_args;
29use datafusion_common::{Result, internal_err};
30use datafusion_expr::{
31    Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
32    Signature, TypeSignatureClass, Volatility,
33};
34use datafusion_functions::utils::make_scalar_function;
35use sha1::{Digest, Sha1};
36
/// <https://spark.apache.org/docs/latest/api/sql/index.html#sha1>
///
/// Spark-compatible `sha1` scalar function: computes the SHA-1 digest of a
/// binary input and returns it as a lowercase hex `Utf8` string.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkSha1 {
    // Accepts one binary argument; string inputs are implicitly coerced
    // to binary (see `SparkSha1::new`).
    signature: Signature,
    // Alternate registration names ("sha").
    aliases: Vec<String>,
}
43
44impl Default for SparkSha1 {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl SparkSha1 {
51    pub fn new() -> Self {
52        Self {
53            signature: Signature::coercible(
54                vec![Coercion::new_implicit(
55                    TypeSignatureClass::Binary,
56                    vec![TypeSignatureClass::Native(logical_string())],
57                    NativeType::Binary,
58                )],
59                Volatility::Immutable,
60            ),
61            aliases: vec!["sha".to_string()],
62        }
63    }
64}
65
66impl ScalarUDFImpl for SparkSha1 {
67    fn as_any(&self) -> &dyn Any {
68        self
69    }
70
71    fn name(&self) -> &str {
72        "sha1"
73    }
74
75    fn aliases(&self) -> &[String] {
76        &self.aliases
77    }
78
79    fn signature(&self) -> &Signature {
80        &self.signature
81    }
82
83    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
84        internal_err!("return_field_from_args should be used instead")
85    }
86
87    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
88        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
89        Ok(Arc::new(Field::new(self.name(), DataType::Utf8, nullable)))
90    }
91
92    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
93        make_scalar_function(spark_sha1, vec![])(&args.args)
94    }
95}
96
97/// Hex encoding lookup table for fast byte-to-hex conversion
98const HEX_CHARS_LOWER: &[u8; 16] = b"0123456789abcdef";
99
100#[inline]
101fn spark_sha1_digest(value: &[u8]) -> String {
102    let result = Sha1::digest(value);
103    let mut s = String::with_capacity(result.len() * 2);
104    for &b in result.as_slice() {
105        s.push(HEX_CHARS_LOWER[(b >> 4) as usize] as char);
106        s.push(HEX_CHARS_LOWER[(b & 0x0f) as usize] as char);
107    }
108    s
109}
110
111fn spark_sha1_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
112    let result = input
113        .map(|value| value.map(spark_sha1_digest))
114        .collect::<StringArray>();
115    Arc::new(result)
116}
117
118fn spark_sha1(args: &[ArrayRef]) -> Result<ArrayRef> {
119    let [input] = take_function_args("sha1", args)?;
120
121    match input.data_type() {
122        DataType::Null => Ok(Arc::new(StringArray::new_null(input.len()))),
123        DataType::Binary => {
124            let input = as_binary_array(input)?;
125            Ok(spark_sha1_impl(input.iter()))
126        }
127        DataType::LargeBinary => {
128            let input = as_large_binary_array(input)?;
129            Ok(spark_sha1_impl(input.iter()))
130        }
131        DataType::BinaryView => {
132            let input = as_binary_view_array(input)?;
133            Ok(spark_sha1_impl(input.iter()))
134        }
135        DataType::FixedSizeBinary(_) => {
136            let input = as_fixed_size_binary_array(input)?;
137            Ok(spark_sha1_impl(input.iter()))
138        }
139        dt => {
140            internal_err!("Unsupported data type for sha1: {dt}")
141        }
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sha1_nullability() -> Result<()> {
        let func = SparkSha1::new();

        // Output nullability must mirror the input field's nullability,
        // and the output type is always Utf8.
        for input_nullable in [false, true] {
            let field: FieldRef =
                Arc::new(Field::new("col", DataType::Binary, input_nullable));
            let out = func.return_field_from_args(ReturnFieldArgs {
                arg_fields: &[Arc::clone(&field)],
                scalar_arguments: &[None],
            })?;
            assert_eq!(out.is_nullable(), input_nullable);
            assert_eq!(out.data_type(), &DataType::Utf8);
        }

        Ok(())
    }
}
173}