datafusion_spark/function/datetime/
from_utc_timestamp.rs1use std::any::Any;
19use std::sync::Arc;
20
21use arrow::array::timezone::Tz;
22use arrow::array::{Array, ArrayRef, AsArray, PrimitiveBuilder, StringArrayType};
23use arrow::datatypes::TimeUnit;
24use arrow::datatypes::{
25 ArrowTimestampType, DataType, Field, FieldRef, TimestampMicrosecondType,
26 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
27};
28use datafusion_common::types::{NativeType, logical_string};
29use datafusion_common::utils::take_function_args;
30use datafusion_common::{Result, exec_datafusion_err, exec_err, internal_err};
31use datafusion_expr::{
32 Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl,
33 Signature, TypeSignatureClass, Volatility,
34};
35use datafusion_functions::datetime::to_local_time::adjust_to_local_time;
36use datafusion_functions::utils::make_scalar_function;
37
38#[derive(Debug, PartialEq, Eq, Hash)]
48pub struct SparkFromUtcTimestamp {
49 signature: Signature,
50}
51
52impl Default for SparkFromUtcTimestamp {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl SparkFromUtcTimestamp {
59 pub fn new() -> Self {
60 Self {
61 signature: Signature::coercible(
62 vec![
63 Coercion::new_implicit(
64 TypeSignatureClass::Timestamp,
65 vec![TypeSignatureClass::Native(logical_string())],
66 NativeType::Timestamp(TimeUnit::Microsecond, None),
67 ),
68 Coercion::new_exact(TypeSignatureClass::Native(logical_string())),
69 ],
70 Volatility::Immutable,
71 ),
72 }
73 }
74}
75
76impl ScalarUDFImpl for SparkFromUtcTimestamp {
77 fn as_any(&self) -> &dyn Any {
78 self
79 }
80
81 fn name(&self) -> &str {
82 "from_utc_timestamp"
83 }
84
85 fn signature(&self) -> &Signature {
86 &self.signature
87 }
88
89 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
90 internal_err!("return_field_from_args should be used instead")
91 }
92
93 fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
94 let nullable = args.arg_fields.iter().any(|f| f.is_nullable());
95
96 Ok(Arc::new(Field::new(
97 self.name(),
98 args.arg_fields[0].data_type().clone(),
99 nullable,
100 )))
101 }
102
103 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
104 make_scalar_function(spark_from_utc_timestamp, vec![])(&args.args)
105 }
106}
107
108fn spark_from_utc_timestamp(args: &[ArrayRef]) -> Result<ArrayRef> {
109 let [timestamp, timezone] = take_function_args("from_utc_timestamp", args)?;
110
111 match timestamp.data_type() {
112 DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
113 process_timestamp_with_tz_array::<TimestampNanosecondType>(
114 timestamp,
115 timezone,
116 tz_opt.clone(),
117 )
118 }
119 DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
120 process_timestamp_with_tz_array::<TimestampMicrosecondType>(
121 timestamp,
122 timezone,
123 tz_opt.clone(),
124 )
125 }
126 DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
127 process_timestamp_with_tz_array::<TimestampMillisecondType>(
128 timestamp,
129 timezone,
130 tz_opt.clone(),
131 )
132 }
133 DataType::Timestamp(TimeUnit::Second, tz_opt) => {
134 process_timestamp_with_tz_array::<TimestampSecondType>(
135 timestamp,
136 timezone,
137 tz_opt.clone(),
138 )
139 }
140 ts_type => {
141 exec_err!("`from_utc_timestamp`: unsupported argument types: {ts_type}")
142 }
143 }
144}
145
146fn process_timestamp_with_tz_array<T: ArrowTimestampType>(
147 ts_array: &ArrayRef,
148 tz_array: &ArrayRef,
149 tz_opt: Option<Arc<str>>,
150) -> Result<ArrayRef> {
151 match tz_array.data_type() {
152 DataType::Utf8 => {
153 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i32>())
154 }
155 DataType::LargeUtf8 => {
156 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string::<i64>())
157 }
158 DataType::Utf8View => {
159 process_arrays::<T, _>(tz_opt, ts_array, tz_array.as_string_view())
160 }
161 other => {
162 exec_err!("`from_utc_timestamp`: timezone must be a string type, got {other}")
163 }
164 }
165}
166
167fn process_arrays<'a, T: ArrowTimestampType, S>(
168 return_tz_opt: Option<Arc<str>>,
169 ts_array: &ArrayRef,
170 tz_array: &'a S,
171) -> Result<ArrayRef>
172where
173 &'a S: StringArrayType<'a>,
174{
175 let ts_primitive = ts_array.as_primitive::<T>();
176 let mut builder = PrimitiveBuilder::<T>::with_capacity(ts_array.len());
177
178 for (ts_opt, tz_opt) in ts_primitive.iter().zip(tz_array.iter()) {
179 match (ts_opt, tz_opt) {
180 (Some(ts), Some(tz_str)) => {
181 let tz: Tz = tz_str.parse().map_err(|e| {
182 exec_datafusion_err!(
183 "`from_utc_timestamp`: invalid timezone '{tz_str}': {e}"
184 )
185 })?;
186 let val = adjust_to_local_time::<T>(ts, tz)?;
187 builder.append_value(val);
188 }
189 _ => builder.append_null(),
190 }
191 }
192
193 builder = builder.with_timezone_opt(return_tz_opt);
194 Ok(Arc::new(builder.finish()))
195}