use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
/// Category of a request. `StreamMetrics` tallies each variant separately
/// into `num_frontier` / `num_gap_fill`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestKind {
/// Presumably a request at the advancing edge of the stream — TODO confirm with the producer.
Frontier,
/// Presumably a request that back-fills a missing range — TODO confirm with the producer.
GapFill,
}
/// Statistics describing one completed request, handed to
/// `StreamObserver::on_request`.
///
/// Only some fields are consumed by `StreamMetrics` (noted per field); the
/// meaning of the remaining block-range fields is inferred from their names
/// and should be confirmed against the code that fills them in.
#[derive(Debug, Clone)]
pub struct RequestStats {
/// Presumably the first block of the requested range — confirm with producer.
pub from_block: u64,
/// Presumably the end of the requested range — confirm inclusive/exclusive with producer.
pub requested_end: u64,
/// Presumably the block to resume from after this response — confirm with producer.
pub next_block: u64,
/// Presumably the number of blocks asked for — confirm with producer.
pub requested_blocks: u64,
/// Block count of this request; summed into `total_blocks` and tracked as min/max by `StreamMetrics`.
pub actual_blocks: u64,
/// Presumably the block count the requester predicted — confirm with producer.
pub projected_blocks: u64,
/// Response size in bytes; summed into `total_bytes` by `StreamMetrics`.
pub response_bytes: u64,
/// Presumably the desired response size in bytes — confirm with producer.
pub target_bytes: u64,
/// Response-size ratio, bucketed by `size_bucket` and averaged by `StreamMetrics`.
/// The unit-test helper computes it as `response_bytes / target_bytes`.
pub size_ratio: f64,
/// Not read by `StreamMetrics`; the unit-test helper computes it as `response_bytes / actual_blocks`.
pub bytes_per_block: f64,
/// When set, the request is counted in `num_truncated`.
pub truncated: bool,
/// Request category; tallied per variant by `StreamMetrics`.
pub kind: RequestKind,
/// Not read by `StreamMetrics`; presumably the request's latency — confirm with producer.
pub duration: Duration,
}
/// Number of buckets in the response-size-ratio histogram.
pub const NUM_SIZE_BUCKETS: usize = 6;

