//! # IO Engine
//!
//! A high-performance asynchronous IO library for Linux that wraps the `AIO` and `io_uring`
//! interfaces behind a unified API.
//!
//! ## Architecture
//!
//! Key components:
//! - [IOEvent](crate::tasks::IOEvent): Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
//! - [IOCallback](crate::tasks::IOCallback): Trait for handling completion. `ClosureCb` is provided for closure-based callbacks.
//! - [Worker](crate::callback_worker::Worker): Trait for workers handling completions.
//! - [IOWorkers](crate::callback_worker::IOWorkers): Worker threads handling completions (implements `Worker`).
//! - **IO Merging**: The engine supports merging sequential IO requests to reduce system call overhead. See the [`merge`] module for details.
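//!
//! The idea behind merging can be sketched with the standard library alone. This is only an
//! illustration under assumptions: `Req` and `merge_sequential` are hypothetical names that
//! are not part of this crate, and the real [`merge`] module may work differently.
//!
//! ```rust
//! /// Hypothetical request descriptor (not a crate type): a byte range in a file.
//! #[derive(Debug, Clone, Copy, PartialEq)]
//! struct Req {
//!     offset: u64,
//!     len: u64,
//! }
//!
//! /// Coalesce requests whose byte ranges are back-to-back, so that each run of
//! /// sequential requests could be issued as one larger operation.
//! fn merge_sequential(mut reqs: Vec<Req>) -> Vec<Req> {
//!     reqs.sort_by_key(|r| r.offset);
//!     let mut merged: Vec<Req> = Vec::new();
//!     for r in reqs {
//!         if let Some(last) = merged.last_mut() {
//!             // The new request starts exactly where the previous one ends.
//!             if last.offset + last.len == r.offset {
//!                 last.len += r.len;
//!                 continue;
//!             }
//!         }
//!         merged.push(r);
//!     }
//!     merged
//! }
//! ```
//!
//! With three 4096-byte requests at offsets 0, 4096 and 16384, this sketch yields two
//! requests: one of 8192 bytes at offset 0 and one of 4096 bytes at offset 16384.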
//!
//! ## Callbacks
//!
//! The engine supports flexible callback mechanisms. You can use either closures or custom structs implementing the [IOCallback](crate::tasks::IOCallback) trait.
//!
//! ### Closure Callback
//!
//! Use `ClosureCb` to wrap a closure. This is convenient for simple logic or one-off tasks.
//! The closure receives the offset and the result of the completed operation.
//!
//! ### Struct Callback
//!
//! To manage more complex state, or to avoid the allocation overhead of `Box<dyn Fn...>`,
//! you can define your own struct and implement `IOCallback`.
//! To support multiple callback types, you can use an enum.
//!
//! ```rust
//! use io_engine::tasks::IOCallback;
//! use nix::errno::Errno;
//!
//! struct MyCallback {
//!     id: u64,
//! }
//!
//! impl IOCallback for MyCallback {
//!     fn call(self, _offset: i64, res: Result<Option<io_buffer::Buffer>, Errno>) {
//!         match res {
//!             Ok(Some(buf)) => println!("Operation {} completed, buffer len: {}", self.id, buf.len()),
//!             Ok(None) => println!("Operation {} completed (no buffer)", self.id),
//!             Err(e) => println!("Operation {} failed, error: {}", self.id, e),
//!         }
//!     }
//! }
//! ```
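//!
//! The enum approach might look like the following sketch. To keep it self-contained, it uses
//! a simplified stand-in trait named `Callback` instead of the real `IOCallback`; the trait,
//! the `MyCb` enum, and the simplified `Result<usize, i32>` result type are all assumptions
//! made for illustration.
//!
//! ```rust
//! use std::sync::mpsc::Sender;
//!
//! /// Simplified stand-in for `IOCallback` (an assumption, not the crate's trait).
//! trait Callback {
//!     fn call(self, offset: i64, res: Result<usize, i32>);
//! }
//!
//! /// One enum covers every callback flavor, so no boxed closure is needed.
//! enum MyCb {
//!     /// Print the outcome of the operation.
//!     Log { id: u64 },
//!     /// Forward the offset and success flag to another thread.
//!     Notify(Sender<(i64, bool)>),
//! }
//!
//! impl Callback for MyCb {
//!     fn call(self, offset: i64, res: Result<usize, i32>) {
//!         match self {
//!             MyCb::Log { id } => println!("op {} at offset {}: {:?}", id, offset, res),
//!             MyCb::Notify(tx) => {
//!                 // Ignore the send error if the receiver has gone away.
//!                 let _ = tx.send((offset, res.is_ok()));
//!             }
//!         }
//!     }
//! }
//! ```
//!
//! Dispatch happens through a plain `match`, so each variant can carry its own state
//! without a heap allocation per event.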
//!
//! ## Short Read/Write Handling
//!
//! The engine handles short reads and writes (partial IO) transparently.
//! When a read or write operation completes, the callback worker automatically adjusts
//! the buffer length to reflect the actual bytes transferred.
//!
//! ### How It Works
//!
//! - When an IO operation completes, the `callback_unchecked` method adjusts `Buffer::len()`
//!   to match the actual bytes transferred (`res`).
//! - For read operations, this means the buffer contains exactly the data that was read.
//! - For write operations, the buffer length is likewise adjusted to the number of bytes actually written.
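//!
//! The length adjustment can be illustrated with a plain `Vec<u8>` standing in for the buffer.
//! This is a sketch under assumptions: `shrink_to_transferred` is a hypothetical helper that is
//! not part of this crate, and `res` is taken to be a non-negative count of transferred bytes.
//!
//! ```rust
//! /// Truncate `buf` to the number of bytes reported as transferred.
//! fn shrink_to_transferred(buf: &mut Vec<u8>, res: usize) {
//!     // A short transfer leaves the tail of the buffer untouched, so drop it.
//!     if res < buf.len() {
//!         buf.truncate(res);
//!     }
//! }
//! ```
//!
//! For a 4096-byte buffer and `res == 1000`, the buffer ends up 1000 bytes long; a full
//! transfer leaves the length unchanged.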
//!
//! ### Callback Worker Implementation
//!
//! When implementing a custom callback worker, you should use `callback_unchecked`, which detects
//! short I/O and sets the length of a read buffer to the exact number of bytes copied.
//!
//! ```rust,ignore
//! // In your callback worker thread
//! loop {
//!     match rx.recv() {
//!         Ok(event) => event.callback_unchecked(true),
//!         Err(_) => break,
//!     }
//! }
//! ```
//!
//! ### Advanced: Detecting Short I/O with File Boundary Check
//!
//! The `callback` method accepts a closure that lets you tell whether short I/O
//! is due to reaching the end of the file (which is normal) or to an actual error condition
//! that requires a retry:
//!
//! ```rust,ignore
//! // The closure returns true if the offset is still before the file end
//! event.callback(|offset| {
//!     // NOTE: you should probably use weak reference here
//!     offset < file_size
//! })
//! .unwrap_or_else(|event| {
//!     // Short I/O detected, resubmit the event
//!     queue_tx.send(event).unwrap();
//! });
//! ```
//!
//! The closure receives the current offset and should return `true` if the offset
//! is still within the file boundary (that is, EOF has not been reached). If the closure
//! returns `true`, the short I/O is considered an error condition that may need retry.
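//!
//! The decision itself can be written as a small pure function. The sketch below is not the
//! crate's implementation: `should_resubmit` and its parameters are hypothetical, and it
//! assumes the file size is known when the completion is handled.
//!
//! ```rust
//! /// Decide whether a short transfer should be resubmitted.
//! /// `offset` is where the transfer stopped; `file_size` is the known file length.
//! fn should_resubmit(requested: usize, transferred: usize, offset: u64, file_size: u64) -> bool {
//!     let short = transferred < requested;
//!     // Stopping before the end of the file is unexpected; stopping at EOF is normal.
//!     short && offset < file_size
//! }
//! ```
//!
//! A short read that stops exactly at the end of the file is left alone, while one that
//! stops in the middle of the file is a candidate for resubmission.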
//!
//! ## Usage Example (io_uring)
//!
//! ```rust
//! use io_engine::callback_worker::IOWorkers;
//! use io_engine::{setup, Driver};
//! use io_engine::tasks::{ClosureCb, IOAction, IOEvent};
//! use io_buffer::Buffer;
//! use std::fs::OpenOptions;
//! use std::os::fd::AsRawFd;
//! use crossfire::oneshot;
//!
//! fn main() {
//!     // 1. Prepare file
//!     let file = OpenOptions::new()
//!         .read(true)
//!         .write(true)
//!         .create(true)
//!         .open("/tmp/test_io_engine.data")
//!         .unwrap();
//!     let fd = file.as_raw_fd();
//!
//!     // 2. Create channels for submission
//!     // This channel is used to send events into the engine's submission queue
//!     let (tx, rx) = crossfire::mpsc::bounded_blocking(128);
//!
//!     // 3. Setup the driver (io_uring)
//!     // worker_num=1, depth=16
//!     // This spawns the necessary driver threads.
//!     setup::<ClosureCb, _, _>(
//!         16,
//!         rx,
//!         IOWorkers::new(1),
//!         Driver::Uring
//!     ).expect("Failed to setup driver");
//!
//!     // 4. Submit a Write
//!     let mut buffer = Buffer::aligned(4096).unwrap();
//!     buffer[0] = 65; // 'A'
//!     let mut event = IOEvent::new(fd, buffer, IOAction::Write, 0);
//!
//!     // Create oneshot for this event's completion
//!     let (done_tx, done_rx) = oneshot::oneshot();
//!     event.set_callback(ClosureCb(Box::new(move |_offset, res| {
//!         let _ = done_tx.send(res);
//!     })));
//!
//!     // Send to engine
//!     tx.send(Box::new(event)).expect("submit");
//!
//!     // 5. Wait for completion
//!     let res = done_rx.recv().unwrap();
//!     res.map_err(|_| "Write failed").expect("Write failed");
//!
//!     // 6. Submit a Read
//!     let buffer = Buffer::aligned(4096).unwrap();
//!     let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
//!
//!     let (done_tx, done_rx) = oneshot::oneshot();
//!     event.set_callback(ClosureCb(Box::new(move |_offset, res| {
//!         let _ = done_tx.send(res);
//!     })));
//!
//!     tx.send(Box::new(event)).expect("submit");
//!
//!     let res = done_rx.recv().unwrap();
//!     let read_buf = res.expect("Read failed");
//!     assert!(read_buf.is_some());
//!     let read_buf = read_buf.unwrap();
//!     assert_eq!(read_buf.len(), 4096);
//!     assert_eq!(read_buf[0], 65);
//! }
//! ```

#[macro_use]
extern crate captains_log;

pub mod callback_worker;
mod context;
pub use context::{Driver, setup};
mod driver;
pub mod merge;
pub mod tasks;

#[cfg(test)]
mod test;