1use hashbrown::HashMap;
8use noxu_sync::{Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11pub struct AckTracker {
19 pending_acks: Mutex<HashMap<u64, PendingAck>>,
21 ack_signal: Condvar,
26 total_acks: Mutex<u64>,
28 total_timeouts: Mutex<u64>,
30}
31
32#[derive(Debug)]
34struct PendingAck {
35 vlsn: u64,
37 needed: u32,
39 received: HashMap<String, Instant>,
41 created: Instant,
43}
44
45impl PendingAck {
46 fn new(vlsn: u64, needed: u32) -> Self {
47 Self { vlsn, needed, received: HashMap::new(), created: Instant::now() }
48 }
49
50 fn is_satisfied(&self) -> bool {
51 self.received.len() as u32 >= self.needed
52 }
53}
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum AckResult {
58 Pending,
60 Satisfied,
62 Unknown,
64 Duplicate,
66}
67
68impl AckTracker {
69 pub fn new() -> Self {
71 Self {
72 pending_acks: Mutex::new(HashMap::new()),
73 ack_signal: Condvar::new(),
74 total_acks: Mutex::new(0),
75 total_timeouts: Mutex::new(0),
76 }
77 }
78
79 pub fn register(&self, vlsn: u64, needed_acks: u32) {
84 let mut pending = self.pending_acks.lock();
85 pending
86 .entry(vlsn)
87 .or_insert_with(|| PendingAck::new(vlsn, needed_acks));
88 }
89
90 pub fn record_ack(&self, vlsn: u64, replica_name: &str) -> AckResult {
95 let mut pending = self.pending_acks.lock();
96 let ack = match pending.get_mut(&vlsn) {
97 Some(a) => a,
98 None => return AckResult::Unknown,
99 };
100
101 if ack.received.contains_key(replica_name) {
103 return AckResult::Duplicate;
104 }
105
106 ack.received.insert(replica_name.to_string(), Instant::now());
107 let satisfied = ack.is_satisfied();
108 drop(pending);
110 *self.total_acks.lock() += 1;
111 self.ack_signal.notify_all();
114
115 if satisfied { AckResult::Satisfied } else { AckResult::Pending }
116 }
117
118 pub fn wait_until_satisfied<F: Fn() -> bool>(
124 &self,
125 vlsn: u64,
126 timeout: Duration,
127 should_abort: F,
128 ) -> bool {
129 let deadline = Instant::now() + timeout;
130 let mut guard = self.pending_acks.lock();
131 loop {
132 match guard.get(&vlsn) {
133 None => return true,
136 Some(ack) if ack.is_satisfied() => return true,
137 _ => {}
138 }
139 if should_abort() {
140 return false;
141 }
142 let now = Instant::now();
143 if now >= deadline {
144 return false;
145 }
146 let res = self.ack_signal.wait_for(&mut guard, deadline - now);
147 if res.timed_out() && Instant::now() >= deadline {
148 match guard.get(&vlsn) {
150 None => return true,
151 Some(ack) if ack.is_satisfied() => return true,
152 _ => return false,
153 }
154 }
155 }
156 }
157
158 pub fn wait_for_predicate<P, A>(
166 &self,
167 timeout: Duration,
168 predicate: P,
169 should_abort: A,
170 ) -> bool
171 where
172 P: Fn() -> bool,
173 A: Fn() -> bool,
174 {
175 let deadline = Instant::now() + timeout;
176 let mut guard = self.pending_acks.lock();
180 loop {
181 if predicate() {
182 return true;
183 }
184 if should_abort() {
185 return false;
186 }
187 let now = Instant::now();
188 if now >= deadline {
189 return predicate();
190 }
191 let _ = self.ack_signal.wait_for(&mut guard, deadline - now);
192 }
193 }
194
195 pub fn notify_waiters(&self) {
202 self.ack_signal.notify_all();
203 }
204
205 pub fn is_satisfied(&self, vlsn: u64) -> bool {
207 let pending = self.pending_acks.lock();
208 match pending.get(&vlsn) {
209 Some(ack) => ack.is_satisfied(),
210 None => false,
211 }
212 }
213
214 pub fn received_count(&self, vlsn: u64) -> Option<u32> {
221 let pending = self.pending_acks.lock();
222 pending.get(&vlsn).map(|ack| ack.received.len() as u32)
223 }
224
225 pub fn cleanup_through(&self, vlsn: u64) {
230 let mut pending = self.pending_acks.lock();
231 pending.retain(|&v, _| v > vlsn);
232 }
233
234 pub fn pending_count(&self) -> usize {
236 self.pending_acks.lock().len()
237 }
238
239 pub fn check_timeouts(&self, timeout: Duration) -> Vec<u64> {
247 let pending = self.pending_acks.lock();
248 let now = Instant::now();
249 let mut timed_out = Vec::new();
250 for ack in pending.values() {
251 if !ack.is_satisfied()
252 && let Some(elapsed) = now.checked_duration_since(ack.created)
253 && elapsed > timeout
254 {
255 timed_out.push(ack.vlsn);
256 *self.total_timeouts.lock() += 1;
257 }
258 }
259 timed_out
260 }
261
262 pub fn get_total_acks(&self) -> u64 {
264 *self.total_acks.lock()
265 }
266
267 pub fn get_total_timeouts(&self) -> u64 {
269 *self.total_timeouts.lock()
270 }
271}
272
273impl Default for AckTracker {
274 fn default() -> Self {
275 Self::new()
276 }
277}
278
279#[cfg(test)]
280mod tests {
281 use super::*;
282
283 #[test]
286 fn test_new_tracker() {
287 let tracker = AckTracker::new();
288 assert_eq!(tracker.pending_count(), 0);
289 assert_eq!(tracker.get_total_acks(), 0);
290 assert_eq!(tracker.get_total_timeouts(), 0);
291 }
292
293 #[test]
294 fn test_default_impl() {
295 let tracker = AckTracker::default();
296 assert_eq!(tracker.pending_count(), 0);
297 }
298
299 #[test]
300 fn test_register() {
301 let tracker = AckTracker::new();
302 tracker.register(100, 2);
303 assert_eq!(tracker.pending_count(), 1);
304 assert!(!tracker.is_satisfied(100));
305 }
306
307 #[test]
308 fn test_register_idempotent() {
309 let tracker = AckTracker::new();
310 tracker.register(100, 2);
311 tracker.register(100, 5); assert_eq!(tracker.pending_count(), 1);
313 tracker.record_ack(100, "replica1");
315 tracker.record_ack(100, "replica2");
316 assert!(tracker.is_satisfied(100));
317 }
318
319 #[test]
320 fn test_record_ack_pending() {
321 let tracker = AckTracker::new();
322 tracker.register(100, 2);
323 let result = tracker.record_ack(100, "replica1");
324 assert_eq!(result, AckResult::Pending);
325 assert!(!tracker.is_satisfied(100));
326 assert_eq!(tracker.get_total_acks(), 1);
327 }
328
329 #[test]
330 fn test_record_ack_satisfied() {
331 let tracker = AckTracker::new();
332 tracker.register(100, 2);
333 tracker.record_ack(100, "replica1");
334 let result = tracker.record_ack(100, "replica2");
335 assert_eq!(result, AckResult::Satisfied);
336 assert!(tracker.is_satisfied(100));
337 assert_eq!(tracker.get_total_acks(), 2);
338 }
339
340 #[test]
341 fn test_single_ack_needed() {
342 let tracker = AckTracker::new();
343 tracker.register(100, 1);
344 let result = tracker.record_ack(100, "replica1");
345 assert_eq!(result, AckResult::Satisfied);
346 assert!(tracker.is_satisfied(100));
347 }
348
349 #[test]
350 fn test_record_ack_unknown_vlsn() {
351 let tracker = AckTracker::new();
352 let result = tracker.record_ack(999, "replica1");
353 assert_eq!(result, AckResult::Unknown);
354 assert_eq!(tracker.get_total_acks(), 0);
355 }
356
357 #[test]
358 fn test_record_ack_duplicate() {
359 let tracker = AckTracker::new();
360 tracker.register(100, 2);
361 tracker.record_ack(100, "replica1");
362 let result = tracker.record_ack(100, "replica1");
363 assert_eq!(result, AckResult::Duplicate);
364 assert!(!tracker.is_satisfied(100));
365 assert_eq!(tracker.get_total_acks(), 1);
367 }
368
369 #[test]
370 fn test_is_satisfied_unknown_vlsn() {
371 let tracker = AckTracker::new();
372 assert!(!tracker.is_satisfied(999));
373 }
374
375 #[test]
378 fn test_multiple_vlsns() {
379 let tracker = AckTracker::new();
380 tracker.register(100, 1);
381 tracker.register(101, 2);
382 tracker.register(102, 1);
383 assert_eq!(tracker.pending_count(), 3);
384
385 tracker.record_ack(100, "r1");
386 assert!(tracker.is_satisfied(100));
387 assert!(!tracker.is_satisfied(101));
388
389 tracker.record_ack(101, "r1");
390 assert!(!tracker.is_satisfied(101));
391 tracker.record_ack(101, "r2");
392 assert!(tracker.is_satisfied(101));
393 }
394
395 #[test]
398 fn test_cleanup_through() {
399 let tracker = AckTracker::new();
400 tracker.register(100, 1);
401 tracker.register(101, 1);
402 tracker.register(102, 1);
403 tracker.register(200, 1);
404 assert_eq!(tracker.pending_count(), 4);
405
406 tracker.cleanup_through(102);
407 assert_eq!(tracker.pending_count(), 1);
408 assert_eq!(tracker.record_ack(100, "r1"), AckResult::Unknown);
410 assert_eq!(tracker.record_ack(200, "r1"), AckResult::Satisfied);
411 }
412
413 #[test]
414 fn test_cleanup_through_zero() {
415 let tracker = AckTracker::new();
416 tracker.register(100, 1);
417 tracker.cleanup_through(0);
418 assert_eq!(tracker.pending_count(), 1);
419 }
420
421 #[test]
422 fn test_cleanup_through_all() {
423 let tracker = AckTracker::new();
424 tracker.register(1, 1);
425 tracker.register(2, 1);
426 tracker.cleanup_through(100);
427 assert_eq!(tracker.pending_count(), 0);
428 }
429
430 #[test]
433 fn test_check_timeouts_none() {
434 let tracker = AckTracker::new();
435 tracker.register(100, 1);
436 let timed_out = tracker.check_timeouts(Duration::from_secs(60));
438 assert!(timed_out.is_empty());
439 assert_eq!(tracker.get_total_timeouts(), 0);
440 }
441
442 #[test]
443 fn test_check_timeouts_with_expired() {
444 let tracker = AckTracker::new();
445
446 {
448 let mut pending = tracker.pending_acks.lock();
449 let mut ack = PendingAck::new(50, 1);
450 ack.created = Instant::now() - Duration::from_secs(120);
451 pending.insert(50, ack);
452 }
453
454 let timed_out = tracker.check_timeouts(Duration::from_secs(60));
455 assert_eq!(timed_out.len(), 1);
456 assert_eq!(timed_out[0], 50);
457 assert_eq!(tracker.get_total_timeouts(), 1);
458 }
459
460 #[test]
461 fn test_check_timeouts_skips_satisfied() {
462 let tracker = AckTracker::new();
463
464 {
466 let mut pending = tracker.pending_acks.lock();
467 let mut ack = PendingAck::new(50, 1);
468 ack.created = Instant::now() - Duration::from_secs(120);
469 ack.received.insert("r1".to_string(), Instant::now());
470 pending.insert(50, ack);
471 }
472
473 let timed_out = tracker.check_timeouts(Duration::from_secs(60));
474 assert!(timed_out.is_empty());
475 }
476
477 #[test]
480 fn test_extra_acks_beyond_needed() {
481 let tracker = AckTracker::new();
482 tracker.register(100, 1);
483 assert_eq!(tracker.record_ack(100, "r1"), AckResult::Satisfied);
484 assert_eq!(tracker.record_ack(100, "r2"), AckResult::Satisfied);
486 assert_eq!(tracker.get_total_acks(), 2);
487 }
488
489 #[test]
492 fn test_zero_acks_needed() {
493 let tracker = AckTracker::new();
494 tracker.register(100, 0);
495 assert!(tracker.is_satisfied(100));
497 }
498
499 #[test]
502 fn test_send_sync() {
503 fn assert_send_sync<T: Send + Sync>() {}
504 assert_send_sync::<AckTracker>();
505 }
506
507 #[test]
508 fn wait_until_satisfied_wakes_on_ack() {
509 use std::sync::Arc;
510 use std::thread;
511 let tracker = Arc::new(AckTracker::new());
512 tracker.register(42, 2);
513 let t2 = Arc::clone(&tracker);
514 let waiter = thread::spawn(move || {
516 t2.wait_until_satisfied(42, Duration::from_secs(5), || false)
517 });
518 thread::sleep(Duration::from_millis(20));
521 assert_eq!(tracker.record_ack(42, "r1"), AckResult::Pending);
522 assert_eq!(tracker.record_ack(42, "r2"), AckResult::Satisfied);
523 let start = Instant::now();
524 let ok = waiter.join().unwrap();
525 assert!(ok, "wait_until_satisfied must return true once satisfied");
526 assert!(
527 start.elapsed() < Duration::from_secs(2),
528 "must wake on ack, not spin to timeout"
529 );
530 }
531
532 #[test]
533 fn wait_until_satisfied_times_out_without_enough_acks() {
534 let tracker = AckTracker::new();
535 tracker.register(7, 3);
536 tracker.record_ack(7, "only-one");
537 let ok =
538 tracker
539 .wait_until_satisfied(7, Duration::from_millis(50), || false);
540 assert!(!ok, "must time out when acks are insufficient");
541 }
542}