1use std::cmp::max;
2use std::collections::VecDeque;
3
4use futures::Async;
5use futures::Future;
6use futures::Stream;
7
8use error::Error;
9use responses::Update;
10use std::i64;
11
12pub struct UpdatesStream<Fut, Sender> {
13 pub bot_api_client: Sender,
14 pub buffer: VecDeque<Update>,
15 pub executing_request: Fut,
16 pub is_canceled: bool,
17 pub last_id: Option<i64>,
18 pub has_error: bool,
19}
20
21impl<Fut, Sender> Stream for UpdatesStream<Fut, Sender>
22 where Fut: Future<Item=Vec<Update>, Error=Error>,
23 Sender: FnMut(Option<i64>) -> Fut {
24 type Item = Update;
25 type Error = Error;
26
27 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
28 if self.is_canceled {
29 return Ok(Async::Ready(None));
30 }
31 if let Some(update) = self.buffer.pop_front() {
32 return Ok(Async::Ready(Some(update)));
33 }
34 if self.has_error {
35 self.has_error = false;
36 self.executing_request = (self.bot_api_client)(self.last_id)
37 }
38 match self.executing_request.poll() {
39 Ok(Async::NotReady) =>
40 Ok(Async::NotReady),
41
42 Ok(Async::Ready(updates)) => {
43 let last_id = self.last_id.unwrap_or(-1);
44 for update in updates {
45 self.last_id = Some(max(update.id, last_id) + 1);
46 self.buffer.push_back(update)
47 }
48 self.executing_request = (self.bot_api_client)(self.last_id);
49 self.poll()
50 }
51 Err(err) => {
52self.has_error = true;
57 Err(err)
58 }
59 }
60 }
61}
62
63impl<Fut, Sender> Drop for UpdatesStream<Fut, Sender> {
64 fn drop(&mut self) {
65 self.is_canceled = true;
66 }
67}