1use std::sync::{Arc, LazyLock};
19
20use arrow::array::timezone::Tz;
21use arrow::array::{
22 Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
23 StringArrayType, StringViewArray,
24};
25use arrow::compute::DecimalCast;
26use arrow::compute::kernels::cast_utils::string_to_datetime;
27use arrow::datatypes::{DataType, TimeUnit};
28use arrow_buffer::ArrowNativeType;
29use chrono::LocalResult::Single;
30use chrono::format::{Parsed, StrftimeItems, parse};
31use chrono::{DateTime, TimeZone, Utc};
32use datafusion_common::cast::as_generic_string_array;
33use datafusion_common::{
34 DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
35 internal_datafusion_err, unwrap_or_internal_err,
36};
37use datafusion_expr::ColumnarValue;
38
39const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
41
42static UTC: LazyLock<Tz> = LazyLock::new(|| "UTC".parse().expect("UTC is always valid"));
43
44pub(crate) fn string_to_timestamp_nanos_with_timezone(
59 timezone: &Option<Tz>,
60 s: &str,
61) -> Result<i64> {
62 let tz = timezone.as_ref().unwrap_or(&UTC);
63 let dt = string_to_datetime(tz, s)?;
64 let parsed = dt
65 .timestamp_nanos_opt()
66 .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
67
68 Ok(parsed)
69}
70
71pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
77 for (idx, a) in args.iter().skip(1).enumerate() {
78 match a.data_type() {
79 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
80 }
82 _ => {
83 return exec_err!(
84 "{name} function unsupported data type at index {}: {}",
85 idx + 1,
86 a.data_type()
87 );
88 }
89 }
90 }
91
92 Ok(())
93}
94
95pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
107 timezone: &T,
108 s: &str,
109 format: &str,
110) -> Result<DateTime<T>, DataFusionError> {
111 let err = |err_ctx: &str| {
112 exec_datafusion_err!(
113 "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
114 )
115 };
116
117 let mut datetime_str = s;
118 let mut format = format;
119
120 let tz: Option<chrono_tz::Tz> = if format.trim_end().ends_with(" %Z") {
125 if let Some((dt_str, timezone_name)) = datetime_str.trim_end().rsplit_once(' ') {
127 datetime_str = dt_str;
128
129 let result: Result<chrono_tz::Tz, chrono_tz::ParseError> =
131 timezone_name.parse();
132 let Ok(tz) = result else {
133 return Err(err(&result.unwrap_err().to_string()));
134 };
135
136 format = &format[..format.len() - 3];
138
139 Some(tz)
140 } else {
141 None
142 }
143 } else if format.contains("%Z") {
144 return Err(err(
145 "'%Z' is only supported at the end of the format string preceded by a space",
146 ));
147 } else {
148 None
149 };
150
151 let mut parsed = Parsed::new();
152 parse(&mut parsed, datetime_str, StrftimeItems::new(format))
153 .map_err(|e| err(&e.to_string()))?;
154
155 let dt = match tz {
156 Some(tz) => {
157 match parsed.to_datetime_with_timezone(&tz) {
159 Ok(dt) => Ok(dt.fixed_offset()),
160 Err(e) => Err(e),
161 }
162 }
163 None => parsed.to_datetime(),
165 };
166
167 if let Err(e) = &dt {
168 let ndt = parsed
170 .to_naive_datetime_with_offset(0)
171 .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()));
172 if let Err(e) = &ndt {
173 return Err(err(&e.to_string()));
174 }
175
176 if let Single(e) = &timezone.from_local_datetime(&ndt.unwrap()) {
177 Ok(e.to_owned())
178 } else {
179 Err(err(&e.to_string()))
180 }
181 } else {
182 Ok(dt.unwrap().with_timezone(timezone))
183 }
184}
185
186#[inline]
213pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
214 timezone: &Option<Tz>,
215 s: &str,
216 format: &str,
217) -> Result<i64, DataFusionError> {
218 let dt = string_to_datetime_formatted(timezone.as_ref().unwrap_or(&UTC), s, format)?;
219 let parsed = dt
220 .timestamp_nanos_opt()
221 .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))?;
222
223 Ok(parsed)
224}
225
226#[inline]
243pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
244 Ok(string_to_datetime_formatted(&Utc, s, format)?
245 .naive_utc()
246 .and_utc()
247 .timestamp_millis())
248}
249
250pub(crate) fn handle<O, F>(
251 args: &[ColumnarValue],
252 op: F,
253 name: &str,
254 dt: &DataType,
255) -> Result<ColumnarValue>
256where
257 O: ArrowPrimitiveType,
258 F: Fn(&str) -> Result<O::Native>,
259{
260 match &args[0] {
261 ColumnarValue::Array(a) => match a.data_type() {
262 DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
263 unary_string_to_primitive_function::<&StringViewArray, O, _>(
264 &a.as_string_view(),
265 op,
266 )?,
267 ))),
268 DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
269 unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
270 &a.as_string::<i64>(),
271 op,
272 )?,
273 ))),
274 DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
275 unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
276 &a.as_string::<i32>(),
277 op,
278 )?,
279 ))),
280 other => exec_err!("Unsupported data type {other:?} for function {name}"),
281 },
282 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
283 Some(a) => {
284 let result = a
285 .as_ref()
286 .map(|x| op(x))
287 .transpose()?
288 .and_then(|v| v.to_i64());
289 let s = scalar_value(dt, result)?;
290 Ok(ColumnarValue::Scalar(s))
291 }
292 _ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
293 },
294 }
295}
296
297pub(crate) fn handle_multiple<O, F, M>(
301 args: &[ColumnarValue],
302 op: F,
303 op2: M,
304 name: &str,
305 dt: &DataType,
306) -> Result<ColumnarValue>
307where
308 O: ArrowPrimitiveType,
309 F: Fn(&str, &str) -> Result<O::Native>,
310 M: Fn(O::Native) -> O::Native,
311{
312 match &args[0] {
313 ColumnarValue::Array(a) => match a.data_type() {
314 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
315 for (pos, arg) in args.iter().enumerate() {
317 match arg {
318 ColumnarValue::Array(arg) => match arg.data_type() {
319 DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
320 }
322 other => {
323 return exec_err!(
324 "Unsupported data type {other:?} for function {name}, arg # {pos}"
325 );
326 }
327 },
328 ColumnarValue::Scalar(arg) => {
329 match arg.data_type() {
330 DataType::Utf8View
331 | DataType::LargeUtf8
332 | DataType::Utf8 => {
333 }
335 other => {
336 return exec_err!(
337 "Unsupported data type {other:?} for function {name}, arg # {pos}"
338 );
339 }
340 }
341 }
342 }
343 }
344
345 Ok(ColumnarValue::Array(Arc::new(
346 strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
347 )))
348 }
349 other => {
350 exec_err!("Unsupported data type {other:?} for function {name}")
351 }
352 },
353 ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
355 Some(a) => {
356 let a = a.as_ref();
357 let a = unwrap_or_internal_err!(a);
359
360 let mut ret = None;
361
362 for (pos, v) in args.iter().enumerate().skip(1) {
363 let ColumnarValue::Scalar(
364 ScalarValue::Utf8View(x)
365 | ScalarValue::LargeUtf8(x)
366 | ScalarValue::Utf8(x),
367 ) = v
368 else {
369 return exec_err!(
370 "Unsupported data type {v:?} for function {name}, arg # {pos}"
371 );
372 };
373
374 if let Some(s) = x {
375 match op(a, s.as_str()) {
376 Ok(r) => {
377 let result = op2(r).to_i64();
378 let s = scalar_value(dt, result)?;
379 ret = Some(Ok(ColumnarValue::Scalar(s)));
380 break;
381 }
382 Err(e) => ret = Some(Err(e)),
383 }
384 }
385 }
386
387 unwrap_or_internal_err!(ret)
388 }
389 other => {
390 exec_err!("Unsupported data type {other:?} for function {name}")
391 }
392 },
393 }
394}
395
396pub(crate) fn strings_to_primitive_function<O, F, F2>(
407 args: &[ColumnarValue],
408 op: F,
409 op2: F2,
410 name: &str,
411) -> Result<PrimitiveArray<O>>
412where
413 O: ArrowPrimitiveType,
414 F: Fn(&str, &str) -> Result<O::Native>,
415 F2: Fn(O::Native) -> O::Native,
416{
417 if args.len() < 2 {
418 return exec_err!(
419 "{:?} args were supplied but {} takes 2 or more arguments",
420 args.len(),
421 name
422 );
423 }
424
425 match &args[0] {
426 ColumnarValue::Array(a) => match a.data_type() {
427 DataType::Utf8View => {
428 let string_array = a.as_string_view();
429 handle_array_op::<O, &StringViewArray, F, F2>(
430 &string_array,
431 &args[1..],
432 op,
433 op2,
434 )
435 }
436 DataType::LargeUtf8 => {
437 let string_array = as_generic_string_array::<i64>(&a)?;
438 handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
439 &string_array,
440 &args[1..],
441 op,
442 op2,
443 )
444 }
445 DataType::Utf8 => {
446 let string_array = as_generic_string_array::<i32>(&a)?;
447 handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
448 &string_array,
449 &args[1..],
450 op,
451 op2,
452 )
453 }
454 other => exec_err!(
455 "Unsupported data type {other:?} for function substr,\
456 expected Utf8View, Utf8 or LargeUtf8."
457 ),
458 },
459 other => exec_err!(
460 "Received {} data type, expected only array",
461 other.data_type()
462 ),
463 }
464}
465
466fn handle_array_op<'a, O, V, F, F2>(
467 first: &V,
468 args: &[ColumnarValue],
469 op: F,
470 op2: F2,
471) -> Result<PrimitiveArray<O>>
472where
473 V: StringArrayType<'a>,
474 O: ArrowPrimitiveType,
475 F: Fn(&str, &str) -> Result<O::Native>,
476 F2: Fn(O::Native) -> O::Native,
477{
478 first
479 .iter()
480 .enumerate()
481 .map(|(pos, x)| {
482 let mut val = None;
483 if let Some(x) = x {
484 for arg in args {
485 let v = match arg {
486 ColumnarValue::Array(a) => match a.data_type() {
487 DataType::Utf8View => Ok(a.as_string_view().value(pos)),
488 DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
489 DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
490 other => exec_err!("Unexpected type encountered '{other}'"),
491 },
492 ColumnarValue::Scalar(s) => match s.try_as_str() {
493 Some(Some(v)) => Ok(v),
494 Some(None) => continue, None => exec_err!("Unexpected scalar type encountered '{s}'"),
496 },
497 }?;
498
499 let r = op(x, v);
500 if let Ok(inner) = r {
501 val = Some(Ok(op2(inner)));
502 break;
503 } else {
504 val = Some(r);
505 }
506 }
507 };
508
509 val.transpose()
510 })
511 .collect()
512}
513
514fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
522 array: &StringArrType,
523 op: F,
524) -> Result<PrimitiveArray<O>>
525where
526 StringArrType: StringArrayType<'a>,
527 O: ArrowPrimitiveType,
528 F: Fn(&'a str) -> Result<O::Native>,
529{
530 array.iter().map(|x| x.map(&op).transpose()).collect()
532}
533
534fn scalar_value(dt: &DataType, r: Option<i64>) -> Result<ScalarValue> {
535 match dt {
536 DataType::Date32 => Ok(ScalarValue::Date32(r.and_then(|v| v.to_i32()))),
537 DataType::Timestamp(u, tz) => match u {
538 TimeUnit::Second => Ok(ScalarValue::TimestampSecond(r, tz.clone())),
539 TimeUnit::Millisecond => Ok(ScalarValue::TimestampMillisecond(r, tz.clone())),
540 TimeUnit::Microsecond => Ok(ScalarValue::TimestampMicrosecond(r, tz.clone())),
541 TimeUnit::Nanosecond => Ok(ScalarValue::TimestampNanosecond(r, tz.clone())),
542 },
543 t => Err(internal_datafusion_err!("Unsupported data type: {t:?}")),
544 }
545}