trillium_server_common/
server.rs1use crate::{Acceptor, Config, ConfigExt, Stopper, Transport};
2use std::{
3 future::{ready, Future},
4 io::Result,
5 pin::Pin,
6 sync::Arc,
7};
8use trillium::{Handler, Info};
9
10pub trait Server: Sized + Send + Sync + 'static {
14 type Transport: Transport;
18
19 const DESCRIPTION: &'static str;
21
22 fn accept(&mut self) -> Pin<Box<dyn Future<Output = Result<Self::Transport>> + Send + '_>>;
25
26 fn info(&self) -> Info;
29
30 fn clean_up(self) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
33 Box::pin(ready(()))
34 }
35
36 #[cfg(unix)]
43 fn build_listener<A>(config: &Config<Self, A>) -> Self
44 where
45 A: Acceptor<Self::Transport>,
46 {
47 if let Some(listener) = config.binding.write().unwrap().take() {
48 log::debug!("taking prebound listener");
49 return listener;
50 }
51
52 use std::os::unix::prelude::FromRawFd;
53 let host = config.host();
54 if host.starts_with(|c| c == '/' || c == '.' || c == '~') {
55 Self::listener_from_unix(std::os::unix::net::UnixListener::bind(host).unwrap())
56 } else {
57 let tcp_listener = if let Some(fd) = std::env::var("LISTEN_FD")
58 .ok()
59 .and_then(|fd| fd.parse().ok())
60 {
61 log::debug!("using fd {} from LISTEN_FD", fd);
62 unsafe { std::net::TcpListener::from_raw_fd(fd) }
63 } else {
64 std::net::TcpListener::bind((host, config.port())).unwrap()
65 };
66
67 tcp_listener.set_nonblocking(true).unwrap();
68 Self::listener_from_tcp(tcp_listener)
69 }
70 }
71
72 #[cfg(not(unix))]
77 fn build_listener<A>(config: &Config<Self, A>) -> Self
78 where
79 A: Acceptor<Self::Transport>,
80 {
81 if let Some(listener) = config.binding.write().unwrap().take() {
82 log::debug!("taking prebound listener");
83 return listener;
84 }
85
86 let tcp_listener = std::net::TcpListener::bind((config.host(), config.port())).unwrap();
87 tcp_listener.set_nonblocking(true).unwrap();
88 Self::listener_from_tcp(tcp_listener)
89 }
90
91 fn listener_from_tcp(_tcp: std::net::TcpListener) -> Self {
95 unimplemented!()
96 }
97
98 #[cfg(unix)]
102 fn listener_from_unix(_tcp: std::os::unix::net::UnixListener) -> Self {
103 unimplemented!()
104 }
105
106 fn handle_signals(_stopper: Stopper) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> {
110 Box::pin(ready(()))
111 }
112
113 fn spawn(fut: impl Future<Output = ()> + Send + 'static);
115
116 fn block_on(fut: impl Future<Output = ()> + 'static);
118
119 fn run<A, H>(config: Config<Self, A>, handler: H)
121 where
122 A: Acceptor<Self::Transport>,
123 H: Handler,
124 {
125 Self::block_on(Self::run_async(config, handler))
126 }
127
128 fn run_async<A, H>(
132 config: Config<Self, A>,
133 mut handler: H,
134 ) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>
135 where
136 A: Acceptor<Self::Transport>,
137 H: Handler,
138 {
139 Box::pin(async move {
140 if config.should_register_signals() {
141 #[cfg(unix)]
142 Self::spawn(Self::handle_signals(config.stopper()));
143
144 #[cfg(not(unix))]
145 log::error!("signals handling not supported on windows yet");
146 }
147
148 let mut listener = Self::build_listener(&config);
149 let mut info = Self::info(&listener);
150 info.server_description_mut().push_str(Self::DESCRIPTION);
151 handler.init(&mut info).await;
152 config.info.set(info);
153 let config = Arc::new(config);
154 let handler = Arc::new(handler);
155
156 while let Some(stream) = config
157 .stopper
158 .stop_future(Self::accept(&mut listener))
159 .await
160 {
161 match stream {
162 Ok(stream) => {
163 let config = Arc::clone(&config);
164 let handler = Arc::clone(&handler);
165 Self::spawn(async move { config.handle_stream(stream, handler).await })
166 }
167 Err(e) => log::error!("tcp error: {}", e),
168 }
169 }
170
171 config.graceful_shutdown().await;
172 Self::clean_up(listener).await;
173 })
174 }
175}