mecha10_core/
metrics.rs

1//! Metrics Collection
2//!
3//! Standardized metrics collection system for Mecha10 applications.
4//!
5//! # Features
6//!
7//! - Multiple metric types (Counter, Gauge, Histogram)
8//! - Multiple backends (Prometheus, StatsD, OpenTelemetry)
9//! - Automatic node metrics collection
10//! - MLflow integration for experiment tracking
11//! - Low overhead with buffering
12//! - Type-safe metric names and labels
13//!
14//! # Architecture
15//!
16//! ```text
17//! ┌────────────┐
18//! │   Node A   │─┐
19//! └────────────┘ │
20//!                │  Record Metrics
21//! ┌────────────┐ │
22//! │   Node B   │─┼──▶  ┌─────────────────┐
23//! └────────────┘ │     │ MetricsRegistry │
24//!                │     └─────────────────┘
25//! ┌────────────┐ │              │
26//! │   Node C   │─┘              │
27//! └────────────┘                │
28//!                               ▼
29//!                    ┌──────────────────────┐
30//!                    │  Metrics Backends    │
31//!                    ├──────────────────────┤
32//!                    │ • Prometheus         │
33//!                    │ • StatsD             │
34//!                    │ • OpenTelemetry      │
35//!                    │ • MLflow             │
36//!                    └──────────────────────┘
37//! ```
38//!
39//! # Example
40//!
41//! ```rust
42//! use mecha10::prelude::*;
43//! use mecha10::metrics::{MetricsRegistry, Counter, Gauge};
44//!
45//! # async fn example() -> Result<()> {
46//! // Create metrics registry
47//! let metrics = MetricsRegistry::new();
48//!
49//! // Define metrics
50//! let messages_processed = Counter::new("messages_processed", "Total messages processed");
51//! let queue_size = Gauge::new("queue_size", "Current queue size");
52//!
53//! // Record metrics
//! metrics.increment(&messages_processed, 1.0).await;
//! metrics.set(&queue_size, 42.0).await;
//!
//! // Export to Prometheus
//! let prometheus_text = metrics.export_prometheus().await;
59//! # Ok(())
60//! # }
61//! ```
62
63use serde::{Deserialize, Serialize};
64use std::collections::HashMap;
65use std::sync::Arc;
66use tokio::sync::RwLock;
67
68// ============================================================================
69// Metric Types
70// ============================================================================
71
72/// Metric type
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
74pub enum MetricType {
75    /// Counter - monotonically increasing value
76    Counter,
77    /// Gauge - value that can go up and down
78    Gauge,
79    /// Histogram - distribution of values
80    Histogram,
81}
82
/// Metric identity: a metric name plus an optional set of labels.
///
/// Equality (derived) compares the name and the full label set; the order in which
/// labels were inserted is irrelevant.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricName {
    /// Metric name (e.g., "messages_processed")
    pub name: String,
    /// Optional labels (e.g., {"node": "camera", "topic": "/camera/rgb"})
    pub labels: HashMap<String, String>,
}

// `HashMap` does not implement `Hash`, so hash the labels in key-sorted order. This keeps
// the hash consistent with the derived, order-insensitive `PartialEq`.
impl std::hash::Hash for MetricName {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.name.hash(state);
        let mut labels: Vec<_> = self.labels.iter().collect();
        // Keys are unique, so an unstable sort yields the same order as a stable one.
        labels.sort_unstable_by_key(|(k, _)| *k);
        for (k, v) in labels {
            k.hash(state);
            v.hash(state);
        }
    }
}

