use std::{collections::HashMap, future::Future, net, pin::Pin, time::Duration};
#[cfg(not(target_family = "wasm"))]
use std::io;
use xitca_io::net::Stream;
use crate::{
net::AsListener,
server::{IntoServiceObj, Server, ServerFuture, ServiceObj},
};
/// Accumulates server configuration, listeners and services before a server is built.
///
/// Every configuration method takes the builder by value and returns it, so calls can be
/// chained and finished with `build`.
pub struct Builder {
/// Number of server threads. Defaults to 1; the setter asserts the value is non-zero.
pub(crate) server_threads: usize,
/// Number of worker threads. Defaults to `std::thread::available_parallelism()`, or 1 when
/// that query fails; the setter asserts the value is non-zero.
pub(crate) worker_threads: usize,
/// Maximum number of blocking threads. Defaults to 512; the setter asserts the value is non-zero.
/// NOTE(review): presumably a per-worker limit, going by the name — confirm in the worker setup.
pub(crate) worker_max_blocking_threads: usize,
/// Listeners registered so far, grouped by service name. One name may own several listeners.
pub(crate) listeners: HashMap<String, Vec<Box<dyn AsListener>>>,
/// Service object registered for each name. Registering a name again replaces the earlier entry.
pub(crate) factories: HashMap<String, ServiceObj>,
/// Flag forwarded to `ServerFuture::Init` by `build`. `true` unless `disable_signal` was called.
pub(crate) enable_signal: bool,
/// Shutdown timeout. Defaults to 30 seconds.
pub(crate) shutdown_timeout: Duration,
/// Callback returning a boxed `Send` future with `()` output. Defaults to a no-op.
/// NOTE(review): presumably invoked when a worker starts, going by the name — confirm.
pub(crate) on_worker_start: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
/// Backlog value handed to the listeners created by the `bind*` methods. Defaults to 2048.
backlog: u32,
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
impl Builder {
pub fn new() -> Self {
Self {
server_threads: 1,
worker_threads: std::thread::available_parallelism().map(|size| size.get()).unwrap_or(1),
worker_max_blocking_threads: 512,
listeners: HashMap::new(),
factories: HashMap::new(),
enable_signal: true,
shutdown_timeout: Duration::from_secs(30),
on_worker_start: Box::new(|| Box::pin(async {})),
backlog: 2048,
}
}
pub fn server_threads(mut self, num: usize) -> Self {
assert_ne!(num, 0, "There must be at least one server thread");
self.server_threads = num;
self
}
pub fn worker_threads(mut self, num: usize) -> Self {
assert_ne!(num, 0, "There must be at least one worker thread");
self.worker_threads = num;
self
}
pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
assert_ne!(num, 0, "Blocking threads must be higher than 0");
self.worker_max_blocking_threads = num;
self
}
pub fn disable_signal(mut self) -> Self {
self.enable_signal = false;
self
}
pub fn shutdown_timeout(mut self, secs: u64) -> Self {
self.shutdown_timeout = Duration::from_secs(secs);
self
}
pub fn backlog(mut self, num: u32) -> Self {
self.backlog = num;
self
}
#[doc(hidden)]
pub fn on_worker_start<F, Fut>(mut self, on_start: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
self.on_worker_start = Box::new(move || {
let fut = on_start();
Box::pin(async {
fut.await;
})
});
self
}
pub fn listen<N, L, F, St>(mut self, name: N, listener: L, service: F) -> Self
where
N: AsRef<str>,
F: IntoServiceObj<St>,
St: TryFrom<Stream> + 'static,
Option<L>: AsListener + 'static,
{
self.listeners
.entry(name.as_ref().to_string())
.or_default()
.push(Box::new(Some(listener)));
self.factories.insert(name.as_ref().to_string(), service.into_object());
self
}
pub fn build(self) -> ServerFuture {
let enable_signal = self.enable_signal;
match Server::new(self) {
Ok(server) => ServerFuture::Init { server, enable_signal },
Err(e) => ServerFuture::Error(e),
}
}
}
#[cfg(not(target_family = "wasm"))]
impl Builder {
    /// Binds a TCP listener to `addr` and registers it with `service` under `name`.
    ///
    /// Only the first address produced by `addr.to_socket_addrs()` is used.
    ///
    /// # Errors
    ///
    /// Returns the resolution error if `addr` cannot be resolved, an
    /// [`io::ErrorKind::AddrNotAvailable`] error if it resolves to no address at all, and any
    /// error raised while binding or configuring the socket.
    pub fn bind<N, A, F, St>(self, name: N, addr: A, service: F) -> io::Result<Self>
    where
        N: AsRef<str>,
        A: net::ToSocketAddrs,
        F: IntoServiceObj<St>,
        St: TryFrom<Stream> + 'static,
    {
        let addr = addr
            .to_socket_addrs()?
            .next()
            .ok_or_else(|| io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr"))?;

        self._bind(name, addr, service)
    }

    /// Binds a non-blocking TCP listener to the resolved `addr`, applies the configured
    /// backlog and registers the listener through [`Builder::listen`].
    fn _bind<N, F, St>(self, name: N, addr: net::SocketAddr, service: F) -> io::Result<Self>
    where
        N: AsRef<str>,
        F: IntoServiceObj<St>,
        St: TryFrom<Stream> + 'static,
    {
        let listener = net::TcpListener::bind(addr)?;
        listener.set_nonblocking(true)?;

        // Borrow the std listener as a socket2 socket to reach options std does not expose.
        let socket = socket2::SockRef::from(&listener);
        // NOTE(review): the socket is already bound at this point, so SO_REUSEADDR is being set
        // after the bind — confirm this ordering is intended.
        socket.set_reuse_address(true)?;

        // `backlog` is stored as `u32`, while the listen call takes a signed integer. A plain
        // `as` cast would wrap values above `i32::MAX` into a negative backlog, so saturate at
        // `i32::MAX` before converting.
        let backlog = self.backlog.min(i32::MAX as u32);
        socket.listen(backlog as _)?;

        Ok(self.listen(name, listener, service))
    }
}
#[cfg(unix)]
impl Builder {
    /// Binds a Unix domain socket listener at `path` and registers it with `service` under
    /// `name`.
    ///
    /// Any file already present at `path` is removed before binding.
    ///
    /// # Errors
    ///
    /// Returns the removal error unless it is [`io::ErrorKind::NotFound`], and any error raised
    /// while binding the listener.
    pub fn bind_unix<N, P, F, St>(self, name: N, path: P, service: F) -> io::Result<Self>
    where
        N: AsRef<str>,
        P: AsRef<std::path::Path>,
        F: IntoServiceObj<St>,
        St: TryFrom<Stream> + 'static,
    {
        // Clear the path before binding; a missing file is the expected case and is ignored.
        match std::fs::remove_file(path.as_ref()) {
            Ok(()) => {}
            Err(err) if err.kind() == io::ErrorKind::NotFound => {}
            Err(err) => return Err(err),
        }

        std::os::unix::net::UnixListener::bind(path).map(|listener| self.listen(name, listener, service))
    }
}
#[cfg(feature = "quic")]
impl Builder {
    /// Registers both a TCP listener and a QUIC listener for the same address under `name`.
    ///
    /// Only the first address produced by `addr.to_socket_addrs()` is used. The TCP listener is
    /// bound first; the QUIC listener builder is then added to the same service name.
    ///
    /// # Errors
    ///
    /// Returns the resolution error if `addr` cannot be resolved, an
    /// [`io::ErrorKind::AddrNotAvailable`] error if it resolves to no address, and any error
    /// raised while binding the TCP listener.
    pub fn bind_all<N, A, F>(
        mut self,
        name: N,
        addr: A,
        config: xitca_io::net::QuicConfig,
        service: F,
    ) -> io::Result<Self>
    where
        N: AsRef<str>,
        A: net::ToSocketAddrs,
        F: IntoServiceObj<Stream>,
    {
        let addr = match addr.to_socket_addrs()?.next() {
            Some(resolved) => resolved,
            None => return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr")),
        };
        let key = name.as_ref();

        self = self._bind(key, addr, service)?;

        let quic = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(self.backlog);

        // `_bind` has just registered a listener under `key`, so the entry is present.
        let registered = self.listeners.get_mut(key).unwrap();
        registered.push(Box::new(Some(quic)));

        Ok(self)
    }

    /// Registers a QUIC listener for `addr` with `service` under `name`.
    ///
    /// Only the first address produced by `addr.to_socket_addrs()` is used, and the configured
    /// backlog is passed to the QUIC listener builder.
    ///
    /// # Errors
    ///
    /// Returns the resolution error if `addr` cannot be resolved, or an
    /// [`io::ErrorKind::AddrNotAvailable`] error if it resolves to no address.
    pub fn bind_h3<N, A, F, St>(
        self,
        name: N,
        addr: A,
        config: xitca_io::net::QuicConfig,
        service: F,
    ) -> io::Result<Self>
    where
        N: AsRef<str>,
        A: net::ToSocketAddrs,
        F: IntoServiceObj<St>,
        St: TryFrom<Stream> + 'static,
    {
        let addr = match addr.to_socket_addrs()?.next() {
            Some(resolved) => resolved,
            None => return Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Can not parse SocketAddr")),
        };

        let backlog = self.backlog;
        let quic = xitca_io::net::QuicListenerBuilder::new(addr, config).backlog(backlog);

        Ok(self.listen(name, quic, service))
    }
}