layer_client/pts.rs
1//! Update gap detection and recovery.
2//!
3//! Tracks `pts` / `qts` / `seq` / `date` plus per-channel pts, and
4//! fills gaps via `updates.getDifference` (global) and
5//! `updates.getChannelDifference` (per-channel).
6//!
7//! ## What "gap" means
8//! Telegram guarantees updates arrive in order within a pts counter.
9//! If `new_pts != local_pts + pts_count` there is a gap and we must
10//! ask the server for the missed updates before processing this one.
11
12use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, RpcError, attach_client_to_update, update};
19
/// Length of the possible-gap window in milliseconds: an out-of-order pts is
/// only treated as a real gap once its buffer has been open this long.
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;

/// `limit` sent with `updates.getChannelDifference` when the client is a bot.
/// Bots are allowed a much larger diff window (Telegram server-side limit).
const CHANNEL_DIFF_LIMIT_BOT: i32 = 100_000;
25
26/// Buffers updates received during a possible-gap window so we don't fire
27/// getDifference on every slightly out-of-order update.
28#[derive(Default)]
29pub struct PossibleGapBuffer {
30 /// channel_id → (buffered_updates, window_start)
31 channel: HashMap<i64, (Vec<update::Update>, Instant)>,
32 /// Global buffered updates (non-channel pts gaps)
33 global: Option<(Vec<update::Update>, Instant)>,
34}
35
36impl PossibleGapBuffer {
37 pub fn new() -> Self {
38 Self::default()
39 }
40
41 /// Buffer a global update during a possible-gap window.
42 pub fn push_global(&mut self, upd: update::Update) {
43 let entry = self
44 .global
45 .get_or_insert_with(|| (Vec::new(), Instant::now()));
46 entry.0.push(upd);
47 }
48
49 /// Buffer a channel update during a possible-gap window.
50 pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
51 let entry = self
52 .channel
53 .entry(channel_id)
54 .or_insert_with(|| (Vec::new(), Instant::now()));
55 entry.0.push(upd);
56 }
57
58 /// True if the global possible-gap deadline has elapsed.
59 pub fn global_deadline_elapsed(&self) -> bool {
60 self.global
61 .as_ref()
62 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
63 .unwrap_or(false)
64 }
65
66 /// True if a channel's possible-gap deadline has elapsed.
67 pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
68 self.channel
69 .get(&channel_id)
70 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
71 .unwrap_or(false)
72 }
73
74 /// True if the global buffer has any pending updates.
75 pub fn has_global(&self) -> bool {
76 self.global.is_some()
77 }
78
79 /// True if a channel buffer has pending updates.
80 pub fn has_channel(&self, channel_id: i64) -> bool {
81 self.channel.contains_key(&channel_id)
82 }
83
84 /// Start the global deadline timer without buffering an update.
85 ///
86 /// Called when a gap is detected but the triggering update carries no
87 /// high-level `Update` value (e.g. `updateShortSentMessage` with `upd=None`).
88 /// Without this, `global` stays `None` → `global_deadline_elapsed()` always
89 /// returns `false` → `gap_tick` never fires getDifference for such gaps.
90 pub fn touch_global_timer(&mut self) {
91 self.global
92 .get_or_insert_with(|| (Vec::new(), Instant::now()));
93 }
94
95 /// Drain global buffered updates.
96 pub fn drain_global(&mut self) -> Vec<update::Update> {
97 self.global.take().map(|(v, _)| v).unwrap_or_default()
98 }
99
100 /// Drain channel buffered updates.
101 pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
102 self.channel
103 .remove(&channel_id)
104 .map(|(v, _)| v)
105 .unwrap_or_default()
106 }
107}
108
109// PtsState
110
111/// Full MTProto sequence-number state, including per-channel counters.
112///
113/// All fields are `pub` so that `connect()` can restore them from the
114/// persisted session without going through an artificial constructor.
115#[derive(Debug, Clone, Default)]
116pub struct PtsState {
117 /// Main pts counter (messages, non-channel updates).
118 pub pts: i32,
119 /// Secondary counter for secret-chat updates.
120 pub qts: i32,
121 /// Date of the last received update (Unix timestamp).
122 pub date: i32,
123 /// Combined-container sequence number.
124 pub seq: i32,
125 /// Per-channel pts counters. `channel_id → pts`.
126 pub channel_pts: HashMap<i64, i32>,
127 /// How many times getChannelDifference has been called per channel.
128 /// tDesktop starts at limit=100, then raises to 1000 after the first
129 /// successful response. We track call count to implement the same ramp-up.
130 pub channel_diff_calls: HashMap<i64, u32>,
131 /// Timestamp of last received update for deadline-based gap detection.
132 pub last_update_at: Option<Instant>,
133 /// Channels currently awaiting a getChannelDifference response.
134 /// If a channel is in this set, no new gap-fill task is spawned for it.
135 pub getting_diff_for: HashSet<i64>,
136 /// Guard against concurrent global getDifference calls.
137 /// Without this, two simultaneous gap detections both spawn get_difference(),
138 /// which double-processes updates and corrupts pts state.
139 pub getting_global_diff: bool,
140 /// When getting_global_diff was set to true. Used by the stuck-diff watchdog
141 /// in check_update_deadline: if the flag has been set for >30 s the RPC is
142 /// assumed hung and the guard is reset so the next gap_tick can retry.
143 pub getting_global_diff_since: Option<Instant>,
144}
145
146impl PtsState {
147 pub fn from_server_state(s: &tl::types::updates::State) -> Self {
148 Self {
149 pts: s.pts,
150 qts: s.qts,
151 date: s.date,
152 seq: s.seq,
153 channel_pts: HashMap::new(),
154 channel_diff_calls: HashMap::new(),
155 last_update_at: Some(Instant::now()),
156 getting_diff_for: HashSet::new(),
157 getting_global_diff: false,
158 getting_global_diff_since: None,
159 }
160 }
161
162 /// Record that an update was received now (resets the deadline timer).
163 pub fn touch(&mut self) {
164 self.last_update_at = Some(Instant::now());
165 }
166
167 /// Returns true if no update has been received for > 15 minutes.
168 pub fn deadline_exceeded(&self) -> bool {
169 self.last_update_at
170 .as_ref()
171 .map(|t| t.elapsed().as_secs() > 15 * 60)
172 .unwrap_or(false)
173 }
174
175 /// Check whether `new_pts` is in order given `pts_count` new updates.
176 pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
177 let expected = self.pts + pts_count;
178 if new_pts == expected {
179 PtsCheckResult::Ok
180 } else if new_pts > expected {
181 PtsCheckResult::Gap {
182 expected,
183 got: new_pts,
184 }
185 } else {
186 PtsCheckResult::Duplicate
187 }
188 }
189
190 /// Check a qts value (secret chat updates).
191 pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
192 let expected = self.qts + qts_count;
193 if new_qts == expected {
194 PtsCheckResult::Ok
195 } else if new_qts > expected {
196 PtsCheckResult::Gap {
197 expected,
198 got: new_qts,
199 }
200 } else {
201 PtsCheckResult::Duplicate
202 }
203 }
204
205 /// Check top-level seq for UpdatesCombined containers.
206 pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
207 if self.seq == 0 {
208 return PtsCheckResult::Ok;
209 } // uninitialised: accept
210 let expected = self.seq + 1;
211 if seq_start == expected {
212 PtsCheckResult::Ok
213 } else if seq_start > expected {
214 PtsCheckResult::Gap {
215 expected,
216 got: seq_start,
217 }
218 } else {
219 PtsCheckResult::Duplicate
220 }
221 }
222
223 /// Check a per-channel pts value.
224 pub fn check_channel_pts(
225 &self,
226 channel_id: i64,
227 new_pts: i32,
228 pts_count: i32,
229 ) -> PtsCheckResult {
230 let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
231 if local == 0 {
232 return PtsCheckResult::Ok;
233 }
234 let expected = local + pts_count;
235 if new_pts == expected {
236 PtsCheckResult::Ok
237 } else if new_pts > expected {
238 PtsCheckResult::Gap {
239 expected,
240 got: new_pts,
241 }
242 } else {
243 PtsCheckResult::Duplicate
244 }
245 }
246
247 /// Advance the global pts.
248 pub fn advance(&mut self, new_pts: i32) {
249 if new_pts > self.pts {
250 self.pts = new_pts;
251 }
252 self.touch();
253 }
254
255 /// Advance the qts.
256 pub fn advance_qts(&mut self, new_qts: i32) {
257 if new_qts > self.qts {
258 self.qts = new_qts;
259 }
260 self.touch();
261 }
262
263 /// Advance seq.
264 pub fn advance_seq(&mut self, new_seq: i32) {
265 if new_seq > self.seq {
266 self.seq = new_seq;
267 }
268 }
269
270 /// Advance a per-channel pts.
271 pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
272 let entry = self.channel_pts.entry(channel_id).or_insert(0);
273 if new_pts > *entry {
274 *entry = new_pts;
275 }
276 self.touch();
277 }
278}
279
/// Outcome of comparing an incoming counter value with the locally expected
/// one (see the `check_*` methods on `PtsState`).
///
/// The type is plain data (two `i32`s at most), so it is `Copy`: a verdict
/// can be inspected, logged and then matched by value without moving it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The value is exactly the expected one; the update can be applied.
    Ok,
    /// The value is ahead of the expected one: updates in between are missing.
    Gap {
        /// Value the local state expected next.
        expected: i32,
        /// Value the update actually carried.
        got: i32,
    },
    /// The value is behind the expected one: the update was already applied.
    Duplicate,
}
286
287// Client methods
288
289impl Client {
290 // Global getDifference
291
292 /// Fetch and replay any updates missed since the persisted pts.
293 ///
294 /// loops on `Difference::Slice` (partial response) until the server
295 /// returns a final `Difference` or `Empty`
296 /// never dropping a partial batch. Previous code returned after one slice,
297 /// silently losing all updates in subsequent slices.
298 pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
299 // Atomically claim the in-flight slot.
300 //
301 // TOCTOU fix: the old code set getting_global_diff=true inside get_difference
302 // but the external guard in check_and_fill_gap read the flag in a SEPARATE lock
303 // acquisition. With multi-threaded Tokio, N tasks could all read false, all
304 // pass the external check, then all race into this function and all set the flag
305 // to true. Each concurrent call resets pts_state to the server state it received;
306 // the last STALE write rolls pts back below where it actually is, which immediately
307 // triggers a new gap → another burst of concurrent getDifference calls → cascade.
308 //
309 // Fix: check-and-set inside a SINGLE lock acquisition. Only the first caller
310 // proceeds; all others see true and return Ok(vec![]) immediately.
311 {
312 let mut s = self.inner.pts_state.lock().await;
313 if s.getting_global_diff {
314 return Ok(vec![]);
315 }
316 s.getting_global_diff = true;
317 s.getting_global_diff_since = Some(Instant::now());
318 }
319
320 // Drain the initial-gap buffer before the RPC.
321 //
322 // possible_gap.global contains updates buffered during the possible-gap window
323 // (before we decided to call getDiff). The server response covers exactly
324 // these pts values → discard the snapshot on success.
325 // On RPC error, restore them so the next gap_tick can retry.
326 //
327 // Note: updates arriving DURING the RPC flight are now force-dispatched by
328 // check_and_fill_gap and never accumulate in possible_gap,
329 // so there is nothing extra to drain after the call returns.
330 let pre_diff = self.inner.possible_gap.lock().await.drain_global();
331
332 // Wrap the RPC in a hard 30-second timeout so a hung TCP connection
333 // (half-open socket, unresponsive DC) cannot hold getting_global_diff=true
334 // forever and freeze the bot indefinitely.
335 let result = tokio::time::timeout(
336 std::time::Duration::from_secs(30),
337 self.get_difference_inner(),
338 )
339 .await
340 .unwrap_or_else(|_| {
341 tracing::warn!("[layer] getDifference RPC timed out after 30 s: will retry");
342 Err(InvocationError::Io(std::io::Error::new(
343 std::io::ErrorKind::TimedOut,
344 "getDifference timed out",
345 )))
346 });
347
348 // Always clear the guard, even on error.
349 {
350 let mut s = self.inner.pts_state.lock().await;
351 s.getting_global_diff = false;
352 s.getting_global_diff_since = None;
353 }
354
355 match &result {
356 Ok(_) => {
357 // pre_diff is covered by the server response; discard it.
358 // (Flight-time updates are force-dispatched, not buffered into possible_gap.)
359 }
360 Err(e) => {
361 let mut gap = self.inner.possible_gap.lock().await;
362 if matches!(e, InvocationError::Rpc(r) if r.code == 401) {
363 // 401: do not restore the gap buffer. push_global inserts a
364 // fresh Instant::now() timestamp, making global_deadline_elapsed()
365 // return true immediately on the next gap_tick cycle, which fires
366 // get_difference() again and loops. Drop it; reconnect will
367 // re-sync state via sync_pts_state / getDifference.
368 drop(pre_diff);
369 } else {
370 // Transient error: restore so the next gap_tick can retry.
371 for u in pre_diff {
372 gap.push_global(u);
373 }
374 }
375 }
376 }
377
378 result
379 }
380
381 async fn get_difference_inner(&self) -> Result<Vec<update::Update>, InvocationError> {
382 use layer_tl_types::{Cursor, Deserializable};
383
384 let mut all_updates: Vec<update::Update> = Vec::new();
385
386 // loop until the server sends a final (non-Slice) response.
387 loop {
388 let (pts, qts, date) = {
389 let s = self.inner.pts_state.lock().await;
390 (s.pts, s.qts, s.date)
391 };
392
393 if pts == 0 {
394 self.sync_pts_state().await?;
395 return Ok(all_updates);
396 }
397
398 tracing::debug!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
399
400 let req = tl::functions::updates::GetDifference {
401 pts,
402 pts_limit: None,
403 pts_total_limit: None,
404 date,
405 qts,
406 qts_limit: None,
407 };
408
409 let body = self.rpc_call_raw_pub(&req).await?;
410 let body = crate::maybe_gz_decompress(body)?;
411 let mut cur = Cursor::from_slice(&body);
412 let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
413
414 match diff {
415 tl::enums::updates::Difference::Empty(e) => {
416 let mut s = self.inner.pts_state.lock().await;
417 s.date = e.date;
418 s.seq = e.seq;
419 s.touch();
420 tracing::debug!("[layer] getDifference: empty (seq={})", e.seq);
421 return Ok(all_updates);
422 }
423
424 tl::enums::updates::Difference::Difference(d) => {
425 tracing::debug!(
426 "[layer] getDifference: {} messages, {} updates (final)",
427 d.new_messages.len(),
428 d.other_updates.len()
429 );
430 self.cache_users_slice_pub(&d.users).await;
431 self.cache_chats_slice_pub(&d.chats).await;
432 for msg in d.new_messages {
433 all_updates.push(update::Update::NewMessage(
434 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
435 ));
436 }
437 for upd in d.other_updates {
438 all_updates.extend(update::from_single_update_pub(upd));
439 }
440 let tl::enums::updates::State::State(ns) = d.state;
441 let saved_channel_pts = {
442 let s = self.inner.pts_state.lock().await;
443 s.channel_pts.clone()
444 };
445 let mut new_state = PtsState::from_server_state(&ns);
446 // Preserve per-channel pts across the global reset.
447 for (cid, cpts) in saved_channel_pts {
448 new_state.channel_pts.entry(cid).or_insert(cpts);
449 }
450 // Preserve in-flight sets: we clear getting_global_diff ourselves.
451 new_state.getting_global_diff = true; // will be cleared by caller
452 {
453 let mut s = self.inner.pts_state.lock().await;
454 let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
455 let since = s.getting_global_diff_since; // preserve watchdog timestamp
456 *s = new_state;
457 s.getting_diff_for = getting_diff_for;
458 s.getting_global_diff_since = since;
459 }
460 // Final response: stop looping.
461 return Ok(all_updates);
462 }
463
464 tl::enums::updates::Difference::Slice(d) => {
465 // server has more data: apply intermediate_state and
466 // continue looping. Old code returned here, losing all updates
467 // in subsequent slices.
468 tracing::debug!(
469 "[layer] getDifference slice: {} messages, {} updates: continuing",
470 d.new_messages.len(),
471 d.other_updates.len()
472 );
473 self.cache_users_slice_pub(&d.users).await;
474 self.cache_chats_slice_pub(&d.chats).await;
475 for msg in d.new_messages {
476 all_updates.push(update::Update::NewMessage(
477 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
478 ));
479 }
480 for upd in d.other_updates {
481 all_updates.extend(update::from_single_update_pub(upd));
482 }
483 let tl::enums::updates::State::State(ns) = d.intermediate_state;
484 let saved_channel_pts = {
485 let s = self.inner.pts_state.lock().await;
486 s.channel_pts.clone()
487 };
488 let mut new_state = PtsState::from_server_state(&ns);
489 for (cid, cpts) in saved_channel_pts {
490 new_state.channel_pts.entry(cid).or_insert(cpts);
491 }
492 new_state.getting_global_diff = true;
493 {
494 let mut s = self.inner.pts_state.lock().await;
495 let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
496 let since = s.getting_global_diff_since; // preserve watchdog timestamp
497 *s = new_state;
498 s.getting_diff_for = getting_diff_for;
499 s.getting_global_diff_since = since;
500 }
501 // Loop: fetch the next slice.
502 continue;
503 }
504
505 tl::enums::updates::Difference::TooLong(d) => {
506 tracing::warn!("[layer] getDifference: TooLong (pts={}): re-syncing", d.pts);
507 self.inner.pts_state.lock().await.pts = d.pts;
508 self.sync_pts_state().await?;
509 return Ok(all_updates);
510 }
511 }
512 }
513 }
514
515 // Per-channel getChannelDifference
516
517 /// Fetch missed updates for a single channel.
518 pub async fn get_channel_difference(
519 &self,
520 channel_id: i64,
521 ) -> Result<Vec<update::Update>, InvocationError> {
522 let local_pts = self
523 .inner
524 .pts_state
525 .lock()
526 .await
527 .channel_pts
528 .get(&channel_id)
529 .copied()
530 .unwrap_or(0);
531
532 let access_hash = self
533 .inner
534 .peer_cache
535 .read()
536 .await
537 .channels
538 .get(&channel_id)
539 .copied()
540 .unwrap_or(0);
541
542 // No access hash in cache → we can't call getChannelDifference.
543 // Attempting GetChannels with access_hash=0 also returns CHANNEL_INVALID,
544 // so skip the call entirely and let the caller handle it.
545 if access_hash == 0 {
546 tracing::debug!(
547 "[layer] channel {channel_id}: access_hash not cached, \
548 cannot call getChannelDifference: caller will remove from tracking"
549 );
550 return Err(InvocationError::Rpc(RpcError {
551 code: 400,
552 name: "CHANNEL_INVALID".into(),
553 value: None,
554 }));
555 }
556
557 tracing::debug!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
558
559 let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
560 channel_id,
561 access_hash,
562 });
563
564 // tDesktop ramp-up: limit=100 on first call, 1000 on subsequent ones.
565 // Bots always use the server-side maximum (100_000).
566 let diff_limit = if self.inner.is_bot.load(std::sync::atomic::Ordering::Relaxed) {
567 CHANNEL_DIFF_LIMIT_BOT
568 } else {
569 let call_count = self
570 .inner
571 .pts_state
572 .lock()
573 .await
574 .channel_diff_calls
575 .get(&channel_id)
576 .copied()
577 .unwrap_or(0);
578 if call_count == 0 { 100 } else { 1000 }
579 };
580
581 let req = tl::functions::updates::GetChannelDifference {
582 force: false,
583 channel,
584 filter: tl::enums::ChannelMessagesFilter::Empty,
585 pts: local_pts.max(1),
586 limit: diff_limit,
587 };
588
589 let body = match self.rpc_call_raw_pub(&req).await {
590 Ok(b) => {
591 // Successful call bump the per-channel counter so next call uses 1000.
592 self.inner
593 .pts_state
594 .lock()
595 .await
596 .channel_diff_calls
597 .entry(channel_id)
598 .and_modify(|c| *c = c.saturating_add(1))
599 .or_insert(1);
600 b
601 }
602 Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
603 // treat as empty diff: retry next gap
604 tracing::debug!("[layer] PERSISTENT_TIMESTAMP_OUTDATED: skipping diff");
605 return Ok(vec![]);
606 }
607 Err(e) => return Err(e),
608 };
609 let body = crate::maybe_gz_decompress(body)?;
610 let mut cur = Cursor::from_slice(&body);
611 let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
612
613 let mut updates = Vec::new();
614
615 match diff {
616 tl::enums::updates::ChannelDifference::Empty(e) => {
617 tracing::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
618 self.inner
619 .pts_state
620 .lock()
621 .await
622 .advance_channel(channel_id, e.pts);
623 }
624 tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
625 tracing::debug!(
626 "[layer] getChannelDifference: {} messages, {} updates",
627 d.new_messages.len(),
628 d.other_updates.len()
629 );
630 self.cache_users_slice_pub(&d.users).await;
631 self.cache_chats_slice_pub(&d.chats).await;
632 for msg in d.new_messages {
633 updates.push(update::Update::NewMessage(
634 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
635 ));
636 }
637 for upd in d.other_updates {
638 updates.extend(update::from_single_update_pub(upd));
639 }
640 self.inner
641 .pts_state
642 .lock()
643 .await
644 .advance_channel(channel_id, d.pts);
645 }
646 tl::enums::updates::ChannelDifference::TooLong(d) => {
647 tracing::warn!(
648 "[layer] getChannelDifference TooLong: replaying messages, resetting pts"
649 );
650 self.cache_users_slice_pub(&d.users).await;
651 self.cache_chats_slice_pub(&d.chats).await;
652 for msg in d.messages {
653 updates.push(update::Update::NewMessage(
654 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
655 ));
656 }
657 self.inner
658 .pts_state
659 .lock()
660 .await
661 .advance_channel(channel_id, 0);
662 }
663 }
664
665 Ok(updates)
666 }
667
668 // Sync from server
669
670 pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
671 let body = self
672 .rpc_call_raw_pub(&tl::functions::updates::GetState {})
673 .await?;
674 let mut cur = Cursor::from_slice(&body);
675 let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
676 let mut state = self.inner.pts_state.lock().await;
677 state.pts = s.pts;
678 state.qts = s.qts;
679 state.date = s.date;
680 state.seq = s.seq;
681 state.touch();
682 tracing::debug!(
683 "[layer] pts synced: pts={}, qts={}, seq={}",
684 s.pts,
685 s.qts,
686 s.seq
687 );
688 Ok(())
689 }
690 /// Check global pts, buffer during possible-gap window, fetch diff if real gap.
691 ///
692 /// When a global getDifference is already in-flight (`getting_global_diff == true`),
693 /// updates are **force-dispatched** immediately without pts tracking.
694 /// This prevents the cascade freeze that buffering caused:
695 /// 1. getDiff runs; flight-buffered updates pile up in `possible_gap`.
696 /// 2. getDiff returns; `gap_tick` sees `has_global()=true` → another getDiff.
697 /// 3. Each getDiff spawns another → bot freezes under a burst of messages.
698 pub async fn check_and_fill_gap(
699 &self,
700 new_pts: i32,
701 pts_count: i32,
702 upd: Option<update::Update>,
703 ) -> Result<Vec<update::Update>, InvocationError> {
704 // getDiff in flight: force updates through without pts tracking.
705 //
706 // Force-dispatch: socket updates are sent through
707 // when getting_diff_for contains the key; no buffering, no pts check.
708 // Buffering caused a cascade of getDiff calls and a bot freeze under bursts.
709 // Force-dispatch means these may duplicate what getDiff returns (same pts
710 // range), which is acceptable: Telegram's spec explicitly states that socket
711 // updates received during getDiff "should also have been retrieved through
712 // getDifference". Application-layer deduplication by message_id handles doubles.
713 // pts is NOT advanced here; getDiff sets it authoritatively when it returns.
714 if self.inner.pts_state.lock().await.getting_global_diff {
715 tracing::debug!("[layer] global diff in flight: force-applying pts={new_pts}");
716 return Ok(upd.into_iter().collect());
717 }
718
719 let result = self
720 .inner
721 .pts_state
722 .lock()
723 .await
724 .check_pts(new_pts, pts_count);
725 match result {
726 PtsCheckResult::Ok => {
727 // Advance pts and dispatch only this update.
728 //
729 // Do NOT blindly drain possible_gap here.
730 //
731 // Old behaviour: drain all buffered updates and return them together
732 // with the Ok update. This caused a second freeze:
733 //
734 // 1. pts=1021. Burst arrives: 1024-1030 all gap → buffered.
735 // 2. Update 1022 arrives → Ok → drain dispatches 1024-1030.
736 // 3. pts advances only to 1022 (the Ok value), NOT to 1030.
737 // 4. Bot sends replies → updateShortSentMessage pts=1031 →
738 // check_and_fill_gap: expected=1023, got=1031 → GAP.
739 // 5. Cascade getDiff → duplicates → flood-wait → freeze.
740 //
741 // Grammers avoids this by re-checking each buffered update in
742 // order and advancing pts for each one (process_updates inner loop
743 // over entry.possible_gap). Layer's Update enum carries no pts
744 // metadata, so we cannot replicate that ordered sequence check here.
745 //
746 // Correct equivalent: leave possible_gap alone. The buffered
747 // updates will be recovered by gap_tick → getDiff(new_pts), which
748 // drains possible_gap into pre_diff, lets the server fill the
749 // gap, and advances pts to the true server state; no stale pts,
750 // no secondary cascade, no duplicates.
751 self.inner.pts_state.lock().await.advance(new_pts);
752 Ok(upd.into_iter().collect())
753 }
754 PtsCheckResult::Gap { expected, got } => {
755 // Buffer the update; start the deadline timer regardless.
756 //
757 // Bug fix (touch_global_timer): when upd=None (e.g. the gap is
758 // triggered by an updateShortSentMessage RPC response), nothing was
759 // ever pushed to possible_gap.global, so global stayed None.
760 // global_deadline_elapsed() returned false forever, gap_tick never
761 // saw has_global()=true, and the gap was never resolved unless a
762 // subsequent user message arrived. touch_global_timer() starts the
763 // 1-second deadline clock even without a buffered update.
764 {
765 let mut gap = self.inner.possible_gap.lock().await;
766 if let Some(u) = upd {
767 gap.push_global(u);
768 } else {
769 gap.touch_global_timer();
770 }
771 }
772 let deadline_elapsed = self
773 .inner
774 .possible_gap
775 .lock()
776 .await
777 .global_deadline_elapsed();
778 if deadline_elapsed {
779 tracing::warn!(
780 "[layer] global pts gap: expected {expected}, got {got}: getDifference"
781 );
782 // get_difference() is now atomic (check-and-set) and drains the
783 // possible_gap buffer internally on success, so callers must NOT
784 // drain before calling or splice the old buffer onto the results.
785 // Doing so caused every gap update to be dispatched twice, which
786 // triggered FLOOD_WAIT, blocked the handler, and froze the bot.
787 self.get_difference().await
788 } else {
789 tracing::debug!(
790 "[layer] global pts gap: expected {expected}, got {got}: buffering (possible gap)"
791 );
792 Ok(vec![])
793 }
794 }
795 PtsCheckResult::Duplicate => {
796 tracing::debug!("[layer] global pts duplicate, discarding");
797 Ok(vec![])
798 }
799 }
800 }
801
802 /// Check qts (secret chat updates) and fill gap if needed.
803 pub async fn check_and_fill_qts_gap(
804 &self,
805 new_qts: i32,
806 qts_count: i32,
807 ) -> Result<Vec<update::Update>, InvocationError> {
808 let result = self
809 .inner
810 .pts_state
811 .lock()
812 .await
813 .check_qts(new_qts, qts_count);
814 match result {
815 PtsCheckResult::Ok => {
816 self.inner.pts_state.lock().await.advance_qts(new_qts);
817 Ok(vec![])
818 }
819 PtsCheckResult::Gap { expected, got } => {
820 tracing::warn!("[layer] qts gap: expected {expected}, got {got}: getDifference");
821 self.get_difference().await
822 }
823 PtsCheckResult::Duplicate => Ok(vec![]),
824 }
825 }
826
827 /// Check top-level seq and fill gap if needed.
828 pub async fn check_and_fill_seq_gap(
829 &self,
830 new_seq: i32,
831 seq_start: i32,
832 ) -> Result<Vec<update::Update>, InvocationError> {
833 let result = self
834 .inner
835 .pts_state
836 .lock()
837 .await
838 .check_seq(new_seq, seq_start);
839 match result {
840 PtsCheckResult::Ok => {
841 self.inner.pts_state.lock().await.advance_seq(new_seq);
842 Ok(vec![])
843 }
844 PtsCheckResult::Gap { expected, got } => {
845 tracing::warn!("[layer] seq gap: expected {expected}, got {got}: getDifference");
846 self.get_difference().await
847 }
848 PtsCheckResult::Duplicate => Ok(vec![]),
849 }
850 }
851
852 /// Check a per-channel pts, fetch getChannelDifference if there is a gap.
853 pub async fn check_and_fill_channel_gap(
854 &self,
855 channel_id: i64,
856 new_pts: i32,
857 pts_count: i32,
858 upd: Option<update::Update>,
859 ) -> Result<Vec<update::Update>, InvocationError> {
860 // if a diff is already in flight for this channel, skip: prevents
861 // 1 gap from spawning N concurrent getChannelDifference tasks.
862 if self
863 .inner
864 .pts_state
865 .lock()
866 .await
867 .getting_diff_for
868 .contains(&channel_id)
869 {
870 tracing::debug!("[layer] channel {channel_id} diff already in flight, skipping");
871 if let Some(u) = upd {
872 self.inner
873 .possible_gap
874 .lock()
875 .await
876 .push_channel(channel_id, u);
877 }
878 return Ok(vec![]);
879 }
880
881 let result = self
882 .inner
883 .pts_state
884 .lock()
885 .await
886 .check_channel_pts(channel_id, new_pts, pts_count);
887 match result {
888 PtsCheckResult::Ok => {
889 let mut buffered = self
890 .inner
891 .possible_gap
892 .lock()
893 .await
894 .drain_channel(channel_id);
895 self.inner
896 .pts_state
897 .lock()
898 .await
899 .advance_channel(channel_id, new_pts);
900 if let Some(u) = upd {
901 buffered.push(u);
902 }
903 Ok(buffered)
904 }
905 PtsCheckResult::Gap { expected, got } => {
906 if let Some(u) = upd {
907 self.inner
908 .possible_gap
909 .lock()
910 .await
911 .push_channel(channel_id, u);
912 }
913 let deadline_elapsed = self
914 .inner
915 .possible_gap
916 .lock()
917 .await
918 .channel_deadline_elapsed(channel_id);
919 if deadline_elapsed {
920 tracing::warn!(
921 "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: getChannelDifference"
922 );
923 // mark this channel as having a diff in flight.
924 self.inner
925 .pts_state
926 .lock()
927 .await
928 .getting_diff_for
929 .insert(channel_id);
930 let buffered = self
931 .inner
932 .possible_gap
933 .lock()
934 .await
935 .drain_channel(channel_id);
936 match self.get_channel_difference(channel_id).await {
937 Ok(mut diff_updates) => {
938 // diff complete, allow future gaps to be handled.
939 self.inner
940 .pts_state
941 .lock()
942 .await
943 .getting_diff_for
944 .remove(&channel_id);
945 diff_updates.splice(0..0, buffered);
946 Ok(diff_updates)
947 }
948 // Permanent access errors: remove the channel from pts tracking
949 // entirely (. The next update for this
950 // channel will have local=0 → PtsCheckResult::Ok, advancing pts
951 // without any gap fill. This breaks the infinite gap→CHANNEL_INVALID
952 // loop that happened when advance_channel kept the stale pts alive.
953 //
954 // Common causes:
955 // - access_hash not in peer cache (update arrived via updateShort
956 // which carries no chats list)
957 // - bot was kicked / channel deleted
958 Err(InvocationError::Rpc(ref e))
959 if e.name == "CHANNEL_INVALID"
960 || e.name == "CHANNEL_PRIVATE"
961 || e.name == "CHANNEL_NOT_MODIFIED" =>
962 {
963 tracing::debug!(
964 "[layer] channel {channel_id}: {}: removing from pts tracking \
965 (next update treated as first-seen, no gap fill)",
966 e.name
967 );
968 {
969 let mut s = self.inner.pts_state.lock().await;
970 s.getting_diff_for.remove(&channel_id);
971 s.channel_pts.remove(&channel_id); // ← fix: delete, not advance
972 }
973 Ok(buffered)
974 }
975 Err(InvocationError::Deserialize(ref msg)) => {
976 // Unrecognised constructor or parse failure: treat same as
977 // CHANNEL_INVALID: remove from tracking so we don't loop.
978 tracing::debug!(
979 "[layer] channel {channel_id}: deserialize error ({msg}): \
980 removing from pts tracking"
981 );
982 {
983 let mut s = self.inner.pts_state.lock().await;
984 s.getting_diff_for.remove(&channel_id);
985 s.channel_pts.remove(&channel_id);
986 }
987 Ok(buffered)
988 }
989 Err(e) => {
990 // also clear on unexpected errors so we don't get stuck.
991 self.inner
992 .pts_state
993 .lock()
994 .await
995 .getting_diff_for
996 .remove(&channel_id);
997 Err(e)
998 }
999 }
1000 } else {
1001 tracing::debug!(
1002 "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: buffering"
1003 );
1004 Ok(vec![])
1005 }
1006 }
1007 PtsCheckResult::Duplicate => {
1008 tracing::debug!("[layer] channel {channel_id} pts duplicate, discarding");
1009 Ok(vec![])
1010 }
1011 }
1012 }
1013
1014 /// Called periodically (e.g. from keepalive) to fire getDifference
1015 /// if no update has been received for > 15 minutes.
1016 ///
1017 /// also drives per-entry possible-gap deadlines independently of
1018 /// incoming updates. Previously the POSSIBLE_GAP_DEADLINE_MS window was
1019 /// only evaluated when a new incoming update called check_and_fill_gap
1020 /// meaning a quiet channel with a real gap would never fire getDifference
1021 /// until another update arrived. This
1022 /// which scans all LiveEntry.effective_deadline() on every keepalive tick.
1023 pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
1024 // Stuck-diff watchdog: if getting_global_diff has been true for more than
1025 // 30 s the in-flight getDifference RPC is assumed hung (e.g. half-open TCP
1026 // that the OS keepalive hasn't killed yet). Reset the guard so the next
1027 // gap_tick cycle can issue a fresh getDifference. The 30-second timeout
1028 // in get_difference() will concurrently return an error and also clear the
1029 // flag; this watchdog is a belt-and-suspenders safety net for edge cases
1030 // where that timeout itself is somehow delayed.
1031 {
1032 let stuck = {
1033 let s = self.inner.pts_state.lock().await;
1034 s.getting_global_diff
1035 && s.getting_global_diff_since
1036 .map(|t| t.elapsed().as_secs() > 30)
1037 .unwrap_or(false)
1038 };
1039 if stuck {
1040 tracing::warn!(
1041 "[layer] getDifference in-flight for >30 s: \
1042 resetting guard so gap_tick can retry"
1043 );
1044 let mut s = self.inner.pts_state.lock().await;
1045 s.getting_global_diff = false;
1046 s.getting_global_diff_since = None;
1047 }
1048 }
1049
1050 // existing 5-minute global timeout
1051 let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
1052 if exceeded {
1053 tracing::info!("[layer] update deadline exceeded: fetching getDifference");
1054 match self.get_difference().await {
1055 Ok(updates) => {
1056 for u in updates {
1057 if self.inner.update_tx.try_send(u).is_err() {
1058 tracing::warn!("[layer] update channel full: dropping diff update");
1059 }
1060 }
1061 }
1062 Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1063 // 401: gap buffer already cleared inside get_difference().
1064 // gap_tick will not re-fire. Supervisor handles reconnect.
1065 tracing::warn!(
1066 "[layer] deadline getDifference AUTH_KEY_UNREGISTERED: session dead"
1067 );
1068 }
1069 Err(e) => return Err(e),
1070 }
1071 }
1072
1073 // drive global possible-gap deadline
1074 // If the possible-gap window has expired but no new update has arrived
1075 // to trigger check_and_fill_gap, fire getDifference from here.
1076 {
1077 let gap_expired = self
1078 .inner
1079 .possible_gap
1080 .lock()
1081 .await
1082 .global_deadline_elapsed();
1083 // Note: get_difference() is now atomic (check-and-set), so the
1084 // `already` guard is advisory only; get_difference() will bail
1085 // safely if another call is already in flight.
1086 if gap_expired {
1087 tracing::debug!("[layer] B3 global possible-gap deadline expired: getDifference");
1088 // get_difference() snapshots and drains the pre-existing buffer at its
1089 // start (before the RPC), so updates that arrive DURING the RPC flight
1090 // remain in possible_gap for the next cycle. Never drain here.
1091 match self.get_difference().await {
1092 Ok(updates) => {
1093 for u in updates {
1094 if self.inner.update_tx.try_send(u).is_err() {
1095 tracing::warn!("[layer] update channel full: dropping gap update");
1096 }
1097 }
1098 }
1099 Err(e) if matches!(&e, InvocationError::Rpc(r) if r.code == 401) => {
1100 // 401: get_difference() cleared the gap buffer, so gap_tick
1101 // will not re-fire. Supervisor handles reconnect.
1102 tracing::warn!(
1103 "[layer] B3 global gap diff AUTH_KEY_UNREGISTERED: \
1104 session dead, gap buffer cleared"
1105 );
1106 }
1107 Err(e) => {
1108 tracing::warn!("[layer] B3 global gap diff failed: {e}");
1109 return Err(e);
1110 }
1111 }
1112 }
1113 }
1114
1115 // drive per-channel possible-gap deadlines
1116 // Collect expired channel IDs up-front to avoid holding the lock across awaits.
1117 let expired_channels: Vec<i64> = {
1118 let gap = self.inner.possible_gap.lock().await;
1119 gap.channel
1120 .keys()
1121 .copied()
1122 .filter(|&id| gap.channel_deadline_elapsed(id))
1123 .collect()
1124 };
1125 for channel_id in expired_channels {
1126 let already = self
1127 .inner
1128 .pts_state
1129 .lock()
1130 .await
1131 .getting_diff_for
1132 .contains(&channel_id);
1133 if already {
1134 continue;
1135 }
1136 tracing::debug!(
1137 "[layer] B3 channel {channel_id} possible-gap deadline expired: getChannelDifference"
1138 );
1139 // Mark in-flight before spawning so a racing incoming update can't
1140 // also spawn a diff for the same channel.
1141 self.inner
1142 .pts_state
1143 .lock()
1144 .await
1145 .getting_diff_for
1146 .insert(channel_id);
1147 let buffered = self
1148 .inner
1149 .possible_gap
1150 .lock()
1151 .await
1152 .drain_channel(channel_id);
1153 let c = self.clone();
1154 let utx = self.inner.update_tx.clone();
1155 tokio::spawn(async move {
1156 match c.get_channel_difference(channel_id).await {
1157 Ok(mut updates) => {
1158 c.inner
1159 .pts_state
1160 .lock()
1161 .await
1162 .getting_diff_for
1163 .remove(&channel_id);
1164 updates.splice(0..0, buffered);
1165 for u in updates {
1166 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1167 tracing::warn!(
1168 "[layer] update channel full: dropping ch gap update"
1169 );
1170 }
1171 }
1172 }
1173 Err(e) => {
1174 c.inner
1175 .pts_state
1176 .lock()
1177 .await
1178 .getting_diff_for
1179 .remove(&channel_id);
1180 // Permanent errors (CHANNEL_INVALID etc.): updates are
1181 // unrecoverable, drop them. Transient errors: restore
1182 // the buffer so the next B3 cycle can retry.
1183 let permanent = matches!(&e,
1184 InvocationError::Rpc(r)
1185 if r.code == 401
1186 || r.name == "CHANNEL_INVALID"
1187 || r.name == "CHANNEL_PRIVATE"
1188 || r.name == "CHANNEL_NOT_MODIFIED"
1189 ) || matches!(&e, InvocationError::Deserialize(_));
1190 if permanent {
1191 tracing::warn!(
1192 "[layer] B3 channel {channel_id} gap diff failed (permanent): {e}"
1193 );
1194 } else {
1195 tracing::warn!(
1196 "[layer] B3 channel {channel_id} gap diff failed (transient): {e}"
1197 );
1198 let mut gap = c.inner.possible_gap.lock().await;
1199 for u in buffered {
1200 gap.push_channel(channel_id, u);
1201 }
1202 }
1203 }
1204 }
1205 });
1206 }
1207
1208 Ok(())
1209 }
1210}