use crate::reactor::platform;
use crate::reactor::registration::Registration;
use futures::io::{AsyncRead, AsyncWrite};
use futures::{ready, Poll};
use mio;
use mio::event::Evented;
use std::fmt;
use std::io::{self, Read, Write};
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::task::Context;
/// Pairs an [`Evented`] I/O value with the reactor registration and cached
/// readiness state needed to drive it from `poll_*` style APIs.
///
/// The `E: Evented` bound lives on the struct itself because the `Drop`
/// implementation is bounded by it, and a `Drop` impl may not add bounds the
/// type definition lacks.
pub struct PollEvented<E: Evented> {
/// The wrapped I/O value. Stored as `Some` by `new`; the `Drop` impl takes
/// it out so it can be deregistered before being dropped.
io: Option<E>,
/// Registration handle plus the cached read/write readiness bits.
inner: Inner,
}
// `PollEvented` is declared `Unpin` for every `E`, including `E: !Unpin`.
// The `AsyncRead`/`AsyncWrite` impls depend on this: they re-pin with
// `Pin::new(&mut *self)` and take `&mut` access through `Pin<&mut Self>`,
// both of which require the target to be `Unpin`.
impl<E: Evented> Unpin for PollEvented<E> {}
/// Reactor-side state of a `PollEvented`.
struct Inner {
/// Handle used to register/deregister the I/O value and to poll or take
/// readiness events for it.
registration: Registration,
/// Read-direction readiness seen so far, stored as the `usize` encoding of
/// `mio::Ready` (`as_usize` / `from_usize`).
read_readiness: AtomicUsize,
/// Write-direction readiness seen so far, in the same `usize` encoding.
write_readiness: AtomicUsize,
}
impl<E> PollEvented<E>
where
E: Evented,
{
pub fn new(io: E) -> PollEvented<E> {
PollEvented {
io: Some(io),
inner: Inner {
registration: Registration::new(),
read_readiness: AtomicUsize::new(0),
write_readiness: AtomicUsize::new(0),
},
}
}
pub fn get_ref(&self) -> &E {
self.io.as_ref().unwrap()
}
pub fn get_mut(&mut self) -> &mut E {
self.io.as_mut().unwrap()
}
pub fn poll_read_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<mio::Ready>> {
self.register()?;
let mut cached = self.inner.read_readiness.load(Relaxed);
let mask = mio::Ready::readable() | platform::hup();
let mut ret = mio::Ready::from_usize(cached) & mio::Ready::readable();
if ret.is_empty() {
loop {
let ready = ready!(self.inner.registration.poll_read_ready(cx)?);
cached |= ready.as_usize();
self.inner.read_readiness.store(cached, Relaxed);
ret |= ready & mask;
if !ret.is_empty() {
return Poll::Ready(Ok(ret));
}
}
} else {
if let Some(ready) = self.inner.registration.take_read_ready()? {
cached |= ready.as_usize();
self.inner.read_readiness.store(cached, Relaxed);
}
Poll::Ready(Ok(mio::Ready::from_usize(cached)))
}
}
pub fn clear_read_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Result<()> {
self.inner
.read_readiness
.fetch_and(!mio::Ready::readable().as_usize(), Relaxed);
if self.poll_read_ready(cx)?.is_ready() {
cx.waker().wake_by_ref();
}
Ok(())
}
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<Result<mio::Ready, io::Error>> {
self.register()?;
let mut cached = self.inner.write_readiness.load(Relaxed);
let mask = mio::Ready::writable() | platform::hup();
let mut ret = mio::Ready::from_usize(cached) & mio::Ready::writable();
if ret.is_empty() {
loop {
let ready = ready!(self.inner.registration.poll_write_ready(cx)?);
cached |= ready.as_usize();
self.inner.write_readiness.store(cached, Relaxed);
ret |= ready & mask;
if !ret.is_empty() {
return Poll::Ready(Ok(ret));
}
}
} else {
if let Some(ready) = self.inner.registration.take_write_ready()? {
cached |= ready.as_usize();
self.inner.write_readiness.store(cached, Relaxed);
}
Poll::Ready(Ok(mio::Ready::from_usize(cached)))
}
}
pub fn clear_write_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> io::Result<()> {
self.inner
.write_readiness
.fetch_and(!mio::Ready::writable().as_usize(), Relaxed);
if self.poll_write_ready(cx)?.is_ready() {
cx.waker().wake_by_ref();
}
Ok(())
}
fn register(&self) -> io::Result<()> {
self.inner
.registration
.register(self.io.as_ref().unwrap())?;
Ok(())
}
}
impl<E> AsyncRead for PollEvented<E>
where
    E: Evented + Read,
{
    /// Waits for read readiness, then performs one `read` on the wrapped
    /// I/O value.
    ///
    /// A `WouldBlock` result clears the cached read readiness and yields
    /// `Poll::Pending`; any other result (success or error) is returned as
    /// `Poll::Ready`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        ready!(Pin::new(&mut *self).poll_read_ready(cx)?);

        // Spelled with an explicit path: `self.get_mut()` would resolve to
        // `Pin::get_mut` instead of `PollEvented::get_mut`.
        let outcome = PollEvented::get_mut(&mut *self).read(buf);
        if !is_wouldblock(&outcome) {
            return Poll::Ready(outcome);
        }

        self.clear_read_ready(cx)?;
        Poll::Pending
    }
}
impl<E> AsyncWrite for PollEvented<E>
where
    E: Evented + Write,
{
    /// Waits for write readiness, then performs one `write` on the wrapped
    /// I/O value.
    ///
    /// A `WouldBlock` result clears the cached write readiness and yields
    /// `Poll::Pending`; any other result is returned as `Poll::Ready`.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        ready!(self.poll_write_ready(cx)?);

        // Explicit path so this is not resolved as `Pin::get_mut`.
        let outcome = PollEvented::get_mut(&mut *self).write(buf);
        if !is_wouldblock(&outcome) {
            return Poll::Ready(outcome);
        }

        self.clear_write_ready(cx)?;
        Poll::Pending
    }

    /// Waits for write readiness, then calls `flush` on the wrapped I/O
    /// value, treating `WouldBlock` the same way `poll_write` does.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        ready!(self.poll_write_ready(cx)?);

        // Explicit path so this is not resolved as `Pin::get_mut`.
        let outcome = PollEvented::get_mut(&mut *self).flush();
        if !is_wouldblock(&outcome) {
            return Poll::Ready(outcome);
        }

        self.clear_write_ready(cx)?;
        Poll::Pending
    }

    /// Reports completion immediately; no I/O is performed on close.
    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let done: io::Result<()> = Ok(());
        Poll::Ready(done)
    }
}
/// Returns `true` only when `r` is an `Err` whose kind is
/// `io::ErrorKind::WouldBlock`; `Ok` values and all other errors give `false`.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |e| e.kind() == io::ErrorKind::WouldBlock)
}
impl<E> fmt::Debug for PollEvented<E>
where
    E: Evented + fmt::Debug,
{
    /// Formats as a `PollEvented` struct showing only the `io` field (an
    /// `Option<E>`); the registration and readiness caches are omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("PollEvented");
        out.field("io", &self.io);
        out.finish()
    }
}
impl<E> Drop for PollEvented<E>
where
    E: Evented,
{
    /// Takes the wrapped I/O value out and deregisters it before it is
    /// dropped. Deregistration is best-effort: its result is deliberately
    /// ignored, since `drop` has no way to report an error.
    fn drop(&mut self) {
        let io = match self.io.take() {
            Some(io) => io,
            // Already taken: nothing left to deregister.
            None => return,
        };
        let _ = self.inner.registration.deregister(&io);
    }
}