hyper_server/handle.rs
1use crate::notify_once::NotifyOnce;
2use std::{
3 net::SocketAddr,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc, Mutex,
7 },
8 time::Duration,
9};
10use tokio::{sync::Notify, time::sleep};
11
12/// A handle to manage and interact with the server.
13///
14/// `Handle` provides methods to access server information, such as the number of active connections,
15/// and to perform actions like initiating a shutdown.
16#[derive(Clone, Debug, Default)]
17pub struct Handle {
18 inner: Arc<HandleInner>,
19}
20
21#[derive(Debug, Default)]
22struct HandleInner {
23 addr: Mutex<Option<SocketAddr>>,
24 addr_notify: Notify,
25 conn_count: AtomicUsize,
26 shutdown: NotifyOnce,
27 graceful: NotifyOnce,
28 graceful_dur: Mutex<Option<Duration>>,
29 conn_end: NotifyOnce,
30}
31
32impl Handle {
33 /// Create a new handle for the server.
34 ///
35 /// # Returns
36 ///
37 /// A new `Handle` instance.
38 pub fn new() -> Self {
39 Self::default()
40 }
41
42 /// Get the number of active connections to the server.
43 ///
44 /// # Returns
45 ///
46 /// The number of active connections.
47 pub fn connection_count(&self) -> usize {
48 self.inner.conn_count.load(Ordering::SeqCst)
49 }
50
51 /// Initiate an immediate shutdown of the server.
52 ///
53 /// This method will terminate the server without waiting for active connections to close.
54 pub fn shutdown(&self) {
55 self.inner.shutdown.notify_waiters();
56 }
57
58 /// Initiate a graceful shutdown of the server.
59 ///
60 /// The server will wait for active connections to close before shutting down. If a duration
61 /// is provided, the server will wait up to that duration for active connections to close
62 /// before forcing a shutdown.
63 ///
64 /// # Parameters
65 ///
66 /// - `duration`: Maximum time to wait for active connections to close. `None` means the server
67 /// will wait indefinitely.
68 pub fn graceful_shutdown(&self, duration: Option<Duration>) {
69 *self.inner.graceful_dur.lock().unwrap() = duration;
70 self.inner.graceful.notify_waiters();
71 }
72
73 /// Wait until the server starts listening and then returns its local address and port.
74 ///
75 /// # Returns
76 ///
77 /// The local `SocketAddr` if the server successfully binds, otherwise `None`.
78 pub async fn listening(&self) -> Option<SocketAddr> {
79 let notified = self.inner.addr_notify.notified();
80
81 if let Some(addr) = *self.inner.addr.lock().unwrap() {
82 return Some(addr);
83 }
84
85 notified.await;
86
87 *self.inner.addr.lock().unwrap()
88 }
89
90 /// Internal method to notify the handle when the server starts listening on a particular address.
91 pub(crate) fn notify_listening(&self, addr: Option<SocketAddr>) {
92 *self.inner.addr.lock().unwrap() = addr;
93 self.inner.addr_notify.notify_waiters();
94 }
95
96 /// Creates a watcher that monitors server status and connection activity.
97 pub(crate) fn watcher(&self) -> Watcher {
98 Watcher::new(self.clone())
99 }
100
101 /// Internal method to wait until the server is shut down.
102 pub(crate) async fn wait_shutdown(&self) {
103 self.inner.shutdown.notified().await;
104 }
105
106 /// Internal method to wait until the server is gracefully shut down.
107 pub(crate) async fn wait_graceful_shutdown(&self) {
108 self.inner.graceful.notified().await;
109 }
110
111 /// Internal method to wait until all connections have ended, or the optional graceful duration has expired.
112 pub(crate) async fn wait_connections_end(&self) {
113 if self.inner.conn_count.load(Ordering::SeqCst) == 0 {
114 return;
115 }
116
117 let deadline = *self.inner.graceful_dur.lock().unwrap();
118
119 match deadline {
120 Some(duration) => tokio::select! {
121 biased;
122 _ = sleep(duration) => self.shutdown(),
123 _ = self.inner.conn_end.notified() => (),
124 },
125 None => self.inner.conn_end.notified().await,
126 }
127 }
128}
129
130/// A watcher that monitors server status and connection activity.
131///
132/// The watcher keeps track of active connections and listens for shutdown or graceful shutdown signals.
133pub(crate) struct Watcher {
134 handle: Handle,
135}
136
137impl Watcher {
138 /// Creates a new watcher linked to the given server handle.
139 fn new(handle: Handle) -> Self {
140 handle.inner.conn_count.fetch_add(1, Ordering::SeqCst);
141 Self { handle }
142 }
143
144 /// Internal method to wait until the server is gracefully shut down.
145 pub(crate) async fn wait_graceful_shutdown(&self) {
146 self.handle.wait_graceful_shutdown().await
147 }
148
149 /// Internal method to wait until the server is shut down.
150 pub(crate) async fn wait_shutdown(&self) {
151 self.handle.wait_shutdown().await
152 }
153}
154
155impl Drop for Watcher {
156 /// Reduces the active connection count when a watcher is dropped.
157 ///
158 /// If the connection count reaches zero and a graceful shutdown has been initiated, the server is notified that
159 /// all connections have ended.
160 fn drop(&mut self) {
161 let count = self.handle.inner.conn_count.fetch_sub(1, Ordering::SeqCst) - 1;
162
163 if count == 0 && self.handle.inner.graceful.is_notified() {
164 self.handle.inner.conn_end.notify_waiters();
165 }
166 }
167}