#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(doc, deny(rustdoc::broken_intra_doc_links))]
#![cfg_attr(feature = "native-tls", feature(thread_local))]
#[macro_use]
extern crate nix;
extern crate alloc;
#[macro_use]
extern crate lazy_static;
#[macro_use(defer)]
extern crate scopeguard;
/// Calls `wake()` on the given waker expression and contains any panic it raises.
///
/// The call is wrapped in `std::panic::catch_unwind`. When the waker unwinds,
/// the panic payload is reported through `log::error!` and then discarded
/// instead of unwinding further into the caller.
macro_rules! wake {
    ($waker:expr $(,)?) => {
        use log::error;
        match std::panic::catch_unwind(|| $waker.wake()) {
            Ok(_) => {}
            Err(payload) => {
                error!("Panic while calling waker! {:?}", payload);
            }
        }
    };
}
mod free_list;
#[allow(clippy::redundant_slicing)]
#[allow(dead_code)]
#[allow(clippy::upper_case_acronyms)]
mod iou;
mod parking;
mod reactor;
mod sys;
pub mod task;
#[allow(dead_code)]
#[allow(clippy::upper_case_acronyms)]
mod uring_sys;
#[cfg(feature = "bench")]
#[doc(hidden)]
pub mod nop;
/// Unwraps a `Result` inside a function that returns `Poll<Result<_, _>>`.
///
/// Evaluates to the `Ok` payload. On `Err`, the enclosing function returns
/// `Poll::Ready(Err(..))` carrying the error unchanged.
macro_rules! poll_err {
    ($e:expr $(,)?) => {
        match $e {
            Err(error) => return std::task::Poll::Ready(Err(error)),
            Ok(value) => value,
        }
    };
}
/// Returns early from a `Poll`-returning function when an `Option` holds a value.
///
/// If the expression is `Some(t)`, the enclosing function returns
/// `Poll::Ready(t)`; on `None` nothing happens and execution continues.
#[allow(unused)]
macro_rules! poll_some {
    ($e:expr $(,)?) => {
        if let Some(ready) = $e {
            return std::task::Poll::Ready(ready);
        }
    };
}
/// Converts a `nix::errno::Errno` value into an `io::Error`.
///
/// Only an `io::ErrorKind` is carried over; any errno without an explicit
/// mapping below becomes `io::ErrorKind::Other`.
///
/// The expansion refers to `io::Error`, `io::ErrorKind` and
/// `nix::errno::Errno` by relative path, so both `io` (for example through
/// `use std::io;`) and `nix` must be nameable at the call site.
#[macro_export]
macro_rules! to_io_error {
    ($e:expr) => {
        io::Error::from(match $e {
            nix::errno::Errno::EACCES | nix::errno::Errno::EPERM => {
                io::ErrorKind::PermissionDenied
            }
            nix::errno::Errno::EADDRINUSE => io::ErrorKind::AddrInUse,
            nix::errno::Errno::EADDRNOTAVAIL => io::ErrorKind::AddrNotAvailable,
            nix::errno::Errno::EAGAIN => io::ErrorKind::WouldBlock,
            nix::errno::Errno::ECONNABORTED => io::ErrorKind::ConnectionAborted,
            nix::errno::Errno::ECONNREFUSED => io::ErrorKind::ConnectionRefused,
            nix::errno::Errno::ECONNRESET => io::ErrorKind::ConnectionReset,
            nix::errno::Errno::EINTR => io::ErrorKind::Interrupted,
            nix::errno::Errno::EINVAL | nix::errno::Errno::ENAMETOOLONG => {
                io::ErrorKind::InvalidInput
            }
            nix::errno::Errno::ENOENT => io::ErrorKind::NotFound,
            nix::errno::Errno::ENOTCONN => io::ErrorKind::NotConnected,
            nix::errno::Errno::ENOTEMPTY => io::ErrorKind::AlreadyExists,
            nix::errno::Errno::ETIMEDOUT => io::ErrorKind::TimedOut,
            _ => io::ErrorKind::Other,
        })
    };
}
/// Test helper: runs one or more futures to completion on a fresh executor.
///
/// Builds a `LocalExecutor` with `Placement::Unbound` and I/O latency
/// recording switched on, spawns each future with `crate::spawn_local`, and
/// awaits all of the resulting tasks with `join_all`. The tasks are collected
/// into a single `Vec`, so every future has to produce the same output type.
#[cfg(test)]
macro_rules! test_executor {
    ($( $fut:expr ),+ ) => {
        use futures::future::join_all;
        let ex = crate::executor::LocalExecutorBuilder::new(crate::executor::Placement::Unbound)
            .record_io_latencies(true)
            .make()
            .unwrap();
        ex.run(async move {
            let tasks = vec![$( crate::spawn_local($fut) ),+];
            join_all(tasks).await;
        });
    }
}
/// Test helper: waits inside an async context until a shared cell holds a value.
///
/// `$var` must offer `.borrow()` (for example an `Rc<RefCell<T>>`). The cell
/// is compared with `$val` and, while they differ, the current task queue is
/// yielded through `crate::executor().yield_task_queue_now()`.
///
/// The three-argument form also panics with `"test timed out"` once more than
/// `$instantval` whole seconds have elapsed; it needs `Instant` in scope at the
/// call site.
#[cfg(test)]
macro_rules! wait_on_cond {
    ($var:expr, $val:expr) => {
        // The borrow taken for the comparison ends with the loop condition, so
        // it is not held across the `.await` below.
        while *($var.borrow()) != $val {
            crate::executor().yield_task_queue_now().await;
        }
    };
    ($var:expr, $val:expr, $instantval:expr) => {
        let began = Instant::now();
        while *($var.borrow()) != $val {
            if began.elapsed().as_secs() > $instantval {
                panic!("test timed out");
            }
            crate::executor().yield_task_queue_now().await;
        }
    };
}
/// Test helper: overwrites the contents of a shared cell.
///
/// `$cond` must offer `.borrow_mut()` (for example an `Rc<RefCell<T>>`); the
/// value behind the mutable borrow is replaced with `$val`.
#[cfg(test)]
macro_rules! update_cond {
    ($cond:expr, $val:expr) => {
        // Deliberately a single assignment statement: the mutable borrow is a
        // temporary that is released as soon as the statement ends.
        *$cond.borrow_mut() = $val;
    };
}
/// Test helper: wraps a value in an `Rc` and binds clones of it to new names.
///
/// Each identifier after the first argument becomes a local binding holding a
/// clone of the same `Rc`. `Rc` must be in scope at the call site.
#[cfg(test)]
macro_rules! make_shared_var {
    ($var:expr, $( $name:ident ),+ ) => {
        let shared = Rc::new($var);
        $( let $name = Rc::clone(&shared); )+
    }
}
/// Test helper: wraps a value in `Rc<RefCell<_>>` and binds clones to new names.
///
/// Each identifier after the first argument becomes a local binding holding a
/// clone of the same `Rc`, so all of them see each other's mutations. `Rc` and
/// `RefCell` must be in scope at the call site.
#[cfg(test)]
macro_rules! make_shared_var_mut {
    ($var:expr, $( $name:ident ),+ ) => {
        let shared = Rc::new(RefCell::new($var));
        $( let $name = Rc::clone(&shared); )+
    }
}
mod byte_slice_ext;
pub mod channels;
pub mod controllers;
mod error;
mod executor;
pub mod io;
pub mod net;
mod shares;
pub mod sync;
pub mod timer;
use crate::reactor::Reactor;
pub use crate::{
byte_slice_ext::{ByteSliceExt, ByteSliceMutExt},
error::{
BuilderErrorKind,
ExecutorErrorKind,
GlommioError,
QueueErrorKind,
ReactorErrorKind,
ResourceType,
Result,
},
executor::{
allocate_dma_buffer,
allocate_dma_buffer_global,
executor,
spawn_local,
spawn_local_into,
spawn_scoped_local,
spawn_scoped_local_into,
stall::{DefaultStallDetectionHandler, StallDetectionHandler},
yield_if_needed,
CpuSet,
ExecutorJoinHandle,
ExecutorProxy,
ExecutorStats,
LocalExecutor,
LocalExecutorBuilder,
LocalExecutorPoolBuilder,
Placement,
PoolPlacement,
PoolThreadHandles,
ScopedTask,
Task,
TaskQueueHandle,
TaskQueueStats,
},
shares::{Shares, SharesManager},
sys::hardware_topology::CpuLocation,
};
pub use enclose::enclose;
pub use scopeguard::defer;
use sketches_ddsketch::DDSketch;
use std::{
fmt::{Debug, Formatter},
iter::Sum,
time::Duration,
};
/// Re-exports a selection of this crate's items under a single module path.
///
/// Everything listed here is also available from the crate root (or, for
/// `GlommioError`, from the `error` module); this module only gathers the
/// names in one place.
pub mod prelude {
// `no_inline` keeps rustdoc from copying the re-exported items' documentation
// into this module's page; they are rendered as plain re-exports instead.
#[doc(no_inline)]
pub use crate::{
error::GlommioError,
executor,
spawn_local,
spawn_local_into,
yield_if_needed,
ByteSliceExt,
ByteSliceMutExt,
ExecutorProxy,
IoStats,
Latency,
LocalExecutor,
LocalExecutorBuilder,
LocalExecutorPoolBuilder,
Placement,
PoolPlacement,
PoolThreadHandles,
RingIoStats,
Shares,
TaskQueueHandle,
};
}
/// Describes whether latency matters for a piece of work.
#[derive(Clone, Copy, Debug)]
pub enum Latency {
/// Latency matters; carries a `Duration`.
// NOTE(review): presumably the duration is the latency target/bound the
// scheduler should honor — confirm against the executor code that consumes it.
Matters(Duration),
/// Latency is not a concern. This is the value used by
/// `IoRequirements::default()`.
NotImportant,
}
/// Crate-internal pairing of a latency class with a handle value.
#[derive(Clone, Copy, Debug)]
pub(crate) struct IoRequirements {
    /// Latency class attached to these requirements.
    latency_req: Latency,
    /// Handle value given at construction. It is stored but not read by any
    /// code in this file, hence the leading underscore.
    _io_handle: usize,
}

