use std::sync::Arc;
use crate::{
array_decoder::ArrowDataType,
column::Column,
encoding::{
integer::{get_signed_int_decoder, get_unsigned_int_decoder},
timestamp::{TimestampDecoder, TimestampNanosecondAsDecimalDecoder},
PrimitiveValueDecoder,
},
error::{MismatchedSchemaSnafu, Result},
proto::stream::Kind,
stripe::Stripe,
};
use arrow::datatypes::{
ArrowTimestampType, Decimal128Type, DecimalType, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
};
use arrow::{array::ArrayRef, buffer::NullBuffer};
use chrono::offset::TimeZone;
use chrono::TimeDelta;
use chrono_tz::{Tz, UTC};
use super::{
decimal::DecimalArrayDecoder, ArrayBatchDecoder, PresentDecoder, PrimitiveArrayDecoder,
};
use crate::error::UnsupportedTypeVariantSnafu;
/// Number of nanoseconds in one second, typed as `i128` to match the decimal
/// timestamp values it is used to split and recombine.
const NANOSECONDS_IN_SECOND: i128 = 1_000_000_000;
/// Decimal scale (fractional digits) of timestamps decoded as `Decimal128`:
/// nine digits, i.e. nanosecond resolution.
const NANOSECOND_DIGITS: i8 = 9;
/// Unix timestamp, in seconds, of 2015-01-01 00:00:00 UTC. Used as the base for
/// timestamp values when no writer timezone is available and for instant
/// timestamps.
const ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH: i64 = 1_420_070_400;
/// Builds the primitive decoder for a timestamp column of Arrow type `T`.
///
/// The column's `Data` stream is read with a signed integer decoder and its
/// `Secondary` stream with an unsigned integer decoder (per the ORC
/// specification these carry the seconds and the encoded nanoseconds); both use
/// the column's RLE version. `seconds_since_unix_epoch` is handed to the
/// [`TimestampDecoder`] as the base the decoded values are relative to.
fn get_inner_timestamp_decoder<T: ArrowTimestampType + Send>(
    column: &Column,
    stripe: &Stripe,
    seconds_since_unix_epoch: i64,
) -> PrimitiveArrayDecoder<T> {
    let primary = get_signed_int_decoder(
        stripe.stream_map().get(column, Kind::Data),
        column.rle_version(),
    );
    let secondary = get_unsigned_int_decoder(
        stripe.stream_map().get(column, Kind::Secondary),
        column.rle_version(),
    );
    let present = PresentDecoder::from_stripe(stripe, column);

    let values = TimestampDecoder::<T>::new(seconds_since_unix_epoch, primary, secondary);
    PrimitiveArrayDecoder::<T>::new(Box::new(values), present)
}
fn get_timestamp_decoder<T: ArrowTimestampType + Send>(
column: &Column,
stripe: &Stripe,
seconds_since_unix_epoch: i64,
) -> Box<dyn ArrayBatchDecoder> {
let inner = get_inner_timestamp_decoder::<T>(column, stripe, seconds_since_unix_epoch);
match stripe.writer_tz() {
Some(writer_tz) => Box::new(TimestampOffsetArrayDecoder { inner, writer_tz }),
None => Box::new(inner),
}
}
/// Builds the batch decoder for an instant timestamp column of Arrow type `T`.
///
/// The values are always decoded relative to the UTC ORC epoch; the stripe's
/// writer timezone is not consulted. The result is wrapped in a
/// [`TimestampInstantArrayDecoder`].
fn get_timestamp_instant_decoder<T: ArrowTimestampType + Send>(
    column: &Column,
    stripe: &Stripe,
) -> Box<dyn ArrayBatchDecoder> {
    let utc_epoch_decoder = get_inner_timestamp_decoder::<T>(
        column,
        stripe,
        ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
    );
    let decoder = TimestampInstantArrayDecoder(utc_epoch_decoder);
    Box::new(decoder)
}
/// Builds a decoder that yields timestamps as `Decimal128` values with
/// [`NANOSECOND_DIGITS`] fractional digits and the maximum `Decimal128`
/// precision.
///
/// The `Data` and `Secondary` streams are read with signed and unsigned
/// integer decoders respectively. If `writer_tz` is a timezone other than UTC,
/// each decoded value is additionally passed through
/// [`TimestampNanosecondAsDecimalWithTzDecoder`]; for UTC or `None` the values
/// are used as decoded.
fn decimal128_decoder(
    column: &Column,
    stripe: &Stripe,
    seconds_since_unix_epoch: i64,
    writer_tz: Option<Tz>,
) -> DecimalArrayDecoder {
    let primary = get_signed_int_decoder(
        stripe.stream_map().get(column, Kind::Data),
        column.rle_version(),
    );
    let secondary = get_unsigned_int_decoder(
        stripe.stream_map().get(column, Kind::Secondary),
        column.rle_version(),
    );
    let present = PresentDecoder::from_stripe(stripe, column);

    let base =
        TimestampNanosecondAsDecimalDecoder::new(seconds_since_unix_epoch, primary, secondary);
    let values: Box<dyn PrimitiveValueDecoder<i128> + Send> = match writer_tz {
        // No timezone adjustment is applied for UTC or an absent writer timezone.
        Some(UTC) | None => Box::new(base),
        Some(tz) => Box::new(TimestampNanosecondAsDecimalWithTzDecoder(base, tz)),
    };

    DecimalArrayDecoder::new(
        Decimal128Type::MAX_PRECISION,
        NANOSECOND_DIGITS,
        values,
        present,
    )
}
/// Creates the decoder for an ORC timestamp column (without timezone) that
/// produces arrays of the requested Arrow `field_type`.
///
/// Supported targets are `Timestamp` of any unit with no timezone, and
/// `Decimal128` with maximum precision and a scale of [`NANOSECOND_DIGITS`].
///
/// # Errors
///
/// Returns a mismatched-schema error for any other `field_type`.
///
/// # Panics
///
/// Panics if midnight of 2015-01-01 does not map to exactly one instant in the
/// stripe's writer timezone.
pub fn new_timestamp_decoder(
    column: &Column,
    field_type: ArrowDataType,
    stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
    let seconds_since_unix_epoch = if let Some(writer_tz) = stripe.writer_tz() {
        // With a writer timezone, the base is 2015-01-01 00:00:00 taken in that
        // timezone, expressed as seconds since the Unix epoch.
        writer_tz
            .with_ymd_and_hms(2015, 1, 1, 0, 0, 0)
            .unwrap()
            .timestamp()
    } else {
        // Without a writer timezone, fall back to the UTC-based constant.
        ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH
    };

    let decoder: Box<dyn ArrayBatchDecoder> = match field_type {
        ArrowDataType::Timestamp(TimeUnit::Second, None) => {
            get_timestamp_decoder::<TimestampSecondType>(column, stripe, seconds_since_unix_epoch)
        }
        ArrowDataType::Timestamp(TimeUnit::Millisecond, None) => {
            get_timestamp_decoder::<TimestampMillisecondType>(
                column,
                stripe,
                seconds_since_unix_epoch,
            )
        }
        ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => {
            get_timestamp_decoder::<TimestampMicrosecondType>(
                column,
                stripe,
                seconds_since_unix_epoch,
            )
        }
        ArrowDataType::Timestamp(TimeUnit::Nanosecond, None) => {
            get_timestamp_decoder::<TimestampNanosecondType>(
                column,
                stripe,
                seconds_since_unix_epoch,
            )
        }
        ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
            Box::new(decimal128_decoder(
                column,
                stripe,
                seconds_since_unix_epoch,
                stripe.writer_tz(),
            ))
        }
        _ => {
            return MismatchedSchemaSnafu {
                orc_type: column.data_type().clone(),
                arrow_type: field_type,
            }
            .fail();
        }
    };
    Ok(decoder)
}
/// Creates the decoder for an ORC instant timestamp column that produces
/// arrays of the requested Arrow `field_type`.
///
/// Supported targets are `Timestamp` of any unit whose timezone string is
/// exactly `"UTC"`, and `Decimal128` with maximum precision and a scale of
/// [`NANOSECOND_DIGITS`]. Values are always decoded relative to the UTC ORC
/// epoch.
///
/// # Errors
///
/// * Returns an unsupported-type-variant error when `field_type` is a
///   `Timestamp` with a timezone other than `"UTC"`.
/// * Returns a mismatched-schema error for any other `field_type`, including
///   `Timestamp` without a timezone.
pub fn new_timestamp_instant_decoder(
    column: &Column,
    field_type: ArrowDataType,
    stripe: &Stripe,
) -> Result<Box<dyn ArrayBatchDecoder>> {
    match field_type {
        // A single guarded arm covers every unit; the unit only selects the
        // Arrow timestamp type the decoder is instantiated with.
        ArrowDataType::Timestamp(unit, Some(tz)) if tz.as_ref() == "UTC" => Ok(match unit {
            TimeUnit::Second => {
                get_timestamp_instant_decoder::<TimestampSecondType>(column, stripe)
            }
            TimeUnit::Millisecond => {
                get_timestamp_instant_decoder::<TimestampMillisecondType>(column, stripe)
            }
            TimeUnit::Microsecond => {
                get_timestamp_instant_decoder::<TimestampMicrosecondType>(column, stripe)
            }
            TimeUnit::Nanosecond => {
                get_timestamp_instant_decoder::<TimestampNanosecondType>(column, stripe)
            }
        }),
        ArrowDataType::Timestamp(_, Some(_)) => UnsupportedTypeVariantSnafu {
            msg: "Non-UTC Arrow timestamps",
        }
        .fail(),
        ArrowDataType::Decimal128(Decimal128Type::MAX_PRECISION, NANOSECOND_DIGITS) => {
            Ok(Box::new(decimal128_decoder(
                column,
                stripe,
                ORC_EPOCH_UTC_SECONDS_SINCE_UNIX_EPOCH,
                None,
            )))
        }
        _ => MismatchedSchemaSnafu {
            orc_type: column.data_type().clone(),
            arrow_type: field_type,
        }
        .fail(),
    }
}
/// Batch decoder that re-expresses decoded timestamps as the local wall-clock
/// time of the writer's timezone.
///
/// Each value produced by `inner` is treated as an instant, converted to the
/// local date-time in `writer_tz`, and that local date-time is then read back
/// as if it were UTC.
struct TimestampOffsetArrayDecoder<T: ArrowTimestampType> {
    /// Decoder producing the unadjusted timestamp values.
    inner: PrimitiveArrayDecoder<T>,
    /// Timezone whose local time the output values should represent.
    writer_tz: chrono_tz::Tz,
}

impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampOffsetArrayDecoder<T> {
    /// Decodes the next batch and applies the timezone adjustment to every
    /// valid value. Values that cannot be converted (arithmetic overflow, or a
    /// result outside the representable range) become nulls.
    fn next_batch(
        &mut self,
        batch_size: usize,
        parent_present: Option<&NullBuffer>,
    ) -> Result<ArrayRef> {
        let array = self
            .inner
            .next_primitive_batch(batch_size, parent_present)?;

        // Microseconds per unit of `T`. Nanosecond values are converted at
        // their native resolution instead, hence `None` rather than a
        // placeholder factor.
        let micros_per_unit: Option<i64> = match T::UNIT {
            TimeUnit::Second => Some(1_000_000),
            TimeUnit::Millisecond => Some(1_000),
            TimeUnit::Microsecond => Some(1),
            TimeUnit::Nanosecond => None,
        };

        let convert_timezone = |ts: i64| -> Option<i64> {
            match micros_per_unit {
                Some(scale) => {
                    // Scaling up to microseconds can overflow `i64` for large
                    // second/millisecond values; report that as "no value"
                    // instead of panicking or wrapping around.
                    let micros = ts.checked_mul(scale)?;
                    let local = self.writer_tz.timestamp_micros(micros).single()?;
                    Some(local.naive_local().and_utc().timestamp_micros() / scale)
                }
                None => self
                    .writer_tz
                    .timestamp_nanos(ts)
                    .naive_local()
                    .and_utc()
                    .timestamp_nanos_opt(),
            }
        };

        let array = array
            // Fast path: every value converts, so the null buffer is unchanged.
            .try_unary::<_, T, _>(|ts| convert_timezone(ts).ok_or(()))
            // Slow path: at least one value failed to convert; redo the batch
            // and turn the failing values into nulls.
            .unwrap_or_else(|()| array.unary_opt::<_, T>(convert_timezone));
        let array = Arc::new(array) as ArrayRef;
        Ok(array)
    }

    /// Skips `n` values by delegating to the wrapped decoder.
    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
        self.inner.skip_values(n, parent_present)
    }
}
/// Batch decoder for instant timestamps: wraps a primitive timestamp decoder
/// and tags every produced array with the `"UTC"` timezone.
struct TimestampInstantArrayDecoder<T: ArrowTimestampType>(PrimitiveArrayDecoder<T>);

