nodedb_cluster/decommission/
flow.rs1use crate::error::Result;
16use crate::metadata_group::{MetadataEntry, RoutingChange, TopologyChange};
17use crate::routing::RoutingTable;
18use crate::topology::ClusterTopology;
19
20use super::safety::check_can_decommission;
21
/// An ordered sequence of metadata-log entries that, applied in order,
/// fully decommissions a single node from the cluster.
///
/// Produced by [`plan_full_decommission`]; the plan itself performs no
/// mutation — it only describes the entries to be proposed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecommissionPlan {
    /// The node being decommissioned.
    pub node_id: u64,
    /// Metadata entries to apply, in the exact order they appear here.
    pub entries: Vec<MetadataEntry>,
}
29
30pub fn plan_full_decommission(
49 node_id: u64,
50 topology: &ClusterTopology,
51 routing: &RoutingTable,
52 replication_factor: usize,
53) -> Result<DecommissionPlan> {
54 check_can_decommission(node_id, topology, routing, replication_factor)?;
55
56 let mut entries = Vec::new();
57 entries.push(MetadataEntry::TopologyChange(
58 TopologyChange::StartDecommission { node_id },
59 ));
60
61 let mut group_ids: Vec<u64> = routing
64 .group_members()
65 .iter()
66 .filter(|(_, info)| info.members.contains(&node_id) || info.learners.contains(&node_id))
67 .map(|(gid, _)| *gid)
68 .collect();
69 group_ids.sort_unstable();
70
71 for gid in &group_ids {
73 let info = routing
74 .group_info(*gid)
75 .expect("group id came from routing snapshot");
76 if info.leader != node_id {
77 continue;
78 }
79 if let Some(&new_leader) = info.members.iter().find(|&&m| m != node_id) {
80 entries.push(MetadataEntry::RoutingChange(
81 RoutingChange::LeadershipTransfer {
82 group_id: *gid,
83 new_leader_node_id: new_leader,
84 },
85 ));
86 }
87 }
88
89 for gid in &group_ids {
91 entries.push(MetadataEntry::RoutingChange(RoutingChange::RemoveMember {
92 group_id: *gid,
93 node_id,
94 }));
95 }
96
97 entries.push(MetadataEntry::TopologyChange(
99 TopologyChange::FinishDecommission { node_id },
100 ));
101
102 entries.push(MetadataEntry::TopologyChange(TopologyChange::Leave {
104 node_id,
105 }));
106
107 Ok(DecommissionPlan { node_id, entries })
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::{NodeInfo, NodeState};
    use std::net::SocketAddr;

    /// Build a topology of Active nodes listening on sequential loopback ports.
    fn topo(nodes: &[u64]) -> ClusterTopology {
        let mut cluster = ClusterTopology::new();
        for (offset, &id) in nodes.iter().enumerate() {
            let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + offset).parse().unwrap();
            cluster.add_node(NodeInfo::new(id, addr, NodeState::Active));
        }
        cluster
    }

    #[test]
    fn plan_shape_matches_spec() {
        let cluster = topo(&[1, 2, 3]);
        let routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        let plan = plan_full_decommission(1, &cluster, &routing, 2).unwrap();
        assert_eq!(plan.node_id, 1);

        // The plan opens with StartDecommission for the target node.
        assert!(matches!(
            plan.entries.first(),
            Some(MetadataEntry::TopologyChange(
                TopologyChange::StartDecommission { node_id: 1 }
            ))
        ));

        // ...and closes with FinishDecommission followed by Leave.
        let len = plan.entries.len();
        assert!(matches!(
            plan.entries[len - 2],
            MetadataEntry::TopologyChange(TopologyChange::FinishDecommission { node_id: 1 })
        ));
        assert!(matches!(
            plan.entries[len - 1],
            MetadataEntry::TopologyChange(TopologyChange::Leave { node_id: 1 })
        ));

        // Exactly one RemoveMember per group the node belongs to.
        let removals = plan
            .entries
            .iter()
            .filter(|entry| {
                matches!(
                    entry,
                    MetadataEntry::RoutingChange(RoutingChange::RemoveMember { node_id: 1, .. })
                )
            })
            .count();
        assert_eq!(removals, 3);
    }

    #[test]
    fn plan_emits_leadership_transfer_when_target_leads() {
        let cluster = topo(&[1, 2, 3]);
        let mut routing = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Node 1 leads group 1 only; groups 0 and 2 are led by node 2.
        routing.set_leader(0, 2);
        routing.set_leader(1, 1);
        routing.set_leader(2, 2);
        let plan = plan_full_decommission(1, &cluster, &routing, 2).unwrap();
        let mut transfers = Vec::new();
        for entry in &plan.entries {
            if let MetadataEntry::RoutingChange(RoutingChange::LeadershipTransfer {
                group_id,
                new_leader_node_id,
            }) = entry
            {
                transfers.push((*group_id, *new_leader_node_id));
            }
        }
        assert_eq!(transfers.len(), 1);
        assert_eq!(transfers[0].0, 1);
        assert_ne!(transfers[0].1, 1, "new leader must not be the target");
    }

    #[test]
    fn plan_is_deterministic() {
        let cluster = topo(&[1, 2, 3]);
        let routing = RoutingTable::uniform(4, &[1, 2, 3], 3);
        let first = plan_full_decommission(2, &cluster, &routing, 2).unwrap();
        let second = plan_full_decommission(2, &cluster, &routing, 2).unwrap();
        assert_eq!(first.entries, second.entries);
    }

    #[test]
    fn plan_rejected_when_safety_fails() {
        // Two nodes at replication factor 2: removing one must be refused.
        let cluster = topo(&[1, 2]);
        let routing = RoutingTable::uniform(2, &[1, 2], 2);
        let err = plan_full_decommission(1, &cluster, &routing, 2).unwrap_err();
        assert!(err.to_string().contains("replication factor"));
    }

    #[test]
    fn plan_skips_groups_target_is_not_in() {
        let cluster = topo(&[1, 2, 3]);
        let mut routing = RoutingTable::uniform(4, &[1, 2, 3], 3);
        // Node 1 is a member of groups 2 and 3 only.
        routing.set_group_members(0, vec![2, 3]);
        routing.set_group_members(1, vec![2, 3]);
        routing.set_group_members(2, vec![1, 2, 3]);
        routing.set_group_members(3, vec![1, 2, 3]);
        routing.set_group_members(4, vec![2, 3]);
        let plan = plan_full_decommission(1, &cluster, &routing, 2).unwrap();
        let removed_groups: Vec<u64> = plan
            .entries
            .iter()
            .filter_map(|entry| match entry {
                MetadataEntry::RoutingChange(RoutingChange::RemoveMember { group_id, .. }) => {
                    Some(*group_id)
                }
                _ => None,
            })
            .collect();
        assert_eq!(removed_groups, vec![2, 3]);
    }
}