radiate_core/stats/
metrics.rs

1use super::Statistic;
2use crate::{Distribution, TimeStatistic};
3#[cfg(feature = "serde")]
4use serde::{Deserialize, Serialize};
5use std::sync::OnceLock;
6use std::{
7    collections::{BTreeMap, HashSet},
8    fmt::Debug,
9    sync::Mutex,
10    time::Duration,
11};
12
/// Process-wide table of interned names, created lazily on first use.
static INTERNED: OnceLock<Mutex<HashSet<&'static str>>> = OnceLock::new();

/// Returns a `&'static str` with the same contents as `name`.
///
/// If an equal string has been interned before, that earlier reference is
/// returned and `name` is dropped. Otherwise `name` is leaked with
/// [`Box::leak`], remembered in the table, and the leaked reference is
/// returned. Leaked strings are never freed.
///
/// # Panics
///
/// Panics if the mutex guarding the table is poisoned.
pub fn intern(name: String) -> &'static str {
    let table = INTERNED.get_or_init(|| Mutex::new(HashSet::new()));
    let mut known_names = table.lock().unwrap();

    // Copy the `&'static str` out so the table is free to be mutated below.
    let existing = known_names.get(name.as_str()).copied();
    match existing {
        Some(interned) => interned,
        None => {
            let leaked: &'static str = Box::leak(name.into_boxed_str());
            known_names.insert(leaked);
            leaked
        }
    }
}
28
/// Creates a `Metric` named `$name` and immediately applies `$time` to it as
/// its first update, evaluating to the resulting metric.
#[macro_export]
macro_rules! metric {
    ($name:expr, $time:expr) => {{
        $crate::Metric::new($name).upsert($time)
    }};
}
37
/// A `key = value` pair attached to a metric.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct MetricLabel {
    /// Label key.
    pub key: &'static str,
    /// Label value.
    pub value: String,
}

impl MetricLabel {
    /// Builds a label from a static `key` and anything convertible into an
    /// owned `String` value.
    pub fn new(key: &'static str, value: impl Into<String>) -> Self {
        let value: String = value.into();
        Self { key, value }
    }
}
52
#[cfg(feature = "serde")]
impl Serialize for MetricLabel {
    /// Serializes the label as a struct named `MetricLabel` with the two
    /// fields `key` and `value`, in that order.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use serde::ser::SerializeStruct;

        let Self { key, value } = self;

