datafusion_functions/datetime/
date_part.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::str::FromStr;
20use std::sync::Arc;
21
22use arrow::array::{Array, ArrayRef, Float64Array, Int32Array};
23use arrow::compute::kernels::cast_utils::IntervalUnit;
24use arrow::compute::{binary, date_part, DatePart};
25use arrow::datatypes::DataType::{
26    Date32, Date64, Duration, Interval, Time32, Time64, Timestamp,
27};
28use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
29use arrow::datatypes::{DataType, Field, FieldRef, TimeUnit};
30use datafusion_common::types::{logical_date, NativeType};
31
32use datafusion_common::{
33    cast::{
34        as_date32_array, as_date64_array, as_int32_array, as_time32_millisecond_array,
35        as_time32_second_array, as_time64_microsecond_array, as_time64_nanosecond_array,
36        as_timestamp_microsecond_array, as_timestamp_millisecond_array,
37        as_timestamp_nanosecond_array, as_timestamp_second_array,
38    },
39    exec_err, internal_err, not_impl_err,
40    types::logical_string,
41    utils::take_function_args,
42    Result, ScalarValue,
43};
44use datafusion_expr::{
45    ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature,
46    TypeSignature, Volatility,
47};
48use datafusion_expr_common::signature::{Coercion, TypeSignatureClass};
49use datafusion_macros::user_doc;
50
51#[user_doc(
52    doc_section(label = "Time and Date Functions"),
53    description = "Returns the specified part of the date as an integer.",
54    syntax_example = "date_part(part, expression)",
55    alternative_syntax = "extract(field FROM source)",
56    argument(
57        name = "part",
58        description = r#"Part of the date to return. The following date parts are supported:
59        
60    - year
61    - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
62    - month
63    - week (week of the year)
64    - day (day of the month)
65    - hour
66    - minute
67    - second
68    - millisecond
69    - microsecond
70    - nanosecond
71    - dow (day of the week where Sunday is 0)
72    - doy (day of the year)
73    - epoch (seconds since Unix epoch)
74    - isodow (day of the week where Monday is 0)
75"#
76    ),
77    argument(
78        name = "expression",
79        description = "Time expression to operate on. Can be a constant, column, or function."
80    )
81)]
82#[derive(Debug, PartialEq, Eq, Hash)]
83pub struct DatePartFunc {
84    signature: Signature,
85    aliases: Vec<String>,
86}
87
88impl Default for DatePartFunc {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl DatePartFunc {
95    pub fn new() -> Self {
96        Self {
97            signature: Signature::one_of(
98                vec![
99                    TypeSignature::Coercible(vec![
100                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
101                        Coercion::new_implicit(
102                            TypeSignatureClass::Timestamp,
103                            // Not consistent with Postgres and DuckDB but to avoid regression we implicit cast string to timestamp
104                            vec![TypeSignatureClass::Native(logical_string())],
105                            NativeType::Timestamp(Nanosecond, None),
106                        ),
107                    ]),
108                    TypeSignature::Coercible(vec![
109                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
110                        Coercion::new_exact(TypeSignatureClass::Native(logical_date())),
111                    ]),
112                    TypeSignature::Coercible(vec![
113                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
114                        Coercion::new_exact(TypeSignatureClass::Time),
115                    ]),
116                    TypeSignature::Coercible(vec![
117                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
118                        Coercion::new_exact(TypeSignatureClass::Interval),
119                    ]),
120                    TypeSignature::Coercible(vec![
121                        Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
122                        Coercion::new_exact(TypeSignatureClass::Duration),
123                    ]),
124                ],
125                Volatility::Immutable,
126            ),
127            aliases: vec![String::from("datepart")],
128        }
129    }
130}
131
132impl ScalarUDFImpl for DatePartFunc {
133    fn as_any(&self) -> &dyn Any {
134        self
135    }
136
137    fn name(&self) -> &str {
138        "date_part"
139    }
140
141    fn signature(&self) -> &Signature {
142        &self.signature
143    }
144
145    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
146        internal_err!("return_field_from_args should be called instead")
147    }
148
149    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
150        let [field, _] = take_function_args(self.name(), args.scalar_arguments)?;
151
152        field
153            .and_then(|sv| {
154                sv.try_as_str()
155                    .flatten()
156                    .filter(|s| !s.is_empty())
157                    .map(|part| {
158                        if is_epoch(part) {
159                            Field::new(self.name(), DataType::Float64, true)
160                        } else {
161                            Field::new(self.name(), DataType::Int32, true)
162                        }
163                    })
164            })
165            .map(Arc::new)
166            .map_or_else(
167                || exec_err!("{} requires non-empty constant string", self.name()),
168                Ok,
169            )
170    }
171
172    fn invoke_with_args(
173        &self,
174        args: datafusion_expr::ScalarFunctionArgs,
175    ) -> Result<ColumnarValue> {
176        let args = args.args;
177        let [part, array] = take_function_args(self.name(), args)?;
178
179        let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
180            v
181        } else if let ColumnarValue::Scalar(ScalarValue::Utf8View(Some(v))) = part {
182            v
183        } else {
184            return exec_err!(
185                "First argument of `DATE_PART` must be non-null scalar Utf8"
186            );
187        };
188
189        let is_scalar = matches!(array, ColumnarValue::Scalar(_));
190
191        let array = match array {
192            ColumnarValue::Array(array) => Arc::clone(&array),
193            ColumnarValue::Scalar(scalar) => scalar.to_array()?,
194        };
195
196        let part_trim = part_normalization(&part);
197
198        // using IntervalUnit here means we hand off all the work of supporting plurals (like "seconds")
199        // and synonyms ( like "ms,msec,msecond,millisecond") to Arrow
200        let arr = if let Ok(interval_unit) = IntervalUnit::from_str(part_trim) {
201            match interval_unit {
202                IntervalUnit::Year => date_part(array.as_ref(), DatePart::Year)?,
203                IntervalUnit::Month => date_part(array.as_ref(), DatePart::Month)?,
204                IntervalUnit::Week => date_part(array.as_ref(), DatePart::Week)?,
205                IntervalUnit::Day => date_part(array.as_ref(), DatePart::Day)?,
206                IntervalUnit::Hour => date_part(array.as_ref(), DatePart::Hour)?,
207                IntervalUnit::Minute => date_part(array.as_ref(), DatePart::Minute)?,
208                IntervalUnit::Second => seconds_as_i32(array.as_ref(), Second)?,
209                IntervalUnit::Millisecond => seconds_as_i32(array.as_ref(), Millisecond)?,
210                IntervalUnit::Microsecond => seconds_as_i32(array.as_ref(), Microsecond)?,
211                IntervalUnit::Nanosecond => seconds_as_i32(array.as_ref(), Nanosecond)?,
212                // century and decade are not supported by `DatePart`, although they are supported in postgres
213                _ => return exec_err!("Date part '{part}' not supported"),
214            }
215        } else {
216            // special cases that can be extracted (in postgres) but are not interval units
217            match part_trim.to_lowercase().as_str() {
218                "qtr" | "quarter" => date_part(array.as_ref(), DatePart::Quarter)?,
219                "doy" => date_part(array.as_ref(), DatePart::DayOfYear)?,
220                "dow" => date_part(array.as_ref(), DatePart::DayOfWeekSunday0)?,
221                "isodow" => date_part(array.as_ref(), DatePart::DayOfWeekMonday0)?,
222                "epoch" => epoch(array.as_ref())?,
223                _ => return exec_err!("Date part '{part}' not supported"),
224            }
225        };
226
227        Ok(if is_scalar {
228            ColumnarValue::Scalar(ScalarValue::try_from_array(arr.as_ref(), 0)?)
229        } else {
230            ColumnarValue::Array(arr)
231        })
232    }
233
234    fn aliases(&self) -> &[String] {
235        &self.aliases
236    }
237
238    fn documentation(&self) -> Option<&Documentation> {
239        self.doc()
240    }
241}
242
243fn is_epoch(part: &str) -> bool {
244    let part = part_normalization(part);
245    matches!(part.to_lowercase().as_str(), "epoch")
246}
247
/// Removes one pair of *matching* surrounding quotes (`'...'` or `"..."`) from
/// a date-part name, e.g. `'year'` -> `year`.
///
/// If `part` is not wrapped in a matching pair, it is returned unchanged. This
/// covers no quotes, a lone quote, or mismatched quotes such as `'year"`. The
/// downstream code then reports the unsupported part verbatim.
fn part_normalization(part: &str) -> &str {
    ['\'', '"']
        .into_iter()
        .find_map(|quote| part.strip_prefix(quote)?.strip_suffix(quote))
        .unwrap_or(part)
}
254
255/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
256/// result to a total number of seconds, milliseconds, microseconds or
257/// nanoseconds
258fn seconds_as_i32(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
259    // Nanosecond is neither supported in Postgres nor DuckDB, to avoid dealing
260    // with overflow and precision issue we don't support nanosecond
261    if unit == Nanosecond {
262        return not_impl_err!("Date part {unit:?} not supported");
263    }
264
265    let conversion_factor = match unit {
266        Second => 1_000_000_000,
267        Millisecond => 1_000_000,
268        Microsecond => 1_000,
269        Nanosecond => 1,
270    };
271
272    let second_factor = match unit {
273        Second => 1,
274        Millisecond => 1_000,
275        Microsecond => 1_000_000,
276        Nanosecond => 1_000_000_000,
277    };
278
279    let secs = date_part(array, DatePart::Second)?;
280    // This assumes array is primitive and not a dictionary
281    let secs = as_int32_array(secs.as_ref())?;
282    let subsecs = date_part(array, DatePart::Nanosecond)?;
283    let subsecs = as_int32_array(subsecs.as_ref())?;
284
285    // Special case where there are no nulls.
286    if subsecs.null_count() == 0 {
287        let r: Int32Array = binary(secs, subsecs, |secs, subsecs| {
288            secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
289        })?;
290        Ok(Arc::new(r))
291    } else {
292        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
293        // where the number of nanoseconds overflows.
294        let r: Int32Array = secs
295            .iter()
296            .zip(subsecs)
297            .map(|(secs, subsecs)| {
298                secs.map(|secs| {
299                    let subsecs = subsecs.unwrap_or(0);
300                    secs * second_factor + (subsecs % 1_000_000_000) / conversion_factor
301                })
302            })
303            .collect();
304        Ok(Arc::new(r))
305    }
306}
307
308/// Invoke [`date_part`] on an `array` (e.g. Timestamp) and convert the
309/// result to a total number of seconds, milliseconds, microseconds or
310/// nanoseconds
311///
312/// Given epoch return f64, this is a duplicated function to optimize for f64 type
313fn seconds(array: &dyn Array, unit: TimeUnit) -> Result<ArrayRef> {
314    let sf = match unit {
315        Second => 1_f64,
316        Millisecond => 1_000_f64,
317        Microsecond => 1_000_000_f64,
318        Nanosecond => 1_000_000_000_f64,
319    };
320    let secs = date_part(array, DatePart::Second)?;
321    // This assumes array is primitive and not a dictionary
322    let secs = as_int32_array(secs.as_ref())?;
323    let subsecs = date_part(array, DatePart::Nanosecond)?;
324    let subsecs = as_int32_array(subsecs.as_ref())?;
325
326    // Special case where there are no nulls.
327    if subsecs.null_count() == 0 {
328        let r: Float64Array = binary(secs, subsecs, |secs, subsecs| {
329            (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64)) * sf
330        })?;
331        Ok(Arc::new(r))
332    } else {
333        // Nulls in secs are preserved, nulls in subsecs are treated as zero to account for the case
334        // where the number of nanoseconds overflows.
335        let r: Float64Array = secs
336            .iter()
337            .zip(subsecs)
338            .map(|(secs, subsecs)| {
339                secs.map(|secs| {
340                    let subsecs = subsecs.unwrap_or(0);
341                    (secs as f64 + ((subsecs % 1_000_000_000) as f64 / 1_000_000_000_f64))
342                        * sf
343                })
344            })
345            .collect();
346        Ok(Arc::new(r))
347    }
348}
349
350fn epoch(array: &dyn Array) -> Result<ArrayRef> {
351    const SECONDS_IN_A_DAY: f64 = 86400_f64;
352
353    let f: Float64Array = match array.data_type() {
354        Timestamp(Second, _) => as_timestamp_second_array(array)?.unary(|x| x as f64),
355        Timestamp(Millisecond, _) => {
356            as_timestamp_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
357        }
358        Timestamp(Microsecond, _) => {
359            as_timestamp_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
360        }
361        Timestamp(Nanosecond, _) => {
362            as_timestamp_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
363        }
364        Date32 => as_date32_array(array)?.unary(|x| x as f64 * SECONDS_IN_A_DAY),
365        Date64 => as_date64_array(array)?.unary(|x| x as f64 / 1_000_f64),
366        Time32(Second) => as_time32_second_array(array)?.unary(|x| x as f64),
367        Time32(Millisecond) => {
368            as_time32_millisecond_array(array)?.unary(|x| x as f64 / 1_000_f64)
369        }
370        Time64(Microsecond) => {
371            as_time64_microsecond_array(array)?.unary(|x| x as f64 / 1_000_000_f64)
372        }
373        Time64(Nanosecond) => {
374            as_time64_nanosecond_array(array)?.unary(|x| x as f64 / 1_000_000_000_f64)
375        }
376        Interval(_) | Duration(_) => return seconds(array, Second),
377        d => return exec_err!("Cannot convert {d:?} to epoch"),
378    };
379    Ok(Arc::new(f))
380}