#![allow(clippy::missing_panics_doc)]
use std::time::{Duration, Instant};
use std::{cell::Cell, fmt, io, sync::Arc, sync::mpsc, thread};
use std::{collections::VecDeque, num::NonZeroUsize};
use ntex_polling::{Event, Events, Poller};
use ntex_rt::System;
use ntex_util::{future::Either, time::Millis, time::sleep};
use super::socket::{Connection, Listener, SocketAddr};
use super::{Server, ServerStatus, Token};
const EXIT_TIMEOUT: Duration = Duration::from_millis(100);
const ERR_TIMEOUT: Duration = Duration::from_millis(500);
const ERR_SLEEP_TIMEOUT: Millis = Millis(525);
#[derive(Debug)]
pub enum AcceptorCommand {
Stop(oneshot::Sender<()>),
Terminate,
Pause,
Resume,
Timer,
}
#[derive(Debug)]
struct ServerSocketInfo {
addr: SocketAddr,
token: Token,
sock: Listener,
registered: Cell<bool>,
timeout: Cell<Option<Instant>>,
}
#[derive(Debug, Clone)]
pub struct AcceptNotify(Arc<Poller>, mpsc::Sender<AcceptorCommand>);
impl AcceptNotify {
fn new(waker: Arc<Poller>, tx: mpsc::Sender<AcceptorCommand>) -> Self {
AcceptNotify(waker, tx)
}
pub fn send(&self, cmd: AcceptorCommand) {
let _ = self.1.send(cmd);
let _ = self.0.notify();
}
}
pub struct AcceptLoop {
name: String,
testing: bool,
notify: AcceptNotify,
inner: Option<(mpsc::Receiver<AcceptorCommand>, Arc<Poller>)>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
impl Default for AcceptLoop {
fn default() -> Self {
Self::new()
}
}
impl AcceptLoop {
pub fn new() -> AcceptLoop {
let poll = Arc::new(
Poller::new()
.map_err(|e| panic!("Cannot create Poller {e}"))
.unwrap(),
);
let (tx, rx) = mpsc::channel();
let notify = AcceptNotify::new(poll.clone(), tx);
AcceptLoop {
notify,
name: "ntex:accept".to_string(),
inner: Some((rx, poll)),
testing: false,
status_handler: None,
}
}
pub fn name<T: AsRef<str>>(&mut self, name: T) {
self.name = format!("{}:accept", name.as_ref());
}
pub fn notify(&self) -> AcceptNotify {
self.notify.clone()
}
pub fn set_status_handler<F>(&mut self, f: F)
where
F: FnMut(ServerStatus) + Send + 'static,
{
self.status_handler = Some(Box::new(f));
}
pub fn testing(&mut self) {
self.testing = true;
}
pub fn start(mut self, socks: Vec<(Token, Listener)>, srv: Server) {
let (tx, rx_start) = oneshot::channel();
let (rx, poll) = self
.inner
.take()
.expect("AcceptLoop cannot be used multiple times");
Accept::start(
tx,
rx,
poll,
socks,
srv,
self.name.clone(),
self.notify.clone(),
self.testing,
self.status_handler.take(),
);
let _ = rx_start.recv();
}
}
impl fmt::Debug for AcceptLoop {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AcceptLoop")
.field("name", &self.name)
.field("notify", &self.notify)
.field("inner", &self.inner)
.field("status_handler", &self.status_handler.is_some())
.finish()
}
}
struct Accept {
name: String,
poller: Arc<Poller>,
rx: mpsc::Receiver<AcceptorCommand>,
tx: Option<oneshot::Sender<()>>,
sockets: Vec<ServerSocketInfo>,
srv: Server,
notify: AcceptNotify,
testing: bool,
backpressure: bool,
backlog: VecDeque<Connection>,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
}
impl Accept {
#[allow(clippy::too_many_arguments)]
fn start(
tx: oneshot::Sender<()>,
rx: mpsc::Receiver<AcceptorCommand>,
poller: Arc<Poller>,
socks: Vec<(Token, Listener)>,
srv: Server,
name: String,
notify: AcceptNotify,
testing: bool,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) {
log::info!("Starting {name:?} accept loop");
let sys = System::current();
let _ = thread::Builder::new().name(name.clone()).spawn(move || {
System::set_current(sys);
Accept::new(
name,
tx,
rx,
poller,
socks,
srv,
notify,
testing,
status_handler,
)
.poll();
});
}
#[allow(clippy::too_many_arguments)]
fn new(
name: String,
tx: oneshot::Sender<()>,
rx: mpsc::Receiver<AcceptorCommand>,
poller: Arc<Poller>,
socks: Vec<(Token, Listener)>,
srv: Server,
notify: AcceptNotify,
testing: bool,
status_handler: Option<Box<dyn FnMut(ServerStatus) + Send>>,
) -> Accept {
let mut sockets = Vec::new();
for (hnd_token, lst) in socks {
sockets.push(ServerSocketInfo {
addr: lst.local_addr(),
sock: lst,
token: hnd_token,
registered: Cell::new(false),
timeout: Cell::new(None),
});
}
Accept {
name,
poller,
rx,
sockets,
notify,
srv,
testing,
status_handler,
tx: Some(tx),
backpressure: true,
backlog: VecDeque::new(),
}
}
fn update_status(&mut self, st: ServerStatus) {
if let Some(ref mut hnd) = self.status_handler {
(*hnd)(st);
}
}
fn poll(mut self) {
let mut events = Events::with_capacity(NonZeroUsize::new(512).unwrap());
for idx in 0..self.sockets.len() {
self.add_source(idx);
}
if let Some(tx) = self.tx.take() {
thread::sleep(Duration::from_millis(25));
let _ = tx.send(());
}
loop {
events.clear();
if let Err(e) = self.poller.wait(&mut events, None) {
assert!(
e.kind() == io::ErrorKind::Interrupted,
"Cannot wait for events in poller: {e}"
);
}
for idx in 0..self.sockets.len() {
if self.sockets[idx].registered.get() {
let readd = self.accept(idx);
if readd {
self.add_source(idx);
}
}
}
match self.process_cmd() {
Either::Left(()) => events.clear(),
Either::Right(rx) => {
for info in self.sockets.drain(..) {
info.sock.remove_source();
}
log::info!("Accept loop {:?} has been stopped", self.name);
if let Some(rx) = rx {
if !self.testing {
thread::sleep(EXIT_TIMEOUT);
}
let _ = rx.send(());
}
break;
}
}
}
}
fn add_source(&self, idx: usize) {
let info = &self.sockets[idx];
loop {
let result = if info.registered.get() {
self.poller.modify(&info.sock, Event::readable(idx))
} else {
unsafe { self.poller.add(&info.sock, Event::readable(idx)) }
};
if let Err(err) = result {
if err.kind() == io::ErrorKind::WouldBlock {
continue;
}
log::error!("Cannot register socket listener: {err}");
info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
let notify = self.notify.clone();
System::current().handle().spawn(async move {
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(AcceptorCommand::Timer);
});
} else {
info.registered.set(true);
}
break;
}
}
fn remove_source(&self, key: usize) {
let info = &self.sockets[key];
let result = if info.registered.get() {
self.poller.modify(&info.sock, Event::none(key))
} else {
return;
};
if let Err(err) = result {
log::error!("Cannot stop socket listener for {} err: {}", info.addr, err);
}
}
fn process_timer(&mut self) {
let now = Instant::now();
for key in 0..self.sockets.len() {
let info = &mut self.sockets[key];
if let Some(inst) = info.timeout.get()
&& now > inst
&& !self.backpressure
{
log::info!("Resuming socket listener on {} after timeout", info.addr);
info.timeout.take();
self.add_source(key);
}
}
}
fn process_cmd(&mut self) -> Either<(), Option<oneshot::Sender<()>>> {
loop {
match self.rx.try_recv() {
Ok(cmd) => match cmd {
AcceptorCommand::Stop(rx) => {
if !self.backpressure {
log::info!("Stopping accept loop {:?}", self.name);
self.backpressure(true);
}
break Either::Right(Some(rx));
}
AcceptorCommand::Terminate => {
log::info!("Stopping accept loop {:?}", self.name);
self.backpressure(true);
break Either::Right(None);
}
AcceptorCommand::Pause => {
if !self.backpressure {
log::info!("Pausing accept loop {:?}", self.name);
self.backpressure(true);
}
}
AcceptorCommand::Resume => {
if self.backpressure {
log::info!("Resuming accept loop {:?}", self.name);
self.backpressure(false);
}
}
AcceptorCommand::Timer => {
self.process_timer();
}
},
Err(err) => {
break match err {
mpsc::TryRecvError::Empty => Either::Left(()),
mpsc::TryRecvError::Disconnected => {
log::error!("Dropping accept loop {:?}", self.name);
self.backpressure(true);
Either::Right(None)
}
};
}
}
}
}
fn backpressure(&mut self, on: bool) {
self.update_status(if on {
ServerStatus::NotReady
} else {
ServerStatus::Ready
});
if self.backpressure && !on {
while let Some(msg) = self.backlog.pop_front() {
if let Err(msg) = self.srv.process(msg) {
log::trace!("Server is unavailable");
self.backlog.push_front(msg);
return;
}
}
self.backpressure = false;
for (key, info) in self.sockets.iter().enumerate() {
if info.timeout.get().is_none() {
log::info!(
"Resuming socket listener on {} after back-pressure",
info.addr
);
self.add_source(key);
}
}
} else if !self.backpressure && on {
self.backpressure = true;
for key in 0..self.sockets.len() {
let info = &mut self.sockets[key];
if info.timeout.take().is_none() {
log::info!("Stopping socket listener on {}", info.addr);
self.remove_source(key);
}
}
}
}
fn accept(&mut self, token: usize) -> bool {
loop {
if let Some(info) = self.sockets.get_mut(token) {
match info.sock.accept() {
Ok(Some(io)) => {
let msg = Connection {
io,
token: info.token,
};
if let Err(msg) = self.srv.process(msg) {
log::trace!("Server is unavailable");
self.backlog.push_back(msg);
self.backpressure(true);
return false;
}
}
Ok(None) => return true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return true,
Err(ref e) if connection_error(e) => (),
Err(e) => {
log::error!("Error accepting socket: {e}");
info.timeout.set(Some(Instant::now() + ERR_TIMEOUT));
let notify = self.notify.clone();
System::current().handle().spawn(async move {
sleep(ERR_SLEEP_TIMEOUT).await;
notify.send(AcceptorCommand::Timer);
});
return false;
}
}
}
}
}
}
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
|| e.kind() == io::ErrorKind::InvalidInput
}