1use std::collections::HashMap;
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, update};
19
/// Age, in milliseconds, at which a bucket of buffered out-of-order updates
/// is reported as past its deadline by [`PossibleGapBuffer`].
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;
25
/// Holding area for updates that were pushed while a pts gap was pending.
///
/// Every bucket pairs the buffered updates with the [`Instant`] at which the
/// bucket was created, i.e. when its first update was pushed.
#[derive(Default)]
pub struct PossibleGapBuffer {
    /// Per-channel buckets, keyed by channel id.
    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
    /// Bucket for the global (non-channel) sequence; `None` while nothing is buffered.
    global: Option<(Vec<update::Update>, Instant)>,
}
35
36impl PossibleGapBuffer {
37 pub fn new() -> Self { Self::default() }
38
39 pub fn push_global(&mut self, upd: update::Update) {
41 let entry = self.global.get_or_insert_with(|| (Vec::new(), Instant::now()));
42 entry.0.push(upd);
43 }
44
45 pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
47 let entry = self.channel.entry(channel_id).or_insert_with(|| (Vec::new(), Instant::now()));
48 entry.0.push(upd);
49 }
50
51 pub fn global_deadline_elapsed(&self) -> bool {
53 self.global.as_ref()
54 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
55 .unwrap_or(false)
56 }
57
58 pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
60 self.channel.get(&channel_id)
61 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
62 .unwrap_or(false)
63 }
64
65 pub fn has_global(&self) -> bool { self.global.is_some() }
67
68 pub fn has_channel(&self, channel_id: i64) -> bool { self.channel.contains_key(&channel_id) }
70
71 pub fn drain_global(&mut self) -> Vec<update::Update> {
73 self.global.take().map(|(v, _)| v).unwrap_or_default()
74 }
75
76 pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
78 self.channel.remove(&channel_id).map(|(v, _)| v).unwrap_or_default()
79 }
80}
81
/// Local copy of the update-sequence counters.
///
/// `pts`, `qts`, `date` and `seq` mirror the fields of the server's
/// `updates.State`; `channel_pts` tracks a separate pts per channel.
#[derive(Debug, Clone, Default)]
pub struct PtsState {
    /// Global pts counter.
    pub pts: i32,
    /// Global qts counter.
    pub qts: i32,
    /// Date taken from the last server state.
    pub date: i32,
    /// Global seq counter; `0` is treated as "not known yet" by `check_seq`.
    pub seq: i32,
    /// Per-channel pts, keyed by channel id; a missing or `0` entry is treated
    /// as "not known yet" by `check_channel_pts`.
    pub channel_pts: HashMap<i64, i32>,
    /// Moment of the last `touch()`; `None` if the state was never touched.
    pub last_update_at: Option<Instant>,
}
103
104
105impl PtsState {
106 pub fn from_server_state(s: &tl::types::updates::State) -> Self {
107 Self {
108 pts: s.pts, qts: s.qts, date: s.date, seq: s.seq,
109 channel_pts: HashMap::new(),
110 last_update_at: Some(Instant::now()),
111 }
112 }
113
114 pub fn touch(&mut self) {
116 self.last_update_at = Some(Instant::now());
117 }
118
119 pub fn deadline_exceeded(&self) -> bool {
121 self.last_update_at.as_ref()
122 .map(|t| t.elapsed().as_secs() > 15 * 60)
123 .unwrap_or(false)
124 }
125
126 pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
128 let expected = self.pts + pts_count;
129 if new_pts == expected {
130 PtsCheckResult::Ok
131 } else if new_pts > expected {
132 PtsCheckResult::Gap { expected, got: new_pts }
133 } else {
134 PtsCheckResult::Duplicate
135 }
136 }
137
138 pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
140 let expected = self.qts + qts_count;
141 if new_qts == expected {
142 PtsCheckResult::Ok
143 } else if new_qts > expected {
144 PtsCheckResult::Gap { expected, got: new_qts }
145 } else {
146 PtsCheckResult::Duplicate
147 }
148 }
149
150 pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
152 if self.seq == 0 { return PtsCheckResult::Ok; } let expected = self.seq + 1;
154 if seq_start == expected {
155 PtsCheckResult::Ok
156 } else if seq_start > expected {
157 PtsCheckResult::Gap { expected, got: seq_start }
158 } else {
159 PtsCheckResult::Duplicate
160 }
161 }
162
163 pub fn check_channel_pts(&self, channel_id: i64, new_pts: i32, pts_count: i32) -> PtsCheckResult {
165 let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
166 if local == 0 {
167 return PtsCheckResult::Ok;
168 }
169 let expected = local + pts_count;
170 if new_pts == expected {
171 PtsCheckResult::Ok
172 } else if new_pts > expected {
173 PtsCheckResult::Gap { expected, got: new_pts }
174 } else {
175 PtsCheckResult::Duplicate
176 }
177 }
178
179 pub fn advance(&mut self, new_pts: i32) {
181 if new_pts > self.pts { self.pts = new_pts; }
182 self.touch();
183 }
184
185 pub fn advance_qts(&mut self, new_qts: i32) {
187 if new_qts > self.qts { self.qts = new_qts; }
188 self.touch();
189 }
190
191 pub fn advance_seq(&mut self, new_seq: i32) {
193 if new_seq > self.seq { self.seq = new_seq; }
194 }
195
196 pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
198 let entry = self.channel_pts.entry(channel_id).or_insert(0);
199 if new_pts > *entry { *entry = new_pts; }
200 self.touch();
201 }
202}
203
/// Outcome of comparing an incoming counter with the locally expected value.
#[derive(Debug, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The incoming value is the expected one (also returned by the seq and
    /// channel checks while the local counter is still `0`).
    Ok,
    /// The incoming value (`got`) is ahead of `expected`.
    Gap { expected: i32, got: i32 },
    /// The incoming value is behind the expected one.
    Duplicate,
}
210
211impl Client {
214 pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
218 let (pts, qts, date) = {
219 let s = self.inner.pts_state.lock().await;
220 (s.pts, s.qts, s.date)
221 };
222
223 if pts == 0 {
224 self.sync_pts_state().await?;
225 return Ok(vec![]);
226 }
227
228 log::info!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
229
230 let req = tl::functions::updates::GetDifference {
231 pts,
232 pts_limit: None,
233 pts_total_limit: None,
234 date,
235 qts,
236 qts_limit: None,
237 };
238
239 let body = self.rpc_call_raw_pub(&req).await?;
240 let mut cur = Cursor::from_slice(&body);
241 let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
242
243 let mut updates = Vec::new();
244 match diff {
245 tl::enums::updates::Difference::Empty(e) => {
246 let mut s = self.inner.pts_state.lock().await;
247 s.date = e.date;
248 s.seq = e.seq;
249 s.touch();
250 log::debug!("[layer] getDifference: empty (seq={})", e.seq);
251 }
252 tl::enums::updates::Difference::Difference(d) => {
253 log::info!("[layer] getDifference: {} messages, {} updates",
254 d.new_messages.len(), d.other_updates.len());
255 self.cache_users_slice_pub(&d.users).await;
256 self.cache_chats_slice_pub(&d.chats).await;
257 for msg in d.new_messages {
258 updates.push(update::Update::NewMessage(
259 update::IncomingMessage::from_raw(msg)
260 ));
261 }
262 for upd in d.other_updates {
263 updates.extend(update::from_single_update_pub(upd));
264 }
265 let tl::enums::updates::State::State(ns) = d.state;
266 *self.inner.pts_state.lock().await = PtsState::from_server_state(&ns);
267 }
268 tl::enums::updates::Difference::Slice(d) => {
269 log::info!("[layer] getDifference slice: {} messages, {} updates",
270 d.new_messages.len(), d.other_updates.len());
271 self.cache_users_slice_pub(&d.users).await;
272 self.cache_chats_slice_pub(&d.chats).await;
273 for msg in d.new_messages {
274 updates.push(update::Update::NewMessage(
275 update::IncomingMessage::from_raw(msg)
276 ));
277 }
278 for upd in d.other_updates {
279 updates.extend(update::from_single_update_pub(upd));
280 }
281 let tl::enums::updates::State::State(ns) = d.intermediate_state;
282 *self.inner.pts_state.lock().await = PtsState::from_server_state(&ns);
283 }
284 tl::enums::updates::Difference::TooLong(d) => {
285 log::warn!("[layer] getDifference: TooLong (pts={}) — re-syncing", d.pts);
286 self.inner.pts_state.lock().await.pts = d.pts;
287 self.sync_pts_state().await?;
288 }
289 }
290
291 Ok(updates)
292 }
293
294 pub async fn get_channel_difference(
298 &self,
299 channel_id: i64,
300 ) -> Result<Vec<update::Update>, InvocationError> {
301 let local_pts = self.inner.pts_state.lock().await
302 .channel_pts.get(&channel_id).copied().unwrap_or(0);
303
304 let access_hash = self.inner.peer_cache.lock().await
305 .channels.get(&channel_id).copied().unwrap_or(0);
306
307 log::info!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
308
309 let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
310 channel_id,
311 access_hash,
312 });
313
314 let req = tl::functions::updates::GetChannelDifference {
315 force: false,
316 channel,
317 filter: tl::enums::ChannelMessagesFilter::Empty,
318 pts: local_pts.max(1),
319 limit: 100,
320 };
321
322 let body = match self.rpc_call_raw_pub(&req).await {
323 Ok(b) => b,
324 Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
325 log::debug!("[layer] G-20 PERSISTENT_TIMESTAMP_OUTDATED — skipping diff");
327 return Ok(vec![]);
328 }
329 Err(e) => return Err(e),
330 };
331 let mut cur = Cursor::from_slice(&body);
332 let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
333
334 let mut updates = Vec::new();
335
336 match diff {
337 tl::enums::updates::ChannelDifference::Empty(e) => {
338 log::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
339 self.inner.pts_state.lock().await.advance_channel(channel_id, e.pts);
340 }
341 tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
342 log::info!("[layer] getChannelDifference: {} messages, {} updates",
343 d.new_messages.len(), d.other_updates.len());
344 self.cache_users_slice_pub(&d.users).await;
345 self.cache_chats_slice_pub(&d.chats).await;
346 for msg in d.new_messages {
347 updates.push(update::Update::NewMessage(
348 update::IncomingMessage::from_raw(msg)
349 ));
350 }
351 for upd in d.other_updates {
352 updates.extend(update::from_single_update_pub(upd));
353 }
354 self.inner.pts_state.lock().await.advance_channel(channel_id, d.pts);
355 }
356 tl::enums::updates::ChannelDifference::TooLong(d) => {
357 log::warn!("[layer] getChannelDifference TooLong — replaying messages, resetting pts");
358 self.cache_users_slice_pub(&d.users).await;
359 self.cache_chats_slice_pub(&d.chats).await;
360 for msg in d.messages {
361 updates.push(update::Update::NewMessage(
362 update::IncomingMessage::from_raw(msg)
363 ));
364 }
365 self.inner.pts_state.lock().await.advance_channel(channel_id, 0);
366 }
367 }
368
369 Ok(updates)
370 }
371
372 pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
375 let body = self.rpc_call_raw_pub(&tl::functions::updates::GetState {}).await?;
376 let mut cur = Cursor::from_slice(&body);
377 let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
378 let mut state = self.inner.pts_state.lock().await;
379 state.pts = s.pts;
380 state.qts = s.qts;
381 state.date = s.date;
382 state.seq = s.seq;
383 state.touch();
384 log::info!("[layer] pts synced: pts={}, qts={}, seq={}", s.pts, s.qts, s.seq);
385 Ok(())
386 }
387
388 pub async fn check_and_fill_gap(
392 &self,
393 new_pts: i32,
394 pts_count: i32,
395 upd: Option<update::Update>,
396 ) -> Result<Vec<update::Update>, InvocationError> {
397 let result = self.inner.pts_state.lock().await.check_pts(new_pts, pts_count);
398 match result {
399 PtsCheckResult::Ok => {
400 let mut buffered = self.inner.possible_gap.lock().await.drain_global();
403 self.inner.pts_state.lock().await.advance(new_pts);
404 if let Some(u) = upd {
405 buffered.push(u);
406 }
407 Ok(buffered)
408 }
409 PtsCheckResult::Gap { expected, got } => {
410 if let Some(u) = upd {
414 self.inner.possible_gap.lock().await.push_global(u);
415 }
416 let deadline_elapsed = self.inner.possible_gap.lock().await.global_deadline_elapsed();
417 if deadline_elapsed {
418 log::warn!("[layer] global pts gap: expected {expected}, got {got} — getDifference");
419 let buffered = self.inner.possible_gap.lock().await.drain_global();
420 let mut diff_updates = self.get_difference().await?;
421 diff_updates.splice(0..0, buffered);
423 Ok(diff_updates)
424 } else {
425 log::debug!("[layer] global pts gap: expected {expected}, got {got} — buffering (possible gap)");
426 Ok(vec![])
427 }
428 }
429 PtsCheckResult::Duplicate => {
430 log::debug!("[layer] global pts duplicate, discarding");
431 Ok(vec![])
432 }
433 }
434 }
435
436 pub async fn check_and_fill_qts_gap(
438 &self,
439 new_qts: i32,
440 qts_count: i32,
441 ) -> Result<Vec<update::Update>, InvocationError> {
442 let result = self.inner.pts_state.lock().await.check_qts(new_qts, qts_count);
443 match result {
444 PtsCheckResult::Ok => {
445 self.inner.pts_state.lock().await.advance_qts(new_qts);
446 Ok(vec![])
447 }
448 PtsCheckResult::Gap { expected, got } => {
449 log::warn!("[layer] qts gap: expected {expected}, got {got} — getDifference");
450 self.get_difference().await
451 }
452 PtsCheckResult::Duplicate => Ok(vec![]),
453 }
454 }
455
456 pub async fn check_and_fill_seq_gap(
458 &self,
459 new_seq: i32,
460 seq_start: i32,
461 ) -> Result<Vec<update::Update>, InvocationError> {
462 let result = self.inner.pts_state.lock().await.check_seq(new_seq, seq_start);
463 match result {
464 PtsCheckResult::Ok => {
465 self.inner.pts_state.lock().await.advance_seq(new_seq);
466 Ok(vec![])
467 }
468 PtsCheckResult::Gap { expected, got } => {
469 log::warn!("[layer] seq gap: expected {expected}, got {got} — getDifference");
470 self.get_difference().await
471 }
472 PtsCheckResult::Duplicate => Ok(vec![]),
473 }
474 }
475
476 pub async fn check_and_fill_channel_gap(
478 &self,
479 channel_id: i64,
480 new_pts: i32,
481 pts_count: i32,
482 upd: Option<update::Update>,
483 ) -> Result<Vec<update::Update>, InvocationError> {
484 let result = self.inner.pts_state.lock().await
485 .check_channel_pts(channel_id, new_pts, pts_count);
486 match result {
487 PtsCheckResult::Ok => {
488 let mut buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
489 self.inner.pts_state.lock().await.advance_channel(channel_id, new_pts);
490 if let Some(u) = upd {
491 buffered.push(u);
492 }
493 Ok(buffered)
494 }
495 PtsCheckResult::Gap { expected, got } => {
496 if let Some(u) = upd {
497 self.inner.possible_gap.lock().await.push_channel(channel_id, u);
498 }
499 let deadline_elapsed = self.inner.possible_gap.lock().await
500 .channel_deadline_elapsed(channel_id);
501 if deadline_elapsed {
502 log::warn!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — getChannelDifference");
503 let buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
504 let mut diff_updates = self.get_channel_difference(channel_id).await?;
505 diff_updates.splice(0..0, buffered);
506 Ok(diff_updates)
507 } else {
508 log::debug!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — buffering");
509 Ok(vec![])
510 }
511 }
512 PtsCheckResult::Duplicate => {
513 log::debug!("[layer] channel {channel_id} pts duplicate, discarding");
514 Ok(vec![])
515 }
516 }
517 }
518
519 pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
522 let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
523 if exceeded {
524 log::info!("[layer] G-16 update deadline exceeded — fetching getDifference");
525 let updates = self.get_difference().await?;
526 for u in updates { let _ = self.inner.update_tx.send(u); }
527 }
528 Ok(())
529 }
530}
531
532