#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
mod event;
mod eventp_ops;
mod interest;
#[cfg(feature = "mock")]
pub mod mock;
mod pinned;
#[cfg(feature = "remote-endpoint")]
pub mod remote_endpoint;
pub mod subscriber;
pub mod thin;
pub mod tri_subscriber;
mod utils;
pub mod epoll {
pub use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
}
#[cfg(docsrs)]
pub mod _technical {
#![doc = include_str!("../docs/technical.md")]
}
#[cfg(docsrs)]
pub mod _technical_zh {
#![doc = include_str!("../docs/technical.zh.md")]
}
use std::marker::PhantomPinned;
use std::mem::{self, ManuallyDrop, MaybeUninit};
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
use std::{hint, io, ptr};
use rustc_hash::FxHashMap;
use crate::epoll::*;
pub use crate::event::Event;
pub use crate::eventp_ops::{EventpOps, EventpOpsAdd};
pub use crate::interest::{interest, Interest};
#[cfg(feature = "mock")]
pub use crate::mock::MockEventp;
pub use crate::pinned::Pinned;
#[cfg(feature = "remote-endpoint")]
pub use crate::remote_endpoint::remote_endpoint;
pub use crate::subscriber::Subscriber;
use crate::thin::ThinBoxSubscriber;
const DEFAULT_EVENT_BUF_CAPACITY: usize = 512;
pub struct Eventp {
registered: FxHashMap<RawFd, ThinBoxSubscriber<Eventp>>,
epoll: Epoll,
event_buf: Vec<MaybeUninit<EpollEvent>>,
handling: Option<Handling>,
_pinned: PhantomPinned,
}
struct Handling {
fd: RawFd,
drop_current: bool,
deferred_drop: Vec<ThinBoxSubscriber<Eventp>>,
}
impl Default for Eventp {
fn default() -> Self {
Self::new(DEFAULT_EVENT_BUF_CAPACITY, EpollCreateFlags::EPOLL_CLOEXEC)
.expect("Failed to create epoll instance")
}
}
impl Eventp {
pub fn new(capacity: usize, flags: EpollCreateFlags) -> io::Result<Self> {
assert!(capacity > 0, "Capacity must be greater than zero");
let mut buf = Vec::with_capacity(capacity);
unsafe { buf.set_len(capacity) };
Ok(Self {
epoll: Epoll::new(flags).map_err(io::Error::from)?,
registered: Default::default(),
event_buf: buf,
handling: None,
_pinned: PhantomPinned,
})
}
pub fn into_inner(self) -> (Epoll, impl Iterator<Item = ThinBoxSubscriber<Eventp>>) {
(self.epoll, self.registered.into_values())
}
pub fn get(&self, raw_fd: &RawFd) -> Option<&dyn Subscriber<Eventp>> {
self.registered.get(raw_fd).and_then(|s| s.try_deref())
}
pub fn get_mut(&mut self, raw_fd: &RawFd) -> Option<&mut dyn Subscriber<Eventp>> {
self.registered
.get_mut(raw_fd)
.and_then(|s| s.try_deref_mut())
}
pub fn run_forever(&mut self) -> io::Result<()> {
loop {
match self.run_once() {
Ok(_) => continue,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
}
pub fn run_once(&mut self) -> io::Result<()> {
self.run_once_with_timeout(EpollTimeout::NONE)
}
pub fn run_once_with_timeout(&mut self, timeout: EpollTimeout) -> io::Result<()> {
if let Some(handling) = &self.handling {
panic!(
"Recursive call to `Eventp::run_once_with_timeout` while handling fd {}",
handling.fd
);
}
let buf: &mut [MaybeUninit<EpollEvent>] = &mut self.event_buf;
let buf: &mut [EpollEvent] = unsafe { mem::transmute(buf) };
let n = self.epoll.wait(buf, timeout)?;
let buf = &buf[..n];
if self.handling.is_some() {
unsafe { hint::unreachable_unchecked() }
} else {
self.handling = Some(Handling {
fd: -1, drop_current: false,
deferred_drop: vec![],
});
}
for ev in buf {
let addr = ev.data() as usize;
let mut subscriber = ManuallyDrop::new(unsafe {
mem::transmute::<usize, ThinBoxSubscriber<Eventp>>(addr)
});
{
let handling = unsafe { self.handling.as_mut().unwrap_unchecked() };
handling.fd = *subscriber.raw_fd_ref();
}
if let Some(s) = subscriber.try_deref_mut() {
s.handle(Event::from(ev), Pinned(unsafe { Pin::new_unchecked(self) }));
}
let handling = unsafe { self.handling.as_mut().unwrap_unchecked() };
if handling.drop_current {
handling.drop_current = false;
debug_assert!(handling.fd >= 0, "Invalid fd in handling state.");
self.registered.remove(&handling.fd);
}
}
unsafe { self.handling.take().unwrap_unchecked() };
Ok(())
}
}
impl EventpOpsAdd<Self> for Eventp {
#[doc = include_str!("../docs/eventp-ops.add.md")]
fn add(&mut self, subscriber: ThinBoxSubscriber<Self>) -> io::Result<()> {
let addr = unsafe { mem::transmute_copy::<_, usize>(&subscriber) };
let dyn_subscriber = match subscriber.try_deref() {
Some(s) => s,
None => panic!("Subscriber is already dropped"),
};
let raw_fd = dyn_subscriber.as_fd().as_raw_fd();
if self.registered.contains_key(&raw_fd) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"subscriber with same fd already registered",
));
}
let interest = dyn_subscriber.interest().get();
let epoll_event = EpollEvent::new(interest.bitflags(), addr as u64);
self.epoll.add(dyn_subscriber.as_fd(), epoll_event)?;
self.registered.insert(raw_fd, subscriber);
Ok(())
}
}
impl EventpOps for Eventp {
#[doc = include_str!("../docs/eventp-ops.modify.md")]
fn modify(&mut self, fd: RawFd, interest: Interest) -> io::Result<()> {
let subscriber = self
.registered
.get_mut(&fd)
.ok_or(io::Error::new(io::ErrorKind::NotFound, "fd not registered"))?;
let addr = unsafe { mem::transmute_copy::<_, usize>(subscriber) };
let mut epoll_event = EpollEvent::new(interest.bitflags(), addr as u64);
let ret = unsafe {
libc::epoll_ctl(
self.epoll.0.as_raw_fd(),
libc::EPOLL_CTL_MOD,
fd,
&mut epoll_event as *mut _ as _,
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
if let Some(s) = subscriber.try_deref_mut() {
s.interest().set(interest);
}
Ok(())
}
#[doc = include_str!("../docs/eventp-ops.delete.md")]
fn delete(&mut self, fd: RawFd) -> io::Result<()> {
if !self.registered.contains_key(&fd) {
return Err(io::Error::new(io::ErrorKind::NotFound, "fd not registered"));
}
let ret = unsafe {
libc::epoll_ctl(
self.epoll.0.as_raw_fd(),
libc::EPOLL_CTL_DEL,
fd,
ptr::null_mut(),
)
};
if ret == -1 {
return Err(io::Error::last_os_error());
}
if let Some(handling) = &mut self.handling {
if handling.fd == fd {
handling.drop_current = true;
} else {
let mut subscriber = self.registered.remove(&fd).unwrap();
subscriber.drop_in_place();
handling.deferred_drop.push(subscriber);
}
} else {
self.registered.remove(&fd);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::cell::{Cell, RefCell};
use std::os::fd::{AsFd, BorrowedFd};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::rc::Rc;
use std::time::Duration;
use nix::sys::eventfd::{EfdFlags, EventFd};
use super::*;
use crate::subscriber::{Handler, HasInterest};
fn new_eventfd() -> EventFd {
EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK).unwrap()
}
fn fire(efd: &EventFd) {
efd.write(1).expect("eventfd write");
}
fn drain(efd: &EventFd) {
let _ = efd.read();
}
struct CbSub<F> {
eventfd: EventFd,
interest: Cell<Interest>,
f: F,
}
impl<F> AsFd for CbSub<F> {
fn as_fd(&self) -> BorrowedFd<'_> {
self.eventfd.as_fd()
}
}
impl<F> HasInterest for CbSub<F> {
fn interest(&self) -> &Cell<Interest> {
&self.interest
}
}
impl<F> Handler<Eventp> for CbSub<F>
where
F: FnMut(&EventFd, Pinned<'_, Eventp>),
{
fn handle(&mut self, _event: Event, eventp: Pinned<'_, Eventp>) {
drain(&self.eventfd);
(self.f)(&self.eventfd, eventp);
}
}
fn cb_sub<F>(efd: EventFd, f: F) -> CbSub<F>
where
F: FnMut(&EventFd, Pinned<'_, Eventp>),
{
CbSub {
eventfd: efd,
interest: Cell::new(crate::interest().read()),
f,
}
}
struct BorrowSub {
raw: RawFd,
interest: Cell<Interest>,
}
impl AsFd for BorrowSub {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.raw) }
}
}
impl HasInterest for BorrowSub {
fn interest(&self) -> &Cell<Interest> {
&self.interest
}
}
impl Handler<Eventp> for BorrowSub {
fn handle(&mut self, _: Event, _: Pinned<'_, Eventp>) {}
}
fn poll_timeout() -> EpollTimeout {
EpollTimeout::from(500u16)
}
#[test]
fn default_creates_usable_reactor() {
let mut ep = Eventp::default();
ep.run_once_with_timeout(EpollTimeout::from(10u16)).unwrap();
}
#[test]
#[should_panic]
fn new_with_zero_capacity_panics() {
let _ = Eventp::new(0, EpollCreateFlags::EPOLL_CLOEXEC);
}
#[test]
fn add_duplicate_fd_returns_already_exists() {
let mut ep = Eventp::default();
let efd = new_eventfd();
let raw = efd.as_fd().as_raw_fd();
cb_sub(efd, |_, _| {}).register_into(&mut ep).unwrap();
let err = BorrowSub {
raw,
interest: Cell::new(crate::interest().read()),
}
.register_into(&mut ep)
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::AlreadyExists);
}
#[test]
fn modify_unknown_fd_returns_not_found() {
let mut ep = Eventp::default();
let err = ep.modify(424242, crate::interest().read()).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::NotFound);
}
#[test]
fn delete_unknown_fd_returns_not_found() {
let mut ep = Eventp::default();
let err = ep.delete(424242).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::NotFound);
}
#[test]
fn ready_event_dispatches_to_handler() {
let mut ep = Eventp::default();
let efd = new_eventfd();
let counter = Rc::new(Cell::new(0u32));
let c = counter.clone();
cb_sub(efd, move |_, _| c.set(c.get() + 1))
.register_into(&mut ep)
.unwrap();
let raw = ep.registered.keys().copied().next().unwrap();
let dup = unsafe { BorrowedFd::borrow_raw(raw) }
.try_clone_to_owned()
.unwrap();
let writer = unsafe { EventFd::from_owned_fd(dup) };
fire(&writer);
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert_eq!(counter.get(), 1);
}
#[test]
fn handler_can_delete_self_and_subscriber_is_dropped() {
let mut ep = Eventp::default();
let drop_witness = Rc::new(());
let weak = Rc::downgrade(&drop_witness);
struct Witness(#[allow(dead_code)] Rc<()>);
let efd = new_eventfd();
let raw = efd.as_fd().as_raw_fd();
let sub = CbSub {
eventfd: efd,
interest: Cell::new(crate::interest().read()),
f: {
let _w = Witness(drop_witness);
move |fd: &EventFd, mut ep: Pinned<'_, Eventp>| {
drain(fd);
let _ = &_w;
ep.delete(fd.as_fd().as_raw_fd()).unwrap();
}
},
};
sub.register_into(&mut ep).unwrap();
let dup = unsafe { BorrowedFd::borrow_raw(raw) }
.try_clone_to_owned()
.unwrap();
let writer = unsafe { EventFd::from_owned_fd(dup) };
fire(&writer);
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert!(!ep.registered.contains_key(&raw));
assert!(
weak.upgrade().is_none(),
"self-delete must drop the subscriber"
);
}
#[test]
fn handler_can_delete_other_fd_during_dispatch() {
let mut ep = Eventp::default();
let efd_a = new_eventfd();
let raw_a = efd_a.as_fd().as_raw_fd();
let efd_b = new_eventfd();
let raw_b = efd_b.as_fd().as_raw_fd();
let b_drop = Rc::new(());
let b_weak = Rc::downgrade(&b_drop);
struct B {
eventfd: EventFd,
interest: Cell<Interest>,
_w: Rc<()>,
}
impl AsFd for B {
fn as_fd(&self) -> BorrowedFd<'_> {
self.eventfd.as_fd()
}
}
impl HasInterest for B {
fn interest(&self) -> &Cell<Interest> {
&self.interest
}
}
impl Handler<Eventp> for B {
fn handle(&mut self, _: Event, _: Pinned<'_, Eventp>) {}
}
B {
eventfd: efd_b,
interest: Cell::new(crate::interest().read()),
_w: b_drop,
}
.register_into(&mut ep)
.unwrap();
cb_sub(efd_a, move |_, mut ep| {
ep.delete(raw_b).unwrap();
})
.register_into(&mut ep)
.unwrap();
let dup = unsafe { BorrowedFd::borrow_raw(raw_a) }
.try_clone_to_owned()
.unwrap();
fire(&unsafe { EventFd::from_owned_fd(dup) });
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert!(!ep.registered.contains_key(&raw_b));
assert!(
b_weak.upgrade().is_none(),
"B must be deallocated by end of dispatch"
);
}
#[test]
fn handler_can_re_add_other_fd_after_delete() {
let mut ep = Eventp::default();
let efd_a = new_eventfd();
let raw_a = efd_a.as_fd().as_raw_fd();
let efd_b = new_eventfd();
let raw_b = efd_b.as_fd().as_raw_fd();
let marker = Rc::new(Cell::new(0u32));
let m_for_first = marker.clone();
struct CountingBorrowSub {
raw: RawFd,
interest: Cell<Interest>,
marker: Rc<Cell<u32>>,
}
impl AsFd for CountingBorrowSub {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.raw) }
}
}
impl HasInterest for CountingBorrowSub {
fn interest(&self) -> &Cell<Interest> {
&self.interest
}
}
impl Handler<Eventp> for CountingBorrowSub {
fn handle(&mut self, _: Event, _: Pinned<'_, Eventp>) {
self.marker.set(self.marker.get() + 1);
}
}
CountingBorrowSub {
raw: raw_b,
interest: Cell::new(crate::interest().read()),
marker: m_for_first,
}
.register_into(&mut ep)
.unwrap();
let m_for_readd = marker.clone();
cb_sub(efd_a, move |_, mut ep| {
ep.delete(raw_b).unwrap();
CountingBorrowSub {
raw: raw_b,
interest: Cell::new(crate::interest().read()),
marker: m_for_readd.clone(),
}
.register_into(&mut ep)
.expect("re-add of same fd within handler must succeed");
})
.register_into(&mut ep)
.unwrap();
let dup_a = unsafe { BorrowedFd::borrow_raw(raw_a) }
.try_clone_to_owned()
.unwrap();
fire(&unsafe { EventFd::from_owned_fd(dup_a) });
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert!(ep.registered.contains_key(&raw_b));
assert_eq!(marker.get(), 0, "B's handler should not have run yet");
fire(&efd_b);
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert_eq!(marker.get(), 1);
let _ = ep.into_inner();
drop(efd_b);
}
#[test]
fn self_delete_then_re_add_same_fd_returns_already_exists() {
let mut ep = Eventp::default();
let efd = new_eventfd();
let raw = efd.as_fd().as_raw_fd();
let observed_kind = Rc::new(Cell::new(None::<io::ErrorKind>));
let ok = observed_kind.clone();
cb_sub(efd, move |_, mut ep| {
ep.delete(raw).unwrap();
let err = BorrowSub {
raw,
interest: Cell::new(crate::interest().read()),
}
.register_into(&mut ep)
.unwrap_err();
ok.set(Some(err.kind()));
})
.register_into(&mut ep)
.unwrap();
let raw_dup = unsafe { BorrowedFd::borrow_raw(raw) }
.try_clone_to_owned()
.unwrap();
fire(&unsafe { EventFd::from_owned_fd(raw_dup) });
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert_eq!(observed_kind.get(), Some(io::ErrorKind::AlreadyExists));
}
#[test]
fn handler_add_new_fd_fires_on_next_iteration() {
let mut ep = Eventp::default();
let trigger = new_eventfd();
let raw_trigger = trigger.as_fd().as_raw_fd();
let added = new_eventfd();
let added_raw = added.as_fd().as_raw_fd();
let added_slot = Rc::new(RefCell::new(Some(added)));
let added_fired = Rc::new(Cell::new(false));
let af = added_fired.clone();
let slot = added_slot.clone();
cb_sub(trigger, move |_, mut ep| {
let efd = slot.borrow_mut().take().unwrap();
let af = af.clone();
cb_sub(efd, move |_, _| af.set(true))
.register_into(&mut ep)
.unwrap();
})
.register_into(&mut ep)
.unwrap();
let dup = unsafe { BorrowedFd::borrow_raw(raw_trigger) }
.try_clone_to_owned()
.unwrap();
fire(&unsafe { EventFd::from_owned_fd(dup) });
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert!(
!added_fired.get(),
"newly-added fd must not fire in same batch"
);
let dup2 = unsafe { BorrowedFd::borrow_raw(added_raw) }
.try_clone_to_owned()
.unwrap();
fire(&unsafe { EventFd::from_owned_fd(dup2) });
ep.run_once_with_timeout(poll_timeout()).unwrap();
assert!(added_fired.get());
}
#[test]
fn modify_updates_subscriber_interest_cell() {
let mut ep = Eventp::default();
let efd = new_eventfd();
let raw = efd.as_fd().as_raw_fd();
cb_sub(efd, |_, _| {}).register_into(&mut ep).unwrap();
let new_interest = crate::interest().read().write();
ep.modify(raw, new_interest).unwrap();
let stored = ep
.registered
.get_mut(&raw)
.unwrap()
.try_deref_mut()
.unwrap()
.interest()
.get();
assert_eq!(stored, new_interest);
}
#[test]
fn recursive_run_inside_handler_panics() {
let mut ep = Eventp::default();
let efd = new_eventfd();
cb_sub(efd, |_, mut ep| {
let inner: &mut Eventp = unsafe { ep.0.as_mut().get_unchecked_mut() };
let _ = inner.run_once_with_timeout(EpollTimeout::from(1u16));
})
.register_into(&mut ep)
.unwrap();
let raw = *ep.registered.keys().next().unwrap();
let dup = unsafe { BorrowedFd::borrow_raw(raw) }
.try_clone_to_owned()
.unwrap();
fire(&unsafe { EventFd::from_owned_fd(dup) });
let result = catch_unwind(AssertUnwindSafe(|| {
ep.run_once_with_timeout(poll_timeout()).unwrap();
}));
assert!(result.is_err(), "recursive run_once must panic");
}
#[test]
fn into_inner_returns_registered_subscribers() {
let mut ep = Eventp::default();
let efd = new_eventfd();
let raw = efd.as_fd().as_raw_fd();
cb_sub(efd, |_, _| {}).register_into(&mut ep).unwrap();
let (_epoll, mut registered) = ep.into_inner();
assert!(registered.any(|s| {
let boxed: Box<dyn Subscriber<Eventp>> = s.try_into().ok().unwrap();
boxed.as_fd().as_raw_fd() == raw
}));
}
#[test]
fn timeout_with_no_ready_fd_does_not_dispatch() {
let mut ep = Eventp::default();
let efd = new_eventfd();
let counter = Rc::new(Cell::new(0u32));
let c = counter.clone();
cb_sub(efd, move |_, _| c.set(c.get() + 1))
.register_into(&mut ep)
.unwrap();
let start = std::time::Instant::now();
ep.run_once_with_timeout(EpollTimeout::from(20u16)).unwrap();
assert!(start.elapsed() >= Duration::from_millis(15));
assert_eq!(counter.get(), 0);
}
}