Skip to main content

datafusion_spark/function/datetime/
from_utc_timestamp.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType};
23use arrow::datatypes::TimeUnit;
24use arrow::datatypes::{
25    ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType,
26    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
27};
28use datafusion_common::types::{NativeType, logical_string};
29use datafusion_common::utils::take_function_args;
30use datafusion_common::{Result, exec_datafusion_err, exec_err, internal_err};
31use datafusion_expr::{
32    Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
33    Signature, TypeSignatureClass, Volatility,
34};
35use datafusion_functions::datetime::to_local_time::adjust_to_local_time;
36use datafusion_functions::utils::make_scalar_function;
37
38/// Apache Spark `from_utc_timestamp` function.
39///
40/// Interprets the given timestamp as UTC and converts it to the given timezone.
41///
42/// Timestamp in Apache Spark represents number of microseconds from the Unix epoch, which is not
43/// timezone-agnostic. So in Apache Spark this function just shift the timestamp value from UTC timezone to
44/// the given timezone.
45///
46/// See <https://spark.apache.org/docs/latest/api/sql/index.html#from_utc_timestamp>
47#[derive(Debug, PartialEq, Eq, Hash)]
48pub struct SparkFromUtcTimestamp {
49    signature: Signature,
50}
51
52impl Default for SparkFromUtcTimestamp {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl SparkFromUtcTimestamp {
59    pub fn new() -> Self {
60        Self {
61            signature: Signature::coercible(
62                vec![
63                    Coercion::new_implicit(
64                        TypeSignatureClass::Timestamp,
65                        vec![TypeSignatureClass::Native(logical_string())],
66                        NativeType::Timestamp(TimeUnit::Microsecond, None),
67                    ),
68                    Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
69                ],
70                Volatility::Immutable,
71            ),
72        }
73    }
74}
75
76impl ScalarUDFImpl for SparkFromUtcTimestamp {
77    fn as_any(&self) -> &dyn Any {
78        self
79    }
80
81    fn name(&self) -> &str {
82        "from_utc_timestamp"
83    }
84
85    fn signature(&self) -> &Signature {
86        &self.signature
87    }
88
89    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
90        internal_err!("return_field_from_args should be used instead")
91    }
92
93    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
94        let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
95
96        Ok(Arc::new(Field::new(
97            self.name(),
98            args.arg_fields[0].data_type().clone(),
99            nullable,
100        )))
101    }
102
103    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
104        make_scalar_function(spark_from_utc_timestamp, vec![])(&args.args)
105    }
106}
107
108fn spark_from_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
109    let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?;
110
111    match timestamp.data_type() {
112        DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
113            process_timestamp_with_tz_array::<TimestampNanosecondType>(
114                timestamp,
115                timezone,
116                tz_opt.clone(),
117            )
118        }
119        DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
120            process_timestamp_with_tz_array::<TimestampMicrosecondType>(
121                timestamp,
122                timezone,
123                tz_opt.clone(),
124            )
125        }
126        DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
127            process_timestamp_with_tz_array::<TimestampMillisecondType>(
128                timestamp,
129                timezone,
130                tz_opt.clone(),
131            )
132        }
133        DataType::Timestamp(TimeUnit::Second, tz_opt) => {
134            process_timestamp_with_tz_array::<TimestampSecondType>(
135                timestamp,
136                timezone,
137                tz_opt.clone(),
138            )
139        }
140        ts_type => {
141            exec_err!("`from_utc_timestamp`: unsupported argument types: {ts_type}")
142        }
143    }
144}
145
146fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
147    ts_array: &ArrayRef,
148    tz_array: &ArrayRef,
149    tz_opt: Option<Arc<str>>,
150) -> Result<ArrayRef> {
151    match tz_array.data_type() {
152        DataType::Utf8 => {
153            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
154        }
155        DataType::LargeUtf8 => {
156            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
157        }
158        DataType::Utf8View => {
159            process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
160        }
161        other => {
162            exec_err!("`from_utc_timestamp`: timezone must be a string type, got {other}")
163        }
164    }
165}
166
167fn process_arrays<'a, T: ArrowTimestampType, S>(
168    return_tz_opt: Option<Arc<str>>,
169    ts_array: &ArrayRef,
170    tz_array: &'a S,
171) -> Result<ArrayRef>
172where
173    &'a S: StringArrayType<'a>,
174{
175    let ts_primitive = ts_array.as_primitive::<T>();
176    let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
177
178    for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
179        match (ts_opt, tz_opt) {
180            (Some(ts), Some(tz_str)) => {
181                let tz: Tz = tz_str.parse().map_err(|e| {
182                    exec_datafusion_err!(
183                        "`from_utc_timestamp`: invalid timezone '{tz_str}': {e}"
184                    )
185                })?;
186                let val = adjust_to_local_time::<T>(ts, tz)?;
187                builder.append_value(val);
188            }
189            _ => builder.append_null(),
190        }
191    }
192
193    builder = builder.with_timezone_opt(return_tz_opt);
194    Ok(Arc::new(builder.finish()))
195}