trillium_server_common/
server.rs

1use crate::{Acceptor, Config, ConfigExt, Stopper, Transport};
2use std::{
3    future::{ready, Future},
4    io::Result,
5    pin::Pin,
6    sync::Arc,
7};
8use trillium::{Handler, Info};
9
10/**
11The server trait, for standard network-based server implementations.
12*/
13pub trait Server: Sized + Send + Sync + 'static {
14    /// the individual byte stream that http
15    /// will be communicated over. This is often an async "stream"
16    /// like TcpStream or UnixStream. See [`Transport`]
17    type Transport: Transport;
18
19    /// The description of this server, to be appended to the Info and potentially logged.
20    const DESCRIPTION: &'static str;
21
22    /// Asynchronously return a single `Self::Transport` from a
23    /// `Self::Listener`. Must be implemented.
24    fn accept(&mut self) -> Pin<Box<dyn Future<Output = Result<Self::Transport>> + Send + '_>>;
25
26    /// Build an [`Info`] from the Self::Listener type. See [`Info`]
27    /// for more details.
28    fn info(&self) -> Info;
29
30    /// After the server has shut down, perform any housekeeping, eg
31    /// unlinking a unix socket.
32    fn clean_up(self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
33        Box::pin(ready(()))
34    }
35
36    /// Build a listener from the config. The default logic for this
37    /// is described elsewhere. To override the default logic, server
38    /// implementations could potentially implement this directly.  To
39    /// use this default logic, implement
40    /// [`Server::listener_from_tcp`] and
41    /// [`Server::listener_from_unix`].
42    #[cfg(unix)]
43    fn build_listener<A>(config: &Config<Self, A>) -> Self
44    where
45        A: Acceptor<Self::Transport>,
46    {
47        if let Some(listener) = config.binding.write().unwrap().take() {
48            log::debug!("taking prebound listener");
49            return listener;
50        }
51
52        use std::os::unix::prelude::FromRawFd;
53        let host = config.host();
54        if host.starts_with(|c| c == '/' || c == '.' || c == '~') {
55            Self::listener_from_unix(std::os::unix::net::UnixListener::bind(host).unwrap())
56        } else {
57            let tcp_listener = if let Some(fd) = std::env::var("LISTEN_FD")
58                .ok()
59                .and_then(|fd| fd.parse().ok())
60            {
61                log::debug!("using fd {} from LISTEN_FD", fd);
62                unsafe { std::net::TcpListener::from_raw_fd(fd) }
63            } else {
64                std::net::TcpListener::bind((host, config.port())).unwrap()
65            };
66
67            tcp_listener.set_nonblocking(true).unwrap();
68            Self::listener_from_tcp(tcp_listener)
69        }
70    }
71
72    /// Build a listener from the config. The default logic for this
73    /// is described elsewhere. To override the default logic, server
74    /// implementations could potentially implement this directly.  To
75    /// use this default logic, implement [`Server::listener_from_tcp`]
76    #[cfg(not(unix))]
77    fn build_listener<A>(config: &Config<Self, A>) -> Self
78    where
79        A: Acceptor<Self::Transport>,
80    {
81        if let Some(listener) = config.binding.write().unwrap().take() {
82            log::debug!("taking prebound listener");
83            return listener;
84        }
85
86        let tcp_listener = std::net::TcpListener::bind((config.host(), config.port())).unwrap();
87        tcp_listener.set_nonblocking(true).unwrap();
88        Self::listener_from_tcp(tcp_listener)
89    }
90
91    /// Build a Self::Listener from a tcp listener. This is called by
92    /// the [`Server::build_listener`] default implementation, and
93    /// is mandatory if the default implementation is used.
94    fn listener_from_tcp(_tcp: std::net::TcpListener) -> Self {
95        unimplemented!()
96    }
97
98    /// Build a Self::Listener from a tcp listener. This is called by
99    /// the [`Server::build_listener`] default implementation. You
100    /// will want to tag an implementation of this with #[cfg(unix)].
101    #[cfg(unix)]
102    fn listener_from_unix(_tcp: std::os::unix::net::UnixListener) -> Self {
103        unimplemented!()
104    }
105
106    /// Implementation hook for listening for any os signals and
107    /// stopping the provided [`Stopper`]. The returned future will be
108    /// spawned using [`Server::spawn`]
109    fn handle_signals(_stopper: Stopper) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
110        Box::pin(ready(()))
111    }
112
113    /// Runtime implementation hook for spawning a task.
114    fn spawn(fut: impl Future<Output = ()> + Send + 'static);
115
116    /// Runtime implementation hook for blocking on a top level future.
117    fn block_on(fut: impl Future<Output = ()> + 'static);
118
119    /// Run a trillium application from a sync context
120    fn run<A, H>(config: Config<Self, A>, handler: H)
121    where
122        A: Acceptor<Self::Transport>,
123        H: Handler,
124    {
125        Self::block_on(Self::run_async(config, handler))
126    }
127
128    /// Run a trillium application from an async context. The default
129    /// implementation of this method contains the core logic of this
130    /// Trait.
131    fn run_async<A, H>(
132        config: Config<Self, A>,
133        mut handler: H,
134    ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
135    where
136        A: Acceptor<Self::Transport>,
137        H: Handler,
138    {
139        Box::pin(async move {
140            if config.should_register_signals() {
141                #[cfg(unix)]
142                Self::spawn(Self::handle_signals(config.stopper()));
143
144                #[cfg(not(unix))]
145                log::error!("signals handling not supported on windows yet");
146            }
147
148            let mut listener = Self::build_listener(&config);
149            let mut info = Self::info(&listener);
150            info.server_description_mut().push_str(Self::DESCRIPTION);
151            handler.init(&mut info).await;
152            config.info.set(info);
153            let config = Arc::new(config);
154            let handler = Arc::new(handler);
155
156            while let Some(stream) = config
157                .stopper
158                .stop_future(Self::accept(&mut listener))
159                .await
160            {
161                match stream {
162                    Ok(stream) => {
163                        let config = Arc::clone(&config);
164                        let handler = Arc::clone(&handler);
165                        Self::spawn(async move { config.handle_stream(stream, handler).await })
166                    }
167                    Err(e) => log::error!("tcp error: {}", e),
168                }
169            }
170
171            config.graceful_shutdown().await;
172            Self::clean_up(listener).await;
173        })
174    }
175}