use std::marker::PhantomData;
use arrow::datatypes::{ArrowTimestampType, TimeUnit};
use snafu::ensure;
use crate::{
encoding::PrimitiveValueDecoder,
error::{DecodeTimestampSnafu, Result},
};
/// Number of nanoseconds in one second (10^9), used to widen whole seconds into nanoseconds.
const NANOSECONDS_IN_SECOND: i64 = 1_000 * 1_000 * 1_000;
/// Decodes ORC timestamp values into Arrow timestamps measured in `T::UNIT`.
///
/// Each value is rebuilt from two child decoders:
/// * `data` yields whole seconds relative to `base_from_epoch`;
/// * `secondary` yields the encoded sub-second (nanosecond) component.
pub struct TimestampDecoder<T: ArrowTimestampType> {
    /// Offset, in seconds, added to every value read from `data`.
    base_from_epoch: i64,
    /// Child decoder for the seconds component (relative to `base_from_epoch`).
    data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
    /// Child decoder for the encoded nanoseconds component.
    secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
    /// Binds the decoder to its target Arrow timestamp type without storing a `T`.
    _marker: PhantomData<T>,
}

impl<T: ArrowTimestampType> TimestampDecoder<T> {
    /// Creates a decoder from the seconds stream (`data`), interpreted relative to
    /// `base_from_epoch`, and the encoded nanoseconds stream (`secondary`).
    pub fn new(
        base_from_epoch: i64,
        data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
        secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
    ) -> Self {
        let _marker = PhantomData;
        Self {
            _marker,
            secondary,
            data,
            base_from_epoch,
        }
    }
}
impl<T: ArrowTimestampType> PrimitiveValueDecoder<T::Native> for TimestampDecoder<T> {
    /// Advances both child streams past `n` values without materialising them.
    fn skip(&mut self, n: usize) -> Result<()> {
        self.data.skip(n)?;
        self.secondary.skip(n)
    }

    /// Fills `out` with timestamps expressed in `T::UNIT`.
    ///
    /// Reads `out.len()` values from each child stream, then converts each
    /// (seconds, encoded nanoseconds) pair with `decode_timestamp`. Returns the
    /// first error from either stream or from the per-value conversion; slots
    /// before the failing value have already been written at that point.
    fn decode(&mut self, out: &mut [T::Native]) -> Result<()> {
        let len = out.len();
        let mut seconds_buf = vec![0_i64; len];
        let mut nanos_buf = vec![0_i64; len];
        self.data.decode(&mut seconds_buf)?;
        self.secondary.decode(&mut nanos_buf)?;

        let base = self.base_from_epoch;
        let pairs = seconds_buf.iter().zip(nanos_buf.iter());
        for (slot, (&secs, &nanos)) in out.iter_mut().zip(pairs) {
            *slot = decode_timestamp::<T>(base, secs, nanos)?;
        }
        Ok(())
    }
}
/// Decodes ORC timestamp values into `i128` nanosecond counts since the epoch.
///
/// The result keeps full nanosecond precision and is never narrowed to `i64`.
/// Values come from two child decoders: `data` (whole seconds relative to
/// `base_from_epoch`) and `secondary` (encoded nanoseconds).
pub struct TimestampNanosecondAsDecimalDecoder {
    /// Offset, in seconds, added to every value read from `data`.
    base_from_epoch: i64,
    /// Child decoder for the seconds component (relative to `base_from_epoch`).
    data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
    /// Child decoder for the encoded nanoseconds component.
    secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
}

