use std::collections::{HashMap, VecDeque};
use std::time::Duration;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
/// A single latency sample together with a caller-supplied timestamp.
///
/// The latency is stored in microseconds. The timestamp is kept verbatim; this
/// type does not interpret its unit or epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LatencyMeasurement {
    /// Measured latency in microseconds.
    pub value_us: u64,
    /// Timestamp associated with the sample, exactly as provided by the caller.
    pub timestamp: u64,
}

impl LatencyMeasurement {
    /// Creates a measurement from a latency in microseconds and a timestamp.
    #[must_use]
    pub fn new(value_us: u64, timestamp: u64) -> Self {
        Self {
            value_us,
            timestamp,
        }
    }

    /// Creates a measurement from a [`Duration`], dropping sub-microsecond precision.
    ///
    /// A duration whose microsecond count does not fit in `u64` saturates at
    /// `u64::MAX` instead of silently wrapping around.
    #[must_use]
    pub fn from_duration(duration: Duration, timestamp: u64) -> Self {
        // `as_micros` yields a `u128`; clamp first so the narrowing cast is lossless.
        let value_us = duration.as_micros().min(u128::from(u64::MAX)) as u64;
        Self {
            value_us,
            timestamp,
        }
    }

    /// Returns the latency in whole milliseconds (integer division by 1000).
    #[must_use]
    pub fn as_millis(&self) -> u64 {
        self.value_us / 1000
    }

    /// Returns the latency as a [`Duration`].
    #[must_use]
    pub fn as_duration(&self) -> Duration {
        Duration::from_micros(self.value_us)
    }
}
/// Summary statistics over a set of latency samples, all in microseconds.
///
/// The `Default` value (all zeros) represents "no samples".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LatencyStats {
    /// Number of samples summarized.
    pub count: u64,
    /// Smallest sample.
    pub min_us: u64,
    /// Largest sample.
    pub max_us: u64,
    /// Arithmetic mean, rounded down.
    pub avg_us: u64,
    /// 50th percentile.
    pub p50_us: u64,
    /// 95th percentile.
    pub p95_us: u64,
    /// 99th percentile.
    pub p99_us: u64,
    /// Population standard deviation, rounded down.
    pub std_dev_us: u64,
}

impl LatencyStats {
    /// Computes statistics from a slice of measurements.
    ///
    /// Returns the all-zero default when `measurements` is empty. Only the
    /// `value_us` of each measurement is used; order does not matter.
    ///
    /// Sums are accumulated in `u128`, so large samples cannot overflow the
    /// mean or the squared deviations. The standard deviation is computed from
    /// the variance clamped to `u64::MAX`, which caps it at 4_294_967_295 µs.
    #[must_use]
    pub fn from_measurements(measurements: &[LatencyMeasurement]) -> Self {
        if measurements.is_empty() {
            return Self::default();
        }

        let mut values: Vec<u64> = measurements.iter().map(|m| m.value_us).collect();
        values.sort_unstable();

        let count = values.len() as u64;
        let wide_count = u128::from(count);
        let min_us = values[0];
        let max_us = values[values.len() - 1];

        // A u128 sum of at most u64::MAX values, each < 2^64, cannot overflow.
        let sum: u128 = values.iter().map(|&v| u128::from(v)).sum();
        // The mean never exceeds the largest sample, so it fits back into u64.
        let avg_us = (sum / wide_count) as u64;

        // Each squared deviation is < 2^128; the running total saturates
        // rather than overflowing in the (absurd) worst case.
        let squared_deviations = values.iter().fold(0u128, |acc, &v| {
            let diff = u128::from(v.abs_diff(avg_us));
            acc.saturating_add(diff * diff)
        });
        let variance = (squared_deviations / wide_count).min(u128::from(u64::MAX)) as u64;
        let std_dev_us = integer_sqrt(variance);

        Self {
            count,
            min_us,
            max_us,
            avg_us,
            p50_us: percentile(&values, 50.0),
            p95_us: percentile(&values, 95.0),
            p99_us: percentile(&values, 99.0),
            std_dev_us,
        }
    }

