// io_engine/scheduler/merge.rs
use super::embedded_list::*;

use std::io;
use std::mem::transmute;
use std::os::fd::RawFd;
use std::sync::Arc;

use super::{
    context::{IOChannelType, IOContext},
    tasks::*,
};
use crate::buffer::Buffer;
/// Accumulates offset-contiguous I/O events so they can be submitted as a
/// single merged request.
///
/// Events are buffered until the accumulated payload reaches
/// `merge_size_limit` or a non-mergeable event arrives; the owner then
/// drains the batch via `take` / `take_one`.
pub struct EventMergeBuffer<C: IOCallbackCustom> {
    // Upper bound (bytes) on the total payload of one merged batch.
    pub merge_size_limit: usize,
    // The sole buffered event while the batch holds exactly one entry;
    // moved into `merged_events` once a second event is pushed.
    last_event: Option<Box<IOEvent<C>>>,
    // Intrusive list of buffered events; only populated when the batch
    // holds two or more entries.
    merged_events: Option<EmbeddedList>,
    // Starting file offset of the merged range; -1 while the buffer is empty.
    merged_offset: i64,
    // Total payload size (bytes) of all buffered events.
    merged_data_size: usize,
    // Number of buffered events.
    merged_count: usize,
    // Byte offset of the intrusive list node inside `IOEvent<C>`, required
    // by `EmbeddedList` to link events without extra allocation.
    list_node_offset: usize,
}
44
45impl<C: IOCallbackCustom> EventMergeBuffer<C> {
46 pub fn new(merge_size_limit: usize) -> Self {
47 Self {
48 merge_size_limit,
49 last_event: None,
50 merged_count: 0,
51 merged_events: None,
52 merged_offset: -1,
53 merged_data_size: 0,
54 list_node_offset: std::mem::offset_of!(IOEvent<C>, node),
55 }
56 }
57
58 #[inline(always)]
59 pub fn may_add_event(&mut self, event: &Box<IOEvent<C>>) -> bool {
60 if self.merged_count > 0 {
61 if self.merged_data_size + event.get_size() > self.merge_size_limit {
62 return false;
63 }
64 return self.merged_offset + self.merged_data_size as i64 == event.offset;
65 } else {
66 return true;
67 }
68 }
69
70 #[inline(always)]
72 pub fn push_event(&mut self, event: Box<IOEvent<C>>) -> bool {
73 if self.merged_count == 0 {
74 self.merged_offset = event.offset;
75 self.merged_data_size = event.get_size();
76 self.last_event = Some(event);
77 self.merged_count = 1;
78 return false;
79 } else {
80 self.merged_data_size += event.get_size();
81 if self.merged_count == 1 {
82 let last_event = self.last_event.take().unwrap();
83 let mut list = EmbeddedList::new(self.list_node_offset);
84 last_event.push_to_list(&mut list);
85 event.push_to_list(&mut list);
86 self.merged_events = Some(list);
87 } else {
88 let merged_events = self.merged_events.as_mut().unwrap();
89 event.push_to_list(merged_events);
90 }
91 self.merged_count += 1;
92 return self.merged_data_size >= self.merge_size_limit;
93 }
94 }
95
96 #[inline(always)]
97 pub fn len(&self) -> usize {
98 self.merged_count
99 }
100
101 #[inline(always)]
102 pub fn take(&mut self) -> (EmbeddedList, i64, usize) {
103 log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
104 let tasks = self.merged_events.take().unwrap();
105 let merged_data_size = self.merged_data_size;
106 let merged_offset = self.merged_offset;
107 self.merged_offset = -1;
108 self.merged_data_size = 0;
109 self.merged_count = 0;
110 (tasks, merged_offset, merged_data_size)
111 }
112
113 #[inline(always)]
114 pub fn take_one(&mut self) -> Box<IOEvent<C>> {
115 log_debug_assert_eq!(self.merged_count, 1);
116 self.merged_offset = -1;
117 self.merged_data_size = 0;
118 self.merged_count = 0;
119 self.last_event.take().unwrap()
120 }
121}
122
/// Merges contiguous, same-direction I/O events targeting a single fd and
/// submits them — merged when possible, individually otherwise — to an
/// `IOContext` channel.
pub struct IOMergeSubmitter<C: IOCallbackCustom> {
    // File descriptor every queued event must target.
    fd: RawFd,
    // Pending events awaiting merge/flush.
    buffer: EventMergeBuffer<C>,
    // Submission context that performs the actual I/O.
    ctx: Arc<IOContext<C>>,
    // Direction (Read or Write) every queued event must have.
    action: IOAction,
    // Channel all submissions are routed to.
    channel_type: IOChannelType,
}
133
impl<C: IOCallbackCustom> IOMergeSubmitter<C> {
    /// Creates a submitter for `fd`.
    ///
    /// `merge_size_limit` must be > 0. Read actions must use the Read
    /// channel; Write actions must use a non-Read channel (asserted here so
    /// `_flush` can rely on the pairing).
    pub fn new(
        fd: RawFd, ctx: Arc<IOContext<C>>, merge_size_limit: usize, action: IOAction,
        channel_type: IOChannelType,
    ) -> Self {
        log_assert!(merge_size_limit > 0);
        match action {
            IOAction::Read => {
                assert_eq!(channel_type, IOChannelType::Read);
            }
            IOAction::Write => {
                assert!(channel_type != IOChannelType::Read);
            }
        }
        Self { fd, buffer: EventMergeBuffer::new(merge_size_limit), ctx, action, channel_type }
    }

