use crate::callback_worker::IOWorkers;
use crate::context::{Driver, setup};
use crate::tasks::{ClosureCb, IOAction, IOEvent};
use crate::test::*;
use crossfire::BlockingTxTrait;
use io_buffer::{Buffer, rand_buffer};
use nix::errno::Errno;
use std::os::fd::AsRawFd;
use std::sync::mpsc::channel as unbounded;
extern crate md5;
#[test]
fn test_read_write_aio() {
setup_log();
let temp_file = make_temp_file();
let owned_fd = create_temp_file(temp_file.as_ref());
let fd = owned_fd.as_raw_fd();
let (tx, rx) = crossfire::mpsc::bounded_blocking(2);
setup::<ClosureCb, _, _>(2, rx, IOWorkers::new(1), Driver::Aio).unwrap();
let (done_tx, done_rx) = unbounded::<Result<Option<Buffer>, Errno>>();
let callback = Box::new(move |_offset: i64, res: Result<Option<Buffer>, Errno>| {
let _ = done_tx.send(res);
});
let buffer3 = Buffer::aligned(4096).unwrap();
let mut event = IOEvent::new(fd, buffer3.clone(), IOAction::Read, 100);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit");
let res = done_rx.recv().unwrap();
match res {
Ok(Some(_buffer2)) => {
assert_eq!(_buffer2.len(), 0);
}
Ok(None) => {
panic!("unexpected empty buffer");
}
Err(e) => {
panic!("unexpected error: {:?}", e);
}
}
for _j in 0..100 {
for i in 0..10 {
let mut buffer = Buffer::aligned(4096).unwrap();
rand_buffer(&mut buffer);
let digest = md5::compute(&buffer);
let mut event = IOEvent::new(fd, buffer, IOAction::Write, 4096 * i as i64);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
let buffer2 = Buffer::aligned(4096).unwrap();
let mut event = IOEvent::new(fd, buffer2, IOAction::Read, 4096 * i as i64);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit");
let res = done_rx.recv().unwrap();
match res {
Ok(Some(_buffer2)) => {
let _digest = md5::compute(&_buffer2);
assert_eq!(_digest, digest);
}
Ok(None) => {
panic!("empty buffer");
}
Err(_) => {
panic!("error");
}
}
}
}
}
#[test]
fn test_fallocate_fsync_aio() {
setup_log();
let temp_file = make_temp_file();
let owned_fd = create_temp_file(temp_file.as_ref());
let fd = owned_fd.as_raw_fd();
let (tx, rx) = crossfire::mpsc::bounded_blocking(2);
setup::<ClosureCb, _, _>(2, rx, IOWorkers::new(1), Driver::Aio).unwrap();
let (done_tx, done_rx) = unbounded::<Result<(), Errno>>();
let callback = Box::new(move |_offset: i64, res: Result<Option<Buffer>, Errno>| {
let _ = done_tx.send(res.map(|_| ()));
});
let fallocate_len = 8192;
let mut event = IOEvent::new_no_buf(fd, IOAction::Alloc, 0, fallocate_len);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit fallocate");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
let metadata = std::fs::metadata(temp_file.as_ref()).unwrap();
assert_eq!(metadata.len(), fallocate_len);
let mut buffer = Buffer::aligned(4096).unwrap();
rand_buffer(&mut buffer);
let mut event = IOEvent::new(fd, buffer.clone(), IOAction::Write, 4096);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit write");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
let mut event = IOEvent::new_no_buf(fd, IOAction::Fsync, 0, 0);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit fsync");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
}
#[test]
fn test_fallocate_fsync_uring() {
setup_log();
let temp_file = make_temp_file();
let owned_fd = create_temp_file(temp_file.as_ref());
let fd = owned_fd.as_raw_fd();
let (tx, rx) = crossfire::mpsc::bounded_blocking(2);
setup::<ClosureCb, _, _>(2, rx, IOWorkers::new(1), Driver::Uring).unwrap();
let (done_tx, done_rx) = unbounded::<Result<(), Errno>>();
let callback = Box::new(move |_offset: i64, res: Result<Option<Buffer>, Errno>| {
let _ = done_tx.send(res.map(|_| ()));
});
let fallocate_len = 8192;
let mut event = IOEvent::new_no_buf(fd, IOAction::Alloc, 0, fallocate_len);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit fallocate");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
let metadata = std::fs::metadata(temp_file.as_ref()).unwrap();
assert_eq!(metadata.len(), fallocate_len);
let mut buffer = Buffer::aligned(4096).unwrap();
rand_buffer(&mut buffer);
let mut event = IOEvent::new(fd, buffer.clone(), IOAction::Write, 4096);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit write");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
let mut event = IOEvent::new_no_buf(fd, IOAction::Fsync, 0, 0);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit fsync");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
}
#[test]
fn test_read_write_uring() {
setup_log();
let temp_file = make_temp_file();
let owned_fd = create_temp_file(temp_file.as_ref());
let fd = owned_fd.as_raw_fd();
let (tx, rx) = crossfire::mpsc::bounded_blocking(2);
setup::<ClosureCb, _, _>(2, rx, IOWorkers::new(1), Driver::Uring).unwrap();
let (done_tx, done_rx) = unbounded::<Result<Option<Buffer>, Errno>>();
let callback = Box::new(move |_offset: i64, res: Result<Option<Buffer>, Errno>| {
let _ = done_tx.send(res);
});
let buffer3 = Buffer::aligned(4096).unwrap();
let mut event = IOEvent::new(fd, buffer3, IOAction::Read, 100);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit");
let res = done_rx.recv().unwrap();
match res {
Ok(Some(_buffer2)) => {
assert_eq!(_buffer2.len(), 0);
}
Ok(None) => {
panic!("unexpected empty buffer");
}
Err(_) => {
unreachable!("error");
}
}
for _j in 0..100 {
for i in 0..10 {
let mut buffer = Buffer::aligned(4096).unwrap();
rand_buffer(&mut buffer);
let digest = md5::compute(&buffer);
let mut event = IOEvent::new(fd, buffer, IOAction::Write, 4096 * i as i64);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit");
let res = done_rx.recv().unwrap();
assert!(res.is_ok());
let buffer2 = Buffer::aligned(4096).unwrap();
let mut event = IOEvent::new(fd, buffer2, IOAction::Read, 4096 * i as i64);
event.set_callback(ClosureCb(callback.clone()));
tx.send(Box::new(event)).expect("submit");
let res = done_rx.recv().unwrap();
match res {
Ok(Some(_buffer2)) => {
let _digest = md5::compute(&_buffer2);
assert_eq!(_digest, digest);
}
Ok(None) => {
panic!("empty buffer");
}
Err(_) => {
panic!("error");
}
}
}
}
}