use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use anyhow::Error;
use bytesize::ByteSize;
use log::info;
use parquet::{
file::{properties::WriterProperties, writer::SerializedFileWriter},
schema::types::Type,
};
use tempfile::TempPath;
use super::conversion_strategy::ColumnExporter;
/// A parquet output file which is currently being written to.
///
/// Keeps the parquet writer together with the temporary path of the file and running totals
/// of what has been written so far.
///
/// Field order is significant: Rust drops fields in declaration order, so `writer` (which owns
/// the boxed output) is dropped before `path`.
pub struct CurrentFile {
    /// Parquet writer operating on the boxed output created in `new`.
    writer: SerializedFileWriter<Box<dyn Write + Send>>,
    /// Path of the output file, held as a `tempfile::TempPath` until `finalize` calls `keep`
    /// on it.
    path: TempPath,
    /// Sum of the compressed sizes reported for every row group written so far.
    file_size: ByteSize,
    /// Sum of the row counts reported for every row group written so far.
    total_num_rows: u64,
}
impl CurrentFile {
pub fn new(
path: PathBuf,
schema: Arc<Type>,
properties: Arc<WriterProperties>,
) -> Result<CurrentFile, Error> {
let output: Box<dyn Write + Send> = Box::new(File::create(&path).map_err(|io_err| {
Error::from(io_err).context(format!(
"Could not create output file '{}'",
path.to_string_lossy()
))
})?);
let path = TempPath::from_path(path);
let writer = SerializedFileWriter::new(output, schema.clone(), properties.clone())?;
Ok(Self {
writer,
path,
file_size: ByteSize::b(0),
total_num_rows: 0,
})
}
pub fn write_row_group(
&mut self,
mut column_exporter: ColumnExporter,
) -> Result<ByteSize, Error> {
let mut col_index = 0;
let mut row_group_writer = self.writer.next_row_group()?;
while let Some(mut column_writer) = row_group_writer.next_column()? {
column_exporter.export_nth_column(col_index, &mut column_writer)?;
column_writer.close()?;
col_index += 1;
}
let metadata = row_group_writer.close()?;
self.file_size += ByteSize::b(metadata.compressed_size().try_into().unwrap());
let rows_in_row_group: u64 = metadata.num_rows().try_into().unwrap();
self.total_num_rows += rows_in_row_group;
Ok(self.file_size)
}
pub fn finalize(self) -> Result<(), Error> {
self.writer.close()?;
let path = self.path.keep()?;
info!(
"{} rows have been written to {} with a file size of {}.",
self.total_num_rows,
path.to_string_lossy(),
self.file_size
);
Ok(())
}
}