1#![cfg_attr(
37 feature = "nightly",
38 feature(async_iterator, cfg_sanitize, io_error_more)
39)]
40#![warn(
41 anonymous_parameters,
42 bare_trait_objects,
43 missing_debug_implementations,
44 missing_docs,
45 trivial_numeric_casts,
46 unused_extern_crates,
47 unused_import_braces,
48 variant_size_differences
49)]
50
51use std::sync::atomic::AtomicBool;
52use std::sync::{Arc, Mutex, MutexGuard};
53use std::time::Duration;
54use std::{fmt, task};
55
56pub mod fd;
58
59mod asan;
60mod bitmap;
61mod config;
62mod cq;
63mod drop_waker;
64mod msan;
65mod op;
66mod sq;
67#[cfg(unix)]
68mod unix;
69
70#[cfg(any(target_os = "android", target_os = "linux"))]
71mod io_uring;
72#[cfg(any(target_os = "android", target_os = "linux"))]
73use io_uring as sys;
74
75#[cfg(any(
76 target_os = "dragonfly",
77 target_os = "freebsd",
78 target_os = "ios",
79 target_os = "macos",
80 target_os = "netbsd",
81 target_os = "openbsd",
82 target_os = "tvos",
83 target_os = "visionos",
84 target_os = "watchos",
85))]
86mod kqueue;
87
88#[cfg(any(
89 target_os = "dragonfly",
90 target_os = "freebsd",
91 target_os = "ios",
92 target_os = "macos",
93 target_os = "netbsd",
94 target_os = "openbsd",
95 target_os = "tvos",
96 target_os = "visionos",
97 target_os = "watchos",
98))]
99use kqueue as sys;
100
101pub mod cancel;
102pub mod extract;
103pub mod fs;
104pub mod io;
105pub mod mem;
106pub mod msg;
107pub mod net;
108pub mod pipe;
109pub mod poll;
110pub mod process;
111
112#[doc(no_inline)]
113pub use cancel::Cancel;
114#[doc(inline)]
115pub use config::Config;
116#[doc(no_inline)]
117pub use extract::Extract;
118#[doc(no_inline)]
119pub use fd::AsyncFd;
120
121use crate::bitmap::AtomicBitMap;
122use crate::sys::Submission;
123
124#[derive(Debug)]
139pub struct Ring {
140 sq: SubmissionQueue,
141 cq: cq::Queue<sys::Implementation>,
142}
143
144impl Ring {
145 pub const fn config<'r>(queued_operations: usize) -> Config<'r> {
157 Config {
158 queued_operations,
159 sys: crate::sys::Config::new(),
160 }
161 }
162
163 #[doc(alias = "io_uring_setup")]
167 #[doc(alias = "kqueue")]
168 pub fn new(queued_operations: usize) -> io::Result<Ring> {
169 Ring::config(queued_operations).build()
170 }
171
172 fn build(
174 submissions: sys::Submissions,
175 shared_data: sys::Shared,
176 completions: sys::Completions,
177 queued_operations: usize,
178 ) -> Ring {
179 let shared = SharedState::new(submissions, shared_data, queued_operations);
180 let sq = SubmissionQueue::new(shared.clone());
181 let cq = cq::Queue::new(completions, shared);
182 Ring { sq, cq }
183 }
184
185 pub const fn sq(&self) -> &SubmissionQueue {
189 &self.sq
190 }
191
192 #[doc(alias = "io_uring_enter")]
204 #[doc(alias = "kevent")]
205 pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
206 self.cq.poll(timeout)
207 }
208}
209
210#[derive(Clone)]
218pub struct SubmissionQueue {
219 inner: sq::Queue<sys::Implementation>,
220}
221
222impl SubmissionQueue {
223 const fn new(shared: Arc<SharedState<sys::Implementation>>) -> SubmissionQueue {
224 SubmissionQueue {
225 inner: sq::Queue::new(shared),
226 }
227 }
228
229 pub fn wake(&self) {
233 self.inner.wake();
234 }
235
236 #[allow(clippy::type_complexity)]
238 pub(crate) unsafe fn get_op(
239 &self,
240 op_id: OperationId,
241 ) -> MutexGuard<
242 '_,
243 Option<QueuedOperation<<<<sys::Implementation as Implementation>::Completions as cq::Completions>::Event as cq::Event>::State>>,
244 >{
245 unsafe { self.inner.get_op(op_id) }
246 }
247
248 #[allow(clippy::type_complexity)]
250 pub(crate) unsafe fn make_op_available(
251 &self,
252 op_id: OperationId,
253 op: MutexGuard<
254 '_,
255 Option<QueuedOperation<<<<sys::Implementation as Implementation>::Completions as cq::Completions>::Event as cq::Event>::State>>,
256 >,
257 ) {
258 unsafe { self.inner.make_op_available(op_id, op) };
259 }
260}
261
262impl fmt::Debug for SubmissionQueue {
263 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264 self.inner.fmt(f)
265 }
266}
267
268struct SharedState<I: Implementation> {
270 submissions: I::Submissions,
272 data: I::Shared,
274 is_polling: AtomicBool,
276 op_ids: Box<AtomicBitMap>,
279 #[rustfmt::skip]
283 #[allow(clippy::type_complexity)]
284 queued_ops: Box<[Mutex<Option<QueuedOperation<<<I::Completions as cq::Completions>::Event as cq::Event>::State>>>]>,
285 blocked_futures: Mutex<Vec<task::Waker>>,
287}
288
289impl<I: Implementation> SharedState<I> {
290 fn new(
293 submissions: I::Submissions,
294 data: I::Shared,
295 queued_operations: usize,
296 ) -> Arc<SharedState<I>> {
297 let op_ids = AtomicBitMap::new(queued_operations);
298 let mut queued_ops = Vec::with_capacity(op_ids.capacity());
299 queued_ops.resize_with(queued_ops.capacity(), || Mutex::new(None));
300 let queued_ops = queued_ops.into_boxed_slice();
301 let blocked_futures = Mutex::new(Vec::new());
302 Arc::new(SharedState {
303 submissions,
304 data,
305 is_polling: AtomicBool::new(false),
306 op_ids,
307 queued_ops,
308 blocked_futures,
309 })
310 }
311}
312
313impl<I: Implementation> fmt::Debug for SharedState<I> {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 f.debug_struct("SharedState")
316 .field("submissions", &self.submissions)
317 .field("data", &self.data)
318 .field("is_polling", &self.is_polling)
319 .field("op_ids", &self.op_ids)
320 .field("queued_ops", &self.queued_ops)
321 .field("blocked_futures", &self.blocked_futures)
322 .finish()
323 }
324}
325
326#[derive(Debug)]
328struct QueuedOperation<T> {
329 state: T,
331 dropped: bool,
334 done: bool,
340 waker: task::Waker,
342}
343
344impl<T> QueuedOperation<T> {
345 const fn new(state: T, waker: task::Waker) -> QueuedOperation<T> {
346 QueuedOperation {
347 state,
348 dropped: false,
349 done: false,
350 waker,
351 }
352 }
353
354 fn update_waker(&mut self, waker: &task::Waker) {
356 if !self.waker.will_wake(waker) {
357 self.waker.clone_from(waker);
358 }
359 }
360
361 fn prep_retry(&mut self)
362 where
363 T: cq::OperationState,
364 {
365 self.state.prep_retry();
366 }
367}
368
369type OperationId = usize;
375
376const WAKE_ID: OperationId = usize::MAX;
378const NO_COMPLETION_ID: OperationId = usize::MAX - 1;
381
382trait Implementation {
384 type Shared: fmt::Debug + Sized;
386
387 type Submissions: sq::Submissions<Shared = Self::Shared>;
389
390 type Completions: cq::Completions<Shared = Self::Shared>;
392}
393
394#[rustfmt::skip]
396macro_rules! man_link {
397 ($syscall: tt ( $section: tt ) ) => {
398 concat!(
399 "\n\nAdditional documentation can be found in the ",
400 "[`", stringify!($syscall), "(", stringify!($section), ")`]",
401 "(https://man7.org/linux/man-pages/man", stringify!($section), "/", stringify!($syscall), ".", stringify!($section), ".html)",
402 " manual.\n"
403 )
404 };
405}
406
407macro_rules! syscall {
409 ($fn: ident ( $($arg: expr),* $(,)? ) ) => {{
410 #[allow(unused_unsafe)]
411 let res = unsafe { libc::$fn($( $arg, )*) };
412 if res == -1 {
413 ::std::result::Result::Err(::std::io::Error::last_os_error())
414 } else {
415 ::std::result::Result::Ok(res)
416 }
417 }};
418}
419
420macro_rules! new_flag {
421 (
422 $(
423 $(#[$type_meta:meta])*
424 $type_vis: vis struct $type_name: ident ( $type_repr: ty ) $(impl BitOr $( $type_or: ty )*)? {
425 $(
426 $(#[$value_meta:meta])*
427 $value_name: ident = $libc: ident :: $value_type: ident,
428 )*
429 }
430 )+
431 ) => {
432 $(
433 $(#[$type_meta])*
434 #[derive(Copy, Clone, Eq, PartialEq)]
435 $type_vis struct $type_name(pub(crate) $type_repr);
436
437 impl $type_name {
438 $(
439 $(#[$value_meta])*
440 #[allow(trivial_numeric_casts, clippy::cast_sign_loss)]
441 $type_vis const $value_name: $type_name = $type_name($libc::$value_type as $type_repr);
442 )*
443 }
444
445 $crate::debug_detail!(impl for $type_name($type_repr) match $( $libc::$value_type ),*);
446
447 $(
448 impl std::ops::BitOr for $type_name {
449 type Output = Self;
450
451 fn bitor(self, rhs: Self) -> Self::Output {
452 $type_name(self.0 | rhs.0)
453 }
454 }
455
456 $(
457 impl std::ops::BitOr<$type_or> for $type_name {
458 type Output = Self;
459
460 #[allow(clippy::cast_sign_loss)]
461 fn bitor(self, rhs: $type_or) -> Self::Output {
462 $type_name(self.0 | rhs as $type_repr)
463 }
464 }
465 )*
466 )?
467 )+
468 };
469}
470
471#[allow(unused_macros)] macro_rules! debug_detail {
473 (
474 impl for $type: ident ($type_repr: ty) match
476 $( $( #[$target: meta] )* $libc: ident :: $flag: ident ),* $(,)?
477 ) => {
478 impl ::std::fmt::Debug for $type {
479 #[allow(trivial_numeric_casts, unreachable_patterns, unreachable_code, clippy::bad_bit_mask)]
480 fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
481 mod consts {
482 $(
483 $(#[$target])*
484 pub(super) const $flag: $type_repr = $libc :: $flag as $type_repr;
485 )*
486 }
487
488 f.write_str(match self.0 {
489 $(
490 $(#[$target])*
491 consts::$flag => stringify!($flag),
492 )*
493 value => return value.fmt(f),
494 })
495 }
496 }
497 };
498 (
499 match $type: ident ($event_type: ty),
501 $( $( #[$target: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
502 ) => {
503 struct $type($event_type);
504
505 $crate::debug_detail!(impl for $type($event_type) match $( $libc::$flag ),*);
506 };
507 (
508 bitset $type: ident ($event_type: ty),
510 $( $( #[$target: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
511 ) => {
512 struct $type($event_type);
513
514 impl fmt::Debug for $type {
515 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
516 let mut written_one = false;
517 $(
518 $(#[$target])*
519 #[allow(clippy::bad_bit_mask)] {
521 if self.0 & $libc :: $flag != 0 {
522 if !written_one {
523 write!(f, "{}", stringify!($flag))?;
524 written_one = true;
525 } else {
526 write!(f, "|{}", stringify!($flag))?;
527 }
528 }
529 }
530 )+
531 if !written_one {
532 write!(f, "(empty)")
533 } else {
534 Ok(())
535 }
536 }
537 }
538 };
539}
540
541#[allow(unused_imports)] use {debug_detail, man_link, new_flag, syscall};