use futures::ready;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use tokio::io::unix::{AsyncFd, TryIoError};
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use super::event_err;
use super::{LineEvent, LineEventHandle, Result};
pub struct AsyncLineEventHandle {
asyncfd: AsyncFd<LineEventHandle>,
}
impl AsyncLineEventHandle {
pub fn new(handle: LineEventHandle) -> Result<AsyncLineEventHandle> {
let fd = handle.file.as_raw_fd();
unsafe {
let flags = libc::fcntl(fd, libc::F_GETFL, 0);
libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
}
Ok(AsyncLineEventHandle {
asyncfd: AsyncFd::new(handle)?,
})
}
}
impl Stream for AsyncLineEventHandle {
type Item = Result<LineEvent>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
let mut guard = ready!(self.asyncfd.poll_read_ready_mut(cx))?;
match guard.try_io(|inner| inner.get_mut().read_event()) {
Err(TryIoError { .. }) => {
}
Ok(Ok(Some(event))) => return Poll::Ready(Some(Ok(event))),
Ok(Ok(None)) => return Poll::Ready(Some(Err(event_err(nix::errno::Errno::EIO)))),
Ok(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
}
}
}
}
impl AsRef<LineEventHandle> for AsyncLineEventHandle {
fn as_ref(&self) -> &LineEventHandle {
self.asyncfd.get_ref()
}
}