Skip to main content

layer_client/
pts.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// NOTE:
5// The "Layer" project is no longer maintained or supported.
6// Its original purpose for personal SDK/APK experimentation and learning
7// has been fulfilled.
8//
9// Please use Ferogram instead:
10// https://github.com/ankit-chaubey/ferogram
11// Ferogram will receive future updates and development, although progress
12// may be slower.
13//
14// Ferogram is an async Telegram MTProto client library written in Rust.
15// Its implementation follows the behaviour of the official Telegram clients,
16// particularly Telegram Desktop and TDLib, and aims to provide a clean and
17// modern async interface for building Telegram clients and tools.
18
19//! Update gap detection and recovery.
20//!
21//! Tracks `pts` / `qts` / `seq` / `date` plus per-channel pts, and
22//! fills gaps via `updates.getDifference` (global) and
23//! `updates.getChannelDifference` (per-channel).
24//!
25//! ## What "gap" means
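//! Every pts-carrying update reports the counter value after it is applied
//! (`new_pts`) and how many events it covers (`pts_count`). Updates may arrive
//! slightly out of order. With `expected = local_pts + pts_count`:
//! - `new_pts == expected`: the update is in order and can be applied.
//! - `new_pts < expected`: the update was already applied (a duplicate).
//! - `new_pts > expected`: there is a gap. After a short possible-gap window,
//!   we must ask the server for the missed updates before processing this one.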
29
30use std::collections::{HashMap, HashSet};
31use std::time::Instant;
32
33use layer_tl_types as tl;
34use layer_tl_types::{Cursor, Deserializable};
35
36use crate::{Client, InvocationError, RpcError, attach_client_to_update, update};
37
/// Grace period, in milliseconds, during which an out-of-order pts is held
/// back before it is treated as a genuine gap.
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1000;

/// `limit` sent with getChannelDifference for bot accounts, which are granted
/// a much larger diff window than users (Telegram server-side limit).
const CHANNEL_DIFF_LIMIT_BOT: i32 = 100_000;
43
44/// Buffers updates received during a possible-gap window so we don't fire
45/// getDifference on every slightly out-of-order update.
46#[derive(Default)]
47pub struct PossibleGapBuffer {
48    /// channel_id → (buffered_updates, window_start)
49    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
50    /// Global buffered updates (non-channel pts gaps)
51    global: Option<(Vec<update::Update>, Instant)>,
52}
53
54impl PossibleGapBuffer {
55    pub fn new() -> Self {
56        Self::default()
57    }
58
59    /// Buffer a global update during a possible-gap window.
60    pub fn push_global(&mut self, upd: update::Update) {
61        let entry = self
62            .global
63            .get_or_insert_with(|| (Vec::new(), Instant::now()));
64        entry.0.push(upd);
65    }
66
67    /// Buffer a channel update during a possible-gap window.
68    pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
69        let entry = self
70            .channel
71            .entry(channel_id)
72            .or_insert_with(|| (Vec::new(), Instant::now()));
73        entry.0.push(upd);
74    }
75
76    /// True if the global possible-gap deadline has elapsed.
77    pub fn global_deadline_elapsed(&self) -> bool {
78        self.global
79            .as_ref()
80            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
81            .unwrap_or(false)
82    }
83
84    /// True if a channel's possible-gap deadline has elapsed.
85    pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
86        self.channel
87            .get(&channel_id)
88            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
89            .unwrap_or(false)
90    }
91
92    /// True if the global buffer has any pending updates.
93    pub fn has_global(&self) -> bool {
94        self.global.is_some()
95    }
96
97    /// True if a channel buffer has pending updates.
98    pub fn has_channel(&self, channel_id: i64) -> bool {
99        self.channel.contains_key(&channel_id)
100    }
101
102    /// Start the global deadline timer without buffering an update.
103    ///
104    /// Called when a gap is detected but the triggering update carries no
105    /// high-level `Update` value (e.g. `updateShortSentMessage` with `upd=None`).
106    /// Without this, `global` stays `None` → `global_deadline_elapsed()` always
107    /// returns `false` → `gap_tick` never fires getDifference for such gaps.
108    pub fn touch_global_timer(&mut self) {
109        self.global
110            .get_or_insert_with(|| (Vec::new(), Instant::now()));
111    }
112
113    /// Drain global buffered updates.
114    pub fn drain_global(&mut self) -> Vec<update::Update> {
115        self.global.take().map(|(v, _)| v).unwrap_or_default()
116    }
117
118    /// Drain channel buffered updates.
119    pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
120        self.channel
121            .remove(&channel_id)
122            .map(|(v, _)| v)
123            .unwrap_or_default()
124    }
125}
126
127// PtsState
128
129/// Full MTProto sequence-number state, including per-channel counters.
130///
131/// All fields are `pub` so that `connect()` can restore them from the
132/// persisted session without going through an artificial constructor.
133#[derive(Debug, Clone, Default)]
134pub struct PtsState {
135    /// Main pts counter (messages, non-channel updates).
136    pub pts: i32,
137    /// Secondary counter for secret-chat updates.
138    pub qts: i32,
139    /// Date of the last received update (Unix timestamp).
140    pub date: i32,
141    /// Combined-container sequence number.
142    pub seq: i32,
143    /// Per-channel pts counters.  `channel_id → pts`.
144    pub channel_pts: HashMap<i64, i32>,
145    /// How many times getChannelDifference has been called per channel.
146    /// tDesktop starts at limit=100, then raises to 1000 after the first
147    /// successful response.  We track call count to implement the same ramp-up.
148    pub channel_diff_calls: HashMap<i64, u32>,
149    /// Timestamp of last received update for deadline-based gap detection.
150    pub last_update_at: Option<Instant>,
151    /// Channels currently awaiting a getChannelDifference response.
152    /// If a channel is in this set, no new gap-fill task is spawned for it.
153    pub getting_diff_for: HashSet<i64>,
154    /// Guard against concurrent global getDifference calls.
155    /// Without this, two simultaneous gap detections both spawn get_difference(),
156    /// which double-processes updates and corrupts pts state.
157    pub getting_global_diff: bool,
158    /// When getting_global_diff was set to true.  Used by the stuck-diff watchdog
159    /// in check_update_deadline: if the flag has been set for >30 s the RPC is
160    /// assumed hung and the guard is reset so the next gap_tick can retry.
161    pub getting_global_diff_since: Option<Instant>,
162}
163
164impl PtsState {
165    pub fn from_server_state(s: &tl::types::updates::State) -> Self {
166        Self {
167            pts: s.pts,
168            qts: s.qts,
169            date: s.date,
170            seq: s.seq,
171            channel_pts: HashMap::new(),
172            channel_diff_calls: HashMap::new(),
173            last_update_at: Some(Instant::now()),
174            getting_diff_for: HashSet::new(),
175            getting_global_diff: false,
176            getting_global_diff_since: None,
177        }
178    }
179
180    /// Record that an update was received now (resets the deadline timer).
181    pub fn touch(&mut self) {
182        self.last_update_at = Some(Instant::now());
183    }
184
185    /// Returns true if no update has been received for > 15 minutes.
186    pub fn deadline_exceeded(&self) -> bool {
187        self.last_update_at
188            .as_ref()
189            .map(|t| t.elapsed().as_secs() > 15 * 60)
190            .unwrap_or(false)
191    }
192
193    /// Check whether `new_pts` is in order given `pts_count` new updates.
194    pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
195        let expected = self.pts + pts_count;
196        if new_pts == expected {
197            PtsCheckResult::Ok
198        } else if new_pts > expected {
199            PtsCheckResult::Gap {
200                expected,
201                got: new_pts,
202            }
203        } else {
204            PtsCheckResult::Duplicate
205        }
206    }
207
208    /// Check a qts value (secret chat updates).
209    pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
210        let expected = self.qts + qts_count;
211        if new_qts == expected {
212            PtsCheckResult::Ok
213        } else if new_qts > expected {
214            PtsCheckResult::Gap {
215                expected,
216                got: new_qts,
217            }
218        } else {
219            PtsCheckResult::Duplicate
220        }
221    }
222
223    /// Check top-level seq for UpdatesCombined containers.
224    pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
225        if self.seq == 0 {
226            return PtsCheckResult::Ok;
227        } // uninitialised: accept
228        let expected = self.seq + 1;
229        if seq_start == expected {
230            PtsCheckResult::Ok
231        } else if seq_start > expected {
232            PtsCheckResult::Gap {
233                expected,
234                got: seq_start,
235            }
236        } else {
237            PtsCheckResult::Duplicate
238        }
239    }
240
241    /// Check a per-channel pts value.
242    pub fn check_channel_pts(
243        &self,
244        channel_id: i64,
245        new_pts: i32,
246        pts_count: i32,
247    ) -> PtsCheckResult {
248        let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
249        if local == 0 {
250            return PtsCheckResult::Ok;
251        }
252        let expected = local + pts_count;
253        if new_pts == expected {
254            PtsCheckResult::Ok
255        } else if new_pts > expected {
256            PtsCheckResult::Gap {
257                expected,
258                got: new_pts,
259            }
260        } else {
261            PtsCheckResult::Duplicate
262        }
263    }
264
265    /// Advance the global pts.
266    pub fn advance(&mut self, new_pts: i32) {
267        if new_pts > self.pts {
268            self.pts = new_pts;
269        }
270        self.touch();
271    }
272
273    /// Advance the qts.
274    pub fn advance_qts(&mut self, new_qts: i32) {
275        if new_qts > self.qts {
276            self.qts = new_qts;
277        }
278        self.touch();
279    }
280
281    /// Advance seq.
282    pub fn advance_seq(&mut self, new_seq: i32) {
283        if new_seq > self.seq {
284            self.seq = new_seq;
285        }
286    }
287
288    /// Advance a per-channel pts.
289    pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
290        let entry = self.channel_pts.entry(channel_id).or_insert(0);
291        if new_pts > *entry {
292            *entry = new_pts;
293        }
294        self.touch();
295    }
296}
297
/// Outcome of comparing an incoming sequence counter (global pts, qts, seq,
/// or a channel pts) with the value expected from local state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The counter equals the expected next value: the update is in order.
    Ok,
    /// The counter is ahead of the expected value: something in between is
    /// missing and must be fetched before this update is applied.
    Gap { expected: i32, got: i32 },
    /// The counter is behind the expected value: the update appears to have
    /// been applied already.
    Duplicate,
}
304
305// Client methods
306
307impl Client {
308    // Global getDifference
309
310    /// Fetch and replay any updates missed since the persisted pts.
311    ///
312    /// loops on `Difference::Slice` (partial response) until the server
313    /// returns a final `Difference` or `Empty`
314    /// never dropping a partial batch.  Previous code returned after one slice,
315    /// silently losing all updates in subsequent slices.
316    pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
317        // Atomically claim the in-flight slot.
318        //
319        // TOCTOU fix: the old code set getting_global_diff=true inside get_difference
320        // but the external guard in check_and_fill_gap read the flag in a SEPARATE lock
321        // acquisition.  With multi-threaded Tokio, N tasks could all read false, all
322        // pass the external check, then all race into this function and all set the flag
323        // to true.  Each concurrent call resets pts_state to the server state it received;
324        // the last STALE write rolls pts back below where it actually is, which immediately
325        // triggers a new gap → another burst of concurrent getDifference calls → cascade.
326        //
327        // Fix: check-and-set inside a SINGLE lock acquisition.  Only the first caller
328        // proceeds; all others see true and return Ok(vec![]) immediately.
329        {
330            let mut s = self.inner.pts_state.lock().await;
331            if s.getting_global_diff {
332                return Ok(vec![]);
333            }
334            s.getting_global_diff = true;
335            s.getting_global_diff_since = Some(Instant::now());
336        }
337
338        // Drain the initial-gap buffer before the RPC.
339        //
340        // possible_gap.global contains updates buffered during the possible-gap window
341        // (before we decided to call getDiff).  The server response covers exactly
342        // these pts values → discard the snapshot on success.
343        // On RPC error, restore them so the next gap_tick can retry.
344        //
345        // Note: updates arriving DURING the RPC flight are now force-dispatched by
346        // check_and_fill_gap and never accumulate in possible_gap,
347        // so there is nothing extra to drain after the call returns.
348        let pre_diff = self.inner.possible_gap.lock().await.drain_global();
349
350        // Wrap the RPC in a hard 30-second timeout so a hung TCP connection
351        // (half-open socket, unresponsive DC) cannot hold getting_global_diff=true
352        // forever and freeze the bot indefinitely.
353        let result = tokio::time::timeout(
354            std::time::Duration::from_secs(30),
355            self.get_difference_inner(),
356        )
357        .await
358        .unwrap_or_else(|_| {
359            tracing::warn!("[layer] getDifference RPC timed out after 30 s: will retry");
360            Err(InvocationError::Io(std::io::Error::new(
361                std::io::ErrorKind::TimedOut,
362                "getDifference timed out",
363            )))
364        });
365
366        // Always clear the guard, even on error.
367        {
368            let mut s = self.inner.pts_state.lock().await;
369            s.getting_global_diff = false;
370            s.getting_global_diff_since = None;
371        }
372
373        match &result {
374            Ok(_) => {
375                // pre_diff is covered by the server response; discard it.
376                // (Flight-time updates are force-dispatched, not buffered into possible_gap.)
377            }
378            Err(e) => {
379                let mut gap = self.inner.possible_gap.lock().await;
380                if matches!(e, InvocationError::Rpc(r) if r.code == 401) {
381                    // 401: do not restore the gap buffer. push_global inserts a
382                    // fresh Instant::now() timestamp, making global_deadline_elapsed()
383                    // return true immediately on the next gap_tick cycle, which fires
384                    // get_difference() again and loops. Drop it; reconnect will
385                    // re-sync state via sync_pts_state / getDifference.
386                    drop(pre_diff);
387                } else {
388                    // Transient error: restore so the next gap_tick can retry.
389                    for u in pre_diff {
390                        gap.push_global(u);
391                    }
392                }
393            }
394        }
395
396        result
397    }
398
399    async fn get_difference_inner(&self) -> Result<Vec<update::Update>, InvocationError> {
400        use layer_tl_types::{Cursor, Deserializable};
401
402        let mut all_updates: Vec<update::Update> = Vec::new();
403
404        // loop until the server sends a final (non-Slice) response.
405        loop {
406            let (pts, qts, date) = {
407                let s = self.inner.pts_state.lock().await;
408                (s.pts, s.qts, s.date)
409            };
410
411            if pts == 0 {
412                self.sync_pts_state().await?;
413                return Ok(all_updates);
414            }
415
416            tracing::debug!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
417
418            let req = tl::functions::updates::GetDifference {
419                pts,
420                pts_limit: None,
421                pts_total_limit: None,
422                date,
423                qts,
424                qts_limit: None,
425            };
426
427            let body = self.rpc_call_raw_pub(&req).await?;
428            let body = crate::maybe_gz_decompress(body)?;
429            let mut cur = Cursor::from_slice(&body);
430            // tDesktop mirrors this: if the response body has an unknown constructor
431            // (e.g. a service frame misrouted here during a race), handler.done()
432            // returns false and the request is silently retired.  We do the same:
433            // log at debug level and return whatever updates accumulated so far.
434            let diff = match tl::enums::updates::Difference::deserialize(&mut cur) {
435                Ok(d) => d,
436                Err(e) => {
437                    let cid = if body.len() >= 4 {
438                        u32::from_le_bytes(body[..4].try_into().unwrap())
439                    } else {
440                        0
441                    };
442                    tracing::debug!(
443                        "[layer] getDifference: unrecognised response constructor \
444                         {cid:#010x}, dropping ({})",
445                        e
446                    );
447                    return Ok(all_updates);
448                }
449            };
450
451            match diff {
452                tl::enums::updates::Difference::Empty(e) => {
453                    let mut s = self.inner.pts_state.lock().await;
454                    s.date = e.date;
455                    s.seq = e.seq;
456                    s.touch();
457                    tracing::debug!("[layer] getDifference: empty (seq={})", e.seq);
458                    return Ok(all_updates);
459                }
460
461                tl::enums::updates::Difference::Difference(d) => {
462                    tracing::debug!(
463                        "[layer] getDifference: {} messages, {} updates (final)",
464                        d.new_messages.len(),
465                        d.other_updates.len()
466                    );
467                    self.cache_users_slice_pub(&d.users).await;
468                    self.cache_chats_slice_pub(&d.chats).await;
469                    for msg in d.new_messages {
470                        all_updates.push(update::Update::NewMessage(
471                            update::IncomingMessage::from_raw(msg).with_client(self.clone()),
472                        ));
473                    }
474                    for upd in d.other_updates {
475                        all_updates.extend(update::from_single_update_pub(upd));
476                    }
477                    let tl::enums::updates::State::State(ns) = d.state;
478                    let saved_channel_pts = {
479                        let s = self.inner.pts_state.lock().await;
480                        s.channel_pts.clone()
481                    };
482                    let mut new_state = PtsState::from_server_state(&ns);
483                    // Preserve per-channel pts across the global reset.
484                    for (cid, cpts) in saved_channel_pts {
485                        new_state.channel_pts.entry(cid).or_insert(cpts);
486                    }
487                    // Preserve in-flight sets: we clear getting_global_diff ourselves.
488                    new_state.getting_global_diff = true; // will be cleared by caller
489                    {
490                        let mut s = self.inner.pts_state.lock().await;
491                        let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
492                        let since = s.getting_global_diff_since; // preserve watchdog timestamp
493                        *s = new_state;
494                        s.getting_diff_for = getting_diff_for;
495                        s.getting_global_diff_since = since;
496                    }
497                    // Final response: stop looping.
498                    return Ok(all_updates);
499                }
500
501                tl::enums::updates::Difference::Slice(d) => {
502                    // server has more data: apply intermediate_state and
503                    // continue looping.  Old code returned here, losing all updates
504                    // in subsequent slices.
505                    tracing::debug!(
506                        "[layer] getDifference slice: {} messages, {} updates: continuing",
507                        d.new_messages.len(),
508                        d.other_updates.len()
509                    );
510                    self.cache_users_slice_pub(&d.users).await;
511                    self.cache_chats_slice_pub(&d.chats).await;
512                    for msg in d.new_messages {
513                        all_updates.push(update::Update::NewMessage(
514                            update::IncomingMessage::from_raw(msg).with_client(self.clone()),
515                        ));
516                    }
517                    for upd in d.other_updates {
518                        all_updates.extend(update::from_single_update_pub(upd));
519                    }
520                    let tl::enums::updates::State::State(ns) = d.intermediate_state;
521                    let saved_channel_pts = {
522                        let s = self.inner.pts_state.lock().await;
523                        s.channel_pts.clone()
524                    };
525                    let mut new_state = PtsState::from_server_state(&ns);
526                    for (cid, cpts) in saved_channel_pts {
527                        new_state.channel_pts.entry(cid).or_insert(cpts);
528                    }
529                    new_state.getting_global_diff = true;
530                    {
531                        let mut s = self.inner.pts_state.lock().await;
532                        let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
533                        let since = s.getting_global_diff_since; // preserve watchdog timestamp
534                        *s = new_state;
535                        s.getting_diff_for = getting_diff_for;
536                        s.getting_global_diff_since = since;
537                    }
538                    // Loop: fetch the next slice.
539                    continue;
540                }
541
542                tl::enums::updates::Difference::TooLong(d) => {
543                    tracing::warn!("[layer] getDifference: TooLong (pts={}): re-syncing", d.pts);
544                    self.inner.pts_state.lock().await.pts = d.pts;
545                    self.sync_pts_state().await?;
546                    return Ok(all_updates);
547                }
548            }
549        }
550    }
551
552    // Per-channel getChannelDifference
553
554    /// Fetch missed updates for a single channel.
555    pub async fn get_channel_difference(
556        &self,
557        channel_id: i64,
558    ) -> Result<Vec<update::Update>, InvocationError> {
559        let local_pts = self
560            .inner
561            .pts_state
562            .lock()
563            .await
564            .channel_pts
565            .get(&channel_id)
566            .copied()
567            .unwrap_or(0);
568
569        let access_hash = self
570            .inner
571            .peer_cache
572            .read()
573            .await
574            .channels
575            .get(&channel_id)
576            .copied()
577            .unwrap_or(0);
578
579        // No access hash in cache → we can't call getChannelDifference.
580        // Attempting GetChannels with access_hash=0 also returns CHANNEL_INVALID,
581        // so skip the call entirely and let the caller handle it.
582        if access_hash == 0 {
583            tracing::debug!(
584                "[layer] channel {channel_id}: access_hash not cached, \
585                 cannot call getChannelDifference: caller will remove from tracking"
586            );
587            return Err(InvocationError::Rpc(RpcError {
588                code: 400,
589                name: "CHANNEL_INVALID".into(),
590                value: None,
591            }));
592        }
593
594        tracing::debug!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
595
596        let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
597            channel_id,
598            access_hash,
599        });
600
601        // tDesktop ramp-up: limit=100 on first call, 1000 on subsequent ones.
602        // Bots always use the server-side maximum (100_000).
603        let diff_limit = if self.inner.is_bot.load(std::sync::atomic::Ordering::Relaxed) {
604            CHANNEL_DIFF_LIMIT_BOT
605        } else {
606            let call_count = self
607                .inner
608                .pts_state
609                .lock()
610                .await
611                .channel_diff_calls
612                .get(&channel_id)
613                .copied()
614                .unwrap_or(0);
615            if call_count == 0 { 100 } else { 1000 }
616        };
617
618        let req = tl::functions::updates::GetChannelDifference {
619            force: false,
620            channel,
621            filter: tl::enums::ChannelMessagesFilter::Empty,
622            pts: local_pts.max(1),
623            limit: diff_limit,
624        };
625
626        let body = match self.rpc_call_raw_pub(&req).await {
627            Ok(b) => {
628                // Successful call bump the per-channel counter so next call uses 1000.
629                self.inner
630                    .pts_state
631                    .lock()
632                    .await
633                    .channel_diff_calls
634                    .entry(channel_id)
635                    .and_modify(|c| *c = c.saturating_add(1))
636                    .or_insert(1);
637                b
638            }
639            Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
640                // treat as empty diff: retry next gap
641                tracing::debug!("[layer] PERSISTENT_TIMESTAMP_OUTDATED: skipping diff");
642                return Ok(vec![]);
643            }
644            Err(e) => return Err(e),
645        };
646        let body = crate::maybe_gz_decompress(body)?;
647        let mut cur = Cursor::from_slice(&body);
648        let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
649
650        let mut updates = Vec::new();
651
652        match diff {
653            tl::enums::updates::ChannelDifference::Empty(e) => {
654                tracing::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
655                self.inner
656                    .pts_state
657                    .lock()
658                    .await
659                    .advance_channel(channel_id, e.pts);
660            }
661            tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
662                tracing::debug!(
663                    "[layer] getChannelDifference: {} messages, {} updates",
664                    d.new_messages.len(),
665                    d.other_updates.len()
666                );
667                self.cache_users_slice_pub(&d.users).await;
668                self.cache_chats_slice_pub(&d.chats).await;
669                for msg in d.new_messages {
670                    updates.push(update::Update::NewMessage(
671                        update::IncomingMessage::from_raw(msg).with_client(self.clone()),
672                    ));
673                }
674                for upd in d.other_updates {
675                    updates.extend(update::from_single_update_pub(upd));
676                }
677                self.inner
678                    .pts_state
679                    .lock()
680                    .await
681                    .advance_channel(channel_id, d.pts);
682            }
683            tl::enums::updates::ChannelDifference::TooLong(d) => {
684                tracing::warn!(
685                    "[layer] getChannelDifference TooLong: replaying messages, resetting pts"
686                );
687                self.cache_users_slice_pub(&d.users).await;
688                self.cache_chats_slice_pub(&d.chats).await;
689                for msg in d.messages {
690                    updates.push(update::Update::NewMessage(
691                        update::IncomingMessage::from_raw(msg).with_client(self.clone()),
692                    ));
693                }
694                self.inner
695                    .pts_state
696                    .lock()
697                    .await
698                    .advance_channel(channel_id, 0);
699            }
700        }
701
702        Ok(updates)
703    }
704
705    // Sync from server
706
707    pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
708        let body = self
709            .rpc_call_raw_pub(&tl::functions::updates::GetState {})
710            .await?;
711        let mut cur = Cursor::from_slice(&body);
712        // tDesktop: if the response body has an unknown constructor (e.g. a
713        // misrouted service frame), handler.done() returns false → differenceFail()
714        // → log + retry timer.  We mirror that: treat an unrecognised constructor
715        // as a soft failure and return Ok(()) so the caller can retry later.
716        let s = match tl::enums::updates::State::deserialize(&mut cur) {
717            Ok(tl::enums::updates::State::State(s)) => s,
718            Err(e) => {
719                let cid = if body.len() >= 4 {
720                    u32::from_le_bytes(body[..4].try_into().unwrap())
721                } else {
722                    0
723                };
724                tracing::debug!(
725                    "[layer] GetState: unrecognised response constructor \
726                     {cid:#010x}, treating as soft failure and retrying ({})",
727                    e
728                );
729                return Ok(());
730            }
731        };
732        let mut state = self.inner.pts_state.lock().await;
733        state.pts = s.pts;
734        state.qts = s.qts;
735        state.date = s.date;
736        state.seq = s.seq;
737        state.touch();
738        tracing::debug!(
739            "[layer] pts synced: pts={}, qts={}, seq={}",
740            s.pts,
741            s.qts,
742            s.seq
743        );
744        Ok(())
745    }
746    /// Check global pts, buffer during possible-gap window, fetch diff if real gap.
747    ///
748    /// When a global getDifference is already in-flight (`getting_global_diff == true`),
749    /// updates are **force-dispatched** immediately without pts tracking.
750    /// This prevents the cascade freeze that buffering caused:
751    ///   1. getDiff runs; flight-buffered updates pile up in `possible_gap`.
752    ///   2. getDiff returns; `gap_tick` sees `has_global()=true` → another getDiff.
753    ///   3. Each getDiff spawns another → bot freezes under a burst of messages.
754    pub async fn check_and_fill_gap(
755        &self,
756        new_pts: i32,
757        pts_count: i32,
758        upd: Option<update::Update>,
759    ) -> Result<Vec<update::Update>, InvocationError> {
760        // getDiff in flight: force updates through without pts tracking.
761        //
762        // Force-dispatch: socket updates are sent through
763        // when getting_diff_for contains the key; no buffering, no pts check.
764        // Buffering caused a cascade of getDiff calls and a bot freeze under bursts.
765        // Force-dispatch means these may duplicate what getDiff returns (same pts
766        // range), which is acceptable: Telegram's spec explicitly states that socket
767        // updates received during getDiff "should also have been retrieved through
768        // getDifference". Application-layer deduplication by message_id handles doubles.
769        // pts is NOT advanced here; getDiff sets it authoritatively when it returns.
770        if self.inner.pts_state.lock().await.getting_global_diff {
771            tracing::debug!("[layer] global diff in flight: force-applying pts={new_pts}");
772            return Ok(upd.into_iter().collect());
773        }
774
775        let result = self
776            .inner
777            .pts_state
778            .lock()
779            .await
780            .check_pts(new_pts, pts_count);
781        match result {
782            PtsCheckResult::Ok => {
783                // Advance pts and dispatch only this update.
784                //
785                // Do NOT blindly drain possible_gap here.
786                //
787                // Old behaviour: drain all buffered updates and return them together
788                // with the Ok update.  This caused a second freeze:
789                //
790                //   1. pts=1021.  Burst arrives: 1024-1030 all gap → buffered.
791                //   2. Update 1022 arrives → Ok → drain dispatches 1024-1030.
792                //   3. pts advances only to 1022 (the Ok value), NOT to 1030.
793                //   4. Bot sends replies → updateShortSentMessage pts=1031 →
794                //      check_and_fill_gap: expected=1023, got=1031 → GAP.
795                //   5. Cascade getDiff → duplicates → flood-wait → freeze.
796                //
797                // Grammers avoids this by re-checking each buffered update in
798                // order and advancing pts for each one (process_updates inner loop
799                // over entry.possible_gap).  Layer's Update enum carries no pts
800                // metadata, so we cannot replicate that ordered sequence check here.
801                //
802                // Correct equivalent: leave possible_gap alone.  The buffered
803                // updates will be recovered by gap_tick → getDiff(new_pts), which
804                // drains possible_gap into pre_diff, lets the server fill the
805                // gap, and advances pts to the true server state; no stale pts,
806                // no secondary cascade, no duplicates.
807                self.inner.pts_state.lock().await.advance(new_pts);
808                Ok(upd.into_iter().collect())
809            }
810            PtsCheckResult::Gap { expected, got } => {
811                // Buffer the update; start the deadline timer regardless.
812                //
813                // Bug fix (touch_global_timer): when upd=None (e.g. the gap is
814                // triggered by an updateShortSentMessage RPC response), nothing was
815                // ever pushed to possible_gap.global, so global stayed None.
816                // global_deadline_elapsed() returned false forever, gap_tick never
817                // saw has_global()=true, and the gap was never resolved unless a
818                // subsequent user message arrived.  touch_global_timer() starts the
819                // 1-second deadline clock even without a buffered update.
820                {
821                    let mut gap = self.inner.possible_gap.lock().await;
822                    if let Some(u) = upd {
823                        gap.push_global(u);
824                    } else {
825                        gap.touch_global_timer();
826                    }
827                }
828                let deadline_elapsed = self
829                    .inner
830                    .possible_gap
831                    .lock()
832                    .await
833                    .global_deadline_elapsed();
834                if deadline_elapsed {
835                    tracing::warn!(
836                        "[layer] global pts gap: expected {expected}, got {got}: getDifference"
837                    );
838                    // get_difference() is now atomic (check-and-set) and drains the
839                    // possible_gap buffer internally on success, so callers must NOT
840                    // drain before calling or splice the old buffer onto the results.
841                    // Doing so caused every gap update to be dispatched twice, which
842                    // triggered FLOOD_WAIT, blocked the handler, and froze the bot.
843                    self.get_difference().await
844                } else {
845                    tracing::debug!(
846                        "[layer] global pts gap: expected {expected}, got {got}: buffering (possible gap)"
847                    );
848                    Ok(vec![])
849                }
850            }
851            PtsCheckResult::Duplicate => {
852                tracing::debug!("[layer] global pts duplicate, discarding");
853                Ok(vec![])
854            }
855        }
856    }
857
858    /// Check qts (secret chat updates) and fill gap if needed.
859    pub async fn check_and_fill_qts_gap(
860        &self,
861        new_qts: i32,
862        qts_count: i32,
863    ) -> Result<Vec<update::Update>, InvocationError> {
864        let result = self
865            .inner
866            .pts_state
867            .lock()
868            .await
869            .check_qts(new_qts, qts_count);
870        match result {
871            PtsCheckResult::Ok => {
872                self.inner.pts_state.lock().await.advance_qts(new_qts);
873                Ok(vec![])
874            }
875            PtsCheckResult::Gap { expected, got } => {
876                tracing::warn!("[layer] qts gap: expected {expected}, got {got}: getDifference");
877                self.get_difference().await
878            }
879            PtsCheckResult::Duplicate => Ok(vec![]),
880        }
881    }
882
883    /// Check top-level seq and fill gap if needed.
884    pub async fn check_and_fill_seq_gap(
885        &self,
886        new_seq: i32,
887        seq_start: i32,
888    ) -> Result<Vec<update::Update>, InvocationError> {
889        let result = self
890            .inner
891            .pts_state
892            .lock()
893            .await
894            .check_seq(new_seq, seq_start);
895        match result {
896            PtsCheckResult::Ok => {
897                self.inner.pts_state.lock().await.advance_seq(new_seq);
898                Ok(vec![])
899            }
900            PtsCheckResult::Gap { expected, got } => {
901                tracing::warn!("[layer] seq gap: expected {expected}, got {got}: getDifference");
902                self.get_difference().await
903            }
904            PtsCheckResult::Duplicate => Ok(vec![]),
905        }
906    }
907
908    /// Check a per-channel pts, fetch getChannelDifference if there is a gap.
909    pub async fn check_and_fill_channel_gap(
910        &self,
911        channel_id: i64,
912        new_pts: i32,
913        pts_count: i32,
914        upd: Option<update::Update>,
915    ) -> Result<Vec<update::Update>, InvocationError> {
916        // if a diff is already in flight for this channel, skip: prevents
917        // 1 gap from spawning N concurrent getChannelDifference tasks.
918        if self
919            .inner
920            .pts_state
921            .lock()
922            .await
923            .getting_diff_for
924            .contains(&channel_id)
925        {
926            tracing::debug!("[layer] channel {channel_id} diff already in flight, skipping");
927            if let Some(u) = upd {
928                self.inner
929                    .possible_gap
930                    .lock()
931                    .await
932                    .push_channel(channel_id, u);
933            }
934            return Ok(vec![]);
935        }
936
937        let result = self
938            .inner
939            .pts_state
940            .lock()
941            .await
942            .check_channel_pts(channel_id, new_pts, pts_count);
943        match result {
944            PtsCheckResult::Ok => {
945                let mut buffered = self
946                    .inner
947                    .possible_gap
948                    .lock()
949                    .await
950                    .drain_channel(channel_id);
951                self.inner
952                    .pts_state
953                    .lock()
954                    .await
955                    .advance_channel(channel_id, new_pts);
956                if let Some(u) = upd {
957                    buffered.push(u);
958                }
959                Ok(buffered)
960            }
961            PtsCheckResult::Gap { expected, got } => {
962                if let Some(u) = upd {
963                    self.inner
964                        .possible_gap
965                        .lock()
966                        .await
967                        .push_channel(channel_id, u);
968                }
969                let deadline_elapsed = self
970                    .inner
971                    .possible_gap
972                    .lock()
973                    .await
974                    .channel_deadline_elapsed(channel_id);
975                if deadline_elapsed {
976                    tracing::warn!(
977                        "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: getChannelDifference"
978                    );
979                    // mark this channel as having a diff in flight.
980                    self.inner
981                        .pts_state
982                        .lock()
983                        .await
984                        .getting_diff_for
985                        .insert(channel_id);
986                    let buffered = self
987                        .inner
988                        .possible_gap
989                        .lock()
990                        .await
991                        .drain_channel(channel_id);
992                    match self.get_channel_difference(channel_id).await {
993                        Ok(mut diff_updates) => {
994                            // diff complete, allow future gaps to be handled.
995                            self.inner
996                                .pts_state
997                                .lock()
998                                .await
999                                .getting_diff_for
1000                                .remove(&channel_id);
1001                            diff_updates.splice(0..0, buffered);
1002                            Ok(diff_updates)
1003                        }
1004                        // Permanent access errors: remove the channel from pts tracking
1005                        // entirely (. The next update for this
1006                        // channel will have local=0 → PtsCheckResult::Ok, advancing pts
1007                        // without any gap fill. This breaks the infinite gap→CHANNEL_INVALID
1008                        // loop that happened when advance_channel kept the stale pts alive.
1009                        //
1010                        // Common causes:
1011                        // - access_hash not in peer cache (update arrived via updateShort
1012                        // which carries no chats list)
1013                        // - bot was kicked / channel deleted
1014                        Err(InvocationError::Rpc(ref e))
1015                            if e.name == "CHANNEL_INVALID"
1016                                || e.name == "CHANNEL_PRIVATE"
1017                                || e.name == "CHANNEL_NOT_MODIFIED" =>
1018                        {
1019                            tracing::debug!(
1020                                "[layer] channel {channel_id}: {}: removing from pts tracking \
1021                                 (next update treated as first-seen, no gap fill)",
1022                                e.name
1023                            );
1024                            {
1025                                let mut s = self.inner.pts_state.lock().await;
1026                                s.getting_diff_for.remove(&channel_id);
1027                                s.channel_pts.remove(&channel_id); // ←  fix: delete, not advance
1028                            }
1029                            Ok(buffered)
1030                        }
1031                        Err(InvocationError::Deserialize(ref msg)) => {
1032                            // Unrecognised constructor or parse failure: treat same as
1033                            // CHANNEL_INVALID: remove from tracking so we don't loop.
1034                            tracing::debug!(
1035                                "[layer] channel {channel_id}: deserialize error ({msg}): \
1036                                 removing from pts tracking"
1037                            );
1038                            {
1039                                let mut s = self.inner.pts_state.lock().await;
1040                                s.getting_diff_for.remove(&channel_id);
1041                                s.channel_pts.remove(&channel_id);
1042                            }
1043                            Ok(buffered)
1044                        }
1045                        Err(e) => {
1046                            // also clear on unexpected errors so we don't get stuck.
1047                            self.inner
1048                                .pts_state
1049                                .lock()
1050                                .await
1051                                .getting_diff_for
1052                                .remove(&channel_id);
1053                            Err(e)
1054                        }
1055                    }
1056                } else {
1057                    tracing::debug!(
1058                        "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: buffering"
1059                    );
1060                    Ok(vec![])
1061                }
1062            }
1063            PtsCheckResult::Duplicate => {
1064                tracing::debug!("[layer] channel {channel_id} pts duplicate, discarding");
1065                Ok(vec![])
1066            }
1067        }
1068    }
1069
1070    /// Called periodically (e.g. from keepalive) to fire getDifference
1071    /// if no update has been received for > 15 minutes.
1072    ///
1073    /// also drives per-entry possible-gap deadlines independently of
1074    /// incoming updates.  Previously the POSSIBLE_GAP_DEADLINE_MS window was
1075    /// only evaluated when a new incoming update called check_and_fill_gap
1076    /// meaning a quiet channel with a real gap would never fire getDifference
1077    /// until another update arrived.  This
1078    /// which scans all LiveEntry.effective_deadline() on every keepalive tick.
1079    pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
1080        // Stuck-diff watchdog: if getting_global_diff has been true for more than
1081        // 30 s the in-flight getDifference RPC is assumed hung (e.g. half-open TCP
1082        // that the OS keepalive hasn't killed yet).  Reset the guard so the next
1083        // gap_tick cycle can issue a fresh getDifference.  The 30-second timeout
1084        // in get_difference() will concurrently return an error and also clear the
1085        // flag; this watchdog is a belt-and-suspenders safety net for edge cases
1086        // where that timeout itself is somehow delayed.
1087        {
1088            let stuck = {
1089                let s = self.inner.pts_state.lock().await;
1090                s.getting_global_diff
1091                    && s.getting_global_diff_since
1092                        .map(|t| t.elapsed().as_secs() > 30)
1093                        .unwrap_or(false)
1094            };
1095            if stuck {
1096                tracing::warn!(
1097                    "[layer] getDifference in-flight for >30 s: \
1098                     resetting guard so gap_tick can retry"
1099                );
1100                let mut s = self.inner.pts_state.lock().await;
1101                s.getting_global_diff = false;
1102                s.getting_global_diff_since = None;
1103            }
1104        }
1105
1106        // existing 5-minute global timeout
1107        let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
1108        if exceeded {
1109            tracing::info!("[layer] update deadline exceeded: fetching getDifference");
1110            match self.get_difference().await {
1111                Ok(updates) => {
1112                    for u in updates {
1113                        if self.inner.update_tx.try_send(u).is_err() {
1114                            tracing::warn!("[layer] update channel full: dropping diff update");
1115                        }
1116                    }
1117                }
1118                Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1119                    // 401: gap buffer already cleared inside get_difference().
1120                    // gap_tick will not re-fire. Supervisor handles reconnect.
1121                    tracing::warn!(
1122                        "[layer] deadline getDifference AUTH_KEY_UNREGISTERED: session dead"
1123                    );
1124                }
1125                // tDesktop: differenceFail() just logs + sets a retry timer.
1126                // It never propagates the error outward.  Mirror that here:
1127                // Deserialize errors (unknown constructor, e.g. 0x00000004) are
1128                // transient - the server sent a misrouted frame or a service packet.
1129                // Propagating them causes the caller's loop to log noise every second.
1130                // IO / RPC errors also stay local: the reader loop drives reconnects
1131                // independently via the socket, not via check_update_deadline's return.
1132                Err(e) => {
1133                    tracing::warn!("[layer] deadline getDifference failed (non-fatal): {e}");
1134                }
1135            }
1136        }
1137
1138        // drive global possible-gap deadline
1139        // If the possible-gap window has expired but no new update has arrived
1140        // to trigger check_and_fill_gap, fire getDifference from here.
1141        {
1142            let gap_expired = self
1143                .inner
1144                .possible_gap
1145                .lock()
1146                .await
1147                .global_deadline_elapsed();
1148            // Note: get_difference() is now atomic (check-and-set), so the
1149            // `already` guard is advisory only; get_difference() will bail
1150            // safely if another call is already in flight.
1151            if gap_expired {
1152                tracing::debug!("[layer] B3 global possible-gap deadline expired: getDifference");
1153                // get_difference() snapshots and drains the pre-existing buffer at its
1154                // start (before the RPC), so updates that arrive DURING the RPC flight
1155                // remain in possible_gap for the next cycle.  Never drain here.
1156                match self.get_difference().await {
1157                    Ok(updates) => {
1158                        for u in updates {
1159                            if self.inner.update_tx.try_send(u).is_err() {
1160                                tracing::warn!("[layer] update channel full: dropping gap update");
1161                            }
1162                        }
1163                    }
1164                    Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1165                        // 401: get_difference() cleared the gap buffer, so gap_tick
1166                        // will not re-fire. Supervisor handles reconnect.
1167                        tracing::warn!(
1168                            "[layer] B3 global gap diff AUTH_KEY_UNREGISTERED: \
1169                             session dead, gap buffer cleared"
1170                        );
1171                    }
1172                    // tDesktop differenceFail: log and let the retry timer handle it.
1173                    // Never propagate - the reader drives reconnects via the socket.
1174                    Err(e) => {
1175                        tracing::warn!("[layer] B3 global gap diff failed (non-fatal): {e}");
1176                    }
1177                }
1178            }
1179        }
1180
1181        // drive per-channel possible-gap deadlines
1182        // Collect expired channel IDs up-front to avoid holding the lock across awaits.
1183        let expired_channels: Vec<i64> = {
1184            let gap = self.inner.possible_gap.lock().await;
1185            gap.channel
1186                .keys()
1187                .copied()
1188                .filter(|&id| gap.channel_deadline_elapsed(id))
1189                .collect()
1190        };
1191        for channel_id in expired_channels {
1192            let already = self
1193                .inner
1194                .pts_state
1195                .lock()
1196                .await
1197                .getting_diff_for
1198                .contains(&channel_id);
1199            if already {
1200                continue;
1201            }
1202            tracing::debug!(
1203                "[layer] B3 channel {channel_id} possible-gap deadline expired: getChannelDifference"
1204            );
1205            // Mark in-flight before spawning so a racing incoming update can't
1206            // also spawn a diff for the same channel.
1207            self.inner
1208                .pts_state
1209                .lock()
1210                .await
1211                .getting_diff_for
1212                .insert(channel_id);
1213            let buffered = self
1214                .inner
1215                .possible_gap
1216                .lock()
1217                .await
1218                .drain_channel(channel_id);
1219            let c = self.clone();
1220            let utx = self.inner.update_tx.clone();
1221            tokio::spawn(async move {
1222                match c.get_channel_difference(channel_id).await {
1223                    Ok(mut updates) => {
1224                        c.inner
1225                            .pts_state
1226                            .lock()
1227                            .await
1228                            .getting_diff_for
1229                            .remove(&channel_id);
1230                        updates.splice(0..0, buffered);
1231                        for u in updates {
1232                            if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1233                                tracing::warn!(
1234                                    "[layer] update channel full: dropping ch gap update"
1235                                );
1236                            }
1237                        }
1238                    }
1239                    Err(e) => {
1240                        c.inner
1241                            .pts_state
1242                            .lock()
1243                            .await
1244                            .getting_diff_for
1245                            .remove(&channel_id);
1246                        // Permanent errors (CHANNEL_INVALID etc.): updates are
1247                        // unrecoverable, drop them. Transient errors: restore
1248                        // the buffer so the next B3 cycle can retry.
1249                        let permanent = matches!(&e,
1250                            InvocationError::Rpc(r)
1251                                if r.code == 401
1252                                    || r.name == "CHANNEL_INVALID"
1253                                    || r.name == "CHANNEL_PRIVATE"
1254                                    || r.name == "CHANNEL_NOT_MODIFIED"
1255                        ) || matches!(&e, InvocationError::Deserialize(_));
1256                        if permanent {
1257                            tracing::warn!(
1258                                "[layer] B3 channel {channel_id} gap diff failed (permanent): {e}"
1259                            );
1260                        } else {
1261                            tracing::warn!(
1262                                "[layer] B3 channel {channel_id} gap diff failed (transient): {e}"
1263                            );
1264                            let mut gap = c.inner.possible_gap.lock().await;
1265                            for u in buffered {
1266                                gap.push_channel(channel_id, u);
1267                            }
1268                        }
1269                    }
1270                }
1271            });
1272        }
1273
1274        Ok(())
1275    }
1276}