1use crate::callback_worker::Worker;
2
3use crate::context::CtxShared;
4use crate::tasks::{IOAction, IOCallback, IOEvent};
5use crossfire::{BlockingRxTrait, Rx, Tx, spsc};
6use nix::errno::Errno;
7use std::collections::VecDeque;
8use std::fs::File;
9use std::{
10 cell::UnsafeCell,
11 io,
12 mem::transmute,
13 os::fd::AsRawFd,
14 sync::{Arc, atomic::Ordering},
15 thread,
16 time::Duration,
17};
18
/// One in-flight AIO request: the kernel control block paired with the
/// higher-level event it was built from.
pub struct AioSlot<C: IOCallback> {
    // Kernel iocb handed to io_submit; its `aio_data` field carries this
    // slot's index so completions can be routed back (see fill_slot).
    pub(crate) iocb: iocb,
    // The queued event, present while the request is in flight; taken by
    // set_result/set_error when the completion is dispatched.
    pub(crate) event: Option<IOEvent<C>>,
}
23
24impl<C: IOCallback> AioSlot<C> {
25 pub fn new(slot_id: u64) -> Self {
26 Self { iocb: iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() }, event: None }
27 }
28
29 #[inline(always)]
30 pub fn fill_slot(&mut self, event: IOEvent<C>, slot_id: u16) {
31 let iocb = &mut self.iocb;
32 iocb.aio_data = slot_id as libc::__u64;
33 iocb.aio_fildes = event.fd as libc::__u32;
34
35 if let Some(buf) = event.buf.as_ref() {
36 iocb.aio_lio_opcode = event.action as u16;
37
38 if event.res > 0 {
39 let progress = event.res as u64;
40 iocb.aio_buf = (buf.get_raw() as u64) + progress;
41 iocb.aio_nbytes = (buf.len() as u64) - progress;
42 iocb.aio_offset = event.offset + (progress as i64);
43 } else {
44 iocb.aio_buf = buf.get_raw() as u64;
45 iocb.aio_nbytes = buf.len() as u64;
46 iocb.aio_offset = event.offset;
47 }
48 } else {
49 iocb.aio_lio_opcode = IOAction::Read as u16;
51 iocb.aio_buf = 0;
52 iocb.aio_nbytes = 0;
53 iocb.aio_offset = event.offset;
54 }
55 self.event.replace(event);
56 }
57
58 #[inline(always)]
59 pub fn set_result<W: Worker<C>>(&mut self, written: usize, cb: &W) {
60 if let Some(mut event) = self.event.take() {
61 event.set_copied(written);
63 cb.done(event);
64 }
65 }
66
67 #[inline(always)]
68 pub fn set_error<W: Worker<C>>(&mut self, errno: i32, cb: &W) {
69 if let Some(mut event) = self.event.take() {
70 event.set_error(errno);
71 cb.done(event);
72 }
73 }
74}
75
76struct AioInner<C: IOCallback> {
77 context: aio_context_t,
78 slots: UnsafeCell<Vec<AioSlot<C>>>,
79 null_file: File, }
81
82unsafe impl<C: IOCallback> Send for AioInner<C> {}
83unsafe impl<C: IOCallback> Sync for AioInner<C> {}
84
/// Linux-AIO driver. `start` spawns one submit thread and one poll thread;
/// the struct itself is never instantiated — it only anchors the generic
/// parameters (callback, queue, worker).
pub struct AioDriver<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>>, W: Worker<C>> {
    _marker: std::marker::PhantomData<(C, Q, W)>,
}
88
89impl<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>> + Send + 'static, W: Worker<C> + Send + 'static>
90 AioDriver<C, Q, W>
91{
92 pub fn start(ctx: Arc<CtxShared<C, Q, W>>) -> io::Result<()> {
93 let depth = ctx.depth;
94 let mut aio_context: aio_context_t = 0;
95 if io_setup(depth as c_long, &mut aio_context) != 0 {
96 return Err(io::Error::last_os_error());
97 }
98
99 let mut slots = Vec::with_capacity(depth);
100 for slot_id in 0..depth {
101 slots.push(AioSlot::new(slot_id as u64));
102 }
103
104 let null_file = File::open("/dev/null")?;
105
106 let inner =
107 Arc::new(AioInner { context: aio_context, slots: UnsafeCell::new(slots), null_file });
108
109 let (s_free, r_free) = spsc::bounded_blocking::<u16>(depth);
110 for i in 0..depth {
111 let _ = s_free.send(i as u16);
112 }
113
114 let ctx_submit = ctx.clone();
115 let inner_submit = inner.clone();
116 thread::spawn(move || Self::submit_loop(ctx_submit, inner_submit, r_free));
117
118 let ctx_poll = ctx.clone();
119 let inner_poll = inner.clone();
120 thread::spawn(move || Self::poll_loop(ctx_poll, inner_poll, s_free));
121
122 Ok(())
123 }
124
125 fn submit_loop(
126 ctx: Arc<CtxShared<C, Q, W>>, inner: Arc<AioInner<C>>, free_recv: Rx<spsc::Array<u16>>,
127 ) {
128 let depth = ctx.depth;
129 let mut iocbs = Vec::<*mut iocb>::with_capacity(depth);
130 let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
131 let aio_context = inner.context;
132 let mut events_to_process = VecDeque::with_capacity(depth);
133
134 loop {
135 if events_to_process.is_empty() {
138 match ctx.queue.recv() {
139 Ok(event) => events_to_process.push_back(event),
140 Err(_) => {
141 let slot_id = free_recv.recv().unwrap();
145 let exit_event = IOEvent::new_exit_signal(inner.null_file.as_raw_fd());
146 let slot = &mut slots_ref[slot_id as usize];
147 slot.fill_slot(exit_event, slot_id);
148 let mut iocb_ptr: *mut iocb = &mut slot.iocb as *mut _;
149
150 let _ = ctx.free_slots_count.fetch_sub(1, Ordering::SeqCst);
151 let res = io_submit(aio_context, 1, &mut iocb_ptr);
152 if res != 1 {
153 let _ = ctx.free_slots_count.fetch_add(1, Ordering::SeqCst);
154 error!("Failed to submit exit signal: {}", res);
155 }
156 info!("io_submit worker exit due to queue closing");
157 break;
158 }
159 }
160 }
161
162 while events_to_process.len() < depth {
164 if let Ok(event) = ctx.queue.try_recv() {
165 events_to_process.push_back(event);
166 } else {
167 break;
168 }
169 }
170
171 let mut first = true;
174 while !events_to_process.is_empty() {
175 let slot_id_opt =
176 if first { Some(free_recv.recv().unwrap()) } else { free_recv.try_recv().ok() };
177
178 if let Some(slot_id) = slot_id_opt {
179 first = false;
180 let event = events_to_process.pop_front().unwrap();
181 let slot = &mut slots_ref[slot_id as usize];
182 slot.fill_slot(event, slot_id);
183 iocbs.push(&mut slot.iocb as *mut iocb);
184 } else {
185 break;
187 }
188 }
189
190 if !iocbs.is_empty() {
192 let mut done: libc::c_long = 0;
193 let mut left = iocbs.len();
194
195 let _ = ctx.free_slots_count.fetch_sub(left, Ordering::SeqCst);
197
198 'submit: loop {
199 let result = unsafe {
200 let arr = iocbs.as_mut_ptr().add(done as usize);
201 io_submit(aio_context, left as libc::c_long, arr)
202 };
203
204 if result < 0 {
205 let _ = ctx.free_slots_count.fetch_add(left, Ordering::SeqCst);
207 if -result == Errno::EINTR as i64 {
208 continue 'submit;
209 }
210 error!("io_submit error: {}", result);
211 break 'submit;
212 } else {
213 if result == left as libc::c_long {
215 trace!("io submit {} events", result);
216 break 'submit;
217 } else {
218 let _ = ctx
219 .free_slots_count
220 .fetch_add(left - result as usize, Ordering::SeqCst);
221 done += result;
222 left -= result as usize;
223 trace!("io submit {}/{} events", result, left);
224 }
225 }
226 }
227 iocbs.clear();
228 }
229 }
230 }
231
232 fn poll_loop(
233 ctx: Arc<CtxShared<C, Q, W>>, inner: Arc<AioInner<C>>, free_sender: Tx<spsc::Array<u16>>,
234 ) {
235 let depth = ctx.depth;
236 let mut infos = Vec::<io_event>::with_capacity(depth);
237 let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
238 let aio_context = inner.context;
239 let mut exit_received = false;
240
241 loop {
242 infos.clear();
243 let result = io_getevents(
244 aio_context,
245 1,
246 depth as i64,
247 infos.as_mut_ptr(),
248 std::ptr::null_mut(),
249 );
250
251 if result < 0 {
252 if -result == Errno::EINTR as i64 {
253 continue;
254 }
255 error!("io_getevents errno: {}", -result);
256 thread::sleep(Duration::from_millis(10));
257 continue;
258 }
259
260 assert!(result > 0);
261 let _ = ctx.free_slots_count.fetch_add(result as usize, Ordering::SeqCst);
262 unsafe {
263 infos.set_len(result as usize);
264 }
265 for ref info in &infos {
266 let slot_id = (*info).data as usize;
267 let slot = &mut slots_ref[slot_id];
268
269 if slot.iocb.aio_nbytes == 0 && (*info).res == 0 {
271 exit_received = true;
272 let _ = free_sender.send(slot_id as u16);
274 continue;
275 }
276
277 Self::verify_result(&ctx, slot, info);
278 let _ = free_sender.send(slot_id as u16);
279 }
280
281 if exit_received && ctx.free_slots_count.load(Ordering::SeqCst) == ctx.depth {
282 info!("io_poll worker exit gracefully");
283 break;
284 }
285 }
286 info!("io_poll worker exit cleaning up");
287 let _ = io_destroy(aio_context);
288 }
289
290 #[inline(always)]
291 fn verify_result(ctx: &CtxShared<C, Q, W>, slot: &mut AioSlot<C>, info: &io_event) {
292 if info.res < 0 {
293 println!("set error {:?}", info.res);
294 slot.set_error((-info.res) as i32, &ctx.cb_workers);
295 return;
296 }
297 slot.set_result(info.res as usize, &ctx.cb_workers);
298 }
299}
300
301use io_engine_aio_bindings::{
303 __NR_io_destroy, __NR_io_getevents, __NR_io_setup, __NR_io_submit, aio_context_t, io_event,
304 iocb, syscall, timespec,
305};
306use libc::c_long;
307
/// Thin wrapper over the `io_setup(2)` syscall: creates a kernel AIO context
/// able to hold `nr` concurrent requests and stores the handle in `ctxp`.
/// Callers here treat a non-zero return as failure and read `errno`
/// (see `start`); the exact return convention comes from the bindings-crate
/// `syscall` — NOTE(review): confirm it matches raw-syscall semantics.
#[inline(always)]
fn io_setup(nr: c_long, ctxp: *mut aio_context_t) -> c_long {
    unsafe { syscall(__NR_io_setup as c_long, nr, ctxp) }
}
320
/// Thin wrapper over the `io_destroy(2)` syscall: cancels outstanding
/// requests and tears down the AIO context. The return value is ignored by
/// the caller (`poll_loop` cleanup).
#[inline(always)]
fn io_destroy(ctx: aio_context_t) -> c_long {
    unsafe { syscall(__NR_io_destroy as c_long, ctx) }
}
328
/// Thin wrapper over the `io_submit(2)` syscall: queues `nr` iocbs from the
/// pointer array `iocbpp`. Callers interpret the result as the number of
/// iocbs accepted, or a negated errno on failure (see the `-result ==
/// Errno::EINTR` checks in `submit_loop`).
#[inline(always)]
fn io_submit(ctx: aio_context_t, nr: c_long, iocbpp: *mut *mut iocb) -> c_long {
    unsafe { syscall(__NR_io_submit as c_long, ctx, nr, iocbpp) }
}
336
/// Thin wrapper over the `io_getevents(2)` syscall: reads between `min_nr`
/// and `max_nr` completion events into `events`. A null `timeout` blocks
/// indefinitely. Callers interpret the result as the number of events
/// reaped, or a negated errno on failure (see `poll_loop`).
#[inline(always)]
fn io_getevents(
    ctx: aio_context_t, min_nr: c_long, max_nr: c_long, events: *mut io_event,
    timeout: *mut timespec,
) -> c_long {
    unsafe { syscall(__NR_io_getevents as c_long, ctx, min_nr, max_nr, events, timeout) }
}