1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use super::{Metric, Pct, TimedEvent};
use crate::frame::Frame;
use crate::io::codec::vectorize;
use crate::io::provider::StreamType;
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Debug)]
pub struct HistogramMetric;

impl Metric for HistogramMetric {
    type State = HistogramState;
    type Event = HistogramEvent;

    fn stream_type() -> StreamType {
        StreamType::from("rillrate.histogram.v0")
    }

    fn apply(state: &mut Self::State, event: TimedEvent<Self::Event>) {
        match event.event {
            HistogramEvent::Add(amount) => {
                state.total.add(amount);
                let expected = OrderedFloat::from(amount);
                for (level, stat) in &mut state.buckets {
                    if &expected <= level {
                        stat.add(amount);
                        break;
                    }
                }

                // If sliding window is active
                if let Some(frame) = state.frame.as_mut() {
                    if let Some(amount) = frame.insert_pop(amount) {
                        state.total.del(amount);
                        let expected = OrderedFloat::from(amount);
                        for (level, stat) in &mut state.buckets {
                            if &expected <= level {
                                stat.del(amount);
                                break;
                            }
                        }
                    }
                }
            }
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Stat {
    pub sum: f64,
    pub count: u64,
}

impl Default for Stat {
    fn default() -> Self {
        Self { sum: 0.0, count: 0 }
    }
}

impl Stat {
    fn add(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    fn del(&mut self, value: f64) {
        self.sum -= value;
        self.count -= 1;
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramState {
    #[serde(with = "vectorize")]
    pub buckets: BTreeMap<OrderedFloat<f64>, Stat>,
    pub total: Stat,
    frame: Option<Frame<f64>>,
}

impl HistogramState {
    pub fn new(levels: &[f64], window: Option<u32>) -> Self {
        let mut buckets: BTreeMap<_, _> = levels
            .iter()
            .map(|level| (OrderedFloat::from(*level), Stat::default()))
            .collect();
        let inf_level = OrderedFloat::from(f64::INFINITY);
        buckets.entry(inf_level).or_default();
        Self {
            buckets,
            total: Stat::default(),
            frame: window.map(|size| Frame::new(size)),
        }
    }

    pub fn bars(&self) -> impl Iterator<Item = Bar> + '_ {
        let total = self.total.sum;
        self.buckets.iter().map(move |(level, stat)| Bar {
            level: *level,
            count: stat.count,
            pct: Pct::from_div(stat.sum, total),
        })
    }
}

pub struct Bar {
    pub level: OrderedFloat<f64>,
    pub count: u64,
    pub pct: Pct,
}

pub type HistogramDelta = Vec<TimedEvent<HistogramEvent>>;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HistogramEvent {
    Add(f64),
}