nodedb_cluster/decommission/
safety.rs1use crate::error::{ClusterError, Result};
12use crate::routing::RoutingTable;
13use crate::topology::{ClusterTopology, NodeState};
14
/// Reasons a node may not be decommissioned, as reported by
/// [`check_can_decommission`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecommissionSafetyError {
    /// The topology has no entry for the requested node.
    NodeNotFound {
        /// Id of the node that was looked up.
        node_id: u64,
    },
    /// The node's state is already `Decommissioned`.
    AlreadyDecommissioned {
        /// Id of the node that was checked.
        node_id: u64,
    },
    /// Removing the node would leave one of its groups with fewer members
    /// than the required replication factor.
    WouldViolateReplicationFactor {
        /// Id of the node whose removal was requested.
        node_id: u64,
        /// Id of the group that would become under-replicated.
        group_id: u64,
        /// Member count of the group *before* the removal.
        current_voters: usize,
        /// Minimum member count the group must retain.
        replication_factor: usize,
    },
}
32
33impl std::fmt::Display for DecommissionSafetyError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::NodeNotFound { node_id } => {
37 write!(f, "node {node_id} not found in topology")
38 }
39 Self::AlreadyDecommissioned { node_id } => {
40 write!(f, "node {node_id} is already decommissioned")
41 }
42 Self::WouldViolateReplicationFactor {
43 node_id,
44 group_id,
45 current_voters,
46 replication_factor,
47 } => write!(
48 f,
49 "removing node {node_id} from group {group_id} \
50 would leave {} voter(s), below replication factor {replication_factor}",
51 current_voters.saturating_sub(1)
52 ),
53 }
54 }
55}
56
57impl std::error::Error for DecommissionSafetyError {}
58
59impl From<DecommissionSafetyError> for ClusterError {
60 fn from(value: DecommissionSafetyError) -> Self {
61 ClusterError::Transport {
62 detail: value.to_string(),
63 }
64 }
65}
66
67pub fn check_can_decommission(
76 node_id: u64,
77 topology: &ClusterTopology,
78 routing: &RoutingTable,
79 replication_factor: usize,
80) -> Result<()> {
81 let node = topology
82 .get_node(node_id)
83 .ok_or(DecommissionSafetyError::NodeNotFound { node_id })?;
84
85 if node.state == NodeState::Decommissioned {
86 return Err(DecommissionSafetyError::AlreadyDecommissioned { node_id }.into());
87 }
88
89 for (group_id, info) in routing.group_members() {
90 if !info.members.contains(&node_id) {
91 continue;
92 }
93 let current_voters = info.members.len();
94 if current_voters.saturating_sub(1) < replication_factor {
97 return Err(DecommissionSafetyError::WouldViolateReplicationFactor {
98 node_id,
99 group_id: *group_id,
100 current_voters,
101 replication_factor,
102 }
103 .into());
104 }
105 }
106
107 Ok(())
108}
109
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::NodeInfo;
    use std::net::SocketAddr;

    /// Builds a topology in which every listed node is `Active`, each on its
    /// own loopback port starting at 9000.
    fn topo(nodes: &[u64]) -> ClusterTopology {
        let mut t = ClusterTopology::new();
        for (i, id) in nodes.iter().enumerate() {
            let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
            t.add_node(NodeInfo::new(*id, addr, NodeState::Active));
        }
        t
    }

    #[test]
    fn rejects_unknown_node() {
        let t = topo(&[1, 2, 3]);
        let r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Node 99 was never added to the topology.
        let err = check_can_decommission(99, &t, &r, 2).unwrap_err();
        assert!(err.to_string().contains("99"));
    }

    #[test]
    fn rejects_already_decommissioned() {
        let mut t = topo(&[1, 2, 3]);
        t.set_state(1, NodeState::Decommissioned);
        let r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        let err = check_can_decommission(1, &t, &r, 2).unwrap_err();
        assert!(err.to_string().contains("already decommissioned"));
    }

    #[test]
    fn rejects_when_rf_would_be_violated() {
        let t = topo(&[1, 2]);
        // Presumably every group ends up with members {1, 2}; removing node 1
        // would then leave one member, below the required 2.
        let r = RoutingTable::uniform(2, &[1, 2], 2);
        let err = check_can_decommission(1, &t, &r, 2).unwrap_err();
        assert!(err.to_string().contains("replication factor"));
    }

    #[test]
    fn accepts_when_extra_voter_available() {
        let t = topo(&[1, 2, 3]);
        // Presumably every group has three members; removing node 1 would
        // leave two, which still satisfies a replication factor of 2.
        let r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        check_can_decommission(1, &t, &r, 2).unwrap();
    }

    #[test]
    fn skips_groups_target_is_not_member_of() {
        let t = topo(&[1, 2, 3]);
        let mut r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        // Node 1 belongs to group 0 only.
        r.set_group_members(0, vec![1, 3]);
        r.set_group_members(1, vec![2, 3]);
        // Group 0 keeps one member (node 3), which meets a factor of 1;
        // group 1 does not contain node 1 and must not be considered.
        check_can_decommission(1, &t, &r, 1).unwrap();
    }
}