#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
#[allow(unused_imports)]
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(aio)]
use std::ptr::NonNull;
use std::{
collections::{HashMap, VecDeque},
io,
num::NonZeroUsize,
pin::Pin,
sync::Arc,
task::{Poll, Wake, Waker},
time::Duration,
};
use compio_log::{instrument, trace};
use flume::{Receiver, Sender};
use polling::{Event, Events, Poller};
use smallvec::SmallVec;
use crate::{
AsyncifyPool, BufferPool, DriverType, Entry, ErasedKey, ProactorBuilder,
key::{BorrowedKey, Key, RefExt},
op::Interest,
syscall,
};
mod extra;
pub use extra::Extra;
pub(crate) mod op;
/// Readiness bookkeeping for one fd an operation waits on: the original wait
/// request plus a `ready` flag that starts out `false`.
///
/// NOTE(review): `ready` is presumably flipped by `Extra` when the fd fires —
/// confirm in the `extra` module.
struct Track {
    arg: WaitArg,
    ready: bool,
}

impl From<WaitArg> for Track {
    /// Starts tracking `arg` in the not-yet-ready state.
    fn from(arg: WaitArg) -> Track {
        Track { ready: false, arg }
    }
}
/// Operation interface used by the poll driver.
///
/// # Safety
/// NOTE(review): the implementor's obligations are not spelled out in this
/// file; presumably any raw fd / aiocb pointer handed to the driver (via
/// [`Decision`] or [`OpType`]) must stay valid while the operation is in
/// flight — confirm.
pub unsafe trait OpCode {
/// Called by `Driver::push` before submission; the returned [`Decision`]
/// selects how the operation is driven (readiness wait, immediate result,
/// blocking pool, or aio).
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision>;
/// Describes what the operation waits on; consulted by `Driver::cancel` and
/// `Driver::poll`. The default `None` makes both of them ignore the operation.
fn op_type(self: Pin<&mut Self>) -> Option<OpType> {
None
}
/// Performs the actual I/O. On the readiness path a `Pending` result makes the
/// driver re-queue the operation; on the blocking-pool path `Pending` hits an
/// `unreachable!`.
fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>>;
}
/// Alternate export name for [`OpCode`].
pub use OpCode as PollOpCode;
/// Small vector storing up to one element inline (no heap allocation for the
/// common single-fd case).
type Multi<T> = SmallVec<[T; 1]>;
/// What the driver should do with an operation after `pre_submit`.
#[non_exhaustive]
pub enum Decision {
    /// Already finished with this result; nothing gets registered.
    Completed(usize),
    /// Wait for readiness on every listed fd, then call `operate`.
    Wait(Multi<WaitArg>),
    /// Run `operate` synchronously on the blocking thread pool.
    Blocking,
    /// Submit through POSIX aio.
    #[cfg(aio)]
    Aio(AioControl),
}

impl Decision {
    /// Wait until `fd` is ready for `interest`.
    pub fn wait_for(fd: RawFd, interest: Interest) -> Self {
        let only = WaitArg { fd, interest };
        Decision::Wait(Multi::from_buf([only]))
    }

    /// Wait until every request in `args` is ready.
    pub fn wait_for_many<I: IntoIterator<Item = WaitArg>>(args: I) -> Self {
        Decision::Wait(args.into_iter().collect())
    }

    /// Shorthand for waiting until `fd` is readable.
    pub fn wait_readable(fd: RawFd) -> Self {
        Decision::wait_for(fd, Interest::Readable)
    }

    /// Shorthand for waiting until `fd` is writable.
    pub fn wait_writable(fd: RawFd) -> Self {
        Decision::wait_for(fd, Interest::Writable)
    }

    /// Submit `cb` with the aio function `submit` (called with a pointer to `cb`).
    #[cfg(aio)]
    pub fn aio(
        cb: &mut libc::aiocb,
        submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
    ) -> Self {
        let aiocbp = NonNull::from(cb);
        Decision::Aio(AioControl { aiocbp, submit })
    }
}
/// A single readiness request: wait for `interest` on `fd`.
#[derive(Debug, Clone, Copy)]
pub struct WaitArg {
    /// Descriptor to watch.
    pub fd: RawFd,
    /// Readiness direction to wait for.
    pub interest: Interest,
}

