Skip to main content

nodedb_cluster/
follower_read.rs

1//! Follower-read decision gate.
2//!
3//! [`FollowerReadGate`] answers a single question: "given the
4//! session's `ReadConsistency` and the local node's role + closed
5//! timestamp for the target Raft group, can this read be served
6//! locally without forwarding to the leader?"
7//!
8//! ## Decision table
9//!
10//! | Consistency           | Local role  | Closed TS fresh? | Serve locally? |
11//! |-----------------------|-------------|------------------|----------------|
12//! | Strong                | *           | *                | Only if leader |
13//! | BoundedStaleness(d)   | Follower    | ≤ d              | Yes            |
14//! | BoundedStaleness(d)   | Follower    | > d              | No → forward   |
15//! | BoundedStaleness(d)   | Leader      | *                | Yes            |
16//! | Eventual              | *           | *                | Yes            |
17//!
18//! The gate is stateless — it reads from shared handles to the
19//! closed-timestamp tracker and the raft-status provider.
20
21use std::sync::Arc;
22use std::time::Duration;
23
24use crate::closed_timestamp::ClosedTimestampTracker;
25
26/// Consistency level for a single read — mirrors the `ReadConsistency`
27/// enum in the `nodedb` crate without coupling `nodedb-cluster` to it.
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum ReadLevel {
30    Strong,
31    BoundedStaleness(Duration),
32    Eventual,
33}
34
35/// Answers "can this read be served locally?"
36pub struct FollowerReadGate {
37    closed_ts: Arc<ClosedTimestampTracker>,
38    /// Type-erased function that returns true if this node is the
39    /// leader for the given group. Injection seam — production wraps
40    /// `MultiRaft::group_statuses`, tests supply a closure.
41    is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
42}
43
44impl FollowerReadGate {
45    pub fn new(
46        closed_ts: Arc<ClosedTimestampTracker>,
47        is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
48    ) -> Self {
49        Self {
50            closed_ts,
51            is_leader_fn,
52        }
53    }
54
55    /// Returns `true` if the read can be served from this node's
56    /// local replica without forwarding to the leader.
57    pub fn can_serve_locally(&self, group_id: u64, level: ReadLevel) -> bool {
58        match level {
59            ReadLevel::Strong => (self.is_leader_fn)(group_id),
60            ReadLevel::Eventual => true,
61            ReadLevel::BoundedStaleness(max) => {
62                if (self.is_leader_fn)(group_id) {
63                    return true;
64                }
65                self.closed_ts.is_fresh_enough(group_id, max)
66            }
67        }
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use super::*;
74
75    fn gate(leader_groups: &'static [u64]) -> FollowerReadGate {
76        FollowerReadGate::new(
77            Arc::new(ClosedTimestampTracker::new()),
78            Box::new(move |gid| leader_groups.contains(&gid)),
79        )
80    }
81
82    fn gate_with_tracker(
83        leader_groups: &'static [u64],
84        tracker: Arc<ClosedTimestampTracker>,
85    ) -> FollowerReadGate {
86        FollowerReadGate::new(tracker, Box::new(move |gid| leader_groups.contains(&gid)))
87    }
88
89    #[test]
90    fn strong_requires_leader() {
91        let g = gate(&[1]);
92        assert!(g.can_serve_locally(1, ReadLevel::Strong));
93        assert!(!g.can_serve_locally(2, ReadLevel::Strong));
94    }
95
96    #[test]
97    fn eventual_always_local() {
98        let g = gate(&[]);
99        assert!(g.can_serve_locally(99, ReadLevel::Eventual));
100    }
101
102    #[test]
103    fn bounded_staleness_leader_always_local() {
104        let g = gate(&[1]);
105        assert!(g.can_serve_locally(1, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
106    }
107
108    #[test]
109    fn bounded_staleness_follower_fresh_enough() {
110        let tracker = Arc::new(ClosedTimestampTracker::new());
111        tracker.mark_applied(2);
112        let g = gate_with_tracker(&[], tracker);
113        assert!(g.can_serve_locally(2, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
114    }
115
116    #[test]
117    fn bounded_staleness_follower_too_stale() {
118        let tracker = Arc::new(ClosedTimestampTracker::new());
119        let old = std::time::Instant::now() - Duration::from_secs(30);
120        tracker.mark_applied_at(2, old);
121        let g = gate_with_tracker(&[], tracker);
122        assert!(!g.can_serve_locally(2, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
123    }
124
125    #[test]
126    fn bounded_staleness_unknown_group_not_local() {
127        let g = gate(&[]);
128        assert!(!g.can_serve_locally(99, ReadLevel::BoundedStaleness(Duration::from_secs(5))));
129    }
130}