use std::{convert::TryInto, marker::PhantomData};
use anyhow::Error;
use atoi::FromRadix10Signed;
use odbc_api::{
buffers::{AnySlice, BufferDesc},
decimal_text_to_i128, decimal_text_to_i32, decimal_text_to_i64, DataType,
};
use parquet::{
basic::{LogicalType, Repetition, Type as PhysicalType},
column::writer::ColumnWriter,
data_type::{DataType as ParquetDataType, FixedLenByteArrayType, Int32Type, Int64Type},
schema::types::Type,
};
use crate::parquet_buffer::{BufferedDataType, ParquetBuffer};
use super::{
column_strategy::ColumnStrategy, identical::fetch_identical_with_logical_type, text::Utf8,
};
/// Picks the [`ColumnStrategy`] used for a decimal column with the given `precision` and `scale`.
///
/// # Parameters
///
/// * `is_optional`: selects `OPTIONAL` (`true`) or `REQUIRED` (`false`) repetition for the parquet
///   column.
/// * `scale`, `precision`: scale and precision of the decimal column.
/// * `avoid_decimal`: if `true`, columns with a non-zero scale are written as text and columns with
///   scale 0 and a precision of at most 18 are annotated as signed integers instead of decimals.
///   NOTE(review): columns with scale 0 and precision 19..=38 still end up in [`DecimalAsBinary`],
///   which uses the decimal logical type. Confirm this is intended.
/// * `driver_does_support_i64`: if `false`, scale 0 columns with precision 10..=18 are fetched as
///   text and converted to `i64` rather than going through `fetch_identical_with_logical_type`.
///
/// # Selection
///
/// | precision | scale   | strategy                                                        |
/// |-----------|---------|-----------------------------------------------------------------|
/// | any       | != 0    | text, but only if `avoid_decimal` is set                        |
/// | 0..=9     | 0       | `fetch_identical_with_logical_type::<Int32Type>`                |
/// | 0..=9     | 1..=9   | text converted to `i32`                                         |
/// | 10..=18   | 0       | `fetch_identical_with_logical_type::<Int64Type>` or text to `i64` |
/// | 10..=18   | 1..=18  | text converted to `i64`                                         |
/// | 0..=38    | other   | [`DecimalAsBinary`]                                             |
/// | > 38      | any     | text                                                            |
///
/// # Panics
///
/// In the text fallback for precisions above 38, panics if `scale` cannot be converted into the
/// scale type of `odbc_api::DataType::Decimal`, or if that type reports no display size.
pub fn decimal_fetch_strategy(
    is_optional: bool,
    scale: i32,
    precision: u8,
    avoid_decimal: bool,
    driver_does_support_i64: bool,
) -> Box<dyn ColumnStrategy> {
    let repetition = if is_optional {
        Repetition::OPTIONAL
    } else {
        Repetition::REQUIRED
    };

    // Values with a fractional part are written as text if decimals are to be avoided.
    if avoid_decimal && scale != 0 {
        // Presumably one extra byte for the decimal point and one for the sign.
        let length = precision as usize + 2;
        return Box::new(Utf8::with_bytes_length(repetition, length));
    }

    // Logical type for columns with scale 0: a signed integer of the given bit width if decimals
    // are to be avoided, a decimal with scale 0 otherwise.
    let scale_zero_type = |bit_width| {
        if avoid_decimal {
            LogicalType::Integer {
                bit_width,
                is_signed: true,
            }
        } else {
            LogicalType::Decimal {
                scale: 0,
                precision: i32::from(precision),
            }
        }
    };
    // Logical type for columns with a non-zero scale. `avoid_decimal` is always `false` whenever
    // this is used, because of the early return above.
    let fractional_type = || LogicalType::Decimal {
        scale,
        precision: i32::from(precision),
    };

    match (precision, scale) {
        (0..=9, 0) => {
            fetch_identical_with_logical_type::<Int32Type>(is_optional, scale_zero_type(32))
        }
        (0..=9, 1..=9) => Box::new(DecimalTextToInteger::<Int32Type>::new(
            precision,
            scale,
            repetition,
            fractional_type(),
        )),
        (10..=18, 0) => {
            let logical_type = scale_zero_type(64);
            if driver_does_support_i64 {
                fetch_identical_with_logical_type::<Int64Type>(is_optional, logical_type)
            } else {
                // Without 64 bit integer support in the driver the value is fetched as text and
                // converted to `i64`. The logical type is the same as in the branch above.
                Box::new(DecimalTextToInteger::<Int64Type>::new(
                    precision,
                    0,
                    repetition,
                    logical_type,
                ))
            }
        }
        (10..=18, 1..=18) => Box::new(DecimalTextToInteger::<Int64Type>::new(
            precision,
            scale,
            repetition,
            fractional_type(),
        )),
        (0..=38, _) => Box::new(DecimalAsBinary::new(repetition, scale, precision)),
        (_, _) => {
            // Precision exceeds 38: fall back to text, sized by the display size the ODBC decimal
            // type reports.
            let length = odbc_api::DataType::Decimal {
                precision: precision as usize,
                scale: scale.try_into().unwrap(),
            }
            .display_size()
            .unwrap();
            Box::new(Utf8::with_bytes_length(repetition, length.get()))
        }
    }
}
/// Column strategy which requests decimals as text from ODBC and writes them as values of the
/// parquet physical type `Pdt` (see the `ColumnStrategy` implementation for this type).
struct DecimalTextToInteger<Pdt> {
/// Precision of the decimal. Passed to the parquet type builder and used to size the ODBC text
/// buffer.
precision: u8,
/// Scale of the decimal. Passed to the parquet type builder, used to size the ODBC text buffer
/// and handed to the text to integer conversion.
scale: i32,
/// Repetition of the parquet column.
repetition: Repetition,
/// Logical type annotation of the parquet column.
logical_type: LogicalType,
/// Ties the strategy to the parquet data type `Pdt` without storing a value of it.
_pdt: PhantomData<Pdt>,
}
impl<Pdt> DecimalTextToInteger<Pdt> {
fn new(precision: u8, scale: i32, repetition: Repetition, logical_type: LogicalType) -> Self {
Self {
precision,
scale,
repetition,
logical_type,
_pdt: PhantomData,
}
}
}
/// Decimals are requested as text from ODBC and converted into `Pdt::T` before being written.
impl<Pdt> ColumnStrategy for DecimalTextToInteger<Pdt>
where
    Pdt: ParquetDataType,
    Pdt::T: FromRadix10Signed + BufferedDataType + FromDecimalTextRepresentation,
{
    /// Primitive parquet column named `name` with the physical type of `Pdt`, annotated with the
    /// logical type, precision, scale and repetition this strategy has been created with.
    fn parquet_type(&self, name: &str) -> Type {
        let builder = Type::primitive_type_builder(name, Pdt::get_physical_type())
            .with_logical_type(Some(self.logical_type.clone()))
            .with_precision(i32::from(self.precision))
            .with_scale(self.scale)
            .with_repetition(self.repetition);
        builder.build().unwrap()
    }

    /// Text buffer sized by the display size ODBC reports for a decimal with this precision and
    /// scale.
    fn buffer_desc(&self) -> BufferDesc {
        let odbc_type = DataType::Decimal {
            precision: usize::from(self.precision),
            scale: self.scale.try_into().unwrap(),
        };
        let max_str_len = odbc_type.display_size().unwrap().get();
        BufferDesc::Text { max_str_len }
    }

    /// Converts each non-NULL text field of `column_view` into `Pdt::T` and passes the resulting
    /// optional values to `ParquetBuffer::write_optional`.
    fn copy_odbc_to_parquet(
        &self,
        parquet_buffer: &mut ParquetBuffer,
        column_writer: &mut ColumnWriter,
        column_view: AnySlice,
    ) -> Result<(), Error> {
        let typed_writer = Pdt::get_column_writer_mut(column_writer).unwrap();
        let text_view = column_view.as_text_view().expect(
            "Invalid Column view type. This is not supposed to happen. Please open a Bug at \
            https://github.com/pacman82/odbc2parquet/issues.",
        );
        // The scale is the same for every value of the column.
        let scale = self.scale as usize;
        let values = text_view.iter().map(move |field| {
            field.map(|text| Pdt::T::from_decimal_text_representation(text, scale))
        });
        parquet_buffer.write_optional(typed_writer, values)
    }
}
/// Integer types which can be created from the text representation of a decimal.
///
/// Lets `DecimalTextToInteger` stay generic over the integer type it writes.
trait FromDecimalTextRepresentation {
/// Creates `Self` from `text`, the textual representation of a decimal belonging to a column with
/// the given `scale`.
fn from_decimal_text_representation(text: &[u8], scale: usize) -> Self;
}
/// Conversion for `i32`, delegating to `odbc_api::decimal_text_to_i32`.
impl FromDecimalTextRepresentation for i32 {
// According to the odbc-api documentation the result is the decimal multiplied by 10^scale (e.g.
// `12.34` with scale 2 yields `1234`). Verify against the odbc-api version in use.
fn from_decimal_text_representation(text: &[u8], scale: usize) -> Self {
decimal_text_to_i32(text, scale)
}
}
/// Conversion for `i64`, delegating to `odbc_api::decimal_text_to_i64`.
impl FromDecimalTextRepresentation for i64 {
// According to the odbc-api documentation the result is the decimal multiplied by 10^scale (e.g.
// `12.34` with scale 2 yields `1234`). Verify against the odbc-api version in use.
fn from_decimal_text_representation(text: &[u8], scale: usize) -> Self {
decimal_text_to_i64(text, scale)
}
}
/// Column strategy which requests decimals as text from ODBC and writes them into a parquet
/// `FIXED_LEN_BYTE_ARRAY` column annotated as decimal (see the `ColumnStrategy` implementation for
/// this type).
struct DecimalAsBinary {
/// Repetition of the parquet column.
repetition: Repetition,
/// Scale of the decimal. Used for the parquet type, the ODBC text buffer size and the text to
/// integer conversion.
scale: i32,
/// Precision of the decimal. Used for the parquet type and the ODBC text buffer size.
precision: u8,
/// Length in bytes of each value in the fixed length byte array; computed from `precision` in
/// `new`.
length_in_bytes: usize,
}
impl DecimalAsBinary {
pub fn new(repetition: Repetition, scale: i32, precision: u8) -> Self {
let num_binary_digits = precision as f64 * 10f64.log2();
let length_in_bits = num_binary_digits + 1.0;
let length_in_bytes = (length_in_bits / 8.0).ceil() as usize;
Self {
repetition,
scale,
precision,
length_in_bytes,
}
}
}
/// Decimals are requested as text from ODBC and written as fixed length byte arrays.
impl ColumnStrategy for DecimalAsBinary {
    /// `FIXED_LEN_BYTE_ARRAY` column named `name` of `length_in_bytes` bytes per value, annotated
    /// as decimal with the precision, scale and repetition of this strategy.
    fn parquet_type(&self, name: &str) -> Type {
        let precision = i32::from(self.precision);
        let decimal = LogicalType::Decimal {
            scale: self.scale,
            precision,
        };
        let builder = Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
            .with_length(self.length_in_bytes.try_into().unwrap())
            .with_logical_type(Some(decimal))
            .with_precision(precision)
            .with_scale(self.scale)
            .with_repetition(self.repetition);
        builder.build().unwrap()
    }

