1use crate::{
2 Metric, MetricUpdate,
3 stats::{Meta, Tag, TagType, defaults::try_add_tag_from_str, fmt},
4};
5use radiate_expr::{AnyValue, ApplyExpr, DataType, Expr, ExprProjection, ExprQuery, SelectExpr};
6use radiate_utils::intern;
7#[cfg(feature = "serde")]
8use serde::{Deserialize, Serialize};
9use std::{
10 collections::HashMap,
11 fmt::{Debug, Display},
12 time::Duration,
13};
14
/// A small snapshot of a [`MetricSet`]: how many metrics it holds and how many
/// updates it has recorded.
///
/// Produced by [`MetricSet::summary`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MetricSetSummary {
    /// Number of metrics stored in the set at the time of the snapshot.
    pub metrics: usize,
    /// The set's update counter at the time of the snapshot, converted to `f32`.
    pub updates: f32,
}
20
/// A named collection of [`Metric`]s plus bookkeeping metadata.
///
/// Metrics are keyed by `&'static str` names; the methods on this type that
/// insert metrics obtain those keys through the `intern!` macro.
#[derive(Clone, Default, PartialEq)]
pub struct MetricSet {
    /// Metrics keyed by their name.
    metrics: HashMap<&'static str, Metric>,
    /// Set-level metadata; this file reads and writes its `version` and
    /// `update_count` fields.
    meta: Meta,
}
26
27impl MetricSet {
28 pub fn new() -> Self {
29 MetricSet {
30 metrics: HashMap::new(),
31 meta: Meta::default(),
32 }
33 }
34
35 pub fn next_version(&mut self) -> u64 {
36 let result = self.meta.version;
37 self.meta.version += 1;
38 result
39 }
40
41 pub fn version(&self) -> u64 {
42 self.meta.version
43 }
44
45 #[inline(always)]
46 pub fn keys(&self) -> Vec<&'static str> {
47 self.metrics.keys().cloned().collect()
48 }
49
50 #[inline(always)]
51 pub fn flush_all_into(&mut self, target: &mut MetricSet) {
52 let version = target.next_version();
53 for (key, mut m) in self.metrics.drain() {
54 m.set_version(version);
55 if let Some(target_metric) = target.metrics.get_mut(key) {
56 target_metric.update_from(m);
57 } else {
58 try_add_tag_from_str(&mut m);
59 target.metrics.insert(key, m);
60 }
61 }
62
63 self.clear();
64 }
65
66 #[inline(always)]
67 pub fn replace(&mut self, metric: impl Into<Metric>) {
68 let mut metric = metric.into();
69 try_add_tag_from_str(&mut metric);
70 self.metrics.insert(intern!(metric.name()), metric);
71 }
72
73 #[inline(always)]
74 pub fn upsert<'a>(&mut self, metric: impl Into<MetricSetUpdate<'a>>) {
75 let update = metric.into();
76 let version = self.version();
77
78 match update {
79 MetricSetUpdate::Many(metrics) => {
80 for metric in metrics {
81 self.add_or_update_internal(version, metric);
82 }
83 }
84 MetricSetUpdate::Single(metric) => {
85 self.add_or_update_internal(version, metric);
86 }
87 MetricSetUpdate::ManyUpdate(updates) => {
88 for metric in updates {
89 self.upsert(metric);
90 }
91 }
92 MetricSetUpdate::NamedSingle(name, metric_update, tag) => {
93 self.meta.update_count += 1;
94 if let Some(m) = self.metrics.get_mut(name) {
95 m.set_version(version);
96 m.apply_update(metric_update);
97 if let Some(tag) = tag {
98 m.add_tag(tag);
99 }
100 return;
101 }
102
103 let new_name = radiate_utils::intern_name_as_snake_case(name);
104 if let Some(m) = self.metrics.get_mut(&new_name) {
105 m.set_version(version);
106 m.apply_update(metric_update);
107 if let Some(tag) = tag {
108 m.add_tag(tag);
109 }
110 } else {
111 let mut metric = Metric::new(new_name);
112 try_add_tag_from_str(&mut metric);
113 metric.set_version(version);
114 metric.apply_update(metric_update);
115
116 if let Some(tag) = tag {
117 metric.add_tag(tag);
118 }
119
120 self.add(metric);
121 }
122 }
123 }
124 }
125
126 #[inline(always)]
127 pub fn iter_tagged<'a>(
128 &'a self,
129 tag: TagType,
130 ) -> impl Iterator<Item = (&'static str, &'a Metric)> {
131 self.metrics.iter().filter_map(move |(k, m)| {
132 if m.tags().has(tag) {
133 Some((*k, m))
134 } else {
135 None
136 }
137 })
138 }
139
140 #[inline(always)]
141 pub fn tags(&self) -> impl Iterator<Item = TagType> {
142 self.metrics
143 .values()
144 .fold(Tag::empty(), |acc, m| acc.union(m.tags()))
145 .into_iter()
146 }
147
148 #[inline(always)]
149 pub fn iter(&self) -> impl Iterator<Item = (&'static str, &Metric)> {
150 self.metrics.iter().map(|(name, metric)| (*name, metric))
151 }
152
153 #[inline(always)]
154 pub fn add(&mut self, metric: Metric) {
155 self.metrics.insert(intern!(metric.name()), metric);
156 }
157
158 #[inline(always)]
159 pub fn get(&self, name: &str) -> Option<&Metric> {
160 self.metrics.get(name)
161 }
162
163 #[inline(always)]
164 pub fn get_from_string(&self, name: String) -> Option<&Metric> {
165 self.metrics.get(name.as_str())
166 }
167
168 #[inline(always)]
169 pub fn clear(&mut self) {
170 for (_, m) in self.metrics.iter_mut() {
171 m.clear_values();
172 }
173
174 self.meta.update_count = 0;
175 }
176
177 #[inline(always)]
178 pub fn contains_key(&self, name: &str) -> bool {
179 self.metrics.contains_key(intern!(name))
180 }
181
182 #[inline(always)]
183 pub fn len(&self) -> usize {
184 self.metrics.len()
185 }
186
187 #[inline(always)]
188 pub fn summary(&self) -> MetricSetSummary {
189 MetricSetSummary {
190 metrics: self.metrics.len(),
191 updates: self.meta.update_count as f32,
192 }
193 }
194
195 pub fn dashboard(&self) -> String {
196 fmt::render_full(self).unwrap_or_default()
197 }
198
199 fn add_or_update_internal(&mut self, version: u64, mut metric: Metric) {
200 self.meta.update_count += 1;
201 if let Some(existing) = self.metrics.get_mut(metric.name()) {
202 existing.set_version(version);
203 existing.update_from(metric);
204 } else {
205 try_add_tag_from_str(&mut metric);
206 metric.set_version(version);
207 self.metrics.insert(intern!(metric.name()), metric);
208 }
209 }
210}
211
212impl<'a> ApplyExpr<'a> for MetricSet {
213 fn apply(&self, expr: &'a mut Expr) -> AnyValue<'a> {
214 expr.dispatch(self).unwrap()
215 }
216}
217
218impl ExprProjection for MetricSet {
219 fn project(&self, path: &SelectExpr) -> Option<AnyValue<'static>> {
220 let value_to_float32 = |value: f32| AnyValue::Float32(value);
221
222 let value_to_duration = |value: f32| Duration::from_secs_f32(value).into();
223
224 let SelectExpr::Field(key, field) = path else {
225 return None;
226 };
227
228 let str_key = key.as_str()?;
229
230 self.get(str_key)
231 .map(|metric| match field.dtype() {
232 DataType::Float32 => match field.name().to_lowercase().as_str() {
233 "last_value" => AnyValue::Float32(metric.last_value()),
234 "mean" => value_to_float32(metric.mean()),
235 "std_dev" => value_to_float32(metric.stddev()),
236 "min" => value_to_float32(metric.min()),
237 "max" => value_to_float32(metric.max()),
238 "sum" => value_to_float32(metric.sum()),
239 "skew" => value_to_float32(metric.skew()),
240 "var" => value_to_float32(metric.var()),
241 "count" => AnyValue::UInt64(metric.count() as u64),
242 "version" => AnyValue::UInt64(metric.version()),
243 "update_count" => AnyValue::UInt64(metric.update_count() as u64),
244 _ => AnyValue::Null,
245 },
246 DataType::Duration => match field.name().to_lowercase().as_str() {
247 "last_value" => {
248 AnyValue::Duration(Duration::from_secs_f32(metric.last_value()))
249 }
250 "mean" => value_to_duration(metric.mean()),
251 "std_dev" => value_to_duration(metric.stddev()),
252 "min" => value_to_duration(metric.min()),
253 "max" => value_to_duration(metric.max()),
254 "sum" => value_to_duration(metric.sum()),
255 "var" => value_to_duration(metric.var()),
256 "count" => AnyValue::UInt64(metric.count() as u64),
257 "version" => AnyValue::UInt64(metric.version()),
258 "update_count" => AnyValue::UInt64(metric.update_count() as u64),
259 _ => AnyValue::Null,
260 },
261 _ => AnyValue::Null,
262 })
263 .or_else(|| Some(AnyValue::Null))
264 }
265}
266
267impl Display for MetricSet {
268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269 let summary = self.summary();
270 let out = format!(
271 "[{} metrics, {:.0} updates]",
272 summary.metrics, summary.updates
273 );
274 write!(f, "{out}\n{}", fmt::render_full(self).unwrap_or_default())?;
275 Ok(())
276 }
277}
278
279impl Debug for MetricSet {
280 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
281 write!(f, "MetricSet {{\n")?;
282 write!(f, "{}\n", fmt::render_dashboard(&self).unwrap_or_default())?;
283 write!(f, "}}")
284 }
285}
286
#[cfg(feature = "serde")]
impl Serialize for MetricSet {
    /// Serializes the set as a sequence of its metrics.
    ///
    /// Only the metrics are written; the map keys and the set's metadata are
    /// not part of the serialized form. The metrics are streamed by reference,
    /// so nothing is cloned.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // `collect_seq` emits the same sequence a `Vec<Metric>` would, without
        // first copying every metric into a temporary vector.
        serializer.collect_seq(self.metrics.values())
    }
}
301
#[cfg(feature = "serde")]
impl<'de> Deserialize<'de> for MetricSet {
    /// Reads a sequence of metrics and adds each one to a fresh set.
    ///
    /// The set starts from `MetricSet::new()`, so its metadata is the default.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let mut restored = MetricSet::new();