impl WaitArg {
    /// Request to wait until `fd` is readable.
    pub fn readable(fd: RawFd) -> Self {
        WaitArg {
            interest: Interest::Readable,
            fd,
        }
    }

    /// Request to wait until `fd` is writable.
    pub fn writable(fd: RawFd) -> Self {
        WaitArg {
            interest: Interest::Writable,
            fd,
        }
    }
}
/// POSIX aio submission request, produced by `Decision::aio`.
#[cfg(aio)]
#[derive(Debug, Clone, Copy)]
pub struct AioControl {
/// Control block to submit. On FreeBSD / solarish targets the driver fills in
/// its `aio_sigevent` notification fields before calling `submit`.
pub aiocbp: NonNull<libc::aiocb>,
/// Submission function called with `aiocbp` (presumably `aio_read`,
/// `aio_write`, or similar — confirm at call sites).
pub submit: unsafe extern "C" fn(*mut libc::aiocb) -> i32,
}
/// Operation keys waiting on a single fd, split by readiness direction.
///
/// The front of each queue is the waiter served by the next matching event.
#[derive(Debug, Default)]
struct FdQueue {
/// Keys waiting for readability, in service order.
read_queue: VecDeque<ErasedKey>,
/// Keys waiting for writability, in service order.
write_queue: VecDeque<ErasedKey>,
}
/// Location of a key just pushed into an `FdQueue`, used to undo that push.
///
/// `is_read` selects the read queue (`true`) or the write queue (`false`);
/// `idx` is the position inside that queue.
struct RemoveToken {
    idx: usize,
    is_read: bool,
}

impl RemoveToken {
    /// Token for position `idx` of the read queue.
    fn read(idx: usize) -> Self {
        Self::at(idx, true)
    }

    /// Token for position `idx` of the write queue.
    fn write(idx: usize) -> Self {
        Self::at(idx, false)
    }

    /// Shared constructor for both directions.
    fn at(idx: usize, is_read: bool) -> Self {
        RemoveToken { idx, is_read }
    }
}
impl FdQueue {
fn is_empty(&self) -> bool {
self.read_queue.is_empty() && self.write_queue.is_empty()
}
fn remove_token(&mut self, token: RemoveToken) -> Option<ErasedKey> {
if token.is_read {
self.read_queue.remove(token.idx)
} else {
self.write_queue.remove(token.idx)
}
}
pub fn push_back_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
match interest {
Interest::Readable => {
self.read_queue.push_back(key);
RemoveToken::read(self.read_queue.len() - 1)
}
Interest::Writable => {
self.write_queue.push_back(key);
RemoveToken::write(self.write_queue.len() - 1)
}
}
}
pub fn push_front_interest(&mut self, key: ErasedKey, interest: Interest) -> RemoveToken {
let is_read = match interest {
Interest::Readable => {
self.read_queue.push_front(key);
true
}
Interest::Writable => {
self.write_queue.push_front(key);
false
}
};
RemoveToken { idx: 0, is_read }
}
pub fn remove(&mut self, key: &ErasedKey) {
self.read_queue.retain(|k| k != key);
self.write_queue.retain(|k| k != key);
}
pub fn event(&self) -> Event {
let mut event = Event::none(0);
if let Some(key) = self.read_queue.front() {
event.readable = true;
event.key = key.as_raw();
}
if let Some(key) = self.write_queue.front() {
event.writable = true;
event.key = key.as_raw();
}
event
}
pub fn pop_interest(&mut self, event: &Event) -> Option<(ErasedKey, Interest)> {
if event.readable
&& let Some(key) = self.read_queue.pop_front()
{
return Some((key, Interest::Readable));
}
if event.writable
&& let Some(key) = self.write_queue.pop_front()
{
return Some((key, Interest::Writable));
}
None
}
}
/// What an in-flight operation is waiting on, as reported by `OpCode::op_type`.
#[non_exhaustive]
pub enum OpType {
    /// Readiness on these fds. `Driver::cancel` pulls the operation out of each
    /// fd's queue.
    Fd(Multi<RawFd>),
    /// An aio control block, queried with `aio_error` / `aio_return` and
    /// cancelled with `aio_cancel`.
    #[cfg(aio)]
    Aio(NonNull<libc::aiocb>),
}

