use std::collections::VecDeque;
use std::time::{SystemTime, UNIX_EPOCH};
/// Fixed-boundary histogram of operation latencies, in microseconds.
///
/// Each operation class (write/read/flush/checkpoint) gets its own set of
/// counters; all four share the same `bucket_boundaries`. There is one more
/// bucket than boundary: the last bucket counts samples above the largest
/// boundary (overflow).
#[derive(Debug, Clone)]
pub struct LatencyHistogram {
/// Per-bucket sample counts for write operations.
write_buckets: Vec<u64>,
/// Per-bucket sample counts for read operations.
read_buckets: Vec<u64>,
/// Per-bucket sample counts for flush operations.
flush_buckets: Vec<u64>,
/// Per-bucket sample counts for checkpoint operations.
checkpoint_buckets: Vec<u64>,
/// Inclusive upper bounds (µs) of each bucket, ascending. The overflow
/// bucket (index == boundaries.len()) has no upper bound.
bucket_boundaries: Vec<u64>,
}
/// Sliding-window throughput tracker.
///
/// Each series stores `(unix_timestamp_secs, value)` samples in append
/// order; samples older than `time_window_seconds` (or beyond `max_samples`)
/// are dropped from the front on every recording call.
#[derive(Debug, Clone)]
pub struct ThroughputTracker {
/// One `(timestamp, 1)` sample per read/write operation recorded.
records_per_second: VecDeque<(u64, u64)>,
/// One `(timestamp, byte_count)` sample per read/write operation recorded.
bytes_per_second: VecDeque<(u64, u64)>,
/// One `(timestamp, 1)` sample per transaction recorded.
transactions_per_second: VecDeque<(u64, u64)>,
/// Width of the sliding window, in seconds.
time_window_seconds: usize,
/// Hard cap on retained samples per series (oldest evicted first).
max_samples: usize,
}
impl LatencyHistogram {
    /// Creates a histogram with the standard microsecond bucket boundaries.
    ///
    /// Boundaries are inclusive upper bounds; one extra overflow bucket
    /// captures latencies above the largest boundary (50 000 µs).
    pub fn new() -> Self {
        let bucket_boundaries = vec![1, 10, 50, 100, 500, 1000, 5000, 10000, 50000];
        // +1 for the open-ended overflow bucket.
        let bucket_count = bucket_boundaries.len() + 1;
        Self {
            write_buckets: vec![0; bucket_count],
            read_buckets: vec![0; bucket_count],
            flush_buckets: vec![0; bucket_count],
            checkpoint_buckets: vec![0; bucket_count],
            bucket_boundaries,
        }
    }

    /// Records a single write-latency sample, in microseconds.
    pub fn record_write_latency(&mut self, latency_us: u64) {
        let bucket_index = self.get_bucket_index(latency_us);
        self.write_buckets[bucket_index] += 1;
    }

    /// Records a single read-latency sample, in microseconds.
    pub fn record_read_latency(&mut self, latency_us: u64) {
        let bucket_index = self.get_bucket_index(latency_us);
        self.read_buckets[bucket_index] += 1;
    }

    /// Records a single flush-latency sample, in microseconds.
    pub fn record_flush_latency(&mut self, latency_us: u64) {
        let bucket_index = self.get_bucket_index(latency_us);
        self.flush_buckets[bucket_index] += 1;
    }

    /// Records a single checkpoint-latency sample, in microseconds.
    pub fn record_checkpoint_latency(&mut self, latency_us: u64) {
        let bucket_index = self.get_bucket_index(latency_us);
        self.checkpoint_buckets[bucket_index] += 1;
    }

    /// Index of the first bucket whose inclusive upper bound covers
    /// `latency_us`, or the overflow bucket index when it exceeds them all.
    fn get_bucket_index(&self, latency_us: u64) -> usize {
        self.bucket_boundaries
            .iter()
            .position(|&boundary| latency_us <= boundary)
            .unwrap_or(self.bucket_boundaries.len())
    }

    /// Clears all recorded samples; bucket boundaries are unchanged.
    pub fn reset(&mut self) {
        self.write_buckets.fill(0);
        self.read_buckets.fill(0);
        self.flush_buckets.fill(0);
        self.checkpoint_buckets.fill(0);
    }

