use std::sync::Arc;
use std::time::{Duration, Instant};
use super::context::BootstrapCtx;
use super::errors::{BootstrapError, ShutdownError};
use super::health::ClusterHealth;
use super::topo_sort::topo_sort;
use super::r#trait::{ClusterSubsystem, SubsystemHandle};
/// A set of cluster subsystems that are started together by `start_all`.
///
/// Start out empty (via `Default` or `new`) and add entries with `register`.
#[derive(Default)]
pub struct SubsystemRegistry {
    /// Subsystems in registration order. `start_all` derives the start order separately
    /// with `topo_sort`.
    subsystems: Vec<Arc<dyn ClusterSubsystem>>,
}
impl SubsystemRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if no subsystem has been registered yet.
    pub fn is_empty(&self) -> bool {
        self.subsystems.is_empty()
    }

    /// Adds `subsystem` to the registry.
    ///
    /// Registration order does not decide start order: [`Self::start_all`] orders the
    /// subsystems with `topo_sort`.
    pub fn register(&mut self, subsystem: Arc<dyn ClusterSubsystem>) {
        self.subsystems.push(subsystem);
    }

    /// Starts every registered subsystem sequentially, in the order produced by `topo_sort`.
    ///
    /// Each subsystem's entry in `ctx.health` is set to `Starting` before its `start` call and
    /// to `Running` once that call succeeds.
    ///
    /// If a start fails:
    /// - that subsystem is marked `Failed` with the error's text;
    /// - every subsystem started so far is shut down in reverse start order;
    /// - all of those shutdowns share a single [`SHUTDOWN_CLEANUP_DEADLINE`] budget.
    ///
    /// # Errors
    ///
    /// - Any error from `topo_sort`; no subsystem is started in that case.
    /// - The failing subsystem's own start error, when every rollback shutdown succeeded.
    /// - `BootstrapError::StartAndShutdownFailure` naming the failing subsystem, when one or
    ///   more rollback shutdowns also failed. NOTE(review): that variant does not carry the
    ///   original start error; its text survives only in the `Failed` health entry.
    pub async fn start_all(&self, ctx: &BootstrapCtx) -> Result<RunningCluster, BootstrapError> {
        use crate::subsystem::health::SubsystemHealth;

        let order = topo_sort(&self.subsystems)?;
        let mut handles: Vec<SubsystemHandle> = Vec::with_capacity(order.len());
        for &idx in &order {
            let subsystem = &self.subsystems[idx];
            let name = subsystem.name();
            ctx.health.set(name, SubsystemHealth::Starting).await;
            match subsystem.start(ctx).await {
                Ok(handle) => {
                    ctx.health.set(name, SubsystemHealth::Running).await;
                    handles.push(handle);
                }
                Err(start_err) => {
                    let reason = start_err.to_string();
                    ctx.health.set(name, SubsystemHealth::Failed { reason }).await;
                    return Err(Self::roll_back(name, start_err, handles).await);
                }
            }
        }
        Ok(RunningCluster {
            handles,
            health: ctx.health.clone(),
        })
    }

    /// Shuts down `handles` (in reverse start order) after subsystem `name` failed to start.
    /// Returns the error `start_all` should report.
    ///
    /// All shutdowns share one deadline, computed once from [`SHUTDOWN_CLEANUP_DEADLINE`].
    /// A failed shutdown does not stop the remaining ones; every failure is collected.
    ///
    /// NOTE(review): health entries of the rolled-back subsystems are not touched here, so
    /// they still read `Running` afterwards.
    async fn roll_back(
        name: &'static str,
        start_err: BootstrapError,
        handles: Vec<SubsystemHandle>,
    ) -> BootstrapError {
        let deadline = Instant::now() + SHUTDOWN_CLEANUP_DEADLINE;
        let mut shutdown_errors = Vec::new();
        for handle in handles.into_iter().rev() {
            if let Err(e) = handle.shutdown_and_wait(deadline).await {
                shutdown_errors.push(e);
            }
        }
        if shutdown_errors.is_empty() {
            start_err
        } else {
            BootstrapError::StartAndShutdownFailure {
                name,
                shutdown_errors,
            }
        }
    }
}
/// Total time budget for the rollback that `start_all` performs when a subsystem fails to
/// start. One deadline is computed from this and shared by every rollback shutdown.
const SHUTDOWN_CLEANUP_DEADLINE: Duration = Duration::from_millis(10_000);
/// The outcome of a successful `SubsystemRegistry::start_all`.
///
/// Holds the handle of every started subsystem plus the cluster health tracker taken from
/// the bootstrap context.
#[must_use = "dropping a RunningCluster without calling `shutdown_all` bypasses its reverse-order shutdown"]
pub struct RunningCluster {
    /// Subsystem handles in start order; `shutdown_all` stops them in reverse.
    pub handles: Vec<SubsystemHandle>,
    /// Clone of the bootstrap context's `health` tracker, taken once startup completed.
    pub health: ClusterHealth,
}
impl RunningCluster {
    /// Shuts every subsystem down in reverse start order and returns all shutdown failures.
    ///
    /// Each subsystem gets its own deadline of `per_subsystem_deadline`, measured from the
    /// moment its shutdown begins, so time spent on one subsystem does not shrink the next
    /// one's budget. A failure does not stop the remaining shutdowns. Errors are returned in
    /// the order they occurred; an empty vector means every shutdown succeeded.
    pub async fn shutdown_all(self, per_subsystem_deadline: Duration) -> Vec<ShutdownError> {
        let mut pending = self.handles;
        let mut failures = Vec::new();
        // Popping from the back visits the most recently started subsystem first.
        while let Some(handle) = pending.pop() {
            let this_deadline = Instant::now() + per_subsystem_deadline;
            let outcome = handle.shutdown_and_wait(this_deadline).await;
            if let Err(err) = outcome {
                failures.push(err);
            }
        }
        failures
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use async_trait::async_trait;
    use tokio::sync::watch;

    use super::*;
    use crate::subsystem::context::BootstrapCtx;
    use crate::subsystem::errors::{BootstrapError, ShutdownError};
    use crate::subsystem::health::SubsystemHealth;
    use crate::subsystem::r#trait::{ClusterSubsystem, SubsystemHandle};

    /// Test double that always starts successfully and declares static dependencies.
    struct NamedSubsystem {
        name: &'static str,
        deps: &'static [&'static str],
    }