impl OpType {
    /// Single-fd operation.
    pub fn fd(fd: RawFd) -> Self {
        OpType::Fd(Multi::from_buf([fd]))
    }

    /// Operation spanning several fds.
    pub fn multi_fd<I: IntoIterator<Item = RawFd>>(fds: I) -> Self {
        OpType::Fd(fds.into_iter().collect())
    }
}
/// Readiness-based proactor driver built on `polling::Poller`.
pub(crate) struct Driver {
/// Reusable buffer filled by `Poller::wait` in `poll`.
events: Events,
/// Owns the OS poller; also cloned into every `Waker` returned by `waker()`.
notify: Arc<Notify>,
/// Per-fd wait queues. `submit`, `renew` and `remove_one` keep this in step
/// with what is registered in the poller.
registry: HashMap<RawFd, FdQueue>,
/// Thread pool that runs `Decision::Blocking` operations.
pool: AsyncifyPool,
/// Completion channel. Blocking-pool jobs send finished entries and `cancel`
/// sends cancelled entries; `poll_completed` drains it.
completed_tx: Sender<Entry>,
completed_rx: Receiver<Entry>,
}
impl Driver {
/// Creates a poll driver configured by `builder`.
///
/// The event buffer uses `builder.capacity` when that is non-zero and
/// `Events::new()` otherwise. Blocking work goes to the pool from
/// `builder.create_or_get_thread_pool()`.
///
/// # Errors
/// Fails if the OS poller cannot be created.
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
    instrument!(compio_log::Level::TRACE, "new", ?builder);
    trace!("new poll driver");
    let events = NonZeroUsize::new(builder.capacity as _)
        .map_or_else(Events::new, Events::with_capacity);
    let poller = Poller::new()?;
    let (completed_tx, completed_rx) = flume::unbounded();
    Ok(Self {
        events,
        notify: Arc::new(Notify::new(poller)),
        registry: HashMap::new(),
        pool: builder.create_or_get_thread_pool(),
        completed_tx,
        completed_rx,
    })
}
pub fn driver_type(&self) -> DriverType {
DriverType::Poll
}
pub fn default_extra(&self) -> Extra {
Extra::new()
}
fn poller(&self) -> &Poller {
&self.notify.poll
}
fn with_events<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut Self, &mut Events) -> R,
{
let mut events = std::mem::take(&mut self.events);
let res = f(self, &mut events);
self.events = events;
res
}
fn try_get_queue(&mut self, fd: RawFd) -> Option<&mut FdQueue> {
self.registry.get_mut(&fd)
}
fn get_queue(&mut self, fd: RawFd) -> &mut FdQueue {
self.try_get_queue(fd).expect("the fd should be submitted")
}
unsafe fn submit(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
let Self {
registry, notify, ..
} = self;
let need_add = !registry.contains_key(&arg.fd);
let queue = registry.entry(arg.fd).or_default();
let token = queue.push_back_interest(key, arg.interest);
let event = queue.event();
let res = if need_add {
unsafe { notify.poll.add(arg.fd, event) }
} else {
let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
notify.poll.modify(fd, event)
};
if res.is_err() {
queue.remove_token(token);
if queue.is_empty() {
registry.remove(&arg.fd);
}
}
res
}
unsafe fn submit_front(&mut self, key: ErasedKey, arg: WaitArg) -> io::Result<()> {
let need_add = !self.registry.contains_key(&arg.fd);
let queue = self.registry.entry(arg.fd).or_default();
queue.push_front_interest(key, arg.interest);
let event = queue.event();
if need_add {
unsafe { self.poller().add(arg.fd, event)? }
} else {
let fd = unsafe { BorrowedFd::borrow_raw(arg.fd) };
self.poller().modify(fd, event)?;
}
Ok(())
}
/// Re-arms `fd` with `renew_event`, or stops tracking it when the event asks
/// for neither readability nor writability.
///
/// In the stop-tracking case the fd is deleted from the poller first. Only if
/// that succeeds is it removed from the registry.
fn renew(&mut self, fd: BorrowedFd, renew_event: Event) -> io::Result<()> {
    let wants_io = renew_event.readable || renew_event.writable;
    if wants_io {
        return self.poller().modify(fd, renew_event);
    }
    self.poller().delete(fd)?;
    self.registry.remove(&fd.as_raw_fd());
    Ok(())
}
/// Removes every occurrence of `key` from the wait queue of `fd`. It then
/// re-arms the fd with the remaining interest, or deregisters it if none is left.
///
/// A missing queue counts as success. An emptied queue is dropped from the
/// registry before `renew` runs.
fn remove_one(&mut self, key: &ErasedKey, fd: RawFd) -> io::Result<()> {
    let (renew_event, now_empty) = match self.try_get_queue(fd) {
        None => return Ok(()),
        Some(queue) => {
            queue.remove(key);
            (queue.event(), queue.is_empty())
        }
    };
    if now_empty {
        self.registry.remove(&fd);
    }
    // SAFETY: NOTE(review): assumes `fd` is still open while it is being
    // re-armed / deleted — confirm with the owners of the operation.
    let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
    self.renew(borrowed, renew_event)
}
/// Removes `key` from the wait queue of `fd`. If that succeeded, builds the
/// cancelled [`Entry`] to report for it.
///
/// Returns `None` when re-arming or deregistering `fd` failed; the I/O error
/// is discarded. A missing queue for `fd` counts as success (see `remove_one`).
fn cancel_one(&mut self, key: ErasedKey, fd: RawFd) -> Option<Entry> {
    self.remove_one(&key, fd)
        .ok()
        .map(|()| Entry::new_cancelled(key))
}

