// aprs_logger/stream_processor.rs
use std::io::{BufReader, BufRead, Error, Read};

/// Boxed callback applied to each chunk of raw bytes read from a stream.
///
/// The callback returns `Some(value)` to emit a value for the chunk, or
/// `None` to have the chunk skipped.
pub type StreamProcessor<T> = Box<dyn Fn(&[u8]) -> Option<T>>;

5pub struct StreamIterator<R: Read, T> {
6 reader: BufReader<R>,
7 processor: StreamProcessor<T>,
8}
9
10
11impl<R: Read, T> StreamIterator<R, T> {
12 pub fn new(input: R, processor: StreamProcessor<T>) -> Self {
13 StreamIterator{
14 reader: BufReader::new(input),
15 processor,
16 }
17 }
18}
19
20impl<R: Read, T> Iterator for StreamIterator<R, T> {
21 type Item = Result<T, Error>;
22
23 fn next(&mut self) -> Option<Self::Item> {
24 let mut line = vec![];
25 match self.reader.read_until(b'\n', &mut line) {
26 Ok(0) => None, Ok(_) => {
28 match (self.processor)(&line) {
30 Some(value) => Some(Ok(value)),
31 None => self.next(), }
33 }
34 Err(e) => Some(Err(e)),
35 }
36 }
37}
38
39pub fn process_stream<R, T, F>(input: R, processor: F) -> StreamIterator<R, T>
40where
41 R: Read,
42 F: Fn(&[u8]) -> Option<T> + 'static,
43{
44 StreamIterator::new(input, Box::new(processor))
45}