use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::mem;
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
use std::task::{ready, Context, Poll, Waker};
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use polling::{Event, Events, Poller};
use slab::Slab;
// Select the platform-specific `Registration` implementation at compile time.
// Exactly one of the private modules below is compiled in, and its
// `Registration` type is re-exported under a common name.
cfg_if::cfg_if! {
// Windows targets use the `windows` module.
if #[cfg(windows)] {
mod windows;
pub use windows::Registration;
// Apple platforms and the listed BSDs use the `kqueue` module.
} else if #[cfg(any(
target_vendor = "apple",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))] {
mod kqueue;
pub use kqueue::Registration;
// Every other Unix target uses the `unix` module.
} else if #[cfg(unix)] {
mod unix;
pub use unix::Registration;
// Anything else fails the build with an explicit message.
} else {
compile_error!("unsupported platform");
}
}
/// Capacity of the bounded queue of pending timer operations
/// (`Reactor::timer_ops`) on every target except ESP-IDF.
#[cfg(not(target_os = "espidf"))]
const TIMER_QUEUE_SIZE: usize = 1000;
/// Capacity of the bounded queue of pending timer operations on ESP-IDF.
// NOTE(review): presumably smaller to save memory on embedded targets — confirm.
#[cfg(target_os = "espidf")]
const TIMER_QUEUE_SIZE: usize = 100;
/// Index of the read direction in a source's `[Direction; 2]` state array.
const READ: usize = 0;
/// Index of the write direction in a source's `[Direction; 2]` state array.
const WRITE: usize = 1;
/// The reactor: owns the poller, the table of registered I/O sources, and the
/// set of pending timers.
///
/// A single instance is shared process-wide and obtained through `Reactor::get`.
pub(crate) struct Reactor {
/// Poller used to wait for I/O events and to interrupt a wait via `notify`.
pub(crate) poller: Poller,
/// Counter incremented by `ReactorLock::react` before each wait on the poller.
ticker: AtomicUsize,
/// Registered I/O sources, stored under the same key that is passed to the
/// poller when the source is added.
sources: Mutex<Slab<Arc<Source>>>,
/// Buffer filled by `Poller::wait`. Locking this mutex is what `lock` and
/// `try_lock` do, so holding it is what grants the right to call `react`.
events: Mutex<Events>,
/// Pending timers ordered by `(deadline, id)`, each mapped to the waker to
/// wake once the deadline has passed.
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
/// Bounded queue of timer insertions and removals that have not yet been
/// applied to `timers`; drained by `process_timer_ops`.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
    /// Returns the process-wide reactor, building it on first use.
    ///
    /// The first call runs `crate::driver::init()` and then creates the poller.
    ///
    /// # Panics
    ///
    /// Panics if the poller cannot be created.
    pub(crate) fn get() -> &'static Reactor {
        static REACTOR: OnceLock<Reactor> = OnceLock::new();