    /// Text buffer sized by the display size ODBC reports for a decimal with this precision and
    /// scale.
    fn buffer_desc(&self) -> BufferDesc {
        let odbc_type = DataType::Decimal {
            precision: usize::from(self.precision),
            scale: self.scale.try_into().unwrap(),
        };
        let max_str_len = odbc_type.display_size().unwrap().get();
        BufferDesc::Text { max_str_len }
    }

    /// Forwards to `write_decimal_col` with the byte length and scale of this strategy.
    fn copy_odbc_to_parquet(
        &self,
        parquet_buffer: &mut ParquetBuffer,
        column_writer: &mut ColumnWriter,
        column_view: AnySlice,
    ) -> Result<(), Error> {
        let Self {
            length_in_bytes,
            scale,
            ..
        } = *self;
        write_decimal_col(
            parquet_buffer,
            column_writer,
            column_view,
            length_in_bytes,
            scale,
        )
    }
}
/// Writes a column of decimals, received as text, into a fixed length byte array column.
///
/// Each non-NULL field of `column_reader` is converted with `decimal_text_to_i128` using `scale`.
/// The resulting optional values are handed to `ParquetBuffer::write_twos_complement_i128`
/// together with `length_in_bytes`.
///
/// # Panics
///
/// Panics if `column_writer` is not a fixed length byte array writer or `column_reader` is not a
/// text view.
fn write_decimal_col(
    parquet_buffer: &mut ParquetBuffer,
    column_writer: &mut ColumnWriter,
    column_reader: AnySlice,
    length_in_bytes: usize,
    scale: i32,
) -> Result<(), Error> {
    let fixed_len_writer = FixedLenByteArrayType::get_column_writer_mut(column_writer).unwrap();
    let text_view = column_reader.as_text_view().expect(
        "Invalid Column view type. This is not supposed to happen. Please open a Bug at \
        https://github.com/pacman82/odbc2parquet/issues.",
    );
    let digits_after_point = scale as usize;
    let values = text_view
        .iter()
        .map(|maybe_text| maybe_text.map(|text| decimal_text_to_i128(text, digits_after_point)));
    parquet_buffer.write_twos_complement_i128(fixed_len_writer, values, length_in_bytes)?;
    Ok(())
}