// nodedb_cluster/lifecycle.rs
//! Node lifecycle management: join, leave, decommission.
//!
//! Handles the full lifecycle of a node in the cluster:
//!
//! 1. **Join**: Node contacts seed, receives topology, joins as Learner,
//!    catches up Raft logs, promoted to Active voter.
//! 2. **Decommission**: Node drains leadership, migrates all vShards to
//!    other nodes, then shuts down cleanly.
//! 3. **Leave**: Node is removed from topology after decommission completes.
//!
//! All transitions are replicated via the metadata Raft group as
//! [`MetadataEntry::TopologyChange`] / [`MetadataEntry::RoutingChange`]
//! entries and applied through the `MetadataApplier` on every node.

15use tracing::{info, warn};
16
17use crate::error::{ClusterError, Result};
18use crate::metadata_group::{MetadataEntry, TopologyChange};
19use crate::routing::RoutingTable;
20use crate::topology::{ClusterTopology, NodeInfo, NodeState};
21
/// Result of a decommission operation.
///
/// Plain summary value returned to callers so they can log progress or
/// decide whether the decommission needs to be retried. All fields are
/// `Copy`, so the struct derives the full comparison/copy trait set.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DecommissionResult {
    /// Number of vShards migrated off the decommissioned node.
    pub vshards_migrated: usize,
    /// Number of group leaderships transferred away during draining.
    pub leadership_transferred: usize,
    /// `true` once the decommission finished cleanly.
    pub completed: bool,
}
29
30/// Plan a node decommission — thin wrapper over
31/// [`crate::decommission::plan_full_decommission`] that returns the
32/// full ordered sequence of metadata entries. Kept as a public
33/// convenience for older call sites; new code should use the
34/// `decommission` module directly.
35pub fn plan_decommission(
36    node_id: u64,
37    topology: &ClusterTopology,
38    routing: &RoutingTable,
39) -> Result<Vec<MetadataEntry>> {
40    // Historical callers assumed the full-cluster RF; derive a safe
41    // lower bound from the smallest existing group so the check is
42    // never stricter than the cluster is already running under.
43    let rf = routing
44        .group_members()
45        .values()
46        .map(|info| info.members.len())
47        .min()
48        .unwrap_or(1)
49        .saturating_sub(1)
50        .max(1);
51    let plan = crate::decommission::plan_full_decommission(node_id, topology, routing, rf)?;
52    info!(
53        node_id,
54        metadata_entries = plan.entries.len(),
55        "decommission plan computed"
56    );
57    Ok(plan.entries)
58}
59
60/// Check if a node can be safely removed from the cluster.
61pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
62    let Some(node) = topology.get_node(node_id) else {
63        return false;
64    };
65    if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
66        return false;
67    }
68
69    for group_id in routing.group_ids() {
70        if let Some(info) = routing.group_info(group_id)
71            && info.leader == node_id
72            && info.members.len() <= 1
73        {
74            return false;
75        }
76    }
77
78    true
79}
80
81/// Register a joining node in the local topology and produce the
82/// [`MetadataEntry`] to be proposed on the metadata Raft group.
83pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
84    use std::net::SocketAddr;
85
86    let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
87        warn!(node_id, addr, "invalid address, using default");
88        SocketAddr::from(([0, 0, 0, 0], 0))
89    });
90
91    let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
92    topology.join_as_learner(info);
93
94    info!(node_id, addr, "node joining as learner");
95    MetadataEntry::TopologyChange(TopologyChange::Join {
96        node_id,
97        addr: addr.to_string(),
98    })
99}
100
101/// Handle learner promotion after state catch-up validation.
102pub fn handle_learner_promotion(
103    node_id: u64,
104    topology: &mut ClusterTopology,
105    log_lag: u64,
106    max_lag: u64,
107) -> Result<MetadataEntry> {
108    let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
109        detail: format!("node {node_id} not found"),
110    })?;
111
112    if node.state != NodeState::Learner {
113        return Err(ClusterError::Transport {
114            detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
115        });
116    }
117
118    if log_lag > max_lag {
119        return Err(ClusterError::Transport {
120            detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
121        });
122    }
123
124    topology.promote_to_voter(node_id);
125    info!(node_id, log_lag, "learner promoted to voter");
126
127    Ok(MetadataEntry::TopologyChange(
128        TopologyChange::PromoteToVoter { node_id },
129    ))
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Three active nodes (1, 2, 3) and a uniform routing table of four
    /// vShard groups spread over them at replication factor 2.
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());
        // The plan must lead with the StartDecommission marker.
        match &entries[0] {
            MetadataEntry::TopologyChange(TopologyChange::StartDecommission { node_id }) => {
                assert_eq!(*node_id, 1);
            }
            other => panic!("expected StartDecommission, got {other:?}"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Draining);
        // With RF 2 every group has two members, so the single-member
        // guard cannot trip and a draining node must be removable.
        // (Previously the result was discarded — the test asserted nothing.)
        assert!(is_safe_to_remove(1, &topo, &routing));
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::TopologyChange(TopologyChange::Join { node_id, .. }) => {
                assert_eq!(node_id, 5);
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        // Lag above the threshold must be rejected.
        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());

        // Lag within the threshold promotes the learner to Active voter.
        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}