        REACTOR.get_or_init(|| {
            crate::driver::init();

            let poller = Poller::new().expect("cannot initialize I/O event notification");
            Reactor {
                poller,
                ticker: AtomicUsize::new(0),
                sources: Mutex::new(Slab::new()),
                events: Mutex::new(Events::new()),
                timers: Mutex::new(BTreeMap::new()),
                timer_ops: ConcurrentQueue::bounded(TIMER_QUEUE_SIZE),
            }
        })
    }

    /// Returns the current value of the tick counter.
    pub(crate) fn ticker(&self) -> usize {
        self.ticker.load(Ordering::SeqCst)
    }

    /// Stores a new I/O source in the source table and adds it to the poller.
    ///
    /// # Errors
    ///
    /// If adding the registration to the poller fails, the source is removed
    /// from the table again and the error is returned.
    pub(crate) fn insert_io(&self, raw: Registration) -> io::Result<Arc<Source>> {
        // Reserve a slot first so the source can carry its own key.
        let source = {
            let mut table = self.sources.lock().unwrap();
            let slot = table.vacant_entry();
            let source = Arc::new(Source {
                registration: raw,
                key: slot.key(),
                state: Default::default(),
            });
            slot.insert(Arc::clone(&source));
            source
        };

        // The table lock is released before talking to the poller.
        let added = source.registration.add(&self.poller, source.key);
        match added {
            Ok(_) => Ok(source),
            Err(err) => {
                // Roll back the table entry created above.
                self.sources.lock().unwrap().remove(source.key);
                Err(err)
            }
        }
    }

    /// Removes an I/O source from the table and deletes it from the poller.
    ///
    /// The table lock is held while the registration is deleted.
    pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut table = self.sources.lock().unwrap();
        table.remove(source.key);
        source.registration.delete(&self.poller)
    }

    /// Queues the insertion of a timer firing at `when` and returns its id.
    ///
    /// The poller is notified afterwards.
    pub(crate) fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
        static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);

        loop {
            let op = TimerOp::Insert(when, id, waker.clone());
            if self.timer_ops.push(op).is_ok() {
                break;
            }
            // The push failed: apply the queued operations, then retry.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }

        self.notify();
        id
    }

    /// Queues the removal of the timer identified by `(when, id)`.
    pub(crate) fn remove_timer(&self, when: Instant, id: usize) {
        loop {
            if self.timer_ops.push(TimerOp::Remove(when, id)).is_ok() {
                return;
            }
            // The push failed: apply the queued operations, then retry.
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);
        }
    }

    /// Notifies the poller.
    ///
    /// # Panics
    ///
    /// Panics if the poller reports an error.
    pub(crate) fn notify(&self) {
        self.poller.notify().expect("failed to notify reactor");
    }

    /// Blocks until the `events` mutex is acquired and returns the guard
    /// wrapper that exposes `react`.
    pub(crate) fn lock(&self) -> ReactorLock<'_> {
        ReactorLock {
            reactor: self,
            events: self.events.lock().unwrap(),
        }
    }

    /// Like `lock`, but returns `None` instead of blocking when the `events`
    /// mutex cannot be acquired immediately.
    pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
        let events = self.events.try_lock().ok()?;
        Some(ReactorLock {
            reactor: self,
            events,
        })
    }

    /// Moves the wakers of all due timers into `wakers`.
    ///
    /// Returns a zero duration if any timer was due, otherwise the time until
    /// the earliest pending timer, or `None` when no timers remain.
    fn process_timers(&self, wakers: &mut Vec<Waker>) -> Option<Duration> {
        #[cfg(feature = "tracing")]
        let span = tracing::trace_span!("process_timers");
        #[cfg(feature = "tracing")]
        let _enter = span.enter();

        // The timers lock only lives inside this block.
        let (ready, next) = {
            let mut timers = self.timers.lock().unwrap();
            self.process_timer_ops(&mut timers);

            let now = Instant::now();

            // Split just after `now` so a timer set for exactly `now` counts as ready.
            let pending = timers.split_off(&(now + Duration::from_nanos(1), 0));
            let ready = mem::replace(&mut *timers, pending);

            let next = if ready.is_empty() {
                timers
                    .first_key_value()
                    .map(|((when, _), _)| when.saturating_duration_since(now))
            } else {
                Some(Duration::ZERO)
            };

            (ready, next)
        };

        #[cfg(feature = "tracing")]
        tracing::trace!("{} ready wakers", ready.len());

        wakers.extend(ready.into_values());
        next
    }

    /// Applies queued timer operations to `timers`.
    ///
    /// At most one queue capacity's worth of operations is applied per call.
    fn process_timer_ops(&self, timers: &mut MutexGuard<'_, BTreeMap<(Instant, usize), Waker>>) {
        let limit = self.timer_ops.capacity().unwrap();

        for op in self.timer_ops.try_iter().take(limit) {
            match op {
                TimerOp::Insert(when, id, waker) => {
                    timers.insert((when, id), waker);
                }
                TimerOp::Remove(when, id) => {
                    timers.remove(&(when, id));
                }
            }
        }
    }
}
/// Guard returned by `Reactor::lock` and `Reactor::try_lock`.
///
/// It holds the reactor's `events` mutex for as long as it lives and is the
/// only way to call `react`.
pub(crate) struct ReactorLock<'a> {
/// The reactor this guard belongs to.
reactor: &'a Reactor,
/// Locked event buffer that `react` passes to `Poller::wait`.
events: MutexGuard<'a, Events>,
}
impl ReactorLock<'_> {
    /// Runs one reactor iteration: fires due timers, waits for I/O events, and
    /// wakes every task whose timer or I/O source became ready.
    ///
    /// `timeout` limits how long the call may wait for I/O. The effective wait
    /// is the smaller of `timeout` and the time until the next pending timer;
    /// if both are absent, the wait has no time limit.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the poller's `wait`, except that an
    /// `Interrupted` wait is treated as success. Also returns the first error
    /// produced while re-registering interest for a source. In every case, all
    /// wakers collected during this call are woken before the function returns.
    pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
        #[cfg(feature = "tracing")]
        let span = tracing::trace_span!("react");
        #[cfg(feature = "tracing")]
        let _enter = span.enter();

        let mut wakers = Vec::new();

        // Collect timers that are already due and learn when the next one fires.
        let next_timer = self.reactor.process_timers(&mut wakers);

        // Never wait past the next timer deadline or the caller's limit.
        let timeout = match (next_timer, timeout) {
            (None, None) => None,
            (Some(t), None) | (None, Some(t)) => Some(t),
            (Some(a), Some(b)) => Some(a.min(b)),
        };

        // Advance the tick counter before waiting; events found below are
        // stamped with the new value.
        let tick = self
            .reactor
            .ticker
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);

        self.events.clear();

        let res = match self.reactor.poller.wait(&mut self.events, timeout) {
            // No I/O events occurred.
            Ok(0) => {
                if timeout != Some(Duration::from_secs(0)) {
                    // A non-zero wait elapsed, so more timers may be due now.
                    self.reactor.process_timers(&mut wakers);
                }
                Ok(())
            }

            // At least one I/O event occurred.
            Ok(_) => {
                let sources = self.reactor.sources.lock().unwrap();

                // First re-registration failure, if any. It is reported only
                // after every event has been handled and every waker woken.
                // Returning early here would drop the wakers already drained
                // and discard the remaining events from this wait.
                let mut first_err: Option<io::Error> = None;

                for ev in self.events.iter() {
                    // Events for keys no longer in the table are ignored.
                    if let Some(source) = sources.get(ev.key) {
                        let mut state = source.state.lock().unwrap();

                        // Stamp and drain each direction the event reported.
                        for &(dir, emitted) in &[(WRITE, ev.writable), (READ, ev.readable)] {
                            if emitted {
                                state[dir].tick = tick;
                                state[dir].drain_into(&mut wakers);
                            }
                        }

                        // Wakers are still waiting (for example only one of two
                        // directions fired), so renew interest for them.
                        if !state[READ].is_empty() || !state[WRITE].is_empty() {
                            let event = {
                                let mut event = Event::none(source.key);
                                event.readable = !state[READ].is_empty();
                                event.writable = !state[WRITE].is_empty();
                                event
                            };

                            if let Err(err) =
                                source.registration.modify(&self.reactor.poller, event)
                            {
                                if first_err.is_none() {
                                    first_err = Some(err);
                                }
                            }
                        }
                    }
                }

                match first_err {
                    Some(err) => Err(err),
                    None => Ok(()),
                }
            }

            // The wait was interrupted; treat it as an uneventful iteration.
            Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),

            // A real error occurred.
            Err(err) => Err(err),
        };

        #[cfg(feature = "tracing")]
        tracing::trace!("{} ready wakers", wakers.len());

        // Wake the collected tasks. This happens after the source locks taken
        // above have been released, and on both the success and error paths.
        for waker in wakers {
            // A panicking waker must not prevent the others from being woken.
            panic::catch_unwind(|| waker.wake()).ok();
        }

        res
    }
}
/// A change to the timer map, queued in `Reactor::timer_ops` until
/// `Reactor::process_timer_ops` applies it.
enum TimerOp {
/// Insert a timer with the given deadline and id, to wake the given waker.
Insert(Instant, usize, Waker),
/// Remove the timer with the given deadline and id.
Remove(Instant, usize),
}
/// An I/O source registered in the reactor.
#[derive(Debug)]
pub(crate) struct Source {
/// Platform-specific registration that is added to, modified in, and
/// deleted from the poller.
registration: Registration,
/// Key of this source in the reactor's source table; the same key is used
/// for its poller events.
key: usize,
/// Per-direction waiter state, indexed by `READ` and `WRITE`.
state: Mutex<[Direction; 2]>,
}
/// Waiter state for one direction (read or write) of a `Source`.
#[derive(Debug, Default)]
struct Direction {
/// Reactor tick stamped by `ReactorLock::react` the last time an event was
/// delivered for this direction.
tick: usize,
/// Pair recorded by `Source::poll_ready` when it registers a waker: the
/// reactor ticker at that moment and this direction's `tick`. Readiness is
/// reported once `tick` differs from both values.
ticks: Option<(usize, usize)>,
/// Waker registered through `Source::poll_ready`.
waker: Option<Waker>,
/// Wakers registered by `Ready` futures, one slab slot per future.
wakers: Slab<Option<Waker>>,
}
impl Direction {
    /// Returns `true` when no waker is registered for this direction, neither
    /// in `waker` nor in any slot of `wakers`.
    fn is_empty(&self) -> bool {
        if self.waker.is_some() {
            return false;
        }
        !self.wakers.iter().any(|(_, slot)| slot.is_some())
    }

