datafusion_spark/function/datetime/
last_day.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::{ArrayRef, AsArray, Date32Array};
22use arrow::datatypes::{DataType, Date32Type};
23use chrono::{Datelike, Duration, NaiveDate};
24use datafusion_common::{exec_datafusion_err, internal_err, Result, ScalarValue};
25use datafusion_expr::{
26 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
27};
28
29#[derive(Debug, PartialEq, Eq, Hash)]
30pub struct SparkLastDay {
31 signature: Signature,
32}
33
34impl Default for SparkLastDay {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl SparkLastDay {
41 pub fn new() -> Self {
42 Self {
43 signature: Signature::exact(vec![DataType::Date32], Volatility::Immutable),
44 }
45 }
46}
47
48impl ScalarUDFImpl for SparkLastDay {
49 fn as_any(&self) -> &dyn Any {
50 self
51 }
52
53 fn name(&self) -> &str {
54 "last_day"
55 }
56
57 fn signature(&self) -> &Signature {
58 &self.signature
59 }
60
61 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
62 Ok(DataType::Date32)
63 }
64
65 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
66 let ScalarFunctionArgs { args, .. } = args;
67 let [arg] = args.as_slice() else {
68 return internal_err!(
69 "Spark `last_day` function requires 1 argument, got {}",
70 args.len()
71 );
72 };
73 match arg {
74 ColumnarValue::Scalar(ScalarValue::Date32(days)) => {
75 if let Some(days) = days {
76 Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some(
77 spark_last_day(*days)?,
78 ))))
79 } else {
80 Ok(ColumnarValue::Scalar(ScalarValue::Date32(None)))
81 }
82 }
83 ColumnarValue::Array(array) => {
84 let result = match array.data_type() {
85 DataType::Date32 => {
86 let result: Date32Array = array
87 .as_primitive::<Date32Type>()
88 .try_unary(spark_last_day)?
89 .with_data_type(DataType::Date32);
90 Ok(Arc::new(result) as ArrayRef)
91 }
92 other => {
93 internal_err!("Unsupported data type {other:?} for Spark function `last_day`")
94 }
95 }?;
96 Ok(ColumnarValue::Array(result))
97 }
98 other => {
99 internal_err!("Unsupported arg {other:?} for Spark function `last_day")
100 }
101 }
102 }
103}
104
105fn spark_last_day(days: i32) -> Result<i32> {
106 let date = Date32Type::to_naive_date(days);
107
108 let (year, month) = (date.year(), date.month());
109 let (next_year, next_month) = if month == 12 {
110 (year + 1, 1)
111 } else {
112 (year, month + 1)
113 };
114
115 let first_day_next_month = NaiveDate::from_ymd_opt(next_year, next_month, 1)
116 .ok_or_else(|| {
117 exec_datafusion_err!(
118 "Spark `last_day`: Unable to parse date from {next_year}, {next_month}, 1"
119 )
120 })?;
121
122 Ok(Date32Type::from_naive_date(
123 first_day_next_month - Duration::days(1),
124 ))
125}