1use std::any::Any;
19use std::str::FromStr;
20use std::sync::Arc;
21
22use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
23use arrow::compute::kernels::cast_utils::IntervalUnit;
24use arrow::compute::{binary, date_part, DatePart};
25use arrow::datatypes::DataType::{
26 Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
27};
28use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
29use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
30use datafusion_common::types::{logical_date, NativeType};
31
32use datafusion_common::{
33 cast::{
34 as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array,
35 as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
36 as_timestamp_microsecond_array, as_timestamp_millisecond_array,
37 as_timestamp_nanosecond_array, as_timestamp_second_array,
38 },
39 exec_err, internal_err, not_impl_err,
40 types::logical_string,
41 utils::take_function_args,
42 Result, ScalarValue,
43};
44use datafusion_expr::{
45 ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
46 TypeSignature, Volatility,
47};
48use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
49use datafusion_macros::user_doc;
50
51#[user_doc(
52 doc_section(label = "Time and Date Functions"),
53 description = "Returns the specified part of the date as an integer.",
54 syntax_example = "date_part(part, expression)",
55 alternative_syntax = "extract(field FROM source)",
56 argument(
57 name = "part",
58 description = r#"Part of the date to return. The following date parts are supported:
59
60 - year
61 - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
62 - month
63 - week (week of the year)
64 - day (day of the month)
65 - hour
66 - minute
67 - second
68 - millisecond
69 - microsecond
70 - nanosecond
71 - dow (day of the week)
72 - doy (day of the year)
73 - epoch (seconds since Unix epoch)
74"#
75 ),
76 argument(
77 name = "expression",
78 description = "Time expression to operate on. Can be a constant, column, or function."
79 )
80)]
#[derive(Debug)]
/// Scalar UDF implementing `date_part(part, expression)`.
pub struct DatePartFunc {
    /// Accepted argument type combinations; built in `new` and exposed through
    /// `ScalarUDFImpl::signature`.
    signature: Signature,
    /// Alternative names this function is registered under; exposed through
    /// `ScalarUDFImpl::aliases`.
    aliases: Vec<String>,
}
86
87impl Default for DatePartFunc {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl DatePartFunc {
94 pub fn new() -> Self {
95 Self {
96 signature: Signature::one_of(
97 vec![
98 TypeSignature::Coercible(vec![
99 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
100 Coercion::new_implicit(
101 TypeSignatureClass::Timestamp,
102 vec![TypeSignatureClass::Native(logical_string())],
104 NativeType::Timestamp(Nanosecond, None),
105 ),
106 ]),
107 TypeSignature::Coercible(vec![
108 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
109 Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
110 ]),
111 TypeSignature::Coercible(vec![
112 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
113 Coercion::new_exact(TypeSignatureClass::Time),
114 ]),
115 TypeSignature::Coercible(vec![
116 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
117 Coercion::new_exact(TypeSignatureClass::Interval),
118 ]),
119 TypeSignature::Coercible(vec![
120 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
121 Coercion::new_exact(TypeSignatureClass::Duration),
122 ]),
123 ],
124 Volatility::Immutable,
125 ),
126 aliases: vec![String::from("datepart")],
127 }
128 }
129}
130
131impl ScalarUDFImpl for DatePartFunc {
132 fn as_any(&self) -> &dyn Any {
133 self
134 }
135
136 fn name(&self) -> &str {
137 "date_part"
138 }
139
140 fn signature(&self) -> &Signature {
141 &self.signature
142 }
143
144 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
145 internal_err!("return_field_from_args should be called instead")
146 }
147
148 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
149 let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
150
151 field
152 .and_then(|sv| {
153 sv.try_as_str()
154 .flatten()
155 .filter(|s| !s.is_empty())
156 .map(|part| {
157 if is_epoch(part) {
158 Field::new(self.name(), DataType::Float64, true)
159 } else {
160 Field::new(self.name(), DataType::Int32, true)
161 }
162 })
163 })
164 .map(Arc::new)
165 .map_or_else(
166 || exec_err!("{} requires non-empty constant string", self.name()),
167 Ok,
168 )
169 }
170
171 fn invoke_with_args(
172 &self,
173 args: datafusion_expr::ScalarFunctionArgs,
174 ) -> Result<ColumnarValue> {
175 let args = args.args;
176 let [part, array] = take_function_args(self.name(), args)?;
177
178 let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
179 v
180 } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
181 v
182 } else {
183 return exec_err!(
184 "First argument of `DATE_PART` must be non-null scalar Utf8"
185 );
186 };
187
188 let is_scalar = matches!(array, ColumnarValue::Scalar(_));
189
190 let array = match array {
191 ColumnarValue::Array(array) => Arc::clone(&array),
192 ColumnarValue::Scalar(scalar) => scalar.to_array()?,
193 };
194
195 let part_trim = part_normalization(&part);
196
197 let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
200 match interval_unit {
201 IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
202 IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
203 IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
204 IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
205 IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
206 IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
207 IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
208 IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
209 IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
210 IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
211 _ => return exec_err!("Date part '{part}' not supported"),
213 }
214 } else {
215 match part_trim.to_lowercase().as_str() {
217 "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
218 "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
219 "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
220 "epoch" => epoch(array.as_ref())?,
221 _ => return exec_err!("Date part '{part}' not supported"),
222 }
223 };
224
225 Ok(if is_scalar {
226 ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
227 } else {
228 ColumnarValue::Array(arr)
229 })
230 }
231
232 fn aliases(&self) -> &[String] {
233 &self.aliases
234 }
235
236 fn documentation(&self) -> Option<&Documentation> {
237 self.doc()
238 }
239}
240
241fn is_epoch(part: &str) -> bool {
242 let part = part_normalization(part);
243 matches!(part.to_lowercase().as_str(), "epoch")
244}
245
/// Strips one leading and one trailing quote character (`'` or `"`) from `part`.
///
/// The quotes are only removed when both are present; otherwise the input is
/// returned unchanged so that later validation sees the original text.
fn part_normalization(part: &str) -> &str {
    fn is_quote(c: char) -> bool {
        matches!(c, '\'' | '"')
    }

    match part.strip_prefix(is_quote) {
        Some(rest) => rest.strip_suffix(is_quote).unwrap_or(part),
        None => part,
    }
}
252
253fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
257 if unit == Nanosecond {
260 return not_impl_err!("Date part {unit:?} not supported");
261 }
262
263 let conversion_factor = match unit {
264 Second => 1_000_000_000,
265 Millisecond => 1_000_000,
266 Microsecond => 1_000,
267 Nanosecond => 1,
268 };
269
270 let second_factor = match unit {
271 Second => 1,
272 Millisecond => 1_000,
273 Microsecond => 1_000_000,
274 Nanosecond => 1_000_000_000,
275 };
276
277 let secs = date_part(array, DatePart::Second)?;
278 let secs = as_int32_array(secs.as_ref())?;
280 let subsecs = date_part(array, DatePart::Nanosecond)?;
281 let subsecs = as_int32_array(subsecs.as_ref())?;
282
283 if subsecs.null_count() == 0 {
285 let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
286 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
287 })?;
288 Ok(Arc::new(r))
289 } else {
290 let r: Int32Array = secs
293 .iter()
294 .zip(subsecs)
295 .map(|(secs, subsecs)| {
296 secs.map(|secs| {
297 let subsecs = subsecs.unwrap_or(0);
298 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
299 })
300 })
301 .collect();
302 Ok(Arc::new(r))
303 }
304}
305
306fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
312 let sf = match unit {
313 Second => 1_f64,
314 Millisecond => 1_000_f64,
315 Microsecond => 1_000_000_f64,
316 Nanosecond => 1_000_000_000_f64,
317 };
318 let secs = date_part(array, DatePart::Second)?;
319 let secs = as_int32_array(secs.as_ref())?;
321 let subsecs = date_part(array, DatePart::Nanosecond)?;
322 let subsecs = as_int32_array(subsecs.as_ref())?;
323
324 if subsecs.null_count() == 0 {
326 let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
327 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
328 })?;
329 Ok(Arc::new(r))
330 } else {
331 let r: Float64Array = secs
334 .iter()
335 .zip(subsecs)
336 .map(|(secs, subsecs)| {
337 secs.map(|secs| {
338 let subsecs = subsecs.unwrap_or(0);
339 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
340 * sf
341 })
342 })
343 .collect();
344 Ok(Arc::new(r))
345 }
346}
347
348fn epoch(array: &dyn Array) -> Result<ArrayRef> {
349 const SECONDS_IN_A_DAY: f64 = 86400_f64;
350
351 let f: Float64Array = match array.data_type() {
352 Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
353 Timestamp(Millisecond, _) => {
354 as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
355 }
356 Timestamp(Microsecond, _) => {
357 as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
358 }
359 Timestamp(Nanosecond, _) => {
360 as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
361 }
362 Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
363 Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
364 Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
365 Time32(Millisecond) => {
366 as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
367 }
368 Time64(Microsecond) => {
369 as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
370 }
371 Time64(Nanosecond) => {
372 as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
373 }
374 Interval(_) | Duration(_) => return seconds(array, Second),
375 d => return exec_err!("Cannot convert {d:?} to epoch"),
376 };
377 Ok(Arc::new(f))
378}