1use crate::stats::{MetricView, Tag, TagType, defaults};
2use radiate_error::{RadiateError, radiate_err};
3use radiate_utils::{
4 AnyValue, DataType, SmallStr, Statistic
5};
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8use std::{hash::Hash, time::Duration};
9
10const DTYPE_NULL: u8 = 0;
11const DTYPE_FLOAT32: u8 = 1;
12const DTYPE_DURATION: u8 = 2;
13const DTYPE_LIST: u8 = 3;
14
15#[macro_export]
16macro_rules! metric {
17 ($name:expr, $update:expr) => {{
18 let mut metric = $crate::Metric::new($name);
19 metric.apply_update($update);
20 metric
21 }};
22 ($name:expr) => {{ $crate::Metric::new($name).upsert(1) }};
23}
24
25
26#[derive(Clone, PartialEq, Default)]
27#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
28pub(super) struct Meta {
29 pub(super) update_count: usize,
30 pub(super) generation: u64,
31}
32
33#[derive(Clone, PartialEq, Default)]
34#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
35pub struct Metric {
36 name: SmallStr,
37 inner: Statistic,
38 samples: Option<Vec<f32>>,
39 meta: Meta,
40 tags: Tag,
41 dtype: u8,
42}
43
44impl Metric {
45 pub fn new(name: impl Into<SmallStr>) -> Self {
46 let name = name.into();
47 let tags = defaults::default_tags(&name);
48
49 Self {
50 name,
51 inner: Statistic::default(),
52 meta: Meta::default(),
53 samples: None,
54 tags,
55 dtype: DTYPE_NULL,
56 }
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.meta.update_count == 0 && self.inner.count() == 0
61 }
62
63 #[inline(always)]
64 pub fn update_count(&self) -> usize {
65 self.meta.update_count
66 }
67
68 #[inline(always)]
69 pub fn generation(&self) -> u64 {
70 self.meta.generation
71 }
72
73 #[inline(always)]
74 pub fn set_generation(&mut self, generation: u64) {
75 if generation != self.meta.generation {
76 self.meta.update_count = 0;
77 }
78
79 self.meta.generation = generation;
80 }
81
82 pub fn dtype(&self) -> DataType {
83 match self.dtype {
84 DTYPE_NULL => DataType::Null,
85 DTYPE_FLOAT32 => DataType::Float32,
86 DTYPE_DURATION => DataType::Duration,
87 DTYPE_LIST => DataType::List(Box::new(DataType::Float32)),
88 _ => DataType::Null,
89 }
90 }
91
92 #[inline(always)]
93 pub fn tags(&self) -> Tag {
94 self.tags
95 }
96
97 #[inline(always)]
98 pub fn add_tag(&mut self, tag: TagType) {
99 self.tags.insert(tag);
100 }
101
102 pub fn iter_tags(&self) -> impl Iterator<Item = TagType> {
103 self.tags.iter()
104 }
105
106 pub fn clear_values(&mut self) {
107 self.inner = Statistic::default();
108 self.samples = None;
109 }
110
111 pub fn stats<'a>(&'a self) -> Option<MetricView<'a, f32>> {
112 if !self.tags.has(TagType::Statistic) {
113 return None;
114 }
115
116 Some(MetricView {
117 name: &self.name,
118 statistic: &self.inner,
119 samples: self.samples.as_deref(),
120 mapper: |v| v,
121 })
122 }
123
124 pub fn times<'a>(&'a self) -> Option<MetricView<'a, Duration>> {
125 if !self.tags.has(TagType::Time) {
126 return None;
127 }
128
129 Some(MetricView {
130 name: &self.name,
131 statistic: &self.inner,
132 samples: self.samples.as_deref(),
133 mapper: |v| Duration::from_secs_f32(v),
134 })
135 }
136
137 pub fn distributions<'a>(&'a self) -> Option<MetricView<'a, f32>> {
138 if !self.tags.has(TagType::Distribution) {
139 return None;
140 }
141
142 Some(MetricView {
143 name: &self.name,
144 statistic: &self.inner,
145 samples: self.samples.as_deref(),
146 mapper: |v| v,
147 })
148 }
149
150 #[inline(always)]
151 pub fn upsert<'a>(mut self, update: impl Into<MetricUpdate<'a>>) -> Self {
152 self.apply_update(update);
153 self
154 }
155
156 #[inline(always)]
157 pub fn update_from(&mut self, other: Metric) {
158 if other.count() as f32 == other.sum() && !other.tags.has(TagType::Distribution) {
162 self.apply_update(other.sum());
163 } else {
164 self.apply_update(other.inner);
165 }
166
167 self.tags = self.tags.union(other.tags);
168 }
169
170 #[inline(always)]
171 pub fn apply_update<'a>(&mut self, update: impl Into<MetricUpdate<'a>>) {
172 let update = update.into();
173 match update {
174 MetricUpdate::Float(value) => {
175 self.update_statistic(value);
176 }
177 MetricUpdate::Usize(value) => {
178 self.update_statistic(value as f32);
179 }
180 MetricUpdate::Duration(value) => {
181 self.update_time_statistic(value);
182 }
183 MetricUpdate::UsizeDistribution(values) => {
184 self.update_statistic_from_iter(values.iter().map(|v| *v as f32));
185 }
186 MetricUpdate::Distribution(values) => {
187 self.update_statistic_from_iter(values.iter().cloned());
188 }
189 MetricUpdate::OwnedDistribution(values) => {
190 self.update_statistic_from_iter(values);
191 }
192 MetricUpdate::Statistic(stat) => {
193 self.inner.merge(&stat);
194 self.dtype = DTYPE_FLOAT32;
195 self.meta.update_count += 1;
196 }
197 MetricUpdate::Bool(value) => {
198 self.update_statistic(if value { 1.0 } else { 0.0 });
199 }
200 }
201 }
202
203 fn update_statistic(&mut self, value: f32) {
204 self.inner.add(value);
205 self.add_tag(TagType::Statistic);
206
207 self.meta.update_count += 1;
208
209 if self.dtype == DTYPE_NULL {
210 self.dtype = DTYPE_FLOAT32;
211 }
212 }
213
214 fn update_time_statistic(&mut self, value: Duration) {
215 self.inner.add(value.as_secs_f32());
216 self.add_tag(TagType::Time);
217 self.meta.update_count += 1;
218
219 if self.dtype == DTYPE_NULL {
220 self.dtype = DTYPE_DURATION;
221 }
222 }
223
224 fn update_statistic_from_iter<I>(&mut self, values: I)
225 where
226 I: IntoIterator<Item = f32>,
227 {
228 let samples = self.samples.get_or_insert_with(Vec::new);
229
230 samples.clear();
231 self.inner.clear();
232
233 for val in values {
234 samples.push(val);
235 self.inner.add(val);
236 }
237
238 self.meta.update_count += self.inner.count() as usize;
239
240 self.add_tag(TagType::Distribution);
241
242 if self.dtype == DTYPE_NULL {
243 self.dtype = DTYPE_LIST;
244 }
245 }
246
247 pub fn clear_samples(&mut self) {
248 self.samples = None;
249 }
250
251 pub fn statistic(&self) -> &Statistic {
252 &self.inner
253 }
254
255 pub fn name(&self) -> &SmallStr {
256 &self.name
257 }
258
259 pub fn last_value(&self) -> f32 {
260 self.inner.last_value()
261 }
262
263 pub fn count(&self) -> u32 {
264 self.inner.count()
265 }
266
267 pub fn mean(&self) -> f32 {
268 self.inner.mean()
269 }
270
271 pub fn var(&self) -> f32 {
272 self.inner.variance().unwrap_or(0.0)
273 }
274
275 pub fn stddev(&self) -> f32 {
276 self.inner.std_dev().unwrap_or(0.0)
277 }
278
279 pub fn skew(&self) -> f32 {
280 self.inner.skewness().unwrap_or(0.0)
281 }
282
283 pub fn kurt(&self) -> f32 {
284 self.inner.kurtosis().unwrap_or(0.0)
285 }
286
287 pub fn min(&self) -> f32 {
288 self.inner.min()
289 }
290
291 pub fn max(&self) -> f32 {
292 self.inner.max()
293 }
294
295 pub fn sum(&self) -> f32 {
296 self.inner.sum()
297 }
298
299 pub fn quantile(&self, q: f32) -> Option<f32> {
300 self.distributions().and_then(|view| view.quantile(q))
301 }
302}
303
304impl Hash for Metric {
305 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
306 self.name.hash(state);
307 self.inner.hash(state);
308 self.tags.hash(state);
309 }
310}
311
312#[derive(Clone, PartialEq, Debug)]
313pub enum MetricUpdate<'a> {
314 Float(f32),
315 Usize(usize),
316 Duration(Duration),
317 Distribution(&'a [f32]),
318 OwnedDistribution(Vec<f32>),
319 UsizeDistribution(&'a [usize]),
320 Statistic(Statistic),
321 Bool(bool),
322}
323
324impl From<f32> for MetricUpdate<'_> {
325 fn from(value: f32) -> Self {
326 MetricUpdate::Float(value)
327 }
328}
329
330impl From<usize> for MetricUpdate<'_> {
331 fn from(value: usize) -> Self {
332 MetricUpdate::Usize(value)
333 }
334}
335
336impl From<Duration> for MetricUpdate<'_> {
337 fn from(value: Duration) -> Self {
338 MetricUpdate::Duration(value)
339 }
340}
341
342impl<'a> From<&'a [f32]> for MetricUpdate<'a> {
343 fn from(value: &'a [f32]) -> Self {
344 MetricUpdate::Distribution(value)
345 }
346}
347
348impl<'a> From<&'a Vec<f32>> for MetricUpdate<'a> {
349 fn from(value: &'a Vec<f32>) -> Self {
350 MetricUpdate::Distribution(value)
351 }
352}
353
354impl<'a> From<&'a Vec<usize>> for MetricUpdate<'a> {
355 fn from(value: &'a Vec<usize>) -> Self {
356 MetricUpdate::UsizeDistribution(value)
357 }
358}
359
360impl From<Statistic> for MetricUpdate<'_> {
361 fn from(value: Statistic) -> Self {
362 MetricUpdate::Statistic(value)
363 }
364}
365
366impl From<bool> for MetricUpdate<'_> {
367 fn from(value: bool) -> Self {
368 MetricUpdate::Bool(value)
369 }
370}
371
372impl<'a> TryFrom<AnyValue<'a>> for MetricUpdate<'a> {
373 type Error = RadiateError;
374
375 fn try_from(value: AnyValue<'a>) -> Result<Self, Self::Error> {
376 match value {
377 AnyValue::UInt8(v) => Ok(MetricUpdate::Float(v as f32)),
378 AnyValue::UInt16(v) => Ok(MetricUpdate::Float(v as f32)),
379 AnyValue::UInt32(v) => Ok(MetricUpdate::Float(v as f32)),
380 AnyValue::UInt64(v) => Ok(MetricUpdate::Float(v as f32)),
381 AnyValue::UInt128(v) => Ok(MetricUpdate::Float(v as f32)),
382
383 AnyValue::Int8(v) => Ok(MetricUpdate::Float(v as f32)),
384 AnyValue::Int16(v) => Ok(MetricUpdate::Float(v as f32)),
385 AnyValue::Int32(v) => Ok(MetricUpdate::Float(v as f32)),
386 AnyValue::Int64(v) => Ok(MetricUpdate::Float(v as f32)),
387 AnyValue::Int128(v) => Ok(MetricUpdate::Float(v as f32)),
388
389 AnyValue::Float32(v) => Ok(MetricUpdate::Float(v)),
390 AnyValue::Float64(v) => Ok(MetricUpdate::Float(v as f32)),
391
392 AnyValue::Duration(v) => Ok(MetricUpdate::Duration(v)),
393
394 AnyValue::Slice(values) => {
395 let out = values
396 .iter()
397 .enumerate()
398 .map(|(index, v)| {
399 v.clone().extract::<f32>().ok_or(
400 radiate_err!(
401 Metric:
402 "cannot convert AnyValue sequence into Vec<f32>: element at index {index} has non-numeric type `{}`", v.type_name()))
403
404 })
405 .collect::<Result<Vec<f32>, _>>()?;
406
407 Ok(MetricUpdate::OwnedDistribution(out))
408 }
409
410 AnyValue::Vector(values) => {
411 let out = values
412 .into_iter()
413 .enumerate()
414 .map(|(index, v)| {
415 let ty = v.type_name();
416 v.extract::<f32>()
417 .ok_or(radiate_err!(
418 Metric:
419 "cannot convert AnyValue sequence into Vec<f32>: element at index {index} has non-numeric type `{ty}`"
420 ))
421 })
422 .collect::<Result<Vec<f32>, _>>()?;
423
424 Ok(MetricUpdate::OwnedDistribution(out))
425 }
426
427 other => Err(radiate_err!(Metric: "cannot convert AnyValue of type `{}` into MetricUpdate", other.type_name())),
428 }
429 }
430}
431
432impl std::fmt::Debug for Metric {
433 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
434 write!(f, "Metric {{ name: {}, }}", self.name)
435 }
436}
437
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442
443 const EPSILON: f32 = 1e-5;
444
445 fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
446 (a - b).abs() <= eps
447 }
448
449 fn assert_stat_eq(m: &Metric, count: u32, mean: f32, var: f32, min: f32, max: f32) {
450 assert_eq!(m.count(), count);
451 assert!(approx_eq(m.mean(), mean, EPSILON), "mean");
452 assert!(approx_eq(m.var(), var, EPSILON), "var");
453 assert!(approx_eq(m.min(), min, EPSILON), "min");
454 assert!(approx_eq(m.max(), max, EPSILON), "max");
455 }
456
457 fn stats_of(values: &[f32]) -> (u32, f32, f32, f32, f32) {
458 let n = values.len() as u32;
460 if n == 0 {
461 return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
462 }
463 let mean = values.iter().sum::<f32>() / values.len() as f32;
464
465 let mut m2 = 0.0_f32;
466 for &v in values {
467 let d = v - mean;
468 m2 += d * d;
469 }
470
471 let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };
472
473 let min = values.iter().cloned().fold(f32::INFINITY, f32::min);
474 let max = values.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
475
476 (n, mean, var, min, max)
477 }
478
479 #[test]
480 fn test_metric() {
481 let mut metric = Metric::new("test");
482 metric.apply_update(1.0);
483 metric.apply_update(2.0);
484 metric.apply_update(3.0);
485 metric.apply_update(4.0);
486 metric.apply_update(5.0);
487
488 assert_eq!(metric.count(), 5);
489 assert_eq!(metric.last_value(), 5.0);
490 assert_eq!(metric.mean(), 3.0);
491 assert_eq!(metric.var(), 2.5);
492 assert_eq!(metric.stddev(), 1.5811388);
493 assert_eq!(metric.min(), 1.0);
494 assert_eq!(metric.max(), 5.0);
495 assert_eq!(metric.name(), "test");
496 }
497
498 #[test]
499 fn test_metric_labels() {
500 let mut metric = Metric::new("test");
501
502 metric.apply_update(1.0);
503 metric.apply_update(2.0);
504 metric.apply_update(3.0);
505 metric.apply_update(4.0);
506 metric.apply_update(5.0);
507
508 assert_eq!(metric.count(), 5);
509 assert_eq!(metric.last_value(), 5.0);
510 assert_eq!(metric.mean(), 3.0);
511 assert_eq!(metric.var(), 2.5);
512 assert_eq!(metric.stddev(), 1.5811388);
513 assert_eq!(metric.min(), 1.0);
514 assert_eq!(metric.max(), 5.0);
515 }
516
517 #[test]
518 fn distribution_tag_is_applied_on_any_slice_update() {
519 let mut m = Metric::new("scores");
520
521 m.apply_update(1.0);
523 m.apply_update(2.0);
524 assert!(m.tags().has(TagType::Statistic));
525 assert!(!m.tags().has(TagType::Distribution));
526
527 m.apply_update(&[3.0, 4.0][..]);
529
530 assert!(
531 m.tags().has(TagType::Distribution),
532 "expected Distribution tag after slice update"
533 );
534 }
535
536 #[test]
537 fn metric_merge_matches_streaming_samples() {
538 let a = [1.0, 2.0, 3.0, 4.0];
539 let b = [10.0, 20.0, 30.0];
540
541 let mut m1 = Metric::new("x");
542 m1.apply_update(&a[..]);
543
544 let mut m2 = Metric::new("x");
545 m2.apply_update(&b[..]);
546
547 m1.update_from(m2);
548
549 let combined = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0];
550 let (n, mean, var, min, max) = stats_of(&combined);
551 assert_stat_eq(&m1, n, mean, var, min, max);
552 }
553}