io_engine/
merge.rs

1//! # IO Merging
2//!
3//! This module provides functionality to merge multiple sequential IO requests into a single larger request.
4//!
5//! ## Overview
6//!
7//! Merging IO requests can significantly improve performance by:
8//! - Reducing the number of system calls (`io_uring_enter` or `io_submit`).
9//! - Allowing larger sequential transfers which are often more efficient for storage devices.
10//! - Reducing per-request overhead in the driver and completion handling.
11//!
12//! ## Mechanism
13//!
14//! The core component is [`IOMergeSubmitter`], which buffers incoming [`IOEvent`]s.
15//!
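//! - **Buffering**: Events are added to [`EventMergeBuffer`]. An incoming event is merged with the pending ones only if:
//!   - It is sequential (it starts exactly where the pending range ends).
//!   - It has the same IO action (Read/Write).
//!   - It targets the same file descriptor.
//!   - The combined size does not exceed `merge_size_limit`.
//!
//!   Only contiguity and size are checked at runtime. An [`IOMergeSubmitter`] is bound to one
//!   file descriptor and one action, and validates incoming events against them in debug builds only.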
21//!
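//! - **Flushing**: Pending events are submitted when any of these happens:
//!   - Their combined size reaches `merge_size_limit`.
//!   - An incoming event cannot be merged with them (it is non-contiguous, or large enough to reach the limit by itself).
//!   - `flush()` is called.
//!
//!   A lone pending event is submitted unchanged, without a master event.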
23//!
24//! - **Sub-tasks**:
25//!   - If events are merged, a new "master" [`IOEvent`] is created covering the entire range.
26//!   - The original events are attached as `sub_tasks` (a linked list) to this master event.
27//!   - **Write**: The data from individual buffers is copied into a single large aligned buffer.
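//!   - **Read**: A large buffer is allocated for the master event. Upon completion, data is copied back to the individual event buffers.
//!   - **Fallback**: If the merged buffer cannot be allocated, the events are submitted individually instead.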
29//!   - **Completion**: When the master event completes, `callback_merged` (in `tasks.rs`) is invoked. It iterates over sub-tasks, sets their results (copying data for reads), and triggers their individual callbacks.
30//!
31//! ## Components
32//! - [`EventMergeBuffer`]: Internal buffer logic.
33//! - [`IOMergeSubmitter`]: Wraps a sender channel and manages the merge logic before sending.
34
35use crate::tasks::*;
36use crossfire::BlockingTxTrait;
37use embed_collections::dlist::DLinkedList;
38use io_buffer::Buffer;
39use std::io;
40use std::os::fd::RawFd;
41
42pub struct EventMergeBuffer<C: IOCallback> {
43    pub merge_size_limit: usize,
44    merged_events: DLinkedList<Box<IOEvent_<C>>, ()>,
45    merged_offset: i64,
46    merged_data_size: usize,
47    merged_count: usize,
48}
49
50impl<C: IOCallback> EventMergeBuffer<C> {
51    pub fn new(merge_size_limit: usize) -> Self {
52        Self {
53            merge_size_limit,
54            merged_count: 0,
55            merged_events: DLinkedList::new(),
56            merged_offset: -1,
57            merged_data_size: 0,
58        }
59    }
60
61    #[inline(always)]
62    pub fn may_add_event(&mut self, event: &IOEvent<C>) -> bool {
63        if self.merged_count > 0 {
64            if self.merged_data_size + event.get_size() > self.merge_size_limit {
65                return false;
66            }
67            return self.merged_offset + self.merged_data_size as i64 == event.offset;
68        } else {
69            return true;
70        }
71    }
72
73    // return full
74    #[inline(always)]
75    pub fn push_event(&mut self, event: IOEvent<C>) -> bool {
76        if self.merged_count == 0 {
77            self.merged_offset = event.offset;
78        }
79        self.merged_data_size += event.get_size();
80        self.merged_count += 1;
81        event.push_to_list(&mut self.merged_events);
82
83        return self.merged_data_size >= self.merge_size_limit;
84    }
85
86    #[inline(always)]
87    pub fn len(&self) -> usize {
88        self.merged_count
89    }
90
91    #[inline(always)]
92    pub fn take(&mut self) -> (DLinkedList<Box<IOEvent_<C>>, ()>, i64, usize) {
93        log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
94        // Move the list content out by swapping with empty new list
95        let tasks = std::mem::replace(&mut self.merged_events, DLinkedList::new());
96        let merged_data_size = self.merged_data_size;
97        let merged_offset = self.merged_offset;
98
99        self.merged_offset = -1;
100        self.merged_data_size = 0;
101        self.merged_count = 0;
102        (tasks, merged_offset, merged_data_size)
103    }
104
105    #[inline(always)]
106    pub fn take_one(&mut self) -> IOEvent<C> {
107        log_debug_assert_eq!(self.merged_count, 1);
108        self.merged_offset = -1;
109        self.merged_data_size = 0;
110        self.merged_count = 0;
111
112        IOEvent::pop_from_list(&mut self.merged_events).expect("Should have one event")
113    }
114}
115
116/// Try to merge sequential IOEvent.
117///
118/// NOTE: Assuming all the IOEvents are of the same file, only debug mode will do validation.
119pub struct IOMergeSubmitter<C: IOCallback, S: BlockingTxTrait<IOEvent<C>>> {
120    fd: RawFd,
121    buffer: EventMergeBuffer<C>,
122    sender: S,
123    action: IOAction,
124}
125
126impl<C: IOCallback, S: BlockingTxTrait<IOEvent<C>>> IOMergeSubmitter<C, S> {
127    pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
128        log_assert!(merge_size_limit > 0);
129        Self { fd, buffer: EventMergeBuffer::<C>::new(merge_size_limit), sender, action }
130    }
131
132    /// On debug mode, will validate event.fd and event.action.
133    pub fn add_event(&mut self, event: IOEvent<C>) -> Result<(), io::Error> {
134        log_debug_assert_eq!(self.fd, event.fd);
135        log_debug_assert_eq!(event.action, self.action);
136        let event_size = event.get_size();
137
138        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
139            if let Err(e) = self._flush() {
140                event.callback();
141                return Err(e);
142            }
143        }
144        if self.buffer.push_event(event) {
145            self._flush()?;
146        }
147        return Ok(());
148    }
149
150    pub fn flush(&mut self) -> Result<(), io::Error> {
151        self._flush()
152    }
153
154    #[inline(always)]
155    fn _flush(&mut self) -> Result<(), io::Error> {
156        let batch_len = self.buffer.len();
157        if batch_len == 0 {
158            return Ok(());
159        }
160        if batch_len == 1 {
161            let submit_event = self.buffer.take_one();
162            trace!("mio: submit {:?} not merged", submit_event);
163            self.sender
164                .send(submit_event)
165                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
166        } else {
167            let (mut events, offset, size) = self.buffer.take();
168            log_assert!(size > 0);
169
170            match Buffer::aligned(size as i32) {
171                Ok(mut buffer) => {
172                    // trace!("mio: merged {} ev into {} @{}", events.len(), size, offset);
173                    if self.action == IOAction::Write {
174                        let mut _offset = 0;
175                        // Iterate references safely
176                        for event_box in events.iter() {
177                            // event_box is &IOEvent<C>>
178                            let b = event_box.buf.as_ref().unwrap().as_ref();
179                            let _size = b.len();
180                            buffer.copy_from(_offset, b);
181                            _offset += _size;
182                        }
183                    }
184                    let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
185                    event.set_subtasks(events);
186                    self.sender
187                        .send(event)
188                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
189                }
190                Err(_) => {
191                    // commit separately
192                    warn!("mio: alloc buffer size {} failed", size);
193                    let mut e: Option<io::Error> = None;
194                    while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
195                        if let Err(_e) = self
196                            .sender
197                            .send(event)
198                            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))
199                        {
200                            e.replace(_e);
201                        }
202                    }
203                    if let Some(_e) = e {
204                        return Err(_e);
205                    }
206                }
207            }
208        }
209        Ok(())
210    }
211}