hreq/server/
serv_handle.rs

1use std::fmt;
2use std::future::Future;
3use std::sync::Arc;
4
5use hreq_h1::mpsc::{Receiver, Sender};
6
7/// Handle to a running server.
8///
9/// The server functions as long as this handle is not dropped.
10pub struct ServerHandle {
11    tx_shutdown: Sender<()>,
12    rx_confirm: Receiver<()>,
13}
14
15impl ServerHandle {
16    pub(crate) async fn new() -> (Self, EndFut) {
17        let (tx_shutdown, rx_shutdown) = Receiver::new(1);
18        let (tx_confirm, rx_confirm) = Receiver::new(1);
19
20        (
21            ServerHandle {
22                tx_shutdown,
23                rx_confirm,
24            },
25            EndFut {
26                rx_shutdown,
27                tx_confirm: Arc::new(tx_confirm),
28            },
29        )
30    }
31
32    /// Signal to the server to close down. Stop listening to the port and exit.
33    pub async fn shutdown(self) {
34        // When we drop the tx_shutdown sender, all connected
35        // receivers are woken up and realise it's gone.
36        let ServerHandle {
37            tx_shutdown,
38            rx_confirm,
39        } = self;
40
41        drop(tx_shutdown);
42
43        trace!("Await server shutdown confirmation");
44        rx_confirm.recv().await;
45    }
46
47    /// Await this to keep the server alive forever. Will never return.
48    pub async fn keep_alive(self) -> ! {
49        NoFuture.await;
50        unreachable!()
51    }
52}
53
54#[derive(Clone)]
55pub(crate) struct EndFut {
56    rx_shutdown: Receiver<()>,
57    tx_confirm: Arc<Sender<()>>,
58}
59
60impl EndFut {
61    pub async fn race<F>(&self, f: F) -> Option<F::Output>
62    where
63        F: Future,
64    {
65        // first to complete...
66
67        let wait_for_value = Box::pin(async {
68            let v = f.await;
69            Some(v)
70        });
71
72        let wait_for_end = Box::pin(async {
73            self.rx_shutdown.recv().await;
74            None
75        });
76
77        let ret = Select(Some(Inner(wait_for_value, wait_for_end))).await;
78
79        trace!("Race is ended: {}", ret.is_none());
80
81        ret
82    }
83}
84
85impl Drop for EndFut {
86    fn drop(&mut self) {
87        let count = Arc::strong_count(&self.tx_confirm);
88        trace!("EndFut instances left: {}", count - 1);
89    }
90}
91
/// A future that never completes: every poll reports `Poll::Pending`.
struct NoFuture;

impl Future for NoFuture {
    type Output = ();

    /// Always returns `Poll::Pending`; the context is left untouched, so no
    /// wake-up is ever requested.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
        Poll::Pending
    }
}
103
104impl fmt::Debug for ServerHandle {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        write!(f, "ServerHandle")
107    }
108}
109
110use std::pin::Pin;
111use std::task::{Context, Poll};
112
113struct Select<A, B>(Option<Inner<A, B>>);
114
115struct Inner<A, B>(A, B);
116
117impl<A, B, T> Future for Select<A, B>
118where
119    A: Future<Output = T> + Unpin,
120    B: Future<Output = T> + Unpin,
121{
122    type Output = T;
123
124    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
125        let this = self.get_mut();
126
127        if let Some(inner) = &mut this.0 {
128            let ret = match Pin::new(&mut inner.0).poll(cx) {
129                Poll::Ready(v) => Poll::Ready(v),
130                Poll::Pending => Pin::new(&mut inner.1).poll(cx),
131            };
132
133            if ret.is_ready() {
134                this.0 = None;
135            }
136
137            ret
138        } else {
139            warn!("Poll::Pending on finished Select");
140
141            Poll::Pending
142        }
143    }
144}