use arrow_array::{make_array, new_empty_array, new_null_array, Array, ArrayRef, Int64Array};
use arrow_schema::{ArrowError, DataType, TimeUnit};
// Splices in `guessing_bound.rs`, a file generated into `OUT_DIR` at build time.
// NOTE(review): presumably this is where `GUESSING_BOUND_YEARS` (used below, not
// declared in this file) comes from; confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/guessing_bound.rs"));
/// Number of seconds in `GUESSING_BOUND_YEARS` 365-day years. `guess_precision`
/// stops reading a value as seconds once its magnitude exceeds this.
const LOWER_BOUND_MILLIS: i64 = 86400 * 365 * GUESSING_BOUND_YEARS;
/// The same span expressed in milliseconds. Magnitudes above it are no longer
/// read as milliseconds by `guess_precision`.
const LOWER_BOUND_MICROS: i64 = 1000 * 86400 * 365 * GUESSING_BOUND_YEARS;
/// The same span expressed in microseconds. Magnitudes above it are read as
/// nanoseconds by `guess_precision`.
const LOWER_BOUND_NANOS: i64 = 1000 * 1000 * 86400 * 365 * GUESSING_BOUND_YEARS;
/// Guesses the unit of an epoch-relative integer timestamp from its magnitude.
///
/// The absolute value of `timestamp` is compared against the `LOWER_BOUND_*`
/// thresholds, largest first:
///
/// * above `LOWER_BOUND_NANOS`  -> [`TimeUnit::Nanosecond`]
/// * above `LOWER_BOUND_MICROS` -> [`TimeUnit::Microsecond`]
/// * above `LOWER_BOUND_MILLIS` -> [`TimeUnit::Millisecond`]
/// * otherwise                  -> [`TimeUnit::Second`]
///
/// Negative inputs are classified by magnitude, exactly like positive ones.
#[inline]
const fn guess_precision(timestamp: i64) -> TimeUnit {
    // `i64::MIN.abs()` overflows: it panics in debug builds and wraps back to
    // `i64::MIN` in release builds, which would then be classified as seconds.
    // Saturating keeps the function total and maps that value to the largest
    // magnitude instead.
    let magnitude = timestamp.saturating_abs();
    if magnitude > LOWER_BOUND_NANOS {
        TimeUnit::Nanosecond
    } else if magnitude > LOWER_BOUND_MICROS {
        TimeUnit::Microsecond
    } else if magnitude > LOWER_BOUND_MILLIS {
        TimeUnit::Millisecond
    } else {
        TimeUnit::Second
    }
}
#[inline]
fn guess_precision_in_array(array: &dyn Array) -> Option<TimeUnit> {
let v = array.as_any().downcast_ref::<Int64Array>().unwrap();
v.into_iter().flatten().next().map(guess_precision)
}
/// Casts `array` to `to_type` using [`CastOptions::default`].
///
/// This is a convenience wrapper around [`cast_with_options`].
///
/// # Errors
///
/// Returns whatever error [`cast_with_options`] reports.
pub fn cast(array: &dyn Array, to_type: &DataType) -> Result<ArrayRef, ArrowError> {
    let default_options = CastOptions::default();
    cast_with_options(array, to_type, &default_options)
}
/// Timestamp-specific switches read by `cast_with_options` when the target
/// type is a `Timestamp`.
#[derive(Debug, Clone)]
pub struct TimestampCastOptions {
/// Selects the guessing branch for numeric -> timestamp casts: 64-bit
/// inputs have their unit guessed from the first non-null value, narrower
/// numeric inputs are read as seconds. When `false`, the input is meant to
/// be read in the target type's own unit.
pub guess_timestamp_precision: bool,
/// When `true`, the intermediate timestamp built from numeric input carries
/// the target type's timezone; when `false`, it is built without a timezone
/// before the final cast to the target type.
pub use_timezone_as_is: bool,
}
impl Default for TimestampCastOptions {
fn default() -> Self {
Self {
guess_timestamp_precision: true,
use_timezone_as_is: true,
}
}
}
/// Options accepted by [`cast_with_options`].
///
/// This mirrors `arrow_cast::CastOptions` and adds the timestamp-specific
/// switches in [`TimestampCastOptions`]. `Debug` and `Clone` are derived so the
/// type can be logged and copied like `TimestampCastOptions` and the
/// `arrow_cast` options it wraps.
#[derive(Debug, Clone)]
pub struct CastOptions<'a> {
    /// Forwarded unchanged to `arrow_cast::CastOptions::safe`.
    pub safe: bool,
    /// Controls how numeric input is interpreted when casting to a timestamp.
    pub timestamp_options: TimestampCastOptions,
    /// Forwarded (cloned) to `arrow_cast::CastOptions::format_options`.
    pub format_options: arrow_cast::display::FormatOptions<'a>,
}
impl Default for CastOptions<'_> {
fn default() -> Self {
Self::new()
}
}
impl CastOptions<'_> {
pub fn new() -> Self {
Self {
safe: true,
timestamp_options: TimestampCastOptions::default(),
format_options: arrow_cast::display::FormatOptions::default(),
}
}
}
/// Projects this crate's [`CastOptions`] onto the options type understood by
/// `arrow_cast`, dropping the timestamp-specific settings.
impl<'r, 'a> From<&'r CastOptions<'a>> for arrow_cast::CastOptions<'r> {
    fn from(options: &'r CastOptions<'a>) -> Self {
        Self {
            // `bool` is `Copy`; no `.clone()` needed.
            safe: options.safe,
            format_options: options.format_options.clone(),
        }
    }
}
/// Casts `array` to `to_type`, layering timestamp heuristics on top of
/// `arrow_cast`.
///
/// The steps are tried in this order:
///
/// 1. Shortcuts: identical types return a copy of the input, an empty input
///    returns an empty array of `to_type`, and a `Null` input returns an
///    all-null array of the same length.
/// 2. Numeric input cast to `Timestamp(unit, tz)`: values are converted to
///    `Int64`, reinterpreted as a timestamp in a *source unit*, and finally
///    converted to `to_type`. The source unit is
///    * `unit` when `guess_timestamp_precision` is `false`;
///    * guessed from the first non-null value for `Int64`/`UInt64`/`Float64`
///      input (falling back to `unit` when every value is null);
///    * seconds for the narrower numeric types.
///
///    The intermediate timestamp carries `tz` only when `use_timezone_as_is`
///    is `true`; otherwise it is built without a timezone.
/// 3. String/binary input: the direct `arrow_cast` conversion is tried first.
///    If it yields only nulls, the input is parsed as `Int64` and, when that
///    produces at least one value, cast again through this function.
/// 4. Everything else is delegated to `arrow_cast::cast_with_options`.
///
/// # Errors
///
/// Returns any `ArrowError` raised by the underlying `arrow_cast` calls.
pub fn cast_with_options(
    array: &dyn Array,
    to_type: &DataType,
    cast_options: &CastOptions,
) -> Result<ArrayRef, ArrowError> {
    use DataType::*;

    let from_type = array.data_type();
    if from_type == to_type {
        return Ok(make_array(array.to_data()));
    }
    if array.len() == 0 {
        return Ok(new_empty_array(to_type));
    }
    if from_type == &Null {
        return Ok(new_null_array(to_type, array.len()));
    }

    // Build the arrow-level options once instead of at every call site.
    let arrow_options: arrow_cast::CastOptions<'_> = arrow_cast::CastOptions::from(cast_options);
    let timestamp_options = &cast_options.timestamp_options;

    match (from_type, to_type) {
        (
            Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 | Float16 | Float32
            | Float64,
            Timestamp(unit, tz),
        ) => {
            // Pass the caller's options to the Int64 conversion, as the string
            // fallback below does, so `safe` is respected here too.
            let ints = arrow_cast::cast_with_options(array, &Int64, &arrow_options)?;

            let source_unit = if !timestamp_options.guess_timestamp_precision {
                unit.clone()
            } else if matches!(from_type, Int64 | UInt64 | Float64) {
                guess_precision_in_array(&ints).unwrap_or_else(|| unit.clone())
            } else {
                // Narrow numeric inputs are always read as seconds when guessing.
                TimeUnit::Second
            };
            let source_tz = if timestamp_options.use_timezone_as_is {
                tz.clone()
            } else {
                None
            };

            // Use `arrow_cast::cast` directly. The crate-local `cast` would
            // re-enter this function with default options and guess the
            // precision even when the caller disabled guessing.
            let stamped = arrow_cast::cast(&ints, &Timestamp(source_unit, source_tz))?;
            arrow_cast::cast_with_options(&stamped, to_type, &arrow_options)
        }
        // NOTE(review): this arm matches every target type, not only
        // `Timestamp`, so the integer fallback also applies to other targets.
        // Confirm that this is intended.
        (Binary | FixedSizeBinary(_) | LargeBinary | Utf8 | LargeUtf8, _) => {
            // NOTE(review): if the direct cast returns `Err`, the `?` below
            // returns before the integer fallback is tried. Confirm that is
            // the desired behavior when `safe` is `false`.
            let direct = arrow_cast::cast_with_options(array, to_type, &arrow_options)?;
            if direct.null_count() == direct.len() {
                // Nothing converted directly; the values may be integers in
                // text form, so retry through the numeric path above.
                if let Ok(ints) = arrow_cast::cast_with_options(array, &Int64, &arrow_options) {
                    if ints.null_count() < ints.len() {
                        return cast_with_options(ints.as_ref(), to_type, cast_options);
                    }
                }
            }
            Ok(direct)
        }
        _ => arrow_cast::cast_with_options(array, to_type, &arrow_options),
    }
}
#[cfg(test)]
mod test {
    use arrow_array::TimestampNanosecondArray;

