// nodedb_cluster/decommission/safety.rs
use crate::error::{ClusterError, Result};
use crate::routing::RoutingTable;
use crate::topology::{ClusterTopology, NodeState};

/// Reasons the decommission safety check refuses to remove a node.
///
/// Each variant carries the ids and counts needed to render a self-contained
/// message through its [`std::fmt::Display`] implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecommissionSafetyError {
    /// The requested node id is not present in the cluster topology.
    NodeNotFound { node_id: u64 },
    /// The requested node is already in the decommissioned state.
    AlreadyDecommissioned { node_id: u64 },
    /// Removing the node would leave a group with fewer members than the
    /// required replication factor.
    WouldViolateReplicationFactor {
        /// Node whose removal was requested.
        node_id: u64,
        /// Group that would drop below the replication factor.
        group_id: u64,
        /// Member count of the group *before* removal (still includes `node_id`).
        current_voters: usize,
        /// Minimum member count the group has to keep.
        replication_factor: usize,
    },
}

impl std::fmt::Display for DecommissionSafetyError {
    /// Writes a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NodeNotFound { node_id } => {
                write!(f, "node {node_id} not found in topology")
            }
            Self::AlreadyDecommissioned { node_id } => {
                write!(f, "node {node_id} is already decommissioned")
            }
            Self::WouldViolateReplicationFactor {
                node_id,
                group_id,
                current_voters,
                replication_factor,
            } => {
                // The message reports the count *after* removal; saturating
                // subtraction keeps a zero count from underflowing.
                let remaining = current_voters.saturating_sub(1);
                write!(
                    f,
                    "removing node {node_id} from group {group_id} \
                     would leave {remaining} voter(s), below replication factor {replication_factor}"
                )
            }
        }
    }
}

impl std::error::Error for DecommissionSafetyError {}

57impl From<DecommissionSafetyError> for ClusterError {
58 fn from(value: DecommissionSafetyError) -> Self {
59 ClusterError::Transport {
60 detail: value.to_string(),
61 }
62 }
63}
64
65pub fn check_can_decommission(
74 node_id: u64,
75 topology: &ClusterTopology,
76 routing: &RoutingTable,
77 replication_factor: usize,
78) -> Result<()> {
79 let node = topology
80 .get_node(node_id)
81 .ok_or(DecommissionSafetyError::NodeNotFound { node_id })?;
82
83 if node.state == NodeState::Decommissioned {
84 return Err(DecommissionSafetyError::AlreadyDecommissioned { node_id }.into());
85 }
86
87 for (group_id, info) in routing.group_members() {
88 if !info.members.contains(&node_id) {
89 continue;
90 }
91 let current_voters = info.members.len();
92 if current_voters.saturating_sub(1) < replication_factor {
95 return Err(DecommissionSafetyError::WouldViolateReplicationFactor {
96 node_id,
97 group_id: *group_id,
98 current_voters,
99 replication_factor,
100 }
101 .into());
102 }
103 }
104
105 Ok(())
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topology::NodeInfo;
    use std::net::SocketAddr;

    /// Builds a topology with one `Active` node per id, each bound to its own
    /// loopback port (`9000 + index`).
    fn topo(nodes: &[u64]) -> ClusterTopology {
        let mut t = ClusterTopology::new();
        for (i, id) in nodes.iter().enumerate() {
            let addr: SocketAddr = format!("127.0.0.1:{}", 9000 + i).parse().unwrap();
            t.add_node(NodeInfo::new(*id, addr, NodeState::Active));
        }
        t
    }

    /// A node id absent from the topology is rejected and named in the error.
    #[test]
    fn rejects_unknown_node() {
        let t = topo(&[1, 2, 3]);
        let r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        let err = check_can_decommission(99, &t, &r, 2).unwrap_err();
        assert!(err.to_string().contains("99"));
    }

    /// A node already marked `Decommissioned` is rejected.
    #[test]
    fn rejects_already_decommissioned() {
        let mut t = topo(&[1, 2, 3]);
        t.set_state(1, NodeState::Decommissioned);
        let r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        let err = check_can_decommission(1, &t, &r, 2).unwrap_err();
        assert!(err.to_string().contains("already decommissioned"));
    }

    /// With only two nodes and a required factor of 2, removing either one
    /// must be refused.
    // NOTE(review): presumably `uniform` places both nodes in every group —
    // confirm against `RoutingTable::uniform`.
    #[test]
    fn rejects_when_rf_would_be_violated() {
        let t = topo(&[1, 2]);
        let r = RoutingTable::uniform(2, &[1, 2], 2);
        let err = check_can_decommission(1, &t, &r, 2).unwrap_err();
        assert!(err.to_string().contains("replication factor"));
    }

    /// With three nodes and a required factor of 2, one node can leave.
    #[test]
    fn accepts_when_extra_voter_available() {
        let t = topo(&[1, 2, 3]);
        let r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        check_can_decommission(1, &t, &r, 2).unwrap();
    }

    /// Only groups that list the target as a member are evaluated: node 1 is
    /// in group 0 (`[1, 3]`) but not in group 1 (`[2, 3]`), and group 0 keeps
    /// one member, which satisfies a required factor of 1.
    #[test]
    fn skips_groups_target_is_not_member_of() {
        let t = topo(&[1, 2, 3]);
        let mut r = RoutingTable::uniform(2, &[1, 2, 3], 3);
        r.set_group_members(0, vec![1, 3]);
        r.set_group_members(1, vec![2, 3]);
        check_can_decommission(1, &t, &r, 1).unwrap();
    }
}