use anyhow::Result;
use crossbeam_channel::Receiver;
use std::collections::VecDeque;
use std::fs;
use std::io::{self, BufRead, BufReader, Read};
use std::thread;
use crate::decompression::DecompressionReader;
/// Line reader that can look ahead for the first non-blank line without losing any input.
///
/// Every line read by [`peek_first_non_empty_line`](Self::peek_first_non_empty_line) is kept in
/// an internal byte buffer. That buffer is served first by the `Read` and `BufRead`
/// implementations (including `fill_buf`/`consume`), and only then is `inner` read again.
/// Consumers therefore see exactly the same bytes as if no peek had happened.
pub struct PeekableLineReader<R: BufRead> {
    inner: R,
    /// Bytes pulled from `inner` while peeking. The unread part is `buffered_prefix[prefix_pos..]`.
    buffered_prefix: Vec<u8>,
    /// Read position within `buffered_prefix`. Reset to 0 whenever the buffer is fully consumed.
    prefix_pos: usize,
    /// Cached peek result: `None` = not peeked yet, `Some(None)` = input ended without a non-blank
    /// line, `Some(Some(line))` = the first non-blank line as returned by `read_line`.
    detected_line: Option<Option<String>>,
    /// Whether peeking has read at least one byte from `inner`.
    saw_any_input: bool,
}

impl<R: BufRead> PeekableLineReader<R> {
    /// Wraps `reader`. Nothing is read until the first peek or read call.
    pub fn new(reader: R) -> Self {
        Self {
            inner: reader,
            buffered_prefix: Vec::new(),
            prefix_pos: 0,
            detected_line: None,
            saw_any_input: false,
        }
    }

    /// Returns the first line that is not empty after trimming whitespace, reading ahead as needed.
    ///
    /// Every line read along the way, including blank ones, is buffered so later reads still
    /// return it. A successful result (including `Ok(None)` at end of input) is cached, and later
    /// calls return it without reading.
    ///
    /// # Errors
    /// Propagates errors from `inner.read_line`. Any text `read_line` had already appended before
    /// failing is still buffered. The result is not cached on error.
    pub fn peek_first_non_empty_line(&mut self) -> io::Result<Option<String>> {
        if let Some(cached) = &self.detected_line {
            return Ok(cached.clone());
        }
        loop {
            let mut line = String::new();
            let result = self.inner.read_line(&mut line);
            // Buffer before inspecting the result so no consumed byte is ever dropped.
            if !line.is_empty() {
                self.saw_any_input = true;
                self.buffered_prefix.extend_from_slice(line.as_bytes());
            }
            match result {
                Ok(0) => {
                    self.detected_line = Some(None);
                    return Ok(None);
                }
                Ok(_) if !line.trim().is_empty() => {
                    self.detected_line = Some(Some(line.clone()));
                    return Ok(Some(line));
                }
                Ok(_) => {}
                Err(e) => return Err(e),
            }
        }
    }

    /// Returns whether peeking has read any bytes from the underlying reader.
    pub fn saw_any_input(&self) -> bool {
        self.saw_any_input
    }

    /// Marks `amt` bytes of the peek buffer as consumed (clamped to what is left) and frees the
    /// buffer once it has been fully replayed.
    fn consume_prefix(&mut self, amt: usize) {
        self.prefix_pos = self
            .prefix_pos
            .saturating_add(amt)
            .min(self.buffered_prefix.len());
        if self.prefix_pos == self.buffered_prefix.len() {
            self.buffered_prefix.clear();
            self.prefix_pos = 0;
        }
    }
}

