mod shutdown;
use core::{any::Any, sync::atomic::AtomicBool, time::Duration};
use std::{io, rc::Rc, thread};
use tokio::{task::JoinHandle, time::sleep};
use tracing::{error, info};
use xitca_io::net::Stream;
use xitca_service::{Service, ready::ReadyService};
use crate::net::ListenerDyn;
use self::shutdown::ShutdownHandle;
/// Type-erased, reference-counted handle to a service (`Rc<dyn Any>`).
///
/// `wait_for_stop` receives a `Vec` of these and forwards it to
/// `ShutdownHandle::new`. Being an `Rc`, the handle is single-threaded.
// NOTE(review): presumably this lets the shutdown logic hold services of
// different concrete types in one list — confirm against the `shutdown` module.
pub(crate) type ServiceAny = Rc<dyn Any>;
/// Spawns the accept loop for one listener/service pair on the current local task set.
///
/// Each iteration of the loop:
/// 1. waits for `service.ready()`,
/// 2. accepts one connection from `listener`,
/// 3. converts the accepted [`Stream`] into `Req`; a failed conversion is skipped silently,
/// 4. spawns a separate local task that runs `service.call(req)` and discards its result.
///
/// The value produced by `ready()` is moved into the per-connection task and dropped only
/// after the call has finished.
///
/// Accept errors are handled as follows:
/// - per-connection errors (see `connection_error`): try the next accept right away,
/// - `fatal_error`, or an error carrying no OS error code: the loop ends,
/// - any other OS-level error: it is logged and the loop pauses for one second.
///
/// Returns the [`JoinHandle`] of the accept-loop task. The loop uses
/// `tokio::task::spawn_local`, so it has to be started from a context where local tasks
/// can be spawned.
pub(crate) fn start<S, Req>(listener: &ListenerDyn, service: &Rc<S>) -> JoinHandle<()>
where
    S: ReadyService + Service<Req> + 'static,
    S::Ready: 'static,
    Req: TryFrom<Stream> + 'static,
{
    let acceptor = listener.clone();
    let svc = Rc::clone(service);

    tokio::task::spawn_local(async move {
        loop {
            // Readiness is acquired before accepting; it stays alive until the end of this
            // iteration unless it is handed over to a connection task below.
            let permit = svc.ready().await;

            match acceptor.accept_dyn().await {
                Ok(stream) => {
                    if let Ok(req) = Req::try_from(stream) {
                        let svc = Rc::clone(&svc);
                        tokio::task::spawn_local(async move {
                            // The outcome of the call is intentionally ignored.
                            let _ = svc.call(req).await;
                            drop(permit);
                        });
                    }
                }
                Err(e) => {
                    // Per-connection failure: the next connection may already be waiting.
                    if connection_error(&e) {
                        continue;
                    }

                    // Fatal errors and errors without an OS code terminate the loop.
                    if fatal_error(&e) || !os_error(&e) {
                        return;
                    }

                    error!("Error accepting connection: {e}");
                    // NOTE(review): presumably the pause avoids a tight loop while the OS
                    // is out of resources (e.g. file descriptors) — confirm intent.
                    sleep(Duration::from_secs(1)).await;
                }
            }
        }
    })
}
pub(crate) async fn wait_for_stop(
handles: Vec<JoinHandle<()>>,
services: Vec<ServiceAny>,
shutdown_timeout: Duration,
is_graceful_shutdown: &AtomicBool,
) {
with_worker_name_str(|name| info!("Started {name}"));
let shutdown_handle = ShutdownHandle::new(shutdown_timeout, services, is_graceful_shutdown);
for handle in handles {
handle
.await
.unwrap_or_else(|e| with_worker_name_str(|name| error!("{name} exit on error: {e}")));
}
shutdown_handle.shutdown().await;
}
/// Calls `func` with the current thread's name and returns whatever `func` returns.
///
/// When the current thread has no name, `"xitca-server-worker"` is passed instead.
/// Marked `#[cold]` / `#[inline(never)]`; the callers in this file use it for logging only.
#[cold]
#[inline(never)]
fn with_worker_name_str<F, O>(func: F) -> O
where
    F: FnOnce(&str) -> O,
{
    // Bind the handle so the borrowed name outlives the call to `func`.
    let current = thread::current();
    let name = current.name().unwrap_or("xitca-server-worker");
    func(name)
}
/// Returns `true` when `e` is a connection-level failure: its kind is
/// `ConnectionRefused`, `ConnectionAborted` or `ConnectionReset`.
///
/// The accept loop in `start` reacts to these by moving straight on to the next accept.
fn connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}
/// Returns `true` when `e` has kind `BrokenPipe`.
///
/// The accept loop in `start` stops when it sees such an error.
fn fatal_error(e: &io::Error) -> bool {
    matches!(e.kind(), io::ErrorKind::BrokenPipe)
}
/// Returns `true` when `e` carries a raw OS error code, i.e. `raw_os_error()` is `Some`.
///
/// The accept loop in `start` logs such errors and pauses before accepting again.
fn os_error(e: &io::Error) -> bool {
    let raw_code = e.raw_os_error();
    raw_code.is_some()
}