Skip to main content

noxu_rep/
ack_tracker.rs

//! Commit acknowledgment tracking for replication.
//!
//! Records which replicas have acknowledged each committed VLSN so the master
//! can tell when a transaction's durability requirement has been satisfied,
//! and lets committers block until that happens or a timeout elapses.

7use hashbrown::HashMap;
8use noxu_sync::{Condvar, Mutex};
9use std::time::{Duration, Instant};
10
11/// Tracks transaction commit acknowledgments from replicas.
12///
13/// When the master commits a transaction, it may need to wait for one or more
14/// replicas to acknowledge receipt before considering the transaction durable.
15/// The `AckTracker` manages pending acknowledgments, recording which replicas
16/// have acked which VLSNs, and detecting when sufficient acks have been
17/// received or when ack timeouts have occurred.
18pub struct AckTracker {
19    /// Maps VLSN to pending ack info.
20    pending_acks: Mutex<HashMap<u64, PendingAck>>,
21    /// Signalled whenever an ack is recorded so that committers blocked in
22    /// `wait_until_satisfied` wake immediately rather than spin-polling
23    /// (JE FeederTxns uses a per-transaction CountDownLatch; this condvar is
24    /// the shared-mutex equivalent over the pending-ack map).
25    ack_signal: Condvar,
26    /// Total acks received across all VLSNs.
27    total_acks: Mutex<u64>,
28    /// Total ack timeouts.
29    total_timeouts: Mutex<u64>,
30}
31
32/// Internal state for a VLSN awaiting acknowledgments.
33#[derive(Debug)]
34struct PendingAck {
35    /// The VLSN being tracked.
36    vlsn: u64,
37    /// Number of acks needed to satisfy durability.
38    needed: u32,
39    /// Map of replica name to the time the ack was received.
40    received: HashMap<String, Instant>,
41    /// When this pending ack was created.
42    created: Instant,
43}
44
45impl PendingAck {
46    fn new(vlsn: u64, needed: u32) -> Self {
47        Self { vlsn, needed, received: HashMap::new(), created: Instant::now() }
48    }
49
50    fn is_satisfied(&self) -> bool {
51        self.received.len() as u32 >= self.needed
52    }
53}
54
/// Outcome of recording a single replica acknowledgment.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum AckResult {
    /// The ack was accepted, but the VLSN still needs more acks.
    Pending,
    /// The ack was accepted and the VLSN now has enough acks.
    Satisfied,
    /// No registration exists for the VLSN (never registered, or already
    /// cleaned up); the ack was ignored.
    Unknown,
    /// This replica had already acked this VLSN; the ack was ignored.
    Duplicate,
}

