1use tracing::{info, warn};
16
17use crate::error::{ClusterError, Result};
18use crate::metadata_group::{MetadataEntry, RoutingChange, TopologyChange};
19use crate::routing::RoutingTable;
20use crate::topology::{ClusterTopology, NodeInfo, NodeState};
21
/// Summary counters describing the outcome of a node decommission.
///
/// NOTE(review): nothing in the visible part of this file constructs this type, so the
/// field descriptions below follow the field names only — confirm against the producer.
///
/// `Default` yields an all-zero, not-completed result, which is convenient as a starting
/// value for accumulation; `PartialEq`/`Eq` allow direct comparison in tests.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct DecommissionResult {
    /// Number of vShards migrated (presumably away from the decommissioned node).
    pub vshards_migrated: usize,
    /// Number of leadership transfers performed.
    pub leadership_transferred: usize,
    /// Whether the decommission finished.
    pub completed: bool,
}
29
30pub fn plan_decommission(
37 node_id: u64,
38 topology: &ClusterTopology,
39 routing: &RoutingTable,
40) -> Result<Vec<MetadataEntry>> {
41 let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
42 detail: format!("node {node_id} not found in topology"),
43 })?;
44
45 if node.state == NodeState::Decommissioned {
46 return Err(ClusterError::Transport {
47 detail: format!("node {node_id} is already decommissioned"),
48 });
49 }
50
51 let mut entries = Vec::new();
52
53 entries.push(MetadataEntry::TopologyChange(
55 TopologyChange::StartDecommission { node_id },
56 ));
57
58 for group_id in routing.group_ids() {
60 if let Some(info) = routing.group_info(group_id)
61 && info.leader == node_id
62 && let Some(&new_leader) = info.members.iter().find(|&&m| m != node_id)
63 {
64 entries.push(MetadataEntry::RoutingChange(
65 RoutingChange::LeadershipTransfer {
66 group_id,
67 new_leader_node_id: new_leader,
68 },
69 ));
70 }
71 }
72
73 info!(
74 node_id,
75 metadata_entries = entries.len(),
76 "decommission plan computed"
77 );
78 Ok(entries)
79}
80
81pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
83 let Some(node) = topology.get_node(node_id) else {
84 return false;
85 };
86 if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
87 return false;
88 }
89
90 for group_id in routing.group_ids() {
91 if let Some(info) = routing.group_info(group_id)
92 && info.leader == node_id
93 && info.members.len() <= 1
94 {
95 return false;
96 }
97 }
98
99 true
100}
101
102pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
105 use std::net::SocketAddr;
106
107 let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
108 warn!(node_id, addr, "invalid address, using default");
109 SocketAddr::from(([0, 0, 0, 0], 0))
110 });
111
112 let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
113 topology.join_as_learner(info);
114
115 info!(node_id, addr, "node joining as learner");
116 MetadataEntry::TopologyChange(TopologyChange::Join {
117 node_id,
118 addr: addr.to_string(),
119 })
120}
121
122pub fn handle_learner_promotion(
124 node_id: u64,
125 topology: &mut ClusterTopology,
126 log_lag: u64,
127 max_lag: u64,
128) -> Result<MetadataEntry> {
129 let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
130 detail: format!("node {node_id} not found"),
131 })?;
132
133 if node.state != NodeState::Learner {
134 return Err(ClusterError::Transport {
135 detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
136 });
137 }
138
139 if log_lag > max_lag {
140 return Err(ClusterError::Transport {
141 detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
142 });
143 }
144
145 topology.promote_to_voter(node_id);
146 info!(node_id, log_lag, "learner promoted to voter");
147
148 Ok(MetadataEntry::TopologyChange(
149 TopologyChange::PromoteToVoter { node_id },
150 ))
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Builds a topology with three `Active` nodes (ids 1, 2, 3) and a routing table
    /// created via `RoutingTable::uniform(4, &[1, 2, 3], 2)`.
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());
        // The plan must open with the topology change for the requested node.
        match &entries[0] {
            MetadataEntry::TopologyChange(TopologyChange::StartDecommission { node_id }) => {
                assert_eq!(*node_id, 1);
            }
            other => panic!("expected StartDecommission, got {other:?}"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();
        // While the node is still Active it must never be reported as removable.
        assert!(!is_safe_to_remove(1, &topo, &routing));

        topo.set_state(1, NodeState::Draining);
        // The verdict for a draining node depends on the group layout produced by
        // `RoutingTable::uniform`, so this only checks that the call completes.
        let _ = is_safe_to_remove(1, &topo, &routing);
    }

    #[test]
    fn unknown_node_is_rejected() {
        let (mut topo, routing) = make_topology_and_routing();
        assert!(plan_decommission(99, &topo, &routing).is_err());
        assert!(!is_safe_to_remove(99, &topo, &routing));
        assert!(handle_learner_promotion(99, &mut topo, 0, 10).is_err());
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::TopologyChange(TopologyChange::Join { node_id, .. }) => {
                assert_eq!(node_id, 5);
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        // Lag above the bound: rejected, and the node must still be a learner.
        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Learner);

        // Lag within the bound: promoted.
        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);
    }

    #[test]
    fn promotion_rejects_non_learner() {
        let (mut topo, _routing) = make_topology_and_routing();
        // Node 1 is Active, not a learner, so promotion must fail and change nothing.
        assert!(handle_learner_promotion(1, &mut topo, 0, 10).is_err());
        assert_eq!(topo.get_node(1).unwrap().state, NodeState::Active);
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}