1mod adaptor;
36mod defs;
37#[cfg(test)]
38mod tests;
39
40use std::cmp::Ordering;
41use std::time::Duration;
42#[cfg(not(test))]
43use std::time::Instant;
44
45use defs::Key;
46pub use defs::{Gap, MessageBox, MessageBoxes, State, UpdatesLike};
47use defs::{LiveEntry, NO_DATE, NO_PTS, NO_SEQ, POSSIBLE_GAP_TIMEOUT, PossibleGap, PtsInfo};
48use grammers_tl_types as tl;
49use log::{debug, info, trace};
50#[cfg(test)]
51use tests::Instant;
52
53use crate::types::{ChannelState, UpdatesState};
54
55fn next_updates_deadline() -> Instant {
56 Instant::now() + defs::NO_UPDATES_TIMEOUT
57}
58
59impl MessageBox {
60 pub fn pts(&self) -> i32 {
61 match self {
62 MessageBox::Common { pts } => *pts,
63 MessageBox::Secondary { qts } => *qts,
64 MessageBox::Channel { pts, .. } => *pts,
65 }
66 }
67}
68
69impl From<PtsInfo> for MessageBox {
70 fn from(value: PtsInfo) -> Self {
71 match value.key {
72 Key::Common => Self::Common { pts: value.pts },
73 Key::Secondary => Self::Secondary { qts: value.pts },
74 Key::Channel(channel_id) => Self::Channel {
75 channel_id,
76 pts: value.pts,
77 },
78 }
79 }
80}
81
82impl LiveEntry {
83 fn effective_deadline(&self) -> Instant {
84 match &self.possible_gap {
85 Some(gap) => gap.deadline.min(self.deadline),
86 None => self.deadline,
87 }
88 }
89}
90
91#[allow(clippy::new_without_default)]
92impl MessageBoxes {
94 pub fn new() -> Self {
96 trace!("created new message box with no previous state");
97 Self {
98 entries: Vec::new(),
99 date: NO_DATE,
100 seq: NO_SEQ,
101 getting_diff_for: Vec::new(),
102 possible_gaps: Vec::new(),
103 next_deadline: next_updates_deadline(),
104 }
105 }
106
107 pub fn load(state: UpdatesState) -> Self {
109 trace!("created new message box with state: {:?}", state);
110 let mut entries = Vec::with_capacity(2 + state.channels.len());
111 let mut getting_diff_for = Vec::with_capacity(2 + state.channels.len());
112 let possible_gaps = Vec::with_capacity(2 + state.channels.len());
113 let deadline = next_updates_deadline();
114
115 if state.pts != NO_PTS {
116 entries.push(LiveEntry {
117 key: Key::Common,
118 pts: state.pts,
119 deadline,
120 possible_gap: None,
121 });
122 }
123 if state.qts != NO_PTS {
124 entries.push(LiveEntry {
125 key: Key::Secondary,
126 pts: state.qts,
127 deadline,
128 possible_gap: None,
129 });
130 }
131 entries.extend(state.channels.iter().map(|c| LiveEntry {
132 key: Key::Channel(c.id),
133 pts: c.pts,
134 deadline,
135 possible_gap: None,
136 }));
137 entries.sort_by_key(|entry| entry.key);
138
139 getting_diff_for.extend(entries.iter().map(|entry| entry.key));
140
141 Self {
142 entries,
143 date: state.date,
144 seq: state.seq,
145 getting_diff_for,
146 possible_gaps,
147 next_deadline: deadline,
148 }
149 }
150
151 fn entry(&self, key: Key) -> Option<&LiveEntry> {
152 self.entries
153 .binary_search_by_key(&key, |entry| entry.key)
154 .map(|i| &self.entries[i])
155 .ok()
156 }
157
158 fn update_entry(&mut self, key: Key, updater: impl FnOnce(&mut LiveEntry)) -> bool {
159 match self.entries.binary_search_by_key(&key, |entry| entry.key) {
160 Ok(i) => {
161 updater(&mut self.entries[i]);
162 true
163 }
164 Err(_) => false,
165 }
166 }
167
168 fn set_entry(&mut self, entry: LiveEntry) {
169 match self
170 .entries
171 .binary_search_by_key(&entry.key, |entry| entry.key)
172 {
173 Ok(i) => {
174 self.possible_gaps.retain(|&k| k != entry.key);
175 self.entries[i] = entry;
176 }
177 Err(i) => self.entries.insert(i, entry),
178 }
179 }
180
181 fn set_pts(&mut self, key: Key, pts: i32) {
182 if !self.update_entry(key, |entry| entry.pts = pts) {
183 self.set_entry(LiveEntry {
184 key: key,
185 pts: pts,
186 deadline: next_updates_deadline(),
187 possible_gap: None,
188 });
189 }
190 }
191
192 fn pop_entry(&mut self, key: Key) -> Option<LiveEntry> {
193 match self.entries.binary_search_by_key(&key, |entry| entry.key) {
194 Ok(i) => Some(self.entries.remove(i)),
195 Err(_) => None,
196 }
197 }
198
199 fn push_gap(&mut self, key: Key, gap: Option<tl::enums::Update>) -> bool {
200 let deadline = Instant::now() + POSSIBLE_GAP_TIMEOUT;
201 let has_gap = gap.is_some();
202 let exists = self.update_entry(key, |entry| {
203 let possible_gap = entry.possible_gap.take();
204
205 entry.possible_gap = gap.map(|update| match possible_gap {
206 Some(mut possible) => {
207 possible.updates.push(update);
208 possible
209 }
210 None => PossibleGap {
211 deadline,
212 updates: vec![update],
213 },
214 });
215 });
216 if exists {
217 if has_gap {
218 if !self.possible_gaps.contains(&key) {
219 self.possible_gaps.push(key);
220 self.next_deadline = self.next_deadline.min(deadline);
221 }
222 } else {
223 if let Some(i) = self.possible_gaps.iter().position(|&k| k == key) {
224 self.possible_gaps.remove(i);
225 }
226 }
227 }
228 exists
229 }
230
231 pub fn session_state(&self) -> UpdatesState {
235 UpdatesState {
236 pts: self.entry(Key::Common).map(|s| s.pts).unwrap_or(NO_PTS),
237 qts: self.entry(Key::Secondary).map(|s| s.pts).unwrap_or(NO_PTS),
238 date: self.date,
239 seq: self.seq,
240 channels: self
241 .entries
242 .iter()
243 .filter_map(|entry| match entry.key {
244 Key::Channel(id) => Some(ChannelState { id, pts: entry.pts }.into()),
245 _ => None,
246 })
247 .collect(),
248 }
249 }
250
251 pub fn is_empty(&self) -> bool {
253 self.entries.is_empty()
254 }
255
256 pub fn check_deadlines(&mut self) -> Instant {
261 let now = Instant::now();
262
263 if !self.getting_diff_for.is_empty() {
264 return self.next_deadline;
265 }
266
267 if now >= self.next_deadline {
268 self.getting_diff_for
269 .extend(self.entries.iter().filter_map(|entry| {
270 if now >= entry.effective_deadline() {
271 debug!("deadline for forcibly fetching updates met for {:?}", entry);
272 Some(entry.key)
273 } else {
274 None
275 }
276 }));
277
278 for i in 0..self.getting_diff_for.len() {
281 self.push_gap(self.getting_diff_for[i], None);
282 }
283
284 if self.getting_diff_for.is_empty() {
288 self.next_deadline = next_updates_deadline();
289 }
290 }
291
292 self.next_deadline
293 }
294
295 fn reset_deadline(&mut self, key: Key, deadline: Instant) {
301 let mut old_deadline = self.next_deadline;
302 self.update_entry(key, |entry| {
303 old_deadline = entry.deadline;
304 entry.deadline = deadline;
305 });
306 if self.next_deadline == old_deadline {
307 self.next_deadline = self
308 .entries
309 .iter()
310 .fold(deadline, |d, entry| d.min(entry.effective_deadline()));
311 }
312 }
313
314 fn reset_timeout(&mut self, key: Key, timeout: Option<i32>) {
315 self.reset_deadline(
316 key,
317 timeout
318 .map(|t| Instant::now() + Duration::from_secs(t as _))
319 .unwrap_or_else(next_updates_deadline),
320 );
321 }
322
323 pub fn set_state(&mut self, state: tl::enums::updates::State) {
330 trace!("setting state {:?}", state);
331 debug_assert!(self.is_empty());
332 let deadline = next_updates_deadline();
333 let state: tl::types::updates::State = state.into();
334 self.set_entry(LiveEntry {
335 key: Key::Common,
336 pts: state.pts,
337 deadline,
338 possible_gap: None,
339 });
340 self.set_entry(LiveEntry {
341 key: Key::Secondary,
342 pts: state.qts,
343 deadline,
344 possible_gap: None,
345 });
346 self.date = state.date;
347 self.seq = state.seq;
348 self.next_deadline = deadline;
349 }
350
351 pub fn try_set_channel_state(&mut self, id: i64, pts: i32) {
355 trace!("trying to set channel state for {}: {}", id, pts);
356 if self.entry(Key::Channel(id)).is_none() {
357 self.set_entry(LiveEntry {
358 key: Key::Channel(id),
359 pts: pts,
360 deadline: next_updates_deadline(),
361 possible_gap: None,
362 });
363 }
364 }
365
366 fn try_begin_get_diff(&mut self, key: Key) {
371 if self.push_gap(key, None) {
372 self.getting_diff_for.push(key);
373 }
374 }
375
376 fn try_end_get_diff(&mut self, key: Key) {
379 let i = match self.getting_diff_for.iter().position(|&k| k == key) {
380 Some(i) => i,
381 None => return,
382 };
383 self.getting_diff_for.remove(i);
384 self.reset_deadline(key, next_updates_deadline());
385
386 debug_assert!(
387 self.entry(key)
388 .is_none_or(|entry| entry.possible_gap.is_none()),
389 "gaps shouldn't be created while getting difference"
390 );
391 }
392}
393
394impl MessageBoxes {
396 pub fn process_updates(&mut self, updates: UpdatesLike) -> Result<defs::UpdateAndPeers, Gap> {
409 trace!("processing updates: {:?}", updates);
410 let deadline = next_updates_deadline();
411
412 let tl::types::UpdatesCombined {
419 date,
420 seq_start,
421 seq,
422 mut updates,
423 users,
424 chats,
425 } = match adaptor::adapt(updates) {
426 Ok(combined) => combined,
427 Err(Gap) => {
428 self.try_begin_get_diff(Key::Common);
429 return Err(Gap);
430 }
431 };
432
433 let new_date = if date == NO_DATE { self.date } else { date };
434 let new_seq = if seq == NO_SEQ { self.seq } else { seq };
435 let mk_state = |message_box| State {
436 date: new_date,
437 seq: new_seq,
438 message_box,
439 };
440
441 if seq_start != NO_SEQ {
444 match (self.seq + 1).cmp(&seq_start) {
445 Ordering::Equal => {}
447 Ordering::Greater => {
449 debug!(
450 "skipping updates that were already handled at seq = {}",
451 self.seq
452 );
453 return Ok((Vec::new(), users, chats));
454 }
455 Ordering::Less => {
456 debug!(
457 "gap detected (local seq {}, remote seq {})",
458 self.seq, seq_start
459 );
460 self.try_begin_get_diff(Key::Common);
461 return Err(Gap);
462 }
463 }
464 }
465
466 fn update_sort_key(update: &tl::enums::Update) -> i32 {
467 match PtsInfo::from_update(update) {
468 Some(info) => info.pts - info.count,
469 None => NO_PTS,
470 }
471 }
472
473 updates.sort_by_key(update_sort_key);
477
478 let mut result = Vec::with_capacity(updates.len() + self.possible_gaps.len());
479 let had_gaps = !self.possible_gaps.is_empty();
480
481 for update in updates {
488 let (key, update) = self.apply_pts_info(update);
489 if let Some(key) = key {
490 self.reset_deadline(key, deadline);
494 }
495 if let Some((update, message_box)) = update {
496 result.push((update, mk_state(message_box)));
497 }
498 }
499
500 if had_gaps {
501 for i in (0..self.possible_gaps.len()).rev() {
504 let key = self.possible_gaps[i];
505 let mut gap = None;
506 self.update_entry(key, |entry| {
507 gap = entry.possible_gap.take();
508 });
509 let mut gap = gap.unwrap();
510 gap.updates.sort_by_key(update_sort_key);
511
512 for update in gap.updates {
515 if let (_, Some((update, message_box))) = self.apply_pts_info(update) {
516 result.push((update, mk_state(message_box)));
517 }
518 }
519
520 if self
522 .entry(key)
523 .is_some_and(|entry| entry.possible_gap.is_none())
524 {
525 self.possible_gaps.swap_remove(i);
526 debug!("successfully resolved gap by waiting");
527 }
528 }
529 }
530
531 if !result.is_empty() && self.possible_gaps.is_empty() {
532 self.date = new_date;
535 self.seq = new_seq;
536 }
537
538 Ok((result, users, chats))
539 }
540
541 fn apply_pts_info(
547 &mut self,
548 update: tl::enums::Update,
549 ) -> (Option<Key>, Option<(tl::enums::Update, Option<MessageBox>)>) {
550 if let tl::enums::Update::ChannelTooLong(u) = update {
551 self.try_begin_get_diff(Key::Channel(u.channel_id));
552 return (None, None);
553 }
554
555 let info = match PtsInfo::from_update(&update) {
556 Some(info) => info,
557 None => return (None, Some((update, None))),
559 };
560
561 if self.getting_diff_for.contains(&info.key) {
562 debug!(
563 "skipping update for {:?} (getting difference, count {:?}, remote {:?})",
564 info.key, info.count, info.pts
565 );
566 return (Some(info.key), Some((update, Some(info.into()))));
569 }
570
571 if let Some(local_pts) = self.entry(info.key).map(|entry| entry.pts) {
572 match (local_pts + info.count).cmp(&info.pts) {
573 Ordering::Equal => {}
575 Ordering::Greater => {
577 debug!(
578 "skipping update for {:?} (local {:?}, count {:?}, remote {:?})",
579 info.key, local_pts, info.count, info.pts
580 );
581 return (Some(info.key), None);
582 }
583 Ordering::Less => {
584 info!(
585 "gap on update for {:?} (local {:?}, count {:?}, remote {:?})",
586 info.key, local_pts, info.count, info.pts
587 );
588 self.push_gap(info.key, Some(update));
590
591 return (Some(info.key), None);
592 }
593 }
594 }
595 self.set_pts(info.key, info.pts);
599 (Some(info.key), Some((update, Some(info.into()))))
600 }
601}
602
603impl MessageBoxes {
605 pub fn get_difference(&self) -> Option<tl::functions::updates::GetDifference> {
607 for entry in [Key::Common, Key::Secondary] {
608 if self.getting_diff_for.contains(&entry) {
609 let pts = self
610 .entry(Key::Common)
611 .map(|entry| entry.pts)
612 .expect("common entry to exist when getting difference for it");
613
614 let gd = tl::functions::updates::GetDifference {
615 pts,
616 pts_limit: None,
617 pts_total_limit: None,
618 date: self.date.max(1), qts: self
620 .entry(Key::Secondary)
621 .map(|entry| entry.pts)
622 .unwrap_or(NO_PTS),
623 qts_limit: None,
624 };
625 trace!("requesting {:?}", gd);
626 return Some(gd);
627 }
628 }
629 None
630 }
631
632 pub fn apply_difference(
634 &mut self,
635 difference: tl::enums::updates::Difference,
636 ) -> defs::UpdateAndPeers {
637 trace!("applying account difference: {:?}", difference);
638 debug_assert!(
639 self.getting_diff_for.contains(&Key::Common)
640 || self.getting_diff_for.contains(&Key::Secondary)
641 );
642 let finish: bool;
643 let result = match difference {
644 tl::enums::updates::Difference::Empty(diff) => {
645 debug!(
646 "handling empty difference (date = {}, seq = {}); no longer getting diff",
647 diff.date, diff.seq
648 );
649 finish = true;
650 self.date = diff.date;
651 self.seq = diff.seq;
652 (Vec::new(), Vec::new(), Vec::new())
653 }
654 tl::enums::updates::Difference::Difference(diff) => {
655 debug!(
656 "handling full difference {:?}; no longer getting diff",
657 diff.state
658 );
659 finish = true;
660 self.apply_difference_type(diff)
661 }
662 tl::enums::updates::Difference::Slice(tl::types::updates::DifferenceSlice {
663 new_messages,
664 new_encrypted_messages,
665 other_updates,
666 chats,
667 users,
668 intermediate_state: state,
669 }) => {
670 debug!("handling partial difference {:?}", state);
671 finish = false;
672 self.apply_difference_type(tl::types::updates::Difference {
673 new_messages,
674 new_encrypted_messages,
675 other_updates,
676 chats,
677 users,
678 state,
679 })
680 }
681 tl::enums::updates::Difference::TooLong(diff) => {
682 debug!(
683 "handling too-long difference (pts = {}); no longer getting diff",
684 diff.pts
685 );
686 finish = true;
687 self.set_pts(Key::Common, diff.pts);
689 (Vec::new(), Vec::new(), Vec::new())
690 }
691 };
692
693 if finish {
694 self.try_end_get_diff(Key::Common);
695 self.try_end_get_diff(Key::Secondary);
696 }
697
698 result
699 }
700
701 fn apply_difference_type(
702 &mut self,
703 tl::types::updates::Difference {
704 new_messages,
705 new_encrypted_messages,
706 other_updates: updates,
707 chats,
708 users,
709 state: tl::enums::updates::State::State(state),
710 }: tl::types::updates::Difference,
711 ) -> defs::UpdateAndPeers {
712 self.date = state.date;
713 self.seq = state.seq;
714 self.set_pts(Key::Common, state.pts);
715 self.set_pts(Key::Secondary, state.qts);
716 let mk_state = |message_box| State {
717 date: state.date,
718 seq: state.seq,
719 message_box: Some(message_box),
720 };
721
722 let us = UpdatesLike::Updates(tl::enums::Updates::Updates(tl::types::Updates {
725 updates,
726 users,
727 chats,
728 date: NO_DATE,
729 seq: NO_SEQ,
730 }));
731
732 let (mut result_updates, users, chats) = self
735 .process_updates(us)
736 .expect("gap is detected while applying difference");
737
738 result_updates.extend(
739 new_messages
740 .into_iter()
741 .map(|message| {
742 (
743 tl::types::UpdateNewMessage {
744 message,
745 pts: NO_PTS,
746 pts_count: 0,
747 }
748 .into(),
749 mk_state(MessageBox::Common { pts: state.pts }),
750 )
751 })
752 .chain(new_encrypted_messages.into_iter().map(|message| {
753 (
754 tl::types::UpdateNewEncryptedMessage {
755 message,
756 qts: NO_PTS,
757 }
758 .into(),
759 mk_state(MessageBox::Secondary { qts: state.qts }),
760 )
761 })),
762 );
763
764 (result_updates, users, chats)
765 }
766}
767
768impl MessageBoxes {
770 pub fn get_channel_difference(&self) -> Option<tl::functions::updates::GetChannelDifference> {
775 let (key, channel_id) = self.getting_diff_for.iter().find_map(|&key| match key {
776 Key::Channel(id) => Some((key, id)),
777 _ => None,
778 })?;
779
780 if let Some(pts) = self.entry(key).map(|entry| entry.pts) {
781 let gd = tl::functions::updates::GetChannelDifference {
782 force: false,
783 channel: tl::types::InputChannel {
784 channel_id,
785 access_hash: 0,
786 }
787 .into(),
788 filter: tl::enums::ChannelMessagesFilter::Empty,
789 pts,
790 limit: 0,
791 };
792 trace!("requesting {:?}", gd);
793 Some(gd)
794 } else {
795 panic!("Should not try to get difference for an entry {key:?} without known state");
796 }
797 }
798
799 pub fn apply_channel_difference(
801 &mut self,
802 difference: tl::enums::updates::ChannelDifference,
803 ) -> defs::UpdateAndPeers {
804 let (key, channel_id) = self
805 .getting_diff_for
806 .iter()
807 .find_map(|&key| match key {
808 Key::Channel(id) => Some((key, id)),
809 _ => None,
810 })
811 .expect("applying channel difference to have a channel in getting_diff_for");
812
813 trace!(
814 "applying channel difference for {}: {:?}",
815 channel_id, difference
816 );
817
818 self.push_gap(key, None);
819
820 let tl::types::updates::ChannelDifference {
821 r#final,
822 pts,
823 timeout,
824 new_messages,
825 other_updates: updates,
826 chats,
827 users,
828 } = adaptor::adapt_channel_difference(difference);
829
830 if r#final {
831 debug!(
832 "handling channel {} difference; no longer getting diff",
833 channel_id
834 );
835 self.try_end_get_diff(key);
836 } else {
837 debug!("handling channel {} difference", channel_id);
838 }
839
840 self.set_pts(key, pts);
841 let us = UpdatesLike::Updates(tl::enums::Updates::Updates(tl::types::Updates {
842 updates,
843 users,
844 chats,
845 date: NO_DATE,
846 seq: NO_SEQ,
847 }));
848 let (mut result_updates, users, chats) = self
849 .process_updates(us)
850 .expect("gap is detected while applying channel difference");
851
852 result_updates.extend(new_messages.into_iter().map(|message| {
853 (
854 tl::types::UpdateNewChannelMessage {
855 message,
856 pts: NO_PTS,
857 pts_count: 0,
858 }
859 .into(),
860 State {
861 date: self.date,
862 seq: self.seq,
863 message_box: Some(MessageBox::Channel { channel_id, pts }),
864 },
865 )
866 }));
867
868 self.reset_timeout(key, timeout);
869
870 (result_updates, users, chats)
871 }
872
873 pub fn end_channel_difference(&mut self, reason: PrematureEndReason) {
874 let (key, channel_id) = self
875 .getting_diff_for
876 .iter()
877 .find_map(|&key| match key {
878 Key::Channel(id) => Some((key, id)),
879 _ => None,
880 })
881 .expect("ending channel difference to have a channel in getting_diff_for");
882
883 trace!(
884 "ending channel difference for {} because {:?}",
885 channel_id, reason
886 );
887 match reason {
888 PrematureEndReason::TemporaryServerIssues => {
889 self.push_gap(key, None);
890 self.try_end_get_diff(key);
891 }
892 PrematureEndReason::Banned => {
893 self.push_gap(key, None);
894 self.try_end_get_diff(key);
895 self.pop_entry(key);
896 }
897 }
898 }
899}
900
901#[derive(Debug)]
903pub enum PrematureEndReason {
904 TemporaryServerIssues,
909 Banned,
915}