io_engine/merge.rs

use crate::embedded_list::*;
use crate::tasks::*;
use crossfire::BlockingTxTrait;
use io_buffer::Buffer;
use std::io;
use std::mem::transmute;
use std::os::fd::RawFd;

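/// Accumulates sequential IOEvents so they can be submitted as one larger I/O
/// request. Events are buffered until the next event is no longer contiguous
/// or the accumulated size reaches `merge_size_limit`.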
pub struct EventMergeBuffer<C: IoCallback> {
    /// Maximum total payload size of a merged batch.
    pub merge_size_limit: usize,
    /// Holds the sole event while only one is buffered.
    last_event: Option<Box<IOEvent<C>>>,
    /// Intrusive list of buffered events, populated once there are two or more.
    merged_events: Option<EmbeddedList>,
    /// File offset of the first buffered event; -1 while the buffer is empty.
    merged_offset: i64,
    /// Total payload size of all buffered events.
    merged_data_size: usize,
    /// Number of buffered events.
    merged_count: usize,
    /// Byte offset of the embedded list node inside `IOEvent<C>`.
    list_node_offset: usize,
}

impl<C: IoCallback> EventMergeBuffer<C> {
    pub fn new(merge_size_limit: usize) -> Self {
        Self {
            merge_size_limit,
            last_event: None,
            merged_count: 0,
            merged_events: None,
            merged_offset: -1,
            merged_data_size: 0,
            list_node_offset: std::mem::offset_of!(IOEvent<C>, node),
        }
    }

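    /// Returns whether `event` can be merged into the current batch: it must
    /// fit within `merge_size_limit` and start exactly where the buffered run
    /// ends (e.g. a 4096-byte event at offset 8192 extends a run covering
    /// offsets 4096..8192). An empty buffer accepts any event.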
    #[inline(always)]
    pub fn may_add_event(&mut self, event: &Box<IOEvent<C>>) -> bool {
        if self.merged_count > 0 {
            if self.merged_data_size + event.get_size() > self.merge_size_limit {
                return false;
            }
            return self.merged_offset + self.merged_data_size as i64 == event.offset;
        } else {
            return true;
        }
    }

    /// Pushes `event` into the buffer. Returns true once the accumulated size
    /// reaches `merge_size_limit`, i.e. the batch is full and should be flushed.
    #[inline(always)]
    pub fn push_event(&mut self, event: Box<IOEvent<C>>) -> bool {
        if self.merged_count == 0 {
            self.merged_offset = event.offset;
            self.merged_data_size = event.get_size();
            self.last_event = Some(event);
            self.merged_count = 1;
            return false;
        } else {
            self.merged_data_size += event.get_size();
            if self.merged_count == 1 {
                // Second event: move both into an embedded list.
                let last_event = self.last_event.take().unwrap();
                let mut list = EmbeddedList::new(self.list_node_offset);
                last_event.push_to_list(&mut list);
                event.push_to_list(&mut list);
                self.merged_events = Some(list);
            } else {
                let merged_events = self.merged_events.as_mut().unwrap();
                event.push_to_list(merged_events);
            }
            self.merged_count += 1;
            return self.merged_data_size >= self.merge_size_limit;
        }
    }

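    /// Number of events currently buffered.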
    #[inline(always)]
    pub fn len(&self) -> usize {
        self.merged_count
    }

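    /// Detaches the merged batch, returning the event list, the starting file
    /// offset and the total data size, and resets the buffer. Only valid when
    /// more than one event is buffered (asserted in debug builds).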
    #[inline(always)]
    pub fn take(&mut self) -> (EmbeddedList, i64, usize) {
        log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
        let tasks = self.merged_events.take().unwrap();
        let merged_data_size = self.merged_data_size;
        let merged_offset = self.merged_offset;
        self.merged_offset = -1;
        self.merged_data_size = 0;
        self.merged_count = 0;
        (tasks, merged_offset, merged_data_size)
    }

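    /// Takes the single buffered event and resets the buffer. Only valid when
    /// exactly one event is buffered (asserted in debug builds).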
    #[inline(always)]
    pub fn take_one(&mut self) -> Box<IOEvent<C>> {
        log_debug_assert_eq!(self.merged_count, 1);
        self.merged_offset = -1;
        self.merged_data_size = 0;
        self.merged_count = 0;
        self.last_event.take().unwrap()
    }
}

/// Tries to merge sequential IOEvents into larger requests before submission.
///
/// NOTE: All IOEvents are assumed to target the same file; this is only validated in debug mode.
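///
/// A minimal usage sketch; the channel construction and the `MyCallback` type
/// below are application-specific placeholders, not part of this module:
///
/// ```ignore
/// // `tx` is any BlockingTxTrait<Box<IOEvent<MyCallback>>> sender and `fd` an open file.
/// let mut submitter =
///     IOMergeSubmitter::<MyCallback, _>::new(fd, tx, 128 * 1024, IOAction::Write);
/// for ev in write_events {
///     // Box<IOEvent<MyCallback>>; contiguous offsets are merged automatically.
///     submitter.add_event(ev)?;
/// }
/// // Submit whatever is still buffered.
/// submitter.flush()?;
/// ```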
pub struct IOMergeSubmitter<C: IoCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> {
    fd: RawFd,
    buffer: EventMergeBuffer<C>,
    sender: S,
    action: IOAction,
}

impl<C: IoCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> IOMergeSubmitter<C, S> {
    pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
        log_assert!(merge_size_limit > 0);
        Self { fd, buffer: EventMergeBuffer::new(merge_size_limit), sender, action }
    }

    /// Adds an event to the merge buffer, flushing the pending batch first if
    /// the event cannot be merged into it. In debug mode, validates event.fd
    /// and event.action.
    pub fn add_event(&mut self, event: Box<IOEvent<C>>) -> Result<(), io::Error> {
        log_debug_assert_eq!(self.fd, event.fd);
        log_debug_assert_eq!(event.action, self.action);
        let event_size = event.get_size();

        // Flush first if this event is too large to merge or is not contiguous
        // with the buffered run.
        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
            if let Err(e) = self._flush() {
                // The event is not submitted; invoke its callback before reporting the error.
                event.callback();
                return Err(e);
            }
        }
        if self.buffer.push_event(event) {
            self._flush()?;
        }
        return Ok(());
    }

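    /// Submits any events still held in the merge buffer.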
    pub fn flush(&mut self) -> Result<(), io::Error> {
        self._flush()
    }

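    // Submits the buffered batch: a single event is sent unchanged; two or more
    // are combined into one merged IOEvent when an aligned buffer can be
    // allocated, otherwise each event is sent individually.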
    #[inline(always)]
    fn _flush(&mut self) -> Result<(), io::Error> {
        let batch_len = self.buffer.len();
        if batch_len == 0 {
            return Ok(());
        }
        if batch_len == 1 {
            let submit_event = self.buffer.take_one();
            trace!("mio: submit {:?} not merged", submit_event);
            self.sender
                .send(submit_event)
                .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
        } else {
            let (mut events, offset, size) = self.buffer.take();
            log_assert!(size > 0);
            match Buffer::aligned(size as i32) {
                Ok(mut buffer) => {
                    trace!("mio: merged {} ev into {} @{}", events.get_length(), size, offset);
                    if self.action == IOAction::Write {
                        // For writes, copy each sub-event's payload into the
                        // merged buffer in submission order.
                        let mut _offset = 0;
                        for _event in events.iter::<IOEvent<C>>() {
                            let event: &IOEvent<C> = unsafe { transmute(_event) };
                            let b = event.get_buf_ref();
                            let _size = b.len();
                            buffer.copy_from(_offset, b);
                            _offset += _size;
                        }
                    }
                    // Attach the original events as subtasks of the merged event.
                    let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
                    event.set_subtasks(events);
                    self.sender
                        .send(event)
                        .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
                }
                Err(_) => {
                    // Allocation failed: fall back to submitting each event
                    // individually, remembering the last send error, if any.
                    warn!("mio: alloc buffer size {} failed", size);
                    let mut e: Option<io::Error> = None;
                    while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
                        if let Err(_e) = self
                            .sender
                            .send(event)
                            .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))
                        {
                            e.replace(_e);
                        }
                    }
                    if let Some(_e) = e {
                        return Err(_e);
                    }
                }
            }
        }
        Ok(())
    }
}