Skip to main content

nodedb_cluster/
lifecycle.rs

//! Node lifecycle management: join, leave, decommission.
//!
//! Handles the full lifecycle of a node in the cluster:
//!
//! 1. **Join**: Node contacts seed, receives topology, joins as Learner,
//!    catches up Raft logs, promoted to Active voter.
//! 2. **Decommission**: Node drains leadership, migrates all vShards to
//!    other nodes, then shuts down cleanly.
//! 3. **Leave**: Node is removed from topology after decommission completes.
//!
//! All transitions are replicated via the metadata Raft group as
//! [`MetadataEntry::TopologyChange`] / [`MetadataEntry::RoutingChange`]
//! entries and applied through the `MetadataApplier` on every node.

15use tracing::{info, warn};
16
17use crate::error::{ClusterError, Result};
18use crate::metadata_group::{MetadataEntry, RoutingChange, TopologyChange};
19use crate::routing::RoutingTable;
20use crate::topology::{ClusterTopology, NodeInfo, NodeState};
21
/// Result of a decommission operation.
///
/// Summarizes what the decommission flow accomplished so callers can
/// log it, compare runs, or decide whether to retry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecommissionResult {
    /// Number of vShards migrated off the decommissioned node.
    pub vshards_migrated: usize,
    /// Number of Raft-group leaderships transferred away from the node.
    pub leadership_transferred: usize,
    /// Whether the decommission ran to completion.
    pub completed: bool,
}
29
30/// Plan a node decommission: compute which vShards to migrate and where.
31///
32/// Produces a sequence of [`MetadataEntry`] values to be proposed against
33/// the metadata Raft group in order. Steps:
34/// 1. Start decommission (topology transition).
35/// 2. Transfer leadership of all Raft groups led by this node.
36pub fn plan_decommission(
37    node_id: u64,
38    topology: &ClusterTopology,
39    routing: &RoutingTable,
40) -> Result<Vec<MetadataEntry>> {
41    let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
42        detail: format!("node {node_id} not found in topology"),
43    })?;
44
45    if node.state == NodeState::Decommissioned {
46        return Err(ClusterError::Transport {
47            detail: format!("node {node_id} is already decommissioned"),
48        });
49    }
50
51    let mut entries = Vec::new();
52
53    // Step 1: Start decommission.
54    entries.push(MetadataEntry::TopologyChange(
55        TopologyChange::StartDecommission { node_id },
56    ));
57
58    // Step 2: Leadership transfers for groups led by this node.
59    for group_id in routing.group_ids() {
60        if let Some(info) = routing.group_info(group_id)
61            && info.leader == node_id
62            && let Some(&new_leader) = info.members.iter().find(|&&m| m != node_id)
63        {
64            entries.push(MetadataEntry::RoutingChange(
65                RoutingChange::LeadershipTransfer {
66                    group_id,
67                    new_leader_node_id: new_leader,
68                },
69            ));
70        }
71    }
72
73    info!(
74        node_id,
75        metadata_entries = entries.len(),
76        "decommission plan computed"
77    );
78    Ok(entries)
79}
80
81/// Check if a node can be safely removed from the cluster.
82pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
83    let Some(node) = topology.get_node(node_id) else {
84        return false;
85    };
86    if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
87        return false;
88    }
89
90    for group_id in routing.group_ids() {
91        if let Some(info) = routing.group_info(group_id)
92            && info.leader == node_id
93            && info.members.len() <= 1
94        {
95            return false;
96        }
97    }
98
99    true
100}
101
102/// Register a joining node in the local topology and produce the
103/// [`MetadataEntry`] to be proposed on the metadata Raft group.
104pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
105    use std::net::SocketAddr;
106
107    let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
108        warn!(node_id, addr, "invalid address, using default");
109        SocketAddr::from(([0, 0, 0, 0], 0))
110    });
111
112    let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
113    topology.join_as_learner(info);
114
115    info!(node_id, addr, "node joining as learner");
116    MetadataEntry::TopologyChange(TopologyChange::Join {
117        node_id,
118        addr: addr.to_string(),
119    })
120}
121
122/// Handle learner promotion after state catch-up validation.
123pub fn handle_learner_promotion(
124    node_id: u64,
125    topology: &mut ClusterTopology,
126    log_lag: u64,
127    max_lag: u64,
128) -> Result<MetadataEntry> {
129    let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
130        detail: format!("node {node_id} not found"),
131    })?;
132
133    if node.state != NodeState::Learner {
134        return Err(ClusterError::Transport {
135            detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
136        });
137    }
138
139    if log_lag > max_lag {
140        return Err(ClusterError::Transport {
141            detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
142        });
143    }
144
145    topology.promote_to_voter(node_id);
146    info!(node_id, log_lag, "learner promoted to voter");
147
148    Ok(MetadataEntry::TopologyChange(
149        TopologyChange::PromoteToVoter { node_id },
150    ))
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Three active nodes plus a routing table of 4 groups over nodes
    /// 1-3 with replication factor 2 (so every group has two members).
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());
        match &entries[0] {
            MetadataEntry::TopologyChange(TopologyChange::StartDecommission { node_id }) => {
                assert_eq!(*node_id, 1);
            }
            other => panic!("expected StartDecommission, got {other:?}"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Draining);
        // Fix: the previous version discarded the result with `let _ = …`,
        // so the test could never fail. Every group has two members, so a
        // draining node is never the sole member of a group it leads.
        assert!(is_safe_to_remove(1, &topo, &routing));
        // A node still in the Active state must not be removable.
        assert!(!is_safe_to_remove(2, &topo, &routing));
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::TopologyChange(TopologyChange::Join { node_id, .. }) => {
                assert_eq!(node_id, 5);
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        // Lag above the maximum must be rejected.
        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());

        // Lag within bounds promotes the learner to an Active voter.
        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}