io_engine/scheduler/
merge.rs

1/*
2Copyright (c) NaturalIO Contributors
3
4Permission is hereby granted, free of charge, to any person obtaining a copy
5of this software and associated documentation files (the "Software"), to deal
6in the Software without restriction, including without limitation the rights
7to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8copies of the Software, and to permit persons to whom the Software is
9furnished to do so, subject to the following conditions:
10
11The above copyright notice and this permission notice shall be included in all
12copies or substantial portions of the Software.
13
14THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20SOFTWARE.
21*/
22
23use super::embedded_list::*;
24use std::io;
25use std::mem::transmute;
26use std::os::fd::RawFd;
27use std::sync::Arc;
28
29use super::{
30    context::{IOChannelType, IOContext},
31    tasks::*,
32};
33use crate::buffer::Buffer;
34
35pub struct EventMergeBuffer<C: IOCallbackCustom> {
36    pub merge_size_limit: usize,
37    last_event: Option<Box<IOEvent<C>>>,
38    merged_events: Option<EmbeddedList>,
39    merged_offset: i64,
40    merged_data_size: usize,
41    merged_count: usize,
42    list_node_offset: usize,
43}
44
45impl<C: IOCallbackCustom> EventMergeBuffer<C> {
46    pub fn new(merge_size_limit: usize) -> Self {
47        Self {
48            merge_size_limit,
49            last_event: None,
50            merged_count: 0,
51            merged_events: None,
52            merged_offset: -1,
53            merged_data_size: 0,
54            list_node_offset: std::mem::offset_of!(IOEvent<C>, node),
55        }
56    }
57
58    #[inline(always)]
59    pub fn may_add_event(&mut self, event: &Box<IOEvent<C>>) -> bool {
60        if self.merged_count > 0 {
61            if self.merged_data_size + event.get_size() > self.merge_size_limit {
62                return false;
63            }
64            return self.merged_offset + self.merged_data_size as i64 == event.offset;
65        } else {
66            return true;
67        }
68    }
69
70    // return full
71    #[inline(always)]
72    pub fn push_event(&mut self, event: Box<IOEvent<C>>) -> bool {
73        if self.merged_count == 0 {
74            self.merged_offset = event.offset;
75            self.merged_data_size = event.get_size();
76            self.last_event = Some(event);
77            self.merged_count = 1;
78            return false;
79        } else {
80            self.merged_data_size += event.get_size();
81            if self.merged_count == 1 {
82                let last_event = self.last_event.take().unwrap();
83                let mut list = EmbeddedList::new(self.list_node_offset);
84                last_event.push_to_list(&mut list);
85                event.push_to_list(&mut list);
86                self.merged_events = Some(list);
87            } else {
88                let merged_events = self.merged_events.as_mut().unwrap();
89                event.push_to_list(merged_events);
90            }
91            self.merged_count += 1;
92            return self.merged_data_size >= self.merge_size_limit;
93        }
94    }
95
96    #[inline(always)]
97    pub fn len(&self) -> usize {
98        self.merged_count
99    }
100
101    #[inline(always)]
102    pub fn take(&mut self) -> (EmbeddedList, i64, usize) {
103        log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
104        let tasks = self.merged_events.take().unwrap();
105        let merged_data_size = self.merged_data_size;
106        let merged_offset = self.merged_offset;
107        self.merged_offset = -1;
108        self.merged_data_size = 0;
109        self.merged_count = 0;
110        (tasks, merged_offset, merged_data_size)
111    }
112
113    #[inline(always)]
114    pub fn take_one(&mut self) -> Box<IOEvent<C>> {
115        log_debug_assert_eq!(self.merged_count, 1);
116        self.merged_offset = -1;
117        self.merged_data_size = 0;
118        self.merged_count = 0;
119        self.last_event.take().unwrap()
120    }
121}
122
123/// Try to merge sequential IOEvent.
124///
125/// NOTE: Assuming all the IOEvents are of the same file, only debug mode will do validation.
126pub struct IOMergeSubmitter<C: IOCallbackCustom> {
127    fd: RawFd,
128    buffer: EventMergeBuffer<C>,
129    ctx: Arc<IOContext<C>>,
130    action: IOAction,
131    channel_type: IOChannelType,
132}
133
134impl<C: IOCallbackCustom> IOMergeSubmitter<C> {
135    pub fn new(
136        fd: RawFd, ctx: Arc<IOContext<C>>, merge_size_limit: usize, action: IOAction,
137        channel_type: IOChannelType,
138    ) -> Self {
139        log_assert!(merge_size_limit > 0);
140        match action {
141            IOAction::Read => {
142                assert_eq!(channel_type, IOChannelType::Read);
143            }
144            IOAction::Write => {
145                assert!(channel_type != IOChannelType::Read);
146            }
147        }
148        Self { fd, buffer: EventMergeBuffer::new(merge_size_limit), ctx, action, channel_type }
149    }
150
151    /// On debug mode, will validate event.fd and event.action.
152    pub fn add_event(&mut self, event: Box<IOEvent<C>>) -> Result<(), io::Error> {
153        log_debug_assert_eq!(self.fd, event.fd);
154        log_debug_assert_eq!(event.action, self.action);
155        let event_size = event.get_size();
156
157        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
158            if let Err(e) = self._flush() {
159                event.callback();
160                return Err(e);
161            }
162        }
163        if self.buffer.push_event(event) {
164            self._flush()?;
165        }
166        return Ok(());
167    }
168
169    pub fn flush(&mut self) -> Result<(), io::Error> {
170        self._flush()
171    }
172
173    #[inline(always)]
174    fn _flush(&mut self) -> Result<(), io::Error> {
175        let batch_len = self.buffer.len();
176        if batch_len == 0 {
177            return Ok(());
178        }
179        if batch_len == 1 {
180            let submit_event = self.buffer.take_one();
181            trace!("mio: submit {:?} not merged", submit_event);
182            self.ctx.submit(submit_event, self.channel_type)?;
183        } else {
184            let (mut events, offset, size) = self.buffer.take();
185            log_assert!(size > 0);
186            match Buffer::aligned(size) {
187                Ok(mut buffer) => {
188                    trace!("mio: merged {} ev into {} @{}", events.get_length(), size, offset);
189                    if self.action == IOAction::Write {
190                        let mut _offset = 0;
191                        for _event in events.iter::<IOEvent<C>>() {
192                            let event: &IOEvent<C> = unsafe { transmute(_event) };
193                            let b = event.get_buf_ref();
194                            let _size = b.len();
195                            buffer.copy_from(_offset, b);
196                            _offset += _size;
197                        }
198                    }
199                    let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
200                    event.set_subtasks(events);
201                    self.ctx.submit(event, self.channel_type)?;
202                }
203                Err(_) => {
204                    // commit seperately
205                    warn!("mio: alloc buffer size {} failed", size);
206                    let mut e: Option<io::Error> = None;
207                    while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
208                        if let Err(_e) = self.ctx.submit(event, self.channel_type) {
209                            e.replace(_e);
210                        }
211                    }
212                    if let Some(_e) = e {
213                        return Err(_e);
214                    }
215                }
216            }
217        }
218        Ok(())
219    }
220}