impl<R: BufRead> BufRead for PeekableLineReader<R> {
    /// Returns the unread peeked bytes if there are any. Otherwise returns `inner`'s buffer.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.prefix_pos < self.buffered_prefix.len() {
            return Ok(&self.buffered_prefix[self.prefix_pos..]);
        }
        self.inner.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        if self.prefix_pos < self.buffered_prefix.len() {
            // `fill_buf` never mixes peeked and inner bytes in one slice, so `amt` refers to the
            // peek buffer only.
            self.consume_prefix(amt);
        } else {
            self.inner.consume(amt);
        }
    }

    /// Replays one buffered line (up to and including `\n`, or the rest of the buffer) while
    /// peeked bytes remain. After that, it delegates to `inner.read_line`, so an inner reader's
    /// own `read_line` behavior is preserved.
    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
        let pending = &self.buffered_prefix[self.prefix_pos..];
        if pending.is_empty() {
            return self.inner.read_line(buf);
        }
        let len = pending
            .iter()
            .position(|&b| b == b'\n')
            .map_or(pending.len(), |i| i + 1);
        // The bytes came from `read_line`, so they are valid UTF-8 unless a caller consumed part
        // of a multi-byte character via `consume`.
        let text = std::str::from_utf8(&pending[..len])
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        buf.push_str(text);
        self.consume_prefix(len);
        Ok(len)
    }
}

impl<R: BufRead> std::io::Read for PeekableLineReader<R> {
    /// Copies peeked bytes first. Once they are exhausted, reads directly from `inner`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let pending = &self.buffered_prefix[self.prefix_pos..];
        if pending.is_empty() {
            return self.inner.read(buf);
        }
        let n = pending.len().min(buf.len());
        buf[..n].copy_from_slice(&pending[..n]);
        self.consume_prefix(n);
        Ok(n)
    }
}
/// Stdin reader fed by a background thread through a channel.
///
/// [`ChannelStdinReader::new`] spawns a thread that reads stdin in chunks of up to 8192 bytes and
/// sends each chunk over the channel. This side serves bytes from the chunk it currently holds and
/// receives the next one (blocking) once that chunk is used up. End of input is reported after the
/// sending thread has finished (dropping its sender) and every chunk has been consumed.
pub struct ChannelStdinReader {
    /// Receiving end of the channel filled by the stdin thread.
    receiver: Receiver<Vec<u8>>,
    /// Chunk currently being served. `None` when the next chunk still has to be received.
    current_buffer: Option<Vec<u8>>,
    /// Offset of the first unread byte in `current_buffer`.
    current_pos: usize,
    /// Set once the channel reports disconnection, i.e. no further chunks will arrive.
    eof: bool,
}

impl ChannelStdinReader {
    /// Spawns the stdin reader thread and returns the reader for its output.
    ///
    /// The thread stops at end of stdin, when a send fails (the receiving side is gone), or on a
    /// read error other than `Interrupted`.
    ///
    /// # Errors
    /// The current implementation always returns `Ok`.
    pub fn new() -> Result<Self> {
        // NOTE(review): the channel is unbounded, so if the consumer falls behind, the reader
        // thread keeps buffering stdin in memory without limit.
        let (sender, receiver) = crossbeam_channel::unbounded();
        thread::spawn(move || {
            let stdin = io::stdin();
            let mut lock = stdin.lock();
            let mut buffer = vec![0u8; 8192];
            loop {
                match lock.read(&mut buffer) {
                    // End of stdin. Leaving the loop drops `sender`, which the receiving side
                    // observes as a disconnect (EOF).
                    Ok(0) => break,
                    Ok(n) => {
                        if sender.send(buffer[..n].to_vec()).is_err() {
                            // Receiving side is gone; nobody needs more input.
                            break;
                        }
                    }
                    // `Interrupted` is non-fatal per the `Read` contract: retry instead of
                    // truncating the input as if stdin had ended.
                    Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                    // NOTE(review): any other error ends the stream silently; the consumer cannot
                    // distinguish it from a normal end of input.
                    Err(_) => break,
                }
            }
        });
        Ok(Self {
            receiver,
            current_buffer: None,
            current_pos: 0,
            eof: false,
        })
    }

    /// If no chunk is held and EOF has not been seen, blocks until the next chunk arrives or the
    /// channel disconnects (which sets `eof`). Never returns an error.
    fn ensure_current_buffer(&mut self) -> io::Result<()> {
        if self.current_buffer.is_none() && !self.eof {
            match self.receiver.recv() {
                Ok(buffer) => {
                    self.current_buffer = Some(buffer);
                    self.current_pos = 0;
                }
                Err(_) => {
                    self.eof = true;
                }
            }
        }
        Ok(())
    }