    /// Queues `event` for merged submission.
    ///
    /// Flushes the pending batch first when the event cannot join it (its
    /// size alone reaches the merge limit, or it is not contiguous); flushes
    /// again when the batch reaches the size limit after insertion. If the
    /// pre-insert flush fails, the incoming event's callback is invoked
    /// before the error is returned, so its owner is still notified.
    pub fn add_event(&mut self, event: Box<IOEvent<C>>) -> Result<(), io::Error> {
        log_debug_assert_eq!(self.fd, event.fd);
        log_debug_assert_eq!(event.action, self.action);
        let event_size = event.get_size();

        if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
            if let Err(e) = self._flush() {
                // The event was never queued; complete it so it isn't lost.
                event.callback();
                return Err(e);
            }
        }
        if self.buffer.push_event(event) {
            self._flush()?;
        }
        return Ok(());
    }

    /// Submits whatever is currently buffered, if anything.
    pub fn flush(&mut self) -> Result<(), io::Error> {
        self._flush()
    }

    /// Drains the buffer and submits its contents: a single event is
    /// submitted directly; multiple events are gathered into one aligned
    /// buffer and submitted as one merged event carrying the originals as
    /// subtasks. Falls back to per-event submission if the merge buffer
    /// cannot be allocated.
    #[inline(always)]
    fn _flush(&mut self) -> Result<(), io::Error> {
        let batch_len = self.buffer.len();
        if batch_len == 0 {
            return Ok(());
        }
        if batch_len == 1 {
            // Single event: no merging needed, submit it as-is.
            let submit_event = self.buffer.take_one();
            trace!("mio: submit {:?} not merged", submit_event);
            self.ctx.submit(submit_event, self.channel_type)?;
        } else {
            let (mut events, offset, size) = self.buffer.take();
            log_assert!(size > 0);
            match Buffer::aligned(size) {
                Ok(mut buffer) => {
                    trace!("mio: merged {} ev into {} @{}", events.get_length(), size, offset);
                    if self.action == IOAction::Write {
                        // Writes: gather each event's payload into the
                        // contiguous merged buffer, in list order.
                        let mut _offset = 0;
                        for _event in events.iter::<IOEvent<C>>() {
                            // SAFETY: presumably the list iterator yields
                            // references with the same layout as IOEvent<C>;
                            // TODO(review) confirm this transmute against
                            // EmbeddedList::iter's actual item type.
                            let event: &IOEvent<C> = unsafe { transmute(_event) };
                            let b = event.get_buf_ref();
                            let _size = b.len();
                            buffer.copy_from(_offset, b);
                            _offset += _size;
                        }
                    }
                    // One event covers the whole merged range; the original
                    // events ride along as subtasks for completion handling.
                    let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
                    event.set_subtasks(events);
                    self.ctx.submit(event, self.channel_type)?;
                }
                Err(_) => {
                    // Allocation failed: best-effort fallback, submitting
                    // each event individually and reporting the last error.
                    warn!("mio: alloc buffer size {} failed", size);
                    let mut e: Option<io::Error> = None;
                    while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
                        if let Err(_e) = self.ctx.submit(event, self.channel_type) {
                            e.replace(_e);
                        }
                    }
                    if let Some(_e) = e {
                        return Err(_e);
                    }
                }
            }
        }
        Ok(())
    }
}