mod adaptor;
mod defs;
#[cfg(test)]
mod tests;
use std::cmp::Ordering;
use std::time::Duration;
#[cfg(not(test))]
use std::time::Instant;
use defs::Key;
pub use defs::{Gap, MessageBox, MessageBoxes, State, UpdatesLike};
use defs::{LiveEntry, NO_DATE, NO_PTS, NO_SEQ, POSSIBLE_GAP_TIMEOUT, PossibleGap, PtsInfo};
use grammers_tl_types as tl;
use log::{debug, info, trace};
#[cfg(test)]
use tests::Instant;
use crate::types::{ChannelState, UpdatesState};
fn next_updates_deadline() -> Instant {
Instant::now() + defs::NO_UPDATES_TIMEOUT
}
impl MessageBox {
pub fn pts(&self) -> i32 {
match self {
MessageBox::Common { pts } => *pts,
MessageBox::Secondary { qts } => *qts,
MessageBox::Channel { pts, .. } => *pts,
}
}
}
impl From<PtsInfo> for MessageBox {
fn from(value: PtsInfo) -> Self {
match value.key {
Key::Common => Self::Common { pts: value.pts },
Key::Secondary => Self::Secondary { qts: value.pts },
Key::Channel(channel_id) => Self::Channel {
channel_id,
pts: value.pts,
},
}
}
}
impl LiveEntry {
fn effective_deadline(&self) -> Instant {
match &self.possible_gap {
Some(gap) => gap.deadline.min(self.deadline),
None => self.deadline,
}
}
}
#[allow(clippy::new_without_default)]
impl MessageBoxes {
pub fn new() -> Self {
trace!("created new message box with no previous state");
Self {
entries: Vec::new(),
date: NO_DATE,
seq: NO_SEQ,
getting_diff_for: Vec::new(),
possible_gaps: Vec::new(),
next_deadline: next_updates_deadline(),
}
}
pub fn load(state: UpdatesState) -> Self {
trace!("created new message box with state: {:?}", state);
let mut entries = Vec::with_capacity(2 + state.channels.len());
let mut getting_diff_for = Vec::with_capacity(2 + state.channels.len());
let possible_gaps = Vec::with_capacity(2 + state.channels.len());
let deadline = next_updates_deadline();
if state.pts != NO_PTS {
entries.push(LiveEntry {
key: Key::Common,
pts: state.pts,
deadline,
possible_gap: None,
});
}
if state.qts != NO_PTS {
entries.push(LiveEntry {
key: Key::Secondary,
pts: state.qts,
deadline,
possible_gap: None,
});
}
entries.extend(state.channels.iter().map(|c| LiveEntry {
key: Key::Channel(c.id),
pts: c.pts,
deadline,
possible_gap: None,
}));
entries.sort_by_key(|entry| entry.key);
getting_diff_for.extend(entries.iter().map(|entry| entry.key));
Self {
entries,
date: state.date,
seq: state.seq,
getting_diff_for,
possible_gaps,
next_deadline: deadline,
}
}
fn entry(&self, key: Key) -> Option<&LiveEntry> {
self.entries
.binary_search_by_key(&key, |entry| entry.key)
.map(|i| &self.entries[i])
.ok()
}
fn update_entry(&mut self, key: Key, updater: impl FnOnce(&mut LiveEntry)) -> bool {
match self.entries.binary_search_by_key(&key, |entry| entry.key) {
Ok(i) => {
updater(&mut self.entries[i]);
true
}
Err(_) => false,
}
}
fn set_entry(&mut self, entry: LiveEntry) {
match self
.entries
.binary_search_by_key(&entry.key, |entry| entry.key)
{
Ok(i) => {
self.possible_gaps.retain(|&k| k != entry.key);
self.entries[i] = entry;
}
Err(i) => self.entries.insert(i, entry),
}
}
fn set_pts(&mut self, key: Key, pts: i32) {
if !self.update_entry(key, |entry| entry.pts = pts) {
self.set_entry(LiveEntry {
key: key,
pts: pts,
deadline: next_updates_deadline(),
possible_gap: None,
});
}
}
fn pop_entry(&mut self, key: Key) -> Option<LiveEntry> {
match self.entries.binary_search_by_key(&key, |entry| entry.key) {
Ok(i) => Some(self.entries.remove(i)),
Err(_) => None,
}
}
fn push_gap(&mut self, key: Key, gap: Option<tl::enums::Update>) -> bool {
let deadline = Instant::now() + POSSIBLE_GAP_TIMEOUT;
let has_gap = gap.is_some();
let exists = self.update_entry(key, |entry| {
let possible_gap = entry.possible_gap.take();
entry.possible_gap = gap.map(|update| match possible_gap {
Some(mut possible) => {
possible.updates.push(update);
possible
}
None => PossibleGap {
deadline,
updates: vec![update],
},
});
});
if exists {
if has_gap {
if !self.possible_gaps.contains(&key) {
self.possible_gaps.push(key);
self.next_deadline = self.next_deadline.min(deadline);
}
} else {
if let Some(i) = self.possible_gaps.iter().position(|&k| k == key) {
self.possible_gaps.remove(i);
}
}
}
exists
}
pub fn session_state(&self) -> UpdatesState {
UpdatesState {
pts: self.entry(Key::Common).map(|s| s.pts).unwrap_or(NO_PTS),
qts: self.entry(Key::Secondary).map(|s| s.pts).unwrap_or(NO_PTS),
date: self.date,
seq: self.seq,
channels: self
.entries
.iter()
.filter_map(|entry| match entry.key {
Key::Channel(id) => Some(ChannelState { id, pts: entry.pts }.into()),
_ => None,
})
.collect(),
}
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn check_deadlines(&mut self) -> Instant {
let now = Instant::now();
if !self.getting_diff_for.is_empty() {
return self.next_deadline;
}
if now >= self.next_deadline {
self.getting_diff_for
.extend(self.entries.iter().filter_map(|entry| {
if now >= entry.effective_deadline() {
debug!("deadline for forcibly fetching updates met for {:?}", entry);
Some(entry.key)
} else {
None
}
}));
for i in 0..self.getting_diff_for.len() {
self.push_gap(self.getting_diff_for[i], None);
}
if self.getting_diff_for.is_empty() {
self.next_deadline = next_updates_deadline();
}
}
self.next_deadline
}
fn reset_deadline(&mut self, key: Key, deadline: Instant) {
let mut old_deadline = self.next_deadline;
self.update_entry(key, |entry| {
old_deadline = entry.deadline;
entry.deadline = deadline;
});
if self.next_deadline == old_deadline {
self.next_deadline = self
.entries
.iter()
.fold(deadline, |d, entry| d.min(entry.effective_deadline()));
}
}
fn reset_timeout(&mut self, key: Key, timeout: Option<i32>) {
self.reset_deadline(
key,
timeout
.map(|t| Instant::now() + Duration::from_secs(t as _))
.unwrap_or_else(next_updates_deadline),
);
}
pub fn set_state(&mut self, state: tl::enums::updates::State) {
trace!("setting state {:?}", state);
debug_assert!(self.is_empty());
let deadline = next_updates_deadline();
let state: tl::types::updates::State = state.into();
self.set_entry(LiveEntry {
key: Key::Common,
pts: state.pts,
deadline,
possible_gap: None,
});
self.set_entry(LiveEntry {
key: Key::Secondary,
pts: state.qts,
deadline,
possible_gap: None,
});
self.date = state.date;
self.seq = state.seq;
self.next_deadline = deadline;
}
pub fn try_set_channel_state(&mut self, id: i64, pts: i32) {
trace!("trying to set channel state for {}: {}", id, pts);
if self.entry(Key::Channel(id)).is_none() {
self.set_entry(LiveEntry {
key: Key::Channel(id),
pts: pts,
deadline: next_updates_deadline(),
possible_gap: None,
});
}
}
fn try_begin_get_diff(&mut self, key: Key) {
if self.push_gap(key, None) {
self.getting_diff_for.push(key);
}
}
fn try_end_get_diff(&mut self, key: Key) {
let i = match self.getting_diff_for.iter().position(|&k| k == key) {
Some(i) => i,
None => return,
};
self.getting_diff_for.remove(i);
self.reset_deadline(key, next_updates_deadline());
debug_assert!(
self.entry(key)
.is_none_or(|entry| entry.possible_gap.is_none()),
"gaps shouldn't be created while getting difference"
);
}
}
impl MessageBoxes {
pub fn process_updates(&mut self, updates: UpdatesLike) -> Result<defs::UpdateAndPeers, Gap> {
trace!("processing updates: {:?}", updates);
let deadline = next_updates_deadline();
let tl::types::UpdatesCombined {
date,
seq_start,
seq,
mut updates,
users,
chats,
} = match adaptor::adapt(updates) {
Ok(combined) => combined,
Err(Gap) => {
self.try_begin_get_diff(Key::Common);
return Err(Gap);
}
};
let new_date = if date == NO_DATE { self.date } else { date };
let new_seq = if seq == NO_SEQ { self.seq } else { seq };
let mk_state = |message_box| State {
date: new_date,
seq: new_seq,
message_box,
};
if seq_start != NO_SEQ {
match (self.seq + 1).cmp(&seq_start) {
Ordering::Equal => {}
Ordering::Greater => {
debug!(
"skipping updates that were already handled at seq = {}",
self.seq
);
return Ok((Vec::new(), users, chats));
}
Ordering::Less => {
debug!(
"gap detected (local seq {}, remote seq {})",
self.seq, seq_start
);
self.try_begin_get_diff(Key::Common);
return Err(Gap);
}
}
}
fn update_sort_key(update: &tl::enums::Update) -> i32 {
match PtsInfo::from_update(update) {
Some(info) => info.pts - info.count,
None => NO_PTS,
}
}
updates.sort_by_key(update_sort_key);
let mut result = Vec::with_capacity(updates.len() + self.possible_gaps.len());
let had_gaps = !self.possible_gaps.is_empty();
for update in updates {
let (key, update) = self.apply_pts_info(update);
if let Some(key) = key {
self.reset_deadline(key, deadline);
}
if let Some((update, message_box)) = update {
result.push((update, mk_state(message_box)));
}
}
if had_gaps {
for i in (0..self.possible_gaps.len()).rev() {
let key = self.possible_gaps[i];
let mut gap = None;
self.update_entry(key, |entry| {
gap = entry.possible_gap.take();
});
let mut gap = gap.unwrap();
gap.updates.sort_by_key(update_sort_key);
for update in gap.updates {
if let (_, Some((update, message_box))) = self.apply_pts_info(update) {
result.push((update, mk_state(message_box)));
}
}
if self
.entry(key)
.is_some_and(|entry| entry.possible_gap.is_none())
{
self.possible_gaps.swap_remove(i);
debug!("successfully resolved gap by waiting");
}
}
}
if !result.is_empty() && self.possible_gaps.is_empty() {
self.date = new_date;
self.seq = new_seq;
}
Ok((result, users, chats))
}
fn apply_pts_info(
&mut self,
update: tl::enums::Update,
) -> (Option<Key>, Option<(tl::enums::Update, Option<MessageBox>)>) {
if let tl::enums::Update::ChannelTooLong(u) = update {
self.try_begin_get_diff(Key::Channel(u.channel_id));
return (None, None);
}
let info = match PtsInfo::from_update(&update) {
Some(info) => info,
None => return (None, Some((update, None))),
};
if self.getting_diff_for.contains(&info.key) {
debug!(
"skipping update for {:?} (getting difference, count {:?}, remote {:?})",
info.key, info.count, info.pts
);
return (Some(info.key), Some((update, Some(info.into()))));
}
if let Some(local_pts) = self.entry(info.key).map(|entry| entry.pts) {
match (local_pts + info.count).cmp(&info.pts) {
Ordering::Equal => {}
Ordering::Greater => {
debug!(
"skipping update for {:?} (local {:?}, count {:?}, remote {:?})",
info.key, local_pts, info.count, info.pts
);
return (Some(info.key), None);
}
Ordering::Less => {
info!(
"gap on update for {:?} (local {:?}, count {:?}, remote {:?})",
info.key, local_pts, info.count, info.pts
);
self.push_gap(info.key, Some(update));
return (Some(info.key), None);
}
}
}
self.set_pts(info.key, info.pts);
(Some(info.key), Some((update, Some(info.into()))))
}
}
impl MessageBoxes {
pub fn get_difference(&self) -> Option<tl::functions::updates::GetDifference> {
for entry in [Key::Common, Key::Secondary] {
if self.getting_diff_for.contains(&entry) {
let pts = self
.entry(Key::Common)
.map(|entry| entry.pts)
.expect("common entry to exist when getting difference for it");
let gd = tl::functions::updates::GetDifference {
pts,
pts_limit: None,
pts_total_limit: None,
date: self.date.max(1), qts: self
.entry(Key::Secondary)
.map(|entry| entry.pts)
.unwrap_or(NO_PTS),
qts_limit: None,
};
trace!("requesting {:?}", gd);
return Some(gd);
}
}
None
}
pub fn apply_difference(
&mut self,
difference: tl::enums::updates::Difference,
) -> defs::UpdateAndPeers {
trace!("applying account difference: {:?}", difference);
debug_assert!(
self.getting_diff_for.contains(&Key::Common)
|| self.getting_diff_for.contains(&Key::Secondary)
);
let finish: bool;
let result = match difference {
tl::enums::updates::Difference::Empty(diff) => {
debug!(
"handling empty difference (date = {}, seq = {}); no longer getting diff",
diff.date, diff.seq
);
finish = true;
self.date = diff.date;
self.seq = diff.seq;
(Vec::new(), Vec::new(), Vec::new())
}
tl::enums::updates::Difference::Difference(diff) => {
debug!(
"handling full difference {:?}; no longer getting diff",
diff.state
);
finish = true;
self.apply_difference_type(diff)
}
tl::enums::updates::Difference::Slice(tl::types::updates::DifferenceSlice {
new_messages,
new_encrypted_messages,
other_updates,
chats,
users,
intermediate_state: state,
}) => {
debug!("handling partial difference {:?}", state);
finish = false;
self.apply_difference_type(tl::types::updates::Difference {
new_messages,
new_encrypted_messages,
other_updates,
chats,
users,
state,
})
}
tl::enums::updates::Difference::TooLong(diff) => {
debug!(
"handling too-long difference (pts = {}); no longer getting diff",
diff.pts
);
finish = true;
self.set_pts(Key::Common, diff.pts);
(Vec::new(), Vec::new(), Vec::new())
}
};
if finish {
self.try_end_get_diff(Key::Common);
self.try_end_get_diff(Key::Secondary);
}
result
}
fn apply_difference_type(
&mut self,
tl::types::updates::Difference {
new_messages,
new_encrypted_messages,
other_updates: updates,
chats,
users,
state: tl::enums::updates::State::State(state),
}: tl::types::updates::Difference,
) -> defs::UpdateAndPeers {
self.date = state.date;
self.seq = state.seq;
self.set_pts(Key::Common, state.pts);
self.set_pts(Key::Secondary, state.qts);
let mk_state = |message_box| State {
date: state.date,
seq: state.seq,
message_box: Some(message_box),
};
let us = UpdatesLike::Updates(tl::enums::Updates::Updates(tl::types::Updates {
updates,
users,
chats,
date: NO_DATE,
seq: NO_SEQ,
}));
let (mut result_updates, users, chats) = self
.process_updates(us)
.expect("gap is detected while applying difference");
result_updates.extend(
new_messages
.into_iter()
.map(|message| {
(
tl::types::UpdateNewMessage {
message,
pts: NO_PTS,
pts_count: 0,
}
.into(),
mk_state(MessageBox::Common { pts: state.pts }),
)
})
.chain(new_encrypted_messages.into_iter().map(|message| {
(
tl::types::UpdateNewEncryptedMessage {
message,
qts: NO_PTS,
}
.into(),
mk_state(MessageBox::Secondary { qts: state.qts }),
)
})),
);
(result_updates, users, chats)
}
}
impl MessageBoxes {
pub fn get_channel_difference(&self) -> Option<tl::functions::updates::GetChannelDifference> {
let (key, channel_id) = self.getting_diff_for.iter().find_map(|&key| match key {
Key::Channel(id) => Some((key, id)),
_ => None,
})?;
if let Some(pts) = self.entry(key).map(|entry| entry.pts) {
let gd = tl::functions::updates::GetChannelDifference {
force: false,
channel: tl::types::InputChannel {
channel_id,
access_hash: 0,
}
.into(),
filter: tl::enums::ChannelMessagesFilter::Empty,
pts,
limit: 0,
};
trace!("requesting {:?}", gd);
Some(gd)
} else {
panic!("Should not try to get difference for an entry {key:?} without known state");
}
}
pub fn apply_channel_difference(
&mut self,
difference: tl::enums::updates::ChannelDifference,
) -> defs::UpdateAndPeers {
let (key, channel_id) = self
.getting_diff_for
.iter()
.find_map(|&key| match key {
Key::Channel(id) => Some((key, id)),
_ => None,
})
.expect("applying channel difference to have a channel in getting_diff_for");
trace!(
"applying channel difference for {}: {:?}",
channel_id, difference
);
self.push_gap(key, None);
let tl::types::updates::ChannelDifference {
r#final,
pts,
timeout,
new_messages,
other_updates: updates,
chats,
users,
} = adaptor::adapt_channel_difference(difference);
if r#final {
debug!(
"handling channel {} difference; no longer getting diff",
channel_id
);
self.try_end_get_diff(key);
} else {
debug!("handling channel {} difference", channel_id);
}
self.set_pts(key, pts);
let us = UpdatesLike::Updates(tl::enums::Updates::Updates(tl::types::Updates {
updates,
users,
chats,
date: NO_DATE,
seq: NO_SEQ,
}));
let (mut result_updates, users, chats) = self
.process_updates(us)
.expect("gap is detected while applying channel difference");
result_updates.extend(new_messages.into_iter().map(|message| {
(
tl::types::UpdateNewChannelMessage {
message,
pts: NO_PTS,
pts_count: 0,
}
.into(),
State {
date: self.date,
seq: self.seq,
message_box: Some(MessageBox::Channel { channel_id, pts }),
},
)
}));
self.reset_timeout(key, timeout);
(result_updates, users, chats)
}
pub fn end_channel_difference(&mut self, reason: PrematureEndReason) {
let (key, channel_id) = self
.getting_diff_for
.iter()
.find_map(|&key| match key {
Key::Channel(id) => Some((key, id)),
_ => None,
})
.expect("ending channel difference to have a channel in getting_diff_for");
trace!(
"ending channel difference for {} because {:?}",
channel_id, reason
);
match reason {
PrematureEndReason::TemporaryServerIssues => {
self.push_gap(key, None);
self.try_end_get_diff(key);
}
PrematureEndReason::Banned => {
self.push_gap(key, None);
self.try_end_get_diff(key);
self.pop_entry(key);
}
}
}
}
#[derive(Debug)]
pub enum PrematureEndReason {
TemporaryServerIssues,
Banned,
}