nodedb_cluster/
routing_liveness.rs1use std::sync::{Arc, RwLock};
27
28use nodedb_types::NodeId;
29use tracing::debug;
30
31use crate::routing::RoutingTable;
32use crate::swim::MemberState;
33use crate::swim::subscriber::MembershipSubscriber;
34
/// Shared, thread-safe callback that maps a [`NodeId`] to a numeric `u64` id.
///
/// Returns `None` when the node has no known numeric id; [`RoutingLivenessHook`]
/// ignores membership events for such nodes.
pub type NodeIdResolver = Arc<dyn Fn(&NodeId) -> Option<u64> + Send + Sync>;
41
/// Membership subscriber that resets routing-table leader entries when a node
/// transitions to `Suspect`, `Dead`, or `Left`.
pub struct RoutingLivenessHook {
    /// Routing table shared through the `Arc`; the hook takes the write lock
    /// whenever it handles an invalidating membership event.
    routing: Arc<RwLock<RoutingTable>>,
    /// Translates the `NodeId` carried by a membership event into the numeric
    /// id that is compared against each group's recorded leader.
    resolver: NodeIdResolver,
}
48
49impl RoutingLivenessHook {
50 pub fn new(routing: Arc<RwLock<RoutingTable>>, resolver: NodeIdResolver) -> Self {
51 Self { routing, resolver }
52 }
53}
54
55impl MembershipSubscriber for RoutingLivenessHook {
56 fn on_state_change(&self, node_id: &NodeId, _old: Option<MemberState>, new: MemberState) {
57 if !matches!(
61 new,
62 MemberState::Suspect | MemberState::Dead | MemberState::Left
63 ) {
64 return;
65 }
66
67 let Some(numeric_id) = (self.resolver)(node_id) else {
68 return;
72 };
73
74 let mut rt = self.routing.write().unwrap_or_else(|p| p.into_inner());
75 let affected: Vec<u64> = rt
76 .group_members()
77 .iter()
78 .filter(|(_, info)| info.leader == numeric_id)
79 .map(|(gid, _)| *gid)
80 .collect();
81 for gid in &affected {
82 rt.set_leader(*gid, 0);
83 }
84 if !affected.is_empty() {
85 debug!(
86 ?node_id,
87 ?new,
88 numeric_id,
89 groups_invalidated = affected.len(),
90 "routing liveness hook cleared leader hints"
91 );
92 }
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a shared routing table from `(group_id, leader)` pairs.
    ///
    /// The leader ids double as the node list handed to
    /// `RoutingTable::uniform`; afterwards each group's leader is pinned
    /// explicitly to the value given in `pairs`.
    fn rt_with_leaders(pairs: &[(u64, u64)], rf: usize) -> Arc<RwLock<RoutingTable>> {
        let nodes: Vec<u64> = pairs.iter().map(|&(_, leader)| leader).collect();
        let group_count = pairs.len() as u64;
        let mut table = RoutingTable::uniform(group_count, &nodes, rf);
        for &(gid, leader) in pairs {
            table.set_leader(gid, leader);
        }
        Arc::new(RwLock::new(table))
    }

    /// Returns a resolver backed by a static `(name, numeric id)` table; names
    /// missing from the table resolve to `None`.
    fn resolver_for(map: &'static [(&'static str, u64)]) -> NodeIdResolver {
        Arc::new(move |nid: &NodeId| {
            let wanted = nid.as_str();
            map.iter()
                .find_map(|&(name, numeric)| (name == wanted).then_some(numeric))
        })
    }

    /// Delivers a single membership transition for the node called `name`.
    fn notify(
        hook: &RoutingLivenessHook,
        name: &'static str,
        old: Option<MemberState>,
        new: MemberState,
    ) {
        let node = NodeId::try_new(name).expect("test fixture");
        hook.on_state_change(&node, old, new);
    }

    #[test]
    fn dead_transition_clears_leader_for_owned_groups() {
        let rt = rt_with_leaders(&[(0, 1), (1, 2), (2, 1), (3, 3)], 1);
        let resolver = resolver_for(&[("a", 1), ("b", 2), ("c", 3)]);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver);

        notify(&hook, "a", Some(MemberState::Alive), MemberState::Dead);

        // Groups 0 and 2 were led by node 1 ("a"); the others keep their leader.
        let table = rt.read().unwrap();
        for (gid, expected) in [(0, 0), (1, 2), (2, 0), (3, 3)] {
            assert_eq!(table.group_info(gid).unwrap().leader, expected);
        }
    }

    #[test]
    fn suspect_transition_also_invalidates() {
        let rt = rt_with_leaders(&[(0, 7)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("x", 7)]));

        notify(&hook, "x", Some(MemberState::Alive), MemberState::Suspect);

        let table = rt.read().unwrap();
        assert_eq!(table.group_info(0).unwrap().leader, 0);
    }

    #[test]
    fn alive_transition_is_noop() {
        let rt = rt_with_leaders(&[(0, 5)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("q", 5)]));

        notify(&hook, "q", None, MemberState::Alive);

        let table = rt.read().unwrap();
        assert_eq!(table.group_info(0).unwrap().leader, 5);
    }

    #[test]
    fn unresolved_node_id_is_ignored() {
        let rt = rt_with_leaders(&[(0, 1)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("a", 1)]));

        // This id is absent from the resolver table, so the event must be dropped.
        notify(
            &hook,
            "seed:127.0.0.1:9000",
            Some(MemberState::Alive),
            MemberState::Dead,
        );

        let table = rt.read().unwrap();
        assert_eq!(table.group_info(0).unwrap().leader, 1);
    }

    #[test]
    fn left_is_also_invalidating() {
        let rt = rt_with_leaders(&[(0, 2)], 1);
        let hook = RoutingLivenessHook::new(rt.clone(), resolver_for(&[("b", 2)]));

        notify(&hook, "b", Some(MemberState::Alive), MemberState::Left);

        let table = rt.read().unwrap();
        assert_eq!(table.group_info(0).unwrap().leader, 0);
    }
}