    /// Marks `amt` bytes of the current chunk as read, clamped to the chunk length. Releases the
    /// chunk once it is fully consumed. Does nothing if no chunk is held.
    fn advance_within_chunk(&mut self, amt: usize) {
        let Some(chunk_len) = self.current_buffer.as_ref().map(Vec::len) else {
            return;
        };
        self.current_pos = self.current_pos.saturating_add(amt).min(chunk_len);
        if self.current_pos >= chunk_len {
            self.current_buffer = None;
            self.current_pos = 0;
        }
    }
}

impl io::Read for ChannelStdinReader {
    /// Copies up to `buf.len()` bytes from the current chunk. A single call never spans two chunks.
    /// Returns `Ok(0)` at end of input, and immediately (without waiting on the channel) when
    /// `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        self.ensure_current_buffer()?;
        let copied = match &self.current_buffer {
            Some(chunk) => {
                let remaining = &chunk[self.current_pos..];
                let n = remaining.len().min(buf.len());
                buf[..n].copy_from_slice(&remaining[..n]);
                n
            }
            None => 0,
        };
        if copied > 0 {
            self.advance_within_chunk(copied);
        }
        Ok(copied)
    }
}

impl io::BufRead for ChannelStdinReader {
    /// Returns the unread part of the current chunk (receiving a new one if needed). Returns an
    /// empty slice at end of input.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.ensure_current_buffer()?;
        let pending: &[u8] = match &self.current_buffer {
            Some(chunk) => &chunk[self.current_pos..],
            None => &[],
        };
        Ok(pending)
    }

    fn consume(&mut self, amt: usize) {
        self.advance_within_chunk(amt);
    }
}
/// Reads a list of inputs one after another as though they were a single stream.
///
/// Each input is opened with `open_input_reader` when reading reaches it.
pub struct MultiFileReader {
    /// Input paths in reading order. `open_input_reader` treats `"-"` as stdin.
    files: Vec<String>,
    /// If true, an input that fails to open produces an error instead of being skipped.
    strict: bool,
    /// Capacity of the `BufReader` wrapped around each opened input.
    buffer_size: usize,
    /// Index into `files` of the input being read, or of the next one to open.
    current_file_idx: usize,
    /// Open reader for `files[current_file_idx]`. `None` until that input has been opened.
    current_reader: Option<Box<dyn BufRead + Send>>,
}
/// Opens one input and returns it as a buffered reader.
///
/// A `file_path` of `"-"` reads stdin through [`ChannelStdinReader`], passed through
/// `crate::decompression::maybe_decompress`. Any other path is opened with
/// `DecompressionReader::new`, and directories are rejected. Each reader is wrapped in a
/// `BufReader` with capacity `buffer_size`.
///
/// Every failure is printed to stderr and recorded with `crate::stats::stats_file_open_failed`.
/// After that, `strict` mode returns an error, and non-strict mode returns `Ok(None)` so the
/// caller can skip the input.
///
/// # Errors
/// Only when `strict` is true:
/// - for stdin, the setup error wrapped with `io::Error::other`;
/// - for a directory, an `ErrorKind::Other` error;
/// - for a file that could not be opened, an `ErrorKind::NotFound` error carrying the
///   `format_input_open_error` message.
pub fn open_input_reader(
    file_path: &str,
    buffer_size: usize,
    strict: bool,
) -> io::Result<Option<Box<dyn BufRead + Send>>> {
    if file_path == "-" {
        open_stdin_input(buffer_size, strict)
    } else {
        open_file_input(file_path, buffer_size, strict)
    }
}

