/// A single tracked replica: its identifier, the time it was last touched,
/// and the highest offset it has acknowledged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaSlot {
    /// Identifier used as the lookup key inside [`SlotTable`].
    pub id: String,
    /// The `now_ns` value passed to the most recent
    /// [`SlotTable::insert_or_touch`] call for this id. The table only
    /// compares this value against other `u64`s; the `_ns` suffix suggests
    /// nanoseconds, but the unit is up to the caller.
    pub last_seen_ns: u64,
    /// Largest `acked_offset` reported through
    /// [`SlotTable::insert_or_touch`]; that method never lowers it.
    pub acked_offset: u64,
}

/// Collection of [`ReplicaSlot`]s keyed by `id`.
///
/// Slots live in a plain `Vec`: lookups are linear scans and removals use
/// `swap_remove`, so the iteration order may change whenever a slot is
/// removed or expired.
#[derive(Debug, Default)]
pub struct SlotTable {
    slots: Vec<ReplicaSlot>,
}

impl SlotTable {
    /// Creates an empty table.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of slots currently stored.
    pub fn len(&self) -> usize {
        self.slots.len()
    }

    /// Returns `true` when the table holds no slots.
    pub fn is_empty(&self) -> bool {
        self.slots.is_empty()
    }

    /// Looks up the slot whose `id` equals `id`, if any.
    pub fn get(&self, id: &str) -> Option<&ReplicaSlot> {
        self.slots.iter().find(|s| s.id == id)
    }

    /// Iterates over every slot in the table's current internal order.
    pub fn iter(&self) -> impl Iterator<Item = &ReplicaSlot> {
        self.slots.iter()
    }

    /// Records activity for `id`.
    ///
    /// If a slot with this id exists, its `last_seen_ns` is overwritten with
    /// `now_ns` and its `acked_offset` is raised to `acked_offset` when that
    /// is larger (a smaller value is ignored). Otherwise a new slot is
    /// appended with the given values.
    pub fn insert_or_touch(&mut self, id: &str, acked_offset: u64, now_ns: u64) {
        if let Some(slot) = self.slots.iter_mut().find(|s| s.id == id) {
            slot.last_seen_ns = now_ns;
            // The acknowledged offset is monotonic: stale reports never lower it.
            slot.acked_offset = slot.acked_offset.max(acked_offset);
            return;
        }
        self.slots.push(ReplicaSlot {
            id: id.to_string(),
            last_seen_ns: now_ns,
            acked_offset,
        });
    }

    /// Removes the slot with the given `id`.
    ///
    /// Returns `true` if a slot was removed and `false` if no slot had that
    /// id. The last slot is swapped into the vacated position, so iteration
    /// order is not preserved.
    pub fn remove(&mut self, id: &str) -> bool {
        if let Some(pos) = self.slots.iter().position(|s| s.id == id) {
            self.slots.swap_remove(pos);
            true
        } else {
            false
        }
    }

    /// Drops every slot that has been idle for at least `window_ns` as of
    /// `now_ns`, returning the ids of the dropped slots.
    ///
    /// A slot expires when `last_seen_ns + window_ns <= now_ns`, evaluated
    /// without overflow: a slot whose deadline would exceed `u64::MAX` is
    /// never expired, and a slot whose `last_seen_ns` is later than `now_ns`
    /// is always kept. The ids are returned in removal order (the table is
    /// walked from back to front).
    pub fn expire(&mut self, now_ns: u64, window_ns: u64) -> Vec<String> {
        let mut dropped = Vec::new();
        // Walk back-to-front: `swap_remove(i)` moves the last element into
        // position `i`, and that element has already been examined.
        for i in (0..self.slots.len()).rev() {
            // Compare the idle time against the window instead of computing
            // `last_seen + window`, which can exceed `u64::MAX`. `None` means
            // the slot was seen after `now_ns`, so it cannot have expired.
            let idle_ns = now_ns.checked_sub(self.slots[i].last_seen_ns);
            if matches!(idle_ns, Some(idle) if idle >= window_ns) {
                dropped.push(self.slots.swap_remove(i).id);
            }
        }
        dropped
    }

