tokio_bits/stream/
tbseq.rs

1use futures::{Stream, Async, Poll};
2
3use ops::inc::Incrementable;
4use std::time::{SystemTime, UNIX_EPOCH};
5
/// A stream that produces a sequence of values for a bounded amount of
/// wall-clock time.
///
/// The stream starts at the value handed to `new` and keeps yielding
/// successive values until more than `max` seconds have passed since it
/// was created.
#[derive(Debug, Clone)]
pub struct TBSeqStream<T> {
    /// Value that the next successful poll will yield.
    next: T,
    /// Creation time, in whole seconds since the Unix epoch.
    start: u64,
    /// Time budget in seconds, measured from `start`.
    max: u64,
}
13
14impl<T> TBSeqStream<T> {
15    pub fn new(initial: T, max_sec: u64) -> Self {
16        TBSeqStream {
17            next: initial,
18            start: Self::ts(),
19            max: max_sec,
20        }
21    }
22
23    fn ts() -> u64 {
24        SystemTime::now()
25            .duration_since(UNIX_EPOCH)
26            .map(|d| d.as_secs())
27            .expect("Time went backwards")
28    }
29}
30
31/// The implementation of the Stream trait uses the generic
32/// post_inc() to produce the next value.
33impl<T: Incrementable> Stream for TBSeqStream<T> {
34    type Item = T;
35
36    type Error = ();
37    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
38
39        if Self::ts() - self.start > self.max {
40            Ok(Async::Ready(None))
41        } else {
42            let v = self.next;
43            self.next.post_inc();
44            Ok(Async::Ready(Some(v)))
45        }
46    }
47}