grammers_client/client/
updates.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Methods to deal with and offer access to updates.
10
11use super::Client;
12use crate::types::{ChatMap, Update};
13use futures_util::future::{select, Either};
14pub use grammers_mtsender::{AuthorizationError, InvocationError};
15use grammers_session::channel_id;
16pub use grammers_session::{PrematureEndReason, UpdateState};
17use grammers_tl_types as tl;
18use std::pin::pin;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::time::sleep_until;
22
23/// How long to wait after warning the user that the updates limit was exceeded.
24const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300);
25
26impl Client {
27    /// Returns the next update from the buffer where they are queued until used.
28    ///
29    /// # Example
30    ///
31    /// ```
32    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
33    /// use grammers_client::Update;
34    ///
35    /// loop {
36    ///     let update = client.next_update().await?;
37    ///     // Echo incoming messages and ignore everything else
38    ///     match update {
39    ///         Update::NewMessage(mut message) if !message.outgoing() => {
40    ///             message.respond(message.text()).await?;
41    ///         }
42    ///         _ => {}
43    ///     }
44    /// }
45    /// # Ok(())
46    /// # }
47    /// ```
48    pub async fn next_update(&self) -> Result<Update, InvocationError> {
49        loop {
50            let (update, chats) = self.next_raw_update().await?;
51
52            if let Some(update) = Update::new(&self, update, &chats) {
53                return Ok(update);
54            }
55        }
56    }
57
58    /// Returns the next raw update and associated chat map from the buffer where they are queued until used.
59    ///
60    /// # Example
61    ///
62    /// ```
63    /// # async fn f(client: grammers_client::Client) -> Result<(), Box<dyn std::error::Error>> {
64    /// loop {
65    ///     let (update, chats) = client.next_raw_update().await?;
66    ///
67    ///     // Print all incoming updates in their raw form
68    ///     dbg!(update);
69    /// }
70    /// # Ok(())
71    /// # }
72    ///
73    /// ```
74    ///
75    /// P.S. If you don't receive updateBotInlineSend, go to [@BotFather](https://t.me/BotFather), select your bot and click "Bot Settings", then "Inline Feedback" and select probability.
76    ///
77    pub async fn next_raw_update(
78        &self,
79    ) -> Result<(tl::enums::Update, Arc<ChatMap>), InvocationError> {
80        loop {
81            let (deadline, get_diff, get_channel_diff) = {
82                let state = &mut *self.0.state.write().unwrap();
83                if let Some(update) = state.updates.pop_front() {
84                    return Ok(update);
85                }
86                (
87                    state.message_box.check_deadlines(), // first, as it might trigger differences
88                    state.message_box.get_difference(),
89                    state.message_box.get_channel_difference(&state.chat_hashes),
90                )
91            };
92
93            if let Some(request) = get_diff {
94                let response = self.invoke(&request).await?;
95                let (updates, users, chats) = {
96                    let state = &mut *self.0.state.write().unwrap();
97                    state
98                        .message_box
99                        .apply_difference(response, &mut state.chat_hashes)
100                };
101                self.extend_update_queue(updates, ChatMap::new(users, chats));
102                continue;
103            }
104
105            if let Some(request) = get_channel_diff {
106                let maybe_response = self.invoke(&request).await;
107
108                let response = match maybe_response {
109                    Ok(r) => r,
110                    Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
111                        // According to Telegram's docs:
112                        // "Channel internal replication issues, try again later (treat this like an RPC_CALL_FAIL)."
113                        // We can treat this as "empty difference" and not update the local pts.
114                        // Then this same call will be retried when another gap is detected or timeout expires.
115                        //
116                        // Another option would be to literally treat this like an RPC_CALL_FAIL and retry after a few
117                        // seconds, but if Telegram is having issues it's probably best to wait for it to send another
118                        // update (hinting it may be okay now) and retry then.
119                        //
120                        // This is a bit hacky because MessageBox doesn't really have a way to "not update" the pts.
121                        // Instead we manually extract the previously-known pts and use that.
122                        log::warn!("Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved");
123                        {
124                            self.0
125                                .state
126                                .write()
127                                .unwrap()
128                                .message_box
129                                .end_channel_difference(
130                                    &request,
131                                    PrematureEndReason::TemporaryServerIssues,
132                                );
133                        }
134                        continue;
135                    }
136                    Err(e) if e.is("CHANNEL_PRIVATE") => {
137                        log::info!(
138                            "Account is now banned in {} so we can no longer fetch updates from it",
139                            channel_id(&request)
140                                .map(|i| i.to_string())
141                                .unwrap_or_else(|| "empty channel".into())
142                        );
143                        {
144                            self.0
145                                .state
146                                .write()
147                                .unwrap()
148                                .message_box
149                                .end_channel_difference(&request, PrematureEndReason::Banned);
150                        }
151                        continue;
152                    }
153                    Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
154                        log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
155                        {
156                            self.0
157                                .state
158                                .write()
159                                .unwrap()
160                                .message_box
161                                .end_channel_difference(
162                                    &request,
163                                    PrematureEndReason::TemporaryServerIssues,
164                                );
165                        }
166                        continue;
167                    }
168                    Err(e) => return Err(e),
169                };
170
171                let (updates, users, chats) = {
172                    let state = &mut *self.0.state.write().unwrap();
173                    state.message_box.apply_channel_difference(
174                        request,
175                        response,
176                        &mut state.chat_hashes,
177                    )
178                };
179
180                self.extend_update_queue(updates, ChatMap::new(users, chats));
181                continue;
182            }
183
184            let sleep = pin!(async { sleep_until(deadline.into()).await });
185            let step = pin!(async { self.step().await });
186
187            match select(sleep, step).await {
188                Either::Left(_) => {}
189                Either::Right((step, _)) => step?,
190            }
191        }
192    }
193
194    pub(crate) fn process_socket_updates(&self, all_updates: Vec<tl::enums::Updates>) {
195        if all_updates.is_empty() {
196            return;
197        }
198
199        let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
200        {
201            let state = &mut *self.0.state.write().unwrap();
202
203            for updates in all_updates {
204                if state
205                    .message_box
206                    .ensure_known_peer_hashes(&updates, &mut state.chat_hashes)
207                    .is_err()
208                {
209                    continue;
210                }
211                match state
212                    .message_box
213                    .process_updates(updates, &state.chat_hashes)
214                {
215                    Ok(tup) => {
216                        if let Some(res) = result.as_mut() {
217                            res.0.extend(tup.0);
218                            res.1.extend(tup.1);
219                            res.2.extend(tup.2);
220                        } else {
221                            result = Some(tup);
222                        }
223                    }
224                    Err(_) => return,
225                }
226            }
227        }
228
229        if let Some((updates, users, chats)) = result {
230            self.extend_update_queue(updates, ChatMap::new(users, chats));
231        }
232    }
233
234    fn extend_update_queue(&self, mut updates: Vec<tl::enums::Update>, chat_map: Arc<ChatMap>) {
235        let mut state = self.0.state.write().unwrap();
236
237        if let Some(limit) = self.0.config.params.update_queue_limit {
238            if let Some(exceeds) = (state.updates.len() + updates.len()).checked_sub(limit + 1) {
239                let exceeds = exceeds + 1;
240                let now = Instant::now();
241                let notify = match state.last_update_limit_warn {
242                    None => true,
243                    Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
244                };
245
246                updates.truncate(updates.len() - exceeds);
247                if notify {
248                    log::warn!(
249                        "{} updates were dropped because the update_queue_limit was exceeded",
250                        exceeds
251                    );
252                }
253
254                state.last_update_limit_warn = Some(now);
255            }
256        }
257
258        state
259            .updates
260            .extend(updates.into_iter().map(|u| (u, chat_map.clone())));
261    }
262
263    /// Synchronize the updates state to the session.
264    pub fn sync_update_state(&self) {
265        let state = self.0.state.read().unwrap();
266        self.0
267            .config
268            .session
269            .set_state(state.message_box.session_state());
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::*;
276    use core::future::Future;
277
278    fn get_client() -> Client {
279        panic!()
280    }
281
282    #[test]
283    fn ensure_next_update_future_impls_send() {
284        if false {
285            // We just want it to type-check, not actually run.
286            fn typeck(_: impl Future + Send) {}
287            typeck(get_client().next_update());
288        }
289    }
290}