// Source path: nodedb_array/sync/ack.rs

// SPDX-License-Identifier: Apache-2.0

//! Per-replica acknowledgement watermarks for GC frontier tracking.
//!
//! [`AckVector`] tracks the highest [`Hlc`] that each peer has confirmed
//! applying. The minimum across all tracked replicas is the GC frontier:
//! ops below it have been applied everywhere and are safe to collapse into a
//! snapshot and prune from the log.
9
10use std::collections::HashMap;
11
12use serde::{Deserialize, Serialize};
13
14use crate::sync::hlc::Hlc;
15use crate::sync::replica_id::ReplicaId;
16
17/// Per-replica ack watermark table.
18///
19/// Records grow monotonically — [`AckVector::record`] never moves a watermark
20/// backward. [`AckVector::min_ack_hlc`] returns `None` when no peers are known
21/// (cannot safely GC without knowing all peers have caught up).
22#[derive(
23    Clone, Debug, Default, Serialize, Deserialize, zerompk::ToMessagePack, zerompk::FromMessagePack,
24)]
25pub struct AckVector {
26    acks: HashMap<ReplicaId, Hlc>,
27}
28
29impl AckVector {
30    /// Create an empty [`AckVector`].
31    pub fn new() -> Self {
32        Self::default()
33    }
34
35    /// Record that `replica` has applied all ops up to and including `hlc`.
36    ///
37    /// Monotonic: if the existing watermark for `replica` is already higher,
38    /// the call is a silent no-op.
39    pub fn record(&mut self, replica: ReplicaId, hlc: Hlc) {
40        let entry = self.acks.entry(replica).or_insert(Hlc::ZERO);
41        if hlc > *entry {
42            *entry = hlc;
43        }
44    }
45
46    /// Return the ack watermark for a specific replica, if known.
47    pub fn ack_for(&self, replica: ReplicaId) -> Option<Hlc> {
48        self.acks.get(&replica).copied()
49    }
50
51    /// Return the minimum ack HLC across all tracked replicas.
52    ///
53    /// Returns `None` if no replicas are tracked (GC must not proceed without
54    /// knowing every peer's progress).
55    pub fn min_ack_hlc(&self) -> Option<Hlc> {
56        self.acks.values().copied().min()
57    }
58
59    /// Iterate over all replica IDs currently tracked.
60    pub fn replicas(&self) -> impl Iterator<Item = ReplicaId> + '_ {
61        self.acks.keys().copied()
62    }
63}
64
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::replica_id::ReplicaId;

    /// Build an [`Hlc`] from `ms`, with the remaining `Hlc::new` arguments
    /// fixed at `0` and replica 0 so that test values differ only by `ms`.
    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, ReplicaId::new(0)).expect("test HLC must be constructible")
    }

    /// Shorthand for constructing a [`ReplicaId`].
    fn r(id: u64) -> ReplicaId {
        ReplicaId::new(id)
    }

    #[test]
    fn record_is_monotonic() {
        let mut av = AckVector::new();
        av.record(r(1), hlc(100));
        av.record(r(1), hlc(50)); // lower — must be ignored
        assert_eq!(av.ack_for(r(1)), Some(hlc(100)));

        av.record(r(1), hlc(200)); // higher — must advance
        assert_eq!(av.ack_for(r(1)), Some(hlc(200)));
    }

    #[test]
    fn record_zero_tracks_replica() {
        // A first ack at ZERO must still register the peer, otherwise the
        // minimum would be computed without it.
        let mut av = AckVector::new();
        av.record(r(5), Hlc::ZERO);
        assert_eq!(av.ack_for(r(5)), Some(Hlc::ZERO));
        assert_eq!(av.min_ack_hlc(), Some(Hlc::ZERO));
    }

    #[test]
    fn min_returns_lowest() {
        let mut av = AckVector::new();
        av.record(r(1), hlc(300));
        av.record(r(2), hlc(100));
        av.record(r(3), hlc(200));
        assert_eq!(av.min_ack_hlc(), Some(hlc(100)));
    }

    #[test]
    fn empty_returns_none() {
        let av = AckVector::new();
        assert_eq!(av.min_ack_hlc(), None);
    }

    #[test]
    fn single_peer() {
        let mut av = AckVector::new();
        av.record(r(7), hlc(999));
        assert_eq!(av.min_ack_hlc(), Some(hlc(999)));
        assert_eq!(av.ack_for(r(7)), Some(hlc(999)));
        assert_eq!(av.ack_for(r(8)), None);
    }

    #[test]
    fn replicas_lists_every_tracked_peer() {
        let mut av = AckVector::new();
        av.record(r(1), hlc(10));
        av.record(r(2), hlc(20));
        av.record(r(1), hlc(30)); // repeat ack must not duplicate the peer

        // Iteration order is unspecified, so compare as sets.
        let seen: HashSet<ReplicaId> = av.replicas().collect();
        let expected: HashSet<ReplicaId> = [r(1), r(2)].iter().copied().collect();
        assert_eq!(seen, expected);
        assert_eq!(av.replicas().count(), 2);
    }

    #[test]
    fn serialize_roundtrip() {
        let mut av = AckVector::new();
        av.record(r(1), hlc(10));
        av.record(r(2), hlc(20));
        let bytes = zerompk::to_msgpack_vec(&av).expect("serialize");
        let back: AckVector = zerompk::from_msgpack(&bytes).expect("deserialize");
        assert_eq!(av.min_ack_hlc(), back.min_ack_hlc());
        assert_eq!(av.ack_for(r(1)), back.ack_for(r(1)));
        assert_eq!(av.ack_for(r(2)), back.ack_for(r(2)));
    }
}