    /// Estimated write-latency percentile (µs). `percentile` is 0.0–100.0.
    pub fn get_write_percentile(&self, percentile: f64) -> u64 {
        self.get_percentile(&self.write_buckets, percentile)
    }

    /// Estimated read-latency percentile (µs). `percentile` is 0.0–100.0.
    pub fn get_read_percentile(&self, percentile: f64) -> u64 {
        self.get_percentile(&self.read_buckets, percentile)
    }

    /// Estimated flush-latency percentile (µs). `percentile` is 0.0–100.0.
    pub fn get_flush_percentile(&self, percentile: f64) -> u64 {
        self.get_percentile(&self.flush_buckets, percentile)
    }

    /// Estimated checkpoint-latency percentile (µs). `percentile` is 0.0–100.0.
    pub fn get_checkpoint_percentile(&self, percentile: f64) -> u64 {
        self.get_percentile(&self.checkpoint_buckets, percentile)
    }

    /// Estimates a percentile from bucket counts with linear interpolation
    /// inside the target bucket.
    ///
    /// Returns 0 when no samples have been recorded. Values falling in the
    /// overflow bucket are approximated against an upper bound of twice the
    /// largest boundary.
    fn get_percentile(&self, buckets: &[u64], percentile: f64) -> u64 {
        let total: u64 = buckets.iter().sum();
        if total == 0 {
            return 0;
        }
        // Nearest-rank target, rounded up and clamped to at least 1 sample.
        // The previous truncating cast produced target == 0 for small
        // populations (e.g. p50 of a single sample), which matched the very
        // first bucket even when it was empty and collapsed every percentile
        // to the lowest boundary regardless of the recorded latencies.
        let target = ((total as f64 * percentile / 100.0).ceil() as u64).max(1);
        let mut cumulative = 0u64;
        let mut prev_boundary = 0u64;
        for (i, &count) in buckets.iter().enumerate() {
            let prev_cumulative = cumulative;
            cumulative += count;
            if cumulative >= target {
                let bucket_start = prev_boundary;
                let bucket_end = if i < self.bucket_boundaries.len() {
                    self.bucket_boundaries[i]
                } else {
                    // Overflow bucket has no upper bound; approximate it.
                    self.bucket_boundaries.last().copied().unwrap_or(0) * 2
                };
                // With target >= 1, the matching bucket always holds at least
                // one sample (prev_cumulative < target <= cumulative), so
                // this division is safe; the guard is kept defensively.
                if count == 0 {
                    return bucket_end;
                }
                let position_in_bucket = (target - prev_cumulative) as f64 / count as f64;
                let interpolated =
                    bucket_start as f64 + (bucket_end - bucket_start) as f64 * position_in_bucket;
                return interpolated as u64;
            }
            if i < self.bucket_boundaries.len() {
                prev_boundary = self.bucket_boundaries[i];
            }
        }
        // percentile > 100 (or float edge cases): report the overflow cap.
        self.bucket_boundaries.last().copied().unwrap_or(0) * 2
    }

