use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use bzip2::Compression;
use bzip2::write::BzEncoder;
use crate::error::Error;
/// Number of output files placed in each output directory before moving on to the next one.
const FILES_PER_DIR: u64 = 100;

/// Where and how an `OutputSplitter` writes its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutputConfig {
    /// Output directory, or `-` to write everything to stdout instead of files.
    pub path: PathBuf,
    /// Soft limit on uncompressed bytes per output file: a new file is started when the next
    /// write would push the current file past this size. `0` puts each write in its own file.
    pub max_file_size: u64,
    /// When `true`, output files are bzip2-compressed and carry a `.bz2` suffix.
    pub compress: bool,
}
/// The concrete sink behind an `OutputSplitter`: stdout, a plain buffered file, or a
/// bzip2 encoder layered over a buffered file.
enum Writer {
    /// Standard output (selected when the configured path is `-`).
    Stdout(io::Stdout),
    /// Uncompressed, buffered output file.
    File(BufWriter<File>),
    /// bzip2-compressing encoder writing into a buffered output file.
    CompressedFile(BzEncoder<BufWriter<File>>),
}

impl Writer {
    /// Borrows the active sink as a trait object so every `Write` method can be forwarded
    /// through a single match instead of repeating it per method.
    fn sink(&mut self) -> &mut dyn Write {
        match self {
            Writer::Stdout(out) => out,
            Writer::File(file) => file,
            Writer::CompressedFile(encoder) => encoder,
        }
    }
}

impl Write for Writer {
    /// Forwards to the active sink's `write`.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.sink().write(buf)
    }

    /// Forwards to the active sink's `flush`.
    fn flush(&mut self) -> io::Result<()> {
        self.sink().flush()
    }
}
/// Writes extracted text either to stdout or to a tree of size-limited output files.
///
/// In file mode the layout is `<path>/<DIR>/wiki_<NN>[.bz2]`. `<DIR>` runs `AA`, `AB`, …, `ZZ`
/// and then continues with longer names (`BAA`, `BAB`, …) rather than wrapping around. Each
/// directory holds `FILES_PER_DIR` files. The size limit counts uncompressed input bytes.
pub struct OutputSplitter {
    /// Destination settings supplied at construction.
    config: OutputConfig,
    /// Active sink; `None` in file mode between `close()` and the next file being opened.
    writer: Option<Writer>,
    /// Uncompressed bytes written to the current file so far.
    current_bytes: u64,
    /// Global sequence number of the next file to open (spans directories).
    file_index: u64,
}

impl OutputSplitter {
    /// Creates a splitter and opens its first destination.
    ///
    /// A `path` of `-` selects stdout. Any other path is used as the output root, and the first
    /// file (`AA/wiki_00`, plus `.bz2` when compressing) is created immediately.
    ///
    /// # Errors
    /// Returns an error if the first output directory or file cannot be created.
    pub fn new(config: OutputConfig) -> Result<Self, Error> {
        let mut splitter = OutputSplitter {
            config,
            writer: None,
            current_bytes: 0,
            file_index: 0,
        };
        if splitter.is_stdout() {
            splitter.writer = Some(Writer::Stdout(io::stdout()));
        } else {
            splitter.open_next_file()?;
        }
        Ok(splitter)
    }

