datafusion_spark/function/datetime/
extract.rs1use arrow::array::ArrayRef;
19use arrow::compute::{DatePart, date_part};
20use arrow::datatypes::DataType;
21use datafusion_common::Result;
22use datafusion_common::utils::take_function_args;
23use datafusion_expr::{
24 Coercion, ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature,
25 TypeSignatureClass, Volatility,
26};
27use datafusion_functions::utils::make_scalar_function;
28
29fn extract_signature() -> Signature {
31 Signature::coercible(
32 vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
33 Volatility::Immutable,
34 )
35}
36
37#[derive(Debug, PartialEq, Eq, Hash)]
42pub struct SparkHour {
43 signature: Signature,
44}
45
46impl Default for SparkHour {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52impl SparkHour {
53 pub fn new() -> Self {
54 Self {
55 signature: extract_signature(),
56 }
57 }
58}
59
60impl ScalarUDFImpl for SparkHour {
61 fn name(&self) -> &str {
62 "hour"
63 }
64
65 fn signature(&self) -> &Signature {
66 &self.signature
67 }
68
69 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
70 Ok(DataType::Int32)
71 }
72
73 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
74 make_scalar_function(spark_hour, vec![])(&args.args)
75 }
76}
77
78fn spark_hour(args: &[ArrayRef]) -> Result<ArrayRef> {
79 let [ts_arg] = take_function_args("hour", args)?;
80 let result = date_part(ts_arg.as_ref(), DatePart::Hour)?;
81 Ok(result)
82}
83
84#[derive(Debug, PartialEq, Eq, Hash)]
89pub struct SparkMinute {
90 signature: Signature,
91}
92
93impl Default for SparkMinute {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99impl SparkMinute {
100 pub fn new() -> Self {
101 Self {
102 signature: extract_signature(),
103 }
104 }
105}
106
107impl ScalarUDFImpl for SparkMinute {
108 fn name(&self) -> &str {
109 "minute"
110 }
111
112 fn signature(&self) -> &Signature {
113 &self.signature
114 }
115
116 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
117 Ok(DataType::Int32)
118 }
119
120 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
121 make_scalar_function(spark_minute, vec![])(&args.args)
122 }
123}
124
125fn spark_minute(args: &[ArrayRef]) -> Result<ArrayRef> {
126 let [ts_arg] = take_function_args("minute", args)?;
127 let result = date_part(ts_arg.as_ref(), DatePart::Minute)?;
128 Ok(result)
129}
130
131#[derive(Debug, PartialEq, Eq, Hash)]
136pub struct SparkSecond {
137 signature: Signature,
138}
139
140impl Default for SparkSecond {
141 fn default() -> Self {
142 Self::new()
143 }
144}
145
146impl SparkSecond {
147 pub fn new() -> Self {
148 Self {
149 signature: extract_signature(),
150 }
151 }
152}
153
154impl ScalarUDFImpl for SparkSecond {
155 fn name(&self) -> &str {
156 "second"
157 }
158
159 fn signature(&self) -> &Signature {
160 &self.signature
161 }
162
163 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
164 Ok(DataType::Int32)
165 }
166
167 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
168 make_scalar_function(spark_second, vec![])(&args.args)
169 }
170}
171
172fn spark_second(args: &[ArrayRef]) -> Result<ArrayRef> {
173 let [ts_arg] = take_function_args("second", args)?;
174 let result = date_part(ts_arg.as_ref(), DatePart::Second)?;
175 Ok(result)
176}
177
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int32Array, TimestampMicrosecondArray};
    use arrow::datatypes::TimeUnit;
    use std::sync::Arc;

    /// Microseconds since the Unix epoch; 2024-01-15 14:30:45 when read as UTC.
    const SAMPLE_TS_MICROS: i64 = 1_705_329_045_000_000;

    /// Two-row microsecond timestamp column: the sample instant, then a null.
    fn sample_input() -> ArrayRef {
        let column =
            TimestampMicrosecondArray::from(vec![Some(SAMPLE_TS_MICROS), None]);
        Arc::new(column)
    }

    /// Asserts that `output` is an `Int32Array` holding `expected` in row 0
    /// and a null in row 1.
    fn assert_value_then_null(output: &ArrayRef, expected: i32) {
        let ints = output.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(ints.value(0), expected);
        assert!(ints.is_null(1));
    }

    /// Argument type used by the `return_type` tests.
    fn micros_timestamp_type() -> DataType {
        DataType::Timestamp(TimeUnit::Microsecond, None)
    }

    #[test]
    fn test_spark_hour() {
        let output = spark_hour(&[sample_input()]).unwrap();
        assert_value_then_null(&output, 14);
    }

    #[test]
    fn test_spark_minute() {
        let output = spark_minute(&[sample_input()]).unwrap();
        assert_value_then_null(&output, 30);
    }

    #[test]
    fn test_spark_second() {
        let output = spark_second(&[sample_input()]).unwrap();
        assert_value_then_null(&output, 45);
    }

    #[test]
    fn test_hour_return_type() {
        let reported = SparkHour::new()
            .return_type(&[micros_timestamp_type()])
            .unwrap();
        assert_eq!(reported, DataType::Int32);
    }

    #[test]
    fn test_minute_return_type() {
        let reported = SparkMinute::new()
            .return_type(&[micros_timestamp_type()])
            .unwrap();
        assert_eq!(reported, DataType::Int32);
    }

    #[test]
    fn test_second_return_type() {
        let reported = SparkSecond::new()
            .return_type(&[micros_timestamp_type()])
            .unwrap();
        assert_eq!(reported, DataType::Int32);
    }
}