fastn_net/
graceful.rs

//! Graceful shutdown management for async services.
//!
//! This module provides the [`Graceful`] type for coordinating clean shutdown
//! of async tasks in network services. On shutdown it cancels every spawned
//! task and waits, with a bounded timeout, for them to complete before the
//! service exits. Shutdown is triggered by the user pressing Ctrl+C twice in
//! quick succession.
//!
//! # Overview
//!
//! When building network services, you often spawn multiple async tasks for
//! handling connections, background work, etc. The `Graceful` type helps you:
//!
//! - Signal all tasks to stop via cancellation tokens
//! - Track all spawned tasks to ensure they complete
//! - Coordinate shutdown across multiple components
//!
//! # Example: Basic HTTP Server with Graceful Shutdown
//!
//! ```no_run
//! use fastn_net::Graceful;
//! use tokio::net::TcpListener;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let graceful = Graceful::new();
//!
//! // Spawn a server task
//! let server_graceful = graceful.clone();
//! graceful.spawn(async move {
//!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
//!
//!     loop {
//!         tokio::select! {
//!             // Accept new connections
//!             Ok((stream, _)) = listener.accept() => {
//!                 // Handle connection in a tracked task
//!                 server_graceful.spawn(async move {
//!                     // Process the connection...
//!                     Ok::<(), eyre::Error>(())
//!                 });
//!             }
//!             // Stop accepting when cancelled
//!             _ = server_graceful.cancelled() => {
//!                 println!("Server shutting down...");
//!                 break;
//!             }
//!         }
//!     }
//!     Ok::<(), eyre::Error>(())
//! });
//!
//! // In your main function. `shutdown()` waits for Ctrl+C itself, so it
//! // should not be called from inside your own signal handler:
//! // graceful.shutdown().await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example: P2P Service with Multiple Components
//!
//! ```no_run
//! use fastn_net::{Graceful, global_iroh_endpoint};
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let graceful = Graceful::new();
//! let endpoint = global_iroh_endpoint().await;
//!
//! // Component 1: Accept incoming P2P connections
//! let p2p_graceful = graceful.clone();
//! graceful.spawn(async move {
//!     while let Ok(conn) = endpoint.accept().await {
//!         if p2p_graceful.is_cancelled() {
//!             break;
//!         }
//!
//!         // Handle each connection in a tracked task
//!         p2p_graceful.spawn(async move {
//!             // Process P2P connection...
//!             Ok::<(), eyre::Error>(())
//!         });
//!     }
//!     Ok::<(), eyre::Error>(())
//! });
//!
//! // Component 2: HTTP API server
//! let api_graceful = graceful.clone();
//! graceful.spawn(async move {
//!     // Run HTTP server with cancellation check
//!     while !api_graceful.is_cancelled() {
//!         // Handle HTTP requests...
//!         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
//!     }
//!     Ok::<(), eyre::Error>(())
//! });
//!
//! // Block until the user presses Ctrl+C twice in quick succession.
//! // `shutdown()` listens for the signals itself, then cancels all tasks
//! // and waits for them to exit.
//! graceful.shutdown().await?;
//! println!("Shutdown complete");
//! # Ok(())
//! # }
//! ```
//!
//! # Best Practices
//!
//! 1. **Clone for each component**: Each async task or component should get
//!    its own clone of `Graceful` to spawn sub-tasks.
//!
//! 2. **Check cancellation in loops**: Long-running loops should periodically
//!    check `is_cancelled()` or use `select!` with `cancelled()`.
//!
//! 3. **Use spawn() for all tasks**: Always use `graceful.spawn()` instead of
//!    `tokio::spawn()` to ensure tasks are tracked.
//!
//! 4. **Handle errors inside tasks**: `shutdown()` does not collect task
//!    results. A task's `Result` is only observable through the `JoinHandle`
//!    returned by `spawn()`, so log or handle errors inside the task (or await
//!    its handle) if they matter.
//!
//! 5. **Shutdown order**: Call `shutdown()` once, from your main function. It
//!    waits for Ctrl+C and then:
//!    - On the first Ctrl+C, logs the number of pending tasks and wakes any
//!      callers waiting in `show_info()`.
//!    - If a second Ctrl+C arrives within 3 seconds, cancels all tasks via the
//!      cancellation token, closes the task tracker, and waits for the tracked
//!      tasks to complete (giving up after roughly 30 seconds).
//!    - Otherwise, goes back to waiting for the next Ctrl+C.
124
125use eyre::Context;
126use tokio::task::JoinHandle;
127
128/// Manages graceful shutdown of async tasks.
129///
130/// Combines cancellation signaling with task tracking to ensure
131/// clean shutdown of all spawned tasks. Clone this freely - all
132/// clones share the same underlying state.
133#[derive(Clone)]
134pub struct Graceful {
135    cancel: tokio_util::sync::CancellationToken,
136    tracker: tokio_util::task::TaskTracker,
137    show_info_tx: tokio::sync::watch::Sender<bool>,
138    show_info_rx: tokio::sync::watch::Receiver<bool>,
139}
140
141impl Default for Graceful {
142    fn default() -> Self {
143        Self::new()
144    }
145}
146
147impl Graceful {
148    pub fn new() -> Self {
149        let (show_info_tx, show_info_rx) = tokio::sync::watch::channel(false);
150
151        Self {
152            cancel: tokio_util::sync::CancellationToken::new(),
153            tracker: tokio_util::task::TaskTracker::new(),
154            show_info_tx,
155            show_info_rx,
156        }
157    }
158
159    pub async fn show_info(&mut self) -> eyre::Result<()> {
160        self.show_info_rx
161            .changed()
162            .await
163            .map_err(|e| eyre::anyhow!("failed to get show info signal: {e:?}"))
164    }
165
166    #[inline]
167    #[track_caller]
168    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
169    where
170        F: Future + Send + 'static,
171        F::Output: Send + 'static,
172    {
173        self.tracker.spawn(task)
174    }
175
176    pub async fn shutdown(&self) -> eyre::Result<()> {
177        loop {
178            tokio::signal::ctrl_c()
179                .await
180                .wrap_err_with(|| "failed to get ctrl-c signal handler")?;
181
182            tracing::info!("Received ctrl-c signal, showing info.");
183            tracing::info!("Pending tasks: {}", self.tracker.len());
184
185            self.show_info_tx
186                .send(true)
187                .inspect_err(|e| tracing::error!("failed to send show info signal: {e:?}"))?;
188
189            tokio::select! {
190                _ = tokio::signal::ctrl_c() => {
191                    tracing::info!("Received second ctrl-c signal, shutting down.");
192                    tracing::debug!("Pending tasks: {}", self.tracker.len());
193
194                    self.cancel.cancel();
195                    self.tracker.close();
196
197                    let mut count = 0;
198                    loop {
199                        tokio::select! {
200                            _ = self.tracker.wait() => {
201                                tracing::info!("All tasks have exited.");
202                                break;
203                            }
204                            _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
205                                count += 1;
206                                if count > 10 {
207                                    eprintln!("Timeout expired, {} pending tasks. Exiting...", self.tracker.len());
208                                    break;
209                                }
210                                tracing::debug!("Pending tasks: {}", self.tracker.len());
211                            }
212                        }
213                    }
214                    break;
215                }
216                _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
217                    tracing::info!("Timeout expired. Continuing...");
218                    println!("Did not receive ctrl+c within 3 secs. Press ctrl+c in quick succession to exit.");
219                }
220            }
221        }
222
223        Ok(())
224    }
225
226    pub fn cancelled(&self) -> tokio_util::sync::WaitForCancellationFuture<'_> {
227        self.cancel.cancelled()
228    }
229}