mod common;
use common::{TempFile, poll_until_recv, setup_tcp_pair};
use lio::api::resource::Resource;
use lio::{Lio, api};
use std::os::fd::FromRawFd;
use std::sync::mpsc;
use std::time::{Duration, Instant};
#[test]
fn test_concurrent_nop_1000() {
let mut lio = Lio::new(256).unwrap();
let (sender, receiver) = mpsc::channel();
let total_ops = 1000;
let max_pending = 128;
let start = Instant::now();
let mut submitted = 0;
let mut completed = 0;
while completed < total_ops {
while submitted < total_ops && (submitted - completed) < max_pending {
api::nop().with_lio(&mut lio).send_with(sender.clone());
submitted += 1;
}
lio.run_timeout(Duration::from_micros(100)).unwrap();
loop {
match receiver.try_recv() {
Ok(result) => {
result.expect("nop should succeed");
completed += 1;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
}
}
let elapsed = start.elapsed();
let ops_per_sec = total_ops as f64 / elapsed.as_secs_f64();
eprintln!(
"1000 nops completed in {:?} ({:.0} ops/sec)",
elapsed, ops_per_sec
);
}
#[test]
fn test_concurrent_nop_10000() {
let mut lio = Lio::new(1024).unwrap();
let (sender, receiver) = mpsc::channel();
let total_ops = 10000;
let max_pending = 500;
let start = Instant::now();
let mut submitted = 0;
let mut completed = 0;
while completed < total_ops {
while submitted < total_ops && (submitted - completed) < max_pending {
api::nop().with_lio(&mut lio).send_with(sender.clone());
submitted += 1;
}
lio.run_timeout(Duration::from_micros(100)).unwrap();
loop {
match receiver.try_recv() {
Ok(result) => {
result.expect("nop should succeed");
completed += 1;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
}
}
let elapsed = start.elapsed();
let ops_per_sec = total_ops as f64 / elapsed.as_secs_f64();
eprintln!(
"10000 nops completed in {:?} ({:.0} ops/sec)",
elapsed, ops_per_sec
);
}
#[test]
fn test_concurrent_file_writes_256() {
let mut lio = Lio::new(512).unwrap();
let cwd = unsafe { Resource::from_raw_fd(libc::AT_FDCWD) };
let (sender_open, receiver_open) = mpsc::channel();
let (sender_write, receiver_write) = mpsc::channel();
let num_files = 256;
let mut temps = Vec::new();
for i in 0..num_files {
let temp = TempFile::new(&format!("stress_write_{}", i));
api::openat(
&cwd,
temp.path.clone(),
libc::O_CREAT | libc::O_WRONLY | libc::O_TRUNC,
)
.with_lio(&mut lio)
.send_with(sender_open.clone());
temps.push(temp);
}
let mut fds = Vec::new();
for i in 0..num_files {
let fd = poll_until_recv(&mut lio, &receiver_open)
.expect(&format!("Failed to open file {}", i));
fds.push(fd);
}
let start = Instant::now();
for (i, fd) in fds.iter().enumerate() {
let data = format!("Data for file {}\n", i).into_bytes();
api::write(fd, data).with_lio(&mut lio).send_with(sender_write.clone());
}
for i in 0..num_files {
let (result, _) = poll_until_recv(&mut lio, &receiver_write);
result.expect(&format!("Failed to write to file {}", i));
}
let elapsed = start.elapsed();
eprintln!("256 concurrent file writes completed in {:?}", elapsed);
std::mem::forget(cwd);
}
#[test]
fn test_concurrent_connections_100() {
let mut lio = Lio::new(1024).unwrap();
let num_connections = 100;
let mut pairs = Vec::new();
let start = Instant::now();
for i in 0..num_connections {
let pair = setup_tcp_pair(&mut lio);
pairs.push(pair);
if (i + 1) % 20 == 0 {
eprintln!("Created {} connections...", i + 1);
}
}
let connect_elapsed = start.elapsed();
eprintln!("100 connections established in {:?}", connect_elapsed);
let (sender_send, receiver_send) = mpsc::channel();
for (i, pair) in pairs.iter().enumerate() {
let data = format!("Message from connection {}", i).into_bytes();
api::send(&pair.client_sock, data, None)
.with_lio(&mut lio)
.send_with(sender_send.clone());
}
for i in 0..num_connections {
let (result, _) = poll_until_recv(&mut lio, &receiver_send);
result.expect(&format!("Send {} failed", i));
}
let send_elapsed = start.elapsed();
eprintln!("All sends completed in {:?}", send_elapsed);
let (sender_recv, receiver_recv) = mpsc::channel();
for pair in &pairs {
api::recv(&pair.accepted_fd, vec![0u8; 128], None)
.with_lio(&mut lio)
.send_with(sender_recv.clone());
}
for i in 0..num_connections {
let (result, buf) = poll_until_recv(&mut lio, &receiver_recv);
let bytes = result.expect(&format!("Recv {} failed", i)) as usize;
assert!(bytes > 0, "Should receive data");
let msg = String::from_utf8_lossy(&buf[..bytes]);
assert!(msg.starts_with("Message from connection"));
}
let total_elapsed = start.elapsed();
eprintln!("100 connections total time: {:?}", total_elapsed);
}
#[test]
fn test_sustained_io_5_seconds() {
let mut lio = Lio::new(1024).unwrap();
let duration = Duration::from_secs(5);
let start = Instant::now();
let mut ops_completed = 0u64;
let mut pending = 0u64;
let (sender, receiver) = mpsc::channel();
let max_pending = 500;
while start.elapsed() < duration {
while pending < max_pending {
api::nop().with_lio(&mut lio).send_with(sender.clone());
pending += 1;
}
lio.run_timeout(Duration::from_micros(100)).unwrap();
loop {
match receiver.try_recv() {
Ok(result) => {
result.expect("nop should succeed");
ops_completed += 1;
pending -= 1;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
}
}
while pending > 0 {
lio.run_timeout(Duration::from_millis(5)).unwrap();
loop {
match receiver.try_recv() {
Ok(result) => {
result.expect("nop should succeed");
ops_completed += 1;
pending -= 1;
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => panic!("Channel disconnected"),
}
}
}
let elapsed = start.elapsed();
let ops_per_sec = ops_completed as f64 / elapsed.as_secs_f64();
eprintln!(
"Sustained load: {} ops in {:?} ({:.0} ops/sec)",
ops_completed, elapsed, ops_per_sec
);
}
#[test]
fn test_sequential_nop_200() {
let mut lio = Lio::new(64).unwrap();
let num_ops = 200;
let start = Instant::now();
for i in 0..num_ops {
let (sender, receiver) = mpsc::channel();
api::nop().with_lio(&mut lio).send_with(sender);
let result = poll_until_recv(&mut lio, &receiver);
result.expect(&format!("Op {} failed", i));
}
let elapsed = start.elapsed();
eprintln!(
"{} sequential nops in {:?} ({:.0} ops/sec)",
num_ops,
elapsed,
num_ops as f64 / elapsed.as_secs_f64()
);
}
#[test]
fn test_mixed_op_types() {
let mut lio = Lio::new(64).unwrap();
let cwd = unsafe { Resource::from_raw_fd(libc::AT_FDCWD) };
let temp =
TempFile::new(&format!("mixed_ops_{:?}", std::thread::current().id()));
let (sender_open, receiver_open) = mpsc::channel();
api::openat(
&cwd,
temp.path.clone(),
libc::O_CREAT | libc::O_RDWR | libc::O_TRUNC,
)
.with_lio(&mut lio)
.send_with(sender_open);
let fd =
poll_until_recv(&mut lio, &receiver_open).expect("Failed to create file");
let start = Instant::now();
let mut ops_completed = 0;
let num_rounds = 50;
for i in 0..num_rounds {
let (sender, receiver) = mpsc::channel();
api::nop().with_lio(&mut lio).send_with(sender);
poll_until_recv(&mut lio, &receiver).expect("nop failed");
ops_completed += 1;
let (sender, receiver) = mpsc::channel();
let data = format!("Round {} data\n", i).into_bytes();
api::write(&fd, data).with_lio(&mut lio).send_with(sender);
poll_until_recv(&mut lio, &receiver).0.expect("write failed");
ops_completed += 1;
if i % 10 == 0 {
let (sender, receiver) = mpsc::channel();
api::fsync(&fd).with_lio(&mut lio).send_with(sender);
poll_until_recv(&mut lio, &receiver).expect("fsync failed");
ops_completed += 1;
}
}
let elapsed = start.elapsed();
eprintln!(
"Mixed op types: {} ops in {:?} ({:.0} ops/sec)",
ops_completed,
elapsed,
ops_completed as f64 / elapsed.as_secs_f64()
);
std::mem::forget(cwd);
}