datafusion_functions/datetime/
to_local_time.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27    ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28    TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34    exec_err, plan_err, utils::take_function_args, DataFusionError, Result, ScalarValue,
35};
36use datafusion_expr::{
37    ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
38};
39use datafusion_macros::user_doc;
40
41/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
42/// timezone information). In other words, this function strips off the timezone from the timestamp,
43/// while keep the display value of the timestamp the same.
44#[user_doc(
45    doc_section(label = "Time and Date Functions"),
46    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
47    syntax_example = "to_local_time(expression)",
48    sql_example = r#"```sql
49> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
50+---------------------------------------------+
51| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
52+---------------------------------------------+
53| 2024-04-01T00:00:20                         |
54+---------------------------------------------+
55
56> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
57+---------------------------------------------+
58| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
59+---------------------------------------------+
60| 2024-04-01T00:00:20                         |
61+---------------------------------------------+
62
63> SELECT
64  time,
65  arrow_typeof(time) as type,
66  to_local_time(time) as to_local_time,
67  arrow_typeof(to_local_time(time)) as to_local_time_type
68FROM (
69  SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
70);
71+---------------------------+------------------------------------------------+---------------------+-----------------------------+
72| time                      | type                                           | to_local_time       | to_local_time_type          |
73+---------------------------+------------------------------------------------+---------------------+-----------------------------+
74| 2024-04-01T00:00:20+02:00 | Timestamp(Nanosecond, Some("Europe/Brussels")) | 2024-04-01T00:00:20 | Timestamp(Nanosecond, None) |
75+---------------------------+------------------------------------------------+---------------------+-----------------------------+
76
77# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
78# than UTC boundaries
79
80> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
81+---------------------+
82| date_bin            |
83+---------------------+
84| 2024-04-01T00:00:00 |
85+---------------------+
86
87> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
88+---------------------------+
89| date_bin_with_timezone    |
90+---------------------------+
91| 2024-04-01T00:00:00+02:00 |
92+---------------------------+
93```"#,
94    argument(
95        name = "expression",
96        description = "Time expression to operate on. Can be a constant, column, or function."
97    )
98)]
99#[derive(Debug, PartialEq, Eq, Hash)]
100pub struct ToLocalTimeFunc {
101    signature: Signature,
102}
103
104impl Default for ToLocalTimeFunc {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110impl ToLocalTimeFunc {
111    pub fn new() -> Self {
112        Self {
113            signature: Signature::user_defined(Volatility::Immutable),
114        }
115    }
116
117    fn to_local_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
118        let [time_value] = take_function_args(self.name(), args)?;
119
120        let arg_type = time_value.data_type();
121        match arg_type {
122            Timestamp(_, None) => {
123                // if no timezone specified, just return the input
124                Ok(time_value.clone())
125            }
126            // If has timezone, adjust the underlying time value. The current time value
127            // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
128            // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
129            // for more details.
130            //
131            // Then remove the timezone in return type, i.e. return None
132            Timestamp(_, Some(timezone)) => {
133                let tz: Tz = timezone.parse()?;
134
135                match time_value {
136                    ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
137                        Some(ts),
138                        Some(_),
139                    )) => {
140                        let adjusted_ts =
141                            adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
142                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
143                            Some(adjusted_ts),
144                            None,
145                        )))
146                    }
147                    ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
148                        Some(ts),
149                        Some(_),
150                    )) => {
151                        let adjusted_ts =
152                            adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
153                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
154                            Some(adjusted_ts),
155                            None,
156                        )))
157                    }
158                    ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
159                        Some(ts),
160                        Some(_),
161                    )) => {
162                        let adjusted_ts =
163                            adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
164                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
165                            Some(adjusted_ts),
166                            None,
167                        )))
168                    }
169                    ColumnarValue::Scalar(ScalarValue::TimestampSecond(
170                        Some(ts),
171                        Some(_),
172                    )) => {
173                        let adjusted_ts =
174                            adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
175                        Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
176                            Some(adjusted_ts),
177                            None,
178                        )))
179                    }
180                    ColumnarValue::Array(array) => {
181                        fn transform_array<T: ArrowTimestampType>(
182                            array: &ArrayRef,
183                            tz: Tz,
184                        ) -> Result<ColumnarValue> {
185                            let mut builder = PrimitiveBuilder::<T>::new();
186
187                            let primitive_array = as_primitive_array::<T>(array)?;
188                            for ts_opt in primitive_array.iter() {
189                                match ts_opt {
190                                    None => builder.append_null(),
191                                    Some(ts) => {
192                                        let adjusted_ts: i64 =
193                                            adjust_to_local_time::<T>(ts, tz)?;
194                                        builder.append_value(adjusted_ts)
195                                    }
196                                }
197                            }
198
199                            Ok(ColumnarValue::Array(Arc::new(builder.finish())))
200                        }
201
202                        match array.data_type() {
203                            Timestamp(_, None) => {
204                                // if no timezone specified, just return the input
205                                Ok(time_value.clone())
206                            }
207                            Timestamp(Nanosecond, Some(_)) => {
208                                transform_array::<TimestampNanosecondType>(array, tz)
209                            }
210                            Timestamp(Microsecond, Some(_)) => {
211                                transform_array::<TimestampMicrosecondType>(array, tz)
212                            }
213                            Timestamp(Millisecond, Some(_)) => {
214                                transform_array::<TimestampMillisecondType>(array, tz)
215                            }
216                            Timestamp(Second, Some(_)) => {
217                                transform_array::<TimestampSecondType>(array, tz)
218                            }
219                            _ => {
220                                exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
221                            }
222                        }
223                    }
224                    _ => {
225                        exec_err!(
226                        "to_local_time function requires timestamp argument, got {:?}",
227                        time_value.data_type()
228                    )
229                    }
230                }
231            }
232            _ => {
233                exec_err!(
234                    "to_local_time function requires timestamp argument, got {:?}",
235                    arg_type
236                )
237            }
238        }
239    }
240}
241
242/// This function converts a timestamp with a timezone to a timestamp without a timezone.
243/// The display value of the adjusted timestamp remain the same, but the underlying timestamp
244/// representation is adjusted according to the relative timezone offset to UTC.
245///
246/// This function uses chrono to handle daylight saving time changes.
247///
248/// For example,
249///
250/// ```text
251/// '2019-03-31T01:00:00Z'::timestamp at time zone 'Europe/Brussels'
252/// ```
253///
254/// is displayed as follows in datafusion-cli:
255///
256/// ```text
257/// 2019-03-31T01:00:00+01:00
258/// ```
259///
260/// and is represented in DataFusion as:
261///
262/// ```text
263/// TimestampNanosecond(Some(1_553_990_400_000_000_000), Some("Europe/Brussels"))
264/// ```
265///
266/// To strip off the timezone while keeping the display value the same, we need to
267/// adjust the underlying timestamp with the timezone offset value using `adjust_to_local_time()`
268///
269/// ```text
270/// adjust_to_local_time(1_553_990_400_000_000_000, "Europe/Brussels") --> 1_553_994_000_000_000_000
271/// ```
272///
273/// The difference between `1_553_990_400_000_000_000` and `1_553_994_000_000_000_000` is
274/// `3600_000_000_000` ns, which corresponds to 1 hour. This matches with the timezone
275/// offset for "Europe/Brussels" for this date.
276///
277/// Note that the offset varies with daylight savings time (DST), which makes this tricky! For
278/// example, timezone "Europe/Brussels" has a 2-hour offset during DST and a 1-hour offset
279/// when DST ends.
280///
281/// Consequently, DataFusion can represent the timestamp in local time (with no offset or
282/// timezone information) as
283///
284/// ```text
285/// TimestampNanosecond(Some(1_553_994_000_000_000_000), None)
286/// ```
287///
288/// which is displayed as follows in datafusion-cli:
289///
290/// ```text
291/// 2019-03-31T01:00:00
292/// ```
293///
294/// See `test_adjust_to_local_time()` for example
295fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
296    fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
297    where
298        F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
299    {
300        match converter(ts) {
301            MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
302                "Ambiguous timestamp. Do you mean {:?} or {:?}",
303                earliest,
304                latest
305            ),
306            MappedLocalTime::None => exec_err!(
307                "The local time does not exist because there is a gap in the local time."
308            ),
309            MappedLocalTime::Single(date_time) => Ok(date_time),
310        }
311    }
312
313    let date_time = match T::UNIT {
314        Nanosecond => Utc.timestamp_nanos(ts),
315        Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
316        Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
317        Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
318    };
319
320    let offset_seconds: i64 = tz
321        .offset_from_utc_datetime(&date_time.naive_utc())
322        .fix()
323        .local_minus_utc() as i64;
324
325    let adjusted_date_time = date_time.add(
326        // This should not fail under normal circumstances as the
327        // maximum possible offset is 26 hours (93,600 seconds)
328        TimeDelta::try_seconds(offset_seconds)
329            .ok_or(DataFusionError::Internal("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000".to_string()))?,
330    );
331
332    // convert the naive datetime back to i64
333    match T::UNIT {
334        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or(
335            DataFusionError::Internal(
336                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807".to_string(),
337            ),
338        ),
339        Microsecond => Ok(adjusted_date_time.timestamp_micros()),
340        Millisecond => Ok(adjusted_date_time.timestamp_millis()),
341        Second => Ok(adjusted_date_time.timestamp()),
342    }
343}
344
345impl ScalarUDFImpl for ToLocalTimeFunc {
346    fn as_any(&self) -> &dyn Any {
347        self
348    }
349
350    fn name(&self) -> &str {
351        "to_local_time"
352    }
353
354    fn signature(&self) -> &Signature {
355        &self.signature
356    }
357
358    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
359        let [time_value] = take_function_args(self.name(), arg_types)?;
360
361        match time_value {
362            Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
363            _ => exec_err!(
364                "The to_local_time function can only accept timestamp as the arg, got {:?}", time_value
365            )
366        }
367    }
368
369    fn invoke_with_args(
370        &self,
371        args: datafusion_expr::ScalarFunctionArgs,
372    ) -> Result<ColumnarValue> {
373        let [time_value] = take_function_args(self.name(), args.args)?;
374
375        self.to_local_time(std::slice::from_ref(&time_value))
376    }
377
378    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
379        if arg_types.len() != 1 {
380            return plan_err!(
381                "to_local_time function requires 1 argument, got {:?}",
382                arg_types.len()
383            );
384        }
385
386        let first_arg = arg_types[0].clone();
387        match &first_arg {
388            Timestamp(Nanosecond, timezone) => {
389                Ok(vec![Timestamp(Nanosecond, timezone.clone())])
390            }
391            Timestamp(Microsecond, timezone) => {
392                Ok(vec![Timestamp(Microsecond, timezone.clone())])
393            }
394            Timestamp(Millisecond, timezone) => {
395                Ok(vec![Timestamp(Millisecond, timezone.clone())])
396            }
397            Timestamp(Second, timezone) => Ok(vec![Timestamp(Second, timezone.clone())]),
398            _ => plan_err!("The to_local_time function can only accept Timestamp as the arg got {first_arg}"),
399        }
400    }
401    fn documentation(&self) -> Option<&Documentation> {
402        self.doc()
403    }
404}
405
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{types::TimestampNanosecondType, Array, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{adjust_to_local_time, ToLocalTimeFunc};

    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz) // this is in a local timezone
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc() // this is in UTC
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // Test data:
        // (
        //    the string display of the input timestamp,
        //    the i64 representation of the timestamp before adjustment in nanosecond,
        //    the i64 representation of the timestamp after adjustment in nanosecond,
        // )
        let test_cases = vec![
            (
                // DST time
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                // End of DST
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz) // this is in a local timezone
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc() // this is in UTC
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes `to_local_time` on a single scalar and asserts the scalar result equals
    /// `expected`.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    #[test]
    fn test_to_local_time_timezones_array() {
        // Each case: (input instants, parsed as UTC; timezone attached to the input array;
        // expected wall-clock values of the timezone-free result).
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                // In a +01:00 zone each UTC instant is displayed one hour later, and
                // `to_local_time` must keep that displayed value.
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        cases.iter().for_each(|(source, tz_opt, expected)| {
            // Attach the case's timezone so the timezone-aware array path is exercised.
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        });
    }
}