use std::sync::Arc;
use std::time::Duration;
use nodedb_types::NodeId;
use crate::catalog::ClusterCatalog;
use crate::error::Result;
use crate::lifecycle_state::ClusterLifecycleTracker;
use crate::migration_executor::MigrationExecutor;
use crate::reachability::driver::ReachabilityDriverConfig;
use crate::rebalancer::driver::RebalancerLoopConfig;
use crate::subsystem::context::BootstrapCtx;
use crate::subsystem::health::ClusterHealth;
use crate::subsystem::{
DecommissionSubsystem, ReachabilitySubsystem, RebalancerSubsystem, RunningCluster,
SubsystemRegistry, SwimSubsystem, SwimSubsystemConfig,
};
use crate::transport::NexarTransport;
use super::bootstrap_fn::bootstrap;
use super::config::{ClusterConfig, ClusterState};
use super::join::join;
use super::probe::should_bootstrap;
use super::restart::restart;
/// Registers the built-in cluster subsystems on `registry`.
///
/// Four subsystems are registered, in this order:
///
/// 1. SWIM membership, configured with `SwimConfig::default()`, this node's id
///    (parsed from `config.node_id`), a SWIM address, and `config.seed_nodes`;
/// 2. reachability, with `ReachabilityDriverConfig::default()`;
/// 3. decommission, with `ctx.transport.node_id()` and a 5-second duration;
/// 4. rebalancer, with `RebalancerLoopConfig::default()` and `executor`.
///
/// The SWIM address is `config.swim_udp_addr` when it is set; otherwise it is
/// `config.listen_addr` with its port replaced by `0`.
///
/// # Errors
///
/// Returns `ClusterError::Config` when `config.node_id` is rejected by
/// [`NodeId::try_new`]. Nothing is registered in that case, because the id is
/// validated before the first `register` call.
pub fn register_default_subsystems(
    registry: &mut SubsystemRegistry,
    config: &ClusterConfig,
    ctx: &BootstrapCtx,
    executor: Arc<MigrationExecutor>,
) -> crate::error::Result<()> {
    // Duration handed to the decommission subsystem. NOTE(review): whether this
    // is a poll interval, a timeout, etc. is defined by DecommissionSubsystem.
    const DECOMMISSION_DURATION: Duration = Duration::from_secs(5);

    let swim_defaults = crate::swim::config::SwimConfig::default();
    let local_id = NodeId::try_new(config.node_id.to_string()).map_err(|e| {
        crate::error::ClusterError::Config {
            detail: format!("node_id is not a valid ID: {e}"),
        }
    })?;

    // Without a dedicated SWIM UDP address, reuse the listen address with port 0.
    // NOTE(review): presumably port 0 means "OS-assigned at bind time" — confirm
    // in the SWIM transport.
    let swim_addr = match config.swim_udp_addr {
        Some(explicit) => explicit,
        None => {
            let mut derived = config.listen_addr;
            derived.set_port(0);
            derived
        }
    };

    let swim_cfg = SwimSubsystemConfig {
        swim: swim_defaults,
        local_id,
        swim_addr,
        seeds: config.seed_nodes.clone(),
    };

    // NOTE(review): NodeId for SWIM comes from `config.node_id`, while the
    // decommission subsystem below uses `ctx.transport.node_id()`; these are
    // presumably the same node — confirm.
    let swim = SwimSubsystem::new(
        swim_cfg,
        Arc::clone(&ctx.routing),
        Arc::clone(&ctx.topology),
        // NOTE(review): meaning of this empty list is defined by SwimSubsystem::new.
        vec![],
    );
    registry.register(Arc::new(swim));

    let reachability = ReachabilitySubsystem::new(ReachabilityDriverConfig::default());
    registry.register(Arc::new(reachability));

    let decommission = DecommissionSubsystem::new(ctx.transport.node_id(), DECOMMISSION_DURATION);
    registry.register(Arc::new(decommission));

    let rebalancer = RebalancerSubsystem::new(RebalancerLoopConfig::default(), executor);
    registry.register(Arc::new(rebalancer));

    Ok(())
}
/// Brings this node into a cluster by restarting, bootstrapping, or joining.
///
/// The path is chosen as follows:
///
/// - **Restart**: `catalog.is_bootstrapped()` is `true`. `lifecycle` moves to
///   restarting and `restart` is run.
/// - **Bootstrap**: the node is not yet bootstrapped, `config.listen_addr`
///   appears in `config.seed_nodes`, and `should_bootstrap` returns `true`.
///   `lifecycle` moves to bootstrapping and `bootstrap` is run with
///   `transport.local_spki_pin()`. `should_bootstrap` is only awaited for
///   seed nodes.
/// - **Join**: every other case. `join` receives `lifecycle` itself, and this
///   function makes no lifecycle transition on that path.
///
/// If `restart` or `bootstrap` fails, `lifecycle` is moved to failed with a
/// `"restart failed: …"` or `"bootstrap failed: …"` reason before the error is
/// returned.
///
/// # Errors
///
/// Propagates errors from `catalog.is_bootstrapped()`, `restart`, `bootstrap`,
/// and `join`.
pub async fn start_cluster(
    config: &ClusterConfig,
    catalog: &ClusterCatalog,
    transport: Arc<NexarTransport>,
    lifecycle: &ClusterLifecycleTracker,
) -> Result<ClusterState> {
    if catalog.is_bootstrapped()? {
        lifecycle.to_restarting();
        let restored = restart(config, catalog, &transport).inspect_err(|e| {
            lifecycle.to_failed(format!("restart failed: {e}"));
        })?;
        return Ok(restored);
    }

    // Only nodes listed among the seeds may create a new cluster. The `&&`
    // short-circuit keeps non-seeds from running the bootstrap probe at all.
    let listed_as_seed = config.seed_nodes.contains(&config.listen_addr);
    let bootstrap_here = listed_as_seed && should_bootstrap(config, &transport).await;

    if !bootstrap_here {
        let joined = join(config, catalog, &transport, lifecycle).await?;
        return Ok(joined);
    }

    lifecycle.to_bootstrapping();
    let created = bootstrap(config, catalog, transport.local_spki_pin()).inspect_err(|e| {
        lifecycle.to_failed(format!("bootstrap failed: {e}"));
    })?;
    Ok(created)
}
pub async fn start_cluster_subsystems(
config: &ClusterConfig,
topology: Arc<std::sync::RwLock<crate::topology::ClusterTopology>>,
routing: Arc<std::sync::RwLock<crate::routing::RoutingTable>>,
transport: Arc<NexarTransport>,
raft_multi_raft: Arc<std::sync::Mutex<crate::multi_raft::MultiRaft>>,
) -> Result<RunningCluster> {
let health = ClusterHealth::new();
let ctx = BootstrapCtx::new(
Arc::clone(&topology),
Arc::clone(&routing),
Arc::clone(&transport),
Arc::clone(&raft_multi_raft),
health,
);
let executor = Arc::new(MigrationExecutor::new(
Arc::clone(&raft_multi_raft),
Arc::clone(&routing),
Arc::clone(&topology),
Arc::clone(&transport),
));
let mut registry = SubsystemRegistry::new();
register_default_subsystems(&mut registry, config, &ctx, executor)?;
registry
.start_all(&ctx)
.await
.map_err(|e| crate::error::ClusterError::Storage {
detail: format!("subsystem start failed: {e}"),
})
}