extern crate divbuf;
extern crate futures;
extern crate nix;
extern crate sysctl;
extern crate tempdir;
extern crate tokio;
extern crate tokio_file;
use divbuf::DivBufShared;
use futures::future::lazy;
use futures::{Future, future};
use tempdir::TempDir;
use tokio_file::{AioResult, File};
use tokio::executor::current_thread;
use tokio::reactor::Handle;
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}
#[test]
fn write_at_eagain() {
let limit = sysctl::value("vfs.aio.max_aio_queue_per_proc").unwrap();
let count = if let sysctl::CtlValue::Int(x) = limit {
(2 * x) as usize
} else {
panic!("sysctl: {:?}", limit);
};
let dir = t!(TempDir::new("tokio-file"));
let path = dir.path().join("write_at_eagain.0");
let file = t!(File::open(&path, Handle::current()));
let dbses: Vec<_> = (0..count).map(|_| {
DivBufShared::from(vec![0u8; 4096])
}).collect();
let futs : Vec<_> = (0..count).map(|i| {
let wbuf = Box::new(dbses[i].try().unwrap());
file.write_at(wbuf, 4096 * i as u64).unwrap()
.map(|r| -> Result<AioResult, nix::Error> {
Ok(r)
})
.or_else(|e| -> Result<Result<AioResult, nix::Error>, ()> {
Ok(Err(e))
})
}).collect();
let wi = t!(current_thread::block_on_all(lazy(|| {
future::join_all(futs)
})));
let mut n_ok = 0;
let mut n_eagain = 0;
for i in 0..count {
match wi[i].as_ref() {
Ok(aio_result) => {
n_ok += 1;
assert_eq!(aio_result.value.unwrap(), 4096);
},
Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)) => n_eagain += 1,
Err(e) => panic!("unexpected result {:?}", e)
}
}
assert!(n_ok >= count / 2);
assert!(n_eagain > 1);
}