Skip to main content

Crate io_engine

Crate io_engine 

Source
Expand description

§IO Engine

A high-performance asynchronous IO library for Linux, masking AIO and io_uring interfaces behind a unified API.

§Architecture

Key components:

  • IOEvent:
    • Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
    • Because IOEvent is large (>=64B), you should submit Box<IOEvent<_>> through channel
  • CbArgs: Optional completion arguments along with IOEvent.
  • Worker: Trait for workers handling completions:
    • Inline closure InlineClosure
    • Inline function
    • Send the complete IOEvent through spsc, mpsc, mpmc channel sender
  • IO Merging: The engine supports merging sequential IO requests to reduce system call overhead. See the merge module for details.

§Callbacks

The engine supports flexible callback mechanisms. You may:

  • Capture some global arguments inside closure of callback workers
  • Pass arguments with IOEvent with IOEvent::set_args()

§Example (with WaitGroupGuard as CbArgs)

use io_engine::{InlineClosure, Driver, setup, IOAction, IOEvent};
use crossfire::{mpsc, waitgroup::{WaitGroup, WaitGroupGuard}};
use io_buffer::{Buffer, rand_buffer};
use rustix::io::Errno;

let (tx, rx) = mpsc::bounded_blocking(128);

// Use a shared closure that can be updated for different test phases
let worker = InlineClosure(Box::new(move |_guard: WaitGroupGuard<()>, offset, res| {}));
// WaitGroupGuard has impl CbArgs trait
setup::<WaitGroupGuard<()>, _, _>(128, rx, worker, Driver::Uring).unwrap();
let wg = WaitGroup::new((), 0);
let mut buf = Buffer::aligned(4096i32).unwrap();
rand_buffer(&mut buf);
let fd = todo!("init your fd");
let mut event = IOEvent::new(fd, buf, IOAction::Write, 0);
event.set_args(wg.add_guard());
let _ = tx.send(Box::new(event));

§Short Read/Write Handling

The engine supports transparent handling of short reads and writes (partial IO). When a read or write operation completes, the callback worker can use callback_unchecked to automatically adjust the buffer length to reflect the actual bytes transferred.

§How It Works

  • The IOEvent::callback method accepts a closure that allows you to detect if short I/O is due to reaching the file end (which is normal) or an actual error condition that requires retry.
  • Optionally, you can ignore all short I/O by calling IOEvent::callback_unchecked, which will adjust Buffer into the length actually read/write

§Example (TxOneshot as CbArgs and Short I/O handling)

use io_engine::{setup, Driver, IOAction, IOEvent, CbArgs};
use io_buffer::{Buffer, rand_buffer};
use std::fs::OpenOptions;
use std::os::fd::AsRawFd;
use std::thread;
use rustix::io::Errno;
use crossfire::{mpsc, oneshot, MTx};

struct OneshotArg (oneshot::TxOneshot<Result<Option<Buffer>, Errno>>);

impl CbArgs for OneshotArg {}

// 1. Prepare file
let file = OpenOptions::new()
    .read(true)
    .write(true)
    .create(true)
    .open("/tmp/test_io_engine.data")
    .unwrap();
let fd = file.as_raw_fd();


// 2. Create channels for submission
let (submit_tx, submit_rx) = mpsc::bounded_blocking(128);

// 3. Setup the driver (io_uring) with a channel worker
// This example uses a channel to wait for completion.
let (done_tx, done_rx) = crossfire::mpsc::bounded_blocking(128);
/// By default `MTx<_>` has impl Worker trait
setup::<OneshotArg, _, _>(
    16,
    submit_rx,
    done_tx, // Sender implements Worker
    Driver::Uring
).expect("Failed to setup driver");

// example for short-io handling
let weak_tx = submit_tx.downgrade();
let file_size = 4096;

// spawn callback worker
let th = thread::spawn(move || {
    let cb = | done_tx: OneshotArg, _offset, res| {
        done_tx.0.send(res);
    };
    while let Ok(event) = done_rx.recv() {
        if let Err(short_event) = event.callback(
            // for simplicity, we just demonstrate checking a static file bound here
            | offset | offset < file_size,
            &cb,
        ) {
            if let Some(submit_tx) = weak_tx.upgrade::<MTx<mpsc::Array<Box<IOEvent<OneshotArg>>>>>() {
                submit_tx.send(short_event).expect("resubmit");
            } else {
                // submit tx already dropped, does not matter.
                short_event.callback_unchecked(&cb);
            }
        }
    }
});

// 4. Submit a Write
let mut buf = Buffer::aligned(4096).unwrap();
rand_buffer(&mut buf);
let written_buf = buf.clone();
let mut event = IOEvent::new(fd, buf, IOAction::Write, 0);
let (task_tx, task_rx) = oneshot::oneshot();
event.set_args(OneshotArg(task_tx));

// Send to engine
submit_tx.send(Box::new(event)).expect("submit");

// Wait for completion
let res: Result<Option<Buffer>, Errno> = task_rx.recv().unwrap_or(Err(Errno::SHUTDOWN));
assert!(res.is_ok());

// 5. Submit a Read
let buffer = Buffer::aligned(4096).unwrap();
let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
let (task_tx, task_rx) = oneshot::oneshot();
event.set_args(OneshotArg(task_tx));
submit_tx.send(Box::new(event)).expect("submit");

let res: Result<Option<Buffer>, Errno> = task_rx.recv().unwrap_or(Err(Errno::SHUTDOWN));
let read_buf = res.expect("ok").unwrap();
assert_eq!(read_buf.len(), 4096);
assert_eq!(&read_buf[..], &written_buf[..]);
drop(submit_tx);
th.join();
// callback worker exited

Modules§

merge
IO Merging

Structs§

IOEvent
InlineClosure
Example Inline worker that executes callbacks directly without spawning threads. Use this for very lightweight callback logic to avoid thread context switching overhead.

Enums§

Driver
IOAction

Traits§

CbArgs
Worker
A trait for workers that accept IO events.

Functions§

setup
Setup the submission of IO tasks to the underlying driver.