1use tracing::{info, warn};
18
19use crate::error::{ClusterError, Result};
20use crate::metadata_group::{MetadataEntry, TopologyChange};
21use crate::routing::RoutingTable;
22use crate::topology::{ClusterTopology, NodeInfo, NodeState};
23
/// Summary counters describing the outcome of a node decommission.
///
/// Nothing in this module constructs the value; it is a plain data carrier
/// whose fields are filled in by the caller. The common value-type traits are
/// derived so results can be cloned, compared in assertions, and
/// default-initialised (all counters zero, `completed == false`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DecommissionResult {
    /// Number of vShards migrated (presumably away from the leaving node —
    /// confirm against the producer of this struct).
    pub vshards_migrated: usize,
    /// Number of leadership transfers performed.
    pub leadership_transferred: usize,
    /// Whether the decommission ran to completion.
    pub completed: bool,
}
31
32pub fn plan_decommission(
38 node_id: u64,
39 topology: &ClusterTopology,
40 routing: &RoutingTable,
41) -> Result<Vec<MetadataEntry>> {
42 let rf = routing
46 .group_members()
47 .values()
48 .map(|info| info.members.len())
49 .min()
50 .unwrap_or(1)
51 .saturating_sub(1)
52 .max(1);
53 let plan = crate::decommission::plan_full_decommission(node_id, topology, routing, rf)?;
54 info!(
55 node_id,
56 metadata_entries = plan.entries.len(),
57 "decommission plan computed"
58 );
59 Ok(plan.entries)
60}
61
62pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
64 let Some(node) = topology.get_node(node_id) else {
65 return false;
66 };
67 if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
68 return false;
69 }
70
71 for group_id in routing.group_ids() {
72 if let Some(info) = routing.group_info(group_id)
73 && info.leader == node_id
74 && info.members.len() <= 1
75 {
76 return false;
77 }
78 }
79
80 true
81}
82
83pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
86 use std::net::SocketAddr;
87
88 let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
89 warn!(node_id, addr, "invalid address, using default");
90 SocketAddr::from(([0, 0, 0, 0], 0))
91 });
92
93 let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
94 topology.join_as_learner(info);
95
96 info!(node_id, addr, "node joining as learner");
97 MetadataEntry::TopologyChange(TopologyChange::Join {
98 node_id,
99 addr: addr.to_string(),
100 })
101}
102
103pub fn handle_learner_promotion(
105 node_id: u64,
106 topology: &mut ClusterTopology,
107 log_lag: u64,
108 max_lag: u64,
109) -> Result<MetadataEntry> {
110 let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
111 detail: format!("node {node_id} not found"),
112 })?;
113
114 if node.state != NodeState::Learner {
115 return Err(ClusterError::Transport {
116 detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
117 });
118 }
119
120 if log_lag > max_lag {
121 return Err(ClusterError::Transport {
122 detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
123 });
124 }
125
126 topology.promote_to_voter(node_id);
127 info!(node_id, log_lag, "learner promoted to voter");
128
129 Ok(MetadataEntry::TopologyChange(
130 TopologyChange::PromoteToVoter { node_id },
131 ))
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Builds a topology with three Active nodes (ids 1..=3) and a routing
    /// table created via `RoutingTable::uniform(4, &[1, 2, 3], 2)`.
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());
        match &entries[0] {
            MetadataEntry::TopologyChange(TopologyChange::StartDecommission { node_id }) => {
                assert_eq!(*node_id, 1);
            }
            other => panic!("expected StartDecommission, got {other:?}"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();

        // A node that is still Active is rejected by the state check.
        assert!(!is_safe_to_remove(1, &topo, &routing));
        // A node the topology has never seen is rejected outright.
        assert!(!is_safe_to_remove(99, &topo, &routing));

        topo.set_state(1, NodeState::Draining);
        // The verdict for a draining node depends on the routing layout
        // (leader / member counts), so only exercise the path here.
        let _ = is_safe_to_remove(1, &topo, &routing);
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::TopologyChange(TopologyChange::Join { node_id, .. }) => {
                assert_eq!(node_id, 5);
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        // Lag above the threshold must be refused.
        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());

        // Lag within the threshold promotes the node.
        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}