io_engine/
lib.rs

1//! # IO Engine
2//!
3//! A high-performance asynchronous IO library for Linux, wrapping the `AIO` and `io_uring`
4//! interfaces behind a unified API.
5//!
6//! ## Architecture
7//!
8//! Key components:
9//! - [`IOContext`]: The main driver entry point. Manages submission and completion of IO events.
10//! - [IOEvent](crate::tasks::IOEvent): Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
11//! - [`IOCallback`](crate::tasks::IOCallback): Trait for handling completion. `ClosureCb` is provided for closure-based callbacks.
12//! - [Worker](crate::callback_worker::Worker): Trait for workers handling completions.
13//! - [IOWorkers](crate::callback_worker::IOWorkers): Worker threads handling completions (implements `Worker`).
14//! - **IO Merging**: The engine supports merging sequential IO requests to reduce system call overhead. See the [`merge`] module for details.
15//!
16//! ## Callbacks
17//!
18//! The engine supports flexible callback mechanisms. You can use either closures or custom structs implementing the [IOCallback](crate::tasks::IOCallback) trait.
19//!
20//! ### Closure Callback
21//!
22//! Use `ClosureCb` to wrap a closure. This is convenient for simple logic or one-off tasks.
23//! The closure takes ownership of the completed `IOEvent`.
24//!
25//! ### Struct Callback
26//!
27//! For more complex state management, or to avoid the allocation overhead of `Box<dyn Fn...>`,
28//! you can define your own struct and implement `IOCallback`.
29//! To support multiple kinds of callbacks, you can use an enum.
30//!
31//! ```rust
32//! use io_engine::tasks::{IOCallback, IOEvent};
33//!
34//! struct MyCallback {
35//!     id: u64,
36//! }
37//!
38//! impl IOCallback for MyCallback {
39//!     fn call(self, event: IOEvent<Self>) {
40//!         if event.is_done() {
41//!             println!("Operation {} completed, result len: {}", self.id, event.get_size());
42//!         }
43//!     }
44//! }
45//! ```
46//!
47//! ## Short Read/Write Handling
48//!
49//! The engine supports transparent handling of short reads and writes (partial IO).
50//! This is achieved through the `IOEvent` structure which tracks the progress of the operation.
51//!
52//! ### How It Works
53//!
54//! - The `res` field in `IOEvent` (initialized to `i32::MIN`) stores the accumulated bytes transferred
55//!   when `res >= 0`.
56//! - When a driver (`aio` or `uring`) processes an event, it checks if `res >= 0`.
57//! - If true, it treats the event as a continuation (retry) and adjusts the buffer pointer,
58//!   length, and file offset based on the bytes already transferred.
59//! - This allows the upper layers or callback mechanisms to re-submit incomplete events
60//!   without manually slicing buffers or updating offsets.
61//!
62//! ### Handling Short IO in Your Code
63//!
64//! When you receive a completed `IOEvent`, you should:
65//!
66//! 1. Call `get_result()` to check how many bytes were actually transferred
67//! 2. Compare with the expected length to detect short IO
68//! 3. If incomplete, create a new oneshot channel, set a new callback, and resubmit the **same** `IOEvent`
69//! 4. The driver will automatically continue from where it left off
70//!
71//! **Important:** Do NOT recreate the `IOEvent` for retries. Reuse the original event.
72//!
73//! ### Example: Handling Short Reads
74//!
75//! ```rust,no_run
76//! use io_engine::tasks::{IOEvent, IOAction, ClosureCb};
77//! use io_buffer::Buffer;
78//! use crossfire::oneshot;
79//! use crossfire::mpsc;
80//! use std::os::fd::RawFd;
81//!
82//! async fn read_full(
83//!     fd: RawFd,
84//!     offset: u64,
85//!     buf: Buffer,
86//!     queue_tx: &crossfire::MTx<mpsc::Array<IOEvent<ClosureCb>>>,
87//! ) -> Result<Buffer, String> {
88//!     let total_len = buf.len();
89//!     let (tx, mut rx) = oneshot::oneshot();
90//!
91//!     // Submit initial read
92//!     let mut event = IOEvent::new(fd, buf, IOAction::Read, offset as i64);
93//!     event.set_callback(ClosureCb(Box::new(move |evt| {
94//!         let _ = tx.send(evt);
95//!     })));
96//!     queue_tx.send(event).expect("submit");
97//!
98//!     loop {
99//!         let mut event = rx.await.map_err(|_| "Channel error")?;
100//!
101//!         // Check result
102//!         let n = event.get_result().map_err(|_| "IO error")?;
103//!
104//!         if n >= total_len {
105//!             // Complete
106//!             let mut buf = event.get_read_result().map_err(|_| "Get buffer error")?;
107//!             buf.set_len(n);
108//!             return Ok(buf);
109//!         }
110//!
111//!         // Short read detected
112//!         // NOTE: In production code, you should check if this is EOF by comparing
113//!         // (offset + n) with the file size to distinguish between:
114//!         // - EOF: reached end of file, return partial data
115//!         // - Short read: temporary condition, retry needed
116//!         // For example:
117//!         // if offset + n >= file_size {
118//!         //     // EOF - return what we have
119//!         //     let mut buf = event.get_read_result()?;
120//!         //     buf.set_len(n);
121//!         //     return Ok(buf);
122//!         // }
123//!
124//!         // Short read but not EOF - retry with new oneshot channel
125//!         let (tx, new_rx) = oneshot::oneshot();
126//!         rx = new_rx;
127//!
128//!         event.set_callback(ClosureCb(Box::new(move |evt| {
129//!             let _ = tx.send(evt);
130//!         })));
131//!         queue_tx.send(event).expect("resubmit");
132//!     }
133//! }
134//! ```
135//!
136//! ## Usage Example (io_uring)
137//!
138//! ```rust
139//! use io_engine::callback_worker::IOWorkers;
140//! use io_engine::{IOContext, Driver};
141//! use io_engine::tasks::{ClosureCb, IOAction, IOEvent};
142//! use io_buffer::Buffer;
143//! use std::fs::OpenOptions;
144//! use std::os::fd::AsRawFd;
145//! use crossfire::oneshot;
146//!
147//! fn main() {
148//!     // 1. Prepare file
149//!     let file = OpenOptions::new()
150//!         .read(true)
151//!         .write(true)
152//!         .create(true)
153//!         .open("/tmp/test_io_engine.data")
154//!         .unwrap();
155//!     let fd = file.as_raw_fd();
156//!
157//!     // 2. Create channels for submission
158//!     // This channel is used to send events into the engine's submission queue
159//!     let (tx, rx) = crossfire::mpsc::bounded_blocking(128);
160//!
161//!     // 3. Create IOContext (io_uring)
162//!     // worker_num=1, depth=16
163//!     // This spawns the necessary driver threads.
164//!     let _ctx = IOContext::<ClosureCb, _, _>::new(
165//!         16,
166//!         rx,
167//!         IOWorkers::new(1),
168//!         Driver::Uring
169//!     ).expect("Failed to create context");
170//!
171//!     // 4. Submit a Write
172//!     let mut buffer = Buffer::aligned(4096).unwrap();
173//!     buffer[0] = 65; // 'A'
174//!     let mut event = IOEvent::new(fd, buffer, IOAction::Write, 0);
175//!
176//!     // Create oneshot for this event's completion
177//!     let (done_tx, done_rx) = oneshot::oneshot();
178//!     event.set_callback(ClosureCb(Box::new(move |event| {
179//!         let _ = done_tx.send(event);
180//!     })));
181//!
182//!     // Send to engine
183//!     tx.send(event).expect("submit");
184//!
185//!     // 5. Wait for completion
186//!     let event = done_rx.recv().unwrap();
187//!     assert!(event.is_done());
188//!     event.get_write_result().expect("Write failed");
189//!
190//!     // 6. Submit a Read
191//!     let buffer = Buffer::aligned(4096).unwrap();
192//!     let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
193//!
194//!     let (done_tx, done_rx) = oneshot::oneshot();
195//!     event.set_callback(ClosureCb(Box::new(move |event| {
196//!         let _ = done_tx.send(event);
197//!     })));
198//!
199//!     tx.send(event).expect("submit");
200//!
201//!     let mut event = done_rx.recv().unwrap();
202//!     let read_buf = event.get_read_result().expect("Read failed");
203//!     assert_eq!(read_buf.len(), 4096);
204//!     assert_eq!(read_buf[0], 65);
205//! }
206//! ```
207
208#[macro_use]
209extern crate log;
210#[macro_use]
211extern crate captains_log;
212
213pub mod callback_worker;
214mod context;
215pub use context::{Driver, IOContext};
216mod driver;
217pub mod merge;
218pub mod tasks;
219
220#[cfg(test)]
221extern crate rand;
222#[cfg(test)]
223mod test;