telegram_bot_fork/
stream.rs

1use std::{cmp::max, collections::VecDeque, time::Duration};
2
3use futures::{task, Async, Future, Poll, Stream};
4use tokio_timer;
5
6use telegram_bot_fork_raw::{AllowedUpdate, GetUpdates, Integer, Update};
7
8use api::Api;
9use errors::Error;
10use future::{NewTelegramFuture, TelegramFuture};
11
12const TELEGRAM_LONG_POLL_TIMEOUT_SECONDS: u64 = 5;
13const TELEGRAM_LONG_POLL_LIMIT_MESSAGES: Integer = 100;
14const TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS: u64 = 500;
15
16/// This type represents stream of Telegram API updates and uses
17/// long polling method under the hood.
18#[must_use = "streams do nothing unless polled"]
19pub struct UpdatesStream {
20    api: Api,
21    last_update: Integer,
22    buffer: VecDeque<Update>,
23    current_request: Option<TelegramFuture<Option<Vec<Update>>>>,
24    timeout: Duration,
25    allowed_updates: Vec<AllowedUpdate>,
26    limit: Integer,
27    error_delay: Duration,
28}
29
30impl Stream for UpdatesStream {
31    type Item = Update;
32    type Error = Error;
33
34    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
35        if let Some(value) = self.buffer.pop_front() {
36            return Ok(Async::Ready(Some(value)));
37        }
38
39        let result = match self.current_request {
40            Some(ref mut current_request) => {
41                let polled_update = current_request.poll();
42                match polled_update {
43                    Ok(Async::Ready(Some(updates))) => {
44                        for update in updates {
45                            self.last_update = max(update.id, self.last_update);
46                            self.buffer.push_back(update)
47                        }
48                        Ok(true)
49                    }
50                    Ok(Async::Ready(None)) => Ok(false),
51                    Ok(Async::NotReady) => return Ok(Async::NotReady),
52                    Err(err) => Err(err),
53                }
54            }
55            None => Ok(false),
56        };
57
58        match result {
59            Ok(true) => {
60                self.current_request = None;
61                task::current().notify();
62                Ok(Async::NotReady)
63            }
64            Ok(false) => {
65                let timeout = self.timeout + Duration::from_secs(1);
66
67                let request = self.api.send_timeout(
68                    GetUpdates::new()
69                        .offset(self.last_update + 1)
70                        .timeout(self.timeout.as_secs() as Integer)
71                        .allowed_updates(&self.allowed_updates)
72                        .limit(self.limit),
73                    timeout,
74                );
75
76                self.current_request = Some(request);
77                task::current().notify();
78                Ok(Async::NotReady)
79            }
80            Err(err) => {
81                let timeout_future = tokio_timer::sleep(self.error_delay)
82                    .from_err()
83                    .map(|()| None);
84
85                self.current_request = Some(TelegramFuture::new(Box::new(timeout_future)));
86                Err(err)
87            }
88        }
89    }
90}
91
92pub trait NewUpdatesStream {
93    fn new(api: Api) -> Self;
94}
95
96impl NewUpdatesStream for UpdatesStream {
97    fn new(api: Api) -> Self {
98        UpdatesStream {
99            api,
100            last_update: 0,
101            buffer: VecDeque::new(),
102            current_request: None,
103            timeout: Duration::from_secs(TELEGRAM_LONG_POLL_TIMEOUT_SECONDS),
104            allowed_updates: Vec::new(),
105            limit: TELEGRAM_LONG_POLL_LIMIT_MESSAGES,
106            error_delay: Duration::from_millis(TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS),
107        }
108    }
109}
110
111impl UpdatesStream {
112    /// Set timeout for long polling requests, this corresponds with `timeout` field
113    /// in [getUpdates](https://core.telegram.org/bots/api#getupdates) method,
114    /// also this stream sets an additional request timeout for `timeout + 1 second`
115    /// in case of invalid Telegram API server behaviour.
116    ///
117    /// Default timeout is 5 seconds.
118    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
119        self.timeout = timeout;
120        self
121    }
122
123    /// Set timeout for long polling requests, this corresponds with `allowed_updates` field
124    /// in [getUpdates](https://core.telegram.org/bots/api#getupdates) method.
125    /// List the types of updates you want your bot to receive. For example,
126    /// specify [“message”, “edited_channel_post”, “callback_query”] to only receive updates of these types.
127    /// See Update for a complete list of available update types. Specify an empty list to receive all
128    /// updates regardless of type (default). If not specified, the previous setting will be used.
129    ///
130    /// Please note that this parameter doesn't affect updates created before the call to the getUpdates,
131    /// so unwanted updates may be received for a short period of time.
132    pub fn allowed_updates(&mut self, allowed_updates: &[AllowedUpdate]) -> &mut Self {
133        self.allowed_updates = allowed_updates.to_vec();
134        self
135    }
136
137    /// Set limits the number of updates to be retrieved, this corresponds with `limit` field
138    /// in [getUpdates](https://core.telegram.org/bots/api#getupdates) method.
139    /// Values between 1—100 are accepted.
140    ///
141    /// Defaults to 100.
142    pub fn limit(&mut self, limit: Integer) -> &mut Self {
143        self.limit = limit;
144        self
145    }
146
147    /// Set a delay between erroneous request and next request.
148    /// This delay prevents busy looping in some cases.
149    ///
150    /// Default delay is 500 ms.
151    pub fn error_delay(&mut self, delay: Duration) -> &mut Self {
152        self.error_delay = delay;
153        self
154    }
155}