use std::io;
use std::task::Waker;
use std::time::Duration;
use std::sync::Arc;
/// Readiness flags tracked for a single I/O token.
///
/// The default value has both flags cleared. The type is a plain pair of
/// booleans, so it is cheap to copy, compare and print.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Readiness {
    /// `true` once a readable event has been recorded for the token and not
    /// yet cleared.
    pub readable: bool,
    /// `true` once a writable event has been recorded for the token and not
    /// yet cleared.
    pub writable: bool,
}
use mio::event::Source;
use mio::{Events, Interest, Poll, Token};
/// Token under which the driver's own `mio::Waker` is registered in
/// `IoDriver::new`. Events carrying it are skipped by `IoDriver::poll_io`
/// instead of being mapped to a waker/readiness slot.
const WAKER_TOKEN: Token = Token(usize::MAX);
/// Owns the mio `Poll` instance together with per-token waker and readiness
/// slots.
///
/// Slots are indexed by `Token.0`. Unused slots form a singly linked free
/// list that starts at `free_head` and is threaded through `next_free`.
pub(crate) struct IoDriver {
/// OS poller; its registry is exposed through `registry()`.
poll: Poll,
/// Event buffer passed to `Poll::poll` on every `poll_io` call.
events: Events,
/// mio waker registered under `WAKER_TOKEN`; shared out via `mio_waker()`.
mio_waker: Arc<mio::Waker>,
/// Task waker per slot: `Some` is stored by `claim_token`/`set_waker`,
/// `None` by `release_token`.
wakers: Vec<Option<Waker>>,
/// Readiness flags per slot, using the same indexing as `wakers`.
readiness: Vec<Readiness>,
/// For a slot on the free list: index of the following free slot, or
/// `NO_FREE` at the end of the list.
next_free: Vec<usize>,
/// Index of the first free slot, or `NO_FREE` when no slot is free.
free_head: usize,
}
/// Sentinel index meaning "no slot": it terminates the free list threaded
/// through `IoDriver::next_free` and marks an empty list in
/// `IoDriver::free_head`.
const NO_FREE: usize = usize::MAX;
impl IoDriver {
    /// Creates a driver with room for `event_capacity` events per poll and
    /// `token_capacity` pre-allocated token slots.
    ///
    /// All pre-allocated slots start out free and are chained
    /// `0 -> 1 -> ... -> token_capacity - 1 -> NO_FREE`.
    ///
    /// # Errors
    ///
    /// Returns the error from `Poll::new` or `mio::Waker::new` if either
    /// fails.
    pub(crate) fn new(event_capacity: usize, token_capacity: usize) -> io::Result<Self> {
        let poll = Poll::new()?;
        let mio_waker = Arc::new(mio::Waker::new(poll.registry(), WAKER_TOKEN)?);
        let events = Events::with_capacity(event_capacity);

        let mut wakers = Vec::with_capacity(token_capacity);
        wakers.resize_with(token_capacity, || None);
        let readiness = vec![Readiness::default(); token_capacity];
        // Slot `i` links to `i + 1`; the last slot terminates the list.
        let next_free: Vec<usize> = (1..=token_capacity)
            .map(|next| if next < token_capacity { next } else { NO_FREE })
            .collect();

        Ok(Self {
            poll,
            events,
            mio_waker,
            wakers,
            readiness,
            next_free,
            free_head: if token_capacity > 0 { 0 } else { NO_FREE },
        })
    }

    /// Returns another shared handle to the mio waker registered under
    /// `WAKER_TOKEN`.
    pub(crate) fn mio_waker(&self) -> Arc<mio::Waker> {
        Arc::clone(&self.mio_waker)
    }

    /// Returns the registry of the underlying `Poll`.
    pub(crate) fn registry(&self) -> &mio::Registry {
        self.poll.registry()
    }

    /// Claims a slot, stores `waker` in it and returns the slot index as a
    /// `Token`.
    ///
    /// A slot is taken from the free list when one is available; otherwise
    /// all three per-slot vectors grow by one element. The slot's readiness
    /// is reset so a recycled token never starts out with flags recorded for
    /// its previous owner.
    pub(crate) fn claim_token(&mut self, waker: Waker) -> Token {
        let idx = if self.free_head == NO_FREE {
            // Free list exhausted: append a fresh slot to every vector so
            // they stay the same length.
            let idx = self.wakers.len();
            self.wakers.push(None);
            self.readiness.push(Readiness::default());
            self.next_free.push(NO_FREE);
            idx
        } else {
            // Pop the head of the free list.
            let idx = self.free_head;
            self.free_head = self.next_free[idx];
            idx
        };
        self.wakers[idx] = Some(waker);
        self.readiness[idx] = Readiness::default();
        Token(idx)
    }

