Skip to main content

a10/
lib.rs

1//! The A10 I/O library. [^1]
2//!
3//! This library is meant as a low-level library safely exposing different OS's
4//! abilities to perform non-blocking I/O.
5//!
6//! For simplicity this only has the following main types and a number of helper
7//! types:
8//!  * [`AsyncFd`] is a wrapper around a file descriptor that provides a safe
9//!    API to perform I/O operations such as `read(2)` using [`Future`]s.
10//!  * [`SubmissionQueue`] is needed to start operations, such as opening a file
11//!    or new socket, but on its own can't do much.
12//!  * [`Ring`] needs to be [polled] so that the I/O operations can make progress.
13//!
14//! Some modules provide ways to create `AsyncFd`, e.g. [`socket`] or
15//! [`OpenOptions`], others are simply a place to expose the [`Future`]s
16//! supporting the I/O operations. The modules try to roughly follow the same
17//! structure as that of the standard library.
18//!
19//! [polled]: Ring::poll
20//! [`socket`]: net::socket
21//! [`OpenOptions`]: fs::OpenOptions
22//! [`Future`]: std::future::Future
23//!
24//! # Implementation Notes
25//!
26//! On Linux this uses io_uring, which is a completion based API. For the BSD
27//! family of OS (FreeBSD, OpenBSD, NetBSD, etc.) and for the Apple family
28//! (macOS, iOS, etc.) this uses kqueue, which is a poll based API.
29//!
30//! To support both the completion and poll based API most I/O operations need
31//! ownership of the data, e.g. a buffer, so it can delay deallocation if
32//! needed. [^2] The input data can be retrieved again by using the [`Extract`]
33//! trait.
34//!
35//! Additional documentation can be found in the [`io_uring(7)`] and
36//! [`kqueue(2)`] manuals.
37//!
38//! [`io_uring(7)`]: https://man7.org/linux/man-pages/man7/io_uring.7.html
39//! [`kqueue(2)`]: https://man.freebsd.org/cgi/man.cgi?query=kqueue
40//!
41//! # Examples
42//!
43//! Examples can be found in the examples directory of the source code,
44//! [available online on GitHub].
45//!
46//! [available online on GitHub]: https://github.com/Thomasdezeeuw/a10/tree/main/examples
47//!
48//! [^1]: The name A10 comes from the [A10 ring road around Amsterdam], which
49//!       relates to the ring buffers that io_uring uses in its design.
50//! [^2]: Delaying of the deallocation needs to happen for completion based APIs
51//!       where an I/O operation `Future` is dropped before it's complete -- the
52//!       OS will continue to use the resources, which would result in a
53//!       use-after-free bug.
54//!
55//! [A10 ring road around Amsterdam]: https://en.wikipedia.org/wiki/A10_motorway_(Netherlands)
56
57#![cfg_attr(feature = "nightly", feature(async_iterator, cfg_sanitize))]
58
59#[cfg(not(any(
60    target_os = "android",
61    target_os = "dragonfly",
62    target_os = "freebsd",
63    target_os = "ios",
64    target_os = "linux",
65    target_os = "macos",
66    target_os = "netbsd",
67    target_os = "openbsd",
68    target_os = "tvos",
69    target_os = "visionos",
70    target_os = "watchos",
71)))]
72compile_error!("OS not supported");
73
74use std::time::Duration;
75
76// This must come before the other modules for the documentation.
77pub mod fd;
78
79mod asan;
80mod config;
81mod msan;
82mod op;
83#[cfg(unix)]
84mod unix;
85
86pub mod extract;
87pub mod fs;
88pub mod io;
89pub mod mem;
90pub mod net;
91pub mod pipe;
92pub mod process;
93
94#[cfg(any(target_os = "android", target_os = "linux"))]
95mod io_uring;
96#[cfg(any(target_os = "android", target_os = "linux"))]
97use io_uring as sys;
98
99#[cfg(any(
100    target_os = "dragonfly",
101    target_os = "freebsd",
102    target_os = "ios",
103    target_os = "macos",
104    target_os = "netbsd",
105    target_os = "openbsd",
106    target_os = "tvos",
107    target_os = "visionos",
108    target_os = "watchos",
109))]
110mod kqueue;
111#[cfg(any(
112    target_os = "dragonfly",
113    target_os = "freebsd",
114    target_os = "ios",
115    target_os = "macos",
116    target_os = "netbsd",
117    target_os = "openbsd",
118    target_os = "tvos",
119    target_os = "visionos",
120    target_os = "watchos",
121))]
122use kqueue as sys;
123
124#[doc(inline)]
125pub use config::Config;
126#[doc(no_inline)]
127pub use extract::Extract;
128#[doc(no_inline)]
129pub use fd::AsyncFd;
130
131/// Ring.
132///
133/// The API on this type is quite minimal. It provides access to the
134/// [`SubmissionQueue`], which is used to perform I/O operations. And it exposes
135/// [`Ring::poll`] needs to be called to make progress on the operations and
136/// mark the [`Future`]s are ready to poll.
137#[derive(Debug)]
138pub struct Ring {
139    cq: sys::Completions,
140    sq: sys::Submissions,
141}
142
143impl Ring {
144    /// Configure a `Ring`.
145    pub const fn config<'r>() -> Config<'r> {
146        Config {
147            sys: crate::sys::Config::new(),
148        }
149    }
150
151    /// Create a new `Ring` with the default configuration.
152    ///
153    /// For more configuration options see [`Config`].
154    #[doc(alias = "io_uring_setup")]
155    #[doc(alias = "kqueue")]
156    pub fn new() -> io::Result<Ring> {
157        Ring::config().build()
158    }
159
160    /// Returns the `SubmissionQueue` used by this ring.
161    ///
162    /// The submission queue can be used to queue asynchronous I/O operations.
163    pub fn sq(&self) -> SubmissionQueue {
164        SubmissionQueue(self.sq.clone())
165    }
166
167    /// Poll the ring for completions.
168    ///
169    /// This will wake all completed [`Future`]s with the result of their
170    /// operations.
171    ///
172    /// [`Future`]: std::future::Future
173    #[doc(alias = "io_uring_enter")]
174    #[doc(alias = "kevent")]
175    pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
176        self.cq.poll(self.sq.shared(), timeout)
177    }
178}
179
180/// Queue to submit asynchronous operations to.
181///
182/// This type doesn't have many public methods, but is used by all I/O types to
183/// queue asynchronous operations. The queue can be acquired by using
184/// [`Ring::sq`].
185///
186/// The submission queue can be shared by cloning it, it's a cheap operation.
187#[derive(Clone, Debug)]
188#[repr(transparent)]
189pub struct SubmissionQueue(sys::Submissions);
190
191impl SubmissionQueue {
192    /// Wake the connected [`Ring`].
193    ///
194    /// All this does is interrupt a call to [`Ring::poll`].
195    ///
196    /// # Notes
197    ///
198    /// When using [`Config::single_issuer`] (io_uring only) this will always
199    /// return `EEXIST` (when calling on a different thread).
200    pub fn wake(&self) {
201        if let Err(err) = self.0.wake() {
202            log::warn!("failed to wake a10::Ring: {err}");
203        }
204    }
205
206    pub(crate) fn submissions(&self) -> &sys::Submissions {
207        &self.0
208    }
209
210    /// Returns itself.
211    ///
212    /// Used by the operation macro to be generic over `SubmissionQueue` and
213    /// `AsyncFd`.
214    pub(crate) const fn sq(&self) -> &SubmissionQueue {
215        self
216    }
217}
218
/// Helper macro to execute a system call that returns an `io::Result`.
///
/// Calls `libc::$fn` with the given arguments. A return value of `-1` is
/// turned into the last OS error, any other value is returned as `Ok`.
macro_rules! syscall {
    ($fn: ident ( $($arg: expr),* $(,)? ) ) => {{
        // The macro may be used inside an existing `unsafe` block, in which
        // case this `unsafe` is redundant.
        #[allow(unused_unsafe)]
        let res = unsafe { libc::$fn($( $arg, )*) };
        match res {
            -1 => ::std::result::Result::Err(::std::io::Error::last_os_error()),
            _ => ::std::result::Result::Ok(res),
        }
    }};
}
231
/// Create a documentation paragraph linking to the online manual.
///
/// `man_link!(name(section))` expands to a string literal with a Markdown
/// link to the man7.org page of `name` in manual section `section`.
#[rustfmt::skip]
macro_rules! man_link {
    ($name: tt ( $sec: tt ) ) => {
        concat!(
            "\n\nAdditional documentation can be found in the ",
            // Link text: [`name(section)`].
            "[`", stringify!($name), "(", stringify!($sec), ")`]",
            // Link target: .../man<section>/<name>.<section>.html
            "(https://man7.org/linux/man-pages/man", stringify!($sec),
            "/", stringify!($name), ".", stringify!($sec), ".html)",
            " manual.\n"
        )
    };
}
244
/// Define one or more flag newtypes.
///
/// For each `struct Name(Repr) { CONST = krate::VALUE, ... }` this generates:
///  * a `Copy` tuple struct wrapping `Repr`,
///  * an associated constant per listed value, plus `ALL_VALUES` holding all
///    of them,
///  * a `Debug` implementation via `debug_detail!(impl match ...)`,
///  * optionally (`impl BitOr`, followed by zero or more types) `BitOr`
///    implementations for the type itself and for each listed type.
macro_rules! new_flag {
    (
        $(
        $(#[$struct_attr:meta])*
        $vis: vis struct $flag: ident ( $repr: ty ) $( impl BitOr $( $rhs_ty: ty )* )? {
            $(
            $(#[$const_attr:meta])*
            $const_name: ident = $src: ident :: $src_const: ident,
            )*
        }
        )+
    ) => {
        $(
        $(#[$struct_attr])*
        #[derive(Copy, Clone, Eq, PartialEq)]
        $vis struct $flag(pub(crate) $repr);

        impl $flag {
            $(
            $(#[$const_attr])*
            #[allow(trivial_numeric_casts, clippy::cast_sign_loss)]
            $vis const $const_name: Self = Self($src::$src_const as $repr);
            )*

            // The attributes are repeated here so any `cfg` on a value also
            // applies to its entry; they may also carry documentation, which
            // can be ignored.
            #[allow(unused_doc_comments, dead_code)]
            pub(crate) const ALL_VALUES: &[Self] = &[
                $(
                $(#[$const_attr])*
                Self::$const_name,
                )*
            ];
        }

        $crate::debug_detail!(impl match for $flag($repr), $( $(#[$const_attr])* $src::$src_const ),*);

        $(
        impl std::ops::BitOr for $flag {
            type Output = Self;

            fn bitor(self, other: Self) -> Self {
                Self(self.0 | other.0)
            }
        }

        $(
        impl std::ops::BitOr<$rhs_ty> for $flag {
            type Output = Self;

            #[allow(clippy::cast_sign_loss)]
            fn bitor(self, other: $rhs_ty) -> Self {
                Self(self.0 | other as $repr)
            }
        }
        )*
        )?
        )+
    };
}
305
/// Generate `Debug` implementations that print symbolic names for raw integer
/// values.
///
/// Four forms are supported:
///  * `match Type(Repr), krate::A, ...`: defines `struct Type(Repr)` and
///    implements `Debug` printing the name of the constant the value is equal
///    to.
///  * `impl match for Type(Repr), krate::A, ...`: same, but for an existing
///    type.
///  * `bitset Type(Repr), krate::A, ...`: defines `struct Type(Repr)` and
///    implements `Debug` printing all set flags separated by `|`.
///  * `impl bitset for Type(Repr), krate::A, ...`: same, but for an existing
///    type.
macro_rules! debug_detail {
    (
        // Match a value exactly.
        match $type: ident ($event_type: ty),
        $( $( #[$meta: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
    ) => {
        struct $type($event_type);

        $crate::debug_detail!(impl match for $type($event_type), $( $(#[$meta])* $libc::$flag ),*);
    };
    (
        // Match a value exactly.
        impl match for $type: ident ($type_repr: ty),
        $( $( #[$meta: meta] )* $libc: ident :: $flag: ident ),* $(,)?
    ) => {
        impl ::std::fmt::Debug for $type {
            #[allow(trivial_numeric_casts, unreachable_patterns, unreachable_code, unused_doc_comments, clippy::bad_bit_mask)]
            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
                // The constants are cast to the representation type so they
                // can be used as patterns below.
                mod consts {
                    $(
                    $(#[$meta])*
                    pub(super) const $flag: $type_repr = $libc :: $flag as $type_repr;
                    )*
                }

                f.write_str(match self.0 {
                    $(
                    $(#[$meta])*
                    consts::$flag => stringify!($flag),
                    )*
                    // Unknown value: fall back to the number itself. The
                    // trait is named explicitly so the macro doesn't depend
                    // on which `std::fmt` traits the caller has imported.
                    value => return ::std::fmt::Debug::fmt(&value, f),
                })
            }
        }
    };
    (
        // Integer bitset.
        bitset $type: ident ($event_type: ty),
        $( $( #[$meta: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
    ) => {
        struct $type($event_type);

        $crate::debug_detail!(impl bitset for $type($event_type), $( $(#[$meta])* $libc::$flag ),*);
    };
    (
        // Integer bitset.
        impl bitset for $type: ident ($event_type: ty),
        $( $( #[$meta: meta] )* $libc: ident :: $flag: ident ),+ $(,)?
    ) => {
        impl ::std::fmt::Debug for $type {
            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
                // Tracks whether a `|` separator is needed before the next
                // flag name.
                let mut written_one = false;
                $(
                    $(#[$meta])*
                    #[allow(clippy::bad_bit_mask)] // Apparently some flags are zero.
                    {
                        if self.0 & $libc :: $flag != 0 {
                            if !written_one {
                                write!(f, "{}", stringify!($flag))?;
                                written_one = true;
                            } else {
                                write!(f, "|{}", stringify!($flag))?;
                            }
                        }
                    }
                )+
                if !written_one {
                    write!(f, "(empty)")
                } else {
                    Ok(())
                }
            }
        }
    };
}
381
382use {debug_detail, man_link, new_flag, syscall};
383
/// Lock `mutex`, ignoring poisoning.
///
/// If the mutex is poisoned the poison flag is cleared and the guard is
/// returned as if the lock was acquired normally.
fn lock<'a, T>(mutex: &'a std::sync::Mutex<T>) -> std::sync::MutexGuard<'a, T> {
    mutex.lock().unwrap_or_else(|poisoned| {
        mutex.clear_poison();
        poisoned.into_inner()
    })
}
394
/// Non-blocking version of [`lock`].
///
/// Returns `None` if the mutex is currently locked. A poisoned mutex has its
/// poison flag cleared and its guard returned.
#[cfg(any(target_os = "android", target_os = "linux"))]
fn try_lock<'a, T>(mutex: &'a std::sync::Mutex<T>) -> Option<std::sync::MutexGuard<'a, T>> {
    use std::sync::TryLockError::{Poisoned, WouldBlock};

    match mutex.try_lock() {
        Err(WouldBlock) => None,
        Err(Poisoned(poisoned)) => {
            mutex.clear_poison();
            Some(poisoned.into_inner())
        }
        Ok(guard) => Some(guard),
    }
}
407
/// Get mutable access to the data protected by `mutex`.
///
/// Poisoning is ignored: the data is returned either way. Unlike [`lock`]
/// this leaves the poison flag untouched.
#[cfg(any(target_os = "android", target_os = "linux"))]
fn get_mut<'a, T>(mutex: &'a mut std::sync::Mutex<T>) -> &'a mut T {
    mutex
        .get_mut()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}
416
/// Abstraction over the result types of singleshot (`io::Result`) and
/// multishot (`Option<io::Result>`) operations.
// Replace this with std::ops::FromResidual once stable.
#[allow(unused)]
trait OpPollResult<T> {
    /// Wrap a successful value.
    fn from_ok(ok: T) -> Self;
    /// Wrap an error.
    fn from_err(err: io::Error) -> Self;
    /// Wrap an existing result.
    fn from_res(res: io::Result<T>) -> Self;
    /// Value indicating the operation will produce no more results.
    fn done() -> Self;
}

/// Singleshot operations: the result is returned as is.
impl<T> OpPollResult<T> for io::Result<T> {
    fn from_ok(value: T) -> Self {
        Self::from_res(Ok(value))
    }

    fn from_err(error: io::Error) -> Self {
        Self::from_res(Err(error))
    }

    fn from_res(result: io::Result<T>) -> Self {
        result
    }

    /// A singleshot result has no "done" value, calling this panics.
    fn done() -> Self {
        unreachable!()
    }
}

/// Multishot operations: results are wrapped in `Some`, `None` signals the
/// end.
impl<T> OpPollResult<T> for Option<io::Result<T>> {
    fn from_ok(value: T) -> Self {
        Self::from_res(Ok(value))
    }

    fn from_err(error: io::Error) -> Self {
        Self::from_res(Err(error))
    }

    fn from_res(result: io::Result<T>) -> Self {
        Some(result)
    }

    fn done() -> Self {
        None
    }
}