1use std::ops::Add;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{ArrayRef, PrimitiveArray};
23use arrow::datatypes::DataType::Timestamp;
24use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
25use arrow::datatypes::{
26 ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
27 TimestampNanosecondType, TimestampSecondType,
28};
29use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
30
31use datafusion_common::cast::as_primitive_array;
32use datafusion_common::{
33 Result, ScalarValue, exec_err, internal_datafusion_err, internal_err,
34 utils::take_function_args,
35};
36use datafusion_expr::{
37 Coercion, ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
38 TypeSignatureClass, Volatility,
39};
40use datafusion_macros::user_doc;
41
42#[user_doc(
46 doc_section(label = "Time and Date Functions"),
47 description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
48 syntax_example = "to_local_time(expression)",
49 sql_example = r#"```sql
50> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
51+---------------------------------------------+
52| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
53+---------------------------------------------+
54| 2024-04-01T00:00:20 |
55+---------------------------------------------+
56
57> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
58+---------------------------------------------+
59| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
60+---------------------------------------------+
61| 2024-04-01T00:00:20 |
62+---------------------------------------------+
63
64> SELECT
65 time,
66 arrow_typeof(time) as type,
67 to_local_time(time) as to_local_time,
68 arrow_typeof(to_local_time(time)) as to_local_time_type
69FROM (
70 SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
71);
72+---------------------------+----------------------------------+---------------------+--------------------+
73| time | type | to_local_time | to_local_time_type |
74+---------------------------+----------------------------------+---------------------+--------------------+
75| 2024-04-01T00:00:20+02:00 | Timestamp(ns, "Europe/Brussels") | 2024-04-01T00:00:20 | Timestamp(ns) |
76+---------------------------+----------------------------------+---------------------+--------------------+
77
78# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
79# than UTC boundaries
80
81> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
82+---------------------+
83| date_bin |
84+---------------------+
85| 2024-04-01T00:00:00 |
86+---------------------+
87
88> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
89+---------------------------+
90| date_bin_with_timezone |
91+---------------------------+
92| 2024-04-01T00:00:00+02:00 |
93+---------------------------+
94```"#,
95 argument(
96 name = "expression",
97 description = "Time expression to operate on. Can be a constant, column, or function."
98 )
99)]
/// Scalar function implementation registered under the name `to_local_time`.
///
/// The user-facing description, syntax and SQL examples are supplied by the
/// `user_doc` attribute attached to this type.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToLocalTimeFunc {
    /// Argument signature reported through `ScalarUDFImpl::signature`.
    signature: Signature,
}
104
105impl Default for ToLocalTimeFunc {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111impl ToLocalTimeFunc {
112 pub fn new() -> Self {
113 Self {
114 signature: Signature::coercible(
115 vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
116 Volatility::Immutable,
117 ),
118 }
119 }
120}
121
122impl ScalarUDFImpl for ToLocalTimeFunc {
123 fn name(&self) -> &str {
124 "to_local_time"
125 }
126
127 fn signature(&self) -> &Signature {
128 &self.signature
129 }
130
131 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
132 match &arg_types[0] {
133 DataType::Null => Ok(Timestamp(Nanosecond, None)),
134 Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
135 dt => internal_err!(
136 "The to_local_time function can only accept timestamp as the arg, got {dt}"
137 ),
138 }
139 }
140
141 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
142 let [time_value] = take_function_args(self.name(), &args.args)?;
143 to_local_time(time_value)
144 }
145
146 fn documentation(&self) -> Option<&Documentation> {
147 self.doc()
148 }
149}
150
151fn transform_array<T: ArrowTimestampType>(
152 array: &ArrayRef,
153 tz: Tz,
154) -> Result<ColumnarValue> {
155 let primitive_array = as_primitive_array::<T>(array)?;
156 let result: PrimitiveArray<T> =
157 primitive_array.try_unary(|ts| adjust_to_local_time::<T>(ts, tz))?;
158 Ok(ColumnarValue::Array(Arc::new(result)))
159}
160
161fn to_local_time(time_value: &ColumnarValue) -> Result<ColumnarValue> {
162 let arg_type = time_value.data_type();
163
164 let tz: Tz = match &arg_type {
165 Timestamp(_, Some(timezone)) => timezone.parse()?,
166 Timestamp(_, None) => {
167 return Ok(time_value.clone());
169 }
170 DataType::Null => {
171 return Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
172 None, None,
173 )));
174 }
175 dt => {
176 return internal_err!(
177 "to_local_time function requires timestamp argument, got {dt}"
178 );
179 }
180 };
181
182 match time_value {
189 ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, Some(_))) => Ok(
190 ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, None)),
191 ),
192 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, Some(_))) => Ok(
193 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, None)),
194 ),
195 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, Some(_))) => Ok(
196 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, None)),
197 ),
198 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, Some(_))) => Ok(
199 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, None)),
200 ),
201 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), Some(_))) => {
202 let adjusted_ts = adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
203 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
204 Some(adjusted_ts),
205 None,
206 )))
207 }
208 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(ts), Some(_))) => {
209 let adjusted_ts = adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
210 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
211 Some(adjusted_ts),
212 None,
213 )))
214 }
215 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(ts), Some(_))) => {
216 let adjusted_ts = adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
217 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
218 Some(adjusted_ts),
219 None,
220 )))
221 }
222 ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), Some(_))) => {
223 let adjusted_ts = adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
224 Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
225 Some(adjusted_ts),
226 None,
227 )))
228 }
229 ColumnarValue::Array(array)
230 if matches!(array.data_type(), Timestamp(Nanosecond, Some(_))) =>
231 {
232 transform_array::<TimestampNanosecondType>(array, tz)
233 }
234 ColumnarValue::Array(array)
235 if matches!(array.data_type(), Timestamp(Microsecond, Some(_))) =>
236 {
237 transform_array::<TimestampMicrosecondType>(array, tz)
238 }
239 ColumnarValue::Array(array)
240 if matches!(array.data_type(), Timestamp(Millisecond, Some(_))) =>
241 {
242 transform_array::<TimestampMillisecondType>(array, tz)
243 }
244 ColumnarValue::Array(array)
245 if matches!(array.data_type(), Timestamp(Second, Some(_))) =>
246 {
247 transform_array::<TimestampSecondType>(array, tz)
248 }
249 _ => {
250 internal_err!(
251 "to_local_time function requires timestamp argument, got {arg_type}"
252 )
253 }
254 }
255}
256
257pub fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
311 fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
312 where
313 F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
314 {
315 match converter(ts) {
316 MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
317 "Ambiguous timestamp. Do you mean {:?} or {:?}",
318 earliest,
319 latest
320 ),
321 MappedLocalTime::None => exec_err!(
322 "The local time does not exist because there is a gap in the local time."
323 ),
324 MappedLocalTime::Single(date_time) => Ok(date_time),
325 }
326 }
327
328 let date_time = match T::UNIT {
329 Nanosecond => Utc.timestamp_nanos(ts),
330 Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
331 Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
332 Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
333 };
334
335 let offset_seconds: i64 = tz
336 .offset_from_utc_datetime(&date_time.naive_utc())
337 .fix()
338 .local_minus_utc() as i64;
339
340 let adjusted_date_time = date_time.add(
341 TimeDelta::try_seconds(offset_seconds)
344 .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
345 );
346
347 match T::UNIT {
349 Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
350 internal_datafusion_err!(
351 "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
352 )
353 ),
354 Microsecond => Ok(adjusted_date_time.timestamp_micros()),
355 Millisecond => Ok(adjusted_date_time.timestamp_millis()),
356 Second => Ok(adjusted_date_time.timestamp()),
357 }
358}
359
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Array, TimestampNanosecondArray, types::TimestampNanosecondType};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::ScalarValue;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{ToLocalTimeFunc, adjust_to_local_time};

    /// The adjusted value of a wall-clock time in a zone must equal the same
    /// wall-clock time read as UTC.
    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        // Instant at which the wall clock in `tz` shows `timestamp_str`.
        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        // The same wall-clock reading interpreted as UTC.
        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    /// Scalar inputs of every time unit are shifted and lose their timezone.
    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        // Each expected value is the input plus 7200 seconds in the same unit.
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    /// The two cases differ in their offset from UTC (14400 s vs 18000 s) for
    /// the same zone.
    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // (wall-clock string, instant in `tz`, same wall-clock reading as UTC)
        let test_cases = vec![
            (
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            (
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz)
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes the UDF on a single scalar and asserts the scalar result.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    /// Array inputs, with and without a timezone attached.
    ///
    /// The source strings are parsed as UTC instants. With no timezone the array
    /// is returned as is; with `+01:00` every value moves forward by one hour.
    #[test]
    fn test_to_local_time_timezones_array() {
        let cases = [
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
        ];

        cases.iter().for_each(|(source, tz_opt, expected)| {
            // Attach the case's timezone so the timezone-aware array path runs.
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            if let ColumnarValue::Array(result) = result {
                assert_eq!(
                    result.data_type(),
                    &DataType::Timestamp(TimeUnit::Nanosecond, None)
                );
                let left = arrow::array::cast::as_primitive_array::<
                    TimestampNanosecondType,
                >(&result);
                assert_eq!(left, &right);
            } else {
                panic!("unexpected column type");
            }
        });
    }
}