    /// Convenience bundle: (write p50, p95, p99, read p50, p95, p99), in µs.
    pub fn get_comprehensive_stats(&self) -> (u64, u64, u64, u64, u64, u64) {
        (
            self.get_write_percentile(50.0),
            self.get_write_percentile(95.0),
            self.get_write_percentile(99.0),
            self.get_read_percentile(50.0),
            self.get_read_percentile(95.0),
            self.get_read_percentile(99.0),
        )
    }
}
impl ThroughputTracker {
    /// Creates a tracker with a 60-second sliding window, retaining at most
    /// 300 samples per series.
    pub fn new() -> Self {
        Self {
            records_per_second: VecDeque::new(),
            bytes_per_second: VecDeque::new(),
            transactions_per_second: VecDeque::new(),
            time_window_seconds: 60,
            max_samples: 300,
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch;
    /// falls back to 0 if the system clock reads before the epoch.
    fn now_secs() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Records one write of `bytes` bytes at the current time.
    pub fn record_write_operation(&mut self, bytes: usize) {
        let now = Self::now_secs();
        self.records_per_second.push_back((now, 1));
        self.bytes_per_second.push_back((now, bytes as u64));
        self.cleanup_old_samples();
    }

    /// Records one read of `bytes` bytes at the current time.
    pub fn record_read_operation(&mut self, bytes: usize) {
        let now = Self::now_secs();
        self.records_per_second.push_back((now, 1));
        self.bytes_per_second.push_back((now, bytes as u64));
        self.cleanup_old_samples();
    }

    /// Records one committed transaction at the current time.
    pub fn record_transaction(&mut self) {
        let now = Self::now_secs();
        self.transactions_per_second.push_back((now, 1));
        self.cleanup_old_samples();
    }

    /// Drops samples that fell out of the time window and trims each series
    /// to `max_samples` entries (oldest first).
    fn cleanup_old_samples(&mut self) {
        let cutoff = Self::now_secs().saturating_sub(self.time_window_seconds as u64);
        let max = self.max_samples;
        Self::trim(&mut self.records_per_second, cutoff, max);
        Self::trim(&mut self.bytes_per_second, cutoff, max);
        Self::trim(&mut self.transactions_per_second, cutoff, max);
    }

    /// Removes leading samples older than `cutoff`, then evicts from the
    /// front until at most `max` samples remain.
    fn trim(samples: &mut VecDeque<(u64, u64)>, cutoff: u64, max: usize) {
        while samples.front().map_or(false, |&(ts, _)| ts < cutoff) {
            samples.pop_front();
        }
        while samples.len() > max {
            samples.pop_front();
        }
    }

    /// Average (records/s, bytes/s, transactions/s) over the full window.
    ///
    /// The divisor is the whole window length, so reported rates ramp up
    /// until a full window of samples has accumulated.
    pub fn get_current_throughput(&self) -> (f64, f64, f64) {
        let window = self.time_window_seconds as f64;
        let avg = |samples: &VecDeque<(u64, u64)>| -> f64 {
            if samples.is_empty() {
                0.0
            } else {
                samples.iter().map(|&(_, v)| v).sum::<u64>() as f64 / window
            }
        };
        (
            avg(&self.records_per_second),
            avg(&self.bytes_per_second),
            avg(&self.transactions_per_second),
        )
    }

    /// Peak one-second (records/s, bytes/s, transactions/s) observed inside
    /// the current window.
    ///
    /// The previous implementation returned the window's sample *count* and
    /// *total* byte sum, which are not peak rates at all; it now groups
    /// samples by their second-granularity timestamp and reports the largest
    /// per-second total for each series.
    pub fn get_peak_throughput(&self) -> (f64, f64, f64) {
        (
            Self::peak_per_second(&self.records_per_second),
            Self::peak_per_second(&self.bytes_per_second),
            Self::peak_per_second(&self.transactions_per_second),
        )
    }

    /// Largest sum of values sharing a single timestamp. Samples are pushed
    /// in append order, so under a monotonic clock equal timestamps are
    /// contiguous and a single pass suffices.
    fn peak_per_second(samples: &VecDeque<(u64, u64)>) -> f64 {
        let mut peak = 0u64;
        let mut current_ts = None;
        let mut current_sum = 0u64;
        for &(ts, value) in samples {
            if current_ts == Some(ts) {
                current_sum += value;
            } else {
                peak = peak.max(current_sum);
                current_ts = Some(ts);
                current_sum = value;
            }
        }
        peak.max(current_sum) as f64
    }

    /// Clears all recorded samples; window and capacity settings are kept.
    pub fn reset(&mut self) {
        self.records_per_second.clear();
        self.bytes_per_second.clear();
        self.transactions_per_second.clear();
    }
}
impl Default for LatencyHistogram {
fn default() -> Self {
Self::new()
}
}
impl Default for ThroughputTracker {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_latency_histogram_new() {
        let hist = LatencyHistogram::new();
        // Nine boundaries plus one overflow bucket.
        assert_eq!(hist.write_buckets.len(), 10);
        assert_eq!(hist.read_buckets.len(), 10);
        assert_eq!(hist.flush_buckets.len(), 10);
        assert_eq!(hist.checkpoint_buckets.len(), 10);
    }

    #[test]
    fn test_latency_histogram_recording() {
        let mut hist = LatencyHistogram::new();
        for latency in [5, 15, 5000] {
            hist.record_write_latency(latency);
        }
        let recorded: u64 = hist.write_buckets.iter().sum();
        assert_eq!(recorded, 3);
        assert!(hist.get_write_percentile(50.0) > 0);
    }

    #[test]
    fn test_latency_histogram_percentiles() {
        let mut hist = LatencyHistogram::new();
        // Samples 100, 200, ..., 10000 µs.
        for step in 1..=100u64 {
            hist.record_write_latency(step * 100);
        }
        let p50 = hist.get_write_percentile(50.0);
        let p95 = hist.get_write_percentile(95.0);
        let p99 = hist.get_write_percentile(99.0);
        assert!(p50 < p95);
        assert!(p95 < p99);
    }

    #[test]
    fn test_latency_histogram_reset() {
        let mut hist = LatencyHistogram::new();
        hist.record_write_latency(1000);
        hist.record_read_latency(500);
        assert!(hist.write_buckets.iter().sum::<u64>() > 0);
        assert!(hist.read_buckets.iter().sum::<u64>() > 0);
        hist.reset();
        // All counters cleared after reset.
        assert_eq!(hist.write_buckets.iter().sum::<u64>(), 0);
        assert_eq!(hist.read_buckets.iter().sum::<u64>(), 0);
    }

    #[test]
    fn test_throughput_tracker_new() {
        let tracker = ThroughputTracker::new();
        assert_eq!(tracker.time_window_seconds, 60);
        assert_eq!(tracker.max_samples, 300);
        assert!(tracker.records_per_second.is_empty());
        assert!(tracker.bytes_per_second.is_empty());
        assert!(tracker.transactions_per_second.is_empty());
    }

    #[test]
    fn test_throughput_tracker_recording() {
        let mut tracker = ThroughputTracker::new();
        tracker.record_write_operation(100);
        tracker.record_write_operation(200);
        tracker.record_transaction();
        let (record_rate, byte_rate, tx_rate) = tracker.get_current_throughput();
        assert!(record_rate > 0.0);
        assert!(byte_rate > 0.0);
        assert!(tx_rate > 0.0);
    }

    #[test]
    fn test_throughput_tracker_peak() {
        let mut tracker = ThroughputTracker::new();
        tracker.record_write_operation(1024);
        tracker.record_transaction();
        let (peak_records, peak_bytes, peak_tx) = tracker.get_peak_throughput();
        assert!(peak_records >= 0.0);
        assert!(peak_bytes >= 1024.0);
        assert!(peak_tx >= 1.0);
    }

    #[test]
    fn test_throughput_tracker_reset() {
        let mut tracker = ThroughputTracker::new();
        tracker.record_write_operation(100);
        tracker.record_transaction();
        assert!(!tracker.records_per_second.is_empty());
        assert!(!tracker.transactions_per_second.is_empty());
        tracker.reset();
        assert!(tracker.records_per_second.is_empty());
        assert!(tracker.bytes_per_second.is_empty());
        assert!(tracker.transactions_per_second.is_empty());
    }

    #[test]
    fn test_comprehensive_latency_stats() {
        let mut hist = LatencyHistogram::new();
        for step in 1..=50u64 {
            hist.record_write_latency(step * 10);
            hist.record_read_latency(step * 5);
        }
        let (w50, w95, w99, r50, r95, r99) = hist.get_comprehensive_stats();
        // Percentiles must be non-decreasing within each series.
        assert!(w50 <= w95);
        assert!(w95 <= w99);
        assert!(r50 <= r95);
        assert!(r95 <= r99);
    }

    #[test]
    fn test_bucket_index_calculation() {
        let hist = LatencyHistogram::new();
        // Boundaries are inclusive upper bounds; above the last boundary
        // samples land in the overflow bucket (index 9).
        assert_eq!(hist.get_bucket_index(0), 0);
        assert_eq!(hist.get_bucket_index(1), 0);
        assert_eq!(hist.get_bucket_index(10), 1);
        assert_eq!(hist.get_bucket_index(1000), 5);
        assert_eq!(hist.get_bucket_index(100000), 9);
    }
}