1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use super::{Metric, Pct, TimedEvent};
use crate::frame::Frame;
use crate::io::codec::vectorize;
use crate::io::provider::StreamType;
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// Marker type for the histogram metric; it carries no data of its own.
///
/// The behaviour is provided by its `Metric` implementation, which uses
/// `HistogramState` as the state and `HistogramEvent` as the event type.
#[derive(Debug)]
pub struct HistogramMetric;
impl Metric for HistogramMetric {
type State = HistogramState;
type Event = HistogramEvent;
fn stream_type() -> StreamType {
StreamType::from("rillrate.histogram.v0")
}
fn apply(state: &mut Self::State, event: TimedEvent<Self::Event>) {
match event.event {
HistogramEvent::Add(amount) => {
state.total.add(amount);
let expected = OrderedFloat::from(amount);
for (level, stat) in &mut state.buckets {
if &expected <= level {
stat.add(amount);
break;
}
}
if let Some(frame) = state.frame.as_mut() {
if let Some(amount) = frame.insert_pop(amount) {
state.total.del(amount);
let expected = OrderedFloat::from(amount);
for (level, stat) in &mut state.buckets {
if &expected <= level {
stat.del(amount);
break;
}
}
}
}
}
}
}
}
/// Running aggregate of a set of values: their sum and how many there are.
///
/// The derived `Default` yields an empty aggregate (`sum == 0.0`, `count == 0`).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Stat {
    /// Sum of all values currently accounted for.
    pub sum: f64,
    /// Number of values currently accounted for.
    pub count: u64,
}

impl Stat {
    /// Accounts for one more `value`.
    fn add(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    /// Removes the contribution of one `value`.
    ///
    /// The count saturates at zero instead of underflowing, so removing from an
    /// empty aggregate neither panics (debug builds) nor wraps around to
    /// `u64::MAX` (release builds).
    fn del(&mut self, value: f64) {
        self.sum -= value;
        self.count = self.count.saturating_sub(1);
    }
}
/// State of a histogram: per-bucket aggregates, an overall aggregate and an
/// optional frame of recently inserted values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramState {
    /// Aggregates per bucket, keyed by the bucket's level.
    #[serde(with = "vectorize")]
    pub buckets: BTreeMap<OrderedFloat<f64>, Stat>,
    /// Aggregate over every value currently accounted for.
    pub total: Stat,
    /// Frame of inserted values; present only when a window was requested.
    frame: Option<Frame<f64>>,
}

impl HistogramState {
    /// Creates an empty state with one bucket per entry of `levels`.
    ///
    /// A bucket for `f64::INFINITY` is always present, whether or not `levels`
    /// contains it. When `window` is `Some(size)`, a frame is created with that size;
    /// otherwise the state has no frame.
    pub fn new(levels: &[f64], window: Option<u32>) -> Self {
        // Every bucket starts out empty, so appending the infinity level to the
        // requested ones yields the same map as inserting it afterwards.
        let buckets: BTreeMap<_, _> = levels
            .iter()
            .copied()
            .chain(std::iter::once(f64::INFINITY))
            .map(|level| (OrderedFloat::from(level), Stat::default()))
            .collect();
        let frame = window.map(Frame::new);
        Self {
            buckets,
            total: Stat::default(),
            frame,
        }
    }

    /// Iterates over the buckets in ascending level order, yielding one `Bar` each.
    ///
    /// A bar's `pct` is built with `Pct::from_div` from the bucket's sum and the
    /// total sum (not from the counts).
    pub fn bars(&self) -> impl Iterator<Item = Bar> + '_ {
        let sum_total = self.total.sum;
        self.buckets.iter().map(move |(&level, stat)| {
            let count = stat.count;
            let pct = Pct::from_div(stat.sum, sum_total);
            Bar { level, count, pct }
        })
    }
}
/// One bar of the histogram, as yielded by `HistogramState::bars`.
pub struct Bar {
/// Level (map key) of the bucket this bar was built from.
pub level: OrderedFloat<f64>,
/// Number of values counted in the bucket.
pub count: u64,
/// Ratio built by `Pct::from_div` from the bucket's sum and the total sum.
pub pct: Pct,
}
/// A batch of timed histogram events.
pub type HistogramDelta = Vec<TimedEvent<HistogramEvent>>;
/// Events that can be applied to a histogram state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HistogramEvent {
/// Add the given value to the histogram.
Add(f64),
}