grammers_client/client/updates.rs
1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Methods to deal with and offer access to updates.
10
11use super::Client;
12use crate::types::{ChatMap, Update};
13use futures_util::future::{select, Either};
14pub use grammers_mtsender::{AuthorizationError, InvocationError};
15use grammers_session::channel_id;
16pub use grammers_session::{PrematureEndReason, UpdateState};
17use grammers_tl_types as tl;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::time::sleep_until;
22
23/// How long to wait after warning the user that the updates limit was exceeded.
24const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300);
25
26impl Client {
27 /// Returns the next update from the buffer where they are queued until used.
28 ///
29 /// # Example
30 ///
31 /// ```
32 /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
33 /// use grammers_client::Update;
34 ///
35 /// loop {
36 /// let update = client.next_update().await?;
37 /// // Echo incoming messages and ignore everything else
38 /// match update {
39 /// Update::NewMessage(mut message) if !message.outgoing() => {
40 /// message.respond(message.text()).await?;
41 /// }
42 /// _ => {}
43 /// }
44 /// }
45 /// # Ok(())
46 /// # }
47 /// ```
48 pub async fn next_update(&self) -> Result<Update, InvocationError> {
49 loop {
50 let (update, chats) = self.next_raw_update().await?;
51
52 if let Some(update) = Update::new(&self, update, &chats) {
53 return Ok(update);
54 }
55 }
56 }
57
58 /// Returns the next raw update and associated chat map from the buffer where they are queued until used.
59 ///
60 /// # Example
61 ///
62 /// ```
63 /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
64 /// loop {
65 /// let (update, chats) = client.next_raw_update().await?;
66 ///
67 /// // Print all incoming updates in their raw form
68 /// dbg!(update);
69 /// }
70 /// # Ok(())
71 /// # }
72 ///
73 /// ```
74 ///
75 /// P.S. If you don't receive updateBotInlineSend, go to [@BotFather](https://t.me/BotFather), select your bot and click "Bot Settings", then "Inline Feedback" and select probability.
76 ///
77 pub async fn next_raw_update(
78 &self,
79 ) -> Result<(tl::enums::Update, Arc<ChatMap>), InvocationError> {
80 loop {
81 let (deadline, get_diff, get_channel_diff) = {
82 let state = &mut *self.0.state.write().unwrap();
83 if let Some(update) = state.updates.pop_front() {
84 return Ok(update);
85 }
86 (
87 state.message_box.check_deadlines(), // first, as it might trigger differences
88 state.message_box.get_difference(),
89 state.message_box.get_channel_difference(&state.chat_hashes),
90 )
91 };
92
93 if let Some(request) = get_diff {
94 let response = self.invoke(&request).await?;
95 let (updates, users, chats) = {
96 let state = &mut *self.0.state.write().unwrap();
97 state
98 .message_box
99 .apply_difference(response, &mut state.chat_hashes)
100 };
101 self.extend_update_queue(updates, ChatMap::new(users, chats));
102 continue;
103 }
104
105 if let Some(request) = get_channel_diff {
106 let maybe_response = self.invoke(&request).await;
107
108 let response = match maybe_response {
109 Ok(r) => r,
110 Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
111 // According to Telegram's docs:
112 // "Channel internal replication issues, try again later (treat this like an RPC_CALL_FAIL)."
113 // We can treat this as "empty difference" and not update the local pts.
114 // Then this same call will be retried when another gap is detected or timeout expires.
115 //
116 // Another option would be to literally treat this like an RPC_CALL_FAIL and retry after a few
117 // seconds, but if Telegram is having issues it's probably best to wait for it to send another
118 // update (hinting it may be okay now) and retry then.
119 //
120 // This is a bit hacky because MessageBox doesn't really have a way to "not update" the pts.
121 // Instead we manually extract the previously-known pts and use that.
122 log::warn!("Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved");
123 {
124 self.0
125 .state
126 .write()
127 .unwrap()
128 .message_box
129 .end_channel_difference(
130 &request,
131 PrematureEndReason::TemporaryServerIssues,
132 );
133 }
134 continue;
135 }
136 Err(e) if e.is("CHANNEL_PRIVATE") => {
137 log::info!(
138 "Account is now banned in {} so we can no longer fetch updates from it",
139 channel_id(&request)
140 .map(|i| i.to_string())
141 .unwrap_or_else(|| "empty channel".into())
142 );
143 {
144 self.0
145 .state
146 .write()
147 .unwrap()
148 .message_box
149 .end_channel_difference(&request, PrematureEndReason::Banned);
150 }
151 continue;
152 }
153 Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
154 log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
155 {
156 self.0
157 .state
158 .write()
159 .unwrap()
160 .message_box
161 .end_channel_difference(
162 &request,
163 PrematureEndReason::TemporaryServerIssues,
164 );
165 }
166 continue;
167 }
168 Err(e) => return Err(e),
169 };
170
171 let (updates, users, chats) = {
172 let state = &mut *self.0.state.write().unwrap();
173 state.message_box.apply_channel_difference(
174 request,
175 response,
176 &mut state.chat_hashes,
177 )
178 };
179
180 self.extend_update_queue(updates, ChatMap::new(users, chats));
181 continue;
182 }
183
184 let sleep = pin!(async { sleep_until(deadline.into()).await });
185 let step = pin!(async { self.step().await });
186
187 match select(sleep, step).await {
188 Either::Left(_) => {}
189 Either::Right((step, _)) => step?,
190 }
191 }
192 }
193
194 pub(crate) fn process_socket_updates(&self, all_updates: Vec<tl::enums::Updates>) {
195 if all_updates.is_empty() {
196 return;
197 }
198
199 let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
200 {
201 let state = &mut *self.0.state.write().unwrap();
202
203 for updates in all_updates {
204 if state
205 .message_box
206 .ensure_known_peer_hashes(&updates, &mut state.chat_hashes)
207 .is_err()
208 {
209 continue;
210 }
211 match state
212 .message_box
213 .process_updates(updates, &state.chat_hashes)
214 {
215 Ok(tup) => {
216 if let Some(res) = result.as_mut() {
217 res.0.extend(tup.0);
218 res.1.extend(tup.1);
219 res.2.extend(tup.2);
220 } else {
221 result = Some(tup);
222 }
223 }
224 Err(_) => return,
225 }
226 }
227 }
228
229 if let Some((updates, users, chats)) = result {
230 self.extend_update_queue(updates, ChatMap::new(users, chats));
231 }
232 }
233
234 fn extend_update_queue(&self, mut updates: Vec<tl::enums::Update>, chat_map: Arc<ChatMap>) {
235 let mut state = self.0.state.write().unwrap();
236
237 if let Some(limit) = self.0.config.params.update_queue_limit {
238 if let Some(exceeds) = (state.updates.len() + updates.len()).checked_sub(limit + 1) {
239 let exceeds = exceeds + 1;
240 let now = Instant::now();
241 let notify = match state.last_update_limit_warn {
242 None => true,
243 Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
244 };
245
246 updates.truncate(updates.len() - exceeds);
247 if notify {
248 log::warn!(
249 "{} updates were dropped because the update_queue_limit was exceeded",
250 exceeds
251 );
252 }
253
254 state.last_update_limit_warn = Some(now);
255 }
256 }
257
258 state
259 .updates
260 .extend(updates.into_iter().map(|u| (u, chat_map.clone())));
261 }
262
263 /// Synchronize the updates state to the session.
264 pub fn sync_update_state(&self) {
265 let state = self.0.state.read().unwrap();
266 self.0
267 .config
268 .session
269 .set_state(state.message_box.session_state());
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use super::*;
276 use core::future::Future;
277
278 fn get_client() -> Client {
279 panic!()
280 }
281
282 #[test]
283 fn ensure_next_update_future_impls_send() {
284 if false {
285 // We just want it to type-check, not actually run.
286 fn typeck(_: impl Future + Send) {}
287 typeck(get_client().next_update());
288 }
289 }
290}