Skip to main content

radiate_core/stats/
metric.rs

1use crate::stats::{MetricView, Tag, TagType, defaults};
2use radiate_error::{RadiateError, radiate_err};
3use radiate_expr::{AnyValue, DataType};
4use radiate_utils::{
5    Statistic, ToSnakeCase, cache_arc_string, intern, intern_snake_case,
6};
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9use std::{hash::Hash, sync::Arc, time::Duration};
10
// Compact `u8` codes stored in `Metric::dtype`; `Metric::dtype()` translates them
// back into `DataType` values (any unrecognised code is reported as `DataType::Null`).

/// No typed update has been recorded yet; reported as `DataType::Null`.
const DATA_TYPE_NULL: u8 = 0;
/// Scalar float samples; reported as `DataType::Float32`.
const DATA_TYPE_FLOAT32: u8 = 1;
/// Duration samples; reported as `DataType::Duration`.
const DATA_TYPE_DURATION: u8 = 2;
/// Slice (distribution) samples; reported as `DataType::List` of `Float32`.
const DATA_TYPE_LIST: u8 = 3;
15
/// Builds a `Metric` in a single expression.
///
/// * `metric!(name, update)` creates the metric via `Metric::new(name)` and applies
///   `update` to it.
/// * `metric!(name)` creates the metric and applies the update `1`.
#[macro_export]
macro_rules! metric {
    ($name:expr, $update:expr) => {{ $crate::Metric::new($name).upsert($update) }};
    ($name:expr) => {{ $crate::Metric::new($name).upsert(1) }};
}
25
26
/// Optional bookkeeping attached to a `Metric` once a version has been set on it.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub(super) struct Meta {
    /// Number of updates recorded under the current `version`; `Metric::set_version`
    /// resets it to zero whenever the version changes.
    pub(super) update_count: usize,
    /// Version most recently assigned through `Metric::set_version`.
    pub(super) version: u64,
}
33
/// A named, tagged accumulator of `f32` samples backed by a single `Statistic`.
#[derive(Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Metric {
    /// Metric name, as produced by `intern_snake_case!` / `cache_arc_string!` in `Metric::new`.
    name: Arc<String>,
    /// Version/update bookkeeping; `None` until `set_version` is called.
    meta: Option<Meta>,
    /// Running statistic that every update is folded into.
    inner: Statistic,
    /// Tags describing the kinds of updates this metric has received.
    tags: Tag,
    /// One of the `DATA_TYPE_*` codes; decoded by `Metric::dtype`.
    dtype: u8,
}
43
44impl Metric {
45    pub fn new(name: &'static str) -> Self {
46        let name = cache_arc_string!(intern_snake_case!(name));
47        let tags = defaults::default_tags(&name);
48
49        Self {
50            name,
51            meta: None,
52            inner: Statistic::default(),
53            tags,
54            dtype: DATA_TYPE_NULL,
55        }
56    }
57
58    #[inline(always)]
59    pub fn update_count(&self) -> usize {
60        self.meta.as_ref().map_or(0, |meta| meta.update_count)
61    }
62
63    #[inline(always)]
64    pub fn version(&self) -> u64 {
65        self.meta.as_ref().map_or(0, |meta| meta.version)
66    }
67
68    #[inline(always)]
69    pub fn set_version(&mut self, version: u64) {
70        if let Some(meta) = &mut self.meta {
71            if version != meta.version {
72                meta.update_count = 0;
73            }
74
75            meta.version = version;
76        } else {
77            self.meta = Some(Meta {
78                update_count: 0,
79                version,
80            });
81        }
82    }
83
84    pub fn dtype(&self) -> DataType {
85        match self.dtype {
86            DATA_TYPE_NULL => DataType::Null,
87            DATA_TYPE_FLOAT32 => DataType::Float32,
88            DATA_TYPE_DURATION => DataType::Duration,
89            DATA_TYPE_LIST => DataType::List(Box::new(DataType::Float32)),
90            _ => DataType::Null,
91        }
92    }
93
94    #[inline(always)]
95    pub fn tags(&self) -> Tag {
96        self.tags
97    }
98
99    #[inline(always)]
100    pub fn with_tag(mut self, tag: TagType) -> Self {
101        self.add_tag(tag);
102        self
103    }
104
105    #[inline(always)]
106    pub fn with_tags<T>(&mut self, tags: T)
107    where
108        T: Into<Tag>,
109    {
110        self.tags = tags.into();
111    }
112
113    #[inline(always)]
114    pub fn add_tags(&mut self, tags: Tag) {
115        self.tags = self.tags.union(tags);
116    }
117
118    #[inline(always)]
119    pub fn add_tag(&mut self, tag: TagType) {
120        self.tags.insert(tag);
121    }
122
123    pub fn contains_tag(&self, tag: &TagType) -> bool {
124        self.tags.has(*tag)
125    }
126
127    pub fn tags_iter(&self) -> impl Iterator<Item = TagType> {
128        self.tags.iter()
129    }
130
131    pub fn clear_values(&mut self) {
132        self.inner = Statistic::default();
133    }
134
135    pub fn stats<'a>(&'a self) -> Option<MetricView<'a, f32>> {
136        if !self.tags.has(TagType::Statistic) {
137            return None;
138        }
139
140        Some(MetricView {
141            name: &self.name,
142            statistic: &self.inner,
143            mapper: |v| Some(v),
144        })
145    }
146
147    pub fn times<'a>(&'a self) -> Option<MetricView<'a, Duration>> {
148        if !self.tags.has(TagType::Time) {
149            return None;
150        }
151
152        Some(MetricView {
153            name: &self.name,
154            statistic: &self.inner,
155            mapper: |v| Some(Duration::from_secs_f32(v)),
156        } )
157    }
158
159    pub fn distributions<'a>(&'a self) -> Option<MetricView<'a, f32>> {
160        if !self.tags.has(TagType::Distribution) {
161            return None;
162        }
163
164        Some(MetricView {
165            name: &self.name,
166            statistic: &self.inner,
167            mapper: |v| Some(v),
168         })
169    }
170
171    #[inline(always)]
172    pub fn upsert<'a>(mut self, update: impl Into<MetricUpdate<'a>>) -> Self {
173        self.apply_update(update);
174        self
175    }
176
177    #[inline(always)]
178    pub fn update_from(&mut self, other: Metric) {
179        // Kinda a hack to take advantage of the fact that if count == sum,
180        // we can just apply the sum directly instead of merging statistics - keeps things honest
181        // & avoids merging statistics when we don't have to (even though that's a fast operation).
182        if other.count() as f32 == other.sum() && !other.tags.has(TagType::Distribution) {
183            self.apply_update(other.sum());
184        } else {
185            self.apply_update(other.inner);
186        }
187
188        self.tags = self.tags.union(other.tags);
189    }
190
191    #[inline(always)]
192    pub fn apply_update<'a>(&mut self, update: impl Into<MetricUpdate<'a>>) {
193        let update = update.into();
194        match update {
195            MetricUpdate::Float(value) => {
196                self.update_statistic(value);
197            }
198            MetricUpdate::Usize(value) => {
199                self.update_statistic(value as f32);
200            }
201            MetricUpdate::Duration(value) => {
202                self.update_time_statistic(value);
203            }
204            MetricUpdate::UsizeDistribution(values) => {
205                self.update_statistic_from_iter(values.iter().map(|v| *v as f32));
206            }
207            MetricUpdate::Distribution(values) => {
208                self.update_statistic_from_iter(values.iter().cloned());
209            }
210            MetricUpdate::Statistic(stat) => {
211                self.inner.merge(&stat);                
212                self.meta.as_mut().map(|meta| meta.update_count += 1);
213                self.dtype = DATA_TYPE_FLOAT32;
214            }
215        }
216    }
217
218    fn update_statistic(&mut self, value: f32) {
219        self.inner.add(value);
220        self.add_tag(TagType::Statistic);
221        self.meta.as_mut().map(|meta| meta.update_count += 1);
222
223        if self.dtype == DATA_TYPE_NULL {
224            self.dtype = DATA_TYPE_FLOAT32;
225        }
226    }
227
228    fn update_time_statistic(&mut self, value: Duration) {
229        self.inner.add(value.as_secs_f32());
230        self.add_tag(TagType::Time);
231        self.meta.as_mut().map(|meta| meta.update_count += 1);
232
233        if self.dtype == DATA_TYPE_NULL {
234            self.dtype = DATA_TYPE_DURATION;
235        }
236    }
237
238    fn update_statistic_from_iter<I>(&mut self, values: I)
239    where
240        I: IntoIterator<Item = f32>,
241    {   
242        let mut values_count = 0;
243        let mut new_stat = Statistic::default();
244        for value in values {
245            new_stat.add(value);
246            values_count += 1;
247        }
248        
249        self.inner = new_stat;
250        self.meta.as_mut().map(|meta| meta.update_count += values_count);
251        self.add_tag(TagType::Distribution);
252
253        if self.dtype == DATA_TYPE_NULL {
254            self.dtype = DATA_TYPE_LIST;
255        }
256    }
257
258    pub fn statistic(&self) -> &Statistic {
259        &self.inner
260    }
261
262    pub fn name(&self) -> &str {
263        &self.name
264    }
265
266    pub fn last_value(&self) -> f32 {
267        self.inner.last_value()
268    }
269
270    pub fn count(&self) -> i32 {
271        self.inner.count()
272    }
273
274    pub fn mean(&self) -> f32 {
275        self.inner.mean()
276    }
277
278    pub fn var(&self) -> f32 {
279        self.inner.variance().unwrap_or(0.0)
280    }
281
282    pub fn stddev(&self) -> f32 {
283        self.inner.std_dev().unwrap_or(0.0)
284    }
285
286    pub fn skew(&self) -> f32 {
287        self.inner.skewness().unwrap_or(0.0)
288    }
289
290    pub fn min(&self) -> f32 {
291        self.inner.min()
292    }
293
294    pub fn max(&self) -> f32 {
295        self.inner.max()
296    }
297
298    pub fn sum(&self) -> f32 {
299        self.inner.sum()
300    }
301}
302
303impl Hash for Metric {
304    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
305        self.name.hash(state);
306        self.inner.hash(state);
307        self.tags.hash(state);
308    }
309}
310
/// A single update that can be applied to a `Metric` via `Metric::apply_update`.
#[derive(Clone, PartialEq)]
pub enum MetricUpdate<'a> {
    /// One scalar sample.
    Float(f32),
    /// One scalar sample given as an integer; it is cast to `f32` when applied.
    Usize(usize),
    /// One timing sample; it is stored as seconds (`f32`) when applied.
    Duration(Duration),
    /// A borrowed slice of samples; applying it replaces the metric's statistic.
    Distribution(&'a [f32]),
    /// A borrowed slice of integer samples; each is cast to `f32` when applied.
    UsizeDistribution(&'a [usize]),
    /// A pre-computed statistic that is merged into the metric's statistic.
    Statistic(Statistic),
}
320
321impl From<f32> for MetricUpdate<'_> {
322    fn from(value: f32) -> Self {
323        MetricUpdate::Float(value)
324    }
325}
326
327impl From<usize> for MetricUpdate<'_> {
328    fn from(value: usize) -> Self {
329        MetricUpdate::Usize(value)
330    }
331}
332
333impl From<Duration> for MetricUpdate<'_> {
334    fn from(value: Duration) -> Self {
335        MetricUpdate::Duration(value)
336    }
337}
338
339impl<'a> From<&'a [f32]> for MetricUpdate<'a> {
340    fn from(value: &'a [f32]) -> Self {
341        MetricUpdate::Distribution(value)
342    }
343}
344
345impl<'a> From<&'a Vec<f32>> for MetricUpdate<'a> {
346    fn from(value: &'a Vec<f32>) -> Self {
347        MetricUpdate::Distribution(value)
348    }
349}
350
351impl<'a> From<&'a Vec<usize>> for MetricUpdate<'a> {
352    fn from(value: &'a Vec<usize>) -> Self {
353        MetricUpdate::UsizeDistribution(value)
354    }
355}
356
357impl From<Statistic> for MetricUpdate<'_> {
358    fn from(value: Statistic) -> Self {
359        MetricUpdate::Statistic(value)
360    }
361}
362
363impl<'a> TryFrom<AnyValue<'a>> for MetricUpdate<'a> {
364    type Error = RadiateError;
365
366    fn try_from(value: AnyValue<'a>) -> Result<Self, Self::Error> {
367        match value {
368            AnyValue::UInt8(v) => Ok(MetricUpdate::Float(v as f32)),
369            AnyValue::UInt16(v) => Ok(MetricUpdate::Float(v as f32)),
370            AnyValue::UInt32(v) => Ok(MetricUpdate::Float(v as f32)),
371            AnyValue::UInt64(v) => Ok(MetricUpdate::Float(v as f32)),
372            AnyValue::UInt128(v) => Ok(MetricUpdate::Float(v as f32)),
373
374            AnyValue::Int8(v) => Ok(MetricUpdate::Float(v as f32)),
375            AnyValue::Int16(v) => Ok(MetricUpdate::Float(v as f32)),
376            AnyValue::Int32(v) => Ok(MetricUpdate::Float(v as f32)),
377            AnyValue::Int64(v) => Ok(MetricUpdate::Float(v as f32)),
378            AnyValue::Int128(v) => Ok(MetricUpdate::Float(v as f32)),
379
380            AnyValue::Float32(v) => Ok(MetricUpdate::Float(v)),
381            AnyValue::Float64(v) => Ok(MetricUpdate::Float(v as f32)),
382
383            AnyValue::Duration(v) => Ok(MetricUpdate::Duration(v)),
384
385            AnyValue::Slice(values) => {
386                let out = values
387                    .iter()
388                    .enumerate()
389                    .map(|(index, v)| {
390                        v.clone().extract::<f32>().ok_or(
391                            radiate_err!(
392                                Metric: 
393                                "cannot convert AnyValue sequence into MetricUpdate::Statistic: element at index {index} has non-numeric type `{}`", v.type_name()))
394                            
395                    })
396                    .collect::<Result<Statistic, _>>()?;
397
398                Ok(MetricUpdate::Statistic(out))
399            }
400
401            AnyValue::Vector(values) => {
402                let out = values
403                    .into_iter()
404                    .enumerate()
405                    .map(|(index, v)| {
406                        let ty = v.type_name();
407                        v.extract::<f32>()
408                            .ok_or(radiate_err!(
409                                Metric: 
410                                "cannot convert AnyValue sequence into MetricUpdate::Distribution: element at index {index} has non-numeric type `{ty}`"
411                            ))
412                    })
413                    .collect::<Result<Statistic, _>>()?;
414
415                Ok(MetricUpdate::Statistic(out))
416            }
417
418            other => Err(radiate_err!(Metric: "cannot convert AnyValue of type `{}` into MetricUpdate", other.type_name())),
419        }
420    }
421}
422
423impl std::fmt::Debug for Metric {
424    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425        write!(f, "Metric {{ name: {}, }}", self.name)
426    }
427}
428
429
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing floating-point statistics.
    const EPSILON: f32 = 1e-5;

    /// Returns `true` when `a` and `b` differ by at most `eps`.
    fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
        (a - b).abs() <= eps
    }

    /// Asserts that the metric's count, mean, variance, min and max match the
    /// expected values (floats compared within `EPSILON`).
    fn assert_stat_eq(m: &Metric, count: i32, mean: f32, var: f32, min: f32, max: f32) {
        assert_eq!(m.count(), count);
        assert!(approx_eq(m.mean(), mean, EPSILON), "mean");
        assert!(approx_eq(m.var(), var, EPSILON), "var");
        assert!(approx_eq(m.min(), min, EPSILON), "min");
        assert!(approx_eq(m.max(), max, EPSILON), "max");
    }

    /// Reference implementation: `(count, mean, sample variance, min, max)` of `values`.
    fn stats_of(values: &[f32]) -> (i32, f32, f32, f32, f32) {
        // Sample variance (n - 1), intended to match `Statistic::variance`.
        let n = values.len() as i32;
        if n == 0 {
            return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
        }
        let mean = values.iter().sum::<f32>() / values.len() as f32;

        let mut m2 = 0.0_f32;
        for &v in values {
            let d = v - mean;
            m2 += d * d;
        }

        let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };

        let min = values.iter().cloned().fold(f32::INFINITY, f32::min);
        let max = values.iter().cloned().fold(f32::NEG_INFINITY, f32::max);

        (n, mean, var, min, max)
    }

    #[test]
    fn test_metric() {
        let mut metric = Metric::new("test");
        metric.apply_update(1.0);
        metric.apply_update(2.0);
        metric.apply_update(3.0);
        metric.apply_update(4.0);
        metric.apply_update(5.0);

        assert_eq!(metric.count(), 5);
        assert_eq!(metric.last_value(), 5.0);
        assert_eq!(metric.mean(), 3.0);
        assert_eq!(metric.var(), 2.5);
        assert_eq!(metric.stddev(), 1.5811388);
        assert_eq!(metric.min(), 1.0);
        assert_eq!(metric.max(), 5.0);
        assert_eq!(metric.name(), "test");
    }

    #[test]
    fn test_metric_labels() {
        let mut metric = Metric::new("test");

        metric.apply_update(1.0);
        metric.apply_update(2.0);
        metric.apply_update(3.0);
        metric.apply_update(4.0);
        metric.apply_update(5.0);

        assert_eq!(metric.count(), 5);
        assert_eq!(metric.last_value(), 5.0);
        assert_eq!(metric.mean(), 3.0);
        assert_eq!(metric.var(), 2.5);
        assert_eq!(metric.stddev(), 1.5811388);
        assert_eq!(metric.min(), 1.0);
        assert_eq!(metric.max(), 5.0);

        // Scalar updates tag the metric as a statistic and expose the stats view.
        assert!(metric.contains_tag(&TagType::Statistic));
        assert!(metric.stats().is_some());

        // Explicitly added tags are kept alongside the ones set by updates.
        let metric = metric.with_tag(TagType::Time);
        assert!(metric.contains_tag(&TagType::Time));
        assert!(metric.contains_tag(&TagType::Statistic));
    }

    #[test]
    fn distribution_tag_is_applied_on_any_slice_update() {
        let mut m = Metric::new("scores");

        // Seed with scalar samples first (sets the Statistic tag but not Distribution).
        m.apply_update(1.0);
        m.apply_update(2.0);
        assert!(m.tags().has(TagType::Statistic));
        assert!(!m.tags().has(TagType::Distribution));

        // Now apply a slice update - the Distribution tag is expected to appear.
        m.apply_update(&[3.0, 4.0][..]);

        assert!(
            m.tags().has(TagType::Distribution),
            "expected Distribution tag after slice update"
        );
    }

    #[test]
    fn metric_merge_matches_streaming_samples() {
        let a = [1.0, 2.0, 3.0, 4.0];
        let b = [10.0, 20.0, 30.0];

        let mut m1 = Metric::new("x");
        m1.apply_update(&a[..]);

        let mut m2 = Metric::new("x");
        m2.apply_update(&b[..]);

        m1.update_from(m2);

        let combined = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0];
        let (n, mean, var, min, max) = stats_of(&combined);
        assert_stat_eq(&m1, n, mean, var, min, max);
    }
}
544}