use std::{
    cell::UnsafeCell,
    collections::VecDeque,
    io,
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    thread,
};

use crossbeam::{
    channel::{Receiver, Sender, bounded},
    queue::SegQueue,
};
use libc::c_long;
use nix::errno::Errno;
use parking_lot::Mutex;

use super::{aio, callback_worker::*, tasks::*};

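// NOTE: the following overview is an added summary inferred from this module.
//
// This module drives Linux native AIO (io_setup/io_submit/io_getevents) with a
// fixed number of in-flight slots (`depth`). `IOContext::submit` pushes events
// onto one of three lock-free queues and nudges the submit worker; the submit
// worker (`worker_submit`) drains the queues into free slots and calls
// `io_submit`; the poll worker (`worker_poll`) reaps completions, dispatches
// callbacks through `cb_workers`, and recycles slot ids over a channel.
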
struct IOContextInner<C: IOCallbackCustom> {
    /// Kernel AIO context handle from `io_setup`.
    context: aio::aio_context_t,

    /// Maximum number of in-flight requests (also the number of slots).
    depth: usize,
    /// One slot per in-flight request, indexed by the id carried in
    /// `io_event::data`. Accessed only by the two worker threads.
    slots: UnsafeCell<Vec<IOEventTaskSlot<C>>>,
    prio_count: AtomicUsize,
    read_count: AtomicUsize,
    write_count: AtomicUsize,
    /// Total queued-but-unsubmitted events across all three queues.
    total_count: AtomicUsize,
    prio_queue: SegQueue<Box<IOEvent<C>>>,
    read_queue: SegQueue<Box<IOEvent<C>>>,
    write_queue: SegQueue<Box<IOEvent<C>>>,
    threads: Mutex<Vec<thread::JoinHandle<()>>>,
    running: AtomicBool,
    cb_workers: IOWorkers<C>,
    free_slots_count: AtomicUsize,
}

// SAFETY: `slots` is only touched by the submit and poll workers, which never
// hold the same slot id at the same time (ids are handed out through the
// free-slot channel), so sharing the inner state across threads is sound.
unsafe impl<C: IOCallbackCustom> Send for IOContextInner<C> {}

unsafe impl<C: IOCallbackCustom> Sync for IOContextInner<C> {}

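// Public handle. `new()` returns an `Arc<IOContext<_>>`; dropping the last
// handle flips `running` and lets both workers wind down on their own.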
pub struct IOContext<C: IOCallbackCustom> {
    inner: Arc<IOContextInner<C>>,
    noti_sender: Sender<()>,
}

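// Scheduling: `Prio` events are always drained first; `Read` and `Write`
// alternate in precedence on successive passes of the submit worker (see the
// `last_write` flag in `worker_submit`).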
#[derive(PartialEq, Debug, Clone, Copy)]
#[repr(u8)]
pub enum IOChannelType {
    Prio = 0,
    Read = 1,
    Write = 2,
}

impl<C: IOCallbackCustom> Drop for IOContext<C> {
    fn drop(&mut self) {
        debug!("IOContext dropped, stopping IO workers");
        // The workers observe `running == false` and exit on their own;
        // `worker_poll` calls `io_destroy` once all in-flight IO has drained.
        self.inner.running.store(false, Ordering::SeqCst);
    }
}

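// A minimal usage sketch (hypothetical: `MyCallback` and `event` stand in for
// a concrete `IOCallbackCustom` impl and a prepared `Box<IOEvent<_>>`):
//
//     let workers: IOWorkers<MyCallback> = /* callback worker pool */;
//     let ctx = IOContext::new(128, &workers)?;
//     ctx.submit(event, IOChannelType::Read)?;
//     while ctx.pending_count() > 0 || ctx.running_count() > 0 {
//         std::thread::yield_now();
//     }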
impl<C: IOCallbackCustom> IOContext<C> {
    pub fn new(depth: usize, cbs: &IOWorkers<C>) -> Result<Arc<Self>, io::Error> {
        let mut context: aio::aio_context_t = 0;
        if aio::io_setup(depth as c_long, &mut context) != 0 {
            return Err(io::Error::last_os_error());
        }
        // Capacity-1 notification channel: `try_send` collapses bursts of
        // wakeups into a single pending notification.
        let (s_noti, r_noti) = bounded::<()>(1);
        // Seed the free-slot channel with every slot id.
        let (s_free, r_free) = bounded::<u16>(depth);
        for i in 0..depth {
            let _ = s_free.send(i as u16);
        }
        let mut slots = Vec::with_capacity(depth);
        for slot_id in 0..depth {
            slots.push(IOEventTaskSlot::new(slot_id as u64));
        }
        let inner = Arc::new(IOContextInner {
            context,
            depth,
            slots: UnsafeCell::new(slots),
            running: AtomicBool::new(true),
            threads: Mutex::new(Vec::new()),
            prio_count: AtomicUsize::new(0),
            read_count: AtomicUsize::new(0),
            write_count: AtomicUsize::new(0),
            total_count: AtomicUsize::new(0),
            prio_queue: SegQueue::new(),
            read_queue: SegQueue::new(),
            write_queue: SegQueue::new(),
            cb_workers: cbs.clone(),
            free_slots_count: AtomicUsize::new(depth),
        });

        {
            let mut threads = inner.threads.lock();
            let inner1 = inner.clone();
            let th = thread::spawn(move || inner1.worker_submit(r_noti, r_free));
            threads.push(th);
            let inner2 = inner.clone();
            let sender_free = s_free.clone();
            let th = thread::spawn(move || inner2.worker_poll(sender_free));
            threads.push(th);
        }
        Ok(Arc::new(Self { inner, noti_sender: s_noti }))
    }

    #[inline]
    pub fn get_depth(&self) -> usize {
        self.get_inner().depth
    }

    /// Queue an event on the chosen channel and wake the submit worker.
    /// This never blocks; currently it always returns `Ok(())`.
    #[inline(always)]
    pub fn submit(
        &self, event: Box<IOEvent<C>>, channel_type: IOChannelType,
    ) -> Result<(), io::Error> {
        let inner = self.get_inner();
        match channel_type {
            IOChannelType::Prio => {
                let _ = inner.prio_count.fetch_add(1, Ordering::SeqCst);
                inner.prio_queue.push(event);
            }
            IOChannelType::Read => {
                let _ = inner.read_count.fetch_add(1, Ordering::SeqCst);
                inner.read_queue.push(event);
            }
            IOChannelType::Write => {
                let _ = inner.write_count.fetch_add(1, Ordering::SeqCst);
                inner.write_queue.push(event);
            }
        }
        inner.total_count.fetch_add(1, Ordering::SeqCst);
        // A full notification buffer means the worker is already awake.
        let _ = self.noti_sender.try_send(());
        Ok(())
    }

    /// Approximate number of events queued but not yet submitted.
    #[inline(always)]
    pub fn pending_count(&self) -> usize {
        self.inner.total_count.load(Ordering::Acquire)
    }

    /// Approximate number of requests currently in flight, defensively
    /// clamped to zero since `free_slots_count` is updated racily.
    pub fn running_count(&self) -> usize {
        let inner = self.get_inner();
        let free = inner.free_slots_count.load(Ordering::SeqCst);
        if free > inner.depth { 0 } else { inner.depth - free }
    }

    #[inline(always)]
    fn get_inner(&self) -> &IOContextInner<C> {
        self.inner.as_ref()
    }
}

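// Slot lifecycle: a slot id travels free-slot channel -> `worker_submit`
// (filled and submitted) -> kernel -> `worker_poll` (completion reaped) ->
// back onto the free-slot channel once `verify_result` declares the IO done.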
impl<C: IOCallbackCustom> IOContextInner<C> {
    /// Inspect one completion. Returns `true` when the slot is finished
    /// (successfully or with an error) and its id may be recycled; returns
    /// `false` when the IO was short and the slot has been resubmitted.
    #[inline(always)]
    fn verify_result(&self, slot: &mut IOEventTaskSlot<C>, info: &aio::io_event) -> bool {
        // `res < 0` carries a negated errno; zero progress is treated as an
        // error as well.
        if info.res <= 0 {
            slot.set_error((-info.res) as i32, &self.cb_workers);
            return true;
        }
        if slot.set_written(info.res as usize, &self.cb_workers) {
            return true;
        }
        trace!("short io, resubmitting the remainder");
        let mut arr: [*mut aio::iocb; 1] = [&mut slot.iocb as *mut aio::iocb];
        'submit: loop {
            let result = aio::io_submit(self.context, 1, arr.as_mut_ptr());
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue 'submit;
                }
                error!("io_submit (resubmit) error: {}", result);
                slot.set_error(-result as i32, &self.cb_workers);
                return true;
            } else if result > 0 {
                // The slot is back in flight; undo the free-count increment
                // made by `worker_poll` when this completion was reaped.
                self.free_slots_count.fetch_sub(1, Ordering::SeqCst);
                return false;
            }
            // result == 0: no iocb was consumed; retry.
        }
    }

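    // Reap completions until shutdown. The 2-second `io_getevents` timeout
    // bounds how long shutdown detection can lag.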
    fn worker_poll(&self, free_sender: Sender<u16>) {
        let depth = self.depth;
        let mut infos = Vec::<aio::io_event>::with_capacity(depth);
        let context = self.context;
        let slots = unsafe { &mut *self.slots.get() };
        let ts = aio::timespec { tv_sec: 2, tv_nsec: 0 };
        loop {
            infos.clear();
            let result = aio::io_getevents(
                context,
                1,
                depth as i64,
                infos.as_mut_ptr(),
                &ts as *const aio::timespec as *mut aio::timespec,
            );
            if result < 0 {
                if -result == Errno::EINTR as i64 {
                    continue;
                }
                if !self.running.load(Ordering::Acquire) {
                    break;
                }
                error!("io_getevents errno: {}", -result);
                continue;
            } else if result == 0 {
                // Timed out with nothing to reap; exit only once every slot
                // has been returned, so no in-flight IO is abandoned.
                if !self.running.load(Ordering::Acquire)
                    && self.free_slots_count.load(Ordering::SeqCst) == self.depth
                {
                    break;
                }
                continue;
            }
            let _ = self.free_slots_count.fetch_add(result as usize, Ordering::SeqCst);
            // SAFETY: the kernel filled the first `result` entries.
            unsafe {
                infos.set_len(result as usize);
            }
            for info in &infos {
                let slot_id = info.data as usize;
                if self.verify_result(&mut slots[slot_id], info) {
                    let _ = free_sender.send(slot_id as u16);
                }
            }
        }
        info!("io_poll worker exit due to closing");
        let _ = aio::io_destroy(self.context);
    }

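    // Drain the queues into a batch, pair events with free slots, and submit.
    // Priority events always go first; reads and writes alternate precedence
    // via `last_write` so neither side starves the other.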
    fn worker_submit(&self, noti_recv: Receiver<()>, free_recv: Receiver<u16>) {
        let depth = self.depth;
        let mut events = VecDeque::<Box<IOEvent<C>>>::with_capacity(depth);
        let mut iocbs = Vec::<*mut aio::iocb>::with_capacity(depth);
        let context = self.context;
        let slots = unsafe { &mut *self.slots.get() };
        let mut last_write: bool = false;

        'outer: loop {
            // Sleep until `submit()` signals new work; a closed channel means
            // the owning `IOContext` has been dropped.
            if events.is_empty() && self.total_count.load(Ordering::Acquire) == 0 {
                if noti_recv.recv().is_err() {
                    info!("io_submit worker exit due to closing");
                    return;
                }
            }
            'inner_queue: while events.len() < depth {
                let mut got = false;
                macro_rules! probe_queue {
                    ($queue:expr, $count:expr) => {
                        loop {
                            if events.len() < depth {
                                if let Some(event) = $queue.pop() {
                                    got = true;
                                    $count.fetch_sub(1, Ordering::SeqCst);
                                    self.total_count.fetch_sub(1, Ordering::SeqCst);
                                    events.push_back(event);
                                } else {
                                    break;
                                }
                            } else {
                                break 'inner_queue;
                            }
                        }
                    };
                }
                if self.prio_count.load(Ordering::Acquire) > 0 {
                    probe_queue!(self.prio_queue, self.prio_count);
                }
                if last_write {
                    last_write = false;
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                } else {
                    last_write = true;
                    if self.write_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.write_queue, self.write_count);
                    }
                    if self.read_count.load(Ordering::Acquire) > 0 {
                        probe_queue!(self.read_queue, self.read_count);
                    }
                }
                if got {
                    continue 'inner_queue;
                } else if !events.is_empty() {
                    break;
                } else {
                    continue 'outer;
                }
            }
            log_debug_assert!(
                events.len() <= self.depth,
                "events {} {} {}",
                events.len(),
                events.capacity(),
                self.depth
            );
            log_debug_assert!(!events.is_empty(), "events.len() > 0");
            // Bind as many events as there are free slots right now.
            while !events.is_empty() {
                let slot_id = if iocbs.is_empty() {
                    // Block for the first slot so the batch makes progress.
                    free_recv.recv().unwrap()
                } else if let Ok(slot_id) = free_recv.try_recv() {
                    slot_id
                } else {
                    break;
                };
                let event = events.pop_front().unwrap();
                let slot = &mut slots[slot_id as usize];
                slot.fill_slot(event, slot_id);
                iocbs.push(&mut slot.iocb as *mut aio::iocb);
            }
            let mut done: libc::c_long = 0;
            let mut left = iocbs.len();
            if left > 0 {
                'submit: loop {
                    let _ = self.free_slots_count.fetch_sub(left, Ordering::SeqCst);
                    let result = unsafe {
                        let arr = iocbs.as_mut_ptr().add(done as usize);
                        aio::io_submit(context, left as libc::c_long, arr)
                    };
                    if result < 0 {
                        let _ = self.free_slots_count.fetch_add(left, Ordering::SeqCst);
                        if -result == Errno::EINTR as i64 {
                            continue 'submit;
                        }
                        // Any other error is retried as well; the free count
                        // stays balanced across iterations.
                        error!("io_submit error: {}", result);
                    } else if result == left as libc::c_long {
                        trace!("io submit {} events", result);
                        break 'submit;
                    } else {
                        // Partial submission: credit back the unsubmitted
                        // slots and retry from the first unconsumed iocb.
                        let _ = self
                            .free_slots_count
                            .fetch_add(left - result as usize, Ordering::SeqCst);
                        done += result;
                        left -= result as usize;
                        trace!("io submit {} events, {} left", result, left);
                    }
                }
            }
            iocbs.clear();
        }
    }
}