1use std::{collections::HashMap, future::Future, io, pin::Pin, time::Duration};
2
3#[cfg(not(target_family = "wasm"))]
4use std::net;
5
6use std::sync::Arc;
7
8use xitca_io::net::Stream;
9
10use crate::{
11 net::{IntoListener, ListenerDyn},
12 server::{IntoServiceObj, Server, ServerFuture, ServiceObj},
13};
14
/// Deferred listener constructor: a boxed, sendable, one-shot closure that yields
/// a [`ListenerDyn`] or the I/O error hit while producing it.
type ListenerFn = Box<dyn FnOnce() -> io::Result<ListenerDyn> + Send>;
16
/// Collects the configuration used to construct a [`Server`].
///
/// Settings are adjusted through the consuming setter methods and the builder is
/// finally handed to [`Server::new`] by [`Builder::build`].
pub struct Builder {
    /// Number of server threads. `new` starts at 1; the setter rejects 0.
    pub(crate) server_threads: usize,
    /// Number of worker threads. `new` uses the available parallelism, or 1 when
    /// it cannot be determined; the setter rejects 0.
    pub(crate) worker_threads: usize,
    /// Blocking thread limit. `new` starts at 512; the setter rejects 0.
    // NOTE(review): presumably a per-worker limit, going by the name - confirm against `Server`.
    pub(crate) worker_max_blocking_threads: usize,
    /// Deferred listener constructors, grouped by the name they were registered under.
    pub(crate) listeners: HashMap<String, Vec<ListenerFn>>,
    /// Service object registered for each name. `listen` inserts here with the
    /// same key it uses for `listeners`.
    pub(crate) factories: HashMap<String, ServiceObj>,
    /// Flag forwarded to `ServerFuture::Init` by `build`. `new` starts at `true`;
    /// `disable_signal` clears it.
    pub(crate) enable_signal: bool,
    /// Shutdown timeout. `new` starts at 30 seconds.
    pub(crate) shutdown_timeout: Duration,
    /// Callback producing a boxed future; `new` installs one that does nothing.
    // NOTE(review): presumably run once per worker when it starts - confirm against the worker code.
    pub(crate) on_worker_start: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
    /// Backlog handed to the sockets created by the `bind*` helpers. `new` starts at 2048.
    backlog: u32,
}
28
29impl Default for Builder {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl Builder {
36 pub fn new() -> Self {
38 Self {
39 server_threads: 1,
40 worker_threads: std::thread::available_parallelism().map(|size| size.get()).unwrap_or(1),
41 worker_max_blocking_threads: 512,
42 listeners: HashMap::new(),
43 factories: HashMap::new(),
44 enable_signal: true,
45 shutdown_timeout: Duration::from_secs(30),
46 on_worker_start: Box::new(|| Box::pin(async {})),
47 backlog: 2048,
48 }
49 }
50
51 pub fn server_threads(mut self, num: usize) -> Self {
58 assert_ne!(num, 0, "There must be at least one server thread");
59 self.server_threads = num;
60 self
61 }
62
63 pub fn worker_threads(mut self, num: usize) -> Self {
70 assert_ne!(num, 0, "There must be at least one worker thread");
71
72 self.worker_threads = num;
73 self
74 }
75
76 pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
90 assert_ne!(num, 0, "Blocking threads must be higher than 0");
91
92 self.worker_max_blocking_threads = num;
93 self
94 }
95
96 pub fn disable_signal(mut self) -> Self {
99 self.enable_signal = false;
100 self
101 }
102
103 pub fn shutdown_timeout(mut self, secs: u64) -> Self {
110 self.shutdown_timeout = Duration::from_secs(secs);
111 self
112 }
113
114 pub fn backlog(mut self, num: u32) -> Self {
115 self.backlog = num;
116 self
117 }
118
119 #[doc(hidden)]
120 pub fn on_worker_start<F, Fut>(mut self, on_start: F) -> Self
124 where
125 F: Fn() -> Fut + Send + Sync + 'static,
126 Fut: Future + Send + 'static,
127 {
128 self.on_worker_start = Box::new(move || {
129 let fut = on_start();
130 Box::pin(async {
131 fut.await;
132 })
133 });
134
135 self
136 }
137
138 pub fn listen<N, L, F, St>(mut self, name: N, listener: L, service: F) -> Self
139 where
140 N: AsRef<str>,
141 F: IntoServiceObj<St>,
142 St: TryFrom<Stream> + 'static,
143 L: IntoListener + 'static,
144 {
145 self.listeners
146 .entry(name.as_ref().to_string())
147 .or_default()
148 .push(Box::new(|| listener.into_listener().map(|l| Arc::new(l) as _)));
149
150 self.factories.insert(name.as_ref().to_string(), service.into_object());
151
152 self
153 }
154
155 pub fn build(self) -> ServerFuture {
156 let enable_signal = self.enable_signal;
157 match Server::new(self) {
158 Ok(server) => ServerFuture::Init { server, enable_signal },
159 Err(e) => ServerFuture::Error(e),
160 }
161 }
162}
163
164#[cfg(not(target_family = "wasm"))]
165impl Builder {
166 pub fn bind<N, A, F, St>(self, name: N, addr: A, service: F) -> io::Result<Self>
167 where
168 N: AsRef<str>,
169 A: net::ToSocketAddrs,
170 F: IntoServiceObj<St>,
171 St: TryFrom<Stream> + 'static,
172 {
173 let addr = addr
174 .to_socket_addrs()?
175 .next()
176 .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr"))?;
177
178 self._bind(name, addr, service)
179 }
180
181 fn _bind<N, F, St>(self, name: N, addr: net::SocketAddr, service: F) -> io::Result<Self>
182 where
183 N: AsRef<str>,
184 F: IntoServiceObj<St>,
185 St: TryFrom<Stream> + 'static,
186 {
187 let listener = net::TcpListener::bind(addr)?;
188
189 let socket = socket2::SockRef::from(&listener);
190 socket.set_reuse_address(true)?;
191 socket.listen(self.backlog as _)?;
192
193 Ok(self.listen(name, listener, service))
194 }
195}
196
197#[cfg(unix)]
198impl Builder {
199 pub fn bind_unix<N, P, F, St>(self, name: N, path: P, service: F) -> io::Result<Self>
200 where
201 N: AsRef<str>,
202 P: AsRef<std::path::Path>,
203 F: IntoServiceObj<St>,
204 St: TryFrom<Stream> + 'static,
205 {
206 if let Err(e) = std::fs::remove_file(path.as_ref()) {
209 if e.kind() != io::ErrorKind::NotFound {
211 return Err(e);
212 }
213 }
214
215 let listener = std::os::unix::net::UnixListener::bind(path)?;
216
217 Ok(self.listen(name, listener, service))
218 }
219}
220
#[cfg(feature = "quic")]
impl Builder {
    /// Binds a TCP listener and queues a QUIC listener for the same address,
    /// both registered under `name` and served by `service`.
    ///
    /// Only the first address that `addr` resolves to is used. The TCP listener
    /// is bound immediately; the QUIC listener is only built when its queued
    /// constructor runs.
    ///
    /// # Errors
    ///
    /// Returns the resolution error, an [`io::ErrorKind::AddrNotAvailable`] error
    /// when `addr` resolves to no address, or any error from the TCP bind.
    pub fn bind_all<N, A, F>(
        mut self,
        name: N,
        addr: A,
        config: xitca_io::net::QuicConfig,
        service: F,
    ) -> io::Result<Self>
    where
        N: AsRef<str>,
        A: net::ToSocketAddrs,
        F: IntoServiceObj<Stream>,
    {
        let addr = match addr.to_socket_addrs()?.next() {
            Some(first) => first,
            None => return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr")),
        };

        let key = name.as_ref();

        self = self._bind(key, addr, service)?;

        let quic = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);

        // `_bind` registers through `listen`, which always creates the entry
        // for this name, so the lookup cannot come back empty.
        let queue = self.listeners.get_mut(key).unwrap();
        queue.push(Box::new(move || quic.into_listener().map(|l| Arc::new(l) as _)));

        Ok(self)
    }

    /// Queues a QUIC listener for `addr` and registers it, with `service`, under `name`.
    ///
    /// Only the first address that `addr` resolves to is used, and the listener
    /// itself is only built when its queued constructor runs.
    ///
    /// # Errors
    ///
    /// Returns the resolution error, or an [`io::ErrorKind::AddrNotAvailable`]
    /// error when `addr` resolves to no address.
    pub fn bind_h3<N, A, F, St>(
        self,
        name: N,
        addr: A,
        config: xitca_io::net::QuicConfig,
        service: F,
    ) -> io::Result<Self>
    where
        N: AsRef<str>,
        A: net::ToSocketAddrs,
        F: IntoServiceObj<St>,
        St: TryFrom<Stream> + 'static,
    {
        let addr = match addr.to_socket_addrs()?.next() {
            Some(first) => first,
            None => return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr")),
        };

        let quic = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);

        Ok(self.listen(name, quic, service))
    }
}