1use tracing::{info, warn};
16
17use crate::error::{ClusterError, Result};
18use crate::metadata_group::{MetadataEntry, TopologyChange};
19use crate::routing::RoutingTable;
20use crate::topology::{ClusterTopology, NodeInfo, NodeState};
21
/// Summary of a decommission run.
///
/// Derives `Clone`/`PartialEq`/`Eq` in addition to `Debug` so results can be copied
/// into reports and compared directly (e.g. with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecommissionResult {
    /// Count of vShards migrated.
    pub vshards_migrated: usize,
    /// Count of leadership transfers performed.
    pub leadership_transferred: usize,
    /// `true` if the decommission completed.
    pub completed: bool,
}
29
30pub fn plan_decommission(
36 node_id: u64,
37 topology: &ClusterTopology,
38 routing: &RoutingTable,
39) -> Result<Vec<MetadataEntry>> {
40 let rf = routing
44 .group_members()
45 .values()
46 .map(|info| info.members.len())
47 .min()
48 .unwrap_or(1)
49 .saturating_sub(1)
50 .max(1);
51 let plan = crate::decommission::plan_full_decommission(node_id, topology, routing, rf)?;
52 info!(
53 node_id,
54 metadata_entries = plan.entries.len(),
55 "decommission plan computed"
56 );
57 Ok(plan.entries)
58}
59
60pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
62 let Some(node) = topology.get_node(node_id) else {
63 return false;
64 };
65 if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
66 return false;
67 }
68
69 for group_id in routing.group_ids() {
70 if let Some(info) = routing.group_info(group_id)
71 && info.leader == node_id
72 && info.members.len() <= 1
73 {
74 return false;
75 }
76 }
77
78 true
79}
80
81pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
84 use std::net::SocketAddr;
85
86 let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
87 warn!(node_id, addr, "invalid address, using default");
88 SocketAddr::from(([0, 0, 0, 0], 0))
89 });
90
91 let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
92 topology.join_as_learner(info);
93
94 info!(node_id, addr, "node joining as learner");
95 MetadataEntry::TopologyChange(TopologyChange::Join {
96 node_id,
97 addr: addr.to_string(),
98 })
99}
100
101pub fn handle_learner_promotion(
103 node_id: u64,
104 topology: &mut ClusterTopology,
105 log_lag: u64,
106 max_lag: u64,
107) -> Result<MetadataEntry> {
108 let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
109 detail: format!("node {node_id} not found"),
110 })?;
111
112 if node.state != NodeState::Learner {
113 return Err(ClusterError::Transport {
114 detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
115 });
116 }
117
118 if log_lag > max_lag {
119 return Err(ClusterError::Transport {
120 detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
121 });
122 }
123
124 topology.promote_to_voter(node_id);
125 info!(node_id, log_lag, "learner promoted to voter");
126
127 Ok(MetadataEntry::TopologyChange(
128 TopologyChange::PromoteToVoter { node_id },
129 ))
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Builds a three-node, all-`Active` topology (nodes 1, 2, 3) plus the routing table
    /// produced by `RoutingTable::uniform(4, &[1, 2, 3], 2)`.
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());
        match &entries[0] {
            MetadataEntry::TopologyChange(TopologyChange::StartDecommission { node_id }) => {
                assert_eq!(*node_id, 1);
            }
            other => panic!("expected StartDecommission, got {other:?}"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Draining);
        // Smoke test only. Whether a draining node is safe to remove depends on how
        // `RoutingTable::uniform` lays out group membership, so the result is not pinned
        // here. The deterministic gates are covered by the tests below.
        let _ = is_safe_to_remove(1, &topo, &routing);
    }

    #[test]
    fn active_node_is_not_safe_to_remove() {
        let (topo, routing) = make_topology_and_routing();
        // Only `Draining` / `Decommissioned` nodes may be removed.
        assert!(!is_safe_to_remove(1, &topo, &routing));
    }

    #[test]
    fn unknown_node_is_not_safe_to_remove() {
        let (topo, routing) = make_topology_and_routing();
        assert!(!is_safe_to_remove(99, &topo, &routing));
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::TopologyChange(TopologyChange::Join { node_id, .. }) => {
                assert_eq!(node_id, 5);
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());

        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);

        // Once promoted the node is no longer a learner, so promoting again is rejected.
        assert!(handle_learner_promotion(10, &mut topo, 0, 10).is_err());
    }

    #[test]
    fn promoting_unknown_node_fails() {
        let mut topo = ClusterTopology::new();
        assert!(handle_learner_promotion(42, &mut topo, 0, 10).is_err());
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}