use std::{
    cell::UnsafeCell,
    collections::VecDeque,
    io,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    thread,
};

use crossbeam::{
    channel::{Receiver, Sender, bounded},
    queue::SegQueue,
};
use libc::c_long;
use nix::errno::Errno;
use parking_lot::Mutex;

use super::{aio, callback_worker::*, tasks::*};

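/// Shared state behind an [`IOContext`]: the kernel AIO context, the fixed pool
/// of submission slots, and the three per-priority queues that feed the submit
/// worker. Counters are kept in atomics so `submit()` stays lock-free.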
struct IOContextInner<C: IOCallbackCustom> {
    context: aio::aio_context_t,

    depth: usize,
    slots: UnsafeCell<Vec<IOEventTaskSlot<C>>>,

    prio_count: AtomicUsize,
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    total_count: AtomicUsize,

    prio_queue: SegQueue<Box<IOEvent<C>>>,
    read_queue: SegQueue<Box<IOEvent<C>>>,
    write_queue: SegQueue<Box<IOEvent<C>>>,

    threads: Mutex<Vec<thread::JoinHandle<()>>>,
    running: AtomicBool,
    cb_workers: IOWorkers<C>,
    free_slots_count: AtomicUsize,
}

// SAFETY: `slots` is only accessed by the submit and poll worker threads, and a
// given slot id is owned by exactly one of them at a time (handed around via
// the free-slot channel); the remaining fields are atomics, lock-protected, or
// otherwise Sync.
unsafe impl<C: IOCallbackCustom> Send for IOContextInner<C> {}

unsafe impl<C: IOCallbackCustom> Sync for IOContextInner<C> {}

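/// Public handle for submitting AIO requests. A minimal usage sketch, assuming
/// a caller-provided `IOCallbackCustom` impl and event constructor
/// (`MyCallback`, `my_workers`, and `my_event` below are illustrative, not part
/// of this module):
///
/// ```ignore
/// let ctx = IOContext::<MyCallback>::new(128, &my_workers)?;
/// ctx.submit(my_event, IOChannelType::Read)?;
/// while ctx.pending_count() > 0 { /* drive or wait for callbacks */ }
/// ```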
pub struct IOContext<C: IOCallbackCustom> {
    inner: Arc<IOContextInner<C>>,
    noti_sender: Sender<()>,
}

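/// Which queue a request is pushed onto. `Prio` is always drained first; the
/// submit worker then alternates between `Read` and `Write` so neither side is
/// starved.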
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum IOChannelType {
    Prio = 0,
    Read = 1,
    Write = 2,
}

impl<C: IOCallbackCustom> Drop for IOContext<C> {
    fn drop(&mut self) {
        info!("IOContext dropped, stopping io workers");
        // Clearing `running` stops the poll worker; dropping `noti_sender`
        // afterwards disconnects the channel, which wakes the submit worker
        // and lets it exit.
        self.inner.running.store(false, Ordering::SeqCst);
    }
}

impl<C: IOCallbackCustom> IOContext<C> {
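    /// Sets up a kernel AIO context with `depth` in-flight slots and spawns
    /// the submit and poll worker threads; completion callbacks are delivered
    /// through `cbs`.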
    pub fn new(depth: usize, cbs: &IOWorkers<C>) -> Result<Arc<Self>, io::Error> {
        let mut context: aio::aio_context_t = 0;
        let rc = aio::io_setup(depth as c_long, &mut context);
        if rc != 0 {
            // aio::io_setup returns a negative errno directly (the same
            // convention as io_submit/io_getevents below), so build the error
            // from it instead of relying on `errno` being set.
            return Err(io::Error::from_raw_os_error((-rc) as i32));
        }
        let (s_noti, r_noti) = bounded::<()>(1);
        // Every slot id starts out free.
        let (s_free, r_free) = bounded::<u16>(depth);
        for i in 0..depth {
            let _ = s_free.send(i as u16);
        }
        let mut slots = Vec::with_capacity(depth);
        for slot_id in 0..depth {
            slots.push(IOEventTaskSlot::new(slot_id as u64));
        }
        let inner = Arc::new(IOContextInner {
            context,
            depth,
            slots: UnsafeCell::new(slots),
            running: AtomicBool::new(true),
            threads: Mutex::new(Vec::new()),
            prio_count: AtomicUsize::new(0),
            read_count: AtomicUsize::new(0),
            write_count: AtomicUsize::new(0),
            total_count: AtomicUsize::new(0),
            prio_queue: SegQueue::new(),
            read_queue: SegQueue::new(),
            write_queue: SegQueue::new(),
            cb_workers: cbs.clone(),
            free_slots_count: AtomicUsize::new(depth),
        });

        // Spawn the two workers; their handles are kept for bookkeeping.
        {
            let mut threads = inner.threads.lock();
            let inner1 = inner.clone();
            let th = thread::spawn(move || inner1.worker_submit(r_noti, r_free));
            threads.push(th);
            let inner2 = inner.clone();
            let sender_free = s_free.clone();
            let th = thread::spawn(move || inner2.worker_poll(sender_free));
            threads.push(th);
        }
        Ok(Arc::new(Self {
            inner,
            noti_sender: s_noti,
        }))
    }

    #[inline]
    pub fn get_depth(&self) -> usize {
        self.get_inner().depth
    }

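    /// Queues `event` on the channel chosen by `channel_type` and nudges the
    /// submit worker. This never blocks and currently always returns `Ok(())`.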
    #[inline(always)]
    pub fn submit(
        &self,
        event: Box<IOEvent<C>>,
        channel_type: IOChannelType,
    ) -> Result<(), io::Error> {
        let inner = self.get_inner();
        match channel_type {
            IOChannelType::Prio => {
                let _ = inner.prio_count.fetch_add(1, Ordering::SeqCst);
                inner.prio_queue.push(event);
            }
            IOChannelType::Read => {
                let _ = inner.read_count.fetch_add(1, Ordering::SeqCst);
                inner.read_queue.push(event);
            }
            IOChannelType::Write => {
                let _ = inner.write_count.fetch_add(1, Ordering::SeqCst);
                inner.write_queue.push(event);
            }
        }
        inner.total_count.fetch_add(1, Ordering::SeqCst);
        // The notification channel has capacity 1 and is only a wakeup hint,
        // so a failed send just means the worker is already awake.
        let _ = self.noti_sender.try_send(());
        Ok(())
    }

    #[inline(always)]
    pub fn pending_count(&self) -> usize {
        self.inner.total_count.load(Ordering::Acquire)
    }

    /// Number of requests currently occupying submission slots, derived from
    /// the free-slot counter; saturating arithmetic guards against transient
    /// counter drift between the poll worker's bulk increment and the
    /// per-slot resubmit decrement.
    pub fn running_count(&self) -> usize {
        let inner = self.get_inner();
        let free = inner.free_slots_count.load(Ordering::SeqCst);
        inner.depth.saturating_sub(free)
    }

    #[inline(always)]
    fn get_inner(&self) -> &IOContextInner<C> {
        self.inner.as_ref()
    }
}

impl<C: IOCallbackCustom> IOContextInner<C> {
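    /// Handles one completed event. Returns `true` when the slot is finished
    /// (success or error) and can be recycled, `false` when the request was
    /// short and has been resubmitted.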
    #[inline(always)]
    fn verify_result(&self, slot: &mut IOEventTaskSlot<C>, info: &aio::io_event) -> bool {
        if info.res <= 0 {
            slot.set_error((-info.res) as i32, &self.cb_workers);
            return true;
        }
        if slot.set_written(info.res as usize, &self.cb_workers) {
            return true;
        }
        // Short I/O: the slot is still busy, so resubmit it for the rest.
        trace!("io not enough, resubmit");
        let mut arr: [*mut aio::iocb; 1] = [&mut slot.iocb as *mut aio::iocb];
        'submit: loop {
            let result = aio::io_submit(self.context, 1, arr.as_mut_ptr());
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue 'submit;
                }
                error!("io_re_submit error: {}", result);
                slot.set_error(-result as i32, &self.cb_workers);
                return true;
            } else if result > 0 {
                // The caller already counted this slot as free again; take it
                // back, since the request is in flight once more.
                self.free_slots_count.fetch_sub(1, Ordering::SeqCst);
                return false;
            }
            // result == 0 should not happen for a single iocb; retry.
        }
    }

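    /// Completion loop: reaps finished events with `io_getevents`, releases
    /// their slots, and hands slot ids back to the submit worker. Runs until
    /// `running` is cleared and all slots have drained, then destroys the
    /// kernel context.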
    fn worker_poll(&self, free_sender: Sender<u16>) {
        let depth = self.depth;
        let mut infos = Vec::<aio::io_event>::with_capacity(depth);
        let context = self.context;
        // SAFETY: only the two worker threads touch `slots`, and they never
        // touch the same slot concurrently (slot ids are owned through the
        // free-slot channel).
        let slots: &mut Vec<IOEventTaskSlot<C>> = unsafe { &mut *self.slots.get() };
        // A finite timeout lets the loop notice shutdown even when idle.
        let mut ts = aio::timespec {
            tv_sec: 2,
            tv_nsec: 0,
        };
        loop {
            infos.clear();
            let result =
                aio::io_getevents(context, 1, depth as i64, infos.as_mut_ptr(), &mut ts);
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue;
                }
                if !self.running.load(Ordering::Acquire) {
                    break;
                }
                error!("io_getevents errno: {}", -result);
                continue;
            } else if result == 0 {
                // Timed out with nothing completed; exit once shutdown has
                // been requested and every slot has drained.
                if !self.running.load(Ordering::Acquire)
                    && self.free_slots_count.load(Ordering::SeqCst) == self.depth
                {
                    break;
                }
                continue;
            }
            let _ = self
                .free_slots_count
                .fetch_add(result as usize, Ordering::SeqCst);
            // SAFETY: the kernel has filled exactly `result` entries.
            unsafe {
                infos.set_len(result as usize);
            }
            for info in &infos {
                let slot_id = info.data as usize;
                if self.verify_result(&mut slots[slot_id], info) {
                    let _ = free_sender.send(slot_id as u16);
                }
            }
        }
        info!("io_poll worker exit due to closing");
        let _ = aio::io_destroy(self.context);
    }

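    /// Submission loop: drains the priority queue first, then alternates
    /// between the read and write queues, packs events into free slots, and
    /// batches them into `io_submit`. Blocks on the notification channel when
    /// idle and exits when that channel disconnects (context dropped).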
    fn worker_submit(&self, noti_recv: Receiver<()>, free_recv: Receiver<u16>) {
        let depth = self.depth;
        let mut events = VecDeque::<Box<IOEvent<C>>>::with_capacity(depth);
        let mut iocbs = Vec::<*mut aio::iocb>::with_capacity(depth);
        let context = self.context;
        // SAFETY: see worker_poll; slot ids are exclusively owned via `free_recv`.
        let slots: &mut Vec<IOEventTaskSlot<C>> = unsafe { &mut *self.slots.get() };
        let mut last_write: bool = false;

        'outer: loop {
            if events.is_empty() && self.total_count.load(Ordering::Acquire) == 0 {
                if noti_recv.recv().is_err() {
                    info!("io_submit worker exit due to closing");
                    return;
                }
            }
            // Pull up to `depth` events out of the queues: priority first,
            // then read/write in alternating order so neither is starved.
            'inner_queue: while events.len() < depth {
                let mut got = false;
                macro_rules! probe_queue {
                    ($queue:expr, $count:expr) => {
                        loop {
                            if events.len() < depth {
                                if let Some(event) = $queue.pop() {
                                    got = true;
                                    $count.fetch_sub(1, Ordering::SeqCst);
                                    self.total_count.fetch_sub(1, Ordering::SeqCst);
                                    events.push_back(event);
                                } else {
                                    break;
                                }
                            } else {
                                break 'inner_queue;
                            }
                        }
                    };
                }
                if self.prio_count.load(Ordering::Acquire) > 0 {
                    probe_queue!(self.prio_queue, self.prio_count);
                }
                if last_write {
                    last_write = false;
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                } else {
                    last_write = true;
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                }
                if got {
                    continue 'inner_queue;
                } else if !events.is_empty() {
                    break 'inner_queue;
                } else {
                    continue 'outer;
                }
            }
            log_debug_assert!(
                events.len() <= self.depth,
                "events {} {} {}",
                events.len(),
                events.capacity(),
                self.depth
            );
            log_debug_assert!(!events.is_empty(), "events.len()>0");
            // Pair each event with a free slot: block for the first slot, then
            // take as many more as are immediately available.
            while !events.is_empty() {
                let slot_id = if iocbs.is_empty() {
                    match free_recv.recv() {
                        Ok(id) => id,
                        Err(_) => return, // channel closed: shutting down
                    }
                } else if let Ok(id) = free_recv.try_recv() {
                    id
                } else {
                    break;
                };
                let event = events.pop_front().unwrap();
                let slot = &mut slots[slot_id as usize];
                slot.fill_slot(event, slot_id);
                iocbs.push(&mut slot.iocb as *mut aio::iocb);
            }
            let mut done: libc::c_long = 0;
            let mut left = iocbs.len();
            if left > 0 {
                'submit: loop {
                    let _ = self.free_slots_count.fetch_sub(left, Ordering::SeqCst);
                    let result = unsafe {
                        let arr = iocbs.as_mut_ptr().add(done as usize);
                        aio::io_submit(context, left as libc::c_long, arr)
                    };
                    if result < 0 {
                        let _ = self.free_slots_count.fetch_add(left, Ordering::SeqCst);
                        if -result == Errno::EINTR as i64 {
                            continue 'submit;
                        }
                        // Note: any other error is also retried; a persistent
                        // failure (e.g. EBADF) will spin here.
                        error!("io_submit error: {}", result);
                    } else if result == left as libc::c_long {
                        trace!("io submit {} events", result);
                        break 'submit;
                    } else {
                        // Partial submit: give back the slots not yet accepted
                        // and retry from the first unsubmitted iocb.
                        let _ = self
                            .free_slots_count
                            .fetch_add(left - result as usize, Ordering::SeqCst);
                        done += result;
                        left -= result as usize;
                        trace!("io submit {} events, {} left", result, left);
                    }
                }
            }
            iocbs.clear();
        }
    }
}