Skip to main content

datafusion_functions/datetime/
date_part.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::str::FromStr;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
24use arrow::compute::kernels::cast_utils::IntervalUnit;
25use arrow::compute::{DatePart, binary, date_part};
26use arrow::datatypes::DataType::{
27    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
28};
29use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
30use arrow::datatypes::{
31    DataType, Date32Type, Date64Type, Field, FieldRef, IntervalUnit as ArrowIntervalUnit,
32    TimeUnit,
33};
34use chrono::{Datelike, NaiveDate, TimeZone, Utc};
35use datafusion_common::types::{NativeType, logical_date};
36
37use datafusion_common::{
38    Result, ScalarValue,
39    cast::{
40        as_date32_array, as_date64_array, as_int32_array, as_interval_dt_array,
41        as_interval_mdn_array, as_interval_ym_array, as_time32_millisecond_array,
42        as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
43        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
44        as_timestamp_nanosecond_array, as_timestamp_second_array,
45    },
46    exec_err, internal_err, not_impl_err,
47    types::logical_string,
48    utils::take_function_args,
49};
50use datafusion_expr::preimage::PreimageResult;
51use datafusion_expr::simplify::SimplifyContext;
52use datafusion_expr::{
53    ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature,
54    TypeSignature, Volatility, interval_arithmetic,
55};
56use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
57use datafusion_macros::user_doc;
58
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Returns the specified part of the date as an integer.",
    syntax_example = "date_part(part, expression)",
    alternative_syntax = "extract(field FROM source)",
    argument(
        name = "part",
        description = r#"Part of the date to return. The following date parts are supported:

    - year
    - isoyear (ISO 8601 week-numbering year)
    - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
    - month
    - week (week of the year)
    - day (day of the month)
    - hour
    - minute
    - second
    - millisecond
    - microsecond
    - nanosecond
    - dow (day of the week where Sunday is 0)
    - doy (day of the year)
    - epoch (seconds since Unix epoch for timestamps/dates, total seconds for intervals)
    - isodow (day of the week where Monday is 0)
"#
    ),
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
// Scalar UDF implementing `date_part(part, expression)`.
//
// NOTE(review): the user-facing description above says the result is an
// integer, but `return_field_from_args` reports `Float64` when the requested
// part is `epoch`; every other part is reported as `Int32`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct DatePartFunc {
    // Accepted argument type combinations; populated by `DatePartFunc::new`.
    signature: Signature,
    // Alternative names the function is registered under (`datepart`).
    aliases: Vec<String>,
}
96
97impl Default for DatePartFunc {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl DatePartFunc {
104    pub fn new() -> Self {
105        Self {
106            signature: Signature::one_of(
107                vec![
108                    TypeSignature::Coercible(vec![
109                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
110                        Coercion::new_implicit(
111                            TypeSignatureClass::Timestamp,
112                            // Not consistent with Postgres and DuckDB but to avoid regression we implicit cast string to timestamp
113                            vec![TypeSignatureClass::Native(logical_string())],
114                            NativeType::Timestamp(Nanosecond, None),
115                        ),
116                    ]),
117                    TypeSignature::Coercible(vec![
118                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
119                        Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
120                    ]),
121                    TypeSignature::Coercible(vec![
122                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
123                        Coercion::new_exact(TypeSignatureClass::Time),
124                    ]),
125                    TypeSignature::Coercible(vec![
126                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
127                        Coercion::new_exact(TypeSignatureClass::Interval),
128                    ]),
129                    TypeSignature::Coercible(vec![
130                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
131                        Coercion::new_exact(TypeSignatureClass::Duration),
132                    ]),
133                ],
134                Volatility::Immutable,
135            ),
136            aliases: vec![String::from("datepart")],
137        }
138    }
139}
140
141impl ScalarUDFImpl for DatePartFunc {
142    fn as_any(&self) -> &dyn Any {
143        self
144    }
145
146    fn name(&self) -> &str {
147        "date_part"
148    }
149
150    fn signature(&self) -> &Signature {
151        &self.signature
152    }
153
154    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
155        internal_err!("return_field_from_args should be called instead")
156    }
157
158    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
159        let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
160        let nullable = args.arg_fields[1].is_nullable();
161
162        field
163            .and_then(|sv| {
164                sv.try_as_str()
165                    .flatten()
166                    .filter(|s| !s.is_empty())
167                    .map(|part| {
168                        if is_epoch(part) {
169                            Field::new(self.name(), DataType::Float64, nullable)
170                        } else {
171                            Field::new(self.name(), DataType::Int32, nullable)
172                        }
173                    })
174            })
175            .map(Arc::new)
176            .map_or_else(
177                || exec_err!("{} requires non-empty constant string", self.name()),
178                Ok,
179            )
180    }
181
182    fn invoke_with_args(
183        &self,
184        args: datafusion_expr::ScalarFunctionArgs,
185    ) -> Result<ColumnarValue> {
186        let args = args.args;
187        let [part, array] = take_function_args(self.name(), args)?;
188
189        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
190            v
191        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
192            v
193        } else {
194            return exec_err!(
195                "First argument of `DATE_PART` must be non-null scalar Utf8"
196            );
197        };
198
199        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
200
201        let array = match array {
202            ColumnarValue::Array(array) => Arc::clone(&array),
203            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
204        };
205
206        let part_trim = part_normalization(&part);
207
208        // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds")
209        // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
210        let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
211            match interval_unit {
212                IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
213                IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
214                IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
215                IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
216                IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
217                IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
218                IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
219                IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
220                IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
221                IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
222                // century and decade are not supported by `DatePart`, although they are supported in postgres
223                _ => return exec_err!("Date part '{part}' not supported"),
224            }
225        } else {
226            // special cases that can be extracted (in postgres) but are not interval units
227            match part_trim.to_lowercase().as_str() {
228                "isoyear" => date_part(array.as_ref(), DatePart::YearISO)?,
229                "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
230                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
231                "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
232                "isodow" => date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?,
233                "epoch" => epoch(array.as_ref())?,
234                _ => return exec_err!("Date part '{part}' not supported"),
235            }
236        };
237
238        Ok(if is_scalar {
239            ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
240        } else {
241            ColumnarValue::Array(arr)
242        })
243    }
244
245    // Only casting the year is supported since pruning other IntervalUnit is not possible
246    // date_part(col, YEAR) = 2024 => col >= '2024-01-01' and col < '2025-01-01'
247    // But for anything less than YEAR simplifying is not possible without specifying the bigger interval
248    // date_part(col, MONTH) = 1 => col = '2023-01-01' or col = '2024-01-01' or ... or col = '3000-01-01'
249    fn preimage(
250        &self,
251        args: &[Expr],
252        lit_expr: &Expr,
253        info: &SimplifyContext,
254    ) -> Result<PreimageResult> {
255        let [part, col_expr] = take_function_args(self.name(), args)?;
256
257        // Get the interval unit from the part argument
258        let interval_unit = part
259            .as_literal()
260            .and_then(|sv| sv.try_as_str().flatten())
261            .map(part_normalization)
262            .and_then(|s| IntervalUnit::from_str(s).ok());
263
264        // only support extracting year
265        match interval_unit {
266            Some(IntervalUnit::Year) => (),
267            _ => return Ok(PreimageResult::None),
268        }
269
270        // Check if the argument is a literal (e.g. date_part(YEAR, col) = 2024)
271        let Some(argument_literal) = lit_expr.as_literal() else {
272            return Ok(PreimageResult::None);
273        };
274
275        // Extract i32 year from Scalar value
276        let year = match argument_literal {
277            ScalarValue::Int32(Some(y)) => *y,
278            _ => return Ok(PreimageResult::None),
279        };
280
281        // Can only extract year from Date32/64 and Timestamp column
282        let target_type = match info.get_data_type(col_expr)? {
283            Date32 | Date64 | Timestamp(_, _) => &info.get_data_type(col_expr)?,
284            _ => return Ok(PreimageResult::None),
285        };
286
287        // Compute the Interval bounds
288        let Some(start_time) = NaiveDate::from_ymd_opt(year, 1, 1) else {
289            return Ok(PreimageResult::None);
290        };
291        let Some(end_time) = start_time.with_year(year + 1) else {
292            return Ok(PreimageResult::None);
293        };
294
295        // Convert to ScalarValues
296        let (Some(lower), Some(upper)) = (
297            date_to_scalar(start_time, target_type),
298            date_to_scalar(end_time, target_type),
299        ) else {
300            return Ok(PreimageResult::None);
301        };
302        let interval = Box::new(interval_arithmetic::Interval::try_new(lower, upper)?);
303
304        Ok(PreimageResult::Range {
305            expr: col_expr.clone(),
306            interval,
307        })
308    }
309
310    fn aliases(&self) -> &[String] {
311        &self.aliases
312    }
313
314    fn documentation(&self) -> Option<&Documentation> {
315        self.doc()
316    }
317}
318
319fn is_epoch(part: &str) -> bool {
320    let part = part_normalization(part);
321    matches!(part.to_lowercase().as_str(), "epoch")
322}
323
324fn date_to_scalar(date: NaiveDate, target_type: &DataType) -> Option<ScalarValue> {
325    Some(match target_type {
326        Date32 => ScalarValue::Date32(Some(Date32Type::from_naive_date(date))),
327        Date64 => ScalarValue::Date64(Some(Date64Type::from_naive_date(date))),
328
329        Timestamp(unit, tz_opt) => {
330            let naive_midnight = date.and_hms_opt(0, 0, 0)?;
331
332            let utc_dt = if let Some(tz_str) = tz_opt {
333                let tz: Tz = tz_str.parse().ok()?;
334
335                let local = tz.from_local_datetime(&naive_midnight);
336
337                let local_dt = match local {
338                    chrono::offset::LocalResult::Single(dt) => dt,
339                    chrono::offset::LocalResult::Ambiguous(dt1, _dt2) => dt1,
340                    chrono::offset::LocalResult::None => local.earliest()?,
341                };
342
343                local_dt.with_timezone(&Utc)
344            } else {
345                Utc.from_utc_datetime(&naive_midnight)
346            };
347
348            match unit {
349                Second => {
350                    ScalarValue::TimestampSecond(Some(utc_dt.timestamp()), tz_opt.clone())
351                }
352                Millisecond => ScalarValue::TimestampMillisecond(
353                    Some(utc_dt.timestamp_millis()),
354                    tz_opt.clone(),
355                ),
356                Microsecond => ScalarValue::TimestampMicrosecond(
357                    Some(utc_dt.timestamp_micros()),
358                    tz_opt.clone(),
359                ),
360                Nanosecond => ScalarValue::TimestampNanosecond(
361                    Some(utc_dt.timestamp_nanos_opt()?),
362                    tz_opt.clone(),
363                ),
364            }
365        }
366        _ => return None,
367    })
368}
369
/// Strips one leading and one trailing quote character (`'` or `"`) from
/// `part`.
///
/// The input is returned unchanged unless it both starts and ends with a
/// quote, leaving any malformed quoting for the caller to reject. The two
/// quote characters are not required to match each other.
fn part_normalization(part: &str) -> &str {
    const QUOTES: &[char] = &['\'', '"'];

    match part.strip_prefix(QUOTES) {
        Some(rest) => rest.strip_suffix(QUOTES).unwrap_or(part),
        None => part,
    }
}
376
377/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
378/// result to a total number of seconds, milliseconds, microseconds or
379/// nanoseconds
380fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
381    // Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing
382    // with overflow and precision issue we don't support nanosecond
383    if unit == Nanosecond {
384        return not_impl_err!("Date part {unit:?} not supported");
385    }
386
387    let conversion_factor = match unit {
388        Second => 1_000_000_000,
389        Millisecond => 1_000_000,
390        Microsecond => 1_000,
391        Nanosecond => 1,
392    };
393
394    let second_factor = match unit {
395        Second => 1,
396        Millisecond => 1_000,
397        Microsecond => 1_000_000,
398        Nanosecond => 1_000_000_000,
399    };
400
401    let secs = date_part(array, DatePart::Second)?;
402    // This assumes array is primitive and not a dictionary
403    let secs = as_int32_array(secs.as_ref())?;
404    let subsecs = date_part(array, DatePart::Nanosecond)?;
405    let subsecs = as_int32_array(subsecs.as_ref())?;
406
407    // Special case where there are no nulls.
408    if subsecs.null_count() == 0 {
409        let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
410            secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
411        })?;
412        Ok(Arc::new(r))
413    } else {
414        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
415        // where the number of nanoseconds overflows.
416        let r: Int32Array = secs
417            .iter()
418            .zip(subsecs)
419            .map(|(secs, subsecs)| {
420                secs.map(|secs| {
421                    let subsecs = subsecs.unwrap_or(0);
422                    secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
423                })
424            })
425            .collect();
426        Ok(Arc::new(r))
427    }
428}
429
430/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
431/// result to a total number of seconds, milliseconds, microseconds or
432/// nanoseconds
433///
434/// Given epoch return f64, this is a duplicated function to optimize for f64 type
435fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
436    let sf = match unit {
437        Second => 1_f64,
438        Millisecond => 1_000_f64,
439        Microsecond => 1_000_000_f64,
440        Nanosecond => 1_000_000_000_f64,
441    };
442    let secs = date_part(array, DatePart::Second)?;
443    // This assumes array is primitive and not a dictionary
444    let secs = as_int32_array(secs.as_ref())?;
445    let subsecs = date_part(array, DatePart::Nanosecond)?;
446    let subsecs = as_int32_array(subsecs.as_ref())?;
447
448    // Special case where there are no nulls.
449    if subsecs.null_count() == 0 {
450        let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
451            (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
452        })?;
453        Ok(Arc::new(r))
454    } else {
455        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
456        // where the number of nanoseconds overflows.
457        let r: Float64Array = secs
458            .iter()
459            .zip(subsecs)
460            .map(|(secs, subsecs)| {
461                secs.map(|secs| {
462                    let subsecs = subsecs.unwrap_or(0);
463                    (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
464                        * sf
465                })
466            })
467            .collect();
468        Ok(Arc::new(r))
469    }
470}
471
472fn epoch(array: &dyn Array) -> Result<ArrayRef> {
473    const SECONDS_IN_A_DAY: f64 = 86400_f64;
474    // Note: Month-to-second conversion uses 30 days as an approximation.
475    // This matches PostgreSQL's behavior for interval epoch extraction,
476    // but does not represent exact calendar months (which vary 28-31 days).
477    // See: https://doxygen.postgresql.org/datatype_2timestamp_8h.html
478    const DAYS_PER_MONTH: f64 = 30_f64;
479
480    let f: Float64Array = match array.data_type() {
481        Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
482        Timestamp(Millisecond, _) => {
483            as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
484        }
485        Timestamp(Microsecond, _) => {
486            as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
487        }
488        Timestamp(Nanosecond, _) => {
489            as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
490        }
491        Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
492        Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
493        Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
494        Time32(Millisecond) => {
495            as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
496        }
497        Time64(Microsecond) => {
498            as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
499        }
500        Time64(Nanosecond) => {
501            as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
502        }
503        Interval(ArrowIntervalUnit::YearMonth) => as_interval_ym_array(array)?
504            .unary(|x| x as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY),
505        Interval(ArrowIntervalUnit::DayTime) => as_interval_dt_array(array)?.unary(|x| {
506            x.days as f64 * SECONDS_IN_A_DAY + x.milliseconds as f64 / 1_000_f64
507        }),
508        Interval(ArrowIntervalUnit::MonthDayNano) => {
509            as_interval_mdn_array(array)?.unary(|x| {
510                x.months as f64 * DAYS_PER_MONTH * SECONDS_IN_A_DAY
511                    + x.days as f64 * SECONDS_IN_A_DAY
512                    + x.nanoseconds as f64 / 1_000_000_000_f64
513            })
514        }
515        Duration(_) => return seconds(array, Second),
516        d => return exec_err!("Cannot convert {d:?} to epoch"),
517    };
518    Ok(Arc::new(f))
519}