impl IoRequirements {
    /// Builds requirements from a latency class and a handle value.
    fn new(latency: Latency, handle: usize) -> Self {
        IoRequirements {
            latency_req: latency,
            _io_handle: handle,
        }
    }
}

impl Default for IoRequirements {
    /// Requirements with `Latency::NotImportant` and a handle value of `0`.
    fn default() -> Self {
        Self::new(Latency::NotImportant, 0)
    }
}
/// I/O statistics for a single ring.
///
/// Holds plain `u64` counters for file operations plus three `DDSketch`
/// latency distributions. The fields are crate-private; outside the crate the
/// values are read through the accessor methods on this type.
// NOTE(review): the code that updates these fields is outside this file, so the
// per-field descriptions below are derived from the field names — confirm
// against the reactor code.
#[derive(Clone)]
pub struct RingIoStats {
/// Number of files opened.
pub(crate) files_opened: u64,
/// Number of files closed.
pub(crate) files_closed: u64,
/// Number of file reads; paired with `file_bytes_read`.
pub(crate) file_reads: u64,
/// Bytes read by the operations counted in `file_reads`.
pub(crate) file_bytes_read: u64,
/// Number of buffered file reads; paired with `file_buffered_bytes_read`.
pub(crate) file_buffered_reads: u64,
/// Bytes read by the operations counted in `file_buffered_reads`.
pub(crate) file_buffered_bytes_read: u64,
/// Number of deduplicated file reads; paired with `file_deduped_bytes_read`.
pub(crate) file_deduped_reads: u64,
/// Bytes covered by the operations counted in `file_deduped_reads`.
pub(crate) file_deduped_bytes_read: u64,
/// Number of file writes; paired with `file_bytes_written`.
pub(crate) file_writes: u64,
/// Bytes written by the operations counted in `file_writes`.
pub(crate) file_bytes_written: u64,
/// Number of buffered file writes; paired with `file_buffered_bytes_written`.
pub(crate) file_buffered_writes: u64,
/// Bytes written by the operations counted in `file_buffered_writes`.
pub(crate) file_buffered_bytes_written: u64,
/// Latency distribution for the "pre-reactor I/O scheduler" stage.
// NOTE(review): the `_us` suffix suggests microseconds — confirm at the
// recording site.
pub(crate) pre_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
/// Latency distribution for the I/O itself (`_us`: presumably microseconds).
pub(crate) io_latency_us: sketches_ddsketch::DDSketch,
/// Latency distribution for the "post-reactor I/O scheduler" stage (`_us`:
/// presumably microseconds).
pub(crate) post_reactor_io_scheduler_latency_us: sketches_ddsketch::DDSketch,
}
impl Default for RingIoStats {
    /// Returns statistics with every counter at zero and three freshly built
    /// latency sketches that all use `Config::new(0.01, 2048, 1.0e-9)`.
    fn default() -> Self {
        // One constructor for all three sketches keeps their configurations
        // identical.
        let new_sketch = || {
            sketches_ddsketch::DDSketch::new(sketches_ddsketch::Config::new(0.01, 2048, 1.0e-9))
        };

        Self {
            files_opened: 0,
            files_closed: 0,
            file_reads: 0,
            file_bytes_read: 0,
            file_buffered_reads: 0,
            file_buffered_bytes_read: 0,
            file_deduped_reads: 0,
            file_deduped_bytes_read: 0,
            file_writes: 0,
            file_bytes_written: 0,
            file_buffered_writes: 0,
            file_buffered_bytes_written: 0,
            pre_reactor_io_scheduler_latency_us: new_sketch(),
            io_latency_us: new_sketch(),
            post_reactor_io_scheduler_latency_us: new_sketch(),
        }
    }
}
impl Debug for RingIoStats {
    /// Formats the twelve counters as a struct. The latency sketches are left
    /// out, which the output signals with a trailing `..`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Label/value pairs in the order they are printed.
        let counters: [(&str, &u64); 12] = [
            ("files_opened", &self.files_opened),
            ("files_closed", &self.files_closed),
            ("file_reads", &self.file_reads),
            ("file_bytes_read", &self.file_bytes_read),
            ("file_buffered_reads", &self.file_buffered_reads),
            ("file_buffered_bytes_read", &self.file_buffered_bytes_read),
            ("file_deduped_reads", &self.file_deduped_reads),
            ("file_deduped_bytes_read", &self.file_deduped_bytes_read),
            ("file_writes", &self.file_writes),
            ("file_bytes_written", &self.file_bytes_written),
            ("file_buffered_writes", &self.file_buffered_writes),
            ("file_buffered_bytes_written", &self.file_buffered_bytes_written),
        ];

