grammers_client/client/
updates.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Methods to deal with and offer access to updates.
10
11#![allow(deprecated)]
12
13use super::{Client, UpdatesConfiguration};
14use crate::types::{PeerMap, Update};
15use grammers_mtsender::InvocationError;
16use grammers_session::PeerAuthCache;
17use grammers_session::defs::{PeerId, UpdateState, UpdatesState};
18pub use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
19use grammers_tl_types as tl;
20use log::{trace, warn};
21use std::collections::VecDeque;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::mpsc;
25use tokio::time::timeout_at;
26
/// Minimum time that must pass after logging an "update limit exceeded" warning
/// before the same warning is logged again.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300);

// Values used for the `limit` field of `updates.getChannelDifference` requests.
// See https://core.telegram.org/method/updates.getChannelDifference.

/// `limit` used when the logged-in account is a bot.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100_000;
/// `limit` used when the logged-in account is not a bot.
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
33
34fn prepare_channel_difference(
35    mut request: tl::functions::updates::GetChannelDifference,
36    peer_auths: &PeerAuthCache,
37    message_box: &mut MessageBoxes,
38) -> Option<tl::functions::updates::GetChannelDifference> {
39    let id = match &request.channel {
40        tl::enums::InputChannel::Channel(channel) => PeerId::channel(channel.channel_id),
41        _ => unreachable!(),
42    };
43
44    if let Some(peer) = peer_auths.get(id) {
45        request.channel = peer.into();
46        request.limit = if peer_auths.is_self_bot() {
47            BOT_CHANNEL_DIFF_LIMIT
48        } else {
49            USER_CHANNEL_DIFF_LIMIT
50        };
51        trace!("requesting {:?}", request);
52        Some(request)
53    } else {
54        warn!(
55            "cannot getChannelDifference for {:?} as we're missing its hash",
56            id
57        );
58        message_box.end_channel_difference(PrematureEndReason::Banned);
59        None
60    }
61}
62
63pub struct UpdateStream {
64    client: Client,
65    message_box: MessageBoxes,
66    peer_auths: PeerAuthCache,
67    // When did we last warn the user that the update queue filled up?
68    // This is used to avoid spamming the log.
69    last_update_limit_warn: Option<Instant>,
70    buffer: VecDeque<(tl::enums::Update, State, Arc<crate::types::PeerMap>)>,
71    updates: mpsc::UnboundedReceiver<UpdatesLike>,
72    configuration: UpdatesConfiguration,
73    should_get_state: bool,
74}
75
76impl UpdateStream {
77    pub async fn next(&mut self) -> Result<Update, InvocationError> {
78        let (update, state, peers) = self.next_raw().await?;
79        Ok(Update::new(&self.client, update, state, &peers))
80    }
81
82    pub async fn next_raw(
83        &mut self,
84    ) -> Result<(tl::enums::Update, State, Arc<PeerMap>), InvocationError> {
85        if self.should_get_state {
86            self.should_get_state = false;
87            match self
88                .client
89                .invoke(&tl::functions::updates::GetState {})
90                .await
91            {
92                Ok(tl::enums::updates::State::State(state)) => {
93                    self.client
94                        .0
95                        .session
96                        .set_update_state(UpdateState::All(UpdatesState {
97                            pts: state.pts,
98                            qts: state.qts,
99                            date: state.date,
100                            seq: state.seq,
101                            channels: Vec::new(),
102                        }));
103                }
104                Err(_err) => {
105                    // The account may no longer actually be logged in, or it can rarely fail.
106                    // `message_box` will try to correct its state as updates arrive.
107                }
108            }
109        }
110
111        loop {
112            let (deadline, get_diff, get_channel_diff) = {
113                if let Some(update) = self.buffer.pop_front() {
114                    return Ok(update);
115                }
116                (
117                    self.message_box.check_deadlines(), // first, as it might trigger differences
118                    self.message_box.get_difference(),
119                    self.message_box.get_channel_difference().and_then(|gd| {
120                        prepare_channel_difference(gd, &self.peer_auths, &mut self.message_box)
121                    }),
122                )
123            };
124
125            if let Some(request) = get_diff {
126                let response = self.client.invoke(&request).await?;
127                let (updates, users, chats) = self.message_box.apply_difference(response);
128                let _ = self.peer_auths.extend(&users, &chats);
129                self.extend_update_queue(updates, PeerMap::new(users, chats));
130                continue;
131            }
132
133            if let Some(request) = get_channel_diff {
134                let maybe_response = self.client.invoke(&request).await;
135
136                let response = match maybe_response {
137                    Ok(r) => r,
138                    Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
139                        // According to Telegram's docs:
140                        // "Channel internal replication issues, try again later (treat this like an RPC_CALL_FAIL)."
141                        // We can treat this as "empty difference" and not update the local pts.
142                        // Then this same call will be retried when another gap is detected or timeout expires.
143                        //
144                        // Another option would be to literally treat this like an RPC_CALL_FAIL and retry after a few
145                        // seconds, but if Telegram is having issues it's probably best to wait for it to send another
146                        // update (hinting it may be okay now) and retry then.
147                        //
148                        // This is a bit hacky because MessageBox doesn't really have a way to "not update" the pts.
149                        // Instead we manually extract the previously-known pts and use that.
150                        log::warn!(
151                            "Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
152                        );
153                        {
154                            self.message_box
155                                .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
156                        }
157                        continue;
158                    }
159                    Err(e) if e.is("CHANNEL_PRIVATE") => {
160                        log::info!(
161                            "Account is now banned so we can no longer fetch updates with request: {:?}",
162                            request
163                        );
164                        {
165                            self.message_box
166                                .end_channel_difference(PrematureEndReason::Banned);
167                        }
168                        continue;
169                    }
170                    Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
171                        log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
172                        {
173                            self.message_box
174                                .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
175                        }
176                        continue;
177                    }
178                    Err(e) => return Err(e),
179                };
180
181                let (updates, users, chats) = self.message_box.apply_channel_difference(response);
182                let _ = self.peer_auths.extend(&users, &chats);
183
184                self.extend_update_queue(updates, PeerMap::new(users, chats));
185                continue;
186            }
187
188            match timeout_at(deadline.into(), self.updates.recv()).await {
189                Ok(Some(updates)) => self.process_socket_updates(updates),
190                Ok(None) => break Err(InvocationError::Dropped),
191                Err(_) => {}
192            }
193        }
194    }
195
196    pub(crate) fn process_socket_updates(&mut self, updates: UpdatesLike) {
197        let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
198        match self.message_box.process_updates(updates) {
199            Ok(tup) => {
200                if let Some(res) = result.as_mut() {
201                    res.0.extend(tup.0);
202                    res.1.extend(tup.1);
203                    res.2.extend(tup.2);
204                } else {
205                    result = Some(tup);
206                }
207            }
208            Err(_) => return,
209        }
210
211        if let Some((updates, users, chats)) = result {
212            self.extend_update_queue(updates, PeerMap::new(users, chats));
213        }
214    }
215
216    fn extend_update_queue(
217        &mut self,
218        mut updates: Vec<(tl::enums::Update, State)>,
219        peer_map: Arc<PeerMap>,
220    ) {
221        if let Some(limit) = self.configuration.update_queue_limit {
222            if let Some(exceeds) = (self.buffer.len() + updates.len()).checked_sub(limit + 1) {
223                let exceeds = exceeds + 1;
224                let now = Instant::now();
225                let notify = match self.last_update_limit_warn {
226                    None => true,
227                    Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
228                };
229
230                updates.truncate(updates.len() - exceeds);
231                if notify {
232                    log::warn!(
233                        "{} updates were dropped because the update_queue_limit was exceeded",
234                        exceeds
235                    );
236                }
237
238                self.last_update_limit_warn = Some(now);
239            }
240        }
241
242        self.buffer
243            .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.clone())));
244    }
245
246    /// Synchronize the updates state to the session.
247    pub fn sync_update_state(&self) {
248        self.client
249            .0
250            .session
251            .set_update_state(UpdateState::All(self.message_box.session_state()));
252    }
253}
254
255impl Drop for UpdateStream {
256    fn drop(&mut self) {
257        self.sync_update_state();
258    }
259}
260
261impl Client {
262    /// Returns an asynchronous stream of processed updates.
263    ///
264    /// The updates are guaranteed to be in order, and any gaps will be resolved.
265    ///
266    /// The updates are wrapped in [`crate::Update`] to make them more convenient to use,
267    /// but their raw type is still accessible to bridge any missing functionality.
268    pub fn stream_updates(
269        &self,
270        updates: mpsc::UnboundedReceiver<UpdatesLike>,
271        configuration: UpdatesConfiguration,
272    ) -> UpdateStream {
273        let message_box = if configuration.catch_up {
274            MessageBoxes::load(self.0.session.updates_state())
275        } else {
276            // If the user doesn't want to bother with catching up on previous update, start with
277            // pristine state instead.
278            MessageBoxes::new()
279        };
280        // Don't bother getting pristine update state if we're not logged in.
281        let should_get_state =
282            message_box.is_empty() && self.0.session.peer(PeerId::self_user()).is_some();
283
284        UpdateStream {
285            client: self.clone(),
286            message_box,
287            peer_auths: PeerAuthCache::new(None),
288            last_update_limit_warn: None,
289            buffer: VecDeque::new(),
290            updates,
291            configuration,
292            should_get_state,
293        }
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use core::future::Future;

    /// Gives the type checker an `UpdateStream` to work with; it is never actually called.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    #[test]
    fn ensure_next_update_future_impls_send() {
        /// Accepts only futures that are `Send`.
        fn assert_send<F: Future + Send>(_future: F) {}

        // We just want it to type-check, not actually run.
        if false {
            assert_send(get_update_stream().next());
        }
    }
}
314}