    /// Moves every registered waker into `dst`, leaving the slots in place but
    /// empty. The `poll_ready` waker goes first, followed by the slab wakers in
    /// iteration order.
    fn drain_into(&mut self, dst: &mut Vec<Waker>) {
        dst.extend(self.waker.take());
        dst.extend(self.wakers.iter_mut().filter_map(|(_, slot)| slot.take()));
    }
}
impl Source {
    /// Polls this source for read readiness on behalf of the task in `cx`.
    pub(crate) fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(READ, cx)
    }

    /// Polls this source for write readiness on behalf of the task in `cx`.
    pub(crate) fn poll_writable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.poll_ready(WRITE, cx)
    }

    /// Registers the task's waker for direction `dir`, or reports readiness if
    /// an event has been delivered since the previous registration.
    ///
    /// # Errors
    ///
    /// Returns the error from updating the registration in the poller.
    fn poll_ready(&self, dir: usize, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let mut state = self.state.lock().unwrap();

        // An event was delivered if the direction's tick no longer matches
        // either value recorded at registration time.
        if let Some((a, b)) = state[dir].ticks {
            let current = state[dir].tick;
            if current != a && current != b {
                state[dir].ticks = None;
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        let previous = state[dir].waker.take();
        match previous {
            // The same task is polling again: keep its waker and stop here.
            Some(existing) if existing.will_wake(cx.waker()) => {
                state[dir].waker = Some(existing);
                return Poll::Pending;
            }
            // A different task takes over: wake the one being replaced.
            Some(existing) => {
                panic::catch_unwind(|| existing.wake()).ok();
            }
            None => {}
        }

        let direction = &mut state[dir];
        direction.waker = Some(cx.waker().clone());
        direction.ticks = Some((Reactor::get().ticker(), direction.tick));

        // Interest only needs updating when this direction had no waiters.
        if !was_empty {
            return Poll::Pending;
        }

        let mut event = Event::none(self.key);
        event.readable = !state[READ].is_empty();
        event.writable = !state[WRITE].is_empty();
        self.registration.modify(&Reactor::get().poller, event)?;

        Poll::Pending
    }

    /// Creates a future that resolves once `handle` is readable.
    pub(crate) fn readable<T>(handle: &crate::Async<T>) -> Readable<'_, T> {
        Readable(Self::ready(handle, READ))
    }

    /// Owned-handle variant of `readable`.
    pub(crate) fn readable_owned<T>(handle: Arc<crate::Async<T>>) -> ReadableOwned<T> {
        ReadableOwned(Self::ready(handle, READ))
    }

    /// Creates a future that resolves once `handle` is writable.
    pub(crate) fn writable<T>(handle: &crate::Async<T>) -> Writable<'_, T> {
        Writable(Self::ready(handle, WRITE))
    }

    /// Owned-handle variant of `writable`.
    pub(crate) fn writable_owned<T>(handle: Arc<crate::Async<T>>) -> WritableOwned<T> {
        WritableOwned(Self::ready(handle, WRITE))
    }

    /// Builds the shared readiness future for `handle` in direction `dir`,
    /// with no waker slot or ticks recorded yet.
    fn ready<H: Borrow<crate::Async<T>> + Clone, T>(handle: H, dir: usize) -> Ready<H, T> {
        Ready {
            handle,
            dir,
            ticks: None,
            index: None,
            _capture: PhantomData,
        }
    }
}
/// Future created by `Source::readable` for a borrowed handle; it resolves
/// once the handle's source reports read readiness.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Readable<'a, T>(Ready<&'a crate::Async<T>, T>);
impl<T> Future for Readable<'_, T> {
    type Output = io::Result<()>;

    /// Delegates to the inner `Ready` future and, with the `tracing` feature
    /// enabled, logs the registration once readiness is reported.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.0);
        if let Err(err) = ready!(inner.poll(cx)) {
            return Poll::Ready(Err(err));
        }

        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable");

        Poll::Ready(Ok(()))
    }
}
impl<T> fmt::Debug for Readable<'_, T> {
    /// Prints only the type name; no fields are exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Readable")
    }
}
/// Future created by `Source::readable_owned` for an `Arc`-owned handle; it
/// resolves once the handle's source reports read readiness.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
impl<T> Future for ReadableOwned<T> {
    type Output = io::Result<()>;

