1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
//! Configuration of a [`Ring`].
use std::mem::{self, size_of};
use std::os::fd::{FromRawFd, OwnedFd};
use std::time::Duration;
use std::{io, ptr};
use crate::io_uring::{self, Completions, Submissions, libc};
use crate::{Ring, SubmissionQueue, syscall};
/// io_uring specific settings, filled in by the builder methods on
/// [`crate::Config`] and translated into `io_uring_params` when the ring is
/// built.
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)] // Every bool maps to its own, independent io_uring setup flag.
pub(crate) struct Config<'r> {
/// Requested number of submission queue entries (`sq_entries`).
submission_entries: u32,
/// Requested number of completion queue entries (`cq_entries`), if set
/// `IORING_SETUP_CQSIZE` is used.
completion_entries: Option<u32>,
/// Start the ring disabled (`IORING_SETUP_R_DISABLED`).
disabled: bool,
/// Only a single thread submits (`IORING_SETUP_SINGLE_ISSUER`).
single_issuer: bool,
/// Defer task running (`IORING_SETUP_DEFER_TASKRUN`).
defer_taskrun: bool,
/// Clamp the queue sizes (`IORING_SETUP_CLAMP`).
clamp: bool,
/// Use a kernel thread for submission queue polling
/// (`IORING_SETUP_SQPOLL`).
kernel_thread: bool,
/// CPU for the kernel polling thread (`sq_thread_cpu`), if set
/// `IORING_SETUP_SQ_AFF` is used.
cpu_affinity: Option<u32>,
/// Idle timeout of the kernel polling thread in milliseconds
/// (`sq_thread_idle`).
idle_timeout: Option<u32>,
/// Number of sparse direct descriptor slots to register after the ring is
/// created.
direct_descriptors: Option<u32>,
/// Submission queue of the ring to attach to (`IORING_SETUP_ATTACH_WQ`).
attach: Option<&'r SubmissionQueue>,
}
impl<'r> Config<'r> {
pub(crate) const fn new() -> Config<'r> {
Config {
submission_entries: 32,
completion_entries: None,
disabled: false,
single_issuer: false,
defer_taskrun: false,
clamp: false,
kernel_thread: false,
cpu_affinity: None,
idle_timeout: None,
direct_descriptors: None,
attach: None,
}
}
}
/// io_uring specific configuration.
impl<'r> crate::Config<'r> {
/// Configure the number of entries in the io_uring submission queue.
///
/// `entries` must be a power of two. If this is never called the queue
/// holds 32 entries. To ask for the maximum queue size instead, see
/// [`Config::with_maximum_queue_size`].
///
/// [`Config::with_maximum_queue_size`]: crate::Config::with_maximum_queue_size
#[doc(alias = "io_uring_setup")]
pub const fn with_submission_queue_size(self, entries: u32) -> Self {
    let mut config = self;
    config.sys.submission_entries = entries;
    config
}
/// Configure the number of entries in the io_uring completion queue.
///
/// `entries` must be a power of two. If this is never called the
/// completion queue is twice as large as the submission queue. To ask for
/// the maximum queue size instead, see
/// [`Config::with_maximum_queue_size`].
///
/// [`Config::with_maximum_queue_size`]: crate::Config::with_maximum_queue_size
#[doc(alias = "IORING_SETUP_CQSIZE")]
pub const fn with_completion_queue_size(self, entries: u32) -> Self {
    let mut config = self;
    config.sys.completion_entries = Some(entries);
    config
}
/// Request the largest submission and completion queues possible.
///
/// This asks for `u32::MAX` submission entries and enables clamping, so the
/// oversized request is clamped instead of being rejected.
#[doc(alias = "IORING_SETUP_CLAMP")]
pub const fn with_maximum_queue_size(self) -> Self {
    let mut config = self;
    config.sys.clamp = true;
    config.sys.submission_entries = u32::MAX;
    config
}
/// Restrict submissions to a single thread.
///
/// Tells the kernel that only one thread will submit requests, which it
/// uses for internal optimisations. As a consequence only the thread that
/// called [`build`], or that [`enabled`] a ring started in the disabled
/// state, may register resources (for example a [`ReadBufPool`]) with the
/// ring.
///
/// The kernel enforces this restriction: if another thread attempts to
/// register a resource, or otherwise uses the [`Ring`] in a way that is not
/// allowed, the operation fails with `EEXIST` (`AlreadyExists`).
///
/// [`build`]: crate::Config::build
/// [`enabled`]: Ring::enable
/// [`ReadBufPool`]: crate::io::ReadBufPool
///
/// # Notes
///
/// With this option set [`SubmissionQueue::wake`] no longer works, because
/// the event that interrupts the call to [`Ring::poll`] can't be submitted.
#[doc(alias = "IORING_SETUP_SINGLE_ISSUER")]
pub const fn single_issuer(self) -> Self {
    let mut config = self;
    config.sys.single_issuer = true;
    config
}
/// Defer task running.
///
/// By default the kernel processes all outstanding work at the end of any
/// system call or thread interrupt. This can delay the application from
/// making other progress.
///
/// Enabling this option hints to the kernel that it should defer that work
/// until [`Ring::poll`] is called, so the work is done as part of
/// [`Ring::poll`] instead.
///
/// This option requires [`Config::single_issuer`] to be set. It does not
/// work when [`Config::with_kernel_thread`] is set.
///
/// [`Config::single_issuer`]: crate::Config::single_issuer
/// [`Config::with_kernel_thread`]: crate::Config::with_kernel_thread
#[doc(alias = "IORING_SETUP_DEFER_TASKRUN")]
pub const fn defer_task_run(mut self) -> Self {
    self.sys.defer_taskrun = true;
    self
}
/// Have a kernel thread poll the [`Ring`].
///
/// With this option the kernel creates a thread that polls the submission
/// queue, which makes it possible to issue I/O without ever context
/// switching into the kernel.
#[doc(alias = "IORING_SETUP_SQPOLL")]
pub const fn with_kernel_thread(self) -> Self {
    let mut config = self;
    config.sys.kernel_thread = true;
    config
}
/// Pin the kernel thread polling the [`Ring`] to `cpu`.
///
/// This only has an effect when combined with
/// [`Config::with_kernel_thread`].
///
/// [`Config::with_kernel_thread`]: crate::Config::with_kernel_thread
#[doc(alias = "IORING_SETUP_SQ_AFF")]
#[doc(alias = "sq_thread_cpu")]
pub const fn with_cpu_affinity(self, cpu: u32) -> Self {
    let mut config = self;
    config.sys.cpu_affinity = Some(cpu);
    config
}
/// Configure how long the kernel polling thread stays awake without work.
///
/// Once `timeout` has elapsed since the last I/O submission the kernel
/// thread goes to sleep. As long as the I/O is kept busy the kernel thread
/// never sleeps. Note that A10 makes sure the kernel thread is woken up
/// when more submissions are added.
///
/// `timeout` is only accurate to the millisecond, anything more precise is
/// discarded. Timeouts of more than `u32::MAX` milliseconds are capped at
/// `u32::MAX` milliseconds.
///
/// This only has an effect when combined with
/// [`Config::with_kernel_thread`].
///
/// [`Config::with_kernel_thread`]: crate::Config::with_kernel_thread
#[doc(alias = "sq_thread_idle")]
pub const fn with_idle_timeout(self, timeout: Duration) -> Self {
    // TODO: use `millis.try_into().unwrap_or(u32::MAX)` once that is
    // stable in const fns.
    let millis = timeout.as_millis();
    let capped = if millis <= (u32::MAX as u128) {
        // Lossless: just checked that the value fits in a `u32`.
        millis as u32
    } else {
        u32::MAX
    };
    let mut config = self;
    config.sys.idle_timeout = Some(capped);
    config
}
/// Make direct descriptors available on the ring.
///
/// A sparse array of `size` direct descriptor slots is registered, which
/// enables the use of direct descriptors. Without it any attempt to create
/// a direct descriptor results in `ENXIO`.
///
/// Direct descriptors are not enabled by default.
#[doc(alias = "IORING_REGISTER_FILES")]
#[doc(alias = "IORING_REGISTER_FILES2")]
#[doc(alias = "IORING_RSRC_REGISTER_SPARSE")]
pub const fn with_direct_descriptors(self, size: u32) -> Self {
    let mut config = self;
    config.sys.direct_descriptors = Some(size);
    config
}
/// Create the ring in a disabled state.
///
/// No submissions are allowed while the ring is disabled. Call
/// [`Ring::enable`] to enable it.
#[doc(alias = "IORING_SETUP_R_DISABLED")]
pub const fn disable(self) -> Self {
    let mut config = self;
    config.sys.disabled = true;
    config
}
/// Attach the new (to be created) ring to `other_ring`.
///
/// This will cause the `Ring` being created to share the asynchronous
/// worker thread backend of the specified `other_ring`, rather than create
/// a new separate thread pool.
///
/// Uses `IORING_SETUP_ATTACH_WQ`, added in Linux kernel 5.6.
#[doc(alias = "IORING_SETUP_ATTACH_WQ")]
pub fn attach(self, other_ring: &'r Ring) -> Self {
// SAFETY: this is safe because `SubmissionQueue` and `sys::Submissions`
// have the same layout due to `repr(transparent)`.
self.attach_queue(unsafe { &*ptr::from_ref(&other_ring.sq).cast() })
}
/// Like [`Config::attach`], but takes the [`SubmissionQueue`] of the ring
/// to attach to.
///
/// [`Config::attach`]: crate::Config::attach
#[doc(alias = "IORING_SETUP_ATTACH_WQ")]
pub fn attach_queue(self, other_sq: &'r SubmissionQueue) -> Self {
    let mut config = self;
    config.sys.attach = Some(other_sq);
    config
}
pub(crate) fn build_sys(self) -> io::Result<(Completions, Submissions)> {
// SAFETY: all zero is valid for `io_uring_params`.
let mut parameters: libc::io_uring_params = unsafe { mem::zeroed() };
parameters.flags = libc::IORING_SETUP_SUBMIT_ALL // Submit all submissions on error.
| libc::IORING_SETUP_NO_SQARRAY; // Don't use indirection for submissions.
if self.sys.kernel_thread {
parameters.flags |= libc::IORING_SETUP_SQPOLL; // Kernel thread for polling.
} else {
// Don't interrupt userspace, the user must call `Ring::poll` any way.
parameters.flags |= libc::IORING_SETUP_COOP_TASKRUN;
}
if self.sys.disabled {
// Start the ring in disabled mode.
parameters.flags |= libc::IORING_SETUP_R_DISABLED;
}
if self.sys.single_issuer {
// Only allow access from a single thread.
parameters.flags |= libc::IORING_SETUP_SINGLE_ISSUER;
}
if self.sys.defer_taskrun {
parameters.flags |= libc::IORING_SETUP_DEFER_TASKRUN;
}
parameters.sq_entries = self.sys.submission_entries;
if let Some(completion_entries) = self.sys.completion_entries {
parameters.cq_entries = completion_entries;
parameters.flags |= libc::IORING_SETUP_CQSIZE;
}
if self.sys.clamp {
parameters.flags |= libc::IORING_SETUP_CLAMP;
}
if let Some(cpu) = self.sys.cpu_affinity {
parameters.flags |= libc::IORING_SETUP_SQ_AFF;
parameters.sq_thread_cpu = cpu;
}
if let Some(idle_timeout) = self.sys.idle_timeout {
parameters.sq_thread_idle = idle_timeout;
}
#[allow(clippy::cast_sign_loss)] // File descriptors are always positive.
if let Some(other_sq) = self.sys.attach {
parameters.wq_fd = other_sq.submissions().ring_fd() as u32;
parameters.flags |= libc::IORING_SETUP_ATTACH_WQ;
}
let rfd = match syscall!(io_uring_setup(parameters.sq_entries, &raw mut parameters)) {
// SAFETY: just created the fd (and checked the error).
Ok(rfd) => unsafe { OwnedFd::from_raw_fd(rfd) },
Err(err) => return Err(err),
};
check_feature!(parameters.features, IORING_FEAT_NODROP); // Never drop completions.
check_feature!(parameters.features, IORING_FEAT_SUBMIT_STABLE); // All data for async offload must be consumed.
check_feature!(parameters.features, IORING_FEAT_RW_CUR_POS); // Allow -1 as current position.
check_feature!(parameters.features, IORING_FEAT_SQPOLL_NONFIXED); // No need for fixed files.
let shared = io_uring::Shared::new(rfd, ¶meters)?;
let submissions = io_uring::Submissions::new(shared);
let completions = io_uring::Completions::new(submissions.ring_fd(), ¶meters)?;
if let Some(size) = self.sys.direct_descriptors {
let register = libc::io_uring_rsrc_register {
flags: libc::IORING_RSRC_REGISTER_SPARSE,
nr: size,
resv2: 0,
data: 0,
tags: 0,
};
let arg = ptr::from_ref(®ister).cast();
let size = size_of::<libc::io_uring_rsrc_register>();
submissions
.shared()
.register(libc::IORING_REGISTER_FILES2, arg, size as u32)?;
}
Ok((completions, submissions))
}
}
// Return early from the enclosing function with an `Unsupported` I/O error
// if the `libc::$feature` bit is not set in `$flags`.
//
// Must be used in a function that returns a `Result` with an
// `std::io::Error` error and with a `libc` module holding the feature
// constant in scope.
macro_rules! check_feature {
    ($flags: expr, $feature: ident $(,)?) => {
        if ($flags & libc::$feature) == 0 {
            // Built at compile time so the error carries a `&'static str`.
            let message = ::std::concat!(
                "Kernel doesn't have required `",
                stringify!($feature),
                "` feature"
            );
            return Err(::std::io::Error::new(
                ::std::io::ErrorKind::Unsupported,
                message,
            ));
        };
    };
}
use check_feature;