polyc-runtime 0.1.3

Shared Unix-coherence runtime for polychrome binaries: logging, health/metrics side-server, signals.
Documentation
//! Server-task supervision: run a binary's spawned servers until a shutdown
//! signal, treating any earlier exit as fatal.
//!
//! A process whose serving task has died must flip readiness off and exit
//! (so the orchestrator restarts it) rather than keep answering `/readyz`
//! over a dead surface.

use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

use crate::health::Health;

/// Block until `shutdown` is cancelled or any task in `servers` finishes.
///
/// Either way, readiness is flipped off, `shutdown` is cancelled, and the
/// remaining tasks are drained. A task finishing before the shutdown signal —
/// even cleanly — is an error: servers run until told to stop.
///
/// # Errors
///
/// Returns the first server failure: an exit before shutdown, a serve error
/// surfaced during drain, or a panic.
pub async fn until_shutdown(
    mut servers: JoinSet<anyhow::Result<()>>,
    health: &Health,
    shutdown: &CancellationToken,
) -> anyhow::Result<()> {
    let early = tokio::select! {
        // Biased so an already-delivered shutdown signal reads as graceful
        // even when a server task has (consequently) already finished.
        biased;
        () = shutdown.cancelled() => None,
        res = servers.join_next() => res,
    };
    health.set_ready(false);
    shutdown.cancel();

    let mut failure = early.map(|res| {
        task_failure(res).map_or_else(
            || anyhow::anyhow!("server exited cleanly before shutdown"),
            |err| err.context("server exited before shutdown"),
        )
    });
    if failure.is_some() {
        tracing::error!("server task exited before shutdown; draining and exiting");
    } else {
        tracing::info!("shutdown signal received; draining in-flight work");
    }

    while let Some(res) = servers.join_next().await {
        if let Some(err) = task_failure(res) {
            tracing::error!(error = ?err, "server task failed during drain");
            failure.get_or_insert(err);
        }
    }
    failure.map_or(Ok(()), Err)
}

/// The error inside a finished task's join outcome, if any: a serve error or
/// a panic. A clean `Ok(())` exit returns `None`.
fn task_failure(res: Result<anyhow::Result<()>, tokio::task::JoinError>) -> Option<anyhow::Error> {
    match res {
        Ok(Ok(())) => None,
        Ok(Err(err)) => Some(err),
        Err(join) => Some(anyhow::Error::new(join).context("server task panicked")),
    }
}

#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::task::JoinSet;
    use tokio_util::sync::CancellationToken;

    use super::until_shutdown;
    use crate::health::Health;

    /// Spawn a well-behaved server into `servers`: it idles until `token` is
    /// cancelled and then exits cleanly.
    fn spawn_idle_server(servers: &mut JoinSet<anyhow::Result<()>>, token: &CancellationToken) {
        let token = token.clone();
        servers.spawn(async move {
            token.cancelled().await;
            Ok(())
        });
    }

    #[tokio::test]
    async fn graceful_shutdown_drains_and_returns_ok() {
        let (health, shutdown) = (Health::new(), CancellationToken::new());
        let mut servers = JoinSet::new();
        for _ in 0..2 {
            spawn_idle_server(&mut servers, &shutdown);
        }
        health.set_ready(true);

        shutdown.cancel();
        let outcome = until_shutdown(servers, &health, &shutdown).await;
        outcome.expect("graceful shutdown is not an error");
    }

    #[tokio::test]
    async fn failing_server_is_fatal_and_cancels_the_rest() {
        let (health, shutdown) = (Health::new(), CancellationToken::new());
        let mut servers = JoinSet::new();
        servers.spawn(async { Err(anyhow::anyhow!("bind lost")) });
        // This peer only finishes once the token is cancelled, so a clean
        // drain proves the early exit propagated shutdown to the survivors.
        spawn_idle_server(&mut servers, &shutdown);
        health.set_ready(true);

        let outcome = until_shutdown(servers, &health, &shutdown).await;
        let err = outcome.expect_err("a dead server is fatal");
        let rendered = err.to_string();
        assert!(rendered.contains("server exited before shutdown"));
        assert!(shutdown.is_cancelled(), "remaining servers are told to stop");
    }

    #[tokio::test]
    async fn clean_early_exit_is_still_fatal() {
        let (health, shutdown) = (Health::new(), CancellationToken::new());
        let mut servers = JoinSet::new();
        servers.spawn(async { Ok(()) });

        let outcome = until_shutdown(servers, &health, &shutdown).await;
        let err = outcome.expect_err("servers run until told to stop");
        let rendered = err.to_string();
        assert!(rendered.contains("exited cleanly before shutdown"));
    }
}