impl<T: ArrowTimestampType> ArrayBatchDecoder for TimestampInstantArrayDecoder<T> {
    /// Decodes the next batch from the wrapped decoder and returns it with its
    /// timezone set to `"UTC"`.
    fn next_batch(
        &mut self,
        batch_size: usize,
        parent_present: Option<&NullBuffer>,
    ) -> Result<ArrayRef> {
        let batch = self.0.next_primitive_batch(batch_size, parent_present)?;
        let utc_batch = batch.with_timezone("UTC");
        Ok(Arc::new(utc_batch) as ArrayRef)
    }

    /// Skips `n` values by delegating to the wrapped decoder.
    fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
        self.0.skip_values(n, parent_present)
    }
}
/// Decimal timestamp decoder that adjusts each value to the local wall-clock
/// time of a timezone.
///
/// Field `0` is the wrapped decoder producing nanosecond counts as `i128`;
/// field `1` is the timezone used for the adjustment.
struct TimestampNanosecondAsDecimalWithTzDecoder(TimestampNanosecondAsDecimalDecoder, Tz);

impl TimestampNanosecondAsDecimalWithTzDecoder {
    /// Converts `ts` (nanoseconds since the Unix epoch) to the nanosecond count
    /// of the corresponding local date-time in the stored timezone, read as if
    /// that local date-time were UTC.
    ///
    /// # Panics
    ///
    /// Panics if the seconds/nanoseconds split of `ts` does not form a valid
    /// `TimeDelta`.
    fn next_inner(&self, ts: i128) -> i128 {
        // Euclidean split keeps the sub-second part non-negative for negative
        // timestamps.
        let whole_seconds = ts.div_euclid(NANOSECONDS_IN_SECOND) as i64;
        let subsec_nanos = ts.rem_euclid(NANOSECONDS_IN_SECOND) as u32;

        let unix_epoch = self.1.timestamp_nanos(0);
        let offset = TimeDelta::new(whole_seconds, subsec_nanos)
            .expect("TimeDelta duration out of bound");
        let local_as_utc = (unix_epoch + offset).naive_local().and_utc();

        i128::from(local_as_utc.timestamp()) * NANOSECONDS_IN_SECOND
            + i128::from(local_as_utc.timestamp_subsec_nanos())
    }
}
impl PrimitiveValueDecoder<i128> for TimestampNanosecondAsDecimalWithTzDecoder {
    /// Skips `n` values in the wrapped decoder.
    fn skip(&mut self, n: usize) -> Result<()> {
        self.0.skip(n)?;
        Ok(())
    }

    /// Fills `out` from the wrapped decoder, then rewrites every element in
    /// place with its timezone-adjusted value.
    fn decode(&mut self, out: &mut [i128]) -> Result<()> {
        self.0.decode(out)?;
        out.iter_mut()
            .for_each(|value| *value = self.next_inner(*value));
        Ok(())
    }
}