grammers_session/message_box/
mod.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! This module deals with correct handling of updates, including gaps, and knowing when the code
10//! should "get difference" (the set of updates that the client should know by now minus the set
11//! of updates that it actually knows).
12//!
13//! Each chat has its own [`Entry`] in the [`MessageBox`] (this `struct` is the "entry point").
14//! At any given time, the message box may be either getting difference for them (entry is in
15//! [`MessageBox::getting_diff_for`]) or not. If not getting difference, a possible gap may be
16//! found for the updates (entry is in [`MessageBox::possible_gaps`]). Otherwise, the entry is
17//! on its happy path.
18//!
19//! Gaps are cleared when they are either resolved on their own (by waiting for a short time)
20//! or because we got the difference for the corresponding entry.
21//!
22//! While there are entries for which their difference must be fetched,
23//! [`MessageBox::check_deadlines`] will always return [`Instant::now`], since "now" is the time
24//! to get the difference.
25mod adaptor;
26mod defs;
27
28use super::ChatHashCache;
29use crate::generated::enums::ChannelState as ChannelStateEnum;
30use crate::generated::types::ChannelState;
31use crate::message_box::defs::PossibleGap;
32use crate::UpdateState;
33pub(crate) use defs::Entry;
34pub use defs::{Gap, MessageBox};
35use defs::{PtsInfo, State, NO_DATE, NO_PTS, NO_SEQ, POSSIBLE_GAP_TIMEOUT};
36use grammers_tl_types as tl;
37use log::{debug, info, trace, warn};
38use std::cmp::Ordering;
39use std::collections::{HashMap, HashSet};
40use std::mem;
41use std::time::{Duration, Instant};
42use tl::enums::InputChannel;
43
44fn next_updates_deadline() -> Instant {
45    Instant::now() + defs::NO_UPDATES_TIMEOUT
46}
47
48#[allow(clippy::new_without_default)]
49/// Creation, querying, and setting base state.
50impl MessageBox {
51    /// Create a new, empty [`MessageBox`].
52    ///
53    /// This is the only way it may return `true` from [`MessageBox::is_empty`].
54    pub fn new() -> Self {
55        trace!("created new message box with no previous state");
56        Self {
57            map: HashMap::new(),
58            date: 1, // non-zero or getting difference will fail
59            seq: NO_SEQ,
60            possible_gaps: HashMap::new(),
61            getting_diff_for: HashSet::new(),
62            next_deadline: None,
63            tmp_entries: HashSet::new(),
64        }
65    }
66
67    /// Create a [`MessageBox`] from a previously known update state.
68    pub fn load(state: UpdateState) -> Self {
69        trace!("created new message box with state: {:?}", state);
70        let deadline = next_updates_deadline();
71        let mut map = HashMap::with_capacity(2 + state.channels.len());
72        map.insert(
73            Entry::AccountWide,
74            State {
75                pts: state.pts,
76                deadline,
77            },
78        );
79        map.insert(
80            Entry::SecretChats,
81            State {
82                pts: state.qts,
83                deadline,
84            },
85        );
86        map.extend(state.channels.iter().map(|ChannelStateEnum::State(c)| {
87            (
88                Entry::Channel(c.channel_id),
89                State {
90                    pts: c.pts,
91                    deadline,
92                },
93            )
94        }));
95
96        Self {
97            map,
98            date: state.date,
99            seq: state.seq,
100            possible_gaps: HashMap::new(),
101            getting_diff_for: HashSet::new(),
102            next_deadline: Some(Entry::AccountWide),
103            tmp_entries: HashSet::new(),
104        }
105    }
106
107    /// Return the current state in a format that sessions understand.
108    ///
109    /// This should be used for persisting the state.
110    pub fn session_state(&self) -> UpdateState {
111        UpdateState {
112            pts: self
113                .map
114                .get(&Entry::AccountWide)
115                .map(|s| s.pts)
116                .unwrap_or(NO_PTS),
117            qts: self
118                .map
119                .get(&Entry::SecretChats)
120                .map(|s| s.pts)
121                .unwrap_or(NO_PTS),
122            date: self.date,
123            seq: self.seq,
124            channels: self
125                .map
126                .iter()
127                .filter_map(|(entry, s)| match entry {
128                    Entry::Channel(id) => Some(
129                        ChannelState {
130                            channel_id: *id,
131                            pts: s.pts,
132                        }
133                        .into(),
134                    ),
135                    _ => None,
136                })
137                .collect(),
138        }
139    }
140
141    /// Return true if the message box is empty and has no state yet.
142    pub fn is_empty(&self) -> bool {
143        self.map
144            .get(&Entry::AccountWide)
145            .map(|s| s.pts)
146            .unwrap_or(NO_PTS)
147            == NO_PTS
148    }
149
150    /// Return the next deadline when receiving updates should timeout.
151    ///
152    /// If a deadline expired, the corresponding entries will be marked as needing to get its difference.
153    /// While there are entries pending of getting their difference, this method returns the current instant.
154    pub fn check_deadlines(&mut self) -> Instant {
155        let now = Instant::now();
156
157        if !self.getting_diff_for.is_empty() {
158            return now;
159        }
160
161        let deadline = next_updates_deadline();
162
163        // Most of the time there will be zero or one gap in flight so finding the minimum is cheap.
164        let deadline =
165            if let Some(gap_deadline) = self.possible_gaps.values().map(|gap| gap.deadline).min() {
166                deadline.min(gap_deadline)
167            } else if let Some(state) = self.next_deadline.and_then(|entry| self.map.get(&entry)) {
168                deadline.min(state.deadline)
169            } else {
170                deadline
171            };
172
173        if now >= deadline {
174            // Check all expired entries and add them to the list that needs getting difference.
175            self.getting_diff_for
176                .extend(self.possible_gaps.iter().filter_map(|(entry, gap)| {
177                    if now >= gap.deadline {
178                        info!("gap was not resolved after waiting for {:?}", entry);
179                        Some(entry)
180                    } else {
181                        None
182                    }
183                }));
184
185            self.getting_diff_for
186                .extend(self.map.iter().filter_map(|(entry, state)| {
187                    if now >= state.deadline {
188                        debug!("too much time has passed without updates for {:?}", entry);
189                        Some(entry)
190                    } else {
191                        None
192                    }
193                }));
194
195            // When extending `getting_diff_for`, it's important to have the moral equivalent of
196            // `begin_get_diff` (that is, clear possible gaps if we're now getting difference).
197            let possible_gaps = &mut self.possible_gaps;
198            self.getting_diff_for.iter().for_each(|entry| {
199                possible_gaps.remove(entry);
200            });
201        }
202
203        deadline
204    }
205
206    /// Reset the deadline for the periods without updates for all input entries.
207    ///
208    /// It also updates the next deadline time to reflect the new closest deadline.
209    fn reset_deadlines(&mut self, entries: &HashSet<Entry>, deadline: Instant) {
210        if entries.is_empty() {
211            return;
212        }
213        for entry in entries {
214            if let Some(state) = self.map.get_mut(entry) {
215                state.deadline = deadline;
216                debug!("reset deadline {:?} for {:?}", deadline, entry);
217            } else {
218                panic!("did not reset deadline for {entry:?} as it had no entry");
219            }
220        }
221
222        if self
223            .next_deadline
224            .as_ref()
225            .map(|next| entries.contains(next))
226            .unwrap_or(false)
227        {
228            // If the updated deadline was the closest one, recalculate the new minimum.
229            self.next_deadline = Some(
230                self.map
231                    .iter()
232                    .min_by_key(|(_, state)| state.deadline)
233                    .map(|i| *i.0)
234                    .expect("deadline should exist"),
235            );
236        } else if self
237            .next_deadline
238            .map(|e| deadline < self.map[&e].deadline)
239            .unwrap_or(false)
240        {
241            // If the updated deadline is smaller than the next deadline, change the next deadline to be the new one.
242            // An unrelated deadline was updated, so the closest one remains unchanged.
243            // Any entry will do, as they all share the same new deadline.
244            self.next_deadline = Some(*entries.iter().next().unwrap());
245        }
246    }
247
248    /// Convenience to reset a single entry's deadline.
249    fn reset_deadline(&mut self, entry: Entry, deadline: Instant) {
250        let mut entries = mem::take(&mut self.tmp_entries);
251        entries.insert(entry);
252        self.reset_deadlines(&entries, deadline);
253        entries.clear();
254        self.tmp_entries = entries;
255    }
256
257    /// Convenience to reset a channel's deadline, with optional timeout.
258    fn reset_channel_deadline(&mut self, channel_id: i64, timeout: Option<i32>) {
259        self.reset_deadline(
260            Entry::Channel(channel_id),
261            Instant::now()
262                + timeout
263                    .map(|t| Duration::from_secs(t as _))
264                    .unwrap_or(defs::NO_UPDATES_TIMEOUT),
265        );
266    }
267
268    /// Sets the update state.
269    ///
270    /// Should be called right after login if [`MessageBox::new`] was used, otherwise undesirable
271    /// updates will be fetched.
272    pub fn set_state(&mut self, state: tl::enums::updates::State) {
273        trace!("setting state {:?}", state);
274        let deadline = next_updates_deadline();
275        let state: tl::types::updates::State = state.into();
276        self.map.insert(
277            Entry::AccountWide,
278            State {
279                pts: state.pts,
280                deadline,
281            },
282        );
283        self.map.insert(
284            Entry::SecretChats,
285            State {
286                pts: state.qts,
287                deadline,
288            },
289        );
290        self.date = state.date;
291        self.seq = state.seq;
292    }
293
294    /// Like [`MessageBox::set_state`], but for channels. Useful when getting dialogs.
295    ///
296    /// The update state will only be updated if no entry was known previously.
297    pub fn try_set_channel_state(&mut self, id: i64, pts: i32) {
298        trace!("trying to set channel state for {}: {}", id, pts);
299        self.map.entry(Entry::Channel(id)).or_insert_with(|| State {
300            pts,
301            deadline: next_updates_deadline(),
302        });
303    }
304
305    /// Try to begin getting difference for the given entry.
306    /// Fails if the entry does not have a previously-known state that can be used to get its difference.
307    ///
308    /// Clears any previous gaps.
309    fn try_begin_get_diff(&mut self, entry: Entry) {
310        if !self.map.contains_key(&entry) {
311            // Won't actually be able to get difference for this entry if we don't have a pts to start off from.
312            if self.possible_gaps.contains_key(&entry) {
313                panic!(
314                    "Should not have a possible_gap for an entry {entry:?} not in the state map"
315                );
316            }
317            return;
318        }
319
320        self.getting_diff_for.insert(entry);
321        self.possible_gaps.remove(&entry);
322    }
323
324    /// Finish getting difference for the given entry.
325    ///
326    /// It also resets the deadline.
327    fn end_get_diff(&mut self, entry: Entry) {
328        if !self.getting_diff_for.remove(&entry) {
329            panic!("Called end_get_diff on an entry which was not getting diff for");
330        };
331        self.reset_deadline(entry, next_updates_deadline());
332        assert!(
333            !self.possible_gaps.contains_key(&entry),
334            "gaps shouldn't be created while getting difference"
335        );
336    }
337}
338
339// "Normal" updates flow (processing and detection of gaps).
340impl MessageBox {
341    /// Make sure all peer hashes contained in the update are known by the client
342    /// (either by checking if they were already known, or by extending the hash cache
343    /// with those that were not known).
344    ///
345    /// If a peer is found, but it doesn't contain a non-`min` hash and no hash for it
346    /// is known, it is treated as a gap.
347    pub fn ensure_known_peer_hashes(
348        &mut self,
349        updates: &tl::enums::Updates,
350        chat_hashes: &mut ChatHashCache,
351    ) -> Result<(), Gap> {
352        // In essence, "min constructors suck".
353        // Apparently, TDLib just does `getDifference` if encounters non-cached min peers.
354        // So rather than using the `inputPeer*FromMessage` (which not only are considerably
355        // larger but may need to be nested, and may stop working if the message is gone),
356        // just treat it as a gap when encountering peers for which the hash is not known.
357        // Context: https://t.me/tdlibchat/15096.
358        if chat_hashes.extend_from_updates(updates) {
359            Ok(())
360        } else {
361            // However, some updates do not change the pts, so attempting to recover the gap
362            // will just result in an empty result from `getDifference` (being just wasteful).
363            // Check if this update has any pts we can try to recover from.
364            let can_recover = match updates {
365                tl::enums::Updates::TooLong => true,
366                tl::enums::Updates::UpdateShortMessage(_) => true,
367                tl::enums::Updates::UpdateShortChatMessage(_) => true,
368                tl::enums::Updates::UpdateShort(u) => PtsInfo::from_update(&u.update).is_some(),
369                tl::enums::Updates::Combined(_) => true,
370                tl::enums::Updates::Updates(_) => true,
371                tl::enums::Updates::UpdateShortSentMessage(_) => true,
372            };
373
374            if can_recover {
375                info!("received an update referencing an unknown peer, treating as gap");
376                self.try_begin_get_diff(Entry::AccountWide);
377                Err(Gap)
378            } else {
379                info!("received an update referencing an unknown peer, but cannot find out who");
380                Ok(())
381            }
382        }
383    }
384
385    /// Process an update and return what should be done with it.
386    ///
387    /// Updates corresponding to entries for which their difference is currently being fetched
388    /// will be ignored. While according to the [updates' documentation]:
389    ///
390    /// > Implementations \[have\] to postpone updates received via the socket while
391    /// > filling gaps in the event and `Update` sequences, as well as avoid filling
392    /// > gaps in the same sequence.
393    ///
394    /// In practice, these updates should have also been retrieved through getting difference.
395    ///
396    /// [updates' documentation]: https://core.telegram.org/api/updates
397    pub fn process_updates(
398        &mut self,
399        updates: tl::enums::Updates,
400        chat_hashes: &ChatHashCache,
401    ) -> Result<defs::UpdateAndPeers, Gap> {
402        trace!("processing updates: {:?}", updates);
403        // Top level, when handling received `updates` and `updatesCombined`.
404        // `updatesCombined` groups all the fields we care about, which is why we use it.
405        //
406        // This assumes all access hashes are already known to the client (socket updates are
407        // expected to use `ensure_known_peer_hashes`, and the result from getting difference
408        // has to deal with the peers in a different way).
409        let tl::types::UpdatesCombined {
410            date,
411            seq_start,
412            seq,
413            mut updates,
414            users,
415            chats,
416        } = match adaptor::adapt(updates, chat_hashes) {
417            Ok(combined) => combined,
418            Err(Gap) => {
419                self.try_begin_get_diff(Entry::AccountWide);
420                return Err(Gap);
421            }
422        };
423
424        // > For all the other [not `updates` or `updatesCombined`] `Updates` type constructors
425        // > there is no need to check `seq` or change a local state.
426        if seq_start != NO_SEQ {
427            match (self.seq + 1).cmp(&seq_start) {
428                // Apply
429                Ordering::Equal => {}
430                // Ignore
431                Ordering::Greater => {
432                    debug!(
433                        "skipping updates that were already handled at seq = {}",
434                        self.seq
435                    );
436                    return Ok((Vec::new(), users, chats));
437                }
438                Ordering::Less => {
439                    debug!(
440                        "gap detected (local seq {}, remote seq {})",
441                        self.seq, seq_start
442                    );
443                    self.try_begin_get_diff(Entry::AccountWide);
444                    return Err(Gap);
445                }
446            }
447        }
448
449        fn update_sort_key(update: &tl::enums::Update) -> i32 {
450            match PtsInfo::from_update(update) {
451                Some(pts) => pts.pts - pts.pts_count,
452                None => NO_PTS,
453            }
454        }
455
456        // Telegram can send updates out of order (e.g. `ReadChannelInbox` first
457        // and then `NewChannelMessage`, both with the same `pts`, but the `count`
458        // is `0` and `1` respectively), so we sort them first.
459        updates.sort_by_key(update_sort_key);
460
461        // Adding `possible_gaps.len()` is a guesstimate. Often it's just one update.
462        let mut result = Vec::with_capacity(updates.len() + self.possible_gaps.len());
463
464        // This loop does a lot at once to reduce the amount of times we need to iterate over
465        // the updates as an optimization.
466        //
467        // It mutates the local pts state, remembers possible gaps, builds a set of entries for
468        // which the deadlines should be reset, and determines whether any local pts was changed
469        // so that the seq can be updated too (which could otherwise have been done earlier).
470        let mut any_pts_applied = false;
471        let mut reset_deadlines_for = mem::take(&mut self.tmp_entries);
472        for update in updates {
473            let (entry, update) = self.apply_pts_info(update);
474            if let Some(entry) = entry {
475                // As soon as we receive an update of any form related to messages (has `PtsInfo`),
476                // the "no updates" period for that entry is reset. All the deadlines are reset at
477                // once via the temporary entries buffer as an optimization.
478                reset_deadlines_for.insert(entry);
479            }
480            if let Some(update) = update {
481                result.push(update);
482                any_pts_applied |= entry.is_some();
483            }
484        }
485        self.reset_deadlines(&reset_deadlines_for, next_updates_deadline());
486        reset_deadlines_for.clear();
487        self.tmp_entries = reset_deadlines_for;
488
489        // > If the updates were applied, local *Updates* state must be updated
490        // > with `seq` (unless it's 0) and `date` from the constructor.
491        //
492        // By "were applied", we assume it means "some other pts was applied".
493        // Updates which can be applied in any order, such as `UpdateChat`,
494        // should not cause `seq` to be updated (or upcoming updates such as
495        // `UpdateChatParticipant` could be missed).
496        if any_pts_applied {
497            if date != NO_DATE {
498                self.date = date;
499            }
500            if seq != NO_SEQ {
501                self.seq = seq;
502            }
503        }
504
505        if !self.possible_gaps.is_empty() {
506            // For each update in possible gaps, see if the gap has been resolved already.
507            let keys = self.possible_gaps.keys().copied().collect::<Vec<_>>();
508            for key in keys {
509                self.possible_gaps
510                    .get_mut(&key)
511                    .unwrap()
512                    .updates
513                    .sort_by_key(update_sort_key);
514
515                for _ in 0..self.possible_gaps[&key].updates.len() {
516                    let update = self.possible_gaps.get_mut(&key).unwrap().updates.remove(0);
517                    // If this fails to apply, it will get re-inserted at the end.
518                    // All should fail, so the order will be preserved (it would've cycled once).
519                    if let (_, Some(update)) = self.apply_pts_info(update) {
520                        result.push(update);
521                    }
522                }
523            }
524
525            // Clear now-empty gaps.
526            self.possible_gaps.retain(|_, v| !v.updates.is_empty());
527            if self.possible_gaps.is_empty() {
528                debug!("successfully resolved gap by waiting");
529            }
530        }
531
532        Ok((result, users, chats))
533    }
534
535    /// Tries to apply the input update if its `PtsInfo` follows the correct order.
536    ///
537    /// If the update can be applied, it is returned; otherwise, the update is stored in a
538    /// possible gap (unless it was already handled or would be handled through getting
539    /// difference) and `None` is returned.
540    fn apply_pts_info(
541        &mut self,
542        update: tl::enums::Update,
543    ) -> (Option<Entry>, Option<tl::enums::Update>) {
544        if let tl::enums::Update::ChannelTooLong(u) = update {
545            self.try_begin_get_diff(Entry::Channel(u.channel_id));
546            return (None, None);
547        }
548
549        let pts = match PtsInfo::from_update(&update) {
550            Some(pts) => pts,
551            // No pts means that the update can be applied in any order.
552            None => return (None, Some(update)),
553        };
554
555        if self.getting_diff_for.contains(&pts.entry) {
556            debug!(
557                "skipping update for {:?} (getting difference, count {:?}, remote {:?})",
558                pts.entry, pts.pts_count, pts.pts
559            );
560            // Note: early returning here also prevents gap from being inserted (which they should
561            // not be while getting difference).
562            return (Some(pts.entry), None);
563        }
564
565        if let Some(state) = self.map.get(&pts.entry) {
566            let local_pts = state.pts;
567            match (local_pts + pts.pts_count).cmp(&pts.pts) {
568                // Apply
569                Ordering::Equal => {}
570                // Ignore
571                Ordering::Greater => {
572                    debug!(
573                        "skipping update for {:?} (local {:?}, count {:?}, remote {:?})",
574                        pts.entry, local_pts, pts.pts_count, pts.pts
575                    );
576                    return (Some(pts.entry), None);
577                }
578                Ordering::Less => {
579                    info!(
580                        "gap on update for {:?} (local {:?}, count {:?}, remote {:?})",
581                        pts.entry, local_pts, pts.pts_count, pts.pts
582                    );
583                    // TODO store chats too?
584                    self.possible_gaps
585                        .entry(pts.entry)
586                        .or_insert_with(|| PossibleGap {
587                            deadline: Instant::now() + POSSIBLE_GAP_TIMEOUT,
588                            updates: Vec::new(),
589                        })
590                        .updates
591                        .push(update);
592
593                    return (Some(pts.entry), None);
594                }
595            }
596        }
597        // else, there is no previous `pts` known, and because this update has to be "right"
598        // (it's the first one) our `local_pts` must be `pts - pts_count`.
599
600        self.map
601            .entry(pts.entry)
602            .or_insert_with(|| State {
603                pts: NO_PTS,
604                deadline: next_updates_deadline(),
605            })
606            .pts = pts.pts;
607
608        (Some(pts.entry), Some(update))
609    }
610}
611
612/// Getting and applying account difference.
613impl MessageBox {
614    /// Return the request that needs to be made to get the difference, if any.
615    pub fn get_difference(&mut self) -> Option<tl::functions::updates::GetDifference> {
616        for entry in [Entry::AccountWide, Entry::SecretChats] {
617            if self.getting_diff_for.contains(&entry) {
618                if !self.map.contains_key(&entry) {
619                    panic!(
620                        "Should not try to get difference for an entry {entry:?} without known state"
621                    );
622                }
623
624                let gd = tl::functions::updates::GetDifference {
625                    pts: self.map[&Entry::AccountWide].pts,
626                    pts_limit: None,
627                    pts_total_limit: None,
628                    date: self.date,
629                    qts: if self.map.contains_key(&Entry::SecretChats) {
630                        self.map[&Entry::SecretChats].pts
631                    } else {
632                        NO_PTS
633                    },
634                    qts_limit: None,
635                };
636                trace!("requesting {:?}", gd);
637                return Some(gd);
638            }
639        }
640        None
641    }
642
643    /// Similar to [`MessageBox::process_updates`], but using the result from getting difference.
644    pub fn apply_difference(
645        &mut self,
646        difference: tl::enums::updates::Difference,
647        chat_hashes: &mut ChatHashCache,
648    ) -> defs::UpdateAndPeers {
649        trace!("applying account difference: {:?}", difference);
650        let finish: bool;
651        let result = match difference {
652            tl::enums::updates::Difference::Empty(diff) => {
653                debug!(
654                    "handling empty difference (date = {}, seq = {}); no longer getting diff",
655                    diff.date, diff.seq
656                );
657                finish = true;
658                self.date = diff.date;
659                self.seq = diff.seq;
660                (Vec::new(), Vec::new(), Vec::new())
661            }
662            tl::enums::updates::Difference::Difference(diff) => {
663                // TODO return Err(attempt to find users)
664                let _ = chat_hashes.extend(&diff.users, &diff.chats);
665
666                debug!(
667                    "handling full difference {:?}; no longer getting diff",
668                    diff.state
669                );
670                finish = true;
671                self.apply_difference_type(diff, chat_hashes)
672            }
673            tl::enums::updates::Difference::Slice(tl::types::updates::DifferenceSlice {
674                new_messages,
675                new_encrypted_messages,
676                other_updates,
677                chats,
678                users,
679                intermediate_state: state,
680            }) => {
681                // TODO return Err(attempt to find users)
682                let _ = chat_hashes.extend(&users, &chats);
683
684                debug!("handling partial difference {:?}", state);
685                finish = false;
686                self.apply_difference_type(
687                    tl::types::updates::Difference {
688                        new_messages,
689                        new_encrypted_messages,
690                        other_updates,
691                        chats,
692                        users,
693                        state,
694                    },
695                    chat_hashes,
696                )
697            }
698            tl::enums::updates::Difference::TooLong(diff) => {
699                debug!(
700                    "handling too-long difference (pts = {}); no longer getting diff",
701                    diff.pts
702                );
703                finish = true;
704                // the deadline will be reset once the diff ends
705                self.map.get_mut(&Entry::AccountWide).unwrap().pts = diff.pts;
706                (Vec::new(), Vec::new(), Vec::new())
707            }
708        };
709
710        if finish {
711            let account = self.getting_diff_for.contains(&Entry::AccountWide);
712            let secret = self.getting_diff_for.contains(&Entry::SecretChats);
713
714            if !account && !secret {
715                panic!("Should not be applying the difference when neither account or secret diff was active")
716            }
717
718            if account {
719                self.end_get_diff(Entry::AccountWide);
720            }
721            if secret {
722                self.end_get_diff(Entry::SecretChats);
723            }
724        }
725
726        result
727    }
728
729    fn apply_difference_type(
730        &mut self,
731        tl::types::updates::Difference {
732            new_messages,
733            new_encrypted_messages,
734            other_updates: updates,
735            chats,
736            users,
737            state: tl::enums::updates::State::State(state),
738        }: tl::types::updates::Difference,
739        chat_hashes: &mut ChatHashCache,
740    ) -> defs::UpdateAndPeers {
741        self.map.get_mut(&Entry::AccountWide).unwrap().pts = state.pts;
742        self.map
743            .entry(Entry::SecretChats)
744            // AccountWide affects SecretChats, but this may not have been initialized yet (#258)
745            .or_insert_with(|| State {
746                pts: NO_PTS,
747                deadline: next_updates_deadline(),
748            })
749            .pts = state.qts;
750        self.date = state.date;
751        self.seq = state.seq;
752
753        // other_updates can contain things like UpdateChannelTooLong and UpdateNewChannelMessage.
754        // We need to process those as if they were socket updates to discard any we have already handled.
755        let us = tl::enums::Updates::Updates(tl::types::Updates {
756            updates,
757            users,
758            chats,
759            date: NO_DATE,
760            seq: NO_SEQ,
761        });
762
763        // It is possible that the result from `GetDifference` includes users with `min = true`.
764        // TODO in that case, we will have to resort to getUsers.
765        let (mut result_updates, users, chats) = self
766            .process_updates(us, chat_hashes)
767            .expect("gap is detected while applying difference");
768
769        result_updates.extend(
770            new_messages
771                .into_iter()
772                .map(|message| {
773                    tl::types::UpdateNewMessage {
774                        message,
775                        pts: NO_PTS,
776                        pts_count: 0,
777                    }
778                    .into()
779                })
780                .chain(new_encrypted_messages.into_iter().map(|message| {
781                    tl::types::UpdateNewEncryptedMessage {
782                        message,
783                        qts: NO_PTS,
784                    }
785                    .into()
786                })),
787        );
788
789        (result_updates, users, chats)
790    }
791}
792
793/// Getting and applying channel difference.
794impl MessageBox {
795    /// Return the request that needs to be made to get a channel's difference, if any.
796    pub fn get_channel_difference(
797        &mut self,
798        chat_hashes: &ChatHashCache,
799    ) -> Option<tl::functions::updates::GetChannelDifference> {
800        let (entry, id) = self
801            .getting_diff_for
802            .iter()
803            .find_map(|&entry| match entry {
804                Entry::Channel(id) => Some((entry, id)),
805                _ => None,
806            })?;
807
808        if let Some(packed) = chat_hashes.get(id) {
809            let channel = tl::types::InputChannel {
810                channel_id: packed.id,
811                access_hash: packed
812                    .access_hash
813                    .expect("chat_hashes had chat without hash"),
814            }
815            .into();
816            if let Some(state) = self.map.get(&entry) {
817                let gd = tl::functions::updates::GetChannelDifference {
818                    force: false,
819                    channel,
820                    filter: tl::enums::ChannelMessagesFilter::Empty,
821                    pts: state.pts,
822                    limit: if chat_hashes.is_self_bot() {
823                        defs::BOT_CHANNEL_DIFF_LIMIT
824                    } else {
825                        defs::USER_CHANNEL_DIFF_LIMIT
826                    },
827                };
828                trace!("requesting {:?}", gd);
829                Some(gd)
830            } else {
831                panic!(
832                    "Should not try to get difference for an entry {entry:?} without known state"
833                );
834            }
835        } else {
836            warn!(
837                "cannot getChannelDifference for {} as we're missing its hash",
838                id
839            );
840            self.end_get_diff(entry);
841            // Remove the outdated `pts` entry from the map so that the next update can correct
842            // it. Otherwise, it will spam that the access hash is missing.
843            self.map.remove(&entry);
844            None
845        }
846    }
847
848    /// Similar to [`MessageBox::process_updates`], but using the result from getting difference.
849    pub fn apply_channel_difference(
850        &mut self,
851        request: tl::functions::updates::GetChannelDifference,
852        difference: tl::enums::updates::ChannelDifference,
853        chat_hashes: &mut ChatHashCache,
854    ) -> defs::UpdateAndPeers {
855        let channel_id = channel_id(&request).expect("request had wrong input channel");
856        trace!(
857            "applying channel difference for {}: {:?}",
858            channel_id,
859            difference
860        );
861        let entry = Entry::Channel(channel_id);
862
863        self.possible_gaps.remove(&entry);
864
865        match difference {
866            tl::enums::updates::ChannelDifference::Empty(diff) => {
867                assert!(diff.r#final);
868                debug!(
869                    "handling empty channel {} difference (pts = {}); no longer getting diff",
870                    channel_id, diff.pts
871                );
872                self.end_get_diff(entry);
873                self.map.get_mut(&entry).unwrap().pts = diff.pts;
874                (Vec::new(), Vec::new(), Vec::new())
875            }
876            tl::enums::updates::ChannelDifference::TooLong(diff) => {
877                // TODO return Err(attempt to find users)
878                let _ = chat_hashes.extend(&diff.users, &diff.chats);
879
880                assert!(diff.r#final);
881                info!(
882                    "handling too long channel {} difference; no longer getting diff",
883                    channel_id
884                );
885                match diff.dialog {
886                    tl::enums::Dialog::Dialog(d) => {
887                        self.map.get_mut(&entry).unwrap().pts = d.pts.expect(
888                            "channelDifferenceTooLong dialog did not actually contain a pts",
889                        );
890                    }
891                    tl::enums::Dialog::Folder(_) => {
892                        panic!("received a folder on channelDifferenceTooLong")
893                    }
894                }
895
896                self.reset_channel_deadline(channel_id, diff.timeout);
897                // This `diff` has the "latest messages and corresponding chats", but it would
898                // be strange to give the user only partial changes of these when they would
899                // expect all updates to be fetched. Instead, nothing is returned.
900                (Vec::new(), Vec::new(), Vec::new())
901            }
902            tl::enums::updates::ChannelDifference::Difference(
903                tl::types::updates::ChannelDifference {
904                    r#final,
905                    pts,
906                    timeout,
907                    new_messages,
908                    other_updates: updates,
909                    chats,
910                    users,
911                },
912            ) => {
913                // TODO return Err(attempt to find users)
914                let _ = chat_hashes.extend(&users, &chats);
915
916                if r#final {
917                    debug!(
918                        "handling channel {} difference; no longer getting diff",
919                        channel_id
920                    );
921                    self.end_get_diff(entry);
922                } else {
923                    debug!("handling channel {} difference", channel_id);
924                }
925
926                self.map.get_mut(&entry).unwrap().pts = pts;
927                let us = tl::enums::Updates::Updates(tl::types::Updates {
928                    updates,
929                    users,
930                    chats,
931                    date: NO_DATE,
932                    seq: NO_SEQ,
933                });
934                let (mut result_updates, users, chats) = self
935                    .process_updates(us, chat_hashes)
936                    .expect("gap is detected while applying channel difference");
937
938                result_updates.extend(new_messages.into_iter().map(|message| {
939                    tl::types::UpdateNewChannelMessage {
940                        message,
941                        pts: NO_PTS,
942                        pts_count: 0,
943                    }
944                    .into()
945                }));
946                self.reset_channel_deadline(channel_id, timeout);
947
948                (result_updates, users, chats)
949            }
950        }
951    }
952
953    pub fn end_channel_difference(
954        &mut self,
955        request: &tl::functions::updates::GetChannelDifference,
956        reason: PrematureEndReason,
957    ) {
958        if let Some(channel_id) = channel_id(request) {
959            trace!(
960                "ending channel difference for {} because {:?}",
961                channel_id,
962                reason
963            );
964            let entry = Entry::Channel(channel_id);
965            match reason {
966                PrematureEndReason::TemporaryServerIssues => {
967                    self.possible_gaps.remove(&entry);
968                    self.end_get_diff(entry);
969                }
970                PrematureEndReason::Banned => {
971                    self.possible_gaps.remove(&entry);
972                    self.end_get_diff(entry);
973                    self.map.remove(&entry);
974                }
975            }
976        };
977    }
978}
979
980pub fn channel_id(request: &tl::functions::updates::GetChannelDifference) -> Option<i64> {
981    match request.channel {
982        InputChannel::Channel(ref c) => Some(c.channel_id),
983        InputChannel::FromMessage(ref c) => Some(c.channel_id),
984        InputChannel::Empty => None,
985    }
986}
987
/// Why getting the difference of a channel is being ended before a result could be applied.
///
/// See [`MessageBox::end_channel_difference`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrematureEndReason {
    /// The difference is ended, but the state stored for the channel is kept.
    TemporaryServerIssues,
    /// The difference is ended and the state stored for the channel is discarded as well.
    Banned,
}