1use crate::tasks::*;
36use crossfire::BlockingTxTrait;
37use embed_collections::dlist::DLinkedList;
38use io_buffer::Buffer;
39use std::io;
40use std::os::fd::RawFd;
41
42pub struct EventMergeBuffer<C: IOCallback> {
43 pub merge_size_limit: usize,
44 merged_events: DLinkedList<Box<IOEvent_<C>>, ()>,
45 merged_offset: i64,
46 merged_data_size: usize,
47 merged_count: usize,
48}
49
50impl<C: IOCallback> EventMergeBuffer<C> {
51 pub fn new(merge_size_limit: usize) -> Self {
52 Self {
53 merge_size_limit,
54 merged_count: 0,
55 merged_events: DLinkedList::new(),
56 merged_offset: -1,
57 merged_data_size: 0,
58 }
59 }
60
61 #[inline(always)]
62 pub fn may_add_event(&mut self, event: &IOEvent<C>) -> bool {
63 if self.merged_count > 0 {
64 if self.merged_data_size + event.get_size() > self.merge_size_limit {
65 return false;
66 }
67 return self.merged_offset + self.merged_data_size as i64 == event.offset;
68 } else {
69 return true;
70 }
71 }
72
73 #[inline(always)]
75 pub fn push_event(&mut self, event: IOEvent<C>) -> bool {
76 if self.merged_count == 0 {
77 self.merged_offset = event.offset;
78 }
79 self.merged_data_size += event.get_size();
80 self.merged_count += 1;
81 event.push_to_list(&mut self.merged_events);
82
83 return self.merged_data_size >= self.merge_size_limit;
84 }
85
86 #[inline(always)]
87 pub fn len(&self) -> usize {
88 self.merged_count
89 }
90
91 #[inline(always)]
92 pub fn take(&mut self) -> (DLinkedList<Box<IOEvent_<C>>, ()>, i64, usize) {
93 log_debug_assert!(self.merged_count > 1, "merged_count {}", self.merged_count);
94 let tasks = std::mem::replace(&mut self.merged_events, DLinkedList::new());
96 let merged_data_size = self.merged_data_size;
97 let merged_offset = self.merged_offset;
98
99 self.merged_offset = -1;
100 self.merged_data_size = 0;
101 self.merged_count = 0;
102 (tasks, merged_offset, merged_data_size)
103 }
104
105 #[inline(always)]
106 pub fn take_one(&mut self) -> IOEvent<C> {
107 log_debug_assert_eq!(self.merged_count, 1);
108 self.merged_offset = -1;
109 self.merged_data_size = 0;
110 self.merged_count = 0;
111
112 IOEvent::pop_from_list(&mut self.merged_events).expect("Should have one event")
113 }
114}
115
116pub struct IOMergeSubmitter<C: IOCallback, S: BlockingTxTrait<IOEvent<C>>> {
120 fd: RawFd,
121 buffer: EventMergeBuffer<C>,
122 sender: S,
123 action: IOAction,
124}
125
126impl<C: IOCallback, S: BlockingTxTrait<IOEvent<C>>> IOMergeSubmitter<C, S> {
127 pub fn new(fd: RawFd, sender: S, merge_size_limit: usize, action: IOAction) -> Self {
128 log_assert!(merge_size_limit > 0);
129 Self { fd, buffer: EventMergeBuffer::<C>::new(merge_size_limit), sender, action }
130 }
131
132 pub fn add_event(&mut self, event: IOEvent<C>) -> Result<(), io::Error> {
134 log_debug_assert_eq!(self.fd, event.fd);
135 log_debug_assert_eq!(event.action, self.action);
136 let event_size = event.get_size();
137
138 if event_size >= self.buffer.merge_size_limit || !self.buffer.may_add_event(&event) {
139 if let Err(e) = self._flush() {
140 event.callback();
141 return Err(e);
142 }
143 }
144 if self.buffer.push_event(event) {
145 self._flush()?;
146 }
147 return Ok(());
148 }
149
150 pub fn flush(&mut self) -> Result<(), io::Error> {
151 self._flush()
152 }
153
154 #[inline(always)]
155 fn _flush(&mut self) -> Result<(), io::Error> {
156 let batch_len = self.buffer.len();
157 if batch_len == 0 {
158 return Ok(());
159 }
160 if batch_len == 1 {
161 let submit_event = self.buffer.take_one();
162 trace!("mio: submit {:?} not merged", submit_event);
163 self.sender
164 .send(submit_event)
165 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
166 } else {
167 let (mut events, offset, size) = self.buffer.take();
168 log_assert!(size > 0);
169
170 match Buffer::aligned(size as i32) {
171 Ok(mut buffer) => {
172 if self.action == IOAction::Write {
174 let mut _offset = 0;
175 for event_box in events.iter() {
177 let b = event_box.buf.as_ref().unwrap().as_ref();
179 let _size = b.len();
180 buffer.copy_from(_offset, b);
181 _offset += _size;
182 }
183 }
184 let mut event = IOEvent::<C>::new(self.fd, buffer, self.action, offset);
185 event.set_subtasks(events);
186 self.sender
187 .send(event)
188 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))?;
189 }
190 Err(_) => {
191 warn!("mio: alloc buffer size {} failed", size);
193 let mut e: Option<io::Error> = None;
194 while let Some(event) = IOEvent::<C>::pop_from_list(&mut events) {
195 if let Err(_e) = self
196 .sender
197 .send(event)
198 .map_err(|_| io::Error::new(io::ErrorKind::Other, "Queue closed"))
199 {
200 e.replace(_e);
201 }
202 }
203 if let Some(_e) = e {
204 return Err(_e);
205 }
206 }
207 }
208 }
209 Ok(())
210 }
211}