impl MetricName {
    /// Create a new metric name with no labels.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            labels: HashMap::new(),
        }
    }

    /// Add (or overwrite) a label, builder style.
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }

    /// Get the Prometheus-style name with labels, e.g. `requests{method="GET",node="a"}`.
    ///
    /// Returns just the bare name when there are no labels. Labels are emitted sorted by
    /// key, so the output is deterministic regardless of `HashMap` iteration order. Label
    /// values are escaped (`\` -> `\\`, `"` -> `\"`, newline -> `\n`) as the Prometheus
    /// text exposition format requires.
    pub fn prometheus_name(&self) -> String {
        if self.labels.is_empty() {
            return self.name.clone();
        }

        let mut labels: Vec<_> = self.labels.iter().collect();
        labels.sort_unstable_by_key(|(k, _)| *k);

        let rendered: Vec<String> = labels
            .into_iter()
            .map(|(k, v)| {
                // Escape backslash first so the escapes added afterwards are not doubled.
                let escaped = v
                    .replace('\\', "\\\\")
                    .replace('"', "\\\"")
                    .replace('\n', "\\n");
                format!("{}=\"{}\"", k, escaped)
            })
            .collect();

        format!("{}{{{}}}", self.name, rendered.join(","))
    }
}
131
132/// Counter metric - monotonically increasing
133///
134/// Use for: request counts, error counts, bytes sent/received
135#[derive(Debug, Clone)]
136pub struct Counter {
137    pub name: MetricName,
138    pub help: String,
139}
140
141impl Counter {
142    /// Create a new counter
143    pub fn new(name: impl Into<String>, help: impl Into<String>) -> Self {
144        Self {
145            name: MetricName::new(name),
146            help: help.into(),
147        }
148    }
149
150    /// Create with labels
151    pub fn with_labels(name: impl Into<String>, help: impl Into<String>, labels: HashMap<String, String>) -> Self {
152        Self {
153            name: MetricName {
154                name: name.into(),
155                labels,
156            },
157            help: help.into(),
158        }
159    }
160}
161
162/// Gauge metric - value that can go up and down
163///
164/// Use for: temperature, memory usage, queue depth, active connections
165#[derive(Debug, Clone)]
166pub struct Gauge {
167    pub name: MetricName,
168    pub help: String,
169}
170
171impl Gauge {
172    /// Create a new gauge
173    pub fn new(name: impl Into<String>, help: impl Into<String>) -> Self {
174        Self {
175            name: MetricName::new(name),
176            help: help.into(),
177        }
178    }
179
180    /// Create with labels
181    pub fn with_labels(name: impl Into<String>, help: impl Into<String>, labels: HashMap<String, String>) -> Self {
182        Self {
183            name: MetricName {
184                name: name.into(),
185                labels,
186            },
187            help: help.into(),
188        }
189    }
190}
191
192/// Histogram metric - distribution of values
193///
194/// Use for: request duration, response sizes, latencies
195#[derive(Debug, Clone)]
196pub struct Histogram {
197    pub name: MetricName,
198    pub help: String,
199    pub buckets: Vec<f64>,
200}
201
202impl Histogram {
203    /// Create a new histogram with default buckets
204    pub fn new(name: impl Into<String>, help: impl Into<String>) -> Self {
205        Self {
206            name: MetricName::new(name),
207            help: help.into(),
208            buckets: default_buckets(),
209        }
210    }
211
212    /// Create with custom buckets
213    pub fn with_buckets(name: impl Into<String>, help: impl Into<String>, buckets: Vec<f64>) -> Self {
214        Self {
215            name: MetricName::new(name),
216            help: help.into(),
217            buckets,
218        }
219    }
220}
221
/// Default histogram bucket upper bounds, `0.005` through `10.0` (eleven bounds).
///
/// These are the same values as the default buckets of the official Prometheus
/// client libraries.
fn default_buckets() -> Vec<f64> {
    const BOUNDS: [f64; 11] = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
    BOUNDS.to_vec()
}
225
226// ============================================================================
227// Metric Values
228// ============================================================================
229
230#[derive(Debug, Clone)]
231struct CounterValue {
232    value: f64,
233}
234
235#[derive(Debug, Clone)]
236struct GaugeValue {
237    value: f64,
238}
239
240#[derive(Debug, Clone)]
241struct HistogramValue {
242    count: u64,
243    sum: f64,
244    buckets: HashMap<usize, u64>, // bucket index -> count
245}
246
247// ============================================================================
248// Metrics Registry
249// ============================================================================
250
251/// Central metrics registry
252///
253/// Collects and stores all metrics, provides export to various formats.
254///
255/// # Example
256///
257/// ```rust
258/// use mecha10::metrics::{MetricsRegistry, Counter, Gauge};
259///
260/// # fn example() {
261/// let registry = MetricsRegistry::new();
262///
263/// let counter = Counter::new("requests_total", "Total requests");
264/// let gauge = Gauge::new("active_connections", "Active connections");
265///
266/// registry.increment(&counter, 1.0);
267/// registry.set(&gauge, 42.0);
268///
269/// // Export to Prometheus
270/// let metrics_text = registry.export_prometheus();
271/// # }
272/// ```
273pub struct MetricsRegistry {
274    counters: Arc<RwLock<HashMap<MetricName, CounterValue>>>,
275    gauges: Arc<RwLock<HashMap<MetricName, GaugeValue>>>,
276    histograms: Arc<RwLock<HashMap<MetricName, HistogramValue>>>,
277    metadata: Arc<RwLock<HashMap<String, (MetricType, String)>>>, // name -> (type, help)
278}
279
280impl MetricsRegistry {
281    /// Create a new metrics registry
282    pub fn new() -> Self {
283        Self {
284            counters: Arc::new(RwLock::new(HashMap::new())),
285            gauges: Arc::new(RwLock::new(HashMap::new())),
286            histograms: Arc::new(RwLock::new(HashMap::new())),
287            metadata: Arc::new(RwLock::new(HashMap::new())),
288        }
289    }
290
291    /// Increment a counter
292    pub async fn increment(&self, counter: &Counter, value: f64) {
293        // Store metadata
294        {
295            let mut metadata = self.metadata.write().await;
296            metadata
297                .entry(counter.name.name.clone())
298                .or_insert((MetricType::Counter, counter.help.clone()));
299        }
300
301        // Update value
302        let mut counters = self.counters.write().await;
303        counters
304            .entry(counter.name.clone())
305            .and_modify(|v| v.value += value)
306            .or_insert(CounterValue { value });
307    }
308
309    /// Set a gauge value
310    pub async fn set(&self, gauge: &Gauge, value: f64) {
311        // Store metadata
312        {
313            let mut metadata = self.metadata.write().await;
314            metadata
315                .entry(gauge.name.name.clone())
316                .or_insert((MetricType::Gauge, gauge.help.clone()));
317        }
318
319        // Update value
320        let mut gauges = self.gauges.write().await;
321        gauges
322            .entry(gauge.name.clone())
323            .and_modify(|v| v.value = value)
324            .or_insert(GaugeValue { value });
325    }
326
327    /// Increment a gauge (delta)
328    pub async fn increment_gauge(&self, gauge: &Gauge, delta: f64) {
329        let mut gauges = self.gauges.write().await;
330        gauges
331            .entry(gauge.name.clone())
332            .and_modify(|v| v.value += delta)
333            .or_insert(GaugeValue { value: delta });
334    }
335
336    /// Decrement a gauge (delta)
337    pub async fn decrement_gauge(&self, gauge: &Gauge, delta: f64) {
338        self.increment_gauge(gauge, -delta).await;
339    }
340
341    /// Observe a value in a histogram
342    pub async fn observe(&self, histogram: &Histogram, value: f64) {
343        // Store metadata
344        {
345            let mut metadata = self.metadata.write().await;
346            metadata
347                .entry(histogram.name.name.clone())
348                .or_insert((MetricType::Histogram, histogram.help.clone()));
349        }
350
351        // Find bucket
352        let bucket_idx = histogram
353            .buckets
354            .iter()
355            .position(|&b| value <= b)
356            .unwrap_or(histogram.buckets.len());
357
358        // Update histogram
359        let mut histograms = self.histograms.write().await;
360        histograms
361            .entry(histogram.name.clone())
362            .and_modify(|h| {
363                h.count += 1;
364                h.sum += value;
365                *h.buckets.entry(bucket_idx).or_insert(0) += 1;
366            })
367            .or_insert_with(|| {
368                let mut buckets = HashMap::new();
369                buckets.insert(bucket_idx, 1);
370                HistogramValue {
371                    count: 1,
372                    sum: value,
373                    buckets,
374                }
375            });
376    }
377
378    /// Export metrics in Prometheus text format
379    pub async fn export_prometheus(&self) -> String {
380        let mut output = String::new();
381
382        // Export counters
383        {
384            let counters = self.counters.read().await;
385            let metadata = self.metadata.read().await;
386
387            let mut seen_names = std::collections::HashSet::new();
388
389            for (name, value) in counters.iter() {
390                // Write TYPE and HELP only once per metric name
391                if !seen_names.contains(&name.name) {
392                    if let Some((_, help)) = metadata.get(&name.name) {
393                        output.push_str(&format!("# HELP {} {}\n", name.name, help));
394                        output.push_str(&format!("# TYPE {} counter\n", name.name));
395                    }
396                    seen_names.insert(name.name.clone());
397                }
398
399                output.push_str(&format!("{} {}\n", name.prometheus_name(), value.value));
400            }
401        }
402
403        // Export gauges
404        {
405            let gauges = self.gauges.read().await;
406            let metadata = self.metadata.read().await;
407
408            let mut seen_names = std::collections::HashSet::new();
409
410            for (name, value) in gauges.iter() {
411                if !seen_names.contains(&name.name) {
412                    if let Some((_, help)) = metadata.get(&name.name) {
413                        output.push_str(&format!("# HELP {} {}\n", name.name, help));
414                        output.push_str(&format!("# TYPE {} gauge\n", name.name));
415                    }
416                    seen_names.insert(name.name.clone());
417                }
418
419                output.push_str(&format!("{} {}\n", name.prometheus_name(), value.value));
420            }
421        }
422
423        // Export histograms
424        {
425            let histograms = self.histograms.read().await;
426            let metadata = self.metadata.read().await;
427
428            for (name, value) in histograms.iter() {
429                if let Some((_, help)) = metadata.get(&name.name) {
430                    output.push_str(&format!("# HELP {} {}\n", name.name, help));
431                    output.push_str(&format!("# TYPE {} histogram\n", name.name));
432                }
433
434                // Output bucket counts
435                for (bucket_idx, count) in &value.buckets {
436                    output.push_str(&format!(
437                        "{}_bucket{{le=\"{}\"}} {}\n",
438                        name.prometheus_name(),
439                        bucket_idx,
440                        count
441                    ));
442                }
443
444                // Output sum and count
445                output.push_str(&format!("{}_sum {}\n", name.prometheus_name(), value.sum));
446                output.push_str(&format!("{}_count {}\n", name.prometheus_name(), value.count));
447            }
448        }
449
450        output
451    }
452
453    /// Export metrics as JSON (for MLflow, APIs, etc.)
454    pub async fn export_json(&self) -> serde_json::Value {
455        let mut metrics = serde_json::Map::new();
456
457        // Export counters
458        {
459            let counters = self.counters.read().await;
460            for (name, value) in counters.iter() {
461                metrics.insert(
462                    name.prometheus_name(),
463                    serde_json::json!({
464                        "type": "counter",
465                        "value": value.value
466                    }),
467                );
468            }
469        }
470
471        // Export gauges
472        {
473            let gauges = self.gauges.read().await;
474            for (name, value) in gauges.iter() {
475                metrics.insert(
476                    name.prometheus_name(),
477                    serde_json::json!({
478                        "type": "gauge",
479                        "value": value.value
480                    }),
481                );
482            }
483        }
484
485        // Export histograms
486        {
487            let histograms = self.histograms.read().await;
488            for (name, value) in histograms.iter() {
489                metrics.insert(
490                    name.prometheus_name(),
491                    serde_json::json!({
492                        "type": "histogram",
493                        "count": value.count,
494                        "sum": value.sum,
495                        "buckets": value.buckets
496                    }),
497                );
498            }
499        }
500
501        serde_json::Value::Object(metrics)
502    }
503
504    /// Clear all metrics (useful for testing)
505    pub async fn clear(&self) {
506        self.counters.write().await.clear();
507        self.gauges.write().await.clear();
508        self.histograms.write().await.clear();
509        self.metadata.write().await.clear();
510    }
511}
512
513impl Default for MetricsRegistry {
514    fn default() -> Self {
515        Self::new()
516    }
517}
518
519// ============================================================================
520// Standard Node Metrics
521// ============================================================================
522
523/// Standard metrics that all nodes should track
524pub struct NodeMetrics {
525    /// Messages received
526    pub messages_received: Counter,
527    /// Messages sent
528    pub messages_sent: Counter,
529    /// Processing errors
530    pub errors_total: Counter,
531    /// Processing duration
532    pub processing_duration: Histogram,
533    /// Active subscriptions
534    pub active_subscriptions: Gauge,
535    /// Queue size
536    pub queue_size: Gauge,
537}
538
539impl NodeMetrics {
540    /// Create standard node metrics for a given node
541    pub fn new(node_id: &str) -> Self {
542        let mut labels = HashMap::new();
543        labels.insert("node".to_string(), node_id.to_string());
544
545        Self {
546            messages_received: Counter::with_labels(
547                "messages_received_total",
548                "Total messages received",
549                labels.clone(),
550            ),
551            messages_sent: Counter::with_labels("messages_sent_total", "Total messages sent", labels.clone()),
552            errors_total: Counter::with_labels("errors_total", "Total errors", labels.clone()),
553            processing_duration: Histogram::with_buckets(
554                "processing_duration_seconds",
555                "Message processing duration",
556                default_buckets(),
557            ),
558            active_subscriptions: Gauge::with_labels(
559                "active_subscriptions",
560                "Number of active subscriptions",
561                labels.clone(),
562            ),
563            queue_size: Gauge::with_labels("queue_size", "Current queue size", labels),
564        }
565    }
566}