        let mut fields = serializer.serialize_struct("MetricLabel", 2)?;
        fields.serialize_field("key", key)?;
        fields.serialize_field("value", value)?;
        fields.end()
    }
}
67
#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for MetricLabel {
    /// Deserializes a label whose `key` arrives as an owned string and is
    /// interned to obtain the `&'static str` the struct stores.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Owned mirror of `MetricLabel`; the key cannot be deserialized
        // straight into a `&'static str`.
        #[derive(Deserialize)]
        struct MetricLabelData {
            key: String,
            value: String,
        }

        MetricLabelData::deserialize(deserializer).map(|MetricLabelData { key, value }| {
            MetricLabel {
                key: intern(key),
                value,
            }
        })
    }
}
87
/// Builds a `Vec` of `MetricLabel`s from a comma-separated list of
/// `key => value` pairs.
///
/// Each pair is forwarded to `MetricLabel::new`. A trailing comma is
/// accepted, and an invocation with no pairs expands to an empty `vec![]`.
#[macro_export]
macro_rules! labels {
    ($($key:expr => $value:expr),* $(,)?) => {
        vec![
            $(
                $crate::stats::metrics::MetricLabel::new($key, $value)
            ),*
        ]
    };
}
98
/// A collection of [`Metric`]s keyed by metric name.
#[derive(Default, Clone)]
pub struct MetricSet {
    /// Metrics stored under their name; the `BTreeMap` keeps them ordered by
    /// that name.
    metrics: BTreeMap<&'static str, Metric>,
}
103
104impl MetricSet {
105    pub fn new() -> Self {
106        MetricSet {
107            metrics: BTreeMap::new(),
108        }
109    }
110
111    pub fn merge(&mut self, other: &MetricSet) {
112        for (name, metric) in other.iter() {
113            if let Some(existing_metric) = self.metrics.get_mut(name) {
114                if let Some(value_stat) = &metric.inner.value_statistic {
115                    if let Some(existing_value_stat) = &mut existing_metric.inner.value_statistic {
116                        existing_value_stat.add(value_stat.last_value());
117                    } else {
118                        existing_metric.inner.value_statistic = Some(value_stat.clone());
119                    }
120                }
121
122                if let Some(time_stat) = &metric.inner.time_statistic {
123                    if let Some(existing_time_stat) = &mut existing_metric.inner.time_statistic {
124                        existing_time_stat.add(time_stat.last_time());
125                    } else {
126                        existing_metric.inner.time_statistic = Some(time_stat.clone());
127                    }
128                }
129
130                if let Some(dist) = &metric.inner.distribution {
131                    if let Some(existing_dist) = &mut existing_metric.inner.distribution {
132                        existing_dist.add(&dist.last_sequence());
133                    } else {
134                        existing_metric.inner.distribution = Some(dist.clone());
135                    }
136                }
137
138                if let Some(labels) = &metric.labels {
139                    existing_metric.labels = Some(labels.clone());
140                }
141            } else {
142                self.add(metric.clone());
143            }
144        }
145    }
146
147    pub fn add_labels(&mut self, name: &'static str, labels: Vec<MetricLabel>) {
148        if let Some(m) = self.metrics.get_mut(name) {
149            for label in labels {
150                m.add_label(label);
151            }
152        }
153    }
154
155    pub fn upsert<'a>(&mut self, name: &'static str, update: impl Into<MetricUpdate<'a>>) {
156        if let Some(m) = self.metrics.get_mut(name) {
157            m.apply_update(update);
158        } else {
159            self.add(Metric::new(name));
160            self.upsert(name, update);
161        }
162    }
163
164    pub fn add_or_update<'a>(&mut self, metric: Metric) {
165        if let Some(m) = self.metrics.get_mut(metric.name()) {
166            m.apply_update(metric.last_value());
167        } else {
168            self.add(metric);
169        }
170    }
171
172    pub fn add(&mut self, metric: Metric) {
173        self.metrics.insert(metric.name(), metric);
174    }
175
176    pub fn get(&self, name: &'static str) -> Option<&Metric> {
177        self.metrics.get(name)
178    }
179
180    pub fn iter(&self) -> impl Iterator<Item = (&'static str, &Metric)> {
181        self.metrics.iter().map(|(name, metric)| (*name, metric))
182    }
183
184    pub fn get_from_string(&self, name: String) -> Option<&Metric> {
185        self.metrics.get(name.as_str())
186    }
187
188    pub fn clear(&mut self) {
189        self.metrics.clear();
190    }
191
192    pub fn contains_key(&self, name: impl Into<String>) -> bool {
193        self.metrics.contains_key(intern(name.into()))
194    }
195}
196
197impl Debug for MetricSet {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        write!(f, "MetricSet {{\n")?;
200        write!(f, "{}", format_metrics_table(&self))?;
201        write!(f, "}}")
202    }
203}
204
#[cfg(feature = "serde")]
impl Serialize for MetricSet {
    /// Serializes the set as a sequence of its metrics in name order.
    ///
    /// The map keys are not written; they are recovered from each metric's
    /// own name on deserialization.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Stream borrowed metrics straight into the serializer rather than
        // deep-cloning every metric into a temporary `Vec` first. The emitted
        // sequence is the same one `Vec<Metric>` would produce.
        serializer.collect_seq(self.metrics.values())
    }
}
219
#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for MetricSet {
    /// Deserializes a sequence of metrics and rebuilds the set from it,
    /// interning every metric name to obtain the `&'static str` key.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Owned mirror of `Metric`: the name is read as a `String` and
        // interned afterwards.
        #[derive(Deserialize)]
        struct MetricOwned {
            name: String,
            inner: MetricInner,
            labels: Option<HashSet<MetricLabel>>,
        }

        let mut metric_set = MetricSet::new();
        Vec::<MetricOwned>::deserialize(deserializer)?
            .into_iter()
            .map(|MetricOwned { name, inner, labels }| Metric {
                name: intern(name),
                inner,
                labels,
            })
            .for_each(|metric| metric_set.add(metric));

        Ok(metric_set)
    }
}
247
/// The statistics a [`Metric`] can hold; each one is absent until something
/// sets it.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct MetricInner {
    /// Statistic over scalar values (fed by float and `usize` updates).
    pub(crate) value_statistic: Option<Statistic>,
    /// Statistic over durations (fed by `Duration` updates).
    pub(crate) time_statistic: Option<TimeStatistic>,
    /// Statistic over sequences of values (fed by slice / `Vec` updates).
    pub(crate) distribution: Option<Distribution>,
}
255
/// A named metric: a bundle of optional statistics plus optional labels.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Metric {
    /// Name of the metric; also the key it is stored under in a `MetricSet`.
    name: &'static str,
    /// Value, time and distribution statistics recorded for this metric.
    inner: MetricInner,
    /// Labels attached to the metric; `None` until a label is added.
    labels: Option<HashSet<MetricLabel>>,
}
263
264impl Metric {
265    pub fn new(name: &'static str) -> Self {
266        Self {
267            name,
268            inner: MetricInner {
269                value_statistic: None,
270                time_statistic: None,
271                distribution: None,
272            },
273            labels: None,
274        }
275    }
276
277    pub fn inner(&self) -> &MetricInner {
278        &self.inner
279    }
280
281    pub fn labels(&self) -> Option<&HashSet<MetricLabel>> {
282        self.labels.as_ref()
283    }
284
285    pub fn with_labels(mut self, labels: Vec<MetricLabel>) -> Self {
286        self.labels.get_or_insert_with(HashSet::new).extend(labels);
287        self
288    }
289
290    pub fn add_label(&mut self, label: MetricLabel) {
291        self.labels.get_or_insert_with(HashSet::new).insert(label);
292    }
293
294    pub fn upsert<'a>(mut self, update: impl Into<MetricUpdate<'a>>) -> Self {
295        self.apply_update(update);
296        self
297    }
298
299    pub fn apply_update<'a>(&mut self, update: impl Into<MetricUpdate<'a>>) {
300        let update = update.into();
301        match update {
302            MetricUpdate::Float(value) => {
303                if let Some(stat) = &mut self.inner.value_statistic {
304                    stat.add(value);
305                } else {
306                    self.inner.value_statistic = Some(Statistic::from(value));
307                }
308            }
309            MetricUpdate::Usize(value) => {
310                if let Some(stat) = &mut self.inner.value_statistic {
311                    stat.add(value as f32);
312                } else {
313                    self.inner.value_statistic = Some(Statistic::from(value as f32));
314                }
315            }
316            MetricUpdate::Duration(value) => {
317                if let Some(stat) = &mut self.inner.time_statistic {
318                    stat.add(value);
319                } else {
320                    self.inner.time_statistic = Some(TimeStatistic::from(value));
321                }
322            }
323            MetricUpdate::Distribution(values) => {
324                if let Some(stat) = &mut self.inner.distribution {
325                    stat.add(values);
326                } else {
327                    self.inner.distribution = Some(Distribution::from(values));
328                }
329            }
330            MetricUpdate::FloatOperation(value, time) => {
331                if let Some(stat) = &mut self.inner.value_statistic {
332                    stat.add(value);
333                } else {
334                    self.inner.value_statistic = Some(Statistic::from(value));
335                }
336
337                if let Some(time_stat) = &mut self.inner.time_statistic {
338                    time_stat.add(time);
339                } else {
340                    self.inner.time_statistic = Some(TimeStatistic::from(time));
341                }
342            }
343            MetricUpdate::UsizeOperation(value, time) => {
344                if let Some(stat) = &mut self.inner.value_statistic {
345                    stat.add(value as f32);
346                } else {
347                    self.inner.value_statistic = Some(Statistic::from(value as f32));
348                }
349
350                if let Some(time_stat) = &mut self.inner.time_statistic {
351                    time_stat.add(time);
352                } else {
353                    self.inner.time_statistic = Some(TimeStatistic::from(time));
354                }
355            }
356            MetricUpdate::DistributionRef(values) => {
357                if let Some(stat) = &mut self.inner.distribution {
358                    stat.add(values);
359                } else {
360                    self.inner.distribution = Some(Distribution::from(values.as_slice()));
361                }
362            }
363            MetricUpdate::DistributionOwned(values) => {
364                if let Some(stat) = &mut self.inner.distribution {
365                    stat.add(&values);
366                } else {
367                    self.inner.distribution = Some(Distribution::from(values.as_slice()));
368                }
369            }
370        }
371    }
372
373    ///
374    /// --- Common statistic getters ---
375    ///
376    pub fn name(&self) -> &'static str {
377        self.name
378    }
379
380    pub fn last_value(&self) -> f32 {
381        self.inner
382            .value_statistic
383            .as_ref()
384            .map_or(0.0, |stat| stat.last_value())
385    }
386
387    pub fn distribution(&self) -> Option<&Distribution> {
388        self.inner.distribution.as_ref()
389    }
390
391    pub fn statistic(&self) -> Option<&Statistic> {
392        self.inner.value_statistic.as_ref()
393    }
394
395    pub fn time_statistic(&self) -> Option<&TimeStatistic> {
396        self.inner.time_statistic.as_ref()
397    }
398
399    pub fn last_time(&self) -> Duration {
400        self.time_statistic()
401            .map_or(Duration::ZERO, |stat| stat.last_time())
402    }
403
404    pub fn count(&self) -> i32 {
405        self.statistic().map(|stat| stat.count()).unwrap_or(0)
406    }
407
408    ///
409    /// --- Get the value statistics ---
410    ///
411    pub fn value_mean(&self) -> Option<f32> {
412        self.statistic().map(|stat| stat.mean())
413    }
414
415    pub fn value_variance(&self) -> Option<f32> {
416        self.statistic().map(|stat| stat.variance())
417    }
418
419    pub fn value_std_dev(&self) -> Option<f32> {
420        self.statistic().map(|stat| stat.std_dev())
421    }
422
423    pub fn value_skewness(&self) -> Option<f32> {
424        self.statistic().map(|stat| stat.skewness())
425    }
426
427    pub fn value_min(&self) -> Option<f32> {
428        self.statistic().map(|stat| stat.min())
429    }
430
431    pub fn value_max(&self) -> Option<f32> {
432        self.statistic().map(|stat| stat.max())
433    }
434
435    ///
436    /// ---Get the time statistics ---
437    ///
438    pub fn time_mean(&self) -> Option<Duration> {
439        self.time_statistic().map(|stat| stat.mean())
440    }
441
442    pub fn time_variance(&self) -> Option<Duration> {
443        self.time_statistic().map(|stat| stat.variance())
444    }
445
446    pub fn time_std_dev(&self) -> Option<Duration> {
447        self.time_statistic().map(|stat| stat.standard_deviation())
448    }
449
450    pub fn time_min(&self) -> Option<Duration> {
451        self.time_statistic().map(|stat| stat.min())
452    }
453
454    pub fn time_max(&self) -> Option<Duration> {
455        self.time_statistic().map(|stat| stat.max())
456    }
457
458    pub fn time_sum(&self) -> Option<Duration> {
459        self.time_statistic().map(|stat| stat.sum())
460    }
461
462    ///
463    /// --- Get the distribution statistics ---
464    ///
465    pub fn last_sequence(&self) -> Option<&Vec<f32>> {
466        self.distribution().map(|dist| dist.last_sequence())
467    }
468
469    pub fn distribution_mean(&self) -> Option<f32> {
470        self.distribution().map(|dist| dist.mean())
471    }
472
473    pub fn distribution_variance(&self) -> Option<f32> {
474        self.distribution().map(|dist| dist.variance())
475    }
476
477    pub fn distribution_std_dev(&self) -> Option<f32> {
478        self.distribution().map(|dist| dist.standard_deviation())
479    }
480
481    pub fn distribution_skewness(&self) -> Option<f32> {
482        self.distribution().map(|dist| dist.skewness())
483    }
484
485    pub fn distribution_kurtosis(&self) -> Option<f32> {
486        self.distribution().map(|dist| dist.kurtosis())
487    }
488
489    pub fn distribution_min(&self) -> Option<f32> {
490        self.distribution().map(|dist| dist.min())
491    }
492
493    pub fn distribution_max(&self) -> Option<f32> {
494        self.distribution().map(|dist| dist.max())
495    }
496
497    pub fn distribution_entropy(&self) -> Option<f32> {
498        self.distribution().map(|dist| dist.entropy())
499    }
500}
501
/// A single observation that can be applied to a metric.
///
/// The `From` impls below let callers pass plain values (`f32`, `usize`,
/// `Duration`, slices, vectors and `(value, Duration)` tuples) wherever an
/// `impl Into<MetricUpdate>` is expected.
#[derive(Debug, Clone, PartialEq)]
pub enum MetricUpdate<'a> {
    /// A scalar value.
    Float(f32),
    /// A scalar count or size.
    Usize(usize),
    /// An elapsed time.
    Duration(Duration),
    /// A borrowed sequence of values.
    Distribution(&'a [f32]),
    /// A borrowed vector of values.
    DistributionRef(&'a Vec<f32>),
    /// An owned vector of values.
    DistributionOwned(Vec<f32>),
    /// A scalar value together with an elapsed time.
    FloatOperation(f32, Duration),
    /// A scalar count together with an elapsed time.
    UsizeOperation(usize, Duration),
}

impl From<f32> for MetricUpdate<'_> {
    fn from(scalar: f32) -> Self {
        Self::Float(scalar)
    }
}

