xitca_server/
builder.rs

1use std::{collections::HashMap, future::Future, io, pin::Pin, time::Duration};
2
3#[cfg(not(target_family = "wasm"))]
4use std::net;
5
6use std::sync::Arc;
7
8use xitca_io::net::Stream;
9
10use crate::{
11    net::{IntoListener, ListenerDyn},
12    server::{IntoServiceObj, Server, ServerFuture, ServiceObj},
13};
14
15type ListenerFn = Box<dyn FnOnce() -> io::Result<ListenerDyn> + Send>;
16
17pub struct Builder {
18    pub(crate) server_threads: usize,
19    pub(crate) worker_threads: usize,
20    pub(crate) worker_max_blocking_threads: usize,
21    pub(crate) listeners: HashMap<String, Vec<ListenerFn>>,
22    pub(crate) factories: HashMap<String, ServiceObj>,
23    pub(crate) enable_signal: bool,
24    pub(crate) shutdown_timeout: Duration,
25    pub(crate) on_worker_start: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
26    backlog: u32,
27}
28
29impl Default for Builder {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl Builder {
36    /// Create new Builder instance
37    pub fn new() -> Self {
38        Self {
39            server_threads: 1,
40            worker_threads: std::thread::available_parallelism().map(|size| size.get()).unwrap_or(1),
41            worker_max_blocking_threads: 512,
42            listeners: HashMap::new(),
43            factories: HashMap::new(),
44            enable_signal: true,
45            shutdown_timeout: Duration::from_secs(30),
46            on_worker_start: Box::new(|| Box::pin(async {})),
47            backlog: 2048,
48        }
49    }
50
51    /// Set number of threads dedicated to accepting connections.
52    ///
53    /// Default set to 1.
54    ///
55    /// # Panics:
56    /// When receive 0 as number of server thread.
57    pub fn server_threads(mut self, num: usize) -> Self {
58        assert_ne!(num, 0, "There must be at least one server thread");
59        self.server_threads = num;
60        self
61    }
62
63    /// Set number of workers to start.
64    ///
65    /// Default set to available logical cpu as workers count.
66    ///
67    /// # Panics:
68    /// When received 0 as number of worker thread.
69    pub fn worker_threads(mut self, num: usize) -> Self {
70        assert_ne!(num, 0, "There must be at least one worker thread");
71
72        self.worker_threads = num;
73        self
74    }
75
76    /// Set max number of threads for each worker's blocking task thread pool.
77    ///
78    /// One thread pool is set up **per worker**; not shared across workers.
79    ///
80    /// # Examples:
81    /// ```
82    /// # use xitca_server::Builder;
83    /// let builder = Builder::new()
84    ///     .worker_threads(4) // server has 4 worker threads.
85    ///     .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
86    /// ```
87    ///
88    /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
89    pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
90        assert_ne!(num, 0, "Blocking threads must be higher than 0");
91
92        self.worker_max_blocking_threads = num;
93        self
94    }
95
96    /// Disable signal listening.
97    /// Server would only be shutdown from [ServerHandle](crate::server::ServerHandle)
98    pub fn disable_signal(mut self) -> Self {
99        self.enable_signal = false;
100        self
101    }
102
103    /// Timeout for graceful workers shutdown in seconds.
104    ///
105    /// After receiving a stop signal, workers have this much time to finish serving requests.
106    /// Workers still alive after the timeout are force dropped.
107    ///
108    /// By default shutdown timeout sets to 30 seconds.
109    pub fn shutdown_timeout(mut self, secs: u64) -> Self {
110        self.shutdown_timeout = Duration::from_secs(secs);
111        self
112    }
113
114    pub fn backlog(mut self, num: u32) -> Self {
115        self.backlog = num;
116        self
117    }
118
119    #[doc(hidden)]
120    /// Async callback called when worker thread is spawned.
121    ///
122    /// *. This API is subject to change with no stable guarantee.
123    pub fn on_worker_start<F, Fut>(mut self, on_start: F) -> Self
124    where
125        F: Fn() -> Fut + Send + Sync + 'static,
126        Fut: Future + Send + 'static,
127    {
128        self.on_worker_start = Box::new(move || {
129            let fut = on_start();
130            Box::pin(async {
131                fut.await;
132            })
133        });
134
135        self
136    }
137
138    pub fn listen<N, L, F, St>(mut self, name: N, listener: L, service: F) -> Self
139    where
140        N: AsRef<str>,
141        F: IntoServiceObj<St>,
142        St: TryFrom<Stream> + 'static,
143        L: IntoListener + 'static,
144    {
145        self.listeners
146            .entry(name.as_ref().to_string())
147            .or_default()
148            .push(Box::new(|| listener.into_listener().map(|l| Arc::new(l) as _)));
149
150        self.factories.insert(name.as_ref().to_string(), service.into_object());
151
152        self
153    }
154
155    pub fn build(self) -> ServerFuture {
156        let enable_signal = self.enable_signal;
157        match Server::new(self) {
158            Ok(server) => ServerFuture::Init { server, enable_signal },
159            Err(e) => ServerFuture::Error(e),
160        }
161    }
162}
163
164#[cfg(not(target_family = "wasm"))]
165impl Builder {
166    pub fn bind<N, A, F, St>(self, name: N, addr: A, service: F) -> io::Result<Self>
167    where
168        N: AsRef<str>,
169        A: net::ToSocketAddrs,
170        F: IntoServiceObj<St>,
171        St: TryFrom<Stream> + 'static,
172    {
173        let addr = addr
174            .to_socket_addrs()?
175            .next()
176            .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr"))?;
177
178        self._bind(name, addr, service)
179    }
180
181    fn _bind<N, F, St>(self, name: N, addr: net::SocketAddr, service: F) -> io::Result<Self>
182    where
183        N: AsRef<str>,
184        F: IntoServiceObj<St>,
185        St: TryFrom<Stream> + 'static,
186    {
187        let listener = net::TcpListener::bind(addr)?;
188
189        let socket = socket2::SockRef::from(&listener);
190        socket.set_reuse_address(true)?;
191        socket.listen(self.backlog as _)?;
192
193        Ok(self.listen(name, listener, service))
194    }
195}
196
197#[cfg(unix)]
198impl Builder {
199    pub fn bind_unix<N, P, F, St>(self, name: N, path: P, service: F) -> io::Result<Self>
200    where
201        N: AsRef<str>,
202        P: AsRef<std::path::Path>,
203        F: IntoServiceObj<St>,
204        St: TryFrom<Stream> + 'static,
205    {
206        // The path must not exist when we try to bind.
207        // Try to remove it to avoid bind error.
208        if let Err(e) = std::fs::remove_file(path.as_ref()) {
209            // NotFound is expected and not an issue. Anything else is.
210            if e.kind() != io::ErrorKind::NotFound {
211                return Err(e);
212            }
213        }
214
215        let listener = std::os::unix::net::UnixListener::bind(path)?;
216
217        Ok(self.listen(name, listener, service))
218    }
219}
220
221#[cfg(feature = "quic")]
222impl Builder {
223    /// Bind to both Tcp and Udp of the same address to enable http/1/2/3 handling
224    /// with single service.
225    pub fn bind_all<N, A, F>(
226        mut self,
227        name: N,
228        addr: A,
229        config: xitca_io::net::QuicConfig,
230        service: F,
231    ) -> io::Result<Self>
232    where
233        N: AsRef<str>,
234        A: net::ToSocketAddrs,
235        F: IntoServiceObj<Stream>,
236    {
237        let addr = addr
238            .to_socket_addrs()?
239            .next()
240            .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr"))?;
241
242        self = self._bind(name.as_ref(), addr, service)?;
243
244        let builder = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);
245
246        self.listeners
247            .get_mut(name.as_ref())
248            .unwrap()
249            .push(Box::new(|| builder.into_listener().map(|l| Arc::new(l) as _)));
250
251        Ok(self)
252    }
253
254    pub fn bind_h3<N, A, F, St>(
255        self,
256        name: N,
257        addr: A,
258        config: xitca_io::net::QuicConfig,
259        service: F,
260    ) -> io::Result<Self>
261    where
262        N: AsRef<str>,
263        A: net::ToSocketAddrs,
264        F: IntoServiceObj<St>,
265        St: TryFrom<Stream> + 'static,
266    {
267        let addr = addr
268            .to_socket_addrs()?
269            .next()
270            .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr"))?;
271
272        let listener = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);
273
274        Ok(self.listen(name, listener, service))
275    }
276}