    /// Returns the slot of `token` to the free list.
    ///
    /// Out-of-range tokens and tokens whose slot is already free are ignored.
    /// Ignoring the second release matters: pushing a slot that is already
    /// the list head would link it to itself and the same index would then be
    /// handed out forever.
    pub(crate) fn release_token(&mut self, token: Token) {
        let idx = token.0;
        let Some(slot) = self.wakers.get_mut(idx) else {
            return;
        };
        // A claimed slot always holds a waker, so `None` means "already free".
        if slot.take().is_none() {
            return;
        }
        self.readiness[idx] = Readiness::default();
        self.next_free[idx] = self.free_head;
        self.free_head = idx;
    }

    /// Replaces the waker stored for `token`.
    ///
    /// Out-of-range tokens and tokens whose slot is currently free are
    /// ignored, so a stale token cannot park a waker in a free slot.
    pub(crate) fn set_waker(&mut self, token: Token, waker: Waker) {
        if let Some(slot) = self.wakers.get_mut(token.0) {
            if slot.is_some() {
                *slot = Some(waker);
            }
        }
    }

    /// Returns the readiness recorded for `token`, or the default (both flags
    /// cleared) when the token is out of range.
    pub(crate) fn readiness(&self, token: Token) -> Readiness {
        self.readiness.get(token.0).copied().unwrap_or_default()
    }

    /// Clears the readable flag of `token`; out-of-range tokens are ignored.
    pub(crate) fn clear_readable(&mut self, token: Token) {
        if let Some(r) = self.readiness.get_mut(token.0) {
            r.readable = false;
        }
    }

    /// Clears the writable flag of `token`; out-of-range tokens are ignored.
    pub(crate) fn clear_writable(&mut self, token: Token) {
        if let Some(r) = self.readiness.get_mut(token.0) {
            r.writable = false;
        }
    }

    /// Polls for I/O events, records readiness and wakes the affected tasks.
    ///
    /// Events for `WAKER_TOKEN` and for slots that are not currently claimed
    /// are skipped. Returns the number of task wakers that were invoked.
    ///
    /// # Errors
    ///
    /// Returns the error from `Poll::poll`, except for
    /// `io::ErrorKind::Interrupted`, which is reported as `Ok(0)`: mio does
    /// not retry an interrupted poll itself, and a signal arriving during the
    /// wait is not an I/O failure.
    pub(crate) fn poll_io(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
        match self.poll.poll(&mut self.events, timeout) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::Interrupted => return Ok(0),
            Err(e) => return Err(e),
        }

        let mut woken = 0;
        for event in &self.events {
            let token = event.token();
            if token == WAKER_TOKEN {
                continue;
            }
            let idx = token.0;
            // An event for a released (or unknown) slot is stale: there is no
            // task to wake and nothing should be recorded for a future owner.
            let Some(Some(waker)) = self.wakers.get(idx) else {
                continue;
            };
            // NOTE(review): only `is_readable`/`is_writable` are recorded. An
            // event that carries just closed/error flags still wakes the task
            // but leaves both readiness flags unchanged; confirm consumers
            // cope with that.
            if let Some(r) = self.readiness.get_mut(idx) {
                if event.is_readable() {
                    r.readable = true;
                }
                if event.is_writable() {
                    r.writable = true;
                }
            }
            waker.wake_by_ref();
            woken += 1;
        }
        Ok(woken)
    }
}
/// Copyable handle that reaches an `IoDriver` through raw pointers.
///
/// NOTE(review): nothing in this type ties the pointers to the driver's
/// lifetime. The methods dereference them inside `unsafe` blocks, so their
/// validity presumably depends on the runtime keeping the driver alive and in
/// place while handles exist (the assertion message in `current` mentions
/// `Runtime::block_on`) — confirm against the runtime.
#[derive(Clone, Copy)]
pub struct IoHandle {
/// Pointer to the driver's `mio::Registry`, taken from `IoDriver::registry()`.
registry: *const mio::Registry,
/// Pointer to the driver itself.
driver: *mut IoDriver,
}
impl IoHandle {
    /// Builds a handle holding raw pointers to `driver` and to its registry.
    pub(crate) fn new(driver: &mut IoDriver) -> Self {
        Self {
            registry: std::ptr::from_ref(driver.registry()),
            driver: std::ptr::from_mut(driver),
        }
    }

