Skip to main content

grammers_client/client/
updates.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! Methods to deal with and offer access to updates.
10
11use std::collections::VecDeque;
12use std::time::{Duration, Instant};
13
14use grammers_mtsender::InvocationError;
15use grammers_session::ErasedSession;
16use grammers_session::types::{PeerId, PeerInfo, UpdateState, UpdatesState};
17use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
18use grammers_tl_types as tl;
19use log::{trace, warn};
20use tokio::sync::mpsc;
21use tokio::time::timeout_at;
22
23use super::{Client, UpdatesConfiguration};
24use crate::peer::PeerMap;
25use crate::update::Update;
26
/// Cooldown (five minutes) applied after warning the user that the updates limit was
/// exceeded, so that the log does not get spammed with the same warning.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(5 * 60);

// Values used for the `limit` field of `updates.getChannelDifference` requests,
// depending on whether the logged-in account is a bot or a regular user.
// See https://core.telegram.org/method/updates.getChannelDifference.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100_000;
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
33
34async fn prepare_channel_difference(
35    mut request: tl::functions::updates::GetChannelDifference,
36    session: &ErasedSession,
37    message_box: &mut MessageBoxes,
38) -> Result<
39    Option<tl::functions::updates::GetChannelDifference>,
40    Box<dyn std::error::Error + Send + Sync>,
41> {
42    let id = match &request.channel {
43        tl::enums::InputChannel::Channel(channel) => PeerId::channel_unchecked(channel.channel_id),
44        _ => unreachable!(),
45    };
46
47    if let Some(PeerInfo::Channel {
48        id,
49        auth: Some(auth),
50        ..
51    }) = session.peer(id).await?
52    {
53        request.channel = tl::enums::InputChannel::Channel(tl::types::InputChannel {
54            channel_id: id,
55            access_hash: auth.hash(),
56        });
57        request.limit = if session
58            .peer(PeerId::self_user())
59            .await?
60            .map(|user| match user {
61                PeerInfo::User { bot, .. } => bot.unwrap_or(false),
62                _ => false,
63            })
64            .unwrap_or(false)
65        {
66            BOT_CHANNEL_DIFF_LIMIT
67        } else {
68            USER_CHANNEL_DIFF_LIMIT
69        };
70        trace!("requesting {:?}", request);
71        Ok(Some(request))
72    } else {
73        warn!(
74            "cannot getChannelDifference for {:?} as we're missing its hash",
75            id
76        );
77        message_box.end_channel_difference(PrematureEndReason::Banned);
78        Ok(None)
79    }
80}
81
/// Stream of updates returned by [`Client::stream_updates`].
///
/// Updates are pulled one at a time with [`UpdateStream::next`] (wrapped) or
/// [`UpdateStream::next_raw`] (raw).
pub struct UpdateStream {
    // Client used to invoke requests, build peer maps and wrap raw updates.
    client: Client,
    // Processes incoming updates and produces the difference requests that must be made.
    message_box: MessageBoxes,
    // When did we last warn the user that the update queue filled up?
    // This is used to avoid spamming the log.
    last_update_limit_warn: Option<Instant>,
    // Processed updates that have not been handed out by `next_raw` yet.
    buffer: VecDeque<(tl::enums::Update, State, PeerMap)>,
    // Receiving half of the channel from which incoming updates are read.
    updates: mpsc::UnboundedReceiver<UpdatesLike>,
    // User-provided configuration; `update_queue_limit` bounds the size of `buffer`.
    configuration: UpdatesConfiguration,
    // Whether the next call to `next_raw` should first fetch the update state from the server.
    // It is cleared as soon as that attempt is made, whether or not it succeeds.
    should_get_state: bool,
}
94
95impl UpdateStream {
96    /// Pops an update from the queue, waiting for an update to arrive first if the queue is empty.
97    pub async fn next(&mut self) -> Result<Update, InvocationError> {
98        let (update, state, peers) = self.next_raw().await?;
99        Ok(Update::from_raw(&self.client, update, state, peers))
100    }
101
102    /// Pops an update from the queue, waiting for an update to arrive first if the queue is empty.
103    ///
104    /// Unlike [`Self::next`], the update is not wrapped at all, but it is still processed.
105    pub async fn next_raw(
106        &mut self,
107    ) -> Result<(tl::enums::Update, State, PeerMap), InvocationError> {
108        if self.should_get_state {
109            self.should_get_state = false;
110            match self
111                .client
112                .invoke(&tl::functions::updates::GetState {})
113                .await
114            {
115                Ok(tl::enums::updates::State::State(state)) => {
116                    self.client
117                        .0
118                        .session
119                        .set_update_state(UpdateState::All(UpdatesState {
120                            pts: state.pts,
121                            qts: state.qts,
122                            date: state.date,
123                            seq: state.seq,
124                            channels: Vec::new(),
125                        }))
126                        .await?;
127                    self.message_box.set_state(state);
128                }
129                Err(_err) => {
130                    // The account may no longer actually be logged in, or it can rarely fail.
131                    // `message_box` will try to correct its state as updates arrive.
132                }
133            }
134        }
135
136        loop {
137            let (deadline, get_diff, get_channel_diff) = {
138                if let Some(update) = self.buffer.pop_front() {
139                    return Ok(update);
140                }
141                (
142                    self.message_box.check_deadlines(), // first, as it might trigger differences
143                    self.message_box.get_difference(),
144                    match self.message_box.get_channel_difference() {
145                        Some(gd) => {
146                            prepare_channel_difference(
147                                gd,
148                                self.client.0.session.as_ref(),
149                                &mut self.message_box,
150                            )
151                            .await?
152                        }
153                        None => None,
154                    },
155                )
156            };
157
158            if let Some(request) = get_diff {
159                let response = self.client.invoke(&request).await?;
160                let (updates, users, chats) = self.message_box.apply_difference(response);
161                let peers = self.client.build_peer_map(users, chats).await;
162                self.extend_update_queue(updates, peers);
163                continue;
164            }
165
166            if let Some(request) = get_channel_diff {
167                let maybe_response = self.client.invoke(&request).await;
168
169                let response = match maybe_response {
170                    Ok(r) => r,
171                    Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
172                        // According to Telegram's docs:
173                        // "Channel internal replication issues, try again later (treat this like an RPC_CALL_FAIL)."
174                        // We can treat this as "empty difference" and not update the local pts.
175                        // Then this same call will be retried when another gap is detected or timeout expires.
176                        //
177                        // Another option would be to literally treat this like an RPC_CALL_FAIL and retry after a few
178                        // seconds, but if Telegram is having issues it's probably best to wait for it to send another
179                        // update (hinting it may be okay now) and retry then.
180                        //
181                        // This is a bit hacky because MessageBox doesn't really have a way to "not update" the pts.
182                        // Instead we manually extract the previously-known pts and use that.
183                        log::warn!(
184                            "Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
185                        );
186                        {
187                            self.message_box
188                                .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
189                        }
190                        continue;
191                    }
192                    Err(e) if e.is("CHANNEL_PRIVATE") => {
193                        log::info!(
194                            "Account is now banned so we can no longer fetch updates with request: {:?}",
195                            request
196                        );
197                        {
198                            self.message_box
199                                .end_channel_difference(PrematureEndReason::Banned);
200                        }
201                        continue;
202                    }
203                    Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
204                        log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
205                        {
206                            self.message_box
207                                .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
208                        }
209                        continue;
210                    }
211                    Err(e) => return Err(e),
212                };
213
214                let (updates, users, chats) = self.message_box.apply_channel_difference(response);
215
216                let peers = self.client.build_peer_map(users, chats).await;
217                self.extend_update_queue(updates, peers);
218                continue;
219            }
220
221            match timeout_at(deadline.into(), self.updates.recv()).await {
222                Ok(Some(updates)) => self.process_socket_updates(updates).await?,
223                Ok(None) => break Err(InvocationError::Dropped),
224                Err(_) => {}
225            }
226        }
227    }
228
229    pub(crate) async fn process_socket_updates(
230        &mut self,
231        updates: UpdatesLike,
232    ) -> Result<(), InvocationError> {
233        let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
234        match self.message_box.process_updates(updates) {
235            Ok(tup) => {
236                if let Some(res) = result.as_mut() {
237                    res.0.extend(tup.0);
238                    res.1.extend(tup.1);
239                    res.2.extend(tup.2);
240                } else {
241                    result = Some(tup);
242                }
243            }
244            Err(_) => return Ok(()),
245        }
246
247        if let Some((updates, users, chats)) = result {
248            let peers = self.client.build_peer_map(users, chats).await;
249            self.extend_update_queue(updates, peers);
250        }
251
252        Ok(())
253    }
254
255    fn extend_update_queue(
256        &mut self,
257        mut updates: Vec<(tl::enums::Update, State)>,
258        peer_map: PeerMap,
259    ) {
260        if let Some(limit) = self.configuration.update_queue_limit {
261            if let Some(exceeds) = (self.buffer.len() + updates.len()).checked_sub(limit + 1) {
262                let exceeds = exceeds + 1;
263                let now = Instant::now();
264                let notify = match self.last_update_limit_warn {
265                    None => true,
266                    Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
267                };
268
269                updates.truncate(updates.len() - exceeds);
270                if notify {
271                    log::warn!(
272                        "{} updates were dropped because the update_queue_limit was exceeded",
273                        exceeds
274                    );
275                }
276
277                self.last_update_limit_warn = Some(now);
278            }
279        }
280
281        self.buffer
282            .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.handle())));
283    }
284
285    /// Synchronize the updates state to the session.
286    ///
287    /// This is **not** automatically done on drop.
288    pub async fn sync_update_state(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
289        self.client
290            .0
291            .session
292            .set_update_state(UpdateState::All(self.message_box.session_state()))
293            .await?;
294        Ok(())
295    }
296}
297
298impl Client {
299    /// Returns an asynchronous stream of processed updates.
300    ///
301    /// The updates are guaranteed to be in order, and any gaps will be resolved.\
302    /// **Important** to note that for gaps to be resolved, the peers must have been
303    /// persisted in the session cache beforehand (i.e. be retrievable with [`Session::peer`]).
304    /// A good way to achieve this is to use [`Self::iter_dialogs`] at least once after login.
305    ///
306    /// The updates are wrapped in [`crate::update::Update`] to make them more convenient to use,
307    /// but their raw type is still accessible to bridge any missing functionality.
308    pub async fn stream_updates(
309        &self,
310        updates: mpsc::UnboundedReceiver<UpdatesLike>,
311        configuration: UpdatesConfiguration,
312    ) -> Result<UpdateStream, Box<dyn std::error::Error + Send + Sync>> {
313        let message_box = if configuration.catch_up {
314            MessageBoxes::load(self.0.session.updates_state().await?)
315        } else {
316            // If the user doesn't want to bother with catching up on previous update, start with
317            // pristine state instead.
318            MessageBoxes::new()
319        };
320        // Don't bother getting pristine update state if we're not logged in.
321        let should_get_state =
322            message_box.is_empty() && self.0.session.peer(PeerId::self_user()).await?.is_some();
323
324        Ok(UpdateStream {
325            client: self.clone(),
326            message_box,
327            last_update_limit_warn: None,
328            buffer: VecDeque::new(),
329            updates,
330            configuration,
331            should_get_state,
332        })
333    }
334}
335
#[cfg(test)]
mod tests {
    use core::future::Future;

    use super::*;

    /// Stand-in that only exists to name the `UpdateStream` type; it is never actually called.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    /// Compile-time check that the future returned by `UpdateStream::next` is `Send`.
    #[test]
    fn ensure_next_update_future_impls_send() {
        fn assert_send<F: Future + Send>(_future: F) {}

        // The branch is never taken: the call only has to type-check, not run.
        if false {
            assert_send(get_update_stream().next());
        }
    }
}