        let mut out = f.debug_struct("RingIoStats");
        for &(label, value) in counters.iter() {
            out.field(label, value);
        }
        out.finish_non_exhaustive()
    }
}
impl RingIoStats {
    /// Returns the number of files opened.
    pub fn files_opened(&self) -> u64 {
        self.files_opened
    }

    /// Returns the number of files closed.
    pub fn files_closed(&self) -> u64 {
        // This accessor used to return `files_opened`, so the closed counter
        // could never be observed through the public API.
        self.files_closed
    }

    /// Returns file read statistics as `(file_reads, file_bytes_read)`.
    pub fn file_reads(&self) -> (u64, u64) {
        (self.file_reads, self.file_bytes_read)
    }

    /// Returns deduplicated file read statistics as
    /// `(file_deduped_reads, file_deduped_bytes_read)`.
    pub fn file_deduped_reads(&self) -> (u64, u64) {
        (self.file_deduped_reads, self.file_deduped_bytes_read)
    }

    /// Returns buffered file read statistics as
    /// `(file_buffered_reads, file_buffered_bytes_read)`.
    pub fn file_buffered_reads(&self) -> (u64, u64) {
        (self.file_buffered_reads, self.file_buffered_bytes_read)
    }

    /// Returns file write statistics as `(file_writes, file_bytes_written)`.
    pub fn file_writes(&self) -> (u64, u64) {
        (self.file_writes, self.file_bytes_written)
    }

