Skip to main content

layer_client/
pts.rs

1//! Update gap detection and recovery.
2//!
3//! Tracks `pts` / `qts` / `seq` / `date` plus **per-channel pts**, and
4//! fills gaps via `updates.getDifference` (global) and
5//! `updates.getChannelDifference` (per-channel, gap G-15).
6//!
7//! ## What "gap" means
8//! Telegram guarantees updates arrive in order within a pts counter.
9//! If `new_pts != local_pts + pts_count` there is a gap and we must
10//! ask the server for the missed updates before processing this one.
11
12use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, RpcError, update};
19
20// ─── PossibleGapBuffer (G-17) ─────────────────────────────────────────────────
21
22/// How long to wait before declaring a pts jump a real gap (ms).
23/// grammers uses a similar short window before triggering getDifference.
24const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;
25
26/// Buffers updates received during a possible-gap window so we don't fire
27/// getDifference on every slightly out-of-order update.
28#[derive(Default)]
29pub struct PossibleGapBuffer {
30    /// channel_id → (buffered_updates, window_start)
31    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
32    /// Global buffered updates (non-channel pts gaps)
33    global:  Option<(Vec<update::Update>, Instant)>,
34}
35
36impl PossibleGapBuffer {
37    pub fn new() -> Self { Self::default() }
38
39    /// Buffer a global update during a possible-gap window.
40    pub fn push_global(&mut self, upd: update::Update) {
41        let entry = self.global.get_or_insert_with(|| (Vec::new(), Instant::now()));
42        entry.0.push(upd);
43    }
44
45    /// Buffer a channel update during a possible-gap window.
46    pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
47        let entry = self.channel.entry(channel_id).or_insert_with(|| (Vec::new(), Instant::now()));
48        entry.0.push(upd);
49    }
50
51    /// True if the global possible-gap deadline has elapsed.
52    pub fn global_deadline_elapsed(&self) -> bool {
53        self.global.as_ref()
54            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
55            .unwrap_or(false)
56    }
57
58    /// True if a channel's possible-gap deadline has elapsed.
59    pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
60        self.channel.get(&channel_id)
61            .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
62            .unwrap_or(false)
63    }
64
65    /// True if the global buffer has any pending updates.
66    pub fn has_global(&self) -> bool { self.global.is_some() }
67
68    /// True if a channel buffer has pending updates.
69    pub fn has_channel(&self, channel_id: i64) -> bool { self.channel.contains_key(&channel_id) }
70
71    /// Drain global buffered updates.
72    pub fn drain_global(&mut self) -> Vec<update::Update> {
73        self.global.take().map(|(v, _)| v).unwrap_or_default()
74    }
75
76    /// Drain channel buffered updates.
77    pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
78        self.channel.remove(&channel_id).map(|(v, _)| v).unwrap_or_default()
79    }
80}
81
82// ─── PtsState ─────────────────────────────────────────────────────────────────
83
84/// Full MTProto sequence-number state, including per-channel counters.
85///
86/// All fields are `pub` so that `connect()` can restore them from the
87/// persisted session without going through an artificial constructor.
88#[derive(Debug, Clone, Default)]
89pub struct PtsState {
90    /// Main pts counter (messages, non-channel updates).
91    pub pts:  i32,
92    /// G-18: Secondary counter for secret-chat updates.
93    pub qts:  i32,
94    /// Date of the last received update (Unix timestamp).
95    pub date: i32,
96    /// G-19: Combined-container sequence number.
97    pub seq:  i32,
98    /// Per-channel pts counters.  `channel_id → pts`.
99    pub channel_pts: HashMap<i64, i32>,
100    /// G-16: Timestamp of last received update for deadline-based gap detection.
101    pub last_update_at: Option<Instant>,
102    /// Fix #4: Channels currently awaiting a getChannelDifference response.
103    /// If a channel is in this set, no new gap-fill task is spawned for it —
104    /// matches grammers' `getting_diff_for` guard that prevents 1 gap → N tasks.
105    pub getting_diff_for: HashSet<i64>,
106}
107
108
109impl PtsState {
110    pub fn from_server_state(s: &tl::types::updates::State) -> Self {
111        Self {
112            pts: s.pts, qts: s.qts, date: s.date, seq: s.seq,
113            channel_pts: HashMap::new(),
114            last_update_at: Some(Instant::now()),
115            getting_diff_for: HashSet::new(),
116        }
117    }
118
119    /// Record that an update was received now (resets the deadline timer).
120    pub fn touch(&mut self) {
121        self.last_update_at = Some(Instant::now());
122    }
123
124    /// G-16: Returns true if no update has been received for > 15 minutes.
125    pub fn deadline_exceeded(&self) -> bool {
126        self.last_update_at.as_ref()
127            .map(|t| t.elapsed().as_secs() > 15 * 60)
128            .unwrap_or(false)
129    }
130
131    /// Check whether `new_pts` is in order given `pts_count` new updates.
132    pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
133        let expected = self.pts + pts_count;
134        if new_pts == expected {
135            PtsCheckResult::Ok
136        } else if new_pts > expected {
137            PtsCheckResult::Gap { expected, got: new_pts }
138        } else {
139            PtsCheckResult::Duplicate
140        }
141    }
142
143    /// G-18: Check a qts value (secret chat updates).
144    pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
145        let expected = self.qts + qts_count;
146        if new_qts == expected {
147            PtsCheckResult::Ok
148        } else if new_qts > expected {
149            PtsCheckResult::Gap { expected, got: new_qts }
150        } else {
151            PtsCheckResult::Duplicate
152        }
153    }
154
155    /// G-19: Check top-level seq for UpdatesCombined containers.
156    pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
157        if self.seq == 0 { return PtsCheckResult::Ok; } // uninitialised — accept
158        let expected = self.seq + 1;
159        if seq_start == expected {
160            PtsCheckResult::Ok
161        } else if seq_start > expected {
162            PtsCheckResult::Gap { expected, got: seq_start }
163        } else {
164            PtsCheckResult::Duplicate
165        }
166    }
167
168    /// Check a per-channel pts value.
169    pub fn check_channel_pts(&self, channel_id: i64, new_pts: i32, pts_count: i32) -> PtsCheckResult {
170        let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
171        if local == 0 {
172            return PtsCheckResult::Ok;
173        }
174        let expected = local + pts_count;
175        if new_pts == expected {
176            PtsCheckResult::Ok
177        } else if new_pts > expected {
178            PtsCheckResult::Gap { expected, got: new_pts }
179        } else {
180            PtsCheckResult::Duplicate
181        }
182    }
183
184    /// Advance the global pts.
185    pub fn advance(&mut self, new_pts: i32) {
186        if new_pts > self.pts { self.pts = new_pts; }
187        self.touch();
188    }
189
190    /// Advance the qts (G-18).
191    pub fn advance_qts(&mut self, new_qts: i32) {
192        if new_qts > self.qts { self.qts = new_qts; }
193        self.touch();
194    }
195
196    /// Advance seq (G-19).
197    pub fn advance_seq(&mut self, new_seq: i32) {
198        if new_seq > self.seq { self.seq = new_seq; }
199    }
200
201    /// Advance a per-channel pts.
202    pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
203        let entry = self.channel_pts.entry(channel_id).or_insert(0);
204        if new_pts > *entry { *entry = new_pts; }
205        self.touch();
206    }
207}
208
209#[derive(Debug, PartialEq, Eq)]
210pub enum PtsCheckResult {
211    Ok,
212    Gap { expected: i32, got: i32 },
213    Duplicate,
214}
215
216// ─── Client methods ───────────────────────────────────────────────────────────
217
218impl Client {
219    // ── Global getDifference ──────────────────────────────────────────────
220
221    /// Fetch and replay any updates missed since the persisted pts.
222    pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
223        let (pts, qts, date) = {
224            let s = self.inner.pts_state.lock().await;
225            (s.pts, s.qts, s.date)
226        };
227
228        if pts == 0 {
229            self.sync_pts_state().await?;
230            return Ok(vec![]);
231        }
232
233        log::info!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
234
235        let req = tl::functions::updates::GetDifference {
236            pts,
237            pts_limit:       None,
238            pts_total_limit: None,
239            date,
240            qts,
241            qts_limit:       None,
242        };
243
244        let body    = self.rpc_call_raw_pub(&req).await?;
245        let mut cur = Cursor::from_slice(&body);
246        let diff    = tl::enums::updates::Difference::deserialize(&mut cur)?;
247
248        let mut updates = Vec::new();
249        match diff {
250            tl::enums::updates::Difference::Empty(e) => {
251                let mut s = self.inner.pts_state.lock().await;
252                s.date = e.date;
253                s.seq  = e.seq;
254                s.touch();
255                log::debug!("[layer] getDifference: empty (seq={})", e.seq);
256            }
257            tl::enums::updates::Difference::Difference(d) => {
258                log::info!("[layer] getDifference: {} messages, {} updates",
259                    d.new_messages.len(), d.other_updates.len());
260                self.cache_users_slice_pub(&d.users).await;
261                self.cache_chats_slice_pub(&d.chats).await;
262                for msg in d.new_messages {
263                    updates.push(update::Update::NewMessage(
264                        update::IncomingMessage::from_raw(msg)
265                    ));
266                }
267                for upd in d.other_updates {
268                    updates.extend(update::from_single_update_pub(upd));
269                }
270                let tl::enums::updates::State::State(ns) = d.state;
271                // Preserve channel_pts across the global reset — from_server_state
272                // produces an empty channel_pts map, but we may have restored
273                // per-channel pts from the session for catch-up.
274                let saved_channel_pts = {
275                    let s = self.inner.pts_state.lock().await;
276                    s.channel_pts.clone()
277                };
278                let mut new_state = PtsState::from_server_state(&ns);
279                for (cid, cpts) in saved_channel_pts {
280                    new_state.channel_pts.entry(cid).or_insert(cpts);
281                }
282                *self.inner.pts_state.lock().await = new_state;
283            }
284            tl::enums::updates::Difference::Slice(d) => {
285                log::info!("[layer] getDifference slice: {} messages, {} updates",
286                    d.new_messages.len(), d.other_updates.len());
287                self.cache_users_slice_pub(&d.users).await;
288                self.cache_chats_slice_pub(&d.chats).await;
289                for msg in d.new_messages {
290                    updates.push(update::Update::NewMessage(
291                        update::IncomingMessage::from_raw(msg)
292                    ));
293                }
294                for upd in d.other_updates {
295                    updates.extend(update::from_single_update_pub(upd));
296                }
297                let tl::enums::updates::State::State(ns) = d.intermediate_state;
298                let saved_channel_pts = {
299                    let s = self.inner.pts_state.lock().await;
300                    s.channel_pts.clone()
301                };
302                let mut new_state = PtsState::from_server_state(&ns);
303                for (cid, cpts) in saved_channel_pts {
304                    new_state.channel_pts.entry(cid).or_insert(cpts);
305                }
306                *self.inner.pts_state.lock().await = new_state;
307            }
308            tl::enums::updates::Difference::TooLong(d) => {
309                log::warn!("[layer] getDifference: TooLong (pts={}) — re-syncing", d.pts);
310                self.inner.pts_state.lock().await.pts = d.pts;
311                self.sync_pts_state().await?;
312            }
313        }
314
315        Ok(updates)
316    }
317
318    // ── G-15: Per-channel getChannelDifference ────────────────────────────
319
320    /// Fetch missed updates for a single channel.
321    pub async fn get_channel_difference(
322        &self,
323        channel_id: i64,
324    ) -> Result<Vec<update::Update>, InvocationError> {
325        let local_pts = self.inner.pts_state.lock().await
326            .channel_pts.get(&channel_id).copied().unwrap_or(0);
327
328        let mut access_hash = self.inner.peer_cache.read().await
329            .channels.get(&channel_id).copied().unwrap_or(0);
330
331        // If access_hash is missing, try to resolve it via channels.GetChannels.
332        // Without a valid access_hash, Telegram always returns CHANNEL_INVALID.
333        if access_hash == 0 {
334            log::debug!("[layer] channel {channel_id}: access_hash missing, attempting resolve via GetChannels");
335            let input = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
336                channel_id,
337                access_hash: 0,
338            });
339            let req = tl::functions::channels::GetChannels { id: vec![input] };
340            if let Ok(body) = self.rpc_call_raw_pub(&req).await {
341                    let mut cur = Cursor::from_slice(&body);
342                    if let Ok(chats) = tl::enums::messages::Chats::deserialize(&mut cur) {
343                    let chat_list = match chats {
344                        tl::enums::messages::Chats::Chats(c) => c.chats,
345                        tl::enums::messages::Chats::Slice(c) => c.chats,
346                    };
347                    self.cache_chats_slice_pub(&chat_list).await;
348                    access_hash = self.inner.peer_cache.read().await
349                        .channels.get(&channel_id).copied().unwrap_or(0);
350                }
351            }
352        }
353
354        // Still no access_hash — nothing we can do, bail out early.
355        if access_hash == 0 {
356            log::warn!("[layer] channel {channel_id}: access_hash unknown, cannot call getChannelDifference");
357            return Err(InvocationError::Rpc(RpcError {
358                code: 400,
359                name: "CHANNEL_INVALID".into(),
360                value: None,
361            }));
362        }
363
364        log::info!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
365
366        let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
367            channel_id,
368            access_hash,
369        });
370
371        let req = tl::functions::updates::GetChannelDifference {
372            force:   false,
373            channel,
374            filter:  tl::enums::ChannelMessagesFilter::Empty,
375            pts:     local_pts.max(1),
376            limit:   100,
377        };
378
379        let body = match self.rpc_call_raw_pub(&req).await {
380            Ok(b) => b,
381            Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
382                // G-20: treat as empty diff — retry next gap
383                log::debug!("[layer] G-20 PERSISTENT_TIMESTAMP_OUTDATED — skipping diff");
384                return Ok(vec![]);
385            }
386            Err(e) => return Err(e),
387        };
388        let mut cur = Cursor::from_slice(&body);
389        let diff    = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
390
391        let mut updates = Vec::new();
392
393        match diff {
394            tl::enums::updates::ChannelDifference::Empty(e) => {
395                log::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
396                self.inner.pts_state.lock().await.advance_channel(channel_id, e.pts);
397            }
398            tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
399                log::info!("[layer] getChannelDifference: {} messages, {} updates",
400                    d.new_messages.len(), d.other_updates.len());
401                self.cache_users_slice_pub(&d.users).await;
402                self.cache_chats_slice_pub(&d.chats).await;
403                for msg in d.new_messages {
404                    updates.push(update::Update::NewMessage(
405                        update::IncomingMessage::from_raw(msg)
406                    ));
407                }
408                for upd in d.other_updates {
409                    updates.extend(update::from_single_update_pub(upd));
410                }
411                self.inner.pts_state.lock().await.advance_channel(channel_id, d.pts);
412            }
413            tl::enums::updates::ChannelDifference::TooLong(d) => {
414                log::warn!("[layer] getChannelDifference TooLong — replaying messages, resetting pts");
415                self.cache_users_slice_pub(&d.users).await;
416                self.cache_chats_slice_pub(&d.chats).await;
417                for msg in d.messages {
418                    updates.push(update::Update::NewMessage(
419                        update::IncomingMessage::from_raw(msg)
420                    ));
421                }
422                self.inner.pts_state.lock().await.advance_channel(channel_id, 0);
423            }
424        }
425
426        Ok(updates)
427    }
428
429    // ── Sync from server ──────────────────────────────────────────────────
430
431    pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
432        let body    = self.rpc_call_raw_pub(&tl::functions::updates::GetState {}).await?;
433        let mut cur = Cursor::from_slice(&body);
434        let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
435        let mut state = self.inner.pts_state.lock().await;
436        state.pts  = s.pts;
437        state.qts  = s.qts;
438        state.date = s.date;
439        state.seq  = s.seq;
440        state.touch();
441        log::info!("[layer] pts synced: pts={}, qts={}, seq={}", s.pts, s.qts, s.seq);
442        Ok(())
443    }
444
445    // ── Gap-check helpers ─────────────────────────────────────────────────
446
447    /// G-17: Check global pts, buffer during possible-gap window, fetch diff if real gap.
448    pub async fn check_and_fill_gap(
449        &self,
450        new_pts:   i32,
451        pts_count: i32,
452        upd:       Option<update::Update>,
453    ) -> Result<Vec<update::Update>, InvocationError> {
454        let result = self.inner.pts_state.lock().await.check_pts(new_pts, pts_count);
455        match result {
456            PtsCheckResult::Ok => {
457                // Drain any buffered global updates now that we're in sync,
458                // then append the current update (which triggered the Ok).
459                let mut buffered = self.inner.possible_gap.lock().await.drain_global();
460                self.inner.pts_state.lock().await.advance(new_pts);
461                if let Some(u) = upd {
462                    buffered.push(u);
463                }
464                Ok(buffered)
465            }
466            PtsCheckResult::Gap { expected, got } => {
467                // Buffer the update first; only fetch getDifference after the
468                // deadline has elapsed (avoids spurious getDifference on every
469                // slightly out-of-order update).
470                if let Some(u) = upd {
471                    self.inner.possible_gap.lock().await.push_global(u);
472                }
473                let deadline_elapsed = self.inner.possible_gap.lock().await.global_deadline_elapsed();
474                if deadline_elapsed {
475                    log::warn!("[layer] global pts gap: expected {expected}, got {got} — getDifference");
476                    let buffered = self.inner.possible_gap.lock().await.drain_global();
477                    let mut diff_updates = self.get_difference().await?;
478                    // Prepend buffered updates so ordering is maintained.
479                    diff_updates.splice(0..0, buffered);
480                    Ok(diff_updates)
481                } else {
482                    log::debug!("[layer] global pts gap: expected {expected}, got {got} — buffering (possible gap)");
483                    Ok(vec![])
484                }
485            }
486            PtsCheckResult::Duplicate => {
487                log::debug!("[layer] global pts duplicate, discarding");
488                Ok(vec![])
489            }
490        }
491    }
492
493    /// G-18: Check qts (secret chat updates) and fill gap if needed.
494    pub async fn check_and_fill_qts_gap(
495        &self,
496        new_qts:   i32,
497        qts_count: i32,
498    ) -> Result<Vec<update::Update>, InvocationError> {
499        let result = self.inner.pts_state.lock().await.check_qts(new_qts, qts_count);
500        match result {
501            PtsCheckResult::Ok => {
502                self.inner.pts_state.lock().await.advance_qts(new_qts);
503                Ok(vec![])
504            }
505            PtsCheckResult::Gap { expected, got } => {
506                log::warn!("[layer] qts gap: expected {expected}, got {got} — getDifference");
507                self.get_difference().await
508            }
509            PtsCheckResult::Duplicate => Ok(vec![]),
510        }
511    }
512
513    /// G-19: Check top-level seq and fill gap if needed.
514    pub async fn check_and_fill_seq_gap(
515        &self,
516        new_seq:   i32,
517        seq_start: i32,
518    ) -> Result<Vec<update::Update>, InvocationError> {
519        let result = self.inner.pts_state.lock().await.check_seq(new_seq, seq_start);
520        match result {
521            PtsCheckResult::Ok => {
522                self.inner.pts_state.lock().await.advance_seq(new_seq);
523                Ok(vec![])
524            }
525            PtsCheckResult::Gap { expected, got } => {
526                log::warn!("[layer] seq gap: expected {expected}, got {got} — getDifference");
527                self.get_difference().await
528            }
529            PtsCheckResult::Duplicate => Ok(vec![]),
530        }
531    }
532
533    /// G-15: Check a per-channel pts, fetch getChannelDifference if there is a gap.
534    pub async fn check_and_fill_channel_gap(
535        &self,
536        channel_id: i64,
537        new_pts:    i32,
538        pts_count:  i32,
539        upd:        Option<update::Update>,
540    ) -> Result<Vec<update::Update>, InvocationError> {
541        // Fix #4: if a diff is already in flight for this channel, skip — prevents
542        // 1 gap from spawning N concurrent getChannelDifference tasks.
543        if self.inner.pts_state.lock().await.getting_diff_for.contains(&channel_id) {
544            log::debug!("[layer] channel {channel_id} diff already in flight, skipping");
545            if let Some(u) = upd {
546                self.inner.possible_gap.lock().await.push_channel(channel_id, u);
547            }
548            return Ok(vec![]);
549        }
550
551        let result = self.inner.pts_state.lock().await
552            .check_channel_pts(channel_id, new_pts, pts_count);
553        match result {
554            PtsCheckResult::Ok => {
555                let mut buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
556                self.inner.pts_state.lock().await.advance_channel(channel_id, new_pts);
557                if let Some(u) = upd {
558                    buffered.push(u);
559                }
560                Ok(buffered)
561            }
562            PtsCheckResult::Gap { expected, got } => {
563                if let Some(u) = upd {
564                    self.inner.possible_gap.lock().await.push_channel(channel_id, u);
565                }
566                let deadline_elapsed = self.inner.possible_gap.lock().await
567                    .channel_deadline_elapsed(channel_id);
568                if deadline_elapsed {
569                    log::warn!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — getChannelDifference");
570                    // Fix #4: mark this channel as having a diff in flight.
571                    self.inner.pts_state.lock().await.getting_diff_for.insert(channel_id);
572                    let buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
573                    match self.get_channel_difference(channel_id).await {
574                        Ok(mut diff_updates) => {
575                            // Fix #4: diff complete, allow future gaps to be handled.
576                            self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
577                            diff_updates.splice(0..0, buffered);
578                            Ok(diff_updates)
579                        }
580                        // Permanent access errors: advance pts to `got` so the gap is
581                        // considered closed and we never re-trigger getChannelDifference
582                        // for this channel on every incoming update.
583                        Err(InvocationError::Rpc(ref e))
584                            if e.name == "CHANNEL_INVALID"
585                            || e.name == "CHANNEL_PRIVATE"
586                            || e.name == "CHANNEL_NOT_MODIFIED" =>
587                        {
588                            log::warn!(
589                                "[layer] channel {channel_id}: {} — skipping gap, advancing pts to {got}",
590                                e.name
591                            );
592                            // Fix #4: diff complete (errored out), allow future gaps.
593                            self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
594                            self.inner.pts_state.lock().await.advance_channel(channel_id, got);
595                            Ok(buffered)
596                        }
597                        Err(InvocationError::Deserialize(ref msg)) => {
598                            // Unrecognised constructor (e.g. 0x0000000e) — Telegram returned
599                            // something we can't parse for this channel.  Treat the same as
600                            // CHANNEL_INVALID: advance pts to `got` so we don't retry on every
601                            // subsequent update and flood the logs.
602                            log::warn!(
603                                "[layer] channel {channel_id}: deserialize error ({msg}) — skipping gap, advancing pts to {got}"
604                            );
605                            self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
606                            self.inner.pts_state.lock().await.advance_channel(channel_id, got);
607                            Ok(buffered)
608                        }
609                        Err(e) => {
610                            // Fix #4: also clear on unexpected errors so we don't get stuck.
611                            self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
612                            Err(e)
613                        }
614                    }
615                } else {
616                    log::debug!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — buffering");
617                    Ok(vec![])
618                }
619            }
620            PtsCheckResult::Duplicate => {
621                log::debug!("[layer] channel {channel_id} pts duplicate, discarding");
622                Ok(vec![])
623            }
624        }
625    }
626
627    /// G-16: Called periodically (e.g. from keepalive) to fire getDifference
628    /// if no update has been received for > 15 minutes.
629    pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
630        let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
631        if exceeded {
632            log::info!("[layer] G-16 update deadline exceeded — fetching getDifference");
633            let updates = self.get_difference().await?;
634            for u in updates { let _ = self.inner.update_tx.send(u); }
635        }
636        Ok(())
637    }
638}
639
640