1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
use crate::notify_once::NotifyOnce;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
time::Duration,
};
use tokio::{sync::Notify, time::sleep};
/// A handle to manage and interact with the server.
///
/// `Handle` provides methods to access server information, such as the number of active connections,
/// and to perform actions like initiating a shutdown.
#[derive(Clone, Debug, Default)]
pub struct Handle {
inner: Arc<HandleInner>,
}
#[derive(Debug, Default)]
struct HandleInner {
addr: Mutex<Option<SocketAddr>>,
addr_notify: Notify,
conn_count: AtomicUsize,
shutdown: NotifyOnce,
graceful: NotifyOnce,
graceful_dur: Mutex<Option<Duration>>,
conn_end: NotifyOnce,
}
impl Handle {
/// Create a new handle for the server.
///
/// # Returns
///
/// A new `Handle` instance.
pub fn new() -> Self {
Self::default()
}
/// Get the number of active connections to the server.
///
/// # Returns
///
/// The number of active connections.
pub fn connection_count(&self) -> usize {
self.inner.conn_count.load(Ordering::SeqCst)
}
/// Initiate an immediate shutdown of the server.
///
/// This method will terminate the server without waiting for active connections to close.
pub fn shutdown(&self) {
self.inner.shutdown.notify_waiters();
}
/// Initiate a graceful shutdown of the server.
///
/// The server will wait for active connections to close before shutting down. If a duration
/// is provided, the server will wait up to that duration for active connections to close
/// before forcing a shutdown.
///
/// # Parameters
///
/// - `duration`: Maximum time to wait for active connections to close. `None` means the server
/// will wait indefinitely.
pub fn graceful_shutdown(&self, duration: Option<Duration>) {
*self.inner.graceful_dur.lock().unwrap() = duration;
self.inner.graceful.notify_waiters();
}
/// Wait until the server starts listening and then returns its local address and port.
///
/// # Returns
///
/// The local `SocketAddr` if the server successfully binds, otherwise `None`.
pub async fn listening(&self) -> Option<SocketAddr> {
let notified = self.inner.addr_notify.notified();
if let Some(addr) = *self.inner.addr.lock().unwrap() {
return Some(addr);
}
notified.await;
*self.inner.addr.lock().unwrap()
}
/// Internal method to notify the handle when the server starts listening on a particular address.
pub(crate) fn notify_listening(&self, addr: Option<SocketAddr>) {
*self.inner.addr.lock().unwrap() = addr;
self.inner.addr_notify.notify_waiters();
}
/// Creates a watcher that monitors server status and connection activity.
pub(crate) fn watcher(&self) -> Watcher {
Watcher::new(self.clone())
}
/// Internal method to wait until the server is shut down.
pub(crate) async fn wait_shutdown(&self) {
self.inner.shutdown.notified().await;
}
/// Internal method to wait until the server is gracefully shut down.
pub(crate) async fn wait_graceful_shutdown(&self) {
self.inner.graceful.notified().await;
}
/// Internal method to wait until all connections have ended, or the optional graceful duration has expired.
pub(crate) async fn wait_connections_end(&self) {
if self.inner.conn_count.load(Ordering::SeqCst) == 0 {
return;
}
let deadline = *self.inner.graceful_dur.lock().unwrap();
match deadline {
Some(duration) => tokio::select! {
biased;
_ = sleep(duration) => self.shutdown(),
_ = self.inner.conn_end.notified() => (),
},
None => self.inner.conn_end.notified().await,
}
}
}
/// A watcher that monitors server status and connection activity.
///
/// The watcher keeps track of active connections and listens for shutdown or graceful shutdown signals.
pub(crate) struct Watcher {
handle: Handle,
}
impl Watcher {
/// Creates a new watcher linked to the given server handle.
fn new(handle: Handle) -> Self {
handle.inner.conn_count.fetch_add(1, Ordering::SeqCst);
Self { handle }
}
/// Internal method to wait until the server is gracefully shut down.
pub(crate) async fn wait_graceful_shutdown(&self) {
self.handle.wait_graceful_shutdown().await
}
/// Internal method to wait until the server is shut down.
pub(crate) async fn wait_shutdown(&self) {
self.handle.wait_shutdown().await
}
}
impl Drop for Watcher {
/// Reduces the active connection count when a watcher is dropped.
///
/// If the connection count reaches zero and a graceful shutdown has been initiated, the server is notified that
/// all connections have ended.
fn drop(&mut self) {
let count = self.handle.inner.conn_count.fetch_sub(1, Ordering::SeqCst) - 1;
if count == 0 && self.handle.inner.graceful.is_notified() {
self.handle.inner.conn_end.notify_waiters();
}
}
}