Skip to main content

layer_client/
pts.rs

1//! Update gap detection and recovery.
2//!
3//! Tracks `pts` / `qts` / `seq` / `date` plus **per-channel pts**, and
4//! fills gaps via `updates.getDifference` (global) and
5//! `updates.getChannelDifference` (per-channel, gap G-15).
6//!
7//! ## What "gap" means
8//! Telegram guarantees updates arrive in order within a pts counter.
9//! If `new_pts != local_pts + pts_count` there is a gap and we must
10//! ask the server for the missed updates before processing this one.
11
12use std::collections::HashMap;
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, update};
19
20// ─── PossibleGapBuffer (G-17) ─────────────────────────────────────────────────
21
22/// How long to wait before declaring a pts jump a real gap (ms).
23/// grammers uses a similar short window before triggering getDifference.
24const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;
25
26/// Buffers updates received during a possible-gap window so we don't fire
27/// getDifference on every slightly out-of-order update.
28#[derive(Default)]
29pub struct PossibleGapBuffer {
30    /// channel_id → (buffered_updates, window_start)
31    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
32    /// Global buffered updates (non-channel pts gaps)
33    global:  Option<(Vec<update::Update>, Instant)>,
34}
35
36impl PossibleGapBuffer {
37    pub fn new() -> Self { Self::default() }
38
39    /// Buffer a global update during a possible-gap window.
40    pub fn push_global(&mut self, upd: update::Update) {
41        let entry = self.global.get_or_insert_with(|| (Vec::new(), Instant::now()));
42        entry.0.push(upd);
43    }
44
45    /// Buffer a channel update during a possible-gap window.
46    pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
47        let entry = self.channel.entry(channel_id).or_insert_with(|| (Vec::new(), Instant::now()));
48        entry.0.push(upd);
49    }
50
51    /// True if the global possible-gap deadline has elapsed.
52    pub fn global_deadline_elapsed(&self) -> bool {
53        self.global.as_ref()
54            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
55            .unwrap_or(false)
56    }
57
58    /// True if a channel's possible-gap deadline has elapsed.
59    pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
60        self.channel.get(&channel_id)
61            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
62            .unwrap_or(false)
63    }
64
65    /// True if the global buffer has any pending updates.
66    pub fn has_global(&self) -> bool { self.global.is_some() }
67
68    /// True if a channel buffer has pending updates.
69    pub fn has_channel(&self, channel_id: i64) -> bool { self.channel.contains_key(&channel_id) }
70
71    /// Drain global buffered updates.
72    pub fn drain_global(&mut self) -> Vec<update::Update> {
73        self.global.take().map(|(v, _)| v).unwrap_or_default()
74    }
75
76    /// Drain channel buffered updates.
77    pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
78        self.channel.remove(&channel_id).map(|(v, _)| v).unwrap_or_default()
79    }
80}
81
82// ─── PtsState ─────────────────────────────────────────────────────────────────
83
84/// Full MTProto sequence-number state, including per-channel counters.
85///
86/// All fields are `pub` so that `connect()` can restore them from the
87/// persisted session without going through an artificial constructor.
88#[derive(Debug, Clone, Default)]
89pub struct PtsState {
90    /// Main pts counter (messages, non-channel updates).
91    pub pts:  i32,
92    /// G-18: Secondary counter for secret-chat updates.
93    pub qts:  i32,
94    /// Date of the last received update (Unix timestamp).
95    pub date: i32,
96    /// G-19: Combined-container sequence number.
97    pub seq:  i32,
98    /// Per-channel pts counters.  `channel_id → pts`.
99    pub channel_pts: HashMap<i64, i32>,
100    /// G-16: Timestamp of last received update for deadline-based gap detection.
101    pub last_update_at: Option<Instant>,
102}
103
104
105impl PtsState {
106    pub fn from_server_state(s: &tl::types::updates::State) -> Self {
107        Self {
108            pts: s.pts, qts: s.qts, date: s.date, seq: s.seq,
109            channel_pts: HashMap::new(),
110            last_update_at: Some(Instant::now()),
111        }
112    }
113
114    /// Record that an update was received now (resets the deadline timer).
115    pub fn touch(&mut self) {
116        self.last_update_at = Some(Instant::now());
117    }
118
119    /// G-16: Returns true if no update has been received for > 15 minutes.
120    pub fn deadline_exceeded(&self) -> bool {
121        self.last_update_at.as_ref()
122            .map(|t| t.elapsed().as_secs() > 15 * 60)
123            .unwrap_or(false)
124    }
125
126    /// Check whether `new_pts` is in order given `pts_count` new updates.
127    pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
128        let expected = self.pts + pts_count;
129        if new_pts == expected {
130            PtsCheckResult::Ok
131        } else if new_pts > expected {
132            PtsCheckResult::Gap { expected, got: new_pts }
133        } else {
134            PtsCheckResult::Duplicate
135        }
136    }
137
138    /// G-18: Check a qts value (secret chat updates).
139    pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
140        let expected = self.qts + qts_count;
141        if new_qts == expected {
142            PtsCheckResult::Ok
143        } else if new_qts > expected {
144            PtsCheckResult::Gap { expected, got: new_qts }
145        } else {
146            PtsCheckResult::Duplicate
147        }
148    }
149
150    /// G-19: Check top-level seq for UpdatesCombined containers.
151    pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
152        if self.seq == 0 { return PtsCheckResult::Ok; } // uninitialised — accept
153        let expected = self.seq + 1;
154        if seq_start == expected {
155            PtsCheckResult::Ok
156        } else if seq_start > expected {
157            PtsCheckResult::Gap { expected, got: seq_start }
158        } else {
159            PtsCheckResult::Duplicate
160        }
161    }
162
163    /// Check a per-channel pts value.
164    pub fn check_channel_pts(&self, channel_id: i64, new_pts: i32, pts_count: i32) -> PtsCheckResult {
165        let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
166        if local == 0 {
167            return PtsCheckResult::Ok;
168        }
169        let expected = local + pts_count;
170        if new_pts == expected {
171            PtsCheckResult::Ok
172        } else if new_pts > expected {
173            PtsCheckResult::Gap { expected, got: new_pts }
174        } else {
175            PtsCheckResult::Duplicate
176        }
177    }
178
179    /// Advance the global pts.
180    pub fn advance(&mut self, new_pts: i32) {
181        if new_pts > self.pts { self.pts = new_pts; }
182        self.touch();
183    }
184
185    /// Advance the qts (G-18).
186    pub fn advance_qts(&mut self, new_qts: i32) {
187        if new_qts > self.qts { self.qts = new_qts; }
188        self.touch();
189    }
190
191    /// Advance seq (G-19).
192    pub fn advance_seq(&mut self, new_seq: i32) {
193        if new_seq > self.seq { self.seq = new_seq; }
194    }
195
196    /// Advance a per-channel pts.
197    pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
198        let entry = self.channel_pts.entry(channel_id).or_insert(0);
199        if new_pts > *entry { *entry = new_pts; }
200        self.touch();
201    }
202}
203
204#[derive(Debug, PartialEq, Eq)]
205pub enum PtsCheckResult {
206    Ok,
207    Gap { expected: i32, got: i32 },
208    Duplicate,
209}
210
211// ─── Client methods ───────────────────────────────────────────────────────────
212
213impl Client {
214    // ── Global getDifference ──────────────────────────────────────────────
215
216    /// Fetch and replay any updates missed since the persisted pts.
217    pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
218        let (pts, qts, date) = {
219            let s = self.inner.pts_state.lock().await;
220            (s.pts, s.qts, s.date)
221        };
222
223        if pts == 0 {
224            self.sync_pts_state().await?;
225            return Ok(vec![]);
226        }
227
228        log::info!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
229
230        let req = tl::functions::updates::GetDifference {
231            pts,
232            pts_limit:       None,
233            pts_total_limit: None,
234            date,
235            qts,
236            qts_limit:       None,
237        };
238
239        let body    = self.rpc_call_raw_pub(&req).await?;
240        let mut cur = Cursor::from_slice(&body);
241        let diff    = tl::enums::updates::Difference::deserialize(&mut cur)?;
242
243        let mut updates = Vec::new();
244        match diff {
245            tl::enums::updates::Difference::Empty(e) => {
246                let mut s = self.inner.pts_state.lock().await;
247                s.date = e.date;
248                s.seq  = e.seq;
249                s.touch();
250                log::debug!("[layer] getDifference: empty (seq={})", e.seq);
251            }
252            tl::enums::updates::Difference::Difference(d) => {
253                log::info!("[layer] getDifference: {} messages, {} updates",
254                    d.new_messages.len(), d.other_updates.len());
255                self.cache_users_slice_pub(&d.users).await;
256                self.cache_chats_slice_pub(&d.chats).await;
257                for msg in d.new_messages {
258                    updates.push(update::Update::NewMessage(
259                        update::IncomingMessage::from_raw(msg)
260                    ));
261                }
262                for upd in d.other_updates {
263                    updates.extend(update::from_single_update_pub(upd));
264                }
265                let tl::enums::updates::State::State(ns) = d.state;
266                *self.inner.pts_state.lock().await = PtsState::from_server_state(&ns);
267            }
268            tl::enums::updates::Difference::Slice(d) => {
269                log::info!("[layer] getDifference slice: {} messages, {} updates",
270                    d.new_messages.len(), d.other_updates.len());
271                self.cache_users_slice_pub(&d.users).await;
272                self.cache_chats_slice_pub(&d.chats).await;
273                for msg in d.new_messages {
274                    updates.push(update::Update::NewMessage(
275                        update::IncomingMessage::from_raw(msg)
276                    ));
277                }
278                for upd in d.other_updates {
279                    updates.extend(update::from_single_update_pub(upd));
280                }
281                let tl::enums::updates::State::State(ns) = d.intermediate_state;
282                *self.inner.pts_state.lock().await = PtsState::from_server_state(&ns);
283            }
284            tl::enums::updates::Difference::TooLong(d) => {
285                log::warn!("[layer] getDifference: TooLong (pts={}) — re-syncing", d.pts);
286                self.inner.pts_state.lock().await.pts = d.pts;
287                self.sync_pts_state().await?;
288            }
289        }
290
291        Ok(updates)
292    }
293
294    // ── G-15: Per-channel getChannelDifference ────────────────────────────
295
296    /// Fetch missed updates for a single channel.
297    pub async fn get_channel_difference(
298        &self,
299        channel_id: i64,
300    ) -> Result<Vec<update::Update>, InvocationError> {
301        let local_pts = self.inner.pts_state.lock().await
302            .channel_pts.get(&channel_id).copied().unwrap_or(0);
303
304        let access_hash = self.inner.peer_cache.lock().await
305            .channels.get(&channel_id).copied().unwrap_or(0);
306
307        log::info!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
308
309        let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
310            channel_id,
311            access_hash,
312        });
313
314        let req = tl::functions::updates::GetChannelDifference {
315            force:   false,
316            channel,
317            filter:  tl::enums::ChannelMessagesFilter::Empty,
318            pts:     local_pts.max(1),
319            limit:   100,
320        };
321
322        let body = match self.rpc_call_raw_pub(&req).await {
323            Ok(b) => b,
324            Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
325                // G-20: treat as empty diff — retry next gap
326                log::debug!("[layer] G-20 PERSISTENT_TIMESTAMP_OUTDATED — skipping diff");
327                return Ok(vec![]);
328            }
329            Err(e) => return Err(e),
330        };
331        let mut cur = Cursor::from_slice(&body);
332        let diff    = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
333
334        let mut updates = Vec::new();
335
336        match diff {
337            tl::enums::updates::ChannelDifference::Empty(e) => {
338                log::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
339                self.inner.pts_state.lock().await.advance_channel(channel_id, e.pts);
340            }
341            tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
342                log::info!("[layer] getChannelDifference: {} messages, {} updates",
343                    d.new_messages.len(), d.other_updates.len());
344                self.cache_users_slice_pub(&d.users).await;
345                self.cache_chats_slice_pub(&d.chats).await;
346                for msg in d.new_messages {
347                    updates.push(update::Update::NewMessage(
348                        update::IncomingMessage::from_raw(msg)
349                    ));
350                }
351                for upd in d.other_updates {
352                    updates.extend(update::from_single_update_pub(upd));
353                }
354                self.inner.pts_state.lock().await.advance_channel(channel_id, d.pts);
355            }
356            tl::enums::updates::ChannelDifference::TooLong(d) => {
357                log::warn!("[layer] getChannelDifference TooLong — replaying messages, resetting pts");
358                self.cache_users_slice_pub(&d.users).await;
359                self.cache_chats_slice_pub(&d.chats).await;
360                for msg in d.messages {
361                    updates.push(update::Update::NewMessage(
362                        update::IncomingMessage::from_raw(msg)
363                    ));
364                }
365                self.inner.pts_state.lock().await.advance_channel(channel_id, 0);
366            }
367        }
368
369        Ok(updates)
370    }
371
372    // ── Sync from server ──────────────────────────────────────────────────
373
374    pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
375        let body    = self.rpc_call_raw_pub(&tl::functions::updates::GetState {}).await?;
376        let mut cur = Cursor::from_slice(&body);
377        let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
378        let mut state = self.inner.pts_state.lock().await;
379        state.pts  = s.pts;
380        state.qts  = s.qts;
381        state.date = s.date;
382        state.seq  = s.seq;
383        state.touch();
384        log::info!("[layer] pts synced: pts={}, qts={}, seq={}", s.pts, s.qts, s.seq);
385        Ok(())
386    }
387
388    // ── Gap-check helpers ─────────────────────────────────────────────────
389
390    /// G-17: Check global pts, buffer during possible-gap window, fetch diff if real gap.
391    pub async fn check_and_fill_gap(
392        &self,
393        new_pts:   i32,
394        pts_count: i32,
395        upd:       Option<update::Update>,
396    ) -> Result<Vec<update::Update>, InvocationError> {
397        let result = self.inner.pts_state.lock().await.check_pts(new_pts, pts_count);
398        match result {
399            PtsCheckResult::Ok => {
400                // Drain any buffered global updates now that we're in sync,
401                // then append the current update (which triggered the Ok).
402                let mut buffered = self.inner.possible_gap.lock().await.drain_global();
403                self.inner.pts_state.lock().await.advance(new_pts);
404                if let Some(u) = upd {
405                    buffered.push(u);
406                }
407                Ok(buffered)
408            }
409            PtsCheckResult::Gap { expected, got } => {
410                // Buffer the update first; only fetch getDifference after the
411                // deadline has elapsed (avoids spurious getDifference on every
412                // slightly out-of-order update).
413                if let Some(u) = upd {
414                    self.inner.possible_gap.lock().await.push_global(u);
415                }
416                let deadline_elapsed = self.inner.possible_gap.lock().await.global_deadline_elapsed();
417                if deadline_elapsed {
418                    log::warn!("[layer] global pts gap: expected {expected}, got {got} — getDifference");
419                    let buffered = self.inner.possible_gap.lock().await.drain_global();
420                    let mut diff_updates = self.get_difference().await?;
421                    // Prepend buffered updates so ordering is maintained.
422                    diff_updates.splice(0..0, buffered);
423                    Ok(diff_updates)
424                } else {
425                    log::debug!("[layer] global pts gap: expected {expected}, got {got} — buffering (possible gap)");
426                    Ok(vec![])
427                }
428            }
429            PtsCheckResult::Duplicate => {
430                log::debug!("[layer] global pts duplicate, discarding");
431                Ok(vec![])
432            }
433        }
434    }
435
436    /// G-18: Check qts (secret chat updates) and fill gap if needed.
437    pub async fn check_and_fill_qts_gap(
438        &self,
439        new_qts:   i32,
440        qts_count: i32,
441    ) -> Result<Vec<update::Update>, InvocationError> {
442        let result = self.inner.pts_state.lock().await.check_qts(new_qts, qts_count);
443        match result {
444            PtsCheckResult::Ok => {
445                self.inner.pts_state.lock().await.advance_qts(new_qts);
446                Ok(vec![])
447            }
448            PtsCheckResult::Gap { expected, got } => {
449                log::warn!("[layer] qts gap: expected {expected}, got {got} — getDifference");
450                self.get_difference().await
451            }
452            PtsCheckResult::Duplicate => Ok(vec![]),
453        }
454    }
455
456    /// G-19: Check top-level seq and fill gap if needed.
457    pub async fn check_and_fill_seq_gap(
458        &self,
459        new_seq:   i32,
460        seq_start: i32,
461    ) -> Result<Vec<update::Update>, InvocationError> {
462        let result = self.inner.pts_state.lock().await.check_seq(new_seq, seq_start);
463        match result {
464            PtsCheckResult::Ok => {
465                self.inner.pts_state.lock().await.advance_seq(new_seq);
466                Ok(vec![])
467            }
468            PtsCheckResult::Gap { expected, got } => {
469                log::warn!("[layer] seq gap: expected {expected}, got {got} — getDifference");
470                self.get_difference().await
471            }
472            PtsCheckResult::Duplicate => Ok(vec![]),
473        }
474    }
475
476    /// G-15: Check a per-channel pts, fetch getChannelDifference if there is a gap.
477    pub async fn check_and_fill_channel_gap(
478        &self,
479        channel_id: i64,
480        new_pts:    i32,
481        pts_count:  i32,
482        upd:        Option<update::Update>,
483    ) -> Result<Vec<update::Update>, InvocationError> {
484        let result = self.inner.pts_state.lock().await
485            .check_channel_pts(channel_id, new_pts, pts_count);
486        match result {
487            PtsCheckResult::Ok => {
488                let mut buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
489                self.inner.pts_state.lock().await.advance_channel(channel_id, new_pts);
490                if let Some(u) = upd {
491                    buffered.push(u);
492                }
493                Ok(buffered)
494            }
495            PtsCheckResult::Gap { expected, got } => {
496                if let Some(u) = upd {
497                    self.inner.possible_gap.lock().await.push_channel(channel_id, u);
498                }
499                let deadline_elapsed = self.inner.possible_gap.lock().await
500                    .channel_deadline_elapsed(channel_id);
501                if deadline_elapsed {
502                    log::warn!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — getChannelDifference");
503                    let buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
504                    let mut diff_updates = self.get_channel_difference(channel_id).await?;
505                    diff_updates.splice(0..0, buffered);
506                    Ok(diff_updates)
507                } else {
508                    log::debug!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — buffering");
509                    Ok(vec![])
510                }
511            }
512            PtsCheckResult::Duplicate => {
513                log::debug!("[layer] channel {channel_id} pts duplicate, discarding");
514                Ok(vec![])
515            }
516        }
517    }
518
519    /// G-16: Called periodically (e.g. from keepalive) to fire getDifference
520    /// if no update has been received for > 15 minutes.
521    pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
522        let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
523        if exceeded {
524            log::info!("[layer] G-16 update deadline exceeded — fetching getDifference");
525            let updates = self.get_difference().await?;
526            for u in updates { let _ = self.inner.update_tx.send(u); }
527        }
528        Ok(())
529    }
530}
531
532