Skip to main content

iou/
cqe.rs

1use std::io;
2use std::marker::PhantomData;
3use std::mem::MaybeUninit;
4use std::ptr::{self, NonNull};
5
6use super::{IoUring, resultify};
7
/// A completed IO event.
///
/// Stores the three values copied out of a raw `uring_sys::io_uring_cqe`:
/// the user data, the raw result code and the completion flags.
pub struct CQE {
    // Copied unchanged from the raw entry's `user_data` field.
    user_data: u64,
    // Raw signed result code from the raw entry's `res` field; `result()`
    // passes it through `resultify`, `raw_result()` returns it untouched.
    res: i32,
    // Completion flags. When built via `from_raw`/`new`, bits that
    // `CompletionFlags` does not define have already been discarded.
    flags: CompletionFlags,
}
14
15impl CQE {
16    pub fn from_raw(cqe: uring_sys::io_uring_cqe) -> CQE {
17        CQE {
18            user_data: cqe.user_data,
19            res: cqe.res,
20            flags: CompletionFlags::from_bits_truncate(cqe.flags),
21        }
22    }
23
24    pub fn from_raw_parts(user_data: u64, res: i32, flags: CompletionFlags) -> CQE {
25        CQE {
26            user_data, res, flags,
27        }
28    }
29
30    pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, cqe: &mut uring_sys::io_uring_cqe) -> CQE {
31        let user_data = cqe.user_data;
32        let res = cqe.res;
33        let flags = CompletionFlags::from_bits_truncate(cqe.flags);
34
35        unsafe {
36            uring_sys::io_uring_cqe_seen(ring.as_ptr(), cqe);
37        }
38
39        CQE::from_raw_parts(user_data, res, flags)
40    }
41
42    pub fn user_data(&self) -> u64 {
43        self.user_data as u64
44    }
45
46    pub fn result(&self) -> io::Result<u32> {
47        resultify(self.res)
48    }
49
50    pub fn flags(&self) -> CompletionFlags {
51        self.flags
52    }
53
54    pub fn raw_result(&self) -> i32 {
55        self.res
56    }
57
58    pub fn raw_flags(&self) -> u32 {
59        self.flags.bits()
60    }
61}
62
// SAFETY: `CQE` only holds plain values copied out of the raw entry (a `u64`,
// an `i32` and a `CompletionFlags` bit set over `u32`); it keeps no pointer
// into the ring, so moving it to or sharing it with another thread cannot
// alias ring memory.
unsafe impl Send for CQE { }
unsafe impl Sync for CQE { }
65
/// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue).
///
/// This iterator will be exhausted when there are no `CQE`s ready, and return `None`.
pub struct CQEs<'a> {
    // Raw ring pointer handed to every `uring_sys` call made by the iterator.
    ring: NonNull<uring_sys::io_uring>,
    // Number of completions last reported by `io_uring_cq_ready` that `next`
    // has not handed out yet; refreshed when it reaches zero.
    ready: u32,
    // Ties the iterator's lifetime to a mutable borrow of the `IoUring`
    // without storing a reference.
    marker: PhantomData<&'a mut IoUring>,
}
74
75impl<'a> CQEs<'a> {
76    pub(crate) fn new(ring: NonNull<uring_sys::io_uring>) -> CQEs<'a> {
77        CQEs { ring, ready: 0, marker: PhantomData }
78    }
79
80    #[inline(always)]
81    fn ready(&self) -> u32 {
82        unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
83    }
84
85    #[inline(always)]
86    fn peek_for_cqe(&mut self) -> Option<CQE> {
87        unsafe {
88            let mut cqe = MaybeUninit::uninit();
89            uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr());
90            let cqe = cqe.assume_init();
91            if cqe != ptr::null_mut() {
92                Some(CQE::new(self.ring, &mut *cqe))
93            } else {
94                None
95            }
96        }
97    }
98}
99
100impl Iterator for CQEs<'_> {
101    type Item = CQE;
102
103    fn next(&mut self) -> Option<Self::Item> {
104        if self.ready == 0 {
105            self.ready = self.ready();
106            if self.ready == 0 {
107                return None;
108            }
109        }
110
111        self.ready -= 1;
112        self.peek_for_cqe()
113    }
114}
115
116
/// An iterator of [`CQE`]s from the [`CompletionQueue`](crate::CompletionQueue).
///
/// This iterator will never be exhausted; if there are no `CQE`s ready, it will block until there
/// are.
pub struct CQEsBlocking<'a> {
    // Raw ring pointer handed to every `uring_sys` call made by the iterator.
    ring: NonNull<uring_sys::io_uring>,
    // Number of completions last reported by `io_uring_cq_ready` that `next`
    // has not handed out yet; refreshed when it reaches zero.
    ready: u32,
    // Count passed to `io_uring_wait_cqes` when the iterator has to wait.
    wait_for: u32,
    // Ties the iterator's lifetime to a mutable borrow of the `IoUring`
    // without storing a reference.
    marker: PhantomData<&'a mut IoUring>,
}
127
128impl<'a> CQEsBlocking<'a> {
129    pub(crate) fn new(ring: NonNull<uring_sys::io_uring>, wait_for: u32) -> CQEsBlocking<'a> {
130        CQEsBlocking { ring, ready: 0, wait_for, marker: PhantomData }
131    }
132
133    #[inline(always)]
134    fn ready(&self) -> u32 {
135        unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
136    }
137
138    #[inline(always)]
139    fn peek_for_cqe(&mut self) -> Option<CQE> {
140        unsafe {
141            let mut cqe = MaybeUninit::uninit();
142            uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), cqe.as_mut_ptr());
143            let cqe = cqe.assume_init();
144            if cqe != ptr::null_mut() {
145                Some(CQE::new(self.ring, &mut *cqe))
146            } else {
147                None
148            }
149        }
150    }
151
152    #[inline(always)]
153    fn wait(&mut self) -> io::Result<&mut uring_sys::io_uring_cqe> {
154        unsafe {
155            let mut cqe = MaybeUninit::uninit();
156
157            resultify(uring_sys::io_uring_wait_cqes(
158                self.ring.as_ptr(),
159                cqe.as_mut_ptr(),
160                self.wait_for as _,
161                ptr::null(),
162                ptr::null(),
163            ))?;
164
165            Ok(&mut *cqe.assume_init())
166        }
167    }
168}
169
170impl Iterator for CQEsBlocking<'_> {
171    type Item = io::Result<CQE>;
172
173    fn next(&mut self) -> Option<Self::Item> {
174        if self.ready == 0 {
175            self.ready = self.ready();
176            if self.ready == 0 {
177                let ring = self.ring;
178                return Some(self.wait().map(|cqe| CQE::new(ring, cqe)))
179            }
180        }
181
182        self.ready -= 1;
183        self.peek_for_cqe().map(Ok)
184    }
185}
186
bitflags::bitflags! {
    /// Flags that can be returned from the kernel on [`CQE`]s.
    pub struct CompletionFlags: u32 {
        // NOTE(review): bit 0 of the kernel's CQE flags appears to be the
        // "buffer selected" flag (`IORING_CQE_F_BUFFER`), while the kernel's
        // `IORING_CQE_BUFFER_SHIFT` is a shift amount rather than a flag bit,
        // so this name looks like a misnomer — confirm against the io_uring
        // UAPI header. If the upper flag bits carry a buffer id, note that
        // `from_bits_truncate` discards them.
        const BUFFER_SHIFT    = 1 << 0;
    }
}