use std::{io::Write, marker::PhantomData, sync::Arc};
use arrow::{
array::{Array, ArrowPrimitiveType, PrimitiveArray, timezone::Tz},
datatypes::{
TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
},
};
use chrono::{DateTime, Datelike, TimeZone, Timelike};
use log::debug;
use odbc_api::{BindParamDesc, buffers::AnySliceMut, sys::Timestamp};
use super::{WriteStrategy, WriterError, map_arrow_to_odbc::MapArrowToOdbc};
pub fn insert_timestamp_strategy(
is_nullable: bool,
time_unit: &TimeUnit,
time_zone: Option<Arc<str>>,
) -> Result<Box<dyn WriteStrategy>, WriterError> {
let ws = match (time_unit, time_zone) {
(TimeUnit::Second, None) => {
TimestampSecondType::map_with(is_nullable, epoch_to_timestamp_s)
}
(TimeUnit::Millisecond, None) => {
TimestampMillisecondType::map_with(is_nullable, epoch_to_timestamp_ms)
}
(TimeUnit::Microsecond, None) => {
TimestampMicrosecondType::map_with(is_nullable, epoch_to_timestamp_us)
}
(TimeUnit::Nanosecond, None) => TimestampNanosecondType::map_with(is_nullable, |ns| {
epoch_to_timestamp_ns((ns / 100) * 100)
}),
(TimeUnit::Second, Some(tz)) => {
Box::new(TimestampTzToText::<TimestampSecondType>::new(tz)?)
}
(TimeUnit::Millisecond, Some(tz)) => {
Box::new(TimestampTzToText::<TimestampMillisecondType>::new(tz)?)
}
(TimeUnit::Microsecond, Some(tz)) => {
Box::new(TimestampTzToText::<TimestampMicrosecondType>::new(tz)?)
}
(TimeUnit::Nanosecond, Some(tz)) => {
Box::new(TimestampTzToText::<TimestampNanosecondType>::new(tz)?)
}
};
Ok(ws)
}
/// Converts nanoseconds since the Unix epoch into an ODBC [`Timestamp`].
///
/// # Panics
///
/// Panics if the resulting year does not fit into the year field of [`Timestamp`].
pub fn epoch_to_timestamp_ns(from: i64) -> Timestamp {
    datetime_to_timestamp(DateTime::from_timestamp_nanos(from))
}
/// Converts microseconds since the Unix epoch into an ODBC [`Timestamp`].
///
/// # Panics
///
/// Panics if `from` can not be represented as a `DateTime`, or if the resulting year does not fit
/// into the year field of [`Timestamp`].
pub fn epoch_to_timestamp_us(from: i64) -> Timestamp {
    let maybe_utc = DateTime::from_timestamp_micros(from);
    datetime_to_timestamp(maybe_utc.expect("Timestamp must be in range for microseconds"))
}
/// Converts milliseconds since the Unix epoch into an ODBC [`Timestamp`].
///
/// # Panics
///
/// Panics if `from` can not be represented as a `DateTime`, or if the resulting year does not fit
/// into the year field of [`Timestamp`].
pub fn epoch_to_timestamp_ms(from: i64) -> Timestamp {
    let maybe_utc = DateTime::from_timestamp_millis(from);
    datetime_to_timestamp(maybe_utc.expect("Timestamp must be in range for milliseconds"))
}
/// Converts seconds since the Unix epoch into an ODBC [`Timestamp`].
///
/// # Panics
///
/// Panics if `from` can not be represented as a `DateTime`, or if the resulting year does not fit
/// into the year field of [`Timestamp`].
pub fn epoch_to_timestamp_s(from: i64) -> Timestamp {
    // Construct directly from seconds. Scaling to milliseconds first (`from * 1_000`) overflows
    // `i64` for large magnitudes, which wraps silently in release builds and could produce a
    // bogus, yet valid looking, timestamp instead of reporting the value as out of range.
    let ndt = DateTime::from_timestamp(from, 0).expect("Timestamp must be in range for seconds");
    datetime_to_timestamp(ndt)
}
/// Splits a UTC datetime into the individual fields of an ODBC [`Timestamp`].
///
/// The nanoseconds within the second are stored in `fraction`.
///
/// # Panics
///
/// Panics if a component does not fit into the integer type of its `Timestamp` field.
fn datetime_to_timestamp(ndt: DateTime<chrono::Utc>) -> Timestamp {
    // For UTC the naive representation carries the very same calendar date and wall clock time.
    let utc = ndt.naive_utc();
    let (year, month, day) = (utc.year(), utc.month(), utc.day());
    let (hour, minute, second) = (utc.hour(), utc.minute(), utc.second());
    Timestamp {
        year: year.try_into().unwrap(),
        month: month.try_into().unwrap(),
        day: day.try_into().unwrap(),
        hour: hour.try_into().unwrap(),
        minute: minute.try_into().unwrap(),
        second: second.try_into().unwrap(),
        fraction: utc.nanosecond(),
    }
}
/// Write strategy which renders Arrow timestamps carrying a time zone as text.
///
/// `P` is the Arrow primitive timestamp type of the arrays this strategy writes.
pub struct TimestampTzToText<P> {
/// Time zone the epoch values are converted to before they are formatted.
time_zone: Tz,
/// Ties the strategy to the Arrow type `P` without storing a value of it.
_phantom: PhantomData<P>,
}
impl<P> TimestampTzToText<P> {
pub fn new(time_zone: Arc<str>) -> Result<Self, WriterError> {
let tz = time_zone.parse().map_err(|e| {
debug!("Failed to parse time zone '{time_zone}'. Original error: {e}");
WriterError::InvalidTimeZone { time_zone }
})?;
Ok(Self {
time_zone: tz,
_phantom: PhantomData,
})
}
}
impl<P> WriteStrategy for TimestampTzToText<P>
where
    P: ArrowPrimitiveType<Native = i64> + InserableAsTimestampWithTimeZone,
{
    /// Binds the column as text, sized to hold one formatted timestamp of type `P`.
    fn buffer_desc(&self) -> BindParamDesc {
        BindParamDesc::text(P::FORMAT_WITH_TIME_ZONE_LEN)
    }

    /// Formats every element of `array` into the text buffer, starting at row `param_offset`.
    ///
    /// Null elements are written as NULL cells.
    ///
    /// # Panics
    ///
    /// Panics if `array` is not a `PrimitiveArray<P>`, if `column_buf` is not a text buffer, or
    /// if a formatted timestamp can not be written into its cell.
    fn write_rows(
        &self,
        param_offset: usize,
        column_buf: AnySliceMut<'_>,
        array: &dyn Array,
    ) -> Result<(), WriterError> {
        let source = array.as_any().downcast_ref::<PrimitiveArray<P>>().unwrap();
        let mut target = column_buf.as_text_view().unwrap();
        for (index, maybe_epoch) in source.iter().enumerate() {
            let row = index + param_offset;
            match maybe_epoch {
                None => target.set_cell(row, None),
                Some(epoch) => {
                    let regional = P::to_regional_datetime(epoch, &self.time_zone);
                    // Reserve the cell at its full length, then format straight into it.
                    let mut cell = target.set_mut(row, P::FORMAT_WITH_TIME_ZONE_LEN);
                    write!(cell, "{}", regional.format(P::FORMAT_STRING),).unwrap();
                }
            }
        }
        Ok(())
    }
}
/// Implemented by Arrow timestamp types which can be written as text including a time zone.
trait InserableAsTimestampWithTimeZone {
/// Length used both to describe the text buffer and to size each cell a timestamp is written to.
const FORMAT_WITH_TIME_ZONE_LEN: usize;
/// Format string handed to `DateTime::format` in order to render a timestamp.
const FORMAT_STRING: &'static str;
/// Converts `epoch`, interpreted in the time unit of the implementing type, into a datetime in
/// `time_zone`.
fn to_regional_datetime(epoch: i64, time_zone: &Tz) -> DateTime<Tz>;
}
/// Second precision timestamps rendered as text without a fractional part.
impl InserableAsTimestampWithTimeZone for TimestampSecondType {
    // NOTE(review): 25 = 19 (date and time) + 6, which assumes `%Z` renders as a six character
    // offset such as `+02:00` -- confirm.
    const FORMAT_WITH_TIME_ZONE_LEN: usize = 25;
    const FORMAT_STRING: &'static str = "%Y-%m-%d %H:%M:%S%Z";

