1use crate::CHUNK_SIZE;
9
10use crossbeam::channel::Sender;
11use std::fs::File;
12use std::io::{self, BufReader, Read, Result};
13
14pub fn read_loop(
16 infile: Option<&str>,
17 stats_tx: Sender<usize>,
18 write_tx: Sender<Vec<u8>>,
19) -> Result<()> {
20 let mut reader: Box<dyn Read> = match infile {
22 Some(path) => Box::new(BufReader::new(File::open(path)?)),
23 None => Box::new(BufReader::new(io::stdin())),
24 };
25
26 let mut buffer = [0; CHUNK_SIZE];
27
28 loop {
29 let num_read = match reader.read(&mut buffer) {
31 Ok(0) => break, Ok(x) => x, Err(_) => break, };
35
36 let _ = stats_tx.send(num_read);
38
39 if write_tx.send(Vec::from(&buffer[..num_read])).is_err() {
41 break;
42 }
43 }
44 let _ = stats_tx.send(0);
46 let _ = write_tx.send(Vec::new());
47 Ok(())
48}
49
#[cfg(test)]
mod tests {
    use super::*;
    use crossbeam::channel::unbounded;
    use std::fs::File;
    use std::io::Write;

    /// Writes a small fixture file, runs `read_loop` over it, and checks that
    /// the chunks received on the write channel reassemble to the original
    /// bytes.
    #[test]
    fn test_read_loop() -> Result<()> {
        let test_data = "Pipe Progress...";

        // Per-process file under the system temp dir, so the test does not
        // write into the working directory or collide with a concurrent run.
        let test_path = std::env::temp_dir()
            .join(format!("read_loop_test_input_{}.txt", std::process::id()));
        let test_input = test_path
            .to_str()
            .expect("temporary file path should be valid UTF-8");

        {
            // Scoped so the file handle is closed before it is read back.
            let mut file = File::create(&test_path)?;
            file.write_all(test_data.as_bytes())?;
        }

        // `_stats_rx` is kept alive so sends on the stats channel succeed.
        let (stats_tx, _stats_rx) = unbounded();
        let (write_tx, write_rx) = unbounded();

        let result = read_loop(Some(test_input), stats_tx, write_tx);

        // Remove the fixture before any assertion can fail, so a failing run
        // does not leave the file behind.
        std::fs::remove_file(&test_path)?;
        result?;

        // Collect chunks until the empty chunk that `read_loop` sends last
        // (or until the channel disconnects).
        let mut received_data = Vec::new();
        while let Ok(chunk) = write_rx.recv() {
            if chunk.is_empty() {
                break;
            }
            received_data.extend(chunk);
        }

        assert_eq!(test_data.as_bytes(), received_data.as_slice());

        Ok(())
    }
}