xitca_server/server/
future.rs

1use std::{
2    future::Future,
3    io, mem,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use crate::signals::{self, Signal, SignalFuture};
9
10use super::{Command, Server, handle::ServerHandle};
11
12#[must_use = "ServerFuture must be .await/ spawn as task / consumed with ServerFuture::wait."]
13pub enum ServerFuture {
14    Init { server: Server, enable_signal: bool },
15    Running(ServerFutureInner),
16    Error(io::Error),
17    Finished,
18}
19
20impl ServerFuture {
21    /// A handle for mutate Server state.
22    ///
23    /// # Examples:
24    ///
25    /// ```rust
26    /// # use xitca_io::net::{TcpStream};
27    /// # use xitca_server::Builder;
28    /// # use xitca_service::fn_service;
29    /// # #[tokio::main]
30    /// # async fn main() {
31    /// let mut server = Builder::new()
32    ///     .bind("test", "127.0.0.1:0", fn_service(|_io: TcpStream| async { Ok::<_, ()>(())}))
33    ///     .unwrap()
34    ///     .build();
35    ///
36    /// // obtain a handle. if server fail to start a std::io::Error would return.
37    /// let handle = server.handle().unwrap();
38    ///
39    /// // spawn server future.
40    /// tokio::spawn(server);
41    ///
42    /// // do a graceful shutdown of server.
43    /// handle.stop(true);
44    /// # }
45    /// ```
46    pub fn handle(&mut self) -> io::Result<ServerHandle> {
47        match *self {
48            Self::Init { ref server, .. } => Ok(ServerHandle {
49                tx: server.tx_cmd.clone(),
50            }),
51            Self::Running(ref inner) => Ok(ServerHandle {
52                tx: inner.server.tx_cmd.clone(),
53            }),
54            Self::Error(_) => match mem::take(self) {
55                Self::Error(e) => Err(e),
56                _ => unreachable!(),
57            },
58            Self::Finished => panic!("ServerFuture used after finished"),
59        }
60    }
61
62    /// Consume ServerFuture and block current thread waitting for server stop.
63    ///
64    /// Server can be stopped through OS signal or [ServerHandle::stop]. If none is active this call
65    /// would block forever.
66    pub fn wait(self) -> io::Result<()> {
67        match self {
68            Self::Init {
69                mut server,
70                enable_signal,
71            } => {
72                let rt = server.rt.take().unwrap();
73
74                let func = move || {
75                    let (mut server_fut, cmd) = rt.block_on(async {
76                        let mut server_fut = ServerFutureInner::new(server, enable_signal);
77                        let cmd = std::future::poll_fn(|cx| server_fut.poll_cmd(cx)).await;
78                        (server_fut, cmd)
79                    });
80                    server_fut.server.rt = Some(rt);
81                    (server_fut, cmd)
82                };
83
84                let (mut server_fut, cmd) = match tokio::runtime::Handle::try_current() {
85                    Ok(_) => {
86                        tracing::warn!(
87                            "ServerFuture::wait is called from within tokio context. It would block current thread from handling async tasks."
88                        );
89                        std::thread::Builder::new()
90                            .name(String::from("xitca-server-wait-scoped"))
91                            .spawn(func)?
92                            .join()
93                            .expect("ServerFutureInner unexpected panicing")
94                    }
95                    Err(_) => func(),
96                };
97
98                server_fut.handle_cmd(cmd);
99                Ok(())
100            }
101            Self::Running(..) => panic!("ServerFuture is already polled."),
102            Self::Error(e) => Err(e),
103            Self::Finished => unreachable!(),
104        }
105    }
106}
107
108pub struct ServerFutureInner {
109    pub(crate) server: Server,
110    pub(crate) signals: Option<SignalFuture>,
111}
112
113impl Default for ServerFuture {
114    fn default() -> Self {
115        Self::Finished
116    }
117}
118
119impl ServerFutureInner {
120    #[inline(never)]
121    fn new(server: Server, enable_signal: bool) -> Self {
122        Self {
123            server,
124            signals: enable_signal.then(signals::start),
125        }
126    }
127
128    #[inline(never)]
129    fn poll_cmd(&mut self, cx: &mut Context<'_>) -> Poll<Command> {
130        if let Some(signals) = self.signals.as_mut() {
131            if let Poll::Ready(sig) = Pin::new(signals).poll(cx) {
132                tracing::info!("Signal {:?} received.", sig);
133                let cmd = match sig {
134                    Signal::Int | Signal::Quit => Command::ForceStop,
135                    Signal::Term => Command::GracefulStop,
136                    // Remove signal listening and keep Server running when
137                    // terminal closed which xitca-server process belong.
138                    Signal::Hup => {
139                        self.signals = None;
140                        return Poll::Pending;
141                    }
142                };
143                return Poll::Ready(cmd);
144            }
145        }
146
147        match ready!(Pin::new(&mut self.server.rx_cmd).poll_recv(cx)) {
148            Some(cmd) => Poll::Ready(cmd),
149            None => Poll::Pending,
150        }
151    }
152
153    #[inline(never)]
154    fn handle_cmd(&mut self, cmd: Command) {
155        match cmd {
156            Command::ForceStop => {
157                self.server.stop(false);
158            }
159            Command::GracefulStop => {
160                self.server.stop(true);
161            }
162        }
163    }
164}
165
166impl Future for ServerFuture {
167    type Output = io::Result<()>;
168
169    #[inline(never)]
170    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171        let this = self.as_mut().get_mut();
172        match *this {
173            Self::Init { .. } => match mem::take(this) {
174                Self::Init { server, enable_signal } => {
175                    self.set(Self::Running(ServerFutureInner::new(server, enable_signal)));
176                    self.poll(cx)
177                }
178                _ => unreachable!(),
179            },
180            Self::Running(ref mut inner) => {
181                let cmd = ready!(inner.poll_cmd(cx));
182                inner.handle_cmd(cmd);
183                self.set(Self::Finished);
184                Poll::Ready(Ok(()))
185            }
186            Self::Error(_) => match mem::take(this) {
187                Self::Error(e) => Poll::Ready(Err(e)),
188                _ => unreachable!(""),
189            },
190            Self::Finished => unreachable!("ServerFuture polled after finish"),
191        }
192    }
193}