use std::{
fmt,
fs::{self, File},
io::{self, BufRead, BufReader, Read},
path::{Path, PathBuf},
};
use failure::Fail;
use log;
#[cfg(feature = "gz")]
use flate2::read::GzDecoder;
#[cfg(feature = "gz")]
use std::ffi::OsStr;
/// Errors describing why a directory cannot be streamed line by line.
///
/// `DirectoryLinesStreamer::from_dir` reports missing and empty directories
/// with this type.
#[derive(Debug, Fail)]
pub enum DirectoryLinesStreamerError {
/// The given path does not exist; carries the offending path.
#[fail(display = "directory {:?} does not exists", _0)]
DirectoryDoesNotExists(PathBuf),
/// Wraps an I/O error, which is exposed as the cause of this error.
#[fail(display = "{}", _0)]
Io(#[cause] io::Error),
/// No entry to read was found in the directory; carries the directory path.
#[fail(display = "directory {:?} is empty", _0)]
EmptyDirectory(PathBuf),
}
/// Iterator yielding the lines of the files of a directory, one file after
/// another.
///
/// Instances are created with `DirectoryLinesStreamer::from_dir`.
pub struct DirectoryLinesStreamer {
/// Directory the streamer was created from.
dir: PathBuf,
/// Files that still have to be read. The next file to open is the LAST
/// element, because files are taken with `Vec::pop`.
files: Vec<PathBuf>,
/// Path of the file currently being read.
opened_file_path: PathBuf,
/// Buffered reader over the file currently being read.
opened_file: BufReader<Box<dyn Read>>,
/// Line counter reported in read-error log messages.
line_id: usize,
}
/// Manual `Debug` implementation: the `opened_file` reader is a
/// `Box<dyn Read>` and cannot be derived, so it is left out of the output.
impl fmt::Debug for DirectoryLinesStreamer {
    /// Formats every field except `opened_file`.
    ///
    /// `Formatter::debug_struct` is used so that the formatter flags are
    /// honoured: `{:?}` prints the usual single-line form and `{:#?}` prints
    /// the pretty, multi-line form.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("DirectoryLinesStreamer")
            .field("dir", &self.dir)
            .field("files", &self.files)
            .field("opened_file_path", &self.opened_file_path)
            .field("line_id", &self.line_id)
            .finish()
    }
}
impl DirectoryLinesStreamer {
    /// Creates a streamer over the lines of the files found directly inside
    /// `input_dir`.
    ///
    /// The entries are ordered with `alphanumeric_sort::sort_path_slice` and
    /// are read in that order. Sub-directories are skipped. The first file is
    /// opened immediately; the following ones are opened lazily while
    /// iterating.
    ///
    /// # Errors
    ///
    /// * `DirectoryLinesStreamerError::DirectoryDoesNotExists` when
    ///   `input_dir` does not exist.
    /// * `DirectoryLinesStreamerError::EmptyDirectory` when no entry to read
    ///   is found in the directory.
    /// * The underlying `io::Error` (converted into `failure::Error`) when the
    ///   directory cannot be listed or the first file cannot be opened.
    pub fn from_dir<P>(input_dir: P) -> Result<DirectoryLinesStreamer, failure::Error>
    where
        P: Into<PathBuf>,
    {
        let dir = input_dir.into();
        if !dir.exists() {
            return Err(DirectoryLinesStreamerError::DirectoryDoesNotExists(dir).into());
        }

        let mut files: Vec<PathBuf> = fs::read_dir(&dir)?
            .filter_map(Result::ok)
            .map(|dir_entry| dir_entry.path())
            // A sub-directory is not a source of lines: do not queue it as an
            // input file.
            .filter(|path| !path.is_dir())
            .collect();
        alphanumeric_sort::sort_path_slice(&mut files);
        // Files are consumed with `Vec::pop`, so the first file to read has
        // to sit at the end of the vector.
        files.reverse();
        log::debug!("files: {:#?}", files);

        let opened_file_path = match files.pop() {
            Some(first_file) => first_file,
            None => return Err(DirectoryLinesStreamerError::EmptyDirectory(dir).into()),
        };
        log::debug!("Opening first file: {:?}", opened_file_path);
        let opened_file = BufReader::new(open_file(&opened_file_path)?);

        Ok(DirectoryLinesStreamer {
            dir,
            files,
            opened_file_path,
            opened_file,
            line_id: 1,
        })
    }
}
impl Iterator for DirectoryLinesStreamer {
type Item = String;
fn next(&mut self) -> Option<String> {
read_next_line_from_files(
&mut self.files,
&mut self.opened_file,
&mut self.opened_file_path,
&mut self.line_id,
)
}
}
/// Opens `to_open` for reading and returns it as a boxed reader.
///
/// When the crate is built with the `gz` feature and the path has the `gz`
/// extension, the returned reader decompresses the content with `GzDecoder`.
///
/// # Errors
///
/// Returns the `io::Error` of `File::open` when the file cannot be opened.
fn open_file<P>(to_open: P) -> io::Result<Box<dyn Read>>
where
    P: AsRef<Path>,
{
    let path = to_open.as_ref();
    log::debug!("Opening file: {:?}", path);
    let reader: Box<dyn Read> = Box::new(File::open(path)?);

    // Only compiled with the `gz` feature: shadow the plain reader with a
    // decompressing one for `.gz` files.
    #[cfg(feature = "gz")]
    let reader: Box<dyn Read> = if path.extension() == Some(OsStr::new("gz")) {
        log::debug!("Setting up Gzip decompression...");
        Box::new(GzDecoder::new(reader))
    } else {
        reader
    };

    Ok(reader)
}
/// Returns the next line of the currently opened file, moving on to the
/// following files when the current one has nothing left.
///
/// * `files` - files still to be read; the next one is taken with `pop`.
/// * `opened_file` - reader of the current file; replaced when a new file is
///   opened.
/// * `opened_file_path` - path of the current file; updated together with
///   `opened_file`.
/// * `line_id` - number of the line about to be read in the current file. It
///   is advanced only when a line is actually returned and restarts at 1 each
///   time a new file is opened, so that read-error messages point at the right
///   line of the right file.
///
/// A file that cannot be opened is logged and skipped. Returns `None` once the
/// current file is finished and `files` is empty.
fn read_next_line_from_files(
    files: &mut Vec<PathBuf>,
    opened_file: &mut BufReader<Box<dyn Read>>,
    opened_file_path: &mut PathBuf,
    line_id: &mut usize,
) -> Option<String> {
    loop {
        if let Some(line) = read_line_from_file(opened_file, opened_file_path, *line_id) {
            *line_id += 1;
            return Some(line);
        }

        // Current file is finished: try the next one, or stop when none is left.
        let next_file = files.pop()?;
        match open_file(&next_file) {
            Err(e) => log::error!("Error opening file {:?}: {:?}", next_file, e),
            Ok(f) => {
                *opened_file = BufReader::new(f);
                *opened_file_path = next_file;
                *line_id = 1;
            }
        }
    }
}
/// Reads one line (up to and including the `\n` delimiter, when present)
/// from `f`.
///
/// Invalid UTF-8 sequences are replaced by U+FFFD instead of failing.
///
/// * `f` - reader to take the line from.
/// * `file_path` - path of the file behind `f`, used in the error log only.
/// * `line_id` - line number, used in the error log only.
///
/// Returns `None` when nothing could be read, either because the end of the
/// file was reached or because the read failed before yielding any byte.
/// When a read error happens after some bytes were received, the error is
/// logged and the partial line is returned.
fn read_line_from_file(
    f: &mut BufReader<Box<dyn Read>>,
    file_path: &PathBuf,
    line_id: usize,
) -> Option<String> {
    let mut buf: Vec<u8> = Vec::new();
    if let Err(e) = f.read_until(b'\n', &mut buf) {
        log::error!("Error reading line {} of {:?}: {:?}", line_id, file_path, e);
    }

    // An empty buffer means end of file or a failed read. Returning
    // `Some("")` for a failed read would make a persistently failing reader
    // yield empty lines forever, so report "no line" and let the caller
    // move on.
    if buf.is_empty() {
        None
    } else {
        Some(String::from_utf8_lossy(&buf).into_owned())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Lines expected from `fixtures/non-empty-dir`, in streaming order.
    ///
    /// The `messages.30` lines are only expected when the crate is built with
    /// the `gz` feature.
    const EXPECTED_LINES: &[&str] = &[
        "line one from messages\n",
        "line two from messages\n",
        "line three from messages\n",
        "line one from messages.1\n",
        "line two from messages.1\n",
        "line three from messages.1\n",
        "line one from messages.2\n",
        "line two from messages.2\n",
        "line three from messages.2\n",
        "line one from messages.10\n",
        "line two from messages.10\n",
        "line three from messages.10\n",
        "line one from messages.20\n",
        "line two from messages.20\n",
        "line three from messages.20\n",
        #[cfg(feature = "gz")]
        "line one from messages.30\n",
        #[cfg(feature = "gz")]
        "line two from messages.30\n",
        #[cfg(feature = "gz")]
        "line three from messages.30\n",
    ];

    /// Installs a trace-level test logger; later calls are no-ops because the
    /// result of `try_init` is ignored.
    fn init_logger() {
        let _ = env_logger::Builder::new()
            .filter(
                Some("dir_lines_streamer"),
                log::Level::Trace.to_level_filter(),
            )
            .try_init();
    }

    #[test]
    fn streamer_failure() {
        init_logger();
        match DirectoryLinesStreamer::from_dir("fixtures/non-existent-dir")
            .unwrap_err()
            .downcast_ref()
            .unwrap()
        {
            DirectoryLinesStreamerError::DirectoryDoesNotExists(dir) => {
                assert_eq!(dir, &PathBuf::from("fixtures/non-existent-dir"))
            }
            _ => panic!("Unexpected error"),
        }
    }

    #[test]
    fn streamer_empty() {
        init_logger();
        match DirectoryLinesStreamer::from_dir("fixtures/empty-dir")
            .unwrap_err()
            .downcast_ref()
            .unwrap()
        {
            DirectoryLinesStreamerError::EmptyDirectory(dir) => {
                assert_eq!(dir, &PathBuf::from("fixtures/empty-dir"))
            }
            _ => panic!("Unexpected error"),
        }
    }

    #[test]
    fn streamer_success() {
        init_logger();
        let mut streamer = DirectoryLinesStreamer::from_dir("fixtures/non-empty-dir").unwrap();
        // Pull the lines one by one instead of zipping: a streamer that stops
        // too early must fail the test rather than shorten the comparison.
        for &expected_line in EXPECTED_LINES {
            assert_eq!(streamer.next(), Some(expected_line.to_string()));
        }
    }

    #[test]
    fn collect() {
        init_logger();
        let streamer = DirectoryLinesStreamer::from_dir("fixtures/non-empty-dir").unwrap();
        let lines: Vec<String> = streamer.collect();
        // `zip` stops at the shorter side, so check first that no expected
        // line is missing.
        assert!(
            lines.len() >= EXPECTED_LINES.len(),
            "expected at least {} lines, got {}",
            EXPECTED_LINES.len(),
            lines.len()
        );
        for (line, expected_line) in lines.iter().zip(EXPECTED_LINES) {
            assert_eq!(line, expected_line);
        }
    }
}