a10/
lib.rs

1//! The [A10] io_uring library.
2//!
3//! This library is meant as a low-level library safely exposing the io_uring
4//! API. For simplicity this only has two main types and a number of helper
5//! types:
6//!  * [`Ring`] is a wrapper around io_uring used to poll for completion events.
7//!  * [`AsyncFd`] is a wrapper around a file descriptor that provides a safe
8//!    API to schedule operations.
9//!
10//! Some modules provide ways to create `AsyncFd`, e.g. [`OpenOptions`], others
11//! are simply a place to expose the [`Future`]s supporting the scheduled
12//! operations. The modules try to follow the same structure as that of the
13//! standard library.
14//!
15//! Additional documentation can be found in the [`io_uring(7)`] manual.
16//!
17//! [A10]: https://en.wikipedia.org/wiki/A10_motorway_(Netherlands)
18//! [`OpenOptions`]: fs::OpenOptions
19//! [`Future`]: std::future::Future
20//! [`io_uring(7)`]: https://man7.org/linux/man-pages/man7/io_uring.7.html
21//!
22//! # Notes
23//!
24//! Most I/O operations need ownership of the data, e.g. a buffer, so it can
25//! delay deallocation if needed. For example when a `Future` is dropped before
26//! being polled to completion. This data can be retrieved again by using the
27//! [`Extract`] trait.
28//!
29//! # Examples
30//!
31//! Examples can be found in the examples directory of the source code,
32//! [available online on GitHub].
33//!
34//! [available online on GitHub]: https://github.com/Thomasdezeeuw/a10/tree/main/examples
35
36#![cfg_attr(
37    feature = "nightly",
38    feature(async_iterator, cfg_sanitize, io_error_more)
39)]
40#![warn(
41    anonymous_parameters,
42    bare_trait_objects,
43    missing_debug_implementations,
44    missing_docs,
45    trivial_numeric_casts,
46    unused_extern_crates,
47    unused_import_braces,
48    variant_size_differences
49)]
50
51use std::sync::atomic::AtomicBool;
52use std::sync::{Arc, Mutex, MutexGuard};
53use std::time::Duration;
54use std::{fmt, task};
55
56// This must come before the other modules for the documentation.
57pub mod fd;
58
59mod asan;
60mod bitmap;
61mod config;
62mod cq;
63mod drop_waker;
64mod msan;
65mod op;
66mod sq;
67#[cfg(unix)]
68mod unix;
69
70#[cfg(any(target_os = "android", target_os = "linux"))]
71mod io_uring;
72#[cfg(any(target_os = "android", target_os = "linux"))]
73use io_uring as sys;
74
75#[cfg(any(
76    target_os = "dragonfly",
77    target_os = "freebsd",
78    target_os = "ios",
79    target_os = "macos",
80    target_os = "netbsd",
81    target_os = "openbsd",
82    target_os = "tvos",
83    target_os = "visionos",
84    target_os = "watchos",
85))]
86mod kqueue;
87
88#[cfg(any(
89    target_os = "dragonfly",
90    target_os = "freebsd",
91    target_os = "ios",
92    target_os = "macos",
93    target_os = "netbsd",
94    target_os = "openbsd",
95    target_os = "tvos",
96    target_os = "visionos",
97    target_os = "watchos",
98))]
99use kqueue as sys;
100
101pub mod cancel;
102pub mod extract;
103pub mod fs;
104pub mod io;
105pub mod mem;
106pub mod msg;
107pub mod net;
108pub mod pipe;
109pub mod poll;
110pub mod process;
111
112#[doc(no_inline)]
113pub use cancel::Cancel;
114#[doc(inline)]
115pub use config::Config;
116#[doc(no_inline)]
117pub use extract::Extract;
118#[doc(no_inline)]
119pub use fd::AsyncFd;
120
121use crate::bitmap::AtomicBitMap;
122use crate::sys::Submission;
123
124/// This type represents the user space side of an io_uring.
125///
126/// An io_uring is split into two queues: the submissions and completions queue.
127/// The [`SubmissionQueue`] is public, but doesn't provide many methods. The
128/// `SubmissionQueue` is used by I/O types in the crate to schedule asynchronous
129/// operations.
130///
131/// The completions queue is not exposed by the crate and only used internally.
132/// Instead it will wake the [`Future`]s exposed by the various I/O types, such
133/// as [`AsyncFd::write`]'s [`Write`] `Future`.
134///
135/// [`Future`]: std::future::Future
136/// [`AsyncFd::write`]: AsyncFd::write
137/// [`Write`]: io::Write
138#[derive(Debug)]
139pub struct Ring {
140    sq: SubmissionQueue,
141    cq: cq::Queue<sys::Implementation>,
142}
143
144impl Ring {
145    /// Configure a `Ring`.
146    ///
147    /// `queued_operations` is the number of queued operations, i.e. the number
148    /// of concurrent A10 operation.
149    ///
150    /// # Notes
151    ///
152    /// A10 uses `IORING_SETUP_SQPOLL` by default for io_uring, which required
153    /// Linux kernel 5.11 to work correctly. Furthermore before Linux 5.13 the
154    /// user needs the `CAP_SYS_NICE` capability if run as non-root. This can be
155    /// disabled by [`Config::with_kernel_thread`].
156    pub const fn config<'r>(queued_operations: usize) -> Config<'r> {
157        Config {
158            queued_operations,
159            sys: crate::sys::Config::new(),
160        }
161    }
162
163    /// Create a new `Ring` with the default configuration.
164    ///
165    /// For more configuration options see [`Config`].
166    #[doc(alias = "io_uring_setup")]
167    #[doc(alias = "kqueue")]
168    pub fn new(queued_operations: usize) -> io::Result<Ring> {
169        Ring::config(queued_operations).build()
170    }
171
172    /// Build a new `Ring`.
173    fn build(
174        submissions: sys::Submissions,
175        shared_data: sys::Shared,
176        completions: sys::Completions,
177        queued_operations: usize,
178    ) -> Ring {
179        let shared = SharedState::new(submissions, shared_data, queued_operations);
180        let sq = SubmissionQueue::new(shared.clone());
181        let cq = cq::Queue::new(completions, shared);
182        Ring { sq, cq }
183    }
184
185    /// Returns the `SubmissionQueue` used by this ring.
186    ///
187    /// The `SubmissionQueue` can be used to queue asynchronous I/O operations.
188    pub const fn sq(&self) -> &SubmissionQueue {
189        &self.sq
190    }
191
192    /// Poll the ring for completions.
193    ///
194    /// This will wake all completed [`Future`]s with the result of their
195    /// operations.
196    ///
197    /// If a zero duration timeout (i.e. `Some(Duration::ZERO)`) is passed this
198    /// function will only wake all already completed operations. When using
199    /// io_uring it also guarantees to not make a system call, but it also means
200    /// it doesn't guarantee at least one completion was processed.
201    ///
202    /// [`Future`]: std::future::Future
203    #[doc(alias = "io_uring_enter")]
204    #[doc(alias = "kevent")]
205    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
206        self.cq.poll(timeout)
207    }
208}
209
210/// Queue to submit asynchronous operations to.
211///
212/// This type doesn't have many public methods, but is used by all I/O types, to
213/// queue asynchronous operations. The queue can be acquired by using
214/// [`Ring::sq`].
215///
216/// The submission queue can be shared by cloning it, it's a cheap operation.
217#[derive(Clone)]
218pub struct SubmissionQueue {
219    inner: sq::Queue<sys::Implementation>,
220}
221
222impl SubmissionQueue {
223    const fn new(shared: Arc<SharedState<sys::Implementation>>) -> SubmissionQueue {
224        SubmissionQueue {
225            inner: sq::Queue::new(shared),
226        }
227    }
228
229    /// Wake the connected [`Ring`].
230    ///
231    /// All this does is interrupt a call to [`Ring::poll`].
232    pub fn wake(&self) {
233        self.inner.wake();
234    }
235
236    /// See [`sq::Queue::get_op`].
237    #[allow(clippy::type_complexity)]
238    pub(crate) unsafe fn get_op(
239        &self,
240        op_id: OperationId,
241    ) -> MutexGuard<
242        '_,
243        Option<QueuedOperation<<<<sys::Implementation as Implementation>::Completions as cq::Completions>::Event as cq::Event>::State>>,
244    >{
245        unsafe { self.inner.get_op(op_id) }
246    }
247
248    /// See [`sq::Queue::make_op_available`].
249    #[allow(clippy::type_complexity)]
250    pub(crate) unsafe fn make_op_available(
251        &self,
252        op_id: OperationId,
253        op: MutexGuard<
254        '_,
255        Option<QueuedOperation<<<<sys::Implementation as Implementation>::Completions as cq::Completions>::Event as cq::Event>::State>>,
256        >,
257    ) {
258        unsafe { self.inner.make_op_available(op_id, op) };
259    }
260}
261
262impl fmt::Debug for SubmissionQueue {
263    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264        self.inner.fmt(f)
265    }
266}
267
268/// State shared between the submission and completion side.
269struct SharedState<I: Implementation> {
270    /// [`sq::Submissions`] implementation.
271    submissions: I::Submissions,
272    /// Data shared between the submission and completion queues.
273    data: I::Shared,
274    /// Boolean indicating a thread is [`Ring::poll`]ing.
275    is_polling: AtomicBool,
276    /// Bitmap which can be used to create [`OperationId`]s, used as index into
277    /// `queued_ops`.
278    op_ids: Box<AtomicBitMap>,
279    /// State of queued operations.
280    ///
281    /// Indexed by a [`OperationId`]s, created by `op_ids`.
282    #[rustfmt::skip]
283    #[allow(clippy::type_complexity)]
284    queued_ops: Box<[Mutex<Option<QueuedOperation<<<I::Completions as cq::Completions>::Event as cq::Event>::State>>>]>,
285    /// Futures that are waiting for a slot in `queued_ops`.
286    blocked_futures: Mutex<Vec<task::Waker>>,
287}
288
289impl<I: Implementation> SharedState<I> {
290    /// `queued_operations` is the maximum number of queued operations, will be
291    /// rounded up depending on the capacity of `AtomicBitMap`.
292    fn new(
293        submissions: I::Submissions,
294        data: I::Shared,
295        queued_operations: usize,
296    ) -> Arc<SharedState<I>> {
297        let op_ids = AtomicBitMap::new(queued_operations);
298        let mut queued_ops = Vec::with_capacity(op_ids.capacity());
299        queued_ops.resize_with(queued_ops.capacity(), || Mutex::new(None));
300        let queued_ops = queued_ops.into_boxed_slice();
301        let blocked_futures = Mutex::new(Vec::new());
302        Arc::new(SharedState {
303            submissions,
304            data,
305            is_polling: AtomicBool::new(false),
306            op_ids,
307            queued_ops,
308            blocked_futures,
309        })
310    }
311}
312
313impl<I: Implementation> fmt::Debug for SharedState<I> {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        f.debug_struct("SharedState")
316            .field("submissions", &self.submissions)
317            .field("data", &self.data)
318            .field("is_polling", &self.is_polling)
319            .field("op_ids", &self.op_ids)
320            .field("queued_ops", &self.queued_ops)
321            .field("blocked_futures", &self.blocked_futures)
322            .finish()
323    }
324}
325
326/// In progress/queued operation.
327#[derive(Debug)]
328struct QueuedOperation<T> {
329    /// State of the operation.
330    state: T,
331    /// True if the connected `Future`/`AsyncIterator` is dropped and thus no
332    /// longer will retrieve the result.
333    dropped: bool,
334    /// Boolean used by operations that result in multiple completion events.
335    /// For example zero copy: one completion to report the result another to
336    /// indicate the resources are no longer used.
337    /// For io_uring multishot this will be true if no more completion events
338    /// are coming, for example in case a previous event returned an error.
339    done: bool,
340    /// Waker to wake when the operation is done.
341    waker: task::Waker,
342}
343
344impl<T> QueuedOperation<T> {
345    const fn new(state: T, waker: task::Waker) -> QueuedOperation<T> {
346        QueuedOperation {
347            state,
348            dropped: false,
349            done: false,
350            waker,
351        }
352    }
353
354    /// Update the waker to `waker`, if it's different.
355    fn update_waker(&mut self, waker: &task::Waker) {
356        if !self.waker.will_wake(waker) {
357            self.waker.clone_from(waker);
358        }
359    }
360
361    fn prep_retry(&mut self)
362    where
363        T: cq::OperationState,
364    {
365        self.state.prep_retry();
366    }
367}
368
/// Identifier of an operation.
///
/// Relates completion events to submission events and operations. It doubles
/// as the index into [`SharedState::queued_ops`] and is created by
/// [`SharedState::op_ids`].
type OperationId = usize;

