use std::cell::RefCell;
use std::io;
use crate::platform::sys::{Events, Interest, RawSource};
#[cfg(target_os = "linux")]
mod epoll;
#[cfg(target_os = "linux")]
use epoll::EpollReactor as PlatformReactor;
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
mod kqueue;
#[cfg(any(target_os = "macos", target_os = "freebsd"))]
use kqueue::KqueueReactor as PlatformReactor;
#[cfg(target_os = "windows")]
mod iocp;
#[cfg(target_os = "windows")]
use iocp::IocpReactor as PlatformReactor;
pub mod source;
pub mod waker_registry;
pub use source::IoSource;
pub(crate) use waker_registry::WakerRegistry;
/// Operations a platform-specific reactor backend exposes to [`Reactor`].
///
/// `Reactor` calls methods with these names on the `PlatformReactor` alias
/// selected by the `cfg` attributes in this module.
// NOTE(review): the epoll/kqueue/IOCP backends presumably implement this
// trait; their impls are not visible in this file — confirm.
pub(crate) trait ReactorBackend: Sized {
    /// Constructs the backend, reporting failure as an [`io::Error`].
    fn new() -> io::Result<Self>;

    /// Registers `source` under `token` with the given `interest`.
    fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()>;

    /// Updates the registration of `source` to the given `token` and
    /// `interest`.
    fn reregister(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()>;

    /// Removes the registration of `source`.
    fn deregister(&self, source: RawSource) -> io::Result<()>;

    /// Polls the backend, writing into `events` and returning a count.
    // NOTE(review): `timeout_ms == None` presumably means "block until an
    // event arrives" and the return value the number of events written —
    // confirm against the backend implementations.
    fn poll(&self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize>;
}
/// I/O reactor pairing the platform backend with a waker registry.
///
/// Registration calls are forwarded to the backend; `poll` additionally
/// notifies the waker registry for each event it iterates.
pub struct Reactor {
    /// Backend chosen at compile time (`EpollReactor`, `KqueueReactor` or
    /// `IocpReactor`, depending on `target_os`).
    inner: PlatformReactor,
    /// Registry consulted by `poll` (via `wake_token`) and by
    /// `deregister_with_token` (via `remove_token`).
    pub(crate) wakers: WakerRegistry,
}
impl Reactor {
    /// Builds a reactor from a new platform backend and a freshly
    /// constructed [`WakerRegistry`].
    ///
    /// # Errors
    ///
    /// Returns the error produced by the backend constructor; the waker
    /// registry is only constructed once the backend has been created.
    pub fn new() -> io::Result<Self> {
        let inner = PlatformReactor::new()?;
        let wakers = WakerRegistry::new();
        Ok(Self { inner, wakers })
    }

    /// Forwards a registration of `source` under `token` with `interest`
    /// to the backend.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend's `register` reports.
    #[inline]
    pub fn register(&self, source: RawSource, token: usize, interest: Interest) -> io::Result<()> {
        self.inner.register(source, token, interest)
    }

    /// Forwards a re-registration of `source` with the given `token` and
    /// `interest` to the backend.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend's `reregister` reports.
    #[inline]
    pub fn reregister(
        &self,
        source: RawSource,
        token: usize,
        interest: Interest,
    ) -> io::Result<()> {
        self.inner.reregister(source, token, interest)
    }

    /// Forwards a deregistration of `source` to the backend.
    ///
    /// The waker registry is not touched; see `deregister_with_token` for
    /// the variant that also drops the token's waker entry.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend's `deregister` reports.
    #[inline]
    pub fn deregister(&self, source: RawSource) -> io::Result<()> {
        self.inner.deregister(source)
    }

    /// Removes `token` from the waker registry and then deregisters
    /// `source` from the backend.
    ///
    /// The registry entry is removed first and unconditionally, so it is
    /// gone even when the backend deregistration subsequently fails.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend's `deregister` reports.
    pub(crate) fn deregister_with_token(
        &mut self,
        source: RawSource,
        token: usize,
    ) -> io::Result<()> {
        self.wakers.remove_token(token);
        self.deregister(source)
    }

    /// Polls the backend and then calls `wake_token` on the waker registry
    /// for every event yielded by `events.iter()`, passing the event's
    /// `token`, `readable` and `writable` fields.
    ///
    /// Returns the count reported by the backend.
    ///
    /// # Errors
    ///
    /// Returns the backend's poll error; no wakers are notified in that
    /// case.
    // NOTE(review): this assumes `events` only contains entries produced by
    // the poll call just made — confirm the backend resets it.
    pub fn poll(&mut self, events: &mut Events, timeout_ms: Option<u64>) -> io::Result<usize> {
        let ready = self.poll_raw(events, timeout_ms)?;
        for event in events.iter() {
            self.wakers
                .wake_token(event.token, event.readable, event.writable);
        }
        Ok(ready)
    }

    /// Polls the backend without notifying the waker registry.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend's `poll` reports.
    pub(crate) fn poll_raw(
        &self,
        events: &mut Events,
        timeout_ms: Option<u64>,
    ) -> io::Result<usize> {
        self.inner.poll(events, timeout_ms)
    }
}
thread_local! {
    /// Per-thread reactor, wrapped in a `RefCell` so that `with_reactor`
    /// and `with_reactor_mut` can hand out shared or exclusive access.
    ///
    /// Initialisation panics if `Reactor::new` returns an error.
    static REACTOR: RefCell<Reactor> = {
        let reactor = Reactor::new().expect("failed to initialise platform reactor");
        RefCell::new(reactor)
    };
}
/// Runs `f` with shared access to the current thread's [`Reactor`] and
/// returns its result.
///
/// # Panics
///
/// Panics if the thread-local reactor is currently mutably borrowed (for
/// example when called from inside `with_reactor_mut` on the same thread),
/// or if the reactor's initialisation fails.
pub fn with_reactor<F, R>(f: F) -> R
where
    F: FnOnce(&Reactor) -> R,
{
    REACTOR.with(|slot| {
        // The shared borrow is held for the whole duration of `f`.
        let guard = slot.borrow();
        f(&guard)
    })
}
/// Runs `f` with exclusive access to the current thread's [`Reactor`] and
/// returns its result.
///
/// # Panics
///
/// Panics if the thread-local reactor is already borrowed, shared or
/// exclusive (for example on a nested call from inside `f`), or if the
/// reactor's initialisation fails.
pub(crate) fn with_reactor_mut<F, R>(f: F) -> R
where
    F: FnOnce(&mut Reactor) -> R,
{
    REACTOR.with(|slot| {
        // The exclusive borrow is held for the whole duration of `f`.
        let mut guard = slot.borrow_mut();
        f(&mut guard)
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::platform::sys::{create_pipe, events_with_capacity};

    /// Accessing the thread-local forces the reactor to be constructed; the
    /// test passes as long as that does not panic.
    #[test]
    fn reactor_initialises_via_thread_local() {
        with_reactor(|_reactor| ());
    }

    /// Registers the read end of a pipe and deregisters it again, expecting
    /// both calls to succeed.
    #[cfg(unix)]
    #[test]
    fn reactor_register_deregister_roundtrip() {
        let (read_fd, write_fd) = create_pipe().unwrap();

        with_reactor(|reactor| {
            let registered = reactor.register(read_fd, 10, Interest::READABLE);
            registered.expect("register");

            let deregistered = reactor.deregister(read_fd);
            deregistered.expect("deregister");
        });

        // SAFETY: both descriptors were returned by `create_pipe` above and
        // are not used again after being closed here.
        unsafe {
            libc::close(read_fd);
            libc::close(write_fd);
        }
    }

    /// With nothing registered in this test, a zero-timeout poll is
    /// expected to report zero events.
    #[test]
    fn reactor_poll_timeout_zero_returns_immediately() {
        let mut events = events_with_capacity(16);
        let ready = with_reactor_mut(|reactor| reactor.poll(&mut events, Some(0)).expect("poll"));
        assert_eq!(ready, 0);
    }
}