/// Display label for each size-ratio histogram bucket, in bucket-index order.
///
/// Each range includes its lower bound and excludes its upper bound.
pub const SIZE_BUCKET_LABELS: [&str; NUM_SIZE_BUCKETS] = [
    "<0.25",
    "0.25-0.5",
    "0.5-0.75",
    "0.75-1.0",
    "1.0-1.25",
    ">1.25",
];
/// Maps a response-size ratio to its histogram bucket index (`0..=5`).
///
/// Bucket `i` covers `[lower_i, upper_i)` with the boundaries used by the
/// size-ratio labels (0.25, 0.5, 0.75, 1.0, 1.25). Anything below 0.25,
/// including negative values, lands in bucket 0. Anything not below 1.25
/// lands in the last bucket. That includes `+inf`, and also `NaN`, because
/// `NaN` fails every `<` comparison.
fn size_bucket(ratio: f64) -> usize {
    const UPPER_BOUNDS: [f64; 5] = [0.25, 0.5, 0.75, 1.0, 1.25];
    UPPER_BOUNDS
        .iter()
        .position(|&upper| ratio < upper)
        .unwrap_or(UPPER_BOUNDS.len())
}
/// Receives telemetry about a block stream.
///
/// The `Send + Sync` supertraits allow a single observer to be shared between
/// threads. Only `on_request` must be implemented; the other hooks default to
/// doing nothing.
pub trait StreamObserver: Send + Sync {
/// Receives the statistics for one request.
fn on_request(&self, stats: &RequestStats);
/// Receives a progress sample. Presumably the number of requests in flight and
/// the bytes currently buffered — confirm with the caller. Ignored by default.
fn on_progress(&self, _in_flight: u64, _buffered_bytes: u64) {}
/// Receives a stream summary; presumably invoked once when the stream ends —
/// confirm with the caller. Ignored by default.
fn on_finish(&self, _summary: &StreamSummary) {}
}
/// Aggregate statistics for a stream, as produced by `StreamMetrics::summary`.
///
/// Every ratio and mean below is reported as `0.0` when its denominator is zero.
#[derive(Debug, Clone)]
pub struct StreamSummary {
/// Number of requests observed.
pub num_requests: u64,
/// Requests whose `truncated` flag was set.
pub num_truncated: u64,
/// `num_truncated / num_requests`.
pub truncation_rate: f64,
/// Sum of every request's `response_bytes`.
pub total_bytes: u64,
/// Sum of every request's `actual_blocks`.
pub total_blocks: u64,
/// Largest non-zero duration passed to `StreamMetrics::record_elapsed`; if none was
/// recorded, the time elapsed since the collector was created.
pub wall_clock: Duration,
/// `total_blocks` divided by `wall_clock` in seconds.
pub blocks_per_sec: f64,
/// `total_bytes` divided by `wall_clock` in seconds.
pub bytes_per_sec: f64,
/// Mean `size_ratio` over all requests, accumulated at 1e-6 precision.
pub mean_size_ratio: f64,
/// Request count per size-ratio bucket, in `SIZE_BUCKET_LABELS` order.
pub size_histogram: [u64; NUM_SIZE_BUCKETS],
/// `total_bytes / total_blocks`.
pub mean_bytes_per_block: f64,
/// Smallest `actual_blocks` seen; 0 when no request was recorded.
pub min_blocks: u64,
/// `total_blocks / num_requests`.
pub mean_blocks: f64,
/// Largest `actual_blocks` seen; 0 when no request was recorded.
pub max_blocks: u64,
/// Largest buffered-bytes sample recorded.
pub max_buffered_bytes_observed: u64,
/// Arithmetic mean of all in-flight samples recorded.
pub mean_in_flight: f64,
/// Requests of kind `RequestKind::Frontier`.
pub num_frontier: u64,
/// Requests of kind `RequestKind::GapFill`.
pub num_gap_fill: u64,
}
/// Collector of stream statistics built entirely from atomic counters, so it can
/// be updated through a shared `&self` reference.
///
/// Updates and reads use `Ordering::Relaxed`; `summary` converts the counters
/// into a `StreamSummary`.
#[derive(Debug)]
pub struct StreamMetrics {
/// Requests passed to `on_request`.
num_requests: AtomicU64,
/// Requests whose `truncated` flag was set.
num_truncated: AtomicU64,
/// Sum of `response_bytes`.
total_bytes: AtomicU64,
/// Sum of `actual_blocks`.
total_blocks: AtomicU64,
/// Sum of size ratios as fixed-point millionths (`ratio * 1e6`, rounded),
/// because there is no atomic `f64`.
size_ratio_micros: AtomicU64,
/// Request count per bucket, indexed by `size_bucket`.
size_histogram: [AtomicU64; NUM_SIZE_BUCKETS],
/// Smallest `actual_blocks` seen; initialised to `u64::MAX` as a "no data" sentinel.
min_blocks: AtomicU64,
/// Largest `actual_blocks` seen.
max_blocks: AtomicU64,
/// Largest buffered-bytes sample.
max_buffered_bytes_observed: AtomicU64,
/// Sum of in-flight samples (numerator of the mean).
in_flight_sum: AtomicU64,
/// Number of in-flight samples (denominator of the mean).
in_flight_samples: AtomicU64,
/// Requests of kind `Frontier`.
num_frontier: AtomicU64,
/// Requests of kind `GapFill`.
num_gap_fill: AtomicU64,
/// Construction time; fallback wall clock when no elapsed time was recorded.
created: Instant,
/// Largest elapsed time recorded, in nanoseconds; 0 means none recorded.
elapsed_nanos: AtomicU64,
}
impl Default for StreamMetrics {
    /// Builds an empty collector.
    ///
    /// Every counter starts at zero except `min_blocks`, which starts at
    /// `u64::MAX`. That value is replaced by the first `fetch_min`, and
    /// `summary` reports it as 0 if nothing ever replaces it. `created` records
    /// the construction instant, which `summary` uses as the fallback wall clock.
    fn default() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            num_requests: zero(),
            num_truncated: zero(),
            total_bytes: zero(),
            total_blocks: zero(),
            size_ratio_micros: zero(),
            size_histogram: Default::default(),
            min_blocks: AtomicU64::new(u64::MAX),
            max_blocks: zero(),
            max_buffered_bytes_observed: zero(),
            in_flight_sum: zero(),
            in_flight_samples: zero(),
            num_frontier: zero(),
            num_gap_fill: zero(),
            created: Instant::now(),
            elapsed_nanos: zero(),
        }
    }
}
impl StreamMetrics {
    /// Creates an empty collector; equivalent to `StreamMetrics::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records a buffered-bytes sample, keeping only the largest value seen.
    pub fn record_buffered_bytes(&self, bytes: u64) {
        self.max_buffered_bytes_observed
            .fetch_max(bytes, Ordering::Relaxed);
    }

