1use std::any::Any;
19use std::ops::Add;
20use std::sync::Arc;
21
22use arrow::array::timezone::Tz;
23use arrow::array::{ArrayRef, PrimitiveBuilder};
24use arrow::datatypes::DataType::Timestamp;
25use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
26use arrow::datatypes::{
27 ArrowTimestampType, DataType, TimestampMicrosecondType, TimestampMillisecondType,
28 TimestampNanosecondType, TimestampSecondType,
29};
30use chrono::{DateTime, MappedLocalTime, Offset, TimeDelta, TimeZone, Utc};
31
32use datafusion_common::cast::as_primitive_array;
33use datafusion_common::{
34 Result, ScalarValue, exec_err, internal_datafusion_err, internal_err,
35 utils::take_function_args,
36};
37use datafusion_expr::{
38 Coercion, ColumnarValue, Documentation, ScalarUDFImpl, Signature, TypeSignatureClass,
39 Volatility,
40};
41use datafusion_macros::user_doc;
42
#[user_doc(
    doc_section(label = "Time and Date Functions"),
    description = "Converts a timestamp with a timezone to a timestamp without a timezone (with no offset or timezone information). This function handles daylight saving time changes.",
    syntax_example = "to_local_time(expression)",
    sql_example = r#"```sql
> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp);
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels');
+---------------------------------------------+
| to_local_time(Utf8("2024-04-01T00:00:20Z")) |
+---------------------------------------------+
| 2024-04-01T00:00:20                         |
+---------------------------------------------+

> SELECT
  time,
  arrow_typeof(time) as type,
  to_local_time(time) as to_local_time,
  arrow_typeof(to_local_time(time)) as to_local_time_type
FROM (
  SELECT '2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels' AS time
);
+---------------------------+----------------------------------+---------------------+--------------------+
| time                      | type                             | to_local_time       | to_local_time_type |
+---------------------------+----------------------------------+---------------------+--------------------+
| 2024-04-01T00:00:20+02:00 | Timestamp(ns, "Europe/Brussels") | 2024-04-01T00:00:20 | Timestamp(ns)      |
+---------------------------+----------------------------------+---------------------+--------------------+

# combine `to_local_time()` with `date_bin()` to bin on boundaries in the timezone rather
# than UTC boundaries

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AS date_bin;
+---------------------+
| date_bin            |
+---------------------+
| 2024-04-01T00:00:00 |
+---------------------+

