datafusion_functions/datetime/
to_local_time.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27    ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28    TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34    Result, ScalarValue, exec_err, internal_datafusion_err, internal_err,
35    utils::take_function_args,
36};
37use datafusion_expr::{
38    Coercion, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignatureClass,
39    Volatility,
40};
41use datafusion_macros::user_doc;
42
43/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
44/// timezone information). In other words, this function strips off the timezone from the timestamp,
45/// while keep the display value of the timestamp the same.
46#[user_doc(
47    doc_section(label = "Time and Date Functions"),
48    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
49    syntax_example = "to_local_time(expression)",
50    sql_example = r#"```sql
51> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
52+---------------------------------------------+
53| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
54+---------------------------------------------+
55| 2024-04-01T00:00:20                         |
56+---------------------------------------------+
57
58> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
59+---------------------------------------------+
60| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
61+---------------------------------------------+
62| 2024-04-01T00:00:20                         |
63+---------------------------------------------+
64
65> SELECT
66  time,
67  arrow_typeof(time) as type,
68  to_local_time(time) as to_local_time,
69  arrow_typeof(to_local_time(time)) as to_local_time_type
70FROM (
71  SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
72);
73+---------------------------+----------------------------------+---------------------+--------------------+
74| time                      | type                             | to_local_time       | to_local_time_type |
75+---------------------------+----------------------------------+---------------------+--------------------+
76| 2024-04-01T00:00:20+02:00 | Timestamp(ns, "Europe/Brussels") | 2024-04-01T00:00:20 | Timestamp(ns)      |
77+---------------------------+----------------------------------+---------------------+--------------------+
78
79# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
80# than UTC boundaries
81
82> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
83+---------------------+
84| date_bin            |
85+---------------------+
86| 2024-04-01T00:00:00 |
87+---------------------+
88
89> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
90+---------------------------+
91| date_bin_with_timezone    |
92+---------------------------+
93| 2024-04-01T00:00:00+02:00 |
94+---------------------------+
95```"#,
96    argument(
97        name = "expression",
98        description = "Time expression to operate on. Can be a constant, column, or function."
99    )
100)]
101#[derive(Debug, PartialEq, Eq, Hash)]
102pub struct ToLocalTimeFunc {
103    signature: Signature,
104}
105
106impl Default for ToLocalTimeFunc {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
112impl ToLocalTimeFunc {
113    pub fn new() -> Self {
114        Self {
115            signature: Signature::coercible(
116                vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
117                Volatility::Immutable,
118            ),
119        }
120    }
121}
122
123impl ScalarUDFImpl for ToLocalTimeFunc {
124    fn as_any(&self) -> &dyn Any {
125        self
126    }
127
128    fn name(&self) -> &str {
129        "to_local_time"
130    }
131
132    fn signature(&self) -> &Signature {
133        &self.signature
134    }
135
136    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
137        match &arg_types[0] {
138            DataType::Null => Ok(Timestamp(Nanosecond, None)),
139            Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
140            dt => internal_err!(
141                "The to_local_time function can only accept timestamp as the arg, got {dt}"
142            ),
143        }
144    }
145
146    fn invoke_with_args(
147        &self,
148        args: datafusion_expr::ScalarFunctionArgs,
149    ) -> Result<ColumnarValue> {
150        let [time_value] = take_function_args(self.name(), &args.args)?;
151        to_local_time(time_value)
152    }
153
154    fn documentation(&self) -> Option<&Documentation> {
155        self.doc()
156    }
157}
158
159fn transform_array<T: ArrowTimestampType>(
160    array: &ArrayRef,
161    tz: Tz,
162) -> Result<ColumnarValue> {
163    let primitive_array = as_primitive_array::<T>(array)?;
164    let mut builder = PrimitiveBuilder::<T>::with_capacity(primitive_array.len());
165    for ts_opt in primitive_array.iter() {
166        match ts_opt {
167            None => builder.append_null(),
168            Some(ts) => {
169                let adjusted_ts: i64 = adjust_to_local_time::<T>(ts, tz)?;
170                builder.append_value(adjusted_ts)
171            }
172        }
173    }
174
175    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
176}
177
178fn to_local_time(time_value: &ColumnarValue) -> Result<ColumnarValue> {
179    let arg_type = time_value.data_type();
180
181    let tz: Tz = match &arg_type {
182        Timestamp(_, Some(timezone)) => timezone.parse()?,
183        Timestamp(_, None) => {
184            // if no timezone specified, just return the input
185            return Ok(time_value.clone());
186        }
187        DataType::Null => {
188            return Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
189                None, None,
190            )));
191        }
192        dt => {
193            return internal_err!(
194                "to_local_time function requires timestamp argument, got {dt}"
195            );
196        }
197    };
198
199    // If has timezone, adjust the underlying time value. The current time value
200    // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
201    // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
202    // for more details.
203    //
204    // Then remove the timezone in return type, i.e. return None
205    match time_value {
206        ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, Some(_))) => Ok(
207            ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, None)),
208        ),
209        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, Some(_))) => Ok(
210            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, None)),
211        ),
212        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, Some(_))) => Ok(
213            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, None)),
214        ),
215        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, Some(_))) => Ok(
216            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, None)),
217        ),
218        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), Some(_))) => {
219            let adjusted_ts = adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
220            Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
221                Some(adjusted_ts),
222                None,
223            )))
224        }
225        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(ts), Some(_))) => {
226            let adjusted_ts = adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
227            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
228                Some(adjusted_ts),
229                None,
230            )))
231        }
232        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(ts), Some(_))) => {
233            let adjusted_ts = adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
234            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
235                Some(adjusted_ts),
236                None,
237            )))
238        }
239        ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), Some(_))) => {
240            let adjusted_ts = adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
241            Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
242                Some(adjusted_ts),
243                None,
244            )))
245        }
246        ColumnarValue::Array(array)
247            if matches!(array.data_type(), Timestamp(Nanosecond, Some(_))) =>
248        {
249            transform_array::<TimestampNanosecondType>(array, tz)
250        }
251        ColumnarValue::Array(array)
252            if matches!(array.data_type(), Timestamp(Microsecond, Some(_))) =>
253        {
254            transform_array::<TimestampMicrosecondType>(array, tz)
255        }
256        ColumnarValue::Array(array)
257            if matches!(array.data_type(), Timestamp(Millisecond, Some(_))) =>
258        {
259            transform_array::<TimestampMillisecondType>(array, tz)
260        }
261        ColumnarValue::Array(array)
262            if matches!(array.data_type(), Timestamp(Second, Some(_))) =>
263        {
264            transform_array::<TimestampSecondType>(array, tz)
265        }
266        _ => {
267            internal_err!(
268                "to_local_time function requires timestamp argument, got {arg_type}"
269            )
270        }
271    }
272}
273
274/// This function converts a timestamp with a timezone to a timestamp without a timezone.
275/// The display value of the adjusted timestamp remain the same, but the underlying timestamp
276/// representation is adjusted according to the relative timezone offset to UTC.
277///
278/// This function uses chrono to handle daylight saving time changes.
279///
280/// For example,
281///
282/// ```text
283/// '2019-03-31T01:00:00Z'::timestamp at time zone 'Europe/Brussels'
284/// ```
285///
286/// is displayed as follows in datafusion-cli:
287///
288/// ```text
289/// 2019-03-31T01:00:00+01:00
290/// ```
291///
292/// and is represented in DataFusion as:
293///
294/// ```text
295/// TimestampNanosecond(Some(1_553_990_400_000_000_000), Some("Europe/Brussels"))
296/// ```
297///
298/// To strip off the timezone while keeping the display value the same, we need to
299/// adjust the underlying timestamp with the timezone offset value using `adjust_to_local_time()`
300///
301/// ```text
302/// adjust_to_local_time(1_553_990_400_000_000_000, "Europe/Brussels") --> 1_553_994_000_000_000_000
303/// ```
304///
305/// The difference between `1_553_990_400_000_000_000` and `1_553_994_000_000_000_000` is
306/// `3600_000_000_000` ns, which corresponds to 1 hour. This matches with the timezone
307/// offset for "Europe/Brussels" for this date.
308///
309/// Note that the offset varies with daylight savings time (DST), which makes this tricky! For
310/// example, timezone "Europe/Brussels" has a 2-hour offset during DST and a 1-hour offset
311/// when DST ends.
312///
313/// Consequently, DataFusion can represent the timestamp in local time (with no offset or
314/// timezone information) as
315///
316/// ```text
317/// TimestampNanosecond(Some(1_553_994_000_000_000_000), None)
318/// ```
319///
320/// which is displayed as follows in datafusion-cli:
321///
322/// ```text
323/// 2019-03-31T01:00:00
324/// ```
325///
326/// See `test_adjust_to_local_time()` for example
327fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
328    fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
329    where
330        F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
331    {
332        match converter(ts) {
333            MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
334                "Ambiguous timestamp. Do you mean {:?} or {:?}",
335                earliest,
336                latest
337            ),
338            MappedLocalTime::None => exec_err!(
339                "The local time does not exist because there is a gap in the local time."
340            ),
341            MappedLocalTime::Single(date_time) => Ok(date_time),
342        }
343    }
344
345    let date_time = match T::UNIT {
346        Nanosecond => Utc.timestamp_nanos(ts),
347        Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
348        Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
349        Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
350    };
351
352    let offset_seconds: i64 = tz
353        .offset_from_utc_datetime(&date_time.naive_utc())
354        .fix()
355        .local_minus_utc() as i64;
356
357    let adjusted_date_time = date_time.add(
358        // This should not fail under normal circumstances as the
359        // maximum possible offset is 26 hours (93,600 seconds)
360        TimeDelta::try_seconds(offset_seconds)
361            .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
362    );
363
364    // convert the naive datetime back to i64
365    match T::UNIT {
366        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
367            internal_datafusion_err!(
368                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
369            )
370        ),
371        Microsecond => Ok(adjusted_date_time.timestamp_micros()),
372        Millisecond => Ok(adjusted_date_time.timestamp_millis()),
373        Second => Ok(adjusted_date_time.timestamp()),
374    }
375}
376
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Array, TimestampNanosecondArray, types::TimestampNanosecondType};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::ScalarValue;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{ToLocalTimeFunc, adjust_to_local_time};

    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz) // this is in a local timezone
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc() // this is in UTC
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // Test data:
        // (
        //    the string display of the input timestamp,
        //    the i64 representation of the timestamp before adjustment in nanosecond,
        //    the i64 representation of the timestamp after adjustment in nanosecond,
        // )
        let test_cases = vec![
            (
                // During DST (UTC-4)
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                // After DST has ended (standard time, UTC-5)
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz) // this is in a local timezone
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc() // this is in UTC
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    #[test]
    fn test_to_local_time_timezones_array() {
        // Each case: (input instants written as UTC strings, timezone attached to the
        // input array, expected local wall-clock values after `to_local_time`).
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                // In +01:00 each UTC instant displays one hour later.
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        cases.iter().for_each(|(source, tz_opt, expected)| {
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                // Attach the case's timezone; otherwise the timezone is never exercised.
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        });
    }
}
608}