datafusion_functions/datetime/
common.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{
21    Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
22    StringArrayType, StringViewArray,
23};
24use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
25use arrow::datatypes::DataType;
26use chrono::format::{parse, Parsed, StrftimeItems};
27use chrono::LocalResult::Single;
28use chrono::{DateTime, TimeZone, Utc};
29
30use datafusion_common::cast::as_generic_string_array;
31use datafusion_common::{
32    exec_datafusion_err, exec_err, unwrap_or_internal_err, DataFusionError, Result,
33    ScalarType, ScalarValue,
34};
35use datafusion_expr::ColumnarValue;
36
/// Error message used when a parsed date falls outside the interval that can
/// be expressed as a nanosecond-precision timestamp.
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
39
40/// Calls string_to_timestamp_nanos and converts the error type
41pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
42    string_to_timestamp_nanos(s).map_err(|e| e.into())
43}
44
45/// Checks that all the arguments from the second are of type [Utf8], [LargeUtf8] or [Utf8View]
46///
47/// [Utf8]: DataType::Utf8
48/// [LargeUtf8]: DataType::LargeUtf8
49/// [Utf8View]: DataType::Utf8View
50pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
51    for (idx, a) in args.iter().skip(1).enumerate() {
52        match a.data_type() {
53            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
54                // all good
55            }
56            _ => {
57                return exec_err!(
58                    "{name} function unsupported data type at index {}: {}",
59                    idx + 1,
60                    a.data_type()
61                );
62            }
63        }
64    }
65
66    Ok(())
67}
68
69/// Accepts a string and parses it using the [`chrono::format::strftime`] specifiers
70/// relative to the provided `timezone`
71///
72/// [IANA timezones] are only supported if the `arrow-array/chrono-tz` feature is enabled
73///
74/// * `2023-01-01 040506 America/Los_Angeles`
75///
76/// If a timestamp is ambiguous, for example as a result of daylight-savings time, an error
77/// will be returned
78///
79/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
80/// [IANA timezones]: https://www.iana.org/time-zones
81pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
82    timezone: &T,
83    s: &str,
84    format: &str,
85) -> Result<DateTime<T>, DataFusionError> {
86    let err = |err_ctx: &str| {
87        exec_datafusion_err!(
88            "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
89        )
90    };
91
92    let mut parsed = Parsed::new();
93    parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
94
95    // attempt to parse the string assuming it has a timezone
96    let dt = parsed.to_datetime();
97
98    if let Err(e) = &dt {
99        // no timezone or other failure, try without a timezone
100        let ndt = parsed
101            .to_naive_datetime_with_offset(0)
102            .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()));
103        if let Err(e) = &ndt {
104            return Err(err(&e.to_string()));
105        }
106
107        if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) {
108            Ok(e.to_owned())
109        } else {
110            Err(err(&e.to_string()))
111        }
112    } else {
113        Ok(dt.unwrap().with_timezone(timezone))
114    }
115}
116
117/// Accepts a string with a `chrono` format and converts it to a
118/// nanosecond precision timestamp.
119///
120/// See [`chrono::format::strftime`] for the full set of supported formats.
121///
122/// Implements the `to_timestamp` function to convert a string to a
123/// timestamp, following the model of spark SQL’s to_`timestamp`.
124///
125/// Internally, this function uses the `chrono` library for the
126/// datetime parsing
127///
128/// ## Timestamp Precision
129///
130/// Function uses the maximum precision timestamps supported by
131/// Arrow (nanoseconds stored as a 64-bit integer) timestamps. This
132/// means the range of dates that timestamps can represent is ~1677 AD
133/// to 2262 AM
134///
135/// ## Timezone / Offset Handling
136///
137/// Numerical values of timestamps are stored compared to offset UTC.
138///
139/// Any timestamp in the formatting string is handled according to the rules
140/// defined by `chrono`.
141///
142/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
143#[inline]
144pub(crate) fn string_to_timestamp_nanos_formatted(
145    s: &str,
146    format: &str,
147) -> Result<i64, DataFusionError> {
148    string_to_datetime_formatted(&Utc, s, format)?
149        .naive_utc()
150        .and_utc()
151        .timestamp_nanos_opt()
152        .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
153}
154
155/// Accepts a string with a `chrono` format and converts it to a
156/// millisecond precision timestamp.
157///
158/// See [`chrono::format::strftime`] for the full set of supported formats.
159///
160/// Internally, this function uses the `chrono` library for the
161/// datetime parsing
162///
163/// ## Timezone / Offset Handling
164///
165/// Numerical values of timestamps are stored compared to offset UTC.
166///
167/// Any timestamp in the formatting string is handled according to the rules
168/// defined by `chrono`.
169///
170/// [`chrono::format::strftime`]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html
171#[inline]
172pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
173    Ok(string_to_datetime_formatted(&Utc, s, format)?
174        .naive_utc()
175        .and_utc()
176        .timestamp_millis())
177}
178
179pub(crate) fn handle<O, F, S>(
180    args: &[ColumnarValue],
181    op: F,
182    name: &str,
183) -> Result<ColumnarValue>
184where
185    O: ArrowPrimitiveType,
186    S: ScalarType<O::Native>,
187    F: Fn(&str) -> Result<O::Native>,
188{
189    match &args[0] {
190        ColumnarValue::Array(a) => match a.data_type() {
191            DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
192                unary_string_to_primitive_function::<&StringViewArray, O, _>(
193                    a.as_ref().as_string_view(),
194                    op,
195                )?,
196            ))),
197            DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
198                unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
199                    a.as_ref().as_string::<i64>(),
200                    op,
201                )?,
202            ))),
203            DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
204                unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
205                    a.as_ref().as_string::<i32>(),
206                    op,
207                )?,
208            ))),
209            other => exec_err!("Unsupported data type {other:?} for function {name}"),
210        },
211        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
212            Some(a) => {
213                let result = a.as_ref().map(|x| op(x)).transpose()?;
214                Ok(ColumnarValue::Scalar(S::scalar(result)))
215            }
216            _ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
217        },
218    }
219}
220
221// Given a function that maps a `&str`, `&str` to an arrow native type,
222// returns a `ColumnarValue` where the function is applied to either a `ArrayRef` or `ScalarValue`
223// depending on the `args`'s variant.
224pub(crate) fn handle_multiple<O, F, S, M>(
225    args: &[ColumnarValue],
226    op: F,
227    op2: M,
228    name: &str,
229) -> Result<ColumnarValue>
230where
231    O: ArrowPrimitiveType,
232    S: ScalarType<O::Native>,
233    F: Fn(&str, &str) -> Result<O::Native>,
234    M: Fn(O::Native) -> O::Native,
235{
236    match &args[0] {
237        ColumnarValue::Array(a) => match a.data_type() {
238            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
239                // validate the column types
240                for (pos, arg) in args.iter().enumerate() {
241                    match arg {
242                        ColumnarValue::Array(arg) => match arg.data_type() {
243                            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
244                                // all good
245                            }
246                            other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
247                        },
248                        ColumnarValue::Scalar(arg) => {
249                            match arg.data_type() {
250                                DataType::Utf8View| DataType::LargeUtf8 | DataType::Utf8 => {
251                                    // all good
252                                }
253                                other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
254                            }
255                        }
256                    }
257                }
258
259                Ok(ColumnarValue::Array(Arc::new(
260                    strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
261                )))
262            }
263            other => {
264                exec_err!("Unsupported data type {other:?} for function {name}")
265            }
266        },
267        // if the first argument is a scalar utf8 all arguments are expected to be scalar utf8
268        ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
269            Some(a) => {
270                let a = a.as_ref();
271                // ASK: Why do we trust `a` to be non-null at this point?
272                let a = unwrap_or_internal_err!(a);
273
274                let mut ret = None;
275
276                for (pos, v) in args.iter().enumerate().skip(1) {
277                    let ColumnarValue::Scalar(
278                        ScalarValue::Utf8View(x)
279                        | ScalarValue::LargeUtf8(x)
280                        | ScalarValue::Utf8(x),
281                    ) = v
282                    else {
283                        return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}");
284                    };
285
286                    if let Some(s) = x {
287                        match op(a, s.as_str()) {
288                            Ok(r) => {
289                                ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
290                                    op2(r),
291                                )))));
292                                break;
293                            }
294                            Err(e) => ret = Some(Err(e)),
295                        }
296                    }
297                }
298
299                unwrap_or_internal_err!(ret)
300            }
301            other => {
302                exec_err!("Unsupported data type {other:?} for function {name}")
303            }
304        },
305    }
306}
307
308/// given a function `op` that maps `&str`, `&str` to the first successful Result
309/// of an arrow native type, returns a `PrimitiveArray` after the application of the
310/// function to `args` and the subsequence application of the `op2` function to any
311/// successful result. This function calls the `op` function with the first and second
312/// argument and if not successful continues with first and third, first and fourth,
313/// etc until the result was successful or no more arguments are present.
314/// # Errors
315/// This function errors iff:
316/// * the number of arguments is not > 1 or
317/// * the function `op` errors for all input
318pub(crate) fn strings_to_primitive_function<O, F, F2>(
319    args: &[ColumnarValue],
320    op: F,
321    op2: F2,
322    name: &str,
323) -> Result<PrimitiveArray<O>>
324where
325    O: ArrowPrimitiveType,
326    F: Fn(&str, &str) -> Result<O::Native>,
327    F2: Fn(O::Native) -> O::Native,
328{
329    if args.len() < 2 {
330        return exec_err!(
331            "{:?} args were supplied but {} takes 2 or more arguments",
332            args.len(),
333            name
334        );
335    }
336
337    match &args[0] {
338        ColumnarValue::Array(a) => match a.data_type() {
339            DataType::Utf8View => {
340                let string_array = a.as_string_view();
341                handle_array_op::<O, &StringViewArray, F, F2>(
342                    &string_array,
343                    &args[1..],
344                    op,
345                    op2,
346                )
347            }
348            DataType::LargeUtf8 => {
349                let string_array = as_generic_string_array::<i64>(&a)?;
350                handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
351                    &string_array,
352                    &args[1..],
353                    op,
354                    op2,
355                )
356            }
357            DataType::Utf8 => {
358                let string_array = as_generic_string_array::<i32>(&a)?;
359                handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
360                    &string_array,
361                    &args[1..],
362                    op,
363                    op2,
364                )
365            }
366            other => exec_err!(
367                "Unsupported data type {other:?} for function substr,\
368                    expected Utf8View, Utf8 or LargeUtf8."
369            ),
370        },
371        other => exec_err!(
372            "Received {} data type, expected only array",
373            other.data_type()
374        ),
375    }
376}
377
378fn handle_array_op<'a, O, V, F, F2>(
379    first: &V,
380    args: &[ColumnarValue],
381    op: F,
382    op2: F2,
383) -> Result<PrimitiveArray<O>>
384where
385    V: StringArrayType<'a>,
386    O: ArrowPrimitiveType,
387    F: Fn(&str, &str) -> Result<O::Native>,
388    F2: Fn(O::Native) -> O::Native,
389{
390    first
391        .iter()
392        .enumerate()
393        .map(|(pos, x)| {
394            let mut val = None;
395            if let Some(x) = x {
396                for arg in args {
397                    let v = match arg {
398                        ColumnarValue::Array(a) => match a.data_type() {
399                            DataType::Utf8View => Ok(a.as_string_view().value(pos)),
400                            DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
401                            DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
402                            other => exec_err!("Unexpected type encountered '{other}'"),
403                        },
404                        ColumnarValue::Scalar(s) => match s.try_as_str() {
405                            Some(Some(v)) => Ok(v),
406                            Some(None) => continue, // null string
407                            None => exec_err!("Unexpected scalar type encountered '{s}'"),
408                        },
409                    }?;
410
411                    let r = op(x, v);
412                    if let Ok(inner) = r {
413                        val = Some(Ok(op2(inner)));
414                        break;
415                    } else {
416                        val = Some(r);
417                    }
418                }
419            };
420
421            val.transpose()
422        })
423        .collect()
424}
425
426/// given a function `op` that maps a `&str` to a Result of an arrow native type,
427/// returns a `PrimitiveArray` after the application
428/// of the function to `args[0]`.
429/// # Errors
430/// This function errors iff:
431/// * the number of arguments is not 1 or
432/// * the function `op` errors
433fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
434    array: StringArrType,
435    op: F,
436) -> Result<PrimitiveArray<O>>
437where
438    StringArrType: StringArrayType<'a>,
439    O: ArrowPrimitiveType,
440    F: Fn(&'a str) -> Result<O::Native>,
441{
442    // first map is the iterator, second is for the `Option<_>`
443    array.iter().map(|x| x.map(&op).transpose()).collect()
444}