use std::marker::PhantomData;
use anyhow::Error;
use odbc_api::buffers::{AnySlice, BufferDesc, Item};
use parquet::{
basic::{LogicalType, Repetition},
column::writer::{get_typed_column_writer_mut, ColumnWriter},
data_type::DataType,
schema::types::Type,
};
use crate::parquet_buffer::{BufferedDataType, ParquetBuffer};
use super::ColumnStrategy;
/// Column strategy for nullable columns whose ODBC buffer elements already have the value type of
/// the Parquet data type `Pdt` (`Pdt::T`), so values are copied over without conversion.
pub struct IdenticalOptional<Pdt> {
/// Logical type annotation passed on to the Parquet column type, if any.
logical_type: Option<LogicalType>,
/// Ties the strategy to the Parquet data type `Pdt` without storing a value of it.
_parquet_data_type: PhantomData<Pdt>,
}
impl<Pdt> IdenticalOptional<Pdt> {
    /// Creates a strategy whose Parquet column carries no logical type annotation.
    pub fn new() -> Self {
        Self::with_logical_type(None)
    }

    /// Creates a strategy whose Parquet column is annotated with `logical_type`.
    ///
    /// Passing `None` is equivalent to calling [`IdenticalOptional::new`].
    pub fn with_logical_type(logical_type: Option<LogicalType>) -> Self {
        Self {
            logical_type,
            _parquet_data_type: PhantomData,
        }
    }
}

/// Same as [`IdenticalOptional::new`]: a strategy without logical type annotation.
///
/// Implemented by hand rather than derived, so that no `Pdt: Default` bound is required.
impl<Pdt> Default for IdenticalOptional<Pdt> {
    fn default() -> Self {
        Self::new()
    }
}
/// Strategy for nullable columns: the ODBC buffer elements and the Parquet values share the type
/// `Pdt::T`, so each value is copied as is and NULLs are forwarded as `None`.
impl<Pdt> ColumnStrategy for IdenticalOptional<Pdt>
where
    Pdt: DataType,
    Pdt::T: Item + BufferedDataType,
{
    /// Parquet type of the column `name`: an `OPTIONAL` primitive with the physical type of `Pdt`
    /// and the logical type this strategy has been created with.
    fn parquet_type(&self, name: &str) -> Type {
        parquet_data_type::<Pdt>(name, self.logical_type.clone(), Repetition::OPTIONAL)
    }

    /// Requests a nullable ODBC buffer with elements of type `Pdt::T`.
    fn buffer_desc(&self) -> BufferDesc {
        Pdt::T::buffer_desc(true)
    }

    /// Copies all values of `column_view` into `column_writer`, using `parquet_buffer` to write
    /// the optional values.
    ///
    /// # Errors
    ///
    /// Forwards any error reported by `ParquetBuffer::write_optional`.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` is not a nullable slice of `Pdt::T`. That would mean the buffer
    /// handed in does not match the one requested by `buffer_desc`.
    fn copy_odbc_to_parquet(
        &self,
        parquet_buffer: &mut ParquetBuffer,
        column_writer: &mut ColumnWriter,
        column_view: AnySlice,
    ) -> Result<(), Error> {
        let nullable_values = Pdt::T::as_nullable_slice(column_view).expect(
            "column view must be a nullable slice of `Pdt::T`, as requested by `buffer_desc`",
        );
        let typed_writer = get_typed_column_writer_mut::<Pdt>(column_writer);
        // The slice yields `Option<&T>`; the writer wants owned values, hence the copy.
        parquet_buffer.write_optional(typed_writer, nullable_values.map(|value| value.copied()))?;
        Ok(())
    }
}
/// Column strategy for non-nullable columns whose ODBC buffer elements already have the value type
/// of the Parquet data type `Pdt` (`Pdt::T`), so the buffer is written to Parquet as is.
pub struct IdenticalRequired<Pdt> {
/// Logical type annotation passed on to the Parquet column type, if any.
logical_type: Option<LogicalType>,
/// Ties the strategy to the Parquet data type `Pdt` without storing a value of it.
_parquet_data_type: PhantomData<Pdt>,
}
impl<Pdt> IdenticalRequired<Pdt> {
    /// Creates a strategy whose Parquet column carries no logical type annotation.
    pub fn new() -> Self {
        Self::with_logical_type(None)
    }

    /// Creates a strategy whose Parquet column is annotated with `logical_type`.
    ///
    /// Passing `None` is equivalent to calling [`IdenticalRequired::new`].
    pub fn with_logical_type(logical_type: Option<LogicalType>) -> Self {
        Self {
            logical_type,
            _parquet_data_type: PhantomData,
        }
    }
}