impl TimestampNanosecondAsDecimalDecoder {
    /// Creates a decoder from the seconds stream (`data`), interpreted relative to
    /// `base_from_epoch`, and the encoded nanoseconds stream (`secondary`).
    pub fn new(
        base_from_epoch: i64,
        data: Box<dyn PrimitiveValueDecoder<i64> + Send>,
        secondary: Box<dyn PrimitiveValueDecoder<i64> + Send>,
    ) -> Self {
        Self {
            secondary,
            data,
            base_from_epoch,
        }
    }
}
impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalDecoder {
    /// Advances both child streams past `n` values without materialising them.
    fn skip(&mut self, n: usize) -> Result<()> {
        self.data.skip(n)?;
        self.secondary.skip(n)
    }

    /// Fills `out` with nanoseconds since the epoch.
    ///
    /// Reads `out.len()` values from each child stream; only errors from those
    /// streams are propagated, since the per-value conversion returns no error.
    fn decode(&mut self, out: &mut [i128]) -> Result<()> {
        let count = out.len();
        let mut whole_seconds = vec![0_i64; count];
        let mut encoded_nanos = vec![0_i64; count];
        self.data.decode(&mut whole_seconds)?;
        self.secondary.decode(&mut encoded_nanos)?;

        let base = self.base_from_epoch;
        out.iter_mut()
            .zip(whole_seconds.iter().zip(encoded_nanos.iter()))
            .for_each(|(slot, (&secs, &nanos))| {
                *slot = decode_timestamp_as_i128(base, secs, nanos);
            });
        Ok(())
    }
}
/// Reconstructs one timestamp from its ORC two-stream representation.
///
/// * `base` — offset in seconds added to `seconds_since_orc_base` to obtain
///   seconds since the epoch.
/// * `seconds_since_orc_base` — value read from the seconds (`data`) stream.
/// * `nanoseconds` — raw value from the nanoseconds (`secondary`) stream; its low
///   3 bits are a trailing-zero marker and the remaining bits hold the digits.
///
/// Returns `(nanoseconds_since_epoch, seconds, nanoseconds)`: the combined value
/// as an `i128`, the (possibly adjusted) whole-second part, and the expanded
/// sub-second part. Callers use the latter two when reporting decode errors.
fn decode(base: i64, seconds_since_orc_base: i64, nanoseconds: i64) -> (i128, i64, u64) {
    let data = seconds_since_orc_base;
    // NOTE(review): bit-reinterprets the raw stream value as unsigned; presumably
    // valid files never yield negative values here — confirm.
    let mut nanoseconds = nanoseconds as u64;
    // Low 3 bits: a non-zero value z means z + 1 trailing decimal zeros were
    // presumably stripped by the writer; 0 means nothing was stripped.
    let zeros = nanoseconds & 0x7;
    nanoseconds >>= 3;
    if zeros != 0 {
        // Restore the stripped zeros by multiplying by 10^(z + 1).
        nanoseconds *= 10_u64.pow(zeros as u32 + 1);
    }
    // NOTE(review): plain `+` panics on overflow in debug builds and wraps in
    // release; only reachable with corrupt input, but worth confirming.
    let seconds_since_epoch = data + base;
    // Pre-epoch values carrying at least 1 ms (> 999_999 ns) of sub-second
    // precision are shifted back by one whole second. This must run after the
    // trailing-zero expansion above.
    // NOTE(review): this appears to mirror the reference Java reader's
    // millisecond-based handling of negative timestamps (see ORC-763) — confirm
    // against the spec before changing the threshold.
    let seconds = if seconds_since_epoch < 0 && nanoseconds > 999_999 {
        seconds_since_epoch - 1
    } else {
        seconds_since_epoch
    };
    // Combine in i128: any i64 seconds value times 10^9, plus a u64 nanosecond
    // part, fits without overflow (an i64 nanosecond count would not).
    let nanoseconds_since_epoch =
        (seconds as i128 * NANOSECONDS_IN_SECOND as i128) + (nanoseconds as i128);
    (nanoseconds_since_epoch, seconds, nanoseconds)
}
/// Converts one raw ORC (seconds, encoded nanoseconds) pair into a count of
/// `T::UNIT` ticks since the epoch.
///
/// # Errors
///
/// Returns a `DecodeTimestamp` error (carrying the decoded seconds, nanoseconds
/// and target unit) when:
/// * the value is not a whole number of `T::UNIT` ticks (it has finer precision
///   than the target unit can hold), or
/// * the resulting tick count does not fit in an `i64`.
fn decode_timestamp<T: ArrowTimestampType>(
    base: i64,
    seconds_since_orc_base: i64,
    nanoseconds: i64,
) -> Result<i64> {
    let (total_nanos, seconds, nanoseconds) = decode(base, seconds_since_orc_base, nanoseconds);

    // Nanoseconds per tick of the requested Arrow time unit.
    let nanos_per_tick: i128 = match T::UNIT {
        TimeUnit::Nanosecond => 1,
        TimeUnit::Microsecond => 1_000,
        TimeUnit::Millisecond => 1_000_000,
        TimeUnit::Second => 1_000_000_000,
    };

    // Refuse to silently drop sub-unit precision.
    ensure!(
        total_nanos % nanos_per_tick == 0,
        DecodeTimestampSnafu {
            seconds,
            nanoseconds,
            to_time_unit: T::UNIT,
        }
    );

    let ticks = total_nanos / nanos_per_tick;
    // The i128 tick count must be narrowed to Arrow's i64 storage.
    let narrowed = i64::try_from(ticks).or_else(|_| {
        DecodeTimestampSnafu {
            seconds,
            nanoseconds,
            to_time_unit: T::UNIT,
        }
        .fail()
    })?;
    Ok(narrowed)
}
/// Converts one raw ORC (seconds, encoded nanoseconds) pair into nanoseconds
/// since the epoch as an `i128`.
///
/// Unlike the unit-converting path, this returns no error: the full-precision
/// nanosecond count is returned as-is, with no unit conversion or narrowing to `i64`.
fn decode_timestamp_as_i128(base: i64, seconds_since_orc_base: i64, nanoseconds: i64) -> i128 {
    decode(base, seconds_since_orc_base, nanoseconds).0
}