use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
#[cfg(feature = "events")]
use runtime_rs::LifecycleBus;
use runtime_rs::{
Error, Provider, ProviderOrder, Registry, ReloadState, Reloadable, Result, Runnable, Runtime,
SharedState,
};
use tokio_util::sync::CancellationToken;
#[derive(Clone)]
struct State(Arc<Inner>);
struct Inner {
shutdown: CancellationToken,
registry: Registry<State>,
demo_interval_ms: AtomicU64,
boot_order: Mutex<Vec<&'static str>>,
#[cfg(feature = "events")]
events: LifecycleBus,
}
impl Default for State {
fn default() -> Self {
Self(Arc::new(Inner {
shutdown: CancellationToken::new(),
registry: Registry::default(),
demo_interval_ms: AtomicU64::new(1_000),
boot_order: Mutex::new(Vec::new()),
#[cfg(feature = "events")]
events: LifecycleBus::new(),
}))
}
}
impl State {
fn new() -> Self {
Self::default()
}
fn on_shutdown(&self) -> impl Future<Output = ()> + '_ {
self.0.shutdown.cancelled()
}
fn demo_interval(&self) -> Duration {
Duration::from_millis(self.0.demo_interval_ms.load(Ordering::Relaxed))
}
fn speed_up_demo(&self) {
self.0.demo_interval_ms.store(300, Ordering::Relaxed);
}
fn mark_booted(
&self,
name: &'static str,
) {
self.0.boot_order.lock().expect("boot order lock poisoned").push(name);
}
fn assert_boot_order(
&self,
expected: &[&'static str],
) {
let actual = self.0.boot_order.lock().expect("boot order lock poisoned").clone();
assert_eq!(actual, expected);
println!("Boot order: {}", actual.join(" -> "));
}
}
impl SharedState for State {
fn shutdown_token(&self) -> CancellationToken {
self.0.shutdown.clone()
}
fn registry_ref(&self) -> &Registry<Self> {
&self.0.registry
}
#[cfg(feature = "events")]
fn events(&self) -> &LifecycleBus {
&self.0.events
}
}
#[async_trait]
impl ReloadState for State {
async fn reload(&self) -> Result<()> {
println!("State reloaded");
Ok(())
}
}
struct IdleService;
struct DbProvider;
struct ControlProvider;
#[async_trait]
impl Provider<State> for IdleService {
fn name(&self) -> &'static str {
"idle"
}
async fn boot(
&self,
state: &State,
) -> Result<()> {
state.mark_booted(self.name());
println!("Idle Service booted");
Ok(())
}
fn validate(
&self,
_state: &State,
) -> Result<()> {
println!("Idle Service validated");
Ok(())
}
fn as_reloadable(&self) -> Option<&dyn Reloadable<State>> {
Some(self)
}
fn as_runnable(self: Arc<Self>) -> Option<Arc<dyn Runnable<State>>> {
Some(self)
}
}
#[async_trait]
impl Runnable<State> for IdleService {
async fn run(
self: Arc<Self>,
state: State,
) -> Result<()> {
start_service(state).await
}
}
#[async_trait]
impl Reloadable<State> for IdleService {
async fn reload(
&self,
state: &State,
) -> Result<()> {
state.speed_up_demo();
println!("Idle Service reloaded; interval is now 300ms");
Ok(())
}
}
#[async_trait]
impl Provider<State> for DbProvider {
fn name(&self) -> &'static str {
"db"
}
async fn boot(
&self,
state: &State,
) -> Result<()> {
state.mark_booted(self.name());
println!("DB provider booted");
Ok(())
}
fn order(&self) -> ProviderOrder {
ProviderOrder::new().before::<IdleService>()
}
fn validate(
&self,
_state: &State,
) -> Result<()> {
println!("DB provider validated");
Ok(())
}
fn as_runnable(self: Arc<Self>) -> Option<Arc<dyn Runnable<State>>> {
Some(self)
}
}
#[async_trait]
impl Runnable<State> for DbProvider {
async fn run(
self: Arc<Self>,
state: State,
) -> Result<()> {
run_db_probe(state).await
}
}
#[async_trait]
impl Provider<State> for ControlProvider {
fn name(&self) -> &'static str {
"control"
}
async fn boot(
&self,
state: &State,
) -> Result<()> {
state.mark_booted(self.name());
println!("Control provider booted; it will send reload and then shut down");
Ok(())
}
fn validate(
&self,
_state: &State,
) -> Result<()> {
println!("Control provider validated");
Ok(())
}
fn as_runnable(self: Arc<Self>) -> Option<Arc<dyn Runnable<State>>> {
Some(self)
}
}
#[async_trait]
impl Runnable<State> for ControlProvider {
async fn run(
self: Arc<Self>,
state: State,
) -> Result<()> {
run_control_provider(state).await
}
}
async fn run_db_probe(state: State) -> Result<()> {
tokio::select! {
_ = state.on_shutdown() => Ok(()),
_ = tokio::time::sleep(Duration::from_millis(500)) => {
println!("DB provider reports a recoverable error");
Err(Error::recoverable("temporary database probe failed"))
}
}
}
async fn run_control_provider(state: State) -> Result<()> {
tokio::time::sleep(Duration::from_secs(3)).await;
println!("Control provider sending reload signal (simulates SIGHUP)");
state.registry_ref().reload_all(&state).await?;
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Control provider initiating shutdown");
state.initiate_shutdown();
Ok(())
}
async fn start_service(state: State) -> Result<()> {
println!("Idle Service starting");
loop {
tokio::select! {
_ = state.on_shutdown() => {
println!("🔸 Idle Service got shutdown signal");
return Ok(());
}
_ = tokio::time::sleep(state.demo_interval()) => {
println!("Idle Service is working");
}
}
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
let state = State::new();
state.registry_ref().insert(Arc::new(IdleService));
state.registry_ref().insert(Arc::new(DbProvider));
state.registry_ref().insert(Arc::new(ControlProvider));
println!("Planned boot order: {}", state.registry_ref().lifecycle_names()?.join(" -> "));
state.registry_ref().boot_all(&state).await?;
state.assert_boot_order(&["db", "idle", "control"]);
state.registry_ref().validate_all(&state)?;
let provider = state.registry_ref().resolve::<IdleService>().expect("IdleProvider registered");
println!("Resolved provider: {}", provider.name());
let mut runtime = Runtime::<State>::default();
runtime.spawn_all(state.registry_ref(), state.clone());
runtime.wait_until_shutdown(&state).await?;
println!("Waiting for all services to finish gracefully..");
runtime.drain().await?;
println!("All services finished gracefully");
Ok(())
}