1use crate::GtpMessage;
11use gbp_core::MemberId;
12use std::collections::HashMap;
13use std::collections::VecDeque;
14
15pub struct MessageHistory {
21 capacity: usize,
22 buffer: VecDeque<GtpMessage>,
23}
24
25impl MessageHistory {
26 pub fn with_capacity(capacity: usize) -> Self {
31 assert!(capacity > 0, "capacity must be > 0");
32 Self {
33 capacity,
34 buffer: VecDeque::with_capacity(capacity),
35 }
36 }
37
38 pub fn push(&mut self, msg: GtpMessage) -> bool {
41 if self.contains(msg.sender_id, msg.message_id) {
42 return false;
43 }
44 if self.buffer.len() == self.capacity {
45 self.buffer.pop_front();
46 }
47 self.buffer.push_back(msg);
48 true
49 }
50
51 pub fn len(&self) -> usize {
53 self.buffer.len()
54 }
55
56 pub fn is_empty(&self) -> bool {
58 self.buffer.is_empty()
59 }
60
61 pub fn contains(&self, sender_id: MemberId, message_id: u64) -> bool {
63 self.buffer
64 .iter()
65 .any(|m| m.sender_id == sender_id && m.message_id == message_id)
66 }
67
68 pub fn since<'a>(&'a self, watermark: &'a Watermark) -> impl Iterator<Item = &'a GtpMessage> + 'a {
71 self.buffer.iter().filter(move |m| {
72 watermark
73 .last_seen
74 .get(&m.sender_id)
75 .copied()
76 .map(|hw| m.message_id > hw)
77 .unwrap_or(true)
78 })
79 }
80
81 pub fn since_for_sender(
84 &self,
85 sender_id: MemberId,
86 since_message_id: u64,
87 ) -> impl Iterator<Item = &GtpMessage> {
88 self.buffer
89 .iter()
90 .filter(move |m| m.sender_id == sender_id && m.message_id > since_message_id)
91 }
92
93 pub fn clear(&mut self) {
95 self.buffer.clear();
96 }
97}
98
99#[derive(Default, Clone, Debug)]
105pub struct Watermark {
106 last_seen: HashMap<MemberId, u64>,
107}
108
109impl Watermark {
110 pub fn new() -> Self {
112 Self::default()
113 }
114
115 pub fn observe(&mut self, sender_id: MemberId, message_id: u64) {
118 let entry = self.last_seen.entry(sender_id).or_insert(0);
119 if message_id > *entry {
120 *entry = message_id;
121 }
122 }
123
124 pub fn last_seen(&self, sender_id: MemberId) -> Option<u64> {
127 self.last_seen.get(&sender_id).copied()
128 }
129
130 pub fn iter(&self) -> impl Iterator<Item = (MemberId, u64)> + '_ {
132 self.last_seen.iter().map(|(&s, &m)| (s, m))
133 }
134
135 pub fn len(&self) -> usize {
137 self.last_seen.len()
138 }
139
140 pub fn is_empty(&self) -> bool {
142 self.last_seen.is_empty()
143 }
144
145 pub fn clear(&mut self) {
147 self.last_seen.clear();
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154
155 fn msg(sender: u32, mid: u64) -> GtpMessage {
156 GtpMessage::plain(sender, mid, "x")
157 }
158
159 #[test]
160 fn push_dedups_and_evicts() {
161 let mut h = MessageHistory::with_capacity(3);
162 assert!(h.push(msg(1, 1)));
163 assert!(h.push(msg(1, 2)));
164 assert!(!h.push(msg(1, 1)));
165 assert!(h.push(msg(1, 3)));
166 assert!(h.push(msg(1, 4)));
167 assert_eq!(h.len(), 3);
168 assert!(!h.contains(1, 1));
169 assert!(h.contains(1, 4));
170 }
171
172 #[test]
173 fn since_returns_only_after_watermark() {
174 let mut h = MessageHistory::with_capacity(10);
175 for mid in 1..=5 {
176 h.push(msg(1, mid));
177 }
178 let mut wm = Watermark::new();
179 wm.observe(1, 3);
180 let after: Vec<u64> = h.since(&wm).map(|m| m.message_id).collect();
181 assert_eq!(after, vec![4, 5]);
182 }
183
184 #[test]
185 fn watermark_keeps_max() {
186 let mut wm = Watermark::new();
187 wm.observe(1, 5);
188 wm.observe(1, 3);
189 wm.observe(1, 7);
190 assert_eq!(wm.last_seen(1), Some(7));
191 }
192}