io-engine 0.10.0

Library for block-based IO, intended to mask AIO/io_uring underneath.
Documentation
use crate::callback_worker::InlineClosure;
use crate::context::{Driver, setup};
use crate::merge::MergeSubmitter;
use crate::tasks::{IOAction, IOEvent};
use std::os::fd::{AsRawFd, RawFd};
use std::{
    sync::{Arc, Mutex},
    time::Duration,
};

use crate::test::*;
use crossfire::waitgroup::{WaitGroup, WaitGroupGuard};
use io_buffer::Buffer;
use rstest::rstest;

#[rstest]
#[case(Driver::Aio)]
#[case(Driver::Uring)]
fn test_merged_submit(#[case] driver: Driver) {
    setup_log();
    let temp_file = make_temp_file();
    let owned_fd = create_temp_file(temp_file.as_ref());
    let fd = owned_fd.as_raw_fd();
    let (tx, rx) = crossfire::mpsc::bounded_blocking(128);

    // Use a shared closure that can be updated for different test phases
    type CbFn = Box<dyn Fn(i64, Result<Option<Buffer>, rustix::io::Errno>) + Send>;
    let current_cb = Arc::new(Mutex::new(None::<CbFn>));
    let current_cb_clone = current_cb.clone();

    let worker = InlineClosure(Box::new(move |_guard: WaitGroupGuard<()>, offset, res| {
        if let Some(cb) = current_cb_clone.lock().unwrap().as_ref() {
            cb(offset, res);
        }
    }));
    setup::<WaitGroupGuard<()>, _, _>(128, rx, worker, driver).unwrap();

    let runner = |io_size, batch_num, merge_size_limit| {
        _test_merge_submit(
            fd,
            tx.clone(),
            io_size,
            batch_num,
            merge_size_limit,
            current_cb.clone(),
        );
    };

    runner(1024, 1024, 16 * 1024);
    runner(1024, 512, 16 * 1024);
    runner(1024, 256, 16 * 1024);
    runner(1024, 64, 64 * 1024);
    runner(1024, 64, 32 * 1024);
    runner(1024, 64, 16 * 1024);
    runner(1024, 64, 1 * 1024);

    std::thread::sleep(Duration::from_millis(100));
}

fn _test_merge_submit<
    S: crossfire::BlockingTxTrait<Box<IOEvent<WaitGroupGuard<()>>>> + Clone + Send + 'static,
>(
    fd: RawFd, sender: S, io_size: usize, batch_num: usize, merge_size_limit: usize,
    current_cb: Arc<
        Mutex<Option<Box<dyn Fn(i64, Result<Option<Buffer>, rustix::io::Errno>) + Send>>>,
    >,
) {
    println!("test_merged_submit {} {} {}", io_size, batch_num, merge_size_limit);
    let mut m_write = MergeSubmitter::<WaitGroupGuard<()>, _>::new(
        fd,
        sender.clone(),
        merge_size_limit,
        IOAction::Write,
    );

    let mut buf_all = Vec::<u8>::with_capacity(batch_num * io_size);
    buf_all.resize_with(batch_num * io_size, || fastrand::u8(..));
    let md51 = md5::compute(&buf_all);
    println!("buf all md5 {:?}", md51);

    let wg = WaitGroup::new((), 0);

    // Set write callback
    *current_cb.lock().unwrap() = None;

    for i in (0..batch_num / 2).step_by(2) {
        let mut buf = Buffer::aligned(io_size as i32).unwrap();
        buf.copy_from(0, &buf_all[i * io_size..(i + 1) * io_size]);
        let mut event = IOEvent::new(fd, buf, IOAction::Write, (i * io_size) as i64);
        event.set_args(wg.add_guard());
        m_write.add_event(event).expect("add_event");
    }

    for i in batch_num / 2..batch_num {
        let mut buf = Buffer::aligned(io_size as i32).unwrap();
        buf.copy_from(0, &buf_all[i * io_size..(i + 1) * io_size]);
        let mut event = IOEvent::new(fd, buf, IOAction::Write, (i * io_size) as i64);
        event.set_args(wg.add_guard());
        m_write.add_event(event).expect("add_event");
    }

    for i in (1..(batch_num / 2 + 1)).step_by(2) {
        let mut buf = Buffer::aligned(io_size as i32).unwrap();
        buf.copy_from(0, &buf_all[i * io_size..(i + 1) * io_size]);
        let mut event = IOEvent::new(fd, buf, IOAction::Write, (i * io_size) as i64);
        event.set_args(wg.add_guard());
        m_write.add_event(event).expect("add_event");
    }
    m_write.flush().expect("flush");
    wg.wait();
    println!("written");

    let read_buf = Arc::new(Mutex::new(Buffer::aligned((batch_num * io_size) as i32).unwrap()));
    let mut m_read = MergeSubmitter::<WaitGroupGuard<()>, _>::new(
        fd,
        sender.clone(),
        merge_size_limit,
        IOAction::Read,
    );

    // Set read callback
    {
        let _read_buf = read_buf.clone();
        *current_cb.lock().unwrap() = Some(Box::new(move |offset, res| {
            let mut _buf_all = _read_buf.lock().unwrap();
            match res {
                Ok(Some(buffer)) => {
                    _buf_all.copy_from(offset as usize, buffer.as_ref());
                }
                Ok(None) => {}
                Err(_e) => {
                    panic!("read error: {}", _e);
                }
            }
        }));
    }

    for i in (0..batch_num / 2).step_by(2) {
        let buf = Buffer::aligned(io_size as i32).unwrap();
        let mut event = IOEvent::new(fd, buf, IOAction::Read, (i * io_size) as i64);
        event.set_args(wg.add_guard());
        m_read.add_event(event).expect("add_event");
    }

    for i in batch_num / 2..batch_num {
        let buf = Buffer::aligned(io_size as i32).unwrap();
        let mut event = IOEvent::new(fd, buf, IOAction::Read, (i * io_size) as i64);
        event.set_args(wg.add_guard());
        m_read.add_event(event).expect("add_event");
    }

    for i in (1..(batch_num / 2 + 1)).step_by(2) {
        let buf = Buffer::aligned(io_size as i32).unwrap();
        let mut event = IOEvent::new(fd, buf, IOAction::Read, (i * io_size) as i64);
        event.set_args(wg.add_guard());
        m_read.add_event(event).expect("add_event");
    }
    m_read.flush().expect("flush");
    wg.wait();

    assert_eq!(md51, md5::compute(read_buf.lock().unwrap().as_ref()));

    // Clear callback
    *current_cb.lock().unwrap() = None;
}

