1use crate::embedded_list::*;
2use crate::tasks::*;
3use crossfire::BlockingTxTrait;
4use io_buffer::Buffer;
5use std::io;
6use std::mem::transmute;
7use std::os::fd::RawFd;
8
9pub struct EventMergeBuffer<C: IoCallback> {
10 pub merge_size_limit: usize,
11 last_event: Option<Box<IOEvent<C>>>,
12 merged_events: Option<EmbeddedList>,
13 merged_offset: i64,
14 merged_data_size: usize,
15 merged_count: usize,
16 list_node_offset: usize,
17}
18
19impl<C: IoCallback> EventMergeBuffer<C> {
20 pub fn new(merge_size_limit: usize) -> Self {
21 Self {
22 merge_size_limit,
23 last_event: None,
24 merged_count: 0,
25 merged_events: None,
26 merged_offset: -1,
27 merged_data_size: 0,
28 list_node_offset: std::mem::offset_of!(IOEvent<C>, node),
29 }
30 }
31
32 #[inline(always)]
33 pub fn may_add_event(&mut self, event: &Box<IOEvent<C>>) -> bool {
34 if self.merged_count > 0 {
35 if self.merged_data_size + event.get_size() > self.merge_size_limit {
36 return false;
37 }
38 return self.merged_offset + self.merged_data_size as i64 == event.offset;
39 } else {
40 return true;
41 }
42 }
43
44 #[inline(always)]
46 pub fn push_event(&mut self, event: Box<IOEvent<C>>) -> bool {
47 if self.merged_count == 0 {
48 self.merged_offset = event.offset;
49 self.merged_data_size = event.get_size();
50 self.last_event = Some(event);
51 self.merged_count = 1;
52 return false;
53 } else {
54 self.merged_data_size += event.get_size();
55 if self.merged_count == 1 {
56 let last_event = self.last_event.take().unwrap();
57 let mut list = EmbeddedList::new(self.list_node_offset);
58 last_event.push_to_list(&mut list);
59 event.push_to_list(&mut list);
60 self.merged_events = Some(list);
61 } else {
62 let merged_events = self.merged_events.as_mut().unwrap();
63 event.push_to_list(merged_events);
64 }
65 self.merged_count += 1;
66 return self.merged_data_size >= self.merge_size_limit;
67 }
68 }
69
70 #[inline(always)]
71 pub fn len(&self) -> usize {
72 self.merged_count
73 }
74
75 #[inline(always)]
76 pub fn take(&mut self) -> (EmbeddedList, i64, usize) {
77 log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
78 let tasks = self.merged_events.take().unwrap();
79 let merged_data_size = self.merged_data_size;
80 let merged_offset = self.merged_offset;
81 self.merged_offset = -1;
82 self.merged_data_size = 0;
83 self.merged_count = 0;
84 (tasks, merged_offset, merged_data_size)
85 }
86
87 #[inline(always)]
88 pub fn take_one(&mut self) -> Box<IOEvent<C>> {
89 log_debug_assert_eq!(self.merged_count, 1);
90 self.merged_offset = -1;
91 self.merged_data_size = 0;
92 self.merged_count = 0;
93 self.last_event.take().unwrap()
94 }
95}
96
97pub struct IOMergeSubmitter<C: IoCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> {
101 fd: RawFd,
102 buffer: EventMergeBuffer<C>,
103 sender: S,
104 action: IOAction,
105}
106
107impl<C: IoCallback, S: BlockingTxTrait<Box<IOEvent<C>>>> IOMergeSubmitter<C, S> {
108 pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
109 log_assert!(merge_size_limit > 0);
110 Self { fd, buffer: EventMergeBuffer::new(merge_size_limit), sender, action }
111 }
112
113 pub fn add_event(&mut self, event: Box<IOEvent<C>>) -> Result<(), io::Error> {
115 log_debug_assert_eq!(self.fd, event.fd);
116 log_debug_assert_eq!(event.action, self.action);
117 let event_size = event.get_size();
118
119 if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
120 if let Err(e) = self._flush() {
121 event.callback();
122 return Err(e);
123 }
124 }
125 if self.buffer.push_event(event) {
126 self._flush()?;
127 }
128 return Ok(());
129 }
130
131 pub fn flush(&mut self) -> Result<(), io::Error> {
132 self._flush()
133 }
134
135 #[inline(always)]
136 fn _flush(&mut self) -> Result<(), io::Error> {
137 let batch_len = self.buffer.len();
138 if batch_len == 0 {
139 return Ok(());
140 }
141 if batch_len == 1 {
142 let submit_event = self.buffer.take_one();
143 trace!("mio: submit {:?} not merged", submit_event);
144 self.sender
145 .send(submit_event)
146 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
147 } else {
148 let (mut events, offset, size) = self.buffer.take();
149 log_assert!(size > 0);
150 match Buffer::aligned(size as i32) {
151 Ok(mut buffer) => {
152 trace!("mio: merged {} ev into {} @{}", events.get_length(), size, offset);
153 if self.action == IOAction::Write {
154 let mut _offset = 0;
155 for _event in events.iter::<IOEvent<C>>() {
156 let event: &IOEvent<C> = unsafe { transmute(_event) };
157 let b = event.get_buf_ref();
158 let _size = b.len();
159 buffer.copy_from(_offset, b);
160 _offset += _size;
161 }
162 }
163 let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
164 event.set_subtasks(events);
165 self.sender
166 .send(event)
167 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
168 }
169 Err(_) => {
170 warn!("mio: alloc buffer size {} failed", size);
172 let mut e: Option<io::Error> = None;
173 while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
174 if let Err(_e) = self
175 .sender
176 .send(event)
177 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))
178 {
179 e.replace(_e);
180 }
181 }
182 if let Some(_e) = e {
183 return Err(_e);
184 }
185 }
186 }
187 }
188 Ok(())
189 }
190}