    /// Delegates to the inner `Ready` future and, with the `tracing` feature
    /// enabled, logs the registration once readiness is reported.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.0);
        if let Err(err) = ready!(inner.poll(cx)) {
            return Poll::Ready(Err(err));
        }

        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "readable_owned");

        Poll::Ready(Ok(()))
    }
}
impl<T> fmt::Debug for ReadableOwned<T> {
    /// Prints only the type name; no fields are exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ReadableOwned")
    }
}
/// Future created by `Source::writable` for a borrowed handle; it resolves
/// once the handle's source reports write readiness.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Writable<'a, T>(Ready<&'a crate::Async<T>, T>);
impl<T> Future for Writable<'_, T> {
    type Output = io::Result<()>;

    /// Delegates to the inner `Ready` future and, with the `tracing` feature
    /// enabled, logs the registration once readiness is reported.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.0);
        if let Err(err) = ready!(inner.poll(cx)) {
            return Poll::Ready(Err(err));
        }

        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable");

        Poll::Ready(Ok(()))
    }
}
impl<T> fmt::Debug for Writable<'_, T> {
    /// Prints only the type name; no fields are exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Writable")
    }
}
/// Future created by `Source::writable_owned` for an `Arc`-owned handle; it
/// resolves once the handle's source reports write readiness.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WritableOwned<T>(Ready<Arc<crate::Async<T>>, T>);
impl<T> Future for WritableOwned<T> {
    type Output = io::Result<()>;

