// hreq/server/serv_handle.rs
use std::fmt;
2use std::future::Future;
3use std::sync::Arc;
4
5use hreq_h1::mpsc::{Receiver, Sender};
6
7pub struct ServerHandle {
11 tx_shutdown: Sender<()>,
12 rx_confirm: Receiver<()>,
13}
14
15impl ServerHandle {
16 pub(crate) async fn new() -> (Self, EndFut) {
17 let (tx_shutdown, rx_shutdown) = Receiver::new(1);
18 let (tx_confirm, rx_confirm) = Receiver::new(1);
19
20 (
21 ServerHandle {
22 tx_shutdown,
23 rx_confirm,
24 },
25 EndFut {
26 rx_shutdown,
27 tx_confirm: Arc::new(tx_confirm),
28 },
29 )
30 }
31
32 pub async fn shutdown(self) {
34 let ServerHandle {
37 tx_shutdown,
38 rx_confirm,
39 } = self;
40
41 drop(tx_shutdown);
42
43 trace!("Await server shutdown confirmation");
44 rx_confirm.recv().await;
45 }
46
47 pub async fn keep_alive(self) -> ! {
49 NoFuture.await;
50 unreachable!()
51 }
52}
53
54#[derive(Clone)]
55pub(crate) struct EndFut {
56 rx_shutdown: Receiver<()>,
57 tx_confirm: Arc<Sender<()>>,
58}
59
60impl EndFut {
61 pub async fn race<F>(&self, f: F) -> Option<F::Output>
62 where
63 F: Future,
64 {
65 let wait_for_value = Box::pin(async {
68 let v = f.await;
69 Some(v)
70 });
71
72 let wait_for_end = Box::pin(async {
73 self.rx_shutdown.recv().await;
74 None
75 });
76
77 let ret = Select(Some(Inner(wait_for_value, wait_for_end))).await;
78
79 trace!("Race is ended: {}", ret.is_none());
80
81 ret
82 }
83}
84
85impl Drop for EndFut {
86 fn drop(&mut self) {
87 let count = Arc::strong_count(&self.tx_confirm);
88 trace!("EndFut instances left: {}", count - 1);
89 }
90}
91
/// A future that never resolves: every poll reports `Poll::Pending`.
struct NoFuture;

impl Future for NoFuture {
    type Output = ();

    /// Always returns `Poll::Pending`. The context is ignored, so no waker is
    /// ever registered.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
        Poll::Pending
    }
}
103
104impl fmt::Debug for ServerHandle {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 write!(f, "ServerHandle")
107 }
108}
109
110use std::pin::Pin;
111use std::task::{Context, Poll};
112
113struct Select<A, B>(Option<Inner<A, B>>);
114
115struct Inner<A, B>(A, B);
116
117impl<A, B, T> Future for Select<A, B>
118where
119 A: Future<Output = T> + Unpin,
120 B: Future<Output = T> + Unpin,
121{
122 type Output = T;
123
124 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
125 let this = self.get_mut();
126
127 if let Some(inner) = &mut this.0 {
128 let ret = match Pin::new(&mut inner.0).poll(cx) {
129 Poll::Ready(v) => Poll::Ready(v),
130 Poll::Pending => Pin::new(&mut inner.1).poll(cx),
131 };
132
133 if ret.is_ready() {
134 this.0 = None;
135 }
136
137 ret
138 } else {
139 warn!("Poll::Pending on finished Select");
140
141 Poll::Pending
142 }
143 }
144}