Skip to main content

polyc_runtime/
supervise.rs

//! Server-task supervision: run a binary's spawned servers until a shutdown
//! signal, treating any earlier exit as fatal.
//!
//! A process whose serving task has died must flip readiness off and exit
//! (so the orchestrator restarts it) rather than keep answering `/readyz`
//! over a dead surface.
7
8use tokio::task::JoinSet;
9use tokio_util::sync::CancellationToken;
10
11use crate::health::Health;
12
13/// Block until `shutdown` is cancelled or any task in `servers` finishes.
14///
15/// Either way, readiness is flipped off, `shutdown` is cancelled, and the
16/// remaining tasks are drained. A task finishing before the shutdown signal —
17/// even cleanly — is an error: servers run until told to stop.
18///
19/// # Errors
20///
21/// Returns the first server failure: an exit before shutdown, a serve error
22/// surfaced during drain, or a panic.
23pub async fn until_shutdown(
24    mut servers: JoinSet<anyhow::Result<()>>,
25    health: &Health,
26    shutdown: &CancellationToken,
27) -> anyhow::Result<()> {
28    let early = tokio::select! {
29        // Biased so an already-delivered shutdown signal reads as graceful
30        // even when a server task has (consequently) already finished.
31        biased;
32        () = shutdown.cancelled() => None,
33        res = servers.join_next() => res,
34    };
35    health.set_ready(false);
36    shutdown.cancel();
37
38    let mut failure = early.map(|res| {
39        task_failure(res).map_or_else(
40            || anyhow::anyhow!("server exited cleanly before shutdown"),
41            |err| err.context("server exited before shutdown"),
42        )
43    });
44    if failure.is_some() {
45        tracing::error!("server task exited before shutdown; draining and exiting");
46    } else {
47        tracing::info!("shutdown signal received; draining in-flight work");
48    }
49
50    while let Some(res) = servers.join_next().await {
51        if let Some(err) = task_failure(res) {
52            tracing::error!(error = ?err, "server task failed during drain");
53            failure.get_or_insert(err);
54        }
55    }
56    failure.map_or(Ok(()), Err)
57}
58
59/// The error inside a finished task's join outcome, if any: a serve error or
60/// a panic. A clean `Ok(())` exit returns `None`.
61fn task_failure(res: Result<anyhow::Result<()>, tokio::task::JoinError>) -> Option<anyhow::Error> {
62    match res {
63        Ok(Ok(())) => None,
64        Ok(Err(err)) => Some(err),
65        Err(join) => Some(anyhow::Error::new(join).context("server task panicked")),
66    }
67}
68
#[cfg(test)]
mod tests {
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::task::JoinSet;
    use tokio_util::sync::CancellationToken;

    use super::until_shutdown;
    use crate::health::Health;

    #[tokio::test]
    async fn graceful_shutdown_drains_and_returns_ok() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        for _ in 0..2 {
            let shutdown = shutdown.clone();
            servers.spawn(async move {
                shutdown.cancelled().await;
                Ok(())
            });
        }
        health.set_ready(true);

        shutdown.cancel();
        until_shutdown(servers, &health, &shutdown)
            .await
            .expect("graceful shutdown is not an error");
    }

    #[tokio::test]
    async fn failing_server_is_fatal_and_cancels_the_rest() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        servers.spawn(async { Err(anyhow::anyhow!("bind lost")) });
        // The healthy peer must be drained via the cancelled token, proving
        // the early exit propagates shutdown to the rest.
        let peer = shutdown.clone();
        servers.spawn(async move {
            peer.cancelled().await;
            Ok(())
        });
        health.set_ready(true);

        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("a dead server is fatal");
        assert!(err.to_string().contains("server exited before shutdown"));
        assert!(
            shutdown.is_cancelled(),
            "remaining servers are told to stop"
        );
    }

    #[tokio::test]
    async fn clean_early_exit_is_still_fatal() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        servers.spawn(async { Ok(()) });

        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("servers run until told to stop");
        assert!(err.to_string().contains("exited cleanly before shutdown"));
    }

    #[tokio::test]
    async fn panicking_server_is_fatal() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers: JoinSet<anyhow::Result<()>> = JoinSet::new();
        servers.spawn(async { panic!("boom") });

        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("a panicking server is fatal");
        assert!(err.to_string().contains("server exited before shutdown"));
        // The full context chain must still identify the panic.
        assert!(format!("{err:#}").contains("panicked"));
    }

    #[tokio::test]
    async fn serve_error_during_drain_is_reported() {
        let health = Health::new();
        let shutdown = CancellationToken::new();
        let mut servers = JoinSet::new();
        // Fails only after being told to stop, so the error surfaces during
        // the drain rather than as an early exit.
        let token = shutdown.clone();
        servers.spawn(async move {
            token.cancelled().await;
            Err(anyhow::anyhow!("flush failed"))
        });

        shutdown.cancel();
        let err = until_shutdown(servers, &health, &shutdown)
            .await
            .expect_err("a serve error during drain is not swallowed");
        assert!(err.to_string().contains("flush failed"));
    }
}