        Vec::<Metric>::deserialize(deserializer)?
            .into_iter()
            .for_each(|metric| restored.add(metric));

        Ok(restored)
    }
}
318
/// The kinds of input accepted by `MetricSet::upsert`.
///
/// Values are normally built through the `From` conversions defined for this
/// type rather than by naming a variant directly.
pub enum MetricSetUpdate<'a> {
    /// Several complete metrics, each added or merged by its own name.
    Many(Vec<Metric>),
    /// One complete metric, added or merged by its own name.
    Single(Metric),
    /// Several `(name, update)` pairs, each upserted individually.
    ManyUpdate(Vec<(&'static str, MetricUpdate<'a>)>),
    /// One update for the metric with the given name, with an optional tag to
    /// add to that metric.
    NamedSingle(&'static str, MetricUpdate<'a>, Option<TagType>),
}
325
326impl From<Vec<Metric>> for MetricSetUpdate<'_> {
327 fn from(metrics: Vec<Metric>) -> Self {
328 MetricSetUpdate::Many(metrics)
329 }
330}
331
332impl From<Metric> for MetricSetUpdate<'_> {
333 fn from(metric: Metric) -> Self {
334 MetricSetUpdate::Single(metric)
335 }
336}
337
338impl<'a, U> From<(&'static str, U)> for MetricSetUpdate<'a>
339where
340 U: Into<MetricUpdate<'a>>,
341{
342 fn from((name, update): (&'static str, U)) -> Self {
343 MetricSetUpdate::NamedSingle(name, update.into(), None)
344 }
345}
346
347impl<'a, U> From<(TagType, &'static str, U)> for MetricSetUpdate<'a>
348where
349 U: Into<MetricUpdate<'a>>,
350{
351 fn from((tag, name, update): (TagType, &'static str, U)) -> Self {
352 MetricSetUpdate::NamedSingle(name, update.into(), Some(tag))
353 }
354}
355
356impl<'a, U> From<(&'static str, U, usize)> for MetricSetUpdate<'a>
357where
358 U: Into<MetricUpdate<'a>>,
359{
360 fn from((name, update, count): (&'static str, U, usize)) -> Self {
361 let name = radiate_utils::intern!(format!("{name}.{count}"));
362 MetricSetUpdate::NamedSingle(name, update.into(), None)
363 }
364}
365
366#[cfg(test)]
367mod tests {
368 use super::*;
369
370 const EPSILON: f32 = 1e-5;
371
372 fn approx_eq(a: f32, b: f32, eps: f32) -> bool {
373 (a - b).abs() <= eps
374 }
375
376 fn assert_stat_eq(m: &Metric, count: i32, mean: f32, var: f32, min: f32, max: f32) {
377 assert_eq!(m.count(), count);
378 assert!(approx_eq(m.mean(), mean, EPSILON), "mean");
379 assert!(approx_eq(m.var(), var, EPSILON), "var");
380 assert!(approx_eq(m.min(), min, EPSILON), "min");
381 assert!(approx_eq(m.max(), max, EPSILON), "max");
382 }
383
384 fn stats_of(values: &[f32]) -> (i32, f32, f32, f32, f32) {
385 let n = values.len() as i32;
387 if n == 0 {
388 return (0, 0.0, f32::NAN, f32::INFINITY, f32::NEG_INFINITY);
389 }
390 let mean = values.iter().sum::<f32>() / values.len() as f32;
391
392 let mut m2 = 0.0_f32;
393 for &v in values {
394 let d = v - mean;
395 m2 += d * d;
396 }
397
398 let var = if n == 1 { 0.0 } else { m2 / (n as f32 - 1.0) };
399
400 let min = values.iter().cloned().fold(f32::INFINITY, f32::min);
401 let max = values.iter().cloned().fold(f32::NEG_INFINITY, f32::max);
402
403 (n, mean, var, min, max)
404 }
405
406 #[test]
407 fn metric_set_flush_all_into_merges_metrics() {
408 let mut a = MetricSet::new();
409 let mut b = MetricSet::new();
410
411 a.upsert(("scores", &[1.0, 2.0, 3.0][..]));
412 b.upsert(("scores", &[10.0, 20.0][..]));
413
414 a.flush_all_into(&mut b);
416
417 let m = b.get("scores").unwrap();
418 let combined = [1.0, 2.0, 3.0, 10.0, 20.0];
419 let (n, mean, var, min, max) = stats_of(&combined);
420 assert_stat_eq(m, n, mean, var, min, max);
421 }
422}