fastn_net/graceful.rs
1//! Graceful shutdown management for async services.
2//!
3//! This module provides the [`Graceful`] type for coordinating clean shutdown
4//! of async tasks in network services. It ensures all spawned tasks complete
5//! before the service exits.
6//!
7//! # Overview
8//!
9//! When building network services, you often spawn multiple async tasks for
10//! handling connections, background work, etc. The `Graceful` type helps you:
11//!
12//! - Signal all tasks to stop via cancellation tokens
13//! - Track all spawned tasks to ensure they complete
14//! - Coordinate shutdown across multiple components
15//!
16//! # Example: Basic HTTP Server with Graceful Shutdown
17//!
18//! ```no_run
19//! use fastn_net::Graceful;
20//! use tokio::net::TcpListener;
21//!
22//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
23//! let graceful = Graceful::new();
24//!
25//! // Spawn a server task
26//! let server_graceful = graceful.clone();
27//! graceful.spawn(async move {
28//!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
29//!     
30//!     loop {
31//!         tokio::select! {
32//!             // Accept new connections
33//!             Ok((stream, _)) = listener.accept() => {
34//!                 // Handle connection in a tracked task
35//!                 server_graceful.spawn(async move {
36//!                     // Process the connection...
37//!                     Ok::<(), eyre::Error>(())
38//!                 });
39//!             }
40//!             // Stop accepting when cancelled
41//!             _ = server_graceful.cancelled() => {
42//!                 println!("Server shutting down...");
43//!                 break;
44//!             }
45//!         }
46//!     }
47//!     Ok::<(), eyre::Error>(())
48//! });
49//!
50//! // In your main or signal handler:
51//! // graceful.shutdown().await;
52//! # Ok(())
53//! # }
54//! ```
55//!
56//! # Example: P2P Service with Multiple Components
57//!
58//! ```no_run
59//! use fastn_net::{Graceful, global_iroh_endpoint};
60//!
61//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
62//! let graceful = Graceful::new();
63//! let endpoint = global_iroh_endpoint().await;
64//!
65//! // Component 1: Accept incoming P2P connections
66//! let p2p_graceful = graceful.clone();
67//! graceful.spawn(async move {
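//!     loop {
//!         tokio::select! {
//!             // Stop accepting when cancelled
//!             _ = p2p_graceful.cancelled() => {
//!                 break;
//!             }
//!             // Race `accept()` against cancellation. An `else` arm would
//!             // never run here: it only fires when every other branch is
//!             // disabled, and the `_` pattern above always matches.
//!             incoming = endpoint.accept() => {
//!                 // `None` means the endpoint stopped accepting connections
//!                 let Some(conn) = incoming else { break };
//!                 // Handle each connection in a tracked task
//!                 p2p_graceful.spawn(async move {
//!                     // Process P2P connection...
//!                     let _conn = conn;
//!                     Ok::<(), eyre::Error>(())
//!                 });
//!             }
//!         }
//!     }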
82//!     Ok::<(), eyre::Error>(())
83//! });
84//!
85//! // Component 2: HTTP API server
86//! let api_graceful = graceful.clone();
87//! graceful.spawn(async move {
88//!     // Run HTTP server with cancellation check
89//!     loop {
90//!         tokio::select! {
91//!             _ = api_graceful.cancelled() => {
92//!                 break;
93//!             }
94//!             _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
95//!                 // Handle HTTP requests...
96//!             }
97//!         }
98//!     }
99//!     Ok::<(), eyre::Error>(())
100//! });
101//!
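//! // Graceful shutdown on Ctrl+C. `shutdown()` listens for Ctrl+C itself:
//! // the first press reports the pending tasks, and a second press within
//! // 3 seconds cancels all tasks and waits for them to exit. Do not wait
//! // for Ctrl+C here as well, or the user needs an extra key press.
//! graceful.shutdown().await?;
//! println!("Shutdown finished");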
110//! # Ok(())
111//! # }
112//! ```
113//!
114//! # Best Practices
115//!
116//! 1. **Clone for each component**: Each async task or component should get
117//!    its own clone of `Graceful` to spawn sub-tasks.
118//!
119//! 2. **Check cancellation in loops**: Long-running loops should use
120//!    `select!` with `cancelled()` for proper cancellation handling.
121//!
122//! 3. **Use spawn() for all tasks**: Always use `graceful.spawn()` instead of
123//!    `tokio::spawn()` to ensure tasks are tracked.
124//!
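//! 4. **Handle errors**: Tasks spawned with `spawn()` should return `Result`
//!    and have it observed through the returned `JoinHandle` (or log the
//!    failure inside the task): `shutdown()` waits for tasks to finish but
//!    does not collect their results.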
127//!
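//! 5. **Shutdown order**: Await `shutdown()` from your main function. It
//!    waits for Ctrl+C itself; the first press only reports pending tasks,
//!    and once a second press within 3 seconds confirms the shutdown it will:
//!    - Cancel all tasks via the cancellation token
//!    - Close the task tracker and wait for all tracked tasks to complete
//!    - Give up waiting after a timeout and report how many tasks are still
//!      pending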
133
134use eyre::Context;
135use tokio::task::JoinHandle;
136
137/// Manages graceful shutdown of async tasks.
138///
139/// Combines cancellation signaling with task tracking to ensure
140/// clean shutdown of all spawned tasks. Clone this freely - all
141/// clones share the same underlying state.
142#[derive(Clone)]
143pub struct Graceful {
144    cancel: tokio_util::sync::CancellationToken,
145    tracker: tokio_util::task::TaskTracker,
146    show_info_tx: tokio::sync::watch::Sender<bool>,
147    show_info_rx: tokio::sync::watch::Receiver<bool>,
148}
149
150impl Default for Graceful {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
156impl Graceful {
157    pub fn new() -> Self {
158        let (show_info_tx, show_info_rx) = tokio::sync::watch::channel(false);
159
160        Self {
161            cancel: tokio_util::sync::CancellationToken::new(),
162            tracker: tokio_util::task::TaskTracker::new(),
163            show_info_tx,
164            show_info_rx,
165        }
166    }
167
168    pub async fn show_info(&mut self) -> eyre::Result<()> {
169        self.show_info_rx
170            .changed()
171            .await
172            .map_err(|e| eyre::anyhow!("failed to get show info signal: {e:?}"))
173    }
174
175    #[inline]
176    #[track_caller]
177    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
178    where
179        F: Future + Send + 'static,
180        F::Output: Send + 'static,
181    {
182        self.tracker.spawn(task)
183    }
184
185    pub async fn shutdown(&self) -> eyre::Result<()> {
186        loop {
187            tokio::signal::ctrl_c()
188                .await
189                .wrap_err_with(|| "failed to get ctrl-c signal handler")?;
190
191            tracing::info!("Received ctrl-c signal, showing info.");
192            tracing::info!("Pending tasks: {}", self.tracker.len());
193
194            self.show_info_tx
195                .send(true)
196                .inspect_err(|e| tracing::error!("failed to send show info signal: {e:?}"))?;
197
198            tokio::select! {
199                _ = tokio::signal::ctrl_c() => {
200                    tracing::info!("Received second ctrl-c signal, shutting down.");
201                    tracing::debug!("Pending tasks: {}", self.tracker.len());
202
203                    self.cancel.cancel();
204                    self.tracker.close();
205
206                    let mut count = 0;
207                    loop {
208                        tokio::select! {
209                            _ = self.tracker.wait() => {
210                                tracing::info!("All tasks have exited.");
211                                break;
212                            }
213                            _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
214                                count += 1;
215                                if count > 10 {
216                                    eprintln!("Timeout expired, {} pending tasks. Exiting...", self.tracker.len());
217                                    break;
218                                }
219                                tracing::debug!("Pending tasks: {}", self.tracker.len());
220                            }
221                        }
222                    }
223                    break;
224                }
225                _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
226                    tracing::info!("Timeout expired. Continuing...");
227                    println!("Did not receive ctrl+c within 3 secs. Press ctrl+c in quick succession to exit.");
228                }
229            }
230        }
231
232        Ok(())
233    }
234
235    pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
236        self.cancel.cancelled()
237    }
238}