    /// Writes `data`, first rotating to a new file if it would not fit in the current one.
    ///
    /// In stdout mode the data is written straight through. In file mode, rotation happens only
    /// if the current file already holds data and appending `data` would exceed
    /// `max_file_size`. With `max_file_size == 0`, every write to a non-empty file rotates.
    ///
    /// An oversized `data` therefore goes into the current (empty) file instead of leaving an
    /// empty file behind. If the splitter was closed, a fresh file is opened rather than
    /// silently discarding `data`.
    ///
    /// # Errors
    /// Returns an error if closing, creating, or writing an output file fails.
    pub fn write(&mut self, data: &str) -> Result<(), Error> {
        let bytes = data.as_bytes();
        if self.is_stdout() {
            if let Some(writer) = self.writer.as_mut() {
                writer.write_all(bytes)?;
            }
            return Ok(());
        }

        let len = bytes.len() as u64;
        // Never rotate an empty file: doing so would leave it behind with no content. For
        // `max_file_size == 0` this reduces to "rotate whenever the current file is non-empty".
        let needs_rotation = self.current_bytes > 0
            && self.current_bytes.saturating_add(len) > self.config.max_file_size;
        if needs_rotation {
            self.close()?;
        }
        // No file is open after a rotation above or after an explicit `close()`.
        if self.writer.is_none() {
            self.open_next_file()?;
        }
        if let Some(writer) = self.writer.as_mut() {
            writer.write_all(bytes)?;
            self.current_bytes += len;
        }
        Ok(())
    }

    /// Finalizes the current output.
    ///
    /// For files, this flushes buffered data, writes the bzip2 stream trailer when compressing,
    /// and releases the handle; a later `write` opens a fresh file. For stdout it only flushes,
    /// and the splitter stays usable. Calling `close` again after it succeeded is a no-op
    /// apart from the stdout flush.
    ///
    /// # Errors
    /// Returns an error if flushing or finishing the output fails.
    pub fn close(&mut self) -> Result<(), Error> {
        if self.is_stdout() {
            if let Some(writer) = self.writer.as_mut() {
                writer.flush()?;
            }
            return Ok(());
        }
        if let Some(writer) = self.writer.take() {
            match writer {
                Writer::CompressedFile(encoder) => {
                    // `finish` hands back the inner `BufWriter`. Flush it here so errors
                    // surface to the caller; `BufWriter`'s Drop would ignore them.
                    encoder.finish()?.flush()?;
                }
                Writer::File(mut buf_writer) => buf_writer.flush()?,
                // Stdout is handled above and never stored in file mode.
                Writer::Stdout(_) => {}
            }
        }
        self.current_bytes = 0;
        Ok(())
    }

    /// `-` as the configured path selects stdout instead of files.
    fn is_stdout(&self) -> bool {
        self.config.path.as_os_str() == "-"
    }

    /// Returns the directory name for `dir_index`.
    ///
    /// Indices 0..676 map to `AA`..`ZZ`. Larger indices use three or more letters (676 →
    /// `BAA`), so names never repeat. Wrapping back to `AA` would make `File::create` truncate
    /// files written earlier.
    fn dir_name(dir_index: u64) -> String {
        let mut letters: Vec<u8> = Vec::with_capacity(4);
        let mut rest = dir_index;
        // Base-26 digits, least significant first, padded to at least two letters.
        while rest > 0 || letters.len() < 2 {
            letters.push(b'A' + (rest % 26) as u8);
            rest /= 26;
        }
        letters.iter().rev().map(|&b| char::from(b)).collect()
    }

    /// Returns the file name for slot `file_in_dir` (`wiki_00`..`wiki_99`), with a `.bz2`
    /// suffix when compressing. Two-digit padding covers `FILES_PER_DIR` (100) slots.
    fn file_name(file_in_dir: u64, compress: bool) -> String {
        if compress {
            format!("wiki_{:02}.bz2", file_in_dir)
        } else {
            format!("wiki_{:02}", file_in_dir)
        }
    }

