use std::fs::create_dir_all;
use std::io::Write;
use std::path::PathBuf;
use std::sync::mpsc::{Receiver, SyncSender};
use super::single::SplitEnum;
use crate::error::Result;
use crate::io;
/// Distributes the rows of one split across that split's chunk channels.
///
/// Holds the sending half of one bounded channel per chunk; the matching
/// receiving halves are owned by the [`ChunkWriter`]s returned alongside this
/// value from `SplitWriter::new`.
pub(crate) struct SplitWriter {
/// Sending halves of the per-chunk channels, in chunk order.
chunk_senders: Vec<SyncSender<String>>,
/// Position in `chunk_senders` that the next `send` call will try first;
/// once it runs past the last sender, `send` wraps back to the first one.
next_index: usize,
}
impl SplitWriter {
    /// Creates the writer for one split together with its chunk writers.
    ///
    /// One bounded channel is created per chunk. The sending halves stay in
    /// the returned `SplitWriter`; each receiving half is handed to its own
    /// [`ChunkWriter`].
    ///
    /// The number of chunks is:
    /// * `1` when `chunk_size` is `None`;
    /// * `ceil(total / chunk_size)` for a `SplitEnum::Rows` split;
    /// * `2` for a `SplitEnum::Proportion` split when `total_rows` is unknown;
    /// * `ceil(total_rows * proportion / chunk_size) + 1` for a
    ///   `SplitEnum::Proportion` split when `total_rows` is known.
    ///   NOTE(review): the extra chunk presumably leaves headroom because the
    ///   proportional row count is only an estimate — confirm.
    ///
    /// When exactly one chunk is created its `chunk_id` is `None`; otherwise
    /// chunks are numbered from `0`.
    ///
    /// # Errors
    ///
    /// Returns an error when `chunk_size` is `Some(0)`: dividing by a zero
    /// chunk size would otherwise saturate the chunk count to `u64::MAX`.
    pub fn new(
        path: &PathBuf,
        split: &SplitEnum,
        chunk_size: Option<u64>,
        total_rows: Option<u64>,
        compression: io::Compression,
    ) -> Result<(Self, Vec<ChunkWriter>)> {
        // Number of rows each chunk channel may buffer before `send` blocks.
        const CHANNEL_CAPACITY: usize = 100;

        if chunk_size == Some(0) {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "chunk size must be greater than zero",
            )
            .into());
        }

        let n_chunks: u64 = match (split, chunk_size, total_rows) {
            (_, None, _) => 1,
            (SplitEnum::Rows(r), Some(c), _) => (r.total / c as f64).ceil() as u64,
            (SplitEnum::Proportion(_), Some(_), None) => 2,
            (SplitEnum::Proportion(p), Some(c), Some(t)) => {
                ((t as f64) * p.proportion / c as f64).ceil() as u64 + 1
            }
        };

        // A split written as a single chunk carries no chunk number.
        let single_chunk = n_chunks == 1;

        let mut chunk_senders = Vec::new();
        let mut chunk_writers = Vec::new();
        for index in 0..n_chunks {
            let (sender, receiver) = std::sync::mpsc::sync_channel(CHANNEL_CAPACITY);
            chunk_senders.push(sender);

            let chunk_id = if single_chunk { None } else { Some(index) };
            chunk_writers.push(ChunkWriter::new(
                path.clone(),
                split.name().to_string(),
                compression,
                chunk_id,
                chunk_size,
                receiver,
            ));
        }

        Ok((
            SplitWriter {
                chunk_senders,
                next_index: 0,
            },
            chunk_writers,
        ))
    }

    /// Sends `row` to the next chunk in round-robin order.
    ///
    /// Chunks are visited in index order; after the last chunk the rotation
    /// starts again at the first one. The call blocks while the target
    /// channel's buffer is full.
    ///
    /// Returns `Ok(true)` once the row has been queued.
    ///
    /// # Errors
    ///
    /// Returns an error when the target channel's receiver has been dropped,
    /// or when this writer has no chunks at all (previously a panic).
    pub fn send(&mut self, row: String) -> Result<bool> {
        // Wrap around once the cursor has moved past the last sender.
        let index = if self.next_index < self.chunk_senders.len() {
            self.next_index
        } else {
            0
        };

        match self.chunk_senders.get(index) {
            Some(sender) => sender.send(row)?,
            // No chunk exists to receive the row: report it the same way a
            // disconnected channel would be reported instead of panicking.
            None => return Err(std::sync::mpsc::SendError(row).into()),
        }

        // Only advance the cursor after the row was actually handed over.
        self.next_index = index + 1;
        Ok(true)
    }

    /// Sends a copy of `row` to every chunk, in chunk order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first send failure; chunks after the failing
    /// one do not receive the row.
    pub fn send_all(&mut self, row: &str) -> Result<()> {
        self.chunk_senders
            .iter()
            .try_for_each(|sender| sender.send(row.to_string()))?;
        Ok(())
    }

    /// Consumes the writer, dropping the sending half of every chunk channel.
    pub fn finish(self) {
        drop(self.chunk_senders);
    }
}
/// Receiving side of one chunk channel plus what is needed to open the
/// chunk's output file.
pub struct ChunkWriter {
/// Source path; its parent directory and file stem are used to build the
/// output file location.
path: PathBuf,
/// Split name; used both as the output sub-directory and inside the output
/// file name.
name: String,
/// Compression used for the output file; also decides whether the file name
/// gets a `.gz` suffix.
compression: io::Compression,
/// Chunk number, or `None` when the split is written as a single chunk.
pub chunk_id: Option<u64>,
/// Chunk size as passed to `SplitWriter::new`; not read anywhere in this file.
pub chunk_size: Option<u64>,
/// Receiving half of the channel whose sending half is held by `SplitWriter`.
pub receiver: Receiver<String>,
}
impl ChunkWriter {
    /// Bundles the given values into a `ChunkWriter`; performs no I/O.
    fn new(
        path: PathBuf,
        name: String,
        compression: io::Compression,
        chunk_id: Option<u64>,
        chunk_size: Option<u64>,
        receiver: Receiver<String>,
    ) -> Self {
        ChunkWriter {
            path,
            name,
            compression,
            chunk_id,
            chunk_size,
            receiver,
        }
    }

