datafusion_functions/datetime/
to_local_time.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27    ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28    TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34    exec_err, internal_datafusion_err, plan_err, utils::take_function_args, Result,
35    ScalarValue,
36};
37use datafusion_expr::{
38    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
39};
40use datafusion_macros::user_doc;
41
/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
/// timezone information). In other words, this function strips off the timezone from the
/// timestamp, while keeping the display value of the timestamp the same.
///
/// Timestamps that carry no timezone are returned unchanged.
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
    syntax_example = "to_local_time(expression)",
    sql_example = r#"```sql
> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT
  time,
  arrow_typeof(time) as type,
  to_local_time(time) as to_local_time,
  arrow_typeof(to_local_time(time)) as to_local_time_type
FROM (
  SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
);
+---------------------------+----------------------------------+---------------------+--------------------+
| time                      | type                             | to_local_time       | to_local_time_type |
+---------------------------+----------------------------------+---------------------+--------------------+
| 2024-04-01T00:00:20+02:00 | Timestamp(ns, "Europe/Brussels") | 2024-04-01T00:00:20 | Timestamp(ns)      |
+---------------------------+----------------------------------+---------------------+--------------------+

# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
# than UTC boundaries

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
+---------------------+
| date_bin            |
+---------------------+
| 2024-04-01T00:00:00 |
+---------------------+

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
+---------------------------+
| date_bin_with_timezone    |
+---------------------------+
| 2024-04-01T00:00:00+02:00 |
+---------------------------+
```"#,
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToLocalTimeFunc {
    // User-defined signature: accepted argument types are decided by `coerce_types`.
    signature: Signature,
}
104
105impl Default for ToLocalTimeFunc {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl ToLocalTimeFunc {
112    pub fn new() -> Self {
113        Self {
114            signature: Signature::user_defined(Volatility::Immutable),
115        }
116    }
117
118    fn to_local_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
119        let [time_value] = take_function_args(self.name(), args)?;
120
121        let arg_type = time_value.data_type();
122        match arg_type {
123            Timestamp(_, None) => {
124                // if no timezone specified, just return the input
125                Ok(time_value.clone())
126            }
127            // If has timezone, adjust the underlying time value. The current time value
128            // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
129            // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
130            // for more details.
131            //
132            // Then remove the timezone in return type, i.e. return None
133            Timestamp(_, Some(timezone)) => {
134                let tz: Tz = timezone.parse()?;
135
136                match time_value {
137                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
138                        Some(ts),
139                        Some(_),
140                    )) => {
141                        let adjusted_ts =
142                            adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
143                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
144                            Some(adjusted_ts),
145                            None,
146                        )))
147                    }
148                    ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
149                        Some(ts),
150                        Some(_),
151                    )) => {
152                        let adjusted_ts =
153                            adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
154                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
155                            Some(adjusted_ts),
156                            None,
157                        )))
158                    }
159                    ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
160                        Some(ts),
161                        Some(_),
162                    )) => {
163                        let adjusted_ts =
164                            adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
165                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
166                            Some(adjusted_ts),
167                            None,
168                        )))
169                    }
170                    ColumnarValue::Scalar(ScalarValue::TimestampSecond(
171                        Some(ts),
172                        Some(_),
173                    )) => {
174                        let adjusted_ts =
175                            adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
176                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
177                            Some(adjusted_ts),
178                            None,
179                        )))
180                    }
181                    ColumnarValue::Array(array) => {
182                        fn transform_array<T: ArrowTimestampType>(
183                            array: &ArrayRef,
184                            tz: Tz,
185                        ) -> Result<ColumnarValue> {
186                            let mut builder = PrimitiveBuilder::<T>::new();
187
188                            let primitive_array = as_primitive_array::<T>(array)?;
189                            for ts_opt in primitive_array.iter() {
190                                match ts_opt {
191                                    None => builder.append_null(),
192                                    Some(ts) => {
193                                        let adjusted_ts: i64 =
194                                            adjust_to_local_time::<T>(ts, tz)?;
195                                        builder.append_value(adjusted_ts)
196                                    }
197                                }
198                            }
199
200                            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
201                        }
202
203                        match array.data_type() {
204                            Timestamp(_, None) => {
205                                // if no timezone specified, just return the input
206                                Ok(time_value.clone())
207                            }
208                            Timestamp(Nanosecond, Some(_)) => {
209                                transform_array::<TimestampNanosecondType>(array, tz)
210                            }
211                            Timestamp(Microsecond, Some(_)) => {
212                                transform_array::<TimestampMicrosecondType>(array, tz)
213                            }
214                            Timestamp(Millisecond, Some(_)) => {
215                                transform_array::<TimestampMillisecondType>(array, tz)
216                            }
217                            Timestamp(Second, Some(_)) => {
218                                transform_array::<TimestampSecondType>(array, tz)
219                            }
220                            _ => {
221                                exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
222                            }
223                        }
224                    }
225                    _ => {
226                        exec_err!(
227                        "to_local_time function requires timestamp argument, got {:?}",
228                        time_value.data_type()
229                    )
230                    }
231                }
232            }
233            _ => {
234                exec_err!(
235                    "to_local_time function requires timestamp argument, got {:?}",
236                    arg_type
237                )
238            }
239        }
240    }
241}
242
243/// This function converts a timestamp with a timezone to a timestamp without a timezone.
244/// The display value of the adjusted timestamp remain the same, but the underlying timestamp
245/// representation is adjusted according to the relative timezone offset to UTC.
246///
247/// This function uses chrono to handle daylight saving time changes.
248///
249/// For example,
250///
251/// ```text
252/// '2019-03-31T01:00:00Z'::timestamp at time zone 'Europe/Brussels'
253/// ```
254///
255/// is displayed as follows in datafusion-cli:
256///
257/// ```text
258/// 2019-03-31T01:00:00+01:00
259/// ```
260///
261/// and is represented in DataFusion as:
262///
263/// ```text
264/// TimestampNanosecond(Some(1_553_990_400_000_000_000), Some("Europe/Brussels"))
265/// ```
266///
267/// To strip off the timezone while keeping the display value the same, we need to
268/// adjust the underlying timestamp with the timezone offset value using `adjust_to_local_time()`
269///
270/// ```text
271/// adjust_to_local_time(1_553_990_400_000_000_000, "Europe/Brussels") --> 1_553_994_000_000_000_000
272/// ```
273///
274/// The difference between `1_553_990_400_000_000_000` and `1_553_994_000_000_000_000` is
275/// `3600_000_000_000` ns, which corresponds to 1 hour. This matches with the timezone
276/// offset for "Europe/Brussels" for this date.
277///
278/// Note that the offset varies with daylight savings time (DST), which makes this tricky! For
279/// example, timezone "Europe/Brussels" has a 2-hour offset during DST and a 1-hour offset
280/// when DST ends.
281///
282/// Consequently, DataFusion can represent the timestamp in local time (with no offset or
283/// timezone information) as
284///
285/// ```text
286/// TimestampNanosecond(Some(1_553_994_000_000_000_000), None)
287/// ```
288///
289/// which is displayed as follows in datafusion-cli:
290///
291/// ```text
292/// 2019-03-31T01:00:00
293/// ```
294///
295/// See `test_adjust_to_local_time()` for example
296fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
297    fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
298    where
299        F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
300    {
301        match converter(ts) {
302            MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
303                "Ambiguous timestamp. Do you mean {:?} or {:?}",
304                earliest,
305                latest
306            ),
307            MappedLocalTime::None => exec_err!(
308                "The local time does not exist because there is a gap in the local time."
309            ),
310            MappedLocalTime::Single(date_time) => Ok(date_time),
311        }
312    }
313
314    let date_time = match T::UNIT {
315        Nanosecond => Utc.timestamp_nanos(ts),
316        Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
317        Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
318        Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
319    };
320
321    let offset_seconds: i64 = tz
322        .offset_from_utc_datetime(&date_time.naive_utc())
323        .fix()
324        .local_minus_utc() as i64;
325
326    let adjusted_date_time = date_time.add(
327        // This should not fail under normal circumstances as the
328        // maximum possible offset is 26 hours (93,600 seconds)
329        TimeDelta::try_seconds(offset_seconds)
330            .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
331    );
332
333    // convert the naive datetime back to i64
334    match T::UNIT {
335        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
336            internal_datafusion_err!(
337                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
338            )
339        ),
340        Microsecond => Ok(adjusted_date_time.timestamp_micros()),
341        Millisecond => Ok(adjusted_date_time.timestamp_millis()),
342        Second => Ok(adjusted_date_time.timestamp()),
343    }
344}
345
346impl ScalarUDFImpl for ToLocalTimeFunc {
347    fn as_any(&self) -> &dyn Any {
348        self
349    }
350
351    fn name(&self) -> &str {
352        "to_local_time"
353    }
354
355    fn signature(&self) -> &Signature {
356        &self.signature
357    }
358
359    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
360        let [time_value] = take_function_args(self.name(), arg_types)?;
361
362        match time_value {
363            Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
364            _ => exec_err!(
365                "The to_local_time function can only accept timestamp as the arg, got {:?}", time_value
366            )
367        }
368    }
369
370    fn invoke_with_args(
371        &self,
372        args: datafusion_expr::ScalarFunctionArgs,
373    ) -> Result<ColumnarValue> {
374        let [time_value] = take_function_args(self.name(), args.args)?;
375
376        self.to_local_time(std::slice::from_ref(&time_value))
377    }
378
379    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
380        if arg_types.len() != 1 {
381            return plan_err!(
382                "to_local_time function requires 1 argument, got {:?}",
383                arg_types.len()
384            );
385        }
386
387        let first_arg = arg_types[0].clone();
388        match &first_arg {
389            DataType::Null => Ok(vec![Timestamp(Nanosecond, None)]),
390            Timestamp(Nanosecond, timezone) => {
391                Ok(vec![Timestamp(Nanosecond, timezone.clone())])
392            }
393            Timestamp(Microsecond, timezone) => {
394                Ok(vec![Timestamp(Microsecond, timezone.clone())])
395            }
396            Timestamp(Millisecond, timezone) => {
397                Ok(vec![Timestamp(Millisecond, timezone.clone())])
398            }
399            Timestamp(Second, timezone) => Ok(vec![Timestamp(Second, timezone.clone())]),
400            _ => plan_err!("The to_local_time function can only accept Timestamp as the arg got {first_arg}"),
401        }
402    }
403    fn documentation(&self) -> Option<&Documentation> {
404        self.doc()
405    }
406}
407
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{types::TimestampNanosecondType, Array, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{adjust_to_local_time, ToLocalTimeFunc};

    /// A wall-clock time in New York must map to the value of the same wall-clock
    /// time read as UTC.
    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz) // this is in a local timezone
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc() // this is in UTC
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    /// Scalars of every time unit are shifted by the Brussels offset and lose
    /// their timezone.
    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // Test data:
        // (
        //    the string display of the input timestamp,
        //    the i64 representation of the timestamp before adjustment in nanosecond,
        //    the i64 representation of the timestamp after adjustment in nanosecond,
        // )
        let test_cases = vec![
            (
                // DST time
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                // End of DST
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz) // this is in a local timezone
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc() // this is in UTC
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes the UDF on a single scalar and asserts the scalar result.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    #[test]
    fn test_to_local_time_timezones_array() {
        // (
        //    UTC instants making up the input array,
        //    the timezone attached to the input array,
        //    the expected local wall-clock times of the result,
        // )
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                // Each instant reads one hour later on a "+01:00" clock.
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        for (source, tz_opt, expected) in &cases {
            // Attach the case's timezone so the timezone-aware array path is
            // actually exercised.
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        }
    }
}