/// No-op. The poll driver registers fds with the poller only when an
/// operation first waits on them (see `submit`).
pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> {
    Ok(())
}
/// Cancels the operation behind `key`.
///
/// - Fd operations: the key is pulled out of the queue of every fd it may be
///   waiting on. The first successful removal sends one cancelled [`Entry`]
///   on the completion channel; later ones are dropped.
/// - Aio operations: `aio_cancel` is requested and its result ignored.
/// - Operations without an [`OpType`] are left alone.
pub fn cancel<T>(&mut self, key: Key<T>) {
    let op_type = key.borrow().pinned_op().op_type();
    match op_type {
        Some(OpType::Fd(fds)) => {
            let mut sent = false;
            for fd in fds {
                match self.cancel_one(key.clone().erase(), fd) {
                    Some(entry) if !sent => {
                        _ = self.completed_tx.send(entry);
                        sent = true;
                    }
                    _ => {}
                }
            }
        }
        #[cfg(aio)]
        Some(OpType::Aio(aiocbp)) => {
            // SAFETY: NOTE(review): relies on the op keeping its aiocb alive
            // while it is in flight — confirm against `OpCode`'s contract.
            let fd = unsafe { aiocbp.as_ref() }.aio_fildes;
            syscall!(libc::aio_cancel(fd, aiocbp.as_ptr())).ok();
        }
        None => {}
    }
}
/// Submits the operation behind `key`, acting on the [`Decision`] returned by
/// its `pre_submit`:
/// - `Wait(args)`: records `args` in the op's poll `Extra` and queues `key` on
///   every fd. If any registration fails, the key is removed from all listed
///   fds again (cleanup errors are ignored) and the error is returned.
/// - `Completed(res)`: returns `Ready(Ok(res))` immediately.
/// - `Blocking`: hands the op to the thread pool via `push_blocking`.
/// - `Aio` (`cfg(aio)` only): points the aiocb's completion notification at
///   this driver's poller fd and submits it. `EOPNOTSUPP` / `EAGAIN` fall back
///   to the blocking pool.
///
/// `Poll::Pending` means the result arrives later through `poll`.
///
/// # Errors
/// An error from `pre_submit` is returned as `Ready(Err(..))` through `?`.
pub fn push(&mut self, key: ErasedKey) -> Poll<io::Result<usize>> {
instrument!(compio_log::Level::TRACE, "push", ?key);
// The braces presumably scope the `key.borrow()` temporary to the scrutinee,
// so it is released before the arms borrow `key` again.
match { key.borrow().pinned_op().pre_submit()? } {
Decision::Wait(args) => {
// NOTE(review): an empty `args` registers nothing yet still returns
// `Pending` — confirm callers never produce that.
key.borrow()
.extra_mut()
.as_poll_mut()
.set_args(args.clone());
for arg in args.iter().copied() {
let res = unsafe { self.submit(key.clone(), arg) };
if let Err(e) = res {
// `submit` already rolled back the failed fd. This sweeps every listed
// fd; fds not yet submitted are harmless because `remove_one` treats a
// missing queue as success.
args.into_iter().for_each(|arg| {
let _ = self.remove_one(&key, arg.fd);
});
return Poll::Ready(Err(e));
}
trace!("register {:?}", arg);
}
Poll::Pending
}
Decision::Completed(res) => Poll::Ready(Ok(res)),
Decision::Blocking => {
self.push_blocking(key);
Poll::Pending
}
#[cfg(aio)]
Decision::Aio(AioControl { mut aiocbp, submit }) => {
let aiocb = unsafe { aiocbp.as_mut() };
// The raw key travels through the kernel notification and is turned back
// into an `ErasedKey` in `poll`.
let user_data = key.as_raw();
// FreeBSD: deliver completion as a kevent on this driver's poller fd.
#[cfg(freebsd)]
{
aiocb.aio_sigevent.sigev_signo = self.as_raw_fd();
aiocb.aio_sigevent.sigev_notify = libc::SIGEV_KEVENT;
aiocb.aio_sigevent.sigev_value.sival_ptr = user_data as _;
}
// Solarish: deliver completion to this driver's event port.
// NOTE(review): `notify` is a stack local that only lives until the end of
// this arm; this relies on the kernel reading `port_notify` during
// `submit` — confirm.
#[cfg(solarish)]
let mut notify = libc::port_notify {
portnfy_port: self.as_raw_fd(),
portnfy_user: user_data as _,
};
#[cfg(solarish)]
{
aiocb.aio_sigevent.sigev_notify = libc::SIGEV_PORT;
aiocb.aio_sigevent.sigev_value.sival_ptr = &mut notify as *mut _ as _;
}
match syscall!(submit(aiocbp.as_ptr())) {
Ok(_) => {
// Leak the key so it stays alive until `poll` reclaims it with
// `ErasedKey::from_raw`.
key.into_raw();
Poll::Pending
}
// aio unsupported for this fd, or out of resources: run it on the pool.
Err(e)
if matches!(
e.raw_os_error(),
Some(libc::EOPNOTSUPP) | Some(libc::EAGAIN)
) =>
{
self.push_blocking(key);
Poll::Pending
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
}
/// Runs the operation behind `key` on the blocking thread pool.
///
/// The job calls `operate` once, which must complete synchronously. It then
/// sends the resulting [`Entry`] on the completion channel and wakes the
/// driver. While the pool rejects the job, already-finished completions are
/// drained with `poll_completed` and dispatch is retried with the handed-back
/// closure.
fn push_blocking(&mut self, key: ErasedKey) {
    let wake_handle = self.waker();
    let sender = self.completed_tx.clone();
    // NOTE(review): `freeze` is unsafe and its contract lives in `key`; this
    // assumes nothing else touches the op until the job sends it back — confirm.
    let mut frozen = unsafe { key.freeze() };
    let mut job = move || {
        let res = match frozen.pinned_op().operate() {
            Poll::Ready(res) => res,
            Poll::Pending => unreachable!("this operation is not non-blocking"),
        };
        let _ = sender.send(Entry::new(frozen.into_inner(), res));
        wake_handle.wake();
    };
    loop {
        match self.pool.dispatch(job) {
            Ok(_) => break,
            Err(rejected) => {
                job = rejected.0;
                self.poll_completed();
            }
        }
    }
}

/// Notifies every entry currently in the completion channel without blocking.
///
/// Returns `true` if at least one entry was notified.
fn poll_completed(&mut self) -> bool {
    let mut notified_any = false;
    for entry in std::iter::from_fn(|| self.completed_rx.try_recv().ok()) {
        entry.notify();
        notified_any = true;
    }
    notified_any
}
/// Handles one readiness `event` reported for `fd`.
///
/// Pops the first waiter whose direction matches the event. If
/// `handle_event(fd)` on that op's poll `Extra` returns `true` (presumably:
/// every fd the op waits on is now ready — see `extra`), `operate` is called:
/// - `Pending`: the `Extra` is reset and the op is re-queued at the *front* of
///   every tracked fd's queue. If re-queuing fails, the key is removed from all
///   tracked fds and the error is returned without re-arming `fd`.
/// - `Ready`: the result is delivered through `Entry::notify`.
///
/// If `handle_event` returns `false`, the popped key is not put back into this
/// fd's queue. Finally `fd` is re-armed with whatever interest remains in its
/// queue, or deregistered if nothing is left.
///
/// # Panics
/// Panics (via `get_queue`) if `fd` has no queue in the registry.
// The `allow` covers the block-expression scrutinee `match { ... }` below.
#[allow(clippy::blocks_in_conditions)]
fn poll_one(&mut self, event: Event, fd: RawFd) -> io::Result<()> {
let queue = self.get_queue(fd);
if let Some((key, _)) = queue.pop_interest(&event)
&& let mut op = key.borrow()
&& op.extra_mut().as_poll_mut().handle_event(fd)
{
match { op.pinned_op().operate() } {
Poll::Pending => {
let extra = op.extra_mut().as_poll_mut();
extra.reset();
// Re-queue at the front so this op keeps its place ahead of later waiters.
for t in extra.track.iter() {
let res = unsafe { self.submit_front(key.clone(), t.arg) };
if let Err(e) = res {
// Best-effort cleanup across every tracked fd; its errors are ignored.
for t in extra.track.iter() {
let _ = self.remove_one(&key, t.arg.fd);
}
return Err(e);
}
}
}
Poll::Ready(res) => {
// Release the borrow guard before handing the key to `Entry`.
drop(op);
Entry::new(key, res).notify()
}
};
}
let renew_event = self.get_queue(fd).event();
let fd = unsafe { BorrowedFd::borrow_raw(fd) };
self.renew(fd, renew_event)
}
/// Waits for I/O readiness (or finished blocking jobs) and completes the
/// affected operations.
///
/// 1. If the completion channel already holds entries, they are notified and
///    `Ok(())` is returned without touching the poller.
/// 2. Otherwise this blocks in `Poller::wait` for up to `timeout` (`None`
///    means no limit). If a timeout was given and no events arrived, `ETIMEDOUT`
///    is returned. Presumably that includes being woken through [`Notify`].
/// 3. Each event is dispatched by the `OpType` of the operation whose key it
///    carries:
///    - fd events go to `poll_one`;
///    - aio events are finished with `aio_error` / `aio_return`;
///    - operations without an `OpType` are skipped.
///
/// # Errors
/// Errors from `Poller::wait` and from `poll_one` are propagated. An error
/// from `poll_one` stops processing of the remaining events in the batch.
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
    instrument!(compio_log::Level::TRACE, "poll", ?timeout);
    if self.poll_completed() {
        return Ok(());
    }
    self.events.clear();
    self.notify.poll.wait(&mut self.events, timeout)?;
    if self.events.is_empty() && timeout.is_some() {
        return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
    }
    self.with_events(|this, events| {
        for event in events.iter() {
            trace!("receive {} for {:?}", event.key, event);
            // SAFETY: event keys are produced by `ErasedKey::as_raw` when interest
            // is registered (`FdQueue::event`, and `user_data` in the aio path of
            // `push`). NOTE(review): relies on that key still being alive here.
            let key = unsafe { BorrowedKey::from_raw(event.key) };
            let mut op = key.borrow();
            let op_type = op.pinned_op().op_type();
            match op_type {
                None => {
                    trace!("op {} is completed", event.key);
                }
                Some(OpType::Fd(_)) => {
                    // The event key only identifies *an* operation queued on the
                    // fd; `poll_one` pops the actual waiter from the fd's queue.
                    let Some(fd) = op.extra().as_poll().next_fd() else {
                        // Skip only this event. Leaving the loop here would drop
                        // every later event in the batch, and readiness that is not
                        // consumed is presumably not redelivered by the poller's
                        // one-shot registration.
                        continue;
                    };
                    drop(op);
                    this.poll_one(event, fd)?;
                }
                #[cfg(aio)]
                Some(OpType::Aio(aiocbp)) => {
                    drop(op);
                    let err = unsafe { libc::aio_error(aiocbp.as_ptr()) };
                    let res = match err {
                        libc::EINPROGRESS => {
                            trace!("op {} is not completed", key.as_raw());
                            continue;
                        }
                        libc::ECANCELED => {
                            // Reap the control block, then report cancellation the
                            // same way as `Entry::new_cancelled` (ETIMEDOUT).
                            unsafe { libc::aio_return(aiocbp.as_ptr()) };
                            Err(io::Error::from_raw_os_error(libc::ETIMEDOUT))
                        }
                        _ => {
                            syscall!(libc::aio_return(aiocbp.as_ptr())).map(|res| res as usize)
                        }
                    };
                    // SAFETY: reclaims the reference leaked by `key.into_raw()` when
                    // the aio request was submitted in `push`.
                    let key = unsafe { ErasedKey::from_raw(event.key) };
                    Entry::new(key, res).notify()
                }
            }
        }
        Ok(())
    })
}
pub fn waker(&self) -> Waker {
Waker::from(self.notify.clone())
}
pub fn create_buffer_pool(
&mut self,
buffer_len: u16,
buffer_size: usize,
) -> io::Result<BufferPool> {
#[cfg(fusion)]
{
Ok(BufferPool::new_poll(crate::FallbackBufferPool::new(
buffer_len,
buffer_size,
)))
}
#[cfg(not(fusion))]
{
Ok(BufferPool::new(buffer_len, buffer_size))
}
}
pub unsafe fn release_buffer_pool(&mut self, _: BufferPool) -> io::Result<()> {
Ok(())
}
}
impl AsRawFd for Driver {
fn as_raw_fd(&self) -> RawFd {
self.poller().as_raw_fd()
}
}
impl Drop for Driver {
    /// Best-effort deregistration of every fd still tracked in the registry.
    /// Errors from `Poller::delete` are ignored.
    fn drop(&mut self) {
        let poller = self.poller();
        self.registry
            .keys()
            // SAFETY: NOTE(review): assumes registered fds are still open here.
            .map(|&raw| unsafe { BorrowedFd::borrow_raw(raw) })
            .for_each(|fd| {
                poller.delete(fd).ok();
            });
    }
}
impl Entry {
pub(crate) fn new_cancelled(key: ErasedKey) -> Self {
Entry::new(key, Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)))
}
}
/// Shared owner of the OS [`Poller`]; also serves as the driver's waker.
pub(crate) struct Notify {
    poll: Poller,
}

impl Notify {
    /// Wraps `poll`.
    fn new(poll: Poller) -> Self {
        Notify { poll }
    }

    /// Forwards to `Poller::notify`.
    pub fn notify(&self) -> io::Result<()> {
        self.poll.notify()
    }
}

impl Wake for Notify {
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    /// Wake-ups are best-effort: a failed notification is ignored.
    fn wake_by_ref(self: &Arc<Self>) {
        let _ = self.notify();
    }
}