use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash};
use std::time::Duration;
use foldhash::fast::FixedState;
use crate::clock;
use crate::config::{SIGNIFICANT_VALUE_DIGITS, TrackerConfig};
use crate::histogram::SlidingWindowHistogram;
pub const DEFAULT_SUB_WINDOWS: usize = 10;
pub struct LatencyTracker<
D,
I: clock::Instant = std::time::Instant,
H = foldhash::fast::RandomState,
const N: usize = DEFAULT_SUB_WINDOWS,
> {
config: TrackerConfig,
histograms: HashMap<D, SlidingWindowHistogram<I, N>, H>,
}
impl<D, I> Default for LatencyTracker<D, I, foldhash::fast::RandomState, DEFAULT_SUB_WINDOWS>
where
D: Hash + Eq + Clone,
I: clock::Instant,
{
fn default() -> Self {
Self::new(TrackerConfig::default())
}
}
impl<D, I, const N: usize> LatencyTracker<D, I, foldhash::fast::FixedState, N>
where
D: Hash + Eq + Clone,
I: clock::Instant,
{
pub const fn const_new(config: TrackerConfig) -> Self {
Self::with_hasher_and_config(FixedState::with_seed(125322317734512), config)
}
}
impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
where
D: Hash + Eq + Clone,
I: clock::Instant,
H: Default,
{
pub fn new(config: TrackerConfig) -> Self {
Self {
config,
histograms: HashMap::default(),
}
}
}
impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
where
D: Hash + Eq + Clone,
I: clock::Instant,
H: BuildHasher,
{
pub const fn with_hasher_and_config(hasher: H, config: TrackerConfig) -> Self {
Self {
config,
histograms: HashMap::with_hasher(hasher),
}
}
}
impl<D, I, H, const N: usize> LatencyTracker<D, I, H, N>
where
D: Hash + Eq + Clone,
I: clock::Instant,
H: BuildHasher,
{
#[inline]
pub fn record_latency_from<Q>(&mut self, dest: &Q, earlier: I, now: I) -> Duration
where
D: Borrow<Q>,
Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
{
let latency = now.duration_since(earlier);
self.record_latency_ms(dest, latency.as_millis() as u64, now);
latency
}
#[inline]
pub fn record_latency<Q>(&mut self, dest: &Q, latency: Duration, now: I)
where
D: Borrow<Q>,
Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
{
self.record_latency_ms(dest, latency.as_millis() as u64, now);
}
#[inline]
pub fn record_latency_ms<Q>(&mut self, dest: &Q, latency_ms: u64, now: I)
where
D: Borrow<Q>,
Q: Hash + Eq + ToOwned<Owned = D> + ?Sized,
{
if let Some(histogram) = self.histograms.get_mut(dest) {
histogram.record(latency_ms, now);
return;
}
self.record_latency_ms_cold(dest.to_owned(), latency_ms, now);
}
#[cold]
fn record_latency_ms_cold(&mut self, dest: D, latency_ms: u64, now: I) {
let mut histogram = self.new_histogram(now);
histogram.record(latency_ms, now);
self.histograms.insert(dest, histogram);
}
#[inline]
pub fn quantile_ms<Q>(&mut self, dest: &Q, quantile: f64, now: I) -> Option<u64>
where
D: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let histogram = self.histograms.get_mut(dest)?;
histogram.quantile(quantile, self.config.min_samples as u64, now)
}
#[inline]
pub fn quantile<Q>(&mut self, dest: &Q, quantile: f64, now: I) -> Option<Duration>
where
D: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.quantile_ms(dest, quantile, now)
.map(Duration::from_millis)
}
pub fn clear(&mut self) {
self.histograms.clear();
}
#[inline]
pub fn config(&self) -> &TrackerConfig {
&self.config
}
fn new_histogram(&self, now: I) -> SlidingWindowHistogram<I, N> {
SlidingWindowHistogram::new(
self.config.window(),
SIGNIFICANT_VALUE_DIGITS,
self.config.max_trackable_latency_ms as u64,
now,
)
}
}
#[cfg(test)]
mod tests {
use std::time::Instant;
use super::*;
type TestTracker = LatencyTracker<u32, Instant>;
fn make_tracker() -> TestTracker {
let config = TrackerConfig {
min_samples: 5,
..TrackerConfig::default()
};
LatencyTracker::new(config)
}
#[test]
fn no_data_returns_none() {
let now = Instant::now();
let mut tracker = make_tracker();
assert_eq!(tracker.quantile(&1, 0.5, now), None);
}
#[test]
fn record_latency_directly() {
let now = Instant::now();
let mut tracker = make_tracker();
for _ in 0..10 {
tracker.record_latency(&1, Duration::from_millis(100), now);
}
let p50 = tracker.quantile(&1, 0.5, now).unwrap();
assert_eq!(p50, Duration::from_millis(100));
}
#[test]
fn record_latency_ms_directly() {
let now = Instant::now();
let mut tracker = make_tracker();
for _ in 0..10 {
tracker.record_latency_ms(&1, 100, now);
}
let p50 = tracker.quantile_ms(&1, 0.5, now).unwrap();
assert_eq!(p50, 100);
}
#[test]
fn record_latency_from_computes_duration() {
let now = Instant::now();
let mut tracker = make_tracker();
let later = now + Duration::from_millis(42);
for _ in 0..10 {
let d = tracker.record_latency_from(&1, now, later);
assert_eq!(d, Duration::from_millis(42));
}
let p50 = tracker.quantile_ms(&1, 0.5, later).unwrap();
assert_eq!(p50, 42);
}
#[test]
fn per_destination_isolation() {
let now = Instant::now();
let mut tracker = make_tracker();
for _ in 0..10 {
tracker.record_latency(&1, Duration::from_millis(100), now);
tracker.record_latency(&2, Duration::from_millis(500), now);
}
let p1 = tracker.quantile(&1, 0.5, now).unwrap();
let p2 = tracker.quantile(&2, 0.5, now).unwrap();
assert_eq!(p1, Duration::from_millis(100));
assert!(
p2 >= Duration::from_millis(495) && p2 <= Duration::from_millis(505),
"p2 was {p2:?}"
);
assert_eq!(tracker.quantile(&3, 0.5, now), None);
}
#[test]
fn clear_resets_all_state() {
let now = Instant::now();
let mut tracker = make_tracker();
for _ in 0..10 {
tracker.record_latency(&1, Duration::from_millis(100), now);
}
tracker.clear();
assert_eq!(tracker.quantile(&1, 0.5, now), None);
}
#[test]
fn insufficient_samples_returns_none() {
let now = Instant::now();
let mut tracker = make_tracker();
for _ in 0..4 {
tracker.record_latency(&1, Duration::from_millis(100), now);
}
assert_eq!(tracker.quantile(&1, 0.5, now), None);
tracker.record_latency(&1, Duration::from_millis(100), now);
assert!(tracker.quantile(&1, 0.5, now).is_some());
}
}