ruststream 0.3.1

Async messaging framework for Rust: broker-agnostic traits, router, codecs, and a conformance harness for broker authors.
Documentation
//! Running the service: startup sequence, signal handling and graceful shutdown.

use std::{future::Future, sync::Arc, time::Duration};

#[cfg(unix)]
use tokio::signal::unix::{SignalKind, signal};

use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};

use super::{RustStream, RustStreamError};

impl<L> RustStream<L> {
    /// Runs the service until an interrupt (`SIGINT` / `SIGTERM`) is received, then shuts down
    /// gracefully.
    ///
    /// # Errors
    ///
    /// Returns [`RustStreamError`] if a broker fails to connect, a subscription fails to open, a
    /// dispatch task panics, or a broker fails to shut down.
    pub async fn run(self) -> Result<(), RustStreamError> {
        self.run_until(wait_for_signal()).await
    }

    /// Runs the service until `shutdown` resolves, then shuts down gracefully.
    ///
    /// Use this instead of [`run`](Self::run) to drive shutdown from a caller-owned future (a
    /// name, a timeout, a test signal) rather than from process signals.
    ///
    /// # Errors
    ///
    /// Returns [`RustStreamError`] if a broker fails to connect, a subscription fails to open, a
    /// dispatch task panics, or a broker fails to shut down.
    pub async fn run_until<F>(self, shutdown: F) -> Result<(), RustStreamError>
    where
        F: Future<Output = ()> + Send,
    {
        let Self {
            info,
            brokers,
            starters,
            handlers,
            mut state,
            on_startup,
            after_startup,
            on_shutdown,
            after_shutdown,
            shutdown_timeout,
            ..
        } = self;

        info!(
            target: "ruststream::lifecycle",
            service = %info.title,
            version = %info.version,
            brokers = brokers.len(),
            subscribers = starters.len(),
            "starting service",
        );

        if !on_startup.is_empty() {
            debug!(target: "ruststream::lifecycle", count = on_startup.len(), "running on_startup hooks");
        }
        for hook in on_startup {
            state = hook(state).await.map_err(RustStreamError::Startup)?;
        }
        let state = Arc::new(state);

        for broker in &brokers {
            broker.connect().await.map_err(RustStreamError::Connect)?;
            info!(target: "ruststream::lifecycle", broker = broker.name(), "broker connected");
        }

        let token = CancellationToken::new();
        let mut handles = Vec::with_capacity(starters.len());
        for (starter, meta) in starters.into_iter().zip(handlers) {
            let handle = starter(state.clone(), token.clone())
                .await
                .map_err(RustStreamError::Subscribe)?;
            info!(
                target: "ruststream::dispatch",
                subscriber = %meta.name,
                input = meta.input_type,
                "subscriber started",
            );
            handles.push(handle);
        }

        if !after_startup.is_empty() {
            debug!(target: "ruststream::lifecycle", count = after_startup.len(), "running after_startup hooks");
        }
        for hook in after_startup {
            hook(Arc::clone(&state))
                .await
                .map_err(RustStreamError::Startup)?;
        }

        info!(target: "ruststream::lifecycle", subscribers = handles.len(), "service running");

        shutdown.await;
        info!(target: "ruststream::lifecycle", "shutdown signal received");

        for hook in on_shutdown {
            if let Err(err) = hook(Arc::clone(&state)).await {
                warn!(target: "ruststream::lifecycle", error = %err, "on_shutdown hook failed");
            }
        }

        token.cancel();
        debug!(target: "ruststream::lifecycle", "draining in-flight handlers");
        drain_handles(handles, shutdown_timeout).await?;

        for broker in brokers.iter().rev() {
            broker.shutdown().await.map_err(RustStreamError::Shutdown)?;
            debug!(target: "ruststream::lifecycle", broker = broker.name(), "broker shut down");
        }

        for hook in after_shutdown {
            if let Err(err) = hook(Arc::clone(&state)).await {
                warn!(target: "ruststream::lifecycle", error = %err, "after_shutdown hook failed");
            }
        }
        info!(target: "ruststream::lifecycle", "service stopped");
        Ok(())
    }
}

/// Awaits all handler tasks, bounded by `timeout` if set.
///
/// Without a timeout every handle is awaited, even after one of them fails to join, so the
/// drain does not end while other handlers are still in flight. The first join failure (for
/// example a panicking task) is then returned as [`RustStreamError::Join`]; later ones are
/// logged.
///
/// With a timeout the join results are not inspected. If the deadline elapses, the remaining
/// tasks are aborted and `Ok(())` is returned.
async fn drain_handles(
    handles: Vec<JoinHandle<()>>,
    timeout: Option<Duration>,
) -> Result<(), RustStreamError> {
    let Some(timeout) = timeout else {
        let mut first_failure = None;
        for handle in handles {
            match handle.await {
                Ok(()) => {}
                // Remember the first failure but keep draining: returning here would drop the
                // remaining handles and leave their tasks running unobserved.
                Err(err) if first_failure.is_none() => first_failure = Some(err),
                Err(err) => {
                    warn!(
                        target: "ruststream::lifecycle",
                        error = %err,
                        "additional handler task failed while draining",
                    );
                }
            }
        }
        return match first_failure {
            Some(err) => Err(RustStreamError::Join(err)),
            None => Ok(()),
        };
    };

    // Abort handles are taken up front because `join_all` consumes the join handles.
    let aborts: Vec<_> = handles.iter().map(JoinHandle::abort_handle).collect();
    if tokio::time::timeout(timeout, futures::future::join_all(handles))
        .await
        .is_err()
    {
        warn!(
            target: "ruststream::lifecycle",
            "graceful shutdown timed out; aborting in-flight handlers",
        );
        for abort in aborts {
            abort.abort();
        }
    }
    Ok(())
}

/// Resolves once the process is asked to stop.
///
/// On Unix this is whichever of Ctrl-C or `SIGTERM` arrives first; if the `SIGTERM` listener
/// cannot be installed, only Ctrl-C is awaited. On other platforms only Ctrl-C is awaited.
/// An error from the Ctrl-C listener is ignored, so the future resolves in that case as well.
async fn wait_for_signal() {
    #[cfg(unix)]
    {
        let sigterm = signal(SignalKind::terminate());
        match sigterm {
            Ok(mut sigterm) => {
                tokio::select! {
                    _ = tokio::signal::ctrl_c() => {}
                    _ = sigterm.recv() => {}
                }
            }
            Err(_) => {
                let _ = tokio::signal::ctrl_c().await;
            }
        }
    }
    #[cfg(not(unix))]
    {
        let _ = tokio::signal::ctrl_c().await;
    }
}