use super::{Direction, HandlePriv};
use mio::{self, Evented};
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::task::{Context, Poll, Waker};
use std::{io, ptr, usize};
/// Associates an I/O resource with a reactor and tracks readiness for it.
///
/// The association is created by `register`; until then `inner` is `None`
/// and `state` is `INIT`.
#[derive(Debug)]
pub(crate) struct Registration {
/// Reactor handle and token; written by `register2` while the lifecycle is
/// `LOCKED` and read once the lifecycle is `READY`.
inner: UnsafeCell<Option<Inner>>,
/// Packed word: the low bits selected by `LIFECYCLE_MASK` hold the lifecycle
/// (`INIT`, `LOCKED` or `READY`); the remaining bits hold a pointer to a
/// stack of `Node`s queued by tasks that polled while `LOCKED`.
state: AtomicUsize,
}
/// The established link between a `Registration` and a reactor.
#[derive(Debug)]
struct Inner {
/// Handle whose `inner()` yields the reactor, or `None` once it is gone.
handle: HandlePriv,
/// Token returned by the reactor's `add_source`, or `ERROR` when the
/// association could not be made.
token: usize,
}
/// A wake-up request queued by a task that polled for readiness while the
/// registration was still being initialised (lifecycle `LOCKED`).
///
/// Nodes form a singly linked stack whose head pointer is packed into the
/// non-lifecycle bits of `Registration::state`.
#[derive(Debug)]
struct Node {
    /// Readiness direction the queued task is waiting for.
    direction: Direction,
    /// Owned waker for the queued task.
    ///
    /// This is a clone rather than a pointer to the waker borrowed from the
    /// task's `Context`: that borrow is only valid for the duration of the
    /// `poll` call, while the node is consumed later by `register2`, possibly
    /// on another thread.
    waker: Waker,
    /// Next node in the stack, or null at the bottom.
    next: *mut Node,
}

/// Lifecycle: no reactor has been associated yet.
const INIT: usize = 0;
/// Lifecycle: a thread is inside `register2` creating the association; other
/// tasks may queue `Node`s in the meantime.
const LOCKED: usize = 1;
/// Lifecycle: `Registration::inner` is populated and may be read.
const READY: usize = 2;
/// Selects the lifecycle bits of `Registration::state`; the remaining bits
/// carry the head pointer of the `Node` stack.
const LIFECYCLE_MASK: usize = 0b11;
/// Sentinel token stored in `Inner` when associating with the reactor failed.
const ERROR: usize = usize::MAX;

impl Registration {
    /// Creates a registration that is not yet associated with any reactor.
    pub fn new() -> Registration {
        Registration {
            inner: UnsafeCell::new(None),
            state: AtomicUsize::new(INIT),
        }
    }

    /// Associates `io` with the reactor returned by `HandlePriv::try_current`.
    ///
    /// Returns `Ok(true)` when this call created the association and
    /// `Ok(false)` when the registration had already left the `INIT` state.
    ///
    /// # Errors
    ///
    /// Returns the error from obtaining the handle, or the error produced
    /// while adding `io` to the reactor. In the latter case the registration
    /// still becomes `READY`, and later polls report the failure.
    pub fn register(&self, io: &impl Evented) -> io::Result<bool> {
        self.register2(io, || HandlePriv::try_current())
    }

