1use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{Array, ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27 ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28 TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34 exec_err, internal_datafusion_err, plan_err, utils::take_function_args, Result,
35 ScalarValue,
36};
37use datafusion_expr::{
38 ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility,
39};
40use datafusion_macros::user_doc;
41
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
    syntax_example = "to_local_time(expression)",
    sql_example = r#"```sql
> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20 |
+---------------------------------------------+

> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20 |
+---------------------------------------------+

> SELECT
 time,
 arrow_typeof(time) as type,
 to_local_time(time) as to_local_time,
 arrow_typeof(to_local_time(time)) as to_local_time_type
FROM (
 SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
);
+---------------------------+----------------------------------+---------------------+--------------------+
| time | type | to_local_time | to_local_time_type |
+---------------------------+----------------------------------+---------------------+--------------------+
| 2024-04-01T00:00:20+02:00 | Timestamp(ns, "Europe/Brussels") | 2024-04-01T00:00:20 | Timestamp(ns) |
+---------------------------+----------------------------------+---------------------+--------------------+

# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
# than UTC boundaries

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
+---------------------+
| date_bin |
+---------------------+
| 2024-04-01T00:00:00 |
+---------------------+

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
+---------------------------+
| date_bin_with_timezone |
+---------------------------+
| 2024-04-01T00:00:00+02:00 |
+---------------------------+
```"#,
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
/// Scalar UDF implementing `to_local_time`.
///
/// Takes a single timestamp argument and returns a timestamp of the same time
/// unit without a timezone. A timezone-aware input is shifted by that zone's UTC
/// offset at each instant, so the result reads as local wall-clock time. A
/// timezone-less input is returned unchanged. The SQL-facing documentation is
/// generated from the `#[user_doc]` attribute on this struct.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToLocalTimeFunc {
    /// A user-defined, immutable signature. Argument types are validated and
    /// coerced by `coerce_types` in the `ScalarUDFImpl` implementation.
    signature: Signature,
}
104
105impl Default for ToLocalTimeFunc {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl ToLocalTimeFunc {
112 pub fn new() -> Self {
113 Self {
114 signature: Signature::user_defined(Volatility::Immutable),
115 }
116 }
117
118 fn to_local_time(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
119 let [time_value] = take_function_args(self.name(), args)?;
120
121 let arg_type = time_value.data_type();
122 match arg_type {
123 Timestamp(_, None) => {
124 Ok(time_value.clone())
126 }
127 Timestamp(_, Some(timezone)) => {
134 let tz: Tz = timezone.parse()?;
135
136 match time_value {
137 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
138 Some(ts),
139 Some(_),
140 )) => {
141 let adjusted_ts =
142 adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
143 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
144 Some(adjusted_ts),
145 None,
146 )))
147 }
148 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
149 Some(ts),
150 Some(_),
151 )) => {
152 let adjusted_ts =
153 adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
154 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
155 Some(adjusted_ts),
156 None,
157 )))
158 }
159 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
160 Some(ts),
161 Some(_),
162 )) => {
163 let adjusted_ts =
164 adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
165 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
166 Some(adjusted_ts),
167 None,
168 )))
169 }
170 ColumnarValue::Scalar(ScalarValue::TimestampSecond(
171 Some(ts),
172 Some(_),
173 )) => {
174 let adjusted_ts =
175 adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
176 Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
177 Some(adjusted_ts),
178 None,
179 )))
180 }
181 ColumnarValue::Array(array) => {
182 fn transform_array<T: ArrowTimestampType>(
183 array: &ArrayRef,
184 tz: Tz,
185 ) -> Result<ColumnarValue> {
186 let mut builder = PrimitiveBuilder::<T>::new();
187
188 let primitive_array = as_primitive_array::<T>(array)?;
189 for ts_opt in primitive_array.iter() {
190 match ts_opt {
191 None => builder.append_null(),
192 Some(ts) => {
193 let adjusted_ts: i64 =
194 adjust_to_local_time::<T>(ts, tz)?;
195 builder.append_value(adjusted_ts)
196 }
197 }
198 }
199
200 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
201 }
202
203 match array.data_type() {
204 Timestamp(_, None) => {
205 Ok(time_value.clone())
207 }
208 Timestamp(Nanosecond, Some(_)) => {
209 transform_array::<TimestampNanosecondType>(array, tz)
210 }
211 Timestamp(Microsecond, Some(_)) => {
212 transform_array::<TimestampMicrosecondType>(array, tz)
213 }
214 Timestamp(Millisecond, Some(_)) => {
215 transform_array::<TimestampMillisecondType>(array, tz)
216 }
217 Timestamp(Second, Some(_)) => {
218 transform_array::<TimestampSecondType>(array, tz)
219 }
220 _ => {
221 exec_err!("to_local_time function requires timestamp argument in array, got {:?}", array.data_type())
222 }
223 }
224 }
225 _ => {
226 exec_err!(
227 "to_local_time function requires timestamp argument, got {:?}",
228 time_value.data_type()
229 )
230 }
231 }
232 }
233 _ => {
234 exec_err!(
235 "to_local_time function requires timestamp argument, got {:?}",
236 arg_type
237 )
238 }
239 }
240 }
241}
242
243fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
297 fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
298 where
299 F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
300 {
301 match converter(ts) {
302 MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
303 "Ambiguous timestamp. Do you mean {:?} or {:?}",
304 earliest,
305 latest
306 ),
307 MappedLocalTime::None => exec_err!(
308 "The local time does not exist because there is a gap in the local time."
309 ),
310 MappedLocalTime::Single(date_time) => Ok(date_time),
311 }
312 }
313
314 let date_time = match T::UNIT {
315 Nanosecond => Utc.timestamp_nanos(ts),
316 Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
317 Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
318 Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
319 };
320
321 let offset_seconds: i64 = tz
322 .offset_from_utc_datetime(&date_time.naive_utc())
323 .fix()
324 .local_minus_utc() as i64;
325
326 let adjusted_date_time = date_time.add(
327 TimeDelta::try_seconds(offset_seconds)
330 .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
331 );
332
333 match T::UNIT {
335 Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
336 internal_datafusion_err!(
337 "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
338 )
339 ),
340 Microsecond => Ok(adjusted_date_time.timestamp_micros()),
341 Millisecond => Ok(adjusted_date_time.timestamp_millis()),
342 Second => Ok(adjusted_date_time.timestamp()),
343 }
344}
345
346impl ScalarUDFImpl for ToLocalTimeFunc {
347 fn as_any(&self) -> &dyn Any {
348 self
349 }
350
351 fn name(&self) -> &str {
352 "to_local_time"
353 }
354
355 fn signature(&self) -> &Signature {
356 &self.signature
357 }
358
359 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
360 let [time_value] = take_function_args(self.name(), arg_types)?;
361
362 match time_value {
363 Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
364 _ => exec_err!(
365 "The to_local_time function can only accept timestamp as the arg, got {:?}", time_value
366 )
367 }
368 }
369
370 fn invoke_with_args(
371 &self,
372 args: datafusion_expr::ScalarFunctionArgs,
373 ) -> Result<ColumnarValue> {
374 let [time_value] = take_function_args(self.name(), args.args)?;
375
376 self.to_local_time(std::slice::from_ref(&time_value))
377 }
378
379 fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
380 if arg_types.len() != 1 {
381 return plan_err!(
382 "to_local_time function requires 1 argument, got {:?}",
383 arg_types.len()
384 );
385 }
386
387 let first_arg = arg_types[0].clone();
388 match &first_arg {
389 DataType::Null => Ok(vec![Timestamp(Nanosecond, None)]),
390 Timestamp(Nanosecond, timezone) => {
391 Ok(vec![Timestamp(Nanosecond, timezone.clone())])
392 }
393 Timestamp(Microsecond, timezone) => {
394 Ok(vec![Timestamp(Microsecond, timezone.clone())])
395 }
396 Timestamp(Millisecond, timezone) => {
397 Ok(vec![Timestamp(Millisecond, timezone.clone())])
398 }
399 Timestamp(Second, timezone) => Ok(vec![Timestamp(Second, timezone.clone())]),
400 _ => plan_err!("The to_local_time function can only accept Timestamp as the arg got {first_arg}"),
401 }
402 }
403 fn documentation(&self) -> Option<&Documentation> {
404 self.doc()
405 }
406}
407
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{types::TimestampNanosecondType, Array, TimestampNanosecondArray};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{adjust_to_local_time, ToLocalTimeFunc};

    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        // The instant whose wall-clock time in New York is `timestamp_str`.
        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        // The same wall-clock time encoded as a timezone-less (UTC-based) value.
        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    #[test]
    fn test_to_local_time_scalar() {
        // 2005-08-04: Brussels is on summer time (UTC+2), so each value moves
        // forward by 7200 seconds in its own unit.
        let timezone = Some("Europe/Brussels".into());
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // (local wall-clock time, instant in UTC nanos, expected adjusted nanos)
        let test_cases = vec![
            // Daylight saving time in effect: UTC-4.
            (
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            // Standard time in effect: UTC-5.
            (
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz)
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes the UDF on a single scalar and asserts it returns `expected`.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    #[test]
    fn test_to_local_time_timezones_array() {
        let cases = [
            // No timezone: values pass through unchanged.
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            // Fixed offset: every value is shifted forward by one hour.
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
            // Named timezone: Brussels is on summer time (UTC+2) on this date.
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("Europe/Brussels".into()),
                vec![
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                    "2020-09-08T06:00:00",
                ],
            ),
        ];

        cases.iter().for_each(|(source, tz_opt, expected)| {
            // Attach the case's timezone so the timezone-aware array path is
            // actually exercised (previously the timezone was ignored).
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        });
    }
}