1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
// SPDX-License-Identifier: MIT OR Apache-2.0
//
// ferogram: async Telegram MTProto client in Rust
// https://github.com/ankit-chaubey/ferogram
//
// If you use or modify this code, keep this notice at the top of your file
// and include the LICENSE-MIT or LICENSE-APACHE file from this repository:
// https://github.com/ankit-chaubey/ferogram
use std::collections::{HashSet, VecDeque};
/// Bounded ring-buffer dedup cache. Sits beneath the pts machinery as a
/// last-resort guard against edge-case duplicates (e.g. a live socket update
/// racing a diff replay that covers the same message).
///
/// Keyed by (canonical_peer_id, msg_id). Capacity-bounded: evicts the oldest
/// entry on overflow so memory stays O(1).
///
/// Uses a `HashSet` for O(1) membership checks alongside a `VecDeque` for
/// ordered eviction. Same memory profile as the old VecDeque-only approach,
/// ~10x faster at high message rates.
pub struct BoundedDedupeCache {
// Ordered eviction queue.
order: VecDeque<(i64, i32)>,
// O(1) membership check; mirrors `order` exactly.
set: HashSet<(i64, i32)>,
capacity: usize,
/// Total duplicates suppressed since creation.
pub suppressed: u64,
}
impl BoundedDedupeCache {
pub fn new(capacity: usize) -> Self {
Self {
order: VecDeque::with_capacity(capacity),
set: HashSet::with_capacity(capacity),
capacity,
suppressed: 0,
}
}
/// Returns true if (peer_id, msg_id) was already seen, meaning the update
/// is a duplicate and should be dropped. Otherwise inserts and returns false.
#[inline]
pub fn check_and_insert(&mut self, peer_id: i64, msg_id: i32) -> bool {
if self.set.contains(&(peer_id, msg_id)) {
self.suppressed += 1;
tracing::debug!(
"[ferogram/dedup] duplicate suppressed msg_id={msg_id} peer={peer_id} \
(total={})",
self.suppressed
);
return true;
}
if self.order.len() >= self.capacity
&& let Some(evicted) = self.order.pop_front()
{
self.set.remove(&evicted);
}
self.order.push_back((peer_id, msg_id));
self.set.insert((peer_id, msg_id));
false
}
}
impl Default for BoundedDedupeCache {
fn default() -> Self {
Self::new(512)
}
}