datafusion_functions/datetime/
common.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::{Arc, LazyLock};
19
20use arrow::array::timezone::Tz;
21use arrow::array::{
22    Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
23    StringArrayType, StringViewArray,
24};
25use arrow::compute::DecimalCast;
26use arrow::compute::kernels::cast_utils::string_to_datetime;
27use arrow::datatypes::{DataType, TimeUnit};
28use arrow_buffer::ArrowNativeType;
29use chrono::LocalResult::Single;
30use chrono::format::{Parsed, StrftimeItems, parse};
31use chrono::{DateTime, TimeZone, Utc};
32use datafusion_common::cast::as_generic_string_array;
33use datafusion_common::{
34    DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
35    internal_datafusion_err, unwrap_or_internal_err,
36};
37use datafusion_expr::ColumnarValue;
38
/// Error message used when a parsed date-time cannot be expressed as a
/// nanosecond timestamp, i.e. when `timestamp_nanos_opt()` returns `None`.
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";

/// Lazily parsed `"UTC"` timezone, used as the fallback by the helpers in this
/// file when the caller supplies no timezone.
static UTC: LazyLock<Tz> = LazyLock::new(|| "UTC".parse().expect("UTC is always valid"));
43
44/// Converts a string representation of a date‑time into a timestamp expressed in
45/// nanoseconds since the Unix epoch.
46///
47/// This helper is a thin wrapper around the more general `string_to_datetime`
48/// function. It accepts an optional `timezone` which, if `None`, defaults to
49/// Coordinated Universal Time (UTC). The string `s` must contain a valid
50/// date‑time format that can be parsed by the underlying chrono parser.
51///
52/// # Return Value
53///
54/// * `Ok(i64)` – The number of nanoseconds since `1970‑01‑01T00:00:00Z`.
55/// * `Err(DataFusionError)` – If the string cannot be parsed, the parsed
56///   value is out of range (between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804)
57///   or the parsed value does not correspond to an unambiguous time.
58pub(crate) fn string_to_timestamp_nanos_with_timezone(
59    timezone: &Option<Tz>,
60    s: &str,
61) -> Result<i64> {
62    let tz = timezone.as_ref().unwrap_or(&UTC);
63    let dt = string_to_datetime(tz, s)?;
64    let parsed = dt
65        .timestamp_nanos_opt()
66        .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
67
68    Ok(parsed)
69}
70
71/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
72///
73/// [Utf8]: DataType::Utf8
74/// [LargeUtf8]: DataType::LargeUtf8
75/// [Utf8View]: DataType::Utf8View
76pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
77    for (idx, a) in args.iter().skip(1).enumerate() {
78        match a.data_type() {
79            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
80                // all good
81            }
82            _ => {
83                return exec_err!(
84                    "{name} function unsupported data type at index {}: {}",
85                    idx + 1,
86                    a.data_type()
87                );
88            }
89        }
90    }
91
92    Ok(())
93}
94
95/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
96/// relative to the provided `timezone`
97///
98/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
99/// will be returned
100///
101/// Note that parsing [IANA timezones] is not supported yet in chrono - <https://github.com/chronotope/chrono/issues/38>
102/// and this implementation only supports named timezones at the end of the string preceded by a space.
103///
104/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
105/// [IANA timezones]: https://www.iana.org/time-zones
106pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
107    timezone: &T,
108    s: &str,
109    format: &str,
110) -> Result<DateTime<T>, DataFusionError> {
111    let err = |err_ctx: &str| {
112        exec_datafusion_err!(
113            "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
114        )
115    };
116
117    let mut datetime_str = s;
118    let mut format = format;
119
120    // Manually handle the most common case of a named timezone at the end of the timestamp.
121    // Note that %+ handles 'Z' at the end of the string without a space. This code doesn't
122    // handle named timezones with no preceding space since that would require writing a
123    // custom parser (or switching to Jiff)
124    let tz: Option<chrono_tz::Tz> = if format.trim_end().ends_with(" %Z") {
125        // grab the string after the last space as the named timezone
126        if let Some((dt_str, timezone_name)) = datetime_str.trim_end().rsplit_once(' ') {
127            datetime_str = dt_str;
128
129            // attempt to parse the timezone name
130            let result: Result<chrono_tz::Tz, chrono_tz::ParseError> =
131                timezone_name.parse();
132            let Ok(tz) = result else {
133                return Err(err(&result.unwrap_err().to_string()));
134            };
135
136            // successfully parsed the timezone name, remove the ' %Z' from the format
137            format = &format[..format.len() - 3];
138
139            Some(tz)
140        } else {
141            None
142        }
143    } else if format.contains("%Z") {
144        return Err(err(
145            "'%Z' is only supported at the end of the format string preceded by a space",
146        ));
147    } else {
148        None
149    };
150
151    let mut parsed = Parsed::new();
152    parse(&mut parsed, datetime_str, StrftimeItems::new(format))
153        .map_err(|e| err(&e.to_string()))?;
154
155    let dt = match tz {
156        Some(tz) => {
157            // A timezone was manually parsed out, convert it to a fixed offset
158            match parsed.to_datetime_with_timezone(&tz) {
159                Ok(dt) => Ok(dt.fixed_offset()),
160                Err(e) => Err(e),
161            }
162        }
163        // default to parse the string assuming it has a timezone
164        None => parsed.to_datetime(),
165    };
166
167    if let Err(e) = &dt {
168        // no timezone or other failure, try without a timezone
169        let ndt = parsed
170            .to_naive_datetime_with_offset(0)
171            .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()));
172        if let Err(e) = &ndt {
173            return Err(err(&e.to_string()));
174        }
175
176        if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) {
177            Ok(e.to_owned())
178        } else {
179            Err(err(&e.to_string()))
180        }
181    } else {
182        Ok(dt.unwrap().with_timezone(timezone))
183    }
184}
185
186/// Accepts a string with a `chrono` format and converts it to a
187/// nanosecond precision timestamp relative to the provided `timezone`.
188///
189/// See [`chrono::format::strftime`] for the full set of supported formats.
190///
191/// Implements the `to_timestamp` function to convert a string to a
192/// timestamp, following the model of spark SQL’s to_`timestamp`.
193///
194/// Internally, this function uses the `chrono` library for the
195/// datetime parsing
196///
197/// ## Timestamp Precision
198///
199/// Function uses the maximum precision timestamps supported by
200/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This
201/// means the range of dates that timestamps can represent is ~1677 AD
202/// to 2262 AM
203///
204/// ## Timezone / Offset Handling
205///
206/// Numerical values of timestamps are stored compared to offset UTC.
207///
208/// Any timestamp in the formatting string is handled according to the rules
209/// defined by `chrono`.
210///
211/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
212#[inline]
213pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
214    timezone: &Option<Tz>,
215    s: &str,
216    format: &str,
217) -> Result<i64, DataFusionError> {
218    let dt = string_to_datetime_formatted(timezone.as_ref().unwrap_or(&UTC), s, format)?;
219    let parsed = dt
220        .timestamp_nanos_opt()
221        .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
222
223    Ok(parsed)
224}
225
226/// Accepts a string with a `chrono` format and converts it to a
227/// millisecond precision timestamp relative to the provided `timezone`.
228///
229/// See [`chrono::format::strftime`] for the full set of supported formats.
230///
231/// Internally, this function uses the `chrono` library for the
232/// datetime parsing
233///
234/// ## Timezone / Offset Handling
235///
236/// Numerical values of timestamps are stored compared to offset UTC.
237///
238/// Any timestamp in the formatting string is handled according to the rules
239/// defined by `chrono`.
240///
241/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
242#[inline]
243pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
244    Ok(string_to_datetime_formatted(&Utc, s, format)?
245        .naive_utc()
246        .and_utc()
247        .timestamp_millis())
248}
249
250pub(crate) fn handle<O, F>(
251    args: &[ColumnarValue],
252    op: F,
253    name: &str,
254    dt: &DataType,
255) -> Result<ColumnarValue>
256where
257    O: ArrowPrimitiveType,
258    F: Fn(&str) -> Result<O::Native>,
259{
260    match &args[0] {
261        ColumnarValue::Array(a) => match a.data_type() {
262            DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
263                unary_string_to_primitive_function::<&StringViewArray, O, _>(
264                    &a.as_string_view(),
265                    op,
266                )?,
267            ))),
268            DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
269                unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
270                    &a.as_string::<i64>(),
271                    op,
272                )?,
273            ))),
274            DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
275                unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
276                    &a.as_string::<i32>(),
277                    op,
278                )?,
279            ))),
280            other => exec_err!("Unsupported data type {other:?} for function {name}"),
281        },
282        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
283            Some(a) => {
284                let result = a
285                    .as_ref()
286                    .map(|x| op(x))
287                    .transpose()?
288                    .and_then(|v| v.to_i64());
289                let s = scalar_value(dt, result)?;
290                Ok(ColumnarValue::Scalar(s))
291            }
292            _ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
293        },
294    }
295}
296
297// Given a function that maps a `&str`, `&str` to an arrow native type,
298// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
299// depending on the `args`'s variant.
300pub(crate) fn handle_multiple<O, F, M>(
301    args: &[ColumnarValue],
302    op: F,
303    op2: M,
304    name: &str,
305    dt: &DataType,
306) -> Result<ColumnarValue>
307where
308    O: ArrowPrimitiveType,
309    F: Fn(&str, &str) -> Result<O::Native>,
310    M: Fn(O::Native) -> O::Native,
311{
312    match &args[0] {
313        ColumnarValue::Array(a) => match a.data_type() {
314            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
315                // validate the column types
316                for (pos, arg) in args.iter().enumerate() {
317                    match arg {
318                        ColumnarValue::Array(arg) => match arg.data_type() {
319                            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
320                                // all good
321                            }
322                            other => {
323                                return exec_err!(
324                                    "Unsupported data type {other:?} for function {name}, arg # {pos}"
325                                );
326                            }
327                        },
328                        ColumnarValue::Scalar(arg) => {
329                            match arg.data_type() {
330                                DataType::Utf8View
331                                | DataType::LargeUtf8
332                                | DataType::Utf8 => {
333                                    // all good
334                                }
335                                other => {
336                                    return exec_err!(
337                                        "Unsupported data type {other:?} for function {name}, arg # {pos}"
338                                    );
339                                }
340                            }
341                        }
342                    }
343                }
344
345                Ok(ColumnarValue::Array(Arc::new(
346                    strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
347                )))
348            }
349            other => {
350                exec_err!("Unsupported data type {other:?} for function {name}")
351            }
352        },
353        // if the first argument is a scalar utf8 all arguments are expected to be scalar utf8
354        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
355            Some(a) => {
356                let a = a.as_ref();
357                // ASK: Why do we trust `a` to be non-null at this point?
358                let a = unwrap_or_internal_err!(a);
359
360                let mut ret = None;
361
362                for (pos, v) in args.iter().enumerate().skip(1) {
363                    let ColumnarValue::Scalar(
364                        ScalarValue::Utf8View(x)
365                        | ScalarValue::LargeUtf8(x)
366                        | ScalarValue::Utf8(x),
367                    ) = v
368                    else {
369                        return exec_err!(
370                            "Unsupported data type {v:?} for function {name}, arg # {pos}"
371                        );
372                    };
373
374                    if let Some(s) = x {
375                        match op(a, s.as_str()) {
376                            Ok(r) => {
377                                let result = op2(r).to_i64();
378                                let s = scalar_value(dt, result)?;
379                                ret = Some(Ok(ColumnarValue::Scalar(s)));
380                                break;
381                            }
382                            Err(e) => ret = Some(Err(e)),
383                        }
384                    }
385                }
386
387                unwrap_or_internal_err!(ret)
388            }
389            other => {
390                exec_err!("Unsupported data type {other:?} for function {name}")
391            }
392        },
393    }
394}
395
396/// given a function `op` that maps `&str`, `&str` to the first successful Result
397/// of an arrow native type, returns a `PrimitiveArray` after the application of the
398/// function to `args` and the subsequence application of the `op2` function to any
399/// successful result. This function calls the `op` function with the first and second
400/// argument and if not successful continues with first and third, first and fourth,
401/// etc until the result was successful or no more arguments are present.
402/// # Errors
403/// This function errors iff:
404/// * the number of arguments is not > 1 or
405/// * the function `op` errors for all input
406pub(crate) fn strings_to_primitive_function<O, F, F2>(
407    args: &[ColumnarValue],
408    op: F,
409    op2: F2,
410    name: &str,
411) -> Result<PrimitiveArray<O>>
412where
413    O: ArrowPrimitiveType,
414    F: Fn(&str, &str) -> Result<O::Native>,
415    F2: Fn(O::Native) -> O::Native,
416{
417    if args.len() < 2 {
418        return exec_err!(
419            "{:?} args were supplied but {} takes 2 or more arguments",
420            args.len(),
421            name
422        );
423    }
424
425    match &args[0] {
426        ColumnarValue::Array(a) => match a.data_type() {
427            DataType::Utf8View => {
428                let string_array = a.as_string_view();
429                handle_array_op::<O, &StringViewArray, F, F2>(
430                    &string_array,
431                    &args[1..],
432                    op,
433                    op2,
434                )
435            }
436            DataType::LargeUtf8 => {
437                let string_array = as_generic_string_array::<i64>(&a)?;
438                handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
439                    &string_array,
440                    &args[1..],
441                    op,
442                    op2,
443                )
444            }
445            DataType::Utf8 => {
446                let string_array = as_generic_string_array::<i32>(&a)?;
447                handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
448                    &string_array,
449                    &args[1..],
450                    op,
451                    op2,
452                )
453            }
454            other => exec_err!(
455                "Unsupported data type {other:?} for function substr,\
456                    expected Utf8View, Utf8 or LargeUtf8."
457            ),
458        },
459        other => exec_err!(
460            "Received {} data type, expected only array",
461            other.data_type()
462        ),
463    }
464}
465
466fn handle_array_op<'a, O, V, F, F2>(
467    first: &V,
468    args: &[ColumnarValue],
469    op: F,
470    op2: F2,
471) -> Result<PrimitiveArray<O>>
472where
473    V: StringArrayType<'a>,
474    O: ArrowPrimitiveType,
475    F: Fn(&str, &str) -> Result<O::Native>,
476    F2: Fn(O::Native) -> O::Native,
477{
478    first
479        .iter()
480        .enumerate()
481        .map(|(pos, x)| {
482            let mut val = None;
483            if let Some(x) = x {
484                for arg in args {
485                    let v = match arg {
486                        ColumnarValue::Array(a) => match a.data_type() {
487                            DataType::Utf8View => Ok(a.as_string_view().value(pos)),
488                            DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
489                            DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
490                            other => exec_err!("Unexpected type encountered '{other}'"),
491                        },
492                        ColumnarValue::Scalar(s) => match s.try_as_str() {
493                            Some(Some(v)) => Ok(v),
494                            Some(None) => continue, // null string
495                            None => exec_err!("Unexpected scalar type encountered '{s}'"),
496                        },
497                    }?;
498
499                    let r = op(x, v);
500                    if let Ok(inner) = r {
501                        val = Some(Ok(op2(inner)));
502                        break;
503                    } else {
504                        val = Some(r);
505                    }
506                }
507            };
508
509            val.transpose()
510        })
511        .collect()
512}
513
514/// given a function `op` that maps a `&str` to a Result of an arrow native type,
515/// returns a `PrimitiveArray` after the application
516/// of the function to `args[0]`.
517/// # Errors
518/// This function errors iff:
519/// * the number of arguments is not 1 or
520/// * the function `op` errors
521fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
522    array: &StringArrType,
523    op: F,
524) -> Result<PrimitiveArray<O>>
525where
526    StringArrType: StringArrayType<'a>,
527    O: ArrowPrimitiveType,
528    F: Fn(&'a str) -> Result<O::Native>,
529{
530    // first map is the iterator, second is for the `Option<_>`
531    array.iter().map(|x| x.map(&op).transpose()).collect()
532}
533
534fn scalar_value(dt: &DataType, r: Option<i64>) -> Result<ScalarValue> {
535    match dt {
536        DataType::Date32 => Ok(ScalarValue::Date32(r.and_then(|v| v.to_i32()))),
537        DataType::Timestamp(u, tz) => match u {
538            TimeUnit::Second => Ok(ScalarValue::TimestampSecond(r, tz.clone())),
539            TimeUnit::Millisecond => Ok(ScalarValue::TimestampMillisecond(r, tz.clone())),
540            TimeUnit::Microsecond => Ok(ScalarValue::TimestampMicrosecond(r, tz.clone())),
541            TimeUnit::Nanosecond => Ok(ScalarValue::TimestampNanosecond(r, tz.clone())),
542        },
543        t => Err(internal_datafusion_err!("Unsupported data type: {t:?}")),
544    }
545}