1use hashbrown::HashMap;
8use noxu_sync::Mutex;
9use std::time::{Duration, Instant};
10
11pub struct AckTracker {
19 pending_acks: Mutex<HashMap<u64, PendingAck>>,
21 total_acks: Mutex<u64>,
23 total_timeouts: Mutex<u64>,
25}
26
27#[derive(Debug)]
29struct PendingAck {
30 vlsn: u64,
32 needed: u32,
34 received: HashMap<String, Instant>,
36 created: Instant,
38}
39
40impl PendingAck {
41 fn new(vlsn: u64, needed: u32) -> Self {
42 Self { vlsn, needed, received: HashMap::new(), created: Instant::now() }
43 }
44
45 fn is_satisfied(&self) -> bool {
46 self.received.len() as u32 >= self.needed
47 }
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum AckResult {
53 Pending,
55 Satisfied,
57 Unknown,
59 Duplicate,
61}
62
63impl AckTracker {
64 pub fn new() -> Self {
66 Self {
67 pending_acks: Mutex::new(HashMap::new()),
68 total_acks: Mutex::new(0),
69 total_timeouts: Mutex::new(0),
70 }
71 }
72
73 pub fn register(&self, vlsn: u64, needed_acks: u32) {
78 let mut pending = self.pending_acks.lock();
79 pending
80 .entry(vlsn)
81 .or_insert_with(|| PendingAck::new(vlsn, needed_acks));
82 }
83
84 pub fn record_ack(&self, vlsn: u64, replica_name: &str) -> AckResult {
89 let mut pending = self.pending_acks.lock();
90 let ack = match pending.get_mut(&vlsn) {
91 Some(a) => a,
92 None => return AckResult::Unknown,
93 };
94
95 if ack.received.contains_key(replica_name) {
97 return AckResult::Duplicate;
98 }
99
100 ack.received.insert(replica_name.to_string(), Instant::now());
101 *self.total_acks.lock() += 1;
102
103 if ack.is_satisfied() {
104 AckResult::Satisfied
105 } else {
106 AckResult::Pending
107 }
108 }
109
110 pub fn is_satisfied(&self, vlsn: u64) -> bool {
112 let pending = self.pending_acks.lock();
113 match pending.get(&vlsn) {
114 Some(ack) => ack.is_satisfied(),
115 None => false,
116 }
117 }
118
119 pub fn received_count(&self, vlsn: u64) -> Option<u32> {
126 let pending = self.pending_acks.lock();
127 pending.get(&vlsn).map(|ack| ack.received.len() as u32)
128 }
129
130 pub fn cleanup_through(&self, vlsn: u64) {
135 let mut pending = self.pending_acks.lock();
136 pending.retain(|&v, _| v > vlsn);
137 }
138
139 pub fn pending_count(&self) -> usize {
141 self.pending_acks.lock().len()
142 }
143
144 pub fn check_timeouts(&self, timeout: Duration) -> Vec<u64> {
152 let pending = self.pending_acks.lock();
153 let now = Instant::now();
154 let mut timed_out = Vec::new();
155 for ack in pending.values() {
156 if !ack.is_satisfied()
157 && let Some(elapsed) = now.checked_duration_since(ack.created)
158 && elapsed > timeout
159 {
160 timed_out.push(ack.vlsn);
161 *self.total_timeouts.lock() += 1;
162 }
163 }
164 timed_out
165 }
166
167 pub fn get_total_acks(&self) -> u64 {
169 *self.total_acks.lock()
170 }
171
172 pub fn get_total_timeouts(&self) -> u64 {
174 *self.total_timeouts.lock()
175 }
176}
177
178impl Default for AckTracker {
179 fn default() -> Self {
180 Self::new()
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187
188 #[test]
191 fn test_new_tracker() {
192 let tracker = AckTracker::new();
193 assert_eq!(tracker.pending_count(), 0);
194 assert_eq!(tracker.get_total_acks(), 0);
195 assert_eq!(tracker.get_total_timeouts(), 0);
196 }
197
198 #[test]
199 fn test_default_impl() {
200 let tracker = AckTracker::default();
201 assert_eq!(tracker.pending_count(), 0);
202 }
203
204 #[test]
205 fn test_register() {
206 let tracker = AckTracker::new();
207 tracker.register(100, 2);
208 assert_eq!(tracker.pending_count(), 1);
209 assert!(!tracker.is_satisfied(100));
210 }
211
212 #[test]
213 fn test_register_idempotent() {
214 let tracker = AckTracker::new();
215 tracker.register(100, 2);
216 tracker.register(100, 5); assert_eq!(tracker.pending_count(), 1);
218 tracker.record_ack(100, "replica1");
220 tracker.record_ack(100, "replica2");
221 assert!(tracker.is_satisfied(100));
222 }
223
224 #[test]
225 fn test_record_ack_pending() {
226 let tracker = AckTracker::new();
227 tracker.register(100, 2);
228 let result = tracker.record_ack(100, "replica1");
229 assert_eq!(result, AckResult::Pending);
230 assert!(!tracker.is_satisfied(100));
231 assert_eq!(tracker.get_total_acks(), 1);
232 }
233
234 #[test]
235 fn test_record_ack_satisfied() {
236 let tracker = AckTracker::new();
237 tracker.register(100, 2);
238 tracker.record_ack(100, "replica1");
239 let result = tracker.record_ack(100, "replica2");
240 assert_eq!(result, AckResult::Satisfied);
241 assert!(tracker.is_satisfied(100));
242 assert_eq!(tracker.get_total_acks(), 2);
243 }
244
245 #[test]
246 fn test_single_ack_needed() {
247 let tracker = AckTracker::new();
248 tracker.register(100, 1);
249 let result = tracker.record_ack(100, "replica1");
250 assert_eq!(result, AckResult::Satisfied);
251 assert!(tracker.is_satisfied(100));
252 }
253
254 #[test]
255 fn test_record_ack_unknown_vlsn() {
256 let tracker = AckTracker::new();
257 let result = tracker.record_ack(999, "replica1");
258 assert_eq!(result, AckResult::Unknown);
259 assert_eq!(tracker.get_total_acks(), 0);
260 }
261
262 #[test]
263 fn test_record_ack_duplicate() {
264 let tracker = AckTracker::new();
265 tracker.register(100, 2);
266 tracker.record_ack(100, "replica1");
267 let result = tracker.record_ack(100, "replica1");
268 assert_eq!(result, AckResult::Duplicate);
269 assert!(!tracker.is_satisfied(100));
270 assert_eq!(tracker.get_total_acks(), 1);
272 }
273
274 #[test]
275 fn test_is_satisfied_unknown_vlsn() {
276 let tracker = AckTracker::new();
277 assert!(!tracker.is_satisfied(999));
278 }
279
280 #[test]
283 fn test_multiple_vlsns() {
284 let tracker = AckTracker::new();
285 tracker.register(100, 1);
286 tracker.register(101, 2);
287 tracker.register(102, 1);
288 assert_eq!(tracker.pending_count(), 3);
289
290 tracker.record_ack(100, "r1");
291 assert!(tracker.is_satisfied(100));
292 assert!(!tracker.is_satisfied(101));
293
294 tracker.record_ack(101, "r1");
295 assert!(!tracker.is_satisfied(101));
296 tracker.record_ack(101, "r2");
297 assert!(tracker.is_satisfied(101));
298 }
299
300 #[test]
303 fn test_cleanup_through() {
304 let tracker = AckTracker::new();
305 tracker.register(100, 1);
306 tracker.register(101, 1);
307 tracker.register(102, 1);
308 tracker.register(200, 1);
309 assert_eq!(tracker.pending_count(), 4);
310
311 tracker.cleanup_through(102);
312 assert_eq!(tracker.pending_count(), 1);
313 assert_eq!(tracker.record_ack(100, "r1"), AckResult::Unknown);
315 assert_eq!(tracker.record_ack(200, "r1"), AckResult::Satisfied);
316 }
317
318 #[test]
319 fn test_cleanup_through_zero() {
320 let tracker = AckTracker::new();
321 tracker.register(100, 1);
322 tracker.cleanup_through(0);
323 assert_eq!(tracker.pending_count(), 1);
324 }
325
326 #[test]
327 fn test_cleanup_through_all() {
328 let tracker = AckTracker::new();
329 tracker.register(1, 1);
330 tracker.register(2, 1);
331 tracker.cleanup_through(100);
332 assert_eq!(tracker.pending_count(), 0);
333 }
334
335 #[test]
338 fn test_check_timeouts_none() {
339 let tracker = AckTracker::new();
340 tracker.register(100, 1);
341 let timed_out = tracker.check_timeouts(Duration::from_secs(60));
343 assert!(timed_out.is_empty());
344 assert_eq!(tracker.get_total_timeouts(), 0);
345 }
346
347 #[test]
348 fn test_check_timeouts_with_expired() {
349 let tracker = AckTracker::new();
350
351 {
353 let mut pending = tracker.pending_acks.lock();
354 let mut ack = PendingAck::new(50, 1);
355 ack.created = Instant::now() - Duration::from_secs(120);
356 pending.insert(50, ack);
357 }
358
359 let timed_out = tracker.check_timeouts(Duration::from_secs(60));
360 assert_eq!(timed_out.len(), 1);
361 assert_eq!(timed_out[0], 50);
362 assert_eq!(tracker.get_total_timeouts(), 1);
363 }
364
365 #[test]
366 fn test_check_timeouts_skips_satisfied() {
367 let tracker = AckTracker::new();
368
369 {
371 let mut pending = tracker.pending_acks.lock();
372 let mut ack = PendingAck::new(50, 1);
373 ack.created = Instant::now() - Duration::from_secs(120);
374 ack.received.insert("r1".to_string(), Instant::now());
375 pending.insert(50, ack);
376 }
377
378 let timed_out = tracker.check_timeouts(Duration::from_secs(60));
379 assert!(timed_out.is_empty());
380 }
381
382 #[test]
385 fn test_extra_acks_beyond_needed() {
386 let tracker = AckTracker::new();
387 tracker.register(100, 1);
388 assert_eq!(tracker.record_ack(100, "r1"), AckResult::Satisfied);
389 assert_eq!(tracker.record_ack(100, "r2"), AckResult::Satisfied);
391 assert_eq!(tracker.get_total_acks(), 2);
392 }
393
394 #[test]
397 fn test_zero_acks_needed() {
398 let tracker = AckTracker::new();
399 tracker.register(100, 0);
400 assert!(tracker.is_satisfied(100));
402 }
403
404 #[test]
407 fn test_send_sync() {
408 fn assert_send_sync<T: Send + Sync>() {}
409 assert_send_sync::<AckTracker>();
410 }
411}