use env::SubEnvironment;
use futures::{Async, Future, Poll, Sink, Stream};
use futures::stream::Fuse;
use futures::sync::mpsc::{channel, Receiver};
use futures_cpupool::{CpuFuture, CpuPool};
use io::FileDesc;
use mio::would_block;
use std::io::{BufRead, BufReader, Error as IoError, ErrorKind, Read, Result, Write};
use std::fmt;
use tokio_core::reactor::Remote;
use tokio_io::AsyncRead;
use void::Void;
/// An interface for performing reads and writes on file descriptors
/// asynchronously.
pub trait AsyncIoEnvironment {
/// Readable handle produced by `read_async`; must implement `AsyncRead`.
type Read: AsyncRead;
/// Future produced by `write_all`; resolves to `()` on success or to an
/// `IoError` on failure.
type WriteAll: Future<Item = (), Error = IoError>;
/// Takes ownership of `fd` and returns a `Self::Read` handle for it.
fn read_async(&mut self, fd: FileDesc) -> Self::Read;
/// Takes ownership of `fd` and `data` and returns a future representing
/// the write of `data` to `fd`.
fn write_all(&mut self, fd: FileDesc, data: Vec<u8>) -> Self::WriteAll;
/// Like `write_all`, but nothing is returned, so the caller receives no
/// completion notification and no error.
fn write_all_best_effort(&mut self, fd: FileDesc, data: Vec<u8>);
}
/// Forwarding implementation: a mutable reference to any
/// `AsyncIoEnvironment` is itself an `AsyncIoEnvironment`, with every
/// operation delegated to the referenced value.
impl<'a, T: ?Sized + AsyncIoEnvironment> AsyncIoEnvironment for &'a mut T {
    type Read = T::Read;
    type WriteAll = T::WriteAll;

    /// Delegates to `T::read_async` on the referenced environment.
    fn read_async(&mut self, fd: FileDesc) -> Self::Read {
        T::read_async(&mut **self, fd)
    }

    /// Delegates to `T::write_all` on the referenced environment.
    fn write_all(&mut self, fd: FileDesc, data: Vec<u8>) -> Self::WriteAll {
        T::write_all(&mut **self, fd, data)
    }

    /// Delegates to `T::write_all_best_effort` on the referenced environment.
    fn write_all_best_effort(&mut self, fd: FileDesc, data: Vec<u8>) {
        T::write_all_best_effort(&mut **self, fd, data)
    }
}
/// Reader handle whose wrapped implementation is selected at compile time
/// by target platform.
#[derive(Debug)]
pub struct PlatformSpecificRead(
// Unix targets wrap the reader type from `os::unix::env`.
#[cfg(unix)] ::os::unix::env::ReadAsync,
// All other targets wrap the `ReadAsync` type in scope in this file.
#[cfg(not(unix))] ReadAsync,
);
// Empty impl: no `AsyncRead` methods are overridden here, so only the
// trait's provided methods apply.
impl AsyncRead for PlatformSpecificRead {}
/// Reads are forwarded directly to the wrapped platform reader.
impl Read for PlatformSpecificRead {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        // Compile-time guard only: this call does nothing at runtime, but
        // it fails to build if the wrapped reader stops implementing
        // `AsyncRead`.
        fn require_async_read<R: AsyncRead>(_: &R) {}

