Skip to main content

kelora/
readers.rs

1use anyhow::Result;
2use crossbeam_channel::Receiver;
3use std::collections::VecDeque;
4use std::fs;
5use std::io::{self, BufRead, BufReader, Read};
6use std::thread;
7
8use crate::decompression::DecompressionReader;
9
/// A reader that can peek at the first non-empty line without consuming it.
///
/// Used for format auto-detection on streams. Every byte pulled from the wrapped
/// reader while peeking is kept in an internal prefix buffer. It is handed out again
/// before any further data is taken from the wrapped reader, through:
/// - `read_line` (and therefore `lines`),
/// - `fill_buf`/`consume` (and therefore `read_until`, `split`, ...),
/// - `read`.
pub struct PeekableLineReader<R: BufRead> {
    inner: R,
    /// Bytes read from `inner` while peeking that have not been returned yet.
    buffered_prefix: VecDeque<u8>,
    /// Cached peek result: `None` until a peek completes, then `Some(None)` if the
    /// input ended before any non-empty line, otherwise `Some(Some(line))`.
    detected_line: Option<Option<String>>,
    /// Set once peeking has read at least one line (blank or not).
    saw_any_input: bool,
}

impl<R: BufRead> PeekableLineReader<R> {
    /// Wrap `reader`; nothing is read until a peek or read method is called.
    pub fn new(reader: R) -> Self {
        Self {
            inner: reader,
            buffered_prefix: VecDeque::new(),
            detected_line: None,
            saw_any_input: false,
        }
    }

    /// Peek at the first non-empty line without consuming already-read lines.
    ///
    /// Blank (whitespace-only) lines encountered before it, and the line itself, are
    /// replayed later by the read methods. The result is cached, so repeated calls
    /// return the same value without touching the wrapped reader. Returns `Ok(None)`
    /// when the input ends before any non-empty line.
    ///
    /// # Errors
    ///
    /// Propagates any error from the wrapped reader's `read_line`.
    pub fn peek_first_non_empty_line(&mut self) -> io::Result<Option<String>> {
        if let Some(cached) = &self.detected_line {
            return Ok(cached.clone());
        }

        let mut line = String::new();
        loop {
            line.clear();
            if self.inner.read_line(&mut line)? == 0 {
                self.detected_line = Some(None);
                return Ok(None);
            }
            self.saw_any_input = true;
            // Remember the raw bytes so every read path can replay them.
            self.buffered_prefix.extend(line.as_bytes());
            if !line.trim().is_empty() {
                self.detected_line = Some(Some(line.clone()));
                return Ok(Some(line));
            }
        }
    }

    /// Whether `peek_first_non_empty_line` read at least one line (blank or not).
    pub fn saw_any_input(&self) -> bool {
        self.saw_any_input
    }
}

impl<R: BufRead> BufRead for PeekableLineReader<R> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.buffered_prefix.is_empty() {
            return self.inner.fill_buf();
        }
        // Peeked bytes come first; only after they are consumed is `inner` used.
        Ok(&*self.buffered_prefix.make_contiguous())
    }

    fn consume(&mut self, amt: usize) {
        if self.buffered_prefix.is_empty() {
            self.inner.consume(amt);
        } else {
            // `fill_buf` returned the prefix, so `amt` refers to prefix bytes.
            let n = amt.min(self.buffered_prefix.len());
            self.buffered_prefix.drain(..n);
        }
    }

    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
        if self.buffered_prefix.is_empty() {
            // Defer to the wrapped reader so any custom `read_line` it has is kept.
            return self.inner.read_line(buf);
        }

        // Peeked lines are stored back to back; hand out exactly one per call.
        let line_len = self
            .buffered_prefix
            .iter()
            .position(|&b| b == b'\n')
            .map_or(self.buffered_prefix.len(), |idx| idx + 1);
        let bytes: Vec<u8> = self.buffered_prefix.drain(..line_len).collect();
        let line =
            String::from_utf8(bytes).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        buf.push_str(&line);
        Ok(line_len)
    }
}

