// datafusion_spark/function/datetime/extract.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;

use arrow::array::ArrayRef;
use arrow::compute::{DatePart, date_part};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::utils::take_function_args;
use datafusion_expr::{
    Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
    TypeSignatureClass, Volatility,
};
use datafusion_functions::utils::make_scalar_function;

31/// Creates a signature for datetime extraction functions that accept timestamp types.
32fn extract_signature() -> Signature {
33    Signature::coercible(
34        vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
35        Volatility::Immutable,
36    )
37}
38
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkHour {
    signature: Signature,
}

// `Default` simply delegates to `new`, so the UDF can be registered
// through generic `T::default()` machinery.
impl Default for SparkHour {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkHour {
    /// Creates a new `hour` UDF using the shared timestamp-coercible signature.
    pub fn new() -> Self {
        Self {
            signature: extract_signature(),
        }
    }
}

impl ScalarUDFImpl for SparkHour {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hour"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    // Unconditionally `Int32`, independent of the input timestamp's unit.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // `make_scalar_function` adapts scalar arguments to arrays before
        // delegating to the array kernel below.
        make_scalar_function(spark_hour, vec![])(&args.args)
    }
}

84fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
85    let [ts_arg] = take_function_args("hour", args)?;
86    let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
87    Ok(result)
88}
89
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkMinute {
    signature: Signature,
}

// `Default` simply delegates to `new`, so the UDF can be registered
// through generic `T::default()` machinery.
impl Default for SparkMinute {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkMinute {
    /// Creates a new `minute` UDF using the shared timestamp-coercible signature.
    pub fn new() -> Self {
        Self {
            signature: extract_signature(),
        }
    }
}

impl ScalarUDFImpl for SparkMinute {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "minute"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    // Unconditionally `Int32`, independent of the input timestamp's unit.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // `make_scalar_function` adapts scalar arguments to arrays before
        // delegating to the array kernel below.
        make_scalar_function(spark_minute, vec![])(&args.args)
    }
}

135fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
136    let [ts_arg] = take_function_args("minute", args)?;
137    let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
138    Ok(result)
139}
140
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkSecond {
    signature: Signature,
}

// `Default` simply delegates to `new`, so the UDF can be registered
// through generic `T::default()` machinery.
impl Default for SparkSecond {
    fn default() -> Self {
        Self::new()
    }
}

impl SparkSecond {
    /// Creates a new `second` UDF using the shared timestamp-coercible signature.
    pub fn new() -> Self {
        Self {
            signature: extract_signature(),
        }
    }
}

impl ScalarUDFImpl for SparkSecond {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "second"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    // Unconditionally `Int32`, independent of the input timestamp's unit.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Int32)
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        // `make_scalar_function` adapts scalar arguments to arrays before
        // delegating to the array kernel below.
        make_scalar_function(spark_second, vec![])(&args.args)
    }
}

186fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
187    let [ts_arg] = take_function_args("second", args)?;
188    let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
189    Ok(result)
190}
191
#[cfg(test)]
mod tests {
    //! Unit tests: each kernel is exercised on a two-element array (one
    //! valid timestamp, one null) to verify both extraction and null
    //! propagation, and each UDF's declared return type is checked.
    use super::*;
    use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
    use arrow::datatypes::TimeUnit;
    use std::sync::Arc;

    #[test]
    fn test_spark_hour() {
        // Create a timestamp array: 2024-01-15 14:30:45 UTC (in microseconds)
        // 14:30:45 -> hour = 14
        let ts_micros = 1_705_329_045_000_000_i64; // 2024-01-15 14:30:45 UTC
        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let ts_array = Arc::new(ts_array) as ArrayRef;

        let result = spark_hour(&[ts_array]).unwrap();
        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(result.value(0), 14);
        // Null input must yield a null output, not a default value.
        assert!(result.is_null(1));
    }

    #[test]
    fn test_spark_minute() {
        // 14:30:45 -> minute = 30
        let ts_micros = 1_705_329_045_000_000_i64;
        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let ts_array = Arc::new(ts_array) as ArrayRef;

        let result = spark_minute(&[ts_array]).unwrap();
        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(result.value(0), 30);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_spark_second() {
        // 14:30:45 -> second = 45
        let ts_micros = 1_705_329_045_000_000_i64;
        let ts_array = TimestampMicrosecondArray::from(vec![Some(ts_micros), None]);
        let ts_array = Arc::new(ts_array) as ArrayRef;

        let result = spark_second(&[ts_array]).unwrap();
        let result = result.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(result.value(0), 45);
        assert!(result.is_null(1));
    }

    #[test]
    fn test_hour_return_type() {
        let func = SparkHour::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }

    #[test]
    fn test_minute_return_type() {
        let func = SparkMinute::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }

    #[test]
    fn test_second_return_type() {
        let func = SparkSecond::new();
        let result = func
            .return_type(&[DataType::Timestamp(TimeUnit::Microsecond, None)])
            .unwrap();
        assert_eq!(result, DataType::Int32);
    }
}