Skip to main content

io_engine/
merge.rs

1//! # IO Merging
2//!
3//! This module provides functionality to merge multiple sequential IO requests into a single larger request.
4//!
5//! ## Overview
6//!
7//! Merging IO requests can significantly improve performance by:
8//! - Reducing the number of system calls (`io_uring_enter` or `io_submit`).
9//! - Allowing larger sequential transfers which are often more efficient for storage devices.
10//! - Reducing per-request overhead in the driver and completion handling.
11//!
12//! ## Mechanism
13//!
14//! The core component is [`MergeSubmitter`], which buffers incoming [`IOEvent`]s.
15//!
16//! - **Buffering**: Events are added to [`MergeBuffer`]. They are merged if they are:
17//!   - Sequential (contiguous offsets).
18//!   - Same IO action (Read/Write).
19//!   - Same file descriptor.
20//!   - Total size does not exceed `merge_size_limit`.
21//!
22//! - **Flushing**: When the buffer is full, the limit is reached, or `flush()` is called, the merged request is submitted.
23//!
24//! - **Sub-tasks**:
25//!   - If events are merged, a new "master" [`IOEvent`] is created covering the entire range.
26//!   - The original events are attached as `sub_tasks` (a linked list) to this master event.
27//!   - **Write**: The data from individual buffers is copied into a single large aligned buffer.
28//!   - **Read**: A large buffer is allocated for the master event. Upon completion, data is copied back to the individual event buffers.
29//!   - **Completion**: When the master event completes, it iterates over sub-tasks, sets their results (copying data for reads), and triggers their individual callbacks.
30//!
31//! ## Components
32//! - [`MergeBuffer`]: Internal buffer logic.
33//! - [`MergeSubmitter`]: Wraps a sender channel and manages the merge logic before sending.
34
35use crate::tasks::{IOAction, IOCallback, IOEvent, IOEventMerged};
36use crossfire::BlockingTxTrait;
37use embed_collections::SegList;
38use io_buffer::Buffer;
39use nix::errno::Errno;
40use std::io;
41use std::os::fd::RawFd;
42
43/// Info about the first event and merged state.
44struct MergedInfo<C: IOCallback> {
45    /// First event stored as Box<IOEvent> to allow reuse when merging.
46    first_event: Box<IOEvent<C>>,
47    /// Tail offset: next contiguous address that can be merged.
48    tail_offset: i64,
49    /// Total size of all events including the first.
50    total_size: usize,
51}
52
53/// Buffers sequential IO events for merging.
54///
55/// This internal component collects [`IOEvent`]s,
56/// presuming the same IO action and file descriptor (it does not check),
57/// the merge upper bound is specified in `merge_size_limit`.
58pub struct MergeBuffer<C: IOCallback> {
59    pub merge_size_limit: usize,
60    merged_info: Option<MergedInfo<C>>,
61    /// Subsequent events stored as IOEventMerged for cache-friendly storage.
62    merged_events: SegList<IOEventMerged<C>>,
63}
64
65impl<C: IOCallback> MergeBuffer<C> {
66    /// Creates a new `MergeBuffer` with the specified merge size limit.
67    ///
68    /// # Arguments
69    /// * `merge_size_limit` - The maximum total data size to produce a merged event.
70    #[inline(always)]
71    pub fn new(merge_size_limit: usize) -> Self {
72        Self { merge_size_limit, merged_info: None, merged_events: SegList::new() }
73    }
74
75    /// Checks if a new event can be added to the current buffer for merging.
76    ///
77    /// An event can be added if:
78    /// - The buffer is empty.
79    /// - The event is contiguous with the last event in the buffer.
80    /// - Adding the event does not exceed the `merge_size_limit`.
81    ///
82    /// # Arguments
83    /// * `event` - The [`IOEvent`] to check.
84    ///
85    /// # Returns
86    /// `true` if the event can be added, `false` otherwise.
87    #[inline(always)]
88    pub fn may_add_event(&mut self, event: &IOEvent<C>) -> bool {
89        if let Some(ref info) = self.merged_info {
90            if event.get_size() as usize > self.merge_size_limit {
91                return false;
92            }
93            return info.tail_offset == event.offset;
94        } else {
95            return true;
96        }
97    }
98
99    /// Pushes an event into the buffer.
100    ///
101    /// This method assumes that `may_add_event` has already been called and returned `true`.
102    /// It updates the merged data size and tracks the merged offset.
103    ///
104    /// # Arguments
105    /// * `event` - The [`IOEvent`] to push.
106    ///
107    /// # Safety
108    ///
109    /// You should always check whether event is contiguous with [Self::may_add_event] before calling `push_event()`
110    ///
111    /// # Returns
112    /// `true` if the buffer size has reached or exceeded `merge_size_limit` after adding the event, `false` otherwise.
113    #[inline(always)]
114    pub fn push_event(&mut self, event: IOEvent<C>) -> bool {
115        if let Some(ref mut info) = self.merged_info {
116            // Safety check: ensure may_add_event was called
117            debug_assert_eq!(info.tail_offset, event.offset, "push_event: event not contiguous");
118            debug_assert!(
119                info.total_size + event.get_size() as usize <= self.merge_size_limit,
120                "push_event: exceeds merge_size_limit"
121            );
122            // If this is the second event, move first event's buffer to merged_events
123            if self.merged_events.is_empty() {
124                let first_merged = info.first_event.extract_merged();
125                self.merged_events.push(first_merged);
126            }
127            // Subsequent events: convert to IOEventMerged and store in SegList
128            info.total_size += event.get_size() as usize;
129            info.tail_offset += event.get_size() as i64;
130            self.merged_events.push(event.into_merged());
131            return info.total_size >= self.merge_size_limit;
132        } else {
133            // First event: store as Box<IOEvent> for potential reuse
134            let size = event.get_size() as usize;
135            let offset = event.offset;
136            self.merged_info = Some(MergedInfo {
137                first_event: Box::new(event),
138                tail_offset: offset + size as i64,
139                total_size: size,
140            });
141            return size >= self.merge_size_limit;
142        }
143    }
144
145    /// Returns the number of events currently in the buffer.
146    #[inline(always)]
147    pub fn len(&self) -> usize {
148        if !self.merged_events.is_empty() {
149            // First event buffer moved to merged_events, count is merged_events.len()
150            self.merged_events.len()
151        } else {
152            // Single event or empty
153            self.merged_info.as_ref().map(|_| 1).unwrap_or(0)
154        }
155    }
156
157    /// Takes all buffered events, building merged buffer if needed.
158    /// Returns the master event (Box<IOEvent>) or None if empty.
159    #[inline(always)]
160    fn take(&mut self, action: IOAction) -> Option<Box<IOEvent<C>>> {
161        let info = self.merged_info.take()?;
162
163        // Single event: return directly without mem::replace
164        if self.merged_events.is_empty() {
165            return Some(info.first_event);
166        }
167
168        // Multiple events: take merged_events and build merged buffer
169        let sub_tasks = std::mem::replace(&mut self.merged_events, SegList::new());
170        debug_assert!(sub_tasks.len() > 1);
171        let size = info.total_size;
172        let offset = info.first_event.offset;
173
174        match Buffer::aligned(size as i32) {
175            Ok(mut buffer) => {
176                if action == IOAction::Write {
177                    let mut write_offset = 0;
178                    for merged in sub_tasks.iter() {
179                        buffer.copy_from(write_offset, merged.buf.as_ref());
180                        write_offset += merged.buf.len();
181                    }
182                }
183
184                // Reuse first_event as master, set merged buffer and subtasks
185                let mut master = info.first_event;
186                master.set_merged_tasks(buffer, sub_tasks);
187                Some(master)
188            }
189            Err(_) => {
190                // Allocation failed: error out all events
191                for merged in sub_tasks.drain() {
192                    if let Some(cb) = merged.cb {
193                        cb.call(offset, Err(Errno::ENOMEM));
194                    }
195                }
196                None
197            }
198        }
199    }
200
201    /// Flushes the buffered events, potentially merging them into a single [`IOEvent`].
202    ///
203    /// This method handles different scenarios based on the number of events in the buffer:
204    /// - If the buffer is empty, it returns `None`.
205    /// - If there is a single event, it returns `Some(event)` with the original event.
206    /// - If there are multiple events, it attempts to merge them:
207    ///   - If successful, reuses the first `Box<IOEvent>` as the master event, replacing its buffer.
208    ///   - If buffer allocation for the merged event fails, all original events are marked with an `ENOMEM` error and their callbacks are triggered, then `None` is returned.
209    /// - This function will always override fd in IOEvent with argument
210    ///
211    /// After flushing, the buffer is reset.
212    ///
213    /// # Arguments
214    /// * `fd` - The raw file descriptor associated with the IO operations.
215    /// * `action` - The IO action (Read/Write) for the events.
216    ///
217    /// # Returns
218    /// An `Option<IOEvent<C>>` representing the merged event, a single original event, or `None` if the buffer was empty or merging failed.
219    #[inline]
220    pub fn flush(&mut self, fd: RawFd, action: IOAction) -> Option<IOEvent<C>> {
221        let mut master = self.take(action)?;
222        master.set_fd(fd);
223        Some(*master)
224    }
225}
226
227/// Manages the submission of IO events, attempting to merge sequential events
228/// before sending them to the IO driver.
229///
230/// This component buffers incoming [`IOEvent`]s into a [`MergeBuffer`].
231/// It ensures that events for the same file descriptor and IO action are
232/// considered for merging to optimize system calls.
233pub struct MergeSubmitter<C: IOCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> {
234    fd: RawFd,
235    buffer: MergeBuffer<C>,
236    sender: S,
237    action: IOAction,
238}
239
240impl<C: IOCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> MergeSubmitter<C, S> {
241    /// Creates a new `MergeSubmitter`.
242    ///
243    /// # Arguments
244    /// * `fd` - The raw file descriptor for IO operations.
245    /// * `sender` - A channel sender to send prepared [`IOEvent`]s to the IO driver.
246    /// * `merge_size_limit` - The maximum data size for a merged event buffer.
247    /// * `action` - The primary IO action (Read/Write) for this submitter.
248    pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
249        log_assert!(merge_size_limit > 0);
250        Self { fd, buffer: MergeBuffer::<C>::new(merge_size_limit), sender, action }
251    }
252
253    /// Adds an [`IOEvent`] to the internal buffer, potentially triggering a flush.
254    ///
255    /// If the event cannot be merged with current buffered events (e.g., non-contiguous,
256    /// exceeding merge limit), the existing buffered events are flushed first.
257    /// If adding the new event fills the buffer to its `merge_size_limit`, a flush is also triggered.
258    ///
259    /// # Arguments
260    /// * `event` - The [`IOEvent`] to add.
261    ///
262    /// # Returns
263    /// An `Ok(())` on success, or an `io::Error` if flushing fails.
264    /// On debug mode, will validate event.fd and event.action.
265    pub fn add_event(&mut self, mut event: IOEvent<C>) -> Result<(), io::Error> {
266        log_debug_assert_eq!(self.fd, event.fd);
267        log_debug_assert_eq!(event.action, self.action);
268        let event_size = event.get_size();
269
270        if event_size >= self.buffer.merge_size_limit as u64 || !self.buffer.may_add_event(&event) {
271            if let Err(e) = self._flush() {
272                event.set_error(Errno::ESHUTDOWN as i32);
273                event.callback_unchecked(false);
274                return Err(e);
275            }
276        }
277        if self.buffer.push_event(event) {
278            self._flush()?;
279        }
280        return Ok(());
281    }
282
283    /// Explicitly flushes any pending buffered events to the IO driver.
284    ///
285    /// # Returns
286    /// An `Ok(())` on success, or an `io::Error` if sending the flushed event fails.
287    pub fn flush(&mut self) -> Result<(), io::Error> {
288        self._flush()
289    }
290
291    #[inline(always)]
292    fn _flush(&mut self) -> Result<(), io::Error> {
293        if let Some(event) = self.buffer.flush(self.fd, self.action) {
294            trace!("mio: submit event from flush {:?}", event);
295            self.sender
296                .send(Box::new(event))
297                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
298        }
299        Ok(())
300    }
301}