1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
use std::{ io, mem }; use std::sync::atomic; use linux_io_uring_sys as sys; use crate::util::{ Mmap, Fd, AtomicU32Ref }; use crate::mmap_offset; pub struct CompletionQueue { _cq_mmap: Mmap, head: AtomicU32Ref, tail: *const atomic::AtomicU32, ring_mask: *const u32, #[allow(dead_code)] ring_entries: *const u32, overflow: *const atomic::AtomicU32, cqes: *const sys::io_uring_cqe } #[derive(Clone)] pub struct Entry(sys::io_uring_cqe); pub struct AvailableQueue<'a> { head: u32, tail: u32, ring_mask: u32, queue: &'a mut CompletionQueue } impl CompletionQueue { pub(crate) fn new(fd: &Fd, p: &sys::io_uring_params) -> io::Result<CompletionQueue> { let cq_mmap = Mmap::new( &fd, sys::IORING_OFF_CQ_RING as _, p.cq_off.cqes as usize + p.cq_entries as usize * mem::size_of::<sys::io_uring_cqe>() )?; mmap_offset!{ unsafe let head = cq_mmap + p.cq_off.head => *const u32; let tail = cq_mmap + p.cq_off.tail => *const atomic::AtomicU32; let ring_mask = cq_mmap + p.cq_off.ring_mask => *const u32; let ring_entries = cq_mmap + p.cq_off.ring_entries => *const u32; let overflow = cq_mmap + p.cq_off.overflow => *const atomic::AtomicU32; let cqes = cq_mmap + p.cq_off.cqes => *const sys::io_uring_cqe; } Ok(CompletionQueue { _cq_mmap: cq_mmap, head: unsafe { AtomicU32Ref::new(head) }, tail, ring_mask, ring_entries, overflow, cqes }) } pub fn overflow(&self) -> u32 { unsafe { (*self.overflow).load(atomic::Ordering::Acquire) } } pub fn available(&mut self) -> AvailableQueue<'_> { unsafe { AvailableQueue { head: self.head.unsync_load(), tail: (*self.tail).load(atomic::Ordering::Acquire), ring_mask: *self.ring_mask, queue: self } } } } impl ExactSizeIterator for AvailableQueue<'_> { fn len(&self) -> usize { self.tail.wrapping_sub(self.head) as usize } } impl Iterator for AvailableQueue<'_> { type Item = Entry; fn next(&mut self) -> Option<Self::Item> { if self.head != self.tail { unsafe { let entry = self.queue.cqes.add((self.head & self.ring_mask) as usize); self.head = self.head.wrapping_add(1); Some(Entry(*entry)) } } else { None } } } impl Drop for AvailableQueue<'_> { fn drop(&mut self) { self.queue.head.store(self.head, atomic::Ordering::Release); } } impl Entry { pub fn result(&self) -> i32 { self.0.res } pub fn user_data(&self) -> u64 { self.0.user_data } }