io_engine/merge.rs
1//! # IO Merging
2//!
3//! This module provides functionality to merge multiple sequential IO requests into a single larger request.
4//!
5//! ## Overview
6//!
7//! Merging IO requests can significantly improve performance by:
8//! - Reducing the number of system calls (`io_uring_enter` or `io_submit`).
9//! - Allowing larger sequential transfers which are often more efficient for storage devices.
10//! - Reducing per-request overhead in the driver and completion handling.
11//!
12//! ## Mechanism
13//!
14//! The core component is [`MergeSubmitter`], which buffers incoming [`IOEvent`]s.
15//!
16//! - **Buffering**: Events are added to [`MergeBuffer`]. They are merged if they are:
17//! - Sequential (contiguous offsets).
18//! - Same IO action (Read/Write).
19//! - Same file descriptor.
20//! - Total size does not exceed `merge_size_limit`.
21//!
//! - **Flushing**: The buffered events are submitted when the merged size reaches `merge_size_limit`, when an incoming event cannot be merged with the buffered ones, or when `flush()` is called explicitly.
23//!
24//! - **Sub-tasks**:
25//! - If events are merged, a new "master" [`IOEvent`] is created covering the entire range.
26//! - The original events are attached as `sub_tasks` (a linked list) to this master event.
27//! - **Write**: The data from individual buffers is copied into a single large aligned buffer.
28//! - **Read**: A large buffer is allocated for the master event. Upon completion, data is copied back to the individual event buffers.
29//! - **Completion**: When the master event completes, it iterates over sub-tasks, sets their results (copying data for reads), and triggers their individual callbacks.
30//!
31//! ## Components
32//! - [`MergeBuffer`]: Internal buffer logic.
33//! - [`MergeSubmitter`]: Wraps a sender channel and manages the merge logic before sending.
34
35use crate::tasks::{CbArgs, IOAction, IOEvent, IOEventMerged};
36use crossfire::{BlockingTxTrait, SendError};
37use embed_collections::SegList;
38use io_buffer::Buffer;
39use rustix::io::Errno;
40use std::os::fd::RawFd;
41
/// Bookkeeping for the merge currently in progress: the first buffered event
/// plus the running tail offset and accumulated size of everything buffered.
struct MergedInfo<C: CbArgs> {
    /// First event, kept as `Box<IOEvent>` so the same box can be reused as the
    /// master event when more events are merged behind it.
    first_event: Box<IOEvent<C>>,
    /// Tail offset: the offset the next event must start at to be contiguous
    /// with the buffered range.
    tail_offset: i64,
    /// Total size of all buffered events, including the first.
    total_size: usize,
}
51
/// Buffers sequential IO events for merging.
///
/// This internal component collects contiguous [`IOEvent`]s. It presumes that
/// every event shares the same IO action and file descriptor and does not
/// check either one itself. The upper bound on the merged size is given by
/// `merge_size_limit`.
pub struct MergeBuffer<C: CbArgs> {
    /// Maximum total data size of one merged event.
    pub merge_size_limit: usize,
    /// State of the pending merge; `None` while the buffer is empty.
    merged_info: Option<MergedInfo<C>>,
    /// Events stored as `IOEventMerged` for cache-friendly storage. Empty while
    /// at most one event is buffered; once a second event arrives it holds the
    /// first event's extracted part followed by every subsequent event.
    merged_events: SegList<IOEventMerged<C>>,
}
63
64impl<C: CbArgs> MergeBuffer<C> {
65 /// Creates a new `MergeBuffer` with the specified merge size limit.
66 ///
67 /// # Arguments
68 /// * `merge_size_limit` - The maximum total data size to produce a merged event.
69 #[inline(always)]
70 pub fn new(merge_size_limit: usize) -> Self {
71 Self { merge_size_limit, merged_info: None, merged_events: SegList::new() }
72 }
73
74 /// Checks if a new event can be added to the current buffer for merging.
75 ///
76 /// An event can be added if:
77 /// - The buffer is empty.
78 /// - The event is contiguous with the last event in the buffer.
79 /// - Adding the event does not exceed the `merge_size_limit`.
80 ///
81 /// # Arguments
82 /// * `event` - The [`IOEvent`] to check.
83 ///
84 /// # Returns
85 /// `true` if the event can be added, `false` otherwise.
86 #[inline(always)]
87 pub fn may_add_event(&mut self, event: &IOEvent<C>) -> bool {
88 if let Some(ref info) = self.merged_info {
89 if event.get_size() as usize > self.merge_size_limit {
90 return false;
91 }
92 return info.tail_offset == event.offset;
93 } else {
94 return true;
95 }
96 }
97
98 /// Pushes an event into the buffer.
99 ///
100 /// This method assumes that `may_add_event` has already been called and returned `true`.
101 /// It updates the merged data size and tracks the merged offset.
102 ///
103 /// # Arguments
104 /// * `event` - The [`IOEvent`] to push.
105 ///
106 /// # Safety
107 ///
108 /// You should always check whether event is contiguous with [Self::may_add_event] before calling `push_event()`
109 ///
110 /// # Returns
111 /// `true` if the buffer size has reached or exceeded `merge_size_limit` after adding the event, `false` otherwise.
112 #[inline(always)]
113 pub fn push_event(&mut self, event: IOEvent<C>) -> bool {
114 if let Some(ref mut info) = self.merged_info {
115 // Safety check: ensure may_add_event was called
116 debug_assert_eq!(info.tail_offset, event.offset, "push_event: event not contiguous");
117 debug_assert!(
118 info.total_size + event.get_size() as usize <= self.merge_size_limit,
119 "push_event: exceeds merge_size_limit"
120 );
121 // If this is the second event, move first event's buffer to merged_events
122 if self.merged_events.is_empty() {
123 let first_merged = info.first_event.extract_merged();
124 self.merged_events.push(first_merged);
125 }
126 // Subsequent events: convert to IOEventMerged and store in SegList
127 info.total_size += event.get_size() as usize;
128 info.tail_offset += event.get_size() as i64;
129 self.merged_events.push(event.into_merged());
130 return info.total_size >= self.merge_size_limit;
131 } else {
132 // First event: store as Box<IOEvent> for potential reuse
133 let size = event.get_size() as usize;
134 let offset = event.offset;
135 self.merged_info = Some(MergedInfo {
136 first_event: Box::new(event),
137 tail_offset: offset + size as i64,
138 total_size: size,
139 });
140 return size >= self.merge_size_limit;
141 }
142 }
143
144 /// Returns the number of events currently in the buffer.
145 #[inline(always)]
146 pub fn len(&self) -> usize {
147 if !self.merged_events.is_empty() {
148 // First event buffer moved to merged_events, count is merged_events.len()
149 self.merged_events.len()
150 } else {
151 // Single event or empty
152 self.merged_info.as_ref().map(|_| 1).unwrap_or(0)
153 }
154 }
155
156 /// Takes all buffered events, building merged buffer if needed.
157 /// Returns the master event (Box<IOEvent>) or None if empty.
158 #[inline(always)]
159 fn take(&mut self, action: IOAction) -> Option<Box<IOEvent<C>>> {
160 let info = self.merged_info.take()?;
161
162 // Single event: return directly without mem::replace
163 if self.merged_events.is_empty() {
164 return Some(info.first_event);
165 }
166
167 // Multiple events: take merged_events and build merged buffer
168 let sub_tasks = std::mem::replace(&mut self.merged_events, SegList::new());
169 debug_assert!(sub_tasks.len() > 1);
170 let size = info.total_size;
171 match Buffer::aligned(size as i32) {
172 Ok(mut buffer) => {
173 if action == IOAction::Write {
174 let mut write_offset = 0;
175 for merged in sub_tasks.iter() {
176 buffer.copy_from(write_offset, merged.buf.as_ref());
177 write_offset += merged.buf.len();
178 }
179 }
180
181 // Reuse first_event as master, set merged buffer and subtasks
182 let mut master = info.first_event;
183 master.set_merged_tasks(buffer, sub_tasks);
184 Some(master)
185 }
186 Err(_) => {
187 // Allocation failed: error out all events
188 for merged in sub_tasks.drain() {
189 if let Some(args) = merged.args {
190 args.set_merge_error(Errno::NOMEM);
191 }
192 }
193 None
194 }
195 }
196 }
197
198 /// Flushes the buffered events, potentially merging them into a single [`IOEvent`].
199 ///
200 /// This method handles different scenarios based on the number of events in the buffer:
201 /// - If the buffer is empty, it returns `None`.
202 /// - If there is a single event, it returns `Some(event)` with the original event.
203 /// - If there are multiple events, it attempts to merge them:
204 /// - If successful, reuses the first `Box<IOEvent>` as the master event, replacing its buffer.
205 /// - If buffer allocation for the merged event fails, all original events are marked with an `NOMEM` error and their callbacks are triggered, then `None` is returned.
206 /// - This function will always override fd in IOEvent with argument
207 ///
208 /// After flushing, the buffer is reset.
209 ///
210 /// # Arguments
211 /// * `fd` - The raw file descriptor associated with the IO operations.
212 /// * `action` - The IO action (Read/Write) for the events.
213 ///
214 /// # Returns
215 /// An `Option<IOEvent<C>>` representing the merged event, a single original event, or `None` if the buffer was empty or merging failed.
216 #[inline]
217 pub fn flush(&mut self, fd: RawFd, action: IOAction) -> Option<IOEvent<C>> {
218 let mut master = self.take(action)?;
219 master.set_fd(fd);
220 Some(*master)
221 }
222}
223
/// Manages the submission of IO events, attempting to merge sequential events
/// before sending them to the IO driver.
///
/// Incoming [`IOEvent`]s are buffered in a [`MergeBuffer`]. All events are
/// expected to target the same file descriptor and IO action as the submitter;
/// `add_event` only checks this through debug assertion macros, and every
/// flushed event has its fd overridden with the submitter's `fd`.
pub struct MergeSubmitter<C: CbArgs, S: BlockingTxTrait<Box<IOEvent<C>>>> {
    /// File descriptor stamped onto every flushed event.
    fd: RawFd,
    /// Pending events that have not been submitted yet.
    buffer: MergeBuffer<C>,
    /// Channel sender used to hand flushed events over for submission.
    sender: S,
    /// IO action shared by all events passing through this submitter.
    action: IOAction,
}
236
237impl<C: CbArgs, S: BlockingTxTrait<Box<IOEvent<C>>>> MergeSubmitter<C, S> {
238 /// Creates a new `MergeSubmitter`.
239 ///
240 /// # Arguments
241 /// * `fd` - The raw file descriptor for IO operations.
242 /// * `sender` - A channel sender to send prepared [`IOEvent`]s to the IO driver.
243 /// * `merge_size_limit` - The maximum data size for a merged event buffer.
244 /// * `action` - The primary IO action (Read/Write) for this submitter.
245 pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
246 log_assert!(merge_size_limit > 0);
247 Self { fd, buffer: MergeBuffer::<C>::new(merge_size_limit), sender, action }
248 }
249
250 /// Adds an [`IOEvent`] to the internal buffer, potentially triggering a flush.
251 ///
252 /// If the event cannot be merged with current buffered events (e.g., non-contiguous,
253 /// exceeding merge limit), the existing buffered events are flushed first.
254 /// If adding the new event fills the buffer to its `merge_size_limit`, a flush is also triggered.
255 ///
256 /// # Arguments
257 /// * `event` - The [`IOEvent`] to add.
258 ///
259 /// # Returns
260 /// An `Ok(())` on success.
261 /// When submit sender closed, set Errno::SHUTDOWN on `event` and return the same Errno
262 /// On debug mode, will validate event.fd and event.action.
263 pub fn add_event(&mut self, mut event: IOEvent<C>) -> Result<(), Errno> {
264 log_debug_assert_eq!(self.fd, event.fd);
265 log_debug_assert_eq!(event.action, self.action);
266 let event_size = event.get_size();
267
268 if (event_size >= self.buffer.merge_size_limit as u64 || !self.buffer.may_add_event(&event))
269 && let Err(e) = self._flush()
270 {
271 event.set_errno(e);
272 event._callback_unchecked(false, |args: C, _offset: i64, _res| {
273 args.set_merge_error(e);
274 });
275 return Err(e);
276 }
277 if self.buffer.push_event(event) {
278 self._flush()?;
279 }
280 return Ok(());
281 }
282
283 /// Explicitly flushes any pending buffered events to the IO driver.
284 ///
285 /// # Returns
286 /// An `Ok(())` on success, or an `rustix::Errno` when sending the submit tx closed.
287 pub fn flush(&mut self) -> Result<(), Errno> {
288 self._flush()
289 }
290
291 #[inline(always)]
292 fn _flush(&mut self) -> Result<(), Errno> {
293 if let Some(event) = self.buffer.flush(self.fd, self.action) {
294 trace!("mio: submit event from flush {:?}", event);
295 if let Err(SendError(mut fail_event)) = self.sender.send(Box::new(event)) {
296 let e = Errno::SHUTDOWN;
297 fail_event.set_errno(e);
298 fail_event._callback_unchecked(false, |args: C, _offset: i64, _res| {
299 args.set_merge_error(e);
300 });
301 return Err(e);
302 }
303 }
304 Ok(())
305 }
306}