1mod adaptor;
26mod defs;
27
28use super::ChatHashCache;
29use crate::generated::enums::ChannelState as ChannelStateEnum;
30use crate::generated::types::ChannelState;
31use crate::message_box::defs::PossibleGap;
32use crate::UpdateState;
33pub(crate) use defs::Entry;
34pub use defs::{Gap, MessageBox};
35use defs::{PtsInfo, State, NO_DATE, NO_PTS, NO_SEQ, POSSIBLE_GAP_TIMEOUT};
36use grammers_tl_types as tl;
37use log::{debug, info, trace, warn};
38use std::cmp::Ordering;
39use std::collections::{HashMap, HashSet};
40use std::mem;
41use std::time::{Duration, Instant};
42use tl::enums::InputChannel;
43
44fn next_updates_deadline() -> Instant {
45 Instant::now() + defs::NO_UPDATES_TIMEOUT
46}
47
48#[allow(clippy::new_without_default)]
49impl MessageBox {
51 pub fn new() -> Self {
55 trace!("created new message box with no previous state");
56 Self {
57 map: HashMap::new(),
58 date: 1, seq: NO_SEQ,
60 possible_gaps: HashMap::new(),
61 getting_diff_for: HashSet::new(),
62 next_deadline: None,
63 tmp_entries: HashSet::new(),
64 }
65 }
66
67 pub fn load(state: UpdateState) -> Self {
69 trace!("created new message box with state: {:?}", state);
70 let deadline = next_updates_deadline();
71 let mut map = HashMap::with_capacity(2 + state.channels.len());
72 map.insert(
73 Entry::AccountWide,
74 State {
75 pts: state.pts,
76 deadline,
77 },
78 );
79 map.insert(
80 Entry::SecretChats,
81 State {
82 pts: state.qts,
83 deadline,
84 },
85 );
86 map.extend(state.channels.iter().map(|ChannelStateEnum::State(c)| {
87 (
88 Entry::Channel(c.channel_id),
89 State {
90 pts: c.pts,
91 deadline,
92 },
93 )
94 }));
95
96 Self {
97 map,
98 date: state.date,
99 seq: state.seq,
100 possible_gaps: HashMap::new(),
101 getting_diff_for: HashSet::new(),
102 next_deadline: Some(Entry::AccountWide),
103 tmp_entries: HashSet::new(),
104 }
105 }
106
107 pub fn session_state(&self) -> UpdateState {
111 UpdateState {
112 pts: self
113 .map
114 .get(&Entry::AccountWide)
115 .map(|s| s.pts)
116 .unwrap_or(NO_PTS),
117 qts: self
118 .map
119 .get(&Entry::SecretChats)
120 .map(|s| s.pts)
121 .unwrap_or(NO_PTS),
122 date: self.date,
123 seq: self.seq,
124 channels: self
125 .map
126 .iter()
127 .filter_map(|(entry, s)| match entry {
128 Entry::Channel(id) => Some(
129 ChannelState {
130 channel_id: *id,
131 pts: s.pts,
132 }
133 .into(),
134 ),
135 _ => None,
136 })
137 .collect(),
138 }
139 }
140
141 pub fn is_empty(&self) -> bool {
143 self.map
144 .get(&Entry::AccountWide)
145 .map(|s| s.pts)
146 .unwrap_or(NO_PTS)
147 == NO_PTS
148 }
149
150 pub fn check_deadlines(&mut self) -> Instant {
155 let now = Instant::now();
156
157 if !self.getting_diff_for.is_empty() {
158 return now;
159 }
160
161 let deadline = next_updates_deadline();
162
163 let deadline =
165 if let Some(gap_deadline) = self.possible_gaps.values().map(|gap| gap.deadline).min() {
166 deadline.min(gap_deadline)
167 } else if let Some(state) = self.next_deadline.and_then(|entry| self.map.get(&entry)) {
168 deadline.min(state.deadline)
169 } else {
170 deadline
171 };
172
173 if now >= deadline {
174 self.getting_diff_for
176 .extend(self.possible_gaps.iter().filter_map(|(entry, gap)| {
177 if now >= gap.deadline {
178 info!("gap was not resolved after waiting for {:?}", entry);
179 Some(entry)
180 } else {
181 None
182 }
183 }));
184
185 self.getting_diff_for
186 .extend(self.map.iter().filter_map(|(entry, state)| {
187 if now >= state.deadline {
188 debug!("too much time has passed without updates for {:?}", entry);
189 Some(entry)
190 } else {
191 None
192 }
193 }));
194
195 let possible_gaps = &mut self.possible_gaps;
198 self.getting_diff_for.iter().for_each(|entry| {
199 possible_gaps.remove(entry);
200 });
201 }
202
203 deadline
204 }
205
206 fn reset_deadlines(&mut self, entries: &HashSet<Entry>, deadline: Instant) {
210 if entries.is_empty() {
211 return;
212 }
213 for entry in entries {
214 if let Some(state) = self.map.get_mut(entry) {
215 state.deadline = deadline;
216 debug!("reset deadline {:?} for {:?}", deadline, entry);
217 } else {
218 panic!("did not reset deadline for {entry:?} as it had no entry");
219 }
220 }
221
222 if self
223 .next_deadline
224 .as_ref()
225 .map(|next| entries.contains(next))
226 .unwrap_or(false)
227 {
228 self.next_deadline = Some(
230 self.map
231 .iter()
232 .min_by_key(|(_, state)| state.deadline)
233 .map(|i| *i.0)
234 .expect("deadline should exist"),
235 );
236 } else if self
237 .next_deadline
238 .map(|e| deadline < self.map[&e].deadline)
239 .unwrap_or(false)
240 {
241 self.next_deadline = Some(*entries.iter().next().unwrap());
245 }
246 }
247
248 fn reset_deadline(&mut self, entry: Entry, deadline: Instant) {
250 let mut entries = mem::take(&mut self.tmp_entries);
251 entries.insert(entry);
252 self.reset_deadlines(&entries, deadline);
253 entries.clear();
254 self.tmp_entries = entries;
255 }
256
257 fn reset_channel_deadline(&mut self, channel_id: i64, timeout: Option<i32>) {
259 self.reset_deadline(
260 Entry::Channel(channel_id),
261 Instant::now()
262 + timeout
263 .map(|t| Duration::from_secs(t as _))
264 .unwrap_or(defs::NO_UPDATES_TIMEOUT),
265 );
266 }
267
268 pub fn set_state(&mut self, state: tl::enums::updates::State) {
273 trace!("setting state {:?}", state);
274 let deadline = next_updates_deadline();
275 let state: tl::types::updates::State = state.into();
276 self.map.insert(
277 Entry::AccountWide,
278 State {
279 pts: state.pts,
280 deadline,
281 },
282 );
283 self.map.insert(
284 Entry::SecretChats,
285 State {
286 pts: state.qts,
287 deadline,
288 },
289 );
290 self.date = state.date;
291 self.seq = state.seq;
292 }
293
294 pub fn try_set_channel_state(&mut self, id: i64, pts: i32) {
298 trace!("trying to set channel state for {}: {}", id, pts);
299 self.map.entry(Entry::Channel(id)).or_insert_with(|| State {
300 pts,
301 deadline: next_updates_deadline(),
302 });
303 }
304
305 fn try_begin_get_diff(&mut self, entry: Entry) {
310 if !self.map.contains_key(&entry) {
311 if self.possible_gaps.contains_key(&entry) {
313 panic!(
314 "Should not have a possible_gap for an entry {entry:?} not in the state map"
315 );
316 }
317 return;
318 }
319
320 self.getting_diff_for.insert(entry);
321 self.possible_gaps.remove(&entry);
322 }
323
324 fn end_get_diff(&mut self, entry: Entry) {
328 if !self.getting_diff_for.remove(&entry) {
329 panic!("Called end_get_diff on an entry which was not getting diff for");
330 };
331 self.reset_deadline(entry, next_updates_deadline());
332 assert!(
333 !self.possible_gaps.contains_key(&entry),
334 "gaps shouldn't be created while getting difference"
335 );
336 }
337}
338
339impl MessageBox {
341 pub fn ensure_known_peer_hashes(
348 &mut self,
349 updates: &tl::enums::Updates,
350 chat_hashes: &mut ChatHashCache,
351 ) -> Result<(), Gap> {
352 if chat_hashes.extend_from_updates(updates) {
359 Ok(())
360 } else {
361 let can_recover = match updates {
365 tl::enums::Updates::TooLong => true,
366 tl::enums::Updates::UpdateShortMessage(_) => true,
367 tl::enums::Updates::UpdateShortChatMessage(_) => true,
368 tl::enums::Updates::UpdateShort(u) => PtsInfo::from_update(&u.update).is_some(),
369 tl::enums::Updates::Combined(_) => true,
370 tl::enums::Updates::Updates(_) => true,
371 tl::enums::Updates::UpdateShortSentMessage(_) => true,
372 };
373
374 if can_recover {
375 info!("received an update referencing an unknown peer, treating as gap");
376 self.try_begin_get_diff(Entry::AccountWide);
377 Err(Gap)
378 } else {
379 info!("received an update referencing an unknown peer, but cannot find out who");
380 Ok(())
381 }
382 }
383 }
384
385 pub fn process_updates(
398 &mut self,
399 updates: tl::enums::Updates,
400 chat_hashes: &ChatHashCache,
401 ) -> Result<defs::UpdateAndPeers, Gap> {
402 trace!("processing updates: {:?}", updates);
403 let tl::types::UpdatesCombined {
410 date,
411 seq_start,
412 seq,
413 mut updates,
414 users,
415 chats,
416 } = match adaptor::adapt(updates, chat_hashes) {
417 Ok(combined) => combined,
418 Err(Gap) => {
419 self.try_begin_get_diff(Entry::AccountWide);
420 return Err(Gap);
421 }
422 };
423
424 if seq_start != NO_SEQ {
427 match (self.seq + 1).cmp(&seq_start) {
428 Ordering::Equal => {}
430 Ordering::Greater => {
432 debug!(
433 "skipping updates that were already handled at seq = {}",
434 self.seq
435 );
436 return Ok((Vec::new(), users, chats));
437 }
438 Ordering::Less => {
439 debug!(
440 "gap detected (local seq {}, remote seq {})",
441 self.seq, seq_start
442 );
443 self.try_begin_get_diff(Entry::AccountWide);
444 return Err(Gap);
445 }
446 }
447 }
448
449 fn update_sort_key(update: &tl::enums::Update) -> i32 {
450 match PtsInfo::from_update(update) {
451 Some(pts) => pts.pts - pts.pts_count,
452 None => NO_PTS,
453 }
454 }
455
456 updates.sort_by_key(update_sort_key);
460
461 let mut result = Vec::with_capacity(updates.len() + self.possible_gaps.len());
463
464 let mut any_pts_applied = false;
471 let mut reset_deadlines_for = mem::take(&mut self.tmp_entries);
472 for update in updates {
473 let (entry, update) = self.apply_pts_info(update);
474 if let Some(entry) = entry {
475 reset_deadlines_for.insert(entry);
479 }
480 if let Some(update) = update {
481 result.push(update);
482 any_pts_applied |= entry.is_some();
483 }
484 }
485 self.reset_deadlines(&reset_deadlines_for, next_updates_deadline());
486 reset_deadlines_for.clear();
487 self.tmp_entries = reset_deadlines_for;
488
489 if any_pts_applied {
497 if date != NO_DATE {
498 self.date = date;
499 }
500 if seq != NO_SEQ {
501 self.seq = seq;
502 }
503 }
504
505 if !self.possible_gaps.is_empty() {
506 let keys = self.possible_gaps.keys().copied().collect::<Vec<_>>();
508 for key in keys {
509 self.possible_gaps
510 .get_mut(&key)
511 .unwrap()
512 .updates
513 .sort_by_key(update_sort_key);
514
515 for _ in 0..self.possible_gaps[&key].updates.len() {
516 let update = self.possible_gaps.get_mut(&key).unwrap().updates.remove(0);
517 if let (_, Some(update)) = self.apply_pts_info(update) {
520 result.push(update);
521 }
522 }
523 }
524
525 self.possible_gaps.retain(|_, v| !v.updates.is_empty());
527 if self.possible_gaps.is_empty() {
528 debug!("successfully resolved gap by waiting");
529 }
530 }
531
532 Ok((result, users, chats))
533 }
534
535 fn apply_pts_info(
541 &mut self,
542 update: tl::enums::Update,
543 ) -> (Option<Entry>, Option<tl::enums::Update>) {
544 if let tl::enums::Update::ChannelTooLong(u) = update {
545 self.try_begin_get_diff(Entry::Channel(u.channel_id));
546 return (None, None);
547 }
548
549 let pts = match PtsInfo::from_update(&update) {
550 Some(pts) => pts,
551 None => return (None, Some(update)),
553 };
554
555 if self.getting_diff_for.contains(&pts.entry) {
556 debug!(
557 "skipping update for {:?} (getting difference, count {:?}, remote {:?})",
558 pts.entry, pts.pts_count, pts.pts
559 );
560 return (Some(pts.entry), None);
563 }
564
565 if let Some(state) = self.map.get(&pts.entry) {
566 let local_pts = state.pts;
567 match (local_pts + pts.pts_count).cmp(&pts.pts) {
568 Ordering::Equal => {}
570 Ordering::Greater => {
572 debug!(
573 "skipping update for {:?} (local {:?}, count {:?}, remote {:?})",
574 pts.entry, local_pts, pts.pts_count, pts.pts
575 );
576 return (Some(pts.entry), None);
577 }
578 Ordering::Less => {
579 info!(
580 "gap on update for {:?} (local {:?}, count {:?}, remote {:?})",
581 pts.entry, local_pts, pts.pts_count, pts.pts
582 );
583 self.possible_gaps
585 .entry(pts.entry)
586 .or_insert_with(|| PossibleGap {
587 deadline: Instant::now() + POSSIBLE_GAP_TIMEOUT,
588 updates: Vec::new(),
589 })
590 .updates
591 .push(update);
592
593 return (Some(pts.entry), None);
594 }
595 }
596 }
597 self.map
601 .entry(pts.entry)
602 .or_insert_with(|| State {
603 pts: NO_PTS,
604 deadline: next_updates_deadline(),
605 })
606 .pts = pts.pts;
607
608 (Some(pts.entry), Some(update))
609 }
610}
611
612impl MessageBox {
614 pub fn get_difference(&mut self) -> Option<tl::functions::updates::GetDifference> {
616 for entry in [Entry::AccountWide, Entry::SecretChats] {
617 if self.getting_diff_for.contains(&entry) {
618 if !self.map.contains_key(&entry) {
619 panic!(
620 "Should not try to get difference for an entry {entry:?} without known state"
621 );
622 }
623
624 let gd = tl::functions::updates::GetDifference {
625 pts: self.map[&Entry::AccountWide].pts,
626 pts_limit: None,
627 pts_total_limit: None,
628 date: self.date,
629 qts: if self.map.contains_key(&Entry::SecretChats) {
630 self.map[&Entry::SecretChats].pts
631 } else {
632 NO_PTS
633 },
634 qts_limit: None,
635 };
636 trace!("requesting {:?}", gd);
637 return Some(gd);
638 }
639 }
640 None
641 }
642
643 pub fn apply_difference(
645 &mut self,
646 difference: tl::enums::updates::Difference,
647 chat_hashes: &mut ChatHashCache,
648 ) -> defs::UpdateAndPeers {
649 trace!("applying account difference: {:?}", difference);
650 let finish: bool;
651 let result = match difference {
652 tl::enums::updates::Difference::Empty(diff) => {
653 debug!(
654 "handling empty difference (date = {}, seq = {}); no longer getting diff",
655 diff.date, diff.seq
656 );
657 finish = true;
658 self.date = diff.date;
659 self.seq = diff.seq;
660 (Vec::new(), Vec::new(), Vec::new())
661 }
662 tl::enums::updates::Difference::Difference(diff) => {
663 let _ = chat_hashes.extend(&diff.users, &diff.chats);
665
666 debug!(
667 "handling full difference {:?}; no longer getting diff",
668 diff.state
669 );
670 finish = true;
671 self.apply_difference_type(diff, chat_hashes)
672 }
673 tl::enums::updates::Difference::Slice(tl::types::updates::DifferenceSlice {
674 new_messages,
675 new_encrypted_messages,
676 other_updates,
677 chats,
678 users,
679 intermediate_state: state,
680 }) => {
681 let _ = chat_hashes.extend(&users, &chats);
683
684 debug!("handling partial difference {:?}", state);
685 finish = false;
686 self.apply_difference_type(
687 tl::types::updates::Difference {
688 new_messages,
689 new_encrypted_messages,
690 other_updates,
691 chats,
692 users,
693 state,
694 },
695 chat_hashes,
696 )
697 }
698 tl::enums::updates::Difference::TooLong(diff) => {
699 debug!(
700 "handling too-long difference (pts = {}); no longer getting diff",
701 diff.pts
702 );
703 finish = true;
704 self.map.get_mut(&Entry::AccountWide).unwrap().pts = diff.pts;
706 (Vec::new(), Vec::new(), Vec::new())
707 }
708 };
709
710 if finish {
711 let account = self.getting_diff_for.contains(&Entry::AccountWide);
712 let secret = self.getting_diff_for.contains(&Entry::SecretChats);
713
714 if !account && !secret {
715 panic!("Should not be applying the difference when neither account or secret diff was active")
716 }
717
718 if account {
719 self.end_get_diff(Entry::AccountWide);
720 }
721 if secret {
722 self.end_get_diff(Entry::SecretChats);
723 }
724 }
725
726 result
727 }
728
729 fn apply_difference_type(
730 &mut self,
731 tl::types::updates::Difference {
732 new_messages,
733 new_encrypted_messages,
734 other_updates: updates,
735 chats,
736 users,
737 state: tl::enums::updates::State::State(state),
738 }: tl::types::updates::Difference,
739 chat_hashes: &mut ChatHashCache,
740 ) -> defs::UpdateAndPeers {
741 self.map.get_mut(&Entry::AccountWide).unwrap().pts = state.pts;
742 self.map
743 .entry(Entry::SecretChats)
744 .or_insert_with(|| State {
746 pts: NO_PTS,
747 deadline: next_updates_deadline(),
748 })
749 .pts = state.qts;
750 self.date = state.date;
751 self.seq = state.seq;
752
753 let us = tl::enums::Updates::Updates(tl::types::Updates {
756 updates,
757 users,
758 chats,
759 date: NO_DATE,
760 seq: NO_SEQ,
761 });
762
763 let (mut result_updates, users, chats) = self
766 .process_updates(us, chat_hashes)
767 .expect("gap is detected while applying difference");
768
769 result_updates.extend(
770 new_messages
771 .into_iter()
772 .map(|message| {
773 tl::types::UpdateNewMessage {
774 message,
775 pts: NO_PTS,
776 pts_count: 0,
777 }
778 .into()
779 })
780 .chain(new_encrypted_messages.into_iter().map(|message| {
781 tl::types::UpdateNewEncryptedMessage {
782 message,
783 qts: NO_PTS,
784 }
785 .into()
786 })),
787 );
788
789 (result_updates, users, chats)
790 }
791}
792
793impl MessageBox {
795 pub fn get_channel_difference(
797 &mut self,
798 chat_hashes: &ChatHashCache,
799 ) -> Option<tl::functions::updates::GetChannelDifference> {
800 let (entry, id) = self
801 .getting_diff_for
802 .iter()
803 .find_map(|&entry| match entry {
804 Entry::Channel(id) => Some((entry, id)),
805 _ => None,
806 })?;
807
808 if let Some(packed) = chat_hashes.get(id) {
809 let channel = tl::types::InputChannel {
810 channel_id: packed.id,
811 access_hash: packed
812 .access_hash
813 .expect("chat_hashes had chat without hash"),
814 }
815 .into();
816 if let Some(state) = self.map.get(&entry) {
817 let gd = tl::functions::updates::GetChannelDifference {
818 force: false,
819 channel,
820 filter: tl::enums::ChannelMessagesFilter::Empty,
821 pts: state.pts,
822 limit: if chat_hashes.is_self_bot() {
823 defs::BOT_CHANNEL_DIFF_LIMIT
824 } else {
825 defs::USER_CHANNEL_DIFF_LIMIT
826 },
827 };
828 trace!("requesting {:?}", gd);
829 Some(gd)
830 } else {
831 panic!(
832 "Should not try to get difference for an entry {entry:?} without known state"
833 );
834 }
835 } else {
836 warn!(
837 "cannot getChannelDifference for {} as we're missing its hash",
838 id
839 );
840 self.end_get_diff(entry);
841 self.map.remove(&entry);
844 None
845 }
846 }
847
848 pub fn apply_channel_difference(
850 &mut self,
851 request: tl::functions::updates::GetChannelDifference,
852 difference: tl::enums::updates::ChannelDifference,
853 chat_hashes: &mut ChatHashCache,
854 ) -> defs::UpdateAndPeers {
855 let channel_id = channel_id(&request).expect("request had wrong input channel");
856 trace!(
857 "applying channel difference for {}: {:?}",
858 channel_id,
859 difference
860 );
861 let entry = Entry::Channel(channel_id);
862
863 self.possible_gaps.remove(&entry);
864
865 match difference {
866 tl::enums::updates::ChannelDifference::Empty(diff) => {
867 assert!(diff.r#final);
868 debug!(
869 "handling empty channel {} difference (pts = {}); no longer getting diff",
870 channel_id, diff.pts
871 );
872 self.end_get_diff(entry);
873 self.map.get_mut(&entry).unwrap().pts = diff.pts;
874 (Vec::new(), Vec::new(), Vec::new())
875 }
876 tl::enums::updates::ChannelDifference::TooLong(diff) => {
877 let _ = chat_hashes.extend(&diff.users, &diff.chats);
879
880 assert!(diff.r#final);
881 info!(
882 "handling too long channel {} difference; no longer getting diff",
883 channel_id
884 );
885 match diff.dialog {
886 tl::enums::Dialog::Dialog(d) => {
887 self.map.get_mut(&entry).unwrap().pts = d.pts.expect(
888 "channelDifferenceTooLong dialog did not actually contain a pts",
889 );
890 }
891 tl::enums::Dialog::Folder(_) => {
892 panic!("received a folder on channelDifferenceTooLong")
893 }
894 }
895
896 self.reset_channel_deadline(channel_id, diff.timeout);
897 (Vec::new(), Vec::new(), Vec::new())
901 }
902 tl::enums::updates::ChannelDifference::Difference(
903 tl::types::updates::ChannelDifference {
904 r#final,
905 pts,
906 timeout,
907 new_messages,
908 other_updates: updates,
909 chats,
910 users,
911 },
912 ) => {
913 let _ = chat_hashes.extend(&users, &chats);
915
916 if r#final {
917 debug!(
918 "handling channel {} difference; no longer getting diff",
919 channel_id
920 );
921 self.end_get_diff(entry);
922 } else {
923 debug!("handling channel {} difference", channel_id);
924 }
925
926 self.map.get_mut(&entry).unwrap().pts = pts;
927 let us = tl::enums::Updates::Updates(tl::types::Updates {
928 updates,
929 users,
930 chats,
931 date: NO_DATE,
932 seq: NO_SEQ,
933 });
934 let (mut result_updates, users, chats) = self
935 .process_updates(us, chat_hashes)
936 .expect("gap is detected while applying channel difference");
937
938 result_updates.extend(new_messages.into_iter().map(|message| {
939 tl::types::UpdateNewChannelMessage {
940 message,
941 pts: NO_PTS,
942 pts_count: 0,
943 }
944 .into()
945 }));
946 self.reset_channel_deadline(channel_id, timeout);
947
948 (result_updates, users, chats)
949 }
950 }
951 }
952
953 pub fn end_channel_difference(
954 &mut self,
955 request: &tl::functions::updates::GetChannelDifference,
956 reason: PrematureEndReason,
957 ) {
958 if let Some(channel_id) = channel_id(request) {
959 trace!(
960 "ending channel difference for {} because {:?}",
961 channel_id,
962 reason
963 );
964 let entry = Entry::Channel(channel_id);
965 match reason {
966 PrematureEndReason::TemporaryServerIssues => {
967 self.possible_gaps.remove(&entry);
968 self.end_get_diff(entry);
969 }
970 PrematureEndReason::Banned => {
971 self.possible_gaps.remove(&entry);
972 self.end_get_diff(entry);
973 self.map.remove(&entry);
974 }
975 }
976 };
977 }
978}
979
980pub fn channel_id(request: &tl::functions::updates::GetChannelDifference) -> Option<i64> {
981 match request.channel {
982 InputChannel::Channel(ref c) => Some(c.channel_id),
983 InputChannel::FromMessage(ref c) => Some(c.channel_id),
984 InputChannel::Empty => None,
985 }
986}
987
988#[derive(Debug)]
989pub enum PrematureEndReason {
990 TemporaryServerIssues,
991 Banned,
992}