// radiate_core/stats/set.rs

1use crate::{
2    Metric, MetricUpdate,
3    stats::{Meta, Tag, TagType, defaults::try_add_tag_from_str, fmt},
4};
5use radiate_expr::{AnyValue, ApplyExpr, DataType, Expr, ExprProjection, ExprQuery, SelectExpr};
6use radiate_utils::intern;
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9use std::{
10    collections::HashMap,
11    fmt::{Debug, Display},
12    time::Duration,
13};
14
/// Headline numbers describing a [`MetricSet`] at one point in time.
///
/// Both fields are plain values, so the summary is cheap to copy, compare
/// and print.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MetricSetSummary {
    /// Number of distinct metrics held by the set.
    pub metrics: usize,
    /// Number of updates recorded by the set, stored as a float.
    pub updates: f32,
}
20
21#[derive(Clone, Default, PartialEq)]
22pub struct MetricSet {
23    metrics: HashMap<&'static str, Metric>,
24    meta: Meta,
25}
26
27impl MetricSet {
28    pub fn new() -> Self {
29        MetricSet {
30            metrics: HashMap::new(),
31            meta: Meta::default(),
32        }
33    }
34
35    pub fn next_version(&mut self) -> u64 {
36        let result = self.meta.version;
37        self.meta.version += 1;
38        result
39    }
40
41    pub fn version(&self) -> u64 {
42        self.meta.version
43    }
44
45    #[inline(always)]
46    pub fn keys(&self) -> Vec<&'static str> {
47        self.metrics.keys().cloned().collect()
48    }
49
50    #[inline(always)]
51    pub fn flush_all_into(&mut self, target: &mut MetricSet) {
52        let version = target.next_version();
53        for (key, mut m) in self.metrics.drain() {
54            m.set_version(version);
55            if let Some(target_metric) = target.metrics.get_mut(key) {
56                target_metric.update_from(m);
57            } else {
58                try_add_tag_from_str(&mut m);
59                target.metrics.insert(key, m);
60            }
61        }
62
63        self.clear();
64    }
65
66    #[inline(always)]
67    pub fn replace(&mut self, metric: impl Into<Metric>) {
68        let mut metric = metric.into();
69        try_add_tag_from_str(&mut metric);
70        self.metrics.insert(intern!(metric.name()), metric);
71    }
72
73    #[inline(always)]
74    pub fn upsert<'a>(&mut self, metric: impl Into<MetricSetUpdate<'a>>) {
75        let update = metric.into();
76        let version = self.version();
77
78        match update {
79            MetricSetUpdate::Many(metrics) => {
80                for metric in metrics {
81                    self.add_or_update_internal(version, metric);
82                }
83            }
84            MetricSetUpdate::Single(metric) => {
85                self.add_or_update_internal(version, metric);
86            }
87            MetricSetUpdate::ManyUpdate(updates) => {
88                for metric in updates {
89                    self.upsert(metric);
90                }
91            }
92            MetricSetUpdate::NamedSingle(name, metric_update, tag) => {
93                self.meta.update_count += 1;
94                if let Some(m) = self.metrics.get_mut(name) {
95                    m.set_version(version);
96                    m.apply_update(metric_update);
97                    if let Some(tag) = tag {
98                        m.add_tag(tag);
99                    }
100                    return;
101                }
102
103                let new_name = radiate_utils::intern_name_as_snake_case(name);
104                if let Some(m) = self.metrics.get_mut(&new_name) {
105                    m.set_version(version);
106                    m.apply_update(metric_update);
107                    if let Some(tag) = tag {
108                        m.add_tag(tag);
109                    }
110                } else {
111                    let mut metric = Metric::new(new_name);
112                    try_add_tag_from_str(&mut metric);
113                    metric.set_version(version);
114                    metric.apply_update(metric_update);
115
116                    if let Some(tag) = tag {
117                        metric.add_tag(tag);
118                    }
119
120                    self.add(metric);
121                }
122            }
123        }
124    }
125
126    #[inline(always)]
127    pub fn iter_tagged<'a>(
128        &'a self,
129        tag: TagType,
130    ) -> impl Iterator<Item = (&'static str, &'a Metric)> {
131        self.metrics.iter().filter_map(move |(k, m)| {
132            if m.tags().has(tag) {
133                Some((*k, m))
134            } else {
135                None
136            }
137        })
138    }
139
140    #[inline(always)]
141    pub fn tags(&self) -> impl Iterator<Item = TagType> {
142        self.metrics
143            .values()
144            .fold(Tag::empty(), |acc, m| acc.union(m.tags()))
145            .into_iter()
146    }
147
148    #[inline(always)]
149    pub fn iter(&self) -> impl Iterator<Item = (&'static str, &Metric)> {
150        self.metrics.iter().map(|(name, metric)| (*name, metric))
151    }
152
153    #[inline(always)]
154    pub fn add(&mut self, metric: Metric) {
155        self.metrics.insert(intern!(metric.name()), metric);
156    }
157
158    #[inline(always)]
159    pub fn get(&self, name: &str) -> Option<&Metric> {
160        self.metrics.get(name)
161    }
162
163    #[inline(always)]
164    pub fn get_from_string(&self, name: String) -> Option<&Metric> {
165        self.metrics.get(name.as_str())
166    }
167
168    #[inline(always)]
169    pub fn clear(&mut self) {
170        for (_, m) in self.metrics.iter_mut() {
171            m.clear_values();
172        }
173
174        self.meta.update_count = 0;
175    }
176
177    #[inline(always)]
178    pub fn contains_key(&self, name: &str) -> bool {
179        self.metrics.contains_key(intern!(name))
180    }
181
182    #[inline(always)]
183    pub fn len(&self) -> usize {
184        self.metrics.len()
185    }
186
187    #[inline(always)]
188    pub fn summary(&self) -> MetricSetSummary {
189        MetricSetSummary {
190            metrics: self.metrics.len(),
191            updates: self.meta.update_count as f32,
192        }
193    }
194
195    pub fn dashboard(&self) -> String {
196        fmt::render_full(self).unwrap_or_default()
197    }
198
199    fn add_or_update_internal(&mut self, version: u64, mut metric: Metric) {
200        self.meta.update_count += 1;
201        if let Some(existing) = self.metrics.get_mut(metric.name()) {
202            existing.set_version(version);
203            existing.update_from(metric);
204        } else {
205            try_add_tag_from_str(&mut metric);
206            metric.set_version(version);
207            self.metrics.insert(intern!(metric.name()), metric);
208        }
209    }
210}
211
212impl<'a> ApplyExpr<'a> for MetricSet {
213    fn apply(&self, expr: &'a mut Expr) -> AnyValue<'a> {
214        expr.dispatch(self).unwrap()
215    }
216}
217
218impl ExprProjection for MetricSet {
219    fn project(&self, path: &SelectExpr) -> Option<AnyValue<'static>> {
220        let value_to_float32 = |value: f32| AnyValue::Float32(value);
221
222        let value_to_duration = |value: f32| Duration::from_secs_f32(value).into();
223
224        let SelectExpr::Field(key, field) = path else {
225            return None;
226        };
227
228        let str_key = key.as_str()?;
229
230        self.get(str_key)
231            .map(|metric| match field.dtype() {
232                DataType::Float32 => match field.name().to_lowercase().as_str() {
233                    "last_value" => AnyValue::Float32(metric.last_value()),
234                    "mean" => value_to_float32(metric.mean()),
235                    "std_dev" => value_to_float32(metric.stddev()),
236                    "min" => value_to_float32(metric.min()),
237                    "max" => value_to_float32(metric.max()),
238                    "sum" => value_to_float32(metric.sum()),
239                    "skew" => value_to_float32(metric.skew()),
240                    "var" => value_to_float32(metric.var()),
241                    "count" => AnyValue::UInt64(metric.count() as u64),
242                    "version" => AnyValue::UInt64(metric.version()),
243                    "update_count" => AnyValue::UInt64(metric.update_count() as u64),
244                    _ => AnyValue::Null,
245                },
246                DataType::Duration => match field.name().to_lowercase().as_str() {
247                    "last_value" => {
248                        AnyValue::Duration(Duration::from_secs_f32(metric.last_value()))
249                    }
250                    "mean" => value_to_duration(metric.mean()),
251                    "std_dev" => value_to_duration(metric.stddev()),
252                    "min" => value_to_duration(metric.min()),
253                    "max" => value_to_duration(metric.max()),
254                    "sum" => value_to_duration(metric.sum()),
255                    "var" => value_to_duration(metric.var()),
256                    "count" => AnyValue::UInt64(metric.count() as u64),
257                    "version" => AnyValue::UInt64(metric.version()),
258                    "update_count" => AnyValue::UInt64(metric.update_count() as u64),
259                    _ => AnyValue::Null,
260                },
261                _ => AnyValue::Null,
262            })
263            .or_else(|| Some(AnyValue::Null))
264    }
265}
266
267impl Display for MetricSet {
268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269        let summary = self.summary();
270        let out = format!(
271            "[{} metrics, {:.0} updates]",
272            summary.metrics, summary.updates
273        );
274        write!(f, "{out}\n{}", fmt::render_full(self).unwrap_or_default())?;
275        Ok(())
276    }
277}
278
279impl Debug for MetricSet {
280    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281        write!(f, "MetricSet {{\n")?;
282        write!(f, "{}\n", fmt::render_dashboard(&self).unwrap_or_default())?;
283        write!(f, "}}")
284    }
285}
286
#[cfg(feature = "serde")]
impl Serialize for MetricSet {
    /// Serializes the set as a sequence of its metrics.
    ///
    /// Only the metrics are written; the map keys and the set metadata are not
    /// part of the output. Element order follows map iteration order, which is
    /// unspecified.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Stream references straight from the map instead of cloning every
        // metric into a temporary `Vec`; the emitted sequence is the same.
        serializer.collect_seq(self.metrics.values())
    }
}
301
#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for MetricSet {
    /// Reads a sequence of metrics and registers each one in a fresh set via
    /// [`MetricSet::add`].
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let restored = Vec::<Metric>::deserialize(deserializer)?
            .into_iter()
            .fold(MetricSet::new(), |mut set, metric| {
                set.add(metric);
                set
            });

        Ok(restored)
    }
}
318
319pub enum MetricSetUpdate<'a> {
320    Many(Vec<Metric>),
321    Single(Metric),
322    ManyUpdate(Vec<(&'static str, MetricUpdate<'a>)>),
323    NamedSingle(&'static str, MetricUpdate<'a>, Option<TagType>),
324}
325
326impl From<Vec<Metric>> for MetricSetUpdate<'_> {
327    fn from(metrics: Vec<Metric>) -> Self {
328        MetricSetUpdate::Many(metrics)
329    }
330}
331
332impl From<Metric> for MetricSetUpdate<'_> {
333    fn from(metric: Metric) -> Self {
334        MetricSetUpdate::Single(metric)
335    }
336}
337
338impl<'a, U> From<(&'static str, U)> for MetricSetUpdate<'a>
339where
340    U: Into<MetricUpdate<'a>>,
341{
342    fn from((name, update): (&'static str, U)) -> Self {
343        MetricSetUpdate::NamedSingle(name, update.into(), None)
344    }
345}
346
347impl<'a, U> From<(TagType, &'static str, U)> for MetricSetUpdate<'a>
348where
349    U: Into<MetricUpdate<'a>>,
350{
351    fn from((tag, name, update): (TagType, &'static str, U)) -> Self {
352        MetricSetUpdate::NamedSingle(name, update.into(), Some(tag))
353    }
354}
355
356impl<'a, U> From<(&'static str, U, usize)> for MetricSetUpdate<'a>
357where
358    U: Into<MetricUpdate<'a>>,
359{
360    fn from((name, update, count): (&'static str, U, usize)) -> Self {
361        let name = radiate_utils::intern!(format!("{name}.{count}"));
362        MetricSetUpdate::NamedSingle(name, update.into(), None)
363    }
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Relative tolerance for `f32` comparisons (scaled by the operands'
    /// magnitude, with a floor of 1.0 so values near zero compare absolutely).
    const EPSILON: f32 = 1e-5;

    /// Approximate equality for `f32`.
    ///
    /// Exactly equal values (including equal infinities) and NaN-vs-NaN compare
    /// equal; any other non-finite operand compares unequal. Finite values are
    /// compared with a tolerance relative to their magnitude, because an
    /// absolute tolerance of `1e-5` is about one ulp once values reach the
    /// tens.
    fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
        if a == b {
            return true;
        }
        if !a.is_finite() || !b.is_finite() {
            return a.is_nan() && b.is_nan();
        }
        let scale = a.abs().max(b.abs()).max(1.0);
        (a - b).abs() <= eps * scale
    }

    /// Asserts that `m` reports the given count, mean, variance, min and max.
    fn assert_stat_eq(m: &Metric, count: i32, mean: f32, var: f32, min: f32, max: f32) {
        assert_eq!(m.count(), count);
        assert!(
            approx_eq(m.mean(), mean, EPSILON),
            "mean: got {}, expected {mean}",
            m.mean()
        );
        assert!(
            approx_eq(m.var(), var, EPSILON),
            "var: got {}, expected {var}",
            m.var()
        );
        assert!(
            approx_eq(m.min(), min, EPSILON),
            "min: got {}, expected {min}",
            m.min()
        );
        assert!(
            approx_eq(m.max(), max, EPSILON),
            "max: got {}, expected {max}",
            m.max()
        );
    }

    /// Reference `(count, mean, variance, min, max)` for `values`.
    ///
    /// Variance is the sample variance (divides by `n - 1`), intended to match
    /// `Statistic::variance`; a single value has variance `0.0`. Empty input
    /// yields `(0, 0.0, NaN, +inf, -inf)`.
    fn stats_of(values: &[f32]) -> (i32, f32, f32, f32, f32) {
        let n = values.len() as i32;
        if n == 0 {
            return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
        }
        let mean = values.iter().sum::<f32>() / values.len() as f32;

        // Sum of squared deviations from the mean.
        let mut m2 = 0.0_f32;
        for &v in values {
            let d = v - mean;
            m2 += d * d;
        }

        let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };

        let min = values.iter().copied().fold(f32::INFINITY, f32::min);
        let max = values.iter().copied().fold(f32::NEG_INFINITY, f32::max);

        (n, mean, var, min, max)
    }

    #[test]
    fn metric_set_flush_all_into_merges_metrics() {
        let mut a = MetricSet::new();
        let mut b = MetricSet::new();

        a.upsert(("scores", &[1.0, 2.0, 3.0][..]));
        b.upsert(("scores", &[10.0, 20.0][..]));

        // Drain `a` into `b`; `b` should then describe the combined samples.
        a.flush_all_into(&mut b);

        let m = b.get("scores").unwrap();
        let combined = [1.0, 2.0, 3.0, 10.0, 20.0];
        let (n, mean, var, min, max) = stats_of(&combined);
        assert_stat_eq(m, n, mean, var, min, max);
    }
}