Skip to main content

grammers_session/message_box/
mod.rs

1// Copyright 2020 - developers of the `grammers` project.
2//
3// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6// option. This file may not be copied, modified, or distributed
7// except according to those terms.
8
9//! This module deals with correct handling of updates,
10//! including gaps,
11//! and knowing when the code should "get difference"
12//! (the set of updates that the client should know by now
13//! minus the set of updates that it actually knows).
14//!
15//! Each chat has its own [`LiveEntry`] in the [`MessageBoxes`]
16//! (this `struct` is the "entry point").
17//! At any given time,
18//! the message box may be either getting difference for them
19//! (entry is in [`MessageBoxes::getting_diff_for`])
20//! or not.
21//! If not getting difference,
22//! a possible gap may be found for the updates
23//! (entry is in [`MessageBoxes::possible_gaps`]).
24//! Otherwise, the entry is on its happy path.
25//!
26//! Gaps are cleared when they are either
27//! resolved on their own
28//! (by waiting for a short time)
29//! or because we got the difference for the corresponding entry.
30//!
31//! While there are entries for which their difference must be fetched,
32//! [`MessageBoxes::check_deadlines`] will always return [`Instant::now`],
33//! since "now" is the time to get the difference.
34
35mod adaptor;
36mod defs;
37#[cfg(test)]
38mod tests;
39
40use std::cmp::Ordering;
41use std::time::Duration;
42#[cfg(not(test))]
43use std::time::Instant;
44
45use defs::Key;
46pub use defs::{Gap, MessageBox, MessageBoxes, State, UpdatesLike};
47use defs::{LiveEntry, NO_DATE, NO_PTS, NO_SEQ, POSSIBLE_GAP_TIMEOUT, PossibleGap, PtsInfo};
48use grammers_tl_types as tl;
49use log::{debug, info, trace};
50#[cfg(test)]
51use tests::Instant;
52
53use crate::types::{ChannelState, UpdatesState};
54
55fn next_updates_deadline() -> Instant {
56    Instant::now() + defs::NO_UPDATES_TIMEOUT
57}
58
59impl MessageBox {
60    pub fn pts(&self) -> i32 {
61        match self {
62            MessageBox::Common { pts } => *pts,
63            MessageBox::Secondary { qts } => *qts,
64            MessageBox::Channel { pts, .. } => *pts,
65        }
66    }
67}
68
69impl From<PtsInfo> for MessageBox {
70    fn from(value: PtsInfo) -> Self {
71        match value.key {
72            Key::Common => Self::Common { pts: value.pts },
73            Key::Secondary => Self::Secondary { qts: value.pts },
74            Key::Channel(channel_id) => Self::Channel {
75                channel_id,
76                pts: value.pts,
77            },
78        }
79    }
80}
81
82impl LiveEntry {
83    fn effective_deadline(&self) -> Instant {
84        match &self.possible_gap {
85            Some(gap) => gap.deadline.min(self.deadline),
86            None => self.deadline,
87        }
88    }
89}
90
91#[allow(clippy::new_without_default)]
92/// Creation, querying, and setting base state.
93impl MessageBoxes {
94    /// Create a new, empty [`MessageBoxes`].
95    pub fn new() -> Self {
96        trace!("created new message box with no previous state");
97        Self {
98            entries: Vec::new(),
99            date: NO_DATE,
100            seq: NO_SEQ,
101            getting_diff_for: Vec::new(),
102            possible_gaps: Vec::new(),
103            next_deadline: next_updates_deadline(),
104        }
105    }
106
107    /// Create a [`MessageBoxes`] from a previously known update state.
108    pub fn load(state: UpdatesState) -> Self {
109        trace!("created new message box with state: {:?}", state);
110        let mut entries = Vec::with_capacity(2 + state.channels.len());
111        let mut getting_diff_for = Vec::with_capacity(2 + state.channels.len());
112        let possible_gaps = Vec::with_capacity(2 + state.channels.len());
113        let deadline = next_updates_deadline();
114
115        if state.pts != NO_PTS {
116            entries.push(LiveEntry {
117                key: Key::Common,
118                pts: state.pts,
119                deadline,
120                possible_gap: None,
121            });
122        }
123        if state.qts != NO_PTS {
124            entries.push(LiveEntry {
125                key: Key::Secondary,
126                pts: state.qts,
127                deadline,
128                possible_gap: None,
129            });
130        }
131        entries.extend(state.channels.iter().map(|c| LiveEntry {
132            key: Key::Channel(c.id),
133            pts: c.pts,
134            deadline,
135            possible_gap: None,
136        }));
137        entries.sort_by_key(|entry| entry.key);
138
139        getting_diff_for.extend(entries.iter().map(|entry| entry.key));
140
141        Self {
142            entries,
143            date: state.date,
144            seq: state.seq,
145            getting_diff_for,
146            possible_gaps,
147            next_deadline: deadline,
148        }
149    }
150
151    fn entry(&self, key: Key) -> Option<&LiveEntry> {
152        self.entries
153            .binary_search_by_key(&key, |entry| entry.key)
154            .map(|i| &self.entries[i])
155            .ok()
156    }
157
158    fn update_entry(&mut self, key: Key, updater: impl FnOnce(&mut LiveEntry)) -> bool {
159        match self.entries.binary_search_by_key(&key, |entry| entry.key) {
160            Ok(i) => {
161                updater(&mut self.entries[i]);
162                true
163            }
164            Err(_) => false,
165        }
166    }
167
168    fn set_entry(&mut self, entry: LiveEntry) {
169        match self
170            .entries
171            .binary_search_by_key(&entry.key, |entry| entry.key)
172        {
173            Ok(i) => {
174                self.possible_gaps.retain(|&k| k != entry.key);
175                self.entries[i] = entry;
176            }
177            Err(i) => self.entries.insert(i, entry),
178        }
179    }
180
181    fn set_pts(&mut self, key: Key, pts: i32) {
182        if !self.update_entry(key, |entry| entry.pts = pts) {
183            self.set_entry(LiveEntry {
184                key: key,
185                pts: pts,
186                deadline: next_updates_deadline(),
187                possible_gap: None,
188            });
189        }
190    }
191
192    fn pop_entry(&mut self, key: Key) -> Option<LiveEntry> {
193        match self.entries.binary_search_by_key(&key, |entry| entry.key) {
194            Ok(i) => Some(self.entries.remove(i)),
195            Err(_) => None,
196        }
197    }
198
199    fn push_gap(&mut self, key: Key, gap: Option<tl::enums::Update>) -> bool {
200        let deadline = Instant::now() + POSSIBLE_GAP_TIMEOUT;
201        let has_gap = gap.is_some();
202        let exists = self.update_entry(key, |entry| {
203            let possible_gap = entry.possible_gap.take();
204
205            entry.possible_gap = gap.map(|update| match possible_gap {
206                Some(mut possible) => {
207                    possible.updates.push(update);
208                    possible
209                }
210                None => PossibleGap {
211                    deadline,
212                    updates: vec![update],
213                },
214            });
215        });
216        if exists {
217            if has_gap {
218                if !self.possible_gaps.contains(&key) {
219                    self.possible_gaps.push(key);
220                    self.next_deadline = self.next_deadline.min(deadline);
221                }
222            } else {
223                if let Some(i) = self.possible_gaps.iter().position(|&k| k == key) {
224                    self.possible_gaps.remove(i);
225                }
226            }
227        }
228        exists
229    }
230
231    /// Return the current state in a format that sessions understand.
232    ///
233    /// This should be used for persisting the state.
234    pub fn session_state(&self) -> UpdatesState {
235        UpdatesState {
236            pts: self.entry(Key::Common).map(|s| s.pts).unwrap_or(NO_PTS),
237            qts: self.entry(Key::Secondary).map(|s| s.pts).unwrap_or(NO_PTS),
238            date: self.date,
239            seq: self.seq,
240            channels: self
241                .entries
242                .iter()
243                .filter_map(|entry| match entry.key {
244                    Key::Channel(id) => Some(ChannelState { id, pts: entry.pts }.into()),
245                    _ => None,
246                })
247                .collect(),
248        }
249    }
250
251    /// Return true if the message box is empty and has no state yet.
252    pub fn is_empty(&self) -> bool {
253        self.entries.is_empty()
254    }
255
256    /// Return the next deadline when receiving updates should timeout.
257    ///
258    /// If a deadline expired, the corresponding entries will be marked as needing to get its difference.
259    /// While there are entries pending of getting their difference, this method returns the current instant.
260    pub fn check_deadlines(&mut self) -> Instant {
261        let now = Instant::now();
262
263        if !self.getting_diff_for.is_empty() {
264            return self.next_deadline;
265        }
266
267        if now >= self.next_deadline {
268            self.getting_diff_for
269                .extend(self.entries.iter().filter_map(|entry| {
270                    if now >= entry.effective_deadline() {
271                        debug!("deadline for forcibly fetching updates met for {:?}", entry);
272                        Some(entry.key)
273                    } else {
274                        None
275                    }
276                }));
277
278            // When extending `getting_diff_for`, it's important to have the moral equivalent of
279            // `begin_get_diff` (that is, clear possible gaps if we're now getting difference).
280            for i in 0..self.getting_diff_for.len() {
281                self.push_gap(self.getting_diff_for[i], None);
282            }
283
284            // To avoid hogging up all CPU by sleeping for no duration,
285            // if the deadline was met but there's no state to fetch,
286            // advance the deadline.
287            if self.getting_diff_for.is_empty() {
288                self.next_deadline = next_updates_deadline();
289            }
290        }
291
292        self.next_deadline
293    }
294
295    /// Reset the deadline of an existing entry.
296    /// Does nothing if the entry doesn't exist.
297    ///
298    /// If the updated deadline was the currently-next-deadline,
299    /// the new closest deadline is calculated and stored as the next.
300    fn reset_deadline(&mut self, key: Key, deadline: Instant) {
301        let mut old_deadline = self.next_deadline;
302        self.update_entry(key, |entry| {
303            old_deadline = entry.deadline;
304            entry.deadline = deadline;
305        });
306        if self.next_deadline == old_deadline {
307            self.next_deadline = self
308                .entries
309                .iter()
310                .fold(deadline, |d, entry| d.min(entry.effective_deadline()));
311        }
312    }
313
314    fn reset_timeout(&mut self, key: Key, timeout: Option<i32>) {
315        self.reset_deadline(
316            key,
317            timeout
318                .map(|t| Instant::now() + Duration::from_secs(t as _))
319                .unwrap_or_else(next_updates_deadline),
320        );
321    }
322
323    /// Sets the update state.
324    ///
325    /// Should be called right after login if [`MessageBoxes::new`] was used, otherwise undesirable
326    /// updates will be fetched.
327    ///
328    /// Should only be called while [`MessageBoxes::is_empty`].
329    pub fn set_state(&mut self, state: tl::enums::updates::State) {
330        trace!("setting state {:?}", state);
331        debug_assert!(self.is_empty());
332        let deadline = next_updates_deadline();
333        let state: tl::types::updates::State = state.into();
334        self.set_entry(LiveEntry {
335            key: Key::Common,
336            pts: state.pts,
337            deadline,
338            possible_gap: None,
339        });
340        self.set_entry(LiveEntry {
341            key: Key::Secondary,
342            pts: state.qts,
343            deadline,
344            possible_gap: None,
345        });
346        self.date = state.date;
347        self.seq = state.seq;
348        self.next_deadline = deadline;
349    }
350
351    /// Like [`MessageBoxes::set_state`], but for channels. Useful when getting dialogs.
352    ///
353    /// The update state will only be updated if no entry was known previously.
354    pub fn try_set_channel_state(&mut self, id: i64, pts: i32) {
355        trace!("trying to set channel state for {}: {}", id, pts);
356        if self.entry(Key::Channel(id)).is_none() {
357            self.set_entry(LiveEntry {
358                key: Key::Channel(id),
359                pts: pts,
360                deadline: next_updates_deadline(),
361                possible_gap: None,
362            });
363        }
364    }
365
366    /// Try to begin getting difference for the given entry.
367    /// Fails if the entry does not have a previously-known state that can be used to get its difference.
368    ///
369    /// Clears any previous gaps.
370    fn try_begin_get_diff(&mut self, key: Key) {
371        if self.push_gap(key, None) {
372            self.getting_diff_for.push(key);
373        }
374    }
375
376    /// Finish getting difference for the given entry, and resets its deadline.
377    /// Does nothing if no difference was being fetched for the given key.
378    fn try_end_get_diff(&mut self, key: Key) {
379        let i = match self.getting_diff_for.iter().position(|&k| k == key) {
380            Some(i) => i,
381            None => return,
382        };
383        self.getting_diff_for.remove(i);
384        self.reset_deadline(key, next_updates_deadline());
385
386        debug_assert!(
387            self.entry(key)
388                .is_none_or(|entry| entry.possible_gap.is_none()),
389            "gaps shouldn't be created while getting difference"
390        );
391    }
392}
393
394// "Normal" updates flow (processing and detection of gaps).
395impl MessageBoxes {
    /// Process an update and return what should be done with it.
    ///
    /// Updates corresponding to entries for which their difference is currently being fetched
    /// will be ignored. While according to the [updates' documentation]:
    ///
    /// > Implementations \[have\] to postpone updates received via the socket while
    /// > filling gaps in the event and `Update` sequences, as well as avoid filling
    /// > gaps in the same sequence.
    ///
    /// In practice, these updates should have also been retrieved through getting difference.
    ///
    /// On success, returns the updates that can be applied right now (each paired with the
    /// state they correspond to) along with the users and chats that came with them.
    /// The update list is empty when the whole batch had already been handled.
    ///
    /// # Errors
    ///
    /// Returns [`Gap`] when the input cannot be adapted without a gap,
    /// or when its `seq_start` is ahead of the next expected local `seq`.
    /// In both cases, getting difference for the common entry is started (if it has state).
    ///
    /// [updates' documentation]: https://core.telegram.org/api/updates
    pub fn process_updates(&mut self, updates: UpdatesLike) -> Result<defs::UpdateAndPeers, Gap> {
        trace!("processing updates: {:?}", updates);
        let deadline = next_updates_deadline();

        // Top level, when handling received `updates` and `updatesCombined`.
        // `updatesCombined` groups all the fields we care about, which is why we use it.
        //
        // This assumes all access hashes are already known to the client (socket updates are
        // expected to use `ensure_known_peer_hashes`, and the result from getting difference
        // has to deal with the peers in a different way).
        let tl::types::UpdatesCombined {
            date,
            seq_start,
            seq,
            mut updates,
            users,
            chats,
        } = match adaptor::adapt(updates) {
            Ok(combined) => combined,
            Err(Gap) => {
                self.try_begin_get_diff(Key::Common);
                return Err(Gap);
            }
        };

        // A missing date or seq in the input means "keep the local one".
        let new_date = if date == NO_DATE { self.date } else { date };
        let new_seq = if seq == NO_SEQ { self.seq } else { seq };
        let mk_state = |message_box| State {
            date: new_date,
            seq: new_seq,
            message_box,
        };

        // > For all the other [not `updates` or `updatesCombined`] `Updates` type constructors
        // > there is no need to check `seq` or change a local state.
        if seq_start != NO_SEQ {
            match (self.seq + 1).cmp(&seq_start) {
                // Apply
                Ordering::Equal => {}
                // Ignore
                Ordering::Greater => {
                    debug!(
                        "skipping updates that were already handled at seq = {}",
                        self.seq
                    );
                    return Ok((Vec::new(), users, chats));
                }
                // Gap: the remote sequence is ahead of what we expected next.
                Ordering::Less => {
                    debug!(
                        "gap detected (local seq {}, remote seq {})",
                        self.seq, seq_start
                    );
                    self.try_begin_get_diff(Key::Common);
                    return Err(Gap);
                }
            }
        }

        // Sort key: the `pts` the update starts from (`pts - count`),
        // or `NO_PTS` for updates without `PtsInfo`.
        fn update_sort_key(update: &tl::enums::Update) -> i32 {
            match PtsInfo::from_update(update) {
                Some(info) => info.pts - info.count,
                None => NO_PTS,
            }
        }

        // Telegram can send updates out of order (e.g. `ReadChannelInbox` first
        // and then `NewChannelMessage`, both with the same `pts`, but the `count`
        // is `0` and `1` respectively), so we sort them first.
        updates.sort_by_key(update_sort_key);

        let mut result = Vec::with_capacity(updates.len() + self.possible_gaps.len());
        let had_gaps = !self.possible_gaps.is_empty();

        // This loop does a lot at once to reduce the amount of times we need to iterate over
        // the updates as an optimization.
        //
        // It mutates the local pts state, remembers possible gaps, resets the deadline of every
        // entry that received an update, and collects the updates that could be applied
        // (which later determines whether the local date and seq are updated too).
        for update in updates {
            let (key, update) = self.apply_pts_info(update);
            if let Some(key) = key {
                // As soon as we receive an update of any form related to messages (has `PtsInfo`),
                // the "no updates" period for that entry is reset.
                self.reset_deadline(key, deadline);
            }
            if let Some((update, message_box)) = update {
                result.push((update, mk_state(message_box)));
            }
        }

        if had_gaps {
            // Newly-added gaps won't be resolved immediately, so we check if we had gaps before.
            // For each update in possible gaps, see if the gap has been resolved already.
            // Iterating in reverse keeps the remaining indices valid across `swap_remove`.
            for i in (0..self.possible_gaps.len()).rev() {
                let key = self.possible_gaps[i];
                let mut gap = None;
                self.update_entry(key, |entry| {
                    gap = entry.possible_gap.take();
                });
                // Every key in `possible_gaps` is expected to have an entry holding a gap.
                let mut gap = gap.unwrap();
                gap.updates.sort_by_key(update_sort_key);

                // If this fails to apply, it will get re-inserted at the end.
                // All should fail, so the order will be preserved (it would've cycled once).
                for update in gap.updates {
                    if let (_, Some((update, message_box))) = self.apply_pts_info(update) {
                        result.push((update, mk_state(message_box)));
                    }
                }

                // Gap was taken earlier. If it's still empty, the key can be removed from possible gaps.
                if self
                    .entry(key)
                    .is_some_and(|entry| entry.possible_gap.is_none())
                {
                    self.possible_gaps.swap_remove(i);
                    debug!("successfully resolved gap by waiting");
                }
            }
        }

        if !result.is_empty() && self.possible_gaps.is_empty() {
            // > If the updates were applied, local *Updates* state must be updated
            // > with `seq` (unless it's 0) and `date` from the constructor.
            self.date = new_date;
            self.seq = new_seq;
        }

        Ok((result, users, chats))
    }
540
541    /// Tries to apply the input update if its `PtsInfo` follows the correct order.
542    ///
543    /// If the update can be applied, it is returned; otherwise, the update is stored in a
544    /// possible gap (unless it was already handled or would be handled through getting
545    /// difference) and `None` is returned.
546    fn apply_pts_info(
547        &mut self,
548        update: tl::enums::Update,
549    ) -> (Option<Key>, Option<(tl::enums::Update, Option<MessageBox>)>) {
550        if let tl::enums::Update::ChannelTooLong(u) = update {
551            self.try_begin_get_diff(Key::Channel(u.channel_id));
552            return (None, None);
553        }
554
555        let info = match PtsInfo::from_update(&update) {
556            Some(info) => info,
557            // No pts means that the update can be applied in any order.
558            None => return (None, Some((update, None))),
559        };
560
561        if self.getting_diff_for.contains(&info.key) {
562            debug!(
563                "skipping update for {:?} (getting difference, count {:?}, remote {:?})",
564                info.key, info.count, info.pts
565            );
566            // Note: early returning here also prevents gap from being inserted (which they should
567            // not be while getting difference).
568            return (Some(info.key), Some((update, Some(info.into()))));
569        }
570
571        if let Some(local_pts) = self.entry(info.key).map(|entry| entry.pts) {
572            match (local_pts + info.count).cmp(&info.pts) {
573                // Apply
574                Ordering::Equal => {}
575                // Ignore
576                Ordering::Greater => {
577                    debug!(
578                        "skipping update for {:?} (local {:?}, count {:?}, remote {:?})",
579                        info.key, local_pts, info.count, info.pts
580                    );
581                    return (Some(info.key), None);
582                }
583                Ordering::Less => {
584                    info!(
585                        "gap on update for {:?} (local {:?}, count {:?}, remote {:?})",
586                        info.key, local_pts, info.count, info.pts
587                    );
588                    // TODO store chats too?
589                    self.push_gap(info.key, Some(update));
590
591                    return (Some(info.key), None);
592                }
593            }
594        }
595        // else, there is no previous `pts` known, and because this update has to be "right"
596        // (it's the first one) our `local_pts` must be `pts - pts_count`.
597
598        self.set_pts(info.key, info.pts);
599        (Some(info.key), Some((update, Some(info.into()))))
600    }
601}
602
603/// Getting and applying account difference.
604impl MessageBoxes {
605    /// Return the request that needs to be made to get the difference, if any.
606    pub fn get_difference(&self) -> Option<tl::functions::updates::GetDifference> {
607        for entry in [Key::Common, Key::Secondary] {
608            if self.getting_diff_for.contains(&entry) {
609                let pts = self
610                    .entry(Key::Common)
611                    .map(|entry| entry.pts)
612                    .expect("common entry to exist when getting difference for it");
613
614                let gd = tl::functions::updates::GetDifference {
615                    pts,
616                    pts_limit: None,
617                    pts_total_limit: None,
618                    date: self.date.max(1), // non-zero or the request will fail
619                    qts: self
620                        .entry(Key::Secondary)
621                        .map(|entry| entry.pts)
622                        .unwrap_or(NO_PTS),
623                    qts_limit: None,
624                };
625                trace!("requesting {:?}", gd);
626                return Some(gd);
627            }
628        }
629        None
630    }
631
632    /// Similar to [`MessageBoxes::process_updates`], but using the result from getting difference.
633    pub fn apply_difference(
634        &mut self,
635        difference: tl::enums::updates::Difference,
636    ) -> defs::UpdateAndPeers {
637        trace!("applying account difference: {:?}", difference);
638        debug_assert!(
639            self.getting_diff_for.contains(&Key::Common)
640                || self.getting_diff_for.contains(&Key::Secondary)
641        );
642        let finish: bool;
643        let result = match difference {
644            tl::enums::updates::Difference::Empty(diff) => {
645                debug!(
646                    "handling empty difference (date = {}, seq = {}); no longer getting diff",
647                    diff.date, diff.seq
648                );
649                finish = true;
650                self.date = diff.date;
651                self.seq = diff.seq;
652                (Vec::new(), Vec::new(), Vec::new())
653            }
654            tl::enums::updates::Difference::Difference(diff) => {
655                debug!(
656                    "handling full difference {:?}; no longer getting diff",
657                    diff.state
658                );
659                finish = true;
660                self.apply_difference_type(diff)
661            }
662            tl::enums::updates::Difference::Slice(tl::types::updates::DifferenceSlice {
663                new_messages,
664                new_encrypted_messages,
665                other_updates,
666                chats,
667                users,
668                intermediate_state: state,
669            }) => {
670                debug!("handling partial difference {:?}", state);
671                finish = false;
672                self.apply_difference_type(tl::types::updates::Difference {
673                    new_messages,
674                    new_encrypted_messages,
675                    other_updates,
676                    chats,
677                    users,
678                    state,
679                })
680            }
681            tl::enums::updates::Difference::TooLong(diff) => {
682                debug!(
683                    "handling too-long difference (pts = {}); no longer getting diff",
684                    diff.pts
685                );
686                finish = true;
687                // the deadline will be reset once the diff ends
688                self.set_pts(Key::Common, diff.pts);
689                (Vec::new(), Vec::new(), Vec::new())
690            }
691        };
692
693        if finish {
694            self.try_end_get_diff(Key::Common);
695            self.try_end_get_diff(Key::Secondary);
696        }
697
698        result
699    }
700
701    fn apply_difference_type(
702        &mut self,
703        tl::types::updates::Difference {
704            new_messages,
705            new_encrypted_messages,
706            other_updates: updates,
707            chats,
708            users,
709            state: tl::enums::updates::State::State(state),
710        }: tl::types::updates::Difference,
711    ) -> defs::UpdateAndPeers {
712        self.date = state.date;
713        self.seq = state.seq;
714        self.set_pts(Key::Common, state.pts);
715        self.set_pts(Key::Secondary, state.qts);
716        let mk_state = |message_box| State {
717            date: state.date,
718            seq: state.seq,
719            message_box: Some(message_box),
720        };
721
722        // other_updates can contain things like UpdateChannelTooLong and UpdateNewChannelMessage.
723        // We need to process those as if they were socket updates to discard any we have already handled.
724        let us = UpdatesLike::Updates(tl::enums::Updates::Updates(tl::types::Updates {
725            updates,
726            users,
727            chats,
728            date: NO_DATE,
729            seq: NO_SEQ,
730        }));
731
732        // It is possible that the result from `GetDifference` includes users with `min = true`.
733        // TODO in that case, we will have to resort to getUsers.
734        let (mut result_updates, users, chats) = self
735            .process_updates(us)
736            .expect("gap is detected while applying difference");
737
738        result_updates.extend(
739            new_messages
740                .into_iter()
741                .map(|message| {
742                    (
743                        tl::types::UpdateNewMessage {
744                            message,
745                            pts: NO_PTS,
746                            pts_count: 0,
747                        }
748                        .into(),
749                        mk_state(MessageBox::Common { pts: state.pts }),
750                    )
751                })
752                .chain(new_encrypted_messages.into_iter().map(|message| {
753                    (
754                        tl::types::UpdateNewEncryptedMessage {
755                            message,
756                            qts: NO_PTS,
757                        }
758                        .into(),
759                        mk_state(MessageBox::Secondary { qts: state.qts }),
760                    )
761                })),
762        );
763
764        (result_updates, users, chats)
765    }
766}
767
768/// Getting and applying channel difference.
769impl MessageBoxes {
770    /// Return the request that needs to be made to get a channel's difference, if any.
771    ///
772    /// Note that this only returns the skeleton of a request.
773    /// Both the channel hash and limit will be their default value.
774    pub fn get_channel_difference(&self) -> Option<tl::functions::updates::GetChannelDifference> {
775        let (key, channel_id) = self.getting_diff_for.iter().find_map(|&key| match key {
776            Key::Channel(id) => Some((key, id)),
777            _ => None,
778        })?;
779
780        if let Some(pts) = self.entry(key).map(|entry| entry.pts) {
781            let gd = tl::functions::updates::GetChannelDifference {
782                force: false,
783                channel: tl::types::InputChannel {
784                    channel_id,
785                    access_hash: 0,
786                }
787                .into(),
788                filter: tl::enums::ChannelMessagesFilter::Empty,
789                pts,
790                limit: 0,
791            };
792            trace!("requesting {:?}", gd);
793            Some(gd)
794        } else {
795            panic!("Should not try to get difference for an entry {key:?} without known state");
796        }
797    }
798
799    /// Similar to [`MessageBoxes::process_updates`], but using the result from getting difference.
800    pub fn apply_channel_difference(
801        &mut self,
802        difference: tl::enums::updates::ChannelDifference,
803    ) -> defs::UpdateAndPeers {
804        let (key, channel_id) = self
805            .getting_diff_for
806            .iter()
807            .find_map(|&key| match key {
808                Key::Channel(id) => Some((key, id)),
809                _ => None,
810            })
811            .expect("applying channel difference to have a channel in getting_diff_for");
812
813        trace!(
814            "applying channel difference for {}: {:?}",
815            channel_id, difference
816        );
817
818        self.push_gap(key, None);
819
820        let tl::types::updates::ChannelDifference {
821            r#final,
822            pts,
823            timeout,
824            new_messages,
825            other_updates: updates,
826            chats,
827            users,
828        } = adaptor::adapt_channel_difference(difference);
829
830        if r#final {
831            debug!(
832                "handling channel {} difference; no longer getting diff",
833                channel_id
834            );
835            self.try_end_get_diff(key);
836        } else {
837            debug!("handling channel {} difference", channel_id);
838        }
839
840        self.set_pts(key, pts);
841        let us = UpdatesLike::Updates(tl::enums::Updates::Updates(tl::types::Updates {
842            updates,
843            users,
844            chats,
845            date: NO_DATE,
846            seq: NO_SEQ,
847        }));
848        let (mut result_updates, users, chats) = self
849            .process_updates(us)
850            .expect("gap is detected while applying channel difference");
851
852        result_updates.extend(new_messages.into_iter().map(|message| {
853            (
854                tl::types::UpdateNewChannelMessage {
855                    message,
856                    pts: NO_PTS,
857                    pts_count: 0,
858                }
859                .into(),
860                State {
861                    date: self.date,
862                    seq: self.seq,
863                    message_box: Some(MessageBox::Channel { channel_id, pts }),
864                },
865            )
866        }));
867
868        self.reset_timeout(key, timeout);
869
870        (result_updates, users, chats)
871    }
872
873    pub fn end_channel_difference(&mut self, reason: PrematureEndReason) {
874        let (key, channel_id) = self
875            .getting_diff_for
876            .iter()
877            .find_map(|&key| match key {
878                Key::Channel(id) => Some((key, id)),
879                _ => None,
880            })
881            .expect("ending channel difference to have a channel in getting_diff_for");
882
883        trace!(
884            "ending channel difference for {} because {:?}",
885            channel_id, reason
886        );
887        match reason {
888            PrematureEndReason::TemporaryServerIssues => {
889                self.push_gap(key, None);
890                self.try_end_get_diff(key);
891            }
892            PrematureEndReason::Banned => {
893                self.push_gap(key, None);
894                self.try_end_get_diff(key);
895                self.pop_entry(key);
896            }
897        }
898    }
899}
900
/// Reason for calling [`MessageBoxes::end_channel_difference`].
#[derive(Debug)]
pub enum PrematureEndReason {
    /// The channel difference failed to be fetched for temporary reasons.
    ///
    /// The channel state must be kept,
    /// and getting its difference retried in the future.
    TemporaryServerIssues,
    /// The logged-in account has been banned from the channel and won't
    /// be able to fetch the difference in the future.
    ///
    /// The channel state will be removed,
    /// and getting its difference should not be retried in the future.
    Banned,
}
915}