Skip to main content

datafusion_functions/datetime/
to_local_time.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ops::Add;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{ArrayRef, PrimitiveArray};
23use arrow::datatypes::DataType::Timestamp;
24use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
25use arrow::datatypes::{
26    ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
27    TimestampNanosecondType, TimestampSecondType,
28};
29use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
30
31use datafusion_common::cast::as_primitive_array;
32use datafusion_common::{
33    Result, ScalarValue, exec_err, internal_datafusion_err, internal_err,
34    utils::take_function_args,
35};
36use datafusion_expr::{
37    Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
38    TypeSignatureClass, Volatility,
39};
40use datafusion_macros::user_doc;
41
/// A UDF function that converts a timezone-aware timestamp to local time (with no offset or
/// timezone information). In other words, this function strips off the timezone from the timestamp,
/// while keeping the display value of the timestamp the same.
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
    syntax_example = "to_local_time(expression)",
    sql_example = r#"```sql
> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT
  time,
  arrow_typeof(time) as type,
  to_local_time(time) as to_local_time,
  arrow_typeof(to_local_time(time)) as to_local_time_type
FROM (
  SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
);
+---------------------------+----------------------------------+---------------------+--------------------+
| time                      | type                             | to_local_time       | to_local_time_type |
+---------------------------+----------------------------------+---------------------+--------------------+
| 2024-04-01T00:00:20+02:00 | Timestamp(ns, "Europe/Brussels") | 2024-04-01T00:00:20 | Timestamp(ns)      |
+---------------------------+----------------------------------+---------------------+--------------------+

# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
# than UTC boundaries

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
+---------------------+
| date_bin            |
+---------------------+
| 2024-04-01T00:00:00 |
+---------------------+

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
+---------------------------+
| date_bin_with_timezone    |
+---------------------------+
| 2024-04-01T00:00:00+02:00 |
+---------------------------+
```"#,
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToLocalTimeFunc {
    /// Argument signature of the function; handed out by `ScalarUDFImpl::signature`.
    signature: Signature,
}
104
105impl Default for ToLocalTimeFunc {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111impl ToLocalTimeFunc {
112    pub fn new() -> Self {
113        Self {
114            signature: Signature::coercible(
115                vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
116                Volatility::Immutable,
117            ),
118        }
119    }
120}
121
122impl ScalarUDFImpl for ToLocalTimeFunc {
123    fn name(&self) -> &str {
124        "to_local_time"
125    }
126
127    fn signature(&self) -> &Signature {
128        &self.signature
129    }
130
131    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
132        match &arg_types[0] {
133            DataType::Null => Ok(Timestamp(Nanosecond, None)),
134            Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
135            dt => internal_err!(
136                "The to_local_time function can only accept timestamp as the arg, got {dt}"
137            ),
138        }
139    }
140
141    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
142        let [time_value] = take_function_args(self.name(), &args.args)?;
143        to_local_time(time_value)
144    }
145
146    fn documentation(&self) -> Option<&Documentation> {
147        self.doc()
148    }
149}
150
151fn transform_array<T: ArrowTimestampType>(
152    array: &ArrayRef,
153    tz: Tz,
154) -> Result<ColumnarValue> {
155    let primitive_array = as_primitive_array::<T>(array)?;
156    let result: PrimitiveArray<T> =
157        primitive_array.try_unary(|ts| adjust_to_local_time::<T>(ts, tz))?;
158    Ok(ColumnarValue::Array(Arc::new(result)))
159}
160
161fn to_local_time(time_value: &ColumnarValue) -> Result<ColumnarValue> {
162    let arg_type = time_value.data_type();
163
164    let tz: Tz = match &arg_type {
165        Timestamp(_, Some(timezone)) => timezone.parse()?,
166        Timestamp(_, None) => {
167            // if no timezone specified, just return the input
168            return Ok(time_value.clone());
169        }
170        DataType::Null => {
171            return Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
172                None, None,
173            )));
174        }
175        dt => {
176            return internal_err!(
177                "to_local_time function requires timestamp argument, got {dt}"
178            );
179        }
180    };
181
182    // If has timezone, adjust the underlying time value. The current time value
183    // is stored as i64 in UTC, even though the timezone may not be in UTC. Therefore,
184    // we need to adjust the time value to the local time. See [`adjust_to_local_time`]
185    // for more details.
186    //
187    // Then remove the timezone in return type, i.e. return None
188    match time_value {
189        ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, Some(_))) => Ok(
190            ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, None)),
191        ),
192        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, Some(_))) => Ok(
193            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, None)),
194        ),
195        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, Some(_))) => Ok(
196            ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, None)),
197        ),
198        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, Some(_))) => Ok(
199            ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, None)),
200        ),
201        ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), Some(_))) => {
202            let adjusted_ts = adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
203            Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
204                Some(adjusted_ts),
205                None,
206            )))
207        }
208        ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(ts), Some(_))) => {
209            let adjusted_ts = adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
210            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
211                Some(adjusted_ts),
212                None,
213            )))
214        }
215        ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(ts), Some(_))) => {
216            let adjusted_ts = adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
217            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
218                Some(adjusted_ts),
219                None,
220            )))
221        }
222        ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), Some(_))) => {
223            let adjusted_ts = adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
224            Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
225                Some(adjusted_ts),
226                None,
227            )))
228        }
229        ColumnarValue::Array(array)
230            if matches!(array.data_type(), Timestamp(Nanosecond, Some(_))) =>
231        {
232            transform_array::<TimestampNanosecondType>(array, tz)
233        }
234        ColumnarValue::Array(array)
235            if matches!(array.data_type(), Timestamp(Microsecond, Some(_))) =>
236        {
237            transform_array::<TimestampMicrosecondType>(array, tz)
238        }
239        ColumnarValue::Array(array)
240            if matches!(array.data_type(), Timestamp(Millisecond, Some(_))) =>
241        {
242            transform_array::<TimestampMillisecondType>(array, tz)
243        }
244        ColumnarValue::Array(array)
245            if matches!(array.data_type(), Timestamp(Second, Some(_))) =>
246        {
247            transform_array::<TimestampSecondType>(array, tz)
248        }
249        _ => {
250            internal_err!(
251                "to_local_time function requires timestamp argument, got {arg_type}"
252            )
253        }
254    }
255}
256
257/// This function converts a timestamp with a timezone to a timestamp without a timezone.
258/// The display value of the adjusted timestamp remain the same, but the underlying timestamp
259/// representation is adjusted according to the relative timezone offset to UTC.
260///
261/// This function uses chrono to handle daylight saving time changes.
262///
263/// For example,
264///
265/// ```text
266/// '2019-03-31T01:00:00Z'::timestamp at time zone 'Europe/Brussels'
267/// ```
268///
269/// is displayed as follows in datafusion-cli:
270///
271/// ```text
272/// 2019-03-31T01:00:00+01:00
273/// ```
274///
275/// and is represented in DataFusion as:
276///
277/// ```text
278/// TimestampNanosecond(Some(1_553_990_400_000_000_000), Some("Europe/Brussels"))
279/// ```
280///
281/// To strip off the timezone while keeping the display value the same, we need to
282/// adjust the underlying timestamp with the timezone offset value using `adjust_to_local_time()`
283///
284/// ```text
285/// adjust_to_local_time(1_553_990_400_000_000_000, "Europe/Brussels") --> 1_553_994_000_000_000_000
286/// ```
287///
288/// The difference between `1_553_990_400_000_000_000` and `1_553_994_000_000_000_000` is
289/// `3600_000_000_000` ns, which corresponds to 1 hour. This matches with the timezone
290/// offset for "Europe/Brussels" for this date.
291///
292/// Note that the offset varies with daylight savings time (DST), which makes this tricky! For
293/// example, timezone "Europe/Brussels" has a 2-hour offset during DST and a 1-hour offset
294/// when DST ends.
295///
296/// Consequently, DataFusion can represent the timestamp in local time (with no offset or
297/// timezone information) as
298///
299/// ```text
300/// TimestampNanosecond(Some(1_553_994_000_000_000_000), None)
301/// ```
302///
303/// which is displayed as follows in datafusion-cli:
304///
305/// ```text
306/// 2019-03-31T01:00:00
307/// ```
308///
309/// See `test_adjust_to_local_time()` for example
310pub fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
311    fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
312    where
313        F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
314    {
315        match converter(ts) {
316            MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
317                "Ambiguous timestamp. Do you mean {:?} or {:?}",
318                earliest,
319                latest
320            ),
321            MappedLocalTime::None => exec_err!(
322                "The local time does not exist because there is a gap in the local time."
323            ),
324            MappedLocalTime::Single(date_time) => Ok(date_time),
325        }
326    }
327
328    let date_time = match T::UNIT {
329        Nanosecond => Utc.timestamp_nanos(ts),
330        Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
331        Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
332        Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
333    };
334
335    let offset_seconds: i64 = tz
336        .offset_from_utc_datetime(&date_time.naive_utc())
337        .fix()
338        .local_minus_utc() as i64;
339
340    let adjusted_date_time = date_time.add(
341        // This should not fail under normal circumstances as the
342        // maximum possible offset is 26 hours (93,600 seconds)
343        TimeDelta::try_seconds(offset_seconds)
344            .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
345    );
346
347    // convert the naive datetime back to i64
348    match T::UNIT {
349        Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
350            internal_datafusion_err!(
351                "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
352            )
353        ),
354        Microsecond => Ok(adjusted_date_time.timestamp_micros()),
355        Millisecond => Ok(adjusted_date_time.timestamp_millis()),
356        Second => Ok(adjusted_date_time.timestamp()),
357    }
358}
359
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Array, TimestampNanosecondArray, types::TimestampNanosecondType};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::ScalarValue;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{ToLocalTimeFunc, adjust_to_local_time};

    /// Adjusting an instant that reads `timestamp_str` in New York must yield the
    /// instant that reads `timestamp_str` in UTC.
    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz) // this is in a local timezone
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc() // this is in UTC
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    /// Scalars of every time unit are shifted by the Brussels offset (2 hours for
    /// these values) and lose their timezone.
    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    /// The applied offset follows daylight saving time: 4 hours in March and
    /// 5 hours in November for New York.
    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // Test data:
        // (
        //    the string display of the input timestamp,
        //    the i64 representation of the timestamp before adjustment in nanosecond,
        //    the i64 representation of the timestamp after adjustment in nanosecond,
        // )
        let test_cases = vec![
            (
                // DST time
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                // End of DST
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz) // this is in a local timezone
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc() // this is in UTC
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes the UDF on a single scalar and asserts that the scalar result
    /// equals `expected`.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    /// Array inputs: without a timezone the values pass through unchanged; with a
    /// fixed `+01:00` timezone every value is shifted forward by one hour.
    #[test]
    fn test_to_local_time_timezones_array() {
        // (
        //    input instants, parsed as UTC,
        //    timezone attached to the input array,
        //    expected wall-clock values after stripping the timezone,
        // )
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        for (source, tz_opt, expected) in &cases {
            // Attach the case's timezone so the timezone-aware array path is
            // actually exercised.
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        }
    }
}