// nodedb_cluster/lifecycle.rs

// SPDX-License-Identifier: BUSL-1.1

//! Node lifecycle management: join, leave, decommission.
//!
//! Handles the full lifecycle of a node in the cluster:
//!
//! 1. **Join**: Node contacts seed, receives topology, joins as Learner,
//!    catches up Raft logs, promoted to Active voter.
//! 2. **Decommission**: Node drains leadership, migrates all vShards to
//!    other nodes, then shuts down cleanly.
//! 3. **Leave**: Node is removed from topology after decommission completes.
//!
//! All transitions are replicated via the metadata Raft group as
//! [`MetadataEntry::TopologyChange`] / [`MetadataEntry::RoutingChange`]
//! entries and applied through the `MetadataApplier` on every node.
17use tracing::{info, warn};
18
19use crate::error::{ClusterError, Result};
20use crate::metadata_group::{MetadataEntry, TopologyChange};
21use crate::routing::RoutingTable;
22use crate::topology::{ClusterTopology, NodeInfo, NodeState};
23
/// Result of a decommission operation.
///
/// Summarizes what a completed (or partially completed) decommission
/// actually did, for operator reporting.
#[derive(Debug)]
pub struct DecommissionResult {
    // Count of vShards moved off the node during decommission.
    pub vshards_migrated: usize,
    // Count of Raft-group leaderships transferred away from the node.
    pub leadership_transferred: usize,
    // True when the decommission ran to completion; false if it was
    // interrupted or only partially applied.
    pub completed: bool,
}
31
32/// Plan a node decommission — thin wrapper over
33/// [`crate::decommission::plan_full_decommission`] that returns the
34/// full ordered sequence of metadata entries. Kept as a public
35/// convenience for older call sites; new code should use the
36/// `decommission` module directly.
37pub fn plan_decommission(
38    node_id: u64,
39    topology: &ClusterTopology,
40    routing: &RoutingTable,
41) -> Result<Vec<MetadataEntry>> {
42    // Historical callers assumed the full-cluster RF; derive a safe
43    // lower bound from the smallest existing group so the check is
44    // never stricter than the cluster is already running under.
45    let rf = routing
46        .group_members()
47        .values()
48        .map(|info| info.members.len())
49        .min()
50        .unwrap_or(1)
51        .saturating_sub(1)
52        .max(1);
53    let plan = crate::decommission::plan_full_decommission(node_id, topology, routing, rf)?;
54    info!(
55        node_id,
56        metadata_entries = plan.entries.len(),
57        "decommission plan computed"
58    );
59    Ok(plan.entries)
60}
61
62/// Check if a node can be safely removed from the cluster.
63pub fn is_safe_to_remove(node_id: u64, topology: &ClusterTopology, routing: &RoutingTable) -> bool {
64    let Some(node) = topology.get_node(node_id) else {
65        return false;
66    };
67    if !matches!(node.state, NodeState::Draining | NodeState::Decommissioned) {
68        return false;
69    }
70
71    for group_id in routing.group_ids() {
72        if let Some(info) = routing.group_info(group_id)
73            && info.leader == node_id
74            && info.members.len() <= 1
75        {
76            return false;
77        }
78    }
79
80    true
81}
82
83/// Register a joining node in the local topology and produce the
84/// [`MetadataEntry`] to be proposed on the metadata Raft group.
85pub fn handle_node_join(node_id: u64, addr: &str, topology: &mut ClusterTopology) -> MetadataEntry {
86    use std::net::SocketAddr;
87
88    let socket_addr: SocketAddr = addr.parse().unwrap_or_else(|_| {
89        warn!(node_id, addr, "invalid address, using default");
90        SocketAddr::from(([0, 0, 0, 0], 0))
91    });
92
93    let info = NodeInfo::new(node_id, socket_addr, NodeState::Joining);
94    topology.join_as_learner(info);
95
96    info!(node_id, addr, "node joining as learner");
97    MetadataEntry::TopologyChange(TopologyChange::Join {
98        node_id,
99        addr: addr.to_string(),
100    })
101}
102
103/// Handle learner promotion after state catch-up validation.
104pub fn handle_learner_promotion(
105    node_id: u64,
106    topology: &mut ClusterTopology,
107    log_lag: u64,
108    max_lag: u64,
109) -> Result<MetadataEntry> {
110    let node = topology.get_node(node_id).ok_or(ClusterError::Transport {
111        detail: format!("node {node_id} not found"),
112    })?;
113
114    if node.state != NodeState::Learner {
115        return Err(ClusterError::Transport {
116            detail: format!("node {node_id} is not a learner (state: {:?})", node.state),
117        });
118    }
119
120    if log_lag > max_lag {
121        return Err(ClusterError::Transport {
122            detail: format!("node {node_id} not caught up: lag={log_lag}, max={max_lag}"),
123        });
124    }
125
126    topology.promote_to_voter(node_id);
127    info!(node_id, log_lag, "learner promoted to voter");
128
129    Ok(MetadataEntry::TopologyChange(
130        TopologyChange::PromoteToVoter { node_id },
131    ))
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;

    /// Three active nodes (ids 1..=3) plus a uniform routing table of
    /// 4 vShards replicated across them.
    fn make_topology_and_routing() -> (ClusterTopology, RoutingTable) {
        let mut topo = ClusterTopology::new();
        let addr1: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr3: SocketAddr = "127.0.0.1:9002".parse().unwrap();

        topo.add_node(NodeInfo::new(1, addr1, NodeState::Active));
        topo.add_node(NodeInfo::new(2, addr2, NodeState::Active));
        topo.add_node(NodeInfo::new(3, addr3, NodeState::Active));

        let routing = RoutingTable::uniform(4, &[1, 2, 3], 2);
        (topo, routing)
    }

    #[test]
    fn decommission_plan_creates_metadata_entries() {
        let (topo, routing) = make_topology_and_routing();
        let entries = plan_decommission(1, &topo, &routing).unwrap();
        assert!(!entries.is_empty());
        // The plan must begin by marking the node as decommissioning.
        match &entries[0] {
            MetadataEntry::TopologyChange(TopologyChange::StartDecommission { node_id }) => {
                assert_eq!(*node_id, 1);
            }
            other => panic!("expected StartDecommission, got {other:?}"),
        }
    }

    #[test]
    fn safe_to_remove_draining_node() {
        let (mut topo, routing) = make_topology_and_routing();

        // An unknown node is never safe to remove.
        assert!(!is_safe_to_remove(99, &topo, &routing));
        // An Active node must be drained/decommissioned first.
        assert!(!is_safe_to_remove(1, &topo, &routing));

        topo.set_state(1, NodeState::Draining);
        // With a uniform table every group has a second member, so node 1
        // can never be a sole leader — removal is safe once draining.
        assert!(is_safe_to_remove(1, &topo, &routing));
    }

    #[test]
    fn node_join_creates_learner() {
        let mut topo = ClusterTopology::new();
        let entry = handle_node_join(5, "10.0.0.5:9000", &mut topo);
        assert!(topo.contains(5));
        assert_eq!(topo.learner_nodes().len(), 1);
        match entry {
            MetadataEntry::TopologyChange(TopologyChange::Join { node_id, .. }) => {
                assert_eq!(node_id, 5);
            }
            other => panic!("expected Join, got {other:?}"),
        }
    }

    #[test]
    fn learner_promotion_checks_lag() {
        let mut topo = ClusterTopology::new();
        let addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let info = NodeInfo::new(10, addr, NodeState::Joining);
        topo.join_as_learner(info);

        // Lag above the threshold must be rejected…
        let result = handle_learner_promotion(10, &mut topo, 100, 10);
        assert!(result.is_err());

        // …and lag within the threshold promotes the learner to Active.
        let result = handle_learner_promotion(10, &mut topo, 5, 10);
        assert!(result.is_ok());
        assert_eq!(topo.get_node(10).unwrap().state, NodeState::Active);
    }

    #[test]
    fn decommission_already_decommissioned_fails() {
        let (mut topo, routing) = make_topology_and_routing();
        topo.set_state(1, NodeState::Decommissioned);
        let result = plan_decommission(1, &topo, &routing);
        assert!(result.is_err());
    }
}
209}