#![allow(dead_code)]
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Default)]
pub struct LatencyBucket {
pub sub_us: u64,
pub us_1_10: u64,
pub us_10_100: u64,
pub us_100_1000: u64,
pub ms_1_10: u64,
pub ms_10_plus: u64,
}
impl LatencyBucket {
pub fn observe(&mut self, d: Duration) {
let nanos = d.as_nanos();
if nanos < 1_000 {
self.sub_us = self.sub_us.saturating_add(1);
} else if nanos < 10_000 {
self.us_1_10 = self.us_1_10.saturating_add(1);
} else if nanos < 100_000 {
self.us_10_100 = self.us_10_100.saturating_add(1);
} else if nanos < 1_000_000 {
self.us_100_1000 = self.us_100_1000.saturating_add(1);
} else if nanos < 10_000_000 {
self.ms_1_10 = self.ms_1_10.saturating_add(1);
} else {
self.ms_10_plus = self.ms_10_plus.saturating_add(1);
}
}
#[must_use]
pub fn total(&self) -> u64 {
self.sub_us
.saturating_add(self.us_1_10)
.saturating_add(self.us_10_100)
.saturating_add(self.us_100_1000)
.saturating_add(self.ms_1_10)
.saturating_add(self.ms_10_plus)
}
}
#[derive(Debug, Clone, Default)]
pub struct IoMetrics {
pub read_bytes: u64,
pub read_ops: u64,
pub read_errors: u64,
pub read_latency: LatencyBucket,
pub write_bytes: u64,
pub write_ops: u64,
pub write_errors: u64,
pub write_latency: LatencyBucket,
pub seek_ops: u64,
pub seek_distance: u64,
pub seek_errors: u64,
pub first_op_ns: Option<u64>,
}
impl IoMetrics {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn record_read(&mut self, bytes: u64, success: bool) {
self.touch();
self.read_ops = self.read_ops.saturating_add(1);
if success {
self.read_bytes = self.read_bytes.saturating_add(bytes);
} else {
self.read_errors = self.read_errors.saturating_add(1);
}
}
pub fn record_read_timed(&mut self, bytes: u64, success: bool, latency: Duration) {
self.record_read(bytes, success);
self.read_latency.observe(latency);
}
pub fn record_write(&mut self, bytes: u64, success: bool) {
self.touch();
self.write_ops = self.write_ops.saturating_add(1);
if success {
self.write_bytes = self.write_bytes.saturating_add(bytes);
} else {
self.write_errors = self.write_errors.saturating_add(1);
}
}
pub fn record_write_timed(&mut self, bytes: u64, success: bool, latency: Duration) {
self.record_write(bytes, success);
self.write_latency.observe(latency);
}
pub fn record_seek(&mut self, distance: u64, success: bool) {
self.touch();
self.seek_ops = self.seek_ops.saturating_add(1);
if success {
self.seek_distance = self.seek_distance.saturating_add(distance);
} else {
self.seek_errors = self.seek_errors.saturating_add(1);
}
}
#[must_use]
pub fn total_errors(&self) -> u64 {
self.read_errors
.saturating_add(self.write_errors)
.saturating_add(self.seek_errors)
}
#[must_use]
pub fn total_bytes(&self) -> u64 {
self.read_bytes.saturating_add(self.write_bytes)
}
pub fn merge(&mut self, other: &IoMetrics) {
self.read_bytes = self.read_bytes.saturating_add(other.read_bytes);
self.read_ops = self.read_ops.saturating_add(other.read_ops);
self.read_errors = self.read_errors.saturating_add(other.read_errors);
self.write_bytes = self.write_bytes.saturating_add(other.write_bytes);
self.write_ops = self.write_ops.saturating_add(other.write_ops);
self.write_errors = self.write_errors.saturating_add(other.write_errors);
self.seek_ops = self.seek_ops.saturating_add(other.seek_ops);
self.seek_distance = self.seek_distance.saturating_add(other.seek_distance);
self.seek_errors = self.seek_errors.saturating_add(other.seek_errors);
if self.first_op_ns.is_none() {
self.first_op_ns = other.first_op_ns;
}
}
fn touch(&mut self) {
if self.first_op_ns.is_none() {
let epoch_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
self.first_op_ns = Some(epoch_ns);
}
}
}
pub struct IoTimer {
start: Instant,
}
impl IoTimer {
#[must_use]
pub fn start() -> Self {
Self { start: Instant::now() }
}
#[must_use]
pub fn elapsed(&self) -> Duration {
self.start.elapsed()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_record_read() {
let mut m = IoMetrics::new();
m.record_read(1024, true);
assert_eq!(m.read_bytes, 1024);
assert_eq!(m.read_ops, 1);
assert_eq!(m.read_errors, 0);
}
#[test]
fn test_record_read_failure() {
let mut m = IoMetrics::new();
m.record_read(0, false);
assert_eq!(m.read_bytes, 0);
assert_eq!(m.read_errors, 1);
}
#[test]
fn test_total_bytes() {
let mut m = IoMetrics::new();
m.record_read(100, true);
m.record_write(200, true);
assert_eq!(m.total_bytes(), 300);
}
#[test]
fn test_total_errors() {
let mut m = IoMetrics::new();
m.record_read(0, false);
m.record_write(0, false);
m.record_seek(0, false);
assert_eq!(m.total_errors(), 3);
}
#[test]
fn test_merge() {
let mut a = IoMetrics::new();
let mut b = IoMetrics::new();
a.record_read(100, true);
b.record_read(200, true);
a.merge(&b);
assert_eq!(a.read_bytes, 300);
}
#[test]
fn test_latency_bucket_observe() {
let mut bucket = LatencyBucket::default();
bucket.observe(Duration::from_nanos(500));
bucket.observe(Duration::from_nanos(5_000));
assert_eq!(bucket.sub_us, 1);
assert_eq!(bucket.us_1_10, 1);
assert_eq!(bucket.total(), 2);
}
#[test]
fn test_io_timer() {
let timer = IoTimer::start();
std::thread::sleep(Duration::from_millis(1));
let elapsed = timer.elapsed();
assert!(elapsed >= Duration::from_millis(1));
}
}