libmactime2/bodyfile/
bodyfile_reader.rs1use crate::Provider;
2use crate::stream::*;
3use crate::Joinable;
4use encoding_rs_io::DecodeReaderBytesBuilder;
5use std::io::{BufRead, BufReader, Read};
6use std::sync::mpsc::{Receiver, Sender};
7use std::thread::{JoinHandle};
8
9pub struct BodyfileReader {
10 worker: Option<JoinHandle<()>>,
11 rx: Option<Receiver<String>>,
12}
13
14impl Provider<String, ()> for BodyfileReader {
15 fn get_receiver(&mut self) -> Receiver<String> {
16 self.rx.take().unwrap()
17 }
18}
19
20impl StreamWorker<String> for BodyfileReader {
21 fn worker<R: Read + Send>(input: R, tx: Sender<String>) {
22 let mut line_ctr = 1;
23
24 let drb = DecodeReaderBytesBuilder::new()
25 .encoding(Some(encoding_rs::UTF_8))
26 .utf8_passthru(true)
27 .build(input);
28 let mut reader = BufReader::new(drb);
29
30 loop {
31 let mut line = String::new();
32 let size = reader.read_line(&mut line);
33
34 match size {
35 Err(why) => {
36 eprintln!("IO Error in line {}: {:?}", line_ctr, why);
37 break;
38 }
39 Ok(s) => {
40 if s == 0 {
41 break;
42 }
43
44 if tx.send(line).is_err() {
45 break;
46 }
47 }
48 }
49 line_ctr += 1;
50 }
51 }
52}
53
54impl StreamReader<String, ()> for BodyfileReader {
55 fn new (worker: JoinHandle<()>, rx: Receiver<String>) -> Self {
56 Self {
57 worker: Some(worker),
58 rx: Some(rx)
59 }
60 }
61}
62
63impl Joinable<()> for BodyfileReader {
64 fn join(&mut self) -> std::thread::Result<()> {
65 self.worker.take().unwrap().join()
66 }
67}