/// Opens stdin via [`ChannelStdinReader`], applying `maybe_decompress`.
fn open_stdin_input(
    buffer_size: usize,
    strict: bool,
) -> io::Result<Option<Box<dyn BufRead + Send>>> {
    let stdin_reader = match ChannelStdinReader::new() {
        Ok(reader) => reader,
        Err(e) => {
            let message = format!("Failed to setup stdin reader: {}", e);
            return report_open_failure("-", message, strict, move || io::Error::other(e));
        }
    };
    match crate::decompression::maybe_decompress(stdin_reader) {
        Ok(processed_reader) => Ok(Some(Box::new(BufReader::with_capacity(
            buffer_size,
            processed_reader,
        )))),
        Err(e) => {
            let message = format!("Failed to setup stdin decompression: {}", e);
            report_open_failure("-", message, strict, move || io::Error::other(e))
        }
    }
}

/// Opens a regular input file via `DecompressionReader::new`, rejecting directories first.
fn open_file_input(
    file_path: &str,
    buffer_size: usize,
    strict: bool,
) -> io::Result<Option<Box<dyn BufRead + Send>>> {
    // Only directories are rejected up front. A metadata error (e.g. a missing path) falls
    // through to `DecompressionReader::new`, whose failure is reported below.
    if fs::metadata(file_path).is_ok_and(|metadata| metadata.is_dir()) {
        let message = format!(
            "Input path '{}' is a directory; skipping (input files only)",
            file_path
        );
        return report_open_failure(file_path, message, strict, || {
            io::Error::other(format!(
                "Input path '{}' is a directory; only files are supported",
                file_path
            ))
        });
    }
    match DecompressionReader::new(file_path) {
        Ok(decompressor) => Ok(Some(Box::new(BufReader::with_capacity(
            buffer_size,
            decompressor,
        )))),
        Err(e) => {
            let message = crate::config::format_input_open_error(file_path, &e.to_string());
            let strict_message = message.clone();
            // NOTE(review): the kind is always NotFound, whatever the underlying cause (e.g. a
            // permission error); confirm no caller relies on distinguishing causes by kind.
            report_open_failure(file_path, message, strict, move || {
                io::Error::new(io::ErrorKind::NotFound, strict_message)
            })
        }
    }
}

/// Handles an input that could not be opened. Prints `message` (formatted with
/// `format_error_message_auto`) on stderr and records the failure for `display_path` in the stats.
/// Then returns the error built by `strict_error` in strict mode, or `Ok(None)` so the caller
/// skips the input.
fn report_open_failure(
    display_path: &str,
    message: String,
    strict: bool,
    strict_error: impl FnOnce() -> io::Error,
) -> io::Result<Option<Box<dyn BufRead + Send>>> {
    eprintln!("{}", crate::config::format_error_message_auto(&message));
    crate::stats::stats_file_open_failed(display_path);
    if strict {
        Err(strict_error())
    } else {
        Ok(None)
    }
}
/// A sendable buffered reader that can report which named input it is currently reading.
pub trait FileAwareRead: Send + BufRead {
    /// Name of the input currently being read, or `None` if there is none.
    fn current_filename(&self) -> Option<&str>;
}
pub struct FileAwareMultiFileReader {
inner: MultiFileReader,
}
impl FileAwareMultiFileReader {
pub fn new(files: Vec<String>, strict: bool) -> Result<Self> {
Ok(Self {
inner: MultiFileReader::new(files, strict)?,
})
}
}
impl io::Read for FileAwareMultiFileReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl io::BufRead for FileAwareMultiFileReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.inner.fill_buf()
}
fn consume(&mut self, amt: usize) {
self.inner.consume(amt)
}
fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
self.inner.read_line(buf)
}
}
impl FileAwareRead for FileAwareMultiFileReader {
fn current_filename(&self) -> Option<&str> {
self.inner.current_filename()
}
}
impl MultiFileReader {
    /// Creates a reader over `files` using a 256 KiB buffer per input.
    pub fn new(files: Vec<String>, strict: bool) -> Result<Self> {
        const DEFAULT_BUFFER_SIZE: usize = 256 * 1024;
        Self::with_buffer_size(files, DEFAULT_BUFFER_SIZE, strict)
    }

    /// Creates a reader over `files` that wraps each input in a `BufReader` of `buffer_size`.
    /// No input is opened here. The current implementation always returns `Ok`.
    pub fn with_buffer_size(files: Vec<String>, buffer_size: usize, strict: bool) -> Result<Self> {
        Ok(Self {
            files,
            strict,
            buffer_size,
            current_file_idx: 0,
            current_reader: None,
        })
    }

