Skip to main content

nodedb_cluster/bootstrap/
start.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Cluster startup entry point: dispatches to bootstrap, join, or restart.
4//!
5//! The decision tree is deliberately small and delegates every
6//! non-trivial choice to a dedicated module:
7//!
8//! - **restart** (`super::restart`) — if the catalog already reports
9//!   this node as bootstrapped, we always take the restart path,
10//!   regardless of `seed_nodes` or `force_bootstrap`. The catalog is
11//!   the authoritative source of truth once it exists.
//! - **bootstrap** (`super::bootstrap_fn`) — considered only when this
//!   node's `listen_addr` appears in `seed_nodes`. It is then taken when
//!   this node is the elected bootstrapper (lowest-addr seed), when the
//!   operator forced it via `ClusterConfig::force_bootstrap`, or when no
//!   other seed is running. See [`super::probe::should_bootstrap`].
16//! - **join** (`super::join`) — everything else. The join path owns
17//!   its own retry-with-backoff loop and leader-redirect handling, so
18//!   this dispatcher does not need to retry at this layer.
19
20use std::sync::Arc;
21use std::time::Duration;
22
23use nodedb_types::NodeId;
24
25use crate::catalog::ClusterCatalog;
26use crate::error::Result;
27use crate::lifecycle_state::ClusterLifecycleTracker;
28use crate::migration_executor::MigrationExecutor;
29use crate::reachability::driver::ReachabilityDriverConfig;
30use crate::rebalancer::driver::RebalancerLoopConfig;
31use crate::subsystem::context::BootstrapCtx;
32use crate::subsystem::health::ClusterHealth;
33use crate::subsystem::{
34    DecommissionSubsystem, ReachabilitySubsystem, RebalancerSubsystem, RunningCluster,
35    SubsystemRegistry, SwimSubsystem, SwimSubsystemConfig,
36};
37use crate::transport::NexarTransport;
38
39use super::bootstrap_fn::bootstrap;
40use super::config::{ClusterConfig, ClusterState};
41use super::join::join;
42use super::probe::should_bootstrap;
43use super::restart::restart;
44
45/// Register the default set of cluster subsystems into `registry`.
46///
47/// Called by [`start_cluster`] after the initial cluster state is resolved
48/// and the `BootstrapCtx` is assembled. The four subsystems are:
49///
50/// 1. `SwimSubsystem` (root, `deps = []`) — failure detector with
51///    `RoutingLivenessHook` attached before the UDP socket opens.
52///    `RoutingLivenessHook` is NOT its own subsystem; it is wired
53///    inside `SwimSubsystem::start()` as a SWIM subscriber.
54///
55/// 2. `ReachabilitySubsystem` (deps: swim) — periodic probe loop that
56///    drives the shared circuit breaker's Open → Half-Open transitions.
57///
58/// 3. `DecommissionSubsystem` (deps: swim) — polls local node topology
59///    state and fires a shutdown signal when the node is decommissioned.
60///
61/// 4. `RebalancerSubsystem` (deps: swim + reachability) — load-based
62///    vShard mover backed by `MigrationExecutor`.
63///
64/// `RoutingLivenessHook` is wired inside `SwimSubsystem` — it is not
65/// registered as a top-level subsystem because it is sync/cheap and runs
66/// directly on the SWIM detector event loop, not as a separate task.
67pub fn register_default_subsystems(
68    registry: &mut SubsystemRegistry,
69    config: &ClusterConfig,
70    ctx: &BootstrapCtx,
71    executor: Arc<MigrationExecutor>,
72) -> crate::error::Result<()> {
73    let swim_cfg = SwimSubsystemConfig {
74        swim: crate::swim::config::SwimConfig::default(),
75        local_id: NodeId::try_new(config.node_id.to_string()).map_err(|e| {
76            crate::error::ClusterError::Config {
77                detail: format!("node_id is not a valid ID: {e}"),
78            }
79        })?,
80        // Use the explicit SWIM UDP addr if set; otherwise let the OS
81        // pick an ephemeral port by binding to port 0 on the listen addr.
82        swim_addr: config.swim_udp_addr.unwrap_or_else(|| {
83            let mut a = config.listen_addr;
84            a.set_port(0);
85            a
86        }),
87        seeds: config.seed_nodes.clone(),
88    };
89
90    registry.register(Arc::new(SwimSubsystem::new(
91        swim_cfg,
92        Arc::clone(&ctx.routing),
93        Arc::clone(&ctx.topology),
94        vec![],
95    )));
96
97    registry.register(Arc::new(ReachabilitySubsystem::new(
98        ReachabilityDriverConfig::default(),
99    )));
100
101    registry.register(Arc::new(DecommissionSubsystem::new(
102        ctx.transport.node_id(),
103        Duration::from_secs(5),
104    )));
105
106    registry.register(Arc::new(RebalancerSubsystem::new(
107        RebalancerLoopConfig::default(),
108        executor,
109    )));
110
111    Ok(())
112}
113
114/// Start the cluster state machine — bootstrap, join, or restart.
115///
116/// Returns the initialized [`ClusterState`] only. Subsystems are NOT
117/// spawned here: they share an `Arc<Mutex<MultiRaft>>` with the
118/// `RaftLoop`, which only exists after the host calls
119/// [`crate::raft_loop::RaftLoop::new`]. The host therefore drives a
120/// two-step startup:
121///
122/// ```text
123/// 1. start_cluster(...)              -> ClusterState
124/// 2. start_raft(...)                 -> RaftLoop owns multi_raft
125/// 3. start_cluster_subsystems(...)   -> spawn subsystems sharing the
126///                                       loop's multi_raft Arc
127/// ```
128///
129/// This split exists because [`crate::raft_loop::RaftLoop::new`] takes
130/// `MultiRaft` *by value* and re-wraps it in its own
131/// `Arc<Mutex<MultiRaft>>`. If we registered subsystems before the
132/// hand-off, every subsystem would hold a strong clone of the
133/// pre-hand-off Arc, blocking `Arc::try_unwrap` in the host's
134/// `init.rs`. The host runs `start_raft` between steps 1 and 3, then
135/// passes the loop's shared multi_raft handle to step 3.
136///
137/// `lifecycle` is the caller-owned phase tracker. This function
138/// transitions it to `Restarting` / `Bootstrapping` / `Joining` as
139/// the dispatcher picks a branch, and to `Failed` on terminal error.
140pub async fn start_cluster(
141    config: &ClusterConfig,
142    catalog: &ClusterCatalog,
143    transport: Arc<NexarTransport>,
144    lifecycle: &ClusterLifecycleTracker,
145) -> Result<ClusterState> {
146    // Authoritative catalog state wins — a previously bootstrapped
147    // node always takes the restart path on boot.
148    let cluster_state = if catalog.is_bootstrapped()? {
149        lifecycle.to_restarting();
150        restart(config, catalog, &transport).inspect_err(|e| {
151            lifecycle.to_failed(format!("restart failed: {e}"));
152        })?
153    } else {
154        // No existing state — decide bootstrap vs join.
155        let is_seed = config.seed_nodes.contains(&config.listen_addr);
156        if is_seed && should_bootstrap(config, &transport).await {
157            lifecycle.to_bootstrapping();
158            bootstrap(config, catalog, transport.local_spki_pin()).inspect_err(|e| {
159                lifecycle.to_failed(format!("bootstrap failed: {e}"));
160            })?
161        } else {
162            join(config, catalog, &transport, lifecycle).await?
163        }
164    };
165
166    Ok(cluster_state)
167}
168
169/// Spawn the default cluster subsystems sharing `raft_multi_raft` with
170/// the running [`crate::raft_loop::RaftLoop`].
171///
172/// Called after [`start_cluster`] has produced [`ClusterState`] and
173/// the host has handed `MultiRaft` over to the `RaftLoop`. The host
174/// passes `raft_multi_raft = raft_loop.multi_raft_handle()` here so
175/// subsystems use the same `Arc<Mutex<MultiRaft>>` the loop owns —
176/// no double-ownership, no orphan Arcs blocking shutdown.
177///
178/// The returned [`RunningCluster`] keeps subsystem background tasks
179/// alive; dropping it signals all of them to shut down. The host
180/// **must** call [`RunningCluster::shutdown_all`] explicitly during
181/// orderly shutdown so subsystems release their `MultiRaft` Arc
182/// before the loop exits.
183pub async fn start_cluster_subsystems(
184    config: &ClusterConfig,
185    topology: Arc<std::sync::RwLock<crate::topology::ClusterTopology>>,
186    routing: Arc<std::sync::RwLock<crate::routing::RoutingTable>>,
187    transport: Arc<NexarTransport>,
188    raft_multi_raft: Arc<std::sync::Mutex<crate::multi_raft::MultiRaft>>,
189) -> Result<RunningCluster> {
190    let health = ClusterHealth::new();
191    let ctx = BootstrapCtx::new(
192        Arc::clone(&topology),
193        Arc::clone(&routing),
194        Arc::clone(&transport),
195        Arc::clone(&raft_multi_raft),
196        health,
197    );
198
199    let executor = Arc::new(MigrationExecutor::new(
200        Arc::clone(&raft_multi_raft),
201        Arc::clone(&routing),
202        Arc::clone(&topology),
203        Arc::clone(&transport),
204    ));
205
206    let mut registry = SubsystemRegistry::new();
207    register_default_subsystems(&mut registry, config, &ctx, executor)?;
208
209    registry
210        .start_all(&ctx)
211        .await
212        .map_err(|e| crate::error::ClusterError::Storage {
213            detail: format!("subsystem start failed: {e}"),
214        })
215}