io_engine/lib.rs
1//! # IO Engine
2//!
3//! A high-performance asynchronous IO library for Linux, masking `AIO` and `io_uring`
4//! interfaces behind a unified API.
5//!
6//! ## Architecture
7//!
8//! Key components:
9//! - [IOEvent]:
10//! - Represents a single IO operation (Read/Write). Carries buffer, offset, fd.
11//! - Because IOEvent is large (>=64B), you should submit `Box<IOEvent<_>>` through channel
12//! - [CbArgs]: Optional completion arguments along with IOEvent.
13//! - [Worker]: Trait for workers handling completions:
14//! - Inline closure [InlineClosure]
15//! - Inline function
16//! - Send the complete IOEvent through spsc, mpsc, mpmc channel sender
17//! - **IO Merging**: The engine supports merging sequential IO requests to reduce system call overhead. See the [`merge`] module for details.
18//!
19//! ## Callbacks
20//!
21//! The engine supports flexible callback mechanisms. You may:
22//! - Capture some global arguments inside closure of callback workers
23//! - Pass arguments with IOEvent with [IOEvent::set_args()]
24//!
25//! ### Example (with WaitGroupGuard as CbArgs)
26//!
27//! ```rust,no_run
28//! use io_engine::{InlineClosure, Driver, setup, IOAction, IOEvent};
29//! use crossfire::{mpsc, waitgroup::{WaitGroup, WaitGroupGuard}};
30//! use io_buffer::{Buffer, rand_buffer};
31//! use rustix::io::Errno;
32//!
33//! let (tx, rx) = mpsc::bounded_blocking(128);
34//!
35//! // Use a shared closure that can be updated for different test phases
36//! let worker = InlineClosure(Box::new(move |_guard: WaitGroupGuard<()>, offset, res| {}));
37//! // WaitGroupGuard has impl CbArgs trait
38//! setup::<WaitGroupGuard<()>, _, _>(128, rx, worker, Driver::Uring).unwrap();
39//! let wg = WaitGroup::new((), 0);
40//! let mut buf = Buffer::aligned(4096i32).unwrap();
41//! rand_buffer(&mut buf);
42//! let fd = todo!("init your fd");
43//! let mut event = IOEvent::new(fd, buf, IOAction::Write, 0);
44//! event.set_args(wg.add_guard());
45//! let _ = tx.send(Box::new(event));
46//! ```
47//!
48//! ## Short Read/Write Handling
49//!
50//! The engine supports transparent handling of short reads and writes (partial IO).
51//! When a read or write operation completes, the callback worker can use `callback_unchecked`
52//! to automatically adjust the buffer length to reflect the actual bytes transferred.
53//!
54//! ### How It Works
55//!
56//! - The [IOEvent::callback] method accepts a closure that allows you to detect if short I/O
57//! is due to reaching the file end (which is normal) or an actual error condition
58//! that requires retry.
59//! - Optionally, you can ignore all short I/O by calling [IOEvent::callback_unchecked], which will
60//! adjust Buffer into the length actually read/write
61//!
62//! ## Example (TxOneshot as CbArgs and Short I/O handling)
63//!
64//! ```rust
65//! use io_engine::{setup, Driver, IOAction, IOEvent, CbArgs};
66//! use io_buffer::{Buffer, rand_buffer};
67//! use std::fs::OpenOptions;
68//! use std::os::fd::AsRawFd;
69//! use std::thread;
70//! use rustix::io::Errno;
71//! use crossfire::{mpsc, oneshot, MTx};
72//!
73//! struct OneshotArg (oneshot::TxOneshot<Result<Option<Buffer>, Errno>>);
74//!
75//! impl CbArgs for OneshotArg {}
76//!
77//! // 1. Prepare file
78//! let file = OpenOptions::new()
79//! .read(true)
80//! .write(true)
81//! .create(true)
82//! .open("/tmp/test_io_engine.data")
83//! .unwrap();
84//! let fd = file.as_raw_fd();
85//!
86//!
87//! // 2. Create channels for submission
88//! let (submit_tx, submit_rx) = mpsc::bounded_blocking(128);
89//!
90//! // 3. Setup the driver (io_uring) with a channel worker
91//! // This example uses a channel to wait for completion.
92//! let (done_tx, done_rx) = crossfire::mpsc::bounded_blocking(128);
93//! /// By default `MTx<_>` has impl Worker trait
94//! setup::<OneshotArg, _, _>(
95//! 16,
96//! submit_rx,
97//! done_tx, // Sender implements Worker
98//! Driver::Uring
99//! ).expect("Failed to setup driver");
100//!
101//! // example for short-io handling
102//! let weak_tx = submit_tx.downgrade();
103//! let file_size = 4096;
104//!
105//! // spawn callback worker
106//! let th = thread::spawn(move || {
107//! let cb = | done_tx: OneshotArg, _offset, res| {
108//! done_tx.0.send(res);
109//! };
110//! while let Ok(event) = done_rx.recv() {
111//! if let Err(short_event) = event.callback(
112//! // for simplicity, we just demonstrate checking a static file bound here
113//! | offset | offset < file_size,
114//! &cb,
115//! ) {
116//! if let Some(submit_tx) = weak_tx.upgrade::<MTx<mpsc::Array<Box<IOEvent<OneshotArg>>>>>() {
117//! submit_tx.send(short_event).expect("resubmit");
118//! } else {
119//! // submit tx already dropped, does not matter.
120//! short_event.callback_unchecked(&cb);
121//! }
122//! }
123//! }
124//! });
125
126//!
127//! // 4. Submit a Write
128//! let mut buf = Buffer::aligned(4096).unwrap();
129//! rand_buffer(&mut buf);
130//! let written_buf = buf.clone();
131//! let mut event = IOEvent::new(fd, buf, IOAction::Write, 0);
132//! let (task_tx, task_rx) = oneshot::oneshot();
133//! event.set_args(OneshotArg(task_tx));
134//!
135//! // Send to engine
136//! submit_tx.send(Box::new(event)).expect("submit");
137//!
138//! // Wait for completion
139//! let res: Result<Option<Buffer>, Errno> = task_rx.recv().unwrap_or(Err(Errno::SHUTDOWN));
140//! assert!(res.is_ok());
141//!
142//! // 5. Submit a Read
143//! let buffer = Buffer::aligned(4096).unwrap();
144//! let mut event = IOEvent::new(fd, buffer, IOAction::Read, 0);
145//! let (task_tx, task_rx) = oneshot::oneshot();
146//! event.set_args(OneshotArg(task_tx));
147//! submit_tx.send(Box::new(event)).expect("submit");
148//!
149//! let res: Result<Option<Buffer>, Errno> = task_rx.recv().unwrap_or(Err(Errno::SHUTDOWN));
150//! let read_buf = res.expect("ok").unwrap();
151//! assert_eq!(read_buf.len(), 4096);
152//! assert_eq!(&read_buf[..], &written_buf[..]);
153//! drop(submit_tx);
154//! th.join();
155//! // callback worker exited
156//! ```
157
158#[macro_use]
159extern crate captains_log;
160
161mod callback_worker;
162pub use callback_worker::{InlineClosure, Worker};
163mod context;
164pub use context::{Driver, setup};
165mod driver;
166pub mod merge;
167mod tasks;
168pub use tasks::{CbArgs, IOAction, IOEvent};
169
170#[cfg(test)]
171mod test;