68impl AckTracker {
69    /// Create a new ack tracker.
70    pub fn new() -> Self {
71        Self {
72            pending_acks: Mutex::new(HashMap::new()),
73            ack_signal: Condvar::new(),
74            total_acks: Mutex::new(0),
75            total_timeouts: Mutex::new(0),
76        }
77    }
78
79    /// Register a new VLSN that needs acknowledgments.
80    ///
81    /// If the VLSN is already registered, this is a no-op (the existing
82    /// registration is preserved).
83    pub fn register(&self, vlsn: u64, needed_acks: u32) {
84        let mut pending = self.pending_acks.lock();
85        pending
86            .entry(vlsn)
87            .or_insert_with(|| PendingAck::new(vlsn, needed_acks));
88    }
89
90    /// Record an acknowledgment from a replica for a VLSN.
91    ///
92    /// Returns the result indicating whether the ack was accepted and whether
93    /// it satisfied the durability requirement.
94    pub fn record_ack(&self, vlsn: u64, replica_name: &str) -> AckResult {
95        let mut pending = self.pending_acks.lock();
96        let ack = match pending.get_mut(&vlsn) {
97            Some(a) => a,
98            None => return AckResult::Unknown,
99        };
100
101        // Check for duplicate
102        if ack.received.contains_key(replica_name) {
103            return AckResult::Duplicate;
104        }
105
106        ack.received.insert(replica_name.to_string(), Instant::now());
107        let satisfied = ack.is_satisfied();
108        // Drop the borrow before releasing the lock + notifying.
109        drop(pending);
110        *self.total_acks.lock() += 1;
111        // Wake any committer blocked in `wait_until_satisfied`. notify_all is
112        // cheap (a futex bump) and committers re-check their own predicate.
113        self.ack_signal.notify_all();
114
115        if satisfied { AckResult::Satisfied } else { AckResult::Pending }
116    }
117
118    /// Block until `vlsn` has sufficient acks, the `timeout` elapses, or
119    /// `should_abort` returns true (e.g. shutdown). Returns true if the VLSN
120    /// became satisfied, false on timeout/abort. Replaces the prior
121    /// spin-sleep loop — committers park on the condvar and are woken by
122    /// `record_ack` (JE FeederTxns.TxnInfo CountDownLatch.await).
123    pub fn wait_until_satisfied<F: Fn() -> bool>(
124        &self,
125        vlsn: u64,
126        timeout: Duration,
127        should_abort: F,
128    ) -> bool {
129        let deadline = Instant::now() + timeout;
130        let mut guard = self.pending_acks.lock();
131        loop {
132            match guard.get(&vlsn) {
133                // Registration gone (cleaned up) -> treat as satisfied: the
134                // only path that removes it is cleanup after satisfaction.
135                None => return true,
136                Some(ack) if ack.is_satisfied() => return true,
137                _ => {}
138            }
139            if should_abort() {
140                return false;
141            }
142            let now = Instant::now();
143            if now >= deadline {
144                return false;
145            }
146            let res = self.ack_signal.wait_for(&mut guard, deadline - now);
147            if res.timed_out() && Instant::now() >= deadline {
148                // Final predicate re-check before declaring timeout.
149                match guard.get(&vlsn) {
150                    None => return true,
151                    Some(ack) if ack.is_satisfied() => return true,
152                    _ => return false,
153                }
154            }
155        }
156    }
157
158    /// REP-9: park on the ack-signal condvar until `predicate()` returns true,
159    /// `timeout` elapses, or `should_abort()` returns true. Returns true iff
160    /// `predicate` was satisfied. This is the high-water-mark equivalent of
161    /// `wait_until_satisfied` for callers that count acks themselves via the
162    /// per-replica high-water marks (JE
163    /// `FeederManager.getNumCurrentAckFeeders(commitVLSN)` counts feeders with
164    /// `getReplicaTxnEndVLSN() >= commitVLSN`, not an exact-VLSN match).
165    pub fn wait_for_predicate<P, A>(
166        &self,
167        timeout: Duration,
168        predicate: P,
169        should_abort: A,
170    ) -> bool
171    where
172        P: Fn() -> bool,
173        A: Fn() -> bool,
174    {
175        let deadline = Instant::now() + timeout;
176        // The condvar guards the pending-ack map; we only use it as a parking
177        // lot signalled by `record_ack`. Hold the lock across the wait so a
178        // notify cannot be missed between the predicate check and the park.
179        let mut guard = self.pending_acks.lock();
180        loop {
181            if predicate() {
182                return true;
183            }
184            if should_abort() {
185                return false;
186            }
187            let now = Instant::now();
188            if now >= deadline {
189                return predicate();
190            }
191            let _ = self.ack_signal.wait_for(&mut guard, deadline - now);
192        }
193    }
194
195    /// REP-9: wake any committer parked in `wait_for_predicate` /
196    /// `wait_until_satisfied`.  Used by `env.record_ack` after it advances a
197    /// feeder high-water mark, because satisfaction is now decided by the
198    /// per-replica high-water count (not an exact-VLSN registration), so a
199    /// `record_ack` for a VLSN with no exact registration must still wake
200    /// waiters whose `commit_vlsn` predicate has just become true.
201    pub fn notify_waiters(&self) {
202        self.ack_signal.notify_all();
203    }
204
205    /// Check if a VLSN has sufficient acks.
206    pub fn is_satisfied(&self, vlsn: u64) -> bool {
207        let pending = self.pending_acks.lock();
208        match pending.get(&vlsn) {
209            Some(ack) => ack.is_satisfied(),
210            None => false,
211        }
212    }
213
214    /// Number of distinct replica acks recorded for `vlsn`, or `None`
215    /// if no registration exists for that VLSN.
216    ///
217    /// Used by the F1 commit-coordinator path to report the partial
218    /// ack count when a commit times out without satisfying the
219    /// configured `ReplicaAckPolicy`.
220    pub fn received_count(&self, vlsn: u64) -> Option<u32> {
221        let pending = self.pending_acks.lock();
222        pending.get(&vlsn).map(|ack| ack.received.len() as u32)
223    }
224
225    /// Remove all pending acks for VLSNs <= the given value.
226    ///
227    /// This is used to clean up acks for transactions that have been
228    /// durably committed and no longer need tracking.
229    pub fn cleanup_through(&self, vlsn: u64) {
230        let mut pending = self.pending_acks.lock();
231        pending.retain(|&v, _| v > vlsn);
232    }
233
234    /// Get the number of pending (unsatisfied) acks.
235    pub fn pending_count(&self) -> usize {
236        self.pending_acks.lock().len()
237    }
238
239    /// Check for timed-out acks and return their VLSNs.
240    ///
241    /// An ack is considered timed out if it was registered more than
242    /// `timeout` ago and has not yet been satisfied.
243    ///
244    /// **Side effect:** for each unsatisfied, timed-out pending ack found
245    /// during this scan, `total_timeouts` is incremented by one.
246    pub fn check_timeouts(&self, timeout: Duration) -> Vec<u64> {
247        let pending = self.pending_acks.lock();
248        let now = Instant::now();
249        let mut timed_out = Vec::new();
250        for ack in pending.values() {
251            if !ack.is_satisfied()
252                && let Some(elapsed) = now.checked_duration_since(ack.created)
253                && elapsed > timeout
254            {
255                timed_out.push(ack.vlsn);
256                *self.total_timeouts.lock() += 1;
257            }
258        }
259        timed_out
260    }
261
262    /// Get total number of acks received across all VLSNs.
263    pub fn get_total_acks(&self) -> u64 {
264        *self.total_acks.lock()
265    }
266
267    /// Get total number of ack timeouts.
268    pub fn get_total_timeouts(&self) -> u64 {
269        *self.total_timeouts.lock()
270    }
271}
272
273impl Default for AckTracker {
274    fn default() -> Self {
275        Self::new()
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;

    /// Insert a registration for `vlsn` that was created `age` ago.
    fn insert_aged(tracker: &AckTracker, vlsn: u64, needed: u32, age: Duration) {
        let mut pending = tracker.pending_acks.lock();
        let mut ack = PendingAck::new(vlsn, needed);
        ack.created = Instant::now() - age;
        pending.insert(vlsn, ack);
    }

    // --- Basic register/ack flow ---

    #[test]
    fn test_new_tracker() {
        let tracker = AckTracker::new();
        assert_eq!(tracker.pending_count(), 0);
        assert_eq!(tracker.get_total_acks(), 0);
        assert_eq!(tracker.get_total_timeouts(), 0);
    }

    #[test]
    fn test_default_impl() {
        let tracker = AckTracker::default();
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn test_register() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        assert_eq!(tracker.pending_count(), 1);
        assert!(!tracker.is_satisfied(100));
    }

    #[test]
    fn test_register_idempotent() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        tracker.register(100, 5); // Must not overwrite the first registration.
        assert_eq!(tracker.pending_count(), 1);
        // Two acks satisfy the original `needed == 2`; had the second call
        // overwritten it to 5, the VLSN would still be unsatisfied.
        tracker.record_ack(100, "replica1");
        tracker.record_ack(100, "replica2");
        assert!(tracker.is_satisfied(100));
    }

    #[test]
    fn test_record_ack_pending() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        let result = tracker.record_ack(100, "replica1");
        assert_eq!(result, AckResult::Pending);
        assert!(!tracker.is_satisfied(100));
        assert_eq!(tracker.get_total_acks(), 1);
    }

    #[test]
    fn test_record_ack_satisfied() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        tracker.record_ack(100, "replica1");
        let result = tracker.record_ack(100, "replica2");
        assert_eq!(result, AckResult::Satisfied);
        assert!(tracker.is_satisfied(100));
        assert_eq!(tracker.get_total_acks(), 2);
    }

    #[test]
    fn test_single_ack_needed() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        let result = tracker.record_ack(100, "replica1");
        assert_eq!(result, AckResult::Satisfied);
        assert!(tracker.is_satisfied(100));
    }

    #[test]
    fn test_record_ack_unknown_vlsn() {
        let tracker = AckTracker::new();
        let result = tracker.record_ack(999, "replica1");
        assert_eq!(result, AckResult::Unknown);
        assert_eq!(tracker.get_total_acks(), 0);
    }

    #[test]
    fn test_record_ack_duplicate() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        tracker.record_ack(100, "replica1");
        let result = tracker.record_ack(100, "replica1");
        assert_eq!(result, AckResult::Duplicate);
        assert!(!tracker.is_satisfied(100));
        // A duplicate must not increment the total.
        assert_eq!(tracker.get_total_acks(), 1);
    }

    #[test]
    fn test_is_satisfied_unknown_vlsn() {
        let tracker = AckTracker::new();
        assert!(!tracker.is_satisfied(999));
    }

    #[test]
    fn test_received_count() {
        let tracker = AckTracker::new();
        assert_eq!(tracker.received_count(100), None);
        tracker.register(100, 3);
        assert_eq!(tracker.received_count(100), Some(0));
        tracker.record_ack(100, "r1");
        tracker.record_ack(100, "r1"); // duplicate, not counted
        tracker.record_ack(100, "r2");
        assert_eq!(tracker.received_count(100), Some(2));
    }

    // --- Multiple VLSNs ---

    #[test]
    fn test_multiple_vlsns() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        tracker.register(101, 2);
        tracker.register(102, 1);
        assert_eq!(tracker.pending_count(), 3);

        tracker.record_ack(100, "r1");
        assert!(tracker.is_satisfied(100));
        assert!(!tracker.is_satisfied(101));

        tracker.record_ack(101, "r1");
        assert!(!tracker.is_satisfied(101));
        tracker.record_ack(101, "r2");
        assert!(tracker.is_satisfied(101));
    }

    // --- Cleanup ---

    #[test]
    fn test_cleanup_through() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        tracker.register(101, 1);
        tracker.register(102, 1);
        tracker.register(200, 1);
        assert_eq!(tracker.pending_count(), 4);

        tracker.cleanup_through(102);
        assert_eq!(tracker.pending_count(), 1);
        // Only VLSN 200 should remain.
        assert_eq!(tracker.record_ack(100, "r1"), AckResult::Unknown);
        assert_eq!(tracker.record_ack(200, "r1"), AckResult::Satisfied);
    }

    #[test]
    fn test_cleanup_through_zero() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        tracker.cleanup_through(0);
        assert_eq!(tracker.pending_count(), 1);
    }

    #[test]
    fn test_cleanup_through_all() {
        let tracker = AckTracker::new();
        tracker.register(1, 1);
        tracker.register(2, 1);
        tracker.cleanup_through(100);
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn test_pending_count_includes_satisfied() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        assert_eq!(tracker.record_ack(100, "r1"), AckResult::Satisfied);
        // Satisfied registrations stay tracked until cleaned up.
        assert_eq!(tracker.pending_count(), 1);
        tracker.cleanup_through(100);
        assert_eq!(tracker.pending_count(), 0);
    }

    // --- Timeout detection ---

    #[test]
    fn test_check_timeouts_none() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        // Just registered: must not be timed out under a generous timeout.
        let timed_out = tracker.check_timeouts(Duration::from_secs(60));
        assert!(timed_out.is_empty());
        assert_eq!(tracker.get_total_timeouts(), 0);
    }

    #[test]
    fn test_check_timeouts_with_expired() {
        let tracker = AckTracker::new();
        insert_aged(&tracker, 50, 1, Duration::from_secs(120));

        let timed_out = tracker.check_timeouts(Duration::from_secs(60));
        assert_eq!(timed_out.len(), 1);
        assert_eq!(timed_out[0], 50);
        assert_eq!(tracker.get_total_timeouts(), 1);
    }

    #[test]
    fn test_check_timeouts_counts_every_scan() {
        let tracker = AckTracker::new();
        insert_aged(&tracker, 50, 1, Duration::from_secs(120));

        assert_eq!(tracker.check_timeouts(Duration::from_secs(60)), vec![50]);
        assert_eq!(tracker.check_timeouts(Duration::from_secs(60)), vec![50]);
        // Each scan that still finds the VLSN unsatisfied counts it again.
        assert_eq!(tracker.get_total_timeouts(), 2);
    }

    #[test]
    fn test_check_timeouts_skips_satisfied() {
        let tracker = AckTracker::new();
        insert_aged(&tracker, 50, 1, Duration::from_secs(120));
        tracker
            .pending_acks
            .lock()
            .get_mut(&50)
            .expect("registration inserted above")
            .received
            .insert("r1".to_string(), Instant::now());

        let timed_out = tracker.check_timeouts(Duration::from_secs(60));
        assert!(timed_out.is_empty());
        assert_eq!(tracker.get_total_timeouts(), 0);
    }

    // --- Extra acks beyond needed ---

    #[test]
    fn test_extra_acks_beyond_needed() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        assert_eq!(tracker.record_ack(100, "r1"), AckResult::Satisfied);
        // An additional ack from a different replica is still accepted.
        assert_eq!(tracker.record_ack(100, "r2"), AckResult::Satisfied);
        assert_eq!(tracker.get_total_acks(), 2);
    }

    // --- Zero acks needed ---

    #[test]
    fn test_zero_acks_needed() {
        let tracker = AckTracker::new();
        tracker.register(100, 0);
        // Immediately satisfied: 0 needed, 0 received.
        assert!(tracker.is_satisfied(100));
    }

    // --- Send + Sync ---

    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<AckTracker>();
    }

    // --- Blocking waits ---

    #[test]
    fn wait_until_satisfied_wakes_on_ack() {
        let tracker = Arc::new(AckTracker::new());
        tracker.register(42, 2);
        let t2 = Arc::clone(&tracker);
        // The committer blocks waiting for 2 acks.
        let waiter = thread::spawn(move || {
            t2.wait_until_satisfied(42, Duration::from_secs(5), || false)
        });
        // Deliver two acks; the waiter must wake and return true well before
        // the 5s timeout.
        thread::sleep(Duration::from_millis(20));
        assert_eq!(tracker.record_ack(42, "r1"), AckResult::Pending);
        assert_eq!(tracker.record_ack(42, "r2"), AckResult::Satisfied);
        let start = Instant::now();
        let ok = waiter.join().unwrap();
        assert!(ok, "wait_until_satisfied must return true once satisfied");
        assert!(
            start.elapsed() < Duration::from_secs(2),
            "must wake on ack, not spin to timeout"
        );
    }

    #[test]
    fn wait_until_satisfied_times_out_without_enough_acks() {
        let tracker = AckTracker::new();
        tracker.register(7, 3);
        tracker.record_ack(7, "only-one");
        let ok = tracker.wait_until_satisfied(7, Duration::from_millis(50), || false);
        assert!(!ok, "must time out when acks are insufficient");
    }

    #[test]
    fn wait_until_satisfied_unregistered_vlsn_returns_true() {
        let tracker = AckTracker::new();
        // No registration is treated as "nothing left to wait for".
        assert!(tracker.wait_until_satisfied(123, Duration::from_secs(5), || false));
    }

    #[test]
    fn wait_until_satisfied_honours_abort() {
        let tracker = AckTracker::new();
        tracker.register(9, 1);
        let start = Instant::now();
        assert!(!tracker.wait_until_satisfied(9, Duration::from_secs(5), || true));
        assert!(start.elapsed() < Duration::from_secs(2), "abort must not wait");
    }

    #[test]
    fn wait_for_predicate_immediate_timeout_and_abort() {
        let tracker = AckTracker::new();
        assert!(tracker.wait_for_predicate(Duration::from_secs(5), || true, || false));
        assert!(!tracker.wait_for_predicate(Duration::from_millis(20), || false, || false));
        assert!(!tracker.wait_for_predicate(Duration::from_secs(5), || false, || true));
    }

    #[test]
    fn wait_for_predicate_wakes_on_notify_waiters() {
        let tracker = Arc::new(AckTracker::new());
        let flag = Arc::new(AtomicBool::new(false));
        let t2 = Arc::clone(&tracker);
        let f2 = Arc::clone(&flag);
        let waiter = thread::spawn(move || {
            t2.wait_for_predicate(
                Duration::from_secs(5),
                || f2.load(Ordering::SeqCst),
                || false,
            )
        });
        thread::sleep(Duration::from_millis(20));
        // Update the state the predicate reads, then signal.
        flag.store(true, Ordering::SeqCst);
        tracker.notify_waiters();
        let start = Instant::now();
        assert!(waiter.join().unwrap(), "predicate became true");
        assert!(
            start.elapsed() < Duration::from_secs(2),
            "must wake on notify_waiters, not sleep to timeout"
        );
    }
}
542}