1use std::any::Any;
19use std::str::FromStr;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
24use arrow::compute::kernels::cast_utils::IntervalUnit;
25use arrow::compute::{DatePart, binary, date_part};
26use arrow::datatypes::DataType::{
27 Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
28};
29use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
30use arrow::datatypes::{
31 DataType, Date32Type, Date64Type, Field, FieldRef, IntervalUnit as ArrowIntervalUnit,
32 TimeUnit,
33};
34use chrono::{Datelike, NaiveDate, TimeZone, Utc};
35use datafusion_common::types::{NativeType, logical_date};
36
37use datafusion_common::{
38 Result, ScalarValue,
39 cast::{
40 as_date32_array, as_date64_array, as_int32_array, as_interval_dt_array,
41 as_interval_mdn_array, as_interval_ym_array, as_time32_millisecond_array,
42 as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
43 as_timestamp_microsecond_array, as_timestamp_millisecond_array,
44 as_timestamp_nanosecond_array, as_timestamp_second_array,
45 },
46 exec_err, internal_err, not_impl_err,
47 types::logical_string,
48 utils::take_function_args,
49};
50use datafusion_expr::preimage::PreimageResult;
51use datafusion_expr::simplify::SimplifyContext;
52use datafusion_expr::{
53 ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature,
54 TypeSignature, Volatility, interval_arithmetic,
55};
56use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
57use datafusion_macros::user_doc;
58
59#[user_doc(
60 doc_section(label = "Time and Date Functions"),
61 description = "Returns the specified part of the date as an integer.",
62 syntax_example = "date_part(part, expression)",
63 alternative_syntax = "extract(field FROM source)",
64 argument(
65 name = "part",
66 description = r#"Part of the date to return. The following date parts are supported:
67
68 - year
69 - isoyear (ISO 8601 week-numbering year)
70 - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
71 - month
72 - week (week of the year)
73 - day (day of the month)
74 - hour
75 - minute
76 - second
77 - millisecond
78 - microsecond
79 - nanosecond
80 - dow (day of the week where Sunday is 0)
81 - doy (day of the year)
82 - epoch (seconds since Unix epoch for timestamps/dates, total seconds for intervals)
83 - isodow (day of the week where Monday is 0)
84"#
85 ),
86 argument(
87 name = "expression",
88 description = "Time expression to operate on. Can be a constant, column, or function."
89 )
90)]
/// Scalar UDF registered under the name `date_part` (alias `datepart`).
///
/// Takes a string `part` and a temporal expression and returns the requested
/// component of that expression.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DatePartFunc {
    /// Accepted argument signatures; built once in [`DatePartFunc::new`].
    signature: Signature,
    /// Alternative names under which the function can be called.
    aliases: Vec<String>,
}
96
97impl Default for DatePartFunc {
98 fn default() -> Self {
99 Self::new()
100 }
101}
102
103impl DatePartFunc {
104 pub fn new() -> Self {
105 Self {
106 signature: Signature::one_of(
107 vec![
108 TypeSignature::Coercible(vec![
109 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
110 Coercion::new_implicit(
111 TypeSignatureClass::Timestamp,
112 vec![TypeSignatureClass::Native(logical_string())],
114 NativeType::Timestamp(Nanosecond, None),
115 ),
116 ]),
117 TypeSignature::Coercible(vec![
118 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
119 Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
120 ]),
121 TypeSignature::Coercible(vec![
122 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
123 Coercion::new_exact(TypeSignatureClass::Time),
124 ]),
125 TypeSignature::Coercible(vec![
126 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
127 Coercion::new_exact(TypeSignatureClass::Interval),
128 ]),
129 TypeSignature::Coercible(vec![
130 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
131 Coercion::new_exact(TypeSignatureClass::Duration),
132 ]),
133 ],
134 Volatility::Immutable,
135 ),
136 aliases: vec![String::from("datepart")],
137 }
138 }
139}
140
141impl ScalarUDFImpl for DatePartFunc {
142 fn as_any(&self) -> &dyn Any {
143 self
144 }
145
146 fn name(&self) -> &str {
147 "date_part"
148 }
149
150 fn signature(&self) -> &Signature {
151 &self.signature
152 }
153
154 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
155 internal_err!("return_field_from_args should be called instead")
156 }
157
158 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
159 let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
160 let nullable = args.arg_fields[1].is_nullable();
161
162 field
163 .and_then(|sv| {
164 sv.try_as_str()
165 .flatten()
166 .filter(|s| !s.is_empty())
167 .map(|part| {
168 if is_epoch(part) {
169 Field::new(self.name(), DataType::Float64, nullable)
170 } else {
171 Field::new(self.name(), DataType::Int32, nullable)
172 }
173 })
174 })
175 .map(Arc::new)
176 .map_or_else(
177 || exec_err!("{} requires non-empty constant string", self.name()),
178 Ok,
179 )
180 }
181
182 fn invoke_with_args(
183 &self,
184 args: datafusion_expr::ScalarFunctionArgs,
185 ) -> Result<ColumnarValue> {
186 let args = args.args;
187 let [part, array] = take_function_args(self.name(), args)?;
188
189 let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
190 v
191 } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
192 v
193 } else {
194 return exec_err!(
195 "First argument of `DATE_PART` must be non-null scalar Utf8"
196 );
197 };
198
199 let is_scalar = matches!(array, ColumnarValue::Scalar(_));
200
201 let array = match array {
202 ColumnarValue::Array(array) => Arc::clone(&array),
203 ColumnarValue::Scalar(scalar) => scalar.to_array()?,
204 };
205
206 let part_trim = part_normalization(&part);
207
208 let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
211 match interval_unit {
212 IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
213 IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
214 IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
215 IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
216 IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
217 IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
218 IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
219 IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
220 IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
221 IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
222 _ => return exec_err!("Date part '{part}' not supported"),
224 }
225 } else {
226 match part_trim.to_lowercase().as_str() {
228 "isoyear" => date_part(array.as_ref(), DatePart::YearISO)?,
229 "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
230 "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
231 "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
232 "isodow" => date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?,
233 "epoch" => epoch(array.as_ref())?,
234 _ => return exec_err!("Date part '{part}' not supported"),
235 }
236 };
237
238 Ok(if is_scalar {
239 ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
240 } else {
241 ColumnarValue::Array(arr)
242 })
243 }
244
245 fn preimage(
250 &self,
251 args: &[Expr],
252 lit_expr: &Expr,
253 info: &SimplifyContext,
254 ) -> Result<PreimageResult> {
255 let [part, col_expr] = take_function_args(self.name(), args)?;
256
257 let interval_unit = part
259 .as_literal()
260 .and_then(|sv| sv.try_as_str().flatten())
261 .map(part_normalization)
262 .and_then(|s| IntervalUnit::from_str(s).ok());
263
264 match interval_unit {
266 Some(IntervalUnit::Year) => (),
267 _ => return Ok(PreimageResult::None),
268 }
269
270 let Some(argument_literal) = lit_expr.as_literal() else {
272 return Ok(PreimageResult::None);
273 };
274
275 let year = match argument_literal {
277 ScalarValue::Int32(Some(y)) => *y,
278 _ => return Ok(PreimageResult::None),
279 };
280
281 let target_type = match info.get_data_type(col_expr)? {
283 Date32 | Date64 | Timestamp(_, _) => &info.get_data_type(col_expr)?,
284 _ => return Ok(PreimageResult::None),
285 };
286
287 let Some(start_time) = NaiveDate::from_ymd_opt(year, 1, 1) else {
289 return Ok(PreimageResult::None);
290 };
291 let Some(end_time) = start_time.with_year(year + 1) else {
292 return Ok(PreimageResult::None);
293 };
294
295 let (Some(lower), Some(upper)) = (
297 date_to_scalar(start_time, target_type),
298 date_to_scalar(end_time, target_type),
299 ) else {
300 return Ok(PreimageResult::None);
301 };
302 let interval = Box::new(interval_arithmetic::Interval::try_new(lower, upper)?);
303
304 Ok(PreimageResult::Range {
305 expr: col_expr.clone(),
306 interval,
307 })
308 }
309
310 fn aliases(&self) -> &[String] {
311 &self.aliases
312 }
313
314 fn documentation(&self) -> Option<&Documentation> {
315 self.doc()
316 }
317}
318
319fn is_epoch(part: &str) -> bool {
320 let part = part_normalization(part);
321 matches!(part.to_lowercase().as_str(), "epoch")
322}
323
324fn date_to_scalar(date: NaiveDate, target_type: &DataType) -> Option<ScalarValue> {
325 Some(match target_type {
326 Date32 => ScalarValue::Date32(Some(Date32Type::from_naive_date(date))),
327 Date64 => ScalarValue::Date64(Some(Date64Type::from_naive_date(date))),
328
329 Timestamp(unit, tz_opt) => {
330 let naive_midnight = date.and_hms_opt(0, 0, 0)?;
331
332 let utc_dt = if let Some(tz_str) = tz_opt {
333 let tz: Tz = tz_str.parse().ok()?;
334
335 let local = tz.from_local_datetime(&naive_midnight);
336
337 let local_dt = match local {
338 chrono::offset::LocalResult::Single(dt) => dt,
339 chrono::offset::LocalResult::Ambiguous(dt1, _dt2) => dt1,
340 chrono::offset::LocalResult::None => local.earliest()?,
341 };
342
343 local_dt.with_timezone(&Utc)
344 } else {
345 Utc.from_utc_datetime(&naive_midnight)
346 };
347
348 match unit {
349 Second => {
350 ScalarValue::TimestampSecond(Some(utc_dt.timestamp()), tz_opt.clone())
351 }
352 Millisecond => ScalarValue::TimestampMillisecond(
353 Some(utc_dt.timestamp_millis()),
354 tz_opt.clone(),
355 ),
356 Microsecond => ScalarValue::TimestampMicrosecond(
357 Some(utc_dt.timestamp_micros()),
358 tz_opt.clone(),
359 ),
360 Nanosecond => ScalarValue::TimestampNanosecond(
361 Some(utc_dt.timestamp_nanos_opt()?),
362 tz_opt.clone(),
363 ),
364 }
365 }
366 _ => return None,
367 })
368}
369
/// Removes one leading and one trailing quote character (`'` or `"`) from
/// `part`.
///
/// Both quotes must be present for anything to be stripped, but they do not
/// have to be the same character. In every other case `part` is returned
/// unchanged.
fn part_normalization(part: &str) -> &str {
    const QUOTES: [char; 2] = ['\'', '"'];

    match part.strip_prefix(QUOTES) {
        Some(rest) => rest.strip_suffix(QUOTES).unwrap_or(part),
        None => part,
    }
}
376
377fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
381 if unit == Nanosecond {
384 return not_impl_err!("Date part {unit:?} not supported");
385 }
386
387 let conversion_factor = match unit {
388 Second => 1_000_000_000,
389 Millisecond => 1_000_000,
390 Microsecond => 1_000,
391 Nanosecond => 1,
392 };
393
394 let second_factor = match unit {
395 Second => 1,
396 Millisecond => 1_000,
397 Microsecond => 1_000_000,
398 Nanosecond => 1_000_000_000,
399 };
400
401 let secs = date_part(array, DatePart::Second)?;
402 let secs = as_int32_array(secs.as_ref())?;
404 let subsecs = date_part(array, DatePart::Nanosecond)?;
405 let subsecs = as_int32_array(subsecs.as_ref())?;
406
407 if subsecs.null_count() == 0 {
409 let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
410 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
411 })?;
412 Ok(Arc::new(r))
413 } else {
414 let r: Int32Array = secs
417 .iter()
418 .zip(subsecs)
419 .map(|(secs, subsecs)| {
420 secs.map(|secs| {
421 let subsecs = subsecs.unwrap_or(0);
422 secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
423 })
424 })
425 .collect();
426 Ok(Arc::new(r))
427 }
428}
429
430fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
436 let sf = match unit {
437 Second => 1_f64,
438 Millisecond => 1_000_f64,
439 Microsecond => 1_000_000_f64,
440 Nanosecond => 1_000_000_000_f64,
441 };
442 let secs = date_part(array, DatePart::Second)?;
443 let secs = as_int32_array(secs.as_ref())?;
445 let subsecs = date_part(array, DatePart::Nanosecond)?;
446 let subsecs = as_int32_array(subsecs.as_ref())?;
447
448 if subsecs.null_count() == 0 {
450 let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
451 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
452 })?;
453 Ok(Arc::new(r))
454 } else {
455 let r: Float64Array = secs
458 .iter()
459 .zip(subsecs)
460 .map(|(secs, subsecs)| {
461 secs.map(|secs| {
462 let subsecs = subsecs.unwrap_or(0);
463 (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
464 * sf
465 })
466 })
467 .collect();
468 Ok(Arc::new(r))
469 }
470}
471
472fn epoch(array: &dyn Array) -> Result<ArrayRef> {
473 const SECONDS_IN_A_DAY: f64 = 86400_f64;
474 const DAYS_PER_MONTH: f64 = 30_f64;
479
480 let f: Float64Array = match array.data_type() {
481 Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
482 Timestamp(Millisecond, _) => {
483 as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
484 }
485 Timestamp(Microsecond, _) => {
486 as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
487 }
488 Timestamp(Nanosecond, _) => {
489 as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
490 }
491 Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
492 Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
493 Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
494 Time32(Millisecond) => {
495 as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
496 }
497 Time64(Microsecond) => {
498 as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
499 }
500 Time64(Nanosecond) => {
501 as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
502 }
503 Interval(ArrowIntervalUnit::YearMonth) => as_interval_ym_array(array)?
504 .unary(|x| x as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY),
505 Interval(ArrowIntervalUnit::DayTime) => as_interval_dt_array(array)?.unary(|x| {
506 x.days as f64 * SECONDS_IN_A_DAY + x.milliseconds as f64 / 1_000_f64
507 }),
508 Interval(ArrowIntervalUnit::MonthDayNano) => {
509 as_interval_mdn_array(array)?.unary(|x| {
510 x.months as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY
511 + x.days as f64 * SECONDS_IN_A_DAY
512 + x.nanoseconds as f64 / 1_000_000_000_f64
513 })
514 }
515 Duration(_) => return seconds(array, Second),
516 d => return exec_err!("Cannot convert {d:?} to epoch"),
517 };
518 Ok(Arc::new(f))
519}