use {Handle, Registration};
use futures::{task, Async, Poll};
use mio;
use mio::event::Evented;
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "unstable-futures")]
use futures2;
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
pub struct PollEvented<E: Evented> {
io: Option<E>,
inner: Inner,
}
struct Inner {
registration: Registration,
read_readiness: AtomicUsize,
write_readiness: AtomicUsize,
}
macro_rules! poll_ready {
($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
$me.register()?;
let mut cached = $me.inner.$cache.load(Relaxed);
let mask = $mask | ::platform::hup();
let mut ret = mio::Ready::from_usize(cached) & $mask;
if ret.is_empty() {
loop {
let ready = try_ready!($poll);
cached |= ready.as_usize();
$me.inner.$cache.store(cached, Relaxed);
ret |= ready & mask;
if !ret.is_empty() {
return Ok(ret.into());
}
}
} else {
if let Some(ready) = $me.inner.registration.$take()? {
cached |= ready.as_usize();
$me.inner.$cache.store(cached, Relaxed);
}
Ok(mio::Ready::from_usize(cached).into())
}
}}
}
impl<E> PollEvented<E>
where E: Evented
{
pub fn new(io: E) -> PollEvented<E> {
PollEvented {
io: Some(io),
inner: Inner {
registration: Registration::new(),
read_readiness: AtomicUsize::new(0),
write_readiness: AtomicUsize::new(0),
}
}
}
pub fn new_with_handle(io: E, handle: &Handle) -> io::Result<Self> {
let ret = PollEvented::new(io);
if let Some(handle) = handle.as_priv() {
ret.inner.registration
.register_with_priv(ret.io.as_ref().unwrap(), handle)?;
}
Ok(ret)
}
pub fn get_ref(&self) -> &E {
self.io.as_ref().unwrap()
}
pub fn get_mut(&mut self) -> &mut E {
self.io.as_mut().unwrap()
}
pub fn into_inner(mut self) -> io::Result<E> {
let io = self.io.take().unwrap();
self.inner.registration.deregister(&io)?;
Ok(io)
}
pub fn poll_read_ready(&self, mask: mio::Ready) -> Poll<mio::Ready, io::Error> {
assert!(!mask.is_writable(), "cannot poll for write readiness");
poll_ready!(
self, mask, read_readiness, take_read_ready,
self.inner.registration.poll_read_ready()
)
}
#[cfg(feature = "unstable-futures")]
pub fn poll_read_ready2(&self, cx: &mut futures2::task::Context, mask: mio::Ready)
-> futures2::Poll<mio::Ready, io::Error>
{
assert!(!mask.is_writable(), "cannot poll for write readiness");
let mut res = || poll_ready!(
self, mask, read_readiness, take_read_ready,
self.inner.registration.poll_read_ready2(cx).map(::lower_async)
);
res().map(::lift_async)
}
pub fn clear_read_ready(&self, ready: mio::Ready) -> io::Result<()> {
assert!(!ready.is_writable(), "cannot clear write readiness");
assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness");
self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_read_ready(ready)?.is_ready() {
task::current().notify();
}
Ok(())
}
#[cfg(feature = "unstable-futures")]
pub fn clear_read_ready2(&self, cx: &mut futures2::task::Context, ready: mio::Ready)
-> io::Result<()>
{
assert!(!ready.is_writable(), "cannot clear write readiness");
assert!(!::platform::is_hup(&ready), "cannot clear HUP readiness");
self.inner.read_readiness.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_read_ready2(cx, ready)?.is_ready() {
cx.waker().wake()
}
Ok(())
}
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
poll_ready!(
self,
mio::Ready::writable(),
write_readiness,
take_write_ready,
self.inner.registration.poll_write_ready()
)
}
#[cfg(feature = "unstable-futures")]
pub fn poll_write_ready2(&self, cx: &mut futures2::task::Context)
-> futures2::Poll<mio::Ready, io::Error>
{
let mut res = || poll_ready!(
self,
mio::Ready::writable(),
write_readiness,
take_write_ready,
self.inner.registration.poll_write_ready2(cx).map(::lower_async)
);
res().map(::lift_async)
}
pub fn clear_write_ready(&self) -> io::Result<()> {
let ready = mio::Ready::writable();
self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_write_ready()?.is_ready() {
task::current().notify();
}
Ok(())
}
#[cfg(feature = "unstable-futures")]
pub fn clear_write_ready2(&self, cx: &mut futures2::task::Context) -> io::Result<()> {
let ready = mio::Ready::writable();
self.inner.write_readiness.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_write_ready2(cx)?.is_ready() {
cx.waker().wake()
}
Ok(())
}
fn register(&self) -> io::Result<()> {
self.inner.registration.register(self.io.as_ref().unwrap())?;
Ok(())
}
}
impl<E> Read for PollEvented<E>
where E: Evented + Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().read(buf);
if is_wouldblock(&r) {
self.clear_read_ready(mio::Ready::readable())?;
}
return r
}
}
#[cfg(feature = "unstable-futures")]
impl<E> futures2::io::AsyncRead for PollEvented<E>
where E: Evented, E: Read,
{
fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
-> futures2::Poll<usize, io::Error>
{
if let futures2::Async::Pending = self.poll_read_ready2(cx, mio::Ready::readable())? {
return Ok(futures2::Async::Pending);
}
match self.get_mut().read(buf) {
Ok(n) => Ok(futures2::Async::Ready(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_read_ready2(cx, mio::Ready::readable())?;
Ok(futures2::Async::Pending)
}
Err(e) => Err(e),
}
}
}
impl<E> Write for PollEvented<E>
where E: Evented + Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().write(buf);
if is_wouldblock(&r) {
self.clear_write_ready()?;
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().flush();
if is_wouldblock(&r) {
self.clear_write_ready()?;
}
return r
}
}
#[cfg(feature = "unstable-futures")]
impl<E> futures2::io::AsyncWrite for PollEvented<E>
where E: Evented, E: Write,
{
fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8])
-> futures2::Poll<usize, io::Error>
{
if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
return Ok(futures2::Async::Pending);
}
match self.get_mut().write(buf) {
Ok(n) => Ok(futures2::Async::Ready(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_write_ready2(cx)?;
Ok(futures2::Async::Pending)
}
Err(e) => Err(e),
}
}
fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
return Ok(futures2::Async::Pending);
}
match self.get_mut().flush() {
Ok(_) => Ok(futures2::Async::Ready(())),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_write_ready2(cx)?;
Ok(futures2::Async::Pending)
}
Err(e) => Err(e),
}
}
fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
futures2::io::AsyncWrite::poll_flush(self, cx)
}
}
impl<E> AsyncRead for PollEvented<E>
where E: Evented + Read,
{
}
impl<E> AsyncWrite for PollEvented<E>
where E: Evented + Write,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
impl<'a, E> Read for &'a PollEvented<E>
where E: Evented, &'a E: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read_ready(mio::Ready::readable())? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().read(buf);
if is_wouldblock(&r) {
self.clear_read_ready(mio::Ready::readable())?;
}
return r
}
}
#[cfg(feature = "unstable-futures")]
impl<'a, E> futures2::io::AsyncRead for &'a PollEvented<E>
where E: Evented, &'a E: Read,
{
fn poll_read(&mut self, cx: &mut futures2::task::Context, buf: &mut [u8])
-> futures2::Poll<usize, io::Error>
{
if let futures2::Async::Pending = self.poll_read_ready2(cx, mio::Ready::readable())? {
return Ok(futures2::Async::Pending);
}
match self.get_ref().read(buf) {
Ok(n) => Ok(futures2::Async::Ready(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_read_ready2(cx, mio::Ready::readable())?;
Ok(futures2::Async::Pending)
}
Err(e) => Err(e),
}
}
}
impl<'a, E> Write for &'a PollEvented<E>
where E: Evented, &'a E: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().write(buf);
if is_wouldblock(&r) {
self.clear_write_ready()?;
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().flush();
if is_wouldblock(&r) {
self.clear_write_ready()?;
}
return r
}
}
#[cfg(feature = "unstable-futures")]
impl<'a, E> futures2::io::AsyncWrite for &'a PollEvented<E>
where E: Evented, &'a E: Write,
{
fn poll_write(&mut self, cx: &mut futures2::task::Context, buf: &[u8])
-> futures2::Poll<usize, io::Error>
{
if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
return Ok(futures2::Async::Pending);
}
match self.get_ref().write(buf) {
Ok(n) => Ok(futures2::Async::Ready(n)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_write_ready2(cx)?;
Ok(futures2::Async::Pending)
}
Err(e) => Err(e),
}
}
fn poll_flush(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
if let futures2::Async::Pending = self.poll_write_ready2(cx)? {
return Ok(futures2::Async::Pending);
}
match self.get_ref().flush() {
Ok(_) => Ok(futures2::Async::Ready(())),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_write_ready2(cx)?;
Ok(futures2::Async::Pending)
}
Err(e) => Err(e),
}
}
fn poll_close(&mut self, cx: &mut futures2::task::Context) -> futures2::Poll<(), io::Error> {
futures2::io::AsyncWrite::poll_flush(self, cx)
}
}
impl<'a, E> AsyncRead for &'a PollEvented<E>
where E: Evented, &'a E: Read,
{
}
impl<'a, E> AsyncWrite for &'a PollEvented<E>
where E: Evented, &'a E: Write,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl<E: Evented + fmt::Debug> fmt::Debug for PollEvented<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PollEvented")
.field("io", &self.io)
.finish()
}
}
impl<E: Evented> Drop for PollEvented<E> {
fn drop(&mut self) {
if let Some(io) = self.io.take() {
let _ = self.inner.registration.deregister(&io);
}
}
}