//! Linux native AIO driver (`io_engine/driver/aio.rs`): submits batched I/O
//! through the kernel `io_submit(2)` interface and polls completions.

1use crate::callback_worker::Worker;
2
3use crate::context::CtxShared;
4use crate::tasks::{IOAction, IOCallback, IOEvent};
5use crossfire::{BlockingRxTrait, Rx, Tx, spsc};
6use nix::errno::Errno;
7use std::collections::VecDeque;
8use std::fs::File;
9use std::{
10    cell::UnsafeCell,
11    io,
12    mem::transmute,
13    os::fd::AsRawFd,
14    sync::{Arc, atomic::Ordering},
15    thread,
16    time::Duration,
17};
18
/// One in-flight AIO request slot: the kernel control block (`iocb`) plus the
/// originating event, held so its buffer stays alive until completion and its
/// callback can be fired afterwards.
pub struct AioSlot<C: IOCallback> {
    pub(crate) iocb: iocb,
    // `None` while the slot is free; `Some` while an I/O is in flight.
    pub(crate) event: Option<IOEvent<C>>,
}
23
24impl<C: IOCallback> AioSlot<C> {
25    pub fn new(slot_id: u64) -> Self {
26        Self { iocb: iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() }, event: None }
27    }
28
29    #[inline(always)]
30    pub fn fill_slot(&mut self, event: IOEvent<C>, slot_id: u16) {
31        let iocb = &mut self.iocb;
32        iocb.aio_data = slot_id as libc::__u64;
33        iocb.aio_fildes = event.fd as libc::__u32;
34
35        if let Some(buf) = event.buf.as_ref() {
36            iocb.aio_lio_opcode = event.action as u16;
37
38            if event.res > 0 {
39                let progress = event.res as u64;
40                iocb.aio_buf = (buf.get_raw() as u64) + progress;
41                iocb.aio_nbytes = (buf.len() as u64) - progress;
42                iocb.aio_offset = event.offset + (progress as i64);
43            } else {
44                iocb.aio_buf = buf.get_raw() as u64;
45                iocb.aio_nbytes = buf.len() as u64;
46                iocb.aio_offset = event.offset;
47            }
48        } else {
49            // Zero-length read for exit signal
50            iocb.aio_lio_opcode = IOAction::Read as u16;
51            iocb.aio_buf = 0;
52            iocb.aio_nbytes = 0;
53            iocb.aio_offset = event.offset;
54        }
55        self.event.replace(event);
56    }
57
58    #[inline(always)]
59    pub fn set_result<W: Worker<C>>(&mut self, written: usize, cb: &W) {
60        if let Some(mut event) = self.event.take() {
61            // If it was a zero-length read (exit signal), callback is usually None, so this is safe.
62            event.set_copied(written);
63            cb.done(event);
64        }
65    }
66
67    #[inline(always)]
68    pub fn set_error<W: Worker<C>>(&mut self, errno: i32, cb: &W) {
69        if let Some(mut event) = self.event.take() {
70            event.set_error(errno);
71            cb.done(event);
72        }
73    }
74}
75
/// State shared between the submit and poll threads: the kernel AIO context,
/// the request slots, and a handle to /dev/null used for the exit signal.
struct AioInner<C: IOCallback> {
    context: aio_context_t,
    // Aliased mutably by both threads through `UnsafeCell`; soundness relies
    // on the free-slot channel ensuring each slot id is owned by exactly one
    // side at a time — TODO confirm no id is ever duplicated in the ring.
    slots: UnsafeCell<Vec<AioSlot<C>>>,
    // Target fd for the zero-length read that tells the poller to exit.
    // Kept here so the fd stays open for the driver's whole lifetime.
    null_file: File,
}

