use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
thread,
time::Duration,
};
use actix_rt::{time::sleep, System};
use futures_core::{future::BoxFuture, Stream};
use futures_util::stream::StreamExt as _;
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use tracing::{error, info};
use crate::{
accept::Accept,
builder::ServerBuilder,
join_all::join_all,
service::InternalServiceFactory,
signals::{SignalKind, Signals},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle,
};
#[derive(Debug)]
pub(crate) enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
force_system_stop: bool,
},
}
#[must_use = "Server does nothing unless you `.await` or poll it"]
pub struct Server {
handle: ServerHandle,
fut: BoxFuture<'static, io::Result<()>>,
}
impl Server {
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
pub(crate) fn new(builder: ServerBuilder) -> Self {
Server {
handle: ServerHandle::new(builder.cmd_tx.clone()),
fut: Box::pin(ServerInner::run(builder)),
}
}
pub fn handle(&self) -> ServerHandle {
self.handle.clone()
}
}
impl Future for Server {
type Output = io::Result<()>;
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
}
}
pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
accept_handle: Option<thread::JoinHandle<()>>,
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
system_stop: bool,
stopping: bool,
}
impl ServerInner {
async fn run(builder: ServerBuilder) -> io::Result<()> {
let (mut this, mut mux) = Self::run_sync(builder)?;
while let Some(cmd) = mux.next().await {
this.handle_cmd(cmd).await;
if this.stopping {
break;
}
}
Ok(())
}
fn run_sync(mut builder: ServerBuilder) -> io::Result<(Self, ServerEventMultiplexer)> {
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect();
let is_actix = actix_rt::System::try_current().is_some();
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
match (is_actix, is_tokio) {
(true, _) => info!("Actix runtime found; starting in Actix runtime"),
(_, true) => info!("Tokio runtime found; starting in existing Tokio runtime"),
(_, false) => panic!("Actix or Tokio runtime not found; halting"),
}
for (_, name, lst) in &builder.sockets {
info!(
r#"starting service: "{}", workers: {}, listening on: {}"#,
name,
builder.threads,
lst.local_addr()
);
}
let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
let mux = ServerEventMultiplexer {
signal_fut: (builder.listen_os_signals).then(Signals::new),
cmd_rx: builder.cmd_rx,
};
let server = ServerInner {
waker_queue,
accept_handle: Some(accept_handle),
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
system_stop: builder.exit,
stopping: false,
};
Ok((server, mux))
}
async fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
}
ServerCommand::Stop {
graceful,
completion,
force_system_stop,
} => {
self.stopping = true;
self.waker_queue.wake(WakerInterest::Stop);
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
if graceful {
let _ = join_all(workers_stop).await;
}
self.accept_handle
.take()
.unwrap()
.join()
.expect("Accept thread must not panic in any case");
if let Some(tx) = completion {
let _ = tx.send(());
}
if self.system_stop || force_system_stop {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}
ServerCommand::WorkerFaulted(idx) => {
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
error!("worker {} has died; restarting", idx);
let factories = self
.services
.iter()
.map(|service| service.clone_factory())
.collect();
match ServerWorker::start(
idx,
factories,
self.waker_queue.clone(),
self.worker_config,
) {
Ok((handle_accept, handle_server)) => {
*self
.worker_handles
.iter_mut()
.find(|wrk| wrk.idx == idx)
.unwrap() = handle_server;
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
}
Err(err) => error!("can not restart worker {}: {}", idx, err),
};
}
}
}
fn map_signal(signal: SignalKind) -> ServerCommand {
match signal {
SignalKind::Int => {
info!("SIGINT received; starting forced shutdown");
ServerCommand::Stop {
graceful: false,
completion: None,
force_system_stop: true,
}
}
SignalKind::Term => {
info!("SIGTERM received; starting graceful shutdown");
ServerCommand::Stop {
graceful: true,
completion: None,
force_system_stop: true,
}
}
SignalKind::Quit => {
info!("SIGQUIT received; starting forced shutdown");
ServerCommand::Stop {
graceful: false,
completion: None,
force_system_stop: true,
}
}
}
}
}
struct ServerEventMultiplexer {
cmd_rx: UnboundedReceiver<ServerCommand>,
signal_fut: Option<Signals>,
}
impl Stream for ServerEventMultiplexer {
type Item = ServerCommand;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);
if let Some(signal_fut) = &mut this.signal_fut {
if let Poll::Ready(signal) = Pin::new(signal_fut).poll(cx) {
this.signal_fut = None;
return Poll::Ready(Some(ServerInner::map_signal(signal)));
}
}
this.cmd_rx.poll_recv(cx)
}
}