mod future;
mod handle;
mod service;
pub use self::{future::ServerFuture, handle::ServerHandle};
pub(crate) use self::service::{IntoServiceObj, ServiceObj};
use std::{
io, mem,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
};
use tokio::{
runtime::Runtime,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use crate::{builder::Builder, worker};
pub struct Server {
is_graceful_shutdown: Arc<AtomicBool>,
tx_cmd: UnboundedSender<Command>,
rx_cmd: UnboundedReceiver<Command>,
rt: Option<Runtime>,
worker_join_handles: Vec<thread::JoinHandle<io::Result<()>>>,
}
impl Server {
#[cfg(target_family = "wasm")]
pub fn new(builder: Builder) -> io::Result<Self> {
let Builder {
listeners,
factories,
shutdown_timeout,
on_worker_start,
..
} = builder;
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
let fut = async {
listeners
.into_iter()
.flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l))))
.collect::<Result<Vec<_>, io::Error>>()
};
let listeners = rt.block_on(fut)?;
let is_graceful_shutdown = Arc::new(AtomicBool::new(false));
let on_start_fut = on_worker_start();
let fut = async {
on_start_fut.await;
let mut handles = Vec::new();
let mut services = Vec::new();
for (name, factory) in factories.iter() {
let (h, s) = factory
.call((name, &listeners))
.await
.map_err(|_| io::Error::from(io::ErrorKind::Other))?;
handles.extend(h);
services.push(s);
}
worker::wait_for_stop(handles, services, shutdown_timeout, &is_graceful_shutdown).await;
Ok::<_, io::Error>(())
};
rt.block_on(tokio::task::LocalSet::new().run_until(fut))?;
unreachable!("")
}
#[cfg(not(target_family = "wasm"))]
pub fn new(builder: Builder) -> io::Result<Self> {
let Builder {
server_threads,
worker_threads,
worker_max_blocking_threads,
listeners,
factories,
shutdown_timeout,
on_worker_start,
..
} = builder;
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(server_threads)
.build()?;
let fut = async {
listeners
.into_iter()
.flat_map(|(name, listeners)| listeners.into_iter().map(move |l| l().map(|l| (name.to_owned(), l))))
.collect::<Result<Vec<_>, io::Error>>()
};
let listeners = thread::scope(|s| s.spawn(|| rt.block_on(fut)).join()).unwrap()?;
let is_graceful_shutdown = Arc::new(AtomicBool::new(false));
let is_graceful_shutdown2 = is_graceful_shutdown.clone();
let worker_handles = thread::Builder::new()
.name(String::from("xitca-server-worker-shared-scope"))
.spawn(move || {
let is_graceful_shutdown = is_graceful_shutdown2;
thread::scope(|scope| {
for idx in 0..worker_threads {
let thread = thread::Builder::new().name(format!("xitca-server-worker-{idx}"));
let task = || async {
on_worker_start().await;
let mut handles = Vec::new();
let mut services = Vec::new();
for (name, factory) in factories.iter() {
match factory.call((name, &listeners)).await {
Ok((h, s)) => {
handles.extend(h);
services.push(s);
}
Err(_) => return,
}
}
worker::wait_for_stop(handles, services, shutdown_timeout, &is_graceful_shutdown).await;
};
#[cfg(not(feature = "io-uring"))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(worker_max_blocking_threads)
.build()?;
thread.spawn_scoped(scope, move || {
rt.block_on(tokio::task::LocalSet::new().run_until(task()))
})?;
}
#[cfg(feature = "io-uring")]
{
thread.spawn_scoped(scope, move || {
let _ = worker_max_blocking_threads;
tokio_uring_xitca::start(task())
})?;
}
}
Ok(())
})
})?;
let (tx_cmd, rx_cmd) = tokio::sync::mpsc::unbounded_channel();
Ok(Self {
is_graceful_shutdown,
tx_cmd,
rx_cmd,
rt: Some(rt),
worker_join_handles: vec![worker_handles],
})
}
pub(crate) fn stop(&mut self, graceful: bool) {
if let Some(rt) = self.rt.take() {
self.is_graceful_shutdown.store(graceful, Ordering::SeqCst);
rt.shutdown_background();
mem::take(&mut self.worker_join_handles).into_iter().for_each(|handle| {
let _ = handle.join().unwrap();
});
}
}
}
enum Command {
GracefulStop,
ForceStop,
}