    /// Returns `true` when no samples were summarized.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.count == 0
    }
}
/// The kinds of latency that can be tracked.
///
/// Each variant has a stable snake_case label available through
/// [`LatencyMetric::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum LatencyMetric {
    /// Labelled `"order_to_ack"`.
    OrderToAck,
    /// Labelled `"order_to_fill"`.
    OrderToFill,
    /// Labelled `"order_to_cancel"`.
    OrderToCancel,
    /// Labelled `"market_data_delay"`.
    MarketDataDelay,
    /// Labelled `"round_trip"`.
    RoundTrip,
}

impl LatencyMetric {
    /// Returns every variant, in declaration order.
    #[must_use]
    pub fn all() -> &'static [LatencyMetric] {
        const EVERY_METRIC: [LatencyMetric; 5] = [
            LatencyMetric::OrderToAck,
            LatencyMetric::OrderToFill,
            LatencyMetric::OrderToCancel,
            LatencyMetric::MarketDataDelay,
            LatencyMetric::RoundTrip,
        ];
        &EVERY_METRIC
    }

    /// Returns the snake_case label of this metric.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::OrderToAck => "order_to_ack",
            Self::OrderToFill => "order_to_fill",
            Self::OrderToCancel => "order_to_cancel",
            Self::MarketDataDelay => "market_data_delay",
            Self::RoundTrip => "round_trip",
        }
    }
}
/// Settings for a latency tracker.
///
/// Build one with [`LatencyTrackerConfig::new`] (or `Default`) and refine it
/// with the `with_*` / `without_*` methods, each of which consumes and returns
/// the configuration.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LatencyTrackerConfig {
    /// Number of samples kept per metric (default 1000).
    pub window_size: usize,
    /// Whether histograms are kept (default `false`).
    pub keep_histogram: bool,
    /// Width of one histogram bucket in microseconds (default 100).
    pub histogram_bucket_us: u64,
    /// Number of histogram buckets (default 100).
    pub histogram_num_buckets: usize,
}

impl Default for LatencyTrackerConfig {
    /// A 1000-sample window with histograms disabled; the histogram geometry
    /// defaults to 100 buckets of 100 µs each.
    fn default() -> Self {
        LatencyTrackerConfig {
            window_size: 1000,
            keep_histogram: false,
            histogram_bucket_us: 100,
            histogram_num_buckets: 100,
        }
    }
}

impl LatencyTrackerConfig {
    /// Creates a configuration with the default settings.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets the per-metric window size.
    #[must_use]
    pub fn with_window_size(self, size: usize) -> Self {
        Self {
            window_size: size,
            ..self
        }
    }

    /// Enables histograms with the given bucket width (µs) and bucket count.
    #[must_use]
    pub fn with_histogram(self, bucket_us: u64, num_buckets: usize) -> Self {
        Self {
            keep_histogram: true,
            histogram_bucket_us: bucket_us,
            histogram_num_buckets: num_buckets,
            ..self
        }
    }

    /// Disables histograms, leaving the stored bucket geometry untouched.
    #[must_use]
    pub fn without_histogram(self) -> Self {
        Self {
            keep_histogram: false,
            ..self
        }
    }
}
/// A fixed-width bucket histogram of latency samples in microseconds.
///
/// Bucket `i` counts samples in `[i * bucket_size_us, (i + 1) * bucket_size_us)`.
/// Samples beyond the last bucket are tallied separately as overflow.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Histogram {
    /// Per-bucket sample counts.
    buckets: Vec<u64>,
    /// Width of each bucket in microseconds.
    bucket_size_us: u64,
    /// Number of samples recorded, including overflowed ones.
    total_count: u64,
    /// Number of samples that did not fit into any bucket.
    overflow_count: u64,
}

impl Histogram {
    /// Creates an empty histogram with `num_buckets` buckets of `bucket_size_us` each.
    ///
    /// A `bucket_size_us` of zero is accepted; such a histogram has no usable
    /// range and counts every sample as overflow.
    #[must_use]
    pub fn new(bucket_size_us: u64, num_buckets: usize) -> Self {
        Self {
            buckets: vec![0; num_buckets],
            bucket_size_us,
            total_count: 0,
            overflow_count: 0,
        }
    }

    /// Records one sample.
    ///
    /// Samples at or beyond [`Histogram::max_trackable_us`] increment the
    /// overflow counter instead of a bucket. This never panics, even when the
    /// bucket width is zero.
    pub fn record(&mut self, value_us: u64) {
        let bucket_count = self.buckets.len() as u64;
        // `checked_div` yields `None` for a zero bucket width, which is routed
        // to overflow rather than dividing by zero.
        match value_us.checked_div(self.bucket_size_us) {
            // `idx < bucket_count` guarantees the cast to `usize` is lossless.
            Some(idx) if idx < bucket_count => self.buckets[idx as usize] += 1,
            _ => self.overflow_count += 1,
        }
        self.total_count += 1;
    }

