extern crate mio;
extern crate tokio_io;
use std::{
io,
ops::Deref,
sync::Arc,
};
use self::{
mio::{
event::Evented,
unix::EventedFd,
},
tokio_io::AsyncRead,
};
use futures::{
Async,
Poll,
Stream,
};
use tokio_reactor::{
Handle,
PollEvented,
};
use events::{
Event,
EventOwned,
};
use fd_guard::FdGuard;
use util::read_into_buffer;
/// Stream of events read from a file descriptor into a caller-supplied buffer.
///
/// Implements `futures::Stream`, yielding one `EventOwned` per successful
/// poll. Bytes are read from the descriptor in bulk and then handed out one
/// event at a time.
pub struct EventStream<'buffer> {
// Descriptor guard wrapped in `PollEvented` so reads can be polled.
fd: PollEvented<EventedFdGuard>,
// Borrowed scratch space that each read fills from its start.
buffer: &'buffer mut [u8],
// Offset into `buffer` at which the next not-yet-yielded event begins.
buffer_pos: usize,
// Number of bytes from the last read that have not been consumed yet;
// zero means the buffer must be refilled before another event can be taken.
unused_bytes: usize,
}
impl<'buffer> EventStream<'buffer> {
    /// Creates a stream that reads from `fd` into `buffer`.
    ///
    /// The descriptor is wrapped with `PollEvented::new`, i.e. without an
    /// explicit reactor handle. The stream starts out with nothing buffered,
    /// so the first poll performs a read.
    pub(crate) fn new(fd: Arc<FdGuard>, buffer: &'buffer mut [u8]) -> Self {
        let fd = PollEvented::new(EventedFdGuard(fd));

        Self {
            fd,
            buffer,
            buffer_pos: 0,
            unused_bytes: 0,
        }
    }

    /// Creates a stream that reads from `fd` into `buffer`, associating the
    /// descriptor with the reactor behind `handle`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `PollEvented::new_with_handle` reports.
    pub(crate) fn new_with_handle(
        fd: Arc<FdGuard>,
        handle: &Handle,
        buffer: &'buffer mut [u8],
    ) -> io::Result<Self> {
        let fd = PollEvented::new_with_handle(EventedFdGuard(fd), handle)?;

        Ok(Self {
            fd,
            buffer,
            buffer_pos: 0,
            unused_bytes: 0,
        })
    }
}
impl<'buffer> Stream for EventStream<'buffer> {
type Item = EventOwned;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>
{
if self.unused_bytes == 0 {
self.buffer_pos = 0;
self.unused_bytes = try_ready!(self.fd.poll_read(&mut self.buffer));
}
if self.unused_bytes == 0 {
return Ok(Async::Ready(None));
}
let (bytes_consumed, event) = Event::from_buffer(
Arc::downgrade(self.fd.get_ref()),
&self.buffer[self.buffer_pos..],
);
self.buffer_pos += bytes_consumed;
self.unused_bytes -= bytes_consumed;
Ok(Async::Ready(Some(event.into_owned())))
}
}
/// Newtype around a shared `FdGuard`.
///
/// Exists so that `mio::event::Evented` and `io::Read` can be implemented for
/// the guard, which `PollEvented` needs in order to poll reads on it.
#[derive(Clone, Debug, PartialEq)]
struct EventedFdGuard(Arc<FdGuard>);
/// Registration with a mio `Poll`, delegated to `EventedFd` for the
/// descriptor held by the wrapped guard.
impl Evented for EventedFdGuard {
    /// Registers the guarded descriptor with `poll`.
    #[inline]
    fn register(
        &self,
        poll: &mio::Poll,
        token: mio::Token,
        interest: mio::Ready,
        opts: mio::PollOpt,
    ) -> io::Result<()> {
        let source = EventedFd(&self.0.fd);
        source.register(poll, token, interest, opts)
    }

    /// Updates the registration of the guarded descriptor with `poll`.
    #[inline]
    fn reregister(
        &self,
        poll: &mio::Poll,
        token: mio::Token,
        interest: mio::Ready,
        opts: mio::PollOpt,
    ) -> io::Result<()> {
        let source = EventedFd(&self.0.fd);
        source.reregister(poll, token, interest, opts)
    }

    /// Removes the guarded descriptor from `poll`.
    #[inline]
    fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
        let source = EventedFd(&self.0.fd);
        source.deregister(poll)
    }
}
impl io::Read for EventedFdGuard {
    /// Reads from the guarded descriptor into `buf` via `read_into_buffer`.
    ///
    /// A negative return value from the helper is treated as failure and
    /// reported as the last OS error; any other value is the byte count.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let bytes_read = read_into_buffer(self.fd, buf);

        if bytes_read < 0 {
            return Err(io::Error::last_os_error());
        }

        Ok(bytes_read as usize)
    }
}
impl Deref for EventedFdGuard {
type Target = Arc<FdGuard>;
#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}