1use std::io;
2use std::marker::PhantomData;
3use std::mem::MaybeUninit;
4use std::ptr::{self, NonNull};
5
6use super::{IoUring, resultify};
7
8pub struct CQE {
10 user_data: u64,
11 res: i32,
12 flags: CompletionFlags,
13}
14
15impl CQE {
16 pub fn from_raw(cqe: uring_sys::io_uring_cqe) -> CQE {
17 CQE {
18 user_data: cqe.user_data,
19 res: cqe.res,
20 flags: CompletionFlags::from_bits_truncate(cqe.flags),
21 }
22 }
23
24 pub fn from_raw_parts(user_data: u64, res: i32, flags: CompletionFlags) -> CQE {
25 CQE {
26 user_data, res, flags,
27 }
28 }
29
30 pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, cqe: &mut uring_sys::io_uring_cqe) -> CQE {
31 let user_data = cqe.user_data;
32 let res = cqe.res;
33 let flags = CompletionFlags::from_bits_truncate(cqe.flags);
34
35 unsafe {
36 uring_sys::io_uring_cqe_seen(ring.as_ptr(), cqe);
37 }
38
39 CQE::from_raw_parts(user_data, res, flags)
40 }
41
42 pub fn user_data(&self) -> u64 {
43 self.user_data as u64
44 }
45
46 pub fn result(&self) -> io::Result<u32> {
47 resultify(self.res)
48 }
49
50 pub fn flags(&self) -> CompletionFlags {
51 self.flags
52 }
53
54 pub fn raw_result(&self) -> i32 {
55 self.res
56 }
57
58 pub fn raw_flags(&self) -> u32 {
59 self.flags.bits()
60 }
61}
62
// SAFETY: `CQE` stores only a `u64`, an `i32` and a `CompletionFlags` value (a bit set
// over `u32`); it keeps no pointer or reference to the ring it was read from.
unsafe impl Send for CQE { }
unsafe impl Sync for CQE { }
65
/// Iterator that yields the completions currently available on a ring.
///
/// Its `Iterator` impl returns `None` once `io_uring_cq_ready` reports zero entries.
pub struct CQEs<'a> {
    /// Raw handle to the ring the completions are read from.
    ring: NonNull<uring_sys::io_uring>,
    /// Cached count from `io_uring_cq_ready`; decremented for each entry handed out.
    ready: u32,
    /// Carries the lifetime `'a` as a `&'a mut IoUring` without storing a reference.
    marker: PhantomData<&'a mut IoUring>,
}
74
75impl<'a> CQEs<'a> {
76 pub(crate) fn new(ring: NonNull<uring_sys::io_uring>) -> CQEs<'a> {
77 CQEs { ring, ready: 0, marker: PhantomData }
78 }
79
80 #[inline(always)]
81 fn ready(&self) -> u32 {
82 unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
83 }
84
85 #[inline(always)]
86 fn peek_for_cqe(&mut self) -> Option<CQE> {
87 unsafe {
88 let mut cqe = MaybeUninit::uninit();
89 uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr());
90 let cqe = cqe.assume_init();
91 if cqe != ptr::null_mut() {
92 Some(CQE::new(self.ring, &mut *cqe))
93 } else {
94 None
95 }
96 }
97 }
98}
99
100impl Iterator for CQEs<'_> {
101 type Item = CQE;
102
103 fn next(&mut self) -> Option<Self::Item> {
104 if self.ready == 0 {
105 self.ready = self.ready();
106 if self.ready == 0 {
107 return None;
108 }
109 }
110
111 self.ready -= 1;
112 self.peek_for_cqe()
113 }
114}
115
116
/// Iterator over completions that waits on the ring when none are ready.
///
/// Its `Iterator` impl calls `io_uring_wait_cqes` whenever `io_uring_cq_ready` reports
/// zero entries, and yields `io::Result<CQE>` so a failed wait can be surfaced.
pub struct CQEsBlocking<'a> {
    /// Raw handle to the ring the completions are read from.
    ring: NonNull<uring_sys::io_uring>,
    /// Cached count from `io_uring_cq_ready`; decremented for each entry handed out.
    ready: u32,
    /// Passed to `io_uring_wait_cqes` as the number of completions to wait for.
    wait_for: u32,
    /// Carries the lifetime `'a` as a `&'a mut IoUring` without storing a reference.
    marker: PhantomData<&'a mut IoUring>,
}
127
128impl<'a> CQEsBlocking<'a> {
129 pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, wait_for: u32) -> CQEsBlocking<'a> {
130 CQEsBlocking { ring, ready: 0, wait_for, marker: PhantomData }
131 }
132
133 #[inline(always)]
134 fn ready(&self) -> u32 {
135 unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
136 }
137
138 #[inline(always)]
139 fn peek_for_cqe(&mut self) -> Option<CQE> {
140 unsafe {
141 let mut cqe = MaybeUninit::uninit();
142 uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr());
143 let cqe = cqe.assume_init();
144 if cqe != ptr::null_mut() {
145 Some(CQE::new(self.ring, &mut *cqe))
146 } else {
147 None
148 }
149 }
150 }
151
152 #[inline(always)]
153 fn wait(&mut self) -> io::Result<&mut uring_sys::io_uring_cqe> {
154 unsafe {
155 let mut cqe = MaybeUninit::uninit();
156
157 resultify(uring_sys::io_uring_wait_cqes(
158 self.ring.as_ptr(),
159 cqe.as_mut_ptr(),
160 self.wait_for as _,
161 ptr::null(),
162 ptr::null(),
163 ))?;
164
165 Ok(&mut *cqe.assume_init())
166 }
167 }
168}
169
170impl Iterator for CQEsBlocking<'_> {
171 type Item = io::Result<CQE>;
172
173 fn next(&mut self) -> Option<Self::Item> {
174 if self.ready == 0 {
175 self.ready = self.ready();
176 if self.ready == 0 {
177 let ring = self.ring;
178 return Some(self.wait().map(|cqe| CQE::new(ring, cqe)))
179 }
180 }
181
182 self.ready -= 1;
183 self.peek_for_cqe().map(Ok)
184 }
185}
186
bitflags::bitflags! {
    /// Typed view of the `flags` word of a completion queue entry.
    ///
    /// `CQE` builds this with `from_bits_truncate`, so any bit not listed here is
    /// discarded when a raw entry is converted.
    pub struct CompletionFlags: u32 {
        // NOTE(review): bit 0 is exposed under the name `BUFFER_SHIFT`. The name reads
        // like a shift amount rather than a flag bit — confirm against the kernel's
        // `IORING_CQE_F_*` / `IORING_CQE_BUFFER_SHIFT` definitions before relying on it.
        const BUFFER_SHIFT = 1 << 0;
    }
}
192}