Skip to main content

layer_client/
pts.rs

1//! Update gap detection and recovery.
2//!
3//! Tracks `pts` / `qts` / `seq` / `date` plus per-channel pts, and
4//! fills gaps via `updates.getDifference` (global) and
5//! `updates.getChannelDifference` (per-channel).
6//!
7//! ## What "gap" means
8//! Telegram guarantees updates arrive in order within a pts counter.
//! If `new_pts > local_pts + pts_count` there is a gap and we must
//! ask the server for the missed updates before processing this one;
//! a smaller value means the update was already applied (a duplicate).
11
12use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, RpcError, attach_client_to_update, update};
19
/// How long a possible-gap window may stay open before the pts jump is
/// declared a real gap (ms).
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;

/// `limit` sent with `updates.getChannelDifference` when the client is a bot.
/// Bots are allowed a much larger diff window (Telegram server-side limit).
const CHANNEL_DIFF_LIMIT_BOT: i32 = 100_000;
25
/// Buffers updates received during a possible-gap window so we don't fire
/// getDifference on every slightly out-of-order update.
///
/// Every buffer is stored together with the `Instant` at which it was
/// created; the `*_deadline_elapsed` methods compare that instant against
/// `POSSIBLE_GAP_DEADLINE_MS`.
#[derive(Default)]
pub struct PossibleGapBuffer {
    /// channel_id → (buffered_updates, window_start)
    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
    /// Global buffered updates (non-channel pts gaps) and their window start.
    /// `None` means no global window is currently open.
    global: Option<(Vec<update::Update>, Instant)>,
}
35
36impl PossibleGapBuffer {
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Buffer a global update during a possible-gap window.
42    pub fn push_global(&mut self, upd: update::Update) {
43        let entry = self
44            .global
45            .get_or_insert_with(|| (Vec::new(), Instant::now()));
46        entry.0.push(upd);
47    }
48
49    /// Buffer a channel update during a possible-gap window.
50    pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
51        let entry = self
52            .channel
53            .entry(channel_id)
54            .or_insert_with(|| (Vec::new(), Instant::now()));
55        entry.0.push(upd);
56    }
57
58    /// True if the global possible-gap deadline has elapsed.
59    pub fn global_deadline_elapsed(&self) -> bool {
60        self.global
61            .as_ref()
62            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
63            .unwrap_or(false)
64    }
65
66    /// True if a channel's possible-gap deadline has elapsed.
67    pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
68        self.channel
69            .get(&channel_id)
70            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
71            .unwrap_or(false)
72    }
73
74    /// True if the global buffer has any pending updates.
75    pub fn has_global(&self) -> bool {
76        self.global.is_some()
77    }
78
79    /// True if a channel buffer has pending updates.
80    pub fn has_channel(&self, channel_id: i64) -> bool {
81        self.channel.contains_key(&channel_id)
82    }
83
84    /// Start the global deadline timer without buffering an update.
85    ///
86    /// Called when a gap is detected but the triggering update carries no
87    /// high-level `Update` value (e.g. `updateShortSentMessage` with `upd=None`).
88    /// Without this, `global` stays `None` → `global_deadline_elapsed()` always
89    /// returns `false` → `gap_tick` never fires getDifference for such gaps.
90    pub fn touch_global_timer(&mut self) {
91        self.global
92            .get_or_insert_with(|| (Vec::new(), Instant::now()));
93    }
94
95    /// Drain global buffered updates.
96    pub fn drain_global(&mut self) -> Vec<update::Update> {
97        self.global.take().map(|(v, _)| v).unwrap_or_default()
98    }
99
100    /// Drain channel buffered updates.
101    pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
102        self.channel
103            .remove(&channel_id)
104            .map(|(v, _)| v)
105            .unwrap_or_default()
106    }
107}
108
109// PtsState
110
/// Full MTProto sequence-number state, including per-channel counters.
///
/// All fields are `pub` so that `connect()` can restore them from the
/// persisted session without going through an artificial constructor.
#[derive(Debug, Clone, Default)]
pub struct PtsState {
    /// Main pts counter (messages, non-channel updates).
    pub pts: i32,
    /// Secondary counter for secret-chat updates.
    pub qts: i32,
    /// Date of the last received update (Unix timestamp).
    pub date: i32,
    /// Combined-container sequence number.
    /// `0` is treated as "uninitialised" by `check_seq`, which then accepts
    /// any incoming value.
    pub seq: i32,
    /// Per-channel pts counters.  `channel_id → pts`.
    /// A missing entry or a value of `0` means the channel is not tracked
    /// yet; `check_channel_pts` accepts any update for such a channel.
    pub channel_pts: HashMap<i64, i32>,
    /// How many times getChannelDifference has been called per channel.
    /// tDesktop starts at limit=100, then raises to 1000 after the first
    /// successful response.  We track call count to implement the same ramp-up.
    pub channel_diff_calls: HashMap<i64, u32>,
    /// Timestamp of last received update for deadline-based gap detection.
    /// `None` until the first `touch()`; `deadline_exceeded` is then `false`.
    pub last_update_at: Option<Instant>,
    /// Channels currently awaiting a getChannelDifference response.
    /// If a channel is in this set, no new gap-fill task is spawned for it.
    pub getting_diff_for: HashSet<i64>,
    /// Guard against concurrent global getDifference calls.
    /// Without this, two simultaneous gap detections both spawn get_difference(),
    /// which double-processes updates and corrupts pts state.
    pub getting_global_diff: bool,
    /// When getting_global_diff was set to true.  Used by the stuck-diff watchdog
    /// in check_update_deadline: if the flag has been set for >30 s the RPC is
    /// assumed hung and the guard is reset so the next gap_tick can retry.
    pub getting_global_diff_since: Option<Instant>,
}
145
146impl PtsState {
147    pub fn from_server_state(s: &tl::types::updates::State) -> Self {
148        Self {
149            pts: s.pts,
150            qts: s.qts,
151            date: s.date,
152            seq: s.seq,
153            channel_pts: HashMap::new(),
154            channel_diff_calls: HashMap::new(),
155            last_update_at: Some(Instant::now()),
156            getting_diff_for: HashSet::new(),
157            getting_global_diff: false,
158            getting_global_diff_since: None,
159        }
160    }
161
162    /// Record that an update was received now (resets the deadline timer).
163    pub fn touch(&mut self) {
164        self.last_update_at = Some(Instant::now());
165    }
166
167    /// Returns true if no update has been received for > 15 minutes.
168    pub fn deadline_exceeded(&self) -> bool {
169        self.last_update_at
170            .as_ref()
171            .map(|t| t.elapsed().as_secs() > 15 * 60)
172            .unwrap_or(false)
173    }
174
175    /// Check whether `new_pts` is in order given `pts_count` new updates.
176    pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
177        let expected = self.pts + pts_count;
178        if new_pts == expected {
179            PtsCheckResult::Ok
180        } else if new_pts > expected {
181            PtsCheckResult::Gap {
182                expected,
183                got: new_pts,
184            }
185        } else {
186            PtsCheckResult::Duplicate
187        }
188    }
189
190    /// Check a qts value (secret chat updates).
191    pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
192        let expected = self.qts + qts_count;
193        if new_qts == expected {
194            PtsCheckResult::Ok
195        } else if new_qts > expected {
196            PtsCheckResult::Gap {
197                expected,
198                got: new_qts,
199            }
200        } else {
201            PtsCheckResult::Duplicate
202        }
203    }
204
205    /// Check top-level seq for UpdatesCombined containers.
206    pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
207        if self.seq == 0 {
208            return PtsCheckResult::Ok;
209        } // uninitialised: accept
210        let expected = self.seq + 1;
211        if seq_start == expected {
212            PtsCheckResult::Ok
213        } else if seq_start > expected {
214            PtsCheckResult::Gap {
215                expected,
216                got: seq_start,
217            }
218        } else {
219            PtsCheckResult::Duplicate
220        }
221    }
222
223    /// Check a per-channel pts value.
224    pub fn check_channel_pts(
225        &self,
226        channel_id: i64,
227        new_pts: i32,
228        pts_count: i32,
229    ) -> PtsCheckResult {
230        let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
231        if local == 0 {
232            return PtsCheckResult::Ok;
233        }
234        let expected = local + pts_count;
235        if new_pts == expected {
236            PtsCheckResult::Ok
237        } else if new_pts > expected {
238            PtsCheckResult::Gap {
239                expected,
240                got: new_pts,
241            }
242        } else {
243            PtsCheckResult::Duplicate
244        }
245    }
246
247    /// Advance the global pts.
248    pub fn advance(&mut self, new_pts: i32) {
249        if new_pts > self.pts {
250            self.pts = new_pts;
251        }
252        self.touch();
253    }
254
255    /// Advance the qts.
256    pub fn advance_qts(&mut self, new_qts: i32) {
257        if new_qts > self.qts {
258            self.qts = new_qts;
259        }
260        self.touch();
261    }
262
263    /// Advance seq.
264    pub fn advance_seq(&mut self, new_seq: i32) {
265        if new_seq > self.seq {
266            self.seq = new_seq;
267        }
268    }
269
270    /// Advance a per-channel pts.
271    pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
272        let entry = self.channel_pts.entry(channel_id).or_insert(0);
273        if new_pts > *entry {
274            *entry = new_pts;
275        }
276        self.touch();
277    }
278}
279
/// Outcome of comparing an incoming counter value with the locally expected one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The value is exactly the expected one; the update can be applied.
    Ok,
    /// The value is ahead of the expected one: updates in between are missing.
    Gap {
        /// Value the local state expected next.
        expected: i32,
        /// Value actually carried by the update.
        got: i32,
    },
    /// The value is behind the expected one: the update was already applied.
    Duplicate,
}
286
287// Client methods
288
289impl Client {
290    // Global getDifference
291
292    /// Fetch and replay any updates missed since the persisted pts.
293    ///
294    /// loops on `Difference::Slice` (partial response) until the server
295    /// returns a final `Difference` or `Empty`
296    /// never dropping a partial batch.  Previous code returned after one slice,
297    /// silently losing all updates in subsequent slices.
298    pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
299        // Atomically claim the in-flight slot.
300        //
301        // TOCTOU fix: the old code set getting_global_diff=true inside get_difference
302        // but the external guard in check_and_fill_gap read the flag in a SEPARATE lock
303        // acquisition.  With multi-threaded Tokio, N tasks could all read false, all
304        // pass the external check, then all race into this function and all set the flag
305        // to true.  Each concurrent call resets pts_state to the server state it received;
306        // the last STALE write rolls pts back below where it actually is, which immediately
307        // triggers a new gap → another burst of concurrent getDifference calls → cascade.
308        //
309        // Fix: check-and-set inside a SINGLE lock acquisition.  Only the first caller
310        // proceeds; all others see true and return Ok(vec![]) immediately.
311        {
312            let mut s = self.inner.pts_state.lock().await;
313            if s.getting_global_diff {
314                return Ok(vec![]);
315            }
316            s.getting_global_diff = true;
317            s.getting_global_diff_since = Some(Instant::now());
318        }
319
320        // Drain the initial-gap buffer before the RPC.
321        //
322        // possible_gap.global contains updates buffered during the possible-gap window
323        // (before we decided to call getDiff).  The server response covers exactly
324        // these pts values → discard the snapshot on success.
325        // On RPC error, restore them so the next gap_tick can retry.
326        //
327        // Note: updates arriving DURING the RPC flight are now force-dispatched by
328        // check_and_fill_gap and never accumulate in possible_gap,
329        // so there is nothing extra to drain after the call returns.
330        let pre_diff = self.inner.possible_gap.lock().await.drain_global();
331
332        // Wrap the RPC in a hard 30-second timeout so a hung TCP connection
333        // (half-open socket, unresponsive DC) cannot hold getting_global_diff=true
334        // forever and freeze the bot indefinitely.
335        let result = tokio::time::timeout(
336            std::time::Duration::from_secs(30),
337            self.get_difference_inner(),
338        )
339        .await
340        .unwrap_or_else(|_| {
341            tracing::warn!("[layer] getDifference RPC timed out after 30 s: will retry");
342            Err(InvocationError::Io(std::io::Error::new(
343                std::io::ErrorKind::TimedOut,
344                "getDifference timed out",
345            )))
346        });
347
348        // Always clear the guard, even on error.
349        {
350            let mut s = self.inner.pts_state.lock().await;
351            s.getting_global_diff = false;
352            s.getting_global_diff_since = None;
353        }
354
355        match &result {
356            Ok(_) => {
357                // pre_diff is covered by the server response; discard it.
358                // (Flight-time updates are force-dispatched, not buffered into possible_gap.)
359            }
360            Err(e) => {
361                let mut gap = self.inner.possible_gap.lock().await;
362                if matches!(e, InvocationError::Rpc(r) if r.code == 401) {
363                    // 401: do not restore the gap buffer. push_global inserts a
364                    // fresh Instant::now() timestamp, making global_deadline_elapsed()
365                    // return true immediately on the next gap_tick cycle, which fires
366                    // get_difference() again and loops. Drop it; reconnect will
367                    // re-sync state via sync_pts_state / getDifference.
368                    drop(pre_diff);
369                } else {
370                    // Transient error: restore so the next gap_tick can retry.
371                    for u in pre_diff {
372                        gap.push_global(u);
373                    }
374                }
375            }
376        }
377
378        result
379    }
380
381    async fn get_difference_inner(&self) -> Result<Vec<update::Update>, InvocationError> {
382        use layer_tl_types::{Cursor, Deserializable};
383
384        let mut all_updates: Vec<update::Update> = Vec::new();
385
386        // loop until the server sends a final (non-Slice) response.
387        loop {
388            let (pts, qts, date) = {
389                let s = self.inner.pts_state.lock().await;
390                (s.pts, s.qts, s.date)
391            };
392
393            if pts == 0 {
394                self.sync_pts_state().await?;
395                return Ok(all_updates);
396            }
397
398            tracing::debug!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
399
400            let req = tl::functions::updates::GetDifference {
401                pts,
402                pts_limit: None,
403                pts_total_limit: None,
404                date,
405                qts,
406                qts_limit: None,
407            };
408
409            let body = self.rpc_call_raw_pub(&req).await?;
410            let body = crate::maybe_gz_decompress(body)?;
411            let mut cur = Cursor::from_slice(&body);
412            let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
413
414            match diff {
415                tl::enums::updates::Difference::Empty(e) => {
416                    let mut s = self.inner.pts_state.lock().await;
417                    s.date = e.date;
418                    s.seq = e.seq;
419                    s.touch();
420                    tracing::debug!("[layer] getDifference: empty (seq={})", e.seq);
421                    return Ok(all_updates);
422                }
423
424                tl::enums::updates::Difference::Difference(d) => {
425                    tracing::debug!(
426                        "[layer] getDifference: {} messages, {} updates (final)",
427                        d.new_messages.len(),
428                        d.other_updates.len()
429                    );
430                    self.cache_users_slice_pub(&d.users).await;
431                    self.cache_chats_slice_pub(&d.chats).await;
432                    for msg in d.new_messages {
433                        all_updates.push(update::Update::NewMessage(
434                            update::IncomingMessage::from_raw(msg).with_client(self.clone()),
435                        ));
436                    }
437                    for upd in d.other_updates {
438                        all_updates.extend(update::from_single_update_pub(upd));
439                    }
440                    let tl::enums::updates::State::State(ns) = d.state;
441                    let saved_channel_pts = {
442                        let s = self.inner.pts_state.lock().await;
443                        s.channel_pts.clone()
444                    };
445                    let mut new_state = PtsState::from_server_state(&ns);
446                    // Preserve per-channel pts across the global reset.
447                    for (cid, cpts) in saved_channel_pts {
448                        new_state.channel_pts.entry(cid).or_insert(cpts);
449                    }
450                    // Preserve in-flight sets: we clear getting_global_diff ourselves.
451                    new_state.getting_global_diff = true; // will be cleared by caller
452                    {
453                        let mut s = self.inner.pts_state.lock().await;
454                        let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
455                        let since = s.getting_global_diff_since; // preserve watchdog timestamp
456                        *s = new_state;
457                        s.getting_diff_for = getting_diff_for;
458                        s.getting_global_diff_since = since;
459                    }
460                    // Final response: stop looping.
461                    return Ok(all_updates);
462                }
463
464                tl::enums::updates::Difference::Slice(d) => {
465                    // server has more data: apply intermediate_state and
466                    // continue looping.  Old code returned here, losing all updates
467                    // in subsequent slices.
468                    tracing::debug!(
469                        "[layer] getDifference slice: {} messages, {} updates: continuing",
470                        d.new_messages.len(),
471                        d.other_updates.len()
472                    );
473                    self.cache_users_slice_pub(&d.users).await;
474                    self.cache_chats_slice_pub(&d.chats).await;
475                    for msg in d.new_messages {
476                        all_updates.push(update::Update::NewMessage(
477                            update::IncomingMessage::from_raw(msg).with_client(self.clone()),
478                        ));
479                    }
480                    for upd in d.other_updates {
481                        all_updates.extend(update::from_single_update_pub(upd));
482                    }
483                    let tl::enums::updates::State::State(ns) = d.intermediate_state;
484                    let saved_channel_pts = {
485                        let s = self.inner.pts_state.lock().await;
486                        s.channel_pts.clone()
487                    };
488                    let mut new_state = PtsState::from_server_state(&ns);
489                    for (cid, cpts) in saved_channel_pts {
490                        new_state.channel_pts.entry(cid).or_insert(cpts);
491                    }
492                    new_state.getting_global_diff = true;
493                    {
494                        let mut s = self.inner.pts_state.lock().await;
495                        let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
496                        let since = s.getting_global_diff_since; // preserve watchdog timestamp
497                        *s = new_state;
498                        s.getting_diff_for = getting_diff_for;
499                        s.getting_global_diff_since = since;
500                    }
501                    // Loop: fetch the next slice.
502                    continue;
503                }
504
505                tl::enums::updates::Difference::TooLong(d) => {
506                    tracing::warn!("[layer] getDifference: TooLong (pts={}): re-syncing", d.pts);
507                    self.inner.pts_state.lock().await.pts = d.pts;
508                    self.sync_pts_state().await?;
509                    return Ok(all_updates);
510                }
511            }
512        }
513    }
514
515    // Per-channel getChannelDifference
516
517    /// Fetch missed updates for a single channel.
518    pub async fn get_channel_difference(
519        &self,
520        channel_id: i64,
521    ) -> Result<Vec<update::Update>, InvocationError> {
522        let local_pts = self
523            .inner
524            .pts_state
525            .lock()
526            .await
527            .channel_pts
528            .get(&channel_id)
529            .copied()
530            .unwrap_or(0);
531
532        let access_hash = self
533            .inner
534            .peer_cache
535            .read()
536            .await
537            .channels
538            .get(&channel_id)
539            .copied()
540            .unwrap_or(0);
541
542        // No access hash in cache → we can't call getChannelDifference.
543        // Attempting GetChannels with access_hash=0 also returns CHANNEL_INVALID,
544        // so skip the call entirely and let the caller handle it.
545        if access_hash == 0 {
546            tracing::debug!(
547                "[layer] channel {channel_id}: access_hash not cached, \
548                 cannot call getChannelDifference: caller will remove from tracking"
549            );
550            return Err(InvocationError::Rpc(RpcError {
551                code: 400,
552                name: "CHANNEL_INVALID".into(),
553                value: None,
554            }));
555        }
556
557        tracing::debug!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
558
559        let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
560            channel_id,
561            access_hash,
562        });
563
564        // tDesktop ramp-up: limit=100 on first call, 1000 on subsequent ones.
565        // Bots always use the server-side maximum (100_000).
566        let diff_limit = if self.inner.is_bot.load(std::sync::atomic::Ordering::Relaxed) {
567            CHANNEL_DIFF_LIMIT_BOT
568        } else {
569            let call_count = self
570                .inner
571                .pts_state
572                .lock()
573                .await
574                .channel_diff_calls
575                .get(&channel_id)
576                .copied()
577                .unwrap_or(0);
578            if call_count == 0 { 100 } else { 1000 }
579        };
580
581        let req = tl::functions::updates::GetChannelDifference {
582            force: false,
583            channel,
584            filter: tl::enums::ChannelMessagesFilter::Empty,
585            pts: local_pts.max(1),
586            limit: diff_limit,
587        };
588
589        let body = match self.rpc_call_raw_pub(&req).await {
590            Ok(b) => {
591                // Successful call bump the per-channel counter so next call uses 1000.
592                self.inner
593                    .pts_state
594                    .lock()
595                    .await
596                    .channel_diff_calls
597                    .entry(channel_id)
598                    .and_modify(|c| *c = c.saturating_add(1))
599                    .or_insert(1);
600                b
601            }
602            Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
603                // treat as empty diff: retry next gap
604                tracing::debug!("[layer] PERSISTENT_TIMESTAMP_OUTDATED: skipping diff");
605                return Ok(vec![]);
606            }
607            Err(e) => return Err(e),
608        };
609        let body = crate::maybe_gz_decompress(body)?;
610        let mut cur = Cursor::from_slice(&body);
611        let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
612
613        let mut updates = Vec::new();
614
615        match diff {
616            tl::enums::updates::ChannelDifference::Empty(e) => {
617                tracing::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
618                self.inner
619                    .pts_state
620                    .lock()
621                    .await
622                    .advance_channel(channel_id, e.pts);
623            }
624            tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
625                tracing::debug!(
626                    "[layer] getChannelDifference: {} messages, {} updates",
627                    d.new_messages.len(),
628                    d.other_updates.len()
629                );
630                self.cache_users_slice_pub(&d.users).await;
631                self.cache_chats_slice_pub(&d.chats).await;
632                for msg in d.new_messages {
633                    updates.push(update::Update::NewMessage(
634                        update::IncomingMessage::from_raw(msg).with_client(self.clone()),
635                    ));
636                }
637                for upd in d.other_updates {
638                    updates.extend(update::from_single_update_pub(upd));
639                }
640                self.inner
641                    .pts_state
642                    .lock()
643                    .await
644                    .advance_channel(channel_id, d.pts);
645            }
646            tl::enums::updates::ChannelDifference::TooLong(d) => {
647                tracing::warn!(
648                    "[layer] getChannelDifference TooLong: replaying messages, resetting pts"
649                );
650                self.cache_users_slice_pub(&d.users).await;
651                self.cache_chats_slice_pub(&d.chats).await;
652                for msg in d.messages {
653                    updates.push(update::Update::NewMessage(
654                        update::IncomingMessage::from_raw(msg).with_client(self.clone()),
655                    ));
656                }
657                self.inner
658                    .pts_state
659                    .lock()
660                    .await
661                    .advance_channel(channel_id, 0);
662            }
663        }
664
665        Ok(updates)
666    }
667
668    // Sync from server
669
670    pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
671        let body = self
672            .rpc_call_raw_pub(&tl::functions::updates::GetState {})
673            .await?;
674        let mut cur = Cursor::from_slice(&body);
675        let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
676        let mut state = self.inner.pts_state.lock().await;
677        state.pts = s.pts;
678        state.qts = s.qts;
679        state.date = s.date;
680        state.seq = s.seq;
681        state.touch();
682        tracing::debug!(
683            "[layer] pts synced: pts={}, qts={}, seq={}",
684            s.pts,
685            s.qts,
686            s.seq
687        );
688        Ok(())
689    }
690    /// Check global pts, buffer during possible-gap window, fetch diff if real gap.
691    ///
692    /// When a global getDifference is already in-flight (`getting_global_diff == true`),
693    /// updates are **force-dispatched** immediately without pts tracking.
694    /// This prevents the cascade freeze that buffering caused:
695    ///   1. getDiff runs; flight-buffered updates pile up in `possible_gap`.
696    ///   2. getDiff returns; `gap_tick` sees `has_global()=true` → another getDiff.
697    ///   3. Each getDiff spawns another → bot freezes under a burst of messages.
698    pub async fn check_and_fill_gap(
699        &self,
700        new_pts: i32,
701        pts_count: i32,
702        upd: Option<update::Update>,
703    ) -> Result<Vec<update::Update>, InvocationError> {
704        // getDiff in flight: force updates through without pts tracking.
705        //
706        // Force-dispatch: socket updates are sent through
707        // when getting_diff_for contains the key; no buffering, no pts check.
708        // Buffering caused a cascade of getDiff calls and a bot freeze under bursts.
709        // Force-dispatch means these may duplicate what getDiff returns (same pts
710        // range), which is acceptable: Telegram's spec explicitly states that socket
711        // updates received during getDiff "should also have been retrieved through
712        // getDifference". Application-layer deduplication by message_id handles doubles.
713        // pts is NOT advanced here; getDiff sets it authoritatively when it returns.
714        if self.inner.pts_state.lock().await.getting_global_diff {
715            tracing::debug!("[layer] global diff in flight: force-applying pts={new_pts}");
716            return Ok(upd.into_iter().collect());
717        }
718
719        let result = self
720            .inner
721            .pts_state
722            .lock()
723            .await
724            .check_pts(new_pts, pts_count);
725        match result {
726            PtsCheckResult::Ok => {
727                // Advance pts and dispatch only this update.
728                //
729                // Do NOT blindly drain possible_gap here.
730                //
731                // Old behaviour: drain all buffered updates and return them together
732                // with the Ok update.  This caused a second freeze:
733                //
734                //   1. pts=1021.  Burst arrives: 1024-1030 all gap → buffered.
735                //   2. Update 1022 arrives → Ok → drain dispatches 1024-1030.
736                //   3. pts advances only to 1022 (the Ok value), NOT to 1030.
737                //   4. Bot sends replies → updateShortSentMessage pts=1031 →
738                //      check_and_fill_gap: expected=1023, got=1031 → GAP.
739                //   5. Cascade getDiff → duplicates → flood-wait → freeze.
740                //
741                // Grammers avoids this by re-checking each buffered update in
742                // order and advancing pts for each one (process_updates inner loop
743                // over entry.possible_gap).  Layer's Update enum carries no pts
744                // metadata, so we cannot replicate that ordered sequence check here.
745                //
746                // Correct equivalent: leave possible_gap alone.  The buffered
747                // updates will be recovered by gap_tick → getDiff(new_pts), which
748                // drains possible_gap into pre_diff, lets the server fill the
749                // gap, and advances pts to the true server state; no stale pts,
750                // no secondary cascade, no duplicates.
751                self.inner.pts_state.lock().await.advance(new_pts);
752                Ok(upd.into_iter().collect())
753            }
754            PtsCheckResult::Gap { expected, got } => {
755                // Buffer the update; start the deadline timer regardless.
756                //
757                // Bug fix (touch_global_timer): when upd=None (e.g. the gap is
758                // triggered by an updateShortSentMessage RPC response), nothing was
759                // ever pushed to possible_gap.global, so global stayed None.
760                // global_deadline_elapsed() returned false forever, gap_tick never
761                // saw has_global()=true, and the gap was never resolved unless a
762                // subsequent user message arrived.  touch_global_timer() starts the
763                // 1-second deadline clock even without a buffered update.
764                {
765                    let mut gap = self.inner.possible_gap.lock().await;
766                    if let Some(u) = upd {
767                        gap.push_global(u);
768                    } else {
769                        gap.touch_global_timer();
770                    }
771                }
772                let deadline_elapsed = self
773                    .inner
774                    .possible_gap
775                    .lock()
776                    .await
777                    .global_deadline_elapsed();
778                if deadline_elapsed {
779                    tracing::warn!(
780                        "[layer] global pts gap: expected {expected}, got {got}: getDifference"
781                    );
782                    // get_difference() is now atomic (check-and-set) and drains the
783                    // possible_gap buffer internally on success, so callers must NOT
784                    // drain before calling or splice the old buffer onto the results.
785                    // Doing so caused every gap update to be dispatched twice, which
786                    // triggered FLOOD_WAIT, blocked the handler, and froze the bot.
787                    self.get_difference().await
788                } else {
789                    tracing::debug!(
790                        "[layer] global pts gap: expected {expected}, got {got}: buffering (possible gap)"
791                    );
792                    Ok(vec![])
793                }
794            }
795            PtsCheckResult::Duplicate => {
796                tracing::debug!("[layer] global pts duplicate, discarding");
797                Ok(vec![])
798            }
799        }
800    }
801
802    /// Check qts (secret chat updates) and fill gap if needed.
803    pub async fn check_and_fill_qts_gap(
804        &self,
805        new_qts: i32,
806        qts_count: i32,
807    ) -> Result<Vec<update::Update>, InvocationError> {
808        let result = self
809            .inner
810            .pts_state
811            .lock()
812            .await
813            .check_qts(new_qts, qts_count);
814        match result {
815            PtsCheckResult::Ok => {
816                self.inner.pts_state.lock().await.advance_qts(new_qts);
817                Ok(vec![])
818            }
819            PtsCheckResult::Gap { expected, got } => {
820                tracing::warn!("[layer] qts gap: expected {expected}, got {got}: getDifference");
821                self.get_difference().await
822            }
823            PtsCheckResult::Duplicate => Ok(vec![]),
824        }
825    }
826
827    /// Check top-level seq and fill gap if needed.
828    pub async fn check_and_fill_seq_gap(
829        &self,
830        new_seq: i32,
831        seq_start: i32,
832    ) -> Result<Vec<update::Update>, InvocationError> {
833        let result = self
834            .inner
835            .pts_state
836            .lock()
837            .await
838            .check_seq(new_seq, seq_start);
839        match result {
840            PtsCheckResult::Ok => {
841                self.inner.pts_state.lock().await.advance_seq(new_seq);
842                Ok(vec![])
843            }
844            PtsCheckResult::Gap { expected, got } => {
845                tracing::warn!("[layer] seq gap: expected {expected}, got {got}: getDifference");
846                self.get_difference().await
847            }
848            PtsCheckResult::Duplicate => Ok(vec![]),
849        }
850    }
851
852    /// Check a per-channel pts, fetch getChannelDifference if there is a gap.
853    pub async fn check_and_fill_channel_gap(
854        &self,
855        channel_id: i64,
856        new_pts: i32,
857        pts_count: i32,
858        upd: Option<update::Update>,
859    ) -> Result<Vec<update::Update>, InvocationError> {
860        // if a diff is already in flight for this channel, skip: prevents
861        // 1 gap from spawning N concurrent getChannelDifference tasks.
862        if self
863            .inner
864            .pts_state
865            .lock()
866            .await
867            .getting_diff_for
868            .contains(&channel_id)
869        {
870            tracing::debug!("[layer] channel {channel_id} diff already in flight, skipping");
871            if let Some(u) = upd {
872                self.inner
873                    .possible_gap
874                    .lock()
875                    .await
876                    .push_channel(channel_id, u);
877            }
878            return Ok(vec![]);
879        }
880
881        let result = self
882            .inner
883            .pts_state
884            .lock()
885            .await
886            .check_channel_pts(channel_id, new_pts, pts_count);
887        match result {
888            PtsCheckResult::Ok => {
889                let mut buffered = self
890                    .inner
891                    .possible_gap
892                    .lock()
893                    .await
894                    .drain_channel(channel_id);
895                self.inner
896                    .pts_state
897                    .lock()
898                    .await
899                    .advance_channel(channel_id, new_pts);
900                if let Some(u) = upd {
901                    buffered.push(u);
902                }
903                Ok(buffered)
904            }
905            PtsCheckResult::Gap { expected, got } => {
906                if let Some(u) = upd {
907                    self.inner
908                        .possible_gap
909                        .lock()
910                        .await
911                        .push_channel(channel_id, u);
912                }
913                let deadline_elapsed = self
914                    .inner
915                    .possible_gap
916                    .lock()
917                    .await
918                    .channel_deadline_elapsed(channel_id);
919                if deadline_elapsed {
920                    tracing::warn!(
921                        "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: getChannelDifference"
922                    );
923                    // mark this channel as having a diff in flight.
924                    self.inner
925                        .pts_state
926                        .lock()
927                        .await
928                        .getting_diff_for
929                        .insert(channel_id);
930                    let buffered = self
931                        .inner
932                        .possible_gap
933                        .lock()
934                        .await
935                        .drain_channel(channel_id);
936                    match self.get_channel_difference(channel_id).await {
937                        Ok(mut diff_updates) => {
938                            // diff complete, allow future gaps to be handled.
939                            self.inner
940                                .pts_state
941                                .lock()
942                                .await
943                                .getting_diff_for
944                                .remove(&channel_id);
945                            diff_updates.splice(0..0, buffered);
946                            Ok(diff_updates)
947                        }
948                        // Permanent access errors: remove the channel from pts tracking
949                        // entirely (. The next update for this
950                        // channel will have local=0 → PtsCheckResult::Ok, advancing pts
951                        // without any gap fill. This breaks the infinite gap→CHANNEL_INVALID
952                        // loop that happened when advance_channel kept the stale pts alive.
953                        //
954                        // Common causes:
955                        // - access_hash not in peer cache (update arrived via updateShort
956                        // which carries no chats list)
957                        // - bot was kicked / channel deleted
958                        Err(InvocationError::Rpc(ref e))
959                            if e.name == "CHANNEL_INVALID"
960                                || e.name == "CHANNEL_PRIVATE"
961                                || e.name == "CHANNEL_NOT_MODIFIED" =>
962                        {
963                            tracing::debug!(
964                                "[layer] channel {channel_id}: {}: removing from pts tracking \
965                                 (next update treated as first-seen, no gap fill)",
966                                e.name
967                            );
968                            {
969                                let mut s = self.inner.pts_state.lock().await;
970                                s.getting_diff_for.remove(&channel_id);
971                                s.channel_pts.remove(&channel_id); // ←  fix: delete, not advance
972                            }
973                            Ok(buffered)
974                        }
975                        Err(InvocationError::Deserialize(ref msg)) => {
976                            // Unrecognised constructor or parse failure: treat same as
977                            // CHANNEL_INVALID: remove from tracking so we don't loop.
978                            tracing::debug!(
979                                "[layer] channel {channel_id}: deserialize error ({msg}): \
980                                 removing from pts tracking"
981                            );
982                            {
983                                let mut s = self.inner.pts_state.lock().await;
984                                s.getting_diff_for.remove(&channel_id);
985                                s.channel_pts.remove(&channel_id);
986                            }
987                            Ok(buffered)
988                        }
989                        Err(e) => {
990                            // also clear on unexpected errors so we don't get stuck.
991                            self.inner
992                                .pts_state
993                                .lock()
994                                .await
995                                .getting_diff_for
996                                .remove(&channel_id);
997                            Err(e)
998                        }
999                    }
1000                } else {
1001                    tracing::debug!(
1002                        "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: buffering"
1003                    );
1004                    Ok(vec![])
1005                }
1006            }
1007            PtsCheckResult::Duplicate => {
1008                tracing::debug!("[layer] channel {channel_id} pts duplicate, discarding");
1009                Ok(vec![])
1010            }
1011        }
1012    }
1013
1014    /// Called periodically (e.g. from keepalive) to fire getDifference
1015    /// if no update has been received for > 15 minutes.
1016    ///
1017    /// also drives per-entry possible-gap deadlines independently of
1018    /// incoming updates.  Previously the POSSIBLE_GAP_DEADLINE_MS window was
1019    /// only evaluated when a new incoming update called check_and_fill_gap
1020    /// meaning a quiet channel with a real gap would never fire getDifference
1021    /// until another update arrived.  This
1022    /// which scans all LiveEntry.effective_deadline() on every keepalive tick.
1023    pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
1024        // Stuck-diff watchdog: if getting_global_diff has been true for more than
1025        // 30 s the in-flight getDifference RPC is assumed hung (e.g. half-open TCP
1026        // that the OS keepalive hasn't killed yet).  Reset the guard so the next
1027        // gap_tick cycle can issue a fresh getDifference.  The 30-second timeout
1028        // in get_difference() will concurrently return an error and also clear the
1029        // flag; this watchdog is a belt-and-suspenders safety net for edge cases
1030        // where that timeout itself is somehow delayed.
1031        {
1032            let stuck = {
1033                let s = self.inner.pts_state.lock().await;
1034                s.getting_global_diff
1035                    && s.getting_global_diff_since
1036                        .map(|t| t.elapsed().as_secs() > 30)
1037                        .unwrap_or(false)
1038            };
1039            if stuck {
1040                tracing::warn!(
1041                    "[layer] getDifference in-flight for >30 s: \
1042                     resetting guard so gap_tick can retry"
1043                );
1044                let mut s = self.inner.pts_state.lock().await;
1045                s.getting_global_diff = false;
1046                s.getting_global_diff_since = None;
1047            }
1048        }
1049
1050        // existing 5-minute global timeout
1051        let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
1052        if exceeded {
1053            tracing::info!("[layer] update deadline exceeded: fetching getDifference");
1054            match self.get_difference().await {
1055                Ok(updates) => {
1056                    for u in updates {
1057                        if self.inner.update_tx.try_send(u).is_err() {
1058                            tracing::warn!("[layer] update channel full: dropping diff update");
1059                        }
1060                    }
1061                }
1062                Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1063                    // 401: gap buffer already cleared inside get_difference().
1064                    // gap_tick will not re-fire. Supervisor handles reconnect.
1065                    tracing::warn!(
1066                        "[layer] deadline getDifference AUTH_KEY_UNREGISTERED: session dead"
1067                    );
1068                }
1069                Err(e) => return Err(e),
1070            }
1071        }
1072
1073        // drive global possible-gap deadline
1074        // If the possible-gap window has expired but no new update has arrived
1075        // to trigger check_and_fill_gap, fire getDifference from here.
1076        {
1077            let gap_expired = self
1078                .inner
1079                .possible_gap
1080                .lock()
1081                .await
1082                .global_deadline_elapsed();
1083            // Note: get_difference() is now atomic (check-and-set), so the
1084            // `already` guard is advisory only; get_difference() will bail
1085            // safely if another call is already in flight.
1086            if gap_expired {
1087                tracing::debug!("[layer] B3 global possible-gap deadline expired: getDifference");
1088                // get_difference() snapshots and drains the pre-existing buffer at its
1089                // start (before the RPC), so updates that arrive DURING the RPC flight
1090                // remain in possible_gap for the next cycle.  Never drain here.
1091                match self.get_difference().await {
1092                    Ok(updates) => {
1093                        for u in updates {
1094                            if self.inner.update_tx.try_send(u).is_err() {
1095                                tracing::warn!("[layer] update channel full: dropping gap update");
1096                            }
1097                        }
1098                    }
1099                    Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1100                        // 401: get_difference() cleared the gap buffer, so gap_tick
1101                        // will not re-fire. Supervisor handles reconnect.
1102                        tracing::warn!(
1103                            "[layer] B3 global gap diff AUTH_KEY_UNREGISTERED: \
1104                             session dead, gap buffer cleared"
1105                        );
1106                    }
1107                    Err(e) => {
1108                        tracing::warn!("[layer] B3 global gap diff failed: {e}");
1109                        return Err(e);
1110                    }
1111                }
1112            }
1113        }
1114
1115        // drive per-channel possible-gap deadlines
1116        // Collect expired channel IDs up-front to avoid holding the lock across awaits.
1117        let expired_channels: Vec<i64> = {
1118            let gap = self.inner.possible_gap.lock().await;
1119            gap.channel
1120                .keys()
1121                .copied()
1122                .filter(|&id| gap.channel_deadline_elapsed(id))
1123                .collect()
1124        };
1125        for channel_id in expired_channels {
1126            let already = self
1127                .inner
1128                .pts_state
1129                .lock()
1130                .await
1131                .getting_diff_for
1132                .contains(&channel_id);
1133            if already {
1134                continue;
1135            }
1136            tracing::debug!(
1137                "[layer] B3 channel {channel_id} possible-gap deadline expired: getChannelDifference"
1138            );
1139            // Mark in-flight before spawning so a racing incoming update can't
1140            // also spawn a diff for the same channel.
1141            self.inner
1142                .pts_state
1143                .lock()
1144                .await
1145                .getting_diff_for
1146                .insert(channel_id);
1147            let buffered = self
1148                .inner
1149                .possible_gap
1150                .lock()
1151                .await
1152                .drain_channel(channel_id);
1153            let c = self.clone();
1154            let utx = self.inner.update_tx.clone();
1155            tokio::spawn(async move {
1156                match c.get_channel_difference(channel_id).await {
1157                    Ok(mut updates) => {
1158                        c.inner
1159                            .pts_state
1160                            .lock()
1161                            .await
1162                            .getting_diff_for
1163                            .remove(&channel_id);
1164                        updates.splice(0..0, buffered);
1165                        for u in updates {
1166                            if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1167                                tracing::warn!(
1168                                    "[layer] update channel full: dropping ch gap update"
1169                                );
1170                            }
1171                        }
1172                    }
1173                    Err(e) => {
1174                        c.inner
1175                            .pts_state
1176                            .lock()
1177                            .await
1178                            .getting_diff_for
1179                            .remove(&channel_id);
1180                        // Permanent errors (CHANNEL_INVALID etc.): updates are
1181                        // unrecoverable, drop them. Transient errors: restore
1182                        // the buffer so the next B3 cycle can retry.
1183                        let permanent = matches!(&e,
1184                            InvocationError::Rpc(r)
1185                                if r.code == 401
1186                                    || r.name == "CHANNEL_INVALID"
1187                                    || r.name == "CHANNEL_PRIVATE"
1188                                    || r.name == "CHANNEL_NOT_MODIFIED"
1189                        ) || matches!(&e, InvocationError::Deserialize(_));
1190                        if permanent {
1191                            tracing::warn!(
1192                                "[layer] B3 channel {channel_id} gap diff failed (permanent): {e}"
1193                            );
1194                        } else {
1195                            tracing::warn!(
1196                                "[layer] B3 channel {channel_id} gap diff failed (transient): {e}"
1197                            );
1198                            let mut gap = c.inner.possible_gap.lock().await;
1199                            for u in buffered {
1200                                gap.push_channel(channel_id, u);
1201                            }
1202                        }
1203                    }
1204                }
1205            });
1206        }
1207
1208        Ok(())
1209    }
1210}