datafusion_spark/function/datetime/
last_day.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{ArrayRef, AsArray, Date32Array};
22use arrow::datatypes::{DataType, Date32Type, Field, FieldRef};
23use chrono::{Datelike, Duration, NaiveDate};
24use datafusion_common::utils::take_function_args;
25use datafusion_common::{Result, ScalarValue, exec_datafusion_err, internal_err};
26use datafusion_expr::{
27 ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
28 Volatility,
29};
30
31#[derive(Debug, PartialEq, Eq, Hash)]
32pub struct SparkLastDay {
33 signature: Signature,
34}
35
36impl Default for SparkLastDay {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl SparkLastDay {
43 pub fn new() -> Self {
44 Self {
45 signature: Signature::exact(vec![DataType::Date32], Volatility::Immutable),
46 }
47 }
48}
49
50impl ScalarUDFImpl for SparkLastDay {
51 fn as_any(&self) -> &dyn Any {
52 self
53 }
54
55 fn name(&self) -> &str {
56 "last_day"
57 }
58
59 fn signature(&self) -> &Signature {
60 &self.signature
61 }
62
63 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
64 internal_err!("return_field_from_args should be used instead")
65 }
66
67 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
68 let Some(field) = args.arg_fields.first() else {
69 return internal_err!("Spark `last_day` expects exactly one argument");
70 };
71
72 Ok(Arc::new(Field::new(
73 self.name(),
74 DataType::Date32,
75 field.is_nullable(),
76 )))
77 }
78
79 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80 let ScalarFunctionArgs { args, .. } = args;
81 let [arg] = take_function_args("last_day", args)?;
82 match arg {
83 ColumnarValue::Scalar(ScalarValue::Date32(days)) => {
84 if let Some(days) = days {
85 Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(
86 spark_last_day(days)?,
87 ))))
88 } else {
89 Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
90 }
91 }
92 ColumnarValue::Array(array) => {
93 let result = match array.data_type() {
94 DataType::Date32 => {
95 let result: Date32Array = array
96 .as_primitive::<Date32Type>()
97 .try_unary(spark_last_day)?
98 .with_data_type(DataType::Date32);
99 Ok(Arc::new(result) as ArrayRef)
100 }
101 other => {
102 internal_err!(
103 "Unsupported data type {other:?} for Spark function `last_day`"
104 )
105 }
106 }?;
107 Ok(ColumnarValue::Array(result))
108 }
109 other => {
110 internal_err!("Unsupported arg {other:?} for Spark function `last_day")
111 }
112 }
113 }
114}
115
116fn spark_last_day(days: i32) -> Result<i32> {
117 let date = Date32Type::to_naive_date(days);
118
119 let (year, month) = (date.year(), date.month());
120 let (next_year, next_month) = if month == 12 {
121 (year + 1, 1)
122 } else {
123 (year, month + 1)
124 };
125
126 let first_day_next_month = NaiveDate::from_ymd_opt(next_year, next_month, 1)
127 .ok_or_else(|| {
128 exec_datafusion_err!(
129 "Spark `last_day`: Unable to parse date from {next_year}, {next_month}, 1"
130 )
131 })?;
132
133 Ok(Date32Type::from_naive_date(
134 first_day_next_month - Duration::days(1),
135 ))
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::function::utils::test::test_scalar_function;
    use arrow::array::{Array, Date32Array};
    use arrow::datatypes::Field;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ReturnFieldArgs};

    /// The output field is always `Date32` and is nullable exactly when the
    /// input field is.
    #[test]
    fn test_last_day_nullability_matches_input() {
        let udf = SparkLastDay::new();

        // (input nullability, message used if planning the return field fails)
        let cases = [
            (false, "non-nullable arg should succeed"),
            (true, "nullable arg should succeed"),
        ];

        for (nullable, failure_msg) in cases {
            let input = Arc::new(Field::new("arg", DataType::Date32, nullable));
            let output = udf
                .return_field_from_args(ReturnFieldArgs {
                    arg_fields: &[Arc::clone(&input)],
                    scalar_arguments: &[None],
                })
                .expect(failure_msg);

            assert_eq!(output.data_type(), &DataType::Date32);
            assert_eq!(output.is_nullable(), nullable);
        }
    }

    /// Scalar inputs: day 0 is expected to map to day 30, and a null input to
    /// a null output.
    #[test]
    fn test_last_day_scalar_evaluation() {
        let day_zero = ColumnarValue::Scalar(ScalarValue::Date32(Some(0)));
        test_scalar_function!(
            SparkLastDay::new(),
            vec![day_zero],
            Ok(Some(30)),
            i32,
            DataType::Date32,
            Date32Array
        );

        let null_day = ColumnarValue::Scalar(ScalarValue::Date32(None));
        test_scalar_function!(
            SparkLastDay::new(),
            vec![null_day],
            Ok(None),
            i32,
            DataType::Date32,
            Date32Array
        );
    }
}