hamelin_datafusion 0.7.5

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! Parse timestamp UDF for DataFusion.
//!
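//! Uses chrono to parse timestamps directly to microseconds, avoiding the
//! limited range of DataFusion's built-in `to_timestamp` functions, which go
//! through nanosecond-precision `i64` values and therefore only cover roughly
//! the years 1677–2262.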

use std::any::Any;
use std::sync::Arc;

use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use datafusion::arrow::array::{Array, ArrayRef, AsArray, TimestampMicrosecondArray};
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{exec_err, Result, ScalarValue};
use datafusion::logical_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
    Volatility,
};

use super::string_utils::{scalar_to_str, STRING_TYPES};

/// DataFusion scalar UDF registered under the name `hamelin_parse_timestamp`.
///
/// Takes exactly one argument whose type is one of `STRING_TYPES` and yields a
/// microsecond-precision timestamp tagged with the `"+00:00"` timezone.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ParseTimestampUdf {
    /// One exact, single-argument signature per accepted string type.
    signature: Signature,
}

impl Default for ParseTimestampUdf {
    /// Same as [`ParseTimestampUdf::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl ParseTimestampUdf {
    /// Build the UDF with an immutable signature that accepts any single string type.
    pub fn new() -> Self {
        let mut accepted: Vec<TypeSignature> = Vec::new();
        for string_type in STRING_TYPES.iter() {
            accepted.push(TypeSignature::Exact(vec![string_type.clone()]));
        }

        let signature = Signature::new(TypeSignature::OneOf(accepted), Volatility::Immutable);
        Self { signature }
    }
}

/// Parse a timestamp string using chrono, returning microseconds since the Unix epoch.
///
/// Candidate formats are tried in this order, and the first match wins:
///
/// 1. RFC 3339 via [`DateTime::parse_from_rfc3339`] (e.g. `2024-01-02T03:04:05.5+01:00`).
/// 2. Date-time with an explicit numeric offset (`%z`), using a space or `T`
///    separator, with or without fractional seconds
///    (e.g. `2024-01-02 03:04:05.5+0100`).
/// 3. Date-time without an offset, interpreted as UTC, using `-` or `/` date
///    separators, with or without fractional seconds.
/// 4. Date only (`2024-01-02` or `2024/01/02`), interpreted as midnight UTC.
///
/// # Errors
///
/// Returns an execution error naming the input when no format matches.
fn parse_timestamp_micros(s: &str) -> Result<i64> {
    // RFC 3339 is the most common shape, so try chrono's dedicated parser first.
    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
        return Ok(dt.with_timezone(&Utc).timestamp_micros());
    }

    // Offset-aware formats. RFC 3339 requires `+hh:mm` offsets, so inputs such as
    // `...05.5+0100` land here. The `%.f` variants are needed for fractional
    // seconds combined with such an offset. Without them those inputs used to
    // fall through to NULL.
    const TZ_FORMATS: [&str; 4] = [
        "%Y-%m-%d %H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%d %H:%M:%S%.f%z",
        "%Y-%m-%dT%H:%M:%S%.f%z",
    ];
    if let Some(dt) = TZ_FORMATS
        .iter()
        .find_map(|fmt| DateTime::parse_from_str(s, fmt).ok())
    {
        return Ok(dt.with_timezone(&Utc).timestamp_micros());
    }

    // Formats without an offset: the wall-clock value is taken to be UTC.
    const NAIVE_FORMATS: [&str; 6] = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%d %H:%M:%S%.f",
        "%Y-%m-%dT%H:%M:%S%.f",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d %H:%M:%S%.f",
    ];
    if let Some(naive) = NAIVE_FORMATS
        .iter()
        .find_map(|fmt| NaiveDateTime::parse_from_str(s, fmt).ok())
    {
        return Ok(Utc.from_utc_datetime(&naive).timestamp_micros());
    }

    // Date-only formats. NaiveDateTime refuses input without a time component,
    // so parse a NaiveDate and pin it to midnight.
    const DATE_FORMATS: [&str; 2] = ["%Y-%m-%d", "%Y/%m/%d"];
    if let Some(naive) = DATE_FORMATS
        .iter()
        .find_map(|fmt| NaiveDate::parse_from_str(s, fmt).ok()?.and_hms_opt(0, 0, 0))
    {
        return Ok(Utc.from_utc_datetime(&naive).timestamp_micros());
    }

    exec_err!("Failed to parse timestamp from string: {}", s)
}

/// Convert each element of a string array (any offset or view layout) into a
/// microsecond timestamp.
///
/// Null cells and cells that `parse_timestamp_micros` rejects both become null
/// in the output. Parse failures are deliberately not surfaced as errors. The
/// resulting array is tagged with the `"+00:00"` timezone.
fn parse_string_array<T>(array: &T) -> ArrayRef
where
    T: Array + 'static,
    for<'a> &'a T: IntoIterator<Item = Option<&'a str>>,
{
    // Null stays null; an unparsable string is mapped to null as well.
    fn cell_to_micros(cell: Option<&str>) -> Option<i64> {
        parse_timestamp_micros(cell?).ok()
    }

    let micros = array.into_iter().map(cell_to_micros);
    let timestamps: TimestampMicrosecondArray = micros.collect();
    let tagged = timestamps.with_timezone("+00:00");
    Arc::new(tagged)
}

impl ScalarUDFImpl for ParseTimestampUdf {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "hamelin_parse_timestamp"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Timestamp(
            TimeUnit::Microsecond,
            Some("+00:00".into()),
        ))
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
        let args = args.args;
        if args.len() != 1 {
            return exec_err!(
                "hamelin_parse_timestamp expects exactly 1 argument, got {}",
                args.len()
            );
        }

        match &args[0] {
            ColumnarValue::Scalar(scalar) => {
                let micros = scalar_to_str(scalar)?.and_then(|s| parse_timestamp_micros(s).ok());
                Ok(ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
                    micros,
                    Some("+00:00".into()),
                )))
            }
            ColumnarValue::Array(array) => {
                let result = match array.data_type() {
                    DataType::Utf8 => parse_string_array(array.as_string::<i32>()),
                    DataType::LargeUtf8 => parse_string_array(array.as_string::<i64>()),
                    DataType::Utf8View => parse_string_array(array.as_string_view()),
                    other => {
                        return exec_err!(
                            "hamelin_parse_timestamp expects string array, got {}",
                            other
                        )
                    }
                };
                Ok(ColumnarValue::Array(result))
            }
        }
    }
}

/// Wrap a fresh [`ParseTimestampUdf`] in a DataFusion [`ScalarUDF`].
pub fn parse_timestamp_udf() -> ScalarUDF {
    let udf_impl = ParseTimestampUdf::new();
    udf_impl.into()
}