#[test]
fn test_event_merge_buffer_logic() {
    setup_log();
    let merge_size_limit = 4 * 1024;
    let mut buffer = crate::merge::MergeBuffer::<()>::new(merge_size_limit);

    let fd = 100; // Dummy fd

    // --- Test empty flush ---
    assert!(buffer.flush(fd, IOAction::Write).is_none());
    assert_eq!(buffer.len(), 0);

    // --- Scenario 1: Add a single event ---
    let event1 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 0);
    // clone for later comparison
    let event1_clone: IOEvent<()> =
        IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 0);
    assert!(buffer.may_add_event(&event1));
    buffer.push_event(event1);
    assert_eq!(buffer.len(), 1);

    // Flush single event
    let single_event_opt = buffer.flush(fd, IOAction::Write);
    assert!(single_event_opt.is_some());
    let single_event = single_event_opt.unwrap();
    assert_eq!(single_event.offset, event1_clone.offset);
    assert_eq!(single_event.get_size(), event1_clone.get_size());
    assert_eq!(buffer.len(), 0);

    // --- Scenario 2: Add contiguous events ---
    // Event 2: offset 0, size 1KB
    let event2 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 0);
    buffer.push_event(event2);

    // Event 3: offset 1KB, size 1KB (contiguous)

    let event3 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 1024);
    buffer.push_event(event3);
    assert_eq!(buffer.len(), 2);

    // --- Scenario 3: Add non-contiguous event ---
    // Event 4: offset 3KB, size 1KB (non-contiguous)
    let event4 = IOEvent::new(fd, Buffer::aligned(1024).unwrap(), IOAction::Write, 3072);
    assert!(!buffer.may_add_event(&event4));

    // Now, flush the buffered events and check them
    assert_eq!(buffer.len(), 2);
    let merged_event_opt = buffer.flush(fd, IOAction::Write);
    assert!(merged_event_opt.is_some());
    let merged_event = merged_event_opt.unwrap();
    assert_eq!(merged_event.offset, 0);
    assert_eq!(merged_event.get_size(), 2048);
    assert_eq!(buffer.len(), 0);

    // The buffer is now empty. We can add event4.
    assert!(buffer.may_add_event(&event4));
    buffer.push_event(event4);
    assert_eq!(buffer.len(), 1);

    // --- Scenario 4: Add event that makes buffer full ---
    // Event 5: offset 4096, size 3072. Contiguous with event4 (offset 3072, size 1024).
    // Total size would be 1024 + 3072 = 4096, which is exactly merge_size_limit
    let event5 = IOEvent::new(fd, Buffer::aligned(3072).unwrap(), IOAction::Write, 4096);

    assert!(buffer.may_add_event(&event5));
    let is_full = buffer.push_event(event5);
    assert!(is_full);
    assert_eq!(buffer.len(), 2);

    let merged_event_opt_2 = buffer.flush(fd, IOAction::Write);
    assert!(merged_event_opt_2.is_some());
    let merged_event_2 = merged_event_opt_2.unwrap();
    assert_eq!(merged_event_2.offset, 3072);
    assert_eq!(merged_event_2.get_size(), 4096);
    assert_eq!(buffer.len(), 0);
}