    /// Interprets `epoch` as whole seconds and converts it into a datetime in `time_zone`.
    ///
    /// # Panics
    ///
    /// Panics if the time zone yields no datetime for `epoch`.
    fn to_regional_datetime(epoch: i64, time_zone: &Tz) -> DateTime<Tz> {
        // Whole seconds carry no sub-second part.
        let lookup = time_zone.timestamp_opt(epoch, 0);
        lookup
            .earliest()
            .expect("Timestamp must be in range for the timezone")
    }
}
/// Millisecond precision timestamps rendered as text with three fractional digits.
impl InserableAsTimestampWithTimeZone for TimestampMillisecondType {
    // NOTE(review): 29 = 19 (date and time) + 4 (`.fff`) + 6, which assumes `%Z` renders as a six
    // character offset such as `+02:00` -- confirm.
    const FORMAT_WITH_TIME_ZONE_LEN: usize = 29;
    const FORMAT_STRING: &'static str = "%Y-%m-%d %H:%M:%S.%3f%Z";

    /// Interprets `epoch` as milliseconds and converts it into a datetime in `time_zone`.
    ///
    /// # Panics
    ///
    /// Panics if the time zone yields no datetime for `epoch`.
    fn to_regional_datetime(epoch: i64, time_zone: &Tz) -> DateTime<Tz> {
        // Euclidean division keeps the sub-second remainder within `0..1_000`, also for instants
        // before 1970. Truncating `/` and `%` yield a negative remainder for those, which wraps
        // to an invalid nanosecond count once cast to `u32` and makes the lookup below fail.
        let epoch_sec = epoch.div_euclid(1_000);
        // Lossless cast: the value is within `0..1_000_000_000`.
        let nano = epoch.rem_euclid(1_000) * 1_000_000;
        time_zone
            .timestamp_opt(epoch_sec, nano as u32)
            .earliest()
            .expect("Timestamp must be in range for the timezone")
    }
}
/// Microsecond precision timestamps rendered as text with six fractional digits.
impl InserableAsTimestampWithTimeZone for TimestampMicrosecondType {
    // NOTE(review): 32 = 19 (date and time) + 7 (`.ffffff`) + 6, which assumes `%Z` renders as a
    // six character offset such as `+02:00` -- confirm.
    const FORMAT_WITH_TIME_ZONE_LEN: usize = 32;
    const FORMAT_STRING: &'static str = "%Y-%m-%d %H:%M:%S.%6f%Z";

