xitca_server/server/
future.rs1use std::{
2 future::Future,
3 io, mem,
4 pin::Pin,
5 task::{Context, Poll, ready},
6};
7
8use crate::signals::{self, Signal, SignalFuture};
9
10use super::{Command, Server, handle::ServerHandle};
11
12#[must_use = "ServerFuture must be .await/ spawn as task / consumed with ServerFuture::wait."]
13pub enum ServerFuture {
14 Init { server: Server, enable_signal: bool },
15 Running(ServerFutureInner),
16 Error(io::Error),
17 Finished,
18}
19
20impl ServerFuture {
21 pub fn handle(&mut self) -> io::Result<ServerHandle> {
47 match *self {
48 Self::Init { ref server, .. } => Ok(ServerHandle {
49 tx: server.tx_cmd.clone(),
50 }),
51 Self::Running(ref inner) => Ok(ServerHandle {
52 tx: inner.server.tx_cmd.clone(),
53 }),
54 Self::Error(_) => match mem::take(self) {
55 Self::Error(e) => Err(e),
56 _ => unreachable!(),
57 },
58 Self::Finished => panic!("ServerFuture used after finished"),
59 }
60 }
61
62 pub fn wait(self) -> io::Result<()> {
67 match self {
68 Self::Init {
69 mut server,
70 enable_signal,
71 } => {
72 let rt = server.rt.take().unwrap();
73
74 let func = move || {
75 let (mut server_fut, cmd) = rt.block_on(async {
76 let mut server_fut = ServerFutureInner::new(server, enable_signal);
77 let cmd = std::future::poll_fn(|cx| server_fut.poll_cmd(cx)).await;
78 (server_fut, cmd)
79 });
80 server_fut.server.rt = Some(rt);
81 (server_fut, cmd)
82 };
83
84 let (mut server_fut, cmd) = match tokio::runtime::Handle::try_current() {
85 Ok(_) => {
86 tracing::warn!(
87 "ServerFuture::wait is called from within tokio context. It would block current thread from handling async tasks."
88 );
89 std::thread::Builder::new()
90 .name(String::from("xitca-server-wait-scoped"))
91 .spawn(func)?
92 .join()
93 .expect("ServerFutureInner unexpected panicing")
94 }
95 Err(_) => func(),
96 };
97
98 server_fut.handle_cmd(cmd);
99 Ok(())
100 }
101 Self::Running(..) => panic!("ServerFuture is already polled."),
102 Self::Error(e) => Err(e),
103 Self::Finished => unreachable!(),
104 }
105 }
106}
107
108pub struct ServerFutureInner {
109 pub(crate) server: Server,
110 pub(crate) signals: Option<SignalFuture>,
111}
112
113impl Default for ServerFuture {
114 fn default() -> Self {
115 Self::Finished
116 }
117}
118
119impl ServerFutureInner {
120 #[inline(never)]
121 fn new(server: Server, enable_signal: bool) -> Self {
122 Self {
123 server,
124 signals: enable_signal.then(signals::start),
125 }
126 }
127
128 #[inline(never)]
129 fn poll_cmd(&mut self, cx: &mut Context<'_>) -> Poll<Command> {
130 if let Some(signals) = self.signals.as_mut() {
131 if let Poll::Ready(sig) = Pin::new(signals).poll(cx) {
132 tracing::info!("Signal {:?} received.", sig);
133 let cmd = match sig {
134 Signal::Int | Signal::Quit => Command::ForceStop,
135 Signal::Term => Command::GracefulStop,
136 Signal::Hup => {
139 self.signals = None;
140 return Poll::Pending;
141 }
142 };
143 return Poll::Ready(cmd);
144 }
145 }
146
147 match ready!(Pin::new(&mut self.server.rx_cmd).poll_recv(cx)) {
148 Some(cmd) => Poll::Ready(cmd),
149 None => Poll::Pending,
150 }
151 }
152
153 #[inline(never)]
154 fn handle_cmd(&mut self, cmd: Command) {
155 match cmd {
156 Command::ForceStop => {
157 self.server.stop(false);
158 }
159 Command::GracefulStop => {
160 self.server.stop(true);
161 }
162 }
163 }
164}
165
166impl Future for ServerFuture {
167 type Output = io::Result<()>;
168
169 #[inline(never)]
170 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171 let this = self.as_mut().get_mut();
172 match *this {
173 Self::Init { .. } => match mem::take(this) {
174 Self::Init { server, enable_signal } => {
175 self.set(Self::Running(ServerFutureInner::new(server, enable_signal)));
176 self.poll(cx)
177 }
178 _ => unreachable!(),
179 },
180 Self::Running(ref mut inner) => {
181 let cmd = ready!(inner.poll_cmd(cx));
182 inner.handle_cmd(cmd);
183 self.set(Self::Finished);
184 Poll::Ready(Ok(()))
185 }
186 Self::Error(_) => match mem::take(this) {
187 Self::Error(e) => Poll::Ready(Err(e)),
188 _ => unreachable!(""),
189 },
190 Self::Finished => unreachable!("ServerFuture polled after finish"),
191 }
192 }
193}