relegram/
stream.rs

1use std::cmp::max;
2use std::collections::VecDeque;
3
4use futures::Async;
5use futures::Future;
6use futures::Stream;
7
8use error::Error;
9use responses::Update;
10use std::i64;
11
12pub struct UpdatesStream<Fut, Sender> {
13    pub bot_api_client: Sender,
14    pub buffer: VecDeque<Update>,
15    pub executing_request: Fut,
16    pub is_canceled: bool,
17    pub last_id: Option<i64>,
18    pub has_error: bool,
19}
20
21impl<Fut, Sender> Stream for UpdatesStream<Fut, Sender>
22    where Fut: Future<Item=Vec<Update>, Error=Error>,
23          Sender: FnMut(Option<i64>) -> Fut {
24    type Item = Update;
25    type Error = Error;
26
27    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
28        if self.is_canceled {
29            return Ok(Async::Ready(None));
30        }
31        if let Some(update) = self.buffer.pop_front() {
32            return Ok(Async::Ready(Some(update)));
33        }
34        if self.has_error {
35            self.has_error = false;
36            self.executing_request = (self.bot_api_client)(self.last_id)
37        }
38        match self.executing_request.poll() {
39            Ok(Async::NotReady) =>
40                Ok(Async::NotReady),
41
42            Ok(Async::Ready(updates)) => {
43                let last_id = self.last_id.unwrap_or(-1);
44                for update in updates {
45                    self.last_id = Some(max(update.id, last_id) + 1);
46                    self.buffer.push_back(update)
47                }
48                self.executing_request = (self.bot_api_client)(self.last_id);
49                self.poll()
50            }
51            Err(err) => {
52//                match err {
53//                    Error::UnexpectedResponse { .. } => self.last_id = Some(self.last_id.map(|x| x + 1).unwrap_or(-2)),
54//                    _ => {}
55//                }
56                self.has_error = true;
57                Err(err)
58            }
59        }
60    }
61}
62
63impl<Fut, Sender> Drop for UpdatesStream<Fut, Sender> {
64    fn drop(&mut self) {
65        self.is_canceled = true;
66    }
67}