Skip to main content

datafusion_spark/function/datetime/
to_utc_timestamp.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType};
23use arrow::datatypes::TimeUnit;
24use arrow::datatypes::{
25    ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType,
26    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
27};
28use chrono::{DateTime, Offset, TimeZone};
29use datafusion_common::types::{NativeType, logical_string};
30use datafusion_common::utils::take_function_args;
31use datafusion_common::{
32    Result, exec_datafusion_err, exec_err, internal_datafusion_err, internal_err,
33};
34use datafusion_expr::{
35    Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
36    Signature, TypeSignatureClass, Volatility,
37};
38use datafusion_functions::utils::make_scalar_function;
39
/// Apache Spark `to_utc_timestamp` function.
///
/// Interprets the given timestamp in the provided timezone and then converts it to UTC.
///
/// A timestamp in Apache Spark represents the number of microseconds from the Unix epoch,
/// which is not timezone-agnostic. So in Apache Spark this function just shifts the timestamp
/// value from the given timezone to the UTC timezone.
///
/// See <https://spark.apache.org/docs/latest/api/sql/index.html#to_utc_timestamp>
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkToUtcTimestamp {
    /// Accepted argument types: a timestamp (string inputs are implicitly coerced to a
    /// microsecond timestamp without timezone) followed by a timezone string.
    signature: Signature,
}
53
54impl Default for SparkToUtcTimestamp {
55    fn default() -> Self {
56        Self::new()
57    }
58}
59
60impl SparkToUtcTimestamp {
61    pub fn new() -> Self {
62        Self {
63            signature: Signature::coercible(
64                vec![
65                    Coercion::new_implicit(
66                        TypeSignatureClass::Timestamp,
67                        vec![TypeSignatureClass::Native(logical_string())],
68                        NativeType::Timestamp(TimeUnit::Microsecond, None),
69                    ),
70                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
71                ],
72                Volatility::Immutable,
73            ),
74        }
75    }
76}
77
78impl ScalarUDFImpl for SparkToUtcTimestamp {
79    fn as_any(&self) -> &dyn Any {
80        self
81    }
82
83    fn name(&self) -> &str {
84        "to_utc_timestamp"
85    }
86
87    fn signature(&self) -> &Signature {
88        &self.signature
89    }
90
91    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
92        internal_err!("return_field_from_args should be used instead")
93    }
94
95    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
96        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
97
98        Ok(Arc::new(Field::new(
99            self.name(),
100            args.arg_fields[0].data_type().clone(),
101            nullable,
102        )))
103    }
104
105    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
106        make_scalar_function(to_utc_timestamp, vec![])(&args.args)
107    }
108}
109
110fn to_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
111    let [timestamp, timezone] = take_function_args("to_utc_timestamp", args)?;
112
113    match timestamp.data_type() {
114        DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
115            process_timestamp_with_tz_array::<TimestampNanosecondType>(
116                timestamp,
117                timezone,
118                tz_opt.clone(),
119            )
120        }
121        DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
122            process_timestamp_with_tz_array::<TimestampMicrosecondType>(
123                timestamp,
124                timezone,
125                tz_opt.clone(),
126            )
127        }
128        DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
129            process_timestamp_with_tz_array::<TimestampMillisecondType>(
130                timestamp,
131                timezone,
132                tz_opt.clone(),
133            )
134        }
135        DataType::Timestamp(TimeUnit::Second, tz_opt) => {
136            process_timestamp_with_tz_array::<TimestampSecondType>(
137                timestamp,
138                timezone,
139                tz_opt.clone(),
140            )
141        }
142        ts_type => {
143            exec_err!("`to_utc_timestamp`: unsupported argument types: {ts_type}")
144        }
145    }
146}
147
148fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
149    ts_array: &ArrayRef,
150    tz_array: &ArrayRef,
151    tz_opt: Option<Arc<str>>,
152) -> Result<ArrayRef> {
153    match tz_array.data_type() {
154        DataType::Utf8 => {
155            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
156        }
157        DataType::LargeUtf8 => {
158            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
159        }
160        DataType::Utf8View => {
161            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
162        }
163        other => {
164            exec_err!("`to_utc_timestamp`: timezone must be a string type, got {other}")
165        }
166    }
167}
168
169fn process_arrays<'a, T: ArrowTimestampType, S>(
170    return_tz_opt: Option<Arc<str>>,
171    ts_array: &ArrayRef,
172    tz_array: &'a S,
173) -> Result<ArrayRef>
174where
175    &'a S: StringArrayType<'a>,
176{
177    let ts_primitive = ts_array.as_primitive::<T>();
178    let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
179
180    for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
181        match (ts_opt, tz_opt) {
182            (Some(ts), Some(tz_str)) => {
183                let tz: Tz = tz_str.parse().map_err(|e| {
184                    exec_datafusion_err!(
185                        "`to_utc_timestamp`: invalid timezone '{tz_str}': {e}"
186                    )
187                })?;
188                let val = adjust_to_utc_time::<T>(ts, tz)?;
189                builder.append_value(val);
190            }
191            _ => builder.append_null(),
192        }
193    }
194
195    builder = builder.with_timezone_opt(return_tz_opt);
196    Ok(Arc::new(builder.finish()))
197}
198
199fn adjust_to_utc_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
200    let dt = match T::UNIT {
201        TimeUnit::Nanosecond => Some(DateTime::from_timestamp_nanos(ts)),
202        TimeUnit::Microsecond => DateTime::from_timestamp_micros(ts),
203        TimeUnit::Millisecond => DateTime::from_timestamp_millis(ts),
204        TimeUnit::Second => DateTime::from_timestamp(ts, 0),
205    }
206    .ok_or_else(|| internal_datafusion_err!("Invalid timestamp"))?;
207    let naive_dt = dt.naive_utc();
208
209    let offset_seconds = tz
210        .offset_from_utc_datetime(&naive_dt)
211        .fix()
212        .local_minus_utc() as i64;
213
214    let offset_in_unit = match T::UNIT {
215        TimeUnit::Nanosecond => offset_seconds.checked_mul(1_000_000_000),
216        TimeUnit::Microsecond => offset_seconds.checked_mul(1_000_000),
217        TimeUnit::Millisecond => offset_seconds.checked_mul(1_000),
218        TimeUnit::Second => Some(offset_seconds),
219    }
220    .ok_or_else(|| internal_datafusion_err!("Offset overflow"))?;
221
222    ts.checked_sub(offset_in_unit).ok_or_else(|| {
223        internal_datafusion_err!("Timestamp overflow during timezone adjustment")
224    })
225}