Skip to main content

gtp/
history.rs

1//! Bounded message log + per-sender resynchronisation watermark.
2//!
3//! GTP §11 (resync): a re-joining client SHOULD be able to ask the group for
4//! every message produced after the last `message_id` it observed. Servers
5//! and peer caches that hold recent messages can use [`MessageHistory`] as a
6//! ready-made ring buffer with `since(...)` queries, and re-joining clients
7//! can use [`Watermark`] to track the highest `message_id` they have seen
8//! per sender so they know where to resume from.
9
10use crate::GtpMessage;
11use gbp_core::MemberId;
12use std::collections::HashMap;
13use std::collections::VecDeque;
14
15/// Bounded ring-buffer of recent GTP messages.
16///
17/// The capacity is fixed at construction time; older messages are discarded
18/// once the buffer is full. Insertion is O(1); resync queries are O(n) over
19/// the buffer (typically small).
20pub struct MessageHistory {
21    capacity: usize,
22    buffer: VecDeque<GtpMessage>,
23}
24
25impl MessageHistory {
26    /// Builds a history that retains up to `capacity` messages.
27    ///
28    /// # Panics
29    /// Panics if `capacity == 0`.
30    pub fn with_capacity(capacity: usize) -> Self {
31        assert!(capacity > 0, "capacity must be > 0");
32        Self {
33            capacity,
34            buffer: VecDeque::with_capacity(capacity),
35        }
36    }
37
38    /// Records a message. Returns `true` if it was newly added, `false` if
39    /// `(sender_id, message_id)` was already present (idempotent insert).
40    pub fn push(&mut self, msg: GtpMessage) -> bool {
41        if self.contains(msg.sender_id, msg.message_id) {
42            return false;
43        }
44        if self.buffer.len() == self.capacity {
45            self.buffer.pop_front();
46        }
47        self.buffer.push_back(msg);
48        true
49    }
50
51    /// Number of messages currently in the buffer.
52    pub fn len(&self) -> usize {
53        self.buffer.len()
54    }
55
56    /// `true` if the buffer is empty.
57    pub fn is_empty(&self) -> bool {
58        self.buffer.is_empty()
59    }
60
61    /// Returns `true` if `(sender_id, message_id)` is present.
62    pub fn contains(&self, sender_id: MemberId, message_id: u64) -> bool {
63        self.buffer
64            .iter()
65            .any(|m| m.sender_id == sender_id && m.message_id == message_id)
66    }
67
68    /// Returns every message produced **after** the given watermark, in
69    /// insertion order. Use this to satisfy a peer's resync request.
70    pub fn since<'a>(
71        &'a self,
72        watermark: &'a Watermark,
73    ) -> impl Iterator<Item = &'a GtpMessage> + 'a {
74        self.buffer.iter().filter(move |m| {
75            watermark
76                .last_seen
77                .get(&m.sender_id)
78                .copied()
79                .map(|hw| m.message_id > hw)
80                .unwrap_or(true)
81        })
82    }
83
84    /// Returns every message from a single sender produced strictly after
85    /// `since_message_id`.
86    pub fn since_for_sender(
87        &self,
88        sender_id: MemberId,
89        since_message_id: u64,
90    ) -> impl Iterator<Item = &GtpMessage> {
91        self.buffer
92            .iter()
93            .filter(move |m| m.sender_id == sender_id && m.message_id > since_message_id)
94    }
95
96    /// Drops every message in the buffer.
97    pub fn clear(&mut self) {
98        self.buffer.clear();
99    }
100}
101
102/// Per-sender high-water mark of accepted GTP `message_id`s.
103///
104/// Use this to remember the latest `message_id` seen from each sender; a
105/// re-joining client can ship its watermark to the group and ask for every
106/// message above it (see [`MessageHistory::since`]).
107#[derive(Default, Clone, Debug)]
108pub struct Watermark {
109    last_seen: HashMap<MemberId, u64>,
110}
111
112impl Watermark {
113    /// Empty watermark (no messages seen).
114    pub fn new() -> Self {
115        Self::default()
116    }
117
118    /// Records that `message_id` from `sender_id` has been observed.
119    /// Keeps the maximum.
120    pub fn observe(&mut self, sender_id: MemberId, message_id: u64) {
121        let entry = self.last_seen.entry(sender_id).or_insert(0);
122        if message_id > *entry {
123            *entry = message_id;
124        }
125    }
126
127    /// Returns the last observed `message_id` for `sender_id`, or `None` if
128    /// nothing has been seen.
129    pub fn last_seen(&self, sender_id: MemberId) -> Option<u64> {
130        self.last_seen.get(&sender_id).copied()
131    }
132
133    /// Iterates `(sender_id, last_message_id)` pairs.
134    pub fn iter(&self) -> impl Iterator<Item = (MemberId, u64)> + '_ {
135        self.last_seen.iter().map(|(&s, &m)| (s, m))
136    }
137
138    /// Number of senders tracked.
139    pub fn len(&self) -> usize {
140        self.last_seen.len()
141    }
142
143    /// Empty?
144    pub fn is_empty(&self) -> bool {
145        self.last_seen.is_empty()
146    }
147
148    /// Drops every entry.
149    pub fn clear(&mut self) {
150        self.last_seen.clear();
151    }
152}
153
154#[cfg(test)]
155mod tests {
156    use super::*;
157
158    fn msg(sender: u32, mid: u64) -> GtpMessage {
159        GtpMessage::plain(sender, mid, "x")
160    }
161
162    #[test]
163    fn push_dedups_and_evicts() {
164        let mut h = MessageHistory::with_capacity(3);
165        assert!(h.push(msg(1, 1)));
166        assert!(h.push(msg(1, 2)));
167        assert!(!h.push(msg(1, 1)));
168        assert!(h.push(msg(1, 3)));
169        assert!(h.push(msg(1, 4)));
170        assert_eq!(h.len(), 3);
171        assert!(!h.contains(1, 1));
172        assert!(h.contains(1, 4));
173    }
174
175    #[test]
176    fn since_returns_only_after_watermark() {
177        let mut h = MessageHistory::with_capacity(10);
178        for mid in 1..=5 {
179            h.push(msg(1, mid));
180        }
181        let mut wm = Watermark::new();
182        wm.observe(1, 3);
183        let after: Vec<u64> = h.since(&wm).map(|m| m.message_id).collect();
184        assert_eq!(after, vec![4, 5]);
185    }
186
187    #[test]
188    fn watermark_keeps_max() {
189        let mut wm = Watermark::new();
190        wm.observe(1, 5);
191        wm.observe(1, 3);
192        wm.observe(1, 7);
193        assert_eq!(wm.last_seen(1), Some(7));
194    }
195}