use crate::{LabelValues, Labels};
use num_traits::Zero;
use parking_lot::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Number of windows held by every [`Windowing`] ring.
const SAMPLING_WINDOWS: usize = 4;

/// A fixed ring of `SAMPLING_WINDOWS` values of type `P`, exactly one of
/// which is the "current" window at any time.
///
/// All methods take `&self`; the position in the ring is tracked with an
/// atomic index so the ring can be advanced through a shared reference.
pub struct Windowing<P> {
    /// Index into `windows` of the current window; always `< SAMPLING_WINDOWS`.
    current_window: AtomicUsize,
    /// The windows themselves, each boxed separately.
    windows: [Box<P>; SAMPLING_WINDOWS],
}

impl<P: Default> Windowing<P> {
    /// Builds a ring whose windows are all `P::default()`, with window 0
    /// as the current one.
    pub fn new() -> Self {
        Self {
            current_window: AtomicUsize::new(0),
            windows: [
                Box::new(P::default()),
                Box::new(P::default()),
                Box::new(P::default()),
                Box::new(P::default()),
            ],
        }
    }
}

impl<P> Windowing<P> {
    /// Returns the window that is current at the moment of the call.
    pub fn current(&self) -> &P {
        &self.windows[self.current_window.load(Ordering::SeqCst)]
    }

    /// Advances the ring by one position (wrapping after the last window)
    /// and returns the window that was current before the advance.
    ///
    /// The advance is a single atomic read-modify-write: when several
    /// callers cycle concurrently, each one moves the ring by exactly one
    /// position and each receives a different retired window. (A separate
    /// `load` followed by `store` would let two callers observe the same
    /// index, hand out the same window twice and advance only once.)
    pub fn cycle_windows(&self) -> &P {
        let window_count = self.windows.len();
        let mut old_idx = self.current_window.load(Ordering::SeqCst);
        loop {
            let next_idx = (old_idx + 1) % window_count;
            match self.current_window.compare_exchange_weak(
                old_idx,
                next_idx,
                Ordering::SeqCst,
                Ordering::SeqCst,
            ) {
                Ok(_) => return &self.windows[old_idx],
                // Lost a race (or failed spuriously): retry from the
                // index that is actually stored.
                Err(observed) => old_idx = observed,
            }
        }
    }
}
/// Capacity, in observations, of the buffer owned by each `ObservationSet`.
const WINDOW_SIZE: usize = 65536;

/// Fixed-capacity buffer of observations that overwrites from the start
/// once `WINDOW_SIZE` values have been written.
struct ObservationSet<T>
where
    T: Ord + Zero + Copy,
{
    /// Slot the next observation will be written to; always `< WINDOW_SIZE`.
    idx: usize,
    /// How many times `idx` has wrapped back to 0 since the last `clear`.
    wraps: usize,
    /// Backing storage, allocated once with `WINDOW_SIZE` zeroed slots.
    data: Box<[T]>,
}

impl<T> ObservationSet<T>
where
    T: Ord + Zero + Copy,
{
    /// Allocates a zero-filled buffer of `WINDOW_SIZE` slots with the write
    /// position and wrap counter at 0.
    pub fn new() -> Self {
        let data = vec![T::zero(); WINDOW_SIZE].into_boxed_slice();
        Self {
            idx: 0,
            wraps: 0,
            data,
        }
    }

    /// Resets the write position and the wrap counter. Stored values are
    /// left in place; they are simply overwritten by later `add` calls.
    fn clear(&mut self) {
        self.wraps = 0;
        self.idx = 0;
    }

    /// Number of wrap-arounds since the last `clear`.
    fn wraps(&self) -> usize {
        self.wraps
    }

    /// Sorts, in place, the slots before the current write position and
    /// returns them.
    ///
    /// After a wrap-around this covers only the values written since the
    /// most recent wrap, not the whole buffer.
    fn sorted_data(&mut self) -> &[T] {
        let filled = self.idx;
        self.data[..filled].sort_unstable();
        &self.data[..filled]
    }

    /// Stores `observation` at the write position and advances it, wrapping
    /// to slot 0 (and bumping the saturating wrap counter) when the end of
    /// the buffer is reached.
    fn add(&mut self, observation: T) {
        self.data[self.idx] = observation;
        let next = self.idx + 1;
        if next == WINDOW_SIZE {
            self.idx = 0;
            self.wraps = self.wraps.saturating_add(1);
        } else {
            self.idx = next;
        }
    }

    /// Number of slots written since the last `clear`: the full buffer
    /// length once a wrap has happened, otherwise the write position.
    #[allow(dead_code)]
    fn size(&self) -> usize {
        match self.wraps {
            0 => self.idx,
            _ => self.data.len(),
        }
    }
}
/// Summary statistics produced by one call to `Observations::sample`.
///
/// Each percentile field holds the element at index
/// `floor(count * p / 100)` of the sorted observations. When there were no
/// observations, every percentile and `max` is `T::zero()`.
#[derive(Debug, PartialEq, Eq)]
pub struct Sample<T: Ord + Zero + Copy> {
/// Number of `record` calls that were discarded because the observation
/// lock could not be taken, counted since the previous sample.
pub dropped: usize,
/// Number of times the observation buffer wrapped around before this
/// sample was taken.
pub wraps: usize,
/// 25th percentile.
pub p25: T,
/// 50th percentile.
pub p50: T,
/// 75th percentile.
pub p75: T,
/// 90th percentile.
pub p90: T,
/// 95th percentile.
pub p95: T,
/// 99th percentile.
pub p99: T,
/// 99.9th percentile.
pub p99p9: T,
/// Largest observation in the sample.
pub max: T,
/// Number of observations the statistics above were computed from.
pub count: usize,
}
/// Named collector of observations of type `T` that can be summarized into
/// a `Sample`.
pub struct Observations<T: Ord + Zero + Copy> {
// Buffer of recorded values. `record` only ever uses `try_lock` on this
// mutex, while `sample` takes it with a blocking `lock`.
observations: Mutex<ObservationSet<T>>,
// Count of observations discarded by `record` because the mutex was
// unavailable; `sample` reads it and resets it to zero.
drops: AtomicUsize,
// Name supplied at construction and returned by `name()`.
name: &'static str,
}
impl<T: Ord + Zero + Copy> Observations<T> {
    /// Creates an empty collector labelled `name`, with its drop counter at
    /// zero.
    pub fn new(name: &'static str) -> Self {
        let observations = Mutex::new(ObservationSet::new());
        let drops = AtomicUsize::new(0);
        Self {
            observations,
            drops,
            name,
        }
    }

