layer_client/pts.rs
1//! Update gap detection and recovery.
2//!
3//! Tracks `pts` / `qts` / `seq` / `date` plus per-channel pts, and
4//! fills gaps via `updates.getDifference` (global) and
5//! `updates.getChannelDifference` (per-channel).
6//!
7//! ## What "gap" means
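//! Each update carries a `pts` / `pts_count` that places it in a strict
//! sequence, and updates must be applied in that order. If
//! `new_pts > local_pts + pts_count` some updates are missing (a gap) and,
//! after a short grace period (`POSSIBLE_GAP_DEADLINE_MS`), we ask the server
//! for them before processing this one. If `new_pts < local_pts + pts_count`
//! the update was already applied and is discarded.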
11
12use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, RpcError, attach_client_to_update, update};
19
/// Grace period, in milliseconds, during which an out-of-order pts is held
/// as a "possible gap" before it is treated as a real gap.
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1000;

/// `limit` sent with `updates.getChannelDifference` when logged in as a bot.
/// Bots are allowed a much larger diff window than user accounts.
const CHANNEL_DIFF_LIMIT_BOT: i32 = 100_000;
25
26/// Buffers updates received during a possible-gap window so we don't fire
27/// getDifference on every slightly out-of-order update.
28#[derive(Default)]
29pub struct PossibleGapBuffer {
30 /// channel_id → (buffered_updates, window_start)
31 channel: HashMap<i64, (Vec<update::Update>, Instant)>,
32 /// Global buffered updates (non-channel pts gaps)
33 global: Option<(Vec<update::Update>, Instant)>,
34}
35
36impl PossibleGapBuffer {
37 pub fn new() -> Self {
38 Self::default()
39 }
40
41 /// Buffer a global update during a possible-gap window.
42 pub fn push_global(&mut self, upd: update::Update) {
43 let entry = self
44 .global
45 .get_or_insert_with(|| (Vec::new(), Instant::now()));
46 entry.0.push(upd);
47 }
48
49 /// Buffer a channel update during a possible-gap window.
50 pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
51 let entry = self
52 .channel
53 .entry(channel_id)
54 .or_insert_with(|| (Vec::new(), Instant::now()));
55 entry.0.push(upd);
56 }
57
58 /// True if the global possible-gap deadline has elapsed.
59 pub fn global_deadline_elapsed(&self) -> bool {
60 self.global
61 .as_ref()
62 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
63 .unwrap_or(false)
64 }
65
66 /// True if a channel's possible-gap deadline has elapsed.
67 pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
68 self.channel
69 .get(&channel_id)
70 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
71 .unwrap_or(false)
72 }
73
74 /// True if the global buffer has any pending updates.
75 pub fn has_global(&self) -> bool {
76 self.global.is_some()
77 }
78
79 /// True if a channel buffer has pending updates.
80 pub fn has_channel(&self, channel_id: i64) -> bool {
81 self.channel.contains_key(&channel_id)
82 }
83
84 /// Start the global deadline timer without buffering an update.
85 ///
86 /// Called when a gap is detected but the triggering update carries no
87 /// high-level `Update` value (e.g. `updateShortSentMessage` with `upd=None`).
88 /// Without this, `global` stays `None` → `global_deadline_elapsed()` always
89 /// returns `false` → `gap_tick` never fires getDifference for such gaps.
90 pub fn touch_global_timer(&mut self) {
91 self.global
92 .get_or_insert_with(|| (Vec::new(), Instant::now()));
93 }
94
95 /// Drain global buffered updates.
96 pub fn drain_global(&mut self) -> Vec<update::Update> {
97 self.global.take().map(|(v, _)| v).unwrap_or_default()
98 }
99
100 /// Drain channel buffered updates.
101 pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
102 self.channel
103 .remove(&channel_id)
104 .map(|(v, _)| v)
105 .unwrap_or_default()
106 }
107}
108
109// PtsState
110
111/// Full MTProto sequence-number state, including per-channel counters.
112///
113/// All fields are `pub` so that `connect()` can restore them from the
114/// persisted session without going through an artificial constructor.
115#[derive(Debug, Clone, Default)]
116pub struct PtsState {
117 /// Main pts counter (messages, non-channel updates).
118 pub pts: i32,
119 /// Secondary counter for secret-chat updates.
120 pub qts: i32,
121 /// Date of the last received update (Unix timestamp).
122 pub date: i32,
123 /// Combined-container sequence number.
124 pub seq: i32,
125 /// Per-channel pts counters. `channel_id → pts`.
126 pub channel_pts: HashMap<i64, i32>,
127 /// How many times getChannelDifference has been called per channel.
128 /// tDesktop starts at limit=100, then raises to 1000 after the first
129 /// successful response. We track call count to implement the same ramp-up.
130 pub channel_diff_calls: HashMap<i64, u32>,
131 /// Timestamp of last received update for deadline-based gap detection.
132 pub last_update_at: Option<Instant>,
133 /// Channels currently awaiting a getChannelDifference response.
134 /// If a channel is in this set, no new gap-fill task is spawned for it.
135 pub getting_diff_for: HashSet<i64>,
136 /// Guard against concurrent global getDifference calls.
137 /// Without this, two simultaneous gap detections both spawn get_difference(),
138 /// which double-processes updates and corrupts pts state.
139 pub getting_global_diff: bool,
140 /// When getting_global_diff was set to true. Used by the stuck-diff watchdog
141 /// in check_update_deadline: if the flag has been set for >30 s the RPC is
142 /// assumed hung and the guard is reset so the next gap_tick can retry.
143 pub getting_global_diff_since: Option<Instant>,
144}
145
146impl PtsState {
147 pub fn from_server_state(s: &tl::types::updates::State) -> Self {
148 Self {
149 pts: s.pts,
150 qts: s.qts,
151 date: s.date,
152 seq: s.seq,
153 channel_pts: HashMap::new(),
154 channel_diff_calls: HashMap::new(),
155 last_update_at: Some(Instant::now()),
156 getting_diff_for: HashSet::new(),
157 getting_global_diff: false,
158 getting_global_diff_since: None,
159 }
160 }
161
162 /// Record that an update was received now (resets the deadline timer).
163 pub fn touch(&mut self) {
164 self.last_update_at = Some(Instant::now());
165 }
166
167 /// Returns true if no update has been received for > 15 minutes.
168 pub fn deadline_exceeded(&self) -> bool {
169 self.last_update_at
170 .as_ref()
171 .map(|t| t.elapsed().as_secs() > 15 * 60)
172 .unwrap_or(false)
173 }
174
175 /// Check whether `new_pts` is in order given `pts_count` new updates.
176 pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
177 let expected = self.pts + pts_count;
178 if new_pts == expected {
179 PtsCheckResult::Ok
180 } else if new_pts > expected {
181 PtsCheckResult::Gap {
182 expected,
183 got: new_pts,
184 }
185 } else {
186 PtsCheckResult::Duplicate
187 }
188 }
189
190 /// Check a qts value (secret chat updates).
191 pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
192 let expected = self.qts + qts_count;
193 if new_qts == expected {
194 PtsCheckResult::Ok
195 } else if new_qts > expected {
196 PtsCheckResult::Gap {
197 expected,
198 got: new_qts,
199 }
200 } else {
201 PtsCheckResult::Duplicate
202 }
203 }
204
205 /// Check top-level seq for UpdatesCombined containers.
206 pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
207 if self.seq == 0 {
208 return PtsCheckResult::Ok;
209 } // uninitialised: accept
210 let expected = self.seq + 1;
211 if seq_start == expected {
212 PtsCheckResult::Ok
213 } else if seq_start > expected {
214 PtsCheckResult::Gap {
215 expected,
216 got: seq_start,
217 }
218 } else {
219 PtsCheckResult::Duplicate
220 }
221 }
222
223 /// Check a per-channel pts value.
224 pub fn check_channel_pts(
225 &self,
226 channel_id: i64,
227 new_pts: i32,
228 pts_count: i32,
229 ) -> PtsCheckResult {
230 let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
231 if local == 0 {
232 return PtsCheckResult::Ok;
233 }
234 let expected = local + pts_count;
235 if new_pts == expected {
236 PtsCheckResult::Ok
237 } else if new_pts > expected {
238 PtsCheckResult::Gap {
239 expected,
240 got: new_pts,
241 }
242 } else {
243 PtsCheckResult::Duplicate
244 }
245 }
246
247 /// Advance the global pts.
248 pub fn advance(&mut self, new_pts: i32) {
249 if new_pts > self.pts {
250 self.pts = new_pts;
251 }
252 self.touch();
253 }
254
255 /// Advance the qts.
256 pub fn advance_qts(&mut self, new_qts: i32) {
257 if new_qts > self.qts {
258 self.qts = new_qts;
259 }
260 self.touch();
261 }
262
263 /// Advance seq.
264 pub fn advance_seq(&mut self, new_seq: i32) {
265 if new_seq > self.seq {
266 self.seq = new_seq;
267 }
268 }
269
270 /// Advance a per-channel pts.
271 pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
272 let entry = self.channel_pts.entry(channel_id).or_insert(0);
273 if new_pts > *entry {
274 *entry = new_pts;
275 }
276 self.touch();
277 }
278}
279
/// Outcome of comparing an incoming pts / qts / seq value against local state.
///
/// `Copy` because it only carries two `i32`s; callers can inspect it and
/// still match on it afterwards.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The value is exactly the next one expected; apply the update.
    Ok,
    /// The value is ahead of the expected one; updates in between are missing.
    Gap { expected: i32, got: i32 },
    /// The value is behind the expected one; the update was already applied.
    Duplicate,
}
286
287// Client methods
288
289impl Client {
290 // Global getDifference
291
292 /// Fetch and replay any updates missed since the persisted pts.
293 ///
294 /// loops on `Difference::Slice` (partial response) until the server
295 /// returns a final `Difference` or `Empty`
296 /// never dropping a partial batch. Previous code returned after one slice,
297 /// silently losing all updates in subsequent slices.
298 pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
299 // Atomically claim the in-flight slot.
300 //
301 // TOCTOU fix: the old code set getting_global_diff=true inside get_difference
302 // but the external guard in check_and_fill_gap read the flag in a SEPARATE lock
303 // acquisition. With multi-threaded Tokio, N tasks could all read false, all
304 // pass the external check, then all race into this function and all set the flag
305 // to true. Each concurrent call resets pts_state to the server state it received;
306 // the last STALE write rolls pts back below where it actually is, which immediately
307 // triggers a new gap → another burst of concurrent getDifference calls → cascade.
308 //
309 // Fix: check-and-set inside a SINGLE lock acquisition. Only the first caller
310 // proceeds; all others see true and return Ok(vec![]) immediately.
311 {
312 let mut s = self.inner.pts_state.lock().await;
313 if s.getting_global_diff {
314 return Ok(vec![]);
315 }
316 s.getting_global_diff = true;
317 s.getting_global_diff_since = Some(Instant::now());
318 }
319
320 // Drain the initial-gap buffer before the RPC.
321 //
322 // possible_gap.global contains updates buffered during the possible-gap window
323 // (before we decided to call getDiff). The server response covers exactly
324 // these pts values → discard the snapshot on success.
325 // On RPC error, restore them so the next gap_tick can retry.
326 //
327 // Note: updates arriving DURING the RPC flight are now force-dispatched by
328 // check_and_fill_gap and never accumulate in possible_gap,
329 // so there is nothing extra to drain after the call returns.
330 let pre_diff = self.inner.possible_gap.lock().await.drain_global();
331
332 // Wrap the RPC in a hard 30-second timeout so a hung TCP connection
333 // (half-open socket, unresponsive DC) cannot hold getting_global_diff=true
334 // forever and freeze the bot indefinitely.
335 let result = tokio::time::timeout(
336 std::time::Duration::from_secs(30),
337 self.get_difference_inner(),
338 )
339 .await
340 .unwrap_or_else(|_| {
341 tracing::warn!("[layer] getDifference RPC timed out after 30 s: will retry");
342 Err(InvocationError::Io(std::io::Error::new(
343 std::io::ErrorKind::TimedOut,
344 "getDifference timed out",
345 )))
346 });
347
348 // Always clear the guard, even on error.
349 {
350 let mut s = self.inner.pts_state.lock().await;
351 s.getting_global_diff = false;
352 s.getting_global_diff_since = None;
353 }
354
355 match &result {
356 Ok(_) => {
357 // pre_diff is covered by the server response; discard it.
358 // (Flight-time updates are force-dispatched, not buffered into possible_gap.)
359 }
360 Err(_) => {
361 // Restore pre-existing items so the next gap_tick retry sees them.
362 let mut gap = self.inner.possible_gap.lock().await;
363 for u in pre_diff {
364 gap.push_global(u);
365 }
366 }
367 }
368
369 result
370 }
371
372 async fn get_difference_inner(&self) -> Result<Vec<update::Update>, InvocationError> {
373 use layer_tl_types::{Cursor, Deserializable};
374
375 let mut all_updates: Vec<update::Update> = Vec::new();
376
377 // loop until the server sends a final (non-Slice) response.
378 loop {
379 let (pts, qts, date) = {
380 let s = self.inner.pts_state.lock().await;
381 (s.pts, s.qts, s.date)
382 };
383
384 if pts == 0 {
385 self.sync_pts_state().await?;
386 return Ok(all_updates);
387 }
388
389 tracing::debug!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
390
391 let req = tl::functions::updates::GetDifference {
392 pts,
393 pts_limit: None,
394 pts_total_limit: None,
395 date,
396 qts,
397 qts_limit: None,
398 };
399
400 let body = self.rpc_call_raw_pub(&req).await?;
401 let body = crate::maybe_gz_decompress(body)?;
402 let mut cur = Cursor::from_slice(&body);
403 let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
404
405 match diff {
406 tl::enums::updates::Difference::Empty(e) => {
407 let mut s = self.inner.pts_state.lock().await;
408 s.date = e.date;
409 s.seq = e.seq;
410 s.touch();
411 tracing::debug!("[layer] getDifference: empty (seq={})", e.seq);
412 return Ok(all_updates);
413 }
414
415 tl::enums::updates::Difference::Difference(d) => {
416 tracing::debug!(
417 "[layer] getDifference: {} messages, {} updates (final)",
418 d.new_messages.len(),
419 d.other_updates.len()
420 );
421 self.cache_users_slice_pub(&d.users).await;
422 self.cache_chats_slice_pub(&d.chats).await;
423 for msg in d.new_messages {
424 all_updates.push(update::Update::NewMessage(
425 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
426 ));
427 }
428 for upd in d.other_updates {
429 all_updates.extend(update::from_single_update_pub(upd));
430 }
431 let tl::enums::updates::State::State(ns) = d.state;
432 let saved_channel_pts = {
433 let s = self.inner.pts_state.lock().await;
434 s.channel_pts.clone()
435 };
436 let mut new_state = PtsState::from_server_state(&ns);
437 // Preserve per-channel pts across the global reset.
438 for (cid, cpts) in saved_channel_pts {
439 new_state.channel_pts.entry(cid).or_insert(cpts);
440 }
441 // Preserve in-flight sets: we clear getting_global_diff ourselves.
442 new_state.getting_global_diff = true; // will be cleared by caller
443 {
444 let mut s = self.inner.pts_state.lock().await;
445 let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
446 let since = s.getting_global_diff_since; // preserve watchdog timestamp
447 *s = new_state;
448 s.getting_diff_for = getting_diff_for;
449 s.getting_global_diff_since = since;
450 }
451 // Final response: stop looping.
452 return Ok(all_updates);
453 }
454
455 tl::enums::updates::Difference::Slice(d) => {
456 // server has more data: apply intermediate_state and
457 // continue looping. Old code returned here, losing all updates
458 // in subsequent slices.
459 tracing::debug!(
460 "[layer] getDifference slice: {} messages, {} updates: continuing",
461 d.new_messages.len(),
462 d.other_updates.len()
463 );
464 self.cache_users_slice_pub(&d.users).await;
465 self.cache_chats_slice_pub(&d.chats).await;
466 for msg in d.new_messages {
467 all_updates.push(update::Update::NewMessage(
468 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
469 ));
470 }
471 for upd in d.other_updates {
472 all_updates.extend(update::from_single_update_pub(upd));
473 }
474 let tl::enums::updates::State::State(ns) = d.intermediate_state;
475 let saved_channel_pts = {
476 let s = self.inner.pts_state.lock().await;
477 s.channel_pts.clone()
478 };
479 let mut new_state = PtsState::from_server_state(&ns);
480 for (cid, cpts) in saved_channel_pts {
481 new_state.channel_pts.entry(cid).or_insert(cpts);
482 }
483 new_state.getting_global_diff = true;
484 {
485 let mut s = self.inner.pts_state.lock().await;
486 let getting_diff_for = std::mem::take(&mut s.getting_diff_for);
487 let since = s.getting_global_diff_since; // preserve watchdog timestamp
488 *s = new_state;
489 s.getting_diff_for = getting_diff_for;
490 s.getting_global_diff_since = since;
491 }
492 // Loop: fetch the next slice.
493 continue;
494 }
495
496 tl::enums::updates::Difference::TooLong(d) => {
497 tracing::warn!("[layer] getDifference: TooLong (pts={}): re-syncing", d.pts);
498 self.inner.pts_state.lock().await.pts = d.pts;
499 self.sync_pts_state().await?;
500 return Ok(all_updates);
501 }
502 }
503 }
504 }
505
506 // Per-channel getChannelDifference
507
508 /// Fetch missed updates for a single channel.
509 pub async fn get_channel_difference(
510 &self,
511 channel_id: i64,
512 ) -> Result<Vec<update::Update>, InvocationError> {
513 let local_pts = self
514 .inner
515 .pts_state
516 .lock()
517 .await
518 .channel_pts
519 .get(&channel_id)
520 .copied()
521 .unwrap_or(0);
522
523 let access_hash = self
524 .inner
525 .peer_cache
526 .read()
527 .await
528 .channels
529 .get(&channel_id)
530 .copied()
531 .unwrap_or(0);
532
533 // No access hash in cache → we can't call getChannelDifference.
534 // Attempting GetChannels with access_hash=0 also returns CHANNEL_INVALID,
535 // so skip the call entirely and let the caller handle it.
536 if access_hash == 0 {
537 tracing::debug!(
538 "[layer] channel {channel_id}: access_hash not cached, \
539 cannot call getChannelDifference: caller will remove from tracking"
540 );
541 return Err(InvocationError::Rpc(RpcError {
542 code: 400,
543 name: "CHANNEL_INVALID".into(),
544 value: None,
545 }));
546 }
547
548 tracing::debug!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
549
550 let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
551 channel_id,
552 access_hash,
553 });
554
555 // tDesktop ramp-up: limit=100 on first call, 1000 on subsequent ones.
556 // Bots always use the server-side maximum (100_000).
557 let diff_limit = if self.inner.is_bot.load(std::sync::atomic::Ordering::Relaxed) {
558 CHANNEL_DIFF_LIMIT_BOT
559 } else {
560 let call_count = self
561 .inner
562 .pts_state
563 .lock()
564 .await
565 .channel_diff_calls
566 .get(&channel_id)
567 .copied()
568 .unwrap_or(0);
569 if call_count == 0 { 100 } else { 1000 }
570 };
571
572 let req = tl::functions::updates::GetChannelDifference {
573 force: false,
574 channel,
575 filter: tl::enums::ChannelMessagesFilter::Empty,
576 pts: local_pts.max(1),
577 limit: diff_limit,
578 };
579
580 let body = match self.rpc_call_raw_pub(&req).await {
581 Ok(b) => {
582 // Successful call bump the per-channel counter so next call uses 1000.
583 self.inner
584 .pts_state
585 .lock()
586 .await
587 .channel_diff_calls
588 .entry(channel_id)
589 .and_modify(|c| *c = c.saturating_add(1))
590 .or_insert(1);
591 b
592 }
593 Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
594 // treat as empty diff: retry next gap
595 tracing::debug!("[layer] PERSISTENT_TIMESTAMP_OUTDATED: skipping diff");
596 return Ok(vec![]);
597 }
598 Err(e) => return Err(e),
599 };
600 let body = crate::maybe_gz_decompress(body)?;
601 let mut cur = Cursor::from_slice(&body);
602 let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
603
604 let mut updates = Vec::new();
605
606 match diff {
607 tl::enums::updates::ChannelDifference::Empty(e) => {
608 tracing::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
609 self.inner
610 .pts_state
611 .lock()
612 .await
613 .advance_channel(channel_id, e.pts);
614 }
615 tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
616 tracing::debug!(
617 "[layer] getChannelDifference: {} messages, {} updates",
618 d.new_messages.len(),
619 d.other_updates.len()
620 );
621 self.cache_users_slice_pub(&d.users).await;
622 self.cache_chats_slice_pub(&d.chats).await;
623 for msg in d.new_messages {
624 updates.push(update::Update::NewMessage(
625 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
626 ));
627 }
628 for upd in d.other_updates {
629 updates.extend(update::from_single_update_pub(upd));
630 }
631 self.inner
632 .pts_state
633 .lock()
634 .await
635 .advance_channel(channel_id, d.pts);
636 }
637 tl::enums::updates::ChannelDifference::TooLong(d) => {
638 tracing::warn!(
639 "[layer] getChannelDifference TooLong: replaying messages, resetting pts"
640 );
641 self.cache_users_slice_pub(&d.users).await;
642 self.cache_chats_slice_pub(&d.chats).await;
643 for msg in d.messages {
644 updates.push(update::Update::NewMessage(
645 update::IncomingMessage::from_raw(msg).with_client(self.clone()),
646 ));
647 }
648 self.inner
649 .pts_state
650 .lock()
651 .await
652 .advance_channel(channel_id, 0);
653 }
654 }
655
656 Ok(updates)
657 }
658
659 // Sync from server
660
661 pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
662 let body = self
663 .rpc_call_raw_pub(&tl::functions::updates::GetState {})
664 .await?;
665 let mut cur = Cursor::from_slice(&body);
666 let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
667 let mut state = self.inner.pts_state.lock().await;
668 state.pts = s.pts;
669 state.qts = s.qts;
670 state.date = s.date;
671 state.seq = s.seq;
672 state.touch();
673 tracing::debug!(
674 "[layer] pts synced: pts={}, qts={}, seq={}",
675 s.pts,
676 s.qts,
677 s.seq
678 );
679 Ok(())
680 }
681 /// Check global pts, buffer during possible-gap window, fetch diff if real gap.
682 ///
683 /// When a global getDifference is already in-flight (`getting_global_diff == true`),
684 /// updates are **force-dispatched** immediately without pts tracking.
685 /// This prevents the cascade freeze that buffering caused:
686 /// 1. getDiff runs; flight-buffered updates pile up in `possible_gap`.
687 /// 2. getDiff returns; `gap_tick` sees `has_global()=true` → another getDiff.
688 /// 3. Each getDiff spawns another → bot freezes under a burst of messages.
689 pub async fn check_and_fill_gap(
690 &self,
691 new_pts: i32,
692 pts_count: i32,
693 upd: Option<update::Update>,
694 ) -> Result<Vec<update::Update>, InvocationError> {
695 // getDiff in flight: force updates through without pts tracking.
696 //
697 // Force-dispatch: socket updates are sent through
698 // when getting_diff_for contains the key; no buffering, no pts check.
699 // Buffering caused a cascade of getDiff calls and a bot freeze under bursts.
700 // Force-dispatch means these may duplicate what getDiff returns (same pts
701 // range), which is acceptable: Telegram's spec explicitly states that socket
702 // updates received during getDiff "should also have been retrieved through
703 // getDifference". Application-layer deduplication by message_id handles doubles.
704 // pts is NOT advanced here; getDiff sets it authoritatively when it returns.
705 if self.inner.pts_state.lock().await.getting_global_diff {
706 tracing::debug!("[layer] global diff in flight: force-applying pts={new_pts}");
707 return Ok(upd.into_iter().collect());
708 }
709
710 let result = self
711 .inner
712 .pts_state
713 .lock()
714 .await
715 .check_pts(new_pts, pts_count);
716 match result {
717 PtsCheckResult::Ok => {
718 // Advance pts and dispatch only this update.
719 //
720 // Do NOT blindly drain possible_gap here.
721 //
722 // Old behaviour: drain all buffered updates and return them together
723 // with the Ok update. This caused a second freeze:
724 //
725 // 1. pts=1021. Burst arrives: 1024-1030 all gap → buffered.
726 // 2. Update 1022 arrives → Ok → drain dispatches 1024-1030.
727 // 3. pts advances only to 1022 (the Ok value), NOT to 1030.
728 // 4. Bot sends replies → updateShortSentMessage pts=1031 →
729 // check_and_fill_gap: expected=1023, got=1031 → GAP.
730 // 5. Cascade getDiff → duplicates → flood-wait → freeze.
731 //
732 // Grammers avoids this by re-checking each buffered update in
733 // order and advancing pts for each one (process_updates inner loop
734 // over entry.possible_gap). Layer's Update enum carries no pts
735 // metadata, so we cannot replicate that ordered sequence check here.
736 //
737 // Correct equivalent: leave possible_gap alone. The buffered
738 // updates will be recovered by gap_tick → getDiff(new_pts), which
739 // drains possible_gap into pre_diff, lets the server fill the
740 // gap, and advances pts to the true server state; no stale pts,
741 // no secondary cascade, no duplicates.
742 self.inner.pts_state.lock().await.advance(new_pts);
743 Ok(upd.into_iter().collect())
744 }
745 PtsCheckResult::Gap { expected, got } => {
746 // Buffer the update; start the deadline timer regardless.
747 //
748 // Bug fix (touch_global_timer): when upd=None (e.g. the gap is
749 // triggered by an updateShortSentMessage RPC response), nothing was
750 // ever pushed to possible_gap.global, so global stayed None.
751 // global_deadline_elapsed() returned false forever, gap_tick never
752 // saw has_global()=true, and the gap was never resolved unless a
753 // subsequent user message arrived. touch_global_timer() starts the
754 // 1-second deadline clock even without a buffered update.
755 {
756 let mut gap = self.inner.possible_gap.lock().await;
757 if let Some(u) = upd {
758 gap.push_global(u);
759 } else {
760 gap.touch_global_timer();
761 }
762 }
763 let deadline_elapsed = self
764 .inner
765 .possible_gap
766 .lock()
767 .await
768 .global_deadline_elapsed();
769 if deadline_elapsed {
770 tracing::warn!(
771 "[layer] global pts gap: expected {expected}, got {got}: getDifference"
772 );
773 // get_difference() is now atomic (check-and-set) and drains the
774 // possible_gap buffer internally on success, so callers must NOT
775 // drain before calling or splice the old buffer onto the results.
776 // Doing so caused every gap update to be dispatched twice, which
777 // triggered FLOOD_WAIT, blocked the handler, and froze the bot.
778 self.get_difference().await
779 } else {
780 tracing::debug!(
781 "[layer] global pts gap: expected {expected}, got {got}: buffering (possible gap)"
782 );
783 Ok(vec![])
784 }
785 }
786 PtsCheckResult::Duplicate => {
787 tracing::debug!("[layer] global pts duplicate, discarding");
788 Ok(vec![])
789 }
790 }
791 }
792
793 /// Check qts (secret chat updates) and fill gap if needed.
794 pub async fn check_and_fill_qts_gap(
795 &self,
796 new_qts: i32,
797 qts_count: i32,
798 ) -> Result<Vec<update::Update>, InvocationError> {
799 let result = self
800 .inner
801 .pts_state
802 .lock()
803 .await
804 .check_qts(new_qts, qts_count);
805 match result {
806 PtsCheckResult::Ok => {
807 self.inner.pts_state.lock().await.advance_qts(new_qts);
808 Ok(vec![])
809 }
810 PtsCheckResult::Gap { expected, got } => {
811 tracing::warn!("[layer] qts gap: expected {expected}, got {got}: getDifference");
812 self.get_difference().await
813 }
814 PtsCheckResult::Duplicate => Ok(vec![]),
815 }
816 }
817
818 /// Check top-level seq and fill gap if needed.
819 pub async fn check_and_fill_seq_gap(
820 &self,
821 new_seq: i32,
822 seq_start: i32,
823 ) -> Result<Vec<update::Update>, InvocationError> {
824 let result = self
825 .inner
826 .pts_state
827 .lock()
828 .await
829 .check_seq(new_seq, seq_start);
830 match result {
831 PtsCheckResult::Ok => {
832 self.inner.pts_state.lock().await.advance_seq(new_seq);
833 Ok(vec![])
834 }
835 PtsCheckResult::Gap { expected, got } => {
836 tracing::warn!("[layer] seq gap: expected {expected}, got {got}: getDifference");
837 self.get_difference().await
838 }
839 PtsCheckResult::Duplicate => Ok(vec![]),
840 }
841 }
842
843 /// Check a per-channel pts, fetch getChannelDifference if there is a gap.
844 pub async fn check_and_fill_channel_gap(
845 &self,
846 channel_id: i64,
847 new_pts: i32,
848 pts_count: i32,
849 upd: Option<update::Update>,
850 ) -> Result<Vec<update::Update>, InvocationError> {
851 // if a diff is already in flight for this channel, skip: prevents
852 // 1 gap from spawning N concurrent getChannelDifference tasks.
853 if self
854 .inner
855 .pts_state
856 .lock()
857 .await
858 .getting_diff_for
859 .contains(&channel_id)
860 {
861 tracing::debug!("[layer] channel {channel_id} diff already in flight, skipping");
862 if let Some(u) = upd {
863 self.inner
864 .possible_gap
865 .lock()
866 .await
867 .push_channel(channel_id, u);
868 }
869 return Ok(vec![]);
870 }
871
872 let result = self
873 .inner
874 .pts_state
875 .lock()
876 .await
877 .check_channel_pts(channel_id, new_pts, pts_count);
878 match result {
879 PtsCheckResult::Ok => {
880 let mut buffered = self
881 .inner
882 .possible_gap
883 .lock()
884 .await
885 .drain_channel(channel_id);
886 self.inner
887 .pts_state
888 .lock()
889 .await
890 .advance_channel(channel_id, new_pts);
891 if let Some(u) = upd {
892 buffered.push(u);
893 }
894 Ok(buffered)
895 }
896 PtsCheckResult::Gap { expected, got } => {
897 if let Some(u) = upd {
898 self.inner
899 .possible_gap
900 .lock()
901 .await
902 .push_channel(channel_id, u);
903 }
904 let deadline_elapsed = self
905 .inner
906 .possible_gap
907 .lock()
908 .await
909 .channel_deadline_elapsed(channel_id);
910 if deadline_elapsed {
911 tracing::warn!(
912 "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: getChannelDifference"
913 );
914 // mark this channel as having a diff in flight.
915 self.inner
916 .pts_state
917 .lock()
918 .await
919 .getting_diff_for
920 .insert(channel_id);
921 let buffered = self
922 .inner
923 .possible_gap
924 .lock()
925 .await
926 .drain_channel(channel_id);
927 match self.get_channel_difference(channel_id).await {
928 Ok(mut diff_updates) => {
929 // diff complete, allow future gaps to be handled.
930 self.inner
931 .pts_state
932 .lock()
933 .await
934 .getting_diff_for
935 .remove(&channel_id);
936 diff_updates.splice(0..0, buffered);
937 Ok(diff_updates)
938 }
939 // Permanent access errors: remove the channel from pts tracking
940 // entirely (. The next update for this
941 // channel will have local=0 → PtsCheckResult::Ok, advancing pts
942 // without any gap fill. This breaks the infinite gap→CHANNEL_INVALID
943 // loop that happened when advance_channel kept the stale pts alive.
944 //
945 // Common causes:
946 // - access_hash not in peer cache (update arrived via updateShort
947 // which carries no chats list)
948 // - bot was kicked / channel deleted
949 Err(InvocationError::Rpc(ref e))
950 if e.name == "CHANNEL_INVALID"
951 || e.name == "CHANNEL_PRIVATE"
952 || e.name == "CHANNEL_NOT_MODIFIED" =>
953 {
954 tracing::debug!(
955 "[layer] channel {channel_id}: {}: removing from pts tracking \
956 (next update treated as first-seen, no gap fill)",
957 e.name
958 );
959 {
960 let mut s = self.inner.pts_state.lock().await;
961 s.getting_diff_for.remove(&channel_id);
962 s.channel_pts.remove(&channel_id); // ← fix: delete, not advance
963 }
964 Ok(buffered)
965 }
966 Err(InvocationError::Deserialize(ref msg)) => {
967 // Unrecognised constructor or parse failure: treat same as
968 // CHANNEL_INVALID: remove from tracking so we don't loop.
969 tracing::debug!(
970 "[layer] channel {channel_id}: deserialize error ({msg}): \
971 removing from pts tracking"
972 );
973 {
974 let mut s = self.inner.pts_state.lock().await;
975 s.getting_diff_for.remove(&channel_id);
976 s.channel_pts.remove(&channel_id);
977 }
978 Ok(buffered)
979 }
980 Err(e) => {
981 // also clear on unexpected errors so we don't get stuck.
982 self.inner
983 .pts_state
984 .lock()
985 .await
986 .getting_diff_for
987 .remove(&channel_id);
988 Err(e)
989 }
990 }
991 } else {
992 tracing::debug!(
993 "[layer] channel {channel_id} pts gap: expected {expected}, got {got}: buffering"
994 );
995 Ok(vec![])
996 }
997 }
998 PtsCheckResult::Duplicate => {
999 tracing::debug!("[layer] channel {channel_id} pts duplicate, discarding");
1000 Ok(vec![])
1001 }
1002 }
1003 }
1004
1005 /// Called periodically (e.g. from keepalive) to fire getDifference
1006 /// if no update has been received for > 15 minutes.
1007 ///
1008 /// also drives per-entry possible-gap deadlines independently of
1009 /// incoming updates. Previously the POSSIBLE_GAP_DEADLINE_MS window was
1010 /// only evaluated when a new incoming update called check_and_fill_gap
1011 /// meaning a quiet channel with a real gap would never fire getDifference
1012 /// until another update arrived. This
1013 /// which scans all LiveEntry.effective_deadline() on every keepalive tick.
1014 pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
1015 // Stuck-diff watchdog: if getting_global_diff has been true for more than
1016 // 30 s the in-flight getDifference RPC is assumed hung (e.g. half-open TCP
1017 // that the OS keepalive hasn't killed yet). Reset the guard so the next
1018 // gap_tick cycle can issue a fresh getDifference. The 30-second timeout
1019 // in get_difference() will concurrently return an error and also clear the
1020 // flag; this watchdog is a belt-and-suspenders safety net for edge cases
1021 // where that timeout itself is somehow delayed.
1022 {
1023 let stuck = {
1024 let s = self.inner.pts_state.lock().await;
1025 s.getting_global_diff
1026 && s.getting_global_diff_since
1027 .map(|t| t.elapsed().as_secs() > 30)
1028 .unwrap_or(false)
1029 };
1030 if stuck {
1031 tracing::warn!(
1032 "[layer] getDifference in-flight for >30 s: \
1033 resetting guard so gap_tick can retry"
1034 );
1035 let mut s = self.inner.pts_state.lock().await;
1036 s.getting_global_diff = false;
1037 s.getting_global_diff_since = None;
1038 }
1039 }
1040
1041 // existing 5-minute global timeout
1042 let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
1043 if exceeded {
1044 tracing::info!("[layer] update deadline exceeded: fetching getDifference");
1045 let updates = self.get_difference().await?;
1046 for u in updates {
1047 if self.inner.update_tx.try_send(u).is_err() {
1048 tracing::warn!("[layer] update channel full: dropping diff update");
1049 }
1050 }
1051 }
1052
1053 // drive global possible-gap deadline
1054 // If the possible-gap window has expired but no new update has arrived
1055 // to trigger check_and_fill_gap, fire getDifference from here.
1056 {
1057 let gap_expired = self
1058 .inner
1059 .possible_gap
1060 .lock()
1061 .await
1062 .global_deadline_elapsed();
1063 // Note: get_difference() is now atomic (check-and-set), so the
1064 // `already` guard is advisory only; get_difference() will bail
1065 // safely if another call is already in flight.
1066 if gap_expired {
1067 tracing::debug!("[layer] B3 global possible-gap deadline expired: getDifference");
1068 // get_difference() snapshots and drains the pre-existing buffer at its
1069 // start (before the RPC), so updates that arrive DURING the RPC flight
1070 // remain in possible_gap for the next cycle. Never drain here.
1071 match self.get_difference().await {
1072 Ok(updates) => {
1073 for u in updates {
1074 if self.inner.update_tx.try_send(u).is_err() {
1075 tracing::warn!("[layer] update channel full: dropping gap update");
1076 }
1077 }
1078 }
1079 Err(e) => {
1080 tracing::warn!("[layer] B3 global gap diff failed: {e}");
1081 return Err(e);
1082 }
1083 }
1084 }
1085 }
1086
1087 // drive per-channel possible-gap deadlines
1088 // Collect expired channel IDs up-front to avoid holding the lock across awaits.
1089 let expired_channels: Vec<i64> = {
1090 let gap = self.inner.possible_gap.lock().await;
1091 gap.channel
1092 .keys()
1093 .copied()
1094 .filter(|&id| gap.channel_deadline_elapsed(id))
1095 .collect()
1096 };
1097 for channel_id in expired_channels {
1098 let already = self
1099 .inner
1100 .pts_state
1101 .lock()
1102 .await
1103 .getting_diff_for
1104 .contains(&channel_id);
1105 if already {
1106 continue;
1107 }
1108 tracing::debug!(
1109 "[layer] B3 channel {channel_id} possible-gap deadline expired: getChannelDifference"
1110 );
1111 // Mark in-flight before spawning so a racing incoming update can't
1112 // also spawn a diff for the same channel.
1113 self.inner
1114 .pts_state
1115 .lock()
1116 .await
1117 .getting_diff_for
1118 .insert(channel_id);
1119 let buffered = self
1120 .inner
1121 .possible_gap
1122 .lock()
1123 .await
1124 .drain_channel(channel_id);
1125 let c = self.clone();
1126 let utx = self.inner.update_tx.clone();
1127 tokio::spawn(async move {
1128 match c.get_channel_difference(channel_id).await {
1129 Ok(mut updates) => {
1130 c.inner
1131 .pts_state
1132 .lock()
1133 .await
1134 .getting_diff_for
1135 .remove(&channel_id);
1136 updates.splice(0..0, buffered);
1137 for u in updates {
1138 if utx.try_send(attach_client_to_update(u, &c)).is_err() {
1139 tracing::warn!(
1140 "[layer] update channel full: dropping ch gap update"
1141 );
1142 }
1143 }
1144 }
1145 Err(e) => {
1146 tracing::warn!("[layer] B3 channel {channel_id} gap diff failed: {e}");
1147 c.inner
1148 .pts_state
1149 .lock()
1150 .await
1151 .getting_diff_for
1152 .remove(&channel_id);
1153 }
1154 }
1155 });
1156 }
1157
1158 Ok(())
1159 }
1160}