use std::sync::{Arc, LazyLock};
use arrow::array::timezone::Tz;
use arrow::array::{
Array, ArrowPrimitiveType, AsArray, GenericStringArray, PrimitiveArray,
StringArrayType, StringViewArray,
};
use arrow::compute::DecimalCast;
use arrow::compute::kernels::cast_utils::string_to_datetime;
use arrow::datatypes::{DataType, TimeUnit};
use arrow_buffer::ArrowNativeType;
use chrono::LocalResult::Single;
use chrono::format::{Parsed, StrftimeItems, parse};
use chrono::{DateTime, TimeZone, Utc};
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::{
DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err,
internal_datafusion_err, unwrap_or_internal_err,
};
use datafusion_expr::ColumnarValue;
const ERR_NANOSECONDS_NOT_SUPPORTED: &str = "The dates that can be represented as nanoseconds have to be between 1677-09-21T00:12:44.0 and 2262-04-11T23:47:16.854775804";
static UTC: LazyLock<Tz> = LazyLock::new(|| "UTC".parse().expect("UTC is always valid"));
/// Parses `s` with arrow's `string_to_datetime` and returns nanoseconds since the epoch.
///
/// `timezone` falls back to UTC when it is `None`.
///
/// # Errors
/// Propagates the arrow parse error, or returns [`ERR_NANOSECONDS_NOT_SUPPORTED`] when the
/// instant does not fit in `i64` nanoseconds.
pub(crate) fn string_to_timestamp_nanos_with_timezone(
    timezone: &Option<Tz>,
    s: &str,
) -> Result<i64> {
    let effective_tz: &Tz = match timezone {
        Some(tz) => tz,
        None => &*UTC,
    };
    string_to_datetime(effective_tz, s)?
        .timestamp_nanos_opt()
        .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
}
/// Checks that every argument after the first is a string type
/// (`Utf8`, `LargeUtf8` or `Utf8View`).
///
/// # Errors
/// Returns an execution error naming `name`, the index of the first offending argument,
/// and its data type.
pub(crate) fn validate_data_types(args: &[ColumnarValue], name: &str) -> Result<()> {
    // `enumerate` before `skip` so the index is the argument's real position in `args`.
    for (position, arg) in args.iter().enumerate().skip(1) {
        let is_string = matches!(
            arg.data_type(),
            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8
        );
        if !is_string {
            return exec_err!(
                "{name} function unsupported data type at index {}: {}",
                position,
                arg.data_type()
            );
        }
    }
    Ok(())
}
/// Parses `s` using the chrono strftime-style `format` and returns the result in `timezone`.
///
/// A named timezone is supported only as a trailing ` %Z` in `format`; trailing whitespace
/// in `format` is ignored for this check. In that case, the last space-separated token of
/// `s` is parsed as a `chrono_tz` timezone name. That token and the ` %Z` suffix are then
/// removed before the remaining text is parsed.
///
/// If chrono cannot build an offset-aware datetime from the parsed fields, the fallback
/// builds a naive local datetime instead: the full date-time if present, otherwise the parsed
/// date converted to a `NaiveDateTime`. That value is interpreted in `timezone`.
///
/// # Errors
/// Returns an execution error mentioning `s` and `format` if any of the following hold:
/// - the timezone name is invalid;
/// - `%Z` appears anywhere other than at the end, preceded by a space;
/// - chrono fails to parse the input;
/// - the fallback local time does not map to exactly one instant in `timezone`.
pub(crate) fn string_to_datetime_formatted<T: TimeZone>(
    timezone: &T,
    s: &str,
    format: &str,
) -> Result<DateTime<T>, DataFusionError> {
    // Error messages always quote the caller's original input and format.
    let err = |err_ctx: &str| {
        exec_datafusion_err!(
            "Error parsing timestamp from '{s}' using format '{format}': {err_ctx}"
        )
    };

    let mut datetime_str = s;
    let mut parse_format = format;

    // Handle a trailing named timezone manually. The suffix is stripped from the *trimmed*
    // format: slicing the untrimmed string by a fixed length would leave a dangling `%`
    // behind when `format` has trailing whitespace.
    let tz: Option<chrono_tz::Tz> =
        if let Some(format_prefix) = format.trim_end().strip_suffix(" %Z") {
            match datetime_str.trim_end().rsplit_once(' ') {
                Some((dt_str, timezone_name)) => {
                    let tz = timezone_name
                        .parse::<chrono_tz::Tz>()
                        .map_err(|e| err(&e.to_string()))?;
                    datetime_str = dt_str;
                    parse_format = format_prefix;
                    Some(tz)
                }
                // No space in the input: leave the format untouched and let chrono handle it.
                None => None,
            }
        } else if format.contains("%Z") {
            return Err(err(
                "'%Z' is only supported at the end of the format string preceded by a space",
            ));
        } else {
            None
        };

    let mut parsed = Parsed::new();
    parse(&mut parsed, datetime_str, StrftimeItems::new(parse_format))
        .map_err(|e| err(&e.to_string()))?;

    let offset_aware = match tz {
        Some(tz) => parsed
            .to_datetime_with_timezone(&tz)
            .map(|dt| dt.fixed_offset()),
        None => parsed.to_datetime(),
    };

    match offset_aware {
        Ok(dt) => Ok(dt.with_timezone(timezone)),
        Err(offset_err) => {
            // Fall back to interpreting the parsed fields as local time in `timezone`.
            let naive = parsed
                .to_naive_datetime_with_offset(0)
                .or_else(|_| parsed.to_naive_date().map(|nd| nd.into()))
                .map_err(|naive_err| err(&naive_err.to_string()))?;
            match timezone.from_local_datetime(&naive) {
                Single(local) => Ok(local),
                // Ambiguous or non-existent local time: report the original offset error.
                _ => Err(err(&offset_err.to_string())),
            }
        }
    }
}
/// Parses `s` with `format` via [`string_to_datetime_formatted`] and returns nanoseconds
/// since the epoch.
///
/// `timezone` falls back to UTC when it is `None`.
///
/// # Errors
/// Propagates parse errors, or returns [`ERR_NANOSECONDS_NOT_SUPPORTED`] when the instant
/// does not fit in `i64` nanoseconds.
#[inline]
pub(crate) fn string_to_timestamp_nanos_formatted_with_timezone(
    timezone: &Option<Tz>,
    s: &str,
    format: &str,
) -> Result<i64, DataFusionError> {
    let effective_tz: &Tz = match timezone {
        Some(tz) => tz,
        None => &*UTC,
    };
    string_to_datetime_formatted(effective_tz, s, format)?
        .timestamp_nanos_opt()
        .ok_or_else(|| exec_datafusion_err!("{ERR_NANOSECONDS_NOT_SUPPORTED}"))
}
/// Parses `s` with `format`, interpreting zone-less input as UTC, and returns milliseconds
/// since the Unix epoch.
///
/// # Errors
/// Propagates errors from [`string_to_datetime_formatted`].
#[inline]
pub(crate) fn string_to_timestamp_millis_formatted(s: &str, format: &str) -> Result<i64> {
    let utc_instant = string_to_datetime_formatted(&Utc, s, format)?;
    Ok(utc_instant.timestamp_millis())
}
pub(crate) fn handle<O, F>(
args: &[ColumnarValue],
op: F,
name: &str,
dt: &DataType,
) -> Result<ColumnarValue>
where
O: ArrowPrimitiveType,
F: Fn(&str) -> Result<O::Native>,
{
match &args[0] {
ColumnarValue::Array(a) => match a.data_type() {
DataType::Utf8View => Ok(ColumnarValue::Array(Arc::new(
unary_string_to_primitive_function::<&StringViewArray, O, _>(
&a.as_string_view(),
op,
)?,
))),
DataType::LargeUtf8 => Ok(ColumnarValue::Array(Arc::new(
unary_string_to_primitive_function::<&GenericStringArray<i64>, O, _>(
&a.as_string::<i64>(),
op,
)?,
))),
DataType::Utf8 => Ok(ColumnarValue::Array(Arc::new(
unary_string_to_primitive_function::<&GenericStringArray<i32>, O, _>(
&a.as_string::<i32>(),
op,
)?,
))),
other => exec_err!("Unsupported data type {other:?} for function {name}"),
},
ColumnarValue::Scalar(scalar) => match scalar.try_as_str() {
Some(a) => {
let result = a
.as_ref()
.map(|x| op(x))
.transpose()?
.and_then(|v| v.to_i64());
let s = scalar_value(dt, result)?;
Ok(ColumnarValue::Scalar(s))
}
_ => exec_err!("Unsupported data type {scalar:?} for function {name}"),
},
}
}
/// Applies the two-argument conversion `op(input, format)` to `args[0]`, trying each of
/// `args[1..]` as a format in order, then post-processes the result with `op2`.
///
/// - Array first argument: every argument must be a string type. The work is delegated to
///   [`strings_to_primitive_function`].
/// - Scalar first argument: every format must be a string scalar. The first format for which
///   `op` succeeds wins. If every non-NULL format fails, the last error is returned.
///
/// A NULL scalar input, or a scalar input whose formats are all NULL, yields a NULL scalar of
/// type `dt`. This matches the array path, where such rows become NULL.
///
/// # Errors
/// Returns an execution error for unsupported argument types, or the last conversion error
/// when no format succeeds.
///
/// # Panics
/// Panics if `args` is empty.
pub(crate) fn handle_multiple<O, F, M>(
    args: &[ColumnarValue],
    op: F,
    op2: M,
    name: &str,
    dt: &DataType,
) -> Result<ColumnarValue>
where
    O: ArrowPrimitiveType,
    F: Fn(&str, &str) -> Result<O::Native>,
    M: Fn(O::Native) -> O::Native,
{
    match &args[0] {
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8 => {
                // Every argument, whether array or scalar, must be a string type.
                for (pos, arg) in args.iter().enumerate() {
                    let arg_type = arg.data_type();
                    if !matches!(
                        arg_type,
                        DataType::Utf8View | DataType::LargeUtf8 | DataType::Utf8
                    ) {
                        return exec_err!(
                            "Unsupported data type {arg_type:?} for function {name}, arg # {pos}"
                        );
                    }
                }
                Ok(ColumnarValue::Array(Arc::new(
                    strings_to_primitive_function::<O, _, _>(args, op, op2, name)?,
                )))
            }
            other => {
                exec_err!("Unsupported data type {other:?} for function {name}")
            }
        },
        // When the first argument is a scalar, all format arguments must be string scalars.
        ColumnarValue::Scalar(scalar) => {
            let Some(input) = scalar.try_as_str() else {
                return exec_err!("Unsupported data type {scalar:?} for function {name}");
            };
            let mut ret = None;
            for (pos, v) in args.iter().enumerate().skip(1) {
                let ColumnarValue::Scalar(
                    ScalarValue::Utf8View(x)
                    | ScalarValue::LargeUtf8(x)
                    | ScalarValue::Utf8(x),
                ) = v
                else {
                    return exec_err!(
                        "Unsupported data type {v:?} for function {name}, arg # {pos}"
                    );
                };
                // Skip NULL formats. A NULL input skips evaluation entirely, but the
                // remaining arguments are still type-checked.
                let (Some(input), Some(fmt)) = (input, x) else {
                    continue;
                };
                match op(input, fmt.as_str()) {
                    Ok(r) => {
                        let s = scalar_value(dt, op2(r).to_i64())?;
                        ret = Some(Ok(ColumnarValue::Scalar(s)));
                        break;
                    }
                    Err(e) => ret = Some(Err(e)),
                }
            }
            match ret {
                Some(result) => result,
                // NULL input or only NULL formats: the result is NULL, not an internal error.
                None => Ok(ColumnarValue::Scalar(scalar_value(dt, None)?)),
            }
        }
    }
}
/// Converts the string array in `args[0]` into a `PrimitiveArray<O>`.
///
/// For every row, `args[1..]` are tried in order as formats via [`handle_array_op`].
///
/// # Errors
/// Returns an execution error if any of the following hold:
/// - fewer than two arguments are supplied;
/// - the first argument is not a `Utf8`, `LargeUtf8` or `Utf8View` array;
/// - a row cannot be converted with any of its non-NULL formats.
pub(crate) fn strings_to_primitive_function<O, F, F2>(
    args: &[ColumnarValue],
    op: F,
    op2: F2,
    name: &str,
) -> Result<PrimitiveArray<O>>
where
    O: ArrowPrimitiveType,
    F: Fn(&str, &str) -> Result<O::Native>,
    F2: Fn(O::Native) -> O::Native,
{
    if args.len() < 2 {
        return exec_err!(
            "{:?} args were supplied but {} takes 2 or more arguments",
            args.len(),
            name
        );
    }
    match &args[0] {
        ColumnarValue::Array(a) => match a.data_type() {
            DataType::Utf8View => {
                let string_array = a.as_string_view();
                handle_array_op::<O, &StringViewArray, F, F2>(
                    &string_array,
                    &args[1..],
                    op,
                    op2,
                )
            }
            DataType::LargeUtf8 => {
                let string_array = as_generic_string_array::<i64>(&a)?;
                handle_array_op::<O, &GenericStringArray<i64>, F, F2>(
                    &string_array,
                    &args[1..],
                    op,
                    op2,
                )
            }
            DataType::Utf8 => {
                let string_array = as_generic_string_array::<i32>(&a)?;
                handle_array_op::<O, &GenericStringArray<i32>, F, F2>(
                    &string_array,
                    &args[1..],
                    op,
                    op2,
                )
            }
            // Report the actual calling function. The old message hard-coded "substr" and its
            // line continuation dropped the space after the comma.
            other => exec_err!(
                "Unsupported data type {other:?} for function {name}, expected Utf8View, Utf8 or LargeUtf8."
            ),
        },
        other => exec_err!(
            "Received {} data type, expected only array",
            other.data_type()
        ),
    }
}
/// Row-wise conversion of `first` using the candidate formats in `args`.
///
/// For each row:
/// - a NULL input produces NULL;
/// - otherwise each format (array value at the same row, or scalar) is tried in order. NULL
///   formats are skipped, whether they come from a scalar or an array.
/// - The first successful `op` result is passed through `op2`. If every tried format fails,
///   the last error is returned. If no format was tried (all NULL), the row is NULL.
///
/// # Errors
/// Returns an error for non-string format arguments, or the conversion error of a row where
/// no format succeeded.
fn handle_array_op<'a, O, V, F, F2>(
    first: &V,
    args: &[ColumnarValue],
    op: F,
    op2: F2,
) -> Result<PrimitiveArray<O>>
where
    V: StringArrayType<'a>,
    O: ArrowPrimitiveType,
    F: Fn(&str, &str) -> Result<O::Native>,
    F2: Fn(O::Native) -> O::Native,
{
    first
        .iter()
        .enumerate()
        .map(|(pos, x)| {
            let Some(x) = x else {
                return Ok(None);
            };
            let mut val = None;
            for arg in args {
                let v = match arg {
                    ColumnarValue::Array(a) => {
                        // A NULL slot has no meaningful string value; skip it the same way
                        // NULL scalar formats are skipped below.
                        if a.is_null(pos) {
                            continue;
                        }
                        match a.data_type() {
                            DataType::Utf8View => Ok(a.as_string_view().value(pos)),
                            DataType::LargeUtf8 => Ok(a.as_string::<i64>().value(pos)),
                            DataType::Utf8 => Ok(a.as_string::<i32>().value(pos)),
                            other => exec_err!("Unexpected type encountered '{other}'"),
                        }
                    }
                    ColumnarValue::Scalar(s) => match s.try_as_str() {
                        Some(Some(v)) => Ok(v),
                        // NULL string scalar: try the next format.
                        Some(None) => continue,
                        None => exec_err!("Unexpected scalar type encountered '{s}'"),
                    },
                }?;
                match op(x, v) {
                    Ok(inner) => {
                        val = Some(Ok(op2(inner)));
                        break;
                    }
                    Err(e) => val = Some(Err(e)),
                }
            }
            val.transpose()
        })
        .collect()
}
/// Maps every element of `array` through `op`. NULL elements stay NULL.
///
/// # Errors
/// Returns the first error produced by `op`.
fn unary_string_to_primitive_function<'a, StringArrType, O, F>(
    array: &StringArrType,
    op: F,
) -> Result<PrimitiveArray<O>>
where
    StringArrType: StringArrayType<'a>,
    O: ArrowPrimitiveType,
    F: Fn(&'a str) -> Result<O::Native>,
{
    array
        .iter()
        .map(|maybe_text| match maybe_text {
            Some(text) => op(text).map(Some),
            None => Ok(None),
        })
        .collect()
}
/// Wraps the optional `i64` result `r` in a `ScalarValue` of type `dt`.
///
/// For `Date32`, a value outside the `i32` range becomes NULL. Timestamp types keep their
/// timezone.
///
/// # Errors
/// Returns an internal error for any other `dt`.
fn scalar_value(dt: &DataType, r: Option<i64>) -> Result<ScalarValue> {
    let value = match dt {
        DataType::Date32 => ScalarValue::Date32(r.and_then(|v| v.to_i32())),
        DataType::Timestamp(TimeUnit::Second, tz) => {
            ScalarValue::TimestampSecond(r, tz.clone())
        }
        DataType::Timestamp(TimeUnit::Millisecond, tz) => {
            ScalarValue::TimestampMillisecond(r, tz.clone())
        }
        DataType::Timestamp(TimeUnit::Microsecond, tz) => {
            ScalarValue::TimestampMicrosecond(r, tz.clone())
        }
        DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
            ScalarValue::TimestampNanosecond(r, tz.clone())
        }
        other => return Err(internal_datafusion_err!("Unsupported data type: {other:?}")),
    };
    Ok(value)
}