    /// Records one in-flight sample; `summary` reports the mean of all samples.
    pub fn record_in_flight(&self, count: u64) {
        self.in_flight_sum.fetch_add(count, Ordering::Relaxed);
        self.in_flight_samples.fetch_add(1, Ordering::Relaxed);
    }

    /// Records an elapsed wall-clock duration used for throughput figures.
    ///
    /// Only the largest duration recorded is kept. Durations whose nanosecond
    /// count does not fit in a `u64` (roughly 584 years) saturate at
    /// `u64::MAX` rather than being truncated.
    pub fn record_elapsed(&self, elapsed: Duration) {
        // `as_nanos` returns u128. An `as u64` cast would silently drop the high
        // bits and could yield a much smaller value.
        let nanos = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX);
        self.elapsed_nanos.fetch_max(nanos, Ordering::Relaxed);
    }

    /// Builds a `StreamSummary` snapshot from the current counters.
    ///
    /// Each counter is loaded independently with `Ordering::Relaxed`. While
    /// other threads are still recording, the values may therefore not be
    /// mutually consistent. Every ratio or mean whose denominator is zero is
    /// reported as `0.0`.
    ///
    /// The wall clock is the largest duration passed to `record_elapsed`. If
    /// none (or only zero) was recorded, it is the time since construction.
    pub fn summary(&self) -> StreamSummary {
        let num_requests = self.num_requests.load(Ordering::Relaxed);
        let num_truncated = self.num_truncated.load(Ordering::Relaxed);
        let total_bytes = self.total_bytes.load(Ordering::Relaxed);
        let total_blocks = self.total_blocks.load(Ordering::Relaxed);
        let in_flight_samples = self.in_flight_samples.load(Ordering::Relaxed);
        let in_flight_sum = self.in_flight_sum.load(Ordering::Relaxed);

        let mut size_histogram = [0u64; NUM_SIZE_BUCKETS];
        for (dst, src) in size_histogram.iter_mut().zip(self.size_histogram.iter()) {
            *dst = src.load(Ordering::Relaxed);
        }

        let wall_clock = match self.elapsed_nanos.load(Ordering::Relaxed) {
            0 => self.created.elapsed(),
            fed => Duration::from_nanos(fed),
        };
        let secs = wall_clock.as_secs_f64();

        // `min_blocks` still holding its u64::MAX sentinel means no request was seen.
        let min_blocks = match self.min_blocks.load(Ordering::Relaxed) {
            u64::MAX => 0,
            m => m,
        };

        // Division that reports 0.0 instead of NaN/inf for an empty denominator.
        let div = |num: f64, den: f64| if den > 0.0 { num / den } else { 0.0 };

        StreamSummary {
            num_requests,
            num_truncated,
            truncation_rate: div(num_truncated as f64, num_requests as f64),
            total_bytes,
            total_blocks,
            wall_clock,
            blocks_per_sec: div(total_blocks as f64, secs),
            bytes_per_sec: div(total_bytes as f64, secs),
            // Ratios are accumulated as fixed-point millionths; convert back first.
            mean_size_ratio: div(
                self.size_ratio_micros.load(Ordering::Relaxed) as f64 / 1_000_000.0,
                num_requests as f64,
            ),
            size_histogram,
            mean_bytes_per_block: div(total_bytes as f64, total_blocks as f64),
            min_blocks,
            mean_blocks: div(total_blocks as f64, num_requests as f64),
            max_blocks: self.max_blocks.load(Ordering::Relaxed),
            max_buffered_bytes_observed: self.max_buffered_bytes_observed.load(Ordering::Relaxed),
            mean_in_flight: div(in_flight_sum as f64, in_flight_samples as f64),
            num_frontier: self.num_frontier.load(Ordering::Relaxed),
            num_gap_fill: self.num_gap_fill.load(Ordering::Relaxed),
        }
    }
}
impl StreamObserver for StreamMetrics {
    /// Folds one request's statistics into the running aggregates.
    ///
    /// The size ratio is accumulated as fixed-point millionths. Ratios too
    /// large to represent, such as `+inf`, saturate the running sum at
    /// `u64::MAX` instead of wrapping it around.
    fn on_request(&self, stats: &RequestStats) {
        self.num_requests.fetch_add(1, Ordering::Relaxed);
        if stats.truncated {
            self.num_truncated.fetch_add(1, Ordering::Relaxed);
        }
        self.total_bytes
            .fetch_add(stats.response_bytes, Ordering::Relaxed);
        self.total_blocks
            .fetch_add(stats.actual_blocks, Ordering::Relaxed);

        // Float-to-int `as` casts saturate. NaN and negative ratios become 0.
        // `+inf`, or any ratio above ~1.8e13, becomes u64::MAX.
        let ratio_micros = (stats.size_ratio * 1_000_000.0).round() as u64;
        // `fetch_add` wraps on overflow, so adding u64::MAX would silently
        // *decrease* the sum by one. Saturate instead, so a pathological ratio
        // shows up as an obviously huge mean rather than a corrupted one.
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .size_ratio_micros
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |sum| {
                Some(sum.saturating_add(ratio_micros))
            });

