nodedb_cluster/bootstrap/start.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cluster startup entry point: dispatches to bootstrap, join, or restart.
4//!
5//! The decision tree is deliberately small and delegates every
6//! non-trivial choice to a dedicated module:
7//!
8//! - **restart** (`super::restart`) — if the catalog already reports
9//! this node as bootstrapped, we always take the restart path,
10//! regardless of `seed_nodes` or `force_bootstrap`. The catalog is
11//! the authoritative source of truth once it exists.
//! - **bootstrap** (`super::bootstrap_fn`) — only considered when this
//!   node's `listen_addr` appears in `seed_nodes`. It is then taken when
//!   this node is the elected bootstrapper (lowest-addr seed), when the
//!   operator forced it via `ClusterConfig::force_bootstrap`, or when no
//!   other seed is running. See [`super::probe::should_bootstrap`].
16//! - **join** (`super::join`) — everything else. The join path owns
17//! its own retry-with-backoff loop and leader-redirect handling, so
18//! this dispatcher does not need to retry at this layer.
19
20use std::sync::Arc;
21use std::time::Duration;
22
23use nodedb_types::NodeId;
24
25use crate::catalog::ClusterCatalog;
26use crate::error::Result;
27use crate::lifecycle_state::ClusterLifecycleTracker;
28use crate::migration_executor::MigrationExecutor;
29use crate::reachability::driver::ReachabilityDriverConfig;
30use crate::rebalancer::driver::RebalancerLoopConfig;
31use crate::subsystem::context::BootstrapCtx;
32use crate::subsystem::health::ClusterHealth;
33use crate::subsystem::{
34 DecommissionSubsystem, ReachabilitySubsystem, RebalancerSubsystem, RunningCluster,
35 SubsystemRegistry, SwimSubsystem, SwimSubsystemConfig,
36};
37use crate::transport::NexarTransport;
38
39use super::bootstrap_fn::bootstrap;
40use super::config::{ClusterConfig, ClusterState};
41use super::join::join;
42use super::probe::should_bootstrap;
43use super::restart::restart;
44
45/// Register the default set of cluster subsystems into `registry`.
46///
47/// Called by [`start_cluster`] after the initial cluster state is resolved
48/// and the `BootstrapCtx` is assembled. The four subsystems are:
49///
50/// 1. `SwimSubsystem` (root, `deps = []`) — failure detector with
51/// `RoutingLivenessHook` attached before the UDP socket opens.
52/// `RoutingLivenessHook` is NOT its own subsystem; it is wired
53/// inside `SwimSubsystem::start()` as a SWIM subscriber.
54///
55/// 2. `ReachabilitySubsystem` (deps: swim) — periodic probe loop that
56/// drives the shared circuit breaker's Open → Half-Open transitions.
57///
58/// 3. `DecommissionSubsystem` (deps: swim) — polls local node topology
59/// state and fires a shutdown signal when the node is decommissioned.
60///
61/// 4. `RebalancerSubsystem` (deps: swim + reachability) — load-based
62/// vShard mover backed by `MigrationExecutor`.
63///
64/// `RoutingLivenessHook` is wired inside `SwimSubsystem` — it is not
65/// registered as a top-level subsystem because it is sync/cheap and runs
66/// directly on the SWIM detector event loop, not as a separate task.
67pub fn register_default_subsystems(
68 registry: &mut SubsystemRegistry,
69 config: &ClusterConfig,
70 ctx: &BootstrapCtx,
71 executor: Arc<MigrationExecutor>,
72) -> crate::error::Result<()> {
73 let swim_cfg = SwimSubsystemConfig {
74 swim: crate::swim::config::SwimConfig::default(),
75 local_id: NodeId::try_new(config.node_id.to_string()).map_err(|e| {
76 crate::error::ClusterError::Config {
77 detail: format!("node_id is not a valid ID: {e}"),
78 }
79 })?,
80 // Use the explicit SWIM UDP addr if set; otherwise let the OS
81 // pick an ephemeral port by binding to port 0 on the listen addr.
82 swim_addr: config.swim_udp_addr.unwrap_or_else(|| {
83 let mut a = config.listen_addr;
84 a.set_port(0);
85 a
86 }),
87 seeds: config.seed_nodes.clone(),
88 };
89
90 registry.register(Arc::new(SwimSubsystem::new(
91 swim_cfg,
92 Arc::clone(&ctx.routing),
93 Arc::clone(&ctx.topology),
94 vec![],
95 )));
96
97 registry.register(Arc::new(ReachabilitySubsystem::new(
98 ReachabilityDriverConfig::default(),
99 )));
100
101 registry.register(Arc::new(DecommissionSubsystem::new(
102 ctx.transport.node_id(),
103 Duration::from_secs(5),
104 )));
105
106 registry.register(Arc::new(RebalancerSubsystem::new(
107 RebalancerLoopConfig::default(),
108 executor,
109 )));
110
111 Ok(())
112}
113
114/// Start the cluster state machine — bootstrap, join, or restart.
115///
116/// Returns the initialized [`ClusterState`] only. Subsystems are NOT
117/// spawned here: they share an `Arc<Mutex<MultiRaft>>` with the
118/// `RaftLoop`, which only exists after the host calls
119/// [`crate::raft_loop::RaftLoop::new`]. The host therefore drives a
120/// two-step startup:
121///
122/// ```text
123/// 1. start_cluster(...) -> ClusterState
124/// 2. start_raft(...) -> RaftLoop owns multi_raft
125/// 3. start_cluster_subsystems(...) -> spawn subsystems sharing the
126/// loop's multi_raft Arc
127/// ```
128///
129/// This split exists because [`crate::raft_loop::RaftLoop::new`] takes
130/// `MultiRaft` *by value* and re-wraps it in its own
131/// `Arc<Mutex<MultiRaft>>`. If we registered subsystems before the
132/// hand-off, every subsystem would hold a strong clone of the
133/// pre-hand-off Arc, blocking `Arc::try_unwrap` in the host's
134/// `init.rs`. The host runs `start_raft` between steps 1 and 3, then
135/// passes the loop's shared multi_raft handle to step 3.
136///
137/// `lifecycle` is the caller-owned phase tracker. This function
138/// transitions it to `Restarting` / `Bootstrapping` / `Joining` as
139/// the dispatcher picks a branch, and to `Failed` on terminal error.
140pub async fn start_cluster(
141 config: &ClusterConfig,
142 catalog: &ClusterCatalog,
143 transport: Arc<NexarTransport>,
144 lifecycle: &ClusterLifecycleTracker,
145) -> Result<ClusterState> {
146 // Authoritative catalog state wins — a previously bootstrapped
147 // node always takes the restart path on boot.
148 let cluster_state = if catalog.is_bootstrapped()? {
149 lifecycle.to_restarting();
150 restart(config, catalog, &transport).inspect_err(|e| {
151 lifecycle.to_failed(format!("restart failed: {e}"));
152 })?
153 } else {
154 // No existing state — decide bootstrap vs join.
155 let is_seed = config.seed_nodes.contains(&config.listen_addr);
156 if is_seed && should_bootstrap(config, &transport).await {
157 lifecycle.to_bootstrapping();
158 bootstrap(config, catalog, transport.local_spki_pin()).inspect_err(|e| {
159 lifecycle.to_failed(format!("bootstrap failed: {e}"));
160 })?
161 } else {
162 join(config, catalog, &transport, lifecycle).await?
163 }
164 };
165
166 Ok(cluster_state)
167}
168
169/// Spawn the default cluster subsystems sharing `raft_multi_raft` with
170/// the running [`crate::raft_loop::RaftLoop`].
171///
172/// Called after [`start_cluster`] has produced [`ClusterState`] and
173/// the host has handed `MultiRaft` over to the `RaftLoop`. The host
174/// passes `raft_multi_raft = raft_loop.multi_raft_handle()` here so
175/// subsystems use the same `Arc<Mutex<MultiRaft>>` the loop owns —
176/// no double-ownership, no orphan Arcs blocking shutdown.
177///
178/// The returned [`RunningCluster`] keeps subsystem background tasks
179/// alive; dropping it signals all of them to shut down. The host
180/// **must** call [`RunningCluster::shutdown_all`] explicitly during
181/// orderly shutdown so subsystems release their `MultiRaft` Arc
182/// before the loop exits.
183pub async fn start_cluster_subsystems(
184 config: &ClusterConfig,
185 topology: Arc<std::sync::RwLock<crate::topology::ClusterTopology>>,
186 routing: Arc<std::sync::RwLock<crate::routing::RoutingTable>>,
187 transport: Arc<NexarTransport>,
188 raft_multi_raft: Arc<std::sync::Mutex<crate::multi_raft::MultiRaft>>,
189) -> Result<RunningCluster> {
190 let health = ClusterHealth::new();
191 let ctx = BootstrapCtx::new(
192 Arc::clone(&topology),
193 Arc::clone(&routing),
194 Arc::clone(&transport),
195 Arc::clone(&raft_multi_raft),
196 health,
197 );
198
199 let executor = Arc::new(MigrationExecutor::new(
200 Arc::clone(&raft_multi_raft),
201 Arc::clone(&routing),
202 Arc::clone(&topology),
203 Arc::clone(&transport),
204 ));
205
206 let mut registry = SubsystemRegistry::new();
207 register_default_subsystems(&mut registry, config, &ctx, executor)?;
208
209 registry
210 .start_all(&ctx)
211 .await
212 .map_err(|e| crate::error::ClusterError::Storage {
213 detail: format!("subsystem start failed: {e}"),
214 })
215}