datafusion_spark/function/datetime/
from_utc_timestamp.rs1use std::sync::Arc;
19
20use arrow::array::timezone::Tz;
21use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType};
22use arrow::datatypes::TimeUnit;
23use arrow::datatypes::{
24 ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType,
25 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
26};
27use datafusion_common::types::{NativeType, logical_string};
28use datafusion_common::utils::take_function_args;
29use datafusion_common::{Result, exec_datafusion_err, exec_err, internal_err};
30use datafusion_expr::{
31 Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
32 Signature, TypeSignatureClass, Volatility,
33};
34use datafusion_functions::datetime::to_local_time::adjust_to_local_time;
35use datafusion_functions::utils::make_scalar_function;
36
37#[derive(Debug, PartialEq, Eq, Hash)]
47pub struct SparkFromUtcTimestamp {
48 signature: Signature,
49}
50
51impl Default for SparkFromUtcTimestamp {
52 fn default() -> Self {
53 Self::new()
54 }
55}
56
57impl SparkFromUtcTimestamp {
58 pub fn new() -> Self {
59 Self {
60 signature: Signature::coercible(
61 vec![
62 Coercion::new_implicit(
63 TypeSignatureClass::Timestamp,
64 vec![TypeSignatureClass::Native(logical_string())],
65 NativeType::Timestamp(TimeUnit::Microsecond, None),
66 ),
67 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
68 ],
69 Volatility::Immutable,
70 ),
71 }
72 }
73}
74
75impl ScalarUDFImpl for SparkFromUtcTimestamp {
76 fn name(&self) -> &str {
77 "from_utc_timestamp"
78 }
79
80 fn signature(&self) -> &Signature {
81 &self.signature
82 }
83
84 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
85 internal_err!("return_field_from_args should be used instead")
86 }
87
88 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
89 let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
90
91 Ok(Arc::new(Field::new(
92 self.name(),
93 args.arg_fields[0].data_type().clone(),
94 nullable,
95 )))
96 }
97
98 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
99 make_scalar_function(spark_from_utc_timestamp, vec![])(&args.args)
100 }
101}
102
103fn spark_from_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
104 let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?;
105
106 match timestamp.data_type() {
107 DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
108 process_timestamp_with_tz_array::<TimestampNanosecondType>(
109 timestamp,
110 timezone,
111 tz_opt.clone(),
112 )
113 }
114 DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
115 process_timestamp_with_tz_array::<TimestampMicrosecondType>(
116 timestamp,
117 timezone,
118 tz_opt.clone(),
119 )
120 }
121 DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
122 process_timestamp_with_tz_array::<TimestampMillisecondType>(
123 timestamp,
124 timezone,
125 tz_opt.clone(),
126 )
127 }
128 DataType::Timestamp(TimeUnit::Second, tz_opt) => {
129 process_timestamp_with_tz_array::<TimestampSecondType>(
130 timestamp,
131 timezone,
132 tz_opt.clone(),
133 )
134 }
135 ts_type => {
136 exec_err!("`from_utc_timestamp`: unsupported argument types: {ts_type}")
137 }
138 }
139}
140
141fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
142 ts_array: &ArrayRef,
143 tz_array: &ArrayRef,
144 tz_opt: Option<Arc<str>>,
145) -> Result<ArrayRef> {
146 match tz_array.data_type() {
147 DataType::Utf8 => {
148 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
149 }
150 DataType::LargeUtf8 => {
151 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
152 }
153 DataType::Utf8View => {
154 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
155 }
156 other => {
157 exec_err!("`from_utc_timestamp`: timezone must be a string type, got {other}")
158 }
159 }
160}
161
162fn process_arrays<'a, T: ArrowTimestampType, S>(
163 return_tz_opt: Option<Arc<str>>,
164 ts_array: &ArrayRef,
165 tz_array: &'a S,
166) -> Result<ArrayRef>
167where
168 &'a S: StringArrayType<'a>,
169{
170 let ts_primitive = ts_array.as_primitive::<T>();
171 let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
172
173 for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
174 match (ts_opt, tz_opt) {
175 (Some(ts), Some(tz_str)) => {
176 let tz: Tz = tz_str.parse().map_err(|e| {
177 exec_datafusion_err!(
178 "`from_utc_timestamp`: invalid timezone '{tz_str}': {e}"
179 )
180 })?;
181 let val = adjust_to_local_time::<T>(ts, tz)?;
182 builder.append_value(val);
183 }
184 _ => builder.append_null(),
185 }
186 }
187
188 builder = builder.with_timezone_opt(return_tz_opt);
189 Ok(Arc::new(builder.finish()))
190}