datafusion_spark/function/datetime/
to_utc_timestamp.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType};
23use arrow::datatypes::TimeUnit;
24use arrow::datatypes::{
25 ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType,
26 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
27};
28use chrono::{DateTime, Offset, TimeZone};
29use datafusion_common::types::{NativeType, logical_string};
30use datafusion_common::utils::take_function_args;
31use datafusion_common::{
32 Result, exec_datafusion_err, exec_err, internal_datafusion_err, internal_err,
33};
34use datafusion_expr::{
35 Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
36 Signature, TypeSignatureClass, Volatility,
37};
38use datafusion_functions::utils::make_scalar_function;
39
/// Scalar UDF implementation exposed under the name `to_utc_timestamp`.
///
/// Takes a timestamp and a timezone string and returns a timestamp of the
/// same data type as the first argument.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkToUtcTimestamp {
    /// Accepted argument types and volatility; built once in `new`.
    signature: Signature,
}
53
54impl Default for SparkToUtcTimestamp {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl SparkToUtcTimestamp {
61 pub fn new() -> Self {
62 Self {
63 signature: Signature::coercible(
64 vec![
65 Coercion::new_implicit(
66 TypeSignatureClass::Timestamp,
67 vec![TypeSignatureClass::Native(logical_string())],
68 NativeType::Timestamp(TimeUnit::Microsecond, None),
69 ),
70 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
71 ],
72 Volatility::Immutable,
73 ),
74 }
75 }
76}
77
78impl ScalarUDFImpl for SparkToUtcTimestamp {
79 fn as_any(&self) -> &dyn Any {
80 self
81 }
82
83 fn name(&self) -> &str {
84 "to_utc_timestamp"
85 }
86
87 fn signature(&self) -> &Signature {
88 &self.signature
89 }
90
91 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
92 internal_err!("return_field_from_args should be used instead")
93 }
94
95 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
96 let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
97
98 Ok(Arc::new(Field::new(
99 self.name(),
100 args.arg_fields[0].data_type().clone(),
101 nullable,
102 )))
103 }
104
105 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
106 make_scalar_function(to_utc_timestamp, vec![])(&args.args)
107 }
108}
109
110fn to_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
111 let [timestamp, timezone] = take_function_args("to_utc_timestamp", args)?;
112
113 match timestamp.data_type() {
114 DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
115 process_timestamp_with_tz_array::<TimestampNanosecondType>(
116 timestamp,
117 timezone,
118 tz_opt.clone(),
119 )
120 }
121 DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
122 process_timestamp_with_tz_array::<TimestampMicrosecondType>(
123 timestamp,
124 timezone,
125 tz_opt.clone(),
126 )
127 }
128 DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
129 process_timestamp_with_tz_array::<TimestampMillisecondType>(
130 timestamp,
131 timezone,
132 tz_opt.clone(),
133 )
134 }
135 DataType::Timestamp(TimeUnit::Second, tz_opt) => {
136 process_timestamp_with_tz_array::<TimestampSecondType>(
137 timestamp,
138 timezone,
139 tz_opt.clone(),
140 )
141 }
142 ts_type => {
143 exec_err!("`to_utc_timestamp`: unsupported argument types: {ts_type}")
144 }
145 }
146}
147
148fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
149 ts_array: &ArrayRef,
150 tz_array: &ArrayRef,
151 tz_opt: Option<Arc<str>>,
152) -> Result<ArrayRef> {
153 match tz_array.data_type() {
154 DataType::Utf8 => {
155 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
156 }
157 DataType::LargeUtf8 => {
158 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
159 }
160 DataType::Utf8View => {
161 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
162 }
163 other => {
164 exec_err!("`to_utc_timestamp`: timezone must be a string type, got {other}")
165 }
166 }
167}
168
169fn process_arrays<'a, T: ArrowTimestampType, S>(
170 return_tz_opt: Option<Arc<str>>,
171 ts_array: &ArrayRef,
172 tz_array: &'a S,
173) -> Result<ArrayRef>
174where
175 &'a S: StringArrayType<'a>,
176{
177 let ts_primitive = ts_array.as_primitive::<T>();
178 let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
179
180 for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
181 match (ts_opt, tz_opt) {
182 (Some(ts), Some(tz_str)) => {
183 let tz: Tz = tz_str.parse().map_err(|e| {
184 exec_datafusion_err!(
185 "`to_utc_timestamp`: invalid timezone '{tz_str}': {e}"
186 )
187 })?;
188 let val = adjust_to_utc_time::<T>(ts, tz)?;
189 builder.append_value(val);
190 }
191 _ => builder.append_null(),
192 }
193 }
194
195 builder = builder.with_timezone_opt(return_tz_opt);
196 Ok(Arc::new(builder.finish()))
197}
198
199fn adjust_to_utc_time<T: ArrowTimestampType>(ts: i64, tz: Tz) -> Result<i64> {
200 let dt = match T::UNIT {
201 TimeUnit::Nanosecond => Some(DateTime::from_timestamp_nanos(ts)),
202 TimeUnit::Microsecond => DateTime::from_timestamp_micros(ts),
203 TimeUnit::Millisecond => DateTime::from_timestamp_millis(ts),
204 TimeUnit::Second => DateTime::from_timestamp(ts, 0),
205 }
206 .ok_or_else(|| internal_datafusion_err!("Invalid timestamp"))?;
207 let naive_dt = dt.naive_utc();
208
209 let offset_seconds = tz
210 .offset_from_utc_datetime(&naive_dt)
211 .fix()
212 .local_minus_utc() as i64;
213
214 let offset_in_unit = match T::UNIT {
215 TimeUnit::Nanosecond => offset_seconds.checked_mul(1_000_000_000),
216 TimeUnit::Microsecond => offset_seconds.checked_mul(1_000_000),
217 TimeUnit::Millisecond => offset_seconds.checked_mul(1_000),
218 TimeUnit::Second => Some(offset_seconds),
219 }
220 .ok_or_else(|| internal_datafusion_err!("Offset overflow"))?;
221
222 ts.checked_sub(offset_in_unit).ok_or_else(|| {
223 internal_datafusion_err!("Timestamp overflow during timezone adjustment")
224 })
225}