    /// Creates the file for the current `file_index`, creating its directory if needed.
    ///
    /// The new file becomes the active writer (wrapped in a bzip2 encoder when compressing).
    /// The byte count is reset and `file_index` advances. An existing file at that path is
    /// truncated.
    fn open_next_file(&mut self) -> Result<(), Error> {
        let dir_index = self.file_index / FILES_PER_DIR;
        let file_in_dir = self.file_index % FILES_PER_DIR;
        let dir_path = self.config.path.join(Self::dir_name(dir_index));
        fs::create_dir_all(&dir_path)?;
        let file_path = dir_path.join(Self::file_name(file_in_dir, self.config.compress));
        let buf_writer = BufWriter::new(File::create(&file_path)?);
        self.writer = if self.config.compress {
            Some(Writer::CompressedFile(BzEncoder::new(
                buf_writer,
                Compression::default(),
            )))
        } else {
            Some(Writer::File(buf_writer))
        };
        self.current_bytes = 0;
        self.file_index += 1;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::path::Path;
    use std::process;

    /// Scratch output directory under the system temp dir.
    ///
    /// It is removed on drop, so a failing assertion does not leave files behind. The process
    /// id keeps concurrently running test binaries from sharing a directory.
    struct TestDir(PathBuf);

    impl TestDir {
        fn new(name: &str) -> Self {
            let dir = env::temp_dir().join(format!("wicket_test_{}_{}", name, process::id()));
            if dir.exists() {
                fs::remove_dir_all(&dir).unwrap();
            }
            TestDir(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }

        /// Path of `file` inside output sub-directory `sub`.
        fn file(&self, sub: &str, file: &str) -> PathBuf {
            self.0.join(sub).join(file)
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best effort: a cleanup failure must not mask the test's own result.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Uncompressed file-mode config writing into `dir`.
    fn file_config(dir: &TestDir, max_file_size: u64) -> OutputConfig {
        OutputConfig {
            path: dir.path().to_path_buf(),
            max_file_size,
            compress: false,
        }
    }

    #[test]
    fn test_next_file_naming() {
        let dir = TestDir::new("file_naming");
        let mut splitter = OutputSplitter::new(file_config(&dir, 1024)).unwrap();
        assert!(dir.file("AA", "wiki_00").exists());
        splitter.close().unwrap();
        splitter.open_next_file().unwrap();
        assert!(dir.file("AA", "wiki_01").exists());
        // Release the second file's handle before the directory is removed.
        splitter.close().unwrap();
    }

    #[test]
    fn test_dir_naming_sequence() {
        assert_eq!(OutputSplitter::dir_name(0), "AA");
        assert_eq!(OutputSplitter::dir_name(1), "AB");
        assert_eq!(OutputSplitter::dir_name(25), "AZ");
        assert_eq!(OutputSplitter::dir_name(26), "BA");
        assert_eq!(OutputSplitter::dir_name(27), "BB");
        assert_eq!(OutputSplitter::dir_name(51), "BZ");
        assert_eq!(OutputSplitter::dir_name(52), "CA");
    }

    #[test]
    fn test_file_rotation_by_size() {
        let dir = TestDir::new("rotation_size");
        let mut splitter = OutputSplitter::new(file_config(&dir, 50)).unwrap();
        let small_data = "Hello, World!\n";
        splitter.write(small_data).unwrap();
        assert!(dir.file("AA", "wiki_00").exists());
        let large_data = "A".repeat(50);
        splitter.write(&large_data).unwrap();
        assert!(dir.file("AA", "wiki_01").exists());
        splitter.close().unwrap();
        assert_eq!(
            fs::read_to_string(dir.file("AA", "wiki_00")).unwrap(),
            small_data
        );
        assert_eq!(
            fs::read_to_string(dir.file("AA", "wiki_01")).unwrap(),
            large_data
        );
    }

    #[test]
    fn test_zero_size_one_per_file() {
        let dir = TestDir::new("zero_size");
        let mut splitter = OutputSplitter::new(file_config(&dir, 0)).unwrap();
        let articles = ["Article 1\n", "Article 2\n", "Article 3\n"];
        for article in articles.iter() {
            splitter.write(article).unwrap();
        }
        splitter.close().unwrap();
        for (index, article) in articles.iter().enumerate() {
            let path = dir.file("AA", &format!("wiki_{:02}", index));
            assert_eq!(fs::read_to_string(&path).unwrap(), *article);
        }
        assert!(!dir.file("AA", "wiki_03").exists());
    }

    #[test]
    fn test_stdout_mode() {
        let config = OutputConfig {
            path: PathBuf::from("-"),
            max_file_size: 1024,
            compress: false,
        };
        let splitter = OutputSplitter::new(config).unwrap();
        assert!(splitter.is_stdout());
    }
}