    /// Smallest `acked_offset` across all slots, or `None` when the table is
    /// empty.
    pub fn min_acked_offset(&self) -> Option<u64> {
        self.slots.iter().map(|s| s.acked_offset).min()
    }
}
123
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a table by touching each `(id, acked_offset, now_ns)` entry in order.
    fn table_of(entries: &[(&str, u64, u64)]) -> SlotTable {
        let mut table = SlotTable::new();
        for &(id, acked_offset, now_ns) in entries {
            table.insert_or_touch(id, acked_offset, now_ns);
        }
        table
    }

    #[test]
    fn fresh_table_is_empty() {
        let table = SlotTable::new();
        assert!(table.is_empty());
        assert_eq!(table.len(), 0);
        assert_eq!(table.min_acked_offset(), None);
    }

    #[test]
    fn insert_then_get_returns_the_slot() {
        let table = table_of(&[("a", 5, 100)]);
        let slot = table.get("a").unwrap();
        assert_eq!(slot.id, "a");
        assert_eq!(slot.acked_offset, 5);
        assert_eq!(slot.last_seen_ns, 100);
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn touch_advances_last_seen_and_acked_offset() {
        let mut table = SlotTable::new();
        table.insert_or_touch("a", 5, 100);
        table.insert_or_touch("a", 9, 200);
        let slot = table.get("a").unwrap();
        assert_eq!(slot.acked_offset, 9);
        assert_eq!(slot.last_seen_ns, 200);
        // Touching an existing id must update in place, not add a second slot.
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn touch_with_lower_acked_offset_keeps_the_higher_one() {
        let mut table = SlotTable::new();
        table.insert_or_touch("a", 10, 100);
        // A smaller offset arriving later must not lower the stored one.
        table.insert_or_touch("a", 7, 200);
        let slot = table.get("a").unwrap();
        assert_eq!(slot.acked_offset, 10);
        assert_eq!(slot.last_seen_ns, 200, "last_seen still advances");
    }

    #[test]
    fn remove_existing_returns_true_and_drops_slot() {
        let mut table = table_of(&[("a", 1, 100)]);
        assert!(table.remove("a"));
        assert!(table.is_empty());
        assert_eq!(table.get("a"), None);
    }

    #[test]
    fn remove_missing_returns_false() {
        let mut table = SlotTable::new();
        assert!(!table.remove("missing"));
    }

    #[test]
    fn expire_drops_slots_past_window() {
        let mut table = table_of(&[("old", 1, 100), ("fresh", 1, 500)]);
        // "old" has been idle for 250 >= 200; "fresh" was seen after `now`.
        let dropped = table.expire(350, 200);
        assert_eq!(dropped, vec![String::from("old")]);
        assert_eq!(table.len(), 1);
        assert!(table.get("fresh").is_some());
    }

    #[test]
    fn expire_when_nothing_expires_returns_empty() {
        let mut table = table_of(&[("a", 1, 1000)]);
        // Idle for 100, well inside the 500 window.
        let dropped = table.expire(1100, 500);
        assert!(dropped.is_empty());
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn expire_with_overflow_window_saturates_does_not_panic() {
        // `last_seen + window` does not fit in a u64 here; `expire` must
        // neither panic nor drop the slot.
        let mut table = table_of(&[("a", 1, u64::MAX - 10)]);
        let dropped = table.expire(u64::MAX - 1, u64::MAX);
        assert!(dropped.is_empty());
        assert_eq!(table.len(), 1);
    }

    #[test]
    fn min_acked_offset_returns_the_floor() {
        let table = table_of(&[("a", 7, 100), ("b", 3, 100), ("c", 12, 100)]);
        assert_eq!(table.min_acked_offset(), Some(3));
    }

    #[test]
    fn iter_visits_every_slot() {
        let table = table_of(&[("a", 1, 100), ("b", 2, 100)]);
        // Iteration order is not asserted, so sort before comparing.
        let mut ids: Vec<String> = table.iter().map(|slot| slot.id.clone()).collect();
        ids.sort();
        assert_eq!(ids, vec![String::from("a"), String::from("b")]);
    }

    #[test]
    fn expire_all_when_window_zero() {
        let mut table = table_of(&[("a", 1, 100), ("b", 1, 100)]);
        // With a zero window, a slot seen exactly at `now` is already expired.
        let dropped = table.expire(100, 0);
        assert_eq!(dropped.len(), 2);
        assert!(table.is_empty());
    }
}