// datafusion_spark/function/datetime/last_day.rs

use std::any::Any;
use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, Date32Array};
use arrow::datatypes::{DataType, Date32Type, Field, FieldRef};
use chrono::{Datelike, Duration, NaiveDate};
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, ScalarValue, exec_datafusion_err, internal_err};
use datafusion_expr::{
    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
    Volatility,
};
31#[derive(Debug, PartialEq, Eq, Hash)]
32pub struct SparkLastDay {
33 signature: Signature,
34}
35
36impl Default for SparkLastDay {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl SparkLastDay {
43 pub fn new() -> Self {
44 Self {
45 signature: Signature::exact(vec![DataType::Date32], Volatility::Immutable),
46 }
47 }
48}
49
50impl ScalarUDFImpl for SparkLastDay {
51 fn as_any(&self) -> &dyn Any {
52 self
53 }
54
55 fn name(&self) -> &str {
56 "last_day"
57 }
58
59 fn signature(&self) -> &Signature {
60 &self.signature
61 }
62
63 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
64 internal_err!("return_field_from_args should be used instead")
65 }
66
67 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
68 let Some(field) = args.arg_fields.first() else {
69 return internal_err!("Spark `last_day` expects exactly one argument");
70 };
71
72 Ok(Arc::new(Field::new(
73 self.name(),
74 DataType::Date32,
75 field.is_nullable(),
76 )))
77 }
78
79 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80 let ScalarFunctionArgs { args, .. } = args;
81 let [arg] = take_function_args("last_day", args)?;
82 match arg {
83 ColumnarValue::Scalar(ScalarValue::Date32(days)) => {
84 if let Some(days) = days {
85 Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(
86 spark_last_day(days)?,
87 ))))
88 } else {
89 Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
90 }
91 }
92 ColumnarValue::Array(array) => {
93 let result = match array.data_type() {
94 DataType::Date32 => {
95 let result: Date32Array = array
96 .as_primitive::<Date32Type>()
97 .try_unary(spark_last_day)?
98 .with_data_type(DataType::Date32);
99 Ok(Arc::new(result) as ArrayRef)
100 }
101 other => {
102 internal_err!(
103 "Unsupported data type {other:?} for Spark function `last_day`"
104 )
105 }
106 }?;
107 Ok(ColumnarValue::Array(result))
108 }
109 other => {
110 internal_err!("Unsupported arg {other:?} for Spark function `last_day")
111 }
112 }
113 }
114}
115
116fn spark_last_day(days: i32) -> Result<i32> {
117 let date = Date32Type::to_naive_date_opt(days).ok_or_else(|| {
118 exec_datafusion_err!(
119 "Spark `last_day`: Unable to convert days value {days} to date"
120 )
121 })?;
122
123 let (year, month) = (date.year(), date.month());
124 let (next_year, next_month) = if month == 12 {
125 (year + 1, 1)
126 } else {
127 (year, month + 1)
128 };
129
130 let first_day_next_month = NaiveDate::from_ymd_opt(next_year, next_month, 1)
131 .ok_or_else(|| {
132 exec_datafusion_err!(
133 "Spark `last_day`: Unable to parse date from {next_year}, {next_month}, 1"
134 )
135 })?;
136
137 Ok(Date32Type::from_naive_date(
138 first_day_next_month - Duration::days(1),
139 ))
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use crate::function::utils::test::test_scalar_function;
    use arrow::array::{Array, Date32Array};
    use arrow::datatypes::Field;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ReturnFieldArgs};

    /// The output field must be `Date32` and mirror the input field's
    /// nullability in both directions.
    #[test]
    fn test_last_day_nullability_matches_input() {
        let func = SparkLastDay::new();

        let non_nullable_out = func
            .return_field_from_args(ReturnFieldArgs {
                arg_fields: &[Arc::new(Field::new("arg", DataType::Date32, false))],
                scalar_arguments: &[None],
            })
            .expect("non-nullable arg should succeed");
        assert_eq!(non_nullable_out.data_type(), &DataType::Date32);
        assert!(!non_nullable_out.is_nullable());

        let nullable_out = func
            .return_field_from_args(ReturnFieldArgs {
                arg_fields: &[Arc::new(Field::new("arg", DataType::Date32, true))],
                scalar_arguments: &[None],
            })
            .expect("nullable arg should succeed");
        assert_eq!(nullable_out.data_type(), &DataType::Date32);
        assert!(nullable_out.is_nullable());
    }

    /// Scalar evaluation: epoch day 0 (1970-01-01) maps to day 30
    /// (1970-01-31), and NULL maps to NULL.
    #[test]
    fn test_last_day_scalar_evaluation() {
        test_scalar_function!(
            SparkLastDay::new(),
            vec![ColumnarValue::Scalar(ScalarValue::Date32(Some(0)))],
            Ok(Some(30)),
            i32,
            DataType::Date32,
            Date32Array
        );

        test_scalar_function!(
            SparkLastDay::new(),
            vec![ColumnarValue::Scalar(ScalarValue::Date32(None))],
            Ok(None),
            i32,
            DataType::Date32,
            Date32Array
        );
    }
}