Skip to main content

datafusion_functions/datetime/
date_part.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::str::FromStr;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{Array, ArrayRef, Float64Array, Int32Array, Int64Array};
23use arrow::compute::kernels::cast_utils::IntervalUnit;
24use arrow::compute::{DatePart, binary, date_part};
25use arrow::datatypes::DataType::{
26    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
27};
28use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
29use arrow::datatypes::{
30    ArrowTimestampType, DataType, Date32Type, Date64Type, Field, FieldRef,
31    IntervalUnit as ArrowIntervalUnit, TimeUnit, TimestampMicrosecondType,
32    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
33};
34use chrono::{Datelike, NaiveDate};
35use datafusion_common::types::{NativeType, logical_date};
36
37use datafusion_common::{
38    Result, ScalarValue,
39    cast::{
40        as_date32_array, as_date64_array, as_int32_array, as_interval_dt_array,
41        as_interval_mdn_array, as_interval_ym_array, as_time32_millisecond_array,
42        as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
43        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
44        as_timestamp_nanosecond_array, as_timestamp_second_array,
45    },
46    exec_err, internal_err, not_impl_err,
47    types::logical_string,
48    utils::take_function_args,
49};
50use datafusion_expr::preimage::PreimageResult;
51use datafusion_expr::simplify::SimplifyContext;
52use datafusion_expr::{
53    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarFunctionArgs,
54    ScalarUDFImpl, Signature, TypeSignature, Volatility, interval_arithmetic,
55};
56use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
57use datafusion_macros::user_doc;
58
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Returns the specified part of the date as an integer.",
    syntax_example = "date_part(part, expression)",
    alternative_syntax = "extract(field FROM source)",
    argument(
        name = "part",
        description = r#"Part of the date to return. The following date parts are supported:

    - year
    - isoyear (ISO 8601 week-numbering year)
    - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
    - month
    - week (week of the year)
    - day (day of the month)
    - hour
    - minute
    - second
    - millisecond
    - microsecond
    - nanosecond
    - dow (day of the week where Sunday is 0)
    - doy (day of the year)
    - epoch (seconds since Unix epoch for timestamps/dates, total seconds for intervals)
    - isodow (ISO 8601 day of the week where Monday is 1 and Sunday is 7)
"#
    ),
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    ),
    sql_example = r#"```sql
