// nodedb_cluster/routing_liveness.rs

// SPDX-License-Identifier: BUSL-1.1

//! Liveness-driven routing invalidation.
//!
//! [`RoutingLivenessHook`] is a [`MembershipSubscriber`] that clears
//! the leader hint for every Raft group whose leaseholder has just
//! been marked `Suspect`, `Dead`, or `Left` by the SWIM failure
//! detector. After the hook fires, the next query that consults the
//! routing table observes `leader == 0` (the "no leader known"
//! sentinel) and falls through to a fresh leader discovery via the
//! existing `NotLeader`-triggered election path. Clients see at most
//! one retry: the stale hint, the failed dispatch, and a refreshed
//! leader lookup.
//!
//! The hook is storage-agnostic: it holds `Arc<RwLock<RoutingTable>>`
//! and a resolver closure that maps the string-keyed SWIM `NodeId`
//! to the numeric `u64` id used throughout the rest of the cluster
//! crate. Wiring layers (start_cluster, tests) supply the resolver
//! appropriate to their topology source.
//!
//! The hook is intentionally sync and cheap — a single `RwLock::write`,
//! a linear scan over group_members, and `set_leader(gid, 0)` for
//! each affected group. No I/O, no spawning. That keeps it safe to
//! call directly from the detector run loop.
26use std::sync::{Arc, RwLock};
27
28use nodedb_types::NodeId;
29use tracing::debug;
30
31use crate::routing::RoutingTable;
32use crate::swim::MemberState;
33use crate::swim::subscriber::MembershipSubscriber;
34
/// Resolver mapping SWIM `NodeId` → numeric `u64` routing-table id.
///
/// Returns `None` for members SWIM knows about but the routing table
/// does not (placeholder `seed:<addr>` entries before the first real
/// probe, transient learners, etc.). Those are silently ignored by
/// [`RoutingLivenessHook`].
///
/// `Send + Sync` is required because the detector run loop may invoke
/// the hook from a different thread than the one that built it.
pub type NodeIdResolver = Arc<dyn Fn(&NodeId) -> Option<u64> + Send + Sync>;
41
/// Clears the leader hint for every group led by a node that SWIM
/// has marked Suspect/Dead/Left.
pub struct RoutingLivenessHook {
    /// Shared routing table whose leader hints are invalidated in place.
    routing: Arc<RwLock<RoutingTable>>,
    /// Maps the SWIM string-keyed `NodeId` to the numeric id the
    /// routing table stores; `None` means "unknown, skip".
    resolver: NodeIdResolver,
}
48
49impl RoutingLivenessHook {
50    pub fn new(routing: Arc<RwLock<RoutingTable>>, resolver: NodeIdResolver) -> Self {
51        Self { routing, resolver }
52    }
53}
54
55impl MembershipSubscriber for RoutingLivenessHook {
56    fn on_state_change(&self, node_id: &NodeId, _old: Option<MemberState>, new: MemberState) {
57        // Alive transitions are a no-op: the next query will refresh
58        // the leader hint naturally on NotLeader. We only invalidate
59        // when a leader has observably stopped being reachable.
60        if !matches!(
61            new,
62            MemberState::Suspect | MemberState::Dead | MemberState::Left
63        ) {
64            return;
65        }
66
67        let Some(numeric_id) = (self.resolver)(node_id) else {
68            // SWIM knows about a node the routing table doesn't — a
69            // seed placeholder, a learner mid-join, or a node that
70            // was never registered. Nothing to invalidate.
71            return;
72        };
73
74        let mut rt = self.routing.write().unwrap_or_else(|p| p.into_inner());
75        let affected: Vec<u64> = rt
76            .group_members()
77            .iter()
78            .filter(|(_, info)| info.leader == numeric_id)
79            .map(|(gid, _)| *gid)
80            .collect();
81        for gid in &affected {
82            rt.set_leader(*gid, 0);
83        }
84        if !affected.is_empty() {
85            debug!(
86                ?node_id,
87                ?new,
88                numeric_id,
89                groups_invalidated = affected.len(),
90                "routing liveness hook cleared leader hints"
91            );
92        }
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a routing table with `pairs.len()` groups where group
    /// `gid` has leader `leader`. Uses the uniform constructor to pick
    /// a membership, then overrides each leader explicitly.
    fn rt_with_leaders(pairs: &[(u64, u64)], rf: usize) -> Arc<RwLock<RoutingTable>> {
        let leaders: Vec<u64> = pairs.iter().map(|&(_, leader)| leader).collect();
        let mut table = RoutingTable::uniform(pairs.len() as u64, &leaders, rf);
        for &(gid, leader) in pairs {
            table.set_leader(gid, leader);
        }
        Arc::new(RwLock::new(table))
    }

    /// Resolver backed by a static (name, numeric id) table; names not
    /// in the table resolve to `None`, mirroring production behavior.
    fn resolver_for(map: &'static [(&'static str, u64)]) -> NodeIdResolver {
        Arc::new(move |nid: &NodeId| {
            map.iter()
                .find_map(|&(name, id)| (name == nid.as_str()).then_some(id))
        })
    }

    /// Shorthand for building a SWIM node id in fixtures.
    fn nid(s: &str) -> NodeId {
        NodeId::try_new(s).expect("test fixture")
    }

    #[test]
    fn dead_transition_clears_leader_for_owned_groups() {
        let rt = rt_with_leaders(&[(0, 1), (1, 2), (2, 1), (3, 3)], 1);
        let hook =
            RoutingLivenessHook::new(rt.clone(), resolver_for(&[("a", 1), ("b", 2), ("c", 3)]));

        hook.on_state_change(&nid("a"), Some(MemberState::Alive), MemberState::Dead);

        // Groups 0 and 2 were led by node 1 and must be cleared to the
        // "no leader known" sentinel; groups 1 and 3 are untouched.
        let guard = rt.read().unwrap();
        assert_eq!(guard.group_info(0).unwrap().leader, 0);
        assert_eq!(guard.group_info(1).unwrap().leader, 2);
        assert_eq!(guard.group_info(2).unwrap().leader, 0);
        assert_eq!(guard.group_info(3).unwrap().leader, 3);
    }

    #[test]
    fn suspect_transition_also_invalidates() {
        let rt = rt_with_leaders(&[(0, 7)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("x", 7)]));

        hook.on_state_change(&nid("x"), Some(MemberState::Alive), MemberState::Suspect);

        assert_eq!(rt.read().unwrap().group_info(0).unwrap().leader, 0);
    }

    #[test]
    fn alive_transition_is_noop() {
        let rt = rt_with_leaders(&[(0, 5)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("q", 5)]));

        hook.on_state_change(&nid("q"), None, MemberState::Alive);

        assert_eq!(rt.read().unwrap().group_info(0).unwrap().leader, 5);
    }

    #[test]
    fn unresolved_node_id_is_ignored() {
        let rt = rt_with_leaders(&[(0, 1)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("a", 1)]));

        // NodeId "seed:127.0.0.1:9000" is not in the resolver map, so
        // the resolver returns None and the leader stays untouched.
        hook.on_state_change(
            &nid("seed:127.0.0.1:9000"),
            Some(MemberState::Alive),
            MemberState::Dead,
        );

        assert_eq!(rt.read().unwrap().group_info(0).unwrap().leader, 1);
    }

    #[test]
    fn left_is_also_invalidating() {
        let rt = rt_with_leaders(&[(0, 2)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("b", 2)]));

        hook.on_state_change(&nid("b"), Some(MemberState::Alive), MemberState::Left);

        assert_eq!(rt.read().unwrap().group_info(0).unwrap().leader, 0);
    }
}