io_engine/
lib.rs

1//! # IO Engine
2//!
3//! A high-performance asynchronous IO library for Linux that wraps the `AIO` and `io_uring`
4//! interfaces behind a unified API.
5//!
6//! ## Architecture
7//!
8//! Key components:
9//! - [`IOContext`]: The main driver entry point. Manages submission and completion of IO events.
10//! - [IOEvent](crate::tasks::IOEvent): Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
11//! - [`IOCallback`](crate::tasks::IOCallback): Trait for handling completion. `ClosureCb` is provided for closure-based callbacks.
12//! - [`IOWorkers`](crate::callback_worker::IOWorkers): Worker threads handling completions.
13//! - **IO Merging**: The engine supports merging sequential IO requests to reduce system call overhead. See the [`merge`] module for details.
14//!
15//! ## Callbacks
16//!
17//! The engine supports flexible callback mechanisms. You can use either closures or custom structs implementing the [IOCallback](crate::tasks::IOCallback) trait.
18//!
19//! ### Closure Callback
20//!
21//! Use `ClosureCb` to wrap a closure. This is convenient for simple logic or one-off tasks.
22//! The closure takes ownership of the completed `IOEvent`.
23//!
24//! ### Struct Callback
25//!
26//! For more complex state management, or to avoid the allocation overhead of `Box<dyn Fn...>`,
27//! you can define your own struct and implement `IOCallback`.
28//! To support multiple kinds of callbacks, you can use an enum.
29//!
30//! ```rust
31//! use io_engine::tasks::{IOCallback, IOEvent};
32//!
33//! struct MyCallback {
34//!     id: u64,
35//! }
36//!
37//! impl IOCallback for MyCallback {
38//!     fn call(self, event: IOEvent<Self>) {
39//!         if event.is_done() {
40//!             println!("Operation {} completed, result len: {}", self.id, event.get_size());
41//!         }
42//!     }
43//! }
44//! ```
45//!
46//! ## Short Read/Write Handling
47//!
48//! The engine supports transparent handling of short reads and writes (partial IO).
49//! This is achieved through the `IOEvent` structure which tracks the progress of the operation.
50//!
51//! - The `res` field in `IOEvent` is initialized to `i32::MIN`; when `res >= 0`, it holds the
52//!   accumulated number of bytes transferred.
53//! - When a driver (`aio` or `uring`) processes an event, it checks if `res >= 0`.
54//! - If true, it treats the event as a continuation (retry) and adjusts the buffer pointer,
55//!   length, and file offset based on the bytes already transferred.
56//! - This allows the upper layers or callback mechanisms to re-submit incomplete events
57//!   without manually slicing buffers or updating offsets.
58//!
59//! ## Usage Example (io_uring)
60//!
61//! ```rust
62//! use io_engine::callback_worker::IOWorkers;
63//! use io_engine::{IOContext, Driver};
64//! use io_engine::tasks::{ClosureCb, IOAction, IOEvent};
65//! use io_buffer::Buffer;
66//! use std::fs::OpenOptions;
67//! use std::os::fd::AsRawFd;
68//! use crossfire::oneshot;
69//!
70//! fn main() {
71//!     // 1. Prepare file
72//!     let file = OpenOptions::new()
73//!         .read(true)
74//!         .write(true)
75//!         .create(true)
76//!         .open("/tmp/test_io_engine.data")
77//!         .unwrap();
78//!     let fd = file.as_raw_fd();
79//!
80//!     // 2. Create channels for submission
81//!     // This channel is used to send events into the engine's submission queue
82//!     let (tx, rx) = crossfire::mpsc::bounded_blocking(128);
83//!
84//!     // 3. Create IOContext (io_uring)
85//!     // worker_num=1, depth=16
86//!     // This spawns the necessary driver threads.
87//!     let _ctx = IOContext::<ClosureCb, _>::new(
88//!         16,
89//!         rx,
90//!         &IOWorkers::new(1),
91//!         Driver::Uring
92//!     ).expect("Failed to create context");
93//!
94//!     // 4. Submit a Write
95//!     let mut buffer = Buffer::aligned(4096).unwrap();
96//!     buffer[0] = 65; // 'A'
97//!     let mut event = IOEvent::new(fd, buffer, IOAction::Write, 0);
98//!
99//!     // Create oneshot for this event's completion
100//!     let (done_tx, done_rx) = oneshot::oneshot();
101//!     event.set_callback(ClosureCb(Box::new(move |event| {
102//!         let _ = done_tx.send(event);
103//!     })));
104//!
105//!     // Send to engine
106//!     tx.send(event).expect("submit");
107//!
108//!     // 5. Wait for completion
109//!     let event = done_rx.recv().unwrap();
110//!     assert!(event.is_done());
111//!     event.get_write_result().expect("Write failed");
112//!
113//!     // 6. Submit a Read
114//!     let buffer = Buffer::aligned(4096).unwrap();
115//!     let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
116//!
117//!     let (done_tx, done_rx) = oneshot::oneshot();
118//!     event.set_callback(ClosureCb(Box::new(move |event| {
119//!         let _ = done_tx.send(event);
120//!     })));
121//!
122//!     tx.send(event).expect("submit");
123//!
124//!     let mut event = done_rx.recv().unwrap();
125//!     let read_buf = event.get_read_result().expect("Read failed");
126//!     assert_eq!(read_buf.len(), 4096);
127//!     assert_eq!(read_buf[0], 65);
128//! }
129//! ```
130
131#[macro_use]
132extern crate log;
133#[macro_use]
134extern crate captains_log;
135
136pub mod callback_worker;
137mod context;
138pub use context::{Driver, IOContext};
139mod driver;
140pub mod merge;
141pub mod tasks;
142
143#[cfg(test)]
144extern crate rand;
145#[cfg(test)]
146mod test;