    /// Returns the name this collector was created with.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Summarizes the buffered observations into a [`Sample`] and resets the
    /// collector for the next sampling period.
    ///
    /// The observation lock is held while the data is sorted and the
    /// statistics are read; `record` calls made during that time are counted
    /// as drops instead of blocking. After the lock is released the drop
    /// counter is read and reset to zero.
    pub fn sample(&self) -> Sample<T> {
        let mut summary = {
            let mut guard = self.observations.lock();
            let wraps = guard.wraps();
            let sorted = guard.sorted_data();
            let count = sorted.len();

            // Element at index floor(count * p / 100), or zero when empty.
            let at = |p: f64| -> T {
                if sorted.is_empty() {
                    T::zero()
                } else {
                    sorted[((count as f64 * p) / 100.0) as usize]
                }
            };
            let max = match sorted.last() {
                Some(&largest) => largest,
                None => T::zero(),
            };

            let summary = Sample {
                // Filled in below, once the lock has been released.
                dropped: 0,
                wraps,
                p25: at(25.0),
                p50: at(50.0),
                p75: at(75.0),
                p90: at(90.0),
                p95: at(95.0),
                p99: at(99.0),
                p99p9: at(99.9),
                max,
                count,
            };
            guard.clear();
            summary
        };
        summary.dropped = self.drops.swap(0, Ordering::SeqCst);
        summary
    }

    /// Records `observation` if the lock can be taken without waiting;
    /// otherwise discards it and increments the drop counter.
    pub fn record(&self, observation: T) {
        match self.observations.try_lock() {
            Some(mut guard) => guard.add(observation),
            None => {
                self.drops.fetch_add(1, Ordering::SeqCst);
            }
        }
    }
}
// Declares `TimingBucket` through the crate's `label_enum!` macro: one
// variant for each statistic emitted by `Sample::as_bucket_pairs`.
// NOTE(review): the `Labels` impl for this type calls `all_variants()` and
// `as_str()`, which are presumably generated by the macro; its definition is
// not in this file, so confirm there.
crate::label_enum! {
pub enum TimingBucket {
P25,
P50,
P75,
P90,
P95,
P99,
// Paired with `Sample::p99p9`, the 99.9th percentile.
P99P9,
// Paired with `Sample::max`.
Max,
// Paired with `Sample::count`, the number of observations in the sample.
Count,
}
}
impl Labels for TimingBucket {
    /// The single label name used for timing buckets: `"bucket"`.
    fn label_names() -> Vec<&'static str> {
        ["bucket"].to_vec()
    }

    /// One single-element label-value list per `TimingBucket` variant, in
    /// the order returned by `all_variants()`.
    fn possible_label_values() -> Vec<LabelValues<'static>> {
        let mut values: Vec<LabelValues<'static>> = Vec::new();
        for bucket in Self::all_variants().into_iter() {
            values.push(vec![bucket.as_str()]);
        }
        values
    }

    /// The label-value list for this bucket: its `as_str()` form alone.
    fn label_values(&self) -> LabelValues {
        let value = self.as_str();
        vec![value]
    }
}
impl<T: Ord + Zero + Copy + Into<i64>> Sample<T> {
    /// Flattens the sample into `(bucket, value)` pairs, one per
    /// `TimingBucket` variant, with every statistic converted to `i64`.
    ///
    /// The order is P25, P50, P75, P90, P95, P99, P99P9, Max, Count.
    pub fn as_bucket_pairs(&self) -> Vec<(TimingBucket, i64)> {
        let mut pairs: Vec<(TimingBucket, i64)> = Vec::with_capacity(9);
        pairs.push((TimingBucket::P25, self.p25.into()));
        pairs.push((TimingBucket::P50, self.p50.into()));
        pairs.push((TimingBucket::P75, self.p75.into()));
        pairs.push((TimingBucket::P90, self.p90.into()));
        pairs.push((TimingBucket::P95, self.p95.into()));
        pairs.push((TimingBucket::P99, self.p99.into()));
        pairs.push((TimingBucket::P99P9, self.p99p9.into()));
        pairs.push((TimingBucket::Max, self.max.into()));
        pairs.push((TimingBucket::Count, self.count as i64));
        pairs
    }

