hyper_server/
handle.rs

1use crate::notify_once::NotifyOnce;
2use std::{
3    net::SocketAddr,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc, Mutex,
7    },
8    time::Duration,
9};
10use tokio::{sync::Notify, time::sleep};
11
12/// A handle to manage and interact with the server.
13///
14/// `Handle` provides methods to access server information, such as the number of active connections,
15/// and to perform actions like initiating a shutdown.
16#[derive(Clone, Debug, Default)]
17pub struct Handle {
18    inner: Arc<HandleInner>,
19}
20
21#[derive(Debug, Default)]
22struct HandleInner {
23    addr: Mutex<Option<SocketAddr>>,
24    addr_notify: Notify,
25    conn_count: AtomicUsize,
26    shutdown: NotifyOnce,
27    graceful: NotifyOnce,
28    graceful_dur: Mutex<Option<Duration>>,
29    conn_end: NotifyOnce,
30}
31
32impl Handle {
33    /// Create a new handle for the server.
34    ///
35    /// # Returns
36    ///
37    /// A new `Handle` instance.
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    /// Get the number of active connections to the server.
43    ///
44    /// # Returns
45    ///
46    /// The number of active connections.
47    pub fn connection_count(&self) -> usize {
48        self.inner.conn_count.load(Ordering::SeqCst)
49    }
50
51    /// Initiate an immediate shutdown of the server.
52    ///
53    /// This method will terminate the server without waiting for active connections to close.
54    pub fn shutdown(&self) {
55        self.inner.shutdown.notify_waiters();
56    }
57
58    /// Initiate a graceful shutdown of the server.
59    ///
60    /// The server will wait for active connections to close before shutting down. If a duration
61    /// is provided, the server will wait up to that duration for active connections to close
62    /// before forcing a shutdown.
63    ///
64    /// # Parameters
65    ///
66    /// - `duration`: Maximum time to wait for active connections to close. `None` means the server
67    /// will wait indefinitely.
68    pub fn graceful_shutdown(&self, duration: Option<Duration>) {
69        *self.inner.graceful_dur.lock().unwrap() = duration;
70        self.inner.graceful.notify_waiters();
71    }
72
73    /// Wait until the server starts listening and then returns its local address and port.
74    ///
75    /// # Returns
76    ///
77    /// The local `SocketAddr` if the server successfully binds, otherwise `None`.
78    pub async fn listening(&self) -> Option<SocketAddr> {
79        let notified = self.inner.addr_notify.notified();
80
81        if let Some(addr) = *self.inner.addr.lock().unwrap() {
82            return Some(addr);
83        }
84
85        notified.await;
86
87        *self.inner.addr.lock().unwrap()
88    }
89
90    /// Internal method to notify the handle when the server starts listening on a particular address.
91    pub(crate) fn notify_listening(&self, addr: Option<SocketAddr>) {
92        *self.inner.addr.lock().unwrap() = addr;
93        self.inner.addr_notify.notify_waiters();
94    }
95
96    /// Creates a watcher that monitors server status and connection activity.
97    pub(crate) fn watcher(&self) -> Watcher {
98        Watcher::new(self.clone())
99    }
100
101    /// Internal method to wait until the server is shut down.
102    pub(crate) async fn wait_shutdown(&self) {
103        self.inner.shutdown.notified().await;
104    }
105
106    /// Internal method to wait until the server is gracefully shut down.
107    pub(crate) async fn wait_graceful_shutdown(&self) {
108        self.inner.graceful.notified().await;
109    }
110
111    /// Internal method to wait until all connections have ended, or the optional graceful duration has expired.
112    pub(crate) async fn wait_connections_end(&self) {
113        if self.inner.conn_count.load(Ordering::SeqCst) == 0 {
114            return;
115        }
116
117        let deadline = *self.inner.graceful_dur.lock().unwrap();
118
119        match deadline {
120            Some(duration) => tokio::select! {
121                biased;
122                _ = sleep(duration) => self.shutdown(),
123                _ = self.inner.conn_end.notified() => (),
124            },
125            None => self.inner.conn_end.notified().await,
126        }
127    }
128}
129
130/// A watcher that monitors server status and connection activity.
131///
132/// The watcher keeps track of active connections and listens for shutdown or graceful shutdown signals.
133pub(crate) struct Watcher {
134    handle: Handle,
135}
136
137impl Watcher {
138    /// Creates a new watcher linked to the given server handle.
139    fn new(handle: Handle) -> Self {
140        handle.inner.conn_count.fetch_add(1, Ordering::SeqCst);
141        Self { handle }
142    }
143
144    /// Internal method to wait until the server is gracefully shut down.
145    pub(crate) async fn wait_graceful_shutdown(&self) {
146        self.handle.wait_graceful_shutdown().await
147    }
148
149    /// Internal method to wait until the server is shut down.
150    pub(crate) async fn wait_shutdown(&self) {
151        self.handle.wait_shutdown().await
152    }
153}
154
155impl Drop for Watcher {
156    /// Reduces the active connection count when a watcher is dropped.
157    ///
158    /// If the connection count reaches zero and a graceful shutdown has been initiated, the server is notified that
159    /// all connections have ended.
160    fn drop(&mut self) {
161        let count = self.handle.inner.conn_count.fetch_sub(1, Ordering::SeqCst) - 1;
162
163        if count == 0 && self.handle.inner.graceful.is_notified() {
164            self.handle.inner.conn_end.notify_waiters();
165        }
166    }
167}