use std::{marker::PhantomData, sync::Arc, borrow::Cow};
use parquet::{data_type::DataType, file::writer::SerializedColumnWriter, errors::ParquetError};
use crate::{level_index::{LevelIndexState, LevelIndexList}, myfrom::MyFrom};
use super::{real_memory_size::RealMemorySize, ColumnAppenderBase, ColumnAppender, DynamicSerializedWriter};
pub struct GenericColumnAppender<TPg, TPq, FConversion>
where TPq::T: Clone + RealMemorySize, TPq: DataType, FConversion: Fn(TPg) -> TPq::T {
max_dl: i16,
max_rl: i16,
column: Vec<TPq::T>,
dls: Vec<i16>,
rls: Vec<i16>,
dummy: PhantomData<TPg>,
dummy2: PhantomData<TPq>,
repetition_index: LevelIndexState,
conversion: FConversion,
}
pub fn new_autoconv_generic_appender<TPg, TPq: DataType>(
max_dl: i16, max_rl: i16,
) -> GenericColumnAppender<TPg, TPq, impl Fn(TPg) -> TPq::T>
where TPq::T: Clone + RealMemorySize, TPq::T: MyFrom<TPg> {
GenericColumnAppender::new(max_dl, max_rl, |value: TPg| MyFrom::my_from(value))
}
impl<TPg, TPq, FConversion> GenericColumnAppender<TPg, TPq, FConversion>
where TPq::T: Clone + RealMemorySize, TPq: DataType, FConversion: Fn(TPg) -> TPq::T {
pub fn new(max_dl: i16, max_rl: i16, conversion: FConversion) -> Self {
if max_dl < 0 || max_rl < 0 {
panic!("Cannot create {} with max_dl={}, max_rl={}", std::any::type_name::<Self>(), max_dl, max_rl);
}
GenericColumnAppender {
max_dl, max_rl,
column: Vec::new(),
dummy: PhantomData,
dummy2: PhantomData,
dls: Vec::new(),
rls: Vec::new(),
repetition_index: LevelIndexState::new(max_rl),
conversion,
}
}
pub fn convert(&self, value: TPg) -> TPq::T {
(self.conversion)(value)
}
fn write_column(&mut self, writer: &mut SerializedColumnWriter) -> Result<(), ParquetError> {
let dls = if self.max_dl > 0 { Some(self.dls.as_slice()) } else { None };
let rls = if self.max_rl > 0 { Some(self.rls.as_slice()) } else { None };
let typed = writer.typed::<TPq>();
let _num_written = typed.write_batch(&self.column, dls, rls)?;
self.column.clear();
self.dls.clear();
self.rls.clear();
Ok(())
}
}
impl<TPg, TPq, FConversion> ColumnAppenderBase for GenericColumnAppender<TPg, TPq, FConversion>
where TPq::T: Clone + RealMemorySize, TPq: DataType, FConversion: Fn(TPg) -> TPq::T {
fn write_columns<'b>(&mut self, column_i: usize, next_col: &mut dyn DynamicSerializedWriter) -> Result<(), String> {
let mut error = None;
let c = next_col.next_column(&mut |mut column| {
let result = self.write_column(&mut column);
let error1 = result.err();
let result2 = column.close();
error = error1.or(result2.err());
}).map_err(|e| format!("Could not create column[{}]: {}", column_i, e))?;
if error.is_some() {
return Err(format!("Couldn't write data of column[{}]: {}", column_i, error.unwrap()));
}
if !c {
return Err("Not enough columns".to_string());
}
Ok(())
}
fn write_null(&mut self, repetition_index: &LevelIndexList, level: i16) -> Result<usize, String> {
debug_assert!(level < self.max_dl);
self.dls.push(level);
if self.max_rl > 0 {
let rl = self.repetition_index.copy_and_diff(repetition_index);
self.rls.push(rl);
Ok(4)
} else {
Ok(2)
}
}
fn max_dl(&self) -> i16 { self.max_dl }
fn max_rl(&self) -> i16 { self.max_rl }
}
impl<TPg: Clone, TPq, FConversion> ColumnAppender<TPg> for GenericColumnAppender<TPg, TPq, FConversion>
where TPq::T: Clone + RealMemorySize, TPq: DataType, FConversion: Fn(TPg) -> TPq::T {
fn copy_value(&mut self, repetition_index: &LevelIndexList, value: Cow<TPg>) -> Result<usize, String> {
let pq_value = self.convert(value.into_owned());
let byte_size = pq_value.real_memory_size();
self.column.push(pq_value);
if self.max_dl > 0 {
self.dls.push(self.max_dl);
}
if self.max_rl > 0 {
let rl = self.repetition_index.copy_and_diff(repetition_index);
self.rls.push(rl);
}
Ok(byte_size + (self.max_dl > 0) as usize * 2 + (self.max_rl > 0) as usize * 2)
}
}