glommio/lib.rs
1// Unless explicitly stated otherwise all files in this repository are licensed
2// under the MIT/Apache-2.0 License, at your convenience
3//
4// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2020
5// Datadog, Inc.
6//
7//! # Glommio - asynchronous thread per core applications in Rust.
8//!
9//! ## What is Glommio
10//!
//! Glommio is a library providing a safe Rust interface for asynchronous,
//! thread-local I/O, based on the Linux `io_uring` interface and Rust's `async`
//! support. Glommio also provides support for pinning threads to CPUs, allowing
//! thread-per-core applications in Rust.
//!
//! Because it depends on `io_uring`, this library is Linux-only and needs
//! kernel version 5.8 or newer.
18//!
//! This library provides abstractions for timers, file I/O and networking plus
//! support for multiple queues and an internal scheduler, all without using
//! helper threads.
//!
//! A more detailed exposition of Glommio's architecture is [available in this
//! blog post](https://www.datadoghq.com/blog/engineering/introducing-glommio/).
25//!
26//! ### Rust `async`
27//!
//! Using Glommio is not hard if you are familiar with Rust `async`. All you
//! have to do is:
//!
//! ```
//! use glommio::LocalExecutorBuilder;
//! LocalExecutorBuilder::default()
//!     .spawn(|| async move {
//!         // your code here
//!     })
//!     .unwrap();
//! ```
39//!
40//! ### Pinned threads
41//!
//! Although pinned threads are not required to use Glommio, by creating N
//! executors and binding each to a specific CPU one can use this crate to
//! implement a thread-per-core system where context switches essentially never
//! happen, allowing much higher efficiency.
//!
//! You can easily bind an executor to a CPU by adjusting the
//! `LocalExecutorBuilder` in the example above:
//!
//! ```
//! // This will now never leave CPU 0
//! use glommio::{LocalExecutorBuilder, Placement};
//! LocalExecutorBuilder::new(Placement::Fixed(0))
//!     .spawn(|| async move {
//!         // your code here
//!     })
//!     .unwrap();
//! ```
59//!
//! Note that you can only have one executor per thread, so if you need more
//! executors, you will have to create more threads. The
//! [`LocalExecutorPoolBuilder`] exported by this crate offers a more ergonomic
//! interface for creating several executors at once.
63//!
64//! ### Scheduling
65//!
//! For a thread-per-core system to work well, it is paramount that some form of
//! scheduling can happen within the thread. Traditional applications use many
//! threads to divide the many aspects of their workload and rely on the
//! operating system and runtime to schedule these threads fairly and switch
//! between them as necessary. For a thread-per-core system, each thread must
//! handle its own scheduling at the application level.
72//!
73//! Glommio provides extensive abstractions for handling scheduling, allowing
74//! multiple tasks to proceed on the same thread. Task scheduling can be handled
75//! broadly through static shares, or more dynamically through the use of
76//! controllers:
77//!
//! ```
//! use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares};
//!
//! LocalExecutorBuilder::new(Placement::Fixed(0))
//!     .spawn(|| async move {
//!         let tq1 =
//!             executor().create_task_queue(Shares::Static(2), Latency::NotImportant, "test1");
//!         let tq2 =
//!             executor().create_task_queue(Shares::Static(1), Latency::NotImportant, "test2");
//!         let t1 = glommio::spawn_local_into(
//!             async move {
//!                 // your code here
//!             },
//!             tq1,
//!         )
//!         .unwrap();
//!         let t2 = glommio::spawn_local_into(
//!             async move {
//!                 // your code here
//!             },
//!             tq2,
//!         )
//!         .unwrap();
//!
//!         t1.await;
//!         t2.await;
//!     })
//!     .unwrap();
//! ```
107//!
//! This example creates two task queues: `tq1` has 2 shares, `tq2` has 1 share.
//! This means that if both want to use the CPU to its maximum, `tq1` will have
//! `2/3` of the CPU time `(2 / (1 + 2))` and `tq2` will have `1/3` of the CPU
//! time. Those shares are dynamic and can be changed at any time. Notice that
//! this scheduling method prevents neither `tq1` nor `tq2` from using 100% of
//! CPU time when it is the only task queue running: the shares are only
//! considered when multiple queues need to run.
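/*!

The share arithmetic above can be sketched in plain Rust. This is only an
illustration of the proportions, not Glommio's scheduler: `cpu_fractions` is a
hypothetical helper, and it assumes that every queue passed in currently has
work to run.

```rust
/// Hypothetical helper: given the shares of the task queues that currently
/// want to run, returns the fraction of CPU time each one would receive.
fn cpu_fractions(shares: &[u64]) -> Vec<f64> {
    let total: u64 = shares.iter().sum();
    if total == 0 {
        // No shares at all: nothing is entitled to CPU time.
        return vec![0.0; shares.len()];
    }
    shares.iter().map(|&s| s as f64 / total as f64).collect()
}
```

For shares `[2, 1]` the result is approximately `[0.667, 0.333]`. A queue that
is alone in the slice gets `1.0` whatever its share count, which mirrors the
point that shares only matter when several queues compete.
*/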
115//!
116//! ## Direct I/O
117//!
118//! Glommio makes Direct I/O a first-class citizen, although Buffered I/O is
119//! present as well for situations where it may make sense.
120//!
//! This rides the trend of devices getting faster over the years and tries to
//! bridge the software gap between fast devices and fast storage applications.
//! You can read more about it [in this article](https://itnext.io/modern-storage-is-plenty-fast-it-is-the-apis-that-are-bad-6a68319fbc1a).
124//!
125//! ## Controlled processes
126//!
//! Glommio ships with embedded controllers. You can read more about them in the
//! [Controllers](controllers) module documentation. Controllers allow one to
//! automatically adjust the scheduler shares to control how fast a particular
//! process should happen given user-provided criteria.
//!
//! For a real-life application of such technology, I recommend reading [this
//! post](https://www.scylladb.com/2018/06/12/scylla-leverages-control-theory/) from Glauber.
134//!
135//! ## Prior work
136//!
137//! This work is heavily inspired (with some code respectfully imported) by the
138//! great work by Stjepan Glavina, in particular the following crates:
139//!
140//! * [async-io](https://github.com/stjepang/async-io)
141//! * [async-task](https://github.com/stjepang/async-task)
142//! * [async-executor](https://github.com/stjepang/async-executor)
143//!
144//! Aside from Stjepan's work, this is also inspired greatly by the [Seastar](http://seastar.io)
145//! Framework for C++ that powers I/O intensive systems that are pushing the
146//! performance envelope, like [ScyllaDB](https://www.scylladb.com/).
147//!
148//! ## Why is this its own crate?
149//!
//! Cooperative thread-per-core is a very specific programming model. Because
//! only one task is executing per thread, the programmer never needs to hold
//! a lock. Atomic operations are therefore rare, delegated to only a handful
//! of corner-case tasks.
154//!
155//! As atomic operations are costlier than their non-atomic counterparts, this
156//! improves efficiency by itself. However, it comes with the added benefits
157//! that context switches are virtually non-existent (they only occur for kernel
158//! threads and interrupts) and no time is ever wasted in waiting on locks.
159//!
//! ## Why is this a single monolith instead of many crates?
161//!
162//! Take as an example the [async-io](https://github.com/stjepang/async-io) crate. It has `park()`
163//! and `unpark()` methods. One can `park()` the current executor, and a helper
164//! thread will unpark it. This allows one to effectively use that crate with
165//! very little need for anything else for the simpler cases. Combined with
166//! synchronization primitives like `Condvar`, and other thread-pool based
167//! future crates, it excels in conjunction with others, but it is useful on its
168//! own.
169//!
//! Now contrast that to the equivalent bits in this crate: once you `park()`
//! the thread, you can't unpark it. I/O never gets dispatched without
//! explicitly calling into the reactor, which makes for a very weird
//! programming model, and it is very hard to integrate with the outside world
//! since most external I/O related crates have threads that sooner or later
//! will require [`Send`] + [`Sync`].
176//!
177//! A single crate is a way to minimize friction.
178//!
179//! ## `io_uring`
180//!
181//! This crate depends heavily on Linux's `io_uring`. The reactor will register
182//! 3 rings per CPU:
183//!
//! * *Main ring*: The main ring, as its name implies, is where most operations
//!   will be placed. Once the reactor is parked, it only returns if the main
//!   ring has events to report.
//!
//! * *Latency ring*: Operations that are latency sensitive can be put in the
//!   latency ring. The crate has a function called `yield_if_needed()` that
//!   efficiently checks if there are events pending in the latency ring.
//!   Because this crate uses cooperative programming, tasks run until they
//!   either complete or decide to yield, which means they can run for a very
//!   long time before tasks that are latency sensitive have a chance to run.
//!   Every time you fire a long-running operation (usually a loop) it is good
//!   practice to call [`yield_if_needed()`] periodically (for example, after
//!   every few iterations of the loop). In particular, when a new priority
//!   class is registered, one can specify whether it contains latency
//!   sensitive tasks. If the queue is marked as latency sensitive, the
//!   [`Latency`] enum takes a duration parameter that determines for how long
//!   other tasks can run even if there are no external events (by registering
//!   a timer with the `io_uring`). If no runnable tasks in the system are
//!   latency sensitive, this timer is not registered. Because `io_uring`
//!   allows for polling on the ring file descriptor, it is safe to `park()`
//!   even if work is present in the latency ring: before going to sleep, the
//!   latency ring's file descriptor is registered with the main ring and any
//!   events it sees will also wake up the main ring.
//!
//! * *Poll ring*: Read and write operations on NVMe devices are put in the
//!   poll ring. The poll ring does not rely on interrupts, so the system has
//!   to keep polling for pending work. By not relying on interrupts we can be
//!   even more efficient with I/O in high IOPS scenarios.
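/*!

The advice about checking for a yield point every few iterations of a long
loop can be sketched without any dependencies. The names below
(`sum_with_checkpoints`, `every`, `checkpoint`) are hypothetical. A plain
closure stands in for the yield point so the sketch stays synchronous; in a
Glommio task the loop would live in `async` code and the checkpoint would be
an `.await` on `yield_if_needed()`, which this sketch does not exercise.

```rust
/// Sums `items`, invoking `checkpoint` after every `every` elements.
/// `checkpoint` is a stand-in for a cooperative yield point.
fn sum_with_checkpoints(items: &[u64], every: usize, mut checkpoint: impl FnMut()) -> u64 {
    assert!(every > 0, "`every` must be non-zero");
    let mut total = 0u64;
    for (i, item) in items.iter().enumerate() {
        total += item;
        // Give latency sensitive work a chance at a bounded interval.
        if (i + 1) % every == 0 {
            checkpoint();
        }
    }
    total
}
```

Choosing `every` is a trade-off: a small value bounds how long latency
sensitive tasks may wait, while a large value keeps the per-iteration overhead
of the check low.
*/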
212//!
213//! ## Before using Glommio
214//!
215//! Please note Glommio requires at least 512 KiB of locked memory for
216//! `io_uring` to work. You can increase the `memlock` resource limit (rlimit)
217//! as follows:
218//!
219//! ```sh
220//! $ vi /etc/security/limits.conf
221//! * hard memlock 512
222//! * soft memlock 512
223//! ```
224//!
225//! To make the new limits effective, you need to log in to the machine again.
226//! You can verify that the limits are updated by running the following:
227//!
228//! ```sh
229//! $ ulimit -l
230//! 512
231//! ```
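/*!

The same check can be made from a program. The sketch below is an assumption
about how one might do it and is not part of Glommio: it parses the
`Max locked memory` line of `/proc/self/limits`, which reports the limit in
bytes (unlike `ulimit -l` and `limits.conf`, which use KiB). `MemlockLimit`,
`parse_memlock_soft_limit`, `memlock_is_sufficient` and
`current_memlock_soft_limit` are hypothetical names.

```rust
use std::fs;

/// Soft `memlock` limit as reported by the kernel, in bytes.
#[derive(Debug, PartialEq)]
enum MemlockLimit {
    Unlimited,
    Bytes(u64),
}

/// Extracts the soft "Max locked memory" limit from the text of
/// `/proc/self/limits`. Returns `None` if the line is missing or malformed.
fn parse_memlock_soft_limit(limits: &str) -> Option<MemlockLimit> {
    let line = limits
        .lines()
        .find(|l| l.starts_with("Max locked memory"))?;
    // Columns: "Max" "locked" "memory" <soft> <hard> <units>
    let soft = line.split_whitespace().nth(3)?;
    if soft == "unlimited" {
        Some(MemlockLimit::Unlimited)
    } else {
        soft.parse::<u64>().ok().map(MemlockLimit::Bytes)
    }
}

/// True if the limit covers the 512 KiB mentioned above.
fn memlock_is_sufficient(limit: &MemlockLimit) -> bool {
    match limit {
        MemlockLimit::Unlimited => true,
        MemlockLimit::Bytes(bytes) => *bytes >= 512 * 1024,
    }
}

/// Reads the soft limit of the current process (Linux only).
fn current_memlock_soft_limit() -> Option<MemlockLimit> {
    let text = fs::read_to_string("/proc/self/limits").ok()?;
    parse_memlock_soft_limit(&text)
}
```

A program could call `current_memlock_soft_limit()` at startup and print a
hint about `limits.conf` when `memlock_is_sufficient` returns `false`.
*/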
232//!
//! Glommio also requires a kernel with recent enough `io_uring` support, at
//! least recent enough to run discovery probes. The minimum version at this
//! time is 5.8.
236//!
237//!
238//! ## Examples
239//!
240//! Connect to `example.com:80`, or time out after 10 seconds:
241//!
//! ```
//! use futures_lite::{future::FutureExt, io};
//! use glommio::{net::TcpStream, timer::Timer, LocalExecutor};
//!
//! use std::time::Duration;
//!
//! let local_ex = LocalExecutor::default();
//! local_ex.run(async {
//!     let timeout = async {
//!         Timer::new(Duration::from_secs(10)).await;
//!         Err(io::Error::new(io::ErrorKind::TimedOut, "").into())
//!     };
//!     let stream = TcpStream::connect("::80").or(timeout).await?;
//!
//!     // Read or write from stream
//!
//!     std::io::Result::Ok(())
//! });
//! ```
261#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
262#![cfg_attr(doc, deny(rustdoc::broken_intra_doc_links))]
263#![cfg_attr(feature = "native-tls", feature(thread_local))]
264
265#[macro_use]
266extern crate nix;
267extern crate alloc;
268#[macro_use]
269extern crate lazy_static;
270#[macro_use(defer)]
271extern crate scopeguard;
272
273/// Call [`Waker::wake()`] and log to `error` if panicked.
274macro_rules! wake {
275 ($waker:expr $(,)?) => {
276 use log::error;
277
278 if let Err(x) = std::panic::catch_unwind(|| $waker.wake()) {
279 error!("Panic while calling waker! {x:?}");
280 }
281 };
282}
283
284mod free_list;
285
286#[allow(clippy::redundant_slicing)]
287#[allow(dead_code)]
288#[allow(clippy::upper_case_acronyms)]
289mod iou;
290mod parking;
291mod reactor;
292mod sys;
293pub mod task;
294
295#[allow(dead_code)]
296#[allow(clippy::upper_case_acronyms)]
297mod uring_sys;
298
299#[cfg(feature = "bench")]
300#[doc(hidden)]
301pub mod nop;
302
/// Unwraps a `Result` inside a poll function: on `Err`, returns
/// `Poll::Ready(Err(..))` right away; on `Ok`, evaluates to the inner value.
///
/// Usage is similar to `futures_lite::ready!`
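/**

A minimal sketch of how the macro reads at a call site. The macro body is
repeated here so that the example stands alone, and `poll_double` is a
hypothetical function that is not part of this crate.

```rust
use std::num::ParseIntError;
use std::task::Poll;

macro_rules! poll_err {
    ($e:expr $(,)?) => {
        match $e {
            Ok(t) => t,
            Err(x) => return std::task::Poll::Ready(Err(x)),
        }
    };
}

/// Hypothetical poll-style function: parses `input` and doubles it,
/// finishing early with the error if parsing fails.
fn poll_double(input: &str) -> Poll<Result<i32, ParseIntError>> {
    let n = poll_err!(input.trim().parse::<i32>());
    Poll::Ready(Ok(n * 2))
}
```

The early `return` only type-checks in functions whose return type is
`Poll<Result<_, E>>` with the same error type `E` as the expression.
*/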
306macro_rules! poll_err {
307 ($e:expr $(,)?) => {
308 match $e {
309 Ok(t) => t,
310 Err(x) => return std::task::Poll::Ready(Err(x)),
311 }
312 };
313}
314
/// Unwraps an `Option` inside a poll function: on `Some`, returns
/// `Poll::Ready(..)` with the inner value right away; on `None`, falls through.
///
/// Usage is similar to `futures_lite::ready!`
318#[allow(unused)]
319macro_rules! poll_some {
320 ($e:expr $(,)?) => {
321 match $e {
322 Some(t) => return std::task::Poll::Ready(t),
323 None => {}
324 }
325 };
326}
327
328#[macro_export]
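/// Converts a `nix` `Errno` into a [`std::io::Error`] of the matching
/// [`std::io::ErrorKind`]; unmapped values become `ErrorKind::Other`.
///
/// The expansion refers to `io::Error` unqualified, so the call site needs
/// `std::io` in scope.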
330macro_rules! to_io_error {
331 ($e:expr) => {
332 match $e {
333 nix::errno::Errno::EACCES => io::Error::from(io::ErrorKind::PermissionDenied),
334 nix::errno::Errno::EADDRINUSE => io::Error::from(io::ErrorKind::AddrInUse),
335 nix::errno::Errno::EADDRNOTAVAIL => io::Error::from(io::ErrorKind::AddrNotAvailable),
336 nix::errno::Errno::EAGAIN => io::Error::from(io::ErrorKind::WouldBlock),
337 nix::errno::Errno::ECONNABORTED => io::Error::from(io::ErrorKind::ConnectionAborted),
338 nix::errno::Errno::ECONNREFUSED => io::Error::from(io::ErrorKind::ConnectionRefused),
339 nix::errno::Errno::ECONNRESET => io::Error::from(io::ErrorKind::ConnectionReset),
340 nix::errno::Errno::EINTR => io::Error::from(io::ErrorKind::Interrupted),
341 nix::errno::Errno::EINVAL => io::Error::from(io::ErrorKind::InvalidInput),
342 nix::errno::Errno::ENAMETOOLONG => io::Error::from(io::ErrorKind::InvalidInput),
343 nix::errno::Errno::ENOENT => io::Error::from(io::ErrorKind::NotFound),
344 nix::errno::Errno::ENOTCONN => io::Error::from(io::ErrorKind::NotConnected),
345 nix::errno::Errno::ENOTEMPTY => io::Error::from(io::ErrorKind::AlreadyExists),
346 nix::errno::Errno::EPERM => io::Error::from(io::ErrorKind::PermissionDenied),
347 nix::errno::Errno::ETIMEDOUT => io::Error::from(io::ErrorKind::TimedOut),
348 _ => io::Error::from(io::ErrorKind::Other),
349 }
350 };
351}
352
353#[cfg(test)]
354macro_rules! test_executor {
355 ($( $fut:expr ),+ ) => {
356 use futures::future::join_all;
357
358 let local_ex = crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound)
359 .record_io_latencies(true)
360 .make()
361 .unwrap();
362 local_ex.run(async move {
363 let mut joins = Vec::new();
364 $(
365 joins.push(crate::spawn_local($fut));
366 )*
367 join_all(joins).await;
368 });
369 }
370}
371
/// Wait for a variable to acquire a specific value.
/// The variable is expected to be an `Rc<RefCell<T>>`.
///
/// Alternatively, a timeout in whole seconds can be passed as a third
/// argument; the macro then panics once that many seconds have been exceeded.
/// This form needs `std::time::Instant` in scope at the call site.
///
/// Updates to the variable gating the condition can be done (if convenient)
/// through `update_cond!()` (below).
///
/// Mostly useful for tests.
382#[cfg(test)]
383macro_rules! wait_on_cond {
384 ($var:expr, $val:expr) => {
385 loop {
386 if *($var.borrow()) == $val {
387 break;
388 }
389 crate::executor().yield_task_queue_now().await;
390 }
391 };
392 ($var:expr, $val:expr, $instantval:expr) => {
393 let start = Instant::now();
394 loop {
395 if *($var.borrow()) == $val {
396 break;
397 }
398
399 if start.elapsed().as_secs() > $instantval {
400 panic!("test timed out");
401 }
402 crate::executor().yield_task_queue_now().await;
403 }
404 };
405}
406
407#[cfg(test)]
408macro_rules! update_cond {
409 ($cond:expr, $val:expr) => {
410 *($cond.borrow_mut()) = $val;
411 };
412}
413
414#[cfg(test)]
415macro_rules! make_shared_var {
416 ($var:expr, $( $name:ident ),+ ) => {
417 let local_name = Rc::new($var);
418 $( let $name = local_name.clone(); )*
419 }
420}
421
422#[cfg(test)]
423macro_rules! make_shared_var_mut {
424 ($var:expr, $( $name:ident ),+ ) => {
425 let local_name = Rc::new(RefCell::new($var));
426 $( let $name = local_name.clone(); )*
427 }
428}
429
430mod byte_slice_ext;
431pub mod channels;
432pub mod controllers;
433mod error;
434mod executor;
435pub mod io;
436pub mod net;
437mod shares;
438pub mod sync;
439pub mod timer;
440
441use crate::reactor::Reactor;
442pub use crate::{
443 byte_slice_ext::{ByteSliceExt, ByteSliceMutExt},
444 error::{
445 BuilderErrorKind, ExecutorErrorKind, GlommioError, QueueErrorKind, ReactorErrorKind,
446 ResourceType, Result,
447 },
448 executor::{
449 allocate_dma_buffer, allocate_dma_buffer_global, executor, spawn_local, spawn_local_into,
450 spawn_scoped_local, spawn_scoped_local_into,
451 stall::{DefaultStallDetectionHandler, StallDetectionHandler},
452 yield_if_needed, CpuSet, ExecutorJoinHandle, ExecutorProxy, ExecutorStats, LocalExecutor,
453 LocalExecutorBuilder, LocalExecutorPoolBuilder, Placement, PoolPlacement,
454 PoolThreadHandles, ScopedTask, Task, TaskQueueHandle, TaskQueueStats,
455 },
456 shares::{Shares, SharesManager},
457 sys::hardware_topology::CpuLocation,
458};
459pub use enclose::enclose;
460pub use scopeguard::defer;
461use sketches_ddsketch::DDSketch;
462use std::{
463 fmt::{Debug, Formatter},
464 iter::Sum,
465 time::Duration,
466};
467
468/// Provides common imports that almost all Glommio applications will need
469pub mod prelude {
470 #[doc(no_inline)]
471 pub use crate::{
472 error::GlommioError, executor, spawn_local, spawn_local_into, yield_if_needed,
473 ByteSliceExt, ByteSliceMutExt, ExecutorProxy, IoStats, Latency, LocalExecutor,
474 LocalExecutorBuilder, LocalExecutorPoolBuilder, Placement, PoolPlacement,
475 PoolThreadHandles, RingIoStats, Shares, TaskQueueHandle,
476 };
477}
478
/// An attribute of a [`TaskQueue`], passed during its creation.
///
/// This tells the executor whether tasks in this class are latency
/// sensitive. Latency sensitive tasks will be placed in their own I/O ring,
/// and tasks in background classes can cooperatively preempt themselves in
/// the face of pending events for latency classes.
///
/// [`TaskQueue`]: struct.TaskQueueHandle.html
487#[derive(Clone, Copy, Debug)]
488pub enum Latency {
489 /// Tasks marked as `Latency::Matters` will cooperatively signal to other
490 /// tasks that they should preempt often. The `Duration` argument
491 /// contributes to the rate of preemption of the scheduler.
492 Matters(Duration),
493
494 /// Tasks marked as `Latency::NotImportant` will not signal to other tasks
495 /// that they should preempt often
496 NotImportant,
497}
498
499#[derive(Clone, Copy, Debug)]
500pub(crate) struct IoRequirements {
501 latency_req: Latency,
502 _io_handle: usize,
503}
504
505impl Default for IoRequirements {
506 fn default() -> Self {
507 Self {
508 latency_req: Latency::NotImportant,
509 _io_handle: 0,
510 }
511 }
512}
513
514impl IoRequirements {
515 fn new(latency: Latency, handle: usize) -> Self {
516 Self {
517 latency_req: latency,
518 _io_handle: handle,
519 }
520 }
521}
522
523/// Stores information about IO performed in a specific ring
524#[derive(Clone)]
525pub struct RingIoStats {
526 // Counters
527 pub(crate) files_opened: u64,
528 pub(crate) files_closed: u64,
529 pub(crate) file_reads: u64,
530 pub(crate) file_bytes_read: u64,
531 pub(crate) file_buffered_reads: u64,
532 pub(crate) file_buffered_bytes_read: u64,
533 pub(crate) file_deduped_reads: u64,
534 pub(crate) file_deduped_bytes_read: u64,
535 pub(crate) file_writes: u64,
536 pub(crate) file_bytes_written: u64,
537 pub(crate) file_buffered_writes: u64,
538 pub(crate) file_buffered_bytes_written: u64,
539
540 // Distributions
541 pub(crate) pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
542 pub(crate) io_latency_us: sketches_ddsketch::DDSketch,
543 pub(crate) post_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
544}
545
546impl Default for RingIoStats {
547 fn default() -> Self {
548 Self {
549 files_opened: 0,
550 files_closed: 0,
551 file_reads: 0,
552 file_bytes_read: 0,
553 file_buffered_reads: 0,
554 file_buffered_bytes_read: 0,
555 file_deduped_reads: 0,
556 file_deduped_bytes_read: 0,
557 file_writes: 0,
558 file_bytes_written: 0,
559 file_buffered_writes: 0,
560 file_buffered_bytes_written: 0,
561 pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch::new(
562 sketches_ddsketch::Config::new(0.01, 2048, 1.0e-9),
563 ),
564 io_latency_us: sketches_ddsketch::DDSketch::new(sketches_ddsketch::Config::new(
565 0.01, 2048, 1.0e-9,
566 )),
567 post_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch::new(
568 sketches_ddsketch::Config::new(0.01, 2048, 1.0e-9),
569 ),
570 }
571 }
572}
573
574impl Debug for RingIoStats {
575 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
576 f.debug_struct("RingIoStats")
577 .field("files_opened", &self.files_opened)
578 .field("files_closed", &self.files_closed)
579 .field("file_reads", &self.file_reads)
580 .field("file_bytes_read", &self.file_bytes_read)
581 .field("file_buffered_reads", &self.file_buffered_reads)
582 .field("file_buffered_bytes_read", &self.file_buffered_bytes_read)
583 .field("file_deduped_reads", &self.file_deduped_reads)
584 .field("file_deduped_bytes_read", &self.file_deduped_bytes_read)
585 .field("file_writes", &self.file_writes)
586 .field("file_bytes_written", &self.file_bytes_written)
587 .field("file_buffered_writes", &self.file_buffered_writes)
588 .field(
589 "file_buffered_bytes_written",
590 &self.file_buffered_bytes_written,
591 )
592 .finish_non_exhaustive()
593 }
594}
595
596impl RingIoStats {
    /// The total number of files opened in this executor so far.
598 ///
599 /// [`files_opened`] - [`files_closed`] gives the current open files count
600 ///
601 /// [`files_opened`]: RingIoStats::files_opened
602 /// [`files_closed`]: RingIoStats::files_closed
603 pub fn files_opened(&self) -> u64 {
604 self.files_opened
605 }
606
    /// The total number of files closed in this executor so far.
608 ///
609 /// [`files_opened`] - [`files_closed`] gives the current open files count
610 ///
611 /// [`files_opened`]: RingIoStats::files_opened
612 /// [`files_closed`]: RingIoStats::files_closed
613 pub fn files_closed(&self) -> u64 {
614 self.files_closed
615 }
616
617 /// File read IO stats
618 ///
619 /// Returns the number of individual read ops as well as bytes read
620 pub fn file_reads(&self) -> (u64, u64) {
621 (self.file_reads, self.file_bytes_read)
622 }
623
624 /// File read IO stats (deduplicated)
625 ///
626 /// Returns the number of reads that fed from another preexisting buffer
627 pub fn file_deduped_reads(&self) -> (u64, u64) {
628 (self.file_deduped_reads, self.file_deduped_bytes_read)
629 }
630
631 /// Buffered file read IO stats
632 ///
633 /// Returns the number of individual buffered read ops as well as bytes read
634 pub fn file_buffered_reads(&self) -> (u64, u64) {
635 (self.file_buffered_reads, self.file_buffered_bytes_read)
636 }
637
638 /// File write IO stats
639 ///
640 /// Returns the number of individual write ops as well as bytes written
641 pub fn file_writes(&self) -> (u64, u64) {
642 (self.file_writes, self.file_bytes_written)
643 }
644
645 /// Buffered file write IO stats
646 ///
647 /// Returns the number of individual buffered write ops as well as bytes
648 /// written
649 pub fn file_buffered_writes(&self) -> (u64, u64) {
650 (self.file_buffered_writes, self.file_buffered_bytes_written)
651 }
652
653 /// The pre-reactor IO scheduler latency
654 ///
655 /// Returns a distribution of measures tracking the time between the moment
656 /// an IO operation was queued up and the moment it was submitted to the
657 /// kernel
658 pub fn pre_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
659 &self.pre_reactor_io_scheduler_latency_us
660 }
661
662 /// The IO latency
663 ///
664 /// Returns a distribution of measures tracking the time sources spent in
665 /// the ring
666 pub fn io_latency_us(&self) -> &DDSketch {
667 &self.io_latency_us
668 }
669
670 /// The post-reactor IO scheduler latency
671 ///
672 /// Returns a distribution of measures tracking the time between the moment
673 /// an IO operation was marked as fulfilled by the reactor and when the
674 /// result was consumed by the application code.
675 pub fn post_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
676 &self.post_reactor_io_scheduler_latency_us
677 }
678}
679
680impl<'a> Sum<&'a RingIoStats> for RingIoStats {
681 fn sum<I: Iterator<Item = &'a RingIoStats>>(iter: I) -> Self {
682 iter.fold(RingIoStats::default(), |mut a, b| {
683 a.files_opened += b.files_opened;
684 a.files_closed += b.files_closed;
685 a.file_reads += b.file_reads;
686 a.file_bytes_read += b.file_bytes_read;
687 a.file_buffered_reads += b.file_buffered_reads;
688 a.file_buffered_bytes_read += b.file_buffered_bytes_read;
689 a.file_deduped_reads += b.file_deduped_reads;
690 a.file_deduped_bytes_read += b.file_deduped_bytes_read;
691 a.file_writes += b.file_writes;
692 a.file_bytes_written += b.file_bytes_written;
693 a.file_buffered_writes += b.file_buffered_writes;
694 a.file_buffered_bytes_written += b.file_buffered_bytes_written;
695 a.pre_reactor_io_scheduler_latency_us
696 .merge(&b.pre_reactor_io_scheduler_latency_us)
697 .unwrap();
698 a.io_latency_us.merge(&b.io_latency_us).unwrap();
699 a.post_reactor_io_scheduler_latency_us
700 .merge(&b.post_reactor_io_scheduler_latency_us)
701 .unwrap();
702 a
703 })
704 }
705}
706
707/// Stores information about IO
708#[derive(Debug)]
709pub struct IoStats {
710 /// The IO stats of the main ring
711 pub main_ring: RingIoStats,
712 /// The IO stats of the latency ring
713 pub latency_ring: RingIoStats,
714 /// The IO stats of the poll ring
715 pub poll_ring: RingIoStats,
716}
717
718impl IoStats {
719 fn new(main_ring: RingIoStats, latency_ring: RingIoStats, poll_ring: RingIoStats) -> IoStats {
720 IoStats {
721 main_ring,
722 latency_ring,
723 poll_ring,
724 }
725 }
726
727 /// Combine stats from all rings
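    /**

The combination relies on folding references through an `impl Sum`. A minimal
standalone sketch of that pattern follows; `Counters` and `combine` are
hypothetical stand-ins with two fields, used here in place of `RingIoStats`
so the example needs nothing beyond the standard library.

```rust
use std::iter::Sum;

/// Hypothetical, reduced stand-in for a per-ring stats struct.
#[derive(Debug, Default, Clone, PartialEq)]
struct Counters {
    reads: u64,
    bytes_read: u64,
}

impl<'a> Sum<&'a Counters> for Counters {
    fn sum<I: Iterator<Item = &'a Counters>>(iter: I) -> Self {
        // Start from an all-zero value and add each ring field by field.
        iter.fold(Counters::default(), |mut acc, c| {
            acc.reads += c.reads;
            acc.bytes_read += c.bytes_read;
            acc
        })
    }
}

/// Adds up the counters of three rings without consuming them.
fn combine(main: &Counters, latency: &Counters, poll: &Counters) -> Counters {
    [main, latency, poll].iter().copied().sum()
}
```

Summing over references leaves the per-ring values intact, so callers can
still inspect each ring after asking for the total.
    */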
728 pub fn all_rings(&self) -> RingIoStats {
729 [&self.main_ring, &self.latency_ring, &self.poll_ring]
730 .iter()
731 .copied()
732 .sum()
733 }
734}
735
736#[cfg(test)]
737pub(crate) mod test_utils {
738 use super::*;
739 use nix::sys::statfs::*;
740 use std::path::{Path, PathBuf};
741 use tracing::{debug, error, info, trace, warn};
742 use tracing_subscriber::EnvFilter;
743
744 #[derive(Copy, Clone)]
745 pub(crate) enum TestDirectoryKind {
746 TempFs,
747 PollMedia,
748 NonPollMedia,
749 }
750
751 pub(crate) struct TestDirectory {
752 pub(crate) path: PathBuf,
753 pub(crate) kind: TestDirectoryKind,
754 }
755
756 impl Drop for TestDirectory {
757 fn drop(&mut self) {
758 let _ = std::fs::remove_dir_all(&self.path);
759 }
760 }
761
762 pub(crate) fn make_test_directories(test_name: &str) -> std::vec::Vec<TestDirectory> {
763 let mut vec = Vec::new();
764
765 // Glommio currently only supports NVMe-backed volumes formatted with XFS or
766 // EXT4. We therefore let the user decide what directory glommio should
767 // use to host the unit tests in. For more information regarding this
768 // limitation, see the README
769 match std::env::var("GLOMMIO_TEST_POLLIO_ROOTDIR") {
770 Err(_) => {
771 eprintln!(
772 "Glommio currently only supports NVMe-backed volumes formatted with XFS or \
773 EXT4. To run poll io-related tests, please set GLOMMIO_TEST_POLLIO_ROOTDIR \
774 to a NVMe-backed directory path in your environment.\nPoll io tests will not \
775 run."
776 );
777 }
778 Ok(path) => {
779 for p in path.split(',') {
780 vec.push(make_poll_test_directory(p, test_name));
781 }
782 }
783 };
784
785 vec.push(make_tmp_test_directory(test_name));
786 vec
787 }
788
789 pub(crate) fn make_poll_test_directory<P: AsRef<Path>>(
790 path: P,
791 test_name: &str,
792 ) -> TestDirectory {
793 let mut dir = path.as_ref().to_owned();
794 std::assert!(dir.exists());
795
796 dir.push(test_name);
797 let _ = std::fs::remove_dir_all(&dir);
798 std::fs::create_dir_all(&dir).unwrap();
799
800 TestDirectory {
801 path: dir,
802 kind: TestDirectoryKind::PollMedia,
803 }
804 }
805
806 pub(crate) fn make_tmp_test_directory(test_name: &str) -> TestDirectory {
807 let mut dir = std::env::temp_dir();
808 dir.push(test_name);
809 let _ = std::fs::remove_dir_all(&dir);
810 std::fs::create_dir_all(&dir).unwrap();
811 let buf = statfs(&dir).unwrap();
812 let fstype = buf.filesystem_type();
813
814 let kind = if (fstype.0 as u64) == (libc::TMPFS_MAGIC as u64) {
815 TestDirectoryKind::TempFs
816 } else {
817 TestDirectoryKind::NonPollMedia
818 };
819 TestDirectory { path: dir, kind }
820 }
821
822 #[test]
823 #[allow(unused_must_use)]
824 fn test_tracing_init() {
825 tracing_subscriber::fmt::fmt()
826 .with_env_filter(EnvFilter::from_env("GLOMMIO_TRACE"))
827 .try_init();
828
829 info!("Started tracing..");
830 debug!("Started tracing..");
831 warn!("Started tracing..");
832 trace!("Started tracing..");
833 error!("Started tracing..");
834 }
835}