#![allow(dead_code)]
use std::fmt;
use std::sync::{Arc, Mutex};
use mio::{self, Evented};
use once_cell::sync::Lazy;
use slab::Slab;
use async_std::io;
use async_std::task::{Context, Poll, Waker};
#[derive(Debug)]
struct Entry {
token: mio::Token,
readers: Mutex<Readers>,
writers: Mutex<Writers>,
}
#[derive(Debug)]
struct Readers {
ready: bool,
wakers: Vec<Waker>,
}
#[derive(Debug)]
struct Writers {
ready: bool,
wakers: Vec<Waker>,
}
struct Reactor {
poller: mio::Poll,
entries: Mutex<Slab<Arc<Entry>>>,
notify_reg: (mio::Registration, mio::SetReadiness),
notify_token: mio::Token,
}
impl Reactor {
fn new() -> io::Result<Reactor> {
let poller = mio::Poll::new()?;
let notify_reg = mio::Registration::new2();
let mut reactor = Reactor {
poller,
entries: Mutex::new(Slab::new()),
notify_reg,
notify_token: mio::Token(0),
};
let entry = reactor.register(&reactor.notify_reg.0)?;
reactor.notify_token = entry.token;
Ok(reactor)
}
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
let mut entries = self.entries.lock().unwrap();
let vacant = entries.vacant_entry();
let token = mio::Token(vacant.key());
let entry = Arc::new(Entry {
token,
readers: Mutex::new(Readers {
ready: false,
wakers: Vec::new(),
}),
writers: Mutex::new(Writers {
ready: false,
wakers: Vec::new(),
}),
});
vacant.insert(entry.clone());
let interest = mio::Ready::all();
let opts = mio::PollOpt::edge();
self.poller.register(source, token, interest, opts)?;
Ok(entry)
}
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
self.poller.deregister(source)?;
self.entries.lock().unwrap().remove(entry.token.0);
Ok(())
}
}
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
std::thread::Builder::new()
.name("async-std/net".to_string())
.spawn(move || {
main_loop().expect("async networking thread has panicked");
})
.expect("cannot start a thread driving blocking tasks");
Reactor::new().expect("cannot initialize reactor")
});
fn main_loop() -> io::Result<()> {
let reactor = &REACTOR;
let mut events = mio::Events::with_capacity(1000);
loop {
reactor.poller.poll(&mut events, None)?;
let entries = reactor.entries.lock().unwrap();
for event in events.iter() {
let token = event.token();
if token == reactor.notify_token {
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
} else {
if let Some(entry) = entries.get(token.0) {
let readiness = event.readiness();
if !(readiness & reader_interests()).is_empty() {
let mut readers = entry.readers.lock().unwrap();
readers.ready = true;
for w in readers.wakers.drain(..) {
w.wake();
}
}
if !(readiness & writer_interests()).is_empty() {
let mut writers = entry.writers.lock().unwrap();
writers.ready = true;
for w in writers.wakers.drain(..) {
w.wake();
}
}
}
}
}
}
}
pub(crate) struct Watcher<T: Evented> {
entry: Arc<Entry>,
source: Option<T>,
}
impl<T: Evented> Watcher<T> {
pub(crate) fn new(source: T) -> Watcher<T> {
Watcher {
entry: REACTOR
.register(&source)
.expect("cannot register an I/O event source"),
source: Some(source),
}
}
pub(crate) fn get_ref(&self) -> &T {
self.source.as_ref().unwrap()
}
pub(crate) fn poll_read_with<'a, F, R>(
&'a self,
cx: &mut Context<'_>,
mut f: F,
) -> Poll<io::Result<R>>
where
F: FnMut(&'a T) -> io::Result<R>,
{
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
let mut readers = self.entry.readers.lock().unwrap();
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
readers.wakers.push(cx.waker().clone());
}
readers.ready = false;
Poll::Pending
}
pub(crate) fn poll_write_with<'a, F, R>(
&'a self,
cx: &mut Context<'_>,
mut f: F,
) -> Poll<io::Result<R>>
where
F: FnMut(&'a T) -> io::Result<R>,
{
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
let mut writers = self.entry.writers.lock().unwrap();
match f(self.source.as_ref().unwrap()) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
writers.wakers.push(cx.waker().clone());
}
writers.ready = false;
Poll::Pending
}
#[allow(dead_code)]
pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut readers = self.entry.readers.lock().unwrap();
if readers.ready {
return Poll::Ready(());
}
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
readers.wakers.push(cx.waker().clone());
}
Poll::Pending
}
pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
let mut writers = self.entry.writers.lock().unwrap();
if writers.ready {
return Poll::Ready(());
}
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
writers.wakers.push(cx.waker().clone());
}
Poll::Pending
}
#[allow(dead_code)]
pub(crate) fn into_inner(mut self) -> T {
let source = self.source.take().unwrap();
REACTOR
.deregister(&source, &self.entry)
.expect("cannot deregister I/O event source");
source
}
}
impl<T: Evented> Drop for Watcher<T> {
fn drop(&mut self) {
if let Some(ref source) = self.source {
REACTOR
.deregister(source, &self.entry)
.expect("cannot deregister I/O event source");
}
}
}
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Watcher")
.field("entry", &self.entry)
.field("source", &self.source)
.finish()
}
}
#[inline]
fn reader_interests() -> mio::Ready {
mio::Ready::all() - mio::Ready::writable()
}
#[inline]
fn writer_interests() -> mio::Ready {
mio::Ready::writable() | hup()
}
#[inline]
fn hup() -> mio::Ready {
#[cfg(unix)]
let ready = mio::unix::UnixReady::hup().into();
#[cfg(not(unix))]
let ready = mio::Ready::empty();
ready
}