/// Id used for internal wake ups.
const WAKE_ID: OperationId = OperationId::MAX;
/// Id used for submissions that have no completion event (for the case where
/// we do get a completion event after all).
const NO_COMPLETION_ID: OperationId = WAKE_ID - 1;
381
382/// Platform specific implementation.
383trait Implementation {
384    /// Data shared between the submission and completion queues.
385    type Shared: fmt::Debug + Sized;
386
387    /// See [`sq::Submissions`].
388    type Submissions: sq::Submissions<Shared = Self::Shared>;
389
390    /// See [`cq::Completions`].
391    type Completions: cq::Completions<Shared = Self::Shared>;
392}
393
/// Create a documentation paragraph linking to the online manual page.
///
/// Called as `man_link!(name(section))`, it expands to a string literal with
/// a Markdown link to the man7.org page of `name` in manual section `section`.
#[rustfmt::skip]
macro_rules! man_link {
    ($name: tt ( $sec: tt ) ) => {
        concat!(
            "\n\nAdditional documentation can be found in the [`",
            stringify!($name), "(", stringify!($sec), ")`](",
            "https://man7.org/linux/man-pages/man", stringify!($sec), "/",
            stringify!($name), ".", stringify!($sec), ".html) manual.\n",
        )
    };
}
406
/// Execute a `libc` function and turn its return value into an `io::Result`.
///
/// A return value of `-1` becomes `Err` holding the last OS error, any other
/// value is returned as `Ok`.
macro_rules! syscall {
    ($name: ident ( $($arg: expr),* $(,)? ) ) => {{
        // Callers may already be inside an `unsafe` block.
        #[allow(unused_unsafe)]
        let ret = unsafe { libc::$name($( $arg, )*) };
        if ret != -1 {
            ::std::result::Result::Ok(ret)
        } else {
            ::std::result::Result::Err(::std::io::Error::last_os_error())
        }
    }};
}
419
/// Define one or more flag newtypes around constants from `libc`.
///
/// For every `struct Name(Repr) { CONST = libc::VALUE, .. }` this generates:
///  * a `Copy` tuple struct wrapping `Repr`,
///  * an associated constant per listed value,
///  * a `Debug` implementation (through `debug_detail!`), and
///  * if `impl BitOr` is given, `BitOr` with itself and with each type listed
///    after `impl BitOr`.
macro_rules! new_flag {
    (
        $(
        $(#[$ty_attr:meta])*
        $vis: vis struct $name: ident ( $repr: ty ) $(impl BitOr $( $rhs_ty: ty )*)? {
            $(
            $(#[$const_attr:meta])*
            $const_name: ident = $krate: ident :: $libc_const: ident,
            )*
        }
        )+
    ) => {
        $(
        $(#[$ty_attr])*
        #[derive(Clone, Copy, PartialEq, Eq)]
        $vis struct $name(pub(crate) $repr);

        impl $name {
            $(
            $(#[$const_attr])*
            #[allow(trivial_numeric_casts, clippy::cast_sign_loss)]
            $vis const $const_name: Self = Self($krate::$libc_const as $repr);
            )*
        }

        $crate::debug_detail!(impl for $name($repr) match $( $krate::$libc_const ),*);

        $(
        impl ::std::ops::BitOr for $name {
            type Output = Self;

            fn bitor(self, rhs: Self) -> Self::Output {
                Self(self.0 | rhs.0)
            }
        }

        $(
        impl ::std::ops::BitOr<$rhs_ty> for $name {
            type Output = Self;

            #[allow(clippy::cast_sign_loss)]
            fn bitor(self, rhs: $rhs_ty) -> Self::Output {
                Self(self.0 | rhs as $repr)
            }
        }
        )*
        )?
        )+
    };
}
470
/// Generate `Debug` implementations that print the names of `libc` constants.
///
/// Three forms are supported, each taking a list of `libc::CONSTANT`s that
/// can individually be prefixed with attributes (e.g. `#[cfg(..)]`):
///  * `impl for Type(Repr) match ..`: implement `Debug` for the existing
///    tuple type `Type`, printing the name of the constant equal to the value
///    or the value itself if no constant matches.
///  * `match Type(Repr), ..`: same as above, but also defines the (private)
///    tuple struct `Type`.
///  * `bitset Type(Repr), ..`: defines the tuple struct `Type` and prints the
///    names of all constants whose bits are set, separated by `|`, or
///    `(empty)` if none are.
#[allow(unused_macros)] // Not used on all OS.
macro_rules! debug_detail {
    (
        // Match a value exactly.
        impl for $type: ident ($type_repr: ty) match
        $( $( #[$target: meta] )* $libc: ident :: $flag: ident ),* $(,)?
    ) => {
        impl ::std::fmt::Debug for $type {
            #[allow(trivial_numeric_casts, unreachable_patterns, unreachable_code, clippy::bad_bit_mask)]
            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
                // The constants live in their own module so that the match
                // arms below always refer to a constant by path.
                mod consts {
                    $(
                    $(#[$target])*
                    pub(super) const $flag: $type_repr = $libc :: $flag as $type_repr;
                    )*
                }

                f.write_str(match self.0 {
                    $(
                    $(#[$target])*
                    consts::$flag => stringify!($flag),
                    )*
                    // Unknown value: print the value itself. Explicitly go
                    // through `Debug` so this doesn't depend on (or clash
                    // with) the formatting traits imported at the call site.
                    value => return ::std::fmt::Debug::fmt(&value, f),
                })
            }
        }
    };
    (
        // Match a value exactly.
        match $type: ident ($event_type: ty),
        $( $( #[$target: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
    ) => {
        struct $type($event_type);

        // The per-constant attributes must be forwarded, otherwise a
        // `#[cfg(..)]` on a constant would be silently ignored and the
        // constant would be referenced on targets that don't define it.
        $crate::debug_detail!(impl for $type($event_type) match $( $( #[$target] )* $libc::$flag ),*);
    };
    (
        // Integer bitset.
        bitset $type: ident ($event_type: ty),
        $( $( #[$target: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
    ) => {
        struct $type($event_type);

        impl ::std::fmt::Debug for $type {
            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
                let mut written_one = false;
                $(
                    $(#[$target])*
                    #[allow(clippy::bad_bit_mask)] // Apparently some flags are zero.
                    {
                        if self.0 & $libc :: $flag != 0 {
                            if !written_one {
                                write!(f, "{}", stringify!($flag))?;
                                written_one = true;
                            } else {
                                write!(f, "|{}", stringify!($flag))?;
                            }
                        }
                    }
                )+
                if !written_one {
                    write!(f, "(empty)")
                } else {
                    Ok(())
                }
            }
        }
    };
}
540
541#[allow(unused_imports)] // Not used on all OS.
542use {debug_detail, man_link, new_flag, syscall};