    #[async_trait]
    impl ClusterSubsystem for NamedSubsystem {
        fn name(&self) -> &'static str {
            self.name
        }

        fn dependencies(&self) -> &'static [&'static str] {
            self.deps
        }

        async fn start(&self, _ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
            let (tx, _rx) = watch::channel(false);
            let handle = tokio::spawn(async {});
            Ok(SubsystemHandle::new(self.name, handle, tx))
        }

        async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
            Ok(())
        }

        fn health(&self) -> SubsystemHealth {
            SubsystemHealth::Running
        }
    }

    /// Test double whose `start` always fails with `BootstrapError::SubsystemStart`.
    struct FailingSubsystem {
        name: &'static str,
    }

    #[async_trait]
    impl ClusterSubsystem for FailingSubsystem {
        fn name(&self) -> &'static str {
            self.name
        }

        fn dependencies(&self) -> &'static [&'static str] {
            &[]
        }

        async fn start(&self, _ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
            Err(BootstrapError::SubsystemStart {
                name: self.name,
                cause: "intentional test failure".into(),
            })
        }

        async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
            Ok(())
        }

        fn health(&self) -> SubsystemHealth {
            SubsystemHealth::Failed {
                reason: "intentional".into(),
            }
        }
    }

    #[test]
    fn registry_is_empty_until_registered() {
        let mut registry = SubsystemRegistry::new();
        assert!(registry.is_empty());
        registry.register(Arc::new(NamedSubsystem {
            name: "alpha",
            deps: &[],
        }));
        assert!(!registry.is_empty());
    }

    #[test]
    fn empty_registry_topo_sorts_cleanly() {
        let registry = SubsystemRegistry::new();
        let order = topo_sort(&registry.subsystems).unwrap();
        assert!(order.is_empty());
    }

    #[test]
    fn registry_topo_sorts_dependencies_correctly() {
        let mut registry = SubsystemRegistry::new();
        registry.register(Arc::new(NamedSubsystem {
            name: "beta",
            deps: &["alpha"],
        }));
        registry.register(Arc::new(NamedSubsystem {
            name: "alpha",
            deps: &[],
        }));
        let order = topo_sort(&registry.subsystems).unwrap();
        let alpha_pos = order
            .iter()
            .position(|&i| registry.subsystems[i].name() == "alpha")
            .unwrap();
        let beta_pos = order
            .iter()
            .position(|&i| registry.subsystems[i].name() == "beta")
            .unwrap();
        assert!(alpha_pos < beta_pos, "alpha must precede beta");
    }

    // NOTE(review): despite its name, this test does not drive `start_all` (that needs a
    // `BootstrapCtx`). It only checks that a failing subsystem is ordered and reports
    // `Failed` health. TODO: exercise `start_all` rollback once a test ctx is available.
    #[tokio::test]
    async fn failing_subsystem_start_returns_bootstrap_error() {
        let mut registry = SubsystemRegistry::new();
        registry.register(Arc::new(FailingSubsystem { name: "broken" }));
        let order = topo_sort(&registry.subsystems).unwrap();
        assert_eq!(order.len(), 1, "one subsystem registered");
        let only = &registry.subsystems[order[0]];
        assert_eq!(only.name(), "broken");
        assert!(matches!(only.health(), SubsystemHealth::Failed { .. }));
    }
}