/// Same as [`IdenticalRequired::new`]: a strategy without logical type annotation.
///
/// Implemented by hand rather than derived, so that no `Pdt: Default` bound is required.
impl<Pdt> Default for IdenticalRequired<Pdt> {
    fn default() -> Self {
        Self::new()
    }
}
/// Strategy for non-nullable columns: the ODBC buffer elements and the Parquet values share the
/// type `Pdt::T`, so the buffer slice is passed to the column writer directly.
impl<Pdt> ColumnStrategy for IdenticalRequired<Pdt>
where
    Pdt: DataType,
    Pdt::T: Item + BufferedDataType,
{
    /// Parquet type of the column `name`: a `REQUIRED` primitive with the physical type of `Pdt`
    /// and the logical type this strategy has been created with.
    fn parquet_type(&self, name: &str) -> Type {
        parquet_data_type::<Pdt>(name, self.logical_type.clone(), Repetition::REQUIRED)
    }

    /// Requests a non-nullable ODBC buffer with elements of type `Pdt::T`.
    fn buffer_desc(&self) -> BufferDesc {
        Pdt::T::buffer_desc(false)
    }

    /// Writes all values of `column_view` to `column_writer` in a single batch. The intermediate
    /// `_parquet_buffer` is not used.
    ///
    /// # Errors
    ///
    /// Forwards any error reported by the column writer's `write_batch`.
    ///
    /// # Panics
    ///
    /// Panics if `column_view` is not a plain slice of `Pdt::T`. That would mean the buffer handed
    /// in does not match the one requested by `buffer_desc`.
    fn copy_odbc_to_parquet(
        &self,
        _parquet_buffer: &mut ParquetBuffer,
        column_writer: &mut ColumnWriter,
        column_view: AnySlice,
    ) -> Result<(), Error> {
        let values = Pdt::T::as_slice(column_view).expect(
            "column view must be a non-nullable slice of `Pdt::T`, as requested by `buffer_desc`",
        );
        let typed_writer = get_typed_column_writer_mut::<Pdt>(column_writer);
        // No level slices are passed along with the values (both arguments are `None`).
        typed_writer.write_batch(values, None, None)?;
        Ok(())
    }
}
/// Builds the primitive Parquet type of a column named `name` holding values of `Pdt`.
///
/// The physical type is taken from `Pdt`; `repetition` and `logical_type` are applied as given.
/// If the logical type is a decimal, its scale and precision are also set on the type.
///
/// # Panics
///
/// Panics if the type builder fails to build the type.
fn parquet_data_type<Pdt>(
    name: &str,
    logical_type: Option<LogicalType>,
    repetition: Repetition,
) -> Type
where
    Pdt: DataType,
{
    // Extract the decimal parameters up front, so the logical type itself can be moved into the
    // builder afterwards.
    let decimal_params = match &logical_type {
        Some(LogicalType::Decimal { scale, precision }) => Some((*scale, *precision)),
        _ => None,
    };
    let base = Type::primitive_type_builder(name, Pdt::get_physical_type())
        .with_repetition(repetition)
        .with_logical_type(logical_type);
    let configured = match decimal_params {
        Some((scale, precision)) => base.with_scale(scale).with_precision(precision),
        None => base,
    };
    configured.build().unwrap()
}
/// Picks the column strategy copying values of `Pdt` unchanged, without a logical type.
///
/// Returns an [`IdenticalOptional`] if `is_optional` is `true`, an [`IdenticalRequired`]
/// otherwise.
pub fn fetch_identical<Pdt>(is_optional: bool) -> Box<dyn ColumnStrategy>
where
    Pdt: DataType,
    Pdt::T: Item + BufferedDataType,
{
    if !is_optional {
        return Box::new(IdenticalRequired::<Pdt>::new());
    }
    Box::new(IdenticalOptional::<Pdt>::new())
}
/// Picks the column strategy copying values of `Pdt` unchanged, annotated with `logical_type`.
///
/// Returns an [`IdenticalOptional`] if `is_optional` is `true`, an [`IdenticalRequired`]
/// otherwise.
pub fn fetch_identical_with_logical_type<Pdt>(
    is_optional: bool,
    logical_type: LogicalType,
) -> Box<dyn ColumnStrategy>
where
    Pdt: DataType,
    Pdt::T: Item + BufferedDataType,
{
    let annotation = Some(logical_type);
    if !is_optional {
        return Box::new(IdenticalRequired::<Pdt>::with_logical_type(annotation));
    }
    Box::new(IdenticalOptional::<Pdt>::with_logical_type(annotation))
}