1use layer_tl_types as tl;
11use layer_tl_types::{Cursor, Deserializable};
12
13use crate::{Client, InvocationError, update};
14
15#[derive(Default, Debug, Clone)]
19pub struct PtsState {
20 pub pts: i32,
22 pub qts: i32,
24 pub date: i32,
26 pub seq: i32,
28}
29
30impl PtsState {
31 pub fn from_server_state(state: &tl::types::updates::State) -> Self {
33 Self {
34 pts: state.pts,
35 qts: state.qts,
36 date: state.date,
37 seq: state.seq,
38 }
39 }
40
41 pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
43 let expected = self.pts + pts_count;
44 if new_pts == expected {
45 PtsCheckResult::Ok
46 } else if new_pts > expected {
47 PtsCheckResult::Gap { expected, got: new_pts }
48 } else {
49 PtsCheckResult::Duplicate
50 }
51 }
52
53 pub fn advance(&mut self, new_pts: i32) {
55 if new_pts > self.pts { self.pts = new_pts; }
56 }
57}
58
59#[derive(Debug, PartialEq, Eq)]
60pub enum PtsCheckResult {
61 Ok,
63 Gap { expected: i32, got: i32 },
65 Duplicate,
67}
68
69impl Client {
72 pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
77 let (pts, qts, date) = {
78 let state = self.inner.pts_state.lock().await;
79 (state.pts, state.qts, state.date)
80 };
81
82 if pts == 0 {
83 self.sync_pts_state().await?;
85 return Ok(vec![]);
86 }
87
88 log::info!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
89
90 let req = tl::functions::updates::GetDifference {
91 pts,
92 pts_limit: None,
93 pts_total_limit: None,
94 date,
95 qts,
96 qts_limit: None,
97 };
98
99 let body = self.rpc_call_raw_pub(&req).await?;
100 let mut cur = Cursor::from_slice(&body);
101 let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
102
103 let mut updates = Vec::new();
104 match diff {
105 tl::enums::updates::Difference::Empty(e) => {
106 let mut state = self.inner.pts_state.lock().await;
108 state.date = e.date;
109 state.seq = e.seq;
110 log::debug!("[layer] getDifference: empty (seq={})", e.seq);
111 }
112 tl::enums::updates::Difference::Difference(d) => {
113 log::info!("[layer] getDifference: {} messages, {} updates",
114 d.new_messages.len(), d.other_updates.len());
115 self.cache_users_slice_pub(&d.users).await;
117 self.cache_chats_slice_pub(&d.chats).await;
118 for msg in d.new_messages {
120 updates.push(update::Update::NewMessage(
121 update::IncomingMessage::from_raw(msg)
122 ));
123 }
124 for upd in d.other_updates {
126 updates.extend(update::from_single_update_pub(upd));
127 }
128 let ns = match d.state {
130 tl::enums::updates::State::State(s) => s,
131 };
132 let mut state = self.inner.pts_state.lock().await;
133 *state = PtsState::from_server_state(&ns);
134 }
135 tl::enums::updates::Difference::Slice(d) => {
136 log::info!("[layer] getDifference slice: {} messages, {} updates",
137 d.new_messages.len(), d.other_updates.len());
138 self.cache_users_slice_pub(&d.users).await;
139 self.cache_chats_slice_pub(&d.chats).await;
140 for msg in d.new_messages {
141 updates.push(update::Update::NewMessage(
142 update::IncomingMessage::from_raw(msg)
143 ));
144 }
145 for upd in d.other_updates {
146 updates.extend(update::from_single_update_pub(upd));
147 }
148 let ns = match d.intermediate_state {
150 tl::enums::updates::State::State(s) => s,
151 };
152 let mut state = self.inner.pts_state.lock().await;
153 *state = PtsState::from_server_state(&ns);
154 }
155 tl::enums::updates::Difference::TooLong(d) => {
156 log::warn!("[layer] getDifference: TooLong (pts={}) — re-syncing state", d.pts);
157 let mut state = self.inner.pts_state.lock().await;
159 state.pts = d.pts;
160 drop(state);
161 self.sync_pts_state().await?;
162 }
163 }
164
165 Ok(updates)
166 }
167
168 pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
170 let req = tl::functions::updates::GetState {};
171 let body = self.rpc_call_raw_pub(&req).await?;
172 let mut cur = Cursor::from_slice(&body);
173 let state = match tl::enums::updates::State::deserialize(&mut cur)? {
174 tl::enums::updates::State::State(s) => s,
175 };
176 let mut pts_state = self.inner.pts_state.lock().await;
177 *pts_state = PtsState::from_server_state(&state);
178 log::info!("[layer] pts synced: pts={}, qts={}, seq={}", state.pts, state.qts, state.seq);
179 Ok(())
180 }
181
182 pub async fn check_and_fill_gap(
186 &self,
187 new_pts: i32,
188 pts_count: i32,
189 ) -> Result<Vec<update::Update>, InvocationError> {
190 let result = {
191 let state = self.inner.pts_state.lock().await;
192 state.check_pts(new_pts, pts_count)
193 };
194
195 match result {
196 PtsCheckResult::Ok => {
197 let mut state = self.inner.pts_state.lock().await;
198 state.advance(new_pts);
199 Ok(vec![])
200 }
201 PtsCheckResult::Gap { expected, got } => {
202 log::warn!("[layer] pts gap detected: expected {expected}, got {got} — fetching difference");
203 self.get_difference().await
204 }
205 PtsCheckResult::Duplicate => {
206 log::debug!("[layer] pts duplicate, discarding update");
207 Ok(vec![])
208 }
209 }
210 }
211}