extern crate divbuf;
extern crate futures;
extern crate nix;
extern crate sysctl;
extern crate tempdir;
extern crate tokio;
extern crate tokio_file;
use divbuf::DivBufShared;
use futures::future;
use nix::unistd::{SysconfVar, sysconf};
use std::borrow::Borrow;
use sysctl::CtlValue;
use tempdir::TempDir;
use tokio_file::File;
use tokio::executor::current_thread;
use tokio::reactor::Handle;
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}
#[test]
fn writev_at_eio() {
let alm = sysconf(SysconfVar::AIO_LISTIO_MAX).expect("sysconf").unwrap();
let maqpp = if let CtlValue::Int(x) = sysctl::value(
"vfs.aio.max_aio_queue_per_proc").unwrap(){
x
} else {
panic!("unknown sysctl");
};
let mut ops_per_listio = 0;
let mut num_listios = 0;
for i in (1..alm).rev() {
let _ops_per_listio = f64::from(i as u32);
let _num_listios = (f64::from(maqpp) / _ops_per_listio).ceil();
let delayed = _ops_per_listio * _num_listios - f64::from(maqpp);
if delayed > 0.01 {
ops_per_listio = i as usize;
num_listios = _num_listios as usize;
break
}
}
if num_listios == 0 {
panic!("Can't find a configuration for max_aio_queue_per_proc={} AIO_LISTIO_MAX={}");
}
let dir = t!(TempDir::new("tokio-file"));
let path = dir.path().join("writev_at_eio");
let file = t!(File::open(&path, Handle::current()));
let dbses: Vec<_> = (0..num_listios).map(|_| {
(0..ops_per_listio).map(|_| {
DivBufShared::from(vec![0u8; 4096])
}).collect::<Vec<_>>()
}).collect();
let futs: Vec<_> = (0..num_listios).map(|i| {
let mut wbufs: Vec<Box<Borrow<[u8]>>> = Vec::with_capacity(ops_per_listio);
for j in 0..ops_per_listio {
let wbuf = dbses[i][j].try().unwrap();
wbufs.push(Box::new(wbuf));
}
file.writev_at(wbufs, 4096 * (i * ops_per_listio) as u64)
.ok()
.expect("writev_at failed early")
}).collect();
let wi = t!(current_thread::block_on_all(future::lazy(|| {
future::join_all(futs)
})));
for lio_result in wi {
for aio_result in lio_result {
assert_eq!(aio_result.value.unwrap() as usize, 4096);
}
}
}