1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr::{self, NonNull};
use super::{IoUring, CQE, CQEs, CQEsBlocking, resultify};
/// Handle to the completion-queue side of an io_uring instance.
///
/// Stores a raw, non-null pointer to the underlying `uring_sys::io_uring`
/// and is parameterised by the `'ring` lifetime of the `IoUring` it was
/// created from.
pub struct CompletionQueue<'ring> {
// Raw pointer to the liburing ring state; this is what gets passed to the `uring_sys` FFI functions.
pub(crate) ring: NonNull<uring_sys::io_uring>,
// Zero-sized marker: makes the type carry `'ring` as if it held `&'ring mut IoUring`, without storing a reference.
_marker: PhantomData<&'ring mut IoUring>,
}
impl<'ring> CompletionQueue<'ring> {
    /// Builds a completion-queue handle pointing at the `ring` field of `ring`.
    ///
    /// The handle carries the `'ring` lifetime of the borrow it was created from.
    pub(crate) fn new(ring: &'ring IoUring) -> CompletionQueue<'ring> {
        CompletionQueue {
            ring: NonNull::from(&ring.ring),
            _marker: PhantomData,
        }
    }

    /// Calls `io_uring_peek_cqe` and returns the completion event it yields, if any.
    ///
    /// Returns `None` when the out-pointer is still null after the call.
    pub fn peek_for_cqe(&mut self) -> Option<CQE> {
        // Start from a null pointer rather than uninitialised memory: if the C
        // side ever returns without storing into the slot, we observe "no CQE"
        // instead of reading an uninitialised value.
        let mut cqe: *mut uring_sys::io_uring_cqe = ptr::null_mut();
        // SAFETY: `self.ring` is non-null by construction and `&mut cqe` is a
        // valid out-slot for the duration of the call. The pointer is only
        // dereferenced after the null check below.
        // NOTE(review): assumes the ring outlives `'ring` — confirm against `IoUring`.
        unsafe {
            // The return code is deliberately not inspected; a null out-pointer
            // is the only "nothing available" signal this method relies on.
            uring_sys::io_uring_peek_cqe(self.ring.as_ptr(), &mut cqe);
            if cqe.is_null() {
                None
            } else {
                Some(CQE::new(self.ring, &mut *cqe))
            }
        }
    }

    /// Waits for a single completion event and returns it.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the underlying `io_uring_wait_cqes` call.
    pub fn wait_for_cqe(&mut self) -> io::Result<CQE> {
        self.wait_for_cqes(1)
    }

    /// Waits with the given `count` and wraps the resulting raw entry in a [`CQE`].
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the underlying `io_uring_wait_cqes` call.
    #[inline(always)]
    pub(crate) fn wait_for_cqes(&mut self, count: u32) -> io::Result<CQE> {
        // Copy the pointer out first so the closure does not need to borrow `self`.
        let ring = self.ring;
        self.wait_inner(count).map(|cqe| CQE::new(ring, cqe))
    }

    /// Waits with the given `count` and discards the entry that was returned.
    ///
    /// # Errors
    ///
    /// Propagates any error produced by the underlying `io_uring_wait_cqes` call.
    pub fn wait(&mut self, count: u32) -> io::Result<()> {
        self.wait_inner(count).map(|_| ())
    }

    /// Shared implementation of the waiting methods.
    ///
    /// Calls `io_uring_wait_cqes` with `count` and null pointers for its last two
    /// arguments, converts the return code through `resultify`, and on success
    /// returns a mutable reference to the entry stored in the out-pointer.
    #[inline(always)]
    fn wait_inner(&mut self, count: u32) -> io::Result<&mut uring_sys::io_uring_cqe> {
        // SAFETY: the out-slot is only read after `resultify` reported success.
        // NOTE(review): assumes liburing has stored a valid, non-null entry
        // pointer whenever the call succeeds — confirm against liburing.
        unsafe {
            let mut cqe = MaybeUninit::uninit();
            resultify(uring_sys::io_uring_wait_cqes(
                self.ring.as_ptr(),
                cqe.as_mut_ptr(),
                count as _,
                ptr::null(),
                ptr::null(),
            ))?;
            Ok(&mut *cqe.assume_init())
        }
    }

    /// Returns a [`CQEs`] value constructed over this ring.
    pub fn cqes(&mut self) -> CQEs<'_> {
        CQEs::new(self.ring)
    }

    /// Returns a [`CQEsBlocking`] value constructed over this ring with `wait_for`.
    pub fn cqes_blocking(&mut self, wait_for: u32) -> CQEsBlocking<'_> {
        CQEsBlocking::new(self.ring, wait_for)
    }

    /// Returns the value reported by `io_uring_cq_ready` for this ring.
    pub fn ready(&self) -> u32 {
        // SAFETY: `self.ring` is non-null by construction.
        // NOTE(review): assumes the ring is still alive — confirm against `IoUring`.
        unsafe { uring_sys::io_uring_cq_ready(self.ring.as_ptr()) }
    }

    /// Returns the value reported by `io_uring_cq_eventfd_enabled` for this ring.
    pub fn eventfd_enabled(&self) -> bool {
        // SAFETY: `self.ring` is non-null by construction.
        // NOTE(review): assumes the ring is still alive — confirm against `IoUring`.
        unsafe { uring_sys::io_uring_cq_eventfd_enabled(self.ring.as_ptr()) }
    }

    /// Passes `enabled` to `io_uring_cq_eventfd_toggle` for this ring.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `resultify` when the call reports failure.
    pub fn eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> {
        // SAFETY: `self.ring` is non-null by construction.
        // NOTE(review): assumes the ring is still alive — confirm against `IoUring`.
        resultify(unsafe { uring_sys::io_uring_cq_eventfd_toggle(self.ring.as_ptr(), enabled) })?;
        Ok(())
    }
}
impl fmt::Debug for CompletionQueue<'_> {
    /// Renders the queue as a struct named after the full type name of `Self`,
    /// with a single `fd` field holding the ring's `ring_fd`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // SAFETY: `self.ring` is non-null by construction.
        // NOTE(review): assumes the pointed-to ring is still alive — confirm against `IoUring`.
        let ring_fd = unsafe { (*self.ring.as_ptr()).ring_fd };
        let mut builder = f.debug_struct(std::any::type_name::<Self>());
        builder.field("fd", &ring_fd);
        builder.finish()
    }
}
// Manual opt-in: the `NonNull` field would otherwise make this type `!Send`.
// NOTE(review): soundness depends on how the ring is shared with other handles — confirm.
unsafe impl Send for CompletionQueue<'_> {}
// Manual opt-in: the `NonNull` field would otherwise make this type `!Sync`.
// NOTE(review): soundness depends on the `&self` methods being safe to call concurrently — confirm.
unsafe impl Sync for CompletionQueue<'_> {}