libmactime2/bodyfile/
bodyfile_reader.rs

1use crate::Provider;
2use crate::stream::*;
3use crate::Joinable;
4use encoding_rs_io::DecodeReaderBytesBuilder;
5use std::io::{BufRead, BufReader, Read};
6use std::sync::mpsc::{Receiver, Sender};
7use std::thread::{JoinHandle};
8
/// Reads lines from an input stream on a separate thread and hands them
/// out through a channel.
///
/// Both fields are wrapped in `Option` so that they can be moved out of a
/// `&mut self` exactly once: the receiver by `get_receiver` and the thread
/// handle by `join`.
#[derive(Debug)]
pub struct BodyfileReader {
    /// Handle of the worker thread; `None` once `join` has taken it.
    worker: Option<JoinHandle<()>>,
    /// Receiving end of the line channel; `None` once `get_receiver` has
    /// taken it.
    rx: Option<Receiver<String>>,
}
13
14impl Provider<String, ()> for BodyfileReader {
15    fn get_receiver(&mut self) -> Receiver<String> {
16        self.rx.take().unwrap()
17    }
18}
19
20impl StreamWorker<String> for BodyfileReader {
21    fn worker<R: Read + Send>(input: R, tx: Sender<String>) {
22        let mut line_ctr = 1;
23
24        let drb = DecodeReaderBytesBuilder::new()
25            .encoding(Some(encoding_rs::UTF_8))
26            .utf8_passthru(true)
27            .build(input);
28        let mut reader = BufReader::new(drb);
29
30        loop {
31            let mut line = String::new();
32            let size = reader.read_line(&mut line);
33
34            match size {
35                Err(why) => {
36                    eprintln!("IO Error in line {}: {:?}", line_ctr, why);
37                    break;
38                }
39                Ok(s) => {
40                    if s == 0 {
41                        break;
42                    }
43
44                    if tx.send(line).is_err() {
45                        break;
46                    }
47                }
48            }
49            line_ctr += 1;
50        }
51    }
52}
53
54impl StreamReader<String, ()> for BodyfileReader {
55    fn new (worker: JoinHandle<()>, rx: Receiver<String>) -> Self {
56        Self {
57            worker: Some(worker),
58            rx: Some(rx)
59        }
60    }
61}
62
63impl Joinable<()> for BodyfileReader {
64    fn join(&mut self) -> std::thread::Result<()> {
65        self.worker.take().unwrap().join()
66    }
67}