1use crate::callback_worker::IOWorkers;
2
3use crate::context::IoCtxShared;
4use crate::tasks::{IOAction, IOEvent, IoCallback};
5use crossfire::{BlockingRxTrait, Rx, Tx, spsc};
6use nix::errno::Errno;
7use std::collections::VecDeque;
8use std::fs::File;
9use std::{
10 cell::UnsafeCell,
11 io,
12 mem::transmute,
13 os::fd::AsRawFd,
14 sync::{Arc, atomic::Ordering},
15 thread,
16 time::Duration,
17};
18
/// One submission slot: a kernel AIO control block paired with the event that
/// currently occupies it.
pub struct AioSlot<C: IoCallback> {
    /// Control block whose address is handed to `io_submit`.
    pub(crate) iocb: iocb,
    /// The event attached to this slot; `None` while no event is attached.
    pub(crate) event: Option<Box<IOEvent<C>>>,
}
23
24impl<C: IoCallback> AioSlot<C> {
25 pub fn new(slot_id: u64) -> Self {
26 Self { iocb: iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() }, event: None }
27 }
28
29 #[inline(always)]
30 pub fn fill_slot(&mut self, event: Box<IOEvent<C>>, slot_id: u16) {
31 let iocb = &mut self.iocb;
32 iocb.aio_data = slot_id as libc::__u64;
33 iocb.aio_fildes = event.fd as libc::__u32;
34 if let Some(buf) = event.buf.as_ref() {
35 iocb.aio_lio_opcode = event.action as u16;
36 iocb.aio_buf = buf.get_raw() as u64;
37 iocb.aio_nbytes = buf.len() as u64;
38 } else {
39 iocb.aio_lio_opcode = IOAction::Read as u16;
41 iocb.aio_buf = 0;
42 iocb.aio_nbytes = 0;
43 }
44 iocb.aio_offset = event.offset;
45 self.event.replace(event);
46 }
47
48 #[inline(always)]
49 pub fn set_result(&mut self, written: usize, cb: &IOWorkers<C>) {
50 if let Some(mut event) = self.event.take() {
51 event.set_copied(written);
53 cb.send(event);
54 }
55 }
56
57 #[inline(always)]
58 pub fn set_error(&mut self, errno: i32, cb: &IOWorkers<C>) {
59 if let Some(mut event) = self.event.take() {
60 event.set_error(errno);
61 cb.send(event);
62 }
63 }
64}
65
/// State shared between the submit thread and the poll thread.
struct AioInner<C: IoCallback> {
    /// Kernel AIO context handle filled in by `io_setup`.
    context: aio_context_t,
    /// Slot table, indexed by slot id. Wrapped in `UnsafeCell` because both worker
    /// threads obtain mutable access to it without a lock.
    slots: UnsafeCell<Vec<AioSlot<C>>>,
    /// Handle to `/dev/null`; its descriptor is used for the exit-signal event.
    null_file: File,
}
71
// `AioInner` contains an `UnsafeCell`, so `Sync` is not derived automatically and is
// asserted by hand here.
// SAFETY: NOTE(review): soundness relies on the submit and poll threads never touching
// the same slot at the same time; slot ids appear to be handed back and forth through
// the free-slot channel for that purpose — confirm this invariant holds for every path.
unsafe impl<C: IoCallback> Send for AioInner<C> {}
unsafe impl<C: IoCallback> Sync for AioInner<C> {}
74
/// Linux AIO backed driver.
///
/// The type holds no data of its own (only a marker for its type parameters); the
/// work is done by the threads spawned from `AioDriver::start`.
pub struct AioDriver<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>>> {
    /// Ties the otherwise unused type parameters `C` and `Q` to the struct.
    _marker: std::marker::PhantomData<(C, Q)>,
}
78
79impl<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static> AioDriver<C, Q> {
80 pub fn start(ctx: Arc<IoCtxShared<C, Q>>) -> io::Result<()> {
81 let depth = ctx.depth;
82 let mut aio_context: aio_context_t = 0;
83 if io_setup(depth as c_long, &mut aio_context) != 0 {
84 return Err(io::Error::last_os_error());
85 }
86
87 let mut slots = Vec::with_capacity(depth);
88 for slot_id in 0..depth {
89 slots.push(AioSlot::new(slot_id as u64));
90 }
91
92 let null_file = File::open("/dev/null")?;
93
94 let inner =
95 Arc::new(AioInner { context: aio_context, slots: UnsafeCell::new(slots), null_file });
96
97 let (s_free, r_free) = spsc::bounded_blocking::<u16>(depth);
98 for i in 0..depth {
99 let _ = s_free.send(i as u16);
100 }
101
102 let ctx_submit = ctx.clone();
103 let inner_submit = inner.clone();
104 thread::spawn(move || Self::submit_loop(ctx_submit, inner_submit, r_free));
105
106 let ctx_poll = ctx.clone();
107 let inner_poll = inner.clone();
108 thread::spawn(move || Self::poll_loop(ctx_poll, inner_poll, s_free));
109
110 Ok(())
111 }
112
113 fn submit_loop(
114 ctx: Arc<IoCtxShared<C, Q>>, inner: Arc<AioInner<C>>, free_recv: Rx<spsc::Array<u16>>,
115 ) {
116 let depth = ctx.depth;
117 let mut iocbs = Vec::<*mut iocb>::with_capacity(depth);
118 let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
119 let aio_context = inner.context;
120 let mut events_to_process = VecDeque::with_capacity(depth);
121
122 loop {
123 if events_to_process.is_empty() {
126 match ctx.queue.recv() {
127 Ok(event) => events_to_process.push_back(event),
128 Err(_) => {
129 let slot_id = free_recv.recv().unwrap();
133 let exit_event = IOEvent::new_exit_signal(inner.null_file.as_raw_fd());
134 let slot = &mut slots_ref[slot_id as usize];
135 slot.fill_slot(exit_event, slot_id);
136 let mut iocb_ptr: *mut iocb = &mut slot.iocb as *mut _;
137
138 let _ = ctx.free_slots_count.fetch_sub(1, Ordering::SeqCst);
139 let res = io_submit(aio_context, 1, &mut iocb_ptr);
140 if res != 1 {
141 let _ = ctx.free_slots_count.fetch_add(1, Ordering::SeqCst);
142 error!("Failed to submit exit signal: {}", res);
143 }
144 info!("io_submit worker exit due to queue closing");
145 break;
146 }
147 }
148 }
149
150 while events_to_process.len() < depth {
152 if let Ok(event) = ctx.queue.try_recv() {
153 events_to_process.push_back(event);
154 } else {
155 break;
156 }
157 }
158
159 let mut first = true;
162 while !events_to_process.is_empty() {
163 let slot_id_opt =
164 if first { Some(free_recv.recv().unwrap()) } else { free_recv.try_recv().ok() };
165
166 if let Some(slot_id) = slot_id_opt {
167 first = false;
168 let event = events_to_process.pop_front().unwrap();
169 let slot = &mut slots_ref[slot_id as usize];
170 slot.fill_slot(event, slot_id);
171 iocbs.push(&mut slot.iocb as *mut iocb);
172 } else {
173 break;
175 }
176 }
177
178 if !iocbs.is_empty() {
180 let mut done: libc::c_long = 0;
181 let mut left = iocbs.len();
182
183 let _ = ctx.free_slots_count.fetch_sub(left, Ordering::SeqCst);
185
186 'submit: loop {
187 let result = unsafe {
188 let arr = iocbs.as_mut_ptr().add(done as usize);
189 io_submit(aio_context, left as libc::c_long, arr)
190 };
191
192 if result < 0 {
193 let _ = ctx.free_slots_count.fetch_add(left, Ordering::SeqCst);
195 if -result == Errno::EINTR as i64 {
196 continue 'submit;
197 }
198 error!("io_submit error: {}", result);
199 break 'submit;
200 } else {
201 if result == left as libc::c_long {
203 trace!("io submit {} events", result);
204 break 'submit;
205 } else {
206 let _ = ctx
207 .free_slots_count
208 .fetch_add(left - result as usize, Ordering::SeqCst);
209 done += result;
210 left -= result as usize;
211 trace!("io submit {}/{} events", result, left);
212 }
213 }
214 }
215 iocbs.clear();
216 }
217 }
218 }
219
220 fn poll_loop(
221 ctx: Arc<IoCtxShared<C, Q>>, inner: Arc<AioInner<C>>, free_sender: Tx<spsc::Array<u16>>,
222 ) {
223 let depth = ctx.depth;
224 let mut infos = Vec::<io_event>::with_capacity(depth);
225 let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
226 let aio_context = inner.context;
227 let mut exit_received = false;
228
229 loop {
230 infos.clear();
231 let result = io_getevents(
232 aio_context,
233 1,
234 depth as i64,
235 infos.as_mut_ptr(),
236 std::ptr::null_mut(),
237 );
238
239 if result < 0 {
240 if -result == Errno::EINTR as i64 {
241 continue;
242 }
243 error!("io_getevents errno: {}", -result);
244 thread::sleep(Duration::from_millis(10));
245 continue;
246 }
247
248 assert!(result > 0);
249 let _ = ctx.free_slots_count.fetch_add(result as usize, Ordering::SeqCst);
250 unsafe {
251 infos.set_len(result as usize);
252 }
253 for ref info in &infos {
254 let slot_id = (*info).data as usize;
255 let slot = &mut slots_ref[slot_id];
256
257 if slot.iocb.aio_nbytes == 0 && (*info).res == 0 {
259 exit_received = true;
260 let _ = free_sender.send(slot_id as u16);
262 continue;
263 }
264
265 Self::verify_result(&ctx, slot, info);
266 let _ = free_sender.send(slot_id as u16);
267 }
268
269 if exit_received && ctx.free_slots_count.load(Ordering::SeqCst) == ctx.depth {
270 info!("io_poll worker exit gracefully");
271 break;
272 }
273 }
274 info!("io_poll worker exit cleaning up");
275 let _ = io_destroy(aio_context);
276 }
277
278 #[inline(always)]
279 fn verify_result(ctx: &IoCtxShared<C, Q>, slot: &mut AioSlot<C>, info: &io_event) {
280 if info.res < 0 {
281 println!("set error {:?}", info.res);
282 slot.set_error((-info.res) as i32, &ctx.cb_workers);
283 return;
284 }
285 slot.set_result(info.res as usize, &ctx.cb_workers);
286 }
287}
288
289use io_engine_aio_bindings::{
291 __NR_io_destroy, __NR_io_getevents, __NR_io_setup, __NR_io_submit, aio_context_t, io_event,
292 iocb, syscall, timespec,
293};
294use libc::c_long;
295
296#[inline(always)]
305fn io_setup(nr: c_long, ctxp: *mut aio_context_t) -> c_long {
306 unsafe { syscall(__NR_io_setup as c_long, nr, ctxp) }
307}
308
309#[inline(always)]
313fn io_destroy(ctx: aio_context_t) -> c_long {
314 unsafe { syscall(__NR_io_destroy as c_long, ctx) }
315}
316
317#[inline(always)]
321fn io_submit(ctx: aio_context_t, nr: c_long, iocbpp: *mut *mut iocb) -> c_long {
322 unsafe { syscall(__NR_io_submit as c_long, ctx, nr, iocbpp) }
323}
324
325#[inline(always)]
329fn io_getevents(
330 ctx: aio_context_t, min_nr: c_long, max_nr: c_long, events: *mut io_event,
331 timeout: *mut timespec,
332) -> c_long {
333 unsafe { syscall(__NR_io_getevents as c_long, ctx, min_nr, max_nr, events, timeout) }
334}