use {Handle, Direction};
use futures::{Async, Poll};
use futures::task::{self, Task};
use mio::{self, Evented};
use std::{io, mem, usize};
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
#[derive(Debug)]
pub struct Registration {
inner: UnsafeCell<Option<Inner>>,
state: AtomicUsize,
}
#[derive(Debug)]
struct Inner {
handle: Handle,
token: usize,
}
#[derive(Debug)]
struct Node {
direction: Direction,
task: Task,
next: Option<Box<Node>>,
}
const INIT: usize = 0;
const LOCKED: usize = 1;
const READY: usize = 2;
const LIFECYCLE_MASK: usize = 0b11;
const ERROR: usize = usize::MAX;
impl Registration {
pub fn new() -> Registration {
Registration {
inner: UnsafeCell::new(None),
state: AtomicUsize::new(INIT),
}
}
pub fn register<T>(&self, io: &T) -> io::Result<bool>
where T: Evented,
{
self.register2(io, || Handle::try_current())
}
pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
where T: Evented,
{
if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
inner.deregister(io)?;
}
Ok(())
}
pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
where T: Evented,
{
self.register2(io, || Ok(handle.clone()))
}
fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
where T: Evented,
F: Fn() -> io::Result<Handle>,
{
let mut state = self.state.load(SeqCst);
loop {
match state {
INIT => {
let handle = f()?;
let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);
if actual != state {
state = actual;
continue;
}
let (inner, res) = Inner::new(io, handle);
unsafe { *self.inner.get() = Some(inner); }
let actual = self.state.swap(READY, SeqCst);
let ptr = actual & !LIFECYCLE_MASK;
if ptr != 0 {
let mut read = false;
let mut write = false;
let mut curr = unsafe { Box::from_raw(ptr as *mut Node) };
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
loop {
let node = *curr;
let Node {
direction,
task,
next,
} = node;
let flag = match direction {
Direction::Read => &mut read,
Direction::Write => &mut write,
};
if !*flag {
*flag = true;
inner.register(direction, task);
}
match next {
Some(next) => curr = next,
None => break,
}
}
}
return res.map(|_| true);
}
_ => return Ok(false),
}
}
}
pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Read, true)
.map(|v| match v {
Some(v) => Async::Ready(v),
_ => Async::NotReady,
})
}
pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Read, false)
}
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Write, true)
.map(|v| match v {
Some(v) => Async::Ready(v),
_ => Async::NotReady,
})
}
pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Write, false)
}
fn poll_ready(&self, direction: Direction, notify: bool)
-> io::Result<Option<mio::Ready>>
{
let mut state = self.state.load(SeqCst);
let mut node = None;
loop {
match state {
INIT => {
return Err(io::Error::new(io::ErrorKind::Other, "must call `register`
before poll_read_ready"));
}
READY => {
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
return inner.poll_ready(direction, notify);
}
_ => {
if !notify {
return Ok(None);
}
let ptr = state & !LIFECYCLE_MASK;
let mut n = node.take().unwrap_or_else(|| {
Box::new(Node {
direction,
task: task::current(),
next: None,
})
});
n.next = if ptr == 0 {
None
} else {
Some(unsafe { Box::from_raw(ptr as *mut Node) })
};
let ptr = Box::into_raw(n);
let next = ptr as usize | (state & LIFECYCLE_MASK);
let actual = self.state.compare_and_swap(state, next, SeqCst);
if actual != state {
let mut n = unsafe { Box::from_raw(ptr) };
mem::forget(n.next.take());
node = Some(n);
state = actual;
continue;
}
return Ok(None);
}
}
}
}
}
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
impl Inner {
fn new<T>(io: &T, handle: Handle) -> (Self, io::Result<()>)
where T: Evented,
{
let mut res = Ok(());
let token = match handle.inner() {
Some(inner) => match inner.add_source(io) {
Ok(token) => token,
Err(e) => {
res = Err(e);
ERROR
}
},
None => {
res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
ERROR
}
};
let inner = Inner {
handle,
token,
};
(inner, res)
}
fn register(&self, direction: Direction, task: Task) {
if self.token == ERROR {
task.notify();
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => {
task.notify();
return;
}
};
inner.register(self.token, direction, task);
}
fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
if self.token == ERROR {
return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
}
fn poll_ready(&self, direction: Direction, notify: bool)
-> io::Result<Option<mio::Ready>>
{
if self.token == ERROR {
return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
let mask = direction.mask();
let mask_no_hup = (mask - ::platform::hup()).as_usize();
let io_dispatch = inner.io_dispatch.read().unwrap();
let sched = &io_dispatch[self.token];
let mut ready = mask & mio::Ready::from_usize(
sched.readiness.fetch_and(!mask_no_hup, SeqCst));
if ready.is_empty() && notify {
match direction {
Direction::Read => sched.reader.register(),
Direction::Write => sched.writer.register(),
}
ready = mask & mio::Ready::from_usize(
sched.readiness.fetch_and(!mask_no_hup, SeqCst));
}
if ready.is_empty() {
Ok(None)
} else {
Ok(Some(ready))
}
}
}
impl Drop for Inner {
fn drop(&mut self) {
if self.token == ERROR {
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
inner.drop_source(self.token);
}
}