ebpf_histogram/lib.rs
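
//! Expose per-CPU eBPF hash maps as Prometheus histogram-style metrics.
//!
//! The eBPF program increments a single power-of-two bucket per observation;
//! the [`Histogram`] collector reads the map on each scrape, sums the per-CPU
//! values, and turns the per-bucket counts into cumulative `le` buckets.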

use aya::{
    maps::{MapData, PerCpuHashMap},
    Pod,
};
use std::{collections::{BTreeMap, HashMap}, hash::Hash, sync::Arc};

use prometheus::{
    core::{AtomicI64, Collector, Desc, GenericGaugeVec},
    proto, IntGaugeVec, Opts,
};
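
/// A map key type that can be rendered as a set of Prometheus labels.
///
/// A minimal sketch of an implementing type (`PidKey` is illustrative, not part
/// of this crate):
///
/// ```ignore
/// #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
/// #[repr(C)]
/// struct PidKey {
///     pid: u32,
/// }
///
/// unsafe impl aya::Pod for PidKey {}
///
/// impl Key for PidKey {
///     fn get_label_keys() -> Vec<String> {
///         vec!["pid".to_string()]
///     }
///
///     fn get_label_values(&self) -> Vec<String> {
///         vec![self.pid.to_string()]
///     }
/// }
/// ```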
pub trait Key: Sized + Pod + Eq + PartialEq + Hash + Send + Sync {
    /// Return the keys for the Prometheus label pairs:
    /// - Can be empty
    /// - Should be in the same order and have the same length as `get_label_values`
    fn get_label_keys() -> Vec<String>;
    /// Return the values for the Prometheus label pairs:
    /// - Can be empty
    /// - Should be in the same order as `get_label_keys`
    fn get_label_values(&self) -> Vec<String>;
}
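
/// The key layout stored in the eBPF map: a histogram bucket (the exponent of a
/// power of two) together with the user-defined sub-key that provides the
/// Prometheus labels.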
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct KeyWrapper<T: Key> {
    pub bucket: u32,
    pub sub_key: T,
}

// SAFETY: `bucket` is a `u32`, `sub_key` is itself `Pod`, and the struct is
// `#[repr(C)]`, giving `KeyWrapper` a plain-old-data layout.
unsafe impl<T: Key> Pod for KeyWrapper<T> {}
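
/// A Prometheus [`Collector`] backed by a per-CPU eBPF hash map.
///
/// Each scrape sums the per-CPU counts and publishes them as cumulative `le`
/// buckets on an [`IntGaugeVec`].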
#[derive(Clone)]
pub struct Histogram<T: Key> {
    map: Arc<PerCpuHashMap<MapData, KeyWrapper<T>, u64>>,
    buckets_metric: GenericGaugeVec<AtomicI64>,
}

impl<T: Key> Collector for Histogram<T> {
    fn desc(&self) -> Vec<&Desc> {
        self.buckets_metric.desc()
    }

    fn collect(&self) -> Vec<proto::MetricFamily> {
        // On the eBPF side, only the observed bucket is incremented, but Prometheus
        // expects every bucket lower than or equal to the observation to be
        // incremented as well, so the `e` (equal) buckets are transformed into
        // `le` (lower-or-equal) buckets here.
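        // For example, raw per-bucket counts {bucket 0 => 3, bucket 2 => 1} are
        // exposed as cumulative samples le="1" => 3 and le="4" => 4 (only buckets
        // present in the map get a sample).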
        let mut sorted_bucket_index = HashMap::<T, BTreeMap<u32, u64>>::new();
        self.map
            .iter()
            .filter_map(Result::ok)
            .for_each(|(key, values)| {
                let entry = sorted_bucket_index.entry(key.sub_key).or_default();
                // Sum the per-CPU counters into one value for this key/bucket.
                let total = values.iter().sum::<u64>();
                entry.insert(key.bucket, total);
            });

        // Walk each key's buckets in ascending order, accumulating a running
        // total to form the `le` buckets.
        sorted_bucket_index.iter().for_each(|(key, bucket_map)| {
            let label_values = key.get_label_values();

            let mut total = 0;
            bucket_map.iter().for_each(|(bucket, value)| {
                total += value;
                // Buckets are stored as the exponent of a power of two.
                let expanded_bucket = (2_u64.pow(*bucket)).to_string();
                let mut str_label_values: Vec<&str> =
                    label_values.iter().map(|x| x.as_str()).collect();
                str_label_values.push(expanded_bucket.as_str());

                self.buckets_metric
                    .with_label_values(&str_label_values)
                    .set(total as i64)
            });
        });
        self.buckets_metric.collect()
    }
}

impl<T: Key> Histogram<T> {
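    /// Build a [`Histogram`] from an aya per-CPU hash map populated by the eBPF
    /// program. The metric gets the labels returned by [`Key::get_label_keys`]
    /// plus a trailing `le` label for the bucket bound.
    ///
    /// A rough usage sketch; the map name, the `PidKey` type, and the exact aya
    /// loading calls are assumptions that depend on your eBPF program and aya
    /// version:
    ///
    /// ```ignore
    /// // Load the eBPF object and take the per-CPU map it defines.
    /// let mut bpf = aya::Ebpf::load(ebpf_object_bytes)?;
    /// let map = PerCpuHashMap::try_from(bpf.take_map("LATENCY_HISTOGRAM").unwrap())?;
    ///
    /// let histogram = Histogram::<PidKey>::new_from_map(
    ///     map,
    ///     prometheus::Opts::new("latency_us", "Request latency in microseconds"),
    /// );
    /// prometheus::default_registry().register(Box::new(histogram))?;
    /// ```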
    pub fn new_from_map(
        map: PerCpuHashMap<MapData, KeyWrapper<T>, u64>,
        opts: Opts,
    ) -> Histogram<T> {
        let label_keys = T::get_label_keys();
        let mut str_label_keys: Vec<&str> = label_keys.iter().map(|x| x.as_str()).collect();
        str_label_keys.push("le"); // "le" carries the upper bound of each lower-or-equal bucket

        let buckets_metric = IntGaugeVec::new(opts, &str_label_keys).unwrap();
        Histogram {
            map: Arc::from(map),
            buckets_metric,
        }
    }
}