// io_engine/scheduler/merge.rs
use super::embedded_list::*;
24use std::io;
25use std::mem::transmute;
26use std::os::fd::RawFd;
27use std::sync::Arc;
28
29use super::{
30 context::{IOChannelType, IOContext},
31 tasks::*,
32};
33use crate::buffer::Buffer;
34
/// Accumulates physically adjacent I/O events so they can be submitted
/// as a single merged request.
pub struct EventMergeBuffer<C: IOCallbackCustom> {
    /// Upper bound (bytes) on the total size of a merged batch.
    pub merge_size_limit: usize,
    // Holds the sole buffered event while merged_count == 1; moved into
    // `merged_events` once a second event arrives.
    last_event: Option<Box<IOEvent<C>>>,
    // Intrusive list of the buffered events; present only when merged_count > 1.
    merged_events: Option<EmbeddedList>,
    // File offset of the first buffered event; -1 while the buffer is empty.
    merged_offset: i64,
    // Total byte size of all buffered events.
    merged_data_size: usize,
    // Number of buffered events.
    merged_count: usize,
    // Byte offset of the intrusive list node inside IOEvent<C>.
    list_node_offset: usize,
}
44
45impl<C: IOCallbackCustom> EventMergeBuffer<C> {
46 pub fn new(merge_size_limit: usize) -> Self {
47 Self {
48 merge_size_limit,
49 last_event: None,
50 merged_count: 0,
51 merged_events: None,
52 merged_offset: -1,
53 merged_data_size: 0,
54 list_node_offset: std::mem::offset_of!(IOEvent<C>, node),
55 }
56 }
57
58 #[inline(always)]
59 pub fn may_add_event(&mut self, event: &Box<IOEvent<C>>) -> bool {
60 if self.merged_count > 0 {
61 if self.merged_data_size + event.get_size() > self.merge_size_limit {
62 return false;
63 }
64 return self.merged_offset + self.merged_data_size as i64 == event.offset;
65 } else {
66 return true;
67 }
68 }
69
70 #[inline(always)]
72 pub fn push_event(&mut self, event: Box<IOEvent<C>>) -> bool {
73 if self.merged_count == 0 {
74 self.merged_offset = event.offset;
75 self.merged_data_size = event.get_size();
76 self.last_event = Some(event);
77 self.merged_count = 1;
78 return false;
79 } else {
80 self.merged_data_size += event.get_size();
81 if self.merged_count == 1 {
82 let last_event = self.last_event.take().unwrap();
83 let mut list = EmbeddedList::new(self.list_node_offset);
84 last_event.push_to_list(&mut list);
85 event.push_to_list(&mut list);
86 self.merged_events = Some(list);
87 } else {
88 let merged_events = self.merged_events.as_mut().unwrap();
89 event.push_to_list(merged_events);
90 }
91 self.merged_count += 1;
92 return self.merged_data_size >= self.merge_size_limit;
93 }
94 }
95
96 #[inline(always)]
97 pub fn len(&self) -> usize {
98 self.merged_count
99 }
100
101 #[inline(always)]
102 pub fn take(&mut self) -> (EmbeddedList, i64, usize) {
103 log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
104 let tasks = self.merged_events.take().unwrap();
105 let merged_data_size = self.merged_data_size;
106 let merged_offset = self.merged_offset;
107 self.merged_offset = -1;
108 self.merged_data_size = 0;
109 self.merged_count = 0;
110 (tasks, merged_offset, merged_data_size)
111 }
112
113 #[inline(always)]
114 pub fn take_one(&mut self) -> Box<IOEvent<C>> {
115 log_debug_assert_eq!(self.merged_count, 1);
116 self.merged_offset = -1;
117 self.merged_data_size = 0;
118 self.merged_count = 0;
119 self.last_event.take().unwrap()
120 }
121}
122
/// Submits I/O events for a single file descriptor, merging physically
/// adjacent events into larger requests before handing them to the context.
pub struct IOMergeSubmitter<C: IOCallbackCustom> {
    // Target file descriptor; every added event must carry this fd
    // (debug-asserted in add_event).
    fd: RawFd,
    // Pending events waiting to be merged and flushed.
    buffer: EventMergeBuffer<C>,
    // Submission context that performs the actual I/O.
    ctx: Arc<IOContext<C>>,
    // Read or Write; every added event must match (debug-asserted).
    action: IOAction,
    // Channel used for all submissions; consistency with `action` is
    // asserted in new().
    channel_type: IOChannelType,
}
133
impl<C: IOCallbackCustom> IOMergeSubmitter<C> {
    /// Creates a submitter for `fd`.
    ///
    /// Panics if `merge_size_limit` is zero, or if `action` and
    /// `channel_type` are inconsistent (Read requires the Read channel;
    /// Write requires a non-Read channel).
    pub fn new(
        fd: RawFd,
        ctx: Arc<IOContext<C>>,
        merge_size_limit: usize,
        action: IOAction,
        channel_type: IOChannelType,
    ) -> Self {
        log_assert!(merge_size_limit > 0);
        match action {
            IOAction::Read => {
                assert_eq!(channel_type, IOChannelType::Read);
            }
            IOAction::Write => {
                assert!(channel_type != IOChannelType::Read);
            }
        }
        Self {
            fd,
            buffer: EventMergeBuffer::new(merge_size_limit),
            ctx,
            action,
            channel_type,
        }
    }

