#![cfg(unix)]
extern crate conch_runtime;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
#[macro_use]
mod support;
pub use self::support::*;
use conch_runtime::env::AsyncIoEnvironment;
use conch_runtime::io::{FileDesc, Pipe};
use conch_runtime::os::unix::env::EventedAsyncIoEnv;
use conch_runtime::os::unix::io::{FileDescExt, MaybeEventedFd};
use std::fs::File;
use std::io::{ErrorKind, Read, Result, Write};
use std::time::Duration;
use std::thread;
use tokio_io::AsyncRead;
use tokio_io::io::read_to_end;
use tokio_core::reactor::Core;
/// Wrapper around a reader that carries two counters alongside it.
///
/// Both counters start at zero; they are meant to be bumped by the wrapper's
/// `Read` implementation as reads are attempted on the inner reader.
struct TimesRead<R> {
    /// Number of reads that produced data.
    times_read: usize,
    /// Number of reads that failed with `ErrorKind::WouldBlock`.
    times_would_block: usize,
    /// The wrapped reader all reads are forwarded to.
    reader: R,
}

impl<R> TimesRead<R> {
    /// Wraps `reader` with both counters reset to zero.
    fn new(reader: R) -> Self {
        Self {
            reader,
            times_read: 0,
            times_would_block: 0,
        }
    }
}
// `TimesRead` is an `AsyncRead` whenever the reader it wraps is one. The body
// is intentionally empty: nothing from the trait is overridden here.
impl<R> AsyncRead for TimesRead<R>
where
    R: AsyncRead,
{
}
impl<R: Read> Read for TimesRead<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
match self.reader.read(buf) {
ret@Ok(0) => ret,
ret@Ok(_) => {
self.times_read += 1;
ret
},
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
self.times_would_block += 1;
}
Err(e)
},
}
}
}
/// Checks that a pipe reader registered with the event loop is read
/// asynchronously.
///
/// A background thread trickles the message into the pipe one byte at a time,
/// sleeping between bytes, while the event loop drives `read_to_end` over a
/// `TimesRead`-wrapped reader. The test then asserts that the full message
/// arrived and that more than one data read and more than one `WouldBlock`
/// were observed along the way.
#[test]
fn evented_is_async() {
    let msg = "hello world";

    let Pipe { reader, mut writer } = Pipe::new().expect("failed to create pipe");

    let mut lp = Core::new().expect("failed to create event loop");
    let reader = reader
        .into_evented2(&lp.handle())
        .expect("failed to register reader with event loop");

    // The reader must have been registered with the event loop; any other
    // outcome is a test failure.
    let reader = match reader {
        MaybeEventedFd::Registered(fd) => fd,
        other => panic!("unexpected result: {:#?}", other),
    };

    let join_handle = thread::spawn(move || {
        // Give the event loop a chance to start polling before any data shows up.
        thread::sleep(Duration::from_millis(10));
        for &byte in msg.as_bytes() {
            // `write_all` (rather than `write`) so that a short write cannot
            // silently drop the byte and an interrupted write is retried
            // instead of failing the test spuriously.
            writer.write_all(&[byte]).expect("failed to write byte");
            thread::sleep(Duration::from_millis(10));
        }
        // `writer` is dropped when the closure returns.
    });

    let (tr, data) = lp
        .run(read_to_end(TimesRead::new(reader), vec![]))
        .map(|(tr, data)| (tr, String::from_utf8(data).expect("invaild utf8")))
        .expect("future did not exit successfully");

    join_handle.join().expect("thread did not exit successfully");

    assert_eq!(data, msg);
    assert!(tr.times_read > 1);
    assert!(tr.times_would_block > 1);
}
/// Round-trips a small message through a regular file using
/// `EventedAsyncIoEnv`: the message is written with `write_all`, the file is
/// reopened, read back via `read_async` + `read_to_end`, and compared with
/// the original text.
#[test]
fn evented_supports_regular_files() {
    let msg = "hello\nworld\n";

    let tempdir = mktmp!();
    let path = tempdir.path().join("sample_file");

    let mut lp = Core::new().expect("failed to create event loop");
    let mut env = EventedAsyncIoEnv::new(lp.remote());

    // Create the file and start the write from inside the running event loop.
    let write_future = futures::lazy(|| {
        let file = File::create(&path).expect("failed to create file");
        env.write_all(FileDesc::from(file), msg.as_bytes().to_vec())
    });
    lp.run(write_future).expect("failed to write data");

    // Reopen the file for reading and pull its whole contents back.
    let file = File::open(path).expect("failed to open file");
    let read_future = read_to_end(env.read_async(FileDesc::from(file)), Vec::new());

    let data = match lp.run(read_future) {
        Ok((_, bytes)) => Ok(String::from_utf8(bytes).expect("invaild utf8")),
        Err(e) => Err(e),
    }
    .expect("future did not exit successfully");

    assert_eq!(data, msg);
}