    /// Number of observations that were dropped before this sample was taken.
    pub fn dropped(&self) -> usize {
        self.dropped
    }

    /// Number of buffer wrap-arounds that happened before this sample was
    /// taken.
    pub fn wraps(&self) -> usize {
        self.wraps
    }
}
// Unit tests for `Observations::sample`: wrap reporting, percentile
// selection, small inputs, and behaviour after the buffer has wrapped.
#[cfg(test)]
mod tests {
use super::{Observations, Sample, WINDOW_SIZE};
// Recording more than WINDOW_SIZE values must report one wrap, and a second
// sample taken immediately afterwards must be completely empty because
// `sample` resets the collector.
#[test]
fn test_wraps_are_reported() {
let observations = Observations::new("test");
for i in 0..WINDOW_SIZE {
observations.record(i);
}
observations.record(500);
observations.record(501);
observations.record(502);
observations.record(503);
let sample = observations.sample();
assert_eq!(sample.dropped, 0);
assert_eq!(sample.wraps, 1);
let sample = observations.sample();
assert_eq!(
sample,
Sample {
dropped: 0,
wraps: 0,
p25: 0,
p50: 0,
p75: 0,
p90: 0,
p95: 0,
p99: 0,
p99p9: 0,
max: 0,
count: 0,
}
);
}
// With the 99 values 1..=99, percentile p selects sorted index
// floor(99 * p / 100), e.g. p25 -> index 24 -> value 25.
#[test]
fn test_percentiles_are_reported() {
#[rustfmt::skip]
let data = [
1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
40, 41, 42, 43, 44, 45, 46, 47, 48, 49,
50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
60, 61, 62, 63, 64, 65, 66, 67, 68, 69,
70, 71, 72, 73, 74, 75, 76, 77, 78, 79,
80, 81, 82, 83, 84, 85, 86, 87, 88, 89,
90, 91, 92, 93, 94, 95, 96, 97, 98, 99,
];
let observations = Observations::new("test");
for datum in data.iter().cloned() {
observations.record(datum);
}
let sample = observations.sample();
assert_eq!(
sample,
Sample {
dropped: 0,
wraps: 0,
p25: 25,
p50: 50,
p75: 75,
p90: 90,
p95: 95,
p99: 99,
p99p9: 99,
max: 99,
count: 99,
}
);
}
// With only five values, every percentile from p90 upwards lands on the
// last (largest) element.
#[test]
fn test_small_sampleset() {
let observations = Observations::new("test");
observations.record(500);
observations.record(501);
observations.record(502);
observations.record(503);
observations.record(504);
let sample = observations.sample();
assert_eq!(
sample,
Sample {
dropped: 0,
wraps: 0,
p25: 501,
p50: 502,
p75: 503,
p90: 504,
p95: 504,
p99: 504,
p99p9: 504,
max: 504,
count: 5,
}
);
}
// After one full wrap, the sample covers only the values written since the
// wrap (the 2s and 3s); the WINDOW_SIZE 1s written before it are excluded
// from both the percentiles and `count`.
#[test]
fn test_overflow_wraps_writes() {
let observations = Observations::new("test");
for _ in 0..WINDOW_SIZE {
observations.record(1);
}
for _ in 0..(WINDOW_SIZE / 2) {
observations.record(2);
}
for _ in 0..(WINDOW_SIZE / 10) {
observations.record(3);
}
let sample = observations.sample();
assert_eq!(
sample,
Sample {
dropped: 0,
wraps: 1,
p25: 2,
p50: 2,
p75: 2,
p90: 3,
p95: 3,
p99: 3,
p99p9: 3,
max: 3,
count: WINDOW_SIZE / 2 + WINDOW_SIZE / 10,
}
);
}
}