    /// Makes sure an input is open, opening the next one if needed. Inputs for which
    /// `open_input_reader` returns `Ok(None)` are skipped. Returns `Ok(false)` once every input
    /// is exhausted. Errors from `open_input_reader` are returned with the index left on the
    /// failing input.
    fn ensure_current_reader(&mut self) -> io::Result<bool> {
        if self.current_reader.is_some() {
            return Ok(true);
        }
        while let Some(path) = self.files.get(self.current_file_idx) {
            if let Some(reader) = open_input_reader(path, self.buffer_size, self.strict)? {
                self.current_reader = Some(reader);
                return Ok(true);
            }
            self.current_file_idx += 1;
        }
        Ok(false)
    }

    /// Drops the current reader and moves on to the next input.
    fn advance_to_next_file(&mut self) {
        self.current_file_idx += 1;
        self.current_reader = None;
    }

    /// Name of the input at the current position, or `None` once every input has been passed.
    pub fn current_filename(&self) -> Option<&str> {
        self.files.get(self.current_file_idx).map(String::as_str)
    }
}
impl io::Read for MultiFileReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
if !self.ensure_current_reader()? {
return Ok(0); }
if let Some(ref mut reader) = self.current_reader {
match reader.read(buf) {
Ok(0) => {
self.advance_to_next_file();
continue;
}
Ok(n) => return Ok(n),
Err(e) => return Err(e),
}
}
}
}
}
impl io::BufRead for MultiFileReader {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
if !self.ensure_current_reader()? {
return Ok(&[]); }
if let Some(ref mut reader) = self.current_reader {
reader.fill_buf()
} else {
Ok(&[])
}
}
fn consume(&mut self, amt: usize) {
if let Some(ref mut reader) = self.current_reader {
reader.consume(amt);
}
}
fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
loop {
if !self.ensure_current_reader()? {
return Ok(0); }
if let Some(ref mut reader) = self.current_reader {
match reader.read_line(buf) {
Ok(0) => {
self.advance_to_next_file();
if !buf.is_empty() && !buf.ends_with('\n') {
buf.push('\n');
return Ok(1);
}
continue;
}
Ok(n) => return Ok(n),
Err(e) => return Err(e),
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;

    /// Creates a flushed temporary file containing each entry of `lines` followed by `\n`.
    fn temp_file_with_lines(lines: &[&str]) -> Result<NamedTempFile> {
        let mut file = NamedTempFile::new()?;
        for line in lines {
            writeln!(file, "{}", line)?;
        }
        file.flush()?;
        Ok(file)
    }

    /// Path of `file` as the `String` form `MultiFileReader` expects.
    fn path_of(file: &NamedTempFile) -> String {
        file.path().to_string_lossy().to_string()
    }

    #[test]
    fn test_multi_file_reader_single_file() -> Result<()> {
        let file = temp_file_with_lines(&["line1", "line2"])?;
        let mut reader = MultiFileReader::new(vec![path_of(&file)], false)?;
        for expected in ["line1\n", "line2\n"] {
            let mut line = String::new();
            let n = reader.read_line(&mut line)?;
            assert_eq!(line, expected);
            assert_eq!(n, expected.len());
        }
        let mut line = String::new();
        assert_eq!(reader.read_line(&mut line)?, 0);
        assert!(line.is_empty());
        Ok(())
    }

    #[test]
    fn test_multi_file_reader_multiple_files() -> Result<()> {
        let first = temp_file_with_lines(&["file1_line1", "file1_line2"])?;
        let second = temp_file_with_lines(&["file2_line1"])?;
        let mut reader = MultiFileReader::new(vec![path_of(&first), path_of(&second)], false)?;
        let mut all_content = String::new();
        reader.read_to_string(&mut all_content)?;
        assert_eq!(all_content, "file1_line1\nfile1_line2\nfile2_line1\n");
        Ok(())
    }
}