Skip to main content

layer_client/
pts.rs

1//! Update gap detection and recovery.
2//!
3//! Tracks `pts` / `qts` / `seq` / `date` plus per-channel pts, and
4//! fills gaps via `updates.getDifference` (global) and
5//! `updates.getChannelDifference` (per-channel).
6//!
7//! ## What "gap" means
//! Telegram guarantees updates arrive in order within a pts counter.
//! If `new_pts > local_pts + pts_count` there is a gap and we must ask the
//! server for the missed updates before processing this one; if
//! `new_pts < local_pts + pts_count` the update was already applied and is
//! discarded as a duplicate.
11
12use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, RpcError, attach_client_to_update, update};
19
/// Grace period (milliseconds) during which an unexpected pts jump is only
/// buffered; once it elapses the jump is treated as a real gap.
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;

/// `limit` sent with getChannelDifference when logged in as a bot; bots may
/// request a much larger window than user accounts (server-side limit).
const CHANNEL_DIFF_LIMIT_BOT: i32 = 100_000;
25
26/// Buffers updates received during a possible-gap window so we don't fire
27/// getDifference on every slightly out-of-order update.
28#[derive(Default)]
29pub struct PossibleGapBuffer {
30    /// channel_id → (buffered_updates, window_start)
31    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
32    /// Global buffered updates (non-channel pts gaps)
33    global: Option<(Vec<update::Update>, Instant)>,
34}
35
36impl PossibleGapBuffer {
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Buffer a global update during a possible-gap window.
42    pub fn push_global(&mut self, upd: update::Update) {
43        let entry = self
44            .global
45            .get_or_insert_with(|| (Vec::new(), Instant::now()));
46        entry.0.push(upd);
47    }
48
49    /// Buffer a channel update during a possible-gap window.
50    pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
51        let entry = self
52            .channel
53            .entry(channel_id)
54            .or_insert_with(|| (Vec::new(), Instant::now()));
55        entry.0.push(upd);
56    }
57
58    /// True if the global possible-gap deadline has elapsed.
59    pub fn global_deadline_elapsed(&self) -> bool {
60        self.global
61            .as_ref()
62            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
63            .unwrap_or(false)
64    }
65
66    /// True if a channel's possible-gap deadline has elapsed.
67    pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
68        self.channel
69            .get(&channel_id)
70            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
71            .unwrap_or(false)
72    }
73
74    /// True if the global buffer has any pending updates.
75    pub fn has_global(&self) -> bool {
76        self.global.is_some()
77    }
78
79    /// True if a channel buffer has pending updates.
80    pub fn has_channel(&self, channel_id: i64) -> bool {
81        self.channel.contains_key(&channel_id)
82    }
83
84    /// Start the global deadline timer without buffering an update.
85    ///
86    /// Called when a gap is detected but the triggering update carries no
87    /// high-level `Update` value (e.g. `updateShortSentMessage` with `upd=None`).
88    /// Without this, `global` stays `None` → `global_deadline_elapsed()` always
89    /// returns `false` → `gap_tick` never fires getDifference for such gaps.
90    pub fn touch_global_timer(&mut self) {
91        self.global
92            .get_or_insert_with(|| (Vec::new(), Instant::now()));
93    }
94
95    /// Drain global buffered updates.
96    pub fn drain_global(&mut self) -> Vec<update::Update> {
97        self.global.take().map(|(v, _)| v).unwrap_or_default()
98    }
99
100    /// Drain channel buffered updates.
101    pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
102        self.channel
103            .remove(&channel_id)
104            .map(|(v, _)| v)
105            .unwrap_or_default()
106    }
107}
108
109// PtsState
110
/// Complete MTProto update-sequence state: the global `pts`/`qts`/`seq`/`date`
/// counters, per-channel pts, and in-flight gap-recovery bookkeeping.
///
/// Every field is `pub` so `connect()` can rebuild the state from a persisted
/// session directly, without going through a dedicated constructor.
#[derive(Debug, Clone, Default)]
pub struct PtsState {
    /// Global pts: messages and other non-channel updates.
    pub pts: i32,
    /// qts: the separate counter used for secret-chat updates.
    pub qts: i32,
    /// Unix timestamp of the most recently received update.
    pub date: i32,
    /// seq of the last applied combined update container.
    pub seq: i32,
    /// Per-channel pts, keyed by channel id.
    pub channel_pts: HashMap<i64, i32>,
    /// getChannelDifference call counter per channel. Mirrors tDesktop's
    /// ramp-up: the first request uses limit=100, later ones limit=1000.
    pub channel_diff_calls: HashMap<i64, u32>,
    /// When the last update was seen; drives deadline-based gap detection.
    pub last_update_at: Option<Instant>,
    /// Channels with a getChannelDifference request currently outstanding;
    /// no additional gap-fill task is started for a channel listed here.
    pub getting_diff_for: HashSet<i64>,
    /// Set while a global getDifference is running. Prevents two concurrent
    /// calls from double-processing updates and corrupting pts.
    pub getting_global_diff: bool,
    /// When `getting_global_diff` became `true`. check_update_deadline's
    /// watchdog resets the guard once it has been held for more than 30 s
    /// (assumed hung RPC) so the next gap_tick can retry.
    pub getting_global_diff_since: Option<Instant>,
}
145
146impl PtsState {
147    pub fn from_server_state(s: &tl::types::updates::State) -> Self {
148        Self {
149            pts: s.pts,
150            qts: s.qts,
151            date: s.date,
152            seq: s.seq,
153            channel_pts: HashMap::new(),
154            channel_diff_calls: HashMap::new(),
155            last_update_at: Some(Instant::now()),
156            getting_diff_for: HashSet::new(),
157            getting_global_diff: false,
158            getting_global_diff_since: None,
159        }
160    }
161
162    /// Record that an update was received now (resets the deadline timer).
163    pub fn touch(&mut self) {
164        self.last_update_at = Some(Instant::now());
165    }
166
167    /// Returns true if no update has been received for > 15 minutes.
168    pub fn deadline_exceeded(&self) -> bool {
169        self.last_update_at
170            .as_ref()
171            .map(|t| t.elapsed().as_secs() > 15 * 60)
172            .unwrap_or(false)
173    }
174
175    /// Check whether `new_pts` is in order given `pts_count` new updates.
176    pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
177        let expected = self.pts + pts_count;
178        if new_pts == expected {
179            PtsCheckResult::Ok
180        } else if new_pts > expected {
181            PtsCheckResult::Gap {
182                expected,
183                got: new_pts,
184            }
185        } else {
186            PtsCheckResult::Duplicate
187        }
188    }
189
190    /// Check a qts value (secret chat updates).
191    pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
192        let expected = self.qts + qts_count;
193        if new_qts == expected {
194            PtsCheckResult::Ok
195        } else if new_qts > expected {
196            PtsCheckResult::Gap {
197                expected,
198                got: new_qts,
199            }
200        } else {
201            PtsCheckResult::Duplicate
202        }
203    }
204
205    /// Check top-level seq for UpdatesCombined containers.
206    pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
207        if self.seq == 0 {
208            return PtsCheckResult::Ok;
209        } // uninitialised: accept
210        let expected = self.seq + 1;
211        if seq_start == expected {
212            PtsCheckResult::Ok
213        } else if seq_start > expected {
214            PtsCheckResult::Gap {
215                expected,
216                got: seq_start,
217            }
218        } else {
219            PtsCheckResult::Duplicate
220        }
221    }
222
223    /// Check a per-channel pts value.
224    pub fn check_channel_pts(
225        &self,
226        channel_id: i64,
227        new_pts: i32,
228        pts_count: i32,
229    ) -> PtsCheckResult {
230        let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
231        if local == 0 {
232            return PtsCheckResult::Ok;
233        }
234        let expected = local + pts_count;
235        if new_pts == expected {
236            PtsCheckResult::Ok
237        } else if new_pts > expected {
238            PtsCheckResult::Gap {
239                expected,
240                got: new_pts,
241            }
242        } else {
243            PtsCheckResult::Duplicate
244        }
245    }
246
247    /// Advance the global pts.
248    pub fn advance(&mut self, new_pts: i32) {
249        if new_pts > self.pts {
250            self.pts = new_pts;
251        }
252        self.touch();
253    }
254
255    /// Advance the qts.
256    pub fn advance_qts(&mut self, new_qts: i32) {
257        if new_qts > self.qts {
258            self.qts = new_qts;
259        }
260        self.touch();
261    }
262
263    /// Advance seq.
264    pub fn advance_seq(&mut self, new_seq: i32) {
265        if new_seq > self.seq {
266            self.seq = new_seq;
267        }
268    }
269
270    /// Advance a per-channel pts.
271    pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
272        let entry = self.channel_pts.entry(channel_id).or_insert(0);
273        if new_pts > *entry {
274            *entry = new_pts;
275        }
276        self.touch();
277    }
278}
279
/// Outcome of comparing an incoming sequence value with the locally tracked one.
#[derive(Debug, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The value is exactly the expected next one: apply the update.
    Ok,
    /// The value is ahead of `expected`: updates in between are missing.
    Gap { expected: i32, got: i32 },
    /// The value is behind the expected one: the update was already applied.
    Duplicate,
}
286
287// Client methods
288
289impl Client {
290    // Global getDifference
291
292    /// Fetch and replay any updates missed since the persisted pts.
293    ///
294    /// loops on `Difference::Slice` (partial response) until the server
295    /// returns a final `Difference` or `Empty`
296    /// never dropping a partial batch.  Previous code returned after one slice,
297    /// silently losing all updates in subsequent slices.
298    pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
299        // Atomically claim the in-flight slot.
300        //
301        // TOCTOU fix: the old code set getting_global_diff=true inside get_difference
302        // but the external guard in check_and_fill_gap read the flag in a SEPARATE lock
303        // acquisition.  With multi-threaded Tokio, N tasks could all read false, all
304        // pass the external check, then all race into this function and all set the flag
305        // to true.  Each concurrent call resets pts_state to the server state it received;
306        // the last STALE write rolls pts back below where it actually is, which immediately
307        // triggers a new gap → another burst of concurrent getDifference calls → cascade.
308        //
309        // Fix: check-and-set inside a SINGLE lock acquisition.  Only the first caller
310        // proceeds; all others see true and return Ok(vec![]) immediately.
311        {
312            let mut s = self.inner.pts_state.lock().await;
313            if s.getting_global_diff {
314                return Ok(vec![]);
315            }
316            s.getting_global_diff = true;
317            s.getting_global_diff_since = Some(Instant::now());
318        }
319
320        // Drain the initial-gap buffer before the RPC.
321        //
322        // possible_gap.global contains updates buffered during the possible-gap window
323        // (before we decided to call getDiff).  The server response covers exactly
324        // these pts values → discard the snapshot on success.
325        // On RPC error, restore them so the next gap_tick can retry.
326        //
327        // Note: updates arriving DURING the RPC flight are now force-dispatched by
328        // check_and_fill_gap and never accumulate in possible_gap,
329        // so there is nothing extra to drain after the call returns.
330        let pre_diff = self.inner.possible_gap.lock().await.drain_global();
331
332        // Wrap the RPC in a hard 30-second timeout so a hung TCP connection
333        // (half-open socket, unresponsive DC) cannot hold getting_global_diff=true
334        // forever and freeze the bot indefinitely.
335        let result = tokio::time::timeout(
336            std::time::Duration::from_secs(30),
337            self.get_difference_inner(),
338        )
339        .await
340        .unwrap_or_else(|_| {
341            tracing::warn!("[layer] getDifference RPC timed out after 30 s: will retry");
342            Err(InvocationError::Io(std::io::Error::new(
343                std::io::ErrorKind::TimedOut,
344                "getDifference timed out",
345            )))
346        });
347
348        // Always clear the guard, even on error.
349        {
350            let mut s = self.inner.pts_state.lock().await;
351            s.getting_global_diff = false;
352            s.getting_global_diff_since = None;
353        }
354
355        match &result {
356            Ok(_) => {
357                // pre_diff is covered by the server response; discard it.
358                // (Flight-time updates are force-dispatched, not buffered into possible_gap.)
359            }
360            Err(_) => {
361                // Restore pre-existing items so the next gap_tick retry sees them.
362                let mut gap = self.inner.possible_gap.lock().await;
363                for u in pre_diff {
364                    gap.push_global(u);
365                }
366            }
367        }
368
369        result
370    }
371
372    async fn get_difference_inner(&self) -> Result<Vec<update::Update>, InvocationError> {
373        use layer_tl_types::{Cursor, Deserializable};
374
375        let mut all_updates: Vec<update::Update> = Vec::new();
376
377        // loop until the server sends a final (non-Slice) response.
378        loop {
379            let (pts, qts, date) = {
380                let s = self.inner.pts_state.lock().await;
381                (s.pts, s.qts, s.date)
382            };
383
384            if pts == 0 {
385                self.sync_pts_state().await?;
386                return Ok(all_updates);
387            }
388
389            tracing::debug!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
390
391            let req = tl::functions::updates::GetDifference {
392                pts,
393                pts_limit: None,
394                pts_total_limit: None,
395                date,
396                qts,
397                qts_limit: None,
398            };
399
400            let body = self.rpc_call_raw_pub(&req).await?;
401            let body = crate::maybe_gz_decompress(body)?;
402            let mut cur = Cursor::from_slice(&body);
403            let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
404
405            match diff {
406                tl::enums::updates::Difference::Empty(e) => {
407                    let mut s = self.inner.pts_state.lock().await;
408                    s.date = e.date;
409                    s.seq = e.seq;
410                    s.touch();
411                    tracing::debug!("[layer] getDifference: empty (seq={})", e.seq);
412                    return Ok(all_updates);
413                }
414
415                tl::enums::updates::Difference::Difference(d) => {
416                    tracing::debug!(
417                        "[layer] getDifference: {} messages, {} updates (final)",
418                        d.new_messages.len(),
419                        d.other_updates.len()
420                    );
421                    self.cache_users_slice_pub(&d.users).await;
422                    self.cache_chats_slice_pub(&d.chats).await;
423                    for msg in d.new_messages {
424                        all_updates.push(update::Update::NewMessage(
425                            update::IncomingMessage::from_raw(msg).with_client(self.clone()),
426                        ));
427                    }
428                    for upd in d.other_updates {
429                        all_updates.extend(update::from_single_update_pub(upd));
430                    }
431                    let tl::enums::updates::State::State(ns) = d.state;
432                    let saved_channel_pts = {
433                        let s = self.inner.pts_state.lock().await;
434                        s.channel_pts.clone()
435                    };
436                    let mut new_state = PtsState::from_server_state(&ns);
437                    // Preserve per-channel pts across the global reset.
438                    for (cid, cpts) in saved_channel_pts {
439                        new_state.channel_pts.entry(cid).or_insert(cpts);
440                    }
441                    // Preserve in-flight sets: we clear getting_global_diff ourselves.
442                    new_state.getting_global_diff = true; // will be cleared by caller
443                    {
444                        let mut s = self.inner.pts_state.lock().await;
445                        let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
446                        let since = s.getting_global_diff_since; // preserve watchdog timestamp
447                        *s = new_state;
448                        s.getting_diff_for = getting_diff_for;
449                        s.getting_global_diff_since = since;
450                    }
451                    // Final response: stop looping.
452                    return Ok(all_updates);
453                }
454
455                tl::enums::updates::Difference::Slice(d) => {
456                    // server has more data: apply intermediate_state and
457                    // continue looping.  Old code returned here, losing all updates
458                    // in subsequent slices.
459                    tracing::debug!(
460                        "[layer] getDifference slice: {} messages, {} updates: continuing",
461                        d.new_messages.len(),
462                        d.other_updates.len()
463                    );
464                    self.cache_users_slice_pub(&d.users).await;
465                    self.cache_chats_slice_pub(&d.chats).await;
466                    for msg in d.new_messages {
467                        all_updates.push(update::Update::NewMessage(
468                            update::IncomingMessage::from_raw(msg).with_client(self.clone()),
469                        ));
470                    }
471                    for upd in d.other_updates {
472                        all_updates.extend(update::from_single_update_pub(upd));
473                    }
474                    let tl::enums::updates::State::State(ns) = d.intermediate_state;
475                    let saved_channel_pts = {
476                        let s = self.inner.pts_state.lock().await;
477                        s.channel_pts.clone()
478                    };
479                    let mut new_state = PtsState::from_server_state(&ns);
480                    for (cid, cpts) in saved_channel_pts {
481                        new_state.channel_pts.entry(cid).or_insert(cpts);
482                    }
483                    new_state.getting_global_diff = true;
484                    {
485                        let mut s = self.inner.pts_state.lock().await;
486                        let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
487                        let since = s.getting_global_diff_since; // preserve watchdog timestamp
488                        *s = new_state;
489                        s.getting_diff_for = getting_diff_for;
490                        s.getting_global_diff_since = since;
491                    }
492                    // Loop: fetch the next slice.
493                    continue;
494                }
495
496                tl::enums::updates::Difference::TooLong(d) => {
497                    tracing::warn!("[layer] getDifference: TooLong (pts={}): re-syncing", d.pts);
498                    self.inner.pts_state.lock().await.pts = d.pts;
499                    self.sync_pts_state().await?;
500                    return Ok(all_updates);
501                }
502            }
503        }
504    }
505
506    // Per-channel getChannelDifference
507
508    /// Fetch missed updates for a single channel.
509    pub async fn get_channel_difference(
510        &self,
511        channel_id: i64,
512    ) -> Result<Vec<update::Update>, InvocationError> {
513        let local_pts = self
514            .inner
515            .pts_state
516            .lock()
517            .await
518            .channel_pts
519            .get(&channel_id)
520            .copied()
521            .unwrap_or(0);
522
523        let access_hash = self
524            .inner
525            .peer_cache
526            .read()
527            .await
528            .channels
529            .get(&channel_id)
530            .copied()
531            .unwrap_or(0);
532
533        // No access hash in cache → we can't call getChannelDifference.
534        // Attempting GetChannels with access_hash=0 also returns CHANNEL_INVALID,
535        // so skip the call entirely and let the caller handle it.
536        if access_hash == 0 {
537            tracing::debug!(
538                "[layer] channel {channel_id}: access_hash not cached, \
539                 cannot call getChannelDifference: caller will remove from tracking"
540            );
541            return Err(InvocationError::Rpc(RpcError {
542                code: 400,
543                name: "CHANNEL_INVALID".into(),
544                value: None,
545            }));
546        }
547
548        tracing::debug!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
549
550        let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
551            channel_id,
552            access_hash,
553        });
554
555        // tDesktop ramp-up: limit=100 on first call, 1000 on subsequent ones.
556        // Bots always use the server-side maximum (100_000).
557        let diff_limit = if self.inner.is_bot.load(std::sync::atomic::Ordering::Relaxed) {
558            CHANNEL_DIFF_LIMIT_BOT
559        } else {
560            let call_count = self
561                .inner
562                .pts_state
563                .lock()
564                .await
565                .channel_diff_calls
566                .get(&channel_id)
567                .copied()
568                .unwrap_or(0);
569            if call_count == 0 { 100 } else { 1000 }
570        };
571
572        let req = tl::functions::updates::GetChannelDifference {
573            force: false,
574            channel,
575            filter: tl::enums::ChannelMessagesFilter::Empty,
576            pts: local_pts.max(1),
577            limit: diff_limit,
578        };
579
580        let body = match self.rpc_call_raw_pub(&req).await {
581            Ok(b) => {
582                // Successful call bump the per-channel counter so next call uses 1000.
583                self.inner
584                    .pts_state
585                    .lock()
586                    .await
587                    .channel_diff_calls
588                    .entry(channel_id)
589                    .and_modify(|c| *c = c.saturating_add(1))
590                    .or_insert(1);
591                b
592            }
593            Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
594                // treat as empty diff: retry next gap
595                tracing::debug!("[layer] PERSISTENT_TIMESTAMP_OUTDATED: skipping diff");
596                return Ok(vec![]);
597            }
598            Err(e) => return Err(e),
599        };
600        let body = crate::maybe_gz_decompress(body)?;
601        let mut cur = Cursor::from_slice(&body);
602        let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
603
604        let mut updates = Vec::new();
605
606        match diff {
607            tl::enums::updates::ChannelDifference::Empty(e) => {
608                tracing::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
609                self.inner
610                    .pts_state
611                    .lock()
612                    .await
613                    .advance_channel(channel_id, e.pts);
614            }
615            tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
616                tracing::debug!(
617                    "[layer] getChannelDifference: {} messages, {} updates",
618                    d.new_messages.len(),
619                    d.other_updates.len()
620                );
621                self.cache_users_slice_pub(&d.users).await;
622                self.cache_chats_slice_pub(&d.chats).await;
623                for msg in d.new_messages {
624                    updates.push(update::Update::NewMessage(
625                        update::IncomingMessage::from_raw(msg).with_client(self.clone()),
626                    ));
627                }
628                for upd in d.other_updates {
629                    updates.extend(update::from_single_update_pub(upd));
630                }
631                self.inner
632                    .pts_state
633                    .lock()
634                    .await
635                    .advance_channel(channel_id, d.pts);
636            }
637            tl::enums::updates::ChannelDifference::TooLong(d) => {
638                tracing::warn!(
639                    "[layer] getChannelDifference TooLong: replaying messages, resetting pts"
640                );
641                self.cache_users_slice_pub(&d.users).await;
642                self.cache_chats_slice_pub(&d.chats).await;
643                for msg in d.messages {
644                    updates.push(update::Update::NewMessage(
645                        update::IncomingMessage::from_raw(msg).with_client(self.clone()),
646                    ));
647                }
648                self.inner
649                    .pts_state
650                    .lock()
651                    .await
652                    .advance_channel(channel_id, 0);
653            }
654        }
655
656        Ok(updates)
657    }
658
659    // Sync from server
660
661    pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
662        let body = self
663            .rpc_call_raw_pub(&tl::functions::updates::GetState {})
664            .await?;
665        let mut cur = Cursor::from_slice(&body);
666        let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
667        let mut state = self.inner.pts_state.lock().await;
668        state.pts = s.pts;
669        state.qts = s.qts;
670        state.date = s.date;
671        state.seq = s.seq;
672        state.touch();
673        tracing::debug!(
674            "[layer] pts synced: pts={}, qts={}, seq={}",
675            s.pts,
676            s.qts,
677            s.seq
678        );
679        Ok(())
680    }
681    /// Check global pts, buffer during possible-gap window, fetch diff if real gap.
682    ///
683    /// When a global getDifference is already in-flight (`getting_global_diff == true`),
684    /// updates are **force-dispatched** immediately without pts tracking.
685    /// This prevents the cascade freeze that buffering caused:
686    ///   1. getDiff runs; flight-buffered updates pile up in `possible_gap`.
687    ///   2. getDiff returns; `gap_tick` sees `has_global()=true` → another getDiff.
688    ///   3. Each getDiff spawns another → bot freezes under a burst of messages.
689    pub async fn check_and_fill_gap(
690        &self,
691        new_pts: i32,
692        pts_count: i32,
693        upd: Option<update::Update>,
694    ) -> Result<Vec<update::Update>, InvocationError> {
695        // getDiff in flight: force updates through without pts tracking.
696        //
697        // Force-dispatch: socket updates are sent through
698        // when getting_diff_for contains the key; no buffering, no pts check.
699        // Buffering caused a cascade of getDiff calls and a bot freeze under bursts.
700        // Force-dispatch means these may duplicate what getDiff returns (same pts
701        // range), which is acceptable: Telegram's spec explicitly states that socket
702        // updates received during getDiff "should also have been retrieved through
703        // getDifference". Application-layer deduplication by message_id handles doubles.
704        // pts is NOT advanced here; getDiff sets it authoritatively when it returns.
705        if self.inner.pts_state.lock().await.getting_global_diff {
706            tracing::debug!("[layer] global diff in flight: force-applying pts={new_pts}");
707            return Ok(upd.into_iter().collect());
708        }
709
710        let result = self
711            .inner
712            .pts_state
713            .lock()
714            .await
715            .check_pts(new_pts, pts_count);
716        match result {
717            PtsCheckResult::Ok => {
718                // Advance pts and dispatch only this update.
719                //
720                // Do NOT blindly drain possible_gap here.
721                //
722                // Old behaviour: drain all buffered updates and return them together
723                // with the Ok update.  This caused a second freeze:
724                //
725                //   1. pts=1021.  Burst arrives: 1024-1030 all gap → buffered.
726                //   2. Update 1022 arrives → Ok → drain dispatches 1024-1030.
727                //   3. pts advances only to 1022 (the Ok value), NOT to 1030.
728                //   4. Bot sends replies → updateShortSentMessage pts=1031 →
729                //      check_and_fill_gap: expected=1023, got=1031 → GAP.
730                //   5. Cascade getDiff → duplicates → flood-wait → freeze.
731                //
732                // Grammers avoids this by re-checking each buffered update in
733                // order and advancing pts for each one (process_updates inner loop
734                // over entry.possible_gap).  Layer's Update enum carries no pts
735                // metadata, so we cannot replicate that ordered sequence check here.
736                //
737                // Correct equivalent: leave possible_gap alone.  The buffered
738                // updates will be recovered by gap_tick → getDiff(new_pts), which
739                // drains possible_gap into pre_diff, lets the server fill the
740                // gap, and advances pts to the true server state; no stale pts,
741                // no secondary cascade, no duplicates.
742                self.inner.pts_state.lock().await.advance(new_pts);
743                Ok(upd.into_iter().collect())
744            }
745            PtsCheckResult::Gap { expected, got } => {
746                // Buffer the update; start the deadline timer regardless.
747                //
748                // Bug fix (touch_global_timer): when upd=None (e.g. the gap is
749                // triggered by an updateShortSentMessage RPC response), nothing was
750                // ever pushed to possible_gap.global, so global stayed None.
751                // global_deadline_elapsed() returned false forever, gap_tick never
752                // saw has_global()=true, and the gap was never resolved unless a
753                // subsequent user message arrived.  touch_global_timer() starts the
754                // 1-second deadline clock even without a buffered update.
755                {
756                    let mut gap = self.inner.possible_gap.lock().await;
757                    if let Some(u) = upd {
758                        gap.push_global(u);
759                    } else {
760                        gap.touch_global_timer();
761                    }
762                }
763                let deadline_elapsed = self
764                    .inner
765                    .possible_gap
766                    .lock()
767                    .await
768                    .global_deadline_elapsed();
769                if deadline_elapsed {
770                    tracing::warn!(
771                        "[layer] global pts gap: expected {expected}, got {got}: getDifference"
772                    );
773                    // get_difference() is now atomic (check-and-set) and drains the
774                    // possible_gap buffer internally on success, so callers must NOT
775                    // drain before calling or splice the old buffer onto the results.
776                    // Doing so caused every gap update to be dispatched twice, which
777                    // triggered FLOOD_WAIT, blocked the handler, and froze the bot.
778                    self.get_difference().await
779                } else {
780                    tracing::debug!(
781                        "[layer] global pts gap: expected {expected}, got {got}: buffering (possible gap)"
782                    );
783                    Ok(vec![])
784                }
785            }
786            PtsCheckResult::Duplicate => {
787                tracing::debug!("[layer] global pts duplicate, discarding");
788                Ok(vec![])
789            }
790        }
791    }
792
793    /// Check qts (secret chat updates) and fill gap if needed.
794    pub async fn check_and_fill_qts_gap(
795        &self,
796        new_qts: i32,
797        qts_count: i32,
798    ) -> Result<Vec<update::Update>, InvocationError> {
799        let result = self
800            .inner
801            .pts_state
802            .lock()
803            .await
804            .check_qts(new_qts, qts_count);
805        match result {
806            PtsCheckResult::Ok => {
807                self.inner.pts_state.lock().await.advance_qts(new_qts);
808                Ok(vec![])
809            }
810            PtsCheckResult::Gap { expected, got } => {
811                tracing::warn!("[layer] qts gap: expected {expected}, got {got}: getDifference");
812                self.get_difference().await
813            }
814            PtsCheckResult::Duplicate => Ok(vec![]),
815        }
816    }
817
818    /// Check top-level seq and fill gap if needed.
819    pub async fn check_and_fill_seq_gap(
820        &self,
821        new_seq: i32,
822        seq_start: i32,
823    ) -> Result<Vec<update::Update>, InvocationError> {
824        let result = self
825            .inner
826            .pts_state
827            .lock()
828            .await
829            .check_seq(new_seq, seq_start);
830        match result {
831            PtsCheckResult::Ok => {
832                self.inner.pts_state.lock().await.advance_seq(new_seq);
833                Ok(vec![])
834            }
835            PtsCheckResult::Gap { expected, got } => {
836                tracing::warn!("[layer] seq gap: expected {expected}, got {got}: getDifference");
837                self.get_difference().await
838            }
839            PtsCheckResult::Duplicate => Ok(vec![]),
840        }
841    }
842
843    /// Check a per-channel pts, fetch getChannelDifference if there is a gap.
844    pub async fn check_and_fill_channel_gap(
845        &self,
846        channel_id: i64,
847        new_pts: i32,
848        pts_count: i32,
849        upd: Option<update::Update>,
850    ) -> Result<Vec<update::Update>, InvocationError> {
851        // if a diff is already in flight for this channel, skip: prevents
852        // 1 gap from spawning N concurrent getChannelDifference tasks.
853        if self
854            .inner
855            .pts_state
856            .lock()
857            .await
858            .getting_diff_for
859            .contains(&channel_id)
860        {
861            tracing::debug!("[layer] channel {channel_id} diff already in flight, skipping");
862            if let Some(u) = upd {
863                self.inner
864                    .possible_gap
865                    .lock()
866                    .await
867                    .push_channel(channel_id, u);
868            }
869            return Ok(vec![]);
870        }
871
872        let result = self
873            .inner
874            .pts_state
875            .lock()
876            .await
877            .check_channel_pts(channel_id, new_pts, pts_count);
878        match result {
879            PtsCheckResult::Ok => {
880                let mut buffered = self
881                    .inner
882                    .possible_gap
883                    .lock()
884                    .await
885                    .drain_channel(channel_id);
886                self.inner
887                    .pts_state
888                    .lock()
889                    .await
890                    .advance_channel(channel_id, new_pts);
891                if let Some(u) = upd {
892                    buffered.push(u);
893                }
894                Ok(buffered)
895            }
896            PtsCheckResult::Gap { expected, got } => {
897                if let Some(u) = upd {
898                    self.inner
899                        .possible_gap
900                        .lock()
901                        .await
902                        .push_channel(channel_id, u);
903                }
904                let deadline_elapsed = self
905                    .inner
906                    .possible_gap
907                    .lock()
908                    .await
909                    .channel_deadline_elapsed(channel_id);
910                if deadline_elapsed {
911                    tracing::warn!(
912                        "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: getChannelDifference"
913                    );
914                    // mark this channel as having a diff in flight.
915                    self.inner
916                        .pts_state
917                        .lock()
918                        .await
919                        .getting_diff_for
920                        .insert(channel_id);
921                    let buffered = self
922                        .inner
923                        .possible_gap
924                        .lock()
925                        .await
926                        .drain_channel(channel_id);
927                    match self.get_channel_difference(channel_id).await {
928                        Ok(mut diff_updates) => {
929                            // diff complete, allow future gaps to be handled.
930                            self.inner
931                                .pts_state
932                                .lock()
933                                .await
934                                .getting_diff_for
935                                .remove(&channel_id);
936                            diff_updates.splice(0..0, buffered);
937                            Ok(diff_updates)
938                        }
939                        // Permanent access errors: remove the channel from pts tracking
940                        // entirely (. The next update for this
941                        // channel will have local=0 → PtsCheckResult::Ok, advancing pts
942                        // without any gap fill. This breaks the infinite gap→CHANNEL_INVALID
943                        // loop that happened when advance_channel kept the stale pts alive.
944                        //
945                        // Common causes:
946                        // - access_hash not in peer cache (update arrived via updateShort
947                        // which carries no chats list)
948                        // - bot was kicked / channel deleted
949                        Err(InvocationError::Rpc(ref e))
950                            if e.name == "CHANNEL_INVALID"
951                                || e.name == "CHANNEL_PRIVATE"
952                                || e.name == "CHANNEL_NOT_MODIFIED" =>
953                        {
954                            tracing::debug!(
955                                "[layer] channel {channel_id}: {}: removing from pts tracking \
956                                 (next update treated as first-seen, no gap fill)",
957                                e.name
958                            );
959                            {
960                                let mut s = self.inner.pts_state.lock().await;
961                                s.getting_diff_for.remove(&channel_id);
962                                s.channel_pts.remove(&channel_id); // ←  fix: delete, not advance
963                            }
964                            Ok(buffered)
965                        }
966                        Err(InvocationError::Deserialize(ref msg)) => {
967                            // Unrecognised constructor or parse failure: treat same as
968                            // CHANNEL_INVALID: remove from tracking so we don't loop.
969                            tracing::debug!(
970                                "[layer] channel {channel_id}: deserialize error ({msg}): \
971                                 removing from pts tracking"
972                            );
973                            {
974                                let mut s = self.inner.pts_state.lock().await;
975                                s.getting_diff_for.remove(&channel_id);
976                                s.channel_pts.remove(&channel_id);
977                            }
978                            Ok(buffered)
979                        }
980                        Err(e) => {
981                            // also clear on unexpected errors so we don't get stuck.
982                            self.inner
983                                .pts_state
984                                .lock()
985                                .await
986                                .getting_diff_for
987                                .remove(&channel_id);
988                            Err(e)
989                        }
990                    }
991                } else {
992                    tracing::debug!(
993                        "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: buffering"
994                    );
995                    Ok(vec![])
996                }
997            }
998            PtsCheckResult::Duplicate => {
999                tracing::debug!("[layer] channel {channel_id} pts duplicate, discarding");
1000                Ok(vec![])
1001            }
1002        }
1003    }
1004
1005    /// Called periodically (e.g. from keepalive) to fire getDifference
1006    /// if no update has been received for > 15 minutes.
1007    ///
1008    /// also drives per-entry possible-gap deadlines independently of
1009    /// incoming updates.  Previously the POSSIBLE_GAP_DEADLINE_MS window was
1010    /// only evaluated when a new incoming update called check_and_fill_gap
1011    /// meaning a quiet channel with a real gap would never fire getDifference
1012    /// until another update arrived.  This
1013    /// which scans all LiveEntry.effective_deadline() on every keepalive tick.
1014    pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
1015        // Stuck-diff watchdog: if getting_global_diff has been true for more than
1016        // 30 s the in-flight getDifference RPC is assumed hung (e.g. half-open TCP
1017        // that the OS keepalive hasn't killed yet).  Reset the guard so the next
1018        // gap_tick cycle can issue a fresh getDifference.  The 30-second timeout
1019        // in get_difference() will concurrently return an error and also clear the
1020        // flag; this watchdog is a belt-and-suspenders safety net for edge cases
1021        // where that timeout itself is somehow delayed.
1022        {
1023            let stuck = {
1024                let s = self.inner.pts_state.lock().await;
1025                s.getting_global_diff
1026                    && s.getting_global_diff_since
1027                        .map(|t| t.elapsed().as_secs() > 30)
1028                        .unwrap_or(false)
1029            };
1030            if stuck {
1031                tracing::warn!(
1032                    "[layer] getDifference in-flight for >30 s: \
1033                     resetting guard so gap_tick can retry"
1034                );
1035                let mut s = self.inner.pts_state.lock().await;
1036                s.getting_global_diff = false;
1037                s.getting_global_diff_since = None;
1038            }
1039        }
1040
1041        // existing 5-minute global timeout
1042        let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
1043        if exceeded {
1044            tracing::info!("[layer] update deadline exceeded: fetching getDifference");
1045            let updates = self.get_difference().await?;
1046            for u in updates {
1047                if self.inner.update_tx.try_send(u).is_err() {
1048                    tracing::warn!("[layer] update channel full: dropping diff update");
1049                }
1050            }
1051        }
1052
1053        // drive global possible-gap deadline
1054        // If the possible-gap window has expired but no new update has arrived
1055        // to trigger check_and_fill_gap, fire getDifference from here.
1056        {
1057            let gap_expired = self
1058                .inner
1059                .possible_gap
1060                .lock()
1061                .await
1062                .global_deadline_elapsed();
1063            // Note: get_difference() is now atomic (check-and-set), so the
1064            // `already` guard is advisory only; get_difference() will bail
1065            // safely if another call is already in flight.
1066            if gap_expired {
1067                tracing::debug!("[layer] B3 global possible-gap deadline expired: getDifference");
1068                // get_difference() snapshots and drains the pre-existing buffer at its
1069                // start (before the RPC), so updates that arrive DURING the RPC flight
1070                // remain in possible_gap for the next cycle.  Never drain here.
1071                match self.get_difference().await {
1072                    Ok(updates) => {
1073                        for u in updates {
1074                            if self.inner.update_tx.try_send(u).is_err() {
1075                                tracing::warn!("[layer] update channel full: dropping gap update");
1076                            }
1077                        }
1078                    }
1079                    Err(e) => {
1080                        tracing::warn!("[layer] B3 global gap diff failed: {e}");
1081                        return Err(e);
1082                    }
1083                }
1084            }
1085        }
1086
1087        // drive per-channel possible-gap deadlines
1088        // Collect expired channel IDs up-front to avoid holding the lock across awaits.
1089        let expired_channels: Vec<i64> = {
1090            let gap = self.inner.possible_gap.lock().await;
1091            gap.channel
1092                .keys()
1093                .copied()
1094                .filter(|&id| gap.channel_deadline_elapsed(id))
1095                .collect()
1096        };
1097        for channel_id in expired_channels {
1098            let already = self
1099                .inner
1100                .pts_state
1101                .lock()
1102                .await
1103                .getting_diff_for
1104                .contains(&channel_id);
1105            if already {
1106                continue;
1107            }
1108            tracing::debug!(
1109                "[layer] B3 channel {channel_id} possible-gap deadline expired: getChannelDifference"
1110            );
1111            // Mark in-flight before spawning so a racing incoming update can't
1112            // also spawn a diff for the same channel.
1113            self.inner
1114                .pts_state
1115                .lock()
1116                .await
1117                .getting_diff_for
1118                .insert(channel_id);
1119            let buffered = self
1120                .inner
1121                .possible_gap
1122                .lock()
1123                .await
1124                .drain_channel(channel_id);
1125            let c = self.clone();
1126            let utx = self.inner.update_tx.clone();
1127            tokio::spawn(async move {
1128                match c.get_channel_difference(channel_id).await {
1129                    Ok(mut updates) => {
1130                        c.inner
1131                            .pts_state
1132                            .lock()
1133                            .await
1134                            .getting_diff_for
1135                            .remove(&channel_id);
1136                        updates.splice(0..0, buffered);
1137                        for u in updates {
1138                            if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1139                                tracing::warn!(
1140                                    "[layer] update channel full: dropping ch gap update"
1141                                );
1142                            }
1143                        }
1144                    }
1145                    Err(e) => {
1146                        tracing::warn!("[layer] B3 channel {channel_id} gap diff failed: {e}");
1147                        c.inner
1148                            .pts_state
1149                            .lock()
1150                            .await
1151                            .getting_diff_for
1152                            .remove(&channel_id);
1153                    }
1154                }
1155            });
1156        }
1157
1158        Ok(())
1159    }
1160}