layer_client/pts.rs
1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// NOTE:
5// The "Layer" project is no longer maintained or supported.
6// Its original purpose for personal SDK/APK experimentation and learning
7// has been fulfilled.
8//
9// Please use Ferogram instead:
10// https://github.com/ankit-chaubey/ferogram
11// Ferogram will receive future updates and development, although progress
12// may be slower.
13//
14// Ferogram is an async Telegram MTProto client library written in Rust.
15// Its implementation follows the behaviour of the official Telegram clients,
16// particularly Telegram Desktop and TDLib, and aims to provide a clean and
17// modern async interface for building Telegram clients and tools.
18
19//! Update gap detection and recovery.
20//!
21//! Tracks `pts` / `qts` / `seq` / `date` plus per-channel pts, and
22//! fills gaps via `updates.getDifference` (global) and
23//! `updates.getChannelDifference` (per-channel).
24//!
25//! ## What "gap" means
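//! Updates can arrive out of order, but must be applied in order within each
//! pts counter. If `new_pts > local_pts + pts_count` there is a gap and we
//! must ask the server for the missed updates before processing this one; if
//! `new_pts < local_pts + pts_count` the update was already applied and is
//! dropped. pts gaps are first held in a short possible-gap window
//! (`POSSIBLE_GAP_DEADLINE_MS`) before the difference is requested.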
29
30use std::collections::{HashMap, HashSet};
31use std::time::Instant;
32
33use layer_tl_types as tl;
34use layer_tl_types::{Cursor, Deserializable};
35
36use crate::{Client, InvocationError, RpcError, attach_client_to_update, update};
37
/// Length of the possible-gap window, in milliseconds.
///
/// A pts jump is only escalated to a difference request once its window has
/// been open at least this long.
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;

/// `limit` sent with `getChannelDifference` when the session is a bot.
///
/// Bot accounts may request a far larger window than user accounts
/// (server-side limit).
const CHANNEL_DIFF_LIMIT_BOT: i32 = 100_000;
43
44/// Buffers updates received during a possible-gap window so we don't fire
45/// getDifference on every slightly out-of-order update.
46#[derive(Default)]
47pub struct PossibleGapBuffer {
48 /// channel_id → (buffered_updates, window_start)
49 channel: HashMap<i64, (Vec<update::Update>, Instant)>,
50 /// Global buffered updates (non-channel pts gaps)
51 global: Option<(Vec<update::Update>, Instant)>,
52}
53
54impl PossibleGapBuffer {
55 pub fn new() -> Self {
56 Self::default()
57 }
58
59 /// Buffer a global update during a possible-gap window.
60 pub fn push_global(&mut self, upd: update::Update) {
61 let entry = self
62 .global
63 .get_or_insert_with(|| (Vec::new(), Instant::now()));
64 entry.0.push(upd);
65 }
66
67 /// Buffer a channel update during a possible-gap window.
68 pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
69 let entry = self
70 .channel
71 .entry(channel_id)
72 .or_insert_with(|| (Vec::new(), Instant::now()));
73 entry.0.push(upd);
74 }
75
76 /// True if the global possible-gap deadline has elapsed.
77 pub fn global_deadline_elapsed(&self) -> bool {
78 self.global
79 .as_ref()
80 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
81 .unwrap_or(false)
82 }
83
84 /// True if a channel's possible-gap deadline has elapsed.
85 pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
86 self.channel
87 .get(&channel_id)
88 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
89 .unwrap_or(false)
90 }
91
92 /// True if the global buffer has any pending updates.
93 pub fn has_global(&self) -> bool {
94 self.global.is_some()
95 }
96
97 /// True if a channel buffer has pending updates.
98 pub fn has_channel(&self, channel_id: i64) -> bool {
99 self.channel.contains_key(&channel_id)
100 }
101
102 /// Start the global deadline timer without buffering an update.
103 ///
104 /// Called when a gap is detected but the triggering update carries no
105 /// high-level `Update` value (e.g. `updateShortSentMessage` with `upd=None`).
106 /// Without this, `global` stays `None` → `global_deadline_elapsed()` always
107 /// returns `false` → `gap_tick` never fires getDifference for such gaps.
108 pub fn touch_global_timer(&mut self) {
109 self.global
110 .get_or_insert_with(|| (Vec::new(), Instant::now()));
111 }
112
113 /// Drain global buffered updates.
114 pub fn drain_global(&mut self) -> Vec<update::Update> {
115 self.global.take().map(|(v, _)| v).unwrap_or_default()
116 }
117
118 /// Drain channel buffered updates.
119 pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
120 self.channel
121 .remove(&channel_id)
122 .map(|(v, _)| v)
123 .unwrap_or_default()
124 }
125}
126
127// PtsState
128
129/// Full MTProto sequence-number state, including per-channel counters.
130///
131/// All fields are `pub` so that `connect()` can restore them from the
132/// persisted session without going through an artificial constructor.
133#[derive(Debug, Clone, Default)]
134pub struct PtsState {
135 /// Main pts counter (messages, non-channel updates).
136 pub pts: i32,
137 /// Secondary counter for secret-chat updates.
138 pub qts: i32,
139 /// Date of the last received update (Unix timestamp).
140 pub date: i32,
141 /// Combined-container sequence number.
142 pub seq: i32,
143 /// Per-channel pts counters. `channel_id → pts`.
144 pub channel_pts: HashMap<i64, i32>,
145 /// How many times getChannelDifference has been called per channel.
146 /// tDesktop starts at limit=100, then raises to 1000 after the first
147 /// successful response. We track call count to implement the same ramp-up.
148 pub channel_diff_calls: HashMap<i64, u32>,
149 /// Timestamp of last received update for deadline-based gap detection.
150 pub last_update_at: Option<Instant>,
151 /// Channels currently awaiting a getChannelDifference response.
152 /// If a channel is in this set, no new gap-fill task is spawned for it.
153 pub getting_diff_for: HashSet<i64>,
154 /// Guard against concurrent global getDifference calls.
155 /// Without this, two simultaneous gap detections both spawn get_difference(),
156 /// which double-processes updates and corrupts pts state.
157 pub getting_global_diff: bool,
158 /// When getting_global_diff was set to true. Used by the stuck-diff watchdog
159 /// in check_update_deadline: if the flag has been set for >30 s the RPC is
160 /// assumed hung and the guard is reset so the next gap_tick can retry.
161 pub getting_global_diff_since: Option<Instant>,
162}
163
164impl PtsState {
165 pub fn from_server_state(s: &tl::types::updates::State) -> Self {
166 Self {
167 pts: s.pts,
168 qts: s.qts,
169 date: s.date,
170 seq: s.seq,
171 channel_pts: HashMap::new(),
172 channel_diff_calls: HashMap::new(),
173 last_update_at: Some(Instant::now()),
174 getting_diff_for: HashSet::new(),
175 getting_global_diff: false,
176 getting_global_diff_since: None,
177 }
178 }
179
180 /// Record that an update was received now (resets the deadline timer).
181 pub fn touch(&mut self) {
182 self.last_update_at = Some(Instant::now());
183 }
184
185 /// Returns true if no update has been received for > 15 minutes.
186 pub fn deadline_exceeded(&self) -> bool {
187 self.last_update_at
188 .as_ref()
189 .map(|t| t.elapsed().as_secs() > 15 * 60)
190 .unwrap_or(false)
191 }
192
193 /// Check whether `new_pts` is in order given `pts_count` new updates.
194 pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
195 let expected = self.pts + pts_count;
196 if new_pts == expected {
197 PtsCheckResult::Ok
198 } else if new_pts > expected {
199 PtsCheckResult::Gap {
200 expected,
201 got: new_pts,
202 }
203 } else {
204 PtsCheckResult::Duplicate
205 }
206 }
207
208 /// Check a qts value (secret chat updates).
209 pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
210 let expected = self.qts + qts_count;
211 if new_qts == expected {
212 PtsCheckResult::Ok
213 } else if new_qts > expected {
214 PtsCheckResult::Gap {
215 expected,
216 got: new_qts,
217 }
218 } else {
219 PtsCheckResult::Duplicate
220 }
221 }
222
223 /// Check top-level seq for UpdatesCombined containers.
224 pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
225 if self.seq == 0 {
226 return PtsCheckResult::Ok;
227 } // uninitialised: accept
228 let expected = self.seq + 1;
229 if seq_start == expected {
230 PtsCheckResult::Ok
231 } else if seq_start > expected {
232 PtsCheckResult::Gap {
233 expected,
234 got: seq_start,
235 }
236 } else {
237 PtsCheckResult::Duplicate
238 }
239 }
240
241 /// Check a per-channel pts value.
242 pub fn check_channel_pts(
243 &self,
244 channel_id: i64,
245 new_pts: i32,
246 pts_count: i32,
247 ) -> PtsCheckResult {
248 let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
249 if local == 0 {
250 return PtsCheckResult::Ok;
251 }
252 let expected = local + pts_count;
253 if new_pts == expected {
254 PtsCheckResult::Ok
255 } else if new_pts > expected {
256 PtsCheckResult::Gap {
257 expected,
258 got: new_pts,
259 }
260 } else {
261 PtsCheckResult::Duplicate
262 }
263 }
264
265 /// Advance the global pts.
266 pub fn advance(&mut self, new_pts: i32) {
267 if new_pts > self.pts {
268 self.pts = new_pts;
269 }
270 self.touch();
271 }
272
273 /// Advance the qts.
274 pub fn advance_qts(&mut self, new_qts: i32) {
275 if new_qts > self.qts {
276 self.qts = new_qts;
277 }
278 self.touch();
279 }
280
281 /// Advance seq.
282 pub fn advance_seq(&mut self, new_seq: i32) {
283 if new_seq > self.seq {
284 self.seq = new_seq;
285 }
286 }
287
288 /// Advance a per-channel pts.
289 pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
290 let entry = self.channel_pts.entry(channel_id).or_insert(0);
291 if new_pts > *entry {
292 *entry = new_pts;
293 }
294 self.touch();
295 }
296}
297
/// Outcome of comparing a received sequence value (pts, qts, seq or channel
/// pts) with the locally tracked state.
///
/// A small plain-data value, so it is `Copy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PtsCheckResult {
    /// The value is exactly the next expected one; apply the update.
    Ok,
    /// The value skipped ahead of `expected`; intermediate updates are missing.
    Gap { expected: i32, got: i32 },
    /// The value is behind the expected one; the update was already applied.
    Duplicate,
}
304
305// Client methods
306
307impl Client {
308 // Global getDifference
309
310 /// Fetch and replay any updates missed since the persisted pts.
311 ///
312 /// loops on `Difference::Slice` (partial response) until the server
313 /// returns a final `Difference` or `Empty`
314 /// never dropping a partial batch. Previous code returned after one slice,
315 /// silently losing all updates in subsequent slices.
316 pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
317 // Atomically claim the in-flight slot.
318 //
319 // TOCTOU fix: the old code set getting_global_diff=true inside get_difference
320 // but the external guard in check_and_fill_gap read the flag in a SEPARATE lock
321 // acquisition. With multi-threaded Tokio, N tasks could all read false, all
322 // pass the external check, then all race into this function and all set the flag
323 // to true. Each concurrent call resets pts_state to the server state it received;
324 // the last STALE write rolls pts back below where it actually is, which immediately
325 // triggers a new gap → another burst of concurrent getDifference calls → cascade.
326 //
327 // Fix: check-and-set inside a SINGLE lock acquisition. Only the first caller
328 // proceeds; all others see true and return Ok(vec![]) immediately.
329 {
330 let mut s = self.inner.pts_state.lock().await;
331 if s.getting_global_diff {
332 return Ok(vec![]);
333 }
334 s.getting_global_diff = true;
335 s.getting_global_diff_since = Some(Instant::now());
336 }
337
338 // Drain the initial-gap buffer before the RPC.
339 //
340 // possible_gap.global contains updates buffered during the possible-gap window
341 // (before we decided to call getDiff). The server response covers exactly
342 // these pts values → discard the snapshot on success.
343 // On RPC error, restore them so the next gap_tick can retry.
344 //
345 // Note: updates arriving DURING the RPC flight are now force-dispatched by
346 // check_and_fill_gap and never accumulate in possible_gap,
347 // so there is nothing extra to drain after the call returns.
348 let pre_diff = self.inner.possible_gap.lock().await.drain_global();
349
350 // Wrap the RPC in a hard 30-second timeout so a hung TCP connection
351 // (half-open socket, unresponsive DC) cannot hold getting_global_diff=true
352 // forever and freeze the bot indefinitely.
353 let result = tokio::time::timeout(
354 std::time::Duration::from_secs(30),
355 self.get_difference_inner(),
356 )
357 .await
358 .unwrap_or_else(|_| {
359 tracing::warn!("[layer] getDifference RPC timed out after 30 s: will retry");
360 Err(InvocationError::Io(std::io::Error::new(
361 std::io::ErrorKind::TimedOut,
362 "getDifference timed out",
363 )))
364 });
365
366 // Always clear the guard, even on error.
367 {
368 let mut s = self.inner.pts_state.lock().await;
369 s.getting_global_diff = false;
370 s.getting_global_diff_since = None;
371 }
372
373 match &result {
374 Ok(_) => {
375 // pre_diff is covered by the server response; discard it.
376 // (Flight-time updates are force-dispatched, not buffered into possible_gap.)
377 }
378 Err(e) => {
379 let mut gap = self.inner.possible_gap.lock().await;
380 if matches!(e, InvocationError::Rpc(r) if r.code == 401) {
381 // 401: do not restore the gap buffer. push_global inserts a
382 // fresh Instant::now() timestamp, making global_deadline_elapsed()
383 // return true immediately on the next gap_tick cycle, which fires
384 // get_difference() again and loops. Drop it; reconnect will
385 // re-sync state via sync_pts_state / getDifference.
386 drop(pre_diff);
387 } else {
388 // Transient error: restore so the next gap_tick can retry.
389 for u in pre_diff {
390 gap.push_global(u);
391 }
392 }
393 }
394 }
395
396 result
397 }
398
399 async fn get_difference_inner(&self) -> Result<Vec<update::Update>, InvocationError> {
400 use layer_tl_types::{Cursor, Deserializable};
401
402 let mut all_updates: Vec<update::Update> = Vec::new();
403
404 // loop until the server sends a final (non-Slice) response.
405 loop {
406 let (pts, qts, date) = {
407 let s = self.inner.pts_state.lock().await;
408 (s.pts, s.qts, s.date)
409 };
410
411 if pts == 0 {
412 self.sync_pts_state().await?;
413 return Ok(all_updates);
414 }
415
416 tracing::debug!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
417
418 let req = tl::functions::updates::GetDifference {
419 pts,
420 pts_limit: None,
421 pts_total_limit: None,
422 date,
423 qts,
424 qts_limit: None,
425 };
426
427 let body = self.rpc_call_raw_pub(&req).await?;
428 let body = crate::maybe_gz_decompress(body)?;
429 let mut cur = Cursor::from_slice(&body);
430 // tDesktop mirrors this: if the response body has an unknown constructor
431 // (e.g. a service frame misrouted here during a race), handler.done()
432 // returns false and the request is silently retired. We do the same:
433 // log at debug level and return whatever updates accumulated so far.
434 let diff = match tl::enums::updates::Difference::deserialize(&mut cur) {
435 Ok(d) => d,
436 Err(e) => {
437 let cid = if body.len() >= 4 {
438 u32::from_le_bytes(body[..4].try_into().unwrap())
439 } else {
440 0
441 };
442 tracing::debug!(
443 "[layer] getDifference: unrecognised response constructor \
444 {cid:#010x}, dropping ({})",
445 e
446 );
447 return Ok(all_updates);
448 }
449 };
450
451 match diff {
452 tl::enums::updates::Difference::Empty(e) => {
453 let mut s = self.inner.pts_state.lock().await;
454 s.date = e.date;
455 s.seq = e.seq;
456 s.touch();
457 tracing::debug!("[layer] getDifference: empty (seq={})", e.seq);
458 return Ok(all_updates);
459 }
460
461 tl::enums::updates::Difference::Difference(d) => {
462 tracing::debug!(
463 "[layer] getDifference: {} messages, {} updates (final)",
464 d.new_messages.len(),
465 d.other_updates.len()
466 );
467 self.cache_users_slice_pub(&d.users).await;
468 self.cache_chats_slice_pub(&d.chats).await;
469 for msg in d.new_messages {
470 all_updates.push(update::Update::NewMessage(
471 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
472 ));
473 }
474 for upd in d.other_updates {
475 all_updates.extend(update::from_single_update_pub(upd));
476 }
477 let tl::enums::updates::State::State(ns) = d.state;
478 let saved_channel_pts = {
479 let s = self.inner.pts_state.lock().await;
480 s.channel_pts.clone()
481 };
482 let mut new_state = PtsState::from_server_state(&ns);
483 // Preserve per-channel pts across the global reset.
484 for (cid, cpts) in saved_channel_pts {
485 new_state.channel_pts.entry(cid).or_insert(cpts);
486 }
487 // Preserve in-flight sets: we clear getting_global_diff ourselves.
488 new_state.getting_global_diff = true; // will be cleared by caller
489 {
490 let mut s = self.inner.pts_state.lock().await;
491 let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
492 let since = s.getting_global_diff_since; // preserve watchdog timestamp
493 *s = new_state;
494 s.getting_diff_for = getting_diff_for;
495 s.getting_global_diff_since = since;
496 }
497 // Final response: stop looping.
498 return Ok(all_updates);
499 }
500
501 tl::enums::updates::Difference::Slice(d) => {
502 // server has more data: apply intermediate_state and
503 // continue looping. Old code returned here, losing all updates
504 // in subsequent slices.
505 tracing::debug!(
506 "[layer] getDifference slice: {} messages, {} updates: continuing",
507 d.new_messages.len(),
508 d.other_updates.len()
509 );
510 self.cache_users_slice_pub(&d.users).await;
511 self.cache_chats_slice_pub(&d.chats).await;
512 for msg in d.new_messages {
513 all_updates.push(update::Update::NewMessage(
514 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
515 ));
516 }
517 for upd in d.other_updates {
518 all_updates.extend(update::from_single_update_pub(upd));
519 }
520 let tl::enums::updates::State::State(ns) = d.intermediate_state;
521 let saved_channel_pts = {
522 let s = self.inner.pts_state.lock().await;
523 s.channel_pts.clone()
524 };
525 let mut new_state = PtsState::from_server_state(&ns);
526 for (cid, cpts) in saved_channel_pts {
527 new_state.channel_pts.entry(cid).or_insert(cpts);
528 }
529 new_state.getting_global_diff = true;
530 {
531 let mut s = self.inner.pts_state.lock().await;
532 let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
533 let since = s.getting_global_diff_since; // preserve watchdog timestamp
534 *s = new_state;
535 s.getting_diff_for = getting_diff_for;
536 s.getting_global_diff_since = since;
537 }
538 // Loop: fetch the next slice.
539 continue;
540 }
541
542 tl::enums::updates::Difference::TooLong(d) => {
543 tracing::warn!("[layer] getDifference: TooLong (pts={}): re-syncing", d.pts);
544 self.inner.pts_state.lock().await.pts = d.pts;
545 self.sync_pts_state().await?;
546 return Ok(all_updates);
547 }
548 }
549 }
550 }
551
552 // Per-channel getChannelDifference
553
554 /// Fetch missed updates for a single channel.
555 pub async fn get_channel_difference(
556 &self,
557 channel_id: i64,
558 ) -> Result<Vec<update::Update>, InvocationError> {
559 let local_pts = self
560 .inner
561 .pts_state
562 .lock()
563 .await
564 .channel_pts
565 .get(&channel_id)
566 .copied()
567 .unwrap_or(0);
568
569 let access_hash = self
570 .inner
571 .peer_cache
572 .read()
573 .await
574 .channels
575 .get(&channel_id)
576 .copied()
577 .unwrap_or(0);
578
579 // No access hash in cache → we can't call getChannelDifference.
580 // Attempting GetChannels with access_hash=0 also returns CHANNEL_INVALID,
581 // so skip the call entirely and let the caller handle it.
582 if access_hash == 0 {
583 tracing::debug!(
584 "[layer] channel {channel_id}: access_hash not cached, \
585 cannot call getChannelDifference: caller will remove from tracking"
586 );
587 return Err(InvocationError::Rpc(RpcError {
588 code: 400,
589 name: "CHANNEL_INVALID".into(),
590 value: None,
591 }));
592 }
593
594 tracing::debug!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
595
596 let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
597 channel_id,
598 access_hash,
599 });
600
601 // tDesktop ramp-up: limit=100 on first call, 1000 on subsequent ones.
602 // Bots always use the server-side maximum (100_000).
603 let diff_limit = if self.inner.is_bot.load(std::sync::atomic::Ordering::Relaxed) {
604 CHANNEL_DIFF_LIMIT_BOT
605 } else {
606 let call_count = self
607 .inner
608 .pts_state
609 .lock()
610 .await
611 .channel_diff_calls
612 .get(&channel_id)
613 .copied()
614 .unwrap_or(0);
615 if call_count == 0 { 100 } else { 1000 }
616 };
617
618 let req = tl::functions::updates::GetChannelDifference {
619 force: false,
620 channel,
621 filter: tl::enums::ChannelMessagesFilter::Empty,
622 pts: local_pts.max(1),
623 limit: diff_limit,
624 };
625
626 let body = match self.rpc_call_raw_pub(&req).await {
627 Ok(b) => {
628 // Successful call bump the per-channel counter so next call uses 1000.
629 self.inner
630 .pts_state
631 .lock()
632 .await
633 .channel_diff_calls
634 .entry(channel_id)
635 .and_modify(|c| *c = c.saturating_add(1))
636 .or_insert(1);
637 b
638 }
639 Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
640 // treat as empty diff: retry next gap
641 tracing::debug!("[layer] PERSISTENT_TIMESTAMP_OUTDATED: skipping diff");
642 return Ok(vec![]);
643 }
644 Err(e) => return Err(e),
645 };
646 let body = crate::maybe_gz_decompress(body)?;
647 let mut cur = Cursor::from_slice(&body);
648 let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
649
650 let mut updates = Vec::new();
651
652 match diff {
653 tl::enums::updates::ChannelDifference::Empty(e) => {
654 tracing::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
655 self.inner
656 .pts_state
657 .lock()
658 .await
659 .advance_channel(channel_id, e.pts);
660 }
661 tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
662 tracing::debug!(
663 "[layer] getChannelDifference: {} messages, {} updates",
664 d.new_messages.len(),
665 d.other_updates.len()
666 );
667 self.cache_users_slice_pub(&d.users).await;
668 self.cache_chats_slice_pub(&d.chats).await;
669 for msg in d.new_messages {
670 updates.push(update::Update::NewMessage(
671 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
672 ));
673 }
674 for upd in d.other_updates {
675 updates.extend(update::from_single_update_pub(upd));
676 }
677 self.inner
678 .pts_state
679 .lock()
680 .await
681 .advance_channel(channel_id, d.pts);
682 }
683 tl::enums::updates::ChannelDifference::TooLong(d) => {
684 tracing::warn!(
685 "[layer] getChannelDifference TooLong: replaying messages, resetting pts"
686 );
687 self.cache_users_slice_pub(&d.users).await;
688 self.cache_chats_slice_pub(&d.chats).await;
689 for msg in d.messages {
690 updates.push(update::Update::NewMessage(
691 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
692 ));
693 }
694 self.inner
695 .pts_state
696 .lock()
697 .await
698 .advance_channel(channel_id, 0);
699 }
700 }
701
702 Ok(updates)
703 }
704
705 // Sync from server
706
707 pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
708 let body = self
709 .rpc_call_raw_pub(&tl::functions::updates::GetState {})
710 .await?;
711 let mut cur = Cursor::from_slice(&body);
712 // tDesktop: if the response body has an unknown constructor (e.g. a
713 // misrouted service frame), handler.done() returns false → differenceFail()
714 // → log + retry timer. We mirror that: treat an unrecognised constructor
715 // as a soft failure and return Ok(()) so the caller can retry later.
716 let s = match tl::enums::updates::State::deserialize(&mut cur) {
717 Ok(tl::enums::updates::State::State(s)) => s,
718 Err(e) => {
719 let cid = if body.len() >= 4 {
720 u32::from_le_bytes(body[..4].try_into().unwrap())
721 } else {
722 0
723 };
724 tracing::debug!(
725 "[layer] GetState: unrecognised response constructor \
726 {cid:#010x}, treating as soft failure and retrying ({})",
727 e
728 );
729 return Ok(());
730 }
731 };
732 let mut state = self.inner.pts_state.lock().await;
733 state.pts = s.pts;
734 state.qts = s.qts;
735 state.date = s.date;
736 state.seq = s.seq;
737 state.touch();
738 tracing::debug!(
739 "[layer] pts synced: pts={}, qts={}, seq={}",
740 s.pts,
741 s.qts,
742 s.seq
743 );
744 Ok(())
745 }
746 /// Check global pts, buffer during possible-gap window, fetch diff if real gap.
747 ///
748 /// When a global getDifference is already in-flight (`getting_global_diff == true`),
749 /// updates are **force-dispatched** immediately without pts tracking.
750 /// This prevents the cascade freeze that buffering caused:
751 /// 1. getDiff runs; flight-buffered updates pile up in `possible_gap`.
752 /// 2. getDiff returns; `gap_tick` sees `has_global()=true` → another getDiff.
753 /// 3. Each getDiff spawns another → bot freezes under a burst of messages.
754 pub async fn check_and_fill_gap(
755 &self,
756 new_pts: i32,
757 pts_count: i32,
758 upd: Option<update::Update>,
759 ) -> Result<Vec<update::Update>, InvocationError> {
760 // getDiff in flight: force updates through without pts tracking.
761 //
762 // Force-dispatch: socket updates are sent through
763 // when getting_diff_for contains the key; no buffering, no pts check.
764 // Buffering caused a cascade of getDiff calls and a bot freeze under bursts.
765 // Force-dispatch means these may duplicate what getDiff returns (same pts
766 // range), which is acceptable: Telegram's spec explicitly states that socket
767 // updates received during getDiff "should also have been retrieved through
768 // getDifference". Application-layer deduplication by message_id handles doubles.
769 // pts is NOT advanced here; getDiff sets it authoritatively when it returns.
770 if self.inner.pts_state.lock().await.getting_global_diff {
771 tracing::debug!("[layer] global diff in flight: force-applying pts={new_pts}");
772 return Ok(upd.into_iter().collect());
773 }
774
775 let result = self
776 .inner
777 .pts_state
778 .lock()
779 .await
780 .check_pts(new_pts, pts_count);
781 match result {
782 PtsCheckResult::Ok => {
783 // Advance pts and dispatch only this update.
784 //
785 // Do NOT blindly drain possible_gap here.
786 //
787 // Old behaviour: drain all buffered updates and return them together
788 // with the Ok update. This caused a second freeze:
789 //
790 // 1. pts=1021. Burst arrives: 1024-1030 all gap → buffered.
791 // 2. Update 1022 arrives → Ok → drain dispatches 1024-1030.
792 // 3. pts advances only to 1022 (the Ok value), NOT to 1030.
793 // 4. Bot sends replies → updateShortSentMessage pts=1031 →
794 // check_and_fill_gap: expected=1023, got=1031 → GAP.
795 // 5. Cascade getDiff → duplicates → flood-wait → freeze.
796 //
797 // Grammers avoids this by re-checking each buffered update in
798 // order and advancing pts for each one (process_updates inner loop
799 // over entry.possible_gap). Layer's Update enum carries no pts
800 // metadata, so we cannot replicate that ordered sequence check here.
801 //
802 // Correct equivalent: leave possible_gap alone. The buffered
803 // updates will be recovered by gap_tick → getDiff(new_pts), which
804 // drains possible_gap into pre_diff, lets the server fill the
805 // gap, and advances pts to the true server state; no stale pts,
806 // no secondary cascade, no duplicates.
807 self.inner.pts_state.lock().await.advance(new_pts);
808 Ok(upd.into_iter().collect())
809 }
810 PtsCheckResult::Gap { expected, got } => {
811 // Buffer the update; start the deadline timer regardless.
812 //
813 // Bug fix (touch_global_timer): when upd=None (e.g. the gap is
814 // triggered by an updateShortSentMessage RPC response), nothing was
815 // ever pushed to possible_gap.global, so global stayed None.
816 // global_deadline_elapsed() returned false forever, gap_tick never
817 // saw has_global()=true, and the gap was never resolved unless a
818 // subsequent user message arrived. touch_global_timer() starts the
819 // 1-second deadline clock even without a buffered update.
820 {
821 let mut gap = self.inner.possible_gap.lock().await;
822 if let Some(u) = upd {
823 gap.push_global(u);
824 } else {
825 gap.touch_global_timer();
826 }
827 }
828 let deadline_elapsed = self
829 .inner
830 .possible_gap
831 .lock()
832 .await
833 .global_deadline_elapsed();
834 if deadline_elapsed {
835 tracing::warn!(
836 "[layer] global pts gap: expected {expected}, got {got}: getDifference"
837 );
838 // get_difference() is now atomic (check-and-set) and drains the
839 // possible_gap buffer internally on success, so callers must NOT
840 // drain before calling or splice the old buffer onto the results.
841 // Doing so caused every gap update to be dispatched twice, which
842 // triggered FLOOD_WAIT, blocked the handler, and froze the bot.
843 self.get_difference().await
844 } else {
845 tracing::debug!(
846 "[layer] global pts gap: expected {expected}, got {got}: buffering (possible gap)"
847 );
848 Ok(vec![])
849 }
850 }
851 PtsCheckResult::Duplicate => {
852 tracing::debug!("[layer] global pts duplicate, discarding");
853 Ok(vec![])
854 }
855 }
856 }
857
858 /// Check qts (secret chat updates) and fill gap if needed.
859 pub async fn check_and_fill_qts_gap(
860 &self,
861 new_qts: i32,
862 qts_count: i32,
863 ) -> Result<Vec<update::Update>, InvocationError> {
864 let result = self
865 .inner
866 .pts_state
867 .lock()
868 .await
869 .check_qts(new_qts, qts_count);
870 match result {
871 PtsCheckResult::Ok => {
872 self.inner.pts_state.lock().await.advance_qts(new_qts);
873 Ok(vec![])
874 }
875 PtsCheckResult::Gap { expected, got } => {
876 tracing::warn!("[layer] qts gap: expected {expected}, got {got}: getDifference");
877 self.get_difference().await
878 }
879 PtsCheckResult::Duplicate => Ok(vec![]),
880 }
881 }
882
883 /// Check top-level seq and fill gap if needed.
884 pub async fn check_and_fill_seq_gap(
885 &self,
886 new_seq: i32,
887 seq_start: i32,
888 ) -> Result<Vec<update::Update>, InvocationError> {
889 let result = self
890 .inner
891 .pts_state
892 .lock()
893 .await
894 .check_seq(new_seq, seq_start);
895 match result {
896 PtsCheckResult::Ok => {
897 self.inner.pts_state.lock().await.advance_seq(new_seq);
898 Ok(vec![])
899 }
900 PtsCheckResult::Gap { expected, got } => {
901 tracing::warn!("[layer] seq gap: expected {expected}, got {got}: getDifference");
902 self.get_difference().await
903 }
904 PtsCheckResult::Duplicate => Ok(vec![]),
905 }
906 }
907
908 /// Check a per-channel pts, fetch getChannelDifference if there is a gap.
909 pub async fn check_and_fill_channel_gap(
910 &self,
911 channel_id: i64,
912 new_pts: i32,
913 pts_count: i32,
914 upd: Option<update::Update>,
915 ) -> Result<Vec<update::Update>, InvocationError> {
916 // if a diff is already in flight for this channel, skip: prevents
917 // 1 gap from spawning N concurrent getChannelDifference tasks.
918 if self
919 .inner
920 .pts_state
921 .lock()
922 .await
923 .getting_diff_for
924 .contains(&channel_id)
925 {
926 tracing::debug!("[layer] channel {channel_id} diff already in flight, skipping");
927 if let Some(u) = upd {
928 self.inner
929 .possible_gap
930 .lock()
931 .await
932 .push_channel(channel_id, u);
933 }
934 return Ok(vec![]);
935 }
936
937 let result = self
938 .inner
939 .pts_state
940 .lock()
941 .await
942 .check_channel_pts(channel_id, new_pts, pts_count);
943 match result {
944 PtsCheckResult::Ok => {
945 let mut buffered = self
946 .inner
947 .possible_gap
948 .lock()
949 .await
950 .drain_channel(channel_id);
951 self.inner
952 .pts_state
953 .lock()
954 .await
955 .advance_channel(channel_id, new_pts);
956 if let Some(u) = upd {
957 buffered.push(u);
958 }
959 Ok(buffered)
960 }
961 PtsCheckResult::Gap { expected, got } => {
962 if let Some(u) = upd {
963 self.inner
964 .possible_gap
965 .lock()
966 .await
967 .push_channel(channel_id, u);
968 }
969 let deadline_elapsed = self
970 .inner
971 .possible_gap
972 .lock()
973 .await
974 .channel_deadline_elapsed(channel_id);
975 if deadline_elapsed {
976 tracing::warn!(
977 "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: getChannelDifference"
978 );
979 // mark this channel as having a diff in flight.
980 self.inner
981 .pts_state
982 .lock()
983 .await
984 .getting_diff_for
985 .insert(channel_id);
986 let buffered = self
987 .inner
988 .possible_gap
989 .lock()
990 .await
991 .drain_channel(channel_id);
992 match self.get_channel_difference(channel_id).await {
993 Ok(mut diff_updates) => {
994 // diff complete, allow future gaps to be handled.
995 self.inner
996 .pts_state
997 .lock()
998 .await
999 .getting_diff_for
1000 .remove(&channel_id);
1001 diff_updates.splice(0..0, buffered);
1002 Ok(diff_updates)
1003 }
1004 // Permanent access errors: remove the channel from pts tracking
1005 // entirely (. The next update for this
1006 // channel will have local=0 → PtsCheckResult::Ok, advancing pts
1007 // without any gap fill. This breaks the infinite gap→CHANNEL_INVALID
1008 // loop that happened when advance_channel kept the stale pts alive.
1009 //
1010 // Common causes:
1011 // - access_hash not in peer cache (update arrived via updateShort
1012 // which carries no chats list)
1013 // - bot was kicked / channel deleted
1014 Err(InvocationError::Rpc(ref e))
1015 if e.name == "CHANNEL_INVALID"
1016 || e.name == "CHANNEL_PRIVATE"
1017 || e.name == "CHANNEL_NOT_MODIFIED" =>
1018 {
1019 tracing::debug!(
1020 "[layer] channel {channel_id}: {}: removing from pts tracking \
1021 (next update treated as first-seen, no gap fill)",
1022 e.name
1023 );
1024 {
1025 let mut s = self.inner.pts_state.lock().await;
1026 s.getting_diff_for.remove(&channel_id);
1027 s.channel_pts.remove(&channel_id); // ← fix: delete, not advance
1028 }
1029 Ok(buffered)
1030 }
1031 Err(InvocationError::Deserialize(ref msg)) => {
1032 // Unrecognised constructor or parse failure: treat same as
1033 // CHANNEL_INVALID: remove from tracking so we don't loop.
1034 tracing::debug!(
1035 "[layer] channel {channel_id}: deserialize error ({msg}): \
1036 removing from pts tracking"
1037 );
1038 {
1039 let mut s = self.inner.pts_state.lock().await;
1040 s.getting_diff_for.remove(&channel_id);
1041 s.channel_pts.remove(&channel_id);
1042 }
1043 Ok(buffered)
1044 }
1045 Err(e) => {
1046 // also clear on unexpected errors so we don't get stuck.
1047 self.inner
1048 .pts_state
1049 .lock()
1050 .await
1051 .getting_diff_for
1052 .remove(&channel_id);
1053 Err(e)
1054 }
1055 }
1056 } else {
1057 tracing::debug!(
1058 "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: buffering"
1059 );
1060 Ok(vec![])
1061 }
1062 }
1063 PtsCheckResult::Duplicate => {
1064 tracing::debug!("[layer] channel {channel_id} pts duplicate, discarding");
1065 Ok(vec![])
1066 }
1067 }
1068 }
1069
1070 /// Called periodically (e.g. from keepalive) to fire getDifference
1071 /// if no update has been received for > 15 minutes.
1072 ///
1073 /// also drives per-entry possible-gap deadlines independently of
1074 /// incoming updates. Previously the POSSIBLE_GAP_DEADLINE_MS window was
1075 /// only evaluated when a new incoming update called check_and_fill_gap
1076 /// meaning a quiet channel with a real gap would never fire getDifference
1077 /// until another update arrived. This
1078 /// which scans all LiveEntry.effective_deadline() on every keepalive tick.
1079 pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
1080 // Stuck-diff watchdog: if getting_global_diff has been true for more than
1081 // 30 s the in-flight getDifference RPC is assumed hung (e.g. half-open TCP
1082 // that the OS keepalive hasn't killed yet). Reset the guard so the next
1083 // gap_tick cycle can issue a fresh getDifference. The 30-second timeout
1084 // in get_difference() will concurrently return an error and also clear the
1085 // flag; this watchdog is a belt-and-suspenders safety net for edge cases
1086 // where that timeout itself is somehow delayed.
1087 {
1088 let stuck = {
1089 let s = self.inner.pts_state.lock().await;
1090 s.getting_global_diff
1091 && s.getting_global_diff_since
1092 .map(|t| t.elapsed().as_secs() > 30)
1093 .unwrap_or(false)
1094 };
1095 if stuck {
1096 tracing::warn!(
1097 "[layer] getDifference in-flight for >30 s: \
1098 resetting guard so gap_tick can retry"
1099 );
1100 let mut s = self.inner.pts_state.lock().await;
1101 s.getting_global_diff = false;
1102 s.getting_global_diff_since = None;
1103 }
1104 }
1105
1106 // existing 5-minute global timeout
1107 let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
1108 if exceeded {
1109 tracing::info!("[layer] update deadline exceeded: fetching getDifference");
1110 match self.get_difference().await {
1111 Ok(updates) => {
1112 for u in updates {
1113 if self.inner.update_tx.try_send(u).is_err() {
1114 tracing::warn!("[layer] update channel full: dropping diff update");
1115 }
1116 }
1117 }
1118 Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1119 // 401: gap buffer already cleared inside get_difference().
1120 // gap_tick will not re-fire. Supervisor handles reconnect.
1121 tracing::warn!(
1122 "[layer] deadline getDifference AUTH_KEY_UNREGISTERED: session dead"
1123 );
1124 }
1125 // tDesktop: differenceFail() just logs + sets a retry timer.
1126 // It never propagates the error outward. Mirror that here:
1127 // Deserialize errors (unknown constructor, e.g. 0x00000004) are
1128 // transient - the server sent a misrouted frame or a service packet.
1129 // Propagating them causes the caller's loop to log noise every second.
1130 // IO / RPC errors also stay local: the reader loop drives reconnects
1131 // independently via the socket, not via check_update_deadline's return.
1132 Err(e) => {
1133 tracing::warn!("[layer] deadline getDifference failed (non-fatal): {e}");
1134 }
1135 }
1136 }
1137
1138 // drive global possible-gap deadline
1139 // If the possible-gap window has expired but no new update has arrived
1140 // to trigger check_and_fill_gap, fire getDifference from here.
1141 {
1142 let gap_expired = self
1143 .inner
1144 .possible_gap
1145 .lock()
1146 .await
1147 .global_deadline_elapsed();
1148 // Note: get_difference() is now atomic (check-and-set), so the
1149 // `already` guard is advisory only; get_difference() will bail
1150 // safely if another call is already in flight.
1151 if gap_expired {
1152 tracing::debug!("[layer] B3 global possible-gap deadline expired: getDifference");
1153 // get_difference() snapshots and drains the pre-existing buffer at its
1154 // start (before the RPC), so updates that arrive DURING the RPC flight
1155 // remain in possible_gap for the next cycle. Never drain here.
1156 match self.get_difference().await {
1157 Ok(updates) => {
1158 for u in updates {
1159 if self.inner.update_tx.try_send(u).is_err() {
1160 tracing::warn!("[layer] update channel full: dropping gap update");
1161 }
1162 }
1163 }
1164 Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1165 // 401: get_difference() cleared the gap buffer, so gap_tick
1166 // will not re-fire. Supervisor handles reconnect.
1167 tracing::warn!(
1168 "[layer] B3 global gap diff AUTH_KEY_UNREGISTERED: \
1169 session dead, gap buffer cleared"
1170 );
1171 }
1172 // tDesktop differenceFail: log and let the retry timer handle it.
1173 // Never propagate - the reader drives reconnects via the socket.
1174 Err(e) => {
1175 tracing::warn!("[layer] B3 global gap diff failed (non-fatal): {e}");
1176 }
1177 }
1178 }
1179 }
1180
1181 // drive per-channel possible-gap deadlines
1182 // Collect expired channel IDs up-front to avoid holding the lock across awaits.
1183 let expired_channels: Vec<i64> = {
1184 let gap = self.inner.possible_gap.lock().await;
1185 gap.channel
1186 .keys()
1187 .copied()
1188 .filter(|&id| gap.channel_deadline_elapsed(id))
1189 .collect()
1190 };
1191 for channel_id in expired_channels {
1192 let already = self
1193 .inner
1194 .pts_state
1195 .lock()
1196 .await
1197 .getting_diff_for
1198 .contains(&channel_id);
1199 if already {
1200 continue;
1201 }
1202 tracing::debug!(
1203 "[layer] B3 channel {channel_id} possible-gap deadline expired: getChannelDifference"
1204 );
1205 // Mark in-flight before spawning so a racing incoming update can't
1206 // also spawn a diff for the same channel.
1207 self.inner
1208 .pts_state
1209 .lock()
1210 .await
1211 .getting_diff_for
1212 .insert(channel_id);
1213 let buffered = self
1214 .inner
1215 .possible_gap
1216 .lock()
1217 .await
1218 .drain_channel(channel_id);
1219 let c = self.clone();
1220 let utx = self.inner.update_tx.clone();
1221 tokio::spawn(async move {
1222 match c.get_channel_difference(channel_id).await {
1223 Ok(mut updates) => {
1224 c.inner
1225 .pts_state
1226 .lock()
1227 .await
1228 .getting_diff_for
1229 .remove(&channel_id);
1230 updates.splice(0..0, buffered);
1231 for u in updates {
1232 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1233 tracing::warn!(
1234 "[layer] update channel full: dropping ch gap update"
1235 );
1236 }
1237 }
1238 }
1239 Err(e) => {
1240 c.inner
1241 .pts_state
1242 .lock()
1243 .await
1244 .getting_diff_for
1245 .remove(&channel_id);
1246 // Permanent errors (CHANNEL_INVALID etc.): updates are
1247 // unrecoverable, drop them. Transient errors: restore
1248 // the buffer so the next B3 cycle can retry.
1249 let permanent = matches!(&e,
1250 InvocationError::Rpc(r)
1251 if r.code == 401
1252 || r.name == "CHANNEL_INVALID"
1253 || r.name == "CHANNEL_PRIVATE"
1254 || r.name == "CHANNEL_NOT_MODIFIED"
1255 ) || matches!(&e, InvocationError::Deserialize(_));
1256 if permanent {
1257 tracing::warn!(
1258 "[layer] B3 channel {channel_id} gap diff failed (permanent): {e}"
1259 );
1260 } else {
1261 tracing::warn!(
1262 "[layer] B3 channel {channel_id} gap diff failed (transient): {e}"
1263 );
1264 let mut gap = c.inner.possible_gap.lock().await;
1265 for u in buffered {
1266 gap.push_channel(channel_id, u);
1267 }
1268 }
1269 }
1270 }
1271 });
1272 }
1273
1274 Ok(())
1275 }
1276}