1use crate::stats::{MetricView, Tag, TagType, defaults};
2use radiate_error::{RadiateError, radiate_err};
3use radiate_expr::{AnyValue, DataType};
4use radiate_utils::{
5 Statistic, ToSnakeCase, cache_arc_string, intern, intern_snake_case,
6};
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9use std::{hash::Hash, sync::Arc, time::Duration};
10
11const DATA_TYPE_NULL: u8 = 0;
12const DATA_TYPE_FLOAT32: u8 = 1;
13const DATA_TYPE_DURATION: u8 = 2;
14const DATA_TYPE_LIST: u8 = 3;
15
16#[macro_export]
17macro_rules! metric {
18 ($name:expr, $update:expr) => {{
19 let mut metric = $crate::Metric::new($name);
20 metric.apply_update($update);
21 metric
22 }};
23 ($name:expr) => {{ $crate::Metric::new($name).upsert(1) }};
24}
25
26
27#[derive(Clone, PartialEq, Default)]
28#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
29pub(super) struct Meta {
30 pub(super) update_count: usize,
31 pub(super) version: u64,
32}
33
34#[derive(Clone, PartialEq, Default)]
35#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
36pub struct Metric {
37 name: Arc<String>,
38 meta: Option<Meta>,
39 inner: Statistic,
40 tags: Tag,
41 dtype: u8,
42}
43
44impl Metric {
45 pub fn new(name: &'static str) -> Self {
46 let name = cache_arc_string!(intern_snake_case!(name));
47 let tags = defaults::default_tags(&name);
48
49 Self {
50 name,
51 meta: None,
52 inner: Statistic::default(),
53 tags,
54 dtype: DATA_TYPE_NULL,
55 }
56 }
57
58 #[inline(always)]
59 pub fn update_count(&self) -> usize {
60 self.meta.as_ref().map_or(0, |meta| meta.update_count)
61 }
62
63 #[inline(always)]
64 pub fn version(&self) -> u64 {
65 self.meta.as_ref().map_or(0, |meta| meta.version)
66 }
67
68 #[inline(always)]
69 pub fn set_version(&mut self, version: u64) {
70 if let Some(meta) = &mut self.meta {
71 if version != meta.version {
72 meta.update_count = 0;
73 }
74
75 meta.version = version;
76 } else {
77 self.meta = Some(Meta {
78 update_count: 0,
79 version,
80 });
81 }
82 }
83
84 pub fn dtype(&self) -> DataType {
85 match self.dtype {
86 DATA_TYPE_NULL => DataType::Null,
87 DATA_TYPE_FLOAT32 => DataType::Float32,
88 DATA_TYPE_DURATION => DataType::Duration,
89 DATA_TYPE_LIST => DataType::List(Box::new(DataType::Float32)),
90 _ => DataType::Null,
91 }
92 }
93
94 #[inline(always)]
95 pub fn tags(&self) -> Tag {
96 self.tags
97 }
98
99 #[inline(always)]
100 pub fn with_tag(mut self, tag: TagType) -> Self {
101 self.add_tag(tag);
102 self
103 }
104
105 #[inline(always)]
106 pub fn with_tags<T>(&mut self, tags: T)
107 where
108 T: Into<Tag>,
109 {
110 self.tags = tags.into();
111 }
112
113 #[inline(always)]
114 pub fn add_tags(&mut self, tags: Tag) {
115 self.tags = self.tags.union(tags);
116 }
117
118 #[inline(always)]
119 pub fn add_tag(&mut self, tag: TagType) {
120 self.tags.insert(tag);
121 }
122
123 pub fn contains_tag(&self, tag: &TagType) -> bool {
124 self.tags.has(*tag)
125 }
126
127 pub fn tags_iter(&self) -> impl Iterator<Item = TagType> {
128 self.tags.iter()
129 }
130
131 pub fn clear_values(&mut self) {
132 self.inner = Statistic::default();
133 }
134
135 pub fn stats<'a>(&'a self) -> Option<MetricView<'a, f32>> {
136 if !self.tags.has(TagType::Statistic) {
137 return None;
138 }
139
140 Some(MetricView {
141 name: &self.name,
142 statistic: &self.inner,
143 mapper: |v| Some(v),
144 })
145 }
146
147 pub fn times<'a>(&'a self) -> Option<MetricView<'a, Duration>> {
148 if !self.tags.has(TagType::Time) {
149 return None;
150 }
151
152 Some(MetricView {
153 name: &self.name,
154 statistic: &self.inner,
155 mapper: |v| Some(Duration::from_secs_f32(v)),
156 } )
157 }
158
159 pub fn distributions<'a>(&'a self) -> Option<MetricView<'a, f32>> {
160 if !self.tags.has(TagType::Distribution) {
161 return None;
162 }
163
164 Some(MetricView {
165 name: &self.name,
166 statistic: &self.inner,
167 mapper: |v| Some(v),
168 })
169 }
170
171 #[inline(always)]
172 pub fn upsert<'a>(mut self, update: impl Into<MetricUpdate<'a>>) -> Self {
173 self.apply_update(update);
174 self
175 }
176
177 #[inline(always)]
178 pub fn update_from(&mut self, other: Metric) {
179 if other.count() as f32 == other.sum() && !other.tags.has(TagType::Distribution) {
183 self.apply_update(other.sum());
184 } else {
185 self.apply_update(other.inner);
186 }
187
188 self.tags = self.tags.union(other.tags);
189 }
190
191 #[inline(always)]
192 pub fn apply_update<'a>(&mut self, update: impl Into<MetricUpdate<'a>>) {
193 let update = update.into();
194 match update {
195 MetricUpdate::Float(value) => {
196 self.update_statistic(value);
197 }
198 MetricUpdate::Usize(value) => {
199 self.update_statistic(value as f32);
200 }
201 MetricUpdate::Duration(value) => {
202 self.update_time_statistic(value);
203 }
204 MetricUpdate::UsizeDistribution(values) => {
205 self.update_statistic_from_iter(values.iter().map(|v| *v as f32));
206 }
207 MetricUpdate::Distribution(values) => {
208 self.update_statistic_from_iter(values.iter().cloned());
209 }
210 MetricUpdate::Statistic(stat) => {
211 self.inner.merge(&stat);
212 self.meta.as_mut().map(|meta| meta.update_count += 1);
213 self.dtype = DATA_TYPE_FLOAT32;
214 }
215 }
216 }
217
218 fn update_statistic(&mut self, value: f32) {
219 self.inner.add(value);
220 self.add_tag(TagType::Statistic);
221 self.meta.as_mut().map(|meta| meta.update_count += 1);
222
223 if self.dtype == DATA_TYPE_NULL {
224 self.dtype = DATA_TYPE_FLOAT32;
225 }
226 }
227
228 fn update_time_statistic(&mut self, value: Duration) {
229 self.inner.add(value.as_secs_f32());
230 self.add_tag(TagType::Time);
231 self.meta.as_mut().map(|meta| meta.update_count += 1);
232
233 if self.dtype == DATA_TYPE_NULL {
234 self.dtype = DATA_TYPE_DURATION;
235 }
236 }
237
238 fn update_statistic_from_iter<I>(&mut self, values: I)
239 where
240 I: IntoIterator<Item = f32>,
241 {
242 let mut values_count = 0;
243 let mut new_stat = Statistic::default();
244 for value in values {
245 new_stat.add(value);
246 values_count += 1;
247 }
248
249 self.inner = new_stat;
250 self.meta.as_mut().map(|meta| meta.update_count += values_count);
251 self.add_tag(TagType::Distribution);
252
253 if self.dtype == DATA_TYPE_NULL {
254 self.dtype = DATA_TYPE_LIST;
255 }
256 }
257
258 pub fn statistic(&self) -> &Statistic {
259 &self.inner
260 }
261
262 pub fn name(&self) -> &str {
263 &self.name
264 }
265
266 pub fn last_value(&self) -> f32 {
267 self.inner.last_value()
268 }
269
270 pub fn count(&self) -> i32 {
271 self.inner.count()
272 }
273
274 pub fn mean(&self) -> f32 {
275 self.inner.mean()
276 }
277
278 pub fn var(&self) -> f32 {
279 self.inner.variance().unwrap_or(0.0)
280 }
281
282 pub fn stddev(&self) -> f32 {
283 self.inner.std_dev().unwrap_or(0.0)
284 }
285
286 pub fn skew(&self) -> f32 {
287 self.inner.skewness().unwrap_or(0.0)
288 }
289
290 pub fn min(&self) -> f32 {
291 self.inner.min()
292 }
293
294 pub fn max(&self) -> f32 {
295 self.inner.max()
296 }
297
298 pub fn sum(&self) -> f32 {
299 self.inner.sum()
300 }
301}
302
303impl Hash for Metric {
304 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
305 self.name.hash(state);
306 self.inner.hash(state);
307 self.tags.hash(state);
308 }
309}
310
311#[derive(Clone, PartialEq)]
312pub enum MetricUpdate<'a> {
313 Float(f32),
314 Usize(usize),
315 Duration(Duration),
316 Distribution(&'a [f32]),
317 UsizeDistribution(&'a [usize]),
318 Statistic(Statistic),
319}
320
321impl From<f32> for MetricUpdate<'_> {
322 fn from(value: f32) -> Self {
323 MetricUpdate::Float(value)
324 }
325}
326
327impl From<usize> for MetricUpdate<'_> {
328 fn from(value: usize) -> Self {
329 MetricUpdate::Usize(value)
330 }
331}
332
333impl From<Duration> for MetricUpdate<'_> {
334 fn from(value: Duration) -> Self {
335 MetricUpdate::Duration(value)
336 }
337}
338
339impl<'a> From<&'a [f32]> for MetricUpdate<'a> {
340 fn from(value: &'a [f32]) -> Self {
341 MetricUpdate::Distribution(value)
342 }
343}
344
345impl<'a> From<&'a Vec<f32>> for MetricUpdate<'a> {
346 fn from(value: &'a Vec<f32>) -> Self {
347 MetricUpdate::Distribution(value)
348 }
349}
350
351impl<'a> From<&'a Vec<usize>> for MetricUpdate<'a> {
352 fn from(value: &'a Vec<usize>) -> Self {
353 MetricUpdate::UsizeDistribution(value)
354 }
355}
356
357impl From<Statistic> for MetricUpdate<'_> {
358 fn from(value: Statistic) -> Self {
359 MetricUpdate::Statistic(value)
360 }
361}
362
363impl<'a> TryFrom<AnyValue<'a>> for MetricUpdate<'a> {
364 type Error = RadiateError;
365
366 fn try_from(value: AnyValue<'a>) -> Result<Self, Self::Error> {
367 match value {
368 AnyValue::UInt8(v) => Ok(MetricUpdate::Float(v as f32)),
369 AnyValue::UInt16(v) => Ok(MetricUpdate::Float(v as f32)),
370 AnyValue::UInt32(v) => Ok(MetricUpdate::Float(v as f32)),
371 AnyValue::UInt64(v) => Ok(MetricUpdate::Float(v as f32)),
372 AnyValue::UInt128(v) => Ok(MetricUpdate::Float(v as f32)),
373
374 AnyValue::Int8(v) => Ok(MetricUpdate::Float(v as f32)),
375 AnyValue::Int16(v) => Ok(MetricUpdate::Float(v as f32)),
376 AnyValue::Int32(v) => Ok(MetricUpdate::Float(v as f32)),
377 AnyValue::Int64(v) => Ok(MetricUpdate::Float(v as f32)),
378 AnyValue::Int128(v) => Ok(MetricUpdate::Float(v as f32)),
379
380 AnyValue::Float32(v) => Ok(MetricUpdate::Float(v)),
381 AnyValue::Float64(v) => Ok(MetricUpdate::Float(v as f32)),
382
383 AnyValue::Duration(v) => Ok(MetricUpdate::Duration(v)),
384
385 AnyValue::Slice(values) => {
386 let out = values
387 .iter()
388 .enumerate()
389 .map(|(index, v)| {
390 v.clone().extract::<f32>().ok_or(
391 radiate_err!(
392 Metric:
393 "cannot convert AnyValue sequence into MetricUpdate::Statistic: element at index {index} has non-numeric type `{}`", v.type_name()))
394
395 })
396 .collect::<Result<Statistic, _>>()?;
397
398 Ok(MetricUpdate::Statistic(out))
399 }
400
401 AnyValue::Vector(values) => {
402 let out = values
403 .into_iter()
404 .enumerate()
405 .map(|(index, v)| {
406 let ty = v.type_name();
407 v.extract::<f32>()
408 .ok_or(radiate_err!(
409 Metric:
410 "cannot convert AnyValue sequence into MetricUpdate::Distribution: element at index {index} has non-numeric type `{ty}`"
411 ))
412 })
413 .collect::<Result<Statistic, _>>()?;
414
415 Ok(MetricUpdate::Statistic(out))
416 }
417
418 other => Err(radiate_err!(Metric: "cannot convert AnyValue of type `{}` into MetricUpdate", other.type_name())),
419 }
420 }
421}
422
423impl std::fmt::Debug for Metric {
424 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
425 write!(f, "Metric {{ name: {}, }}", self.name)
426 }
427}
428
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433
434 const EPSILON: f32 = 1e-5;
435
436 fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
437 (a - b).abs() <= eps
438 }
439
440 fn assert_stat_eq(m: &Metric, count: i32, mean: f32, var: f32, min: f32, max: f32) {
441 assert_eq!(m.count(), count);
442 assert!(approx_eq(m.mean(), mean, EPSILON), "mean");
443 assert!(approx_eq(m.var(), var, EPSILON), "var");
444 assert!(approx_eq(m.min(), min, EPSILON), "min");
445 assert!(approx_eq(m.max(), max, EPSILON), "max");
446 }
447
448 fn stats_of(values: &[f32]) -> (i32, f32, f32, f32, f32) {
449 let n = values.len() as i32;
451 if n == 0 {
452 return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
453 }
454 let mean = values.iter().sum::<f32>() / values.len() as f32;
455
456 let mut m2 = 0.0_f32;
457 for &v in values {
458 let d = v - mean;
459 m2 += d * d;
460 }
461
462 let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };
463
464 let min = values.iter().cloned().fold(f32::INFINITY, f32::min);
465 let max = values.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
466
467 (n, mean, var, min, max)
468 }
469
470 #[test]
471 fn test_metric() {
472 let mut metric = Metric::new("test");
473 metric.apply_update(1.0);
474 metric.apply_update(2.0);
475 metric.apply_update(3.0);
476 metric.apply_update(4.0);
477 metric.apply_update(5.0);
478
479 assert_eq!(metric.count(), 5);
480 assert_eq!(metric.last_value(), 5.0);
481 assert_eq!(metric.mean(), 3.0);
482 assert_eq!(metric.var(), 2.5);
483 assert_eq!(metric.stddev(), 1.5811388);
484 assert_eq!(metric.min(), 1.0);
485 assert_eq!(metric.max(), 5.0);
486 assert_eq!(metric.name(), "test");
487 }
488
489 #[test]
490 fn test_metric_labels() {
491 let mut metric = Metric::new("test");
492
493 metric.apply_update(1.0);
494 metric.apply_update(2.0);
495 metric.apply_update(3.0);
496 metric.apply_update(4.0);
497 metric.apply_update(5.0);
498
499 assert_eq!(metric.count(), 5);
500 assert_eq!(metric.last_value(), 5.0);
501 assert_eq!(metric.mean(), 3.0);
502 assert_eq!(metric.var(), 2.5);
503 assert_eq!(metric.stddev(), 1.5811388);
504 assert_eq!(metric.min(), 1.0);
505 assert_eq!(metric.max(), 5.0);
506 }
507
508 #[test]
509 fn distribution_tag_is_applied_on_any_slice_update() {
510 let mut m = Metric::new("scores");
511
512 m.apply_update(1.0);
514 m.apply_update(2.0);
515 assert!(m.tags().has(TagType::Statistic));
516 assert!(!m.tags().has(TagType::Distribution));
517
518 m.apply_update(&[3.0, 4.0][..]);
520
521 assert!(
522 m.tags().has(TagType::Distribution),
523 "expected Distribution tag after slice update"
524 );
525 }
526
527 #[test]
528 fn metric_merge_matches_streaming_samples() {
529 let a = [1.0, 2.0, 3.0, 4.0];
530 let b = [10.0, 20.0, 30.0];
531
532 let mut m1 = Metric::new("x");
533 m1.apply_update(&a[..]);
534
535 let mut m2 = Metric::new("x");
536 m2.apply_update(&b[..]);
537
538 m1.update_from(m2);
539
540 let combined = [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0];
541 let (n, mean, var, min, max) = stats_of(&combined);
542 assert_stat_eq(&m1, n, mean, var, min, max);
543 }
544}