Skip to main content

gtp/
history.rs

1//! Bounded message log + per-sender resynchronisation watermark.
2//!
3//! GTP §11 (resync): a re-joining client SHOULD be able to ask the group for
4//! every message produced after the last `message_id` it observed. Servers
5//! and peer caches that hold recent messages can use [`MessageHistory`] as a
6//! ready-made ring buffer with `since(...)` queries, and re-joining clients
7//! can use [`Watermark`] to track the highest `message_id` they have seen
8//! per sender so they know where to resume from.
9
10use crate::GtpMessage;
11use gbp_core::MemberId;
12use std::collections::HashMap;
13use std::collections::VecDeque;
14
15/// Bounded ring-buffer of recent GTP messages.
16///
17/// The capacity is fixed at construction time; older messages are discarded
18/// once the buffer is full. Insertion is O(1); resync queries are O(n) over
19/// the buffer (typically small).
20pub struct MessageHistory {
21    capacity: usize,
22    buffer: VecDeque<GtpMessage>,
23}
24
25impl MessageHistory {
26    /// Builds a history that retains up to `capacity` messages.
27    ///
28    /// # Panics
29    /// Panics if `capacity == 0`.
30    pub fn with_capacity(capacity: usize) -> Self {
31        assert!(capacity > 0, "capacity must be > 0");
32        Self {
33            capacity,
34            buffer: VecDeque::with_capacity(capacity),
35        }
36    }
37
38    /// Records a message. Returns `true` if it was newly added, `false` if
39    /// `(sender_id, message_id)` was already present (idempotent insert).
40    pub fn push(&mut self, msg: GtpMessage) -> bool {
41        if self.contains(msg.sender_id, msg.message_id) {
42            return false;
43        }
44        if self.buffer.len() == self.capacity {
45            self.buffer.pop_front();
46        }
47        self.buffer.push_back(msg);
48        true
49    }
50
51    /// Number of messages currently in the buffer.
52    pub fn len(&self) -> usize {
53        self.buffer.len()
54    }
55
56    /// `true` if the buffer is empty.
57    pub fn is_empty(&self) -> bool {
58        self.buffer.is_empty()
59    }
60
61    /// Returns `true` if `(sender_id, message_id)` is present.
62    pub fn contains(&self, sender_id: MemberId, message_id: u64) -> bool {
63        self.buffer
64            .iter()
65            .any(|m| m.sender_id == sender_id && m.message_id == message_id)
66    }
67
68    /// Returns every message produced **after** the given watermark, in
69    /// insertion order. Use this to satisfy a peer's resync request.
70    pub fn since<'a>(&'a self, watermark: &'a Watermark) -> impl Iterator<Item = &'a GtpMessage> + 'a {
71        self.buffer.iter().filter(move |m| {
72            watermark
73                .last_seen
74                .get(&m.sender_id)
75                .copied()
76                .map(|hw| m.message_id > hw)
77                .unwrap_or(true)
78        })
79    }
80
81    /// Returns every message from a single sender produced strictly after
82    /// `since_message_id`.
83    pub fn since_for_sender(
84        &self,
85        sender_id: MemberId,
86        since_message_id: u64,
87    ) -> impl Iterator<Item = &GtpMessage> {
88        self.buffer
89            .iter()
90            .filter(move |m| m.sender_id == sender_id && m.message_id > since_message_id)
91    }
92
93    /// Drops every message in the buffer.
94    pub fn clear(&mut self) {
95        self.buffer.clear();
96    }
97}
98
99/// Per-sender high-water mark of accepted GTP `message_id`s.
100///
101/// Use this to remember the latest `message_id` seen from each sender; a
102/// re-joining client can ship its watermark to the group and ask for every
103/// message above it (see [`MessageHistory::since`]).
104#[derive(Default, Clone, Debug)]
105pub struct Watermark {
106    last_seen: HashMap<MemberId, u64>,
107}
108
109impl Watermark {
110    /// Empty watermark (no messages seen).
111    pub fn new() -> Self {
112        Self::default()
113    }
114
115    /// Records that `message_id` from `sender_id` has been observed.
116    /// Keeps the maximum.
117    pub fn observe(&mut self, sender_id: MemberId, message_id: u64) {
118        let entry = self.last_seen.entry(sender_id).or_insert(0);
119        if message_id > *entry {
120            *entry = message_id;
121        }
122    }
123
124    /// Returns the last observed `message_id` for `sender_id`, or `None` if
125    /// nothing has been seen.
126    pub fn last_seen(&self, sender_id: MemberId) -> Option<u64> {
127        self.last_seen.get(&sender_id).copied()
128    }
129
130    /// Iterates `(sender_id, last_message_id)` pairs.
131    pub fn iter(&self) -> impl Iterator<Item = (MemberId, u64)> + '_ {
132        self.last_seen.iter().map(|(&s, &m)| (s, m))
133    }
134
135    /// Number of senders tracked.
136    pub fn len(&self) -> usize {
137        self.last_seen.len()
138    }
139
140    /// Empty?
141    pub fn is_empty(&self) -> bool {
142        self.last_seen.is_empty()
143    }
144
145    /// Drops every entry.
146    pub fn clear(&mut self) {
147        self.last_seen.clear();
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    fn msg(sender: u32, mid: u64) -> GtpMessage {
156        GtpMessage::plain(sender, mid, "x")
157    }
158
159    #[test]
160    fn push_dedups_and_evicts() {
161        let mut h = MessageHistory::with_capacity(3);
162        assert!(h.push(msg(1, 1)));
163        assert!(h.push(msg(1, 2)));
164        assert!(!h.push(msg(1, 1)));
165        assert!(h.push(msg(1, 3)));
166        assert!(h.push(msg(1, 4)));
167        assert_eq!(h.len(), 3);
168        assert!(!h.contains(1, 1));
169        assert!(h.contains(1, 4));
170    }
171
172    #[test]
173    fn since_returns_only_after_watermark() {
174        let mut h = MessageHistory::with_capacity(10);
175        for mid in 1..=5 {
176            h.push(msg(1, mid));
177        }
178        let mut wm = Watermark::new();
179        wm.observe(1, 3);
180        let after: Vec<u64> = h.since(&wm).map(|m| m.message_id).collect();
181        assert_eq!(after, vec![4, 5]);
182    }
183
184    #[test]
185    fn watermark_keeps_max() {
186        let mut wm = Watermark::new();
187        wm.observe(1, 5);
188        wm.observe(1, 3);
189        wm.observe(1, 7);
190        assert_eq!(wm.last_seen(1), Some(7));
191    }
192}