    /// Opens the output file for the chunk numbered `chunk_id`.
    ///
    /// The file is placed in a directory named after the split, next to the
    /// source path, and that directory is created if necessary:
    ///
    /// ```text
    /// <parent of path>/<name>/<stem>.<name>[.<chunk_id, zero-padded to 4>].csv[.gz]
    /// ```
    ///
    /// The chunk part is omitted when `chunk_id` is `None`, and `.gz` is only
    /// appended for gzip compression. Note that the `chunk_id` argument is
    /// used, not the `chunk_id` field.
    ///
    /// # Errors
    ///
    /// Returns an error when the source path has no file name to derive the
    /// output name from (previously a panic), when the directory cannot be
    /// created, or when the output file cannot be opened.
    pub fn output(&self, chunk_id: Option<u64>) -> Result<io::OutputWriter> {
        let original_filename = self.path.file_stem().ok_or_else(|| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "cannot derive an output file name from {}",
                    self.path.display()
                ),
            )
        })?;

        // Replace the source file name with the split directory.
        let mut filename = self.path.clone();
        filename.pop();
        filename.push(&self.name);
        create_dir_all(&filename)?;

        let chunk_part = chunk_id
            .map(|c| format!(".{:0>4}", c))
            .unwrap_or_default();
        let extension = match self.compression {
            io::Compression::GzipCompression => ".gz",
            io::Compression::Uncompressed => "",
        };

        filename.push(format!(
            "{}.{}{}.csv{}",
            original_filename.to_string_lossy(),
            &self.name,
            chunk_part,
            extension,
        ));
        io::open_output(filename, self.compression)
    }

    /// Writes `row` to `file` exactly as given; no line terminator is added.
    ///
    /// # Errors
    ///
    /// Returns an error when the underlying write fails.
    pub fn handle_row(&self, file: &mut io::OutputWriter, row: &str) -> Result<()> {
        file.write_all(row.as_bytes())?;
        Ok(())
    }
}