1use crate::{
2 Metric, MetricUpdate,
3 stats::{
4 ExprSelector, Meta, Tag, TagType,
5 expression::{MetricField, MetricKind, SelectExpr},
6 fmt,
7 },
8};
9use radiate_utils::{AnyValue, SmallStr};
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12use std::{
13 collections::HashMap,
14 fmt::{Debug, Display},
15 time::Duration,
16};
17
/// Compact handle identifying one metric inside a `MetricSet`.
///
/// Wraps the `u32` position of the metric in the set's backing vector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[repr(transparent)]
pub(crate) struct MetricIdx(u32);

impl MetricIdx {
    /// Wraps a raw `u32` position in a handle.
    #[inline(always)]
    pub(crate) const fn new(idx: u32) -> Self {
        Self(idx)
    }

    /// Returns the wrapped position widened to `usize`, ready for slice indexing.
    #[inline(always)]
    pub(crate) const fn as_usize(self) -> usize {
        let Self(raw) = self;
        raw as usize
    }
}
34
/// Headline numbers describing a `MetricSet`.
///
/// Derives `Debug`, `Clone` and `Copy` in addition to `PartialEq` so the
/// summary can be logged, used in `assert_eq!`, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MetricSetSummary {
    /// Number of metrics registered in the set.
    pub metrics: usize,
    /// Number of updates applied to the set, stored as a float.
    pub updates: f32,
}
40
41#[derive(Clone, Default, PartialEq)]
42pub struct MetricSet {
43 metrics: Vec<Metric>,
44 name_lookup: HashMap<SmallStr, MetricIdx>,
45 meta: Meta,
46}
47
48impl MetricSet {
49 pub fn new() -> Self {
50 MetricSet {
51 metrics: Vec::new(),
52 name_lookup: HashMap::new(),
53 meta: Meta::default(),
54 }
55 }
56
57 pub fn bump(&mut self, generation: u64) {
58 self.meta.generation = generation;
59 }
60
61 pub fn generation(&self) -> u64 {
62 self.meta.generation
63 }
64
65 #[inline]
69 pub(crate) fn resolve(&mut self, name: impl AsRef<str>) -> MetricIdx {
70 if let Some(&idx) = self.name_lookup.get(name.as_ref()) {
71 return idx;
72 }
73
74 let idx = MetricIdx::new(self.metrics.len() as u32);
75 let name = SmallStr::from(name.as_ref());
76 self.name_lookup.insert(name.clone(), idx);
77 self.metrics.push(Metric::new(name));
78 idx
79 }
80
81 #[inline]
82 pub(crate) fn upsert_at<'a>(&mut self, idx: MetricIdx, update: impl Into<MetricUpdate<'a>>) {
83 let generation = self.meta.generation;
84 let mmetric = &mut self.metrics[idx.as_usize()];
85
86 mmetric.set_generation(generation);
87 mmetric.apply_update(update.into());
88
89 self.meta.update_count += 1;
90 }
91
92 #[inline(always)]
93 pub fn upsert<'a>(&mut self, key: impl AsRef<str>, metric: impl Into<MetricUpdate<'a>>) {
94 let metric_update = metric.into();
95 let idx = self.resolve(&key);
96 self.upsert_at(idx, metric_update);
97 }
98
99 #[inline(always)]
100 pub fn upsert_tagged<'a>(
101 &mut self,
102 key: impl AsRef<str>,
103 metric: impl Into<MetricUpdate<'a>>,
104 tag: TagType,
105 ) {
106 let metric_update = metric.into();
107 let idx = self.resolve(&key);
108 if let Some(metric) = self.metrics.get_mut(idx.as_usize()) {
109 metric.add_tag(tag);
110 self.upsert_at(idx, metric_update);
111 }
112 }
113
114 #[inline(always)]
115 pub fn keys(&self) -> impl Iterator<Item = SmallStr> {
116 self.metrics.iter().map(|m| m.name().clone())
117 }
118
119 #[inline(always)]
120 pub fn replace(&mut self, metric: impl Into<Metric>) {
121 let metric = metric.into();
122 if let Some(&idx) = self.name_lookup.get(metric.name().as_str()) {
123 self.metrics[idx.as_usize()] = metric;
124 } else {
125 let idx = MetricIdx::new(self.metrics.len() as u32);
126 self.name_lookup.insert(metric.name().clone(), idx);
127 self.metrics.push(metric);
128 }
129 }
130
131 #[inline(always)]
132 pub fn iter_tagged(&self, tag: TagType) -> impl Iterator<Item = (&str, &Metric)> {
133 self.metrics.iter().filter_map(move |m| {
134 if m.tags().has(tag) {
135 Some((m.name().as_str(), m))
136 } else {
137 None
138 }
139 })
140 }
141
142 #[inline(always)]
143 pub fn tags(&self) -> impl Iterator<Item = TagType> {
144 self.metrics
145 .iter()
146 .fold(Tag::empty(), |acc, m| acc.union(m.tags()))
147 .into_iter()
148 }
149
150 #[inline(always)]
151 pub fn iter(&self) -> impl Iterator<Item = (&str, &Metric)> {
152 self.metrics.iter().map(|m| (m.name().as_str(), m))
153 }
154
155 #[inline(always)]
156 pub fn add(&mut self, metric: Metric) {
157 self.replace(metric);
158 }
159
160 #[inline(always)]
161 pub fn get(&self, name: impl AsRef<str>) -> Option<&Metric> {
162 self.name_lookup
163 .get(name.as_ref())
164 .and_then(|idx| self.metrics.get(idx.as_usize()))
165 }
166
167 #[inline(always)]
168 pub fn clear(&mut self) {
169 for m in &mut self.metrics {
170 m.clear_values();
171 }
172 self.meta.update_count = 0;
173 }
174
175 #[inline(always)]
176 pub fn contains_key(&self, name: impl AsRef<str>) -> bool {
177 self.name_lookup.contains_key(name.as_ref())
178 }
179
180 pub fn remove_samples(&mut self) {
181 for m in &mut self.metrics {
182 if m.tags().has(TagType::Distribution) {
183 m.clear_samples();
184 }
185 }
186 }
187
188 #[inline(always)]
189 pub fn len(&self) -> usize {
190 self.metrics.len()
191 }
192
193 pub fn is_empty(&self) -> bool {
194 self.metrics.is_empty()
195 }
196
197 pub fn summary(&self) -> MetricSetSummary {
198 MetricSetSummary {
199 metrics: self.metrics.len(),
200 updates: self.meta.update_count as f32,
201 }
202 }
203
204 pub fn dashboard(&self) -> String {
205 fmt::render_full(self).unwrap_or_default()
206 }
207}
208
209impl ExprSelector for MetricSet {
210 fn select(&self, sel: &SelectExpr) -> AnyValue<'static> {
211 let Some(metric) = sel.metric.as_ref().and_then(|name| self.get(name.as_str())) else {
215 return AnyValue::Null;
216 };
217
218 let wrap = |v: f32| match sel.kind {
219 MetricKind::Value => AnyValue::Float32(v),
220 MetricKind::Duration => AnyValue::Duration(Duration::from_secs_f32(v)),
221 };
222
223 match sel.field {
224 MetricField::LastValue => wrap(metric.last_value()),
225 MetricField::Mean => wrap(metric.mean()),
226 MetricField::StdDev => wrap(metric.stddev()),
227 MetricField::Min => wrap(metric.min()),
228 MetricField::Max => wrap(metric.max()),
229 MetricField::Sum => wrap(metric.sum()),
230 MetricField::Var => wrap(metric.var()),
231 MetricField::Skew => AnyValue::Float32(metric.skew()),
232 MetricField::Count => AnyValue::UInt64(metric.count() as u64),
233 MetricField::Generation => AnyValue::UInt64(metric.generation()),
234 MetricField::UpdateCount => AnyValue::UInt64(metric.update_count() as u64),
235 }
236 }
237}
238
239impl Display for MetricSet {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 let summary = self.summary();
242 let out = format!(
243 "[{} metrics, {:.0} updates]",
244 summary.metrics, summary.updates
245 );
246 write!(f, "{out}\n{}", fmt::render_full(self).unwrap_or_default())?;
247 Ok(())
248 }
249}
250
251impl Debug for MetricSet {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 writeln!(f, "MetricSet {{")?;
254 writeln!(f, "{}", fmt::render_dashboard(self).unwrap_or_default())?;
255 write!(f, "}}")
256 }
257}
258
#[cfg(feature = "serde")]
impl Serialize for MetricSet {
    /// Serializes the set as its sequence of metrics.
    ///
    /// Only the metric vector is written; the name lookup and the set-level
    /// metadata are not part of the serialized form.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self { metrics, .. } = self;
        Serialize::serialize(metrics, serializer)
    }
}
268
#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for MetricSet {
    /// Rebuilds a set from a serialized sequence of metrics.
    ///
    /// The name lookup is reconstructed from each metric's name and position,
    /// and the metadata starts from `Meta::default()`. If two metrics share a
    /// name, the lookup points at the later one.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let metrics: Vec<Metric> = Deserialize::deserialize(deserializer)?;

        let mut name_lookup = HashMap::with_capacity(metrics.len());
        name_lookup.extend(
            metrics
                .iter()
                .enumerate()
                .map(|(pos, entry)| (entry.name().clone(), MetricIdx::new(pos as u32))),
        );

        Ok(Self {
            metrics,
            name_lookup,
            meta: Meta::default(),
        })
    }
}
287
288#[derive(Debug)]
289pub enum MetricSetUpdate<'a> {
290 Single(SmallStr, MetricUpdate<'a>, Option<TagType>),
291}
292
293impl<'a, N, U> From<(N, U)> for MetricSetUpdate<'a>
294where
295 N: Into<SmallStr>,
296 U: Into<MetricUpdate<'a>>,
297{
298 fn from((name, update): (N, U)) -> Self {
299 MetricSetUpdate::Single(name.into(), update.into(), None)
300 }
301}
302
303impl<'a, N, U> From<(TagType, N, U)> for MetricSetUpdate<'a>
304where
305 N: Into<SmallStr>,
306 U: Into<MetricUpdate<'a>>,
307{
308 fn from((tag, name, update): (TagType, N, U)) -> Self {
309 MetricSetUpdate::Single(name.into(), update.into(), Some(tag))
310 }
311}
312
313impl<'a, N, U> From<(N, U, usize)> for MetricSetUpdate<'a>
314where
315 N: AsRef<str>,
316 U: Into<MetricUpdate<'a>>,
317{
318 fn from((name, update, count): (N, U, usize)) -> Self {
319 let name: SmallStr = format!("{}.{}", name.as_ref(), count).into();
320 MetricSetUpdate::Single(name, update.into(), None)
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolving one name twice yields the same handle, and updates sent
    /// through that handle accumulate on a single metric.
    #[test]
    fn resolve_returns_stable_handle() {
        let mut set = MetricSet::new();
        let name = SmallStr::from_static("test.metric");

        let first = set.resolve(&name);
        let second = set.resolve(&name);
        assert_eq!(first, second);

        set.upsert_at(first, 1.0);
        set.upsert_at(first, 2.0);
        set.upsert_at(first, 3.0);

        let metric = set.get(name.as_str()).unwrap();
        assert_eq!(metric.count(), 3);
        assert_eq!(metric.sum(), 6.0);
    }

    /// Fresh names receive consecutive indices starting at zero.
    #[test]
    fn resolve_assigns_sequential_indices() {
        let mut set = MetricSet::new();
        for (expected, name) in ["a", "b", "c"].into_iter().enumerate() {
            let idx = set.resolve(&SmallStr::from_static(name));
            assert_eq!(idx.as_usize(), expected);
        }
    }
}
356}