1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
use {
crate::slab::FixedSlab,
io_uring::{
cqueue, squeue,
types::{SubmitArgs, Timespec},
IoUring,
},
smallvec::{smallvec, SmallVec},
std::{io, time::Duration},
};
/// An io_uring instance.
pub struct Ring<T, E: RingOp<T>> {
    // The underlying kernel io_uring instance (submission + completion queues).
    ring: IoUring,
    // In-flight operations, keyed by slab index. The slab key doubles as the
    // SQE `user_data` value so completions can be routed back to their op.
    entries: FixedSlab<E>,
    // User-defined context handed to every operation's completion handler.
    context: T,
}
impl<T, E: RingOp<T>> Ring<T, E> {
    /// Creates a new ring with the provided io_uring instance and context.
    ///
    /// The context `T` is a user defined value that will be passed to entries `E` once they
    /// complete. This value can be used to update state or perform additional actions as operations
    /// complete asynchronously.
    pub fn new(ring: IoUring, ctx: T) -> Self {
        Self {
            // Size the slab to the completion-queue depth: `push` refuses to
            // insert while the slab is full, so in-flight ops never outnumber
            // CQ slots.
            entries: FixedSlab::with_capacity(ring.params().cq_entries() as usize),
            ring,
            context: ctx,
        }
    }
    /// Returns a reference to the context value.
    pub fn context(&self) -> &T {
        &self.context
    }
    /// Returns a mutable reference to the context value.
    pub fn context_mut(&mut self) -> &mut T {
        &mut self.context
    }
    /// Registers in-memory fixed buffers for I/O with the kernel.
    ///
    /// # Safety
    ///
    /// Callers must ensure that the iov_base and iov_len values are valid and will be valid until
    /// buffers are unregistered or the ring destroyed, otherwise undefined behaviour may occur.
    ///
    /// See
    /// [Submitter::register_buffers](https://docs.rs/io-uring/0.6.3/io_uring/struct.Submitter.html#method.register_buffers).
    pub unsafe fn register_buffers(&self, iovecs: &[libc::iovec]) -> io::Result<()> {
        self.ring.submitter().register_buffers(iovecs)
    }
    /// Pushes an operation to the submission queue.
    ///
    /// Once completed, [RingOp::complete] will be called with the result.
    ///
    /// Note that the operation is not submitted to the kernel until [Ring::submit] is called. If
    /// the submission queue is full, submit will be called internally to make room for the new
    /// operation.
    ///
    /// See also [Ring::submit].
    pub fn push(&mut self, op: E) -> io::Result<()> {
        // First make sure there is a free slot in the slab for this op.
        loop {
            self.process_completions()?;
            if !self.entries.is_full() {
                break;
            }
            // if the entries slab is full, we need to submit and poll
            // completions to make room
            self.submit_and_wait(1, None)?;
        }
        // Store the op first, then build its SQE; the slab key becomes the
        // SQE's user_data so the completion can be matched back to the op.
        let key = self.entries.insert(op);
        let entry = self.entries.get_mut(key).unwrap().entry();
        let entry = entry.user_data(key as u64);
        // Safety: the entry is stored in self.entries and guaranteed to be valid for the lifetime
        // of the operation. E implementations must still ensure that the entry
        // remains valid until the last E::complete call.
        //
        // push only fails when the submission queue is full, so flush it to
        // the kernel and drain completions, then retry.
        while unsafe { self.ring.submission().push(&entry) }.is_err() {
            self.submit()?;
            self.process_completions()?;
        }
        Ok(())
    }
    /// Submits all pending operations to the kernel.
    ///
    /// If the ring can't accept any more submissions because the completion
    /// queue is full, this will process completions and retry until the
    /// submissions are accepted.
    ///
    /// See also [Ring::process_completions].
    pub fn submit(&mut self) -> io::Result<()> {
        // want = 0: flush the SQ without waiting for any completion.
        self.submit_and_wait(0, None).map(|_| ())
    }
    /// Submits all pending operations to the kernel and waits for completions.
    ///
    /// If no `timeout` is passed this will block until `want` completions are available. If a
    /// timeout is passed, this will block until `want` completions are available or the timeout is
    /// reached.
    ///
    /// Returns the number of completions received. Returns `Ok(0)` if the
    /// timeout expires (`ETIME`) or the wait is interrupted (`EINTR`).
    pub fn submit_and_wait(&mut self, want: usize, timeout: Option<Duration>) -> io::Result<usize> {
        let mut args = SubmitArgs::new();
        // `ts` lives outside the `if` so the Timespec borrowed by `args`
        // outlives the submit call below.
        let ts;
        if let Some(timeout) = timeout {
            ts = Timespec::from(timeout);
            args = args.timespec(&ts);
        }
        loop {
            match self.ring.submitter().submit_with_args(want, &args) {
                Ok(n) => return Ok(n),
                // The kernel reports an expired timeout as ETIME; treat it as
                // "no completions arrived" rather than an error.
                Err(e) if e.raw_os_error() == Some(libc::ETIME) => return Ok(0),
                Err(e) if e.raw_os_error() == Some(libc::EBUSY) => {
                    // the completion queue is full, process completions and retry
                    self.process_completions()?;
                    continue;
                }
                // Interrupted by a signal; report zero completions instead of
                // surfacing EINTR to the caller.
                Err(e) if e.raw_os_error() == Some(libc::EINTR) => return Ok(0),
                Err(e) => return Err(e),
            }
        }
    }
    /// Processes completions from the kernel.
    ///
    /// This will process all completions currently available in the completion
    /// queue and invoke [RingOp::complete] for each completed operation.
    pub fn process_completions(&mut self) -> io::Result<()> {
        let mut completion = self.ring.completion();
        // Ops queued by completion handlers are buffered here because the
        // completion queue borrows the ring, so `self.push` can't be called
        // until that borrow is released below.
        let mut new_entries = smallvec![];
        loop {
            let Some(cqe) = completion.next() else {
                break;
            };
            // user_data carries the slab key assigned in `push`.
            let completed_key = cqe.user_data() as usize;
            let entry = self.entries.get_mut(completed_key).unwrap();
            let result = entry.result(cqe.result());
            let mut comp_ctx = Completion {
                context: &mut self.context,
                new_entries,
            };
            let res = entry.complete(&mut comp_ctx, result);
            // Multishot ops (IORING_CQE_F_MORE set) will produce further
            // completions, so keep them in the slab; otherwise free the slot.
            if !cqueue::more(cqe.flags()) {
                self.entries.remove(completed_key);
            }
            // Propagate the handler error only after the slab cleanup above so
            // failed ops don't leak their slot.
            res?;
            new_entries = std::mem::take(&mut comp_ctx.new_entries);
            if !new_entries.is_empty() {
                // Release the completion-queue borrow so `self.push` (which
                // needs `&mut self`) can run, then re-acquire it afterwards.
                completion.sync();
                drop(completion);
                for new_entry in new_entries.drain(..) {
                    self.push(new_entry)?;
                }
                completion = self.ring.completion();
            }
        }
        Ok(())
    }
    /// Drains the ring.
    ///
    /// This will submit all pending operations to the kernel and process all
    /// completions until the ring is empty.
    pub fn drain(&mut self) -> io::Result<()> {
        loop {
            self.process_completions()?;
            if self.entries.is_empty() {
                break;
            }
            // Wait for at least one completion, but poll with a short timeout
            // so the emptiness check above re-runs periodically.
            match self.ring.submitter().submit_with_args(
                1,
                &SubmitArgs::new().timespec(&Timespec::from(Duration::from_millis(10))),
            ) {
                Ok(_) => {}
                // ETIME just means the 10ms wait expired; loop and retry.
                Err(e) if e.raw_os_error() == Some(libc::ETIME) => {}
                Err(e) => return Err(e),
            }
        }
        Ok(())
    }
}
/// Trait for operations that can be submitted to a [Ring].
pub trait RingOp<T> {
    /// Builds the submission-queue entry describing this operation.
    ///
    /// Called by [Ring::push] when the operation is queued; the returned
    /// entry's `user_data` is overwritten with the ring's internal key.
    fn entry(&mut self) -> squeue::Entry;
    /// Invoked when a completion for this operation is reaped.
    ///
    /// `ctx` gives access to the ring's user context and lets the handler
    /// queue follow-up operations; `res` is the decoded CQE result.
    fn complete(&mut self, ctx: &mut Completion<T, Self>, res: io::Result<i32>) -> io::Result<()>
    where
        Self: Sized;
    /// Decodes a raw CQE result code: negative values are negated errno
    /// codes, non-negative values are returned unchanged.
    fn result(&self, res: i32) -> io::Result<i32> {
        match res {
            code if code < 0 => Err(io::Error::from_raw_os_error(code.wrapping_neg())),
            code => Ok(code),
        }
    }
}
/// Context object passed to [RingOp::complete].
pub struct Completion<'a, T, E: RingOp<T>> {
    // Give new_entries a stack size of 2 to avoid heap allocations in the common case where only
    // one or two ops are queued from a completion handler.
    //
    // It's common to want to queue some extra work after a completion, for instance if you've
    // completed a read and want to close the file descriptor, or if you're doing chained operations
    // and want to push the next one. It's less common to want to queue many operations.
    new_entries: SmallVec<[E; 2]>,
    // Exclusive borrow of the user context stored in the owning [Ring].
    context: &'a mut T,
}
impl<T, E: RingOp<T>> Completion<'_, T, E> {
    /// Shared access to the context value stored in the owning [Ring].
    pub fn context(&self) -> &T {
        &*self.context
    }
    /// Exclusive access to the context value stored in the owning [Ring].
    pub fn context_mut(&mut self) -> &mut T {
        &mut *self.context
    }
    /// Queues `op` for submission once the current completion handler returns.
    ///
    /// This is the way to start new operations from within [RingOp::complete];
    /// the ring drains these after releasing its completion-queue borrow.
    ///
    /// See also [Ring::push].
    pub fn push(&mut self, op: E) {
        self.new_entries.push(op);
    }
}