io_engine/lib.rs
1//! # IO Engine
2//!
3//! A high-performance asynchronous IO library for Linux, wrapping the `AIO` and `io_uring`
4//! interfaces behind a unified API.
5//!
6//! ## Architecture
7//!
8//! Key components:
9//! - [IOEvent](crate::tasks::IOEvent): Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
10//! - [IOCallback](crate::tasks::IOCallback): Trait for handling completion. `ClosureCb` is provided for closure-based callbacks.
11//! - [Worker](crate::callback_worker::Worker): Trait for workers handling completions.
12//! - [IOWorkers](crate::callback_worker::IOWorkers): Worker threads handling completions (implements `Worker`).
13//! - **IO Merging**: The engine supports merging sequential IO requests to reduce system call overhead. See the [`merge`] module for details.
14//!
15//! ## Callbacks
16//!
17//! The engine supports flexible callback mechanisms. You can use either closures or custom structs implementing the [IOCallback](crate::tasks::IOCallback) trait.
18//!
19//! ### Closure Callback
20//!
21//! Use `ClosureCb` to wrap a closure. This is convenient for simple logic or one-off tasks.
22//! The closure takes ownership of the completed `IOEvent`.
23//!
24//! ### Struct Callback
25//!
26//! For more complex state management, or to avoid the allocation overhead of `Box<dyn Fn...>`,
27//! you can define your own struct and implement `IOCallback`.
28//! To support multiple kinds of callback, you can use an enum.
29//!
30//! ```rust
31//! use io_engine::tasks::{IOCallback, IOEvent};
32//! use nix::errno::Errno;
33//!
34//! struct MyCallback {
35//! id: u64,
36//! }
37//!
38//! impl IOCallback for MyCallback {
39//! fn call(self, _offset: i64, res: Result<Option<io_buffer::Buffer>, Errno>) {
40//! match res {
41//! Ok(Some(buf)) => println!("Operation {} completed, buffer len: {}", self.id, buf.len()),
42//! Ok(None) => println!("Operation {} completed (no buffer)", self.id),
43//! Err(e) => println!("Operation {} failed, error: {}", self.id, e),
44//! }
45//! }
46//! }
47//! ```
48//!
49//! ## Short Read/Write Handling
50//!
51//! The engine supports transparent handling of short reads and writes (partial IO).
52//! When a read or write operation completes, the callback worker automatically adjusts
53//! the buffer length to reflect the actual bytes transferred.
54//!
55//! ### How It Works
56//!
57//! - When an IO operation completes, the `callback_unchecked` method adjusts `Buffer::len()`
58//! to match the actual bytes transferred (`res`).
59//! - For read operations, this means the buffer contains exactly the data that was read.
60//! - For write operations, the buffer length is also adjusted to reflect completion status.
61//!
62//! ### Callback Worker Implementation
63//!
64//! When implementing a custom callback worker, you should use `callback_unchecked`, which detects
65//! short I/O and adjusts the read buffer's length to the exact number of bytes copied.
66//!
67//! ```rust,ignore
68//! // In your callback worker thread
69//! loop {
70//! match rx.recv() {
71//! Ok(event) => event.callback_unchecked(true),
72//! Err(_) => break,
73//! }
74//! }
75//! ```
76//!
77//! ### Advanced: Detecting Short I/O with File Boundary Check
78//!
79//! The `callback` method accepts a closure that allows you to detect if short I/O
80//! is due to reaching the file end (which is normal) or an actual error condition
81//! that requires retry:
82//!
83//! ```rust,ignore
84//! // check_short_read returns true while the offset is still before the file end
85//! event.callback(|offset| {
86//! // NOTE: you should probably use a weak reference here
87//! offset < file_size
88//! })
89//! .unwrap_or_else(|event| {
90//! // Short I/O detected, resubmit the event
91//! queue_tx.send(event).unwrap();
92//! });
93//! ```
94//!
95//! The closure receives the current offset and should return `true` if the offset
96//! is still inside the file (so the short I/O was not caused by EOF). If the closure returns `true`,
97//! the short I/O is considered an error condition that may need retry.
98//!
99//! ## Usage Example (io_uring)
100//!
101//! ```rust
102//! use io_engine::callback_worker::IOWorkers;
103//! use io_engine::{setup, Driver};
104//! use io_engine::tasks::{ClosureCb, IOAction, IOEvent};
105//! use io_buffer::Buffer;
106//! use std::fs::OpenOptions;
107//! use std::os::fd::AsRawFd;
108//! use crossfire::oneshot;
109//!
110//! fn main() {
111//! // 1. Prepare file
112//! let file = OpenOptions::new()
113//! .read(true)
114//! .write(true)
115//! .create(true)
116//! .open("/tmp/test_io_engine.data")
117//! .unwrap();
118//! let fd = file.as_raw_fd();
119//!
120//! // 2. Create channels for submission
121//! // This channel is used to send events into the engine's submission queue
122//! let (tx, rx) = crossfire::mpsc::bounded_blocking(128);
123//!
124//! // 3. Setup the driver (io_uring)
125//! // worker_num=1, depth=16
126//! // This spawns the necessary driver threads.
127//! setup::<ClosureCb, _, _>(
128//! 16,
129//! rx,
130//! IOWorkers::new(1),
131//! Driver::Uring
132//! ).expect("Failed to setup driver");
133//!
134//! // 4. Submit a Write
135//! let mut buffer = Buffer::aligned(4096).unwrap();
136//! buffer[0] = 65; // 'A'
137//! let mut event = IOEvent::new(fd, buffer, IOAction::Write, 0);
138//!
139//! // Create oneshot for this event's completion
140//! let (done_tx, done_rx) = oneshot::oneshot();
141//! event.set_callback(ClosureCb(Box::new(move |_offset, res| {
142//! let _ = done_tx.send(res);
143//! })));
144//!
145//! // Send to engine
146//! tx.send(Box::new(event)).expect("submit");
147//!
148//! // 5. Wait for completion
149//! let res = done_rx.recv().unwrap();
150//! res.map_err(|_| "Write failed").expect("Write failed");
151//!
152//! // 6. Submit a Read
153//! let buffer = Buffer::aligned(4096).unwrap();
154//! let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
155//!
156//! let (done_tx, done_rx) = oneshot::oneshot();
157//! event.set_callback(ClosureCb(Box::new(move |_offset, res| {
158//! let _ = done_tx.send(res);
159//! })));
160//!
161//! tx.send(Box::new(event)).expect("submit");
162//!
163//! let res = done_rx.recv().unwrap();
164//! let read_buf = res.expect("Read failed");
165//! assert!(read_buf.is_some());
166//! let read_buf = read_buf.unwrap();
167//! assert_eq!(read_buf.len(), 4096);
168//! assert_eq!(read_buf[0], 65);
169//! }
170//! ```
171
172#[macro_use] // pull in the macros exported by `captains_log` for use throughout this crate
173extern crate captains_log;
174
175pub mod callback_worker; // public: home of the `Worker` trait and `IOWorkers` referenced in the crate docs
176mod context; // private; only `Driver` and `setup` are exposed, via the re-export below
177pub use context::{Driver, setup}; // re-exported at the crate root (`io_engine::{setup, Driver}`)
178mod driver; // private; presumably the AIO / io_uring backends — TODO confirm
179pub mod merge; // public: IO request merging (see the crate docs)
180pub mod tasks; // public: `IOEvent`, `IOCallback`, `ClosureCb`, `IOAction` per the crate docs
181
182#[cfg(test)] // compiled only for test builds
183mod test;