1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use std::cmp::max;
use std::collections::VecDeque;
use std::time::Duration;
use futures::{Future, Stream, Poll, Async};
use futures::future;
use tokio_core::reactor::{Handle, Timeout};
use telegram_bot_raw::{GetUpdates, Update, Integer};
use api::Api;
use errors::Error;
use future::{TelegramFuture, NewTelegramFuture};
// Default long-poll timeout, in seconds; `NewUpdatesStream::new` stores it
// as the stream's initial `timeout`.
const TELEGRAM_LONG_POLL_TIMEOUT_SECONDS: u64 = 5;
// Default pause, in milliseconds, after a failed request; `NewUpdatesStream::new`
// stores it as the stream's initial `error_delay`.
const TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS: u64 = 500;
/// A stream of `Update`s obtained by repeatedly sending `GetUpdates`
/// requests through an `Api`.
///
/// Polling the stream drives the requests; see the `Stream` implementation.
#[must_use = "streams do nothing unless polled"]
pub struct UpdatesStream {
    /// Client used to send the `GetUpdates` requests.
    api: Api,
    /// Reactor handle used to create the delay timer after a failed request.
    handle: Handle,
    /// Largest update id seen so far; the next request uses `last_update + 1`
    /// as its offset.
    last_update: Integer,
    /// Updates already received but not yet yielded to the consumer.
    buffer: VecDeque<Update>,
    /// The future currently being awaited: either an in-flight request or the
    /// post-error delay timer. `None` when nothing is pending.
    current_request: Option<TelegramFuture<Option<Vec<Update>>>>,
    /// Long-poll timeout; its whole-second part is passed to `GetUpdates`.
    timeout: Duration,
    /// How long to wait after an error before sending another request.
    error_delay: Duration,
}
impl Stream for UpdatesStream {
    type Item = Update;
    type Error = Error;

    /// Yields the next buffered update, driving requests as needed.
    ///
    /// Each pass of the loop:
    /// 1. returns the front of `buffer` if it is non-empty;
    /// 2. otherwise polls `current_request` (if any), returning `NotReady`
    ///    while it is still pending;
    /// 3. then either buffers the received updates, starts a new
    ///    `GetUpdates` request, or reports the error.
    ///
    /// On error the error is returned to the caller and `current_request` is
    /// replaced by a timer of `error_delay`, so the next request is only sent
    /// once that timer has fired. This method never returns
    /// `Ok(Async::Ready(None))`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            // Serve already-received updates first.
            if let Some(update) = self.buffer.pop_front() {
                return Ok(Async::Ready(Some(update)));
            }

            // Ok(true):  a batch was received and buffered.
            // Ok(false): nothing is pending, or the pending future finished
            //            without updates (e.g. the error-delay timer fired).
            // Err(_):    the pending future failed.
            let outcome = match self.current_request {
                None => Ok(false),
                Some(ref mut pending) => match pending.poll() {
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    Ok(Async::Ready(None)) => Ok(false),
                    Ok(Async::Ready(Some(batch))) => {
                        for update in batch {
                            // Track the highest id so the next request can
                            // ask for updates after it.
                            self.last_update = max(update.id, self.last_update);
                            self.buffer.push_back(update);
                        }
                        Ok(true)
                    }
                    Err(err) => Err(err),
                },
            };

            match outcome {
                Err(err) => {
                    // Install a delay timer as the pending future; it
                    // resolves to `None`, which triggers a fresh request on a
                    // later poll.
                    let delay = future::result(Timeout::new(self.error_delay, &self.handle))
                        .map_err(From::from)
                        .and_then(|timer| timer.map_err(From::from).map(|()| None));
                    self.current_request = Some(TelegramFuture::new(Box::new(delay)));
                    return Err(err);
                }
                Ok(false) => {
                    // Give the request one second more than the long-poll
                    // timeout sent in the request itself.
                    let request_timeout = self.timeout + Duration::from_secs(1);
                    let request = self.api.send_timeout(
                        GetUpdates::new()
                            .offset(self.last_update + 1)
                            .timeout(self.timeout.as_secs() as Integer),
                        request_timeout,
                    );
                    self.current_request = Some(request);
                    // Loop again to poll the freshly created request.
                }
                Ok(true) => {
                    self.current_request = None;
                    // Loop again to yield from the buffer (or start a new
                    // request if the batch was empty).
                }
            }
        }
    }
}
/// Constructor trait for building an updates stream from an `Api` and a
/// reactor `Handle`.
pub trait NewUpdatesStream {
    /// Creates a new instance that uses `api` and `handle`.
    fn new(api: Api, handle: Handle) -> Self;
}
impl NewUpdatesStream for UpdatesStream {
    /// Creates a stream with no pending request, an empty buffer,
    /// `last_update` of 0, and the default timeout and error delay taken from
    /// the `TELEGRAM_LONG_POLL_*` constants.
    fn new(api: Api, handle: Handle) -> Self {
        let default_timeout = Duration::from_secs(TELEGRAM_LONG_POLL_TIMEOUT_SECONDS);
        let default_error_delay =
            Duration::from_millis(TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS);

        UpdatesStream {
            api: api,
            handle: handle,
            last_update: 0,
            buffer: VecDeque::new(),
            current_request: None,
            timeout: default_timeout,
            error_delay: default_error_delay,
        }
    }
}
impl UpdatesStream {
    /// Sets the long-poll timeout used for requests created after this call.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
        self.timeout = timeout;
        self
    }

    /// Sets the delay applied after an error before the next request.
    ///
    /// Returns `&mut Self` so calls can be chained.
    pub fn error_delay(&mut self, delay: Duration) -> &mut Self {
        self.error_delay = delay;
        self
    }
}