use std::cmp::Ordering;
use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::buffer::NullBuffer;
use arrow::datatypes::Decimal128Type;
use snafu::ResultExt;
use crate::encoding::decimal::UnboundedVarintStreamDecoder;
use crate::encoding::integer::get_signed_int_decoder;
use crate::encoding::PrimitiveValueDecoder;
use crate::error::ArrowSnafu;
use crate::proto::stream::Kind;
use crate::stripe::Stripe;
use crate::{column::Column, error::Result};
use super::{ArrayBatchDecoder, PresentDecoder, PrimitiveArrayDecoder};
pub fn new_decimal_decoder(
column: &Column,
stripe: &Stripe,
precision: u32,
fixed_scale: u32,
) -> Box<dyn ArrayBatchDecoder> {
let varint_iter = stripe.stream_map().get(column, Kind::Data);
let varint_iter = Box::new(UnboundedVarintStreamDecoder::new(varint_iter));
let scale_iter = stripe.stream_map().get(column, Kind::Secondary);
let scale_iter = get_signed_int_decoder::<i32>(scale_iter, column.rle_version());
let present = PresentDecoder::from_stripe(stripe, column);
let iter = DecimalScaleRepairDecoder {
varint_iter,
scale_iter,
fixed_scale,
};
let iter = Box::new(iter);
Box::new(DecimalArrayDecoder::new(
precision as u8,
fixed_scale as i8,
iter,
present,
))
}
pub struct DecimalArrayDecoder {
precision: u8,
scale: i8,
inner: PrimitiveArrayDecoder<Decimal128Type>,
}
impl DecimalArrayDecoder {
pub fn new(
precision: u8,
scale: i8,
iter: Box<dyn PrimitiveValueDecoder<i128> + Send>,
present: Option<PresentDecoder>,
) -> Self {
let inner = PrimitiveArrayDecoder::<Decimal128Type>::new(iter, present);
Self {
precision,
scale,
inner,
}
}
}
impl ArrayBatchDecoder for DecimalArrayDecoder {
fn next_batch(
&mut self,
batch_size: usize,
parent_present: Option<&NullBuffer>,
) -> Result<ArrayRef> {
let array = self
.inner
.next_primitive_batch(batch_size, parent_present)?
.with_precision_and_scale(self.precision, self.scale)
.context(ArrowSnafu)?;
let array = Arc::new(array) as ArrayRef;
Ok(array)
}
fn skip_values(&mut self, n: usize, parent_present: Option<&NullBuffer>) -> Result<()> {
self.inner.skip_values(n, parent_present)
}
}
struct DecimalScaleRepairDecoder {
varint_iter: Box<dyn PrimitiveValueDecoder<i128> + Send>,
scale_iter: Box<dyn PrimitiveValueDecoder<i32> + Send>,
fixed_scale: u32,
}
impl PrimitiveValueDecoder<i128> for DecimalScaleRepairDecoder {
fn skip(&mut self, n: usize) -> Result<()> {
self.varint_iter.skip(n)?;
self.scale_iter.skip(n)?;
Ok(())
}
fn decode(&mut self, out: &mut [i128]) -> Result<()> {
let mut varint = vec![0; out.len()];
let mut scale = vec![0; out.len()];
self.varint_iter.decode(&mut varint)?;
self.scale_iter.decode(&mut scale)?;
for (index, (&varint, &scale)) in varint.iter().zip(scale.iter()).enumerate() {
out[index] = fix_i128_scale(varint, self.fixed_scale, scale);
}
Ok(())
}
}
fn fix_i128_scale(i: i128, fixed_scale: u32, varying_scale: i32) -> i128 {
let varying_scale = varying_scale as u32;
match fixed_scale.cmp(&varying_scale) {
Ordering::Less => {
let scale_factor = varying_scale - fixed_scale;
let scale_factor = 10_i128.pow(scale_factor);
i / scale_factor
}
Ordering::Equal => i,
Ordering::Greater => {
let scale_factor = fixed_scale - varying_scale;
let scale_factor = 10_i128.pow(scale_factor);
i * scale_factor
}
}
}