Skip to main content

radiate_core/stats/
metric.rs

1use super::Statistic;
2use crate::{
3    TimeStatistic,
4    stats::{Tag, TagKind, defaults},
5};
6use radiate_utils::{ToSnakeCase, cache_arc_string, intern, intern_snake_case};
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9use std::{sync::Arc, time::Duration};
10
/// Builds a `Metric` and records an initial update in a single expression.
///
/// * `metric!(name, update)` — `Metric::new(name)` followed by one update with `update`
///   (anything accepted by `Metric::upsert` / `Metric::apply_update`).
/// * `metric!(name)` — `Metric::new(name)` followed by a single update of `1`.
#[macro_export]
macro_rules! metric {
    ($name:expr, $update:expr) => {{ $crate::Metric::new($name).upsert($update) }};
    ($name:expr) => {{ $crate::Metric::new($name).upsert(1) }};
}
20
/// Sample storage for a [`Metric`]: an optional value statistic and an optional
/// timing statistic. Each is `None` until something of that kind is recorded.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
struct MetricInner {
    /// Aggregate of the `f32` value samples, if any have been recorded.
    value_statistic: Option<Statistic>,
    /// Aggregate of the `Duration` samples, if any have been recorded.
    time_statistic: Option<TimeStatistic>,
}
27
/// A named metric that accumulates value samples and/or timing samples.
///
/// Both statistics start out empty and are created on the first update of their kind
/// (see [`Metric::apply_update`]). Note that `Metric::default()` yields an empty name and
/// the default `Tag` instead of going through [`Metric::new`].
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Metric {
    /// Stored name, as produced by `Metric::new`.
    name: Arc<String>,
    /// Lazily-created value and timing statistics.
    inner: MetricInner,
    /// Tag set; seeded by `defaults::default_tags` in `Metric::new` and extended as
    /// statistics are created.
    tags: Tag,
}
35
36impl Metric {
37    pub fn new(name: &'static str) -> Self {
38        let name = cache_arc_string!(intern_snake_case!(name));
39        let tags = defaults::default_tags(&name);
40
41        Self {
42            name,
43            inner: MetricInner {
44                value_statistic: None,
45                time_statistic: None,
46            },
47            tags,
48        }
49    }
50
51    #[inline(always)]
52    pub fn tags(&self) -> Tag {
53        self.tags
54    }
55
56    #[inline(always)]
57    pub fn with_tag(mut self, tag: TagKind) -> Self {
58        self.add_tag(tag);
59        self
60    }
61
62    #[inline(always)]
63    pub fn with_tags<T>(&mut self, tags: T)
64    where
65        T: Into<Tag>,
66    {
67        self.tags = tags.into();
68    }
69
70    #[inline(always)]
71    pub fn add_tags(&mut self, tags: Tag) {
72        self.tags = self.tags.union(tags);
73    }
74
75    #[inline(always)]
76    pub fn add_tag(&mut self, tag: TagKind) {
77        self.tags.insert(tag);
78    }
79
80    pub fn contains_tag(&self, tag: &TagKind) -> bool {
81        self.tags.has(*tag)
82    }
83
84    pub fn tags_iter(&self) -> impl Iterator<Item = TagKind> {
85        self.tags.iter()
86    }
87
88    pub fn clear_values(&mut self) {
89        self.inner = MetricInner::default();
90    }
91
92    #[inline(always)]
93    pub fn upsert<'a>(mut self, update: impl Into<MetricUpdate<'a>>) -> Self {
94        self.apply_update(update);
95        self
96    }
97
98    #[inline(always)]
99    pub fn update_from(&mut self, other: Metric) {
100        if let Some(stat) = other.inner.value_statistic {
101            // Kinda a hack to take advantage of the fact that if count == sum,
102            // we can just apply the sum directly instead of merging statistics - keeps things honest
103            // & avoids merging statistics when we don't have to (even though that's a fast operation).
104            if stat.count() as f32 == stat.sum() && !other.tags.has(TagKind::Distribution) {
105                self.apply_update(stat.sum());
106            } else {
107                self.apply_update(stat);
108            }
109        }
110
111        if let Some(time) = other.inner.time_statistic {
112            if time.count() as u32 == time.sum().as_millis() as u32 {
113                self.apply_update(time.sum());
114            } else {
115                self.apply_update(time);
116            }
117        }
118
119        self.tags = self.tags.union(other.tags);
120    }
121
122    #[inline(always)]
123    pub fn apply_update<'a>(&mut self, update: impl Into<MetricUpdate<'a>>) {
124        let update = update.into();
125        match update {
126            MetricUpdate::Float(value) => {
127                self.update_statistic(value);
128            }
129            MetricUpdate::Usize(value) => {
130                self.update_statistic(value as f32);
131            }
132            MetricUpdate::Duration(value) => {
133                self.update_time_statistic(value);
134            }
135            MetricUpdate::FloatOperation(value, time) => {
136                self.update_statistic(value);
137                self.update_time_statistic(time);
138            }
139            MetricUpdate::UsizeOperation(value, time) => {
140                self.update_statistic(value as f32);
141                self.update_time_statistic(time);
142            }
143            MetricUpdate::UsizeDistribution(values) => {
144                self.update_statistic_from_iter(values.iter().map(|v| *v as f32));
145            }
146            MetricUpdate::Distribution(values) => {
147                self.update_statistic_from_iter(values.iter().cloned());
148            }
149            MetricUpdate::Statistic(stat) => {
150                if let Some(existing_stat) = &mut self.inner.value_statistic {
151                    existing_stat.merge(&stat);
152                } else {
153                    self.new_statistic(stat);
154                }
155            }
156            MetricUpdate::TimeStatistic(time_stat) => {
157                if let Some(existing_time_stat) = &mut self.inner.time_statistic {
158                    existing_time_stat.merge(&time_stat);
159                } else {
160                    self.new_time_statistic(time_stat);
161                }
162            }
163        }
164    }
165
166    pub fn new_statistic(&mut self, value: impl Into<Statistic>) {
167        self.inner.value_statistic = Some(value.into());
168        self.add_tag(TagKind::Statistic);
169    }
170
171    pub fn new_time_statistic(&mut self, value: impl Into<TimeStatistic>) {
172        self.inner.time_statistic = Some(value.into());
173        self.add_tag(TagKind::Time);
174    }
175
176    fn update_statistic(&mut self, value: f32) {
177        if let Some(stat) = &mut self.inner.value_statistic {
178            stat.add(value);
179        } else {
180            self.new_statistic(value);
181        }
182    }
183
184    fn update_time_statistic(&mut self, value: Duration) {
185        if let Some(stat) = &mut self.inner.time_statistic {
186            stat.add(value);
187        } else {
188            self.new_time_statistic(value);
189        }
190    }
191
192    fn update_statistic_from_iter<I>(&mut self, values: I)
193    where
194        I: IntoIterator<Item = f32>,
195    {
196        if let Some(stat) = &mut self.inner.value_statistic {
197            for value in values {
198                stat.add(value);
199            }
200
201            self.add_tag(TagKind::Distribution);
202        } else {
203            let mut new_stat = Statistic::default();
204            for value in values {
205                new_stat.add(value);
206            }
207
208            self.new_statistic(new_stat);
209            self.add_tag(TagKind::Distribution);
210        }
211    }
212
213    ///
214    /// --- Common statistic getters ---
215    ///
216    pub fn name(&self) -> &str {
217        &self.name
218    }
219
220    pub fn last_value(&self) -> f32 {
221        self.inner
222            .value_statistic
223            .as_ref()
224            .map_or(0.0, |stat| stat.last_value())
225    }
226
227    pub fn statistic(&self) -> Option<&Statistic> {
228        self.inner.value_statistic.as_ref()
229    }
230
231    pub fn time_statistic(&self) -> Option<&TimeStatistic> {
232        self.inner.time_statistic.as_ref()
233    }
234
235    pub fn last_time(&self) -> Duration {
236        self.time_statistic()
237            .map_or(Duration::ZERO, |stat| stat.last_time())
238    }
239
240    pub fn count(&self) -> i32 {
241        if let Some(stat) = &self.inner.value_statistic {
242            return stat.count();
243        } else if let Some(stat) = &self.inner.time_statistic {
244            return stat.count();
245        }
246
247        // No statistics recorded yet
248        0
249    }
250
251    ///
252    /// --- Get the value statistics ---
253    ///
254    pub fn value_mean(&self) -> Option<f32> {
255        self.statistic().map(|stat| stat.mean())
256    }
257
258    pub fn value_variance(&self) -> Option<f32> {
259        self.statistic().map(|stat| stat.variance())
260    }
261
262    pub fn value_std_dev(&self) -> Option<f32> {
263        self.statistic().map(|stat| stat.std_dev())
264    }
265
266    pub fn value_skewness(&self) -> Option<f32> {
267        self.statistic().map(|stat| stat.skewness())
268    }
269
270    pub fn value_min(&self) -> Option<f32> {
271        self.statistic().map(|stat| stat.min())
272    }
273
274    pub fn value_max(&self) -> Option<f32> {
275        self.statistic().map(|stat| stat.max())
276    }
277
278    pub fn value_sum(&self) -> Option<f32> {
279        self.statistic().map(|stat| stat.sum())
280    }
281
282    pub fn value_count(&self) -> Option<i32> {
283        self.statistic().map(|stat| stat.count())
284    }
285
286    ///
287    /// --- Get the time statistics ---
288    ///
289    pub fn time_mean(&self) -> Option<Duration> {
290        self.time_statistic().map(|stat| stat.mean())
291    }
292
293    pub fn time_variance(&self) -> Option<Duration> {
294        self.time_statistic().map(|stat| stat.variance())
295    }
296
297    pub fn time_std_dev(&self) -> Option<Duration> {
298        self.time_statistic().map(|stat| stat.standard_deviation())
299    }
300
301    pub fn time_min(&self) -> Option<Duration> {
302        self.time_statistic().map(|stat| stat.min())
303    }
304
305    pub fn time_max(&self) -> Option<Duration> {
306        self.time_statistic().map(|stat| stat.max())
307    }
308
309    pub fn time_sum(&self) -> Option<Duration> {
310        self.time_statistic().map(|stat| stat.sum())
311    }
312}
313
/// A single update that [`Metric::apply_update`] can record.
///
/// Values are usually created through `From` conversions (e.g. from `f32`, `usize`,
/// `Duration`, `(value, Duration)` tuples, slices/vectors, or whole statistics).
#[derive(Clone, PartialEq)]
pub enum MetricUpdate<'a> {
    /// One value sample.
    Float(f32),
    /// One value sample, converted to `f32`.
    Usize(usize),
    /// One timing sample.
    Duration(Duration),
    /// One value sample plus one timing sample.
    FloatOperation(f32, Duration),
    /// One value sample (converted to `f32`) plus one timing sample.
    UsizeOperation(usize, Duration),
    /// Each element becomes a value sample; the metric is tagged `Distribution`.
    Distribution(&'a [f32]),
    /// Each element becomes a value sample (converted to `f32`); the metric is tagged `Distribution`.
    UsizeDistribution(&'a [usize]),
    /// Merged into the existing value statistic, or installed as it if there is none.
    Statistic(Statistic),
    /// Merged into the existing time statistic, or installed as it if there is none.
    TimeStatistic(TimeStatistic),
}
326
327impl From<f32> for MetricUpdate<'_> {
328    fn from(value: f32) -> Self {
329        MetricUpdate::Float(value)
330    }
331}
332
333impl From<usize> for MetricUpdate<'_> {
334    fn from(value: usize) -> Self {
335        MetricUpdate::Usize(value)
336    }
337}
338
339impl From<Duration> for MetricUpdate<'_> {
340    fn from(value: Duration) -> Self {
341        MetricUpdate::Duration(value)
342    }
343}
344
345impl<'a> From<&'a [f32]> for MetricUpdate<'a> {
346    fn from(value: &'a [f32]) -> Self {
347        MetricUpdate::Distribution(value)
348    }
349}
350
351impl From<(f32, Duration)> for MetricUpdate<'_> {
352    fn from(value: (f32, Duration)) -> Self {
353        MetricUpdate::FloatOperation(value.0, value.1)
354    }
355}
356
357impl From<(usize, Duration)> for MetricUpdate<'_> {
358    fn from(value: (usize, Duration)) -> Self {
359        MetricUpdate::UsizeOperation(value.0, value.1)
360    }
361}
362
363impl<'a> From<&'a Vec<f32>> for MetricUpdate<'a> {
364    fn from(value: &'a Vec<f32>) -> Self {
365        MetricUpdate::Distribution(value)
366    }
367}
368
369impl<'a> From<&'a Vec<usize>> for MetricUpdate<'a> {
370    fn from(value: &'a Vec<usize>) -> Self {
371        MetricUpdate::UsizeDistribution(value)
372    }
373}
374
375impl From<Statistic> for MetricUpdate<'_> {
376    fn from(value: Statistic) -> Self {
377        MetricUpdate::Statistic(value)
378    }
379}
380
381impl From<TimeStatistic> for MetricUpdate<'_> {
382    fn from(value: TimeStatistic) -> Self {
383        MetricUpdate::TimeStatistic(value)
384    }
385}
386
387impl std::fmt::Debug for Metric {
388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389        write!(f, "Metric {{ name: {}, }}", self.name)
390    }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    const EPSILON: f32 = 1e-5;

    fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
        (a - b).abs() <= eps
    }

    /// Asserts the metric's value statistic has the expected count, mean, variance, min and max.
    fn assert_stat_eq(m: &Metric, count: i32, mean: f32, var: f32, min: f32, max: f32) {
        assert_eq!(m.count(), count);
        assert!(approx_eq(m.value_mean().unwrap(), mean, EPSILON), "mean");
        assert!(approx_eq(m.value_variance().unwrap(), var, EPSILON), "var");
        assert!(approx_eq(m.value_min().unwrap(), min, EPSILON), "min");
        assert!(approx_eq(m.value_max().unwrap(), max, EPSILON), "max");
    }

    /// Reference statistics computed directly from `values`.
    ///
    /// Uses the sample variance (n - 1 denominator), matching `Statistic::variance`
    /// (see `test_metric`: the variance of 1..=5 is 2.5).
    fn stats_of(values: &[f32]) -> (i32, f32, f32, f32, f32) {
        let n = values.len() as i32;
        if n == 0 {
            return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
        }
        let mean = values.iter().sum::<f32>() / values.len() as f32;

        let mut m2 = 0.0_f32;
        for &v in values {
            let d = v - mean;
            m2 += d * d;
        }

        let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };

        let min = values.iter().cloned().fold(f32::INFINITY, f32::min);
        let max = values.iter().cloned().fold(f32::NEG_INFINITY, f32::max);

        (n, mean, var, min, max)
    }

    #[test]
    fn test_metric() {
        let mut metric = Metric::new("test");
        metric.apply_update(1.0);
        metric.apply_update(2.0);
        metric.apply_update(3.0);
        metric.apply_update(4.0);
        metric.apply_update(5.0);

        assert_eq!(metric.count(), 5);
        assert_eq!(metric.last_value(), 5.0);
        assert_eq!(metric.value_mean().unwrap(), 3.0);
        assert_eq!(metric.value_variance().unwrap(), 2.5);
        assert_eq!(metric.value_std_dev().unwrap(), 1.5811388);
        assert_eq!(metric.value_min().unwrap(), 1.0);
        assert_eq!(metric.value_max().unwrap(), 5.0);
        assert_eq!(metric.name(), "test");
    }

    #[test]
    fn test_metric_labels() {
        let mut metric = Metric::new("test");

        // Value samples create the value statistic and add the Statistic tag.
        metric.apply_update(1.0);
        metric.apply_update(2.0);
        assert!(metric.tags().has(TagKind::Statistic));
        assert!(metric.contains_tag(&TagKind::Statistic));

        // Timing samples create the time statistic and add the Time tag.
        metric.apply_update(Duration::from_millis(5));
        assert!(metric.contains_tag(&TagKind::Time));

        // Tags can also be added builder-style.
        let mut metric = metric.with_tag(TagKind::Distribution);
        assert!(metric.contains_tag(&TagKind::Distribution));

        // Clearing values drops all samples but keeps every tag.
        metric.clear_values();
        assert!(metric.statistic().is_none());
        assert!(metric.time_statistic().is_none());
        assert_eq!(metric.count(), 0);
        assert!(metric.contains_tag(&TagKind::Statistic));
        assert!(metric.contains_tag(&TagKind::Time));
        assert!(metric.contains_tag(&TagKind::Distribution));
    }

    #[test]
    fn distribution_updates_accumulate_samples_across_calls() {
        let a = [1.0, 2.0, 3.0];
        let b = [10.0, 20.0];

        let mut m = Metric::new("scores");

        m.apply_update(&a[..]);
        // expected stats over [1,2,3]
        let (n1, mean1, var1, min1, max1) = stats_of(&a);
        assert_stat_eq(&m, n1, mean1, var1, min1, max1);

        m.apply_update(&b[..]);
        // expected stats over [1,2,3,10,20]
        let combined = [1.0, 2.0, 3.0, 10.0, 20.0];
        let (n2, mean2, var2, min2, max2) = stats_of(&combined);
        assert_stat_eq(&m, n2, mean2, var2, min2, max2);
    }

    #[test]
    fn distribution_tag_is_applied_on_any_slice_update() {
        let mut m = Metric::new("scores");

        // seed with scalar samples first (creates Statistic but not Distribution tag)
        m.apply_update(1.0);
        m.apply_update(2.0);
        assert!(m.tags().has(TagKind::Statistic));
        assert!(!m.tags().has(TagKind::Distribution));

        // now apply a slice update - we expect Distribution tag to appear
        m.apply_update(&[3.0, 4.0][..]);

        assert!(
            m.tags().has(TagKind::Distribution),
            "expected Distribution tag after slice update"
        );
    }

    #[test]
    fn metric_merge_matches_streaming_samples() {
        let a = [1.0, 2.0, 3.0, 4.0];
        let b = [10.0, 20.0, 30.0];

        let mut m1 = Metric::new("x");
        m1.apply_update(&a[..]);

        let mut m2 = Metric::new("x");
        m2.apply_update(&b[..]);

        m1.update_from(m2);

        let combined = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0];
        let (n, mean, var, min, max) = stats_of(&combined);
        assert_stat_eq(&m1, n, mean, var, min, max);
    }

    #[test]
    fn update_from_collapses_unit_counter_into_single_sample() {
        // "scores" carries no Distribution tag by default (see the test above), so the
        // count == sum shortcut in `update_from` applies.
        let mut counter = Metric::new("scores");
        counter.apply_update(1.0);
        counter.apply_update(1.0);
        counter.apply_update(1.0);

        let mut total = Metric::new("scores");
        total.update_from(counter);

        // Three `1.0` samples are recorded as one sample equal to their sum.
        assert_eq!(total.count(), 1);
        assert_eq!(total.value_sum(), Some(3.0));
    }
}