polyc_runtime/
supervise.rs1use tokio::task::JoinSet;
9use tokio_util::sync::CancellationToken;
10
11use crate::health::Health;
12
13pub async fn until_shutdown(
24 mut servers: JoinSet<anyhow::Result<()>>,
25 health: &Health,
26 shutdown: &CancellationToken,
27) -> anyhow::Result<()> {
28 let early = tokio::select! {
29 biased;
32 () = shutdown.cancelled() => None,
33 res = servers.join_next() => res,
34 };
35 health.set_ready(false);
36 shutdown.cancel();
37
38 let mut failure = early.map(|res| {
39 task_failure(res).map_or_else(
40 || anyhow::anyhow!("server exited cleanly before shutdown"),
41 |err| err.context("server exited before shutdown"),
42 )
43 });
44 if failure.is_some() {
45 tracing::error!("server task exited before shutdown; draining and exiting");
46 } else {
47 tracing::info!("shutdown signal received; draining in-flight work");
48 }
49
50 while let Some(res) = servers.join_next().await {
51 if let Some(err) = task_failure(res) {
52 tracing::error!(error = ?err, "server task failed during drain");
53 failure.get_or_insert(err);
54 }
55 }
56 failure.map_or(Ok(()), Err)
57}
58
59fn task_failure(res: Result<anyhow::Result<()>, tokio::task::JoinError>) -> Option<anyhow::Error> {
62 match res {
63 Ok(Ok(())) => None,
64 Ok(Err(err)) => Some(err),
65 Err(join) => Some(anyhow::Error::new(join).context("server task panicked")),
66 }
67}
68
#[cfg(test)]
mod tests {
    // Test code trades the crate's stricter lint set for readability.
    #![allow(clippy::pedantic, clippy::nursery, missing_docs)]

    use tokio::task::JoinSet;
    use tokio_util::sync::CancellationToken;

    use super::until_shutdown;
    use crate::health::Health;

    /// A well-behaved server: parks until `stop` fires, then exits cleanly.
    async fn park_until(stop: CancellationToken) -> anyhow::Result<()> {
        stop.cancelled().await;
        Ok(())
    }

    #[tokio::test]
    async fn graceful_shutdown_drains_and_returns_ok() {
        let probe = Health::new();
        let stop = CancellationToken::new();
        let mut set = JoinSet::new();
        set.spawn(park_until(stop.clone()));
        set.spawn(park_until(stop.clone()));
        probe.set_ready(true);

        // Request shutdown up front: the supervisor should see a signal,
        // not an early server exit.
        stop.cancel();
        let outcome = until_shutdown(set, &probe, &stop).await;
        outcome.expect("graceful shutdown is not an error");
    }

    #[tokio::test]
    async fn failing_server_is_fatal_and_cancels_the_rest() {
        let probe = Health::new();
        let stop = CancellationToken::new();
        let mut set = JoinSet::new();
        set.spawn(async { Err(anyhow::anyhow!("bind lost")) });
        set.spawn(park_until(stop.clone()));
        probe.set_ready(true);

        let outcome = until_shutdown(set, &probe, &stop).await;
        let err = outcome.expect_err("a dead server is fatal");
        let rendered = err.to_string();
        assert!(rendered.contains("server exited before shutdown"));
        assert!(stop.is_cancelled(), "remaining servers are told to stop");
    }

    #[tokio::test]
    async fn clean_early_exit_is_still_fatal() {
        let probe = Health::new();
        let stop = CancellationToken::new();
        let mut set = JoinSet::new();
        set.spawn(async { Ok(()) });

        let outcome = until_shutdown(set, &probe, &stop).await;
        let err = outcome.expect_err("servers run until told to stop");
        let rendered = err.to_string();
        assert!(rendered.contains("exited cleanly before shutdown"));
    }
}