fastn_net/graceful.rs
1//! Graceful shutdown management for async services.
2//!
//! This module provides the [`Graceful`] type for coordinating clean shutdown
//! of async tasks in network services. On shutdown it cancels all spawned tasks
//! and waits (up to a timeout) for them to complete before the service exits.
6//!
7//! # Overview
8//!
9//! When building network services, you often spawn multiple async tasks for
10//! handling connections, background work, etc. The `Graceful` type helps you:
11//!
12//! - Signal all tasks to stop via cancellation tokens
13//! - Track all spawned tasks to ensure they complete
14//! - Coordinate shutdown across multiple components
15//!
16//! # Example: Basic HTTP Server with Graceful Shutdown
17//!
18//! ```no_run
19//! use fastn_net::Graceful;
20//! use tokio::net::TcpListener;
21//!
22//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
23//! let graceful = Graceful::new();
24//!
25//! // Spawn a server task
26//! let server_graceful = graceful.clone();
27//! graceful.spawn(async move {
28//! let listener = TcpListener::bind("127.0.0.1:8080").await?;
29//!
30//! loop {
31//! tokio::select! {
32//! // Accept new connections
33//! Ok((stream, _)) = listener.accept() => {
34//! // Handle connection in a tracked task
35//! server_graceful.spawn(async move {
36//! // Process the connection...
37//! Ok::<(), eyre::Error>(())
38//! });
39//! }
40//! // Stop accepting when cancelled
41//! _ = server_graceful.cancelled() => {
42//! println!("Server shutting down...");
43//! break;
44//! }
45//! }
46//! }
47//! Ok::<(), eyre::Error>(())
48//! });
49//!
//! // In your main function (`shutdown()` listens for ctrl-c itself):
//! // graceful.shutdown().await?;
52//! # Ok(())
53//! # }
54//! ```
55//!
56//! # Example: P2P Service with Multiple Components
57//!
58//! ```no_run
59//! use fastn_net::{Graceful, global_iroh_endpoint};
60//!
61//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
62//! let graceful = Graceful::new();
63//! let endpoint = global_iroh_endpoint().await;
64//!
65//! // Component 1: Accept incoming P2P connections
66//! let p2p_graceful = graceful.clone();
67//! graceful.spawn(async move {
//! loop {
//! tokio::select! {
//! // Stop accepting when cancelled
//! _ = p2p_graceful.cancelled() => break,
//! incoming = endpoint.accept() => {
//! // Stop when the endpoint yields no more connections
//! let Some(_incoming) = incoming else { break };
//! // Handle each connection in a tracked task
//! p2p_graceful.spawn(async move {
//! // Process P2P connection...
//! Ok::<(), eyre::Error>(())
//! });
//! }
//! }
//! }
82//! Ok::<(), eyre::Error>(())
83//! });
84//!
85//! // Component 2: HTTP API server
86//! let api_graceful = graceful.clone();
87//! graceful.spawn(async move {
88//! // Run HTTP server with cancellation check
89//! loop {
90//! tokio::select! {
91//! _ = api_graceful.cancelled() => {
92//! break;
93//! }
94//! _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
95//! // Handle HTTP requests...
96//! }
97//! }
98//! }
99//! Ok::<(), eyre::Error>(())
100//! });
101//!
//! // Graceful shutdown on Ctrl+C: `shutdown()` waits for ctrl-c itself
//! // (press it twice within 3 seconds), then cancels and drains all tasks.
//! graceful.shutdown().await?;
//! println!("All tasks completed");
110//! # Ok(())
111//! # }
112//! ```
113//!
114//! # Best Practices
115//!
116//! 1. **Clone for each component**: Each async task or component should get
117//! its own clone of `Graceful` to spawn sub-tasks.
118//!
119//! 2. **Check cancellation in loops**: Long-running loops should use
120//! `select!` with `cancelled()` for proper cancellation handling.
121//!
122//! 3. **Use spawn() for all tasks**: Always use `graceful.spawn()` instead of
123//! `tokio::spawn()` to ensure tasks are tracked.
124//!
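//! 4. **Handle errors inside tasks**: `spawn()` returns the task's `JoinHandle`,
//! but `shutdown()` does not collect task results. Tasks should handle or log
//! their own errors, or the caller should await the returned handle.
//!
//! 5. **Shutdown order**: Call `shutdown()` from your main function. It waits
//! for ctrl-c: each first press logs the number of pending tasks and wakes
//! `show_info()` waiters. A second press within 3 seconds then:
//! - Cancels all tasks via the cancellation token
//! - Closes the tracker and waits for tracked tasks to complete, giving up
//! after roughly 33 seconds
//! - Returns `Ok(())`. Errors are returned only if ctrl-c handling or the
//! show-info signal fails, never for failed tasks.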
133
134use eyre::Context;
135use tokio::task::JoinHandle;
136
137/// Manages graceful shutdown of async tasks.
138///
139/// Combines cancellation signaling with task tracking to ensure
140/// clean shutdown of all spawned tasks. Clone this freely - all
141/// clones share the same underlying state.
142#[derive(Clone)]
143pub struct Graceful {
144 cancel: tokio_util::sync::CancellationToken,
145 tracker: tokio_util::task::TaskTracker,
146 show_info_tx: tokio::sync::watch::Sender<bool>,
147 show_info_rx: tokio::sync::watch::Receiver<bool>,
148}
149
150impl Default for Graceful {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl Graceful {
157 pub fn new() -> Self {
158 let (show_info_tx, show_info_rx) = tokio::sync::watch::channel(false);
159
160 Self {
161 cancel: tokio_util::sync::CancellationToken::new(),
162 tracker: tokio_util::task::TaskTracker::new(),
163 show_info_tx,
164 show_info_rx,
165 }
166 }
167
168 pub async fn show_info(&mut self) -> eyre::Result<()> {
169 self.show_info_rx
170 .changed()
171 .await
172 .map_err(|e| eyre::anyhow!("failed to get show info signal: {e:?}"))
173 }
174
175 #[inline]
176 #[track_caller]
177 pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
178 where
179 F: Future + Send + 'static,
180 F::Output: Send + 'static,
181 {
182 self.tracker.spawn(task)
183 }
184
185 pub async fn shutdown(&self) -> eyre::Result<()> {
186 loop {
187 tokio::signal::ctrl_c()
188 .await
189 .wrap_err_with(|| "failed to get ctrl-c signal handler")?;
190
191 tracing::info!("Received ctrl-c signal, showing info.");
192 tracing::info!("Pending tasks: {}", self.tracker.len());
193
194 self.show_info_tx
195 .send(true)
196 .inspect_err(|e| tracing::error!("failed to send show info signal: {e:?}"))?;
197
198 tokio::select! {
199 _ = tokio::signal::ctrl_c() => {
200 tracing::info!("Received second ctrl-c signal, shutting down.");
201 tracing::debug!("Pending tasks: {}", self.tracker.len());
202
203 self.cancel.cancel();
204 self.tracker.close();
205
206 let mut count = 0;
207 loop {
208 tokio::select! {
209 _ = self.tracker.wait() => {
210 tracing::info!("All tasks have exited.");
211 break;
212 }
213 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
214 count += 1;
215 if count > 10 {
216 eprintln!("Timeout expired, {} pending tasks. Exiting...", self.tracker.len());
217 break;
218 }
219 tracing::debug!("Pending tasks: {}", self.tracker.len());
220 }
221 }
222 }
223 break;
224 }
225 _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
226 tracing::info!("Timeout expired. Continuing...");
227 println!("Did not receive ctrl+c within 3 secs. Press ctrl+c in quick succession to exit.");
228 }
229 }
230 }
231
232 Ok(())
233 }
234
235 pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
236 self.cancel.cancelled()
237 }
238}