telegram_bot_fork/
stream.rs1use std::{cmp::max, collections::VecDeque, time::Duration};
2
3use futures::{task, Async, Future, Poll, Stream};
4use tokio_timer;
5
6use telegram_bot_fork_raw::{AllowedUpdate, GetUpdates, Integer, Update};
7
8use api::Api;
9use errors::Error;
10use future::{NewTelegramFuture, TelegramFuture};
11
12const TELEGRAM_LONG_POLL_TIMEOUT_SECONDS: u64 = 5;
13const TELEGRAM_LONG_POLL_LIMIT_MESSAGES: Integer = 100;
14const TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS: u64 = 500;
15
16#[must_use = "streams do nothing unless polled"]
19pub struct UpdatesStream {
20 api: Api,
21 last_update: Integer,
22 buffer: VecDeque<Update>,
23 current_request: Option<TelegramFuture<Option<Vec<Update>>>>,
24 timeout: Duration,
25 allowed_updates: Vec<AllowedUpdate>,
26 limit: Integer,
27 error_delay: Duration,
28}
29
30impl Stream for UpdatesStream {
31 type Item = Update;
32 type Error = Error;
33
34 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
35 if let Some(value) = self.buffer.pop_front() {
36 return Ok(Async::Ready(Some(value)));
37 }
38
39 let result = match self.current_request {
40 Some(ref mut current_request) => {
41 let polled_update = current_request.poll();
42 match polled_update {
43 Ok(Async::Ready(Some(updates))) => {
44 for update in updates {
45 self.last_update = max(update.id, self.last_update);
46 self.buffer.push_back(update)
47 }
48 Ok(true)
49 }
50 Ok(Async::Ready(None)) => Ok(false),
51 Ok(Async::NotReady) => return Ok(Async::NotReady),
52 Err(err) => Err(err),
53 }
54 }
55 None => Ok(false),
56 };
57
58 match result {
59 Ok(true) => {
60 self.current_request = None;
61 task::current().notify();
62 Ok(Async::NotReady)
63 }
64 Ok(false) => {
65 let timeout = self.timeout + Duration::from_secs(1);
66
67 let request = self.api.send_timeout(
68 GetUpdates::new()
69 .offset(self.last_update + 1)
70 .timeout(self.timeout.as_secs() as Integer)
71 .allowed_updates(&self.allowed_updates)
72 .limit(self.limit),
73 timeout,
74 );
75
76 self.current_request = Some(request);
77 task::current().notify();
78 Ok(Async::NotReady)
79 }
80 Err(err) => {
81 let timeout_future = tokio_timer::sleep(self.error_delay)
82 .from_err()
83 .map(|()| None);
84
85 self.current_request = Some(TelegramFuture::new(Box::new(timeout_future)));
86 Err(err)
87 }
88 }
89 }
90}
91
92pub trait NewUpdatesStream {
93 fn new(api: Api) -> Self;
94}
95
96impl NewUpdatesStream for UpdatesStream {
97 fn new(api: Api) -> Self {
98 UpdatesStream {
99 api,
100 last_update: 0,
101 buffer: VecDeque::new(),
102 current_request: None,
103 timeout: Duration::from_secs(TELEGRAM_LONG_POLL_TIMEOUT_SECONDS),
104 allowed_updates: Vec::new(),
105 limit: TELEGRAM_LONG_POLL_LIMIT_MESSAGES,
106 error_delay: Duration::from_millis(TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS),
107 }
108 }
109}
110
111impl UpdatesStream {
112 pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
119 self.timeout = timeout;
120 self
121 }
122
123 pub fn allowed_updates(&mut self, allowed_updates: &[AllowedUpdate]) -> &mut Self {
133 self.allowed_updates = allowed_updates.to_vec();
134 self
135 }
136
137 pub fn limit(&mut self, limit: Integer) -> &mut Self {
143 self.limit = limit;
144 self
145 }
146
147 pub fn error_delay(&mut self, delay: Duration) -> &mut Self {
152 self.error_delay = delay;
153 self
154 }
155}