        self.size_histogram[size_bucket(stats.size_ratio)].fetch_add(1, Ordering::Relaxed);
        self.min_blocks
            .fetch_min(stats.actual_blocks, Ordering::Relaxed);
        self.max_blocks
            .fetch_max(stats.actual_blocks, Ordering::Relaxed);

        let kind_counter = match stats.kind {
            RequestKind::Frontier => &self.num_frontier,
            RequestKind::GapFill => &self.num_gap_fill,
        };
        kind_counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Forwards a progress sample to `record_in_flight` and `record_buffered_bytes`.
    fn on_progress(&self, in_flight: u64, buffered_bytes: u64) {
        self.record_in_flight(in_flight);
        self.record_buffered_bytes(buffered_bytes);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Frontier` request of `actual_blocks` blocks whose
    /// `size_ratio` is `response_bytes / target`.
    fn stats(
        actual_blocks: u64,
        response_bytes: u64,
        target: u64,
        truncated: bool,
    ) -> RequestStats {
        let size_ratio = response_bytes as f64 / target as f64;
        RequestStats {
            from_block: 0,
            requested_end: actual_blocks,
            next_block: actual_blocks,
            requested_blocks: actual_blocks,
            actual_blocks,
            projected_blocks: actual_blocks,
            response_bytes,
            target_bytes: target,
            size_ratio,
            bytes_per_block: response_bytes as f64 / actual_blocks as f64,
            truncated,
            kind: RequestKind::Frontier,
            duration: Duration::from_millis(10),
        }
    }

    #[test]
    fn size_bucket_boundaries() {
        assert_eq!(size_bucket(0.0), 0);
        assert_eq!(size_bucket(0.24), 0);
        assert_eq!(size_bucket(0.25), 1);
        assert_eq!(size_bucket(0.49), 1);
        assert_eq!(size_bucket(0.5), 2);
        assert_eq!(size_bucket(0.74), 2);
        assert_eq!(size_bucket(0.75), 3);
        assert_eq!(size_bucket(0.99), 3);
        assert_eq!(size_bucket(1.0), 4);
        assert_eq!(size_bucket(1.24), 4);
        assert_eq!(size_bucket(1.25), 5);
        assert_eq!(size_bucket(10.0), 5);
    }

    #[test]
    fn empty_summary_is_zeroed() {
        let m = StreamMetrics::new();
        let s = m.summary();
        assert_eq!(s.num_requests, 0);
        assert_eq!(s.truncation_rate, 0.0);
        assert_eq!(s.min_blocks, 0);
        assert_eq!(s.max_blocks, 0);
        assert_eq!(s.mean_in_flight, 0.0);
        assert_eq!(s.blocks_per_sec, 0.0);
    }

    #[test]
    fn aggregates_requests() {
        let m = StreamMetrics::new();
        // Size ratios 0.5, 1.0 and 1.5 -> buckets 2, 4 and 5; mean ratio 1.0.
        m.on_request(&stats(100, 200_000, 400_000, true));
        m.on_request(&stats(200, 400_000, 400_000, false));
        m.on_request(&stats(50, 600_000, 400_000, false));

        let s = m.summary();
        assert_eq!(s.num_requests, 3);
        assert_eq!(s.num_truncated, 1);
        assert!((s.truncation_rate - 1.0 / 3.0).abs() < 1e-9);
        assert_eq!(s.total_bytes, 1_200_000);
        assert_eq!(s.total_blocks, 350);
        assert_eq!(s.min_blocks, 50);
        assert_eq!(s.max_blocks, 200);
        assert!((s.mean_blocks - 350.0 / 3.0).abs() < 1e-9);
        assert!((s.mean_size_ratio - 1.0).abs() < 1e-9);
        assert_eq!(s.size_histogram[2], 1);
        assert_eq!(s.size_histogram[4], 1);
        assert_eq!(s.size_histogram[5], 1);
        assert!((s.mean_bytes_per_block - 1_200_000.0 / 350.0).abs() < 1e-6);
        assert_eq!(s.num_frontier, 3);
        assert_eq!(s.num_gap_fill, 0);
    }

    #[test]
    fn in_flight_and_buffer_tracking() {
        let m = StreamMetrics::new();
        m.record_in_flight(2);
        m.record_in_flight(4);
        m.record_buffered_bytes(1000);
        m.record_buffered_bytes(500);
        let s = m.summary();
        assert_eq!(s.mean_in_flight, 3.0);
        assert_eq!(s.max_buffered_bytes_observed, 1000);
    }

    #[test]
    fn on_progress_feeds_in_flight_and_buffer() {
        let m = StreamMetrics::new();
        m.on_progress(1, 100);
        m.on_progress(5, 50);
        let s = m.summary();
        assert_eq!(s.mean_in_flight, 3.0);
        assert_eq!(s.max_buffered_bytes_observed, 100);
    }

    #[test]
    fn throughput_from_elapsed() {
        let m = StreamMetrics::new();
        m.on_request(&stats(1000, 400_000, 400_000, false));
        m.record_elapsed(Duration::from_secs(2));
        let s = m.summary();
        assert_eq!(s.blocks_per_sec, 500.0);
        assert_eq!(s.bytes_per_sec, 200_000.0);
    }

    #[test]
    fn record_elapsed_keeps_maximum() {
        let m = StreamMetrics::new();
        m.record_elapsed(Duration::from_secs(3));
        m.record_elapsed(Duration::from_secs(1));
        assert_eq!(m.summary().wall_clock, Duration::from_secs(3));
    }

    #[test]
    fn gap_fill_counted() {
        let m = StreamMetrics::new();
        let mut st = stats(100, 100_000, 400_000, false);
        st.kind = RequestKind::GapFill;
        m.on_request(&st);
        let s = m.summary();
        assert_eq!(s.num_gap_fill, 1);
        assert_eq!(s.num_frontier, 0);
    }
}