impl<R: BufRead> io::Read for PeekableLineReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.buffered_prefix.is_empty() {
            return self.inner.read(buf);
        }

        let n = buf.len().min(self.buffered_prefix.len());
        for (slot, byte) in buf[..n].iter_mut().zip(self.buffered_prefix.drain(..n)) {
            *slot = byte;
        }
        Ok(n)
    }
}
85
86/// A channel-based stdin reader that is Send-compatible
87pub struct ChannelStdinReader {
88    receiver: Receiver<Vec<u8>>,
89    current_buffer: Option<Vec<u8>>,
90    current_pos: usize,
91    eof: bool,
92}
93
94impl ChannelStdinReader {
95    pub fn new() -> Result<Self> {
96        let (sender, receiver) = crossbeam_channel::unbounded();
97
98        // Spawn a thread to read from stdin using raw bytes
99        thread::spawn(move || {
100            let stdin = io::stdin();
101            let mut lock = stdin.lock();
102            let mut buffer = vec![0u8; 8192]; // 8KB buffer
103
104            loop {
105                match lock.read(&mut buffer) {
106                    Ok(0) => break, // EOF
107                    Ok(n) => {
108                        if sender.send(buffer[..n].to_vec()).is_err() {
109                            break; // Receiver dropped
110                        }
111                    }
112                    Err(_) => break, // Error reading
113                }
114            }
115        });
116
117        Ok(Self {
118            receiver,
119            current_buffer: None,
120            current_pos: 0,
121            eof: false,
122        })
123    }
124
125    fn ensure_current_buffer(&mut self) -> io::Result<()> {
126        if self.current_buffer.is_none() && !self.eof {
127            match self.receiver.recv() {
128                Ok(buffer) => {
129                    self.current_buffer = Some(buffer);
130                    self.current_pos = 0;
131                }
132                Err(_) => {
133                    self.eof = true;
134                }
135            }
136        }
137        Ok(())
138    }
139}
140
141impl io::Read for ChannelStdinReader {
142    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143        self.ensure_current_buffer()?;
144
145        if let Some(ref buffer) = self.current_buffer {
146            let remaining = &buffer[self.current_pos..];
147            let to_copy = std::cmp::min(buf.len(), remaining.len());
148
149            if to_copy > 0 {
150                buf[..to_copy].copy_from_slice(&remaining[..to_copy]);
151                self.current_pos += to_copy;
152
153                // If we've consumed the entire buffer, clear it
154                if self.current_pos >= buffer.len() {
155                    self.current_buffer = None;
156                    self.current_pos = 0;
157                }
158
159                Ok(to_copy)
160            } else {
161                Ok(0)
162            }
163        } else {
164            Ok(0) // EOF
165        }
166    }
167}
168
169impl io::BufRead for ChannelStdinReader {
170    fn fill_buf(&mut self) -> io::Result<&[u8]> {
171        self.ensure_current_buffer()?;
172
173        if let Some(ref buffer) = self.current_buffer {
174            Ok(&buffer[self.current_pos..])
175        } else {
176            Ok(&[])
177        }
178    }
179
180    fn consume(&mut self, amt: usize) {
181        if let Some(ref buffer) = self.current_buffer {
182            self.current_pos = std::cmp::min(self.current_pos + amt, buffer.len());
183
184            // If we've consumed the entire buffer, clear it
185            if self.current_pos >= buffer.len() {
186                self.current_buffer = None;
187                self.current_pos = 0;
188            }
189        }
190    }
191}
192
/// Streams the contents of several inputs one after another.
///
/// Inputs are opened one at a time, in list order, through `open_input_reader`.
pub struct MultiFileReader {
    /// Input paths in the order they are read (`"-"` selects stdin).
    files: Vec<String>,
    /// Index into `files` of the input being read, or about to be opened.
    current_file_idx: usize,
    /// Open reader for `files[current_file_idx]`, or `None` if not opened yet.
    current_reader: Option<Box<dyn BufRead + Send>>,
    /// `BufReader` capacity passed to `open_input_reader` for each input.
    buffer_size: usize,
    /// When true, an input that cannot be opened is an error instead of being skipped.
    strict: bool,
}
201
202pub fn open_input_reader(
203    file_path: &str,
204    buffer_size: usize,
205    strict: bool,
206) -> io::Result<Option<Box<dyn BufRead + Send>>> {
207    if file_path == "-" {
208        match ChannelStdinReader::new() {
209            Ok(stdin_reader) => match crate::decompression::maybe_decompress(stdin_reader) {
210                Ok(processed_reader) => Ok(Some(Box::new(BufReader::with_capacity(
211                    buffer_size,
212                    processed_reader,
213                )))),
214                Err(e) => {
215                    eprintln!(
216                        "{}",
217                        crate::config::format_error_message_auto(&format!(
218                            "Failed to setup stdin decompression: {}",
219                            e
220                        ))
221                    );
222                    crate::stats::stats_file_open_failed("-");
223                    if strict {
224                        Err(io::Error::other(e))
225                    } else {
226                        Ok(None)
227                    }
228                }
229            },
230            Err(e) => {
231                eprintln!(
232                    "{}",
233                    crate::config::format_error_message_auto(&format!(
234                        "Failed to setup stdin reader: {}",
235                        e
236                    ))
237                );
238                crate::stats::stats_file_open_failed("-");
239                if strict {
240                    Err(io::Error::other(e))
241                } else {
242                    Ok(None)
243                }
244            }
245        }
246    } else {
247        if let Ok(metadata) = fs::metadata(file_path) {
248            if metadata.is_dir() {
249                eprintln!(
250                    "{}",
251                    crate::config::format_error_message_auto(&format!(
252                        "Input path '{}' is a directory; skipping (input files only)",
253                        file_path
254                    ))
255                );
256                crate::stats::stats_file_open_failed(file_path);
257                if strict {
258                    return Err(io::Error::other(format!(
259                        "Input path '{}' is a directory; only files are supported",
260                        file_path
261                    )));
262                }
263                return Ok(None);
264            }
265        }
266
267        match DecompressionReader::new(file_path) {
268            Ok(decompressor) => Ok(Some(Box::new(BufReader::with_capacity(
269                buffer_size,
270                decompressor,
271            )))),
272            Err(e) => {
273                eprintln!(
274                    "{}",
275                    crate::config::format_error_message_auto(
276                        &crate::config::format_input_open_error(file_path, &e.to_string()),
277                    )
278                );
279                crate::stats::stats_file_open_failed(file_path);
280                if strict {
281                    Err(io::Error::new(
282                        io::ErrorKind::NotFound,
283                        crate::config::format_input_open_error(file_path, &e.to_string()),
284                    ))
285                } else {
286                    Ok(None)
287                }
288            }
289        }
290    }
291}
292
/// A buffered, sendable reader that can also report which input it is reading.
pub trait FileAwareRead: BufRead + Send {
    /// Name of the input currently being read, or `None` when there is none.
    fn current_filename(&self) -> Option<&str>;
}
297
298/// A multi-file reader that provides filename information
299pub struct FileAwareMultiFileReader {
300    inner: MultiFileReader,
301}
302
303impl FileAwareMultiFileReader {
304    pub fn new(files: Vec<String>, strict: bool) -> Result<Self> {
305        Ok(Self {
306            inner: MultiFileReader::new(files, strict)?,
307        })
308    }
309}
310
311impl io::Read for FileAwareMultiFileReader {
312    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
313        self.inner.read(buf)
314    }
315}
316
317impl io::BufRead for FileAwareMultiFileReader {
318    fn fill_buf(&mut self) -> io::Result<&[u8]> {
319        self.inner.fill_buf()
320    }
321
322    fn consume(&mut self, amt: usize) {
323        self.inner.consume(amt)
324    }
325
326    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
327        self.inner.read_line(buf)
328    }
329}
330
331impl FileAwareRead for FileAwareMultiFileReader {
332    fn current_filename(&self) -> Option<&str> {
333        self.inner.current_filename()
334    }
335}
336
337impl MultiFileReader {
338    /// Create a new MultiFileReader with default buffer size (256KB for better throughput)
339    pub fn new(files: Vec<String>, strict: bool) -> Result<Self> {
340        Self::with_buffer_size(files, 256 * 1024, strict)
341    }
342
343    /// Create a new MultiFileReader with custom buffer size
344    pub fn with_buffer_size(files: Vec<String>, buffer_size: usize, strict: bool) -> Result<Self> {
345        Ok(Self {
346            files,
347            current_file_idx: 0,
348            current_reader: None,
349            buffer_size,
350            strict,
351        })
352    }
353
354    fn ensure_current_reader(&mut self) -> io::Result<bool> {
355        while self.current_reader.is_none() && self.current_file_idx < self.files.len() {
356            let file_path = &self.files[self.current_file_idx];
357            match open_input_reader(file_path, self.buffer_size, self.strict)? {
358                Some(reader) => {
359                    self.current_reader = Some(reader);
360                    return Ok(true);
361                }
362                None => {
363                    self.current_file_idx += 1;
364                    continue;
365                }
366            }
367        }
368
369        Ok(self.current_reader.is_some())
370    }
371
372    fn advance_to_next_file(&mut self) {
373        self.current_reader = None;
374        self.current_file_idx += 1;
375    }
376
377    /// Get the current filename being read (if any)
378    pub fn current_filename(&self) -> Option<&str> {
379        if self.current_file_idx < self.files.len() {
380            Some(&self.files[self.current_file_idx])
381        } else {
382            None
383        }
384    }
385}
386
387impl io::Read for MultiFileReader {
388    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
389        loop {
390            if !self.ensure_current_reader()? {
391                return Ok(0); // No more files
392            }
393
394            if let Some(ref mut reader) = self.current_reader {
395                match reader.read(buf) {
396                    Ok(0) => {
397                        // EOF on current file, advance to next
398                        self.advance_to_next_file();
399                        continue;
400                    }
401                    Ok(n) => return Ok(n),
402                    Err(e) => return Err(e),
403                }
404            }
405        }
406    }
407}
408
409impl io::BufRead for MultiFileReader {
410    fn fill_buf(&mut self) -> io::Result<&[u8]> {
411        if !self.ensure_current_reader()? {
412            return Ok(&[]); // No more files
413        }
414
415        if let Some(ref mut reader) = self.current_reader {
416            reader.fill_buf()
417        } else {
418            Ok(&[])
419        }
420    }
421
422    fn consume(&mut self, amt: usize) {
423        if let Some(ref mut reader) = self.current_reader {
424            reader.consume(amt);
425        }
426    }
427
428    fn read_line(&mut self, buf: &mut String) -> io::Result<usize> {
429        loop {
430            if !self.ensure_current_reader()? {
431                return Ok(0); // No more files
432            }
433
434            if let Some(ref mut reader) = self.current_reader {
435                match reader.read_line(buf) {
436                    Ok(0) => {
437                        // EOF on current file, advance to next
438                        self.advance_to_next_file();
439
440                        // Add newline between files if the previous file didn't end with one
441                        if !buf.is_empty() && !buf.ends_with('\n') {
442                            buf.push('\n');
443                            return Ok(1);
444                        }
445                        continue;
446                    }
447                    Ok(n) => return Ok(n),
448                    Err(e) => return Err(e),
449                }
450            }
451        }
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use tempfile::NamedTempFile;

    /// Create a temp file containing each entry of `lines` followed by a newline.
    fn temp_file_with_lines(lines: &[&str]) -> Result<NamedTempFile> {
        let mut temp_file = NamedTempFile::new()?;
        for line in lines {
            writeln!(temp_file, "{}", line)?;
        }
        temp_file.flush()?;
        Ok(temp_file)
    }

    /// The temp file's path in the `String` form `MultiFileReader` expects.
    fn path_of(temp_file: &NamedTempFile) -> String {
        temp_file.path().to_string_lossy().to_string()
    }

    #[test]
    fn test_multi_file_reader_single_file() -> Result<()> {
        let temp_file = temp_file_with_lines(&["line1", "line2"])?;
        let mut reader = MultiFileReader::new(vec![path_of(&temp_file)], false)?;

        let mut line = String::new();
        for expected in ["line1\n", "line2\n"] {
            line.clear();
            let n = reader.read_line(&mut line)?;
            assert_eq!(line, expected);
            assert_eq!(n, 6);
        }

        // EOF
        line.clear();
        let n = reader.read_line(&mut line)?;
        assert_eq!(n, 0);
        assert!(line.is_empty());

        Ok(())
    }

    #[test]
    fn test_multi_file_reader_multiple_files() -> Result<()> {
        let first = temp_file_with_lines(&["file1_line1", "file1_line2"])?;
        let second = temp_file_with_lines(&["file2_line1"])?;

        let mut reader = MultiFileReader::new(vec![path_of(&first), path_of(&second)], false)?;

        let mut all_content = String::new();
        reader.read_to_string(&mut all_content)?;
        assert_eq!(all_content, "file1_line1\nfile1_line2\nfile2_line1\n");

        Ok(())
    }
}