use std::any::Any;
use std::sync::Arc;
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use datafusion::arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondArray};
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{exec_err, Result, ScalarValue};
use datafusion::logical_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
Volatility,
};
use super::string_utils::{scalar_to_str, STRING_TYPES};
/// Scalar UDF state for `hamelin_parse_timestamp`, which turns a string into a
/// microsecond-precision timestamp tagged with the `+00:00` timezone.
///
/// The only state carried is the function signature: one exact,
/// single-argument variant for every entry of `STRING_TYPES`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ParseTimestampUdf {
    /// `OneOf` signature with one `Exact([t])` alternative per string type `t`.
    signature: Signature,
}

impl ParseTimestampUdf {
    /// Builds the UDF with an immutable signature that accepts exactly one
    /// argument whose type is any member of `STRING_TYPES`.
    pub fn new() -> Self {
        let mut variants = Vec::new();
        for string_type in STRING_TYPES.iter() {
            variants.push(TypeSignature::Exact(vec![string_type.clone()]));
        }
        let signature = Signature::new(TypeSignature::OneOf(variants), Volatility::Immutable);
        Self { signature }
    }
}

impl Default for ParseTimestampUdf {
    /// Equivalent to [`ParseTimestampUdf::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Parses a timestamp string into microseconds since the Unix epoch (UTC).
///
/// Candidate formats are tried in this order; the first successful parse wins:
///
/// 1. RFC 3339 (e.g. `2024-01-01T12:00:00Z`, `2024-01-01T12:00:00.5+05:30`).
///    The instant is converted to UTC.
/// 2. Date-time followed by a numeric UTC offset (`%z`, e.g. `+0530`),
///    optionally with fractional seconds. The instant is converted to UTC.
/// 3. Date-time without any offset. It is interpreted as UTC.
/// 4. Bare date (`YYYY-MM-DD` or `YYYY/MM/DD`). It is interpreted as midnight UTC.
///
/// # Errors
///
/// Returns an execution error if none of the supported formats matches `s`.
fn parse_timestamp_micros(s: &str) -> Result<i64> {
    // Offset-bearing layouts that RFC 3339 does not cover (for example an
    // offset written without a colon). The `%.f` variants mirror the naive
    // list below, so sub-second precision is accepted whether or not an
    // offset is present.
    const TZ_FORMATS: &[&str] = &[
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S%.f%z",
        "%Y-%m-%dT%H:%M:%S%.f%z",
    ];
    // Offset-less layouts; the resulting wall-clock time is treated as UTC.
    const NAIVE_FORMATS: &[&str] = &[
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S%.f",
        "%Y-%m-%dT%H:%M:%S%.f",
        "%Y/%m/%d %H:%M:%S",
    ];
    // Date-only layouts; the time of day defaults to 00:00:00 UTC.
    const DATE_FORMATS: &[&str] = &["%Y-%m-%d", "%Y/%m/%d"];

    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
        return Ok(dt.with_timezone(&Utc).timestamp_micros());
    }
    for fmt in TZ_FORMATS {
        if let Ok(dt) = DateTime::parse_from_str(s, fmt) {
            return Ok(dt.with_timezone(&Utc).timestamp_micros());
        }
    }
    for fmt in NAIVE_FORMATS {
        if let Ok(naive) = NaiveDateTime::parse_from_str(s, fmt) {
            return Ok(Utc.from_utc_datetime(&naive).timestamp_micros());
        }
    }
    for fmt in DATE_FORMATS {
        if let Ok(date) = NaiveDate::parse_from_str(s, fmt) {
            if let Some(naive) = date.and_hms_opt(0, 0, 0) {
                return Ok(Utc.from_utc_datetime(&naive).timestamp_micros());
            }
        }
    }
    exec_err!("Failed to parse timestamp from string: {}", s)
}
/// Converts every element of a string array into a UTC microsecond timestamp.
///
/// Null inputs stay null. Strings rejected by `parse_timestamp_micros` are
/// also mapped to null instead of producing an error. The returned array is
/// tagged with the `+00:00` timezone.
fn parse_string_array<T>(array: &T) -> ArrayRef
where
    T: Array + 'static,
    for<'a> &'a T: IntoIterator<Item = Option<&'a str>>,
{
    let micros = array.into_iter().map(|value| match value {
        Some(text) => parse_timestamp_micros(text).ok(),
        None => None,
    });
    let timestamps = micros.collect::<TimestampMicrosecondArray>();
    let tagged = timestamps.with_timezone("+00:00");
    Arc::new(tagged)
}
impl ScalarUDFImpl for ParseTimestampUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// SQL-visible function name.
    fn name(&self) -> &str {
        "hamelin_parse_timestamp"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always `Timestamp(Microsecond, "+00:00")`, whichever string type was passed.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        let utc = Some("+00:00".into());
        Ok(DataType::Timestamp(TimeUnit::Microsecond, utc))
    }

    /// Parses the single string argument (scalar or array) into UTC timestamps.
    ///
    /// Unparseable strings become null rather than failing the query. An error
    /// is returned only for a wrong argument count or a non-string array type.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let inputs = args.args;
        let [input] = inputs.as_slice() else {
            return exec_err!(
                "hamelin_parse_timestamp expects exactly 1 argument, got {}",
                inputs.len()
            );
        };

        match input {
            ColumnarValue::Array(array) => {
                // Dispatch on the physical string layout; each arm downcasts safely
                // because the data type has just been checked.
                let parsed = match array.data_type() {
                    DataType::Utf8 => parse_string_array(array.as_string::<i32>()),
                    DataType::LargeUtf8 => parse_string_array(array.as_string::<i64>()),
                    DataType::Utf8View => parse_string_array(array.as_string_view()),
                    unsupported => {
                        return exec_err!(
                            "hamelin_parse_timestamp expects string array, got {}",
                            unsupported
                        );
                    }
                };
                Ok(ColumnarValue::Array(parsed))
            }
            ColumnarValue::Scalar(scalar) => {
                let micros = match scalar_to_str(scalar)? {
                    Some(text) => parse_timestamp_micros(text).ok(),
                    None => None,
                };
                let value = ScalarValue::TimestampMicrosecond(micros, Some("+00:00".into()));
                Ok(ColumnarValue::Scalar(value))
            }
        }
    }
}
/// Creates the `hamelin_parse_timestamp` scalar UDF, ready for registration.
pub fn parse_timestamp_udf() -> ScalarUDF {
    let udf_impl = ParseTimestampUdf::new();
    udf_impl.into()
}