    /// Removes `io` from the reactor it was registered with.
    ///
    /// Does nothing and returns `Ok(())` if `register` was never completed.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Inner::deregister`.
    pub fn deregister(&mut self, io: &impl Evented) -> io::Result<()> {
        // SAFETY: `&mut self` rules out any concurrent access to `inner`.
        if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
            inner.deregister(io)?;
        }
        Ok(())
    }

    /// Performs the `INIT -> LOCKED -> READY` transition, using `f` to obtain
    /// the reactor handle, and then registers the wakers queued meanwhile.
    fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
    where
        T: Evented,
        F: Fn() -> io::Result<HandlePriv>,
    {
        let mut state = self.state.load(SeqCst);
        loop {
            if state != INIT {
                // Somebody else is performing, or has performed, the
                // association.
                return Ok(false);
            }

            // Obtain the handle before taking the lock so that a failure
            // leaves the registration untouched.
            let handle = f()?;
            if let Err(actual) = self.state.compare_exchange(INIT, LOCKED, SeqCst, SeqCst) {
                state = actual;
                continue;
            }

            let (inner, res) = Inner::new(io, handle);
            // SAFETY: the state is `LOCKED` and this thread is the one that
            // locked it; readers only touch `inner` once the state is `READY`.
            unsafe {
                *self.inner.get() = Some(inner);
            }

            // Leaving the locked state also hands us the stack of nodes that
            // were queued while we held it.
            let previous = self.state.swap(READY, SeqCst);
            let mut cursor = (previous & !LIFECYCLE_MASK) as *mut Node;

            // SAFETY: `inner` was written just above and is never replaced
            // through a shared reference afterwards.
            let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };

            // Only the first waker seen for each direction is registered.
            let mut read = false;
            let mut write = false;
            while !cursor.is_null() {
                // SAFETY: every non-null pointer in the stack came from
                // `Box::into_raw` in `poll_ready`, and the `swap` above gave
                // this thread exclusive ownership of the whole stack.
                let Node {
                    direction,
                    waker,
                    next,
                } = *unsafe { Box::from_raw(cursor) };

                let seen = match direction {
                    Direction::Read => &mut read,
                    Direction::Write => &mut write,
                };
                if !*seen {
                    *seen = true;
                    let mut cx = Context::from_waker(&waker);
                    inner.register(&mut cx, direction);
                }
                cursor = next;
            }

            return res.map(|_| true);
        }
    }

    /// Polls for read readiness, arranging for the task in `cx` to be woken
    /// when no readiness is currently available.
    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
        match self.poll_ready(Some(cx), Direction::Read) {
            Ok(Some(v)) => Poll::Ready(Ok(v)),
            Ok(None) => Poll::Pending,
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    /// Returns the current read readiness, if any, without registering a
    /// waker.
    pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
        self.poll_ready(None, Direction::Read)
    }

    /// Polls for write readiness, arranging for the task in `cx` to be woken
    /// when no readiness is currently available.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
        match self.poll_ready(Some(cx), Direction::Write) {
            Ok(Some(v)) => Poll::Ready(Ok(v)),
            Ok(None) => Poll::Pending,
            Err(e) => Poll::Ready(Err(e)),
        }
    }

    /// Returns the current write readiness, if any, without registering a
    /// waker.
    pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
        self.poll_ready(None, Direction::Write)
    }

    /// Shared implementation of the `poll_*_ready` / `take_*_ready` methods.
    ///
    /// Returns `Ok(None)` when the resource is not ready. While another
    /// thread is still inside `register2`, a node carrying a clone of the
    /// task's waker is pushed onto the stack in `state` (only when `cx` is
    /// `Some`).
    ///
    /// # Errors
    ///
    /// Fails when `register` has not been called yet, or with whatever
    /// `Inner::poll_ready` reports.
    fn poll_ready(
        &self,
        cx: Option<&mut Context<'_>>,
        direction: Direction,
    ) -> io::Result<Option<mio::Ready>> {
        let mut state = self.state.load(SeqCst);
        // Node allocated by a previous iteration whose push lost the race.
        let mut node = None;
        loop {
            // Only the lifecycle bits select the branch: while `LOCKED`, the
            // remaining bits hold the stack head and are non-zero as soon as
            // one waiter has been queued.
            match state & LIFECYCLE_MASK {
                INIT => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "must call `register`\nbefore poll_read_ready",
                    ));
                }
                READY => {
                    // SAFETY: `inner` is written before the state becomes
                    // `READY` and is not modified through `&self` afterwards.
                    let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
                    return inner.poll_ready(cx, direction);
                }
                LOCKED => {
                    // Without a task to notify there is nothing to queue.
                    let waker = match cx.as_ref() {
                        Some(cx) => cx.waker(),
                        None => return Ok(None),
                    };

                    let mut n = node.take().unwrap_or_else(|| {
                        Box::new(Node {
                            direction,
                            waker: waker.clone(),
                            next: ptr::null_mut(),
                        })
                    });
                    n.next = (state & !LIFECYCLE_MASK) as *mut Node;

                    let node_ptr = Box::into_raw(n);
                    let next = node_ptr as usize | (state & LIFECYCLE_MASK);
                    match self.state.compare_exchange(state, next, SeqCst, SeqCst) {
                        // The node now belongs to the stack; `register2`
                        // frees it.
                        Ok(_) => return Ok(None),
                        Err(actual) => {
                            // SAFETY: the push failed, so `node_ptr` was
                            // never published and is still uniquely ours.
                            node = Some(unsafe { Box::from_raw(node_ptr) });
                            state = actual;
                        }
                    }
                }
                _ => unreachable!(),
            }
        }
    }
}
// NOTE(review): these impls are needed because `UnsafeCell` is not `Sync`.
// Soundness appears to rely on `inner` being written only by the thread that
// moved `state` to `LOCKED`, and read through `&self` only after `state` is
// `READY` — confirm that `HandlePriv` itself is `Send + Sync`.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
impl Inner {
fn new(io: &impl Evented, handle: HandlePriv) -> (Self, io::Result<()>) {
let mut res = Ok(());
let token = match handle.inner() {
Some(inner) => match inner.add_source(io) {
Ok(token) => token,
Err(e) => {
res = Err(e);
ERROR
}
},
None => {
res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
ERROR
}
};
let inner = Inner { handle, token };
(inner, res)
}
fn register(&self, cx: &mut Context<'_>, direction: Direction) {
if self.token == ERROR {
cx.waker().wake_by_ref();
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => {
cx.waker().wake_by_ref();
return;
}
};
inner.register(cx, self.token, direction);
}
fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
if self.token == ERROR {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to associate with reactor",
));
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
}
fn poll_ready(
&self,
cx: Option<&mut Context<'_>>,
direction: Direction,
) -> io::Result<Option<mio::Ready>> {
if self.token == ERROR {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to associate with reactor",
));
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
let mask = direction.mask();
let mask_no_hup = (mask - super::platform::hup()).as_usize();
let io_dispatch = inner.io_dispatch.read();
let sched = &io_dispatch[self.token];
let mut ready =
mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));
if ready.is_empty() && cx.is_some() {
let cx = cx.unwrap();
match direction {
Direction::Read => sched.reader.register(&cx.waker()),
Direction::Write => sched.writer.register(&cx.waker()),
}
ready = mask & mio::Ready::from_usize(sched.readiness.fetch_and(!mask_no_hup, SeqCst));
}
if ready.is_empty() {
Ok(None)
} else {
Ok(Some(ready))
}
}
}
impl Drop for Inner {
fn drop(&mut self) {
if self.token == ERROR {
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
inner.drop_source(self.token);
}
}