1use anyhow::Result;
2use crossbeam_channel::Receiver;
3use std::collections::VecDeque;
4use std::fs;
5use std::io::{self, BufRead, BufReader, Read};
6use std::thread;
7
8use crate::decompression::DecompressionReader;
9
/// Line-oriented reader wrapper that can look ahead to the first non-blank
/// line without losing any input.
///
/// Lines pulled from `inner` while peeking are parked in `buffered_prefix` and
/// are replayed, in order, by every read path (`read_line`,
/// `fill_buf`/`consume` and `Read::read`) before any further data is taken
/// from `inner`.
pub struct PeekableLineReader<R: BufRead> {
    /// Underlying reader that supplies all data.
    inner: R,
    /// Lines already read from `inner` while peeking but not yet handed out.
    buffered_prefix: VecDeque<String>,
    /// Number of bytes of the front entry of `buffered_prefix` that were
    /// already handed out. Invariant: strictly less than that entry's length,
    /// and `0` whenever `buffered_prefix` is empty.
    prefix_offset: usize,
    /// Peek cache: `None` = not peeked yet, `Some(None)` = input ended without
    /// a non-blank line, `Some(Some(line))` = the line that was found.
    detected_line: Option<Option<String>>,
    /// Whether peeking has read at least one line from `inner`.
    saw_any_input: bool,
}

impl<R: BufRead> PeekableLineReader<R> {
    /// Wraps `reader`; nothing is read until the first peek or read call.
    pub fn new(reader: R) -> Self {
        Self {
            inner: reader,
            buffered_prefix: VecDeque::new(),
            prefix_offset: 0,
            detected_line: None,
            saw_any_input: false,
        }
    }

    /// Returns the first line whose trimmed content is non-empty, including
    /// its line terminator, or `None` if the input ends first.
    ///
    /// Every line read while searching (blank ones included) stays buffered
    /// and is returned again by subsequent reads. The result is cached, so
    /// repeated calls do not read further input.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying reader's `read_line`; the
    /// result is not cached in that case.
    pub fn peek_first_non_empty_line(&mut self) -> io::Result<Option<String>> {
        if let Some(cached) = &self.detected_line {
            return Ok(cached.clone());
        }

        loop {
            let mut line = String::new();
            if self.inner.read_line(&mut line)? == 0 {
                self.detected_line = Some(None);
                return Ok(None);
            }

            self.saw_any_input = true;
            if line.trim().is_empty() {
                // Blank line: keep it for replay and keep looking.
                self.buffered_prefix.push_back(line);
                continue;
            }

            self.buffered_prefix.push_back(line.clone());
            self.detected_line = Some(Some(line.clone()));
            return Ok(Some(line));
        }
    }

    /// Reports whether peeking has read at least one line from the input.
    pub fn saw_any_input(&self) -> bool {
        self.saw_any_input
    }

    /// Marks `amt` bytes of the front buffered line as delivered and drops the
    /// line once it has been handed out completely.
    fn consume_prefix(&mut self, amt: usize) {
        let Some(front_len) = self.buffered_prefix.front().map(String::len) else {
            return;
        };
        self.prefix_offset = self.prefix_offset.saturating_add(amt);
        if self.prefix_offset >= front_len {
            self.buffered_prefix.pop_front();
            self.prefix_offset = 0;
        }
    }
}

impl<R: BufRead> BufRead for PeekableLineReader<R> {
    /// Exposes the rest of the oldest buffered line if there is one, otherwise
    /// the underlying reader's buffer.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        match self.buffered_prefix.front() {
            // Never empty: buffered lines are non-empty and `prefix_offset`
            // stays strictly below the front line's length.
            Some(front) => Ok(&front.as_bytes()[self.prefix_offset..]),
            None => self.inner.fill_buf(),
        }
    }

    /// Consumes from the buffered prefix while it is non-empty (matching what
    /// `fill_buf` exposed), otherwise from the underlying reader.
    fn consume(&mut self, amt: usize) {
        if self.buffered_prefix.is_empty() {
            self.inner.consume(amt);
        } else {
            self.consume_prefix(amt);
        }
    }

    /// Appends the next line to `buf`: the rest of the oldest buffered line if
    /// any, otherwise a fresh line from the underlying reader.
    ///
    /// # Errors
    ///
    /// Returns `InvalidData` if a byte-level read left the buffered line split
    /// inside a multi-byte character; otherwise propagates the underlying
    /// reader's error.
    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
        let Some(front) = self.buffered_prefix.pop_front() else {
            return self.inner.read_line(buf);
        };

        let offset = self.prefix_offset;
        self.prefix_offset = 0;

        // `str::get` yields `None` when `offset` is not on a char boundary.
        match front.get(offset..) {
            Some(rest) => {
                buf.push_str(rest);
                Ok(rest.len())
            }
            None => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "stream did not contain valid UTF-8",
            )),
        }
    }
}

