1use crate::GtpMessage;
11use gbp_core::MemberId;
12use std::collections::HashMap;
13use std::collections::VecDeque;
14
15pub struct MessageHistory {
21 capacity: usize,
22 buffer: VecDeque<GtpMessage>,
23}
24
25impl MessageHistory {
26 pub fn with_capacity(capacity: usize) -> Self {
31 assert!(capacity > 0, "capacity must be > 0");
32 Self {
33 capacity,
34 buffer: VecDeque::with_capacity(capacity),
35 }
36 }
37
38 pub fn push(&mut self, msg: GtpMessage) -> bool {
41 if self.contains(msg.sender_id, msg.message_id) {
42 return false;
43 }
44 if self.buffer.len() == self.capacity {
45 self.buffer.pop_front();
46 }
47 self.buffer.push_back(msg);
48 true
49 }
50
51 pub fn len(&self) -> usize {
53 self.buffer.len()
54 }
55
56 pub fn is_empty(&self) -> bool {
58 self.buffer.is_empty()
59 }
60
61 pub fn contains(&self, sender_id: MemberId, message_id: u64) -> bool {
63 self.buffer
64 .iter()
65 .any(|m| m.sender_id == sender_id && m.message_id == message_id)
66 }
67
68 pub fn since<'a>(
71 &'a self,
72 watermark: &'a Watermark,
73 ) -> impl Iterator<Item = &'a GtpMessage> + 'a {
74 self.buffer.iter().filter(move |m| {
75 watermark
76 .last_seen
77 .get(&m.sender_id)
78 .copied()
79 .map(|hw| m.message_id > hw)
80 .unwrap_or(true)
81 })
82 }
83
84 pub fn since_for_sender(
87 &self,
88 sender_id: MemberId,
89 since_message_id: u64,
90 ) -> impl Iterator<Item = &GtpMessage> {
91 self.buffer
92 .iter()
93 .filter(move |m| m.sender_id == sender_id && m.message_id > since_message_id)
94 }
95
96 pub fn clear(&mut self) {
98 self.buffer.clear();
99 }
100}
101
102#[derive(Default, Clone, Debug)]
108pub struct Watermark {
109 last_seen: HashMap<MemberId, u64>,
110}
111
112impl Watermark {
113 pub fn new() -> Self {
115 Self::default()
116 }
117
118 pub fn observe(&mut self, sender_id: MemberId, message_id: u64) {
121 let entry = self.last_seen.entry(sender_id).or_insert(0);
122 if message_id > *entry {
123 *entry = message_id;
124 }
125 }
126
127 pub fn last_seen(&self, sender_id: MemberId) -> Option<u64> {
130 self.last_seen.get(&sender_id).copied()
131 }
132
133 pub fn iter(&self) -> impl Iterator<Item = (MemberId, u64)> + '_ {
135 self.last_seen.iter().map(|(&s, &m)| (s, m))
136 }
137
138 pub fn len(&self) -> usize {
140 self.last_seen.len()
141 }
142
143 pub fn is_empty(&self) -> bool {
145 self.last_seen.is_empty()
146 }
147
148 pub fn clear(&mut self) {
150 self.last_seen.clear();
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::*;
157
158 fn msg(sender: u32, mid: u64) -> GtpMessage {
159 GtpMessage::plain(sender, mid, "x")
160 }
161
162 #[test]
163 fn push_dedups_and_evicts() {
164 let mut h = MessageHistory::with_capacity(3);
165 assert!(h.push(msg(1, 1)));
166 assert!(h.push(msg(1, 2)));
167 assert!(!h.push(msg(1, 1)));
168 assert!(h.push(msg(1, 3)));
169 assert!(h.push(msg(1, 4)));
170 assert_eq!(h.len(), 3);
171 assert!(!h.contains(1, 1));
172 assert!(h.contains(1, 4));
173 }
174
175 #[test]
176 fn since_returns_only_after_watermark() {
177 let mut h = MessageHistory::with_capacity(10);
178 for mid in 1..=5 {
179 h.push(msg(1, mid));
180 }
181 let mut wm = Watermark::new();
182 wm.observe(1, 3);
183 let after: Vec<u64> = h.since(&wm).map(|m| m.message_id).collect();
184 assert_eq!(after, vec![4, 5]);
185 }
186
187 #[test]
188 fn watermark_keeps_max() {
189 let mut wm = Watermark::new();
190 wm.observe(1, 5);
191 wm.observe(1, 3);
192 wm.observe(1, 7);
193 assert_eq!(wm.last_seen(1), Some(7));
194 }
195}