use std::sync::atomic;
use linux_io_uring_sys as sys;
use crate::util::{ Mmap, unsync_load };
use crate::mmap_offset;


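/// Completion queue of an io_uring instance.
///
/// Wraps the raw pointers into the kernel-shared CQ ring (head, tail, ring
/// mask, entry count, overflow counter and the CQE array).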
pub struct CompletionQueue {
    pub(crate) head: *const atomic::AtomicU32,
    pub(crate) tail: *const atomic::AtomicU32,
    pub(crate) ring_mask: *const u32,
    pub(crate) ring_entries: *const u32,

    overflow: *const atomic::AtomicU32,

    pub(crate) cqes: *const sys::io_uring_cqe
}

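/// A completion queue entry (CQE), reported by the kernel when a submitted
/// operation finishes.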
#[repr(transparent)]
#[derive(Clone)]
pub struct Entry(pub(crate) sys::io_uring_cqe);

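/// A snapshot of the entries currently available in the completion queue,
/// consumed by iterating. The consumed head is written back to the kernel on
/// `sync` or when the value is dropped.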
pub struct AvailableQueue<'a> {
    head: u32,
    tail: u32,
    ring_mask: u32,
    ring_entries: u32,

    queue: &'a mut CompletionQueue
}

impl CompletionQueue {
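    // Safety: `cq_mmap` must be the completion-queue ring mapping of the ring
    // described by `p`; the offsets in `p.cq_off` are resolved against it.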
    pub(crate) unsafe fn new(cq_mmap: &Mmap, p: &sys::io_uring_params) -> CompletionQueue {
        mmap_offset!{
            let head = cq_mmap + p.cq_off.head => *const atomic::AtomicU32;
            let tail = cq_mmap + p.cq_off.tail => *const atomic::AtomicU32;
            let ring_mask = cq_mmap + p.cq_off.ring_mask => *const u32;
            let ring_entries = cq_mmap + p.cq_off.ring_entries => *const u32;
            let overflow = cq_mmap + p.cq_off.overflow => *const atomic::AtomicU32;
            let cqes = cq_mmap + p.cq_off.cqes => *const sys::io_uring_cqe;
        }

        CompletionQueue {
            head, tail,
            ring_mask, ring_entries,
            overflow,
            cqes
        }
    }

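    /// Number of completion events dropped by the kernel because the
    /// completion queue was full.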
    pub fn overflow(&self) -> u32 {
        unsafe {
            (*self.overflow).load(atomic::Ordering::Acquire)
        }
    }

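    /// Total number of entries the completion queue can hold.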
    pub fn capacity(&self) -> usize {
        unsafe {
            self.ring_entries.read_volatile() as usize
        }
    }

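    /// Number of completion entries currently pending in the queue.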
    pub fn len(&self) -> usize {
        let head = unsafe { unsync_load(self.head) };
        let tail = unsafe { (*self.tail).load(atomic::Ordering::Acquire) };

        tail.wrapping_sub(head) as usize
    }

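    /// Returns `true` if there are no completion entries pending.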
    pub fn is_empty(&self) -> bool {
        let head = unsafe { unsync_load(self.head) };
        let tail = unsafe { (*self.tail).load(atomic::Ordering::Acquire) };

        head == tail
    }

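    /// Returns `true` if the completion queue is full.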
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }

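    /// Borrows the currently available completion entries as an iterator;
    /// consumed entries are returned to the kernel when the iterator is
    /// synced or dropped.
    ///
    /// A minimal sketch of draining the queue, assuming `cq` is a
    /// `&mut CompletionQueue` obtained from an initialized ring:
    ///
    /// ```ignore
    /// for entry in cq.available() {
    ///     println!("user_data={} result={}", entry.user_data(), entry.result());
    /// }
    /// ```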
    pub fn available(&mut self) -> AvailableQueue<'_> {
        unsafe {
            AvailableQueue {
                head: unsync_load(self.head),
                tail: (*self.tail).load(atomic::Ordering::Acquire),
                ring_mask: self.ring_mask.read_volatile(),
                ring_entries: self.ring_entries.read_volatile(),
                queue: self
            }
        }
    }
}

impl AvailableQueue<'_> {
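    /// Publishes the entries consumed so far back to the kernel and refreshes
    /// the local view of the tail, picking up newly completed entries.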
    pub fn sync(&mut self) {
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
            self.tail = (*self.queue.tail).load(atomic::Ordering::Acquire);
        }
    }

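    /// Total capacity of the underlying completion queue.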
    pub fn capacity(&self) -> usize {
        self.ring_entries as usize
    }

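    /// Returns `true` if the unconsumed entries span the whole ring.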
    pub fn is_full(&self) -> bool {
        self.len() == self.capacity()
    }
}

impl ExactSizeIterator for AvailableQueue<'_> {
    fn len(&self) -> usize {
        self.tail.wrapping_sub(self.head) as usize
    }
}

impl<'a> Iterator for AvailableQueue<'a> {
    type Item = Entry;

    fn next(&mut self) -> Option<Self::Item> {
        if self.head != self.tail {
            unsafe {
                // Index into the CQE array; the ring mask maps the
                // monotonically increasing head onto a ring slot.
                let entry = self.queue.cqes.add((self.head & self.ring_mask) as usize);
                self.head = self.head.wrapping_add(1);
                Some(Entry(*entry))
            }
        } else {
            None
        }
    }
}

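// Write the consumed head back to the kernel when the borrow ends, even if
// the iterator was not fully drained.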
impl Drop for AvailableQueue<'_> {
    fn drop(&mut self) {
        unsafe {
            (*self.queue.head).store(self.head, atomic::Ordering::Release);
        }
    }
}

impl Entry {
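    /// Result of the completed operation, the `res` field of the raw CQE;
    /// negative values are negated `errno` codes.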
    pub fn result(&self) -> i32 {
        self.0.res
    }

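    /// The `user_data` value that was set on the corresponding submission
    /// entry, passed through unchanged by the kernel.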
    pub fn user_data(&self) -> u64 {
        self.0.user_data
    }
}