trillium_server_common/
server_handle.rs

1use crate::CloneCounterObserver;
2use async_cell::sync::AsyncCell;
3use event_listener::{Event, EventListener};
4use std::{
5    fmt::{Debug, Formatter, Result},
6    future::{Future, IntoFuture},
7    pin::Pin,
8    sync::{
9        atomic::{AtomicBool, Ordering},
10        Arc,
11    },
12    task::{Context, Poll},
13};
14use trillium::Info;
15use trillium_http::Stopper;
16
17/// A handle for a spawned trillium server. Returned by
18/// [`Config::handle`][crate::Config::handle] and
19/// [`Config::spawn`][crate::Config::spawn]
20#[derive(Clone, Debug)]
21pub struct ServerHandle {
22    pub(crate) stopper: Stopper,
23    pub(crate) info: Arc<AsyncCell<Info>>,
24    pub(crate) completion: CompletionFuture,
25    pub(crate) observer: CloneCounterObserver,
26}
27
28pub struct CompletionFuture(Arc<CompletionFutureInner>, Pin<Box<EventListener>>);
29
30impl Default for CompletionFuture {
31    fn default() -> Self {
32        let inner = Arc::new(CompletionFutureInner::default());
33        let listener = inner.event.listen();
34        Self(inner, listener)
35    }
36}
37
38impl Clone for CompletionFuture {
39    fn clone(&self) -> Self {
40        Self(Arc::clone(&self.0), self.0.event.listen())
41    }
42}
43
44impl CompletionFuture {
45    pub(crate) fn notify(self) {
46        if !self.0.complete.swap(true, Ordering::SeqCst) {
47            self.0.event.notify(usize::MAX);
48        }
49    }
50
51    pub(crate) fn is_complete(&self) -> bool {
52        self.0.complete.load(Ordering::SeqCst)
53    }
54
55    pub(crate) fn new() -> Self {
56        Self::default()
57    }
58}
59
60pub struct CompletionFutureInner {
61    complete: AtomicBool,
62    event: Event,
63}
64
65impl Default for CompletionFutureInner {
66    fn default() -> Self {
67        Self {
68            complete: AtomicBool::new(false),
69            event: Event::new(),
70        }
71    }
72}
73
74impl Debug for CompletionFuture {
75    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
76        f.debug_tuple("CompletionFuture")
77            .field(&self.0.complete)
78            .finish()
79    }
80}
81
82impl Future for CompletionFuture {
83    type Output = ();
84
85    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86        let Self(inner, listener) = &mut *self;
87        loop {
88            if inner.complete.load(Ordering::SeqCst) {
89                return Poll::Ready(());
90            }
91
92            if listener.is_listening() {
93                match listener.as_mut().poll(cx) {
94                    Poll::Ready(()) => continue,
95                    Poll::Pending => return Poll::Pending,
96                }
97            } else {
98                listener.as_mut().listen(&inner.event);
99            }
100        }
101    }
102}
103
104impl ServerHandle {
105    /// await server start and retrieve the server's [`Info`]
106    pub async fn info(&self) -> Info {
107        self.info.get().await
108    }
109
110    /// stop server and wait for it to shut down gracefully
111    pub async fn stop(&self) {
112        self.stopper.stop();
113        self.completion.clone().await
114    }
115
116    /// retrieves a clone of the [`Stopper`] used by this server
117    pub fn stopper(&self) -> Stopper {
118        self.stopper.clone()
119    }
120
121    /// retrieves a [`CloneCounterObserver`] which can be used to
122    /// monitor or modify the number of outstanding connections for
123    /// the purposes of graceful shutdown.
124    pub fn observer(&self) -> CloneCounterObserver {
125        self.observer.clone()
126    }
127
128    /// checks whether this server has shut down. It's preferable to await
129    /// this [`ServerHandle`] instead of polling this.
130    pub fn is_running(&self) -> bool {
131        !self.completion.is_complete()
132    }
133}
134
135impl IntoFuture for ServerHandle {
136    type Output = ();
137
138    type IntoFuture = CompletionFuture;
139
140    fn into_future(self) -> Self::IntoFuture {
141        self.completion
142    }
143}