// nodedb_cluster/follower_read.rs

// SPDX-License-Identifier: BUSL-1.1

//! Follower-read decision gate.
//!
//! [`FollowerReadGate`] answers a single question: "given the
//! session's `ReadConsistency` and the local node's role + closed
//! timestamp for the target Raft group, can this read be served
//! locally without forwarding to the leader?"
//!
//! ## Decision table
//!
//! `d` is the staleness bound carried by `BoundedStaleness(d)`; `*` means
//! the column is not consulted for that row.
//!
//! | Consistency           | Local role  | Closed-TS age    | Serve locally? |
//! |-----------------------|-------------|------------------|----------------|
//! | Strong                | Leader      | *                | Yes            |
//! | Strong                | Follower    | *                | No → forward   |
//! | BoundedStaleness(d)   | Leader      | *                | Yes            |
//! | BoundedStaleness(d)   | Follower    | ≤ d              | Yes            |
//! | BoundedStaleness(d)   | Follower    | > d              | No → forward   |
//! | BoundedStaleness(d)   | Follower    | none recorded    | No → forward   |
//! | Eventual              | *           | *                | Yes            |
//!
//! The gate is stateless — it reads from shared handles to the
//! closed-timestamp tracker and the raft-status provider.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use crate::closed_timestamp::ClosedTimestampTracker;
27
/// Consistency level for a single read — mirrors the `ReadConsistency`
/// enum in the `nodedb` crate without coupling `nodedb-cluster` to it.
///
/// The variants are interpreted by `FollowerReadGate::can_serve_locally`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadLevel {
    /// Served locally only when this node is the leader of the target group.
    Strong,
    /// Served locally by the leader, or by a non-leader whose closed
    /// timestamp for the group is fresh enough for the given bound.
    BoundedStaleness(Duration),
    /// Always served locally; neither role nor freshness is consulted.
    Eventual,
}
36
37/// Answers "can this read be served locally?"
38pub struct FollowerReadGate {
39    closed_ts: Arc<ClosedTimestampTracker>,
40    /// Type-erased function that returns true if this node is the
41    /// leader for the given group. Injection seam — production wraps
42    /// `MultiRaft::group_statuses`, tests supply a closure.
43    is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
44}
45
46impl FollowerReadGate {
47    pub fn new(
48        closed_ts: Arc<ClosedTimestampTracker>,
49        is_leader_fn: Box<dyn Fn(u64) -> bool + Send + Sync>,
50    ) -> Self {
51        Self {
52            closed_ts,
53            is_leader_fn,
54        }
55    }
56
57    /// Returns `true` if the read can be served from this node's
58    /// local replica without forwarding to the leader.
59    pub fn can_serve_locally(&self, group_id: u64, level: ReadLevel) -> bool {
60        match level {
61            ReadLevel::Strong => (self.is_leader_fn)(group_id),
62            ReadLevel::Eventual => true,
63            ReadLevel::BoundedStaleness(max) => {
64                if (self.is_leader_fn)(group_id) {
65                    return true;
66                }
67                self.closed_ts.is_fresh_enough(group_id, max)
68            }
69        }
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    /// Staleness bound shared by every `BoundedStaleness` case below.
    const BOUND: Duration = Duration::from_secs(5);

    /// Gate with a fresh, empty tracker; this node leads `leader_groups`.
    fn gate(leader_groups: &'static [u64]) -> FollowerReadGate {
        gate_with_tracker(leader_groups, Arc::new(ClosedTimestampTracker::new()))
    }

    /// Gate over a caller-supplied tracker; this node leads `leader_groups`.
    fn gate_with_tracker(
        leader_groups: &'static [u64],
        tracker: Arc<ClosedTimestampTracker>,
    ) -> FollowerReadGate {
        FollowerReadGate::new(tracker, Box::new(move |gid| leader_groups.contains(&gid)))
    }

    #[test]
    fn strong_requires_leader() {
        let g = gate(&[1]);
        assert!(g.can_serve_locally(1, ReadLevel::Strong));
        assert!(!g.can_serve_locally(2, ReadLevel::Strong));
    }

    #[test]
    fn eventual_always_local() {
        let g = gate(&[]);
        assert!(g.can_serve_locally(99, ReadLevel::Eventual));
    }

    #[test]
    fn bounded_staleness_leader_always_local() {
        let g = gate(&[1]);
        assert!(g.can_serve_locally(1, ReadLevel::BoundedStaleness(BOUND)));
    }

    #[test]
    fn bounded_staleness_follower_fresh_enough() {
        let tracker = Arc::new(ClosedTimestampTracker::new());
        tracker.mark_applied(2);
        let g = gate_with_tracker(&[], tracker);
        assert!(g.can_serve_locally(2, ReadLevel::BoundedStaleness(BOUND)));
    }

    #[test]
    fn bounded_staleness_follower_too_stale() {
        let tracker = Arc::new(ClosedTimestampTracker::new());
        // `Instant - Duration` panics with an opaque message on underflow;
        // `checked_sub` lets the failure name its cause.
        let old = std::time::Instant::now()
            .checked_sub(Duration::from_secs(30))
            .expect("monotonic clock should be at least 30s past its origin");
        tracker.mark_applied_at(2, old);
        let g = gate_with_tracker(&[], tracker);
        assert!(!g.can_serve_locally(2, ReadLevel::BoundedStaleness(BOUND)));
    }

    #[test]
    fn bounded_staleness_unknown_group_not_local() {
        let g = gate(&[]);
        assert!(!g.can_serve_locally(99, ReadLevel::BoundedStaleness(BOUND)));
    }
}