// Unless explicitly stated otherwise all files in this repository are licensed
// under the MIT/Apache-2.0 License, at your convenience
//
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020
// Datadog, Inc.
//
//! # Glommio - asynchronous thread-per-core applications in Rust.
//!
//! ## What is Glommio
//!
//! Glommio is a library providing a safe Rust interface for asynchronous,
//! thread-local I/O, based on the Linux `io_uring` interface and Rust's `async`
//! support. Glommio also provides support for pinning threads to CPUs, allowing
//! thread-per-core applications in Rust.
//!
//! This library depends on Linux's `io_uring` interface, so it is Linux-only,
//! with a kernel version 5.8 or newer recommended.
//!
//! This library provides abstractions for timers, file I/O and networking plus
//! support for multiple queues and an internal scheduler, all without using
//! helper threads.
//!
//! A more detailed exposition of Glommio's architecture is [available in this
//! blog post](https://www.datadoghq.com/blog/engineering/introducing-glommio/).
//!
//! ### Rust `async`
//!
//! Using Glommio is not hard if you are familiar with Rust `async`. All you
//! have to do is:
//!
//! ```
//! use glommio::LocalExecutorBuilder;
//! LocalExecutorBuilder::default()
//!     .spawn(|| async move {
//!         // your code here
//!     })
//!     .unwrap();
//! ```
//!
//! ### Pinned threads
//!
//! Although pinned threads are not required to use Glommio, by creating N
//! executors and binding each to a specific CPU one can use this crate to
//! implement a thread-per-core system where context switches essentially never
//! happen, allowing much higher efficiency.
//!
//! You can easily bind an executor to a CPU by adjusting the
//! `LocalExecutorBuilder` in the example above:
//!
//! ```
//! // This executor will now never leave CPU 0
//! use glommio::{LocalExecutorBuilder, Placement};
//! LocalExecutorBuilder::new(Placement::Fixed(0))
//!     .spawn(|| async move {
//!         // your code here
//!     })
//!     .unwrap();
//! ```
//!
//! Note that you can only have one executor per thread, so if you need more
//! executors, you will have to create more threads. A more ergonomic interface
//! for that is planned but not yet available.
//!
//! ### Scheduling
//!
//! For a thread-per-core system to work well, it is paramount that some form of
//! scheduling can happen within the thread. Traditional applications use many
//! threads to divide the many aspects of their workload and rely on the
//! operating system and runtime to schedule these threads fairly and switch
//! between them as necessary. For a thread-per-core system, each thread must
//! handle its own scheduling at the application level.
//!
//! Glommio provides extensive abstractions for handling scheduling, allowing
//! multiple tasks to proceed on the same thread. Task scheduling can be handled
//! broadly through static shares, or more dynamically through the use of
//! controllers:
//!
//! ```
//! use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares};
//!
//! LocalExecutorBuilder::new(Placement::Fixed(0))
//!     .spawn(|| async move {
//!         let tq1 =
//!             executor().create_task_queue(Shares::Static(2), Latency::NotImportant, "test1");
//!         let tq2 =
//!             executor().create_task_queue(Shares::Static(1), Latency::NotImportant, "test2");
//!         let t1 = glommio::spawn_local_into(
//!             async move {
//!                 // your code here
//!             },
//!             tq1,
//!         )
//!         .unwrap();
//!         let t2 = glommio::spawn_local_into(
//!             async move {
//!                 // your code here
//!             },
//!             tq2,
//!         )
//!         .unwrap();
//!
//!         t1.await;
//!         t2.await;
//!     })
//!     .unwrap();
//! ```
//!
//! This example creates two task queues: `tq1` has 2 shares, `tq2` has 1 share.
//! This means that if both want to use the CPU to its maximum, `tq1` will have
//! `2/3` of the CPU time `(2 / (1 + 2))` and `tq2` will have `1/3` of the CPU
//! time. Those shares are dynamic and can be changed at any time. Notice that
//! this scheduling method doesn't prevent either `tq1` or `tq2` from using 100%
//! of CPU time at times in which they are the only task queue running: the
//! shares are only considered when multiple queues need to run.
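//!
//! The share arithmetic can be sketched in plain Rust. This only illustrates
//! the proportions and is not Glommio's scheduler: `cpu_fractions` is a
//! hypothetical helper, and it assumes every queue is runnable and wants as
//! much CPU as it can get.
//!
//! ```rust
//! /// Hypothetical helper: the fraction of CPU time each queue would receive
//! /// if all queues were busy, given their static share counts.
//! fn cpu_fractions(shares: &[u32]) -> Vec<f64> {
//!     let total: u32 = shares.iter().sum();
//!     shares
//!         .iter()
//!         .map(|&s| f64::from(s) / f64::from(total))
//!         .collect()
//! }
//!
//! fn main() {
//!     // One queue with 2 shares and one queue with 1 share.
//!     let fractions = cpu_fractions(&[2, 1]);
//!     assert!((fractions[0] - 2.0 / 3.0).abs() < 1e-12);
//!     assert!((fractions[1] - 1.0 / 3.0).abs() < 1e-12);
//!
//!     // A queue running alone is not capped by its share count.
//!     assert_eq!(cpu_fractions(&[1]), vec![1.0]);
//! }
//! ```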
//!
//! ## Direct I/O
//!
//! Glommio makes Direct I/O a first-class citizen, although Buffered I/O is
//! present as well for situations where it may make sense.
//!
//! This rides the trend of devices getting faster over the years and tries to
//! bridge the software gap between fast devices and fast storage applications.
//! You can read more about it [in this article](https://itnext.io/modern-storage-is-plenty-fast-it-is-the-apis-that-are-bad-6a68319fbc1a).
//!
//! ## Controlled processes
//!
//! Glommio ships with embedded controllers. You can read more about them in the
//! [Controllers](controllers) module documentation. Controllers allow one to
//! automatically adjust the scheduler shares to control how fast a particular
//! process should happen given a user-provided criterion.
//!
//! For a real-life application of such technology I recommend reading [this
//! post](https://www.scylladb.com/2018/06/12/scylla-leverages-control-theory/) from Glauber.
//!
//! ## Prior work
//!
//! This work is heavily inspired (with some code respectfully imported) by the
//! great work by Stjepan Glavina, in particular the following crates:
//!
//! * [async-io](https://github.com/stjepang/async-io)
//! * [async-task](https://github.com/stjepang/async-task)
//! * [async-executor](https://github.com/stjepang/async-executor)
//!
//! Aside from Stjepan's work, this is also inspired greatly by the [Seastar](http://seastar.io)
//! Framework for C++ that powers I/O intensive systems that are pushing the
//! performance envelope, like [ScyllaDB](https://www.scylladb.com/).
//!
//! ## Why is this its own crate?
//!
//! Cooperative thread-per-core is a very specific programming model. Because
//! only one task is executing per thread, the programmer never needs to hold
//! any locks. Atomic operations are therefore rare, delegated to only
//! a handful of corner case tasks.
//!
//! As atomic operations are costlier than their non-atomic counterparts, this
//! improves efficiency by itself. It also comes with the added benefits
//! that context switches are virtually non-existent (they only occur for kernel
//! threads and interrupts) and no time is ever wasted in waiting on locks.
//!
//! ## Why is this a single monolith instead of many crates?
//!
//! Take as an example the [async-io](https://github.com/stjepang/async-io) crate. It has `park()`
//! and `unpark()` methods. One can `park()` the current executor, and a helper
//! thread will unpark it. This allows one to effectively use that crate with
//! very little need for anything else for the simpler cases. Combined with
//! synchronization primitives like `Condvar`, and other thread-pool based
//! future crates, it excels in conjunction with others, but it is useful on its
//! own.
//!
//! Now contrast that to the equivalent bits in this crate: once you `park()`
//! the thread, you can't unpark it. I/O never gets dispatched without
//! explicitly calling into the reactor, which makes for a very weird
//! programming model, and it is very hard to integrate with the outside world
//! since most external I/O related crates have threads that sooner or later
//! will require [`Send`] + [`Sync`].
//!
//! A single crate is a way to minimize friction.
//!
//! ## `io_uring`
//!
//! This crate depends heavily on Linux's `io_uring`. The reactor will register
//! 3 rings per CPU:
//!
//!  * *Main ring*: The main ring, as its name implies, is where most operations
//!    will be placed. Once the reactor is parked, it only returns if the main
//!    ring has events to report.
//!
//!  * *Latency ring*: Operations that are latency sensitive can be put in the
//!    latency ring. The crate has a function called `yield_if_needed()` that
//!    efficiently checks if there are events pending in the latency ring.
//!    Because this crate uses cooperative programming, tasks run until they
//!    either complete or decide to yield, which means they can run for a very
//!    long time before tasks that are latency sensitive have a chance to run.
//!    Every time you fire a long-running operation (usually a loop) it is good
//!    practice to call [`yield_if_needed()`] periodically (for example after x
//!    iterations of the loop). In particular, when a new priority class is
//!    registered, one can specify whether it contains latency sensitive tasks.
//!    If the queue is marked as latency sensitive, the [`Latency`] enum
//!    takes a duration parameter that determines for how long other tasks can
//!    run even if there are no external events (by registering a timer with
//!    the `io_uring`). If no runnable tasks in the system are latency
//!    sensitive, this timer is not registered. Because `io_uring` allows for
//!    polling on the ring file descriptor, it is safe to `park()` even if work
//!    is present in the latency ring: before going to sleep, the latency
//!    ring's file descriptor is registered with the main ring and any events
//!    it sees will also wake up the main ring.
//!
//!  * *Poll ring*: Read and write operations on NVMe devices are put in the
//!    poll ring. The poll ring does not rely on interrupts, so the system has
//!    to keep polling to find out whether there is any pending work. By not
//!    relying on interrupts we can be even more efficient with I/O in high
//!    IOPS scenarios.
//!
//! ## Before using Glommio
//!
//! Please note Glommio requires at least 512 KiB of locked memory for
//! `io_uring` to work. You can increase the `memlock` resource limit (rlimit)
//! as follows:
//!
//! ```sh
//! $ vi /etc/security/limits.conf
//! *    hard    memlock        512
//! *    soft    memlock        512
//! ```
//!
//! To make the new limits effective, you need to log in to the machine again.
//! You can verify that the limits are updated by running the following:
//!
//! ```sh
//! $ ulimit -l
//! 512
//! ```
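//!
//! The same check can be sketched from Rust by reading `/proc/self/limits`,
//! which on Linux lists the soft and hard limits of the current process
//! (locked memory is reported there in bytes). `soft_memlock_bytes` is a
//! hypothetical helper written for this illustration; it is not part of
//! Glommio.
//!
//! ```rust
//! /// Hypothetical helper: extracts the soft "Max locked memory" limit, in
//! /// bytes, from the text of `/proc/self/limits`. Returns `None` when the
//! /// line is missing or the limit is `unlimited`.
//! fn soft_memlock_bytes(limits: &str) -> Option<u64> {
//!     let rest = limits
//!         .lines()
//!         .find_map(|line| line.strip_prefix("Max locked memory"))?;
//!     // Remaining columns: soft limit, hard limit, units.
//!     rest.split_whitespace().next()?.parse().ok()
//! }
//!
//! fn main() {
//!     const REQUIRED_BYTES: u64 = 512 * 1024;
//!     match std::fs::read_to_string("/proc/self/limits") {
//!         Ok(text) => match soft_memlock_bytes(&text) {
//!             Some(bytes) if bytes < REQUIRED_BYTES => {
//!                 println!("memlock soft limit is too low: {} KiB", bytes / 1024)
//!             }
//!             Some(bytes) => println!("memlock soft limit: {} KiB", bytes / 1024),
//!             None => println!("memlock soft limit is unlimited or not reported"),
//!         },
//!         Err(e) => eprintln!("could not read /proc/self/limits: {e}"),
//!     }
//! }
//! ```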
//!
//! Glommio also requires a kernel with recent enough `io_uring` support, at
//! least recent enough to run discovery probes. The minimum version at this
//! time is 5.8.
//!
//! ## Examples
//!
//! Connect to `example.com:80`, or time out after 10 seconds:
//!
//! ```
//! use futures_lite::{future::FutureExt, io};
//! use glommio::{net::TcpStream, timer::Timer, LocalExecutor};
//!
//! use std::time::Duration;
//!
//! let local_ex = LocalExecutor::default();
//! local_ex.run(async {
//!     let timeout = async {
//!         Timer::new(Duration::from_secs(10)).await;
//!         Err(io::Error::new(io::ErrorKind::TimedOut, "").into())
//!     };
//!     let stream = TcpStream::connect("::80").or(timeout).await?;
//!
//!     // Read or write from stream
//!
//!     std::io::Result::Ok(())
//! });
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(doc, deny(rustdoc::broken_intra_doc_links))]
#![cfg_attr(feature = "native-tls", feature(thread_local))]

#[macro_use]
extern crate nix;
extern crate alloc;
#[macro_use]
extern crate lazy_static;
#[macro_use(defer)]
extern crate scopeguard;

/// Call [`Waker::wake()`] and log to `error` if panicked.
macro_rules! wake {
    ($waker:expr $(,)?) => {
        use log::error;

        if let Err(x) = std::panic::catch_unwind(|| $waker.wake()) {
            error!("Panic while calling waker! {x:?}");
        }
    };
}

mod free_list;

#[allow(clippy::redundant_slicing)]
#[allow(dead_code)]
#[allow(clippy::upper_case_acronyms)]
mod iou;
mod parking;
mod reactor;
mod sys;
pub mod task;

#[allow(dead_code)]
#[allow(clippy::upper_case_acronyms)]
mod uring_sys;

#[cfg(feature = "bench")]
#[doc(hidden)]
pub mod nop;

/// Unwraps a `Result` inside a poll function: on `Err`, returns
/// `Poll::Ready(Err(..))` right away; on `Ok`, evaluates to the inner value.
///
/// Usage is similar to `futures_lite::ready!`
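///
/// A minimal, self-contained sketch of the early-return pattern. The macro is
/// re-declared locally because the real one is private to this crate, and
/// `poll_double` is a hypothetical function used only for illustration:
///
/// ```rust
/// use std::io;
/// use std::task::Poll;
///
/// macro_rules! poll_err {
///     ($e:expr $(,)?) => {
///         match $e {
///             Ok(t) => t,
///             Err(x) => return std::task::Poll::Ready(Err(x)),
///         }
///     };
/// }
///
/// /// Parses `input` and doubles it, bailing out with `Poll::Ready(Err(..))`
/// /// as soon as parsing fails.
/// fn poll_double(input: &str) -> Poll<io::Result<u32>> {
///     let parsed: Result<u32, io::Error> = input
///         .trim()
///         .parse::<u32>()
///         .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e));
///     let n = poll_err!(parsed);
///     Poll::Ready(Ok(n * 2))
/// }
///
/// fn main() {
///     assert!(matches!(poll_double("21"), Poll::Ready(Ok(42))));
///     assert!(matches!(poll_double("not a number"), Poll::Ready(Err(_))));
/// }
/// ```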
macro_rules! poll_err {
    ($e:expr $(,)?) => {
        match $e {
            Ok(t) => t,
            Err(x) => return std::task::Poll::Ready(Err(x)),
        }
    };
}

/// Returns `Poll::Ready(t)` right away if the `Option` is `Some(t)`; on
/// `None`, execution continues past the macro.
///
/// Usage is similar to `futures_lite::ready!`
#[allow(unused)]
macro_rules! poll_some {
    ($e:expr $(,)?) => {
        match $e {
            Some(t) => return std::task::Poll::Ready(t),
            None => {}
        }
    };
}

#[macro_export]
/// Converts a Nix `Errno` into a `std::io::Error` of the matching `ErrorKind`.
///
/// The expansion refers to `io::Error` and `io::ErrorKind`, so `std::io` must
/// be in scope as `io` at the call site.
macro_rules! to_io_error {
    ($e:expr) => {
        match $e {
            nix::errno::Errno::EACCES => io::Error::from(io::ErrorKind::PermissionDenied),
            nix::errno::Errno::EADDRINUSE => io::Error::from(io::ErrorKind::AddrInUse),
            nix::errno::Errno::EADDRNOTAVAIL => io::Error::from(io::ErrorKind::AddrNotAvailable),
            nix::errno::Errno::EAGAIN => io::Error::from(io::ErrorKind::WouldBlock),
            nix::errno::Errno::ECONNABORTED => io::Error::from(io::ErrorKind::ConnectionAborted),
            nix::errno::Errno::ECONNREFUSED => io::Error::from(io::ErrorKind::ConnectionRefused),
            nix::errno::Errno::ECONNRESET => io::Error::from(io::ErrorKind::ConnectionReset),
            nix::errno::Errno::EINTR => io::Error::from(io::ErrorKind::Interrupted),
            nix::errno::Errno::EINVAL => io::Error::from(io::ErrorKind::InvalidInput),
            nix::errno::Errno::ENAMETOOLONG => io::Error::from(io::ErrorKind::InvalidInput),
            nix::errno::Errno::ENOENT => io::Error::from(io::ErrorKind::NotFound),
            nix::errno::Errno::ENOTCONN => io::Error::from(io::ErrorKind::NotConnected),
            nix::errno::Errno::ENOTEMPTY => io::Error::from(io::ErrorKind::AlreadyExists),
            nix::errno::Errno::EPERM => io::Error::from(io::ErrorKind::PermissionDenied),
            nix::errno::Errno::ETIMEDOUT => io::Error::from(io::ErrorKind::TimedOut),
            _ => io::Error::from(io::ErrorKind::Other),
        }
    };
}

#[cfg(test)]
macro_rules! test_executor {
    ($( $fut:expr ),+ ) => {
    use futures::future::join_all;

    let local_ex = crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound)
            .record_io_latencies(true)
            .make()
            .unwrap();
    local_ex.run(async move {
        let mut joins = Vec::new();
        $(
            joins.push(crate::spawn_local($fut));
        )*
        join_all(joins).await;
    });
    }
}

/// Wait for a variable to acquire a specific value.
/// The variable is expected to be an `Rc<RefCell<T>>`.
///
/// Alternatively, it is possible to pass a timeout in whole seconds as a
/// third argument; the macro then panics once more than that many seconds
/// have elapsed (`Instant` must be in scope at the call site).
///
/// Updates to the variable gating the condition can be done (if convenient)
/// through update_cond!() (below)
///
/// Mostly useful for tests.
#[cfg(test)]
macro_rules! wait_on_cond {
    ($var:expr, $val:expr) => {
        loop {
            if *($var.borrow()) == $val {
                break;
            }
            crate::executor().yield_task_queue_now().await;
        }
    };
    ($var:expr, $val:expr, $instantval:expr) => {
        let start = Instant::now();
        loop {
            if *($var.borrow()) == $val {
                break;
            }

            if start.elapsed().as_secs() > $instantval {
                panic!("test timed out");
            }
            crate::executor().yield_task_queue_now().await;
        }
    };
}

#[cfg(test)]
macro_rules! update_cond {
    ($cond:expr, $val:expr) => {
        *($cond.borrow_mut()) = $val;
    };
}

#[cfg(test)]
macro_rules! make_shared_var {
    ($var:expr, $( $name:ident ),+ ) => {
        let local_name = Rc::new($var);
        $( let $name = local_name.clone(); )*
    }
}

#[cfg(test)]
macro_rules! make_shared_var_mut {
    ($var:expr, $( $name:ident ),+ ) => {
        let local_name = Rc::new(RefCell::new($var));
        $( let $name = local_name.clone(); )*
    }
}

mod byte_slice_ext;
pub mod channels;
pub mod controllers;
mod error;
mod executor;
pub mod io;
pub mod net;
mod shares;
pub mod sync;
pub mod timer;

use crate::reactor::Reactor;
pub use crate::{
    byte_slice_ext::{ByteSliceExt, ByteSliceMutExt},
    error::{
        BuilderErrorKind, ExecutorErrorKind, GlommioError, QueueErrorKind, ReactorErrorKind,
        ResourceType, Result,
    },
    executor::{
        allocate_dma_buffer, allocate_dma_buffer_global, executor, spawn_local, spawn_local_into,
        spawn_scoped_local, spawn_scoped_local_into,
        stall::{DefaultStallDetectionHandler, StallDetectionHandler},
        yield_if_needed, CpuSet, ExecutorJoinHandle, ExecutorProxy, ExecutorStats, LocalExecutor,
        LocalExecutorBuilder, LocalExecutorPoolBuilder, Placement, PoolPlacement,
        PoolThreadHandles, ScopedTask, Task, TaskQueueHandle, TaskQueueStats,
    },
    shares::{Shares, SharesManager},
    sys::hardware_topology::CpuLocation,
};
pub use enclose::enclose;
pub use scopeguard::defer;
use sketches_ddsketch::DDSketch;
use std::{
    fmt::{Debug, Formatter},
    iter::Sum,
    time::Duration,
};

/// Provides common imports that almost all Glommio applications will need
pub mod prelude {
    #[doc(no_inline)]
    pub use crate::{
        error::GlommioError, executor, spawn_local, spawn_local_into, yield_if_needed,
        ByteSliceExt, ByteSliceMutExt, ExecutorProxy, IoStats, Latency, LocalExecutor,
        LocalExecutorBuilder, LocalExecutorPoolBuilder, Placement, PoolPlacement,
        PoolThreadHandles, RingIoStats, Shares, TaskQueueHandle,
    };
}

/// An attribute of a [`TaskQueue`], passed during its creation.
///
/// This tells the executor whether tasks in this class are latency
/// sensitive. Latency sensitive tasks will be placed in their own I/O ring,
/// and tasks in background classes can cooperatively preempt themselves in
/// the face of pending events for latency classes.
///
/// [`TaskQueue`]: struct.TaskQueueHandle.html
#[derive(Clone, Copy, Debug)]
pub enum Latency {
    /// Tasks marked as `Latency::Matters` will cooperatively signal to other
    /// tasks that they should preempt often. The `Duration` argument
    /// contributes to the rate of preemption of the scheduler.
    Matters(Duration),

    /// Tasks marked as `Latency::NotImportant` will not signal to other tasks
    /// that they should preempt often
    NotImportant,
}

#[derive(Clone, Copy, Debug)]
pub(crate) struct IoRequirements {
    latency_req: Latency,
    _io_handle: usize,
}

impl Default for IoRequirements {
    fn default() -> Self {
        Self {
            latency_req: Latency::NotImportant,
            _io_handle: 0,
        }
    }
}

impl IoRequirements {
    fn new(latency: Latency, handle: usize) -> Self {
        Self {
            latency_req: latency,
            _io_handle: handle,
        }
    }
}

/// Stores information about IO performed in a specific ring
#[derive(Clone)]
pub struct RingIoStats {
    // Counters
    pub(crate) files_opened: u64,
    pub(crate) files_closed: u64,
    pub(crate) file_reads: u64,
    pub(crate) file_bytes_read: u64,
    pub(crate) file_buffered_reads: u64,
    pub(crate) file_buffered_bytes_read: u64,
    pub(crate) file_deduped_reads: u64,
    pub(crate) file_deduped_bytes_read: u64,
    pub(crate) file_writes: u64,
    pub(crate) file_bytes_written: u64,
    pub(crate) file_buffered_writes: u64,
    pub(crate) file_buffered_bytes_written: u64,

    // Distributions
    pub(crate) pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
    pub(crate) io_latency_us: sketches_ddsketch::DDSketch,
    pub(crate) post_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
}

impl Default for RingIoStats {
    fn default() -> Self {
        Self {
            files_opened: 0,
            files_closed: 0,
            file_reads: 0,
            file_bytes_read: 0,
            file_buffered_reads: 0,
            file_buffered_bytes_read: 0,
            file_deduped_reads: 0,
            file_deduped_bytes_read: 0,
            file_writes: 0,
            file_bytes_written: 0,
            file_buffered_writes: 0,
            file_buffered_bytes_written: 0,
            pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch::new(
                sketches_ddsketch::Config::new(0.01, 2048, 1.0e-9),
            ),
            io_latency_us: sketches_ddsketch::DDSketch::new(sketches_ddsketch::Config::new(
                0.01, 2048, 1.0e-9,
            )),
            post_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch::new(
                sketches_ddsketch::Config::new(0.01, 2048, 1.0e-9),
            ),
        }
    }
}

impl Debug for RingIoStats {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RingIoStats")
            .field("files_opened", &self.files_opened)
            .field("files_closed", &self.files_closed)
            .field("file_reads", &self.file_reads)
            .field("file_bytes_read", &self.file_bytes_read)
            .field("file_buffered_reads", &self.file_buffered_reads)
            .field("file_buffered_bytes_read", &self.file_buffered_bytes_read)
            .field("file_deduped_reads", &self.file_deduped_reads)
            .field("file_deduped_bytes_read", &self.file_deduped_bytes_read)
            .field("file_writes", &self.file_writes)
            .field("file_bytes_written", &self.file_bytes_written)
            .field("file_buffered_writes", &self.file_buffered_writes)
            .field(
                "file_buffered_bytes_written",
                &self.file_buffered_bytes_written,
            )
            .finish_non_exhaustive()
    }
}

impl RingIoStats {
    /// The total number of files opened in this executor so far.
    ///
    /// [`files_opened`] - [`files_closed`] gives the current open files count
    ///
    /// [`files_opened`]: RingIoStats::files_opened
    /// [`files_closed`]: RingIoStats::files_closed
    pub fn files_opened(&self) -> u64 {
        self.files_opened
    }

    /// The total number of files closed in this executor so far.
    ///
    /// [`files_opened`] - [`files_closed`] gives the current open files count
    ///
    /// [`files_opened`]: RingIoStats::files_opened
    /// [`files_closed`]: RingIoStats::files_closed
    pub fn files_closed(&self) -> u64 {
        self.files_closed
    }

    /// File read IO stats
    ///
    /// Returns the number of individual read ops as well as bytes read
    pub fn file_reads(&self) -> (u64, u64) {
        (self.file_reads, self.file_bytes_read)
    }

    /// File read IO stats (deduplicated)
    ///
    /// Returns the number of reads that were served from another preexisting
    /// buffer, as well as the bytes read that way
    pub fn file_deduped_reads(&self) -> (u64, u64) {
        (self.file_deduped_reads, self.file_deduped_bytes_read)
    }

    /// Buffered file read IO stats
    ///
    /// Returns the number of individual buffered read ops as well as bytes read
    pub fn file_buffered_reads(&self) -> (u64, u64) {
        (self.file_buffered_reads, self.file_buffered_bytes_read)
    }

    /// File write IO stats
    ///
    /// Returns the number of individual write ops as well as bytes written
    pub fn file_writes(&self) -> (u64, u64) {
        (self.file_writes, self.file_bytes_written)
    }

    /// Buffered file write IO stats
    ///
    /// Returns the number of individual buffered write ops as well as bytes
    /// written
    pub fn file_buffered_writes(&self) -> (u64, u64) {
        (self.file_buffered_writes, self.file_buffered_bytes_written)
    }

    /// The pre-reactor IO scheduler latency
    ///
    /// Returns a distribution of measures tracking the time between the moment
    /// an IO operation was queued up and the moment it was submitted to the
    /// kernel
    pub fn pre_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
        &self.pre_reactor_io_scheduler_latency_us
    }

    /// The IO latency
    ///
    /// Returns a distribution of measures tracking the time sources spent in
    /// the ring
    pub fn io_latency_us(&self) -> &DDSketch {
        &self.io_latency_us
    }

    /// The post-reactor IO scheduler latency
    ///
    /// Returns a distribution of measures tracking the time between the moment
    /// an IO operation was marked as fulfilled by the reactor and when the
    /// result was consumed by the application code.
    pub fn post_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
        &self.post_reactor_io_scheduler_latency_us
    }
}

impl<'a> Sum<&'a RingIoStats> for RingIoStats {
    fn sum<I: Iterator<Item = &'a RingIoStats>>(iter: I) -> Self {
        iter.fold(RingIoStats::default(), |mut a, b| {
            a.files_opened += b.files_opened;
            a.files_closed += b.files_closed;
            a.file_reads += b.file_reads;
            a.file_bytes_read += b.file_bytes_read;
            a.file_buffered_reads += b.file_buffered_reads;
            a.file_buffered_bytes_read += b.file_buffered_bytes_read;
            a.file_deduped_reads += b.file_deduped_reads;
            a.file_deduped_bytes_read += b.file_deduped_bytes_read;
            a.file_writes += b.file_writes;
            a.file_bytes_written += b.file_bytes_written;
            a.file_buffered_writes += b.file_buffered_writes;
            a.file_buffered_bytes_written += b.file_buffered_bytes_written;
            a.pre_reactor_io_scheduler_latency_us
                .merge(&b.pre_reactor_io_scheduler_latency_us)
                .unwrap();
            a.io_latency_us.merge(&b.io_latency_us).unwrap();
            a.post_reactor_io_scheduler_latency_us
                .merge(&b.post_reactor_io_scheduler_latency_us)
                .unwrap();
            a
        })
    }
}

/// Stores information about IO
#[derive(Debug)]
pub struct IoStats {
    /// The IO stats of the main ring
    pub main_ring: RingIoStats,
    /// The IO stats of the latency ring
    pub latency_ring: RingIoStats,
    /// The IO stats of the poll ring
    pub poll_ring: RingIoStats,
}

impl IoStats {
    fn new(main_ring: RingIoStats, latency_ring: RingIoStats, poll_ring: RingIoStats) -> IoStats {
        IoStats {
            main_ring,
            latency_ring,
            poll_ring,
        }
    }

    /// Combine stats from all rings
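    ///
    /// The summing pattern can be sketched with a stand-in type. `Counters`
    /// and `combine` are hypothetical and much smaller than the real types;
    /// only the shape of the `Sum` implementation and the
    /// `.iter().copied().sum()` call mirror this module:
    ///
    /// ```rust
    /// use std::iter::Sum;
    ///
    /// #[derive(Debug, Default, PartialEq)]
    /// struct Counters {
    ///     reads: u64,
    ///     bytes_read: u64,
    /// }
    ///
    /// impl<'a> Sum<&'a Counters> for Counters {
    ///     fn sum<I: Iterator<Item = &'a Counters>>(iter: I) -> Self {
    ///         iter.fold(Counters::default(), |mut acc, c| {
    ///             acc.reads += c.reads;
    ///             acc.bytes_read += c.bytes_read;
    ///             acc
    ///         })
    ///     }
    /// }
    ///
    /// /// Adds up three per-ring counters field by field.
    /// fn combine(main_ring: &Counters, latency_ring: &Counters, poll_ring: &Counters) -> Counters {
    ///     [main_ring, latency_ring, poll_ring].iter().copied().sum()
    /// }
    ///
    /// fn main() {
    ///     let main_ring = Counters { reads: 3, bytes_read: 4096 };
    ///     let latency_ring = Counters { reads: 1, bytes_read: 512 };
    ///     let poll_ring = Counters::default();
    ///     let total = combine(&main_ring, &latency_ring, &poll_ring);
    ///     assert_eq!(total, Counters { reads: 4, bytes_read: 4608 });
    /// }
    /// ```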
    pub fn all_rings(&self) -> RingIoStats {
        [&self.main_ring, &self.latency_ring, &self.poll_ring]
            .iter()
            .copied()
            .sum()
    }
}

#[cfg(test)]
pub(crate) mod test_utils {
    use super::*;
    use nix::sys::statfs::*;
    use std::path::{Path, PathBuf};
    use tracing::{debug, error, info, trace, warn};
    use tracing_subscriber::EnvFilter;

    #[derive(Copy, Clone)]
    pub(crate) enum TestDirectoryKind {
        TempFs,
        PollMedia,
        NonPollMedia,
    }

    pub(crate) struct TestDirectory {
        pub(crate) path: PathBuf,
        pub(crate) kind: TestDirectoryKind,
    }

    impl Drop for TestDirectory {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    pub(crate) fn make_test_directories(test_name: &str) -> std::vec::Vec<TestDirectory> {
        let mut vec = Vec::new();

        // Glommio currently only supports NVMe-backed volumes formatted with XFS or
        // EXT4. We therefore let the user decide what directory glommio should
        // use to host the unit tests in. For more information regarding this
        // limitation, see the README.
        match std::env::var("GLOMMIO_TEST_POLLIO_ROOTDIR") {
            Err(_) => {
                eprintln!(
                    "Glommio currently only supports NVMe-backed volumes formatted with XFS or \
                     EXT4. To run poll io-related tests, please set GLOMMIO_TEST_POLLIO_ROOTDIR \
                     to an NVMe-backed directory path in your environment.\nPoll io tests will not \
                     run."
                );
            }
            Ok(path) => {
                for p in path.split(',') {
                    vec.push(make_poll_test_directory(p, test_name));
                }
            }
        };

        vec.push(make_tmp_test_directory(test_name));
        vec
    }

    pub(crate) fn make_poll_test_directory<P: AsRef<Path>>(
        path: P,
        test_name: &str,
    ) -> TestDirectory {
        let mut dir = path.as_ref().to_owned();
        std::assert!(dir.exists());

        dir.push(test_name);
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();

        TestDirectory {
            path: dir,
            kind: TestDirectoryKind::PollMedia,
        }
    }

    pub(crate) fn make_tmp_test_directory(test_name: &str) -> TestDirectory {
        let mut dir = std::env::temp_dir();
        dir.push(test_name);
        let _ = std::fs::remove_dir_all(&dir);
        std::fs::create_dir_all(&dir).unwrap();
        let buf = statfs(&dir).unwrap();
        let fstype = buf.filesystem_type();

        let kind = if (fstype.0 as u64) == (libc::TMPFS_MAGIC as u64) {
            TestDirectoryKind::TempFs
        } else {
            TestDirectoryKind::NonPollMedia
        };
        TestDirectory { path: dir, kind }
    }

    #[test]
    #[allow(unused_must_use)]
    fn test_tracing_init() {
        tracing_subscriber::fmt::fmt()
            .with_env_filter(EnvFilter::from_env("GLOMMIO_TRACE"))
            .try_init();

        info!("Started tracing..");
        debug!("Started tracing..");
        warn!("Started tracing..");
        trace!("Started tracing..");
        error!("Started tracing..");
    }
}
833        error!("Started tracing..");
834    }
835}