Skip to main content

radiate_core/stats/
set.rs

1use crate::{
2    Metric, MetricUpdate,
3    stats::{
4        ExprSelector, Meta, Tag, TagType,
5        expression::{MetricField, MetricKind, SelectExpr},
6        fmt,
7    },
8};
9use radiate_utils::{AnyValue, SmallStr};
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12use std::{
13    collections::HashMap,
14    fmt::{Debug, Display},
15    time::Duration,
16};
17
18#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
19#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
20#[repr(transparent)]
21pub(crate) struct MetricIdx(u32);
22
23impl MetricIdx {
24    #[inline(always)]
25    pub(crate) const fn new(idx: u32) -> Self {
26        MetricIdx(idx)
27    }
28
29    #[inline(always)]
30    pub(crate) const fn as_usize(self) -> usize {
31        self.0 as usize
32    }
33}
34
35#[derive(PartialEq)]
36pub struct MetricSetSummary {
37    pub metrics: usize,
38    pub updates: f32,
39}
40
41#[derive(Clone, Default, PartialEq)]
42pub struct MetricSet {
43    metrics: Vec<Metric>,
44    name_lookup: HashMap<SmallStr, MetricIdx>,
45    meta: Meta,
46}
47
48impl MetricSet {
49    pub fn new() -> Self {
50        MetricSet {
51            metrics: Vec::new(),
52            name_lookup: HashMap::new(),
53            meta: Meta::default(),
54        }
55    }
56
57    pub fn bump(&mut self, generation: u64) {
58        self.meta.generation = generation;
59    }
60
61    pub fn generation(&self) -> u64 {
62        self.meta.generation
63    }
64
65    /// Resolve a name to a stable [`MetricIdx`], registering an empty metric if
66    /// the name has not been seen before. The returned handle is valid for the
67    /// lifetime of this `MetricSet`.
68    #[inline]
69    pub(crate) fn resolve(&mut self, name: impl AsRef<str>) -> MetricIdx {
70        if let Some(&idx) = self.name_lookup.get(name.as_ref()) {
71            return idx;
72        }
73
74        let idx = MetricIdx::new(self.metrics.len() as u32);
75        let name = SmallStr::from(name.as_ref());
76        self.name_lookup.insert(name.clone(), idx);
77        self.metrics.push(Metric::new(name));
78        idx
79    }
80
81    #[inline]
82    pub(crate) fn upsert_at<'a>(&mut self, idx: MetricIdx, update: impl Into<MetricUpdate<'a>>) {
83        let generation = self.meta.generation;
84        let mmetric = &mut self.metrics[idx.as_usize()];
85
86        mmetric.set_generation(generation);
87        mmetric.apply_update(update.into());
88
89        self.meta.update_count += 1;
90    }
91
92    #[inline(always)]
93    pub fn upsert<'a>(&mut self, key: impl AsRef<str>, metric: impl Into<MetricUpdate<'a>>) {
94        let metric_update = metric.into();
95        let idx = self.resolve(&key);
96        self.upsert_at(idx, metric_update);
97    }
98
99    #[inline(always)]
100    pub fn upsert_tagged<'a>(
101        &mut self,
102        key: impl AsRef<str>,
103        metric: impl Into<MetricUpdate<'a>>,
104        tag: TagType,
105    ) {
106        let metric_update = metric.into();
107        let idx = self.resolve(&key);
108        if let Some(metric) = self.metrics.get_mut(idx.as_usize()) {
109            metric.add_tag(tag);
110            self.upsert_at(idx, metric_update);
111        }
112    }
113
114    #[inline(always)]
115    pub fn keys(&self) -> impl Iterator<Item = SmallStr> {
116        self.metrics.iter().map(|m| m.name().clone())
117    }
118
119    #[inline(always)]
120    pub fn replace(&mut self, metric: impl Into<Metric>) {
121        let metric = metric.into();
122        if let Some(&idx) = self.name_lookup.get(metric.name().as_str()) {
123            self.metrics[idx.as_usize()] = metric;
124        } else {
125            let idx = MetricIdx::new(self.metrics.len() as u32);
126            self.name_lookup.insert(metric.name().clone(), idx);
127            self.metrics.push(metric);
128        }
129    }
130
131    #[inline(always)]
132    pub fn iter_tagged(&self, tag: TagType) -> impl Iterator<Item = (&str, &Metric)> {
133        self.metrics.iter().filter_map(move |m| {
134            if m.tags().has(tag) {
135                Some((m.name().as_str(), m))
136            } else {
137                None
138            }
139        })
140    }
141
142    #[inline(always)]
143    pub fn tags(&self) -> impl Iterator<Item = TagType> {
144        self.metrics
145            .iter()
146            .fold(Tag::empty(), |acc, m| acc.union(m.tags()))
147            .into_iter()
148    }
149
150    #[inline(always)]
151    pub fn iter(&self) -> impl Iterator<Item = (&str, &Metric)> {
152        self.metrics.iter().map(|m| (m.name().as_str(), m))
153    }
154
155    #[inline(always)]
156    pub fn add(&mut self, metric: Metric) {
157        self.replace(metric);
158    }
159
160    #[inline(always)]
161    pub fn get(&self, name: impl AsRef<str>) -> Option<&Metric> {
162        self.name_lookup
163            .get(name.as_ref())
164            .and_then(|idx| self.metrics.get(idx.as_usize()))
165    }
166
167    #[inline(always)]
168    pub fn clear(&mut self) {
169        for m in &mut self.metrics {
170            m.clear_values();
171        }
172        self.meta.update_count = 0;
173    }
174
175    #[inline(always)]
176    pub fn contains_key(&self, name: impl AsRef<str>) -> bool {
177        self.name_lookup.contains_key(name.as_ref())
178    }
179
180    pub fn remove_samples(&mut self) {
181        for m in &mut self.metrics {
182            if m.tags().has(TagType::Distribution) {
183                m.clear_samples();
184            }
185        }
186    }
187
188    #[inline(always)]
189    pub fn len(&self) -> usize {
190        self.metrics.len()
191    }
192
193    pub fn is_empty(&self) -> bool {
194        self.metrics.is_empty()
195    }
196
197    pub fn summary(&self) -> MetricSetSummary {
198        MetricSetSummary {
199            metrics: self.metrics.len(),
200            updates: self.meta.update_count as f32,
201        }
202    }
203
204    pub fn dashboard(&self) -> String {
205        fmt::render_full(self).unwrap_or_default()
206    }
207}
208
209impl ExprSelector for MetricSet {
210    fn select(&self, sel: &SelectExpr) -> AnyValue<'static> {
211        // Missing metrics return Null so downstream math can propagate it; the
212        // outer Clamp (or any consumer using non-finite fallback) then takes the
213        // floor instead of the engine seeing an unrelated error.
214        let Some(metric) = sel.metric.as_ref().and_then(|name| self.get(name.as_str())) else {
215            return AnyValue::Null;
216        };
217
218        let wrap = |v: f32| match sel.kind {
219            MetricKind::Value => AnyValue::Float32(v),
220            MetricKind::Duration => AnyValue::Duration(Duration::from_secs_f32(v)),
221        };
222
223        match sel.field {
224            MetricField::LastValue => wrap(metric.last_value()),
225            MetricField::Mean => wrap(metric.mean()),
226            MetricField::StdDev => wrap(metric.stddev()),
227            MetricField::Min => wrap(metric.min()),
228            MetricField::Max => wrap(metric.max()),
229            MetricField::Sum => wrap(metric.sum()),
230            MetricField::Var => wrap(metric.var()),
231            MetricField::Skew => AnyValue::Float32(metric.skew()),
232            MetricField::Count => AnyValue::UInt64(metric.count() as u64),
233            MetricField::Generation => AnyValue::UInt64(metric.generation()),
234            MetricField::UpdateCount => AnyValue::UInt64(metric.update_count() as u64),
235        }
236    }
237}
238
239impl Display for MetricSet {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        let summary = self.summary();
242        let out = format!(
243            "[{} metrics, {:.0} updates]",
244            summary.metrics, summary.updates
245        );
246        write!(f, "{out}\n{}", fmt::render_full(self).unwrap_or_default())?;
247        Ok(())
248    }
249}
250
251impl Debug for MetricSet {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        writeln!(f, "MetricSet {{")?;
254        writeln!(f, "{}", fmt::render_dashboard(self).unwrap_or_default())?;
255        write!(f, "}}")
256    }
257}
258
259#[cfg(feature = "serde")]
260impl Serialize for MetricSet {
261    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
262    where
263        S: serde::Serializer,
264    {
265        self.metrics.serialize(serializer)
266    }
267}
268
269#[cfg(feature = "serde")]
270impl<'de> Deserialize<'de> for MetricSet {
271    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
272    where
273        D: serde::Deserializer<'de>,
274    {
275        let metrics = Vec::<Metric>::deserialize(deserializer)?;
276        let mut by_name = HashMap::with_capacity(metrics.len());
277        for (i, m) in metrics.iter().enumerate() {
278            by_name.insert(m.name().clone(), MetricIdx::new(i as u32));
279        }
280        Ok(MetricSet {
281            metrics,
282            name_lookup: by_name,
283            meta: Meta::default(),
284        })
285    }
286}
287
288#[derive(Debug)]
289pub enum MetricSetUpdate<'a> {
290    Single(SmallStr, MetricUpdate<'a>, Option<TagType>),
291}
292
293impl<'a, N, U> From<(N, U)> for MetricSetUpdate<'a>
294where
295    N: Into<SmallStr>,
296    U: Into<MetricUpdate<'a>>,
297{
298    fn from((name, update): (N, U)) -> Self {
299        MetricSetUpdate::Single(name.into(), update.into(), None)
300    }
301}
302
303impl<'a, N, U> From<(TagType, N, U)> for MetricSetUpdate<'a>
304where
305    N: Into<SmallStr>,
306    U: Into<MetricUpdate<'a>>,
307{
308    fn from((tag, name, update): (TagType, N, U)) -> Self {
309        MetricSetUpdate::Single(name.into(), update.into(), Some(tag))
310    }
311}
312
313impl<'a, N, U> From<(N, U, usize)> for MetricSetUpdate<'a>
314where
315    N: AsRef<str>,
316    U: Into<MetricUpdate<'a>>,
317{
318    fn from((name, update, count): (N, U, usize)) -> Self {
319        let name: SmallStr = format!("{}.{}", name.as_ref(), count).into();
320        MetricSetUpdate::Single(name, update.into(), None)
321    }
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327
328    #[test]
329    fn resolve_returns_stable_handle() {
330        let mut set = MetricSet::new();
331        let name = SmallStr::from_static("test.metric");
332
333        let idx1 = set.resolve(&name);
334        let idx2 = set.resolve(&name);
335        assert_eq!(idx1, idx2);
336
337        set.upsert_at(idx1, 1.0);
338        set.upsert_at(idx1, 2.0);
339        set.upsert_at(idx1, 3.0);
340
341        let m = set.get(name.as_str()).unwrap();
342        assert_eq!(m.count(), 3);
343        assert_eq!(m.sum(), 6.0);
344    }
345
346    #[test]
347    fn resolve_assigns_sequential_indices() {
348        let mut set = MetricSet::new();
349        let a = set.resolve(&SmallStr::from_static("a"));
350        let b = set.resolve(&SmallStr::from_static("b"));
351        let c = set.resolve(&SmallStr::from_static("c"));
352        assert_eq!(a.as_usize(), 0);
353        assert_eq!(b.as_usize(), 1);
354        assert_eq!(c.as_usize(), 2);
355    }
356}