nodedb_cluster/closed_timestamp.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Per-group closed-timestamp tracker with HLC skew bounding.
4//!
5//! Every time a Raft group applies a committed entry, the applier
//! records the current monotonic `Instant` as that group's "closed timestamp".
7//! A follower whose closed timestamp for a group is within the
8//! caller's staleness bound can serve reads locally — no gateway hop
9//! to the leader.
10//!
11//! ## HLC integration
12//!
13//! The tracker also owns the node-wide [`HlcClock`]. When an apply
14//! path knows the leader-stamped `Hlc` for the entry it is applying,
15//! it should call [`ClosedTimestampTracker::fold_remote_hlc`] instead
16//! of [`ClosedTimestampTracker::mark_applied`]. Folding the remote
17//! HLC into the local clock bounds cross-node `_ts_system` skew at
18//! this node: any subsequent local stamp is strictly greater than
19//! every observed remote HLC, so versions written here can never
20//! collide with — or appear earlier than — versions a leader has
21//! already replicated.
22//!
23//! Apply-side wiring is intentionally optional. Code paths that don't
24//! yet thread the leader's HLC keep using `mark_applied` and only
25//! lose the cross-node skew bound; correctness of the local
26//! `_ts_system` stamp is unaffected because [`HlcClock::now`] already
27//! advances past the local wall clock.
28
29use std::collections::HashMap;
30use std::sync::{Arc, RwLock};
31use std::time::{Duration, Instant};
32
33use nodedb_types::{Hlc, HlcClock};
34
35/// Tracks the most recent apply instant per Raft group plus the
36/// shared node-wide HLC.
37pub struct ClosedTimestampTracker {
38 groups: RwLock<HashMap<u64, Instant>>,
39 hlc: Arc<HlcClock>,
40}
41
42impl ClosedTimestampTracker {
43 /// Construct a tracker with a fresh, node-private HLC. Tests and
44 /// stand-alone follower-read setups use this; production paths
45 /// should call [`Self::with_hlc`] to share the node-wide clock.
46 pub fn new() -> Self {
47 Self {
48 groups: RwLock::new(HashMap::new()),
49 hlc: Arc::new(HlcClock::new()),
50 }
51 }
52
53 /// Construct a tracker wired to a caller-supplied HLC. Use this
54 /// in production so the tracker's `fold_remote_hlc` advances the
55 /// same clock that other subsystems read via `now()`.
56 pub fn with_hlc(hlc: Arc<HlcClock>) -> Self {
57 Self {
58 groups: RwLock::new(HashMap::new()),
59 hlc,
60 }
61 }
62
63 /// Read access to the shared HLC. Other apply-side subsystems
64 /// (descriptor leases, metadata cache) advance and read it
65 /// through this handle.
66 pub fn hlc(&self) -> &Arc<HlcClock> {
67 &self.hlc
68 }
69
70 /// Record that `group_id` just applied one or more entries.
71 /// Called by the raft-loop applier after each apply batch.
72 pub fn mark_applied(&self, group_id: u64) {
73 let mut g = self.groups.write().unwrap_or_else(|p| p.into_inner());
74 g.insert(group_id, Instant::now());
75 }
76
77 /// Record that `group_id` just applied, using a caller-supplied
78 /// instant. Exposed for deterministic testing with paused time.
79 pub fn mark_applied_at(&self, group_id: u64, at: Instant) {
80 let mut g = self.groups.write().unwrap_or_else(|p| p.into_inner());
81 g.insert(group_id, at);
82 }
83
84 /// Mark a group applied AND fold the leader-stamped `remote` HLC
85 /// into the local clock. Returns the merged HLC that any local
86 /// stamp emitted after this call is guaranteed to be strictly
87 /// greater than.
88 ///
89 /// This is the production apply-path entry point: every committed
90 /// entry that carries a leader HLC (descriptor leases, catalog
91 /// DDL, drain events) should route through here so cross-node
92 /// `_ts_system` skew is bounded at this node.
93 pub fn fold_remote_hlc(&self, group_id: u64, remote: Hlc) -> Hlc {
94 self.mark_applied(group_id);
95 self.hlc.update(remote)
96 }
97
98 /// Check whether this node's replica of `group_id` has applied
99 /// recently enough that a read with `max_staleness` can be
100 /// served locally.
101 ///
102 /// Returns `false` if the group has never applied on this node
103 /// (no closed timestamp recorded).
104 pub fn is_fresh_enough(&self, group_id: u64, max_staleness: Duration) -> bool {
105 let g = self.groups.read().unwrap_or_else(|p| p.into_inner());
106 match g.get(&group_id) {
107 Some(last) => last.elapsed() <= max_staleness,
108 None => false,
109 }
110 }
111
112 /// Return the age of the closed timestamp for a group, or `None`
113 /// if the group has never applied on this node. Useful for
114 /// observability (metrics, SHOW commands).
115 pub fn staleness(&self, group_id: u64) -> Option<Duration> {
116 let g = self.groups.read().unwrap_or_else(|p| p.into_inner());
117 g.get(&group_id).map(|last| last.elapsed())
118 }
119}
120
121impl Default for ClosedTimestampTracker {
122 fn default() -> Self {
123 Self::new()
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Staleness bound used by most of the freshness checks below.
    const FIVE_SECS: Duration = Duration::from_secs(5);

    /// An `Instant` lying `secs` seconds in the past.
    fn seconds_ago(secs: u64) -> Instant {
        Instant::now() - Duration::from_secs(secs)
    }

    #[test]
    fn unknown_group_is_not_fresh() {
        let tracker = ClosedTimestampTracker::new();
        let fresh = tracker.is_fresh_enough(99, Duration::from_secs(10));
        assert!(!fresh);
    }

    #[test]
    fn recently_applied_is_fresh() {
        let tracker = ClosedTimestampTracker::new();
        tracker.mark_applied(1);
        assert!(tracker.is_fresh_enough(1, FIVE_SECS));
    }

    #[test]
    fn stale_group_is_not_fresh() {
        let tracker = ClosedTimestampTracker::new();
        tracker.mark_applied_at(1, seconds_ago(30));
        assert!(!tracker.is_fresh_enough(1, FIVE_SECS));
    }

    #[test]
    fn staleness_returns_none_for_unknown() {
        let tracker = ClosedTimestampTracker::new();
        assert_eq!(tracker.staleness(42), None);
    }

    #[test]
    fn staleness_returns_age_for_known() {
        let tracker = ClosedTimestampTracker::new();
        tracker.mark_applied(1);
        let age = tracker.staleness(1).unwrap();
        assert!(age < Duration::from_millis(100));
    }

    #[test]
    fn mark_applied_updates_monotonically() {
        let tracker = ClosedTimestampTracker::new();

        // A ten-second-old apply is outside the five-second bound...
        tracker.mark_applied_at(1, seconds_ago(10));
        assert!(!tracker.is_fresh_enough(1, FIVE_SECS));

        // ...and a fresh apply replaces it.
        tracker.mark_applied(1);
        assert!(tracker.is_fresh_enough(1, FIVE_SECS));
    }

    #[test]
    fn fold_remote_hlc_bounds_cross_node_skew() {
        // A fresh local clock stamps near the current wall-clock time. The
        // leader's entry is stamped a full minute ahead. Folding it MUST
        // push the local clock past the leader's stamp, so that no later
        // local stamp can collide with or precede it.
        let tracker = ClosedTimestampTracker::new();
        let clock = tracker.hlc();

        let local_before = clock.now();
        let remote = Hlc::new(local_before.wall_ns + 60_000_000_000, 7); // +60s
        let merged = tracker.fold_remote_hlc(1, remote);

        assert!(merged > remote, "merged HLC strictly greater than remote");
        assert!(
            merged > local_before,
            "merged HLC strictly greater than prior local"
        );
        assert!(tracker.is_fresh_enough(1, FIVE_SECS));

        // The bound persists: every later local stamp dominates the fold.
        let after = clock.now();
        assert!(
            after > merged,
            "subsequent local stamp dominates folded remote"
        );
    }

    #[test]
    fn fold_remote_hlc_idempotent_under_replay() {
        // Re-applying an identical leader HLC must never move the clock
        // backwards. Each fold still yields a strictly newer stamp.
        let tracker = ClosedTimestampTracker::new();
        let remote = Hlc::new(1_000_000_000_000, 0);

        let first = tracker.fold_remote_hlc(1, remote);
        let second = tracker.fold_remote_hlc(1, remote);

        assert!(
            second > first,
            "replay still advances logical counter, never regresses"
        );
    }

    #[test]
    fn with_hlc_shares_clock_across_subsystems() {
        // Production wiring: ClosedTimestampTracker, MetadataCache and the
        // descriptor-lease applier all hold one Arc<HlcClock>. A fold made
        // through any holder is therefore visible through every other.
        let shared = Arc::new(HlcClock::new());
        let folder = ClosedTimestampTracker::with_hlc(Arc::clone(&shared));
        let observer = ClosedTimestampTracker::with_hlc(Arc::clone(&shared));

        let merged = folder.fold_remote_hlc(1, Hlc::new(2_000_000_000_000, 5));

        // `observer` never saw the remote stamp itself. It is ahead of
        // `merged` only because the underlying clock is shared.
        let observed = observer.hlc().now();
        assert!(observed > merged);
    }
}