pub mod background;
pub mod observer;
use background::Background;
use crossbeam::queue::SegQueue;
use futures::{
self,
task::{AtomicWaker, Waker},
};
use mio::{Evented, Events, Poll, PollOpt, Ready, Token};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::{
collections::HashMap,
io,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
},
time::Duration,
usize,
};
static DEFAULT_REACTOR: Lazy<Background> =
Lazy::new(|| Background::new(Reactor::new()).expect("could not create reactor thread"));
pub fn register<E: Evented>(
resource: &E,
interest: Ready,
opts: PollOpt,
) -> io::Result<Arc<IoWaker>> {
DEFAULT_REACTOR.register(resource, interest, opts)
}
pub fn handle() -> Handle {
DEFAULT_REACTOR.handle()
}
const MAX_EVENTS: usize = 2048;
#[derive(Debug)]
pub struct Reactor {
shared: Arc<Shared>,
events: Events,
}
impl Reactor {
pub fn new() -> Self {
Self::new_priv(None)
}
pub fn with_capacity(capacity: usize) -> Self {
assert_ne!(
capacity, 0,
"Can not create a reactor which polls for 0 events."
);
Self::new_priv(Some(capacity))
}
pub fn register<E: Evented>(
&self,
resource: &E,
interest: Ready,
opts: PollOpt,
) -> io::Result<Arc<IoWaker>> {
let token = match self.shared.tokens.pop() {
Ok(token) => token,
Err(_) => {
let id = self.shared.current_token.fetch_add(1, Ordering::Relaxed);
if id == usize::MAX {
panic!(
"Registered more than {} Evented types! How did you manage that?",
usize::MAX - 1
);
}
Token(id)
}
};
let io_waker = Arc::new(IoWaker {
token,
readiness: AtomicUsize::new(0),
read_waker: AtomicWaker::new(),
write_waker: AtomicWaker::new(),
});
self.shared.poll.register(resource, token, interest, opts)?;
self.shared
.resources
.lock()
.insert(token, Arc::clone(&io_waker));
Ok(io_waker)
}
pub fn reregister<E: Evented>(
&self,
resource: &E,
io_waker: &IoWaker,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
self.shared
.poll
.reregister(resource, io_waker.token, interest, opts)
}
pub fn deregister<E: Evented>(&self, resource: &E, io_waker: &IoWaker) -> io::Result<()> {
self.shared.poll.deregister(resource)?;
self.shared.resources.lock().remove(&io_waker.token);
self.shared.tokens.push(io_waker.token);
Ok(())
}
pub fn handle(&self) -> Handle {
Handle(Arc::downgrade(&self.shared))
}
pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
let events_polled = self.shared.poll.poll(&mut self.events, timeout)?;
let resources = self.shared.resources.lock();
for event in self.events.iter() {
let token = event.token();
if let Some(resource) = resources.get(&token) {
resource.wake_if_ready(event.readiness());
}
}
self.events.clear();
Ok(events_polled)
}
fn new_priv(capacity: Option<usize>) -> Reactor {
let capacity = capacity.unwrap_or(MAX_EVENTS);
let poll = Poll::new().expect("Failed to create new Poll instance!");
let shared = Arc::new(Shared {
poll,
tokens: SegQueue::new(),
current_token: AtomicUsize::new(0),
resources: Mutex::new(HashMap::new()),
});
Self {
shared,
events: Events::with_capacity(capacity),
}
}
}
#[derive(Debug, Clone)]
pub struct Handle(Weak<Shared>);
impl Handle {
pub fn register<E: Evented>(
&self,
resource: &E,
interest: Ready,
opts: PollOpt,
) -> io::Result<Arc<IoWaker>> {
match self.0.upgrade() {
Some(inner) => {
let token = match inner.tokens.pop() {
Ok(token) => token,
Err(_) => {
let id = inner.current_token.fetch_add(1, Ordering::AcqRel);
if id == usize::MAX {
panic!(
"Registered more than {} resources! How did you manage that?",
usize::MAX - 1
);
}
Token(id)
}
};
let io_waker = Arc::new(IoWaker {
token,
readiness: AtomicUsize::new(0),
read_waker: AtomicWaker::new(),
write_waker: AtomicWaker::new(),
});
inner.poll.register(resource, token, interest, opts)?;
inner.resources.lock().insert(token, Arc::clone(&io_waker));
Ok(io_waker)
}
None => Err(io::Error::new(io::ErrorKind::Other, "No Reactor")),
}
}
pub fn reregister<E: Evented>(
&self,
resource: &E,
io_waker: &IoWaker,
interest: Ready,
opts: PollOpt,
) -> io::Result<()> {
match self.0.upgrade() {
Some(inner) => inner
.poll
.reregister(resource, io_waker.token, interest, opts),
None => Err(io::Error::new(io::ErrorKind::Other, "No Reactor")),
}
}
pub fn deregister<E: Evented>(&self, resource: &E, io_waker: &IoWaker) -> io::Result<()> {
match self.0.upgrade() {
Some(inner) => {
inner.poll.deregister(resource)?;
inner.resources.lock().remove(&io_waker.token);
inner.tokens.push(io_waker.token);
Ok(())
}
None => Err(io::Error::new(io::ErrorKind::Other, "No Reactor")),
}
}
}
#[derive(Debug)]
struct Shared {
poll: Poll,
tokens: SegQueue<Token>,
current_token: AtomicUsize,
resources: Mutex<HashMap<Token, Arc<IoWaker>>>,
}
#[derive(Debug)]
pub struct IoWaker {
token: Token,
readiness: AtomicUsize,
read_waker: AtomicWaker,
write_waker: AtomicWaker,
}
impl IoWaker {
fn wake_if_ready(&self, ready: Ready) {
self.readiness.fetch_or(ready.as_usize(), Ordering::AcqRel);
if ready.is_readable() {
self.read_waker.wake();
}
if ready.is_writable() {
self.write_waker.wake();
}
}
fn register_read(&self, waker: &Waker) {
self.read_waker.register(waker)
}
fn register_write(&self, waker: &Waker) {
self.write_waker.register(waker)
}
fn clear_read(&self) {
self.readiness
.fetch_and(!Ready::readable().as_usize(), Ordering::AcqRel);
}
fn clear_write(&self) {
self.readiness
.fetch_and(!Ready::writable().as_usize(), Ordering::AcqRel);
}
}