Skip to main content

streams_rs/
fn_stream.rs

1use crate::*;
2/// Basic stream type. To use it you should provide getter function: 
3/// ```rust
4/// use streams_rs::{*,fn_stream::*};
5/// let get = (|| StreamResult::Ok(1));
6/// let mut stream = FnStream::new(get);
7/// stream.token(0); // 1
8/// stream.token(1); // 1
9/// ```
10pub struct FnStream<'a, T: Clone> {
11    get: Box<dyn FnMut() -> StreamResult<T> + 'a>,
12    pos: usize,
13    cache: Vec<Option<T>>,
14}
15
16impl<'a, T: Clone> FnStream<'a, T> {
17    pub fn new(get: impl FnMut() -> StreamResult<T> + 'a) -> Self {
18        Self {
19            get: Box::new(get),
20            pos: 0,
21            cache: vec![],
22        }
23    }
24    
25}
26impl<'a,T: Clone> Stream<'a> for FnStream<'a,T> {
27    type Item = T;
28     fn token(&mut self, x: usize) -> StreamResult<T> {
29        if let Some(Some(tmp)) = self.cache.get(x) {
30            return StreamResult::Ok(tmp.clone());
31        }
32        let mut n = self.cache.len();
33        let mut c = (0..x + 1).map(|_| None).collect::<Vec<_>>();
34        while n <= x {
35            let p =(self.get)();
36            if let StreamResult::Ok(val) = p {
37                c[n] = Some(val);
38                n += 1;
39            } else if let StreamResult::Err(StreamError::Str(_)) = p {
40                return p;
41            } else {
42                c[n] = None;
43            }
44        }
45        self.cache = c;
46        StreamResult::Ok(self.cache[x].as_ref().unwrap().clone())
47    }
48
49    fn junk(&mut self, mut x: usize) {
50        let c = self.cache.len();
51        self.pos += x;
52        if c >= x {
53            self.cache.truncate(c - x);
54        } else {
55            self.cache = vec![];
56            x = x - c;
57            while x > 0 {
58                (self.get)();
59                x = x - 1;
60            }
61        }
62    }
63
64    fn pos(&self) -> usize {
65        self.pos
66    }
67}