io_engine/lib.rs
1//! # IO Engine
2//!
3//! A high-performance asynchronous IO library for Linux that wraps the `AIO` and `io_uring`
4//! interfaces behind a unified API.
5//!
6//! ## Architecture
7//!
8//! Key components:
9//! - [`IOContext`]: The main driver entry point. Manages submission and completion of IO events.
10//! - [IOEvent](crate::tasks::IOEvent): Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
11//! - [IOCallback](crate::tasks::IOCallback): Trait for handling completion. `ClosureCb` is provided for closure-based callbacks.
12//! - [Worker](crate::callback_worker::Worker): Trait for workers handling completions.
13//! - [IOWorkers](crate::callback_worker::IOWorkers): Worker threads handling completions (implements `Worker`).
14//! - **IO Merging**: The engine supports merging sequential IO requests to reduce system call overhead. See the [`merge`] module for details.
15//!
16//! ## Callbacks
17//!
18//! The engine supports flexible callback mechanisms. You can use either closures or custom structs implementing the [IOCallback](crate::tasks::IOCallback) trait.
19//!
20//! ### Closure Callback
21//!
22//! Use `ClosureCb` to wrap a closure. This is convenient for simple logic or one-off tasks.
23//! The closure takes ownership of the completed `IOEvent`.
24//!
25//! ### Struct Callback
26//!
27//! For more complex state management, or to avoid the allocation overhead of `Box<dyn Fn...>`,
28//! you can define your own struct and implement `IOCallback`.
29//! To support multiple kinds of callback, you can use an enum.
30//!
31//! ```rust
32//! use io_engine::tasks::{IOCallback, IOEvent};
33//!
34//! struct MyCallback {
35//! id: u64,
36//! }
37//!
38//! impl IOCallback for MyCallback {
39//! fn call(self, event: IOEvent<Self>) {
40//! if event.is_done() {
41//! println!("Operation {} completed, result len: {}", self.id, event.get_size());
42//! }
43//! }
44//! }
45//! ```
46//!
47//! ## Short Read/Write Handling
48//!
49//! The engine supports transparent handling of short reads and writes (partial IO).
50//! This is achieved through the `IOEvent` structure, which tracks the progress of the operation.
51//!
52//! ### How It Works
53//!
54//! - The `res` field in `IOEvent` is initialized to `i32::MIN`. Once `res >= 0`, it holds the
55//! accumulated number of bytes transferred so far.
56//! - When a driver (`aio` or `uring`) processes an event, it checks if `res >= 0`.
57//! - If true, it treats the event as a continuation (retry) and adjusts the buffer pointer,
58//! length, and file offset based on the bytes already transferred.
59//! - This allows the upper layers or callback mechanisms to re-submit incomplete events
60//! without manually slicing buffers or updating offsets.
61//!
62//! ### Handling Short IO in Your Code
63//!
64//! When you receive a completed `IOEvent`, you should:
65//!
66//! 1. Call `get_result()` to check how many bytes were actually transferred
67//! 2. Compare with the expected length to detect short IO
68//! 3. If incomplete, create a new oneshot channel, set a new callback, and resubmit the **same** `IOEvent`
69//! 4. The driver will automatically continue from where it left off
70//!
71//! **Important:** Do NOT recreate the `IOEvent` for retries. Reuse the original event.
72//!
73//! ### Example: Handling Short Reads
74//!
75//! ```rust,no_run
76//! use io_engine::tasks::{IOEvent, IOAction, ClosureCb};
77//! use io_buffer::Buffer;
78//! use crossfire::oneshot;
79//! use crossfire::mpsc;
80//! use std::os::fd::RawFd;
81//!
82//! async fn read_full(
83//! fd: RawFd,
84//! offset: u64,
85//! buf: Buffer,
86//! queue_tx: &crossfire::MTx<mpsc::Array<IOEvent<ClosureCb>>>,
87//! ) -> Result<Buffer, String> {
88//! let total_len = buf.len();
89//! let (tx, mut rx) = oneshot::oneshot();
90//!
91//! // Submit initial read
92//! let mut event = IOEvent::new(fd, buf, IOAction::Read, offset as i64);
93//! event.set_callback(ClosureCb(Box::new(move |evt| {
94//! let _ = tx.send(evt);
95//! })));
96//! queue_tx.send(event).expect("submit");
97//!
98//! loop {
99//! let mut event = rx.await.map_err(|_| "Channel error")?;
100//!
101//! // Check result
102//! let n = event.get_result().map_err(|_| "IO error")?;
103//!
104//! if n >= total_len {
105//! // Complete
106//! let mut buf = event.get_read_result().map_err(|_| "Get buffer error")?;
107//! buf.set_len(n);
108//! return Ok(buf);
109//! }
110//!
111//! // Short read detected
112//! // NOTE: In production code, you should check if this is EOF by comparing
113//! // (offset + n) with the file size to distinguish between:
114//! // - EOF: reached end of file, return partial data
115//! // - Short read: temporary condition, retry needed
116//! // For example:
117//! // if offset + n >= file_size {
118//! // // EOF - return what we have
119//! // let mut buf = event.get_read_result()?;
120//! // buf.set_len(n);
121//! // return Ok(buf);
122//! // }
123//!
124//! // Short read but not EOF - retry with new oneshot channel
125//! let (tx, new_rx) = oneshot::oneshot();
126//! rx = new_rx;
127//!
128//! event.set_callback(ClosureCb(Box::new(move |evt| {
129//! let _ = tx.send(evt);
130//! })));
131//! queue_tx.send(event).expect("resubmit");
132//! }
133//! }
134//! ```
135//!
136//! ## Usage Example (io_uring)
137//!
138//! ```rust
139//! use io_engine::callback_worker::IOWorkers;
140//! use io_engine::{IOContext, Driver};
141//! use io_engine::tasks::{ClosureCb, IOAction, IOEvent};
142//! use io_buffer::Buffer;
143//! use std::fs::OpenOptions;
144//! use std::os::fd::AsRawFd;
145//! use crossfire::oneshot;
146//!
147//! fn main() {
148//! // 1. Prepare file
149//! let file = OpenOptions::new()
150//! .read(true)
151//! .write(true)
152//! .create(true)
153//! .open("/tmp/test_io_engine.data")
154//! .unwrap();
155//! let fd = file.as_raw_fd();
156//!
157//! // 2. Create channels for submission
158//! // This channel is used to send events into the engine's submission queue
159//! let (tx, rx) = crossfire::mpsc::bounded_blocking(128);
160//!
161//! // 3. Create IOContext (io_uring)
162//! // worker_num=1, depth=16
163//! // This spawns the necessary driver threads.
164//! let _ctx = IOContext::<ClosureCb, _, _>::new(
165//! 16,
166//! rx,
167//! IOWorkers::new(1),
168//! Driver::Uring
169//! ).expect("Failed to create context");
170//!
171//! // 4. Submit a Write
172//! let mut buffer = Buffer::aligned(4096).unwrap();
173//! buffer[0] = 65; // 'A'
174//! let mut event = IOEvent::new(fd, buffer, IOAction::Write, 0);
175//!
176//! // Create oneshot for this event's completion
177//! let (done_tx, done_rx) = oneshot::oneshot();
178//! event.set_callback(ClosureCb(Box::new(move |event| {
179//! let _ = done_tx.send(event);
180//! })));
181//!
182//! // Send to engine
183//! tx.send(event).expect("submit");
184//!
185//! // 5. Wait for completion
186//! let event = done_rx.recv().unwrap();
187//! assert!(event.is_done());
188//! event.get_write_result().expect("Write failed");
189//!
190//! // 6. Submit a Read
191//! let buffer = Buffer::aligned(4096).unwrap();
192//! let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
193//!
194//! let (done_tx, done_rx) = oneshot::oneshot();
195//! event.set_callback(ClosureCb(Box::new(move |event| {
196//! let _ = done_tx.send(event);
197//! })));
198//!
199//! tx.send(event).expect("submit");
200//!
201//! let mut event = done_rx.recv().unwrap();
202//! let read_buf = event.get_read_result().expect("Read failed");
203//! assert_eq!(read_buf.len(), 4096);
204//! assert_eq!(read_buf[0], 65);
205//! }
206//! ```
207
208#[macro_use]
209extern crate log; // `#[macro_use]` brings the macros exported by `log` into scope crate-wide
210#[macro_use]
211extern crate captains_log; // likewise for the macros exported by `captains_log`
212
213pub mod callback_worker; // public: home of the `Worker` trait and `IOWorkers` named in the crate docs
214mod context; // private; its public items are exposed only through the re-export below
215pub use context::{Driver, IOContext}; // re-exported so users can write `io_engine::{IOContext, Driver}`
216mod driver; // private; NOTE(review): presumably the `aio` / `uring` backends mentioned in the docs - confirm
217pub mod merge; // public: IO merging support referenced in the crate docs
218pub mod tasks; // public: `IOEvent`, `IOAction`, `IOCallback`, `ClosureCb` as used in the doc examples
219
220#[cfg(test)]
221extern crate rand; // only linked when compiling tests
222#[cfg(test)]
223mod test; // test-only module; not compiled into normal builds