Skip to main content

kevy_replicate/
slot.rs

1//! Per-replica slot bookkeeping for the primary's streaming loop.
2//!
3//! Each connected (or recently-disconnected) replica owns one
4//! [`ReplicaSlot`]. The [`SlotTable`] is the source of truth for "who
5//! still has a chance to resume from the backlog vs whom we have
6//! given up on". Slots are pure data; socket lifetime and the
7//! streaming loop's wakeups live in the wiring layer.
8//!
9//! Clock policy: all timestamps are `u64` monotonic nanoseconds
10//! supplied by the caller. The module never reads the clock itself —
11//! tests pump synthetic timestamps, and the production hook uses a
12//! single `Instant`-derived `u64`. Same pattern as the cached-clock
13//! work in `kevy-store`.
14
/// One connected-or-recently-disconnected replica.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaSlot {
    /// Operator-set replica identifier (opaque to the primary other
    /// than for slot bookkeeping).
    pub id: String,
    /// Monotonic ns timestamp of the most recent contact (handshake
    /// or ack). Drives expiry under `reconnect_window_ms`.
    pub last_seen_ns: u64,
    /// Highest offset the replica has acked. The streaming loop
    /// resumes sending from here on reconnect.
    pub acked_offset: u64,
}

/// Mutable collection of [`ReplicaSlot`]s. Slots are addressed by id;
/// duplicate insertion upserts (newer state wins). Replica counts in
/// realistic deployments are small (< 16); a linear `Vec` is faster
/// than a `HashMap` at this size and avoids the cost of the hasher
/// the rest of the workspace uses.
///
/// Slot order inside the table is not part of the contract: removal
/// uses `swap_remove`, which moves the last slot into the vacated
/// position.
#[derive(Debug, Default)]
pub struct SlotTable {
    slots: Vec<ReplicaSlot>,
}

impl SlotTable {
    /// A fresh empty table.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of slots currently tracked.
    pub fn len(&self) -> usize {
        self.slots.len()
    }

    /// Whether the table has no slots.
    pub fn is_empty(&self) -> bool {
        self.slots.is_empty()
    }

    /// Look up a slot by id. Returns `None` when no slot carries `id`.
    pub fn get(&self, id: &str) -> Option<&ReplicaSlot> {
        self.slots.iter().find(|s| s.id == id)
    }

    /// Iterate over all slots, in unspecified order.
    pub fn iter(&self) -> impl Iterator<Item = &ReplicaSlot> {
        self.slots.iter()
    }

    /// Insert a new slot or update an existing one.
    ///
    /// * `id` — replica identifier; an existing slot with the same id
    ///   is updated in place, otherwise a new slot is appended.
    /// * `acked_offset` — offset reported by the replica.
    /// * `now_ns` — caller-supplied monotonic timestamp.
    ///
    /// Touching always refreshes `last_seen_ns` and advances
    /// `acked_offset` only if the new value is higher (a slot's acked
    /// offset is monotonic — a peer reporting a lower offset than we
    /// already recorded is almost always a bug; the silent `max()`
    /// here defends the invariant).
    pub fn insert_or_touch(&mut self, id: &str, acked_offset: u64, now_ns: u64) {
        match self.slots.iter_mut().find(|s| s.id == id) {
            Some(slot) => {
                slot.last_seen_ns = now_ns;
                // Never move the acked offset backward.
                slot.acked_offset = slot.acked_offset.max(acked_offset);
            }
            None => self.slots.push(ReplicaSlot {
                id: id.to_owned(),
                last_seen_ns: now_ns,
                acked_offset,
            }),
        }
    }

    /// Remove the slot with the given id. Returns `true` if a slot
    /// was actually removed, `false` if no slot had that id.
    pub fn remove(&mut self, id: &str) -> bool {
        match self.slots.iter().position(|s| s.id == id) {
            Some(pos) => {
                // Order is not preserved; swap_remove is O(1).
                self.slots.swap_remove(pos);
                true
            }
            None => false,
        }
    }

    /// Drop slots whose `last_seen_ns + window_ns ≤ now_ns`, i.e.
    /// slots that have been silent for at least `window_ns`.
    ///
    /// The comparison is made on the slot's age
    /// (`now_ns - last_seen_ns ≥ window_ns`) so it stays exact even
    /// when `last_seen_ns + window_ns` would not fit in a `u64`. A
    /// slot whose `last_seen_ns` is ahead of `now_ns` is never
    /// dropped.
    ///
    /// Returns the ids of the dropped slots so callers can fire
    /// metrics or log lines. Order is unspecified (swap-remove
    /// internally).
    pub fn expire(&mut self, now_ns: u64, window_ns: u64) -> Vec<String> {
        let mut dropped = Vec::new();
        // Walk backward so swap_remove only ever moves an already
        // visited slot into position `i`; indices below `i` are
        // untouched and still valid.
        for i in (0..self.slots.len()).rev() {
            let last_seen_ns = self.slots[i].last_seen_ns;
            // `checked_sub` yields `None` when the slot was seen
            // "after" now_ns, which can never count as expired.
            let expired = matches!(
                now_ns.checked_sub(last_seen_ns),
                Some(age_ns) if age_ns >= window_ns
            );
            if expired {
                dropped.push(self.slots.swap_remove(i).id);
            }
        }
        dropped
    }

