// datafusion_spark/function/hash/crc32.rs
use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{ArrayRef, Int64Array};
22use arrow::datatypes::DataType;
23use crc32fast::Hasher;
24use datafusion_common::cast::{
25 as_binary_array, as_binary_view_array, as_large_binary_array,
26};
27use datafusion_common::{exec_err, internal_err, Result};
28use datafusion_expr::{
29 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
30};
31use datafusion_functions::utils::make_scalar_function;
32
/// Spark-compatible `crc32` scalar function.
///
/// Computes the CRC-32 checksum of a binary input and returns it as an
/// `Int64`, mirroring Spark's `crc32` expression.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkCrc32 {
    // User-defined signature: argument coercion is implemented in
    // `ScalarUDFImpl::coerce_types` rather than by a fixed type list.
    signature: Signature,
}
38
impl Default for SparkCrc32 {
    /// Delegates to [`SparkCrc32::new`].
    fn default() -> Self {
        Self::new()
    }
}
44
45impl SparkCrc32 {
46 pub fn new() -> Self {
47 Self {
48 signature: Signature::user_defined(Volatility::Immutable),
49 }
50 }
51}
52
53impl ScalarUDFImpl for SparkCrc32 {
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57
58 fn name(&self) -> &str {
59 "crc32"
60 }
61
62 fn signature(&self) -> &Signature {
63 &self.signature
64 }
65
66 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
67 Ok(DataType::Int64)
68 }
69
70 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
71 make_scalar_function(spark_crc32, vec![])(&args.args)
72 }
73
74 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
75 if arg_types.len() != 1 {
76 return exec_err!(
77 "`crc32` function requires 1 argument, got {}",
78 arg_types.len()
79 );
80 }
81 match arg_types[0] {
82 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
83 Ok(vec![arg_types[0].clone()])
84 }
85 DataType::Utf8 | DataType::Utf8View => Ok(vec![DataType::Binary]),
86 DataType::LargeUtf8 => Ok(vec![DataType::LargeBinary]),
87 DataType::Null => Ok(vec![DataType::Binary]),
88 _ => exec_err!("`crc32` function does not support type {}", arg_types[0]),
89 }
90 }
91}
92
93fn spark_crc32_digest(value: &[u8]) -> i64 {
94 let mut hasher = Hasher::new();
95 hasher.update(value);
96 hasher.finalize() as i64
97}
98
99fn spark_crc32_impl<'a>(input: impl Iterator<Item = Option<&'a [u8]>>) -> ArrayRef {
100 let result = input
101 .map(|value| value.map(spark_crc32_digest))
102 .collect::<Int64Array>();
103 Arc::new(result)
104}
105
106fn spark_crc32(args: &[ArrayRef]) -> Result<ArrayRef> {
107 let [input] = args else {
108 return internal_err!(
109 "Spark `crc32` function requires 1 argument, got {}",
110 args.len()
111 );
112 };
113
114 match input.data_type() {
115 DataType::Binary => {
116 let input = as_binary_array(input)?;
117 Ok(spark_crc32_impl(input.iter()))
118 }
119 DataType::LargeBinary => {
120 let input = as_large_binary_array(input)?;
121 Ok(spark_crc32_impl(input.iter()))
122 }
123 DataType::BinaryView => {
124 let input = as_binary_view_array(input)?;
125 Ok(spark_crc32_impl(input.iter()))
126 }
127 _ => {
128 exec_err!(
129 "Spark `crc32` function: argument must be binary or large binary, got {:?}",
130 input.data_type()
131 )
132 }
133 }
134}