// noxu_rep/ack_tracker.rs

//! Acknowledgment tracking for replication commits.
//!
//! Tracks transaction commit acknowledgments from replicas to determine when
//! a transaction's durability requirements have been satisfied.
6
7use hashbrown::HashMap;
8use noxu_sync::Mutex;
9use std::time::{Duration, Instant};
10
11/// Tracks transaction commit acknowledgments from replicas.
12///
13/// When the master commits a transaction, it may need to wait for one or more
14/// replicas to acknowledge receipt before considering the transaction durable.
15/// The `AckTracker` manages pending acknowledgments, recording which replicas
16/// have acked which VLSNs, and detecting when sufficient acks have been
17/// received or when ack timeouts have occurred.
18pub struct AckTracker {
19    /// Maps VLSN to pending ack info.
20    pending_acks: Mutex<HashMap<u64, PendingAck>>,
21    /// Total acks received across all VLSNs.
22    total_acks: Mutex<u64>,
23    /// Total ack timeouts.
24    total_timeouts: Mutex<u64>,
25}
26
/// Internal state for a single VLSN that is waiting for replica acks.
#[derive(Debug)]
struct PendingAck {
    /// The VLSN this entry belongs to.
    vlsn: u64,
    /// How many distinct replica acks are required for durability.
    needed: u32,
    /// Replica name -> instant at which that replica's ack was recorded.
    received: HashMap<String, Instant>,
    /// Instant at which this entry was created (i.e. the VLSN was registered).
    created: Instant,
}

impl PendingAck {
    /// Creates an entry for `vlsn` with no acks yet, stamped with the current
    /// time as its creation instant.
    fn new(vlsn: u64, needed: u32) -> Self {
        Self {
            vlsn,
            needed,
            received: HashMap::new(),
            created: Instant::now(),
        }
    }