    /// Lowest acked offset across all tracked slots. Useful for the
    /// streaming loop to know how far back the backlog must still
    /// retain frames; `None` when the table is empty.
    pub fn min_acked_offset(&self) -> Option<u64> {
        self.slots.iter().map(|s| s.acked_offset).min()
    }
}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a table by touching each `(id, acked_offset, now_ns)`
    /// entry in order.
    fn table_of(entries: &[(&str, u64, u64)]) -> SlotTable {
        let mut table = SlotTable::new();
        for &(id, acked_offset, now_ns) in entries {
            table.insert_or_touch(id, acked_offset, now_ns);
        }
        table
    }

    #[test]
    fn fresh_table_is_empty() {
        let table = SlotTable::new();
        assert!(table.is_empty());
        assert_eq!(table.len(), 0);
        assert_eq!(table.min_acked_offset(), None);
    }

    #[test]
    fn insert_then_get_returns_the_slot() {
        let table = table_of(&[("a", 5, 100)]);
        let slot = table.get("a").unwrap();
        assert_eq!(slot.id, "a");
        assert_eq!(slot.acked_offset, 5);
        assert_eq!(slot.last_seen_ns, 100);
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn touch_advances_last_seen_and_acked_offset() {
        let table = table_of(&[("a", 5, 100), ("a", 9, 200)]);
        let slot = table.get("a").unwrap();
        assert_eq!(slot.acked_offset, 9);
        assert_eq!(slot.last_seen_ns, 200);
        // The second touch must update in place, not add a row.
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn touch_with_lower_acked_offset_keeps_the_higher_one() {
        // The second touch reports a lower offset (bug / stale ack);
        // the slot must keep 10.
        let table = table_of(&[("a", 10, 100), ("a", 7, 200)]);
        let slot = table.get("a").unwrap();
        assert_eq!(slot.acked_offset, 10);
        assert_eq!(slot.last_seen_ns, 200, "last_seen still advances");
    }

    #[test]
    fn remove_existing_returns_true_and_drops_slot() {
        let mut table = table_of(&[("a", 1, 100)]);
        assert!(table.remove("a"));
        assert!(table.is_empty());
        assert_eq!(table.get("a"), None);
    }

    #[test]
    fn remove_missing_returns_false() {
        let mut table = SlotTable::new();
        assert!(!table.remove("missing"));
    }

    #[test]
    fn expire_drops_slots_past_window() {
        let mut table = table_of(&[("old", 1, 100), ("fresh", 1, 500)]);
        // Window 200 ns, "now" = 350: "old" is past its window
        // (100+200 ≤ 350) while "fresh" is not (500+200 > 350).
        let dropped = table.expire(350, 200);
        assert_eq!(dropped, vec![String::from("old")]);
        assert_eq!(table.len(), 1);
        assert!(table.get("fresh").is_some());
    }

    #[test]
    fn expire_when_nothing_expires_returns_empty() {
        let mut table = table_of(&[("a", 1, 1000)]);
        // 1000 + 500 = 1500 > 1100, so nothing is due yet.
        let dropped = table.expire(1100, 500);
        assert!(dropped.is_empty());
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn expire_with_overflow_window_saturates_does_not_panic() {
        // `last_seen + window` does not fit in a u64 here. The expiry
        // check must neither panic nor wrap around; with
        // now = u64::MAX - 1 the slot is well inside its window, so
        // it has to survive.
        let mut table = table_of(&[("a", 1, u64::MAX - 10)]);
        let dropped = table.expire(u64::MAX - 1, u64::MAX);
        assert!(dropped.is_empty());
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn min_acked_offset_returns_the_floor() {
        let table = table_of(&[("a", 7, 100), ("b", 3, 100), ("c", 12, 100)]);
        assert_eq!(table.min_acked_offset(), Some(3));
    }

    #[test]
    fn iter_visits_every_slot() {
        let table = table_of(&[("a", 1, 100), ("b", 2, 100)]);
        // Iteration order is unspecified, so sort before comparing.
        let mut seen: Vec<String> = table.iter().map(|slot| slot.id.clone()).collect();
        seen.sort();
        assert_eq!(seen, vec![String::from("a"), String::from("b")]);
    }

    #[test]
    fn expire_all_when_window_zero() {
        let mut table = table_of(&[("a", 1, 100), ("b", 1, 100)]);
        let dropped = table.expire(100, 0);
        assert_eq!(dropped.len(), 2);
        assert!(table.is_empty());
    }
}