1use std::cmp::max;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::Context;
6use std::task::Poll;
7use std::time::Duration;
8
9use futures::Stream;
10
11use telegram_bot_raw::{AllowedUpdate, GetUpdates, Integer, Update};
12
13use crate::api::Api;
14use crate::errors::Error;
15
// Default value, in seconds, for the `timeout` setting of a freshly created `UpdatesStream`.
const TELEGRAM_LONG_POLL_TIMEOUT_SECONDS: u64 = 5;
// Default value for the `limit` setting that is forwarded to every `GetUpdates` request.
const TELEGRAM_LONG_POLL_LIMIT_MESSAGES: Integer = 100;
// Default value, in milliseconds, for the `error_delay` setting of a freshly created
// `UpdatesStream`.
const TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS: u64 = 500;
19
/// Stream of Telegram [`Update`]s obtained by repeatedly issuing `GetUpdates` requests
/// through an [`Api`] handle.
///
/// Updates returned by one request are buffered and yielded one at a time; request errors
/// are yielded as `Err` items.
#[must_use = "streams do nothing unless polled"]
pub struct UpdatesStream {
    /// Handle used to send the `GetUpdates` requests.
    api: Api,
    /// Largest update id seen so far; the next request asks for `last_update + 1` onwards.
    last_update: Integer,
    /// Updates that were received but not yet yielded to the consumer.
    buffer: VecDeque<Update>,
    /// The in-flight request, if any. An `Ok(None)` output is treated by `poll_next` as a
    /// request that timed out.
    current_request:
        Option<Pin<Box<dyn Future<Output = Result<Option<Vec<Update>>, Error>> + Send>>>,
    /// Base timeout; `poll_next` adds one second to it to form the request-level timeout
    /// handed to `Api::send_timeout`.
    timeout: Duration,
    /// Update kinds forwarded to `GetUpdates::allowed_updates`.
    allowed_updates: Vec<AllowedUpdate>,
    /// Value forwarded to `GetUpdates::limit`.
    limit: Integer,
    /// Delay setting consulted by `poll_next` when it issues a replacement request after a
    /// failed one (only its whole seconds are used).
    error_delay: Duration,
    /// Monotonic counter used to tag the tracing span of each `poll_next` call.
    next_poll_id: usize,
}
35
36impl Stream for UpdatesStream {
37 type Item = Result<Update, Error>;
38
39 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
40 let ref_mut = self.get_mut();
41 let poll_id = ref_mut.next_poll_id;
42 ref_mut.next_poll_id += 1;
43 let span = tracing::trace_span!("stream", poll_id = poll_id);
44 let _enter = span.enter();
45
46 tracing::trace!("start stream polling");
47
48 if let Some(value) = ref_mut.buffer.pop_front() {
49 tracing::trace!(update = ?value, "returning buffered update");
50 return Poll::Ready(Some(Ok(value)));
51 }
52 tracing::trace!("processing request");
53
54 let result = match ref_mut.current_request {
55 None => {
56 tracing::trace!("there is no current request");
57 Ok(false)
58 }
59 Some(ref mut current_request) => {
60 let cc = current_request.as_mut();
61 let polled_update = cc.poll(cx);
62 match polled_update {
63 Poll::Pending => {
64 tracing::trace!("request is pending");
65 return Poll::Pending;
66 }
67 Poll::Ready(Ok(None)) => {
68 tracing::trace!("request timed out");
69 Ok(false)
70 }
71 Poll::Ready(Ok(Some(ref updates))) if updates.is_empty() => {
72 tracing::trace!("request resolved to empty update list");
73 Ok(false)
74 }
75 Poll::Ready(Ok(Some(updates))) => {
76 for update in updates {
77 tracing::trace!(update = ?update, "processing update");
78 ref_mut.last_update = max(update.id, ref_mut.last_update);
79 tracing::trace!(last_update = ref_mut.last_update);
80 ref_mut.buffer.push_back(update)
81 }
82
83 Ok(true)
84 }
85 Poll::Ready(Err(err)) => {
86 tracing::error!(error = %err, "request error");
87 Err(err)
88 }
89 }
90 }
91 };
92
93 match result {
94 Err(err) => {
95 let timeout = ref_mut.timeout + Duration::from_secs(1);
96 let mut get_updates = GetUpdates::new();
97 get_updates
98 .offset(ref_mut.last_update + 1)
99 .timeout(ref_mut.error_delay.as_secs() as Integer)
100 .limit(ref_mut.limit)
101 .allowed_updates(&ref_mut.allowed_updates);
102 tracing::trace!(request = ?get_updates, timeout=?timeout, "preparing new request");
103
104 let request = ref_mut.api.send_timeout(get_updates, timeout);
105 ref_mut.current_request = Some(Box::pin(request));
106 return Poll::Ready(Some(Err(err)));
107 }
108 Ok(false) => {
109 let timeout = ref_mut.timeout + Duration::from_secs(1);
110 let mut get_updates = GetUpdates::new();
111 get_updates
112 .offset(ref_mut.last_update + 1)
113 .timeout(ref_mut.error_delay.as_secs() as Integer)
114 .limit(ref_mut.limit)
115 .allowed_updates(&ref_mut.allowed_updates);
116 tracing::trace!(request = ?get_updates, timeout=?timeout, "preparing new request");
117
118 let request = ref_mut.api.send_timeout(get_updates, timeout);
119 ref_mut.current_request = Some(Box::pin(request));
120
121 tracing::trace!("executing recursive call");
122 Pin::new(ref_mut).poll_next(cx)
123 }
124 Ok(true) => {
125 tracing::trace!("dropping request");
126 ref_mut.current_request = None;
127 tracing::trace!("executing recursive call");
128 Pin::new(ref_mut).poll_next(cx)
129 }
130 }
131 }
132}
133
134impl UpdatesStream {
135 pub fn new(api: &Api) -> Self {
137 UpdatesStream {
138 api: api.clone(),
139 last_update: 0,
140 buffer: VecDeque::new(),
141 current_request: None,
142 timeout: Duration::from_secs(TELEGRAM_LONG_POLL_TIMEOUT_SECONDS),
143 allowed_updates: Vec::new(),
144 limit: TELEGRAM_LONG_POLL_LIMIT_MESSAGES,
145 error_delay: Duration::from_millis(TELEGRAM_LONG_POLL_ERROR_DELAY_MILLISECONDS),
146 next_poll_id: 0,
147 }
148 }
149
150 pub fn timeout(&mut self, timeout: Duration) -> &mut Self {
157 self.timeout = timeout;
158 self
159 }
160
161 pub fn allowed_updates(&mut self, allowed_updates: &[AllowedUpdate]) -> &mut Self {
171 self.allowed_updates = allowed_updates.to_vec();
172 self
173 }
174
175 pub fn limit(&mut self, limit: Integer) -> &mut Self {
181 self.limit = limit;
182 self
183 }
184
185 pub fn error_delay(&mut self, delay: Duration) -> &mut Self {
190 self.error_delay = delay;
191 self
192 }
193}