use std::{
io::{stdout, Write},
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::{anyhow, format_err, Error};
use io_arg::IoArg;
use parquet::{
basic::{Compression, Encoding},
file::{
properties::{WriterProperties, WriterVersion},
writer::SerializedFileWriter,
},
schema::types::{ColumnPath, Type},
};
use super::{
batch_size_limit::FileSizeLimit, conversion_strategy::ColumnExporter, current_file::CurrentFile,
};
/// Options controlling how parquet output is written by [`parquet_output`].
pub struct ParquetWriterOptions {
    /// Compression applied to every column unless overridden per column.
    pub column_compression_default: Compression,
    /// Per-column encoding overrides as `(column name, encoding)` pairs.
    pub column_encodings: Vec<(String, Encoding)>,
    /// Width (number of digits, zero padded) of the numeric suffix appended
    /// to file names when the output is split into multiple files.
    pub suffix_length: usize,
    /// Limit controlling when the current output file is closed and a new
    /// one is started.
    pub file_size: FileSizeLimit,
    /// If `true`, defer creating the first output file until the first row
    /// group is actually written, so no empty file is left behind.
    pub no_empty_file: bool,
}
/// Construct a [`ParquetOutput`] writing either to standard out or to one or
/// more files, depending on `output`.
///
/// Writer properties are derived from `options`: the default compression is
/// applied globally and each per-column encoding override is registered.
pub fn parquet_output(
    output: IoArg,
    schema: Arc<Type>,
    options: ParquetWriterOptions,
) -> Result<Box<dyn ParquetOutput>, Error> {
    let mut wpb = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(options.column_compression_default);
    // Iterate by reference and clone only the column names we need, rather
    // than cloning the entire encodings vector (`Encoding` is `Copy`).
    for (column_name, encoding) in &options.column_encodings {
        let col = ColumnPath::new(vec![column_name.clone()]);
        wpb = wpb.set_column_encoding(col, *encoding)
    }
    let properties = Arc::new(wpb.build());
    let writer: Box<dyn ParquetOutput> = match output {
        IoArg::StdStream => Box::new(StandardOut::new(schema, properties)?),
        IoArg::File(path) => Box::new(FileWriter::new(path, schema, options, properties)?),
    };
    Ok(writer)
}
pub trait ParquetOutput {
fn write_row_group(
&mut self,
num_batch: u32,
export_nth_column: ColumnExporter,
) -> Result<(), Error>;
fn close(self) -> Result<(), Error>;
fn close_box(self: Box<Self>) -> Result<(), Error>;
}
/// Writes parquet output to one or more files, starting a new numbered file
/// whenever the configured size limit is reached.
struct FileWriter {
    /// Path the output file name(s) are derived from.
    base_path: PathBuf,
    schema: Arc<Type>,
    properties: Arc<WriterProperties>,
    /// Decides when the current file is closed and a new one is opened.
    file_size: FileSizeLimit,
    /// Number of files opened so far; the next file's suffix is derived
    /// from this counter.
    num_file: u32,
    /// Width (in digits) of the zero-padded file name suffix.
    suffix_length: usize,
    /// File currently being written. `None` before the first write (when
    /// `no_empty_file` deferred creation) or after a size limit closed it.
    current_file: Option<CurrentFile>,
}
impl FileWriter {
    /// Create a file-backed writer. Unless `options.no_empty_file` is set,
    /// the first output file is opened right away.
    pub fn new(
        path: PathBuf,
        schema: Arc<Type>,
        options: ParquetWriterOptions,
        properties: Arc<WriterProperties>,
    ) -> Result<Self, Error> {
        let mut writer = Self {
            base_path: path,
            schema,
            properties,
            file_size: options.file_size,
            num_file: 0,
            suffix_length: options.suffix_length,
            current_file: None,
        };
        if !options.no_empty_file {
            writer.next_file()?;
        }
        Ok(writer)
    }

    /// Open the next output file and make it the current one.
    fn next_file(&mut self) -> Result<(), Error> {
        // Only a splitted output carries a numeric suffix; a single output
        // file keeps the base path unchanged.
        let suffix = if self.file_size.output_is_splitted() {
            Some((self.num_file + 1, self.suffix_length))
        } else {
            None
        };
        let path = Self::current_path(&self.base_path, suffix)?;
        let file = CurrentFile::new(path, Arc::clone(&self.schema), Arc::clone(&self.properties))?;
        self.current_file = Some(file);
        self.num_file += 1;
        Ok(())
    }

    /// Path of the output file identified by `suffix`, or the base path
    /// itself if the output is not split.
    fn current_path(base_path: &Path, suffix: Option<(u32, usize)>) -> Result<PathBuf, Error> {
        match suffix {
            Some((num_file, suffix_length)) => {
                path_with_suffix(base_path, num_file, suffix_length)
            }
            None => Ok(base_path.to_owned()),
        }
    }
}
impl ParquetOutput for FileWriter {
    fn write_row_group(
        &mut self,
        num_batch: u32,
        column_exporter: ColumnExporter,
    ) -> Result<(), Error> {
        // Lazily open a file if none is currently open — either because
        // creation was deferred, or because a size limit closed the last one.
        if self.current_file.is_none() {
            self.next_file()?;
        }
        let current = self
            .current_file
            .as_mut()
            .expect("a file must be open after next_file succeeded");
        let file_size = current.write_row_group(column_exporter)?;
        // Once the limit is reached, finalize the file; the next write will
        // open a fresh one.
        if self.file_size.file_limit_reached(num_batch, file_size) {
            let finished = self
                .current_file
                .take()
                .expect("current file is open at this point");
            finished.finalize()?;
        }
        Ok(())
    }

    /// Finalize the file currently being written, if any.
    fn close(self) -> Result<(), Error> {
        if let Some(file) = self.current_file {
            file.finalize()?;
        }
        Ok(())
    }

    fn close_box(self: Box<Self>) -> Result<(), Error> {
        self.close()
    }
}
/// Writes the parquet output to standard out.
struct StandardOut {
    writer: SerializedFileWriter<Box<dyn Write + Send>>,
}
impl StandardOut {
    /// Create a writer emitting the parquet file to standard out.
    pub fn new(schema: Arc<Type>, properties: Arc<WriterProperties>) -> Result<Self, Error> {
        let output: Box<dyn Write + Send> = Box::new(stdout());
        // `schema` and `properties` are owned and not used afterwards, so
        // move them directly instead of cloning the `Arc`s.
        let writer = SerializedFileWriter::new(output, schema, properties)?;
        Ok(Self { writer })
    }
}
impl ParquetOutput for StandardOut {
    fn write_row_group(
        &mut self,
        _num_batch: u32,
        mut column_exporter: ColumnExporter,
    ) -> Result<(), Error> {
        let mut row_group = self.writer.next_row_group()?;
        // Emit every column of the row group in order, closing each column
        // writer before moving on to the next.
        let mut index = 0;
        loop {
            match row_group.next_column()? {
                Some(mut col_writer) => {
                    column_exporter.export_nth_column(index, &mut col_writer)?;
                    col_writer.close()?;
                    index += 1;
                }
                None => break,
            }
        }
        row_group.close()?;
        Ok(())
    }

    fn close(self) -> Result<(), Error> {
        self.writer.close()?;
        Ok(())
    }

    fn close_box(self: Box<Self>) -> Result<(), Error> {
        self.close()
    }
}
/// Insert a numeric suffix (`_N…`, zero padded to `suffix_length` digits)
/// between the file stem and the extension of `path`.
///
/// E.g. `out.par` with `num_file = 1` and `suffix_length = 2` becomes
/// `out_01.par`. Errors if `path` has no file stem, or if its extension is
/// not valid UTF-8.
fn path_with_suffix(path: &Path, num_file: u32, suffix_length: usize) -> Result<PathBuf, Error> {
    let suffix = format!("_{:0width$}", num_file, width = suffix_length);
    let mut stem = path
        .file_stem()
        .ok_or_else(|| format_err!("Output needs to have a file stem."))?
        .to_owned();
    stem.push(suffix);
    // Re-append the extension (only the last dot counts; earlier dots stay
    // in the stem, see unit tests).
    if let Some(extension) = path.extension() {
        let extension = extension
            // Construct the error lazily; `ok_or(anyhow!(…))` would allocate
            // the message even on the happy path.
            .to_str()
            .ok_or_else(|| anyhow!("Output file extension is not valid UTF-8"))?;
        stem.push(format!(".{}", extension));
    }
    let path_with_suffix = path.with_file_name(stem);
    Ok(path_with_suffix)
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::path_with_suffix;

    /// A simple file name gets the zero-padded counter before its extension.
    #[test]
    fn filenames_with_suffixes() {
        let result = path_with_suffix(&PathBuf::from("test.par"), 1, 2).unwrap();
        assert_eq!("test_01.par", result.to_str().unwrap());
    }

    /// Only the last dot separates the extension; earlier dots remain part
    /// of the file stem.
    #[test]
    fn filenames_with_dot_in_filestem_and_suffix() {
        let result =
            path_with_suffix(&PathBuf::from("server-name-schema.table.par"), 1, 2).unwrap();
        assert_eq!("server-name-schema.table_01.par", result.to_str().unwrap());
    }

    /// Directory components of the path are preserved unchanged.
    #[test]
    fn retain_path_before_file_name() {
        let result = path_with_suffix(&PathBuf::from("./some_path/out.par"), 1, 2).unwrap();
        assert_eq!(PathBuf::from("./some_path/out_01.par"), result);
    }
}