use crate::callback_worker::InlineClosure;
use crate::context::{Driver, setup};
use crate::merge::MergeSubmitter;
use crate::tasks::{IOAction, IOEvent};
use std::os::fd::{AsRawFd, RawFd};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use crate::test::*;
use crossfire::waitgroup::{WaitGroup, WaitGroupGuard};
use io_buffer::Buffer;
use rstest::rstest;
#[rstest]
#[case(Driver::Aio)]
#[case(Driver::Uring)]
fn test_merged_submit(#[case] driver: Driver) {
setup_log();
let temp_file = make_temp_file();
let owned_fd = create_temp_file(temp_file.as_ref());
let fd = owned_fd.as_raw_fd();
let (tx, rx) = crossfire::mpsc::bounded_blocking(128);
type CbFn = Box<dyn Fn(i64, Result<Option<Buffer>, rustix::io::Errno>) + Send>;
let current_cb = Arc::new(Mutex::new(None::<CbFn>));
let current_cb_clone = current_cb.clone();
let worker = InlineClosure(Box::new(move |_guard: WaitGroupGuard<()>, offset, res| {
if let Some(cb) = current_cb_clone.lock().unwrap().as_ref() {
cb(offset, res);
}
}));
setup::<WaitGroupGuard<()>, _, _>(128, rx, worker, driver).unwrap();
let runner = |io_size, batch_num, merge_size_limit| {
_test_merge_submit(
fd,
tx.clone(),
io_size,
batch_num,
merge_size_limit,
current_cb.clone(),
);
};
runner(1024, 1024, 16 * 1024);
runner(1024, 512, 16 * 1024);
runner(1024, 256, 16 * 1024);
runner(1024, 64, 64 * 1024);
runner(1024, 64, 32 * 1024);
runner(1024, 64, 16 * 1024);
runner(1024, 64, 1 * 1024);
std::thread::sleep(Duration::from_millis(100));
}
fn _test_merge_submit<
S: crossfire::BlockingTxTrait<Box<IOEvent<WaitGroupGuard<()>>>> + Clone + Send + 'static,
>(
fd: RawFd, sender: S, io_size: usize, batch_num: usize, merge_size_limit: usize,
current_cb: Arc<
Mutex<Option<Box<dyn Fn(i64, Result<Option<Buffer>, rustix::io::Errno>) + Send>>>,
>,
) {
println!("test_merged_submit {} {} {}", io_size, batch_num, merge_size_limit);
let mut m_write = MergeSubmitter::<WaitGroupGuard<()>, _>::new(
fd,
sender.clone(),
merge_size_limit,
IOAction::Write,
);
let mut buf_all = Vec::<u8>::with_capacity(batch_num * io_size);
buf_all.resize_with(batch_num * io_size, || fastrand::u8(..));
let md51 = md5::compute(&buf_all);
println!("buf all md5 {:?}", md51);
let wg = WaitGroup::new((), 0);
*current_cb.lock().unwrap() = None;
for i in (0..batch_num / 2).step_by(2) {
let mut buf = Buffer::aligned(io_size as i32).unwrap();
buf.copy_from(0, &buf_all[i * io_size..(i + 1) * io_size]);
let mut event = IOEvent::new(fd, buf, IOAction::Write, (i * io_size) as i64);
event.set_args(wg.add_guard());
m_write.add_event(event).expect("add_event");
}
for i in batch_num / 2..batch_num {
let mut buf = Buffer::aligned(io_size as i32).unwrap();
buf.copy_from(0, &buf_all[i * io_size..(i + 1) * io_size]);
let mut event = IOEvent::new(fd, buf, IOAction::Write, (i * io_size) as i64);
event.set_args(wg.add_guard());
m_write.add_event(event).expect("add_event");
}
for i in (1..(batch_num / 2 + 1)).step_by(2) {
let mut buf = Buffer::aligned(io_size as i32).unwrap();
buf.copy_from(0, &buf_all[i * io_size..(i + 1) * io_size]);
let mut event = IOEvent::new(fd, buf, IOAction::Write, (i * io_size) as i64);
event.set_args(wg.add_guard());
m_write.add_event(event).expect("add_event");
}
m_write.flush().expect("flush");
wg.wait();
println!("written");
let read_buf = Arc::new(Mutex::new(Buffer::aligned((batch_num * io_size) as i32).unwrap()));
let mut m_read = MergeSubmitter::<WaitGroupGuard<()>, _>::new(
fd,
sender.clone(),
merge_size_limit,
IOAction::Read,
);
{
let _read_buf = read_buf.clone();
*current_cb.lock().unwrap() = Some(Box::new(move |offset, res| {
let mut _buf_all = _read_buf.lock().unwrap();
match res {
Ok(Some(buffer)) => {
_buf_all.copy_from(offset as usize, buffer.as_ref());
}
Ok(None) => {}
Err(_e) => {
panic!("read error: {}", _e);
}
}
}));
}
for i in (0..batch_num / 2).step_by(2) {
let buf = Buffer::aligned(io_size as i32).unwrap();
let mut event = IOEvent::new(fd, buf, IOAction::Read, (i * io_size) as i64);
event.set_args(wg.add_guard());
m_read.add_event(event).expect("add_event");
}
for i in batch_num / 2..batch_num {
let buf = Buffer::aligned(io_size as i32).unwrap();
let mut event = IOEvent::new(fd, buf, IOAction::Read, (i * io_size) as i64);
event.set_args(wg.add_guard());
m_read.add_event(event).expect("add_event");
}
for i in (1..(batch_num / 2 + 1)).step_by(2) {
let buf = Buffer::aligned(io_size as i32).unwrap();
let mut event = IOEvent::new(fd, buf, IOAction::Read, (i * io_size) as i64);
event.set_args(wg.add_guard());
m_read.add_event(event).expect("add_event");
}
m_read.flush().expect("flush");
wg.wait();
assert_eq!(md51, md5::compute(read_buf.lock().unwrap().as_ref()));
*current_cb.lock().unwrap() = None;
}
#[test]
fn test_event_merge_buffer_logic() {
setup_log();
let merge_size_limit = 4 * 1024;
let mut buffer = crate::merge::MergeBuffer::<()>::new(merge_size_limit);
let fd = 100;
assert!(buffer.flush(fd, IOAction::Write).is_none());
assert_eq!(buffer.len(), 0);
let event1 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 0);
let event1_clone: IOEvent<()> =
IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 0);
assert!(buffer.may_add_event(&event1));
buffer.push_event(event1);
assert_eq!(buffer.len(), 1);
let single_event_opt = buffer.flush(fd, IOAction::Write);
assert!(single_event_opt.is_some());
let single_event = single_event_opt.unwrap();
assert_eq!(single_event.offset, event1_clone.offset);
assert_eq!(single_event.get_size(), event1_clone.get_size());
assert_eq!(buffer.len(), 0);
let event2 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 0);
buffer.push_event(event2);
let event3 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 1024);
buffer.push_event(event3);
assert_eq!(buffer.len(), 2);
let event4 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 3072);
assert!(!buffer.may_add_event(&event4));
assert_eq!(buffer.len(), 2);
let merged_event_opt = buffer.flush(fd, IOAction::Write);
assert!(merged_event_opt.is_some());
let merged_event = merged_event_opt.unwrap();
assert_eq!(merged_event.offset, 0);
assert_eq!(merged_event.get_size(), 2048);
assert_eq!(buffer.len(), 0);
assert!(buffer.may_add_event(&event4));
buffer.push_event(event4);
assert_eq!(buffer.len(), 1);
let event5 = IOEvent::new(fd, Buffer::aligned(3072).unwrap(), IOAction::Write, 4096);
assert!(buffer.may_add_event(&event5));
let is_full = buffer.push_event(event5);
assert!(is_full);
assert_eq!(buffer.len(), 2);
let merged_event_opt_2 = buffer.flush(fd, IOAction::Write);
assert!(merged_event_opt_2.is_some());
let merged_event_2 = merged_event_opt_2.unwrap();
assert_eq!(merged_event_2.offset, 3072);
assert_eq!(merged_event_2.get_size(), 4096);
assert_eq!(buffer.len(), 0);
}