datafusion_spark/function/datetime/
extract.rs1use std::any::Any;
19
20use arrow::array::ArrayRef;
21use arrow::compute::{DatePart, date_part};
22use arrow::datatypes::DataType;
23use datafusion_common::Result;
24use datafusion_common::utils::take_function_args;
25use datafusion_expr::{
26 Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
27 TypeSignatureClass, Volatility,
28};
29use datafusion_functions::utils::make_scalar_function;
30
31fn extract_signature() -> Signature {
33 Signature::coercible(
34 vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
35 Volatility::Immutable,
36 )
37}
38
39#[derive(Debug, PartialEq, Eq, Hash)]
44pub struct SparkHour {
45 signature: Signature,
46}
47
48impl Default for SparkHour {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl SparkHour {
55 pub fn new() -> Self {
56 Self {
57 signature: extract_signature(),
58 }
59 }
60}
61
62impl ScalarUDFImpl for SparkHour {
63 fn as_any(&self) -> &dyn Any {
64 self
65 }
66
67 fn name(&self) -> &str {
68 "hour"
69 }
70
71 fn signature(&self) -> &Signature {
72 &self.signature
73 }
74
75 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
76 Ok(DataType::Int32)
77 }
78
79 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
80 make_scalar_function(spark_hour, vec![])(&args.args)
81 }
82}
83
84fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
85 let [ts_arg] = take_function_args("hour", args)?;
86 let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
87 Ok(result)
88}
89
90#[derive(Debug, PartialEq, Eq, Hash)]
95pub struct SparkMinute {
96 signature: Signature,
97}
98
99impl Default for SparkMinute {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl SparkMinute {
106 pub fn new() -> Self {
107 Self {
108 signature: extract_signature(),
109 }
110 }
111}
112
113impl ScalarUDFImpl for SparkMinute {
114 fn as_any(&self) -> &dyn Any {
115 self
116 }
117
118 fn name(&self) -> &str {
119 "minute"
120 }
121
122 fn signature(&self) -> &Signature {
123 &self.signature
124 }
125
126 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
127 Ok(DataType::Int32)
128 }
129
130 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
131 make_scalar_function(spark_minute, vec![])(&args.args)
132 }
133}
134
135fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
136 let [ts_arg] = take_function_args("minute", args)?;
137 let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
138 Ok(result)
139}
140
141#[derive(Debug, PartialEq, Eq, Hash)]
146pub struct SparkSecond {
147 signature: Signature,
148}
149
150impl Default for SparkSecond {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl SparkSecond {
157 pub fn new() -> Self {
158 Self {
159 signature: extract_signature(),
160 }
161 }
162}
163
164impl ScalarUDFImpl for SparkSecond {
165 fn as_any(&self) -> &dyn Any {
166 self
167 }
168
169 fn name(&self) -> &str {
170 "second"
171 }
172
173 fn signature(&self) -> &Signature {
174 &self.signature
175 }
176
177 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
178 Ok(DataType::Int32)
179 }
180
181 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
182 make_scalar_function(spark_second, vec![])(&args.args)
183 }
184}
185
186fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
187 let [ts_arg] = take_function_args("second", args)?;
188 let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
189 Ok(result)
190}
191
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
    use arrow::datatypes::TimeUnit;
    use std::sync::Arc;

    /// 2024-01-15 14:30:45 UTC, in microseconds since the Unix epoch.
    const SAMPLE_TS_MICROS: i64 = 1_705_329_045_000_000;

    /// A two-row timestamp column: the sample instant followed by a null.
    fn sample_input() -> ArrayRef {
        Arc::new(TimestampMicrosecondArray::from(vec![
            Some(SAMPLE_TS_MICROS),
            None,
        ]))
    }

    /// Runs `kernel` over [`sample_input`] and checks that row 0 equals
    /// `expected` and that the null in row 1 is preserved.
    fn assert_extracts(kernel: fn(&[ArrayRef]) -> Result<ArrayRef>, expected: i32) {
        let output = kernel(&[sample_input()]).unwrap();
        let values = output.as_any().downcast_ref::<Int32Array>().unwrap();

        assert_eq!(values.value(0), expected);
        assert!(values.is_null(1));
    }

    /// Checks that `udf` reports `Int32` for a microsecond timestamp input.
    fn assert_returns_int32(udf: &impl ScalarUDFImpl) {
        let micros = DataType::Timestamp(TimeUnit::Microsecond, None);
        assert_eq!(udf.return_type(&[micros]).unwrap(), DataType::Int32);
    }

    #[test]
    fn test_spark_hour() {
        assert_extracts(spark_hour, 14);
    }

    #[test]
    fn test_spark_minute() {
        assert_extracts(spark_minute, 30);
    }

    #[test]
    fn test_spark_second() {
        assert_extracts(spark_second, 45);
    }

    #[test]
    fn test_hour_return_type() {
        assert_returns_int32(&SparkHour::new());
    }

    #[test]
    fn test_minute_return_type() {
        assert_returns_int32(&SparkMinute::new());
    }

    #[test]
    fn test_second_return_type() {
        assert_returns_int32(&SparkSecond::new());
    }
}