use crate::{Error, FileSplitting};
use csv::{StringRecord, Writer};
use std::{
io::Write,
path::{Path, PathBuf},
rc::Rc,
};
/// Hook that opens the byte sink for a new output file at the given path.
pub(crate) type CreateFileWriter = fn(&Path) -> Result<Box<dyn Write>, std::io::Error>;
/// One open output file of a shard, plus the bookkeeping used to decide when
/// it should be closed.
struct ShardFile {
    /// Path the file was created at; reported to the completion callback.
    path: PathBuf,
    /// Shard key the file belongs to; reported to the completion callback.
    key: String,
    /// CSV writer wrapping the sink returned by the `CreateFileWriter` hook.
    writer: Writer<Box<dyn Write>>,
    /// Split policy copied from the owning shard when the file was opened.
    splitting: FileSplitting,
    /// Progress towards the `splitting` threshold: data rows or field bytes
    /// written so far, depending on the policy (untouched for `NoSplit`).
    written: usize,
}
impl ShardFile {
    /// Writes one data row and advances the split counter.
    ///
    /// Returns `Ok(true)` once the file has reached the threshold configured by
    /// `splitting`, signalling that the caller should close it. For
    /// `FileSplitting::NoSplit` it always returns `Ok(false)`.
    ///
    /// NOTE: for `SplitAfterBytes` the counter adds
    /// `ByteRecord::as_slice().len()`, which is only the concatenated field
    /// bytes. Delimiters, quoting, line terminators and the header row are not
    /// counted. The check also runs after the write, so a file may end up
    /// larger than the configured threshold.
    fn write_record(&mut self, record: &StringRecord) -> Result<bool, Error> {
        self.writer.write_record(record)?;
        let (step, limit) = match self.splitting {
            FileSplitting::NoSplit => return Ok(false),
            FileSplitting::SplitAfterRows(rows) => (1, rows),
            FileSplitting::SplitAfterBytes(bytes) => {
                (record.as_byte_record().as_slice().len(), bytes)
            }
        };
        self.written += step;
        Ok(self.written >= limit)
    }
}
/// Writes the records of a single shard key to a numbered sequence of output
/// files, starting a new file whenever the current one reaches its split
/// threshold.
pub(crate) struct Shard<FNameFile: Fn(&str, usize) -> String> {
    /// Shard key; passed to the filename closure and to the completion callback.
    key: String,
    /// Number of files opened so far; used as the sequence argument of
    /// `create_output_filename`.
    sequence: usize,
    /// Split policy handed to every file this shard opens.
    splitting: FileSplitting,
    /// File currently being written, or `None` before the first record and
    /// after a file has been completed.
    current_file: Option<ShardFile>,
    /// Optional header row written at the top of every new file.
    header_record: Option<StringRecord>,
    /// Opens the sink for a new file path.
    create_file_writer: CreateFileWriter,
    /// Invoked with `(path, key)` after a file's writer has been dropped.
    on_file_completion: Option<fn(&Path, &str)>,
    /// Builds a file name from `(key, sequence)`; held in an `Rc`, presumably
    /// so several shards can share one closure.
    create_output_filename: Rc<FNameFile>,
}
impl<FNameFile> Shard<FNameFile>
where
FNameFile: Fn(&str, usize) -> String,
{
fn path(&self) -> std::path::PathBuf {
(self.create_output_filename)(&self.key, self.sequence).into()
}
pub fn new(
splitting: FileSplitting,
key: String,
header_record: Option<StringRecord>,
create_file_writer: CreateFileWriter,
create_output_filename: Rc<FNameFile>,
on_file_completion: Option<fn(&Path, &str)>,
) -> Self {
Self {
splitting,
current_file: None,
header_record,
on_file_completion,
key,
sequence: 0,
create_output_filename,
create_file_writer,
}
}
pub fn write_record(&mut self, record: &StringRecord) -> Result<(), crate::Error> {
match self.current_file.as_mut() {
Some(sf) => {
if sf.write_record(record)? {
if let Some(s) = self.current_file.take() {
if let Some(callback) = &self.on_file_completion {
let ShardFile {
path, key, writer, ..
} = s;
drop(writer);
callback(&path, &key);
}
}
}
}
None => {
let writer = (self.create_file_writer)(&self.path())?;
let mut writer = Writer::from_writer(writer);
if let Some(h) = &self.header_record {
writer.write_record(h)?;
}
let mut shard_file = ShardFile {
path: self.path(),
key: self.key.to_owned(),
writer,
written: 0,
splitting: self.splitting,
};
self.sequence += 1;
if !shard_file.write_record(record)? {
self.current_file = Some(shard_file);
}
}
}
Ok(())
}
}
impl<FNameFile> Drop for Shard<FNameFile>
where
    FNameFile: Fn(&str, usize) -> String,
{
    /// Finishes whatever file is still open when the shard goes away.
    ///
    /// The file's writer is dropped. When a completion callback is configured,
    /// the callback is then invoked with the file's path and key.
    ///
    /// NOTE(review): `Drop` cannot return errors, so any failure while the
    /// writer flushes on drop is lost here.
    fn drop(&mut self) {
        let open_file = match self.current_file.take() {
            Some(file) => file,
            None => return,
        };
        let ShardFile {
            path, key, writer, ..
        } = open_file;
        if let Some(notify) = self.on_file_completion {
            drop(writer);
            notify(&path, &key);
        }
    }
}