> SELECT date_bin(interval '1 day', to_local_time('2024-04-01T00:00:20Z'::timestamp AT TIME ZONE 'Europe/Brussels')) AT TIME ZONE 'Europe/Brussels' AS date_bin_with_timezone;
+---------------------------+
| date_bin_with_timezone    |
+---------------------------+
| 2024-04-01T00:00:00+02:00 |
+---------------------------+
```"#,
    argument(
        name = "expression",
        description = "Time expression to operate on. Can be a constant, column, or function."
    )
)]
/// Scalar UDF implementing `to_local_time(expression)`.
///
/// For a timezone-aware timestamp, the stored UTC instant is shifted by the
/// zone's UTC offset at that instant and the timezone is dropped, so the result
/// holds the local wall-clock time with no timezone attached. The time unit is
/// preserved. Timestamps that already have no timezone are returned unchanged.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ToLocalTimeFunc {
    /// Signature built in `ToLocalTimeFunc::new`: a single argument of the
    /// timestamp type class, immutable volatility.
    signature: Signature,
}
105
106impl Default for ToLocalTimeFunc {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
112impl ToLocalTimeFunc {
113 pub fn new() -> Self {
114 Self {
115 signature: Signature::coercible(
116 vec![Coercion::new_exact(TypeSignatureClass::Timestamp)],
117 Volatility::Immutable,
118 ),
119 }
120 }
121}
122
123impl ScalarUDFImpl for ToLocalTimeFunc {
124 fn as_any(&self) -> &dyn Any {
125 self
126 }
127
128 fn name(&self) -> &str {
129 "to_local_time"
130 }
131
132 fn signature(&self) -> &Signature {
133 &self.signature
134 }
135
136 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
137 match &arg_types[0] {
138 DataType::Null => Ok(Timestamp(Nanosecond, None)),
139 Timestamp(timeunit, _) => Ok(Timestamp(*timeunit, None)),
140 dt => internal_err!(
141 "The to_local_time function can only accept timestamp as the arg, got {dt}"
142 ),
143 }
144 }
145
146 fn invoke_with_args(
147 &self,
148 args: datafusion_expr::ScalarFunctionArgs,
149 ) -> Result<ColumnarValue> {
150 let [time_value] = take_function_args(self.name(), &args.args)?;
151 to_local_time(time_value)
152 }
153
154 fn documentation(&self) -> Option<&Documentation> {
155 self.doc()
156 }
157}
158
159fn transform_array<T: ArrowTimestampType>(
160 array: &ArrayRef,
161 tz: Tz,
162) -> Result<ColumnarValue> {
163 let primitive_array = as_primitive_array::<T>(array)?;
164 let mut builder = PrimitiveBuilder::<T>::with_capacity(primitive_array.len());
165 for ts_opt in primitive_array.iter() {
166 match ts_opt {
167 None => builder.append_null(),
168 Some(ts) => {
169 let adjusted_ts: i64 = adjust_to_local_time::<T>(ts, tz)?;
170 builder.append_value(adjusted_ts)
171 }
172 }
173 }
174
175 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
176}
177
178fn to_local_time(time_value: &ColumnarValue) -> Result<ColumnarValue> {
179 let arg_type = time_value.data_type();
180
181 let tz: Tz = match &arg_type {
182 Timestamp(_, Some(timezone)) => timezone.parse()?,
183 Timestamp(_, None) => {
184 return Ok(time_value.clone());
186 }
187 DataType::Null => {
188 return Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
189 None, None,
190 )));
191 }
192 dt => {
193 return internal_err!(
194 "to_local_time function requires timestamp argument, got {dt}"
195 );
196 }
197 };
198
199 match time_value {
206 ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, Some(_))) => Ok(
207 ColumnarValue::Scalar(ScalarValue::TimestampSecond(None, None)),
208 ),
209 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, Some(_))) => Ok(
210 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, None)),
211 ),
212 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, Some(_))) => Ok(
213 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(None, None)),
214 ),
215 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, Some(_))) => Ok(
216 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, None)),
217 ),
218 ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(ts), Some(_))) => {
219 let adjusted_ts = adjust_to_local_time::<TimestampNanosecondType>(*ts, tz)?;
220 Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
221 Some(adjusted_ts),
222 None,
223 )))
224 }
225 ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(ts), Some(_))) => {
226 let adjusted_ts = adjust_to_local_time::<TimestampMicrosecondType>(*ts, tz)?;
227 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
228 Some(adjusted_ts),
229 None,
230 )))
231 }
232 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(ts), Some(_))) => {
233 let adjusted_ts = adjust_to_local_time::<TimestampMillisecondType>(*ts, tz)?;
234 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
235 Some(adjusted_ts),
236 None,
237 )))
238 }
239 ColumnarValue::Scalar(ScalarValue::TimestampSecond(Some(ts), Some(_))) => {
240 let adjusted_ts = adjust_to_local_time::<TimestampSecondType>(*ts, tz)?;
241 Ok(ColumnarValue::Scalar(ScalarValue::TimestampSecond(
242 Some(adjusted_ts),
243 None,
244 )))
245 }
246 ColumnarValue::Array(array)
247 if matches!(array.data_type(), Timestamp(Nanosecond, Some(_))) =>
248 {
249 transform_array::<TimestampNanosecondType>(array, tz)
250 }
251 ColumnarValue::Array(array)
252 if matches!(array.data_type(), Timestamp(Microsecond, Some(_))) =>
253 {
254 transform_array::<TimestampMicrosecondType>(array, tz)
255 }
256 ColumnarValue::Array(array)
257 if matches!(array.data_type(), Timestamp(Millisecond, Some(_))) =>
258 {
259 transform_array::<TimestampMillisecondType>(array, tz)
260 }
261 ColumnarValue::Array(array)
262 if matches!(array.data_type(), Timestamp(Second, Some(_))) =>
263 {
264 transform_array::<TimestampSecondType>(array, tz)
265 }
266 _ => {
267 internal_err!(
268 "to_local_time function requires timestamp argument, got {arg_type}"
269 )
270 }
271 }
272}
273
274fn adjust_to_local_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
328 fn convert_timestamp<F>(ts: i64, converter: F) -> Result<DateTime<Utc>>
329 where
330 F: Fn(i64) -> MappedLocalTime<DateTime<Utc>>,
331 {
332 match converter(ts) {
333 MappedLocalTime::Ambiguous(earliest, latest) => exec_err!(
334 "Ambiguous timestamp. Do you mean {:?} or {:?}",
335 earliest,
336 latest
337 ),
338 MappedLocalTime::None => exec_err!(
339 "The local time does not exist because there is a gap in the local time."
340 ),
341 MappedLocalTime::Single(date_time) => Ok(date_time),
342 }
343 }
344
345 let date_time = match T::UNIT {
346 Nanosecond => Utc.timestamp_nanos(ts),
347 Microsecond => convert_timestamp(ts, |ts| Utc.timestamp_micros(ts))?,
348 Millisecond => convert_timestamp(ts, |ts| Utc.timestamp_millis_opt(ts))?,
349 Second => convert_timestamp(ts, |ts| Utc.timestamp_opt(ts, 0))?,
350 };
351
352 let offset_seconds: i64 = tz
353 .offset_from_utc_datetime(&date_time.naive_utc())
354 .fix()
355 .local_minus_utc() as i64;
356
357 let adjusted_date_time = date_time.add(
358 TimeDelta::try_seconds(offset_seconds)
361 .ok_or_else(|| internal_datafusion_err!("Offset seconds should be less than i64::MAX / 1_000 or greater than -i64::MAX / 1_000"))?,
362 );
363
364 match T::UNIT {
366 Nanosecond => adjusted_date_time.timestamp_nanos_opt().ok_or_else(||
367 internal_datafusion_err!(
368 "Failed to convert DateTime to timestamp in nanosecond. This error may occur if the date is out of range. The supported date ranges are between 1677-09-21T00:12:43.145224192 and 2262-04-11T23:47:16.854775807"
369 )
370 ),
371 Microsecond => Ok(adjusted_date_time.timestamp_micros()),
372 Millisecond => Ok(adjusted_date_time.timestamp_millis()),
373 Second => Ok(adjusted_date_time.timestamp()),
374 }
375}
376
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Array, TimestampNanosecondArray, types::TimestampNanosecondType};
    use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
    use arrow::datatypes::{DataType, Field, TimeUnit};
    use chrono::NaiveDateTime;
    use datafusion_common::ScalarValue;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};

    use super::{ToLocalTimeFunc, adjust_to_local_time};

    #[test]
    fn test_adjust_to_local_time() {
        let timestamp_str = "2020-03-31T13:40:00";
        let tz: arrow::array::timezone::Tz =
            "America/New_York".parse().expect("Invalid timezone");

        // The UTC instant whose New York wall-clock time is `timestamp_str`.
        let timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_local_timezone(tz)
            .unwrap()
            .timestamp_nanos_opt()
            .unwrap();

        // The same wall-clock time encoded as if it were UTC.
        let expected_timestamp = timestamp_str
            .parse::<NaiveDateTime>()
            .unwrap()
            .and_utc()
            .timestamp_nanos_opt()
            .unwrap();

        let res = adjust_to_local_time::<TimestampNanosecondType>(timestamp, tz).unwrap();
        assert_eq!(res, expected_timestamp);
    }

    #[test]
    fn test_to_local_time_scalar() {
        let timezone = Some("Europe/Brussels".into());
        // Each pair: (input in Europe/Brussels, expected timezone-less output,
        // i.e. shifted by +2h for this summer date) for every time unit.
        let timestamps_with_timezone = vec![
            (
                ScalarValue::TimestampNanosecond(
                    Some(1_123_123_000_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampNanosecond(Some(1_123_130_200_000_000_000), None),
            ),
            (
                ScalarValue::TimestampMicrosecond(
                    Some(1_123_123_000_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMicrosecond(Some(1_123_130_200_000_000), None),
            ),
            (
                ScalarValue::TimestampMillisecond(
                    Some(1_123_123_000_000),
                    timezone.clone(),
                ),
                ScalarValue::TimestampMillisecond(Some(1_123_130_200_000), None),
            ),
            (
                ScalarValue::TimestampSecond(Some(1_123_123_000), timezone),
                ScalarValue::TimestampSecond(Some(1_123_130_200), None),
            ),
        ];

        for (input, expected) in timestamps_with_timezone {
            test_to_local_time_helper(input, expected);
        }
    }

    #[test]
    fn test_timezone_with_daylight_savings() {
        let timezone_str = "America/New_York";
        let tz: arrow::array::timezone::Tz =
            timezone_str.parse().expect("Invalid timezone");

        // Each case: (wall-clock time in `timezone_str`, the corresponding UTC
        // instant in nanoseconds, the expected timezone-less result in ns).
        let test_cases = vec![
            // Daylight saving time: UTC-4.
            (
                "2020-03-31T13:40:00",
                1_585_676_400_000_000_000,
                1_585_662_000_000_000_000,
            ),
            // Standard time: UTC-5.
            (
                "2020-11-04T14:06:40",
                1_604_516_800_000_000_000,
                1_604_498_800_000_000_000,
            ),
        ];

        for (
            input_timestamp_str,
            expected_input_timestamp,
            expected_adjusted_timestamp,
        ) in test_cases
        {
            let input_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_local_timezone(tz)
                .unwrap()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(input_timestamp, expected_input_timestamp);

            let expected_timestamp = input_timestamp_str
                .parse::<NaiveDateTime>()
                .unwrap()
                .and_utc()
                .timestamp_nanos_opt()
                .unwrap();
            assert_eq!(expected_timestamp, expected_adjusted_timestamp);

            let input = ScalarValue::TimestampNanosecond(
                Some(input_timestamp),
                Some(timezone_str.into()),
            );
            let expected =
                ScalarValue::TimestampNanosecond(Some(expected_timestamp), None);
            test_to_local_time_helper(input, expected)
        }
    }

    /// Invokes `to_local_time` on a single scalar and asserts the scalar result.
    fn test_to_local_time_helper(input: ScalarValue, expected: ScalarValue) {
        let arg_field = Field::new("a", input.data_type(), true).into();
        let res = ToLocalTimeFunc::new()
            .invoke_with_args(ScalarFunctionArgs {
                args: vec![ColumnarValue::Scalar(input)],
                arg_fields: vec![arg_field],
                number_rows: 1,
                return_field: Field::new("f", expected.data_type(), true).into(),
                config_options: Arc::new(ConfigOptions::default()),
            })
            .unwrap();
        match res {
            ColumnarValue::Scalar(res) => {
                assert_eq!(res, expected);
            }
            _ => panic!("unexpected return type"),
        }
    }

    #[test]
    fn test_to_local_time_timezones_array() {
        // Each case: (UTC input strings, timezone attached to the input array,
        // expected timezone-less output strings).
        let cases = [
            // No timezone: values pass through unchanged.
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                None::<Arc<str>>,
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
            ),
            // Fixed +01:00 offset: every value moves forward one hour.
            (
                vec![
                    "2020-09-08T00:00:00",
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                ],
                Some("+01:00".into()),
                vec![
                    "2020-09-08T01:00:00",
                    "2020-09-08T02:00:00",
                    "2020-09-08T03:00:00",
                    "2020-09-08T04:00:00",
                    "2020-09-08T05:00:00",
                ],
            ),
            // Named zone in daylight saving time (UTC-4 on this date).
            (
                vec!["2020-09-08T04:00:00", "2020-09-08T05:00:00"],
                Some("America/New_York".into()),
                vec!["2020-09-08T00:00:00", "2020-09-08T01:00:00"],
            ),
        ];

        for (source, tz_opt, expected) in &cases {
            // Attach the case's timezone so the array adjustment path is
            // actually exercised (not just the timezone-less passthrough).
            let input = source
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>()
                .with_timezone_opt(tz_opt.clone());
            let right = expected
                .iter()
                .map(|s| Some(string_to_timestamp_nanos(s).unwrap()))
                .collect::<TimestampNanosecondArray>();
            let batch_size = input.len();
            let arg_field = Field::new("a", input.data_type().clone(), true).into();
            let args = ScalarFunctionArgs {
                args: vec![ColumnarValue::Array(Arc::new(input))],
                arg_fields: vec![arg_field],
                number_rows: batch_size,
                return_field: Field::new(
                    "f",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                )
                .into(),
                config_options: Arc::new(ConfigOptions::default()),
            };
            let result = ToLocalTimeFunc::new().invoke_with_args(args).unwrap();
            let ColumnarValue::Array(result) = result else {
                panic!("unexpected column type");
            };
            assert_eq!(
                result.data_type(),
                &DataType::Timestamp(TimeUnit::Nanosecond, None)
            );
            let left = arrow::array::cast::as_primitive_array::<TimestampNanosecondType>(
                &result,
            );
            assert_eq!(left, &right);
        }
    }
}