    /// Delegates to the inner `Ready` future and, with the `tracing` feature
    /// enabled, logs the registration once readiness is reported.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = Pin::new(&mut self.0);
        if let Err(err) = ready!(inner.poll(cx)) {
            return Poll::Ready(Err(err));
        }

        #[cfg(feature = "tracing")]
        tracing::trace!(fd = ?self.0.handle.source.registration, "writable_owned");

        Poll::Ready(Ok(()))
    }
}
impl<T> fmt::Debug for WritableOwned<T> {
    /// Prints only the type name; no fields are exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("WritableOwned")
    }
}
/// Readiness future shared by `Readable`, `ReadableOwned`, `Writable` and
/// `WritableOwned`; `H` is either a borrowed or an `Arc`-owned handle.
struct Ready<H: Borrow<crate::Async<T>>, T> {
/// Handle whose source is being waited on.
handle: H,
/// Direction being waited on: `READ` or `WRITE`.
dir: usize,
/// Reactor ticker and direction tick recorded on the first poll; readiness
/// is reported once the direction's tick differs from both.
ticks: Option<(usize, usize)>,
/// Slot of this future's waker in the direction's `wakers` slab, assigned
/// on the first poll and released on drop.
index: Option<usize>,
/// Ties the type parameter `T` to the struct without storing a `T`.
_capture: PhantomData<fn() -> T>,
}
// `Ready` is declared `Unpin` regardless of `H`, which lets the wrapper
// futures re-pin it with `Pin::new`.
impl<H: Borrow<crate::Async<T>>, T> Unpin for Ready<H, T> {}
impl<H: Borrow<crate::Async<T>> + Clone, T> Future for Ready<H, T> {
    type Output = io::Result<()>;

    /// Registers this future's waker with the source, or reports readiness if
    /// an event has been delivered since the first poll.
    ///
    /// # Errors
    ///
    /// Returns the error from updating the registration in the poller.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        let dir = this.dir;
        let source = &this.handle.borrow().source;
        let mut state = source.state.lock().unwrap();

        // An event was delivered if the direction's tick no longer matches
        // either value recorded on the first poll.
        if let Some((a, b)) = this.ticks {
            let current = state[dir].tick;
            if current != a && current != b {
                return Poll::Ready(Ok(()));
            }
        }

        let was_empty = state[dir].is_empty();

        // Claim a waker slot on the first poll and reuse it afterwards.
        let slot = match this.index {
            Some(slot) => slot,
            None => {
                let slot = state[dir].wakers.insert(None);
                this.index = Some(slot);
                this.ticks = Some((Reactor::get().ticker(), state[dir].tick));
                slot
            }
        };
        state[dir].wakers[slot] = Some(cx.waker().clone());

        // Interest only needs updating when this direction had no waiters.
        if !was_empty {
            return Poll::Pending;
        }

        let mut event = Event::none(source.key);
        event.readable = !state[READ].is_empty();
        event.writable = !state[WRITE].is_empty();
        source
            .registration
            .modify(&Reactor::get().poller, event)?;

        Poll::Pending
    }
}
impl<H: Borrow<crate::Async<T>>, T> Drop for Ready<H, T> {
    /// Releases this future's waker slot, if one was ever claimed.
    fn drop(&mut self) {
        // A future that was never polled has no slot to release.
        let Some(key) = self.index else {
            return;
        };

        let mut state = self.handle.borrow().source.state.lock().unwrap();
        let slots = &mut state[self.dir].wakers;
        if slots.contains(key) {
            slots.remove(key);
        }
    }
}