1use std::collections::{HashMap, HashSet};
13use std::time::Instant;
14
15use layer_tl_types as tl;
16use layer_tl_types::{Cursor, Deserializable};
17
18use crate::{Client, InvocationError, RpcError, update};
19
/// Age, in milliseconds, at which a `PossibleGapBuffer` entry is reported as
/// past its deadline by `global_deadline_elapsed` / `channel_deadline_elapsed`.
const POSSIBLE_GAP_DEADLINE_MS: u64 = 1_000;
25
/// Temporary holding area for updates that cannot be delivered yet.
///
/// Every entry pairs the buffered updates with the `Instant` at which the
/// entry was created; the deadline checks measure elapsed time from that
/// instant, not from the most recent push.
#[derive(Default)]
pub struct PossibleGapBuffer {
    /// Per-channel buffers, keyed by channel id.
    channel: HashMap<i64, (Vec<update::Update>, Instant)>,
    /// Buffer for the global (non-channel) sequence; `None` while nothing is buffered.
    global: Option<(Vec<update::Update>, Instant)>,
}
35
36impl PossibleGapBuffer {
37 pub fn new() -> Self { Self::default() }
38
39 pub fn push_global(&mut self, upd: update::Update) {
41 let entry = self.global.get_or_insert_with(|| (Vec::new(), Instant::now()));
42 entry.0.push(upd);
43 }
44
45 pub fn push_channel(&mut self, channel_id: i64, upd: update::Update) {
47 let entry = self.channel.entry(channel_id).or_insert_with(|| (Vec::new(), Instant::now()));
48 entry.0.push(upd);
49 }
50
51 pub fn global_deadline_elapsed(&self) -> bool {
53 self.global.as_ref()
54 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
55 .unwrap_or(false)
56 }
57
58 pub fn channel_deadline_elapsed(&self, channel_id: i64) -> bool {
60 self.channel.get(&channel_id)
61 .map(|(_, t)| t.elapsed().as_millis() as u64 >= POSSIBLE_GAP_DEADLINE_MS)
62 .unwrap_or(false)
63 }
64
65 pub fn has_global(&self) -> bool { self.global.is_some() }
67
68 pub fn has_channel(&self, channel_id: i64) -> bool { self.channel.contains_key(&channel_id) }
70
71 pub fn drain_global(&mut self) -> Vec<update::Update> {
73 self.global.take().map(|(v, _)| v).unwrap_or_default()
74 }
75
76 pub fn drain_channel(&mut self, channel_id: i64) -> Vec<update::Update> {
78 self.channel.remove(&channel_id).map(|(v, _)| v).unwrap_or_default()
79 }
80}
81
/// Locally tracked update counters, plus bookkeeping used for gap detection.
#[derive(Debug, Clone, Default)]
pub struct PtsState {
    /// Global pts counter; `get_difference` treats 0 as "no state yet".
    pub pts: i32,
    /// Global qts counter.
    pub qts: i32,
    /// Date value taken from the server state / difference replies.
    pub date: i32,
    /// Global seq counter; `check_seq` skips the check while this is 0.
    pub seq: i32,
    /// Per-channel pts, keyed by channel id; a missing or 0 entry means "not tracked yet".
    pub channel_pts: HashMap<i64, i32>,
    /// When the state was last touched; `None` until the first `touch()`.
    pub last_update_at: Option<Instant>,
    /// Channel ids for which a channel difference request is currently in flight.
    pub getting_diff_for: HashSet<i64>,
}
107
108
109impl PtsState {
110 pub fn from_server_state(s: &tl::types::updates::State) -> Self {
111 Self {
112 pts: s.pts, qts: s.qts, date: s.date, seq: s.seq,
113 channel_pts: HashMap::new(),
114 last_update_at: Some(Instant::now()),
115 getting_diff_for: HashSet::new(),
116 }
117 }
118
119 pub fn touch(&mut self) {
121 self.last_update_at = Some(Instant::now());
122 }
123
124 pub fn deadline_exceeded(&self) -> bool {
126 self.last_update_at.as_ref()
127 .map(|t| t.elapsed().as_secs() > 15 * 60)
128 .unwrap_or(false)
129 }
130
131 pub fn check_pts(&self, new_pts: i32, pts_count: i32) -> PtsCheckResult {
133 let expected = self.pts + pts_count;
134 if new_pts == expected {
135 PtsCheckResult::Ok
136 } else if new_pts > expected {
137 PtsCheckResult::Gap { expected, got: new_pts }
138 } else {
139 PtsCheckResult::Duplicate
140 }
141 }
142
143 pub fn check_qts(&self, new_qts: i32, qts_count: i32) -> PtsCheckResult {
145 let expected = self.qts + qts_count;
146 if new_qts == expected {
147 PtsCheckResult::Ok
148 } else if new_qts > expected {
149 PtsCheckResult::Gap { expected, got: new_qts }
150 } else {
151 PtsCheckResult::Duplicate
152 }
153 }
154
155 pub fn check_seq(&self, _new_seq: i32, seq_start: i32) -> PtsCheckResult {
157 if self.seq == 0 { return PtsCheckResult::Ok; } let expected = self.seq + 1;
159 if seq_start == expected {
160 PtsCheckResult::Ok
161 } else if seq_start > expected {
162 PtsCheckResult::Gap { expected, got: seq_start }
163 } else {
164 PtsCheckResult::Duplicate
165 }
166 }
167
168 pub fn check_channel_pts(&self, channel_id: i64, new_pts: i32, pts_count: i32) -> PtsCheckResult {
170 let local = self.channel_pts.get(&channel_id).copied().unwrap_or(0);
171 if local == 0 {
172 return PtsCheckResult::Ok;
173 }
174 let expected = local + pts_count;
175 if new_pts == expected {
176 PtsCheckResult::Ok
177 } else if new_pts > expected {
178 PtsCheckResult::Gap { expected, got: new_pts }
179 } else {
180 PtsCheckResult::Duplicate
181 }
182 }
183
184 pub fn advance(&mut self, new_pts: i32) {
186 if new_pts > self.pts { self.pts = new_pts; }
187 self.touch();
188 }
189
190 pub fn advance_qts(&mut self, new_qts: i32) {
192 if new_qts > self.qts { self.qts = new_qts; }
193 self.touch();
194 }
195
196 pub fn advance_seq(&mut self, new_seq: i32) {
198 if new_seq > self.seq { self.seq = new_seq; }
199 }
200
201 pub fn advance_channel(&mut self, channel_id: i64, new_pts: i32) {
203 let entry = self.channel_pts.entry(channel_id).or_insert(0);
204 if new_pts > *entry { *entry = new_pts; }
205 self.touch();
206 }
207}
208
/// Outcome of comparing an incoming counter against the locally expected value.
#[derive(Debug, PartialEq, Eq)]
pub enum PtsCheckResult {
    /// The incoming value matches the expected one (or no check was possible).
    Ok,
    /// The incoming value is ahead of the expected one.
    Gap { expected: i32, got: i32 },
    /// The incoming value is behind the expected one.
    Duplicate,
}
215
216impl Client {
219 pub async fn get_difference(&self) -> Result<Vec<update::Update>, InvocationError> {
223 let (pts, qts, date) = {
224 let s = self.inner.pts_state.lock().await;
225 (s.pts, s.qts, s.date)
226 };
227
228 if pts == 0 {
229 self.sync_pts_state().await?;
230 return Ok(vec![]);
231 }
232
233 log::info!("[layer] getDifference (pts={pts}, qts={qts}, date={date}) …");
234
235 let req = tl::functions::updates::GetDifference {
236 pts,
237 pts_limit: None,
238 pts_total_limit: None,
239 date,
240 qts,
241 qts_limit: None,
242 };
243
244 let body = self.rpc_call_raw_pub(&req).await?;
245 let mut cur = Cursor::from_slice(&body);
246 let diff = tl::enums::updates::Difference::deserialize(&mut cur)?;
247
248 let mut updates = Vec::new();
249 match diff {
250 tl::enums::updates::Difference::Empty(e) => {
251 let mut s = self.inner.pts_state.lock().await;
252 s.date = e.date;
253 s.seq = e.seq;
254 s.touch();
255 log::debug!("[layer] getDifference: empty (seq={})", e.seq);
256 }
257 tl::enums::updates::Difference::Difference(d) => {
258 log::info!("[layer] getDifference: {} messages, {} updates",
259 d.new_messages.len(), d.other_updates.len());
260 self.cache_users_slice_pub(&d.users).await;
261 self.cache_chats_slice_pub(&d.chats).await;
262 for msg in d.new_messages {
263 updates.push(update::Update::NewMessage(
264 update::IncomingMessage::from_raw(msg)
265 ));
266 }
267 for upd in d.other_updates {
268 updates.extend(update::from_single_update_pub(upd));
269 }
270 let tl::enums::updates::State::State(ns) = d.state;
271 let saved_channel_pts = {
275 let s = self.inner.pts_state.lock().await;
276 s.channel_pts.clone()
277 };
278 let mut new_state = PtsState::from_server_state(&ns);
279 for (cid, cpts) in saved_channel_pts {
280 new_state.channel_pts.entry(cid).or_insert(cpts);
281 }
282 *self.inner.pts_state.lock().await = new_state;
283 }
284 tl::enums::updates::Difference::Slice(d) => {
285 log::info!("[layer] getDifference slice: {} messages, {} updates",
286 d.new_messages.len(), d.other_updates.len());
287 self.cache_users_slice_pub(&d.users).await;
288 self.cache_chats_slice_pub(&d.chats).await;
289 for msg in d.new_messages {
290 updates.push(update::Update::NewMessage(
291 update::IncomingMessage::from_raw(msg)
292 ));
293 }
294 for upd in d.other_updates {
295 updates.extend(update::from_single_update_pub(upd));
296 }
297 let tl::enums::updates::State::State(ns) = d.intermediate_state;
298 let saved_channel_pts = {
299 let s = self.inner.pts_state.lock().await;
300 s.channel_pts.clone()
301 };
302 let mut new_state = PtsState::from_server_state(&ns);
303 for (cid, cpts) in saved_channel_pts {
304 new_state.channel_pts.entry(cid).or_insert(cpts);
305 }
306 *self.inner.pts_state.lock().await = new_state;
307 }
308 tl::enums::updates::Difference::TooLong(d) => {
309 log::warn!("[layer] getDifference: TooLong (pts={}) — re-syncing", d.pts);
310 self.inner.pts_state.lock().await.pts = d.pts;
311 self.sync_pts_state().await?;
312 }
313 }
314
315 Ok(updates)
316 }
317
318 pub async fn get_channel_difference(
322 &self,
323 channel_id: i64,
324 ) -> Result<Vec<update::Update>, InvocationError> {
325 let local_pts = self.inner.pts_state.lock().await
326 .channel_pts.get(&channel_id).copied().unwrap_or(0);
327
328 let mut access_hash = self.inner.peer_cache.read().await
329 .channels.get(&channel_id).copied().unwrap_or(0);
330
331 if access_hash == 0 {
334 log::debug!("[layer] channel {channel_id}: access_hash missing, attempting resolve via GetChannels");
335 let input = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
336 channel_id,
337 access_hash: 0,
338 });
339 let req = tl::functions::channels::GetChannels { id: vec![input] };
340 if let Ok(body) = self.rpc_call_raw_pub(&req).await {
341 let mut cur = Cursor::from_slice(&body);
342 if let Ok(chats) = tl::enums::messages::Chats::deserialize(&mut cur) {
343 let chat_list = match chats {
344 tl::enums::messages::Chats::Chats(c) => c.chats,
345 tl::enums::messages::Chats::Slice(c) => c.chats,
346 };
347 self.cache_chats_slice_pub(&chat_list).await;
348 access_hash = self.inner.peer_cache.read().await
349 .channels.get(&channel_id).copied().unwrap_or(0);
350 }
351 }
352 }
353
354 if access_hash == 0 {
356 log::warn!("[layer] channel {channel_id}: access_hash unknown, cannot call getChannelDifference");
357 return Err(InvocationError::Rpc(RpcError {
358 code: 400,
359 name: "CHANNEL_INVALID".into(),
360 value: None,
361 }));
362 }
363
364 log::info!("[layer] getChannelDifference channel_id={channel_id} pts={local_pts}");
365
366 let channel = tl::enums::InputChannel::InputChannel(tl::types::InputChannel {
367 channel_id,
368 access_hash,
369 });
370
371 let req = tl::functions::updates::GetChannelDifference {
372 force: false,
373 channel,
374 filter: tl::enums::ChannelMessagesFilter::Empty,
375 pts: local_pts.max(1),
376 limit: 100,
377 };
378
379 let body = match self.rpc_call_raw_pub(&req).await {
380 Ok(b) => b,
381 Err(InvocationError::Rpc(ref e)) if e.name == "PERSISTENT_TIMESTAMP_OUTDATED" => {
382 log::debug!("[layer] G-20 PERSISTENT_TIMESTAMP_OUTDATED — skipping diff");
384 return Ok(vec![]);
385 }
386 Err(e) => return Err(e),
387 };
388 let mut cur = Cursor::from_slice(&body);
389 let diff = tl::enums::updates::ChannelDifference::deserialize(&mut cur)?;
390
391 let mut updates = Vec::new();
392
393 match diff {
394 tl::enums::updates::ChannelDifference::Empty(e) => {
395 log::debug!("[layer] getChannelDifference: empty (pts={})", e.pts);
396 self.inner.pts_state.lock().await.advance_channel(channel_id, e.pts);
397 }
398 tl::enums::updates::ChannelDifference::ChannelDifference(d) => {
399 log::info!("[layer] getChannelDifference: {} messages, {} updates",
400 d.new_messages.len(), d.other_updates.len());
401 self.cache_users_slice_pub(&d.users).await;
402 self.cache_chats_slice_pub(&d.chats).await;
403 for msg in d.new_messages {
404 updates.push(update::Update::NewMessage(
405 update::IncomingMessage::from_raw(msg)
406 ));
407 }
408 for upd in d.other_updates {
409 updates.extend(update::from_single_update_pub(upd));
410 }
411 self.inner.pts_state.lock().await.advance_channel(channel_id, d.pts);
412 }
413 tl::enums::updates::ChannelDifference::TooLong(d) => {
414 log::warn!("[layer] getChannelDifference TooLong — replaying messages, resetting pts");
415 self.cache_users_slice_pub(&d.users).await;
416 self.cache_chats_slice_pub(&d.chats).await;
417 for msg in d.messages {
418 updates.push(update::Update::NewMessage(
419 update::IncomingMessage::from_raw(msg)
420 ));
421 }
422 self.inner.pts_state.lock().await.advance_channel(channel_id, 0);
423 }
424 }
425
426 Ok(updates)
427 }
428
429 pub async fn sync_pts_state(&self) -> Result<(), InvocationError> {
432 let body = self.rpc_call_raw_pub(&tl::functions::updates::GetState {}).await?;
433 let mut cur = Cursor::from_slice(&body);
434 let tl::enums::updates::State::State(s) = tl::enums::updates::State::deserialize(&mut cur)?;
435 let mut state = self.inner.pts_state.lock().await;
436 state.pts = s.pts;
437 state.qts = s.qts;
438 state.date = s.date;
439 state.seq = s.seq;
440 state.touch();
441 log::info!("[layer] pts synced: pts={}, qts={}, seq={}", s.pts, s.qts, s.seq);
442 Ok(())
443 }
444
445 pub async fn check_and_fill_gap(
449 &self,
450 new_pts: i32,
451 pts_count: i32,
452 upd: Option<update::Update>,
453 ) -> Result<Vec<update::Update>, InvocationError> {
454 let result = self.inner.pts_state.lock().await.check_pts(new_pts, pts_count);
455 match result {
456 PtsCheckResult::Ok => {
457 let mut buffered = self.inner.possible_gap.lock().await.drain_global();
460 self.inner.pts_state.lock().await.advance(new_pts);
461 if let Some(u) = upd {
462 buffered.push(u);
463 }
464 Ok(buffered)
465 }
466 PtsCheckResult::Gap { expected, got } => {
467 if let Some(u) = upd {
471 self.inner.possible_gap.lock().await.push_global(u);
472 }
473 let deadline_elapsed = self.inner.possible_gap.lock().await.global_deadline_elapsed();
474 if deadline_elapsed {
475 log::warn!("[layer] global pts gap: expected {expected}, got {got} — getDifference");
476 let buffered = self.inner.possible_gap.lock().await.drain_global();
477 let mut diff_updates = self.get_difference().await?;
478 diff_updates.splice(0..0, buffered);
480 Ok(diff_updates)
481 } else {
482 log::debug!("[layer] global pts gap: expected {expected}, got {got} — buffering (possible gap)");
483 Ok(vec![])
484 }
485 }
486 PtsCheckResult::Duplicate => {
487 log::debug!("[layer] global pts duplicate, discarding");
488 Ok(vec![])
489 }
490 }
491 }
492
493 pub async fn check_and_fill_qts_gap(
495 &self,
496 new_qts: i32,
497 qts_count: i32,
498 ) -> Result<Vec<update::Update>, InvocationError> {
499 let result = self.inner.pts_state.lock().await.check_qts(new_qts, qts_count);
500 match result {
501 PtsCheckResult::Ok => {
502 self.inner.pts_state.lock().await.advance_qts(new_qts);
503 Ok(vec![])
504 }
505 PtsCheckResult::Gap { expected, got } => {
506 log::warn!("[layer] qts gap: expected {expected}, got {got} — getDifference");
507 self.get_difference().await
508 }
509 PtsCheckResult::Duplicate => Ok(vec![]),
510 }
511 }
512
513 pub async fn check_and_fill_seq_gap(
515 &self,
516 new_seq: i32,
517 seq_start: i32,
518 ) -> Result<Vec<update::Update>, InvocationError> {
519 let result = self.inner.pts_state.lock().await.check_seq(new_seq, seq_start);
520 match result {
521 PtsCheckResult::Ok => {
522 self.inner.pts_state.lock().await.advance_seq(new_seq);
523 Ok(vec![])
524 }
525 PtsCheckResult::Gap { expected, got } => {
526 log::warn!("[layer] seq gap: expected {expected}, got {got} — getDifference");
527 self.get_difference().await
528 }
529 PtsCheckResult::Duplicate => Ok(vec![]),
530 }
531 }
532
533 pub async fn check_and_fill_channel_gap(
535 &self,
536 channel_id: i64,
537 new_pts: i32,
538 pts_count: i32,
539 upd: Option<update::Update>,
540 ) -> Result<Vec<update::Update>, InvocationError> {
541 if self.inner.pts_state.lock().await.getting_diff_for.contains(&channel_id) {
544 log::debug!("[layer] channel {channel_id} diff already in flight, skipping");
545 if let Some(u) = upd {
546 self.inner.possible_gap.lock().await.push_channel(channel_id, u);
547 }
548 return Ok(vec![]);
549 }
550
551 let result = self.inner.pts_state.lock().await
552 .check_channel_pts(channel_id, new_pts, pts_count);
553 match result {
554 PtsCheckResult::Ok => {
555 let mut buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
556 self.inner.pts_state.lock().await.advance_channel(channel_id, new_pts);
557 if let Some(u) = upd {
558 buffered.push(u);
559 }
560 Ok(buffered)
561 }
562 PtsCheckResult::Gap { expected, got } => {
563 if let Some(u) = upd {
564 self.inner.possible_gap.lock().await.push_channel(channel_id, u);
565 }
566 let deadline_elapsed = self.inner.possible_gap.lock().await
567 .channel_deadline_elapsed(channel_id);
568 if deadline_elapsed {
569 log::warn!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — getChannelDifference");
570 self.inner.pts_state.lock().await.getting_diff_for.insert(channel_id);
572 let buffered = self.inner.possible_gap.lock().await.drain_channel(channel_id);
573 match self.get_channel_difference(channel_id).await {
574 Ok(mut diff_updates) => {
575 self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
577 diff_updates.splice(0..0, buffered);
578 Ok(diff_updates)
579 }
580 Err(InvocationError::Rpc(ref e))
584 if e.name == "CHANNEL_INVALID"
585 || e.name == "CHANNEL_PRIVATE"
586 || e.name == "CHANNEL_NOT_MODIFIED" =>
587 {
588 log::warn!(
589 "[layer] channel {channel_id}: {} — skipping gap, advancing pts to {got}",
590 e.name
591 );
592 self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
594 self.inner.pts_state.lock().await.advance_channel(channel_id, got);
595 Ok(buffered)
596 }
597 Err(InvocationError::Deserialize(ref msg)) => {
598 log::warn!(
603 "[layer] channel {channel_id}: deserialize error ({msg}) — skipping gap, advancing pts to {got}"
604 );
605 self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
606 self.inner.pts_state.lock().await.advance_channel(channel_id, got);
607 Ok(buffered)
608 }
609 Err(e) => {
610 self.inner.pts_state.lock().await.getting_diff_for.remove(&channel_id);
612 Err(e)
613 }
614 }
615 } else {
616 log::debug!("[layer] channel {channel_id} pts gap: expected {expected}, got {got} — buffering");
617 Ok(vec![])
618 }
619 }
620 PtsCheckResult::Duplicate => {
621 log::debug!("[layer] channel {channel_id} pts duplicate, discarding");
622 Ok(vec![])
623 }
624 }
625 }
626
627 pub async fn check_update_deadline(&self) -> Result<(), InvocationError> {
630 let exceeded = self.inner.pts_state.lock().await.deadline_exceeded();
631 if exceeded {
632 log::info!("[layer] G-16 update deadline exceeded — fetching getDifference");
633 let updates = self.get_difference().await?;
634 for u in updates { let _ = self.inner.update_tx.send(u); }
635 }
636 Ok(())
637 }
638}
639
640