    /// Estimates the `p`-th percentile (with `p` in `0.0..=100.0`).
    ///
    /// The estimate is the upper bound of the first bucket at which the
    /// cumulative count reaches the target rank `ceil(p / 100 * total)`. The
    /// rank is at least 1, so `p = 0` (as well as negative or NaN `p`) reports
    /// the first non-empty bucket. Returns 0 for an empty histogram and
    /// [`Histogram::max_trackable_us`] when the rank falls among overflowed
    /// samples.
    #[must_use]
    pub fn percentile(&self, p: f64) -> u64 {
        if self.total_count == 0 {
            return 0;
        }
        // Float-to-integer casts saturate (NaN becomes 0); the `max(1)` keeps
        // a zero rank from matching an empty leading bucket.
        let target = (((p / 100.0) * self.total_count as f64).ceil() as u64).max(1);

        let mut cumulative = 0u64;
        let reached = self.buckets.iter().position(|&count| {
            cumulative += count;
            cumulative >= target
        });
        match reached {
            Some(idx) => (idx as u64 + 1).saturating_mul(self.bucket_size_us),
            None => self.max_trackable_us(),
        }
    }

    /// Returns the per-bucket counts.
    #[must_use]
    pub fn get_buckets(&self) -> &[u64] {
        &self.buckets
    }

    /// Returns the number of samples recorded, including overflowed ones.
    #[must_use]
    pub fn total_count(&self) -> u64 {
        self.total_count
    }

    /// Returns the number of samples that fell outside every bucket.
    #[must_use]
    pub fn overflow_count(&self) -> u64 {
        self.overflow_count
    }

    /// Returns the bucket width in microseconds.
    #[must_use]
    pub fn bucket_size_us(&self) -> u64 {
        self.bucket_size_us
    }

    /// Returns the exclusive upper bound of the last bucket, saturating at `u64::MAX`.
    #[must_use]
    pub fn max_trackable_us(&self) -> u64 {
        self.bucket_size_us
            .saturating_mul(self.buckets.len() as u64)
    }

    /// Clears all counts while keeping the bucket layout.
    pub fn reset(&mut self) {
        self.buckets.fill(0);
        self.total_count = 0;
        self.overflow_count = 0;
    }
}
/// Collects latency samples per [`LatencyMetric`].
///
/// Each metric keeps a sliding window of the most recent samples, bounded by
/// the configured `window_size`. When histograms are enabled, every recorded
/// sample is also added to the metric's [`Histogram`]; histogram counts are not
/// windowed and only shrink on reset.
#[derive(Debug)]
pub struct LatencyTracker {
    /// Settings supplied at construction.
    config: LatencyTrackerConfig,
    /// Sliding window of samples per metric, oldest first.
    measurements: HashMap<LatencyMetric, VecDeque<LatencyMeasurement>>,
    /// One histogram per metric, present only when `keep_histogram` is set.
    histograms: Option<HashMap<LatencyMetric, Histogram>>,
}

impl LatencyTracker {
    /// Creates a tracker from `config`.
    ///
    /// When `config.keep_histogram` is set, a histogram is created up front
    /// for every metric in [`LatencyMetric::all`].
    #[must_use]
    pub fn new(config: LatencyTrackerConfig) -> Self {
        let histograms: Option<HashMap<LatencyMetric, Histogram>> =
            config.keep_histogram.then(|| {
                LatencyMetric::all()
                    .iter()
                    .map(|&metric| {
                        let hist = Histogram::new(
                            config.histogram_bucket_us,
                            config.histogram_num_buckets,
                        );
                        (metric, hist)
                    })
                    .collect()
            });
        Self {
            config,
            measurements: HashMap::new(),
            histograms,
        }
    }

    /// Creates a tracker with the default configuration.
    #[must_use]
    pub fn with_defaults() -> Self {
        Self::new(LatencyTrackerConfig::default())
    }

    /// Returns the configuration this tracker was built with.
    #[must_use]
    pub fn config(&self) -> &LatencyTrackerConfig {
        &self.config
    }