// SAFETY (review): `UnsafeCell` makes this !Sync by default; sharing is
// claimed safe because slot ownership is serialized via the spsc free ring
// (see note on `slots` above).
unsafe impl<C: IOCallback> Send for AioInner<C> {}
unsafe impl<C: IOCallback> Sync for AioInner<C> {}
84
/// Zero-sized entry point for the Linux AIO driver. `AioDriver::start` spawns
/// a submitter thread and a poller thread; the struct itself is never
/// instantiated — `PhantomData` only pins the type parameters.
pub struct AioDriver<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>>, W: Worker<C>> {
    _marker: std::marker::PhantomData<(C, Q, W)>,
}
88
89impl<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>> + Send + 'static, W: Worker<C> + Send + 'static>
90    AioDriver<C, Q, W>
91{
92    pub fn start(ctx: Arc<CtxShared<C, Q, W>>) -> io::Result<()> {
93        let depth = ctx.depth;
94        let mut aio_context: aio_context_t = 0;
95        if io_setup(depth as c_long, &mut aio_context) != 0 {
96            return Err(io::Error::last_os_error());
97        }
98
99        let mut slots = Vec::with_capacity(depth);
100        for slot_id in 0..depth {
101            slots.push(AioSlot::new(slot_id as u64));
102        }
103
104        let null_file = File::open("/dev/null")?;
105
106        let inner =
107            Arc::new(AioInner { context: aio_context, slots: UnsafeCell::new(slots), null_file });
108
109        let (s_free, r_free) = spsc::bounded_blocking::<u16>(depth);
110        for i in 0..depth {
111            let _ = s_free.send(i as u16);
112        }
113
114        let ctx_submit = ctx.clone();
115        let inner_submit = inner.clone();
116        thread::spawn(move || Self::submit_loop(ctx_submit, inner_submit, r_free));
117
118        let ctx_poll = ctx.clone();
119        let inner_poll = inner.clone();
120        thread::spawn(move || Self::poll_loop(ctx_poll, inner_poll, s_free));
121
122        Ok(())
123    }
124
125    fn submit_loop(
126        ctx: Arc<CtxShared<C, Q, W>>, inner: Arc<AioInner<C>>, free_recv: Rx<spsc::Array<u16>>,
127    ) {
128        let depth = ctx.depth;
129        let mut iocbs = Vec::<*mut iocb>::with_capacity(depth);
130        let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
131        let aio_context = inner.context;
132        let mut events_to_process = VecDeque::with_capacity(depth);
133
134        loop {
135            // 1. Fetch events
136            // Only block if we have no events pending.
137            if events_to_process.is_empty() {
138                match ctx.queue.recv() {
139                    Ok(event) => events_to_process.push_back(event),
140                    Err(_) => {
141                        // Queue closed. Time to exit.
142                        // We need a free slot to submit the exit signal.
143                        // We block to get one because we must signal exit to the poller.
144                        let slot_id = free_recv.recv().unwrap();
145                        let exit_event = IOEvent::new_exit_signal(inner.null_file.as_raw_fd());
146                        let slot = &mut slots_ref[slot_id as usize];
147                        slot.fill_slot(exit_event, slot_id);
148                        let mut iocb_ptr: *mut iocb = &mut slot.iocb as *mut _;
149
150                        let _ = ctx.free_slots_count.fetch_sub(1, Ordering::SeqCst);
151                        let res = io_submit(aio_context, 1, &mut iocb_ptr);
152                        if res != 1 {
153                            let _ = ctx.free_slots_count.fetch_add(1, Ordering::SeqCst);
154                            error!("Failed to submit exit signal: {}", res);
155                        }
156                        info!("io_submit worker exit due to queue closing");
157                        break;
158                    }
159                }
160            }
161
162            // Try to fetch more events up to depth
163            while events_to_process.len() < depth {
164                if let Ok(event) = ctx.queue.try_recv() {
165                    events_to_process.push_back(event);
166                } else {
167                    break;
168                }
169            }
170
171            // 2. Fill slots and prepare batch
172            // We need to move events from queue to slots.
173            let mut first = true;
174            while !events_to_process.is_empty() {
175                let slot_id_opt =
176                    if first { Some(free_recv.recv().unwrap()) } else { free_recv.try_recv().ok() };
177
178                if let Some(slot_id) = slot_id_opt {
179                    first = false;
180                    let event = events_to_process.pop_front().unwrap();
181                    let slot = &mut slots_ref[slot_id as usize];
182                    slot.fill_slot(event, slot_id);
183                    iocbs.push(&mut slot.iocb as *mut iocb);
184                } else {
185                    // No more slots available right now
186                    break;
187                }
188            }
189
190            // 3. Submit batch
191            if !iocbs.is_empty() {
192                let mut done: libc::c_long = 0;
193                let mut left = iocbs.len();
194
195                // Reserve quota
196                let _ = ctx.free_slots_count.fetch_sub(left, Ordering::SeqCst);
197
198                'submit: loop {
199                    let result = unsafe {
200                        let arr = iocbs.as_mut_ptr().add(done as usize);
201                        io_submit(aio_context, left as libc::c_long, arr)
202                    };
203
204                    if result < 0 {
205                        // All remaining failed
206                        let _ = ctx.free_slots_count.fetch_add(left, Ordering::SeqCst);
207                        if -result == Errno::EINTR as i64 {
208                            continue 'submit;
209                        }
210                        error!("io_submit error: {}", result);
211                        break 'submit;
212                    } else {
213                        // Success (partial or full)
214                        if result == left as libc::c_long {
215                            trace!("io submit {} events", result);
216                            break 'submit;
217                        } else {
218                            let _ = ctx
219                                .free_slots_count
220                                .fetch_add(left - result as usize, Ordering::SeqCst);
221                            done += result;
222                            left -= result as usize;
223                            trace!("io submit {}/{} events", result, left);
224                        }
225                    }
226                }
227                iocbs.clear();
228            }
229        }
230    }
231
232    fn poll_loop(
233        ctx: Arc<CtxShared<C, Q, W>>, inner: Arc<AioInner<C>>, free_sender: Tx<spsc::Array<u16>>,
234    ) {
235        let depth = ctx.depth;
236        let mut infos = Vec::<io_event>::with_capacity(depth);
237        let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
238        let aio_context = inner.context;
239        let mut exit_received = false;
240
241        loop {
242            infos.clear();
243            let result = io_getevents(
244                aio_context,
245                1,
246                depth as i64,
247                infos.as_mut_ptr(),
248                std::ptr::null_mut(),
249            );
250
251            if result < 0 {
252                if -result == Errno::EINTR as i64 {
253                    continue;
254                }
255                error!("io_getevents errno: {}", -result);
256                thread::sleep(Duration::from_millis(10));
257                continue;
258            }
259
260            assert!(result > 0);
261            let _ = ctx.free_slots_count.fetch_add(result as usize, Ordering::SeqCst);
262            unsafe {
263                infos.set_len(result as usize);
264            }
265            for ref info in &infos {
266                let slot_id = (*info).data as usize;
267                let slot = &mut slots_ref[slot_id];
268
269                // Check for exit signal: zero-length read on null_file
270                if slot.iocb.aio_nbytes == 0 && (*info).res == 0 {
271                    exit_received = true;
272                    // We also free this slot
273                    let _ = free_sender.send(slot_id as u16);
274                    continue;
275                }
276
277                Self::verify_result(&ctx, slot, info);
278                let _ = free_sender.send(slot_id as u16);
279            }
280
281            if exit_received && ctx.free_slots_count.load(Ordering::SeqCst) == ctx.depth {
282                info!("io_poll worker exit gracefully");
283                break;
284            }
285        }
286        info!("io_poll worker exit cleaning up");
287        let _ = io_destroy(aio_context);
288    }
289
290    #[inline(always)]
291    fn verify_result(ctx: &CtxShared<C, Q, W>, slot: &mut AioSlot<C>, info: &io_event) {
292        if info.res < 0 {
293            println!("set error {:?}", info.res);
294            slot.set_error((-info.res) as i32, &ctx.cb_workers);
295            return;
296        }
297        slot.set_result(info.res as usize, &ctx.cb_workers);
298    }
299}
300
301// Relevant symbols from the native bindings exposed via aio-bindings
302use io_engine_aio_bindings::{
303    __NR_io_destroy, __NR_io_getevents, __NR_io_setup, __NR_io_submit, aio_context_t, io_event,
304    iocb, syscall, timespec,
305};
306use libc::c_long;
307
308// -----------------------------------------------------------------------------------------------
309// Inline functions that wrap the kernel calls for the entry points corresponding to Linux
310// AIO functions
311// -----------------------------------------------------------------------------------------------
312
313// Initialize an AIO context for a given submission queue size within the kernel.
314//
315// See [io_setup(7)](http://man7.org/linux/man-pages/man2/io_setup.2.html) for details.
316#[inline(always)]
317fn io_setup(nr: c_long, ctxp: *mut aio_context_t) -> c_long {
318    unsafe { syscall(__NR_io_setup as c_long, nr, ctxp) }
319}
320
// Destroy an AIO context, cancelling or waiting on outstanding requests.
//
// See [io_destroy(2)](http://man7.org/linux/man-pages/man2/io_destroy.2.html) for details.
#[inline(always)]
fn io_destroy(ctx: aio_context_t) -> c_long {
    unsafe { syscall(__NR_io_destroy as c_long, ctx) }
}
328
// Submit a batch of IO operations. Returns the number of iocbs queued (which
// may be fewer than `nr`), or a negative value on error — callers treat the
// negative value as a negated errno (see the EINTR checks in the loops).
//
// See [io_submit(2)](http://man7.org/linux/man-pages/man2/io_submit.2.html) for details.
#[inline(always)]
fn io_submit(ctx: aio_context_t, nr: c_long, iocbpp: *mut *mut iocb) -> c_long {
    unsafe { syscall(__NR_io_submit as c_long, ctx, nr, iocbpp) }
}
336
// Retrieve completion events for previously submitted IO requests. Blocks for
// at least `min_nr` events when `timeout` is null; returns the number of
// events written to `events`, or a negative value on error (treated by the
// caller as a negated errno).
//
// See [io_getevents(2)](http://man7.org/linux/man-pages/man2/io_getevents.2.html) for details.
#[inline(always)]
fn io_getevents(
    ctx: aio_context_t, min_nr: c_long, max_nr: c_long, events: *mut io_event,
    timeout: *mut timespec,
) -> c_long {
    unsafe { syscall(__NR_io_getevents as c_long, ctx, min_nr, max_nr, events, timeout) }
}