Skip to main content

layer_client/
pts.rs

1//! Update gap detection and recovery via `updates.getDifference`.
2//!
3//! The Telegram MTProto protocol assigns a monotonically-increasing sequence
4//! number called **pts** (and **qts** for secret chats, **seq** for the
5//! combined updates container) to each update.  If the client misses updates
6//! (due to a disconnect, lag, or packet loss) the pts will jump forward.  This
7//! module tracks the current pts and fetches any missed updates via
8//! `updates.getDifference` when a gap is detected.
9
10use layer_tl_types as tl;
11use layer_tl_types::{Cursor, Deserializable};
12
13use crate::{Client, InvocationError, update};
14
15// ─── PtsState ─────────────────────────────────────────────────────────────────
16
17/// Tracks MTProto sequence numbers so we can detect and fill update gaps.
18#[derive(Default, Debug, Clone)]
19pub struct PtsState {
20    /// Main sequence counter (messages, channels).
21    pub pts: i32,
22    /// Secondary counter for secret chats.
23    pub qts: i32,
24    /// Date of the last known update (Unix timestamp).
25    pub date: i32,
26    /// Combined updates sequence.
27    pub seq: i32,
28}
29
30impl PtsState {
31    /// Create a new PtsState from the current server state.
32    pub fn from_server_state(state: &tl::types::updates::State) -> Self {
33        Self {
34            pts:  state.pts,
35            qts:  state.qts,
36            date: state.date,
37            seq:  state.seq,
38        }
39    }
40
41    /// Returns true if `new_pts == self.pts + pts_count` (no gap).
42    pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
43        let expected = self.pts + pts_count;
44        if new_pts == expected {
45            PtsCheckResult::Ok
46        } else if new_pts > expected {
47            PtsCheckResult::Gap { expected, got: new_pts }
48        } else {
49            PtsCheckResult::Duplicate
50        }
51    }
52
53    /// Apply a confirmed pts advance.
54    pub fn advance(&mut self, new_pts: i32) {
55        if new_pts > self.pts { self.pts = new_pts; }
56    }
57}
58
59#[derive(Debug, PartialEq, Eq)]
60pub enum PtsCheckResult {
61    /// pts is in order — process the update.
62    Ok,
63    /// pts jumped forward — there is a gap; must call getDifference.
64    Gap { expected: i32, got: i32 },
65    /// pts is in the past — update already seen; discard.
66    Duplicate,
67}
68
69// ─── Client methods ───────────────────────────────────────────────────────────
70
71impl Client {
72    /// Fetch and apply any missed updates since the last known pts.
73    ///
74    /// This should be called after reconnection to close any update gap.
75    /// Returns the updates that were missed.
76    pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
77        let (pts, qts, date) = {
78            let state = self.inner.pts_state.lock().await;
79            (state.pts, state.qts, state.date)
80        };
81
82        if pts == 0 {
83            // No state yet; fetch current state from server first.
84            self.sync_pts_state().await?;
85            return Ok(vec![]);
86        }
87
88        log::info!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
89
90        let req = tl::functions::updates::GetDifference {
91            pts,
92            pts_limit:       None,
93            pts_total_limit: None,
94            date,
95            qts,
96            qts_limit:       None,
97        };
98
99        let body    = self.rpc_call_raw_pub(&req).await?;
100        let mut cur = Cursor::from_slice(&body);
101        let diff    = tl::enums::updates::Difference::deserialize(&mut cur)?;
102
103        let mut updates = Vec::new();
104        match diff {
105            tl::enums::updates::Difference::Empty(e) => {
106                // No new updates; fast-forward our state
107                let mut state = self.inner.pts_state.lock().await;
108                state.date = e.date;
109                state.seq  = e.seq;
110                log::debug!("[layer] getDifference: empty (seq={})", e.seq);
111            }
112            tl::enums::updates::Difference::Difference(d) => {
113                log::info!("[layer] getDifference: {} messages, {} updates",
114                    d.new_messages.len(), d.other_updates.len());
115                // Cache users and chats
116                self.cache_users_slice_pub(&d.users).await;
117                self.cache_chats_slice_pub(&d.chats).await;
118                // Emit new messages as updates
119                for msg in d.new_messages {
120                    updates.push(update::Update::NewMessage(
121                        update::IncomingMessage::from_raw(msg)
122                    ));
123                }
124                // Emit other updates
125                for upd in d.other_updates {
126                    updates.extend(update::from_single_update_pub(upd));
127                }
128                // Advance pts state using the returned state
129                let ns = match d.state {
130                    tl::enums::updates::State::State(s) => s,
131                };
132                let mut state = self.inner.pts_state.lock().await;
133                *state = PtsState::from_server_state(&ns);
134            }
135            tl::enums::updates::Difference::Slice(d) => {
136                log::info!("[layer] getDifference slice: {} messages, {} updates",
137                    d.new_messages.len(), d.other_updates.len());
138                self.cache_users_slice_pub(&d.users).await;
139                self.cache_chats_slice_pub(&d.chats).await;
140                for msg in d.new_messages {
141                    updates.push(update::Update::NewMessage(
142                        update::IncomingMessage::from_raw(msg)
143                    ));
144                }
145                for upd in d.other_updates {
146                    updates.extend(update::from_single_update_pub(upd));
147                }
148                // Slice has intermediate_state
149                let ns = match d.intermediate_state {
150                    tl::enums::updates::State::State(s) => s,
151                };
152                let mut state = self.inner.pts_state.lock().await;
153                *state = PtsState::from_server_state(&ns);
154            }
155            tl::enums::updates::Difference::TooLong(d) => {
156                log::warn!("[layer] getDifference: TooLong (pts={}) — re-syncing state", d.pts);
157                // Jump to the new pts and re-sync
158                let mut state = self.inner.pts_state.lock().await;
159                state.pts = d.pts;
160                drop(state);
161                self.sync_pts_state().await?;
162            }
163        }
164
165        Ok(updates)
166    }
167
168    /// Fetch the current server update state and store it locally.
169    pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
170        let req  = tl::functions::updates::GetState {};
171        let body = self.rpc_call_raw_pub(&req).await?;
172        let mut cur = Cursor::from_slice(&body);
173        let state = match tl::enums::updates::State::deserialize(&mut cur)? {
174            tl::enums::updates::State::State(s) => s,
175        };
176        let mut pts_state = self.inner.pts_state.lock().await;
177        *pts_state = PtsState::from_server_state(&state);
178        log::info!("[layer] pts synced: pts={}, qts={}, seq={}", state.pts, state.qts, state.seq);
179        Ok(())
180    }
181
182    /// Check for update gaps and fill them before processing an update with the given pts.
183    ///
184    /// Returns any catch-up updates that were missed.
185    pub async fn check_and_fill_gap(
186        &self,
187        new_pts:   i32,
188        pts_count: i32,
189    ) -> Result<Vec<update::Update>, InvocationError> {
190        let result = {
191            let state = self.inner.pts_state.lock().await;
192            state.check_pts(new_pts, pts_count)
193        };
194
195        match result {
196            PtsCheckResult::Ok => {
197                let mut state = self.inner.pts_state.lock().await;
198                state.advance(new_pts);
199                Ok(vec![])
200            }
201            PtsCheckResult::Gap { expected, got } => {
202                log::warn!("[layer] pts gap detected: expected {expected}, got {got} — fetching difference");
203                self.get_difference().await
204            }
205            PtsCheckResult::Duplicate => {
206                log::debug!("[layer] pts duplicate, discarding update");
207                Ok(vec![])
208            }
209        }
210    }
211}