1use std::str::FromStr;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, Int64Array};
23use arrow::compute::kernels::cast_utils::IntervalUnit;
24use arrow::compute::{DatePart, binary, date_part};
25use arrow::datatypes::DataType::{
26 Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
27};
28use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
29use arrow::datatypes::{
30 ArrowTimestampType, DataType, Date32Type, Date64Type, Field, FieldRef,
31 IntervalUnit as ArrowIntervalUnit, TimeUnit, TimestampMicrosecondType,
32 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
33};
34use chrono::{Datelike, NaiveDate};
35use datafusion_common::types::{NativeType, logical_date};
36
37use datafusion_common::{
38 Result, ScalarValue,
39 cast::{
40 as_date32_array, as_date64_array, as_int32_array, as_interval_dt_array,
41 as_interval_mdn_array, as_interval_ym_array, as_time32_millisecond_array,
42 as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
43 as_timestamp_microsecond_array, as_timestamp_millisecond_array,
44 as_timestamp_nanosecond_array, as_timestamp_second_array,
45 },
46 exec_err, internal_err, not_impl_err,
47 types::logical_string,
48 utils::take_function_args,
49};
50use datafusion_expr::preimage::PreimageResult;
51use datafusion_expr::simplify::SimplifyContext;
52use datafusion_expr::{
53 ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
54 ScalarUDFImpl, Signature, TypeSignature, Volatility, interval_arithmetic,
55};
56use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
57use datafusion_macros::user_doc;
58
59#[user_doc(
60 doc_section(label = "Time and Date Functions"),
61 description = "Returns the specified part of the date as an integer.",
62 syntax_example = "date_part(part, expression)",
63 alternative_syntax = "extract(field FROM source)",
64 argument(
65 name = "part",
66 description = r#"Part of the date to return. The following date parts are supported:
67
68 - year
69 - isoyear (ISO 8601 week-numbering year)
70 - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
71 - month
72 - week (week of the year)
73 - day (day of the month)
74 - hour
75 - minute
76 - second
77 - millisecond
78 - microsecond
79 - nanosecond
80 - dow (day of the week where Sunday is 0)
81 - doy (day of the year)
82 - epoch (seconds since Unix epoch for timestamps/dates, total seconds for intervals)
83 - isodow (ISO 8601 day of the week where Monday is 1 and Sunday is 7)
84"#
85 ),
86 argument(
87 name = "expression",
88 description = "Time expression to operate on. Can be a constant, column, or function."
89 ),
90 sql_example = r#"```sql
91> SELECT date_part('year', '2024-05-01T00:00:00');
92+-----------------------------------------------------+
93| date_part(Utf8("year"),Utf8("2024-05-01T00:00:00")) |
94+-----------------------------------------------------+
95| 2024 |
96+-----------------------------------------------------+
97> SELECT extract(day FROM timestamp '2024-05-01T00:00:00');
98+----------------------------------------------------+
99| date_part(Utf8("DAY"),Utf8("2024-05-01T00:00:00")) |
100+----------------------------------------------------+
101| 1 |
102+----------------------------------------------------+
103```"#
104)]
105#[derive(Debug, PartialEq, Eq, Hash)]
106pub struct DatePartFunc {
107 signature: Signature,
108 aliases: Vec<String>,
109}
110
111impl Default for DatePartFunc {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117impl DatePartFunc {
118 pub fn new() -> Self {
119 Self {
120 signature: Signature::one_of(
121 vec![
122 TypeSignature::Coercible(vec![
123 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
124 Coercion::new_implicit(
125 TypeSignatureClass::Timestamp,
126 vec![TypeSignatureClass::Native(logical_string())],
128 NativeType::Timestamp(Nanosecond, None),
129 ),
130 ]),
131 TypeSignature::Coercible(vec![
132 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
133 Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
134 ]),
135 TypeSignature::Coercible(vec![
136 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
137 Coercion::new_exact(TypeSignatureClass::Time),
138 ]),
139 TypeSignature::Coercible(vec![
140 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
141 Coercion::new_exact(TypeSignatureClass::Interval),
142 ]),
143 TypeSignature::Coercible(vec![
144 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
145 Coercion::new_exact(TypeSignatureClass::Duration),
146 ]),
147 ],
148 Volatility::Immutable,
149 ),
150 aliases: vec![String::from("datepart")],
151 }
152 }
153}
154
155impl ScalarUDFImpl for DatePartFunc {
156 fn name(&self) -> &str {
157 "date_part"
158 }
159
160 fn signature(&self) -> &Signature {
161 &self.signature
162 }
163
164 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
165 internal_err!("return_field_from_args should be called instead")
166 }
167
168 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
169 let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
170 let nullable = args.arg_fields[1].is_nullable();
171
172 field
173 .and_then(|sv| {
174 sv.try_as_str()
175 .flatten()
176 .filter(|s| !s.is_empty())
177 .map(|part| {
178 if is_epoch(part) {
179 Field::new(self.name(), DataType::Float64, nullable)
180 } else if is_nanosecond(part) {
181 Field::new(self.name(), DataType::Int64, nullable)
183 } else {
184 Field::new(self.name(), DataType::Int32, nullable)
185 }
186 })
187 })
188 .map(Arc::new)
189 .map_or_else(
190 || exec_err!("{} requires non-empty constant string", self.name()),
191 Ok,
192 )
193 }
194
195 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
196 let args = args.args;
197 let [part, array] = take_function_args(self.name(), args)?;
198
199 let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
200 v
201 } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
202 v
203 } else {
204 return exec_err!(
205 "First argument of `DATE_PART` must be non-null scalar Utf8"
206 );
207 };
208
209 let is_scalar = matches!(array, ColumnarValue::Scalar(_));
210
211 let array = match array {
212 ColumnarValue::Array(array) => Arc::clone(&array),
213 ColumnarValue::Scalar(scalar) => scalar.to_array()?,
214 };
215
216 let part_trim = part_normalization(&part);
217
218 let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
221 match interval_unit {
222 IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
223 IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
224 IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
225 IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
226 IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
227 IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
228 IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
229 IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
230 IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
231 IntervalUnit::Nanosecond => seconds_ns(array.as_ref())?,
232 _ => return exec_err!("Date part '{part}' not supported"),
234 }
235 } else {
236 match part_trim.to_lowercase().as_str() {
238 "isoyear" => date_part(array.as_ref(), DatePart::YearISO)?,
239 "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
240 "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
241 "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
242 "isodow" => {
243 let zero_based =
249 date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?;
250 let int_arr = as_int32_array(&zero_based)?;
251 let one_based: Int32Array = int_arr.unary(|v| v + 1);
252 Arc::new(one_based) as ArrayRef
253 }
254 "epoch" => epoch(array.as_ref())?,
255 _ => return exec_err!("Date part '{part}' not supported"),
256 }
257 };
258
259 Ok(if is_scalar {
260 ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
261 } else {
262 ColumnarValue::Array(arr)
263 })
264 }
265
266 fn preimage(
271 &self,
272 args: &[Expr],
273 lit_expr: &Expr,
274 info: &SimplifyContext,
275 ) -> Result<PreimageResult> {
276 let [part, col_expr] = take_function_args(self.name(), args)?;
277
278 let interval_unit = part
280 .as_literal()
281 .and_then(|sv| sv.try_as_str().flatten())
282 .map(part_normalization)
283 .and_then(|s| IntervalUnit::from_str(s).ok());
284
285 match interval_unit {
287 Some(IntervalUnit::Year) => (),
288 _ => return Ok(PreimageResult::None),
289 }
290
291 let Some(argument_literal) = lit_expr.as_literal() else {
293 return Ok(PreimageResult::None);
294 };
295
296 let year = match argument_literal {
298 ScalarValue::Int32(Some(y)) => *y,
299 _ => return Ok(PreimageResult::None),
300 };
301
302 let target_type = match info.get_data_type(col_expr)? {
304 Date32 | Date64 | Timestamp(_, _) => &info.get_data_type(col_expr)?,
305 _ => return Ok(PreimageResult::None),
306 };
307
308 let Some(start_time) = NaiveDate::from_ymd_opt(year, 1, 1) else {
310 return Ok(PreimageResult::None);
311 };
312 let Some(end_time) = start_time.with_year(year + 1) else {
313 return Ok(PreimageResult::None);
314 };
315
316 let (Some(lower), Some(upper)) = (
318 date_to_scalar(start_time, target_type),
319 date_to_scalar(end_time, target_type),
320 ) else {
321 return Ok(PreimageResult::None);
322 };
323 let interval = Box::new(interval_arithmetic::Interval::try_new(lower, upper)?);
324
325 Ok(PreimageResult::Range {
326 expr: col_expr.clone(),
327 interval,
328 })
329 }
330
331 fn aliases(&self) -> &[String] {
332 &self.aliases
333 }
334
335 fn documentation(&self) -> Option<&Documentation> {
336 self.doc()
337 }
338}
339
340fn is_epoch(part: &str) -> bool {
341 let part = part_normalization(part);
342 matches!(part.to_lowercase().as_str(), "epoch")
343}
344
345fn is_nanosecond(part: &str) -> bool {
346 IntervalUnit::from_str(part_normalization(part))
347 .map(|p| matches!(p, IntervalUnit::Nanosecond))
348 .unwrap_or(false)
349}
350
351fn date_to_scalar(date: NaiveDate, target_type: &DataType) -> Option<ScalarValue> {
352 Some(match target_type {
353 Date32 => ScalarValue::Date32(Some(Date32Type::from_naive_date(date))),
354 Date64 => ScalarValue::Date64(Some(Date64Type::from_naive_date(date))),
355
356 Timestamp(unit, tz_opt) => {
357 let naive_midnight = date.and_hms_opt(0, 0, 0)?;
358 let tz: Option<Tz> = tz_opt.clone().and_then(|s| s.parse().ok());
359
360 match unit {
361 Second => ScalarValue::TimestampSecond(
362 TimestampSecondType::from_naive_datetime(naive_midnight, tz.as_ref()),
363 tz_opt.clone(),
364 ),
365 Millisecond => ScalarValue::TimestampMillisecond(
366 TimestampMillisecondType::from_naive_datetime(
367 naive_midnight,
368 tz.as_ref(),
369 ),
370 tz_opt.clone(),
371 ),
372 Microsecond => ScalarValue::TimestampMicrosecond(
373 TimestampMicrosecondType::from_naive_datetime(
374 naive_midnight,
375 tz.as_ref(),
376 ),
377 tz_opt.clone(),
378 ),
379 Nanosecond => ScalarValue::TimestampNanosecond(
380 TimestampNanosecondType::from_naive_datetime(
381 naive_midnight,
382 tz.as_ref(),
383 ),
384 tz_opt.clone(),
385 ),
386 }
387 }
388 _ => return None,
389 })
390}
391
/// Removes one layer of surrounding quotes from a part name, so `'year'` and
/// `"year"` both become `year`.
///
/// Quotes are only removed when the string both starts and ends with a quote
/// character. The two characters need not be the same kind. In every other
/// case `part` is returned unchanged.
fn part_normalization(part: &str) -> &str {
    const QUOTES: [char; 2] = ['\'', '"'];
    match part
        .strip_prefix(QUOTES)
        .and_then(|rest| rest.strip_suffix(QUOTES))
    {
        Some(unquoted) => unquoted,
        None => part,
    }
}
398
399fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
403 if unit == Nanosecond {
406 return not_impl_err!("Date part {unit:?} not supported");
407 }
408
409 let conversion_factor = match unit {
410 Second => 1_000_000_000,
411 Millisecond => 1_000_000,
412 Microsecond => 1_000,
413 Nanosecond => 1,
414 };
415
416 let second_factor = match unit {
417 Second => 1,
418 Millisecond => 1_000,
419 Microsecond => 1_000_000,
420 Nanosecond => 1_000_000_000,
421 };
422
423 let secs = date_part(array, DatePart::Second)?;
424 let secs = as_int32_array(secs.as_ref())?;
426 let subsecs = date_part(array, DatePart::Nanosecond)?;
427 let subsecs = as_int32_array(subsecs.as_ref())?;
428
429 if subsecs.null_count() == 0 {
431 let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
432 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
433 })?;
434 Ok(Arc::new(r))
435 } else {
436 let r: Int32Array = secs
439 .iter()
440 .zip(subsecs)
441 .map(|(secs, subsecs)| {
442 secs.map(|secs| {
443 let subsecs = subsecs.unwrap_or(0);
444 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
445 })
446 })
447 .collect();
448 Ok(Arc::new(r))
449 }
450}
451
452fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
458 let sf = match unit {
459 Second => 1_f64,
460 Millisecond => 1_000_f64,
461 Microsecond => 1_000_000_f64,
462 Nanosecond => 1_000_000_000_f64,
463 };
464 let secs = date_part(array, DatePart::Second)?;
465 let secs = as_int32_array(secs.as_ref())?;
467 let subsecs = date_part(array, DatePart::Nanosecond)?;
468 let subsecs = as_int32_array(subsecs.as_ref())?;
469
470 if subsecs.null_count() == 0 {
472 let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
473 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
474 })?;
475 Ok(Arc::new(r))
476 } else {
477 let r: Float64Array = secs
480 .iter()
481 .zip(subsecs)
482 .map(|(secs, subsecs)| {
483 secs.map(|secs| {
484 let subsecs = subsecs.unwrap_or(0);
485 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
486 * sf
487 })
488 })
489 .collect();
490 Ok(Arc::new(r))
491 }
492}
493
494fn epoch(array: &dyn Array) -> Result<ArrayRef> {
495 const SECONDS_IN_A_DAY: f64 = 86400_f64;
496 const DAYS_PER_MONTH: f64 = 30_f64;
501
502 let f: Float64Array = match array.data_type() {
503 Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
504 Timestamp(Millisecond, _) => {
505 as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
506 }
507 Timestamp(Microsecond, _) => {
508 as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
509 }
510 Timestamp(Nanosecond, _) => {
511 as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
512 }
513 Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
514 Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
515 Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
516 Time32(Millisecond) => {
517 as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
518 }
519 Time64(Microsecond) => {
520 as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
521 }
522 Time64(Nanosecond) => {
523 as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
524 }
525 Interval(ArrowIntervalUnit::YearMonth) => as_interval_ym_array(array)?
526 .unary(|x| x as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY),
527 Interval(ArrowIntervalUnit::DayTime) => as_interval_dt_array(array)?.unary(|x| {
528 x.days as f64 * SECONDS_IN_A_DAY + x.milliseconds as f64 / 1_000_f64
529 }),
530 Interval(ArrowIntervalUnit::MonthDayNano) => {
531 as_interval_mdn_array(array)?.unary(|x| {
532 x.months as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY
533 + x.days as f64 * SECONDS_IN_A_DAY
534 + x.nanoseconds as f64 / 1_000_000_000_f64
535 })
536 }
537 Duration(_) => return seconds(array, Second),
538 d => return exec_err!("Cannot convert {d:?} to epoch"),
539 };
540 Ok(Arc::new(f))
541}
542
543fn seconds_ns(array: &dyn Array) -> Result<ArrayRef> {
550 let secs = date_part(array, DatePart::Second)?;
551 let secs = as_int32_array(secs.as_ref())?;
553 let subsecs = date_part(array, DatePart::Nanosecond)?;
554 let subsecs = as_int32_array(subsecs.as_ref())?;
555
556 if subsecs.null_count() == 0 {
558 let r: Int64Array = binary(secs, subsecs, |secs, subsecs| {
559 (secs as i64) * 1_000_000_000 + (subsecs as i64)
560 })?;
561 Ok(Arc::new(r))
562 } else {
563 let r: Int64Array = secs
566 .iter()
567 .zip(subsecs)
568 .map(|(secs, subsecs)| {
569 secs.map(|secs| {
570 let subsecs = subsecs.unwrap_or(0);
571 (secs as i64) * 1_000_000_000 + (subsecs as i64)
572 })
573 })
574 .collect();
575 Ok(Arc::new(r))
576 }
577}