nodedb_cluster/
follower_read.rs1use std::sync::Arc;
22use std::time::Duration;
23
24use crate::closed_timestamp::ClosedTimestampTracker;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum ReadLevel {
30 Strong,
31 BoundedStaleness(Duration),
32 Eventual,
33}
34
35pub struct FollowerReadGate {
37 closed_ts: Arc<ClosedTimestampTracker>,
38 is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
42}
43
44impl FollowerReadGate {
45 pub fn new(
46 closed_ts: Arc<ClosedTimestampTracker>,
47 is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
48 ) -> Self {
49 Self {
50 closed_ts,
51 is_leader_fn,
52 }
53 }
54
55 pub fn can_serve_locally(&self, group_id: u64, level: ReadLevel) -> bool {
58 match level {
59 ReadLevel::Strong => (self.is_leader_fn)(group_id),
60 ReadLevel::Eventual => true,
61 ReadLevel::BoundedStaleness(max) => {
62 if (self.is_leader_fn)(group_id) {
63 return true;
64 }
65 self.closed_ts.is_fresh_enough(group_id, max)
66 }
67 }
68 }
69}
70
71#[cfg(test)]
72mod tests {
73 use super::*;
74
75 fn gate(leader_groups: &'static [u64]) -> FollowerReadGate {
76 FollowerReadGate::new(
77 Arc::new(ClosedTimestampTracker::new()),
78 Box::new(move |gid| leader_groups.contains(&gid)),
79 )
80 }
81
82 fn gate_with_tracker(
83 leader_groups: &'static [u64],
84 tracker: Arc<ClosedTimestampTracker>,
85 ) -> FollowerReadGate {
86 FollowerReadGate::new(tracker, Box::new(move |gid| leader_groups.contains(&gid)))
87 }
88
89 #[test]
90 fn strong_requires_leader() {
91 let g = gate(&[1]);
92 assert!(g.can_serve_locally(1, ReadLevel::Strong));
93 assert!(!g.can_serve_locally(2, ReadLevel::Strong));
94 }
95
96 #[test]
97 fn eventual_always_local() {
98 let g = gate(&[]);
99 assert!(g.can_serve_locally(99, ReadLevel::Eventual));
100 }
101
102 #[test]
103 fn bounded_staleness_leader_always_local() {
104 let g = gate(&[1]);
105 assert!(g.can_serve_locally(1, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
106 }
107
108 #[test]
109 fn bounded_staleness_follower_fresh_enough() {
110 let tracker = Arc::new(ClosedTimestampTracker::new());
111 tracker.mark_applied(2);
112 let g = gate_with_tracker(&[], tracker);
113 assert!(g.can_serve_locally(2, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
114 }
115
116 #[test]
117 fn bounded_staleness_follower_too_stale() {
118 let tracker = Arc::new(ClosedTimestampTracker::new());
119 let old = std::time::Instant::now() - Duration::from_secs(30);
120 tracker.mark_applied_at(2, old);
121 let g = gate_with_tracker(&[], tracker);
122 assert!(!g.can_serve_locally(2, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
123 }
124
125 #[test]
126 fn bounded_staleness_unknown_group_not_local() {
127 let g = gate(&[]);
128 assert!(!g.can_serve_locally(99, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
129 }
130}