1use std::sync::Arc;
19
20use arrow::array::{
21 Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
22 StringArrayType, StringViewArray,
23};
24use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;
25use arrow::datatypes::DataType;
26use chrono::format::{parse, Parsed, StrftimeItems};
27use chrono::LocalResult::Single;
28use chrono::{DateTime, TimeZone, Utc};
29
30use datafusion_common::cast::as_generic_string_array;
31use datafusion_common::{
32 exec_datafusion_err, exec_err, unwrap_or_internal_err, DataFusionError, Result,
33 ScalarType, ScalarValue,
34};
35use datafusion_expr::ColumnarValue;
36
/// Error message reported when a parsed timestamp cannot be expressed as an
/// `i64` number of nanoseconds (i.e. `timestamp_nanos_opt` returned `None`).
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = concat!(
    "The dates that can be represented as nanoseconds have to be between ",
    "1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804"
);
39
40pub(crate) fn string_to_timestamp_nanos_shim(s: &str) -> Result<i64> {
42 string_to_timestamp_nanos(s).map_err(|e| e.into())
43}
44
45pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
51 for (idx, a) in args.iter().skip(1).enumerate() {
52 match a.data_type() {
53 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
54 }
56 _ => {
57 return exec_err!(
58 "{name} function unsupported data type at index {}: {}",
59 idx + 1,
60 a.data_type()
61 );
62 }
63 }
64 }
65
66 Ok(())
67}
68
69pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
82 timezone: &T,
83 s: &str,
84 format: &str,
85) -> Result<DateTime<T>, DataFusionError> {
86 let err = |err_ctx: &str| {
87 exec_datafusion_err!(
88 "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
89 )
90 };
91
92 let mut parsed = Parsed::new();
93 parse(&mut parsed, s, StrftimeItems::new(format)).map_err(|e| err(&e.to_string()))?;
94
95 let dt = parsed.to_datetime();
97
98 if let Err(e) = &dt {
99 let ndt = parsed
101 .to_naive_datetime_with_offset(0)
102 .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()));
103 if let Err(e) = &ndt {
104 return Err(err(&e.to_string()));
105 }
106
107 if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) {
108 Ok(e.to_owned())
109 } else {
110 Err(err(&e.to_string()))
111 }
112 } else {
113 Ok(dt.unwrap().with_timezone(timezone))
114 }
115}
116
117#[inline]
144pub(crate) fn string_to_timestamp_nanos_formatted(
145 s: &str,
146 format: &str,
147) -> Result<i64, DataFusionError> {
148 string_to_datetime_formatted(&Utc, s, format)?
149 .naive_utc()
150 .and_utc()
151 .timestamp_nanos_opt()
152 .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
153}
154
155#[inline]
172pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
173 Ok(string_to_datetime_formatted(&Utc, s, format)?
174 .naive_utc()
175 .and_utc()
176 .timestamp_millis())
177}
178
179pub(crate) fn handle<O, F, S>(
180 args: &[ColumnarValue],
181 op: F,
182 name: &str,
183) -> Result<ColumnarValue>
184where
185 O: ArrowPrimitiveType,
186 S: ScalarType<O::Native>,
187 F: Fn(&str) -> Result<O::Native>,
188{
189 match &args[0] {
190 ColumnarValue::Array(a) => match a.data_type() {
191 DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
192 unary_string_to_primitive_function::<&StringViewArray, O, _>(
193 a.as_ref().as_string_view(),
194 op,
195 )?,
196 ))),
197 DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
198 unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
199 a.as_ref().as_string::<i64>(),
200 op,
201 )?,
202 ))),
203 DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
204 unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
205 a.as_ref().as_string::<i32>(),
206 op,
207 )?,
208 ))),
209 other => exec_err!("Unsupported data type {other:?} for function {name}"),
210 },
211 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
212 Some(a) => {
213 let result = a.as_ref().map(|x| op(x)).transpose()?;
214 Ok(ColumnarValue::Scalar(S::scalar(result)))
215 }
216 _ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
217 },
218 }
219}
220
221pub(crate) fn handle_multiple<O, F, S, M>(
225 args: &[ColumnarValue],
226 op: F,
227 op2: M,
228 name: &str,
229) -> Result<ColumnarValue>
230where
231 O: ArrowPrimitiveType,
232 S: ScalarType<O::Native>,
233 F: Fn(&str, &str) -> Result<O::Native>,
234 M: Fn(O::Native) -> O::Native,
235{
236 match &args[0] {
237 ColumnarValue::Array(a) => match a.data_type() {
238 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
239 for (pos, arg) in args.iter().enumerate() {
241 match arg {
242 ColumnarValue::Array(arg) => match arg.data_type() {
243 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
244 }
246 other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
247 },
248 ColumnarValue::Scalar(arg) => {
249 match arg.data_type() {
250 DataType::Utf8View| DataType::LargeUtf8 | DataType::Utf8 => {
251 }
253 other => return exec_err!("Unsupported data type {other:?} for function {name}, arg # {pos}"),
254 }
255 }
256 }
257 }
258
259 Ok(ColumnarValue::Array(Arc::new(
260 strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
261 )))
262 }
263 other => {
264 exec_err!("Unsupported data type {other:?} for function {name}")
265 }
266 },
267 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
269 Some(a) => {
270 let a = a.as_ref();
271 let a = unwrap_or_internal_err!(a);
273
274 let mut ret = None;
275
276 for (pos, v) in args.iter().enumerate().skip(1) {
277 let ColumnarValue::Scalar(
278 ScalarValue::Utf8View(x)
279 | ScalarValue::LargeUtf8(x)
280 | ScalarValue::Utf8(x),
281 ) = v
282 else {
283 return exec_err!("Unsupported data type {v:?} for function {name}, arg # {pos}");
284 };
285
286 if let Some(s) = x {
287 match op(a, s.as_str()) {
288 Ok(r) => {
289 ret = Some(Ok(ColumnarValue::Scalar(S::scalar(Some(
290 op2(r),
291 )))));
292 break;
293 }
294 Err(e) => ret = Some(Err(e)),
295 }
296 }
297 }
298
299 unwrap_or_internal_err!(ret)
300 }
301 other => {
302 exec_err!("Unsupported data type {other:?} for function {name}")
303 }
304 },
305 }
306}
307
308pub(crate) fn strings_to_primitive_function<O, F, F2>(
319 args: &[ColumnarValue],
320 op: F,
321 op2: F2,
322 name: &str,
323) -> Result<PrimitiveArray<O>>
324where
325 O: ArrowPrimitiveType,
326 F: Fn(&str, &str) -> Result<O::Native>,
327 F2: Fn(O::Native) -> O::Native,
328{
329 if args.len() < 2 {
330 return exec_err!(
331 "{:?} args were supplied but {} takes 2 or more arguments",
332 args.len(),
333 name
334 );
335 }
336
337 match &args[0] {
338 ColumnarValue::Array(a) => match a.data_type() {
339 DataType::Utf8View => {
340 let string_array = a.as_string_view();
341 handle_array_op::<O, &StringViewArray, F, F2>(
342 &string_array,
343 &args[1..],
344 op,
345 op2,
346 )
347 }
348 DataType::LargeUtf8 => {
349 let string_array = as_generic_string_array::<i64>(&a)?;
350 handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
351 &string_array,
352 &args[1..],
353 op,
354 op2,
355 )
356 }
357 DataType::Utf8 => {
358 let string_array = as_generic_string_array::<i32>(&a)?;
359 handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
360 &string_array,
361 &args[1..],
362 op,
363 op2,
364 )
365 }
366 other => exec_err!(
367 "Unsupported data type {other:?} for function substr,\
368 expected Utf8View, Utf8 or LargeUtf8."
369 ),
370 },
371 other => exec_err!(
372 "Received {} data type, expected only array",
373 other.data_type()
374 ),
375 }
376}
377
378fn handle_array_op<'a, O, V, F, F2>(
379 first: &V,
380 args: &[ColumnarValue],
381 op: F,
382 op2: F2,
383) -> Result<PrimitiveArray<O>>
384where
385 V: StringArrayType<'a>,
386 O: ArrowPrimitiveType,
387 F: Fn(&str, &str) -> Result<O::Native>,
388 F2: Fn(O::Native) -> O::Native,
389{
390 first
391 .iter()
392 .enumerate()
393 .map(|(pos, x)| {
394 let mut val = None;
395 if let Some(x) = x {
396 for arg in args {
397 let v = match arg {
398 ColumnarValue::Array(a) => match a.data_type() {
399 DataType::Utf8View => Ok(a.as_string_view().value(pos)),
400 DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
401 DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
402 other => exec_err!("Unexpected type encountered '{other}'"),
403 },
404 ColumnarValue::Scalar(s) => match s.try_as_str() {
405 Some(Some(v)) => Ok(v),
406 Some(None) => continue, None => exec_err!("Unexpected scalar type encountered '{s}'"),
408 },
409 }?;
410
411 let r = op(x, v);
412 if let Ok(inner) = r {
413 val = Some(Ok(op2(inner)));
414 break;
415 } else {
416 val = Some(r);
417 }
418 }
419 };
420
421 val.transpose()
422 })
423 .collect()
424}
425
426fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
434 array: StringArrType,
435 op: F,
436) -> Result<PrimitiveArray<O>>
437where
438 StringArrType: StringArrayType<'a>,
439 O: ArrowPrimitiveType,
440 F: Fn(&'a str) -> Result<O::Native>,
441{
442 array.iter().map(|x| x.map(&op).transpose()).collect()
444}