Skip to main content

radiate_core/stats/
metric.rs

1use crate::stats::{MetricView, Tag, TagType, defaults};
2use radiate_error::{RadiateError, radiate_err};
3use radiate_utils::{
4    AnyValue, DataType, SmallStr, Statistic
5};
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8use std::{hash::Hash, time::Duration};
9
// Compact `u8` codes stored in `Metric::dtype`; `Metric::dtype()` translates
// each code back into a `DataType`.

/// Nothing has been recorded yet (reported as `DataType::Null`).
const DTYPE_NULL: u8 = 0;
/// Scalar samples (reported as `DataType::Float32`).
const DTYPE_FLOAT32: u8 = 1;
/// Duration samples, stored internally as seconds (reported as `DataType::Duration`).
const DTYPE_DURATION: u8 = 2;
/// Batches of samples (reported as `DataType::List` of `Float32`).
const DTYPE_LIST: u8 = 3;
14
/// Builds a `Metric` in a single expression.
///
/// * `metric!(name, update)` creates the metric and applies `update` to it.
/// * `metric!(name)` creates the metric and records a single `1`.
#[macro_export]
macro_rules! metric {
    // Name plus an initial update value.
    ($name:expr, $update:expr) => {{ $crate::Metric::new($name).upsert($update) }};
    // Name only: seed the metric with one unit-sized update.
    ($name:expr) => {{ $crate::Metric::new($name).upsert(1) }};
}
24
25
/// Bookkeeping that travels with a `Metric` but is not part of its recorded values.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub(super) struct Meta {
    /// Update counter bumped by the `Metric` update paths; `Metric::set_generation`
    /// zeroes it whenever the generation changes.
    pub(super) update_count: usize,
    /// Generation most recently assigned through `Metric::set_generation`.
    pub(super) generation: u64,
}
32
/// A named, tagged running statistic, optionally carrying the raw samples of
/// the most recent batch update.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Metric {
    /// Identifier of the metric.
    name: SmallStr,
    /// Summary statistic that every update is folded into.
    inner: Statistic,
    /// Raw values of the latest batch update; `None` until a batch has been
    /// applied or after `clear_samples` / `clear_values`.
    samples: Option<Vec<f32>>,
    /// Update counter and generation bookkeeping.
    meta: Meta,
    /// Tag set describing which kinds of updates this metric has seen
    /// (seeded from `defaults::default_tags`).
    tags: Tag,
    /// One of the `DTYPE_*` codes; decoded by `Metric::dtype`.
    dtype: u8,
}
43
44impl Metric {
45    pub fn new(name: impl Into<SmallStr>) -> Self {
46        let name = name.into();
47        let tags = defaults::default_tags(&name);
48
49        Self {
50            name,
51            inner: Statistic::default(),
52            meta: Meta::default(),
53            samples: None,
54            tags,
55            dtype: DTYPE_NULL,
56        }
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.meta.update_count == 0 && self.inner.count() == 0
61    }
62
63    #[inline(always)]
64    pub fn update_count(&self) -> usize {
65        self.meta.update_count
66    }
67
68    #[inline(always)]
69    pub fn generation(&self) -> u64 {
70        self.meta.generation
71    }
72
73    #[inline(always)]
74    pub fn set_generation(&mut self, generation: u64) {
75        if generation != self.meta.generation {
76            self.meta.update_count = 0;
77        }
78
79        self.meta.generation = generation;
80    }
81
82    pub fn dtype(&self) -> DataType {
83        match self.dtype {
84            DTYPE_NULL => DataType::Null,
85            DTYPE_FLOAT32 => DataType::Float32,
86            DTYPE_DURATION => DataType::Duration,
87            DTYPE_LIST => DataType::List(Box::new(DataType::Float32)),
88            _ => DataType::Null,
89        }
90    }
91
92    #[inline(always)]
93    pub fn tags(&self) -> Tag {
94        self.tags
95    }
96
97    #[inline(always)]
98    pub fn add_tag(&mut self, tag: TagType) {
99        self.tags.insert(tag);
100    }
101
102    pub fn iter_tags(&self) -> impl Iterator<Item = TagType> {
103        self.tags.iter()
104    }
105
106    pub fn clear_values(&mut self) {
107        self.inner = Statistic::default();
108        self.samples = None;
109    }
110
111    pub fn stats<'a>(&'a self) -> Option<MetricView<'a, f32>> {
112        if !self.tags.has(TagType::Statistic) {
113            return None;
114        }
115
116        Some(MetricView {
117            name: &self.name,
118            statistic: &self.inner,
119            samples: self.samples.as_deref(),
120            mapper: |v| v,
121        })
122    }
123
124    pub fn times<'a>(&'a self) -> Option<MetricView<'a, Duration>> {
125        if !self.tags.has(TagType::Time) {
126            return None;
127        }
128
129        Some(MetricView {
130            name: &self.name,
131            statistic: &self.inner,
132            samples: self.samples.as_deref(),
133            mapper: |v| Duration::from_secs_f32(v),
134        })
135    }
136
137    pub fn distributions<'a>(&'a self) -> Option<MetricView<'a, f32>> {
138        if !self.tags.has(TagType::Distribution) {
139            return None;
140        }
141
142        Some(MetricView {
143            name: &self.name,
144            statistic: &self.inner,
145            samples: self.samples.as_deref(),
146            mapper: |v| v,
147         })
148    }
149
150    #[inline(always)]
151    pub fn upsert<'a>(mut self, update: impl Into<MetricUpdate<'a>>) -> Self {
152        self.apply_update(update);
153        self
154    }
155
156    #[inline(always)]
157    pub fn update_from(&mut self, other: Metric) {
158        // Kinda a hack to take advantage of the fact that if count == sum,
159        // we can just apply the sum directly instead of merging statistics - keeps things honest
160        // & avoids merging statistics when we don't have to (even though that's a fast operation).
161        if other.count() as f32 == other.sum() && !other.tags.has(TagType::Distribution) {
162            self.apply_update(other.sum());
163        } else {
164            self.apply_update(other.inner);
165        }
166
167        self.tags = self.tags.union(other.tags);
168    }
169
170    #[inline(always)]
171    pub fn apply_update<'a>(&mut self, update: impl Into<MetricUpdate<'a>>) {
172        let update = update.into();
173        match update {
174            MetricUpdate::Float(value) => {
175                self.update_statistic(value);
176            }
177            MetricUpdate::Usize(value) => {
178                self.update_statistic(value as f32);
179            }
180            MetricUpdate::Duration(value) => {
181                self.update_time_statistic(value);
182            }
183            MetricUpdate::UsizeDistribution(values) => {
184                self.update_statistic_from_iter(values.iter().map(|v| *v as f32));
185            }
186            MetricUpdate::Distribution(values) => {
187                self.update_statistic_from_iter(values.iter().cloned());
188            }
189            MetricUpdate::OwnedDistribution(values) => {
190                self.update_statistic_from_iter(values);
191            }
192            MetricUpdate::Statistic(stat) => {
193                self.inner.merge(&stat);
194                self.dtype = DTYPE_FLOAT32;
195                self.meta.update_count += 1;
196            }
197            MetricUpdate::Bool(value) => {
198                self.update_statistic(if value { 1.0 } else { 0.0 });
199            }
200        }
201    }
202
203    fn update_statistic(&mut self, value: f32) {
204        self.inner.add(value);
205        self.add_tag(TagType::Statistic);
206
207        self.meta.update_count += 1;
208
209        if self.dtype == DTYPE_NULL {
210            self.dtype = DTYPE_FLOAT32;
211        }
212    }
213
214    fn update_time_statistic(&mut self, value: Duration) {
215        self.inner.add(value.as_secs_f32());
216        self.add_tag(TagType::Time);
217        self.meta.update_count += 1;
218
219        if self.dtype == DTYPE_NULL {
220            self.dtype = DTYPE_DURATION;
221        }
222    }
223
224    fn update_statistic_from_iter<I>(&mut self, values: I)
225    where
226        I: IntoIterator<Item = f32>,
227    {   
228        let samples = self.samples.get_or_insert_with(Vec::new);
229
230        samples.clear();
231        self.inner.clear();
232
233        for val in values {
234            samples.push(val);
235            self.inner.add(val);
236        }
237        
238        self.meta.update_count += self.inner.count() as usize;
239        
240        self.add_tag(TagType::Distribution);
241
242        if self.dtype == DTYPE_NULL {
243            self.dtype = DTYPE_LIST;
244        }
245    }
246
247    pub fn clear_samples(&mut self) {
248        self.samples = None;
249    }
250        
251    pub fn statistic(&self) -> &Statistic {
252        &self.inner
253    }
254
255    pub fn name(&self) -> &SmallStr {
256        &self.name
257    }
258
259    pub fn last_value(&self) -> f32 {
260        self.inner.last_value()
261    }
262
263    pub fn count(&self) -> u32 {
264        self.inner.count()
265    }
266
267    pub fn mean(&self) -> f32 {
268        self.inner.mean()
269    }
270
271    pub fn var(&self) -> f32 {
272        self.inner.variance().unwrap_or(0.0)
273    }
274
275    pub fn stddev(&self) -> f32 {
276        self.inner.std_dev().unwrap_or(0.0)
277    }
278
279    pub fn skew(&self) -> f32 {
280        self.inner.skewness().unwrap_or(0.0)
281    }
282
283    pub fn kurt(&self) -> f32 {
284        self.inner.kurtosis().unwrap_or(0.0)
285    }
286    
287    pub fn min(&self) -> f32 {
288        self.inner.min()
289    }
290
291    pub fn max(&self) -> f32 {
292        self.inner.max()
293    }
294
295    pub fn sum(&self) -> f32 {
296        self.inner.sum()
297    }
298
299    pub fn quantile(&self, q: f32) -> Option<f32> {
300        self.distributions().and_then(|view| view.quantile(q))
301    }
302}
303
304impl Hash for Metric {
305    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306        self.name.hash(state);
307        self.inner.hash(state);
308        self.tags.hash(state);
309    }
310}
311
/// A value that can be fed to `Metric::apply_update`.
#[derive(Clone, PartialEq, Debug)]
pub enum MetricUpdate<'a> {
    /// One scalar sample.
    Float(f32),
    /// One scalar sample given as a `usize` (cast to `f32` when applied).
    Usize(usize),
    /// One duration sample (recorded in seconds when applied).
    Duration(Duration),
    /// A borrowed batch of samples.
    Distribution(&'a [f32]),
    /// An owned batch of samples.
    OwnedDistribution(Vec<f32>),
    /// A borrowed batch of `usize` samples (each cast to `f32` when applied).
    UsizeDistribution(&'a [usize]),
    /// A pre-aggregated statistic to merge into the metric.
    Statistic(Statistic),
    /// A flag, recorded as `1.0` for `true` and `0.0` for `false`.
    Bool(bool),
}
323
324impl From<f32> for MetricUpdate<'_> {
325    fn from(value: f32) -> Self {
326        MetricUpdate::Float(value)
327    }
328}
329
330impl From<usize> for MetricUpdate<'_> {
331    fn from(value: usize) -> Self {
332        MetricUpdate::Usize(value)
333    }
334}
335
336impl From<Duration> for MetricUpdate<'_> {
337    fn from(value: Duration) -> Self {
338        MetricUpdate::Duration(value)
339    }
340}
341
342impl<'a> From<&'a [f32]> for MetricUpdate<'a> {
343    fn from(value: &'a [f32]) -> Self {
344        MetricUpdate::Distribution(value)
345    }
346}
347
348impl<'a> From<&'a Vec<f32>> for MetricUpdate<'a> {
349    fn from(value: &'a Vec<f32>) -> Self {
350        MetricUpdate::Distribution(value)
351    }
352}
353
354impl<'a> From<&'a Vec<usize>> for MetricUpdate<'a> {
355    fn from(value: &'a Vec<usize>) -> Self {
356        MetricUpdate::UsizeDistribution(value)
357    }
358}
359
360impl From<Statistic> for MetricUpdate<'_> {
361    fn from(value: Statistic) -> Self {
362        MetricUpdate::Statistic(value)
363    }
364}
365
366impl From<bool> for MetricUpdate<'_> {
367    fn from(value: bool) -> Self {
368        MetricUpdate::Bool(value)
369    }
370}
371
372impl<'a> TryFrom<AnyValue<'a>> for MetricUpdate<'a> {
373    type Error = RadiateError;
374
375    fn try_from(value: AnyValue<'a>) -> Result<Self, Self::Error> {
376        match value {
377            AnyValue::UInt8(v) => Ok(MetricUpdate::Float(v as f32)),
378            AnyValue::UInt16(v) => Ok(MetricUpdate::Float(v as f32)),
379            AnyValue::UInt32(v) => Ok(MetricUpdate::Float(v as f32)),
380            AnyValue::UInt64(v) => Ok(MetricUpdate::Float(v as f32)),
381            AnyValue::UInt128(v) => Ok(MetricUpdate::Float(v as f32)),
382
383            AnyValue::Int8(v) => Ok(MetricUpdate::Float(v as f32)),
384            AnyValue::Int16(v) => Ok(MetricUpdate::Float(v as f32)),
385            AnyValue::Int32(v) => Ok(MetricUpdate::Float(v as f32)),
386            AnyValue::Int64(v) => Ok(MetricUpdate::Float(v as f32)),
387            AnyValue::Int128(v) => Ok(MetricUpdate::Float(v as f32)),
388
389            AnyValue::Float32(v) => Ok(MetricUpdate::Float(v)),
390            AnyValue::Float64(v) => Ok(MetricUpdate::Float(v as f32)),
391
392            AnyValue::Duration(v) => Ok(MetricUpdate::Duration(v)),
393
394            AnyValue::Slice(values) => {
395                let out = values
396                    .iter()
397                    .enumerate()
398                    .map(|(index, v)| {
399                        v.clone().extract::<f32>().ok_or(
400                            radiate_err!(
401                                Metric: 
402                                "cannot convert AnyValue sequence into Vec<f32>: element at index {index} has non-numeric type `{}`", v.type_name()))
403                            
404                    })
405                    .collect::<Result<Vec<f32>, _>>()?;
406
407                Ok(MetricUpdate::OwnedDistribution(out))
408            }
409
410            AnyValue::Vector(values) => {
411                let out = values
412                    .into_iter()
413                    .enumerate()
414                    .map(|(index, v)| {
415                        let ty = v.type_name();
416                        v.extract::<f32>()
417                            .ok_or(radiate_err!(
418                                Metric: 
419                                "cannot convert AnyValue sequence into Vec<f32>: element at index {index} has non-numeric type `{ty}`"
420                            ))
421                    })
422                    .collect::<Result<Vec<f32>, _>>()?;
423
424                Ok(MetricUpdate::OwnedDistribution(out))
425            }
426
427            other => Err(radiate_err!(Metric: "cannot convert AnyValue of type `{}` into MetricUpdate", other.type_name())),
428        }
429    }
430}
431
432impl std::fmt::Debug for Metric {
433    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434        write!(f, "Metric {{ name: {}, }}", self.name)
435    }
436}
437
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used for every floating-point comparison in this module.
    const EPSILON: f32 = 1e-5;

    /// Returns `true` when `a` and `b` differ by at most `eps`.
    fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
        (a - b).abs() <= eps
    }

    /// Asserts count exactly and mean/variance/min/max within `EPSILON`.
    fn assert_stat_eq(m: &Metric, count: u32, mean: f32, var: f32, min: f32, max: f32) {
        assert_eq!(m.count(), count);
        assert!(approx_eq(m.mean(), mean, EPSILON), "mean");
        assert!(approx_eq(m.var(), var, EPSILON), "var");
        assert!(approx_eq(m.min(), min, EPSILON), "min");
        assert!(approx_eq(m.max(), max, EPSILON), "max");
    }

    /// Reference implementation: `(count, mean, sample variance, min, max)`.
    fn stats_of(values: &[f32]) -> (u32, f32, f32, f32, f32) {
        // Sample variance (n - 1 denominator), the convention the merge test
        // below expects from `Metric::var`.
        let n = values.len() as u32;
        if n == 0 {
            return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
        }
        let mean = values.iter().sum::<f32>() / values.len() as f32;

        let m2: f32 = values.iter().map(|&v| (v - mean) * (v - mean)).sum();

        let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };

        let min = values.iter().copied().fold(f32::INFINITY, f32::min);
        let max = values.iter().copied().fold(f32::NEG_INFINITY, f32::max);

        (n, mean, var, min, max)
    }

    #[test]
    fn test_metric() {
        let mut metric = Metric::new("test");
        metric.apply_update(1.0);
        metric.apply_update(2.0);
        metric.apply_update(3.0);
        metric.apply_update(4.0);
        metric.apply_update(5.0);

        assert_eq!(metric.count(), 5);
        assert_eq!(metric.last_value(), 5.0);
        assert_eq!(metric.min(), 1.0);
        assert_eq!(metric.max(), 5.0);
        assert_eq!(metric.name(), "test");

        // Derived statistics are compared with a tolerance instead of exact
        // float equality.
        assert!(approx_eq(metric.mean(), 3.0, EPSILON), "mean");
        assert!(approx_eq(metric.var(), 2.5, EPSILON), "var");
        assert!(approx_eq(metric.stddev(), 1.5811388, EPSILON), "stddev");
    }

    #[test]
    fn test_metric_labels() {
        // Scalar updates tag the metric as a statistic and expose the scalar view.
        let mut metric = Metric::new("test");
        metric.apply_update(1.0);
        metric.apply_update(2.0);

        assert!(metric.tags().has(TagType::Statistic));
        assert!(metric.stats().is_some());

        // Duration updates tag the metric as a time metric and expose the time view.
        let mut timer = Metric::new("elapsed");
        timer.apply_update(Duration::from_millis(250));

        assert!(timer.tags().has(TagType::Time));
        assert!(timer.times().is_some());

        // Explicitly added tags are visible through `tags()`.
        let mut tagged = Metric::new("tagged");
        tagged.add_tag(TagType::Distribution);
        assert!(tagged.tags().has(TagType::Distribution));
    }

    #[test]
    fn distribution_tag_is_applied_on_any_slice_update() {
        let mut m = Metric::new("scores");

        // seed with scalar samples first (creates Statistic but not Distribution tag)
        m.apply_update(1.0);
        m.apply_update(2.0);
        assert!(m.tags().has(TagType::Statistic));
        assert!(!m.tags().has(TagType::Distribution));

        // now apply a slice update - we expect Distribution tag to appear
        m.apply_update(&[3.0, 4.0][..]);

        assert!(
            m.tags().has(TagType::Distribution),
            "expected Distribution tag after slice update"
        );
    }

    #[test]
    fn metric_merge_matches_streaming_samples() {
        let a = [1.0, 2.0, 3.0, 4.0];
        let b = [10.0, 20.0, 30.0];

        let mut m1 = Metric::new("x");
        m1.apply_update(&a[..]);

        let mut m2 = Metric::new("x");
        m2.apply_update(&b[..]);

        m1.update_from(m2);

        let combined = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0];
        let (n, mean, var, min, max) = stats_of(&combined);
        assert_stat_eq(&m1, n, mean, var, min, max);
    }
}
553}