pipeprogress/
read.rs

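//! Reads the input from a file or stdin.
//!
//! # Read
//!
//! The input is read in chunks of at most `CHUNK_SIZE` bytes. For every chunk,
//! the byte count is sent to the stats thread and the bytes themselves are sent
//! to the write thread.
//!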
7
8use crate::CHUNK_SIZE;
9
10use crossbeam::channel::Sender;
11use std::fs::File;
12use std::io::{self, BufReader, Read, Result};
13
14// Update the function signature to accept Option<&str> for infile
15pub fn read_loop(
16    infile: Option<&str>,
17    stats_tx: Sender<usize>,
18    write_tx: Sender<Vec<u8>>,
19) -> Result<()> {
20    // Read from a file if provided, otherwise default to stdin
21    let mut reader: Box<dyn Read> = match infile {
22        Some(path) => Box::new(BufReader::new(File::open(path)?)),
23        None => Box::new(BufReader::new(io::stdin())),
24    };
25
26    let mut buffer = [0; CHUNK_SIZE];
27
28    loop {
29        // Read a fixed number of bytes from stdin
30        let num_read = match reader.read(&mut buffer) {
31            Ok(0) => break,  // break on 0 bytes
32            Ok(x) => x,      // return the number of bytes
33            Err(_) => break, // break on error
34        };
35
36        // send this buffer to the stats thread
37        let _ = stats_tx.send(num_read);
38
39        // send this buffer to the write thread
40        if write_tx.send(Vec::from(&buffer[..num_read])).is_err() {
41            break;
42        }
43    }
44    // send an empty buffer to the stats thread
45    let _ = stats_tx.send(0);
46    let _ = write_tx.send(Vec::new());
47    Ok(())
48}
49
#[cfg(test)]
mod tests {
    use super::*;
    use crossbeam::channel::unbounded;
    use std::fs::{self, File};
    use std::io::Write;

    /// Feeds a small file through `read_loop` and checks that the write
    /// channel carries the file's bytes and the stats channel its byte count.
    #[test]
    fn test_read_loop() -> Result<()> {
        // Create the fixture in the system temp dir with a per-process name,
        // so the test neither litters the working directory nor collides
        // with another test process running at the same time.
        let test_path =
            std::env::temp_dir().join(format!("pipeprogress_read_loop_{}.txt", std::process::id()));
        let test_input = test_path.to_str().expect("temp path is valid UTF-8");
        let test_data = "Pipe Progress...";
        {
            let mut file = File::create(&test_path)?;
            file.write_all(test_data.as_bytes())?;
        }

        // Set up channels
        let (stats_tx, stats_rx) = unbounded();
        let (write_tx, write_rx) = unbounded();

        // Run read_loop, then remove the fixture before anything can fail,
        // so the file is cleaned up on the error and assertion paths too.
        let outcome = read_loop(Some(test_input), stats_tx, write_tx);
        fs::remove_file(&test_path)?;
        outcome?;

        // Collect chunks up to the empty end-of-stream marker
        let received_data: Vec<u8> = write_rx
            .iter()
            .take_while(|chunk| !chunk.is_empty())
            .flatten()
            .collect();
        assert_eq!(test_data.as_bytes(), received_data.as_slice());

        // The chunk sizes reported before the `0` marker add up to the input
        let reported_bytes: usize = stats_rx.iter().take_while(|&n| n != 0).sum();
        assert_eq!(test_data.len(), reported_bytes);

        Ok(())
    }
}