    /// Returns a handle to the driver published by
    /// `crate::context::current_io_ptr()`.
    ///
    /// # Panics
    ///
    /// Panics when that pointer is null, i.e. when called outside
    /// `Runtime::block_on` according to the assertion message.
    #[must_use]
    pub fn current() -> IoHandle {
        let ptr = crate::context::current_io_ptr();
        assert!(
            !ptr.is_null(),
            "IoHandle::current() called outside Runtime::block_on"
        );
        // SAFETY: `ptr` is non-null (checked above). That it points to a live
        // driver with no other reference active is assumed from the context
        // module — TODO confirm.
        IoHandle::new(unsafe { &mut *ptr })
    }

    /// Claims a token for `waker` and registers `source` with the driver's
    /// registry under that token.
    ///
    /// The driver and the registry are borrowed one after the other, never at
    /// the same time: the registry lives inside the driver, so a `&mut
    /// IoDriver` must not be alive while the `&Registry` is in use.
    ///
    /// # Errors
    ///
    /// Returns the error from `Registry::register`; the claimed token is
    /// released again before returning.
    pub fn register(
        &self,
        source: &mut impl Source,
        interest: Interest,
        waker: Waker,
    ) -> io::Result<Token> {
        let token = {
            // SAFETY: relies on `self.driver` still pointing to a live driver
            // that nothing else is borrowing — presumably guaranteed by the
            // runtime; TODO confirm.
            let driver = unsafe { &mut *self.driver };
            driver.claim_token(waker)
        };

        // SAFETY: same validity assumption as above; the mutable driver
        // borrow has ended, so this shared borrow does not overlap it.
        let registry = unsafe { &*self.registry };
        if let Err(e) = registry.register(source, token, interest) {
            // SAFETY: same validity assumption; `registry` is not used past
            // this point, so the mutable borrow is exclusive again.
            let driver = unsafe { &mut *self.driver };
            driver.release_token(token);
            return Err(e);
        }
        Ok(token)
    }

    /// Replaces the waker stored for `token`.
    pub fn set_waker(&self, token: Token, waker: Waker) {
        // SAFETY: relies on `self.driver` pointing to a live, otherwise
        // unborrowed driver — TODO confirm against the runtime.
        let driver = unsafe { &mut *self.driver };
        driver.set_waker(token, waker);
    }

    /// Returns the readiness currently recorded for `token`.
    pub fn readiness(&self, token: Token) -> Readiness {
        // SAFETY: relies on `self.driver` pointing to a live driver that is
        // not mutably borrowed elsewhere — TODO confirm against the runtime.
        let driver = unsafe { &*self.driver };
        driver.readiness(token)
    }

    /// Clears the readable flag recorded for `token`.
    pub fn clear_readable(&self, token: Token) {
        // SAFETY: relies on `self.driver` pointing to a live, otherwise
        // unborrowed driver — TODO confirm against the runtime.
        let driver = unsafe { &mut *self.driver };
        driver.clear_readable(token);
    }

    /// Clears the writable flag recorded for `token`.
    pub fn clear_writable(&self, token: Token) {
        // SAFETY: relies on `self.driver` pointing to a live, otherwise
        // unborrowed driver — TODO confirm against the runtime.
        let driver = unsafe { &mut *self.driver };
        driver.clear_writable(token);
    }

    /// Deregisters `source` from the registry and then releases `token`.
    ///
    /// # Safety
    ///
    /// The handle's pointers are dereferenced, so the driver they refer to
    /// must still be alive and not borrowed elsewhere for the duration of the
    /// call. NOTE(review): the full caller contract is not stated in this
    /// file; presumably `token` must also be the one returned by `register`
    /// for `source` — confirm.
    ///
    /// # Errors
    ///
    /// Returns the error from `Registry::deregister`; in that case the token
    /// is not released.
    pub unsafe fn deregister(&self, source: &mut impl Source, token: Token) -> io::Result<()> {
        // SAFETY: validity of `self.registry` is part of the caller contract;
        // no `&mut IoDriver` exists while this shared borrow is in use.
        let registry = unsafe { &*self.registry };
        registry.deregister(source)?;

        // SAFETY: validity of `self.driver` is part of the caller contract;
        // `registry` is no longer used, so this mutable borrow is exclusive.
        let driver = unsafe { &mut *self.driver };
        driver.release_token(token);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asking for the current handle from a plain test is expected to trip
    /// the assertion in `IoHandle::current`.
    #[test]
    #[should_panic(expected = "called outside Runtime::block_on")]
    fn current_panics_outside_runtime() {
        // The binding only silences `#[must_use]`; the call panics first.
        let _handle = IoHandle::current();
    }
}