xitca_server/server/
future.rs

1use std::{
2    future::Future,
3    io, mem,
4    pin::Pin,
5    task::{Context, Poll, ready},
6};
7
8use crate::signals::{self, Signal, SignalFuture};
9
10use super::{Command, Server, handle::ServerHandle};
11
12#[must_use = "ServerFuture must be .await/ spawn as task / consumed with ServerFuture::wait."]
13#[derive(Default)]
14pub enum ServerFuture {
15    Init {
16        server: Server,
17        enable_signal: bool,
18    },
19    Running(ServerFutureInner),
20    Error(io::Error),
21    #[default]
22    Finished,
23}
24
25impl ServerFuture {
26    /// A handle for mutate Server state.
27    ///
28    /// # Examples:
29    ///
30    /// ```rust
31    /// # use xitca_io::net::{TcpStream};
32    /// # use xitca_server::Builder;
33    /// # use xitca_service::fn_service;
34    /// # #[tokio::main]
35    /// # async fn main() {
36    /// let mut server = Builder::new()
37    ///     .bind("test", "127.0.0.1:0", fn_service(|_io: TcpStream| async { Ok::<_, ()>(())}))
38    ///     .unwrap()
39    ///     .build();
40    ///
41    /// // obtain a handle. if server fail to start a std::io::Error would return.
42    /// let handle = server.handle().unwrap();
43    ///
44    /// // spawn server future.
45    /// tokio::spawn(server);
46    ///
47    /// // do a graceful shutdown of server.
48    /// handle.stop(true);
49    /// # }
50    /// ```
51    pub fn handle(&mut self) -> io::Result<ServerHandle> {
52        match *self {
53            Self::Init { ref server, .. } => Ok(ServerHandle {
54                tx: server.tx_cmd.clone(),
55            }),
56            Self::Running(ref inner) => Ok(ServerHandle {
57                tx: inner.server.tx_cmd.clone(),
58            }),
59            Self::Error(_) => match mem::take(self) {
60                Self::Error(e) => Err(e),
61                _ => unreachable!(),
62            },
63            Self::Finished => panic!("ServerFuture used after finished"),
64        }
65    }
66
67    /// Consume ServerFuture and block current thread waitting for server stop.
68    ///
69    /// Server can be stopped through OS signal or [ServerHandle::stop]. If none is active this call
70    /// would block forever.
71    pub fn wait(self) -> io::Result<()> {
72        match self {
73            Self::Init {
74                mut server,
75                enable_signal,
76            } => {
77                let rt = server.rt.take().unwrap();
78
79                let func = move || {
80                    let (mut server_fut, cmd) = rt.block_on(async {
81                        let mut server_fut = ServerFutureInner::new(server, enable_signal);
82                        let cmd = std::future::poll_fn(|cx| server_fut.poll_cmd(cx)).await;
83                        (server_fut, cmd)
84                    });
85                    server_fut.server.rt = Some(rt);
86                    (server_fut, cmd)
87                };
88
89                let (mut server_fut, cmd) = match tokio::runtime::Handle::try_current() {
90                    Ok(_) => {
91                        tracing::warn!(
92                            "ServerFuture::wait is called from within tokio context. It would block current thread from handling async tasks."
93                        );
94                        std::thread::Builder::new()
95                            .name(String::from("xitca-server-wait-scoped"))
96                            .spawn(func)?
97                            .join()
98                            .expect("ServerFutureInner unexpected panicing")
99                    }
100                    Err(_) => func(),
101                };
102
103                server_fut.handle_cmd(cmd);
104                Ok(())
105            }
106            Self::Running(..) => panic!("ServerFuture is already polled."),
107            Self::Error(e) => Err(e),
108            Self::Finished => unreachable!(),
109        }
110    }
111}
112
113pub struct ServerFutureInner {
114    pub(crate) server: Server,
115    pub(crate) signals: Option<SignalFuture>,
116}
117
118impl ServerFutureInner {
119    #[inline(never)]
120    fn new(server: Server, enable_signal: bool) -> Self {
121        Self {
122            server,
123            signals: enable_signal.then(signals::start),
124        }
125    }
126
127    #[inline(never)]
128    fn poll_cmd(&mut self, cx: &mut Context<'_>) -> Poll<Command> {
129        if let Some(signals) = self.signals.as_mut() {
130            if let Poll::Ready(sig) = Pin::new(signals).poll(cx) {
131                tracing::info!("Signal {:?} received.", sig);
132                let cmd = match sig {
133                    Signal::Int | Signal::Quit => Command::ForceStop,
134                    Signal::Term => Command::GracefulStop,
135                    // Remove signal listening and keep Server running when
136                    // terminal closed which xitca-server process belong.
137                    Signal::Hup => {
138                        self.signals = None;
139                        return Poll::Pending;
140                    }
141                };
142                return Poll::Ready(cmd);
143            }
144        }
145
146        match ready!(Pin::new(&mut self.server.rx_cmd).poll_recv(cx)) {
147            Some(cmd) => Poll::Ready(cmd),
148            None => Poll::Pending,
149        }
150    }
151
152    #[inline(never)]
153    fn handle_cmd(&mut self, cmd: Command) {
154        match cmd {
155            Command::ForceStop => {
156                self.server.stop(false);
157            }
158            Command::GracefulStop => {
159                self.server.stop(true);
160            }
161        }
162    }
163}
164
165impl Future for ServerFuture {
166    type Output = io::Result<()>;
167
168    #[inline(never)]
169    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
170        let this = self.as_mut().get_mut();
171        match *this {
172            Self::Init { .. } => match mem::take(this) {
173                Self::Init { server, enable_signal } => {
174                    self.set(Self::Running(ServerFutureInner::new(server, enable_signal)));
175                    self.poll(cx)
176                }
177                _ => unreachable!(),
178            },
179            Self::Running(ref mut inner) => {
180                let cmd = ready!(inner.poll_cmd(cx));
181                inner.handle_cmd(cmd);
182                self.set(Self::Finished);
183                Poll::Ready(Ok(()))
184            }
185            Self::Error(_) => match mem::take(this) {
186                Self::Error(e) => Poll::Ready(Err(e)),
187                _ => unreachable!(""),
188            },
189            Self::Finished => unreachable!("ServerFuture polled after finish"),
190        }
191    }
192}