> SELECT date_part('year', '2024-05-01T00:00:00');
+-----------------------------------------------------+
| date_part(Utf8("year"),Utf8("2024-05-01T00:00:00")) |
+-----------------------------------------------------+
| 2024                                                |
+-----------------------------------------------------+
> SELECT extract(day FROM timestamp '2024-05-01T00:00:00');
+----------------------------------------------------+
| date_part(Utf8("DAY"),Utf8("2024-05-01T00:00:00")) |
+----------------------------------------------------+
| 1                                                  |
+----------------------------------------------------+
```"#
)]
/// Scalar function `date_part(part, expression)`: extracts a single component
/// (year, month, epoch, ...) from a date/time-like value.
///
/// The user-facing documentation is generated from the `user_doc` attribute
/// above and exposed through `documentation()`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DatePartFunc {
    /// Accepted argument type combinations; built in `new()` and returned by
    /// `signature()`.
    signature: Signature,
    /// Alternative function names returned by `aliases()` (`datepart`).
    aliases: Vec<String>,
}
110
111impl Default for DatePartFunc {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117impl DatePartFunc {
118    pub fn new() -> Self {
119        Self {
120            signature: Signature::one_of(
121                vec![
122                    TypeSignature::Coercible(vec![
123                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
124                        Coercion::new_implicit(
125                            TypeSignatureClass::Timestamp,
126                            // Not consistent with Postgres and DuckDB but to avoid regression we implicit cast string to timestamp
127                            vec![TypeSignatureClass::Native(logical_string())],
128                            NativeType::Timestamp(Nanosecond, None),
129                        ),
130                    ]),
131                    TypeSignature::Coercible(vec![
132                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
133                        Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
134                    ]),
135                    TypeSignature::Coercible(vec![
136                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
137                        Coercion::new_exact(TypeSignatureClass::Time),
138                    ]),
139                    TypeSignature::Coercible(vec![
140                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
141                        Coercion::new_exact(TypeSignatureClass::Interval),
142                    ]),
143                    TypeSignature::Coercible(vec![
144                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
145                        Coercion::new_exact(TypeSignatureClass::Duration),
146                    ]),
147                ],
148                Volatility::Immutable,
149            ),
150            aliases: vec![String::from("datepart")],
151        }
152    }
153}
154
155impl ScalarUDFImpl for DatePartFunc {
156    fn name(&self) -> &str {
157        "date_part"
158    }
159
160    fn signature(&self) -> &Signature {
161        &self.signature
162    }
163
164    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
165        internal_err!("return_field_from_args should be called instead")
166    }
167
168    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
169        let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
170        let nullable = args.arg_fields[1].is_nullable();
171
172        field
173            .and_then(|sv| {
174                sv.try_as_str()
175                    .flatten()
176                    .filter(|s| !s.is_empty())
177                    .map(|part| {
178                        if is_epoch(part) {
179                            Field::new(self.name(), DataType::Float64, nullable)
180                        } else if is_nanosecond(part) {
181                            // See notes on [seconds_ns] for rationale
182                            Field::new(self.name(), DataType::Int64, nullable)
183                        } else {
184                            Field::new(self.name(), DataType::Int32, nullable)
185                        }
186                    })
187            })
188            .map(Arc::new)
189            .map_or_else(
190                || exec_err!("{} requires non-empty constant string", self.name()),
191                Ok,
192            )
193    }
194
195    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
196        let args = args.args;
197        let [part, array] = take_function_args(self.name(), args)?;
198
199        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
200            v
201        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
202            v
203        } else {
204            return exec_err!(
205                "First argument of `DATE_PART` must be non-null scalar Utf8"
206            );
207        };
208
209        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
210
211        let array = match array {
212            ColumnarValue::Array(array) => Arc::clone(&array),
213            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
214        };
215
216        let part_trim = part_normalization(&part);
217
218        // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds")
219        // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
220        let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
221            match interval_unit {
222                IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
223                IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
224                IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
225                IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
226                IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
227                IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
228                IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
229                IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
230                IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
231                IntervalUnit::Nanosecond => seconds_ns(array.as_ref())?,
232                // century and decade are not supported by `DatePart`, although they are supported in postgres
233                _ => return exec_err!("Date part '{part}' not supported"),
234            }
235        } else {
236            // special cases that can be extracted (in postgres) but are not interval units
237            match part_trim.to_lowercase().as_str() {
238                "isoyear" => date_part(array.as_ref(), DatePart::YearISO)?,
239                "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
240                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
241                "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
242                "isodow" => {
243                    // Postgres `isodow` is 1..=7 with Mon=1. Arrow's
244                    // `DayOfWeekMonday0` returns 0..=6 with Mon=0; shift by
245                    // +1 to match Postgres. TODO: switch to a future
246                    // `DatePart::DayOfWeekMonday1` upstream variant once it
247                    // exists, so this kernel-then-add becomes a single call.
248                    let zero_based =
249                        date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?;
250                    let int_arr = as_int32_array(&zero_based)?;
251                    let one_based: Int32Array = int_arr.unary(|v| v + 1);
252                    Arc::new(one_based) as ArrayRef
253                }
254                "epoch" => epoch(array.as_ref())?,
255                _ => return exec_err!("Date part '{part}' not supported"),
256            }
257        };
258
259        Ok(if is_scalar {
260            ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
261        } else {
262            ColumnarValue::Array(arr)
263        })
264    }
265
266    // Only casting the year is supported since pruning other IntervalUnit is not possible
267    // date_part(col, YEAR) = 2024 => col >= '2024-01-01' and col < '2025-01-01'
268    // But for anything less than YEAR simplifying is not possible without specifying the bigger interval
269    // date_part(col, MONTH) = 1 => col = '2023-01-01' or col = '2024-01-01' or ... or col = '3000-01-01'
270    fn preimage(
271        &self,
272        args: &[Expr],
273        lit_expr: &Expr,
274        info: &SimplifyContext,
275    ) -> Result<PreimageResult> {
276        let [part, col_expr] = take_function_args(self.name(), args)?;
277
278        // Get the interval unit from the part argument
279        let interval_unit = part
280            .as_literal()
281            .and_then(|sv| sv.try_as_str().flatten())
282            .map(part_normalization)
283            .and_then(|s| IntervalUnit::from_str(s).ok());
284
285        // only support extracting year
286        match interval_unit {
287            Some(IntervalUnit::Year) => (),
288            _ => return Ok(PreimageResult::None),
289        }
290
291        // Check if the argument is a literal (e.g. date_part(YEAR, col) = 2024)
292        let Some(argument_literal) = lit_expr.as_literal() else {
293            return Ok(PreimageResult::None);
294        };
295
296        // Extract i32 year from Scalar value
297        let year = match argument_literal {
298            ScalarValue::Int32(Some(y)) => *y,
299            _ => return Ok(PreimageResult::None),
300        };
301
302        // Can only extract year from Date32/64 and Timestamp column
303        let target_type = match info.get_data_type(col_expr)? {
304            Date32 | Date64 | Timestamp(_, _) => &info.get_data_type(col_expr)?,
305            _ => return Ok(PreimageResult::None),
306        };
307
308        // Compute the Interval bounds
309        let Some(start_time) = NaiveDate::from_ymd_opt(year, 1, 1) else {
310            return Ok(PreimageResult::None);
311        };
312        let Some(end_time) = start_time.with_year(year + 1) else {
313            return Ok(PreimageResult::None);
314        };
315
316        // Convert to ScalarValues
317        let (Some(lower), Some(upper)) = (
318            date_to_scalar(start_time, target_type),
319            date_to_scalar(end_time, target_type),
320        ) else {
321            return Ok(PreimageResult::None);
322        };
323        let interval = Box::new(interval_arithmetic::Interval::try_new(lower, upper)?);
324
325        Ok(PreimageResult::Range {
326            expr: col_expr.clone(),
327            interval,
328        })
329    }
330
331    fn aliases(&self) -> &[String] {
332        &self.aliases
333    }
334
335    fn documentation(&self) -> Option<&Documentation> {
336        self.doc()
337    }
338}
339
340fn is_epoch(part: &str) -> bool {
341    let part = part_normalization(part);
342    matches!(part.to_lowercase().as_str(), "epoch")
343}
344
345fn is_nanosecond(part: &str) -> bool {
346    IntervalUnit::from_str(part_normalization(part))
347        .map(|p| matches!(p, IntervalUnit::Nanosecond))
348        .unwrap_or(false)
349}
350
351fn date_to_scalar(date: NaiveDate, target_type: &DataType) -> Option<ScalarValue> {
352    Some(match target_type {
353        Date32 => ScalarValue::Date32(Some(Date32Type::from_naive_date(date))),
354        Date64 => ScalarValue::Date64(Some(Date64Type::from_naive_date(date))),
355
356        Timestamp(unit, tz_opt) => {
357            let naive_midnight = date.and_hms_opt(0, 0, 0)?;
358            let tz: Option<Tz> = tz_opt.clone().and_then(|s| s.parse().ok());
359
360            match unit {
361                Second => ScalarValue::TimestampSecond(
362                    TimestampSecondType::from_naive_datetime(naive_midnight, tz.as_ref()),
363                    tz_opt.clone(),
364                ),
365                Millisecond => ScalarValue::TimestampMillisecond(
366                    TimestampMillisecondType::from_naive_datetime(
367                        naive_midnight,
368                        tz.as_ref(),
369                    ),
370                    tz_opt.clone(),
371                ),
372                Microsecond => ScalarValue::TimestampMicrosecond(
373                    TimestampMicrosecondType::from_naive_datetime(
374                        naive_midnight,
375                        tz.as_ref(),
376                    ),
377                    tz_opt.clone(),
378                ),
379                Nanosecond => ScalarValue::TimestampNanosecond(
380                    TimestampNanosecondType::from_naive_datetime(
381                        naive_midnight,
382                        tz.as_ref(),
383                    ),
384                    tz_opt.clone(),
385                ),
386            }
387        }
388        _ => return None,
389    })
390}
391
// Strip one leading and one trailing quote character (`'` or `"`) from `part`.
// If either quote is missing the input is returned unchanged, leaving it to
// the downstream parser to report the error.
fn part_normalization(part: &str) -> &str {
    const QUOTES: [char; 2] = ['\'', '"'];

    match part.strip_prefix(QUOTES) {
        Some(rest) => rest.strip_suffix(QUOTES).unwrap_or(part),
        None => part,
    }
}
398
399/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
400/// result to a total number of seconds, milliseconds, microseconds or
401/// nanoseconds
402fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
403    // Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing
404    // with overflow and precision issue we don't support nanosecond
405    if unit == Nanosecond {
406        return not_impl_err!("Date part {unit:?} not supported");
407    }
408
409    let conversion_factor = match unit {
410        Second => 1_000_000_000,
411        Millisecond => 1_000_000,
412        Microsecond => 1_000,
413        Nanosecond => 1,
414    };
415
416    let second_factor = match unit {
417        Second => 1,
418        Millisecond => 1_000,
419        Microsecond => 1_000_000,
420        Nanosecond => 1_000_000_000,
421    };
422
423    let secs = date_part(array, DatePart::Second)?;
424    // This assumes array is primitive and not a dictionary
425    let secs = as_int32_array(secs.as_ref())?;
426    let subsecs = date_part(array, DatePart::Nanosecond)?;
427    let subsecs = as_int32_array(subsecs.as_ref())?;
428
429    // Special case where there are no nulls.
430    if subsecs.null_count() == 0 {
431        let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
432            secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
433        })?;
434        Ok(Arc::new(r))
435    } else {
436        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
437        // where the number of nanoseconds overflows.
438        let r: Int32Array = secs
439            .iter()
440            .zip(subsecs)
441            .map(|(secs, subsecs)| {
442                secs.map(|secs| {
443                    let subsecs = subsecs.unwrap_or(0);
444                    secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
445                })
446            })
447            .collect();
448        Ok(Arc::new(r))
449    }
450}
451
452/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
453/// result to a total number of seconds, milliseconds, microseconds or
454/// nanoseconds
455///
456/// Given epoch return f64, this is a duplicated function to optimize for f64 type
457fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
458    let sf = match unit {
459        Second => 1_f64,
460        Millisecond => 1_000_f64,
461        Microsecond => 1_000_000_f64,
462        Nanosecond => 1_000_000_000_f64,
463    };
464    let secs = date_part(array, DatePart::Second)?;
465    // This assumes array is primitive and not a dictionary
466    let secs = as_int32_array(secs.as_ref())?;
467    let subsecs = date_part(array, DatePart::Nanosecond)?;
468    let subsecs = as_int32_array(subsecs.as_ref())?;
469
470    // Special case where there are no nulls.
471    if subsecs.null_count() == 0 {
472        let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
473            (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
474        })?;
475        Ok(Arc::new(r))
476    } else {
477        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
478        // where the number of nanoseconds overflows.
479        let r: Float64Array = secs
480            .iter()
481            .zip(subsecs)
482            .map(|(secs, subsecs)| {
483                secs.map(|secs| {
484                    let subsecs = subsecs.unwrap_or(0);
485                    (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
486                        * sf
487                })
488            })
489            .collect();
490        Ok(Arc::new(r))
491    }
492}
493
494fn epoch(array: &dyn Array) -> Result<ArrayRef> {
495    const SECONDS_IN_A_DAY: f64 = 86400_f64;
496    // Note: Month-to-second conversion uses 30 days as an approximation.
497    // This matches PostgreSQL's behavior for interval epoch extraction,
498    // but does not represent exact calendar months (which vary 28-31 days).
499    // See: https://doxygen.postgresql.org/datatype_2timestamp_8h.html
500    const DAYS_PER_MONTH: f64 = 30_f64;
501
502    let f: Float64Array = match array.data_type() {
503        Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
504        Timestamp(Millisecond, _) => {
505            as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
506        }
507        Timestamp(Microsecond, _) => {
508            as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
509        }
510        Timestamp(Nanosecond, _) => {
511            as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
512        }
513        Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
514        Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
515        Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
516        Time32(Millisecond) => {
517            as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
518        }
519        Time64(Microsecond) => {
520            as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
521        }
522        Time64(Nanosecond) => {
523            as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
524        }
525        Interval(ArrowIntervalUnit::YearMonth) => as_interval_ym_array(array)?
526            .unary(|x| x as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY),
527        Interval(ArrowIntervalUnit::DayTime) => as_interval_dt_array(array)?.unary(|x| {
528            x.days as f64 * SECONDS_IN_A_DAY + x.milliseconds as f64 / 1_000_f64
529        }),
530        Interval(ArrowIntervalUnit::MonthDayNano) => {
531            as_interval_mdn_array(array)?.unary(|x| {
532                x.months as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY
533                    + x.days as f64 * SECONDS_IN_A_DAY
534                    + x.nanoseconds as f64 / 1_000_000_000_f64
535            })
536        }
537        Duration(_) => return seconds(array, Second),
538        d => return exec_err!("Cannot convert {d:?} to epoch"),
539    };
540    Ok(Arc::new(f))
541}
542
543/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
544/// result to a total number of nanoseconds as an Int64 array.
545///
546/// This returns an Int64 rather than Int32 because  there 1 billion
547/// `nanosecond`s in each second, so representing up to 60 seconds as
548/// nanoseconds can be values up to 60 billion, which does not fit in Int32.
549fn seconds_ns(array: &dyn Array) -> Result<ArrayRef> {
550    let secs = date_part(array, DatePart::Second)?;
551    // This assumes array is primitive and not a dictionary
552    let secs = as_int32_array(secs.as_ref())?;
553    let subsecs = date_part(array, DatePart::Nanosecond)?;
554    let subsecs = as_int32_array(subsecs.as_ref())?;
555
556    // Special case where there are no nulls.
557    if subsecs.null_count() == 0 {
558        let r: Int64Array = binary(secs, subsecs, |secs, subsecs| {
559            (secs as i64) * 1_000_000_000 + (subsecs as i64)
560        })?;
561        Ok(Arc::new(r))
562    } else {
563        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
564        // where the number of nanoseconds overflows.
565        let r: Int64Array = secs
566            .iter()
567            .zip(subsecs)
568            .map(|(secs, subsecs)| {
569                secs.map(|secs| {
570                    let subsecs = subsecs.unwrap_or(0);
571                    (secs as i64) * 1_000_000_000 + (subsecs as i64)
572                })
573            })
574            .collect();
575        Ok(Arc::new(r))
576    }
577}