// nodedb_cluster/closed_timestamp.rs
// SPDX-License-Identifier: BUSL-1.1

//! Per-group closed-timestamp tracker with HLC skew bounding.
//!
//! Every time a Raft group applies a committed entry, the applier
//! records the wall-clock instant as that group's "closed timestamp".
//! A follower whose closed timestamp for a group is within the
//! caller's staleness bound can serve reads locally — no gateway hop
//! to the leader.
//!
//! ## HLC integration
//!
//! The tracker also owns the node-wide [`HlcClock`]. When an apply
//! path knows the leader-stamped `Hlc` for the entry it is applying,
//! it should call [`ClosedTimestampTracker::fold_remote_hlc`] instead
//! of [`ClosedTimestampTracker::mark_applied`]. Folding the remote
//! HLC into the local clock bounds cross-node `_ts_system` skew at
//! this node: any subsequent local stamp is strictly greater than
//! every observed remote HLC, so versions written here can never
//! collide with — or appear earlier than — versions a leader has
//! already replicated.
//!
//! Apply-side wiring is intentionally optional. Code paths that don't
//! yet thread the leader's HLC keep using `mark_applied` and only
//! lose the cross-node skew bound; correctness of the local
//! `_ts_system` stamp is unaffected because [`HlcClock::now`] already
//! advances past the local wall clock.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use nodedb_types::{Hlc, HlcClock};

