use chrono::{
format::{parse, Parsed, StrftimeItems},
{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Offset, TimeZone, Timelike, Utc},
};
use regex::Regex;
use std::{str::FromStr, sync::Arc};
use vegafusion_common::arrow::array::{ArrayRef, StringArray, TimestampMillisecondArray};
use vegafusion_common::arrow::datatypes::{DataType, TimeUnit};
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
use vegafusion_common::datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
Volatility,
};
lazy_static! {
    /// Datetime layouts that `parse_datetime` attempts, in this order.
    ///
    /// The first four layouts carry an optional fractional second and a
    /// `%:z` UTC offset; the remaining four are textual layouts built from
    /// weekday / month names.
    pub static ref ALL_STRF_DATETIME_ITEMS: Vec<StrftimeItems<'static>> = [
        // Numeric layouts with fractional seconds and a UTC offset
        "%Y-%m-%dT%H:%M:%S%.f%:z",
        "%Y-%m-%d %H:%M:%S%.f%:z",
        "%Y/%m/%d %H:%M:%S%.f%:z",
        "%m/%d/%Y %H:%M:%S%.f%:z",
        // Textual layouts using weekday and/or month names
        "%a %b %e %T %Y",
        "%d %b %Y %T",
        "%a, %d %b %Y %T",
        "%B %d, %Y %T",
    ]
    .iter()
    .copied()
    .map(StrftimeItems::new)
    .collect();

    /// Date-only layouts that `parse_datetime` attempts, in this order, once
    /// every datetime layout has failed.
    pub static ref ALL_STRF_DATE_ITEMS: Vec<StrftimeItems<'static>> = ["%B %d, %Y", "%d %b %Y"]
        .iter()
        .copied()
        .map(StrftimeItems::new)
        .collect();
}
/// Parse a date or datetime string into a UTC `DateTime`.
///
/// Strategies are attempted in this order:
///
/// 1. Every layout in `ALL_STRF_DATETIME_ITEMS`. When the string carries an
///    explicit UTC offset it is honored. Otherwise the naive date/time is
///    interpreted as UTC if the string ends with `Z`, or as local time in
///    `default_input_tz`.
/// 2. A bare `%Y-%m-%d` date, interpreted as midnight UTC.
/// 3. Every layout in `ALL_STRF_DATE_ITEMS`, interpreted as midnight in
///    `default_input_tz`.
/// 4. `parse_datetime_fallback`.
///
/// # Arguments
///
/// * `date_str` - The text to parse.
/// * `default_input_tz` - Time zone applied to inputs that carry no offset.
///
/// # Returns
///
/// `Some(datetime)` in UTC on success. `None` when nothing matched, when a
/// local-time interpretation is required but `default_input_tz` is `None`,
/// or when the local time cannot be resolved in that time zone.
pub fn parse_datetime(
    date_str: &str,
    default_input_tz: &Option<chrono_tz::Tz>,
) -> Option<DateTime<Utc>> {
    for strf_item in &*ALL_STRF_DATETIME_ITEMS {
        let mut parsed = Parsed::new();
        // The parse error is ignored on purpose: a partial parse (e.g. one
        // that stops at a missing UTC offset) still leaves the date and time
        // fields populated in `parsed`, and those are used below.
        parse(&mut parsed, date_str, strf_item.clone()).ok();

        if let Ok(datetime) = parsed.to_datetime() {
            // Full datetime including an explicit offset
            return Some(datetime.with_timezone(&chrono::Utc));
        } else if let (Ok(date), Ok(time)) = (parsed.to_naive_date(), parsed.to_naive_time()) {
            let datetime = NaiveDateTime::new(date, time);
            if date_str.ends_with('Z') {
                // Trailing `Z` marks the naive value as UTC
                if let Some(datetime) = chrono::Utc.from_local_datetime(&datetime).earliest() {
                    return Some(datetime);
                }
            } else {
                let local_tz = (*default_input_tz)?;
                let dt = if let Some(dt) = local_tz.from_local_datetime(&datetime).earliest() {
                    dt
                } else {
                    // The local time does not exist in this zone (e.g. it
                    // falls in a daylight-saving gap), so retry one hour
                    // later. A checked addition is used so that 23:xx rolls
                    // over into the next day instead of producing the
                    // invalid hour 24.
                    let shifted = datetime.checked_add_signed(chrono::Duration::hours(1))?;
                    local_tz.from_local_datetime(&shifted).earliest()?
                };
                return Some(dt.with_timezone(&chrono::Utc));
            }
        }
    }

    // ISO 8601 date without a time component: midnight UTC
    if let Ok(date) = NaiveDate::parse_from_str(date_str, r#"%Y-%m-%d"#) {
        let datetime = date.and_hms_opt(0, 0, 0).expect("Invalid date");
        return Some(chrono::Utc.from_utc_datetime(&datetime));
    }

    // Other date-only layouts: midnight in the default input time zone
    for strf_item in &*ALL_STRF_DATE_ITEMS {
        let mut parsed = Parsed::new();
        parse(&mut parsed, date_str, strf_item.clone()).ok();
        if let Ok(date) = parsed.to_naive_date() {
            let datetime = date.and_hms_opt(0, 0, 0).expect("Invalid date");
            let default_input_tz = (*default_input_tz)?;
            let datetime = default_input_tz.from_local_datetime(&datetime).earliest()?;
            return Some(datetime.with_timezone(&chrono::Utc));
        }
    }

    parse_datetime_fallback(date_str, default_input_tz)
}
pub fn parse_datetime_fallback(
date_str: &str,
default_input_tz: &Option<chrono_tz::Tz>,
) -> Option<DateTime<Utc>> {
let mut date_tokens = vec![String::from(""), String::from(""), String::from("")];
let mut time_tokens = vec![
String::from(""),
String::from(""),
String::from(""),
String::from(""),
];
let mut timezone_tokens = vec![String::from(""), String::from("")];
let mut timezone_sign = ' ';
let mut date_ind = 0;
let mut time_ind = 0;
let mut timezone_ind = 0;
let mut stage = 0;
let mut has_time = false;
let mut date_split = '-';
for c in date_str.trim().chars() {
match stage {
0 => {
if date_ind < 2 && (c == '-' || c == '/' || c == ' ') {
date_split = c;
date_ind += 1;
} else if date_ind == 2 && (c == 'T' || c == ' ') {
stage += 1;
} else if c.is_ascii_alphanumeric() {
date_tokens[date_ind].push(c)
} else {
return None;
}
}
1 => {
if c.is_whitespace() {
continue;
} else if c.is_ascii_digit() {
has_time = true;
time_tokens[time_ind].push(c)
} else if (time_ind < 2 && c == ':') || (time_ind == 2 && c == '.') {
time_ind += 1;
} else if c == '+' || c == '-' {
stage += 1;
timezone_sign = c;
} else if c == 'Z' {
timezone_tokens[0].push('0');
timezone_tokens[1].push('0');
break;
} else {
return None;
}
}
2 => {
if c.is_ascii_digit() {
timezone_tokens[timezone_ind].push(c)
} else if timezone_ind == 0 && c == ':' {
timezone_ind += 1;
} else {
return None;
}
}
_ => return None,
}
}
let year_re = Regex::new(r"\d{4}").unwrap();
let (year, month, day, iso8601_date) = if year_re.is_match(&date_tokens[0]) {
let year: i32 = date_tokens[0].parse().ok()?;
let month: u32 = parse_month_str(&date_tokens[1]).unwrap_or(1);
let day: u32 = date_tokens[2].parse().unwrap_or(1);
(year, month, day, date_split == '-')
} else if year_re.is_match(&date_tokens[2]) {
let year: i32 = date_tokens[2].parse().ok()?;
let month: u32 = parse_month_str(&date_tokens[0]).unwrap_or(1);
let day: u32 = date_tokens[1].parse().ok()?;
(year, month, day, false)
} else {
return None;
};
let hour: u32 = time_tokens[0].parse().unwrap_or(0);
let minute: u32 = time_tokens[1].parse().unwrap_or(0);
let second: u32 = time_tokens[2].parse().unwrap_or(0);
let milliseconds: u32 = if time_tokens[3].is_empty() {
0
} else if time_tokens[3].len() == 3 {
time_tokens[3].parse().ok()?
} else {
return None;
};
let offset = if timezone_tokens[0].is_empty() {
if iso8601_date && !has_time {
FixedOffset::east_opt(0).expect("FixedOffset::east out of bounds")
} else {
let local_tz = (*default_input_tz)?;
let naive_date =
NaiveDate::from_ymd_opt(year, month, day).expect("invalid or out-of-range date");
let naive_time = NaiveTime::from_hms_milli_opt(hour, minute, second, milliseconds)
.expect("invalid or out-of-range date");
let naive_datetime = NaiveDateTime::new(naive_date, naive_time);
local_tz
.offset_from_local_datetime(&naive_datetime)
.single()?
.fix()
}
} else {
let timezone_hours: i32 = timezone_tokens[0].parse().unwrap_or(0);
let timezone_minutes: i32 = timezone_tokens[1].parse().unwrap_or(0);
let time_offset_seconds = timezone_hours * 3600 + timezone_minutes * 60;
if timezone_sign == '-' {
FixedOffset::west_opt(time_offset_seconds).expect("FixedOffset::west out of bounds")
} else {
FixedOffset::east_opt(time_offset_seconds).expect("FixedOffset::east out of bounds")
}
};
let parsed = offset
.with_ymd_and_hms(year, month, day, hour, minute, second)
.earliest()?
.with_nanosecond(milliseconds * 1_000_000)?
.with_timezone(&chrono::Utc);
Some(parsed)
}
/// Convert a month token into a month number.
///
/// The token is lower-cased first. If it parses as an unsigned integer that
/// integer is returned unchanged (no range check is applied). Otherwise a
/// token longer than two bytes is matched as a prefix of an English month
/// name, so `"sep"`, `"sept"` and `"september"` all map to `9`. Month names
/// are checked in calendar order and the first match wins.
///
/// Returns `None` for non-numeric tokens of two bytes or fewer and for
/// tokens that are not a prefix of any month name.
fn parse_month_str(month_str: &str) -> Option<u32> {
    const MONTH_NAMES: [&str; 12] = [
        "january",
        "february",
        "march",
        "april",
        "may",
        "june",
        "july",
        "august",
        "september",
        "october",
        "november",
        "december",
    ];

    let month_str = month_str.to_lowercase();
    if let Ok(month) = month_str.parse::<u32>() {
        return Some(month);
    }
    if month_str.len() <= 2 {
        return None;
    }

    // `starts_with` never panics, unlike slicing the month name to the
    // token's length, which is out of bounds whenever the token is longer
    // than the name it is compared against.
    MONTH_NAMES
        .iter()
        .zip(1u32..)
        .find(|(name, _)| name.starts_with(month_str.as_str()))
        .map(|(_, number)| number)
}
/// Parse `date_str` with `parse_datetime` and return the result as
/// milliseconds since the Unix epoch (UTC).
///
/// Returns `None` whenever `parse_datetime` does.
pub fn parse_datetime_to_utc_millis(
    date_str: &str,
    default_input_tz: &Option<chrono_tz::Tz>,
) -> Option<i64> {
    parse_datetime(date_str, default_input_tz).map(|utc_datetime| utc_datetime.timestamp_millis())
}
/// Convert a string array into a millisecond timestamp array.
///
/// Each non-null element is parsed with `parse_datetime_to_utc_millis`.
/// Null inputs and strings that fail to parse both become null in the
/// output, so the result has one entry per input entry.
pub fn datetime_strs_to_timestamp_millis(
    date_strs: &StringArray,
    default_input_tz: &Option<chrono_tz::Tz>,
) -> ArrayRef {
    let to_millis = |maybe_str: Option<&str>| -> Option<i64> {
        let text = maybe_str?;
        parse_datetime_to_utc_millis(text, default_input_tz)
    };
    let millis: Vec<Option<i64>> = date_strs.iter().map(to_millis).collect();
    let timestamps: ArrayRef = Arc::new(TimestampMillisecondArray::from(millis));
    timestamps
}
/// Build the `str_to_utc_timestamp` scalar UDF.
///
/// The UDF takes two `Utf8` arguments:
///
/// 1. the datetime strings to parse (array or scalar);
/// 2. a scalar time zone name, parsed with `chrono_tz::Tz::from_str`, that
///    is applied to strings carrying no explicit offset.
///
/// It returns `Timestamp(Millisecond, None)` values. Strings that cannot be
/// parsed become null. A result of length one is returned as a scalar, any
/// other length as an array.
///
/// The implementation returns `DataFusionError::Internal` when the time zone
/// argument is not a scalar, when it is not a valid time zone name, or when
/// the first argument is not a string array.
fn make_str_to_utc_timestamp_udf() -> ScalarUDF {
    let scalar_fn: ScalarFunctionImplementation = Arc::new(move |args: &[ColumnarValue]| {
        // Normalize the first argument to an array
        let str_array = match &args[0] {
            ColumnarValue::Array(array) => array.clone(),
            ColumnarValue::Scalar(scalar) => scalar.to_array(),
        };

        // The time zone must be supplied as a scalar
        let tz_str = if let ColumnarValue::Scalar(default_input_tz) = &args[1] {
            default_input_tz.to_string()
        } else {
            return Err(DataFusionError::Internal(
                "Expected default_input_tz to be a scalar".to_string(),
            ));
        };
        let tz = chrono_tz::Tz::from_str(&tz_str).map_err(|_err| {
            DataFusionError::Internal(format!("Failed to parse {tz_str} as a timezone"))
        })?;

        // Report an unexpected array type as an error instead of panicking
        let str_array = str_array
            .as_any()
            .downcast_ref::<StringArray>()
            .ok_or_else(|| {
                DataFusionError::Internal(
                    "Expected first argument to be a Utf8 string array".to_string(),
                )
            })?;

        let timestamp_array = datetime_strs_to_timestamp_millis(str_array, &Some(tz));
        if timestamp_array.len() != 1 {
            Ok(ColumnarValue::Array(timestamp_array))
        } else {
            ScalarValue::try_from_array(&timestamp_array, 0).map(ColumnarValue::Scalar)
        }
    });

    let return_type: ReturnTypeFunction =
        Arc::new(move |_| Ok(Arc::new(DataType::Timestamp(TimeUnit::Millisecond, None))));
    let signature: Signature =
        Signature::exact(vec![DataType::Utf8, DataType::Utf8], Volatility::Immutable);

    ScalarUDF::new("str_to_utc_timestamp", &signature, &return_type, &scalar_fn)
}
lazy_static! {
/// Shared instance of the `str_to_utc_timestamp` UDF, built on first access
/// by `make_str_to_utc_timestamp_udf`.
pub static ref STR_TO_UTC_TIMESTAMP_UDF: ScalarUDF = make_str_to_utc_timestamp_udf();
}
/// Checks `parse_datetime` against exact expected UTC instants for inputs
/// with an explicit offset, with no offset (UTC and New York defaults), in
/// a slash-separated layout without seconds, and with a trailing `Z`.
#[test]
fn test_parse_datetime() {
    let local_tz = Some(chrono_tz::Tz::America__New_York);
    let utc = Some(chrono_tz::Tz::UTC);

    // Explicit +05:00 offset wins over the default time zone
    let res = parse_datetime("2020-05-16T09:30:00+05:00", &utc).unwrap();
    assert_eq!(res, Utc.with_ymd_and_hms(2020, 5, 16, 4, 30, 0).unwrap());

    // No offset, default time zone UTC
    let res = parse_datetime("2020-05-16T09:30:00", &utc).unwrap();
    assert_eq!(res, Utc.with_ymd_and_hms(2020, 5, 16, 9, 30, 0).unwrap());

    // No offset, default time zone New York (daylight time, UTC-4)
    let res = parse_datetime("2020-05-16T09:30:00", &local_tz).unwrap();
    assert_eq!(res, Utc.with_ymd_and_hms(2020, 5, 16, 13, 30, 0).unwrap());

    // Slash-separated date without seconds, New York (standard time, UTC-5)
    let res = parse_datetime("2001/02/05 06:20", &local_tz).unwrap();
    assert_eq!(res, Utc.with_ymd_and_hms(2001, 2, 5, 11, 20, 0).unwrap());

    // Same input with UTC as the default time zone
    let res = parse_datetime("2001/02/05 06:20", &utc).unwrap();
    assert_eq!(res, Utc.with_ymd_and_hms(2001, 2, 5, 6, 20, 0).unwrap());

    // Trailing `Z` with fractional seconds
    let res = parse_datetime("2000-01-01T08:00:00.000Z", &utc).unwrap();
    assert_eq!(res, Utc.with_ymd_and_hms(2000, 1, 1, 8, 0, 0).unwrap());
}