    /// Returns `true` once at least `needed` distinct replicas have acked.
    ///
    /// An entry created with `needed == 0` is satisfied immediately.
    fn is_satisfied(&self) -> bool {
        // Saturate instead of truncating with `as`: a count that does not fit
        // in `u32` is necessarily >= any `u32` requirement.
        let acked = u32::try_from(self.received.len()).unwrap_or(u32::MAX);
        acked >= self.needed
    }
}
49
/// Outcome reported by [`AckTracker::record_ack`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AckResult {
    /// The ack was recorded, but the VLSN still has fewer acks than it needs.
    Pending,
    /// The ack was recorded and the VLSN now has at least as many acks as it
    /// needs. Further acks from other replicas after that point also report
    /// `Satisfied`.
    Satisfied,
    /// The VLSN is not being tracked: it was never registered or has already
    /// been cleaned up. Nothing was recorded.
    Unknown,
    /// This replica had already acked this VLSN. Nothing was recorded.
    Duplicate,
}
62
63impl AckTracker {
64    /// Create a new ack tracker.
65    pub fn new() -> Self {
66        Self {
67            pending_acks: Mutex::new(HashMap::new()),
68            total_acks: Mutex::new(0),
69            total_timeouts: Mutex::new(0),
70        }
71    }
72
73    /// Register a new VLSN that needs acknowledgments.
74    ///
75    /// If the VLSN is already registered, this is a no-op (the existing
76    /// registration is preserved).
77    pub fn register(&self, vlsn: u64, needed_acks: u32) {
78        let mut pending = self.pending_acks.lock();
79        pending
80            .entry(vlsn)
81            .or_insert_with(|| PendingAck::new(vlsn, needed_acks));
82    }
83
84    /// Record an acknowledgment from a replica for a VLSN.
85    ///
86    /// Returns the result indicating whether the ack was accepted and whether
87    /// it satisfied the durability requirement.
88    pub fn record_ack(&self, vlsn: u64, replica_name: &str) -> AckResult {
89        let mut pending = self.pending_acks.lock();
90        let ack = match pending.get_mut(&vlsn) {
91            Some(a) => a,
92            None => return AckResult::Unknown,
93        };
94
95        // Check for duplicate
96        if ack.received.contains_key(replica_name) {
97            return AckResult::Duplicate;
98        }
99
100        ack.received.insert(replica_name.to_string(), Instant::now());
101        *self.total_acks.lock() += 1;
102
103        if ack.is_satisfied() {
104            AckResult::Satisfied
105        } else {
106            AckResult::Pending
107        }
108    }
109
110    /// Check if a VLSN has sufficient acks.
111    pub fn is_satisfied(&self, vlsn: u64) -> bool {
112        let pending = self.pending_acks.lock();
113        match pending.get(&vlsn) {
114            Some(ack) => ack.is_satisfied(),
115            None => false,
116        }
117    }
118
119    /// Number of distinct replica acks recorded for `vlsn`, or `None`
120    /// if no registration exists for that VLSN.
121    ///
122    /// Used by the F1 commit-coordinator path to report the partial
123    /// ack count when a commit times out without satisfying the
124    /// configured `ReplicaAckPolicy`.
125    pub fn received_count(&self, vlsn: u64) -> Option<u32> {
126        let pending = self.pending_acks.lock();
127        pending.get(&vlsn).map(|ack| ack.received.len() as u32)
128    }
129
130    /// Remove all pending acks for VLSNs <= the given value.
131    ///
132    /// This is used to clean up acks for transactions that have been
133    /// durably committed and no longer need tracking.
134    pub fn cleanup_through(&self, vlsn: u64) {
135        let mut pending = self.pending_acks.lock();
136        pending.retain(|&v, _| v > vlsn);
137    }
138
139    /// Get the number of pending (unsatisfied) acks.
140    pub fn pending_count(&self) -> usize {
141        self.pending_acks.lock().len()
142    }
143
144    /// Check for timed-out acks and return their VLSNs.
145    ///
146    /// An ack is considered timed out if it was registered more than
147    /// `timeout` ago and has not yet been satisfied.
148    ///
149    /// **Side effect:** for each unsatisfied, timed-out pending ack found
150    /// during this scan, `total_timeouts` is incremented by one.
151    pub fn check_timeouts(&self, timeout: Duration) -> Vec<u64> {
152        let pending = self.pending_acks.lock();
153        let now = Instant::now();
154        let mut timed_out = Vec::new();
155        for ack in pending.values() {
156            if !ack.is_satisfied()
157                && let Some(elapsed) = now.checked_duration_since(ack.created)
158                && elapsed > timeout
159            {
160                timed_out.push(ack.vlsn);
161                *self.total_timeouts.lock() += 1;
162            }
163        }
164        timed_out
165    }
166
167    /// Get total number of acks received across all VLSNs.
168    pub fn get_total_acks(&self) -> u64 {
169        *self.total_acks.lock()
170    }
171
172    /// Get total number of ack timeouts.
173    pub fn get_total_timeouts(&self) -> u64 {
174        *self.total_timeouts.lock()
175    }
176}
177
178impl Default for AckTracker {
179    fn default() -> Self {
180        Self::new()
181    }
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pending entry for `vlsn` whose creation time lies `age` in the
    /// past, so timeout tests do not have to sleep.
    fn aged_ack(vlsn: u64, needed: u32, age: Duration) -> PendingAck {
        let mut ack = PendingAck::new(vlsn, needed);
        ack.created = Instant::now()
            .checked_sub(age)
            .expect("monotonic clock should be at least `age` past its origin");
        ack
    }

    // --- Basic register/ack flow ---

    #[test]
    fn test_new_tracker() {
        let tracker = AckTracker::new();
        assert_eq!(tracker.pending_count(), 0);
        assert_eq!(tracker.get_total_acks(), 0);
        assert_eq!(tracker.get_total_timeouts(), 0);
    }

    #[test]
    fn test_default_impl() {
        let tracker = AckTracker::default();
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn test_register() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        assert_eq!(tracker.pending_count(), 1);
        assert!(!tracker.is_satisfied(100));
    }

    #[test]
    fn test_register_idempotent() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        tracker.register(100, 5); // Should not overwrite
        assert_eq!(tracker.pending_count(), 1);
        // Two acks meet the original requirement of 2; had `needed` been
        // overwritten to 5 the VLSN would still be unsatisfied.
        tracker.record_ack(100, "replica1");
        tracker.record_ack(100, "replica2");
        assert!(tracker.is_satisfied(100));
    }

    #[test]
    fn test_record_ack_pending() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        let result = tracker.record_ack(100, "replica1");
        assert_eq!(result, AckResult::Pending);
        assert!(!tracker.is_satisfied(100));
        assert_eq!(tracker.get_total_acks(), 1);
    }

    #[test]
    fn test_record_ack_satisfied() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        tracker.record_ack(100, "replica1");
        let result = tracker.record_ack(100, "replica2");
        assert_eq!(result, AckResult::Satisfied);
        assert!(tracker.is_satisfied(100));
        assert_eq!(tracker.get_total_acks(), 2);
    }

    #[test]
    fn test_single_ack_needed() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        let result = tracker.record_ack(100, "replica1");
        assert_eq!(result, AckResult::Satisfied);
        assert!(tracker.is_satisfied(100));
    }

    #[test]
    fn test_record_ack_unknown_vlsn() {
        let tracker = AckTracker::new();
        let result = tracker.record_ack(999, "replica1");
        assert_eq!(result, AckResult::Unknown);
        assert_eq!(tracker.get_total_acks(), 0);
    }

    #[test]
    fn test_record_ack_duplicate() {
        let tracker = AckTracker::new();
        tracker.register(100, 2);
        tracker.record_ack(100, "replica1");
        let result = tracker.record_ack(100, "replica1");
        assert_eq!(result, AckResult::Duplicate);
        assert!(!tracker.is_satisfied(100));
        // Duplicate should not increment total
        assert_eq!(tracker.get_total_acks(), 1);
    }

    #[test]
    fn test_is_satisfied_unknown_vlsn() {
        let tracker = AckTracker::new();
        assert!(!tracker.is_satisfied(999));
    }

    // --- Received-ack counts ---

    #[test]
    fn test_received_count() {
        let tracker = AckTracker::new();
        // Not registered yet
        assert_eq!(tracker.received_count(100), None);

        tracker.register(100, 3);
        assert_eq!(tracker.received_count(100), Some(0));

        tracker.record_ack(100, "r1");
        tracker.record_ack(100, "r2");
        tracker.record_ack(100, "r1"); // duplicate, must not be counted
        assert_eq!(tracker.received_count(100), Some(2));

        // Cleaned-up VLSNs are unknown again
        tracker.cleanup_through(100);
        assert_eq!(tracker.received_count(100), None);
    }

    // --- Multiple VLSNs ---

    #[test]
    fn test_multiple_vlsns() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        tracker.register(101, 2);
        tracker.register(102, 1);
        assert_eq!(tracker.pending_count(), 3);

        tracker.record_ack(100, "r1");
        assert!(tracker.is_satisfied(100));
        assert!(!tracker.is_satisfied(101));

        tracker.record_ack(101, "r1");
        assert!(!tracker.is_satisfied(101));
        tracker.record_ack(101, "r2");
        assert!(tracker.is_satisfied(101));
    }

    #[test]
    fn test_pending_count_includes_satisfied() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        assert_eq!(tracker.record_ack(100, "r1"), AckResult::Satisfied);
        // A satisfied entry stays tracked until it is cleaned up
        assert_eq!(tracker.pending_count(), 1);
        tracker.cleanup_through(100);
        assert_eq!(tracker.pending_count(), 0);
    }

    // --- Cleanup ---

    #[test]
    fn test_cleanup_through() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        tracker.register(101, 1);
        tracker.register(102, 1);
        tracker.register(200, 1);
        assert_eq!(tracker.pending_count(), 4);

        tracker.cleanup_through(102);
        assert_eq!(tracker.pending_count(), 1);
        // Only VLSN 200 should remain
        assert_eq!(tracker.record_ack(100, "r1"), AckResult::Unknown);
        assert_eq!(tracker.record_ack(200, "r1"), AckResult::Satisfied);
    }

    #[test]
    fn test_cleanup_through_zero() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        tracker.cleanup_through(0);
        assert_eq!(tracker.pending_count(), 1);
    }

    #[test]
    fn test_cleanup_through_all() {
        let tracker = AckTracker::new();
        tracker.register(1, 1);
        tracker.register(2, 1);
        tracker.cleanup_through(100);
        assert_eq!(tracker.pending_count(), 0);
    }

    // --- Timeout detection ---

    #[test]
    fn test_check_timeouts_none() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        // Just registered, shouldn't be timed out with generous timeout
        let timed_out = tracker.check_timeouts(Duration::from_secs(60));
        assert!(timed_out.is_empty());
        assert_eq!(tracker.get_total_timeouts(), 0);
    }

    #[test]
    fn test_check_timeouts_with_expired() {
        let tracker = AckTracker::new();

        // Manually insert an old pending ack
        tracker
            .pending_acks
            .lock()
            .insert(50, aged_ack(50, 1, Duration::from_secs(120)));

        let timed_out = tracker.check_timeouts(Duration::from_secs(60));
        assert_eq!(timed_out.len(), 1);
        assert_eq!(timed_out[0], 50);
        assert_eq!(tracker.get_total_timeouts(), 1);
    }

    #[test]
    fn test_check_timeouts_skips_satisfied() {
        let tracker = AckTracker::new();

        // Insert an old but satisfied pending ack
        let mut ack = aged_ack(50, 1, Duration::from_secs(120));
        ack.received.insert("r1".to_string(), Instant::now());
        tracker.pending_acks.lock().insert(50, ack);

        let timed_out = tracker.check_timeouts(Duration::from_secs(60));
        assert!(timed_out.is_empty());
        assert_eq!(tracker.get_total_timeouts(), 0);
    }

    #[test]
    fn test_check_timeouts_counts_every_expired_entry() {
        let tracker = AckTracker::new();
        {
            let mut pending = tracker.pending_acks.lock();
            pending.insert(50, aged_ack(50, 1, Duration::from_secs(120)));
            pending.insert(51, aged_ack(51, 2, Duration::from_secs(120)));
        }
        // A fresh registration must not be reported
        tracker.register(52, 1);

        let mut timed_out = tracker.check_timeouts(Duration::from_secs(60));
        // Map iteration order is unspecified
        timed_out.sort_unstable();
        assert_eq!(timed_out, vec![50, 51]);
        assert_eq!(tracker.get_total_timeouts(), 2);
    }

    // --- Extra acks beyond needed ---

    #[test]
    fn test_extra_acks_beyond_needed() {
        let tracker = AckTracker::new();
        tracker.register(100, 1);
        assert_eq!(tracker.record_ack(100, "r1"), AckResult::Satisfied);
        // Additional ack from different replica
        assert_eq!(tracker.record_ack(100, "r2"), AckResult::Satisfied);
        assert_eq!(tracker.get_total_acks(), 2);
    }

    // --- Zero acks needed ---

    #[test]
    fn test_zero_acks_needed() {
        let tracker = AckTracker::new();
        tracker.register(100, 0);
        // Should be immediately satisfied (0 needed, 0 received)
        assert!(tracker.is_satisfied(100));
    }

    // --- Send + Sync ---

    #[test]
    fn test_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<AckTracker>();
    }
}