nodedb_cluster/
follower_read.rs1use std::sync::Arc;
24use std::time::Duration;
25
26use crate::closed_timestamp::ClosedTimestampTracker;
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub enum ReadLevel {
32 Strong,
33 BoundedStaleness(Duration),
34 Eventual,
35}
36
37pub struct FollowerReadGate {
39 closed_ts: Arc<ClosedTimestampTracker>,
40 is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
44}
45
46impl FollowerReadGate {
47 pub fn new(
48 closed_ts: Arc<ClosedTimestampTracker>,
49 is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
50 ) -> Self {
51 Self {
52 closed_ts,
53 is_leader_fn,
54 }
55 }
56
57 pub fn can_serve_locally(&self, group_id: u64, level: ReadLevel) -> bool {
60 match level {
61 ReadLevel::Strong => (self.is_leader_fn)(group_id),
62 ReadLevel::Eventual => true,
63 ReadLevel::BoundedStaleness(max) => {
64 if (self.is_leader_fn)(group_id) {
65 return true;
66 }
67 self.closed_ts.is_fresh_enough(group_id, max)
68 }
69 }
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76
77 fn gate(leader_groups: &'static [u64]) -> FollowerReadGate {
78 FollowerReadGate::new(
79 Arc::new(ClosedTimestampTracker::new()),
80 Box::new(move |gid| leader_groups.contains(&gid)),
81 )
82 }
83
84 fn gate_with_tracker(
85 leader_groups: &'static [u64],
86 tracker: Arc<ClosedTimestampTracker>,
87 ) -> FollowerReadGate {
88 FollowerReadGate::new(tracker, Box::new(move |gid| leader_groups.contains(&gid)))
89 }
90
91 #[test]
92 fn strong_requires_leader() {
93 let g = gate(&[1]);
94 assert!(g.can_serve_locally(1, ReadLevel::Strong));
95 assert!(!g.can_serve_locally(2, ReadLevel::Strong));
96 }
97
98 #[test]
99 fn eventual_always_local() {
100 let g = gate(&[]);
101 assert!(g.can_serve_locally(99, ReadLevel::Eventual));
102 }
103
104 #[test]
105 fn bounded_staleness_leader_always_local() {
106 let g = gate(&[1]);
107 assert!(g.can_serve_locally(1, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
108 }
109
110 #[test]
111 fn bounded_staleness_follower_fresh_enough() {
112 let tracker = Arc::new(ClosedTimestampTracker::new());
113 tracker.mark_applied(2);
114 let g = gate_with_tracker(&[], tracker);
115 assert!(g.can_serve_locally(2, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
116 }
117
118 #[test]
119 fn bounded_staleness_follower_too_stale() {
120 let tracker = Arc::new(ClosedTimestampTracker::new());
121 let old = std::time::Instant::now() - Duration::from_secs(30);
122 tracker.mark_applied_at(2, old);
123 let g = gate_with_tracker(&[], tracker);
124 assert!(!g.can_serve_locally(2, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
125 }
126
127 #[test]
128 fn bounded_staleness_unknown_group_not_local() {
129 let g = gate(&[]);
130 assert!(!g.can_serve_locally(99, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
131 }
132}