telegram_bot/
stream.rs

1use std::cmp::max;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::Context;
6use std::task::Poll;
7use std::time::Duration;
8
9use futures::Stream;
10
11use telegram_bot_raw::{AllowedUpdate, GetUpdates, Integer, Update};
12
13use crate::api::Api;
14use crate::errors::Error;
15
16const TELEGRAM_LONG_POLL_TIMEOUT_SECONDS: u64 = 5;
17const TELEGRAM_LONG_POLL_LIMIT_MESSAGES: Integer = 100;
18const TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS: u64 = 500;
19
20/// This type represents stream of Telegram API updates and uses
21/// long polling method under the hood.
22#[must_use = "streams do nothing unless polled"]
23pub struct UpdatesStream {
24    api: Api,
25    last_update: Integer,
26    buffer: VecDeque<Update>,
27    current_request:
28        Option<Pin<Box<dyn Future<Output = Result<Option<Vec<Update>>, Error>> + Send>>>,
29    timeout: Duration,
30    allowed_updates: Vec<AllowedUpdate>,
31    limit: Integer,
32    error_delay: Duration,
33    next_poll_id: usize,
34}
35
36impl Stream for UpdatesStream {
37    type Item = Result<Update, Error>;
38
39    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
40        let ref_mut = self.get_mut();
41        let poll_id = ref_mut.next_poll_id;
42        ref_mut.next_poll_id += 1;
43        let span = tracing::trace_span!("stream", poll_id = poll_id);
44        let _enter = span.enter();
45
46        tracing::trace!("start stream polling");
47
48        if let Some(value) = ref_mut.buffer.pop_front() {
49            tracing::trace!(update = ?value, "returning buffered update");
50            return Poll::Ready(Some(Ok(value)));
51        }
52        tracing::trace!("processing request");
53
54        let result = match ref_mut.current_request {
55            None => {
56                tracing::trace!("there is no current request");
57                Ok(false)
58            }
59            Some(ref mut current_request) => {
60                let cc = current_request.as_mut();
61                let polled_update = cc.poll(cx);
62                match polled_update {
63                    Poll::Pending => {
64                        tracing::trace!("request is pending");
65                        return Poll::Pending;
66                    }
67                    Poll::Ready(Ok(None)) => {
68                        tracing::trace!("request timed out");
69                        Ok(false)
70                    }
71                    Poll::Ready(Ok(Some(ref updates))) if updates.is_empty() => {
72                        tracing::trace!("request resolved to empty update list");
73                        Ok(false)
74                    }
75                    Poll::Ready(Ok(Some(updates))) => {
76                        for update in updates {
77                            tracing::trace!(update = ?update, "processing update");
78                            ref_mut.last_update = max(update.id, ref_mut.last_update);
79                            tracing::trace!(last_update = ref_mut.last_update);
80                            ref_mut.buffer.push_back(update)
81                        }
82
83                        Ok(true)
84                    }
85                    Poll::Ready(Err(err)) => {
86                        tracing::error!(error = %err, "request error");
87                        Err(err)
88                    }
89                }
90            }
91        };
92
93        match result {
94            Err(err) => {
95                let timeout = ref_mut.timeout + Duration::from_secs(1);
96                let mut get_updates = GetUpdates::new();
97                get_updates
98                    .offset(ref_mut.last_update + 1)
99                    .timeout(ref_mut.error_delay.as_secs() as Integer)
100                    .limit(ref_mut.limit)
101                    .allowed_updates(&ref_mut.allowed_updates);
102                tracing::trace!(request = ?get_updates, timeout=?timeout, "preparing new request");
103
104                let request = ref_mut.api.send_timeout(get_updates, timeout);
105                ref_mut.current_request = Some(Box::pin(request));
106                return Poll::Ready(Some(Err(err)));
107            }
108            Ok(false) => {
109                let timeout = ref_mut.timeout + Duration::from_secs(1);
110                let mut get_updates = GetUpdates::new();
111                get_updates
112                    .offset(ref_mut.last_update + 1)
113                    .timeout(ref_mut.error_delay.as_secs() as Integer)
114                    .limit(ref_mut.limit)
115                    .allowed_updates(&ref_mut.allowed_updates);
116                tracing::trace!(request = ?get_updates, timeout=?timeout, "preparing new request");
117
118                let request = ref_mut.api.send_timeout(get_updates, timeout);
119                ref_mut.current_request = Some(Box::pin(request));
120
121                tracing::trace!("executing recursive call");
122                Pin::new(ref_mut).poll_next(cx)
123            }
124            Ok(true) => {
125                tracing::trace!("dropping request");
126                ref_mut.current_request = None;
127                tracing::trace!("executing recursive call");
128                Pin::new(ref_mut).poll_next(cx)
129            }
130        }
131    }
132}
133
134impl UpdatesStream {
135    ///  create a new `UpdatesStream` instance.
136    pub fn new(api: &Api) -> Self {
137        UpdatesStream {
138            api: api.clone(),
139            last_update: 0,
140            buffer: VecDeque::new(),
141            current_request: None,
142            timeout: Duration::from_secs(TELEGRAM_LONG_POLL_TIMEOUT_SECONDS),
143            allowed_updates: Vec::new(),
144            limit: TELEGRAM_LONG_POLL_LIMIT_MESSAGES,
145            error_delay: Duration::from_millis(TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS),
146            next_poll_id: 0,
147        }
148    }
149
150    /// Set timeout for long polling requests, this corresponds with `timeout` field
151    /// in [getUpdates](https://core.telegram.org/bots/api#getupdates) method,
152    /// also this stream sets an additional request timeout for `timeout + 1 second`
153    /// in case of invalid Telegram API server behaviour.
154    ///
155    /// Default timeout is 5 seconds.
156    pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
157        self.timeout = timeout;
158        self
159    }
160
161    /// Set allowed updates to receive, this corresponds with `allowed_updates` field
162    /// in [getUpdates](https://core.telegram.org/bots/api#getupdates) method.
163    /// List the types of updates you want your bot to receive. For example,
164    /// specify [“message”, “edited_channel_post”, “callback_query”] to only receive updates of these types.
165    /// See Update for a complete list of available update types. Specify an empty list to receive all
166    /// updates regardless of type (default). If not specified, the previous setting will be used.
167    ///
168    /// Please note that this parameter doesn't affect updates created before the call to the getUpdates,
169    /// so unwanted updates may be received for a short period of time.
170    pub fn allowed_updates(&mut self, allowed_updates: &[AllowedUpdate]) -> &mut Self {
171        self.allowed_updates = allowed_updates.to_vec();
172        self
173    }
174
175    /// Set limits the number of updates to be retrieved, this corresponds with `limit` field
176    /// in [getUpdates](https://core.telegram.org/bots/api#getupdates) method.
177    /// Values between 1—100 are accepted.
178    ///
179    /// Defaults to 100.
180    pub fn limit(&mut self, limit: Integer) -> &mut Self {
181        self.limit = limit;
182        self
183    }
184
185    /// Set a delay between erroneous request and next request.
186    /// This delay prevents busy looping in some cases.
187    ///
188    /// Default delay is 500 ms.
189    pub fn error_delay(&mut self, delay: Duration) -> &mut Self {
190        self.error_delay = delay;
191        self
192    }
193}