Crate io_engine

Crate io_engine 

Source
Expand description

§IO Engine

A high-performance asynchronous IO library for Linux, masking AIO and io_uring interfaces behind a unified API.

§Architecture

Key components:

  • IOContext`: The main driver entry point. Manages submission and completion of IO events.
  • IOEvent: Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
  • [IOCallback]: Trait for handling completion. ClosureCb` is provided for closure-based callbacks.
  • Worker: Trait for workers handling completions.
  • IOWorkers: Worker threads handling completions (implements Worker).
  • IO Merging: The engine supports merging sequential IO requests to reduce system call overhead. See the merge module for details.

§Callbacks

The engine supports flexible callback mechanisms. You can use either closures or custom structs implementing the IOCallback trait.

§Closure Callback

Use ClosureCb to wrap a closure. This is convenient for simple logic or one-off tasks. The closure takes ownership of the completed IOEvent.

§Struct Callback

For more complex state management or to avoid allocation overhead of Box<dyn Fn...>, you can define your own struct and implement IOCallback. For multiple types of callback, you can use enum.

use io_engine::tasks::{IOCallback, IOEvent};

struct MyCallback {
    id: u64,
}

impl IOCallback for MyCallback {
    fn call(self, event: IOEvent<Self>) {
        if event.is_done() {
            println!("Operation {} completed, result len: {}", self.id, event.get_size());
        }
    }
}

§Short Read/Write Handling

The engine supports transparent handling of short reads and writes (partial IO). This is achieved through the IOEvent structure which tracks the progress of the operation.

§How It Works

  • The res field in IOEvent (initialized to i32::MIN) stores the accumulated bytes transferred when res >= 0.
  • When a driver (aio or uring) processes an event, it checks if res >= 0.
  • If true, it treats the event as a continuation (retry) and adjusts the buffer pointer, length, and file offset based on the bytes already transferred.
  • This allows the upper layers or callback mechanisms to re-submit incomplete events without manually slicing buffers or updating offsets.

§Handling Short IO in Your Code

When you receive a completed IOEvent, you should:

  1. Call get_result() to check how many bytes were actually transferred
  2. Compare with the expected length to detect short IO
  3. If incomplete, create a new oneshot channel, set a new callback, and resubmit the same IOEvent
  4. The driver will automatically continue from where it left off

Important: Do NOT recreate the IOEvent for retries. Reuse the original event.

§Example: Handling Short Reads

use io_engine::tasks::{IOEvent, IOAction, ClosureCb};
use io_buffer::Buffer;
use crossfire::oneshot;
use crossfire::mpsc;
use std::os::fd::RawFd;

async fn read_full(
    fd: RawFd,
    offset: u64,
    buf: Buffer,
    queue_tx: &crossfire::MTx<mpsc::Array<IOEvent<ClosureCb>>>,
) -> Result<Buffer, String> {
    let total_len = buf.len();
    let (tx, mut rx) = oneshot::oneshot();

    // Submit initial read
    let mut event = IOEvent::new(fd, buf, IOAction::Read, offset as i64);
    event.set_callback(ClosureCb(Box::new(move |evt| {
        let _ = tx.send(evt);
    })));
    queue_tx.send(event).expect("submit");

    loop {
        let mut event = rx.await.map_err(|_| "Channel error")?;

        // Check result
        let n = event.get_result().map_err(|_| "IO error")?;

        if n >= total_len {
            // Complete
            let mut buf = event.get_read_result().map_err(|_| "Get buffer error")?;
            buf.set_len(n);
            return Ok(buf);
        }

        // Short read detected
        // NOTE: In production code, you should check if this is EOF by comparing
        // (offset + n) with the file size to distinguish between:
        // - EOF: reached end of file, return partial data
        // - Short read: temporary condition, retry needed
        // For example:
        // if offset + n >= file_size {
        //     // EOF - return what we have
        //     let mut buf = event.get_read_result()?;
        //     buf.set_len(n);
        //     return Ok(buf);
        // }

        // Short read but not EOF - retry with new oneshot channel
        let (tx, new_rx) = oneshot::oneshot();
        rx = new_rx;

        event.set_callback(ClosureCb(Box::new(move |evt| {
            let _ = tx.send(evt);
        })));
        queue_tx.send(event).expect("resubmit");
    }
}

§Usage Example (io_uring)

use io_engine::callback_worker::IOWorkers;
use io_engine::{IOContext, Driver};
use io_engine::tasks::{ClosureCb, IOAction, IOEvent};
use io_buffer::Buffer;
use std::fs::OpenOptions;
use std::os::fd::AsRawFd;
use crossfire::oneshot;

fn main() {
    // 1. Prepare file
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open("/tmp/test_io_engine.data")
        .unwrap();
    let fd = file.as_raw_fd();

    // 2. Create channels for submission
    // This channel is used to send events into the engine's submission queue
    let (tx, rx) = crossfire::mpsc::bounded_blocking(128);

    // 3. Create IOContext (io_uring)
    // worker_num=1, depth=16
    // This spawns the necessary driver threads.
    let _ctx = IOContext::<ClosureCb, _, _>::new(
        16,
        rx,
        IOWorkers::new(1),
        Driver::Uring
    ).expect("Failed to create context");

    // 4. Submit a Write
    let mut buffer = Buffer::aligned(4096).unwrap();
    buffer[0] = 65; // 'A'
    let mut event = IOEvent::new(fd, buffer, IOAction::Write, 0);

    // Create oneshot for this event's completion
    let (done_tx, done_rx) = oneshot::oneshot();
    event.set_callback(ClosureCb(Box::new(move |event| {
        let _ = done_tx.send(event);
    })));

    // Send to engine
    tx.send(event).expect("submit");

    // 5. Wait for completion
    let event = done_rx.recv().unwrap();
    assert!(event.is_done());
    event.get_write_result().expect("Write failed");

    // 6. Submit a Read
    let buffer = Buffer::aligned(4096).unwrap();
    let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);

    let (done_tx, done_rx) = oneshot::oneshot();
    event.set_callback(ClosureCb(Box::new(move |event| {
        let _ = done_tx.send(event);
    })));

    tx.send(event).expect("submit");

    let mut event = done_rx.recv().unwrap();
    let read_buf = event.get_read_result().expect("Read failed");
    assert_eq!(read_buf.len(), 4096);
    assert_eq!(read_buf[0], 65);
}

Modules§

callback_worker
merge
IO Merging
tasks

Structs§

IOContext
IOContext manages the submission of IO tasks to the underlying driver.

Enums§

Driver