    /// Records a latency of `latency_us` microseconds for `metric`.
    ///
    /// The oldest samples are evicted once the metric's window exceeds the
    /// configured `window_size`.
    pub fn record(&mut self, metric: LatencyMetric, latency_us: u64, timestamp: u64) {
        let window = self.measurements.entry(metric).or_default();
        window.push_back(LatencyMeasurement::new(latency_us, timestamp));
        while window.len() > self.config.window_size {
            window.pop_front();
        }

        let histogram = self
            .histograms
            .as_mut()
            .and_then(|by_metric| by_metric.get_mut(&metric));
        if let Some(hist) = histogram {
            hist.record(latency_us);
        }
    }

    /// Records a latency given as a [`Duration`].
    ///
    /// Sub-microsecond precision is dropped. Durations longer than `u64::MAX`
    /// microseconds saturate at `u64::MAX` instead of silently wrapping.
    pub fn record_duration(&mut self, metric: LatencyMetric, duration: Duration, timestamp: u64) {
        // `as_micros` yields a `u128`; clamp first so the narrowing cast is lossless.
        let latency_us = duration.as_micros().min(u128::from(u64::MAX)) as u64;
        self.record(metric, latency_us, timestamp);
    }

    /// Returns statistics over the current window of `metric`, or `None` if
    /// the tracker holds no entry for it.
    #[must_use]
    pub fn get_stats(&self, metric: LatencyMetric) -> Option<LatencyStats> {
        self.measurements.get(&metric).map(Self::summarize)
    }

    /// Returns statistics for every metric the tracker holds an entry for.
    #[must_use]
    pub fn get_all_stats(&self) -> HashMap<LatencyMetric, LatencyStats> {
        self.measurements
            .iter()
            .map(|(&metric, window)| (metric, Self::summarize(window)))
            .collect()
    }

    /// Returns up to `count` of the most recent samples for `metric`, newest first.
    #[must_use]
    pub fn get_recent(&self, metric: LatencyMetric, count: usize) -> Vec<&LatencyMeasurement> {
        self.measurements
            .get(&metric)
            .map(|window| window.iter().rev().take(count).collect())
            .unwrap_or_default()
    }

    /// Returns `true` when the windowed average of `metric` exceeds `threshold_us`.
    ///
    /// Metrics without an entry are never considered degraded.
    #[must_use]
    pub fn is_degraded(&self, metric: LatencyMetric, threshold_us: u64) -> bool {
        self.get_stats(metric)
            .is_some_and(|stats| stats.avg_us > threshold_us)
    }

    /// Returns `true` when the windowed 99th percentile of `metric` exceeds `threshold_us`.
    ///
    /// Metrics without an entry are never considered degraded.
    #[must_use]
    pub fn is_p99_degraded(&self, metric: LatencyMetric, threshold_us: u64) -> bool {
        self.get_stats(metric)
            .is_some_and(|stats| stats.p99_us > threshold_us)
    }

    /// Returns the histogram for `metric`, or `None` when histograms are disabled.
    #[must_use]
    pub fn get_histogram(&self, metric: LatencyMetric) -> Option<&Histogram> {
        self.histograms
            .as_ref()
            .and_then(|by_metric| by_metric.get(&metric))
    }

    /// Returns the number of samples currently held in the window of `metric`.
    #[must_use]
    pub fn measurement_count(&self, metric: LatencyMetric) -> usize {
        self.measurements.get(&metric).map_or(0, VecDeque::len)
    }

    /// Returns the number of samples currently held across all metrics.
    #[must_use]
    pub fn total_measurement_count(&self) -> usize {
        self.measurements.values().map(VecDeque::len).sum()
    }

    /// Discards all samples and clears every histogram.
    pub fn reset(&mut self) {
        self.measurements.clear();
        if let Some(by_metric) = self.histograms.as_mut() {
            for hist in by_metric.values_mut() {
                hist.reset();
            }
        }
    }

    /// Discards the samples of `metric` and clears its histogram, if any.
    pub fn reset_metric(&mut self, metric: LatencyMetric) {
        self.measurements.remove(&metric);
        let histogram = self
            .histograms
            .as_mut()
            .and_then(|by_metric| by_metric.get_mut(&metric));
        if let Some(hist) = histogram {
            hist.reset();
        }
    }

