rill_view/flow/data/
histogram.rs1use ordered_float::OrderedFloat;
2use rill_protocol::flow::core::{Flow, TimedEvent};
3use rill_protocol::frame::Frame;
4use rill_protocol::io::provider::StreamType;
5use rill_protocol::range::Pct;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct Stat {
11 pub sum: f64,
12 pub count: u64,
13}
14
15impl Default for Stat {
16 fn default() -> Self {
17 Self { sum: 0.0, count: 0 }
18 }
19}
20
21impl Stat {
22 fn add(&mut self, value: f64) {
23 self.sum += value;
24 self.count += 1;
25 }
26
27 fn del(&mut self, value: f64) {
28 self.sum -= value;
29 self.count -= 1;
30 }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct HistogramState {
35 #[serde(with = "vectorize")]
36 pub buckets: BTreeMap<OrderedFloat<f64>, Stat>,
37 pub total: Stat,
38 frame: Option<Frame<f64>>,
39}
40
41impl HistogramState {
42 pub fn new(levels: Vec<f64>, window: Option<u32>) -> Self {
43 let mut buckets: BTreeMap<_, _> = levels
44 .iter()
45 .map(|level| (OrderedFloat::from(*level), Stat::default()))
46 .collect();
47 let inf_level = OrderedFloat::from(f64::INFINITY);
48 buckets.entry(inf_level).or_default();
49 Self {
50 buckets,
51 total: Stat::default(),
52 frame: window.map(Frame::new),
53 }
54 }
55
56 pub fn bars(&self) -> impl Iterator<Item = Bar> + '_ {
57 let total = self.total.sum;
58 self.buckets.iter().map(move |(level, stat)| Bar {
59 level: *level,
60 count: stat.count,
61 pct: Pct::from_div(stat.sum, total),
62 })
63 }
64}
65
66impl Flow for HistogramState {
67 type Action = ();
68 type Event = HistogramEvent;
69
70 fn stream_type() -> StreamType {
71 StreamType::from("rillrate.data.histogram.v0")
72 }
73
74 fn apply(&mut self, event: TimedEvent<Self::Event>) {
75 match event.event {
76 HistogramEvent::Add(amount) => {
77 self.total.add(amount);
78 let expected = OrderedFloat::from(amount);
79 for (level, stat) in &mut self.buckets {
80 if &expected <= level {
81 stat.add(amount);
82 break;
83 }
84 }
85
86 if let Some(frame) = self.frame.as_mut() {
88 if let Some(amount) = frame.insert_pop(amount) {
89 self.total.del(amount);
90 let expected = OrderedFloat::from(amount);
91 for (level, stat) in &mut self.buckets {
92 if &expected <= level {
93 stat.del(amount);
94 break;
95 }
96 }
97 }
98 }
99 }
100 }
101 }
102}
103
104pub struct Bar {
105 pub level: OrderedFloat<f64>,
106 pub count: u64,
107 pub pct: Pct,
108}
109
110pub type HistogramDelta = Vec<TimedEvent<HistogramEvent>>;
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub enum HistogramEvent {
114 Add(f64),
115}