impl<R: BufRead> std::io::Read for PeekableLineReader<R> {
    /// Copies bytes from the buffered prefix first, and only reads from the
    /// underlying reader once the prefix has been fully replayed.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let Some(front) = self.buffered_prefix.front() else {
            return self.inner.read(buf);
        };

        let pending = &front.as_bytes()[self.prefix_offset..];
        let count = pending.len().min(buf.len());
        buf[..count].copy_from_slice(&pending[..count]);
        self.consume_prefix(count);
        Ok(count)
    }
}
85
/// Reader over standard input whose bytes arrive through a channel.
///
/// Implements both `Read` and `BufRead` by serving data out of the most
/// recently received chunk.
pub struct ChannelStdinReader {
    /// Receiving end of the channel fed by the thread spawned in `new`.
    receiver: Receiver<Vec<u8>>,
    /// Chunk currently being served, if any.
    current_buffer: Option<Vec<u8>>,
    /// Index of the next unread byte within `current_buffer`.
    current_pos: usize,
    /// Set once the channel has disconnected; no further receives are attempted.
    eof: bool,
}
93
94impl ChannelStdinReader {
95 pub fn new() -> Result<Self> {
96 let (sender, receiver) = crossbeam_channel::unbounded();
97
98 thread::spawn(move || {
100 let stdin = io::stdin();
101 let mut lock = stdin.lock();
102 let mut buffer = vec![0u8; 8192]; loop {
105 match lock.read(&mut buffer) {
106 Ok(0) => break, Ok(n) => {
108 if sender.send(buffer[..n].to_vec()).is_err() {
109 break; }
111 }
112 Err(_) => break, }
114 }
115 });
116
117 Ok(Self {
118 receiver,
119 current_buffer: None,
120 current_pos: 0,
121 eof: false,
122 })
123 }
124
125 fn ensure_current_buffer(&mut self) -> io::Result<()> {
126 if self.current_buffer.is_none() && !self.eof {
127 match self.receiver.recv() {
128 Ok(buffer) => {
129 self.current_buffer = Some(buffer);
130 self.current_pos = 0;
131 }
132 Err(_) => {
133 self.eof = true;
134 }
135 }
136 }
137 Ok(())
138 }
139}
140
141impl io::Read for ChannelStdinReader {
142 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143 self.ensure_current_buffer()?;
144
145 if let Some(ref buffer) = self.current_buffer {
146 let remaining = &buffer[self.current_pos..];
147 let to_copy = std::cmp::min(buf.len(), remaining.len());
148
149 if to_copy > 0 {
150 buf[..to_copy].copy_from_slice(&remaining[..to_copy]);
151 self.current_pos += to_copy;
152
153 if self.current_pos >= buffer.len() {
155 self.current_buffer = None;
156 self.current_pos = 0;
157 }
158
159 Ok(to_copy)
160 } else {
161 Ok(0)
162 }
163 } else {
164 Ok(0) }
166 }
167}
168
169impl io::BufRead for ChannelStdinReader {
170 fn fill_buf(&mut self) -> io::Result<&[u8]> {
171 self.ensure_current_buffer()?;
172
173 if let Some(ref buffer) = self.current_buffer {
174 Ok(&buffer[self.current_pos..])
175 } else {
176 Ok(&[])
177 }
178 }
179
180 fn consume(&mut self, amt: usize) {
181 if let Some(ref buffer) = self.current_buffer {
182 self.current_pos = std::cmp::min(self.current_pos + amt, buffer.len());
183
184 if self.current_pos >= buffer.len() {
186 self.current_buffer = None;
187 self.current_pos = 0;
188 }
189 }
190 }
191}
192
/// Reads a list of inputs one after another as a single stream.
pub struct MultiFileReader {
    /// Input paths in reading order; `"-"` is handled as standard input by
    /// `open_input_reader`.
    files: Vec<String>,
    /// Index into `files` of the input currently being read (or next to open).
    current_file_idx: usize,
    /// Reader for the current input; `None` until it is opened lazily.
    current_reader: Option<Box<dyn BufRead + Send>>,
    /// Buffer capacity passed to `open_input_reader` for each input.
    buffer_size: usize,
    /// Passed to `open_input_reader`: when `true`, an input that cannot be
    /// opened is an error; when `false`, it is skipped.
    strict: bool,
}
201
202pub fn open_input_reader(
203 file_path: &str,
204 buffer_size: usize,
205 strict: bool,
206) -> io::Result<Option<Box<dyn BufRead + Send>>> {
207 if file_path == "-" {
208 match ChannelStdinReader::new() {
209 Ok(stdin_reader) => match crate::decompression::maybe_decompress(stdin_reader) {
210 Ok(processed_reader) => Ok(Some(Box::new(BufReader::with_capacity(
211 buffer_size,
212 processed_reader,
213 )))),
214 Err(e) => {
215 eprintln!(
216 "{}",
217 crate::config::format_error_message_auto(&format!(
218 "Failed to setup stdin decompression: {}",
219 e
220 ))
221 );
222 crate::stats::stats_file_open_failed("-");
223 if strict {
224 Err(io::Error::other(e))
225 } else {
226 Ok(None)
227 }
228 }
229 },
230 Err(e) => {
231 eprintln!(
232 "{}",
233 crate::config::format_error_message_auto(&format!(
234 "Failed to setup stdin reader: {}",
235 e
236 ))
237 );
238 crate::stats::stats_file_open_failed("-");
239 if strict {
240 Err(io::Error::other(e))
241 } else {
242 Ok(None)
243 }
244 }
245 }
246 } else {
247 if let Ok(metadata) = fs::metadata(file_path) {
248 if metadata.is_dir() {
249 eprintln!(
250 "{}",
251 crate::config::format_error_message_auto(&format!(
252 "Input path '{}' is a directory; skipping (input files only)",
253 file_path
254 ))
255 );
256 crate::stats::stats_file_open_failed(file_path);
257 if strict {
258 return Err(io::Error::other(format!(
259 "Input path '{}' is a directory; only files are supported",
260 file_path
261 )));
262 }
263 return Ok(None);
264 }
265 }
266
267 match DecompressionReader::new(file_path) {
268 Ok(decompressor) => Ok(Some(Box::new(BufReader::with_capacity(
269 buffer_size,
270 decompressor,
271 )))),
272 Err(e) => {
273 eprintln!(
274 "{}",
275 crate::config::format_error_message_auto(
276 &crate::config::format_input_open_error(file_path, &e.to_string()),
277 )
278 );
279 crate::stats::stats_file_open_failed(file_path);
280 if strict {
281 Err(io::Error::new(
282 io::ErrorKind::NotFound,
283 crate::config::format_input_open_error(file_path, &e.to_string()),
284 ))
285 } else {
286 Ok(None)
287 }
288 }
289 }
290 }
291}
292
/// A sendable buffered reader that can also report which input it is
/// currently positioned on.
pub trait FileAwareRead: BufRead + Send {
    /// Returns the name of the input currently being read, or `None` when
    /// there is no current input.
    fn current_filename(&self) -> Option<&str>;
}
297
/// Thin wrapper around `MultiFileReader` that forwards all reading to it.
pub struct FileAwareMultiFileReader {
    /// The wrapped reader that performs all I/O.
    inner: MultiFileReader,
}
302
303impl FileAwareMultiFileReader {
304 pub fn new(files: Vec<String>, strict: bool) -> Result<Self> {
305 Ok(Self {
306 inner: MultiFileReader::new(files, strict)?,
307 })
308 }
309}
310
311impl io::Read for FileAwareMultiFileReader {
312 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
313 self.inner.read(buf)
314 }
315}
316
317impl io::BufRead for FileAwareMultiFileReader {
318 fn fill_buf(&mut self) -> io::Result<&[u8]> {
319 self.inner.fill_buf()
320 }
321
322 fn consume(&mut self, amt: usize) {
323 self.inner.consume(amt)
324 }
325
326 fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
327 self.inner.read_line(buf)
328 }
329}
330
331impl FileAwareRead for FileAwareMultiFileReader {
332 fn current_filename(&self) -> Option<&str> {
333 self.inner.current_filename()
334 }
335}
336
337impl MultiFileReader {
338 pub fn new(files: Vec<String>, strict: bool) -> Result<Self> {
340 Self::with_buffer_size(files, 256 * 1024, strict)
341 }
342
343 pub fn with_buffer_size(files: Vec<String>, buffer_size: usize, strict: bool) -> Result<Self> {
345 Ok(Self {
346 files,
347 current_file_idx: 0,
348 current_reader: None,
349 buffer_size,
350 strict,
351 })
352 }
353
354 fn ensure_current_reader(&mut self) -> io::Result<bool> {
355 while self.current_reader.is_none() && self.current_file_idx < self.files.len() {
356 let file_path = &self.files[self.current_file_idx];
357 match open_input_reader(file_path, self.buffer_size, self.strict)? {
358 Some(reader) => {
359 self.current_reader = Some(reader);
360 return Ok(true);
361 }
362 None => {
363 self.current_file_idx += 1;
364 continue;
365 }
366 }
367 }
368
369 Ok(self.current_reader.is_some())
370 }
371
372 fn advance_to_next_file(&mut self) {
373 self.current_reader = None;
374 self.current_file_idx += 1;
375 }
376
377 pub fn current_filename(&self) -> Option<&str> {
379 if self.current_file_idx < self.files.len() {
380 Some(&self.files[self.current_file_idx])
381 } else {
382 None
383 }
384 }
385}
386
387impl io::Read for MultiFileReader {
388 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
389 loop {
390 if !self.ensure_current_reader()? {
391 return Ok(0); }
393
394 if let Some(ref mut reader) = self.current_reader {
395 match reader.read(buf) {
396 Ok(0) => {
397 self.advance_to_next_file();
399 continue;
400 }
401 Ok(n) => return Ok(n),
402 Err(e) => return Err(e),
403 }
404 }
405 }
406 }
407}
408
409impl io::BufRead for MultiFileReader {
410 fn fill_buf(&mut self) -> io::Result<&[u8]> {
411 if !self.ensure_current_reader()? {
412 return Ok(&[]); }
414
415 if let Some(ref mut reader) = self.current_reader {
416 reader.fill_buf()
417 } else {
418 Ok(&[])
419 }
420 }
421
422 fn consume(&mut self, amt: usize) {
423 if let Some(ref mut reader) = self.current_reader {
424 reader.consume(amt);
425 }
426 }
427
428 fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
429 loop {
430 if !self.ensure_current_reader()? {
431 return Ok(0); }
433
434 if let Some(ref mut reader) = self.current_reader {
435 match reader.read_line(buf) {
436 Ok(0) => {
437 self.advance_to_next_file();
439
440 if !buf.is_empty() && !buf.ends_with('\n') {
442 buf.push('\n');
443 return Ok(1);
444 }
445 continue;
446 }
447 Ok(n) => return Ok(n),
448 Err(e) => return Err(e),
449 }
450 }
451 }
452 }
453}
454
// Unit tests for `MultiFileReader`, using temporary files on disk.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;

    /// Reading a single two-line file line by line yields both lines with
    /// their byte counts, then `0` at end of input.
    #[test]
    fn test_multi_file_reader_single_file() -> Result<()> {
        let mut temp_file = NamedTempFile::new()?;
        writeln!(temp_file, "line1")?;
        writeln!(temp_file, "line2")?;
        temp_file.flush()?;

        let files = vec![temp_file.path().to_string_lossy().to_string()];
        let mut reader = MultiFileReader::new(files, false)?;

        let mut line = String::new();

        // First line, including its trailing newline.
        let n = reader.read_line(&mut line)?;
        assert_eq!(line, "line1\n");
        assert_eq!(n, 6);

        line.clear();

        // Second line.
        let n = reader.read_line(&mut line)?;
        assert_eq!(line, "line2\n");
        assert_eq!(n, 6);

        line.clear();

        // End of input: nothing is appended and 0 is returned.
        let n = reader.read_line(&mut line)?;
        assert_eq!(n, 0);
        assert!(line.is_empty());

        Ok(())
    }

    /// Reading two files through `Read::read_to_string` yields their contents
    /// concatenated in list order.
    #[test]
    fn test_multi_file_reader_multiple_files() -> Result<()> {
        let mut temp_file1 = NamedTempFile::new()?;
        writeln!(temp_file1, "file1_line1")?;
        writeln!(temp_file1, "file1_line2")?;
        temp_file1.flush()?;

        let mut temp_file2 = NamedTempFile::new()?;
        writeln!(temp_file2, "file2_line1")?;
        temp_file2.flush()?;

        let files = vec![
            temp_file1.path().to_string_lossy().to_string(),
            temp_file2.path().to_string_lossy().to_string(),
        ];
        let mut reader = MultiFileReader::new(files, false)?;

        let mut all_content = String::new();
        reader.read_to_string(&mut all_content)?;

        assert_eq!(all_content, "file1_line1\nfile1_line2\nfile2_line1\n");

        Ok(())
    }
}