// datafusion_spark/function/hash/crc32.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{ArrayRef, Int64Array};
22use arrow::datatypes::DataType;
23use crc32fast::Hasher;
24use datafusion_common::cast::{
25    as_binary_array, as_binary_view_array, as_large_binary_array,
26};
27use datafusion_common::{exec_err, internal_err, Result};
28use datafusion_expr::{
29    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions::utils::make_scalar_function;
32
/// Spark-compatible `crc32` scalar function: returns the CRC-32 checksum of
/// binary input as a BIGINT (`Int64`).
///
/// <https://spark.apache.org/docs/latest/api/sql/index.html#crc32>
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkCrc32 {
    // User-defined signature; argument coercion is done in `coerce_types`.
    signature: Signature,
}
38
39impl Default for SparkCrc32 {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl SparkCrc32 {
46    pub fn new() -> Self {
47        Self {
48            signature: Signature::user_defined(Volatility::Immutable),
49        }
50    }
51}
52
53impl ScalarUDFImpl for SparkCrc32 {
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57
58    fn name(&self) -> &str {
59        "crc32"
60    }
61
62    fn signature(&self) -> &Signature {
63        &self.signature
64    }
65
66    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
67        Ok(DataType::Int64)
68    }
69
70    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
71        make_scalar_function(spark_crc32, vec![])(&args.args)
72    }
73
74    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
75        if arg_types.len() != 1 {
76            return exec_err!(
77                "`crc32` function requires 1 argument, got {}",
78                arg_types.len()
79            );
80        }
81        match arg_types[0] {
82            DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
83                Ok(vec![arg_types[0].clone()])
84            }
85            DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
86            DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
87            DataType::Null => Ok(vec![DataType::Binary]),
88            _ => exec_err!("`crc32` function does not support type {}", arg_types[0]),
89        }
90    }
91}
92
93fn spark_crc32_digest(value: &[u8]) -> i64 {
94    let mut hasher = Hasher::new();
95    hasher.update(value);
96    hasher.finalize() as i64
97}
98
99fn spark_crc32_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
100    let result = input
101        .map(|value| value.map(spark_crc32_digest))
102        .collect::<Int64Array>();
103    Arc::new(result)
104}
105
106fn spark_crc32(args: &[ArrayRef]) -> Result<ArrayRef> {
107    let [input] = args else {
108        return internal_err!(
109            "Spark `crc32` function requires 1 argument, got {}",
110            args.len()
111        );
112    };
113
114    match input.data_type() {
115        DataType::Binary => {
116            let input = as_binary_array(input)?;
117            Ok(spark_crc32_impl(input.iter()))
118        }
119        DataType::LargeBinary => {
120            let input = as_large_binary_array(input)?;
121            Ok(spark_crc32_impl(input.iter()))
122        }
123        DataType::BinaryView => {
124            let input = as_binary_view_array(input)?;
125            Ok(spark_crc32_impl(input.iter()))
126        }
127        _ => {
128            exec_err!(
129                "Spark `crc32` function: argument must be binary or large binary, got {:?}",
130                input.data_type()
131            )
132        }
133    }
134}