    /// Copies a window into contiguous storage and summarizes it.
    fn summarize(window: &VecDeque<LatencyMeasurement>) -> LatencyStats {
        let samples: Vec<LatencyMeasurement> = window.iter().copied().collect();
        LatencyStats::from_measurements(&samples)
    }
}
/// Returns the `p`-th percentile of `sorted_values`, or 0 for an empty slice.
///
/// `sorted_values` must already be in ascending order. The percentile is the
/// element at index `round(p / 100 * (len - 1))`, clamped to the last element,
/// so `p = 0` yields the first value and `p >= 100` the last.
fn percentile(sorted_values: &[u64], p: f64) -> u64 {
    match sorted_values.len().checked_sub(1) {
        // No elements: nothing to select.
        None => 0,
        Some(last_idx) => {
            let fraction = p / 100.0;
            // The float-to-integer cast saturates, so negative or NaN ranks become 0.
            let rank = (fraction * last_idx as f64).round() as usize;
            sorted_values[rank.min(last_idx)]
        }
    }
}
/// Returns the integer square root of `n`, i.e. the largest `r` with `r * r <= n`.
///
/// Uses Newton's iteration starting from `n` and stops as soon as the next
/// estimate no longer decreases.
fn integer_sqrt(n: u64) -> u64 {
    let mut root = n;
    let mut candidate = n.div_ceil(2);
    loop {
        // For n == 0 both values are 0, so this exits before any division.
        if candidate >= root {
            break root;
        }
        root = candidate;
        candidate = (root + n / root) / 2;
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the latency measurement, statistics, histogram and
    //! tracker types, plus the private numeric helpers.

    use super::*;

    #[test]
    fn test_latency_measurement_new() {
        let sample = LatencyMeasurement::new(500, 1000);
        assert_eq!(sample.value_us, 500);
        assert_eq!(sample.timestamp, 1000);
        // 500 µs is less than one whole millisecond.
        assert_eq!(sample.as_millis(), 0);
    }

    #[test]
    fn test_latency_measurement_from_duration() {
        let five_ms = Duration::from_millis(5);
        let sample = LatencyMeasurement::from_duration(five_ms, 1000);
        assert_eq!(sample.value_us, 5000);
        assert_eq!(sample.as_millis(), 5);
    }

    #[test]
    fn test_latency_measurement_as_duration() {
        let sample = LatencyMeasurement::new(5000, 1000);
        assert_eq!(sample.as_duration(), Duration::from_micros(5000));
    }

    #[test]
    fn test_latency_stats_empty() {
        let summary = LatencyStats::from_measurements(&[]);
        assert!(summary.is_empty());
        assert_eq!(summary.count, 0);
    }

    #[test]
    fn test_latency_stats_single() {
        let samples = vec![LatencyMeasurement::new(500, 1000)];
        let summary = LatencyStats::from_measurements(&samples);
        assert_eq!(summary.count, 1);
        assert_eq!(summary.min_us, 500);
        assert_eq!(summary.max_us, 500);
        assert_eq!(summary.avg_us, 500);
        assert_eq!(summary.p50_us, 500);
        assert_eq!(summary.std_dev_us, 0);
    }

    #[test]
    fn test_latency_stats_multiple() {
        // Values 100, 200, ..., 10000 with timestamps 1..=100.
        let mut samples: Vec<LatencyMeasurement> = Vec::new();
        for i in 1..=100 {
            samples.push(LatencyMeasurement::new(i * 100, i));
        }
        let summary = LatencyStats::from_measurements(&samples);
        assert_eq!(summary.count, 100);
        assert_eq!(summary.min_us, 100);
        assert_eq!(summary.max_us, 10000);
        assert_eq!(summary.avg_us, 5050);
        assert!((4900..=5100).contains(&summary.p50_us));
        assert!((9400..=9600).contains(&summary.p95_us));
        assert!((9800..=10000).contains(&summary.p99_us));
    }

    #[test]
    fn test_latency_metric_all() {
        assert_eq!(LatencyMetric::all().len(), 5);
    }

    #[test]
    fn test_latency_metric_as_str() {
        assert_eq!(LatencyMetric::OrderToAck.as_str(), "order_to_ack");
        assert_eq!(LatencyMetric::OrderToFill.as_str(), "order_to_fill");
        assert_eq!(LatencyMetric::MarketDataDelay.as_str(), "market_data_delay");
    }

    #[test]
    fn test_latency_tracker_config_default() {
        let defaults = LatencyTrackerConfig::default();
        assert_eq!(defaults.window_size, 1000);
        assert!(!defaults.keep_histogram);
    }

    #[test]
    fn test_latency_tracker_config_builder() {
        let built = LatencyTrackerConfig::new()
            .with_window_size(500)
            .with_histogram(50, 200);
        assert_eq!(built.window_size, 500);
        assert!(built.keep_histogram);
        assert_eq!(built.histogram_bucket_us, 50);
        assert_eq!(built.histogram_num_buckets, 200);
    }

    #[test]
    fn test_histogram_new() {
        let histogram = Histogram::new(100, 100);
        assert_eq!(histogram.total_count(), 0);
        assert_eq!(histogram.bucket_size_us(), 100);
        assert_eq!(histogram.max_trackable_us(), 10000);
    }

    #[test]
    fn test_histogram_record() {
        let mut histogram = Histogram::new(100, 100);
        // One sample in each of the first three buckets.
        histogram.record(50);
        histogram.record(150);
        histogram.record(250);
        assert_eq!(histogram.total_count(), 3);
        let counts = histogram.get_buckets();
        assert_eq!(counts[0], 1);
        assert_eq!(counts[1], 1);
        assert_eq!(counts[2], 1);
    }

    #[test]
    fn test_histogram_overflow() {
        // Ten 100 µs buckets cover values below 1000 µs.
        let mut histogram = Histogram::new(100, 10);
        histogram.record(500);
        histogram.record(1500);
        assert_eq!(histogram.total_count(), 2);
        assert_eq!(histogram.overflow_count(), 1);
    }

    #[test]
    fn test_histogram_percentile() {
        let mut histogram = Histogram::new(100, 100);
        // One sample at the midpoint of every bucket.
        (1..=100).for_each(|i| histogram.record(i * 100 - 50));
        let median = histogram.percentile(50.0);
        assert!(median >= 4900 && median <= 5100);
        let tail = histogram.percentile(99.0);
        assert!(tail >= 9800 && tail <= 10000);
    }

    #[test]
    fn test_histogram_reset() {
        let mut histogram = Histogram::new(100, 100);
        histogram.record(500);
        histogram.record(600);
        histogram.reset();
        assert_eq!(histogram.total_count(), 0);
        assert_eq!(histogram.overflow_count(), 0);
    }

    #[test]
    fn test_latency_tracker_record() {
        let mut tracker = LatencyTracker::with_defaults();
        tracker.record(LatencyMetric::OrderToAck, 500, 1000);
        tracker.record(LatencyMetric::OrderToAck, 600, 1001);
        assert_eq!(tracker.measurement_count(LatencyMetric::OrderToAck), 2);
    }

    #[test]
    fn test_latency_tracker_record_duration() {
        let mut tracker = LatencyTracker::with_defaults();
        tracker.record_duration(LatencyMetric::OrderToFill, Duration::from_millis(5), 1000);
        let summary = tracker.get_stats(LatencyMetric::OrderToFill).unwrap();
        assert_eq!(summary.avg_us, 5000);
    }

    #[test]
    fn test_latency_tracker_get_stats() {
        let mut tracker = LatencyTracker::with_defaults();
        (1..=10).for_each(|i| tracker.record(LatencyMetric::OrderToAck, i * 100, i));
        let summary = tracker.get_stats(LatencyMetric::OrderToAck).unwrap();
        assert_eq!(summary.count, 10);
        assert_eq!(summary.min_us, 100);
        assert_eq!(summary.max_us, 1000);
        assert_eq!(summary.avg_us, 550);
    }

    #[test]
    fn test_latency_tracker_get_all_stats() {
        let mut tracker = LatencyTracker::with_defaults();
        tracker.record(LatencyMetric::OrderToAck, 500, 1000);
        tracker.record(LatencyMetric::OrderToFill, 5000, 1001);
        let by_metric = tracker.get_all_stats();
        assert_eq!(by_metric.len(), 2);
        assert!(by_metric.contains_key(&LatencyMetric::OrderToAck));
        assert!(by_metric.contains_key(&LatencyMetric::OrderToFill));
    }

    #[test]
    fn test_latency_tracker_get_recent() {
        let mut tracker = LatencyTracker::with_defaults();
        (1..=10).for_each(|i| tracker.record(LatencyMetric::OrderToAck, i * 100, i));
        let newest = tracker.get_recent(LatencyMetric::OrderToAck, 3);
        assert_eq!(newest.len(), 3);
        // Newest first.
        assert_eq!(newest[0].value_us, 1000);
        assert_eq!(newest[1].value_us, 900);
        assert_eq!(newest[2].value_us, 800);
    }

    #[test]
    fn test_latency_tracker_is_degraded() {
        let mut tracker = LatencyTracker::with_defaults();
        tracker.record(LatencyMetric::OrderToAck, 500, 1000);
        tracker.record(LatencyMetric::OrderToAck, 600, 1001);
        assert!(!tracker.is_degraded(LatencyMetric::OrderToAck, 1000));
        assert!(tracker.is_degraded(LatencyMetric::OrderToAck, 500));
    }

    #[test]
    fn test_latency_tracker_is_p99_degraded() {
        let mut tracker = LatencyTracker::with_defaults();
        (1..=100).for_each(|i| tracker.record(LatencyMetric::OrderToAck, i * 10, i));
        assert!(!tracker.is_p99_degraded(LatencyMetric::OrderToAck, 1000));
        assert!(tracker.is_p99_degraded(LatencyMetric::OrderToAck, 900));
    }

    #[test]
    fn test_latency_tracker_window_rotation() {
        let small_window = LatencyTrackerConfig::default().with_window_size(5);
        let mut tracker = LatencyTracker::new(small_window);
        (1..=10).for_each(|i| tracker.record(LatencyMetric::OrderToAck, i * 100, i));
        assert_eq!(tracker.measurement_count(LatencyMetric::OrderToAck), 5);
        // Only the five newest samples (600..=1000) remain.
        let summary = tracker.get_stats(LatencyMetric::OrderToAck).unwrap();
        assert_eq!(summary.min_us, 600);
        assert_eq!(summary.max_us, 1000);
    }

    #[test]
    fn test_latency_tracker_with_histogram() {
        let with_hist = LatencyTrackerConfig::default().with_histogram(100, 100);
        let mut tracker = LatencyTracker::new(with_hist);
        tracker.record(LatencyMetric::OrderToAck, 500, 1000);
        tracker.record(LatencyMetric::OrderToAck, 600, 1001);
        let histogram = tracker.get_histogram(LatencyMetric::OrderToAck).unwrap();
        assert_eq!(histogram.total_count(), 2);
    }

    #[test]
    fn test_latency_tracker_reset() {
        let mut tracker = LatencyTracker::with_defaults();
        tracker.record(LatencyMetric::OrderToAck, 500, 1000);
        tracker.record(LatencyMetric::OrderToFill, 5000, 1001);
        tracker.reset();
        assert_eq!(tracker.total_measurement_count(), 0);
    }

    #[test]
    fn test_latency_tracker_reset_metric() {
        let mut tracker = LatencyTracker::with_defaults();
        tracker.record(LatencyMetric::OrderToAck, 500, 1000);
        tracker.record(LatencyMetric::OrderToFill, 5000, 1001);
        tracker.reset_metric(LatencyMetric::OrderToAck);
        assert_eq!(tracker.measurement_count(LatencyMetric::OrderToAck), 0);
        assert_eq!(tracker.measurement_count(LatencyMetric::OrderToFill), 1);
    }

    #[test]
    fn test_integer_sqrt() {
        assert_eq!(integer_sqrt(0), 0);
        assert_eq!(integer_sqrt(1), 1);
        assert_eq!(integer_sqrt(4), 2);
        assert_eq!(integer_sqrt(9), 3);
        assert_eq!(integer_sqrt(10), 3);
        assert_eq!(integer_sqrt(100), 10);
        assert_eq!(integer_sqrt(10000), 100);
    }

    #[test]
    fn test_percentile_function() {
        let sorted = vec![100, 200, 300, 400, 500, 600, 700, 800, 900, 1000];
        assert_eq!(percentile(&sorted, 0.0), 100);
        assert_eq!(percentile(&sorted, 50.0), 600);
        assert_eq!(percentile(&sorted, 100.0), 1000);
    }

    #[cfg(feature = "serde")]
    #[test]
    fn test_serialization() {
        let original = LatencyMeasurement::new(500, 1000);
        let json = serde_json::to_string(&original).unwrap();
        let round_tripped: LatencyMeasurement = serde_json::from_str(&json).unwrap();
        assert_eq!(original, round_tripped);
    }
}