io_engine/driver/
aio.rs

1use crate::callback_worker::IOWorkers;
2
3use crate::context::IoCtxShared;
4use crate::tasks::{IOAction, IOEvent, IoCallback};
5use crossfire::{BlockingRxTrait, Rx, Tx, spsc};
6use nix::errno::Errno;
7use std::collections::VecDeque;
8use std::fs::File;
9use std::{
10    cell::UnsafeCell,
11    io,
12    mem::transmute,
13    os::fd::AsRawFd,
14    sync::{Arc, atomic::Ordering},
15    thread,
16    time::Duration,
17};
18
/// A single in-flight AIO request slot: the kernel control block plus the
/// originating event, kept alive until the completion is reaped.
pub struct AioSlot<C: IoCallback> {
    // Kernel control block; `aio_data` carries the slot index so completions
    // can be routed back to this slot.
    pub(crate) iocb: iocb,
    // Event currently occupying this slot; `None` while the slot is free.
    pub(crate) event: Option<Box<IOEvent<C>>>,
}
23
24impl<C: IoCallback> AioSlot<C> {
25    pub fn new(slot_id: u64) -> Self {
26        Self { iocb: iocb { aio_data: slot_id, aio_reqprio: 1, ..Default::default() }, event: None }
27    }
28
29    #[inline(always)]
30    pub fn fill_slot(&mut self, event: Box<IOEvent<C>>, slot_id: u16) {
31        let iocb = &mut self.iocb;
32        iocb.aio_data = slot_id as libc::__u64;
33        iocb.aio_fildes = event.fd as libc::__u32;
34        if let Some(buf) = event.buf.as_ref() {
35            iocb.aio_lio_opcode = event.action as u16;
36            iocb.aio_buf = buf.get_raw() as u64;
37            iocb.aio_nbytes = buf.len() as u64;
38        } else {
39            // Zero-length read for exit signal
40            iocb.aio_lio_opcode = IOAction::Read as u16;
41            iocb.aio_buf = 0;
42            iocb.aio_nbytes = 0;
43        }
44        iocb.aio_offset = event.offset;
45        self.event.replace(event);
46    }
47
48    #[inline(always)]
49    pub fn set_result(&mut self, written: usize, cb: &IOWorkers<C>) {
50        if let Some(mut event) = self.event.take() {
51            // If it was a zero-length read (exit signal), callback is usually None, so this is safe.
52            event.set_copied(written);
53            cb.send(event);
54        }
55    }
56
57    #[inline(always)]
58    pub fn set_error(&mut self, errno: i32, cb: &IOWorkers<C>) {
59        if let Some(mut event) = self.event.take() {
60            event.set_error(errno);
61            cb.send(event);
62        }
63    }
64}
65
/// State shared between the submit and poll threads: the kernel AIO context
/// and the fixed pool of request slots.
struct AioInner<C: IoCallback> {
    context: aio_context_t,
    // Slot pool indexed by slot id. Accessed mutably from both the submit
    // and poll threads; exclusivity is maintained by passing slot ids through
    // the free-slot channel, so only one thread touches a given slot at a time.
    slots: UnsafeCell<Vec<AioSlot<C>>>,
    // Kept open so the exit-signal event has a valid fd for its zero-length read.
    null_file: File, // Moved here
}