35/// Tracks the most recent apply instant per Raft group plus the
36/// shared node-wide HLC.
37pub struct ClosedTimestampTracker {
38    groups: RwLock<HashMap<u64, Instant>>,
39    hlc: Arc<HlcClock>,
40}
42impl ClosedTimestampTracker {
43    /// Construct a tracker with a fresh, node-private HLC. Tests and
44    /// stand-alone follower-read setups use this; production paths
45    /// should call [`Self::with_hlc`] to share the node-wide clock.
46    pub fn new() -> Self {
47        Self {
48            groups: RwLock::new(HashMap::new()),
49            hlc: Arc::new(HlcClock::new()),
50        }
51    }
52
53    /// Construct a tracker wired to a caller-supplied HLC. Use this
54    /// in production so the tracker's `fold_remote_hlc` advances the
55    /// same clock that other subsystems read via `now()`.
56    pub fn with_hlc(hlc: Arc<HlcClock>) -> Self {
57        Self {
58            groups: RwLock::new(HashMap::new()),
59            hlc,
60        }
61    }
62
63    /// Read access to the shared HLC. Other apply-side subsystems
64    /// (descriptor leases, metadata cache) advance and read it
65    /// through this handle.
66    pub fn hlc(&self) -> &Arc<HlcClock> {
67        &self.hlc
68    }
69
70    /// Record that `group_id` just applied one or more entries.
71    /// Called by the raft-loop applier after each apply batch.
72    pub fn mark_applied(&self, group_id: u64) {
73        let mut g = self.groups.write().unwrap_or_else(|p| p.into_inner());
74        g.insert(group_id, Instant::now());
75    }
76
77    /// Record that `group_id` just applied, using a caller-supplied
78    /// instant. Exposed for deterministic testing with paused time.
79    pub fn mark_applied_at(&self, group_id: u64, at: Instant) {
80        let mut g = self.groups.write().unwrap_or_else(|p| p.into_inner());
81        g.insert(group_id, at);
82    }
83
84    /// Mark a group applied AND fold the leader-stamped `remote` HLC
85    /// into the local clock. Returns the merged HLC that any local
86    /// stamp emitted after this call is guaranteed to be strictly
87    /// greater than.
88    ///
89    /// This is the production apply-path entry point: every committed
90    /// entry that carries a leader HLC (descriptor leases, catalog
91    /// DDL, drain events) should route through here so cross-node
92    /// `_ts_system` skew is bounded at this node.
93    pub fn fold_remote_hlc(&self, group_id: u64, remote: Hlc) -> Hlc {
94        self.mark_applied(group_id);
95        self.hlc.update(remote)
96    }
97
98    /// Check whether this node's replica of `group_id` has applied
99    /// recently enough that a read with `max_staleness` can be
100    /// served locally.
101    ///
102    /// Returns `false` if the group has never applied on this node
103    /// (no closed timestamp recorded).
104    pub fn is_fresh_enough(&self, group_id: u64, max_staleness: Duration) -> bool {
105        let g = self.groups.read().unwrap_or_else(|p| p.into_inner());
106        match g.get(&group_id) {
107            Some(last) => last.elapsed() <= max_staleness,
108            None => false,
109        }
110    }
111
112    /// Return the age of the closed timestamp for a group, or `None`
113    /// if the group has never applied on this node. Useful for
114    /// observability (metrics, SHOW commands).
115    pub fn staleness(&self, group_id: u64) -> Option<Duration> {
116        let g = self.groups.read().unwrap_or_else(|p| p.into_inner());
117        g.get(&group_id).map(|last| last.elapsed())
118    }
119}
121impl Default for ClosedTimestampTracker {
122    fn default() -> Self {
123        Self::new()
124    }
125}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn unknown_group_is_not_fresh() {
        // A group with no recorded apply must never serve a follower read.
        let t = ClosedTimestampTracker::new();
        assert!(!t.is_fresh_enough(99, Duration::from_secs(10)));
    }

    #[test]
    fn recently_applied_is_fresh() {
        let t = ClosedTimestampTracker::new();
        t.mark_applied(1);
        assert!(t.is_fresh_enough(1, Duration::from_secs(5)));
    }

    #[test]
    fn stale_group_is_not_fresh() {
        // An apply recorded 30s ago fails a 5s staleness bound.
        let t = ClosedTimestampTracker::new();
        t.mark_applied_at(1, Instant::now() - Duration::from_secs(30));
        assert!(!t.is_fresh_enough(1, Duration::from_secs(5)));
    }

    #[test]
    fn staleness_returns_none_for_unknown() {
        let t = ClosedTimestampTracker::new();
        assert!(t.staleness(42).is_none());
    }

    #[test]
    fn staleness_returns_age_for_known() {
        let t = ClosedTimestampTracker::new();
        t.mark_applied(1);
        let age = t.staleness(1).unwrap();
        assert!(age < Duration::from_millis(100));
    }

    #[test]
    fn mark_applied_updates_monotonically() {
        // A fresh apply supersedes an old recorded instant.
        let t = ClosedTimestampTracker::new();
        t.mark_applied_at(1, Instant::now() - Duration::from_secs(10));
        assert!(!t.is_fresh_enough(1, Duration::from_secs(5)));
        t.mark_applied(1);
        assert!(t.is_fresh_enough(1, Duration::from_secs(5)));
    }

    #[test]
    fn fold_remote_hlc_bounds_cross_node_skew() {
        // Local clock is fresh — its first `now()` will sit near
        // current wall-clock. A leader far in the future stamps an
        // entry; folding it MUST advance the local clock past it so
        // any subsequent local stamp can never collide with or
        // precede the leader's observation.
        let t = ClosedTimestampTracker::new();
        let before = t.hlc().now();
        let remote = Hlc::new(before.wall_ns + 60_000_000_000, 7); // +60s
        let merged = t.fold_remote_hlc(1, remote);

        assert!(merged > remote, "merged HLC strictly greater than remote");
        assert!(merged > before, "merged HLC strictly greater than prior local");
        assert!(t.is_fresh_enough(1, Duration::from_secs(5)));

        // Every later local `now()` dominates the folded observation —
        // the skew bound holds for all subsequent stamps.
        let after = t.hlc().now();
        assert!(after > merged, "subsequent local stamp dominates folded remote");
    }

    #[test]
    fn fold_remote_hlc_idempotent_under_replay() {
        // Replaying the same remote HLC must not regress the clock.
        let t = ClosedTimestampTracker::new();
        let remote = Hlc::new(1_000_000_000_000, 0);
        let first = t.fold_remote_hlc(1, remote);
        let second = t.fold_remote_hlc(1, remote);
        assert!(
            second > first,
            "replay still advances logical counter, never regresses"
        );
    }

    #[test]
    fn with_hlc_shares_clock_across_subsystems() {
        // Two trackers sharing one HlcClock observe each other's
        // remote folds. This is the production wiring shape:
        // ClosedTimestampTracker + MetadataCache + descriptor lease
        // applier all hold the same Arc<HlcClock>.
        let shared = Arc::new(HlcClock::new());
        let t1 = ClosedTimestampTracker::with_hlc(Arc::clone(&shared));
        let t2 = ClosedTimestampTracker::with_hlc(Arc::clone(&shared));

        let merged = t1.fold_remote_hlc(1, Hlc::new(2_000_000_000_000, 5));
        // t2's clock has already advanced past the remote stamp
        // because the Arc is shared.
        assert!(t2.hlc().now() > merged);
    }
}