#[cfg(feature = "allocator_api")]
use std::alloc::Allocator;
#[doc(no_inline)]
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::{
collections::{HashMap, HashSet},
io,
ops::ControlFlow,
time::Duration,
};
pub(crate) use libc::{sockaddr_storage, socklen_t};
use mio::{
event::{Event, Source},
unix::SourceFd,
Events, Interest, Poll, Registry, Token,
};
#[cfg(feature = "time")]
use crate::driver::time::TimerWheel;
use crate::{
driver::{CompleteIo, Entry, OpObject, Operation},
vec_deque_alloc,
};
pub(crate) mod op;
pub trait OpCode {
fn pre_submit(self: &mut Self) -> io::Result<Decision>;
fn on_event(self: &mut Self, event: &Event) -> io::Result<ControlFlow<usize>>;
#[cfg(feature = "time")]
fn timer_delay(&self) -> std::time::Duration {
unimplemented!("operation is not a timer")
}
}
pub enum Decision {
Completed(usize),
Wait(WaitArg),
}
impl Decision {
pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
Self::Wait(WaitArg { fd, interest })
}
pub fn wait_readable(fd: RawFd) -> Self {
Self::wait_for(fd, Interest::READABLE)
}
pub fn wait_writable(fd: RawFd) -> Self {
Self::wait_for(fd, Interest::WRITABLE)
}
}
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
fd: RawFd,
interest: Interest,
}
#[cfg(feature = "time")]
const TIMER_PENDING: usize = usize::MAX - 2;
pub struct Driver<'arena> {
squeue: Vec<OpObject<'arena>>,
events: Events,
#[cfg(feature = "time")]
timers: TimerWheel,
poll: Poll,
waiting: HashMap<usize, WaitEntry<'arena>>,
cancelled: HashSet<usize>,
}
struct WaitEntry<'arena> {
op: &'arena mut dyn OpCode,
arg: WaitArg,
user_data: usize,
}
impl<'arena> WaitEntry<'arena> {
fn new(op: &'arena mut dyn OpCode, user_data: usize, arg: WaitArg) -> Self {
Self { op, arg, user_data }
}
}
impl<'arena> Driver<'arena> {
pub fn new() -> io::Result<Self> {
Self::with_entries(1024)
}
pub fn with_entries(entries: u32) -> io::Result<Self> {
let entries = entries as usize;
Ok(Self {
squeue: Vec::with_capacity(entries),
events: Events::with_capacity(entries),
#[cfg(feature = "time")]
timers: TimerWheel::with_capacity(16),
poll: Poll::new()?,
waiting: HashMap::new(),
cancelled: HashSet::new(),
})
}
fn submit(
cancelled: &mut HashSet<usize>,
waiting: &mut HashMap<usize, WaitEntry<'arena>>,
registry: &Registry,
op: &'arena mut dyn OpCode,
user_data: usize,
arg: WaitArg,
) -> io::Result<()> {
if !cancelled.remove(&user_data) {
let token = Token(user_data);
SourceFd(&arg.fd).register(registry, token, arg.interest)?;
waiting.insert(user_data, WaitEntry::new(op, user_data, arg));
}
Ok(())
}
}
impl<'arena> CompleteIo<'arena> for Driver<'arena> {
#[inline]
fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
Ok(())
}
#[inline]
fn try_cancel(&mut self, user_data: usize) -> Result<(), ()> {
if let Some(entry) = self.waiting.remove(&user_data) {
self.poll
.registry()
.deregister(&mut SourceFd(&entry.arg.fd))
.ok();
} else {
self.cancelled.insert(user_data);
}
Ok(())
}
#[inline]
fn try_push<O: OpCode>(
&mut self,
op: Operation<'arena, O>,
) -> Result<(), Operation<'arena, O>> {
if self.capacity_left() > 0 {
self.squeue.push(OpObject::from(op));
Ok(())
} else {
Err(op)
}
}
#[inline]
fn try_push_dyn(&mut self, op: OpObject<'arena>) -> Result<(), OpObject<'arena>> {
if self.capacity_left() > 0 {
self.squeue.push(op);
Ok(())
} else {
Err(op)
}
}
#[inline]
fn push_queue<#[cfg(feature = "allocator_api")] A: Allocator + Unpin + 'arena>(
&mut self,
ops_queue: &mut vec_deque_alloc!(OpObject<'arena>, A),
) {
let till = self.capacity_left().min(ops_queue.len());
self.squeue.extend(ops_queue.drain(..till));
}
#[inline]
fn capacity_left(&self) -> usize {
self.squeue.capacity() - self.squeue.len()
}
unsafe fn submit_and_wait_completed(
&mut self,
timeout: Option<Duration>,
entries: &mut impl Extend<Entry>,
) -> io::Result<()> {
let cancelled = &mut self.cancelled;
let waiting = &mut self.waiting;
let mut at_least_one_completion = false;
let submit_squeue_iter = {
let registry = self.poll.registry();
self.squeue.drain(..).filter_map(|entry| {
let user_data = entry.user_data;
let op = entry.op;
match op.pre_submit() {
Ok(Decision::Wait(arg)) => {
if let Err(err) =
Self::submit(cancelled, waiting, registry, op, user_data, arg)
{
at_least_one_completion = true;
Some(Entry::new(user_data, Err(err)))
} else {
None
}
}
#[cfg(feature = "time")]
Ok(Decision::Completed(TIMER_PENDING)) => {
self.timers.insert(user_data, op.timer_delay());
None
}
Ok(Decision::Completed(res)) => {
at_least_one_completion = true;
Some(Entry::new(user_data, Ok(res)))
}
Err(err) => {
at_least_one_completion = true;
Some(Entry::new(user_data, Err(err)))
}
}
})
};
entries.extend(submit_squeue_iter);
if at_least_one_completion {
return Ok(());
};
loop {
#[cfg(feature = "time")]
let timeout = self.timers.till_next_timer_or_timeout(timeout);
match self.poll.poll(&mut self.events, timeout) {
Ok(_) => break Ok(()),
Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => break Err(err),
};
}?;
#[cfg(feature = "time")]
self.timers.expire_timers(entries);
let registry = self.poll.registry();
let completed_iter = self.events.iter().filter_map(|event| {
let token = event.token();
let (user_data, fd, maybe_result) = {
let entry = waiting
.get_mut(&token.0)
.expect("Unknown token returned by mio"); let maybe_result = match entry.op.on_event(event) {
Ok(ControlFlow::Continue(_)) => None,
Ok(ControlFlow::Break(res)) => Some(Ok(res)),
Err(err) => Some(Err(err)),
};
(entry.user_data, &entry.arg.fd, maybe_result)
};
if let Some(result) = maybe_result {
let res = if let Err(err) = registry.deregister(&mut SourceFd(fd)) {
Err(err)
} else {
result
};
waiting.remove(&token.0);
Some(Entry::new(user_data, res))
} else {
None
}
});
entries.extend(completed_iter);
Ok(())
}
}
impl AsRawFd for Driver<'_> {
fn as_raw_fd(&self) -> RawFd {
self.poll.as_raw_fd()
}
}