1use tracing::{info, warn};
15
16use crate::error::{ClusterError, Result};
17use crate::metadata_group::{MembershipAction, MetadataEntry};
18use crate::routing::RoutingTable;
19use crate::topology::{ClusterTopology, NodeInfo, NodeState};
20
/// Summary of a node decommission.
///
/// Nothing in this part of the file constructs or reads this type, so the
/// per-field descriptions below are derived from the field names only.
// NOTE(review): confirm the field semantics against the code that produces this value.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DecommissionResult {
    /// Count of vshards migrated (presumably away from the departing node).
    pub vshards_migrated: usize,
    /// Count of leadership transfers (presumably groups the node used to lead).
    pub leadership_transferred: usize,
    /// Completion flag (presumably `true` once the decommission has finished).
    pub completed: bool,
}
31
32pub fn plan_decommission(
41 node_id: u64,
42 topology: &ClusterTopology,
43 routing: &RoutingTable,
44) -> Result<Vec<MetadataEntry>> {
45 let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
46 detail: format!("node {node_id} not found in topology"),
47 })?;
48
49 if node.state == NodeState::Decommissioned {
50 return Err(ClusterError::Transport {
51 detail: format!("node {node_id} is already decommissioned"),
52 });
53 }
54
55 let mut entries = Vec::new();
56
57 entries.push(MetadataEntry::MembershipChange {
59 node_id,
60 action: MembershipAction::Leave,
61 });
62
63 for group_id in routing.group_ids() {
65 if let Some(info) = routing.group_info(group_id)
66 && info.leader == node_id
67 {
68 if let Some(&new_leader) = info.members.iter().find(|&&m| m != node_id) {
70 entries.push(MetadataEntry::RoutingUpdate {
71 vshard_id: 0, new_node_id: new_leader,
73 new_group_id: group_id,
74 });
75 }
76 }
77 }
78
79 info!(
80 node_id,
81 metadata_entries = entries.len(),
82 "decommission plan computed"
83 );
84 Ok(entries)
85}
86
87pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
94 let Some(node) = topology.get_node(node_id) else {
95 return false;
96 };
97 if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
98 return false;
99 }
100
101 for group_id in routing.group_ids() {
103 if let Some(info) = routing.group_info(group_id)
104 && info.leader == node_id
105 && info.members.len() <= 1
106 {
107 return false; }
109 }
110
111 true
112}
113
114pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
118 use std::net::SocketAddr;
119
120 let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
121 warn!(node_id, addr, "invalid address, using default");
122 SocketAddr::from(([0, 0, 0, 0], 0))
123 });
124
125 let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
126 topology.join_as_learner(info);
127
128 info!(node_id, addr, "node joining as learner");
129 MetadataEntry::MembershipChange {
130 node_id,
131 action: MembershipAction::Join {
132 addr: addr.to_string(),
133 },
134 }
135}
136
137pub fn handle_learner_promotion(
143 node_id: u64,
144 topology: &mut ClusterTopology,
145 log_lag: u64,
146 max_lag: u64,
147) -> Result<MetadataEntry> {
148 let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
149 detail: format!("node {node_id} not found"),
150 })?;
151
152 if node.state != NodeState::Learner {
153 return Err(ClusterError::Transport {
154 detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
155 });
156 }
157
158 if log_lag > max_lag {
159 return Err(ClusterError::Transport {
160 detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
161 });
162 }
163
164 topology.promote_to_voter(node_id);
165 info!(node_id, log_lag, "learner promoted to voter");
166
167 Ok(MetadataEntry::MembershipChange {
168 node_id,
169 action: MembershipAction::PromoteToVoter,
170 })
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Builds a topology of three `Active` nodes (ids 1, 2, 3) together with a
    /// routing table created by `RoutingTable::uniform(4, &[1, 2, 3], 2)`.
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());

        // The plan must open with the Leave membership change for the node.
        match &entries[0] {
            MetadataEntry::MembershipChange { node_id, action } => {
                assert_eq!(*node_id, 1);
                assert!(matches!(action, MembershipAction::Leave));
            }
            _ => panic!("expected MembershipChange"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Draining);

        // Smoke test only: the outcome depends on the group layout produced by
        // `RoutingTable::uniform`, so no specific value is asserted here.
        let safe = is_safe_to_remove(1, &topo, &routing);
        let _ = safe;
    }

    #[test]
    fn not_safe_to_remove_active_or_unknown_node() {
        let (topo, routing) = make_topology_and_routing();

        // Node 1 is still Active, which is neither Draining nor Decommissioned.
        assert!(!is_safe_to_remove(1, &topo, &routing));
        // Node 99 was never added to the topology.
        assert!(!is_safe_to_remove(99, &topo, &routing));
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::MembershipChange { node_id, .. } => assert_eq!(node_id, 5),
            _ => panic!("expected MembershipChange"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        // Lag of 100 exceeds the allowed maximum of 10: promotion is refused.
        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());

        // Lag of 5 is within the maximum: promotion succeeds.
        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}