Skip to main content

io_engine/
merge.rs

1//! # IO Merging
2//!
3//! This module provides functionality to merge multiple sequential IO requests into a single larger request.
4//!
5//! ## Overview
6//!
7//! Merging IO requests can significantly improve performance by:
8//! - Reducing the number of system calls (`io_uring_enter` or `io_submit`).
9//! - Allowing larger sequential transfers which are often more efficient for storage devices.
10//! - Reducing per-request overhead in the driver and completion handling.
11//!
12//! ## Mechanism
13//!
14//! The core component is [`MergeSubmitter`], which buffers incoming [`IOEvent`]s.
15//!
//! - **Buffering**: Events are added to [`MergeBuffer`]. An event is merged with the buffered ones when:
//!   - It is sequential (its offset is contiguous with the end of the buffered range).
//!   - It has the same IO action (Read/Write).
//!   - It targets the same file descriptor.
//!   - The total size does not exceed `merge_size_limit`.
21//!
22//! - **Flushing**: When the buffer is full, the limit is reached, or `flush()` is called, the merged request is submitted.
23//!
24//! - **Sub-tasks**:
25//!   - If events are merged, a new "master" [`IOEvent`] is created covering the entire range.
26//!   - The original events are attached as `sub_tasks` (a linked list) to this master event.
27//!   - **Write**: The data from individual buffers is copied into a single large aligned buffer.
28//!   - **Read**: A large buffer is allocated for the master event. Upon completion, data is copied back to the individual event buffers.
29//!   - **Completion**: When the master event completes, `callback_merged` (in `tasks.rs`) is invoked. It iterates over sub-tasks, sets their results (copying data for reads), and triggers their individual callbacks.
30//!
31//! ## Components
32//! - [`MergeBuffer`]: Internal buffer logic.
33//! - [`MergeSubmitter`]: Wraps a sender channel and manages the merge logic before sending.
34
35use crate::tasks::{BufOrLen, IOAction, IOCallback, IOEvent, IOEvent_};
36use crossfire::BlockingTxTrait;
37use embed_collections::slist::SLinkedList;
38use io_buffer::Buffer;
39use libc;
40use std::io;
41use std::os::fd::RawFd;
42
43/// Buffers sequential IO events for merging.
44///
45/// This internal component collects [`IOEvent`]s,
46/// presuming the same IO action and file descriptor (it does not check),
47/// the merge upper bound is specified in `merge_size_limit`.
48pub struct MergeBuffer<C: IOCallback> {
49    pub merge_size_limit: usize,
50    merged_events: SLinkedList<Box<IOEvent_<C>>, ()>,
51    merged_offset: i64,
52    merged_data_size: usize,
53}
54
55impl<C: IOCallback> MergeBuffer<C> {
56    /// Creates a new `MergeBuffer` with the specified merge size limit.
57    ///
58    /// # Arguments
59    /// * `merge_size_limit` - The maximum total data size to produce a merged event.
60    #[inline(always)]
61    pub fn new(merge_size_limit: usize) -> Self {
62        Self {
63            merge_size_limit,
64            merged_events: SLinkedList::new(),
65            merged_offset: -1,
66            merged_data_size: 0,
67        }
68    }
69
70    /// Checks if a new event can be added to the current buffer for merging.
71    ///
72    /// An event can be added if:
73    /// - The buffer is empty.
74    /// - The event is contiguous with the last event in the buffer.
75    /// - Adding the event does not exceed the `merge_size_limit`.
76    ///
77    /// # Arguments
78    /// * `event` - The [`IOEvent`] to check.
79    ///
80    /// # Returns
81    /// `true` if the event can be added, `false` otherwise.
82    #[inline(always)]
83    pub fn may_add_event(&mut self, event: &IOEvent<C>) -> bool {
84        if !self.merged_events.is_empty() {
85            if self.merged_data_size + event.get_size() > self.merge_size_limit {
86                return false;
87            }
88            return self.merged_offset + self.merged_data_size as i64 == event.offset;
89        } else {
90            return true;
91        }
92    }
93
94    /// Pushes an event into the buffer.
95    ///
96    /// This method assumes that `may_add_event` has already been called and returned `true`.
97    /// It updates the merged data size and tracks the merged offset.
98    ///
99    /// # Arguments
100    /// * `event` - The [`IOEvent`] to push.
101    ///
102    /// # Safety
103    ///
104    /// You should always check whether event is contiguous with [Self::may_add_event] before calling `push_event()`
105    ///
106    /// # Returns
107    /// `true` if the buffer size has reached or exceeded `merge_size_limit` after adding the event, `false` otherwise.
108    #[inline(always)]
109    pub fn push_event(&mut self, event: IOEvent<C>) -> bool {
110        if self.merged_events.is_empty() {
111            self.merged_offset = event.offset;
112        }
113        self.merged_data_size += event.get_size();
114        event.push_to_list(&mut self.merged_events);
115
116        return self.merged_data_size >= self.merge_size_limit;
117    }
118
119    /// Returns the number of events currently in the buffer.
120    #[inline(always)]
121    pub fn len(&self) -> usize {
122        self.merged_events.len()
123    }
124
125    /// Takes all buffered events and their merging metadata, resetting the buffer.
126    ///
127    /// This is an internal helper method.
128    ///
129    /// # Returns
130    /// A tuple containing:
131    /// - The `SLinkedList` of buffered events.
132    /// - The starting offset of the merged events.
133    /// - The total data size of the merged events.
134    #[inline(always)]
135    fn take(&mut self) -> (SLinkedList<Box<IOEvent_<C>>, ()>, i64, usize) {
136        // Move the list content out by swapping with empty new list
137        let tasks = std::mem::replace(&mut self.merged_events, SLinkedList::new());
138        let merged_data_size = self.merged_data_size;
139        let merged_offset = self.merged_offset;
140        self.merged_offset = -1;
141        self.merged_data_size = 0;
142        (tasks, merged_offset, merged_data_size)
143    }
144
145    /// Flushes the buffered events, potentially merging them into a single [`IOEvent`].
146    ///
147    /// This method handles different scenarios based on the number of events in the buffer:
148    /// - If the buffer is empty, it returns `None`.
149    /// - If there is a single event, it returns `Some(event)` with the original event.
150    /// - If there are multiple events, it attempts to merge them:
151    ///   - If successful, a new master [`IOEvent`] covering the merged range is returned as `Some(merged_event)`.
152    ///   - If buffer allocation for the merged event fails, all original events are marked with an `ENOMEM` error and their callbacks are triggered, then `None` is returned.
153    /// - This function will always override fd in IOEvent with argument
154    ///
155    /// After flushing, the buffer is reset.
156    ///
157    /// # Arguments
158    /// * `fd` - The raw file descriptor associated with the IO operations.
159    /// * `action` - The IO action (Read/Write) for the events.
160    ///
161    /// # Returns
162    /// An `Option<IOEvent<C>>` representing the merged event, a single original event, or `None` if the buffer was empty or merging failed.
163    #[inline]
164    pub fn flush(&mut self, fd: RawFd, action: IOAction) -> Option<IOEvent<C>> {
165        let batch_len = self.len();
166        if batch_len == 0 {
167            return None;
168        }
169        if batch_len == 1 {
170            self.merged_offset = -1;
171            self.merged_data_size = 0;
172            let mut event = IOEvent::pop_from_list(&mut self.merged_events).unwrap();
173            // NOTE: always reset fd, allow false fd while adding
174            event.set_fd(fd);
175            return Some(event);
176        }
177        let (mut tasks, offset, size) = self.take();
178        log_assert!(size > 0);
179
180        match Buffer::aligned(size as i32) {
181            Ok(mut buffer) => {
182                if action == IOAction::Write {
183                    let mut _offset = 0;
184                    for event_box in tasks.iter() {
185                        if let BufOrLen::Buffer(b) = &event_box.buf_or_len {
186                            let _size = b.len();
187                            buffer.copy_from(_offset, b.as_ref());
188                            _offset += _size;
189                        }
190                    }
191                }
192                let mut event = IOEvent::<C>::new(fd, buffer, action, offset);
193                event.set_subtasks(tasks);
194                Some(event)
195            }
196            Err(e) => {
197                warn!("mio: alloc buffer size {} failed: {}", size, e);
198                while let Some(mut event) = IOEvent::<C>::pop_from_list(&mut tasks) {
199                    event.set_error(libc::ENOMEM);
200                    event.callback();
201                }
202                None
203            }
204        }
205    }
206}
207
208/// Manages the submission of IO events, attempting to merge sequential events
209/// before sending them to the IO driver.
210///
211/// This component buffers incoming [`IOEvent`]s into a [`MergeBuffer`].
212/// It ensures that events for the same file descriptor and IO action are
213/// considered for merging to optimize system calls.
214pub struct MergeSubmitter<C: IOCallback, S: BlockingTxTrait<IOEvent<C>>> {
215    fd: RawFd,
216    buffer: MergeBuffer<C>,
217    sender: S,
218    action: IOAction,
219}
220
221impl<C: IOCallback, S: BlockingTxTrait<IOEvent<C>>> MergeSubmitter<C, S> {
222    /// Creates a new `MergeSubmitter`.
223    ///
224    /// # Arguments
225    /// * `fd` - The raw file descriptor for IO operations.
226    /// * `sender` - A channel sender to send prepared [`IOEvent`]s to the IO driver.
227    /// * `merge_size_limit` - The maximum data size for a merged event buffer.
228    /// * `action` - The primary IO action (Read/Write) for this submitter.
229    pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
230        log_assert!(merge_size_limit > 0);
231        Self { fd, buffer: MergeBuffer::<C>::new(merge_size_limit), sender, action }
232    }
233
234    /// Adds an [`IOEvent`] to the internal buffer, potentially triggering a flush.
235    ///
236    /// If the event cannot be merged with current buffered events (e.g., non-contiguous,
237    /// exceeding merge limit), the existing buffered events are flushed first.
238    /// If adding the new event fills the buffer to its `merge_size_limit`, a flush is also triggered.
239    ///
240    /// # Arguments
241    /// * `event` - The [`IOEvent`] to add.
242    ///
243    /// # Returns
244    /// An `Ok(())` on success, or an `io::Error` if flushing fails.
245    /// On debug mode, will validate event.fd and event.action.
246    pub fn add_event(&mut self, event: IOEvent<C>) -> Result<(), io::Error> {
247        log_debug_assert_eq!(self.fd, event.fd);
248        log_debug_assert_eq!(event.action, self.action);
249        let event_size = event.get_size();
250
251        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
252            if let Err(e) = self._flush() {
253                event.callback();
254                return Err(e);
255            }
256        }
257        if self.buffer.push_event(event) {
258            self._flush()?;
259        }
260        return Ok(());
261    }
262
263    /// Explicitly flushes any pending buffered events to the IO driver.
264    ///
265    /// # Returns
266    /// An `Ok(())` on success, or an `io::Error` if sending the flushed event fails.
267    pub fn flush(&mut self) -> Result<(), io::Error> {
268        self._flush()
269    }
270
271    #[inline(always)]
272    fn _flush(&mut self) -> Result<(), io::Error> {
273        if let Some(event) = self.buffer.flush(self.fd, self.action) {
274            trace!("mio: submit event from flush {:?}", event);
275            self.sender
276                .send(event)
277                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
278        }
279        Ok(())
280    }
281}