use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use crate::health::Health;
pub async fn until_shutdown(
mut servers: JoinSet<anyhow::Result<()>>,
health: &Health,
shutdown: &CancellationToken,
) -> anyhow::Result<()> {
let early = tokio::select! {
biased;
() = shutdown.cancelled() => None,
res = servers.join_next() => res,
};
health.set_ready(false);
shutdown.cancel();
let mut failure = early.map(|res| {
task_failure(res).map_or_else(
|| anyhow::anyhow!("server exited cleanly before shutdown"),
|err| err.context("server exited before shutdown"),
)
});
if failure.is_some() {
tracing::error!("server task exited before shutdown; draining and exiting");
} else {
tracing::info!("shutdown signal received; draining in-flight work");
}
while let Some(res) = servers.join_next().await {
if let Some(err) = task_failure(res) {
tracing::error!(error = ?err, "server task failed during drain");
failure.get_or_insert(err);
}
}
failure.map_or(Ok(()), Err)
}
fn task_failure(res: Result<anyhow::Result<()>, tokio::task::JoinError>) -> Option<anyhow::Error> {
match res {
Ok(Ok(())) => None,
Ok(Err(err)) => Some(err),
Err(join) => Some(anyhow::Error::new(join).context("server task panicked")),
}
}
#[cfg(test)]
mod tests {
    // Lint relaxation is scoped to this test module only.
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::task::JoinSet;
    use tokio_util::sync::CancellationToken;

    use super::until_shutdown;
    use crate::health::Health;

    /// Servers that stop once shutdown is cancelled are drained and yield `Ok`.
    #[tokio::test]
    async fn graceful_shutdown_drains_and_returns_ok() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        for _ in 0..2 {
            let shutdown = shutdown.clone();
            servers.spawn(async move {
                shutdown.cancelled().await;
                Ok(())
            });
        }
        health.set_ready(true);
        shutdown.cancel();
        until_shutdown(servers, &health, &shutdown)
            .await
            .expect("graceful shutdown is not an error");
    }

    /// A server that fails before shutdown makes the call fail and cancels the token.
    #[tokio::test]
    async fn failing_server_is_fatal_and_cancels_the_rest() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        servers.spawn(async { Err(anyhow::anyhow!("bind lost")) });
        let peer = shutdown.clone();
        servers.spawn(async move {
            peer.cancelled().await;
            Ok(())
        });
        health.set_ready(true);
        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("a dead server is fatal");
        assert!(err.to_string().contains("server exited before shutdown"));
        assert!(
            shutdown.is_cancelled(),
            "remaining servers are told to stop"
        );
    }

    /// A server returning `Ok(())` before shutdown is still reported as an error.
    #[tokio::test]
    async fn clean_early_exit_is_still_fatal() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        servers.spawn(async { Ok(()) });
        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("servers run until told to stop");
        assert!(err.to_string().contains("exited cleanly before shutdown"));
    }

    /// A server that fails only after shutdown was requested is reported from the drain.
    #[tokio::test]
    async fn failure_during_drain_is_reported() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        let peer = shutdown.clone();
        servers.spawn(async move {
            peer.cancelled().await;
            Err(anyhow::anyhow!("flush failed"))
        });
        health.set_ready(true);
        // Cancel up front: the biased select then takes the shutdown branch,
        // so the failure can only be observed while draining.
        shutdown.cancel();
        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("a failure while draining is still an error");
        assert!(err.to_string().contains("flush failed"));
    }
}