impl From<usize> for MetricUpdate<'_> {
    fn from(count: usize) -> Self {
        Self::Usize(count)
    }
}

impl From<Duration> for MetricUpdate<'_> {
    fn from(elapsed: Duration) -> Self {
        Self::Duration(elapsed)
    }
}

impl<'a> From<&'a [f32]> for MetricUpdate<'a> {
    fn from(samples: &'a [f32]) -> Self {
        Self::Distribution(samples)
    }
}

impl From<(f32, Duration)> for MetricUpdate<'_> {
    fn from((scalar, elapsed): (f32, Duration)) -> Self {
        Self::FloatOperation(scalar, elapsed)
    }
}

impl From<(usize, Duration)> for MetricUpdate<'_> {
    fn from((count, elapsed): (usize, Duration)) -> Self {
        Self::UsizeOperation(count, elapsed)
    }
}

impl<'a> From<&'a Vec<f32>> for MetricUpdate<'a> {
    fn from(samples: &'a Vec<f32>) -> Self {
        Self::DistributionRef(samples)
    }
}

impl From<Vec<f32>> for MetricUpdate<'_> {
    fn from(samples: Vec<f32>) -> Self {
        Self::DistributionOwned(samples)
    }
}
561
562impl std::fmt::Debug for Metric {
563    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564        write!(f, "Metric {{ name: {}, }}", self.name)
565    }
566}
567
568fn format_metrics_table(metrics: &MetricSet) -> String {
569    use std::fmt::Write;
570
571    let mut grouped = BTreeMap::new();
572    for metric in metrics.iter().map(|(_, m)| m) {
573        grouped.insert(metric.name(), metric);
574    }
575
576    let mut output = String::new();
577    writeln!(
578        output,
579        "{:<24} | {:<6} | {:<10} | {:<10} | {:<10} | {:<6} | {:<12} | {:<10} | {:<10} | {:<10} | {:<10}",
580        "Name", "Type", "Mean", "Min", "Max", "N", "Total", "StdDev", "Skew", "Kurt", "Entr"
581    )
582    .unwrap();
583    writeln!(output, "{}", "-".repeat(145)).unwrap();
584
585    for (name, metric) in grouped {
586        let inner = metric.inner();
587
588        // Value row
589        if let Some(stat) = &inner.value_statistic {
590            writeln!(
591                output,
592                "{:<24} | {:<6} | {:<10.3} | {:<10.3} | {:<10.3} | {:<6} | {:<12} | {:<10.3} | {:<10.3} | {:<10.3} | {:<10.3}",
593                name,
594                "value",
595                stat.mean(),
596                stat.min(),
597                stat.max(),
598                stat.count(),
599                "-",
600                stat.std_dev(),
601                stat.skewness(),
602                stat.kurtosis(),
603                "-",
604            )
605            .unwrap();
606        }
607
608        // Time row
609        if let Some(time) = &inner.time_statistic {
610            writeln!(
611                output,
612                "{:<24} | {:<6} | {:<10} | {:<10} | {:<10} | {:<6} | {:<12} | {:<10} | {:<10} | {:<10} | {:<10}",
613                name,
614                "time",
615                format!("{:?}", time.mean()),
616                format!("{:?}", time.min()),
617                format!("{:?}", time.max()),
618                time.count(),
619                format!("{:?}", time.sum()),
620                format!("{:?}", time.standard_deviation()),
621                "-",
622                "-",
623                "-",
624
625            )
626            .unwrap();
627        }
628
629        // Distribution row
630        if let Some(dist) = &inner.distribution {
631            writeln!(
632                output,
633                "{:<24} | {:<6} | {:<10.3} | {:<10.3} | {:<10.3} | {:<6} | {:<12} | {:<10.3} | {:<10.3} | {:<10.3} | {:<10.3}",
634                name,
635                "dist",
636                dist.mean(),
637                dist.min(),
638                dist.max(),
639                dist.count(),
640                format!("{:.3}", dist.entropy()),
641                dist.standard_deviation(),
642                dist.skewness(),
643                dist.kurtosis(),
644                format!("{:.3}", dist.entropy()),
645            )
646            .unwrap();
647        }
648
649        if let Some(labels) = &metric.labels {
650            let labels_str = labels
651                .iter()
652                .map(|l| format!("{}={}", l.key, l.value))
653                .collect::<Vec<String>>()
654                .join(", ");
655            writeln!(output, "{:<24} | Labels: {}", "", labels_str).unwrap();
656        }
657    }
658
659    output
660}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// Values recorded, in this order, by every test below.
    const SAMPLES: [f32; 5] = [1.0, 2.0, 3.0, 4.0, 5.0];

    /// Asserts the value summary expected after recording `SAMPLES`.
    fn assert_sample_summary(metric: &Metric) {
        assert_eq!(metric.count(), 5);
        assert_eq!(metric.last_value(), 5.0);
        assert_eq!(metric.value_mean().unwrap(), 3.0);
        assert_eq!(metric.value_variance().unwrap(), 2.5);
        assert_eq!(metric.value_std_dev().unwrap(), 1.5811388);
        assert_eq!(metric.value_min().unwrap(), 1.0);
        assert_eq!(metric.value_max().unwrap(), 5.0);
    }

    #[test]
    fn test_metric() {
        let mut metric = Metric::new("test");
        for sample in SAMPLES {
            metric.apply_update(sample);
        }

        assert_sample_summary(&metric);
        assert_eq!(metric.name(), "test");
    }

    #[test]
    fn test_metric_set() {
        let mut metric_set = MetricSet::new();
        for sample in SAMPLES {
            metric_set.upsert("test", sample);
        }

        let metric = metric_set.get("test").unwrap();

        assert_sample_summary(metric);
    }

    #[test]
    fn test_metric_labels() {
        let mut metric = Metric::new("test");
        metric.add_label(MetricLabel::new("label1", "value1"));
        metric.add_label(MetricLabel::new("label2", "value2"));
        for sample in SAMPLES {
            metric.apply_update(sample);
        }

        assert_sample_summary(&metric);
        assert!(metric.labels().is_some());
        let labels = metric.labels().unwrap();
        for (key, value) in [("label1", "value1"), ("label2", "value2")] {
            assert!(labels.contains(&MetricLabel::new(key, value)));
        }
    }
}