        let inner = &mut self.0;
        require_async_read(&*inner);
        inner.read(buf)
    }
}
/// Write future whose wrapped implementation is selected at compile time
/// by target platform.
// NOTE(review): the lint is silenced presumably because one of the wrapped
// future types does not implement `Debug` — confirm, or add a manual impl.
#[allow(missing_debug_implementations)]
pub struct PlatformSpecificWriteAll(
// Unix targets wrap the write future from `os::unix::env`.
#[cfg(unix)] ::os::unix::env::WriteAll,
// All other targets wrap a `CpuFuture` that yields `()` or an `IoError`.
#[cfg(not(unix))] CpuFuture<(), IoError>,
);
impl Future for PlatformSpecificWriteAll {
type Item = ();
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
}
/// An async I/O environment whose backing implementation is selected at
/// compile time by target platform.
#[derive(Debug, Clone)]
pub struct PlatformSpecificAsyncIoEnv {
// Unix targets delegate to `EventedAsyncIoEnv` from `os::unix::env`.
#[cfg(unix)]
inner: ::os::unix::env::EventedAsyncIoEnv,
// All other targets delegate to `ThreadPoolAsyncIoEnv`.
#[cfg(not(unix))]
inner: ThreadPoolAsyncIoEnv,
}
impl PlatformSpecificAsyncIoEnv {
    /// Creates a new environment backed by the platform's implementation.
    ///
    /// * On Unix targets `remote` is handed to `EventedAsyncIoEnv::new` and
    ///   `fallback_num_threads` is ignored.
    /// * On all other targets `remote` is ignored and a
    ///   `ThreadPoolAsyncIoEnv` is created, sized by `fallback_num_threads`
    ///   when it is `Some`, or via `ThreadPoolAsyncIoEnv::new_num_cpus`
    ///   when it is `None`.
    pub fn new(remote: Remote, fallback_num_threads: Option<usize>) -> Self {
        #[cfg(unix)]
        fn build_inner(remote: Remote, _: Option<usize>) -> ::os::unix::env::EventedAsyncIoEnv {
            ::os::unix::env::EventedAsyncIoEnv::new(remote)
        }

        #[cfg(not(unix))]
        fn build_inner(_: Remote, num_threads: Option<usize>) -> ThreadPoolAsyncIoEnv {
            match num_threads {
                Some(count) => ThreadPoolAsyncIoEnv::new(count),
                None => ThreadPoolAsyncIoEnv::new_num_cpus(),
            }
        }

        PlatformSpecificAsyncIoEnv {
            inner: build_inner(remote, fallback_num_threads),
        }
    }
}
/// A sub-environment is simply a clone of the parent environment.
impl SubEnvironment for PlatformSpecificAsyncIoEnv {
    fn sub_env(&self) -> Self {
        Clone::clone(self)
    }
}
/// Every operation is delegated to the platform-selected `inner`
/// environment; results are wrapped in the matching `PlatformSpecific*`
/// newtype.
impl AsyncIoEnvironment for PlatformSpecificAsyncIoEnv {
    type Read = PlatformSpecificRead;
    type WriteAll = PlatformSpecificWriteAll;

    /// Wraps the inner environment's reader for `fd`.
    fn read_async(&mut self, fd: FileDesc) -> Self::Read {
        let reader = self.inner.read_async(fd);
        PlatformSpecificRead(reader)
    }

    /// Wraps the inner environment's write future for `fd` and `data`.
    fn write_all(&mut self, fd: FileDesc, data: Vec<u8>) -> Self::WriteAll {
        let pending_write = self.inner.write_all(fd, data);
        PlatformSpecificWriteAll(pending_write)
    }

    /// Forwards the best-effort write to the inner environment.
    fn write_all_best_effort(&mut self, fd: FileDesc, data: Vec<u8>) {
        let backend = &mut self.inner;
        backend.write_all_best_effort(fd, data);
    }
}
/// An I/O environment that owns a `CpuPool`; its `AsyncIoEnvironment`
/// implementation spawns reads and writes onto that pool.
#[derive(Clone)]
pub struct ThreadPoolAsyncIoEnv {
// Pool onto which the read and write closures are spawned.
pool: CpuPool, }
/// A sub-environment is simply a clone of the parent environment.
impl SubEnvironment for ThreadPoolAsyncIoEnv {
    fn sub_env(&self) -> Self {
        Clone::clone(self)
    }
}
impl ThreadPoolAsyncIoEnv {
    /// Creates an environment backed by `CpuPool::new(size)`.
    ///
    /// NOTE(review): `CpuPool::new` presumably rejects a `size` of zero —
    /// confirm against the futures-cpupool documentation.
    pub fn new(size: usize) -> Self {
        let pool = CpuPool::new(size);
        ThreadPoolAsyncIoEnv { pool: pool }
    }