// SAFETY: slot ownership is handed between the two worker threads via the
// spsc free-slot channel; a given slot id is only ever held by one side at a
// time, so the UnsafeCell contents are never accessed concurrently.
unsafe impl<C: IoCallback> Send for AioInner<C> {}
unsafe impl<C: IoCallback> Sync for AioInner<C> {}
74
/// Linux-native AIO driver. A stateless handle: [`AioDriver::start`] spawns
/// the submit and poll worker threads, which own all runtime state.
pub struct AioDriver<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>>> {
    // Carries the callback and queue type parameters without storing them.
    _marker: std::marker::PhantomData<(C, Q)>,
}
78
79impl<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static> AioDriver<C, Q> {
80    pub fn start(ctx: Arc<IoCtxShared<C, Q>>) -> io::Result<()> {
81        let depth = ctx.depth;
82        let mut aio_context: aio_context_t = 0;
83        if io_setup(depth as c_long, &mut aio_context) != 0 {
84            return Err(io::Error::last_os_error());
85        }
86
87        let mut slots = Vec::with_capacity(depth);
88        for slot_id in 0..depth {
89            slots.push(AioSlot::new(slot_id as u64));
90        }
91
92        let null_file = File::open("/dev/null")?;
93
94        let inner =
95            Arc::new(AioInner { context: aio_context, slots: UnsafeCell::new(slots), null_file });
96
97        let (s_free, r_free) = spsc::bounded_blocking::<u16>(depth);
98        for i in 0..depth {
99            let _ = s_free.send(i as u16);
100        }
101
102        let ctx_submit = ctx.clone();
103        let inner_submit = inner.clone();
104        thread::spawn(move || Self::submit_loop(ctx_submit, inner_submit, r_free));
105
106        let ctx_poll = ctx.clone();
107        let inner_poll = inner.clone();
108        thread::spawn(move || Self::poll_loop(ctx_poll, inner_poll, s_free));
109
110        Ok(())
111    }
112
113    fn submit_loop(
114        ctx: Arc<IoCtxShared<C, Q>>, inner: Arc<AioInner<C>>, free_recv: Rx<spsc::Array<u16>>,
115    ) {
116        let depth = ctx.depth;
117        let mut iocbs = Vec::<*mut iocb>::with_capacity(depth);
118        let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
119        let aio_context = inner.context;
120        let mut events_to_process = VecDeque::with_capacity(depth);
121
122        loop {
123            // 1. Fetch events
124            // Only block if we have no events pending.
125            if events_to_process.is_empty() {
126                match ctx.queue.recv() {
127                    Ok(event) => events_to_process.push_back(event),
128                    Err(_) => {
129                        // Queue closed. Time to exit.
130                        // We need a free slot to submit the exit signal.
131                        // We block to get one because we must signal exit to the poller.
132                        let slot_id = free_recv.recv().unwrap();
133                        let exit_event = IOEvent::new_exit_signal(inner.null_file.as_raw_fd());
134                        let slot = &mut slots_ref[slot_id as usize];
135                        slot.fill_slot(exit_event, slot_id);
136                        let mut iocb_ptr: *mut iocb = &mut slot.iocb as *mut _;
137
138                        let _ = ctx.free_slots_count.fetch_sub(1, Ordering::SeqCst);
139                        let res = io_submit(aio_context, 1, &mut iocb_ptr);
140                        if res != 1 {
141                            let _ = ctx.free_slots_count.fetch_add(1, Ordering::SeqCst);
142                            error!("Failed to submit exit signal: {}", res);
143                        }
144                        info!("io_submit worker exit due to queue closing");
145                        break;
146                    }
147                }
148            }
149
150            // Try to fetch more events up to depth
151            while events_to_process.len() < depth {
152                if let Ok(event) = ctx.queue.try_recv() {
153                    events_to_process.push_back(event);
154                } else {
155                    break;
156                }
157            }
158
159            // 2. Fill slots and prepare batch
160            // We need to move events from queue to slots.
161            let mut first = true;
162            while !events_to_process.is_empty() {
163                let slot_id_opt =
164                    if first { Some(free_recv.recv().unwrap()) } else { free_recv.try_recv().ok() };
165
166                if let Some(slot_id) = slot_id_opt {
167                    first = false;
168                    let event = events_to_process.pop_front().unwrap();
169                    let slot = &mut slots_ref[slot_id as usize];
170                    slot.fill_slot(event, slot_id);
171                    iocbs.push(&mut slot.iocb as *mut iocb);
172                } else {
173                    // No more slots available right now
174                    break;
175                }
176            }
177
178            // 3. Submit batch
179            if !iocbs.is_empty() {
180                let mut done: libc::c_long = 0;
181                let mut left = iocbs.len();
182
183                // Reserve quota
184                let _ = ctx.free_slots_count.fetch_sub(left, Ordering::SeqCst);
185
186                'submit: loop {
187                    let result = unsafe {
188                        let arr = iocbs.as_mut_ptr().add(done as usize);
189                        io_submit(aio_context, left as libc::c_long, arr)
190                    };
191
192                    if result < 0 {
193                        // All remaining failed
194                        let _ = ctx.free_slots_count.fetch_add(left, Ordering::SeqCst);
195                        if -result == Errno::EINTR as i64 {
196                            continue 'submit;
197                        }
198                        error!("io_submit error: {}", result);
199                        break 'submit;
200                    } else {
201                        // Success (partial or full)
202                        if result == left as libc::c_long {
203                            trace!("io submit {} events", result);
204                            break 'submit;
205                        } else {
206                            let _ = ctx
207                                .free_slots_count
208                                .fetch_add(left - result as usize, Ordering::SeqCst);
209                            done += result;
210                            left -= result as usize;
211                            trace!("io submit {}/{} events", result, left);
212                        }
213                    }
214                }
215                iocbs.clear();
216            }
217        }
218    }
219
220    fn poll_loop(
221        ctx: Arc<IoCtxShared<C, Q>>, inner: Arc<AioInner<C>>, free_sender: Tx<spsc::Array<u16>>,
222    ) {
223        let depth = ctx.depth;
224        let mut infos = Vec::<io_event>::with_capacity(depth);
225        let slots_ref: &mut Vec<AioSlot<C>> = unsafe { transmute(inner.slots.get()) };
226        let aio_context = inner.context;
227        let mut exit_received = false;
228
229        loop {
230            infos.clear();
231            let result = io_getevents(
232                aio_context,
233                1,
234                depth as i64,
235                infos.as_mut_ptr(),
236                std::ptr::null_mut(),
237            );
238
239            if result < 0 {
240                if -result == Errno::EINTR as i64 {
241                    continue;
242                }
243                error!("io_getevents errno: {}", -result);
244                thread::sleep(Duration::from_millis(10));
245                continue;
246            }
247
248            assert!(result > 0);
249            let _ = ctx.free_slots_count.fetch_add(result as usize, Ordering::SeqCst);
250            unsafe {
251                infos.set_len(result as usize);
252            }
253            for ref info in &infos {
254                let slot_id = (*info).data as usize;
255                let slot = &mut slots_ref[slot_id];
256
257                // Check for exit signal: zero-length read on null_file
258                if slot.iocb.aio_nbytes == 0 && (*info).res == 0 {
259                    exit_received = true;
260                    // We also free this slot
261                    let _ = free_sender.send(slot_id as u16);
262                    continue;
263                }
264
265                Self::verify_result(&ctx, slot, info);
266                let _ = free_sender.send(slot_id as u16);
267            }
268
269            if exit_received && ctx.free_slots_count.load(Ordering::SeqCst) == ctx.depth {
270                info!("io_poll worker exit gracefully");
271                break;
272            }
273        }
274        info!("io_poll worker exit cleaning up");
275        let _ = io_destroy(aio_context);
276    }
277
278    #[inline(always)]
279    fn verify_result(ctx: &IoCtxShared<C, Q>, slot: &mut AioSlot<C>, info: &io_event) {
280        if info.res < 0 {
281            println!("set error {:?}", info.res);
282            slot.set_error((-info.res) as i32, &ctx.cb_workers);
283            return;
284        }
285        slot.set_result(info.res as usize, &ctx.cb_workers);
286    }
287}
288
289// Relevant symbols from the native bindings exposed via aio-bindings
290use io_engine_aio_bindings::{
291    __NR_io_destroy, __NR_io_getevents, __NR_io_setup, __NR_io_submit, aio_context_t, io_event,
292    iocb, syscall, timespec,
293};
294use libc::c_long;
295
296// -----------------------------------------------------------------------------------------------
297// Inline functions that wrap the kernel calls for the entry points corresponding to Linux
298// AIO functions
299// -----------------------------------------------------------------------------------------------
300
// Initialize an AIO context for a given submission queue size within the kernel.
//
// Returns the raw syscall result. NOTE(review): `start` checks this against 0
// and reads errno via `last_os_error`, while the worker loops treat syscall
// returns as -errno directly — confirm which convention the bindings'
// `syscall` follows (raw syscall returns -errno; libc returns -1 + errno).
//
// See [io_setup(2)](https://man7.org/linux/man-pages/man2/io_setup.2.html) for details.
#[inline(always)]
fn io_setup(nr: c_long, ctxp: *mut aio_context_t) -> c_long {
    unsafe { syscall(__NR_io_setup as c_long, nr, ctxp) }
}
308
// Destroy an AIO context, cancelling or waiting on outstanding requests.
//
// Returns the raw syscall result (0 on success).
//
// See [io_destroy(2)](https://man7.org/linux/man-pages/man2/io_destroy.2.html) for details.
#[inline(always)]
fn io_destroy(ctx: aio_context_t) -> c_long {
    unsafe { syscall(__NR_io_destroy as c_long, ctx) }
}
316
// Submit a batch of IO operations.
//
// Returns the number of iocbs actually queued (which may be fewer than `nr`),
// or a negative value on error; callers must handle partial submission.
//
// See [io_submit(2)](https://man7.org/linux/man-pages/man2/io_submit.2.html) for details.
#[inline(always)]
fn io_submit(ctx: aio_context_t, nr: c_long, iocbpp: *mut *mut iocb) -> c_long {
    unsafe { syscall(__NR_io_submit as c_long, ctx, nr, iocbpp) }
}
324
// Retrieve completion events for previously submitted IO requests.
//
// Blocks until at least `min_nr` events are available when `timeout` is null;
// returns the number of events written into `events`, or a negative value on
// error. The caller must have reserved capacity for `max_nr` entries.
//
// See [io_getevents(2)](https://man7.org/linux/man-pages/man2/io_getevents.2.html) for details.
#[inline(always)]
fn io_getevents(
    ctx: aio_context_t, min_nr: c_long, max_nr: c_long, events: *mut io_event,
    timeout: *mut timespec,
) -> c_long {
    unsafe { syscall(__NR_io_getevents as c_long, ctx, min_nr, max_nr, events, timeout) }
}