Skip to main content

grammers_client/client/
updates.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Methods to deal with and offer access to updates.
10
11use std::collections::VecDeque;
12use std::time::{Duration, Instant};
13
14use grammers_mtsender::InvocationError;
15use grammers_session::Session;
16use grammers_session::types::{PeerId, PeerInfo, UpdateState, UpdatesState};
17use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
18use grammers_tl_types as tl;
19use log::{trace, warn};
20use tokio::sync::mpsc;
21use tokio::time::timeout_at;
22
23use super::{Client, UpdatesConfiguration};
24use crate::peer::PeerMap;
25use crate::update::Update;
26
/// Minimum time that must elapse after an "update limit exceeded" warning
/// before the same warning may be logged again (five minutes).
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(5 * 60);

// Values for the `limit` field of `updates.getChannelDifference`: a larger one for
// bot accounts and a smaller one for user accounts.
// See https://core.telegram.org/method/updates.getChannelDifference.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100_000;
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
33
34async fn prepare_channel_difference(
35    mut request: tl::functions::updates::GetChannelDifference,
36    session: &dyn Session,
37    message_box: &mut MessageBoxes,
38) -> Option<tl::functions::updates::GetChannelDifference> {
39    let id = match &request.channel {
40        tl::enums::InputChannel::Channel(channel) => PeerId::channel_unchecked(channel.channel_id),
41        _ => unreachable!(),
42    };
43
44    if let Some(PeerInfo::Channel {
45        id,
46        auth: Some(auth),
47        ..
48    }) = session.peer(id).await
49    {
50        request.channel = tl::enums::InputChannel::Channel(tl::types::InputChannel {
51            channel_id: id,
52            access_hash: auth.hash(),
53        });
54        request.limit = if session
55            .peer(PeerId::self_user())
56            .await
57            .map(|user| match user {
58                PeerInfo::User { bot, .. } => bot.unwrap_or(false),
59                _ => false,
60            })
61            .unwrap_or(false)
62        {
63            BOT_CHANNEL_DIFF_LIMIT
64        } else {
65            USER_CHANNEL_DIFF_LIMIT
66        };
67        trace!("requesting {:?}", request);
68        Some(request)
69    } else {
70        warn!(
71            "cannot getChannelDifference for {:?} as we're missing its hash",
72            id
73        );
74        message_box.end_channel_difference(PrematureEndReason::Banned);
75        None
76    }
77}
78
/// Iterator returned by [`Client::stream_updates`].
///
/// Updates are pulled with [`UpdateStream::next`] or [`UpdateStream::next_raw`]; both
/// drain an internal buffer first and only wait for new data once it is empty.
pub struct UpdateStream {
    // Client used to invoke the difference requests and to build peer maps.
    client: Client,
    // Update-state tracker that decides when a difference request is needed and
    // turns incoming data into individual updates.
    message_box: MessageBoxes,
    // When did we last warn the user that the update queue filled up?
    // This is used to avoid spamming the log.
    last_update_limit_warn: Option<Instant>,
    // Updates that are ready to be handed out, oldest first, each paired with its
    // state and the peer map it arrived with.
    buffer: VecDeque<(tl::enums::Update, State, PeerMap)>,
    // Receiving end of the channel through which incoming updates are delivered.
    updates: mpsc::UnboundedReceiver<UpdatesLike>,
    // User-provided settings (for example `update_queue_limit`, read when queueing).
    configuration: UpdatesConfiguration,
    // Whether the first call to `next_raw` should fetch the state with `updates.getState`.
    should_get_state: bool,
}
91
92impl UpdateStream {
93    /// Pops an update from the queue, waiting for an update to arrive first if the queue is empty.
94    pub async fn next(&mut self) -> Result<Update, InvocationError> {
95        let (update, state, peers) = self.next_raw().await?;
96        Ok(Update::from_raw(&self.client, update, state, peers))
97    }
98
99    /// Pops an update from the queue, waiting for an update to arrive first if the queue is empty.
100    ///
101    /// Unlike [`Self::next`], the update is not wrapped at all, but it is still processed.
102    pub async fn next_raw(
103        &mut self,
104    ) -> Result<(tl::enums::Update, State, PeerMap), InvocationError> {
105        if self.should_get_state {
106            self.should_get_state = false;
107            match self
108                .client
109                .invoke(&tl::functions::updates::GetState {})
110                .await
111            {
112                Ok(tl::enums::updates::State::State(state)) => {
113                    self.client
114                        .0
115                        .session
116                        .set_update_state(UpdateState::All(UpdatesState {
117                            pts: state.pts,
118                            qts: state.qts,
119                            date: state.date,
120                            seq: state.seq,
121                            channels: Vec::new(),
122                        }))
123                        .await;
124                }
125                Err(_err) => {
126                    // The account may no longer actually be logged in, or it can rarely fail.
127                    // `message_box` will try to correct its state as updates arrive.
128                }
129            }
130        }
131
132        loop {
133            let (deadline, get_diff, get_channel_diff) = {
134                if let Some(update) = self.buffer.pop_front() {
135                    return Ok(update);
136                }
137                (
138                    self.message_box.check_deadlines(), // first, as it might trigger differences
139                    self.message_box.get_difference(),
140                    match self.message_box.get_channel_difference() {
141                        Some(gd) => {
142                            prepare_channel_difference(
143                                gd,
144                                self.client.0.session.as_ref(),
145                                &mut self.message_box,
146                            )
147                            .await
148                        }
149                        None => None,
150                    },
151                )
152            };
153
154            if let Some(request) = get_diff {
155                let response = self.client.invoke(&request).await?;
156                let (updates, users, chats) = self.message_box.apply_difference(response);
157                let peers = self.client.build_peer_map(users, chats).await;
158                self.extend_update_queue(updates, peers);
159                continue;
160            }
161
162            if let Some(request) = get_channel_diff {
163                let maybe_response = self.client.invoke(&request).await;
164
165                let response = match maybe_response {
166                    Ok(r) => r,
167                    Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
168                        // According to Telegram's docs:
169                        // "Channel internal replication issues, try again later (treat this like an RPC_CALL_FAIL)."
170                        // We can treat this as "empty difference" and not update the local pts.
171                        // Then this same call will be retried when another gap is detected or timeout expires.
172                        //
173                        // Another option would be to literally treat this like an RPC_CALL_FAIL and retry after a few
174                        // seconds, but if Telegram is having issues it's probably best to wait for it to send another
175                        // update (hinting it may be okay now) and retry then.
176                        //
177                        // This is a bit hacky because MessageBox doesn't really have a way to "not update" the pts.
178                        // Instead we manually extract the previously-known pts and use that.
179                        log::warn!(
180                            "Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
181                        );
182                        {
183                            self.message_box
184                                .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
185                        }
186                        continue;
187                    }
188                    Err(e) if e.is("CHANNEL_PRIVATE") => {
189                        log::info!(
190                            "Account is now banned so we can no longer fetch updates with request: {:?}",
191                            request
192                        );
193                        {
194                            self.message_box
195                                .end_channel_difference(PrematureEndReason::Banned);
196                        }
197                        continue;
198                    }
199                    Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
200                        log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
201                        {
202                            self.message_box
203                                .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
204                        }
205                        continue;
206                    }
207                    Err(e) => return Err(e),
208                };
209
210                let (updates, users, chats) = self.message_box.apply_channel_difference(response);
211
212                let peers = self.client.build_peer_map(users, chats).await;
213                self.extend_update_queue(updates, peers);
214                continue;
215            }
216
217            match timeout_at(deadline.into(), self.updates.recv()).await {
218                Ok(Some(updates)) => self.process_socket_updates(updates).await,
219                Ok(None) => break Err(InvocationError::Dropped),
220                Err(_) => {}
221            }
222        }
223    }
224
225    pub(crate) async fn process_socket_updates(&mut self, updates: UpdatesLike) {
226        let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
227        match self.message_box.process_updates(updates) {
228            Ok(tup) => {
229                if let Some(res) = result.as_mut() {
230                    res.0.extend(tup.0);
231                    res.1.extend(tup.1);
232                    res.2.extend(tup.2);
233                } else {
234                    result = Some(tup);
235                }
236            }
237            Err(_) => return,
238        }
239
240        if let Some((updates, users, chats)) = result {
241            let peers = self.client.build_peer_map(users, chats).await;
242            self.extend_update_queue(updates, peers);
243        }
244    }
245
246    fn extend_update_queue(
247        &mut self,
248        mut updates: Vec<(tl::enums::Update, State)>,
249        peer_map: PeerMap,
250    ) {
251        if let Some(limit) = self.configuration.update_queue_limit {
252            if let Some(exceeds) = (self.buffer.len() + updates.len()).checked_sub(limit + 1) {
253                let exceeds = exceeds + 1;
254                let now = Instant::now();
255                let notify = match self.last_update_limit_warn {
256                    None => true,
257                    Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
258                };
259
260                updates.truncate(updates.len() - exceeds);
261                if notify {
262                    log::warn!(
263                        "{} updates were dropped because the update_queue_limit was exceeded",
264                        exceeds
265                    );
266                }
267
268                self.last_update_limit_warn = Some(now);
269            }
270        }
271
272        self.buffer
273            .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.handle())));
274    }
275
276    /// Synchronize the updates state to the session.
277    ///
278    /// This is **not** automatically done on drop.
279    pub async fn sync_update_state(&self) {
280        self.client
281            .0
282            .session
283            .set_update_state(UpdateState::All(self.message_box.session_state()))
284            .await;
285    }
286}
287
288impl Client {
289    /// Returns an asynchronous stream of processed updates.
290    ///
291    /// The updates are guaranteed to be in order, and any gaps will be resolved.\
292    /// **Important** to note that for gaps to be resolved, the peers must have been
293    /// persisted in the session cache beforehand (i.e. be retrievable with [`Session::peer`]).
294    /// A good way to achieve this is to use [`Self::iter_dialogs`] at least once after login.
295    ///
296    /// The updates are wrapped in [`crate::update::Update`] to make them more convenient to use,
297    /// but their raw type is still accessible to bridge any missing functionality.
298    pub async fn stream_updates(
299        &self,
300        updates: mpsc::UnboundedReceiver<UpdatesLike>,
301        configuration: UpdatesConfiguration,
302    ) -> UpdateStream {
303        let message_box = if configuration.catch_up {
304            MessageBoxes::load(self.0.session.updates_state().await)
305        } else {
306            // If the user doesn't want to bother with catching up on previous update, start with
307            // pristine state instead.
308            MessageBoxes::new()
309        };
310        // Don't bother getting pristine update state if we're not logged in.
311        let should_get_state =
312            message_box.is_empty() && self.0.session.peer(PeerId::self_user()).await.is_some();
313
314        UpdateStream {
315            client: self.clone(),
316            message_box,
317            last_update_limit_warn: None,
318            buffer: VecDeque::new(),
319            updates,
320            configuration,
321            should_get_state,
322        }
323    }
324}
325
#[cfg(test)]
mod tests {
    use core::future::Future;

    use super::*;

    /// Stand-in that only provides an `UpdateStream`-typed expression; it is never
    /// meant to be executed.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    /// Compile-time check that the future returned by `UpdateStream::next` is `Send`.
    #[test]
    fn ensure_next_update_future_impls_send() {
        fn assert_send<F: Future + Send>(_future: F) {}

        // The branch is never taken: the call only has to type-check, not run.
        if false {
            assert_send(get_update_stream().next());
        }
    }
}