    /// Creates an environment backed by `CpuPool::new_num_cpus()`.
    pub fn new_num_cpus() -> Self {
        let pool = CpuPool::new_num_cpus();
        ThreadPoolAsyncIoEnv { pool: pool }
    }
}
/// Manual `Debug` impl: the pool itself is not printed, only a `".."`
/// placeholder in its place.
impl fmt::Debug for ThreadPoolAsyncIoEnv {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("ThreadPoolAsyncIoEnv");
        out.field("pool", &"..");
        out.finish()
    }
}
/// Reader handle returned by `ThreadPoolAsyncIoEnv::read_async`; data
/// arrives in chunks over a channel.
pub struct ReadAsync {
// Handle to the task spawned on the pool; stored but never polled in this file.
_cpu_future: CpuFuture<(), Void>,
// Fused receiving end of the channel that delivers chunks of bytes.
rx: Fuse<Receiver<Vec<u8>>>,
// Most recently received chunk; bytes are drained from its front as they
// are handed to the caller.
buf: Option<Vec<u8>>,
}
/// Manual `Debug` impl: only the `buf` field is printed; the task handle
/// and the receiver are omitted.
impl fmt::Debug for ReadAsync {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let mut out = fmt.debug_struct("ReadAsync");
        out.field("buf", &self.buf);
        out.finish()
    }
}
// Empty impl: no `AsyncRead` methods are overridden here, so only the
// trait's provided methods apply.
impl AsyncRead for ReadAsync {}
impl Read for ReadAsync {
fn read(&mut self, mut buf: &mut [u8]) -> Result<usize> {
loop {
match self.buf {
None => {},
Some(ref data) if data.is_empty() => {},
Some(ref mut data) => {
let n = ::std::cmp::min(data.len(), try!(buf.write(data)));
let drain = data.drain(0..n);
drop(drain);
return Ok(n);
},
}
match self.rx.poll() {
Ok(Async::Ready(maybe_buf)) => {
self.buf = maybe_buf;
if self.buf.is_none() {
return Ok(0);
}
},
Ok(Async::NotReady) => return Err(would_block()),
Err(()) => {
self.buf = None;
return Ok(0);
},
};
}
}
}
/// Performs I/O by spawning closures that use blocking calls onto the
/// environment's `CpuPool`.
impl AsyncIoEnvironment for ThreadPoolAsyncIoEnv {
    type Read = ReadAsync;
    type WriteAll = CpuFuture<(), IoError>;

    /// Spawns a pool task that reads `fd` through a `BufReader` and sends
    /// each filled buffer over a channel to the returned `ReadAsync`.
    ///
    /// The task stops when the descriptor yields an empty buffer, when the
    /// receiving side of the channel is gone, or on any read error other
    /// than `ErrorKind::Interrupted` (which is retried). Such errors are
    /// not reported to the reader.
    fn read_async(&mut self, fd: FileDesc) -> Self::Read {
        // Failure to switch the descriptor mode is deliberately ignored.
        let _ = try_set_blocking(&fd);

        let (mut tx, rx) = channel(0);

        let cpu = self.pool.spawn_fn(move || {
            let mut reader = BufReader::new(fd);

            loop {
                // Copy the currently buffered bytes so the borrow of
                // `reader` ends before `consume` is called below.
                let chunk = match reader.fill_buf() {
                    Ok(bytes) if bytes.is_empty() => break,
                    Ok(bytes) => bytes.to_vec(),
                    // Nothing was read, so there is nothing to consume.
                    Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                    Err(_) => break,
                };

                let chunk_len = chunk.len();

                // `send` consumes the sender and hands it back on success.
                tx = match tx.send(chunk).wait() {
                    Ok(sender) => sender,
                    Err(_) => break,
                };

                reader.consume(chunk_len);
            }

            Ok(())
        });

        ReadAsync {
            _cpu_future: cpu,
            rx: rx.fuse(),
            buf: None,
        }
    }

    /// Spawns a pool task that writes all of `data` to `fd` and then
    /// flushes it; the returned future carries the task's result.
    fn write_all(&mut self, mut fd: FileDesc, data: Vec<u8>) -> Self::WriteAll {
        // Failure to switch the descriptor mode is deliberately ignored.
        let _ = try_set_blocking(&fd);

        self.pool
            .spawn_fn(move || fd.write_all(&data).and_then(|()| fd.flush()))
    }

    /// Starts the same write as `write_all`, then calls `forget()` on the
    /// returned future instead of handing it back to the caller.
    fn write_all_best_effort(&mut self, fd: FileDesc, data: Vec<u8>) {
        let pending_write = self.write_all(fd, data);
        pending_write.forget();
    }
}
/// Unix: asks the descriptor to turn non-blocking mode off, returning
/// whatever `set_nonblock` reports.
#[cfg(unix)]
fn try_set_blocking(fd: &FileDesc) -> Result<()> {
    use os::unix::io::FileDescExt;

    // Blocking mode is requested by disabling the non-blocking flag.
    let nonblocking = false;
    fd.set_nonblock(nonblocking)
}
/// Non-Unix: does nothing to the descriptor and always returns `Ok(())`.
#[cfg(not(unix))]
fn try_set_blocking(_fd: &FileDesc) -> Result<()> {
Ok(())
}