    /// Queues `event` for merged submission.
    ///
    /// Flushes the pending batch first when `event` cannot join it (too
    /// large by itself, or not contiguous), then flushes again if adding
    /// `event` filled the buffer to its limit. If the initial flush fails,
    /// `event.callback()` is invoked before returning the error —
    /// NOTE(review): presumably this completes the event with failure;
    /// confirm the callback contract in IOEvent.
    pub fn add_event(&mut self, event: Box<IOEvent<C>>) -> Result<(), io::Error> {
        log_debug_assert_eq!(self.fd, event.fd);
        log_debug_assert_eq!(event.action, self.action);
        let event_size = event.get_size();

        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
            if let Err(e) = self._flush() {
                event.callback();
                return Err(e);
            }
        }
        if self.buffer.push_event(event) {
            self._flush()?;
        }
        return Ok(());
    }

    /// Flushes any buffered events to the context.
    pub fn flush(&mut self) -> Result<(), io::Error> {
        self._flush()
    }

    /// Submits the buffered batch: a single event goes out as-is; two or
    /// more are combined into one IOEvent covering the contiguous range.
    #[inline(always)]
    fn _flush(&mut self) -> Result<(), io::Error> {
        let batch_len = self.buffer.len();
        if batch_len == 0 {
            // Nothing buffered — flush is a no-op.
            return Ok(());
        }
        if batch_len == 1 {
            // A lone event gains nothing from merging; submit it directly.
            let submit_event = self.buffer.take_one();
            trace!("mio: submit {:?} not merged", submit_event);
            self.ctx.submit(submit_event, self.channel_type)?;
        } else {
            let (mut events, offset, size) = self.buffer.take();
            log_assert!(size > 0);
            match Buffer::aligned(size) {
                Ok(mut buffer) => {
                    trace!(
                        "mio: merged {} ev into {} @{}",
                        events.get_length(),
                        size,
                        offset
                    );
                    if self.action == IOAction::Write {
                        // Gather phase: copy each event's payload into the
                        // merged buffer in list order. Events are contiguous
                        // (enforced by may_add_event), so the running offset
                        // lines up with the merged range.
                        let mut _offset = 0;
                        for _event in events.iter::<IOEvent<C>>() {
                            // SAFETY(review): transmutes the raw list item to
                            // &IOEvent<C>; sound only if EmbeddedList::iter
                            // yields references/pointers to live IOEvent<C>
                            // nodes at the registered node offset — confirm
                            // against embedded_list's iter contract.
                            let event: &IOEvent<C> = unsafe { transmute(_event) };
                            let b = event.get_buf_ref();
                            let _size = b.len();
                            buffer.copy_from(_offset, b);
                            _offset += _size;
                        }
                    }
                    // The merged event keeps the originals as subtasks —
                    // presumably so completion can fan results/callbacks back
                    // out to them (handled elsewhere; confirm in tasks.rs).
                    let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
                    event.set_subtasks(events);
                    self.ctx.submit(event, self.channel_type)?;
                }
                Err(_) => {
                    // Merged-buffer allocation failed: degrade gracefully by
                    // submitting every event individually. All events are
                    // drained even if some submissions fail; only the last
                    // submit error is reported to the caller.
                    warn!("mio: alloc buffer size {} failed", size);
                    let mut e: Option<io::Error> = None;
                    while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
                        if let Err(_e) = self.ctx.submit(event, self.channel_type) {
                            e.replace(_e);
                        }
                    }
                    if let Some(_e) = e {
                        return Err(_e);
                    }
                }
            }
        }
        Ok(())
    }
}
234}