    use super::*;

    /// Epoch milliseconds used by the integer cast test.
    const SAMPLE_MILLIS: i64 = 1701325744956;

    /// Integers that look like epoch milliseconds are scaled to nanoseconds.
    #[test]
    fn test_int_to_timestamp() {
        let target = arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None);
        let input = arrow_array::Int64Array::from(vec![SAMPLE_MILLIS; 2]);
        let array = crate::cast(&input, &target).unwrap();
        let nanos = array
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        dbg!(nanos);
        assert_eq!(nanos.value(0), SAMPLE_MILLIS * 1000 * 1000);
        dbg!(array);
    }

    /// Strings holding epoch milliseconds take the integer fallback path.
    #[test]
    fn test_string_to_timestamp() {
        let target = arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None);
        let input = arrow_array::StringArray::from(vec!["1701325744956"; 2]);
        let array = crate::cast(&input, &target).unwrap();
        let nanos = array
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap();
        dbg!(nanos);
        assert_eq!(nanos.value(0), 1701325744956 * 1000 * 1000);
        dbg!(array);
    }

    /// `guess_precision` recognises the unit of timestamps around the present day.
    #[test]
    fn test() {
        let now = chrono::Utc::now();
        let decade = chrono::Duration::days(365 * 10);
        let mut cases: Vec<(i64, TimeUnit)> = Vec::new();
        for instant in [now, now - decade, now + decade] {
            println!("Timestamp {} in ms: {}", instant, instant.timestamp_millis());
            println!("Timestamp {} in us: {}", instant, instant.timestamp_micros());
            println!(
                "Timestamp {} in ns: {}",
                instant,
                instant.timestamp_nanos_opt().unwrap()
            );
            cases.push((instant.timestamp(), TimeUnit::Second));
            cases.push((instant.timestamp_millis(), TimeUnit::Millisecond));
            cases.push((instant.timestamp_micros(), TimeUnit::Microsecond));
            cases.push((instant.timestamp_nanos_opt().unwrap(), TimeUnit::Nanosecond));
        }
        cases.push((i64::from(i32::MAX), TimeUnit::Second));
        for (value, expected) in cases {
            println!("Timestamp {} in {:?}", value, guess_precision(value),);
            assert_eq!(guess_precision(value), expected);
        }
    }

    /// Prints the date range each unit covers for the configured bound.
    #[test]
    fn bound() {
        let zero = chrono::NaiveDateTime::from_timestamp_opt(0, 0).unwrap();
        let at = |offset: std::time::Duration| zero + offset;

        println!(
            "{:?}",
            zero..at(std::time::Duration::from_secs(LOWER_BOUND_MILLIS as u64))
        );
        println!(
            "{:?}",
            at(std::time::Duration::from_millis(LOWER_BOUND_MILLIS as u64))
                ..at(std::time::Duration::from_millis(LOWER_BOUND_MICROS as u64))
        );
        println!(
            "{:?}",
            at(std::time::Duration::from_micros(LOWER_BOUND_MICROS as u64))
                ..at(std::time::Duration::from_micros(LOWER_BOUND_NANOS as u64))
        );
        println!(
            "{:?}",
            at(std::time::Duration::from_nanos(LOWER_BOUND_NANOS as u64))..
        );
    }

    /// Prints sample range tables for several candidate bound values.
    #[test]
    fn bound_sample() {
        const YEARS: [i64; 7] = [100, 200, 500, 1000, 2000, 5000, 10000];

        let zero = chrono::NaiveDateTime::from_timestamp_opt(0, 0).unwrap();
        let at = |offset: std::time::Duration| zero + offset;
        let width: usize = "ARROW_CAST_GUESSING_BOUND_YEARS".len();
        let print_header = || {
            println!("ARROW_CAST_GUESSING_BOUND_YEARS | Lower Bound | Upper Bound ");
            println!("------------------------------- | ------------------- | -------------------");
        };
        let print_row = |years: i64, lower: chrono::NaiveDateTime, upper: chrono::NaiveDateTime| {
            println!("{:width$} | {:?} | {:?}", years, lower, upper);
        };

        // Full table: one row per unit for every candidate bound.
        print_header();
        for years in YEARS {
            let lower_bound_millis: i64 = 86400 * 365 * years;
            let lower_bound_micros: i64 = 1000 * 86400 * 365 * years;
            let lower_bound_nanos: i64 = 1000 * 1000 * 86400 * 365 * years;

            print_row(
                years,
                zero,
                at(std::time::Duration::from_secs(lower_bound_millis as u64)),
            );
            print_row(
                years,
                at(std::time::Duration::from_millis(lower_bound_millis as u64)),
                at(std::time::Duration::from_millis(lower_bound_micros as u64)),
            );
            print_row(
                years,
                at(std::time::Duration::from_micros(lower_bound_micros as u64)),
                at(std::time::Duration::from_micros(lower_bound_nanos as u64)),
            );
            let nanos_lower_bound = at(std::time::Duration::from_nanos(lower_bound_nanos as u64));
            println!("{:width$} | {:?} |", years, nanos_lower_bound,);
        }

        // Condensed table: only the millisecond range for every candidate bound.
        print_header();
        for years in YEARS {
            let lower_bound_millis: i64 = 86400 * 365 * years;
            let lower_bound_micros: i64 = 1000 * 86400 * 365 * years;
            print_row(
                years,
                at(std::time::Duration::from_millis(lower_bound_millis as u64)),
                at(std::time::Duration::from_millis(lower_bound_micros as u64)),
            );
        }
    }
}