use std::io::BufRead;
use std::path::Path;
use dsi_progress_logger::{ConcurrentProgressLog, ProgressLog};
use rayon::prelude::*;
/// Iterator over the lines of a buffered reader, yielding each line as raw bytes.
///
/// Unlike [`BufRead::lines`], lines are returned as `Vec<u8>` and are not required to be
/// valid UTF-8. Usually built with [`ToByteLines::byte_lines`].
///
/// The `BufRead` requirement lives on the `Iterator` impl rather than on the struct, so
/// merely naming the type does not force the bound on every user.
pub struct ByteLines<B> {
    buf: B,
}
/// Reads one line per call from the wrapped reader.
///
/// Each item holds the raw bytes of one line. A trailing `\n` is removed, together with a
/// `\r` directly before it, so both `\n` and `\r\n` terminators are stripped. A last line
/// with no terminator is yielded unchanged. Iteration ends when the reader reports end of
/// input, and read errors are surfaced as `Err` items.
impl<B: BufRead> Iterator for ByteLines<B> {
    type Item = std::io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut line = Vec::new();
        let bytes_read = match self.buf.read_until(b'\n', &mut line) {
            Ok(count) => count,
            Err(err) => return Some(Err(err)),
        };
        if bytes_read == 0 {
            // Nothing was read: end of input.
            return None;
        }
        // A `\r` is only stripped when it sits right before the `\n` terminator.
        if line.ends_with(b"\n") {
            line.pop();
            if line.ends_with(b"\r") {
                line.pop();
            }
        }
        Some(Ok(line))
    }
}
pub trait ToByteLines: std::io::BufRead + Sized {
fn byte_lines(self) -> ByteLines<Self> {
ByteLines { buf: self }
}
}
impl<B: std::io::BufRead> ToByteLines for B {}
/// Iterates over the lines of a ZSTD-compressed file, converting each line into `Line`.
///
/// Each line is read as raw bytes. Its trailing `\n` (and a `\r` just before it, if any)
/// is removed, and the bytes are then converted with `TryFrom<Vec<u8>>`. `pl` receives
/// one [`ProgressLog::light_update`] call per line read.
///
/// # Panics
///
/// Panics immediately if the file cannot be opened or the ZSTD decoder cannot be created.
/// The returned iterator panics if reading a line fails, or if a line cannot be converted
/// into `Line`.
pub fn iter_lines_from_file<'a, Line>(
    path: &Path,
    mut pl: impl ProgressLog + 'a,
) -> impl Iterator<Item = Line> + 'a
where
    Line: TryFrom<Vec<u8>>,
    <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
{
    let file = std::fs::File::open(path)
        .unwrap_or_else(|e| panic!("Could not open {} for reading: {:?}", path.display(), e));
    let decoder = zstd::stream::read::Decoder::new(file)
        .unwrap_or_else(|e| panic!("{} is not a ZSTD file: {:?}", path.display(), e));
    // Owned copy for error messages: the returned iterator must not borrow `path`, whose
    // lifetime is unrelated to `'a`.
    let path = path.to_owned();
    std::io::BufReader::new(decoder)
        .byte_lines()
        .map(move |line| {
            pl.light_update();
            // An `Err` here is a read/decompression failure, not a parse failure, so it
            // gets its own message pointing at the offending file.
            let line = line.unwrap_or_else(|e| {
                panic!("Could not read line from {}: {:?}", path.display(), e)
            });
            Line::try_from(line).unwrap_or_else(|e| panic!("Could not parse swhid {:?}", &e))
        })
}
/// Returns the paths of all entries of the directory `path`, sorted.
///
/// Sorting makes the file order deterministic, because `read_dir` yields entries in an
/// unspecified order.
///
/// # Panics
///
/// Panics if the directory cannot be listed or if one of its entries cannot be read.
fn sorted_dir_entries(path: &Path) -> Vec<std::path::PathBuf> {
    let mut file_paths: Vec<_> = std::fs::read_dir(path)
        .unwrap_or_else(|e| panic!("Could not list {}: {:?}", path.display(), e))
        .map(|entry| {
            entry
                .unwrap_or_else(|e| panic!("Could not read {} entry: {:?}", path.display(), e))
                .path()
        })
        .collect();
    // Paths from a single directory listing are unique, so an unstable sort gives the
    // same order as a stable one.
    file_paths.sort_unstable();
    file_paths
}

/// Sequentially iterates over the lines of every file in the directory `path`, in sorted
/// path order.
///
/// Every directory entry is handed to [`iter_lines_from_file`]. The directory is therefore
/// expected to contain only ZSTD-compressed line files; other entries will likely cause a
/// panic when opened or read. `pl` is cloned once per file.
///
/// # Panics
///
/// Panics immediately if the directory cannot be listed. Otherwise, panics under the same
/// conditions as [`iter_lines_from_file`].
pub fn iter_lines_from_dir<'a, Line>(
    path: &'a Path,
    pl: impl ConcurrentProgressLog + 'a,
) -> impl Iterator<Item = Line> + 'a
where
    Line: TryFrom<Vec<u8>>,
    <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
{
    sorted_dir_entries(path)
        .into_iter()
        .flat_map(move |file_path| iter_lines_from_file(&file_path, pl.clone()))
}

/// Parallel counterpart of [`iter_lines_from_dir`]: files are distributed across rayon
/// workers, and each file's lines are read sequentially (via `flat_map_iter`).
///
/// This function does not guarantee any order across files. `pl` is cloned once per file.
///
/// # Panics
///
/// Panics immediately if the directory cannot be listed. Otherwise, panics under the same
/// conditions as [`iter_lines_from_file`].
pub fn par_iter_lines_from_dir<'a, Line>(
    path: &'a Path,
    pl: impl ConcurrentProgressLog + 'a,
) -> impl ParallelIterator<Item = Line> + 'a
where
    Line: TryFrom<Vec<u8>> + Send,
    <Line as TryFrom<Vec<u8>>>::Error: std::fmt::Debug,
{
    sorted_dir_entries(path)
        .into_par_iter()
        .flat_map_iter(move |file_path| iter_lines_from_file(&file_path, pl.clone()))
}