use anyhow::Error;
use chrono::NaiveDate;
use odbc_api::sys::{Date, Len, Timestamp, NULL_DATA};
use parquet::{
column::writer::ColumnWriterImpl,
data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type},
};
use std::convert::TryInto;
/// Reusable scratch buffers used to turn one fetched batch of ODBC rows into
/// Parquet column writes.
///
/// The vectors are sized per batch by `set_num_rows_fetched` and reused across
/// batches, so converting a column does not allocate per value (apart from the
/// owned byte arrays needed for strings).
#[derive(Debug)]
pub struct ParquetBuffer {
    /// Scratch space for values of Parquet physical type INT32 (e.g. dates).
    pub values_i32: Vec<i32>,
    /// Scratch space for values of Parquet physical type INT64 (e.g. timestamps).
    pub values_i64: Vec<i64>,
    /// Scratch space for values of Parquet physical type BYTE_ARRAY (e.g. strings).
    pub values_bytes_array: Vec<ByteArray>,
    /// Definition levels, one per row: `1` if the row holds a value, `0` for NULL.
    pub def_levels: Vec<i16>,
}
impl ParquetBuffer {
pub fn new(batch_size: usize) -> ParquetBuffer {
ParquetBuffer {
values_i32: Vec::with_capacity(batch_size),
values_i64: Vec::with_capacity(batch_size),
values_bytes_array: Vec::with_capacity(batch_size),
def_levels: Vec::with_capacity(batch_size),
}
}
pub fn set_num_rows_fetched(&mut self, num_rows: usize) {
self.def_levels.resize(num_rows, 0);
self.values_i32.resize(num_rows, 0);
self.values_i64.resize(num_rows, 0);
self.values_bytes_array.resize(num_rows, ByteArray::new());
}
pub fn write_directly<T>(
&mut self,
cw: &mut ColumnWriterImpl<T>,
source: (&[T::T], &[Len]),
) -> Result<(), Error>
where
T: DataType,
{
let (values, indicators) = source;
for (def, &ind) in self.def_levels.iter_mut().zip(indicators) {
*def = if ind == NULL_DATA { 0 } else { 1 };
}
cw.write_batch(values, Some(&self.def_levels), None)?;
Ok(())
}
pub fn write_dates<'a>(
&mut self,
cw: &mut ColumnWriterImpl<Int32Type>,
dates: impl Iterator<Item = Option<&'a Date>>,
) -> Result<(), Error> {
let unix_epoch = NaiveDate::from_ymd(1970, 1, 1);
for (row_index, field) in dates.enumerate() {
let (value, def) = field
.map(|date| {
let date =
NaiveDate::from_ymd(date.year as i32, date.month as u32, date.day as u32);
let duration = date.signed_duration_since(unix_epoch);
(duration.num_days().try_into().unwrap(), 1)
})
.unwrap_or((0, 0));
self.def_levels[row_index] = def;
self.values_i32[row_index] = value;
}
cw.write_batch(&self.values_i32, Some(&self.def_levels), None)?;
Ok(())
}
pub fn write_timestamps<'a>(
&mut self,
cw: &mut ColumnWriterImpl<Int64Type>,
timestamps: impl Iterator<Item = Option<&'a Timestamp>>,
) -> Result<(), Error> {
for (row_index, field) in timestamps.enumerate() {
let (value, def) = field
.map(|ts| {
let datetime =
NaiveDate::from_ymd(ts.year as i32, ts.month as u32, ts.day as u32)
.and_hms_nano(
ts.hour as u32,
ts.minute as u32,
ts.second as u32,
ts.fraction as u32,
);
(datetime.timestamp_nanos() / 1000, 1)
})
.unwrap_or((0, 0));
self.def_levels[row_index] = def;
self.values_i64[row_index] = value;
}
cw.write_batch(&self.values_i64, Some(&self.def_levels), None)?;
Ok(())
}
pub fn write_strings<'a>(
&mut self,
cw: &mut ColumnWriterImpl<ByteArrayType>,
strings: impl Iterator<Item = Option<&'a [u8]>>,
) -> Result<(), Error> {
for (row_index, read_buf) in strings.enumerate() {
let (bytes, nul) = read_buf
.map(|buf| (buf.to_owned().into(), 1))
.unwrap_or_else(|| (ByteArray::new(), 0));
self.values_bytes_array[row_index] = bytes;
self.def_levels[row_index] = nul;
}
cw.write_batch(&self.values_bytes_array, Some(&self.def_levels), None)?;
Ok(())
}
}