rill_view/flow/data/
histogram.rs

1use ordered_float::OrderedFloat;
2use rill_protocol::flow::core::{Flow, TimedEvent};
3use rill_protocol::frame::Frame;
4use rill_protocol::io::provider::StreamType;
5use rill_protocol::range::Pct;
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
10pub struct Stat {
11    pub sum: f64,
12    pub count: u64,
13}
14
15impl Default for Stat {
16    fn default() -> Self {
17        Self { sum: 0.0, count: 0 }
18    }
19}
20
21impl Stat {
22    fn add(&mut self, value: f64) {
23        self.sum += value;
24        self.count += 1;
25    }
26
27    fn del(&mut self, value: f64) {
28        self.sum -= value;
29        self.count -= 1;
30    }
31}
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct HistogramState {
35    #[serde(with = "vectorize")]
36    pub buckets: BTreeMap<OrderedFloat<f64>, Stat>,
37    pub total: Stat,
38    frame: Option<Frame<f64>>,
39}
40
41impl HistogramState {
42    pub fn new(levels: Vec<f64>, window: Option<u32>) -> Self {
43        let mut buckets: BTreeMap<_, _> = levels
44            .iter()
45            .map(|level| (OrderedFloat::from(*level), Stat::default()))
46            .collect();
47        let inf_level = OrderedFloat::from(f64::INFINITY);
48        buckets.entry(inf_level).or_default();
49        Self {
50            buckets,
51            total: Stat::default(),
52            frame: window.map(Frame::new),
53        }
54    }
55
56    pub fn bars(&self) -> impl Iterator<Item = Bar> + '_ {
57        let total = self.total.sum;
58        self.buckets.iter().map(move |(level, stat)| Bar {
59            level: *level,
60            count: stat.count,
61            pct: Pct::from_div(stat.sum, total),
62        })
63    }
64}
65
66impl Flow for HistogramState {
67    type Action = ();
68    type Event = HistogramEvent;
69
70    fn stream_type() -> StreamType {
71        StreamType::from("rillrate.data.histogram.v0")
72    }
73
74    fn apply(&mut self, event: TimedEvent<Self::Event>) {
75        match event.event {
76            HistogramEvent::Add(amount) => {
77                self.total.add(amount);
78                let expected = OrderedFloat::from(amount);
79                for (level, stat) in &mut self.buckets {
80                    if &expected <= level {
81                        stat.add(amount);
82                        break;
83                    }
84                }
85
86                // If sliding window is active
87                if let Some(frame) = self.frame.as_mut() {
88                    if let Some(amount) = frame.insert_pop(amount) {
89                        self.total.del(amount);
90                        let expected = OrderedFloat::from(amount);
91                        for (level, stat) in &mut self.buckets {
92                            if &expected <= level {
93                                stat.del(amount);
94                                break;
95                            }
96                        }
97                    }
98                }
99            }
100        }
101    }
102}
103
104pub struct Bar {
105    pub level: OrderedFloat<f64>,
106    pub count: u64,
107    pub pct: Pct,
108}
109
110pub type HistogramDelta = Vec<TimedEvent<HistogramEvent>>;
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub enum HistogramEvent {
114    Add(f64),
115}