io_engine/scheduler/
merge.rs

1/*
2Copyright (c) NaturalIO Contributors
3
4Permission is hereby granted, free of charge, to any person obtaining a copy
5of this software and associated documentation files (the "Software"), to deal
6in the Software without restriction, including without limitation the rights
7to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8copies of the Software, and to permit persons to whom the Software is
9furnished to do so, subject to the following conditions:
10
11The above copyright notice and this permission notice shall be included in all
12copies or substantial portions of the Software.
13
14THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20SOFTWARE.
21*/
22
23use super::embedded_list::*;
24use std::io;
25use std::mem::transmute;
26use std::os::fd::RawFd;
27use std::sync::Arc;
28
29use super::{
30    context::{IOChannelType, IOContext},
31    tasks::*,
32};
33use crate::buffer::Buffer;
34
35pub struct EventMergeBuffer<C: IOCallbackCustom> {
36    pub merge_size_limit: usize,
37    last_event: Option<Box<IOEvent<C>>>,
38    merged_events: Option<EmbeddedList>,
39    merged_offset: i64,
40    merged_data_size: usize,
41    merged_count: usize,
42    list_node_offset: usize,
43}
44
45impl<C: IOCallbackCustom> EventMergeBuffer<C> {
46    pub fn new(merge_size_limit: usize) -> Self {
47        Self {
48            merge_size_limit,
49            last_event: None,
50            merged_count: 0,
51            merged_events: None,
52            merged_offset: -1,
53            merged_data_size: 0,
54            list_node_offset: std::mem::offset_of!(IOEvent<C>, node),
55        }
56    }
57
58    #[inline(always)]
59    pub fn may_add_event(&mut self, event: &Box<IOEvent<C>>) -> bool {
60        if self.merged_count > 0 {
61            if self.merged_data_size + event.get_size() > self.merge_size_limit {
62                return false;
63            }
64            return self.merged_offset + self.merged_data_size as i64 == event.offset;
65        } else {
66            return true;
67        }
68    }
69
70    // return full
71    #[inline(always)]
72    pub fn push_event(&mut self, event: Box<IOEvent<C>>) -> bool {
73        if self.merged_count == 0 {
74            self.merged_offset = event.offset;
75            self.merged_data_size = event.get_size();
76            self.last_event = Some(event);
77            self.merged_count = 1;
78            return false;
79        } else {
80            self.merged_data_size += event.get_size();
81            if self.merged_count == 1 {
82                let last_event = self.last_event.take().unwrap();
83                let mut list = EmbeddedList::new(self.list_node_offset);
84                last_event.push_to_list(&mut list);
85                event.push_to_list(&mut list);
86                self.merged_events = Some(list);
87            } else {
88                let merged_events = self.merged_events.as_mut().unwrap();
89                event.push_to_list(merged_events);
90            }
91            self.merged_count += 1;
92            return self.merged_data_size >= self.merge_size_limit;
93        }
94    }
95
96    #[inline(always)]
97    pub fn len(&self) -> usize {
98        self.merged_count
99    }
100
101    #[inline(always)]
102    pub fn take(&mut self) -> (EmbeddedList, i64, usize) {
103        log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
104        let tasks = self.merged_events.take().unwrap();
105        let merged_data_size = self.merged_data_size;
106        let merged_offset = self.merged_offset;
107        self.merged_offset = -1;
108        self.merged_data_size = 0;
109        self.merged_count = 0;
110        (tasks, merged_offset, merged_data_size)
111    }
112
113    #[inline(always)]
114    pub fn take_one(&mut self) -> Box<IOEvent<C>> {
115        log_debug_assert_eq!(self.merged_count, 1);
116        self.merged_offset = -1;
117        self.merged_data_size = 0;
118        self.merged_count = 0;
119        self.last_event.take().unwrap()
120    }
121}
122
123/// Try to merge sequential IOEvent.
124///
125/// NOTE: Assuming all the IOEvents are of the same file, only debug mode will do validation.
126pub struct IOMergeSubmitter<C: IOCallbackCustom> {
127    fd: RawFd,
128    buffer: EventMergeBuffer<C>,
129    ctx: Arc<IOContext<C>>,
130    action: IOAction,
131    channel_type: IOChannelType,
132}
133
134impl<C: IOCallbackCustom> IOMergeSubmitter<C> {
135    pub fn new(
136        fd: RawFd,
137        ctx: Arc<IOContext<C>>,
138        merge_size_limit: usize,
139        action: IOAction,
140        channel_type: IOChannelType,
141    ) -> Self {
142        log_assert!(merge_size_limit > 0);
143        match action {
144            IOAction::Read => {
145                assert_eq!(channel_type, IOChannelType::Read);
146            }
147            IOAction::Write => {
148                assert!(channel_type != IOChannelType::Read);
149            }
150        }
151        Self {
152            fd,
153            buffer: EventMergeBuffer::new(merge_size_limit),
154            ctx,
155            action,
156            channel_type,
157        }
158    }
159
160    /// On debug mode, will validate event.fd and event.action.
161    pub fn add_event(&mut self, event: Box<IOEvent<C>>) -> Result<(), io::Error> {
162        log_debug_assert_eq!(self.fd, event.fd);
163        log_debug_assert_eq!(event.action, self.action);
164        let event_size = event.get_size();
165
166        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
167            if let Err(e) = self._flush() {
168                event.callback();
169                return Err(e);
170            }
171        }
172        if self.buffer.push_event(event) {
173            self._flush()?;
174        }
175        return Ok(());
176    }
177
178    pub fn flush(&mut self) -> Result<(), io::Error> {
179        self._flush()
180    }
181
182    #[inline(always)]
183    fn _flush(&mut self) -> Result<(), io::Error> {
184        let batch_len = self.buffer.len();
185        if batch_len == 0 {
186            return Ok(());
187        }
188        if batch_len == 1 {
189            let submit_event = self.buffer.take_one();
190            trace!("mio: submit {:?} not merged", submit_event);
191            self.ctx.submit(submit_event, self.channel_type)?;
192        } else {
193            let (mut events, offset, size) = self.buffer.take();
194            log_assert!(size > 0);
195            match Buffer::aligned(size) {
196                Ok(mut buffer) => {
197                    trace!(
198                        "mio: merged {} ev into {} @{}",
199                        events.get_length(),
200                        size,
201                        offset
202                    );
203                    if self.action == IOAction::Write {
204                        let mut _offset = 0;
205                        for _event in events.iter::<IOEvent<C>>() {
206                            let event: &IOEvent<C> = unsafe { transmute(_event) };
207                            let b = event.get_buf_ref();
208                            let _size = b.len();
209                            buffer.copy_from(_offset, b);
210                            _offset += _size;
211                        }
212                    }
213                    let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
214                    event.set_subtasks(events);
215                    self.ctx.submit(event, self.channel_type)?;
216                }
217                Err(_) => {
218                    // commit seperately
219                    warn!("mio: alloc buffer size {} failed", size);
220                    let mut e: Option<io::Error> = None;
221                    while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
222                        if let Err(_e) = self.ctx.submit(event, self.channel_type) {
223                            e.replace(_e);
224                        }
225                    }
226                    if let Some(_e) = e {
227                        return Err(_e);
228                    }
229                }
230            }
231        }
232        Ok(())
233    }
234}