    /// Interprets `epoch` as microseconds and converts it into a datetime in `time_zone`.
    ///
    /// # Panics
    ///
    /// Panics if the time zone yields no datetime for `epoch`.
    fn to_regional_datetime(epoch: i64, time_zone: &Tz) -> DateTime<Tz> {
        // Euclidean division keeps the sub-second remainder within `0..1_000_000`, also for
        // instants before 1970. Truncating `/` and `%` yield a negative remainder for those,
        // which wraps to an invalid nanosecond count once cast to `u32` and makes the lookup
        // below fail.
        let epoch_sec = epoch.div_euclid(1_000_000);
        // Lossless cast: the value is within `0..1_000_000_000`.
        let nano = epoch.rem_euclid(1_000_000) * 1_000;
        time_zone
            .timestamp_opt(epoch_sec, nano as u32)
            .earliest()
            .expect("Timestamp must be in range for the timezone")
    }
}
/// Nanosecond precision timestamps rendered as text with nine fractional digits.
impl InserableAsTimestampWithTimeZone for TimestampNanosecondType {
    // NOTE(review): 35 = 19 (date and time) + 10 (`.fffffffff`) + 6, which assumes `%Z` renders
    // as a six character offset such as `+02:00` -- confirm.
    const FORMAT_WITH_TIME_ZONE_LEN: usize = 35;
    const FORMAT_STRING: &'static str = "%Y-%m-%d %H:%M:%S.%9f%Z";

    /// Interprets `epoch` as nanoseconds and converts it into a datetime in `time_zone`.
    ///
    /// # Panics
    ///
    /// Panics if the time zone yields no datetime for `epoch`.
    fn to_regional_datetime(epoch: i64, time_zone: &Tz) -> DateTime<Tz> {
        // Euclidean division keeps the sub-second remainder within `0..1_000_000_000`, also for
        // instants before 1970. Truncating `/` and `%` yield a negative remainder for those,
        // which wraps to an invalid nanosecond count once cast to `u32` and makes the lookup
        // below fail.
        let epoch_sec = epoch.div_euclid(1_000_000_000);
        // Lossless cast: the value is within `0..1_000_000_000`.
        let nano = epoch.rem_euclid(1_000_000_000);
        time_zone
            .timestamp_opt(epoch_sec, nano as u32)
            .earliest()
            .expect("Timestamp must be in range for the timezone")
    }
}