Skip to main content

datafusion_spark/function/datetime/
extract.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use arrow::array::ArrayRef;
19use arrow::compute::{DatePart, date_part};
20use arrow::datatypes::DataType;
21use datafusion_common::Result;
22use datafusion_common::utils::take_function_args;
23use datafusion_expr::{
24    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
25    TypeSignatureClass, Volatility,
26};
27use datafusion_functions::utils::make_scalar_function;
28
29/// Creates a signature for datetime extraction functions that accept timestamp types.
30fn extract_signature() -> Signature {
31    Signature::coercible(
32        vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
33        Volatility::Immutable,
34    )
35}
36
37// -----------------------------------------------------------------------------
38// SparkHour
39// -----------------------------------------------------------------------------
40
41#[derive(Debug, PartialEq, Eq, Hash)]
42pub struct SparkHour {
43    signature: Signature,
44}
45
46impl Default for SparkHour {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl SparkHour {
53    pub fn new() -> Self {
54        Self {
55            signature: extract_signature(),
56        }
57    }
58}
59
60impl ScalarUDFImpl for SparkHour {
61    fn name(&self) -> &str {
62        "hour"
63    }
64
65    fn signature(&self) -> &Signature {
66        &self.signature
67    }
68
69    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
70        Ok(DataType::Int32)
71    }
72
73    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
74        make_scalar_function(spark_hour, vec![])(&args.args)
75    }
76}
77
78fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
79    let [ts_arg] = take_function_args("hour", args)?;
80    let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
81    Ok(result)
82}
83
84// -----------------------------------------------------------------------------
85// SparkMinute
86// -----------------------------------------------------------------------------
87
88#[derive(Debug, PartialEq, Eq, Hash)]
89pub struct SparkMinute {
90    signature: Signature,
91}
92
93impl Default for SparkMinute {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl SparkMinute {
100    pub fn new() -> Self {
101        Self {
102            signature: extract_signature(),
103        }
104    }
105}
106
107impl ScalarUDFImpl for SparkMinute {
108    fn name(&self) -> &str {
109        "minute"
110    }
111
112    fn signature(&self) -> &Signature {
113        &self.signature
114    }
115
116    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
117        Ok(DataType::Int32)
118    }
119
120    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
121        make_scalar_function(spark_minute, vec![])(&args.args)
122    }
123}
124
125fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
126    let [ts_arg] = take_function_args("minute", args)?;
127    let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
128    Ok(result)
129}
130
131// -----------------------------------------------------------------------------
132// SparkSecond
133// -----------------------------------------------------------------------------
134
135#[derive(Debug, PartialEq, Eq, Hash)]
136pub struct SparkSecond {
137    signature: Signature,
138}
139
140impl Default for SparkSecond {
141    fn default() -> Self {
142        Self::new()
143    }
144}
145
146impl SparkSecond {
147    pub fn new() -> Self {
148        Self {
149            signature: extract_signature(),
150        }
151    }
152}
153
154impl ScalarUDFImpl for SparkSecond {
155    fn name(&self) -> &str {
156        "second"
157    }
158
159    fn signature(&self) -> &Signature {
160        &self.signature
161    }
162
163    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
164        Ok(DataType::Int32)
165    }
166
167    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
168        make_scalar_function(spark_second, vec![])(&args.args)
169    }
170}
171
172fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
173    let [ts_arg] = take_function_args("second", args)?;
174    let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
175    Ok(result)
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
    use arrow::datatypes::TimeUnit;
    use std::sync::Arc;

    /// 2024-01-15 14:30:45 UTC, in microseconds since the Unix epoch.
    const TS_MICROS: i64 = 1_705_329_045_000_000;

    /// Builds a two-row `[Some(ts_micros), None]` microsecond timestamp array,
    /// tagged with `tz` when one is given.
    fn timestamps(ts_micros: i64, tz: Option<&str>) -> ArrayRef {
        let array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let array = match tz {
            Some(tz) => array.with_timezone(tz),
            None => array,
        };
        Arc::new(array)
    }

    /// Views an extraction result as `Int32Array`, the declared return type.
    fn as_int32(array: &ArrayRef) -> &Int32Array {
        array
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("extraction result should be an Int32Array")
    }

    #[test]
    fn test_spark_hour() {
        // 14:30:45 -> hour = 14
        let result = spark_hour(&[timestamps(TS_MICROS, None)]).unwrap();
        let result = as_int32(&result);

        assert_eq!(result.value(0), 14);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_spark_minute() {
        // 14:30:45 -> minute = 30
        let result = spark_minute(&[timestamps(TS_MICROS, None)]).unwrap();
        let result = as_int32(&result);

        assert_eq!(result.value(0), 30);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_spark_second() {
        // 14:30:45 -> second = 45
        let result = spark_second(&[timestamps(TS_MICROS, None)]).unwrap();
        let result = as_int32(&result);

        assert_eq!(result.value(0), 45);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_extract_uses_array_timezone() {
        // 14:30:45 UTC is 20:00:45 at a fixed +05:30 offset, so a timezone-aware
        // input must yield local components rather than UTC ones.
        let input = timestamps(TS_MICROS, Some("+05:30"));

        let hour = spark_hour(&[Arc::clone(&input)]).unwrap();
        let minute = spark_minute(&[Arc::clone(&input)]).unwrap();
        let second = spark_second(&[input]).unwrap();

        assert_eq!(as_int32(&hour).value(0), 20);
        assert_eq!(as_int32(&minute).value(0), 0);
        assert_eq!(as_int32(&second).value(0), 45);
        assert!(as_int32(&hour).is_null(1));
    }

    #[test]
    fn test_extract_before_unix_epoch() {
        // One microsecond before the epoch is 1969-12-31 23:59:59.999999 UTC;
        // negative inputs must not produce negative or rounded-up components.
        let input = timestamps(-1, None);

        let hour = spark_hour(&[Arc::clone(&input)]).unwrap();
        let minute = spark_minute(&[Arc::clone(&input)]).unwrap();
        let second = spark_second(&[input]).unwrap();

        assert_eq!(as_int32(&hour).value(0), 23);
        assert_eq!(as_int32(&minute).value(0), 59);
        assert_eq!(as_int32(&second).value(0), 59);
    }

    #[test]
    fn test_hour_return_type() {
        let func = SparkHour::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }

    #[test]
    fn test_minute_return_type() {
        let func = SparkMinute::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }

    #[test]
    fn test_second_return_type() {
        let func = SparkSecond::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }
}