Skip to main content

nodedb_cluster/
closed_timestamp.rs

//! Per-group closed-timestamp tracker.
//!
//! Every time a Raft group applies a committed entry, the applier
//! records the current monotonic-clock instant (`std::time::Instant`,
//! not wall-clock time) as that group's "closed timestamp".
//! A follower whose closed timestamp for a group is within the
//! caller's staleness bound can serve reads locally — no gateway hop
//! to the leader.
//!
//! The tracker is intentionally simple: one `Instant` per group,
//! updated monotonically. There is no HLC or cross-node coordination
//! here — the closed timestamp is local to this node. Safety comes
//! from the fact that a follower's applied index can only advance
//! (Raft guarantees), so a read served at a given closed timestamp
//! sees a consistent prefix of the log.
15
16use std::collections::HashMap;
17use std::sync::RwLock;
18use std::time::{Duration, Instant};
19
/// Tracks the most recent apply instant per Raft group.
///
/// All methods take `&self`; the map is guarded by an [`RwLock`]. A
/// poisoned lock is recovered (via `into_inner`) instead of being
/// propagated as a panic.
pub struct ClosedTimestampTracker {
    /// Latest recorded apply instant, keyed by Raft group id.
    groups: RwLock<HashMap<u64, Instant>>,
}

impl ClosedTimestampTracker {
    /// Create an empty tracker with no groups recorded.
    pub fn new() -> Self {
        Self {
            groups: RwLock::new(HashMap::new()),
        }
    }

    /// Record that `group_id` just applied one or more entries.
    /// Called by the raft-loop applier after each apply batch.
    ///
    /// The recorded instant never moves backwards: if a later instant
    /// is already stored for the group, it is kept.
    pub fn mark_applied(&self, group_id: u64) {
        let mut groups = self.groups.write().unwrap_or_else(|p| p.into_inner());
        // The clock is sampled while the write lock is held, so
        // concurrent callers record their instants in lock order.
        Self::advance(&mut groups, group_id, Instant::now());
    }

    /// Record that `group_id` just applied, using a caller-supplied
    /// instant. Exposed for deterministic testing with paused time.
    ///
    /// An `at` that is older than the instant already recorded for the
    /// group is ignored, so the closed timestamp never regresses.
    pub fn mark_applied_at(&self, group_id: u64, at: Instant) {
        let mut groups = self.groups.write().unwrap_or_else(|p| p.into_inner());
        Self::advance(&mut groups, group_id, at);
    }

    /// Check whether this node's replica of `group_id` has applied
    /// recently enough that a read with `max_staleness` can be
    /// served locally.
    ///
    /// Returns `false` if the group has never applied on this node
    /// (no closed timestamp recorded).
    pub fn is_fresh_enough(&self, group_id: u64, max_staleness: Duration) -> bool {
        match self.staleness(group_id) {
            Some(age) => age <= max_staleness,
            None => false,
        }
    }

    /// Return the age of the closed timestamp for a group, or `None`
    /// if the group has never applied on this node. Useful for
    /// observability (metrics, SHOW commands).
    pub fn staleness(&self, group_id: u64) -> Option<Duration> {
        let groups = self.groups.read().unwrap_or_else(|p| p.into_inner());
        groups.get(&group_id).map(Instant::elapsed)
    }

    /// Store `at` for `group_id` unless a later instant is already
    /// recorded; this is what keeps the per-group timestamp monotonic.
    fn advance(groups: &mut HashMap<u64, Instant>, group_id: u64, at: Instant) {
        groups
            .entry(group_id)
            .and_modify(|current| {
                if at > *current {
                    *current = at;
                }
            })
            .or_insert(at);
    }
}
68
69impl Default for ClosedTimestampTracker {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// A group that was never marked has no closed timestamp and is
    /// therefore never fresh, whatever the staleness bound.
    #[test]
    fn unknown_group_is_not_fresh() {
        let ct = ClosedTimestampTracker::new();
        let fresh = ct.is_fresh_enough(99, Duration::from_secs(10));
        assert!(!fresh);
    }

    /// Marking a group "now" makes it fresh for a generous bound.
    #[test]
    fn recently_applied_is_fresh() {
        let ct = ClosedTimestampTracker::new();
        ct.mark_applied(1);
        let fresh = ct.is_fresh_enough(1, Duration::from_secs(5));
        assert!(fresh);
    }

    /// A timestamp 30s in the past exceeds a 5s staleness bound.
    #[test]
    fn stale_group_is_not_fresh() {
        let ct = ClosedTimestampTracker::new();
        let thirty_secs_ago = Instant::now() - Duration::from_secs(30);
        ct.mark_applied_at(1, thirty_secs_ago);
        let fresh = ct.is_fresh_enough(1, Duration::from_secs(5));
        assert!(!fresh);
    }

    /// `staleness` reports `None` for a group that was never marked.
    #[test]
    fn staleness_returns_none_for_unknown() {
        let ct = ClosedTimestampTracker::new();
        let age = ct.staleness(42);
        assert!(age.is_none());
    }

    /// `staleness` reports a small age right after marking.
    #[test]
    fn staleness_returns_age_for_known() {
        let ct = ClosedTimestampTracker::new();
        ct.mark_applied(1);
        let age = ct.staleness(1).unwrap();
        assert!(age < Duration::from_millis(100));
    }

    /// A later `mark_applied` replaces an older recorded instant.
    #[test]
    fn mark_applied_updates_monotonically() {
        let bound = Duration::from_secs(5);
        let ct = ClosedTimestampTracker::new();

        let ten_secs_ago = Instant::now() - Duration::from_secs(10);
        ct.mark_applied_at(1, ten_secs_ago);
        assert!(!ct.is_fresh_enough(1, bound));

        ct.mark_applied(1);
        assert!(ct.is_fresh_enough(1, bound));
    }
}