runtime-rs 0.1.4

Typed service registry and Tokio lifecycle runtime for boot, reload, background tasks, and graceful shutdown.
Documentation
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
#[cfg(feature = "events")]
use runtime_rs::LifecycleBus;
use runtime_rs::{
    Error, Provider, ProviderOrder, Registry, ReloadState, Reloadable, Result, Runnable, Runtime,
    SharedState,
};
use tokio_util::sync::CancellationToken;

#[derive(Clone)]
struct State(Arc<Inner>);

struct Inner {
    shutdown: CancellationToken,
    registry: Registry<State>,
    demo_interval_ms: AtomicU64,
    boot_order: Mutex<Vec<&'static str>>,
    #[cfg(feature = "events")]
    events: LifecycleBus,
}

impl Default for State {
    fn default() -> Self {
        Self(Arc::new(Inner {
            shutdown: CancellationToken::new(),
            registry: Registry::default(),
            demo_interval_ms: AtomicU64::new(1_000),
            boot_order: Mutex::new(Vec::new()),
            #[cfg(feature = "events")]
            events: LifecycleBus::new(),
        }))
    }
}

impl State {
    fn new() -> Self {
        Self::default()
    }

    fn on_shutdown(&self) -> impl Future<Output = ()> + '_ {
        self.0.shutdown.cancelled()
    }

    fn demo_interval(&self) -> Duration {
        Duration::from_millis(self.0.demo_interval_ms.load(Ordering::Relaxed))
    }

    fn speed_up_demo(&self) {
        self.0.demo_interval_ms.store(300, Ordering::Relaxed);
    }

    fn mark_booted(
        &self,
        name: &'static str,
    ) {
        self.0.boot_order.lock().expect("boot order lock poisoned").push(name);
    }

    fn assert_boot_order(
        &self,
        expected: &[&'static str],
    ) {
        let actual = self.0.boot_order.lock().expect("boot order lock poisoned").clone();
        assert_eq!(actual, expected);
        println!("Boot order: {}", actual.join(" -> "));
    }
}

impl SharedState for State {
    fn shutdown_token(&self) -> CancellationToken {
        self.0.shutdown.clone()
    }

    fn registry_ref(&self) -> &Registry<Self> {
        &self.0.registry
    }

    #[cfg(feature = "events")]
    fn events(&self) -> &LifecycleBus {
        &self.0.events
    }
}

#[async_trait]
impl ReloadState for State {
    async fn reload(&self) -> Result<()> {
        println!("State reloaded");
        Ok(())
    }
}

struct IdleService;

struct DbProvider;

struct ControlProvider;

#[async_trait]
impl Provider<State> for IdleService {
    fn name(&self) -> &'static str {
        "idle"
    }

    async fn boot(
        &self,
        state: &State,
    ) -> Result<()> {
        state.mark_booted(self.name());
        println!("Idle Service booted");
        Ok(())
    }

    fn validate(
        &self,
        _state: &State,
    ) -> Result<()> {
        println!("Idle Service validated");
        Ok(())
    }

    fn as_reloadable(&self) -> Option<&dyn Reloadable<State>> {
        Some(self)
    }

    fn as_runnable(self: Arc<Self>) -> Option<Arc<dyn Runnable<State>>> {
        Some(self)
    }
}

#[async_trait]
impl Runnable<State> for IdleService {
    async fn run(
        self: Arc<Self>,
        state: State,
    ) -> Result<()> {
        start_service(state).await
    }
}

#[async_trait]
impl Reloadable<State> for IdleService {
    async fn reload(
        &self,
        state: &State,
    ) -> Result<()> {
        state.speed_up_demo();
        println!("Idle Service reloaded; interval is now 300ms");
        Ok(())
    }
}

#[async_trait]
impl Provider<State> for DbProvider {
    fn name(&self) -> &'static str {
        "db"
    }

    async fn boot(
        &self,
        state: &State,
    ) -> Result<()> {
        state.mark_booted(self.name());
        println!("DB provider booted");
        Ok(())
    }

    fn order(&self) -> ProviderOrder {
        ProviderOrder::new().before::<IdleService>()
    }

    fn validate(
        &self,
        _state: &State,
    ) -> Result<()> {
        println!("DB provider validated");
        Ok(())
    }

    fn as_runnable(self: Arc<Self>) -> Option<Arc<dyn Runnable<State>>> {
        Some(self)
    }
}

#[async_trait]
impl Runnable<State> for DbProvider {
    async fn run(
        self: Arc<Self>,
        state: State,
    ) -> Result<()> {
        run_db_probe(state).await
    }
}

#[async_trait]
impl Provider<State> for ControlProvider {
    fn name(&self) -> &'static str {
        "control"
    }

    async fn boot(
        &self,
        state: &State,
    ) -> Result<()> {
        state.mark_booted(self.name());
        println!("Control provider booted; it will send reload and then shut down");
        Ok(())
    }

    fn validate(
        &self,
        _state: &State,
    ) -> Result<()> {
        println!("Control provider validated");
        Ok(())
    }

    fn as_runnable(self: Arc<Self>) -> Option<Arc<dyn Runnable<State>>> {
        Some(self)
    }
}

#[async_trait]
impl Runnable<State> for ControlProvider {
    async fn run(
        self: Arc<Self>,
        state: State,
    ) -> Result<()> {
        run_control_provider(state).await
    }
}

async fn run_db_probe(state: State) -> Result<()> {
    tokio::select! {
        _ = state.on_shutdown() => Ok(()),
        _ = tokio::time::sleep(Duration::from_millis(500)) => {
            println!("DB provider reports a recoverable error");
            Err(Error::recoverable("temporary database probe failed"))
        }
    }
}

async fn run_control_provider(state: State) -> Result<()> {
    tokio::time::sleep(Duration::from_secs(3)).await;
    println!("Control provider sending reload signal (simulates SIGHUP)");
    state.registry_ref().reload_all(&state).await?;

    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Control provider initiating shutdown");
    state.initiate_shutdown();

    Ok(())
}

async fn start_service(state: State) -> Result<()> {
    println!("Idle Service starting");

    loop {
        tokio::select! {
            _ = state.on_shutdown() => {
                println!("🔸 Idle Service got shutdown signal");
                return Ok(());
            }
            _ = tokio::time::sleep(state.demo_interval()) => {
                println!("Idle Service is working");
            }
        }
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    let state = State::new();
    state.registry_ref().insert(Arc::new(IdleService));
    state.registry_ref().insert(Arc::new(DbProvider));
    state.registry_ref().insert(Arc::new(ControlProvider));
    println!("Planned boot order: {}", state.registry_ref().lifecycle_names()?.join(" -> "));
    state.registry_ref().boot_all(&state).await?;
    state.assert_boot_order(&["db", "idle", "control"]);
    state.registry_ref().validate_all(&state)?;

    let provider = state.registry_ref().resolve::<IdleService>().expect("IdleProvider registered");
    println!("Resolved provider: {}", provider.name());

    let mut runtime = Runtime::<State>::default();
    runtime.spawn_all(state.registry_ref(), state.clone());

    runtime.wait_until_shutdown(&state).await?;
    println!("Waiting for all services to finish gracefully..");
    runtime.drain().await?;
    println!("All services finished gracefully");
    Ok(())
}