    /// Returns buffered file write statistics as
    /// `(file_buffered_writes, file_buffered_bytes_written)`.
    pub fn file_buffered_writes(&self) -> (u64, u64) {
        (self.file_buffered_writes, self.file_buffered_bytes_written)
    }

    /// Borrows the latency sketch for the pre-reactor I/O scheduler stage.
    pub fn pre_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
        &self.pre_reactor_io_scheduler_latency_us
    }

    /// Borrows the latency sketch for the I/O itself.
    pub fn io_latency_us(&self) -> &DDSketch {
        &self.io_latency_us
    }

    /// Borrows the latency sketch for the post-reactor I/O scheduler stage.
    pub fn post_reactor_io_scheduler_latency_us(&self) -> &DDSketch {
        &self.post_reactor_io_scheduler_latency_us
    }
}
impl<'a> Sum<&'a RingIoStats> for RingIoStats {
    /// Adds up a sequence of borrowed `RingIoStats`.
    ///
    /// Starts from `RingIoStats::default()`, adds every counter field by field
    /// and merges each of the three latency sketches. An empty iterator yields
    /// the default value.
    ///
    /// # Panics
    ///
    /// Panics if any `DDSketch::merge` call returns an error.
    fn sum<I: Iterator<Item = &'a RingIoStats>>(iter: I) -> Self {
        let mut total = RingIoStats::default();
        for ring in iter {
            total.files_opened += ring.files_opened;
            total.files_closed += ring.files_closed;
            total.file_reads += ring.file_reads;
            total.file_bytes_read += ring.file_bytes_read;
            total.file_buffered_reads += ring.file_buffered_reads;
            total.file_buffered_bytes_read += ring.file_buffered_bytes_read;
            total.file_deduped_reads += ring.file_deduped_reads;
            total.file_deduped_bytes_read += ring.file_deduped_bytes_read;
            total.file_writes += ring.file_writes;
            total.file_bytes_written += ring.file_bytes_written;
            total.file_buffered_writes += ring.file_buffered_writes;
            total.file_buffered_bytes_written += ring.file_buffered_bytes_written;
            total
                .pre_reactor_io_scheduler_latency_us
                .merge(&ring.pre_reactor_io_scheduler_latency_us)
                .unwrap();
            total.io_latency_us.merge(&ring.io_latency_us).unwrap();
            total
                .post_reactor_io_scheduler_latency_us
                .merge(&ring.post_reactor_io_scheduler_latency_us)
                .unwrap();
        }
        total
    }
}
/// I/O statistics split by ring: one `RingIoStats` each for the main, latency
/// and poll rings.
///
/// `all_rings()` returns the three combined into a single `RingIoStats`.
#[derive(Debug)]
pub struct IoStats {
/// Statistics for the main ring.
pub main_ring: RingIoStats,
/// Statistics for the latency ring.
pub latency_ring: RingIoStats,
/// Statistics for the poll ring.
pub poll_ring: RingIoStats,
}
impl IoStats {
fn new(main_ring: RingIoStats, latency_ring: RingIoStats, poll_ring: RingIoStats) -> IoStats {
IoStats {
main_ring,
latency_ring,
poll_ring,
}
}
pub fn all_rings(&self) -> RingIoStats {
[&self.main_ring, &self.latency_ring, &self.poll_ring]
.iter()
.copied()
.sum()
}
}
#[cfg(test)]
pub(crate) mod test_utils {
use super::*;
use nix::sys::statfs::*;
use std::path::{Path, PathBuf};
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::EnvFilter;
/// Classifies the storage behind a `TestDirectory`.
#[derive(Copy, Clone)]
pub(crate) enum TestDirectoryKind {
/// Directory under the system temp dir whose filesystem type is
/// `TMPFS_MAGIC` (see `make_tmp_test_directory`).
TempFs,
/// Directory created under a root listed in `GLOMMIO_TEST_POLLIO_ROOTDIR`
/// (see `make_poll_test_directory`).
PollMedia,
/// Directory under the system temp dir whose filesystem is not tmpfs.
NonPollMedia,
}
/// A scratch directory for tests that is deleted again when dropped.
pub(crate) struct TestDirectory {
    /// Location of the directory.
    pub(crate) path: PathBuf,
    /// Kind of storage the directory lives on.
    pub(crate) kind: TestDirectoryKind,
}

impl Drop for TestDirectory {
    /// Removes the directory tree at `path`. Cleanup is best-effort: a failed
    /// removal (for example because the directory is already gone) is ignored.
    fn drop(&mut self) {
        drop(std::fs::remove_dir_all(&self.path));
    }
}
/// Creates the scratch directories a test should run against.
///
/// When `GLOMMIO_TEST_POLLIO_ROOTDIR` is set, it is split on `,` and a poll
/// test directory named `test_name` is created under each listed root;
/// otherwise a notice is printed to stderr. A directory under the system temp
/// dir is always appended as the last element.
pub(crate) fn make_test_directories(test_name: &str) -> std::vec::Vec<TestDirectory> {
    let mut directories = Vec::new();

    if let Ok(roots) = std::env::var("GLOMMIO_TEST_POLLIO_ROOTDIR") {
        directories.extend(
            roots
                .split(',')
                .map(|root| make_poll_test_directory(root, test_name)),
        );
    } else {
        eprintln!(
            "Glommio currently only supports NVMe-backed volumes formatted with XFS or \
             EXT4. To run poll io-related tests, please set GLOMMIO_TEST_POLLIO_ROOTDIR \
             to a NVMe-backed directory path in your environment.\nPoll io tests will not \
             run."
        );
    }

    directories.push(make_tmp_test_directory(test_name));
    directories
}
/// Creates an empty directory named `test_name` under `path` and tags it as
/// `TestDirectoryKind::PollMedia`.
///
/// Anything already present at that location is removed first.
///
/// # Panics
///
/// Panics if `path` does not exist or the directory cannot be created.
pub(crate) fn make_poll_test_directory<P: AsRef<Path>>(
    path: P,
    test_name: &str,
) -> TestDirectory {
    let dir: &Path = path.as_ref();
    std::assert!(dir.exists());

    let dir = dir.join(test_name);
    // Clear leftovers from an earlier run; failure (e.g. nothing to remove) is fine.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();

    TestDirectory {
        kind: TestDirectoryKind::PollMedia,
        path: dir,
    }
}
/// Creates an empty directory named `test_name` under the system temp dir.
///
/// Anything already present at that location is removed first. The result is
/// tagged `TestDirectoryKind::TempFs` when `statfs` reports `TMPFS_MAGIC` for
/// the new directory and `TestDirectoryKind::NonPollMedia` otherwise.
///
/// # Panics
///
/// Panics if the directory cannot be created or `statfs` fails.
pub(crate) fn make_tmp_test_directory(test_name: &str) -> TestDirectory {
    let dir = std::env::temp_dir().join(test_name);
    // Clear leftovers from an earlier run; failure (e.g. nothing to remove) is fine.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).unwrap();

    let on_tmpfs = statfs(&dir).unwrap().filesystem_type() == TMPFS_MAGIC;
    let kind = if on_tmpfs {
        TestDirectoryKind::TempFs
    } else {
        TestDirectoryKind::NonPollMedia
    };

    TestDirectory { path: dir, kind }
}
/// Installs a `tracing` subscriber filtered by the `GLOMMIO_TRACE` environment
/// variable and emits one event at each level.
#[test]
fn test_tracing_init() {
    // `try_init` returns an error value instead of panicking (for instance if
    // a subscriber was already installed); that outcome is deliberately ignored.
    let _ = tracing_subscriber::fmt::fmt()
        .with_env_filter(EnvFilter::from_env("GLOMMIO_TRACE"))
        .try_init();

    info!("Started tracing..");
    debug!("Started tracing..");
    warn!("Started tracing..");
    trace!("Started tracing..");
    error!("Started tracing..");
}
}