datafusion_spark/function/datetime/
date_sub.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::ArrayRef;
22use arrow::compute;
23use arrow::datatypes::{DataType, Date32Type};
24use arrow::error::ArrowError;
25use datafusion_common::cast::{
26 as_date32_array, as_int16_array, as_int32_array, as_int8_array,
27};
28use datafusion_common::{internal_err, Result};
29use datafusion_expr::{
30 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
31 Volatility,
32};
33use datafusion_functions::utils::make_scalar_function;
34
35#[derive(Debug, PartialEq, Eq, Hash)]
36pub struct SparkDateSub {
37 signature: Signature,
38}
39
40impl Default for SparkDateSub {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl SparkDateSub {
47 pub fn new() -> Self {
48 Self {
49 signature: Signature::one_of(
50 vec![
51 TypeSignature::Exact(vec![DataType::Date32, DataType::Int8]),
52 TypeSignature::Exact(vec![DataType::Date32, DataType::Int16]),
53 TypeSignature::Exact(vec![DataType::Date32, DataType::Int32]),
54 ],
55 Volatility::Immutable,
56 ),
57 }
58 }
59}
60
61impl ScalarUDFImpl for SparkDateSub {
62 fn as_any(&self) -> &dyn Any {
63 self
64 }
65
66 fn name(&self) -> &str {
67 "date_sub"
68 }
69
70 fn signature(&self) -> &Signature {
71 &self.signature
72 }
73
74 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
75 Ok(DataType::Date32)
76 }
77
78 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
79 make_scalar_function(spark_date_sub, vec![])(&args.args)
80 }
81}
82
83fn spark_date_sub(args: &[ArrayRef]) -> Result<ArrayRef> {
84 let [date_arg, days_arg] = args else {
85 return internal_err!(
86 "Spark `date_sub` function requires 2 arguments, got {}",
87 args.len()
88 );
89 };
90 let date_array = as_date32_array(date_arg)?;
91 let result = match days_arg.data_type() {
92 DataType::Int8 => {
93 let days_array = as_int8_array(days_arg)?;
94 compute::try_binary::<_, _, _, Date32Type>(
95 date_array,
96 days_array,
97 |date, days| {
98 date.checked_sub(days as i32).ok_or_else(|| {
99 ArrowError::ArithmeticOverflow("date_sub".to_string())
100 })
101 },
102 )?
103 }
104 DataType::Int16 => {
105 let days_array = as_int16_array(days_arg)?;
106 compute::try_binary::<_, _, _, Date32Type>(
107 date_array,
108 days_array,
109 |date, days| {
110 date.checked_sub(days as i32).ok_or_else(|| {
111 ArrowError::ArithmeticOverflow("date_sub".to_string())
112 })
113 },
114 )?
115 }
116 DataType::Int32 => {
117 let days_array = as_int32_array(days_arg)?;
118 compute::try_binary::<_, _, _, Date32Type>(
119 date_array,
120 days_array,
121 |date, days| {
122 date.checked_sub(days).ok_or_else(|| {
123 ArrowError::ArithmeticOverflow("date_sub".to_string())
124 })
125 },
126 )?
127 }
128 _ => {
129 return internal_err!(
130 "Spark `date_sub` function: argument must be int8, int16, int32, got {:?}",
131 days_arg.data_type()
132 );
133 }
134 };
135 Ok(Arc::new(result))
136}