// mabi_core/metrics.rs

1//! Metrics collection and export.
2//!
//! This module provides a comprehensive metrics collection system for the Mabinogion protocol simulator.
4//! It supports:
5//! - Prometheus-compatible metrics export
6//! - Global registry with lazy initialization
7//! - Protocol-specific and device-specific metrics
8//! - System metrics (CPU, memory)
9//! - Request timing with automatic observation via RAII
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use mabi_core::metrics::{MetricsCollector, GLOBAL_REGISTRY, measure_request};
15//!
16//! let metrics = MetricsCollector::global();
17//! metrics.record_read("modbus", true, Duration::from_micros(500));
18//!
19//! // Using the measure_request! macro
20//! measure_request!(metrics, "modbus", "read", {
21//!     // ... operation code
22//! });
23//! ```
24
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use once_cell::sync::Lazy;
30use parking_lot::RwLock;
31use prometheus::{
32    Encoder, Gauge, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
33    IntGaugeVec, Opts, Registry, TextEncoder,
34};
35use serde::{Deserialize, Serialize};
36
37// ============================================================================
38// Constants
39// ============================================================================
40
/// Prefix prepended to the name of every metric exported by Mabinogion.
pub const METRIC_PREFIX: &str = "mabi";

/// Histogram bucket upper bounds, in seconds, used by the latency-style histograms.
/// The steps are roughly exponential and run from 0.1 ms up to 10 s.
pub const LATENCY_BUCKETS: &[f64] = &[
    // sub-millisecond
    0.0001, 0.0005,
    // milliseconds
    0.001, 0.0025, 0.005,
    // tens of milliseconds
    0.01, 0.025, 0.05,
    // hundreds of milliseconds
    0.1, 0.25, 0.5,
    // seconds
    1.0, 2.5, 5.0, 10.0,
];

/// Histogram bucket upper bounds, in seconds, used for engine tick durations.
/// Covers 0.1 ms up to 100 ms.
pub const TICK_BUCKETS: &[f64] = &[
    // sub-millisecond
    0.0001, 0.0005,
    // milliseconds
    0.001, 0.0025, 0.005,
    // tens of milliseconds, capped at 100 ms
    0.01, 0.025, 0.05, 0.1,
];
53
54// ============================================================================
55// Global Registry
56// ============================================================================
57
58/// Global Prometheus registry for all metrics.
59/// This registry is shared across all MetricsCollector instances.
60pub static GLOBAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
61
62/// Global metrics collector instance (singleton).
63static GLOBAL_METRICS: Lazy<MetricsCollector> =
64    Lazy::new(|| MetricsCollector::with_registry(GLOBAL_REGISTRY.clone()));
65
66// ============================================================================
67// Metrics Collector
68// ============================================================================
69
70/// Metrics collector for the simulator.
71///
72/// Provides comprehensive metrics collection including:
73/// - Request/response counters and timings
74/// - Device and connection tracking
75/// - System resource monitoring (CPU, memory)
76/// - Protocol-specific metrics
77#[derive(Clone)]
78pub struct MetricsCollector {
79    registry: Arc<Registry>,
80    inner: Arc<MetricsInner>,
81}
82
83/// Internal metrics storage.
84struct MetricsInner {
85    // === Counters ===
86    /// Total requests by protocol and operation type
87    requests_total: IntCounterVec,
88    /// Total read operations by protocol and status
89    reads_total: IntCounterVec,
90    /// Total write operations by protocol and status
91    writes_total: IntCounterVec,
92    /// Total errors by protocol and error type
93    errors_total: IntCounterVec,
94    /// Total engine ticks
95    ticks_total: IntCounter,
96    /// Total messages by protocol and direction
97    messages_total: IntCounterVec,
98    /// Total events by type
99    events_total: IntCounterVec,
100
101    // === Gauges ===
102    /// Number of active devices
103    devices_active: IntGauge,
104    /// Active connections per protocol
105    connections_active: IntGaugeVec,
106    /// Total data points
107    points_total: IntGauge,
108    /// Data points per protocol and device
109    points_by_device: IntGaugeVec,
110    /// Memory usage in bytes
111    memory_bytes: IntGauge,
112    /// CPU usage percentage (0-100)
113    cpu_percent: Gauge,
114    /// Current tick rate (ticks per second)
115    tick_rate: Gauge,
116
117    // === Histograms ===
118    /// Request duration by protocol and operation
119    request_duration: HistogramVec,
120    /// Message latency by protocol
121    message_latency: HistogramVec,
122    /// Engine tick duration
123    tick_duration: Histogram,
124    /// Read operation duration by protocol
125    read_duration: HistogramVec,
126    /// Write operation duration by protocol
127    write_duration: HistogramVec,
128
129    // === Internal tracking ===
130    start_time: Instant,
131    last_tick_time: RwLock<Instant>,
132    tick_count_for_rate: AtomicU64,
133}
134
135impl MetricsCollector {
136    /// Create a new metrics collector with a fresh registry.
137    pub fn new() -> Self {
138        let registry = Registry::new();
139        Self::with_registry(registry)
140    }
141
142    /// Get the global metrics collector instance.
143    ///
144    /// This returns a shared instance that uses the global registry.
145    /// Use this when you want all metrics to be collected in one place.
146    pub fn global() -> &'static Self {
147        &GLOBAL_METRICS
148    }
149
150    /// Create a metrics collector with a custom registry.
151    pub fn with_registry(registry: Registry) -> Self {
152        let inner = MetricsInner::new(&registry);
153
154        Self {
155            registry: Arc::new(registry),
156            inner: Arc::new(inner),
157        }
158    }
159
160    /// Get the Prometheus registry.
161    pub fn registry(&self) -> &Registry {
162        &self.registry
163    }
164
165    // ========================================================================
166    // Request/Operation Recording
167    // ========================================================================
168
169    /// Record a request by protocol and operation.
170    ///
171    /// This is the primary method for tracking request counts and should be
172    /// used in conjunction with `record_request_duration` or `time_request`.
173    pub fn record_request(&self, protocol: &str, operation: &str) {
174        self.inner
175            .requests_total
176            .with_label_values(&[protocol, operation])
177            .inc();
178    }
179
180    /// Record a request duration.
181    pub fn record_request_duration(&self, protocol: &str, operation: &str, duration: Duration) {
182        self.inner
183            .request_duration
184            .with_label_values(&[protocol, operation])
185            .observe(duration.as_secs_f64());
186    }
187
188    /// Create a request timer for automatic duration recording.
189    ///
190    /// The timer automatically records the duration when dropped.
191    ///
192    /// # Example
193    ///
194    /// ```rust,ignore
195    /// let _timer = metrics.time_request("modbus", "read");
196    /// // ... do work ...
197    /// // duration is automatically recorded when _timer goes out of scope
198    /// ```
199    pub fn time_request(&self, protocol: &str, operation: &str) -> RequestTimer {
200        self.record_request(protocol, operation);
201        RequestTimer::new(
202            self.inner
203                .request_duration
204                .with_label_values(&[protocol, operation]),
205        )
206    }
207
208    /// Record a message (for protocol-level message tracking).
209    pub fn record_message(&self, protocol: &str, direction: &str) {
210        self.inner
211            .messages_total
212            .with_label_values(&[protocol, direction])
213            .inc();
214    }
215
216    /// Record a read operation with timing and status.
217    pub fn record_read(&self, protocol: &str, success: bool, duration: Duration) {
218        let status = if success { "success" } else { "error" };
219        self.inner
220            .reads_total
221            .with_label_values(&[protocol, status])
222            .inc();
223        self.inner
224            .read_duration
225            .with_label_values(&[protocol])
226            .observe(duration.as_secs_f64());
227
228        // Also record as a request
229        self.inner
230            .requests_total
231            .with_label_values(&[protocol, "read"])
232            .inc();
233        self.inner
234            .request_duration
235            .with_label_values(&[protocol, "read"])
236            .observe(duration.as_secs_f64());
237    }
238
239    /// Record a write operation with timing and status.
240    pub fn record_write(&self, protocol: &str, success: bool, duration: Duration) {
241        let status = if success { "success" } else { "error" };
242        self.inner
243            .writes_total
244            .with_label_values(&[protocol, status])
245            .inc();
246        self.inner
247            .write_duration
248            .with_label_values(&[protocol])
249            .observe(duration.as_secs_f64());
250
251        // Also record as a request
252        self.inner
253            .requests_total
254            .with_label_values(&[protocol, "write"])
255            .inc();
256        self.inner
257            .request_duration
258            .with_label_values(&[protocol, "write"])
259            .observe(duration.as_secs_f64());
260    }
261
262    // ========================================================================
263    // Error Recording
264    // ========================================================================
265
266    /// Record an error by protocol and error type.
267    pub fn record_error(&self, protocol: &str, error_type: &str) {
268        self.inner
269            .errors_total
270            .with_label_values(&[protocol, error_type])
271            .inc();
272    }
273
274    // ========================================================================
275    // Engine/Tick Recording
276    // ========================================================================
277
278    /// Record an engine tick with duration.
279    pub fn record_tick(&self, duration: Duration) {
280        self.inner.ticks_total.inc();
281        self.inner.tick_duration.observe(duration.as_secs_f64());
282
283        // Update tick rate calculation
284        let count = self
285            .inner
286            .tick_count_for_rate
287            .fetch_add(1, Ordering::Relaxed)
288            + 1;
289        let mut last_time = self.inner.last_tick_time.write();
290        let elapsed = last_time.elapsed();
291
292        // Update tick rate every second
293        if elapsed >= Duration::from_secs(1) {
294            let rate = count as f64 / elapsed.as_secs_f64();
295            self.inner.tick_rate.set(rate);
296            self.inner.tick_count_for_rate.store(0, Ordering::Relaxed);
297            *last_time = Instant::now();
298        }
299    }
300
301    /// Record message latency.
302    pub fn record_latency(&self, protocol: &str, latency: Duration) {
303        self.inner
304            .message_latency
305            .with_label_values(&[protocol])
306            .observe(latency.as_secs_f64());
307    }
308
309    /// Record an event by type.
310    pub fn record_event(&self, event_type: &str) {
311        self.inner
312            .events_total
313            .with_label_values(&[event_type])
314            .inc();
315    }
316
317    // ========================================================================
318    // Gauge Setters
319    // ========================================================================
320
321    /// Set the number of active devices.
322    pub fn set_devices_active(&self, count: i64) {
323        self.inner.devices_active.set(count);
324    }
325
326    /// Set active connections for a protocol.
327    pub fn set_connections_active(&self, protocol: &str, count: i64) {
328        self.inner
329            .connections_active
330            .with_label_values(&[protocol])
331            .set(count);
332    }
333
334    /// Increment active connections for a protocol.
335    pub fn inc_connections(&self, protocol: &str) {
336        self.inner
337            .connections_active
338            .with_label_values(&[protocol])
339            .inc();
340    }
341
342    /// Decrement active connections for a protocol.
343    pub fn dec_connections(&self, protocol: &str) {
344        self.inner
345            .connections_active
346            .with_label_values(&[protocol])
347            .dec();
348    }
349
350    /// Set total data points.
351    pub fn set_points_total(&self, count: i64) {
352        self.inner.points_total.set(count);
353    }
354
355    /// Set data points for a specific device.
356    pub fn set_device_points(&self, protocol: &str, device_id: &str, count: i64) {
357        self.inner
358            .points_by_device
359            .with_label_values(&[protocol, device_id])
360            .set(count);
361    }
362
363    /// Remove device points metrics (when device is removed).
364    pub fn remove_device_points(&self, protocol: &str, device_id: &str) {
365        // Set to 0 to indicate removal (Prometheus doesn't support metric deletion)
366        self.inner
367            .points_by_device
368            .with_label_values(&[protocol, device_id])
369            .set(0);
370    }
371
372    // ========================================================================
373    // System Metrics
374    // ========================================================================
375
376    /// Set memory usage in bytes.
377    pub fn set_memory_bytes(&self, bytes: i64) {
378        self.inner.memory_bytes.set(bytes);
379    }
380
381    /// Set CPU usage percentage (0-100).
382    pub fn set_cpu_percent(&self, percent: f64) {
383        self.inner.cpu_percent.set(percent.clamp(0.0, 100.0));
384    }
385
386    /// Update system metrics (memory and CPU).
387    ///
388    /// This is a convenience method that updates both memory and CPU metrics.
389    pub fn update_system_metrics(&self, memory_bytes: i64, cpu_percent: f64) {
390        self.set_memory_bytes(memory_bytes);
391        self.set_cpu_percent(cpu_percent);
392    }
393
394    // ========================================================================
395    // Getters & Export
396    // ========================================================================
397
398    /// Get uptime since the metrics collector was created.
399    pub fn uptime(&self) -> Duration {
400        self.inner.start_time.elapsed()
401    }
402
403    /// Get the current tick rate (ticks per second).
404    pub fn tick_rate(&self) -> f64 {
405        self.inner.tick_rate.get()
406    }
407
408    /// Get a snapshot of current metrics.
409    pub fn snapshot(&self) -> MetricsSnapshot {
410        MetricsSnapshot {
411            uptime_secs: self.uptime().as_secs(),
412            devices_active: self.inner.devices_active.get() as u64,
413            points_total: self.inner.points_total.get() as u64,
414            memory_bytes: self.inner.memory_bytes.get() as u64,
415            cpu_percent: self.inner.cpu_percent.get(),
416            ticks_total: self.inner.ticks_total.get(),
417            tick_rate: self.inner.tick_rate.get(),
418        }
419    }
420
421    /// Get a detailed snapshot with additional information.
422    pub fn detailed_snapshot(&self) -> DetailedMetricsSnapshot {
423        DetailedMetricsSnapshot {
424            basic: self.snapshot(),
425            start_time_unix: std::time::SystemTime::now()
426                .duration_since(std::time::UNIX_EPOCH)
427                .map(|d| d.as_secs() - self.uptime().as_secs())
428                .unwrap_or(0),
429        }
430    }
431
432    /// Export metrics in Prometheus text format.
433    pub fn export_prometheus(&self) -> String {
434        let encoder = TextEncoder::new();
435        let metric_families = self.registry.gather();
436        let mut buffer = Vec::new();
437        encoder.encode(&metric_families, &mut buffer).unwrap();
438        String::from_utf8(buffer).unwrap()
439    }
440
441    /// Export metrics from the global registry.
442    pub fn export_global_prometheus() -> String {
443        let encoder = TextEncoder::new();
444        let metric_families = GLOBAL_REGISTRY.gather();
445        let mut buffer = Vec::new();
446        encoder.encode(&metric_families, &mut buffer).unwrap();
447        String::from_utf8(buffer).unwrap()
448    }
449}
450
451impl Default for MetricsCollector {
452    fn default() -> Self {
453        Self::new()
454    }
455}
456
457impl MetricsInner {
458    fn new(registry: &Registry) -> Self {
459        let now = Instant::now();
460
461        // ====================================================================
462        // Counters
463        // ====================================================================
464
465        let requests_total = IntCounterVec::new(
466            Opts::new(
467                format!("{}_requests_total", METRIC_PREFIX),
468                "Total number of protocol requests",
469            ),
470            &["protocol", "operation"],
471        )
472        .unwrap();
473        registry.register(Box::new(requests_total.clone())).unwrap();
474
475        let messages_total = IntCounterVec::new(
476            Opts::new(
477                format!("{}_messages_total", METRIC_PREFIX),
478                "Total messages processed",
479            ),
480            &["protocol", "direction"],
481        )
482        .unwrap();
483        registry.register(Box::new(messages_total.clone())).unwrap();
484
485        let reads_total = IntCounterVec::new(
486            Opts::new(
487                format!("{}_reads_total", METRIC_PREFIX),
488                "Total read operations",
489            ),
490            &["protocol", "status"],
491        )
492        .unwrap();
493        registry.register(Box::new(reads_total.clone())).unwrap();
494
495        let writes_total = IntCounterVec::new(
496            Opts::new(
497                format!("{}_writes_total", METRIC_PREFIX),
498                "Total write operations",
499            ),
500            &["protocol", "status"],
501        )
502        .unwrap();
503        registry.register(Box::new(writes_total.clone())).unwrap();
504
505        let errors_total = IntCounterVec::new(
506            Opts::new(
507                format!("{}_errors_total", METRIC_PREFIX),
508                "Total errors by protocol and type",
509            ),
510            &["protocol", "error_type"],
511        )
512        .unwrap();
513        registry.register(Box::new(errors_total.clone())).unwrap();
514
515        let ticks_total = IntCounter::new(
516            format!("{}_ticks_total", METRIC_PREFIX),
517            "Total engine ticks processed",
518        )
519        .unwrap();
520        registry.register(Box::new(ticks_total.clone())).unwrap();
521
522        let events_total = IntCounterVec::new(
523            Opts::new(
524                format!("{}_events_total", METRIC_PREFIX),
525                "Total events by type",
526            ),
527            &["event_type"],
528        )
529        .unwrap();
530        registry.register(Box::new(events_total.clone())).unwrap();
531
532        // ====================================================================
533        // Gauges
534        // ====================================================================
535
536        let devices_active = IntGauge::new(
537            format!("{}_devices_active", METRIC_PREFIX),
538            "Number of active simulated devices",
539        )
540        .unwrap();
541        registry.register(Box::new(devices_active.clone())).unwrap();
542
543        let connections_active = IntGaugeVec::new(
544            Opts::new(
545                format!("{}_connections_active", METRIC_PREFIX),
546                "Number of active connections per protocol",
547            ),
548            &["protocol"],
549        )
550        .unwrap();
551        registry
552            .register(Box::new(connections_active.clone()))
553            .unwrap();
554
555        let points_total = IntGauge::new(
556            format!("{}_data_points_total", METRIC_PREFIX),
557            "Total number of data points across all devices",
558        )
559        .unwrap();
560        registry.register(Box::new(points_total.clone())).unwrap();
561
562        let points_by_device = IntGaugeVec::new(
563            Opts::new(
564                format!("{}_data_points_by_device", METRIC_PREFIX),
565                "Number of data points per device",
566            ),
567            &["protocol", "device_id"],
568        )
569        .unwrap();
570        registry
571            .register(Box::new(points_by_device.clone()))
572            .unwrap();
573
574        let memory_bytes = IntGauge::new(
575            format!("{}_memory_usage_bytes", METRIC_PREFIX),
576            "Current memory usage in bytes",
577        )
578        .unwrap();
579        registry.register(Box::new(memory_bytes.clone())).unwrap();
580
581        let cpu_percent = Gauge::new(
582            format!("{}_cpu_usage_percent", METRIC_PREFIX),
583            "Current CPU usage percentage (0-100)",
584        )
585        .unwrap();
586        registry.register(Box::new(cpu_percent.clone())).unwrap();
587
588        let tick_rate = Gauge::new(
589            format!("{}_tick_rate", METRIC_PREFIX),
590            "Current tick rate (ticks per second)",
591        )
592        .unwrap();
593        registry.register(Box::new(tick_rate.clone())).unwrap();
594
595        // ====================================================================
596        // Histograms
597        // ====================================================================
598
599        let request_duration = HistogramVec::new(
600            HistogramOpts::new(
601                format!("{}_request_duration_seconds", METRIC_PREFIX),
602                "Request processing duration in seconds",
603            )
604            .buckets(LATENCY_BUCKETS.to_vec()),
605            &["protocol", "operation"],
606        )
607        .unwrap();
608        registry
609            .register(Box::new(request_duration.clone()))
610            .unwrap();
611
612        let message_latency = HistogramVec::new(
613            HistogramOpts::new(
614                format!("{}_message_latency_seconds", METRIC_PREFIX),
615                "Message latency in seconds",
616            )
617            .buckets(LATENCY_BUCKETS.to_vec()),
618            &["protocol"],
619        )
620        .unwrap();
621        registry
622            .register(Box::new(message_latency.clone()))
623            .unwrap();
624
625        let tick_duration = Histogram::with_opts(
626            HistogramOpts::new(
627                format!("{}_tick_duration_seconds", METRIC_PREFIX),
628                "Engine tick processing duration in seconds",
629            )
630            .buckets(TICK_BUCKETS.to_vec()),
631        )
632        .unwrap();
633        registry.register(Box::new(tick_duration.clone())).unwrap();
634
635        let read_duration = HistogramVec::new(
636            HistogramOpts::new(
637                format!("{}_read_duration_seconds", METRIC_PREFIX),
638                "Read operation duration in seconds",
639            )
640            .buckets(LATENCY_BUCKETS.to_vec()),
641            &["protocol"],
642        )
643        .unwrap();
644        registry.register(Box::new(read_duration.clone())).unwrap();
645
646        let write_duration = HistogramVec::new(
647            HistogramOpts::new(
648                format!("{}_write_duration_seconds", METRIC_PREFIX),
649                "Write operation duration in seconds",
650            )
651            .buckets(LATENCY_BUCKETS.to_vec()),
652            &["protocol"],
653        )
654        .unwrap();
655        registry.register(Box::new(write_duration.clone())).unwrap();
656
657        Self {
658            // Counters
659            requests_total,
660            messages_total,
661            reads_total,
662            writes_total,
663            errors_total,
664            ticks_total,
665            events_total,
666
667            // Gauges
668            devices_active,
669            connections_active,
670            points_total,
671            points_by_device,
672            memory_bytes,
673            cpu_percent,
674            tick_rate,
675
676            // Histograms
677            request_duration,
678            message_latency,
679            tick_duration,
680            read_duration,
681            write_duration,
682
683            // Internal tracking
684            start_time: now,
685            last_tick_time: RwLock::new(now),
686            tick_count_for_rate: AtomicU64::new(0),
687        }
688    }
689}
690
691// ============================================================================
692// Snapshot Types
693// ============================================================================
694
695/// Snapshot of current metrics.
696#[derive(Debug, Clone, Serialize, Deserialize)]
697pub struct MetricsSnapshot {
698    /// Uptime in seconds.
699    pub uptime_secs: u64,
700    /// Number of active devices.
701    pub devices_active: u64,
702    /// Total data points across all devices.
703    pub points_total: u64,
704    /// Memory usage in bytes.
705    pub memory_bytes: u64,
706    /// CPU usage percentage (0-100).
707    pub cpu_percent: f64,
708    /// Total engine ticks processed.
709    pub ticks_total: u64,
710    /// Current tick rate (ticks per second).
711    pub tick_rate: f64,
712}
713
714/// Detailed metrics snapshot with additional information.
715#[derive(Debug, Clone, Serialize, Deserialize)]
716pub struct DetailedMetricsSnapshot {
717    /// Basic metrics snapshot.
718    #[serde(flatten)]
719    pub basic: MetricsSnapshot,
720    /// Unix timestamp when the metrics collection started.
721    pub start_time_unix: u64,
722}
723
724// ============================================================================
725// Latency Statistics
726// ============================================================================
727
728/// Latency statistics with percentile calculations.
729#[derive(Debug, Clone, Default, Serialize, Deserialize)]
730pub struct LatencyStats {
731    /// Minimum latency in microseconds.
732    pub min_us: u64,
733    /// Maximum latency in microseconds.
734    pub max_us: u64,
735    /// Average latency in microseconds.
736    pub avg_us: u64,
737    /// Median (P50) latency in microseconds.
738    pub p50_us: u64,
739    /// P90 latency in microseconds.
740    pub p90_us: u64,
741    /// P95 latency in microseconds.
742    pub p95_us: u64,
743    /// P99 latency in microseconds.
744    pub p99_us: u64,
745    /// Sample count.
746    pub count: u64,
747    /// Standard deviation in microseconds.
748    pub stddev_us: u64,
749}
750
751impl LatencyStats {
752    /// Calculate statistics from duration samples.
753    pub fn from_samples(samples: &[Duration]) -> Self {
754        if samples.is_empty() {
755            return Self::default();
756        }
757
758        let mut sorted: Vec<u64> = samples.iter().map(|d| d.as_micros() as u64).collect();
759        sorted.sort_unstable();
760
761        let count = sorted.len();
762        let sum: u64 = sorted.iter().sum();
763        let avg = sum / count as u64;
764
765        // Calculate standard deviation
766        let variance: u64 = sorted
767            .iter()
768            .map(|&x| {
769                let diff = x.abs_diff(avg);
770                diff * diff
771            })
772            .sum::<u64>()
773            / count as u64;
774        let stddev = (variance as f64).sqrt() as u64;
775
776        Self {
777            min_us: sorted[0],
778            max_us: sorted[count - 1],
779            avg_us: avg,
780            p50_us: Self::percentile(&sorted, 50),
781            p90_us: Self::percentile(&sorted, 90),
782            p95_us: Self::percentile(&sorted, 95),
783            p99_us: Self::percentile(&sorted, 99),
784            count: count as u64,
785            stddev_us: stddev,
786        }
787    }
788
789    /// Calculate a specific percentile from sorted samples.
790    fn percentile(sorted: &[u64], p: usize) -> u64 {
791        if sorted.is_empty() {
792            return 0;
793        }
794        let idx = (sorted.len() * p / 100).min(sorted.len() - 1);
795        sorted[idx]
796    }
797
798    /// Check if the latency statistics indicate a performance issue.
799    ///
800    /// Returns true if P99 latency exceeds the threshold.
801    pub fn is_latency_high(&self, threshold_us: u64) -> bool {
802        self.p99_us > threshold_us
803    }
804}
805
806// ============================================================================
807// Timer Utilities
808// ============================================================================
809
/// Stopwatch for measuring how long an operation takes.
///
/// Nothing is recorded automatically; the caller reads the elapsed time.
/// Use `RequestTimer` when the duration should be observed into a
/// Prometheus histogram on drop.
pub struct Timer {
    start: Instant,
}

impl Timer {
    /// Create a timer that starts counting now.
    pub fn start() -> Self {
        let start = Instant::now();
        Self { start }
    }

    /// Time elapsed since the timer was started or last reset.
    pub fn elapsed(&self) -> Duration {
        Instant::now().duration_since(self.start)
    }

    /// Consume the timer and return the elapsed time.
    pub fn stop(self) -> Duration {
        self.start.elapsed()
    }

    /// Restart the timer from the current instant.
    pub fn reset(&mut self) {
        *self = Self::start();
    }
}

impl Default for Timer {
    /// Equivalent to [`Timer::start`].
    fn default() -> Self {
        Timer::start()
    }
}
847
848/// Request timer that automatically records duration to Prometheus when dropped.
849///
850/// This implements the RAII pattern for request timing.
851/// The duration is recorded when the timer goes out of scope.
852///
853/// # Example
854///
855/// ```rust,ignore
856/// let _timer = metrics.time_request("modbus", "read");
857/// // ... perform the operation ...
858/// // duration is automatically recorded when _timer is dropped
859/// ```
860pub struct RequestTimer {
861    histogram: Histogram,
862    start: Instant,
863    recorded: bool,
864}
865
866impl RequestTimer {
867    /// Create a new request timer.
868    pub fn new(histogram: Histogram) -> Self {
869        Self {
870            histogram,
871            start: Instant::now(),
872            recorded: false,
873        }
874    }
875
876    /// Get elapsed duration without recording.
877    pub fn elapsed(&self) -> Duration {
878        self.start.elapsed()
879    }
880
881    /// Manually record the duration and mark as recorded.
882    ///
883    /// This prevents the Drop implementation from recording again.
884    pub fn record(mut self) -> Duration {
885        let elapsed = self.elapsed();
886        self.histogram.observe(elapsed.as_secs_f64());
887        self.recorded = true;
888        elapsed
889    }
890
891    /// Discard the timer without recording.
892    ///
893    /// Use this if you want to cancel timing (e.g., on error).
894    pub fn discard(mut self) {
895        self.recorded = true; // Prevent recording in Drop
896    }
897}
898
899impl Drop for RequestTimer {
900    fn drop(&mut self) {
901        if !self.recorded {
902            self.histogram.observe(self.start.elapsed().as_secs_f64());
903        }
904    }
905}
906
907// ============================================================================
908// System Metrics Collector
909// ============================================================================
910
/// Periodic sampler that feeds system-level readings into a [`MetricsCollector`].
///
/// Only compiled when the `system-metrics` feature is enabled.
#[cfg(feature = "system-metrics")]
pub struct SystemMetricsCollector {
    /// Collector that receives the sampled values.
    metrics: MetricsCollector,
    /// Period between two consecutive samples.
    interval: Duration,
}
920
#[cfg(feature = "system-metrics")]
impl SystemMetricsCollector {
    /// Build a collector that samples into `metrics` once every `interval`.
    pub fn new(metrics: MetricsCollector, interval: Duration) -> Self {
        Self { metrics, interval }
    }

    /// Spawn a Tokio task that samples system metrics on every interval tick.
    ///
    /// The loop has no exit condition of its own, so the returned handle is
    /// the way to stop collection (e.g. by aborting the task).
    pub fn start(self) -> tokio::task::JoinHandle<()> {
        let collector = self;
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(collector.interval);

            loop {
                ticker.tick().await;
                collector.collect_once();
            }
        })
    }

    /// Take a single sample of the system metrics.
    ///
    /// On Linux this reads the `VmRSS:` line of `/proc/self/status`, converts
    /// the kB figure to bytes and stores it through `set_memory_bytes`. Any
    /// read or parse failure is ignored and leaves the metric untouched. On
    /// other targets nothing is sampled.
    fn collect_once(&self) {
        // Memory usage (process RSS)
        #[cfg(target_os = "linux")]
        {
            let rss_kb = std::fs::read_to_string("/proc/self/status")
                .ok()
                .and_then(|status| {
                    status
                        .lines()
                        .find(|entry| entry.starts_with("VmRSS:"))
                        .and_then(|entry| entry.split_whitespace().nth(1))
                        .and_then(|field| field.parse::<i64>().ok())
                });

            if let Some(kb) = rss_kb {
                self.metrics.set_memory_bytes(kb * 1024);
            }
        }

        // CPU usage needs a comparison against the previous sample, which is
        // not tracked here, so automatic CPU collection is skipped for now.
    }
}
960
961// ============================================================================
962// Macros
963// ============================================================================
964
/// Time a block of code and record its duration as a request metric.
///
/// Expands to a block that first calls `time_request($protocol, $operation)`
/// on `$metrics`, keeps the returned timer alive while `$block` is evaluated,
/// and finally yields the value of `$block`.
///
/// # Example
///
/// ```rust,ignore
/// let result = measure_request!(metrics, "modbus", "read", {
///     device.read_registers(0, 10).await
/// });
/// ```
#[macro_export]
macro_rules! measure_request {
    ($metrics:expr, $protocol:expr, $operation:expr, $block:expr) => {{
        let _request_timer = $metrics.time_request($protocol, $operation);
        // Evaluate the block into a local first so that it has completely
        // finished before the timer guard goes out of scope.
        let outcome = $block;
        outcome
    }};
}
984
/// Classify an error and record it against a protocol.
///
/// The error is first mapped to a category label by
/// `$crate::metrics::classify_error`; that label is then passed, together
/// with `$protocol`, to `record_error` on `$metrics`.
///
/// # Example
///
/// ```rust,ignore
/// record_error!(metrics, "modbus", err);
/// ```
#[macro_export]
macro_rules! record_error {
    ($metrics:expr, $protocol:expr, $error:expr) => {{
        let category = $crate::metrics::classify_error(&$error);
        $metrics.record_error($protocol, category);
    }};
}
1001
/// Classify an error into a category for metrics.
///
/// The error's `Display` text is lower-cased and matched against a fixed list
/// of keywords. The first matching rule wins, in this order:
///
/// | Label          | Matches                                            |
/// |----------------|----------------------------------------------------|
/// | `"timeout"`    | `timeout`, `timed out`                             |
/// | `"connection"` | `connect` (which also covers `connection`)         |
/// | `"protocol"`   | `protocol`, `invalid`                              |
/// | `"io"`         | `i/o`, or `io` as a standalone word                |
/// | `"not_found"`  | `not found`                                        |
/// | `"permission"` | `permission`, `unauthorized`                       |
/// | `"unknown"`    | anything else                                      |
///
/// Returns a `'static` string suitable for use as a metric label.
pub fn classify_error<E: std::fmt::Display>(error: &E) -> &'static str {
    let message = error.to_string().to_lowercase();
    let mentions = |needle: &str| message.contains(needle);

    // "io" is only meaningful as a whole word; a plain substring search would
    // also hit words such as "audio" or "version".
    let has_io_word = || {
        message
            .split(|c: char| !c.is_alphanumeric())
            .any(|word| word == "io")
    };

    // "timed out" is the wording used by `std::io::ErrorKind::TimedOut` and by
    // OS-level messages such as "Connection timed out", so it is checked
    // before the connection rule.
    if mentions("timeout") || mentions("timed out") {
        "timeout"
    } else if mentions("connect") {
        "connection"
    } else if mentions("protocol") || mentions("invalid") {
        "protocol"
    } else if mentions("i/o") || has_io_word() {
        "io"
    } else if mentions("not found") {
        "not_found"
    } else if mentions("permission") || mentions("unauthorized") {
        "permission"
    } else {
        "unknown"
    }
}
1028
#[cfg(test)]
mod tests {
    use super::*;

    // ========================================================================
    // MetricsCollector Tests
    // ========================================================================

    /// A freshly created collector can report its uptime.
    #[test]
    fn test_metrics_collector_new() {
        let collector = MetricsCollector::new();
        assert!(collector.uptime() >= Duration::ZERO);
    }

    /// Two calls to `global()` hand out the very same instance.
    #[test]
    fn test_metrics_collector_global() {
        let first = MetricsCollector::global();
        let second = MetricsCollector::global();
        assert!(std::ptr::eq(first, second));
    }

    /// Smoke test for message/read/write recording and the active-device gauge.
    #[test]
    fn test_metrics_collector_basic_operations() {
        let collector = MetricsCollector::new();

        // Messages in both directions.
        collector.record_message("modbus", "rx");
        collector.record_message("modbus", "tx");

        // One successful and one failed read.
        collector.record_read("modbus", true, Duration::from_micros(100));
        collector.record_read("modbus", false, Duration::from_micros(200));

        // A successful write on another protocol.
        collector.record_write("opcua", true, Duration::from_micros(50));

        // The device gauge must be reflected in the snapshot.
        collector.set_devices_active(10);
        assert_eq!(collector.snapshot().devices_active, 10);
    }

    /// Requests can be recorded directly or through the scoped timer helper.
    #[test]
    fn test_metrics_collector_request_timing() {
        let collector = MetricsCollector::new();

        // Direct recording.
        collector.record_request("modbus", "read");
        collector.record_request_duration("modbus", "read", Duration::from_micros(150));

        // Scoped recording: the guard records when the inner scope ends.
        {
            let _guard = collector.time_request("bacnet", "subscribe");
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    /// Errors of several kinds can be recorded for several protocols.
    #[test]
    fn test_metrics_collector_error_recording() {
        let collector = MetricsCollector::new();

        collector.record_error("modbus", "timeout");
        collector.record_error("modbus", "connection");
        collector.record_error("opcua", "protocol");
    }

    /// Every recorded tick is counted in the snapshot.
    #[test]
    fn test_metrics_collector_tick_recording() {
        let collector = MetricsCollector::new();

        // Five ticks lasting 10, 20, 30, 40 and 50 ms.
        for step in 1..=5 {
            collector.record_tick(Duration::from_millis(10 * step));
        }

        assert_eq!(collector.snapshot().ticks_total, 5);
    }

    /// The connection gauge can be set, incremented and decremented.
    #[test]
    fn test_metrics_collector_connections() {
        let collector = MetricsCollector::new();

        collector.set_connections_active("modbus", 5);
        collector.inc_connections("modbus");
        collector.dec_connections("modbus");
    }

    /// Total and per-device point gauges can be set, and a device removed.
    #[test]
    fn test_metrics_collector_device_points() {
        let collector = MetricsCollector::new();

        collector.set_points_total(1000);
        collector.set_device_points("modbus", "device-001", 50);
        collector.set_device_points("modbus", "device-002", 75);

        assert_eq!(collector.snapshot().points_total, 1000);

        // Dropping a device's gauge must not fail.
        collector.remove_device_points("modbus", "device-001");
    }

    /// Memory and CPU gauges round-trip through the snapshot; CPU is clamped to 0..=100.
    #[test]
    fn test_metrics_collector_system_metrics() {
        let collector = MetricsCollector::new();

        // Individual setters (512 MB, 45.5 %).
        collector.set_memory_bytes(1024 * 1024 * 512);
        collector.set_cpu_percent(45.5);

        let seen = collector.snapshot();
        assert_eq!(seen.memory_bytes, 1024 * 1024 * 512);
        assert!((seen.cpu_percent - 45.5).abs() < 0.001);

        // Combined convenience setter.
        collector.update_system_metrics(1024 * 1024 * 256, 30.0);
        let seen = collector.snapshot();
        assert_eq!(seen.memory_bytes, 1024 * 1024 * 256);
        assert!((seen.cpu_percent - 30.0).abs() < 0.001);

        // Values above the range are clamped to 100.
        collector.set_cpu_percent(150.0);
        let seen = collector.snapshot();
        assert!((seen.cpu_percent - 100.0).abs() < 0.001);

        // Values below the range are clamped to 0.
        collector.set_cpu_percent(-10.0);
        let seen = collector.snapshot();
        assert!(seen.cpu_percent.abs() < 0.001);
    }

    /// The Prometheus text export is non-empty and carries the prefixed metric names.
    #[test]
    fn test_metrics_collector_prometheus_export() {
        let collector = MetricsCollector::new();

        collector.record_read("modbus", true, Duration::from_micros(100));
        collector.set_devices_active(5);

        let exported = collector.export_prometheus();
        assert!(!exported.is_empty());
        assert!(exported.contains(METRIC_PREFIX));
        assert!(exported.contains("devices_active"));
    }

    /// Arbitrary event names can be recorded.
    #[test]
    fn test_metrics_collector_event_recording() {
        let collector = MetricsCollector::new();

        collector.record_event("device_added");
        collector.record_event("device_removed");
        collector.record_event("engine_started");
    }

    /// Basic and detailed snapshots expose the values that were set.
    #[test]
    fn test_metrics_collector_snapshot() {
        let collector = MetricsCollector::new();

        collector.set_devices_active(10);
        collector.set_points_total(500);
        collector.set_memory_bytes(1024 * 1024);
        collector.set_cpu_percent(25.0);

        let basic = collector.snapshot();
        assert_eq!(basic.devices_active, 10);
        assert_eq!(basic.points_total, 500);
        assert_eq!(basic.memory_bytes, 1024 * 1024);
        assert!((basic.cpu_percent - 25.0).abs() < 0.001);

        let detailed = collector.detailed_snapshot();
        assert_eq!(detailed.basic.devices_active, 10);
        assert!(detailed.start_time_unix > 0);
    }

    // ========================================================================
    // LatencyStats Tests
    // ========================================================================

    /// Min, max, average, count and median for five evenly spaced samples.
    #[test]
    fn test_latency_stats_from_samples() {
        // 100, 200, 300, 400 and 500 microseconds.
        let samples: Vec<Duration> = (1..=5)
            .map(|n| Duration::from_micros(n * 100))
            .collect();

        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.min_us, 100);
        assert_eq!(stats.max_us, 500);
        assert_eq!(stats.avg_us, 300);
        assert_eq!(stats.count, 5);
        assert!((200..=300).contains(&stats.p50_us));
    }

    /// An empty sample set yields all-zero statistics.
    #[test]
    fn test_latency_stats_empty() {
        let samples: Vec<Duration> = Vec::new();
        let stats = LatencyStats::from_samples(&samples);

        assert_eq!(stats.min_us, 0);
        assert_eq!(stats.max_us, 0);
        assert_eq!(stats.avg_us, 0);
        assert_eq!(stats.count, 0);
    }

    /// With a single sample, min, max and average all equal that sample.
    #[test]
    fn test_latency_stats_single_sample() {
        let samples = vec![Duration::from_micros(150)];
        let stats = LatencyStats::from_samples(&samples);

        assert_eq!(stats.min_us, 150);
        assert_eq!(stats.max_us, 150);
        assert_eq!(stats.avg_us, 150);
        assert_eq!(stats.count, 1);
    }

    /// Percentiles over the samples 1..=100 microseconds.
    #[test]
    fn test_latency_stats_percentiles() {
        // 100 samples give the percentiles a well-defined position.
        let samples: Vec<Duration> = (1..=100).map(Duration::from_micros).collect();

        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.min_us, 1);
        assert_eq!(stats.max_us, 100);
        // The percentile calculation uses integer division, so each percentile
        // is allowed to land on either neighbouring sample.
        assert!((50..=51).contains(&stats.p50_us));
        assert!((90..=91).contains(&stats.p90_us));
        assert!((95..=96).contains(&stats.p95_us));
        assert!((99..=100).contains(&stats.p99_us));
    }

    /// `is_latency_high` compares the threshold against the p99 latency.
    #[test]
    fn test_latency_stats_is_latency_high() {
        let samples: Vec<Duration> = (1..=100).map(Duration::from_micros).collect();
        let stats = LatencyStats::from_samples(&samples);

        assert!(stats.is_latency_high(50)); // p99 (99) > 50
        assert!(!stats.is_latency_high(100)); // p99 (99) <= 100
    }

    /// Identical samples have zero standard deviation.
    #[test]
    fn test_latency_stats_stddev() {
        let samples: Vec<Duration> = vec![Duration::from_micros(100); 3];
        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.stddev_us, 0);
    }

    // ========================================================================
    // Timer Tests
    // ========================================================================

    /// `stop` returns at least the time that was slept.
    #[test]
    fn test_timer_start_stop() {
        let stopwatch = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        let measured = stopwatch.stop();
        assert!(measured >= Duration::from_millis(10));
    }

    /// `elapsed` keeps growing while the timer is running.
    #[test]
    fn test_timer_elapsed() {
        let stopwatch = Timer::start();
        std::thread::sleep(Duration::from_millis(5));
        let earlier = stopwatch.elapsed();
        std::thread::sleep(Duration::from_millis(5));
        let later = stopwatch.elapsed();
        assert!(later > earlier);
    }

    /// `reset` restarts the measurement from zero.
    #[test]
    fn test_timer_reset() {
        let mut stopwatch = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        stopwatch.reset();
        let since_reset = stopwatch.elapsed();
        assert!(since_reset < Duration::from_millis(10));
    }

    /// A default-constructed timer is usable right away.
    #[test]
    fn test_timer_default() {
        let stopwatch = Timer::default();
        assert!(stopwatch.elapsed() >= Duration::ZERO);
    }

    // ========================================================================
    // RequestTimer Tests
    // ========================================================================

    /// Dropping the guard makes the request-duration metric appear in the export.
    #[test]
    fn test_request_timer_drop_records() {
        let collector = MetricsCollector::new();
        {
            let _guard = collector.time_request("test_proto", "test_op");
            std::thread::sleep(Duration::from_millis(5));
        }
        // The guard is gone, so the exported text must mention the histogram.
        let metric_name = format!("{}_request_duration_seconds", METRIC_PREFIX);
        let exported = collector.export_prometheus();
        assert!(exported.contains(&metric_name));
    }

    /// `record` returns the measured duration.
    #[test]
    fn test_request_timer_manual_record() {
        let collector = MetricsCollector::new();
        let guard = collector.time_request("test_proto", "manual_test");
        std::thread::sleep(Duration::from_millis(5));
        let measured = guard.record();
        assert!(measured >= Duration::from_millis(5));
    }

    /// `discard` consumes the timer; nothing should be recorded.
    #[test]
    fn test_request_timer_discard() {
        let collector = MetricsCollector::new();
        let guard = collector.time_request("test_proto", "discard_test");
        guard.discard();
    }

    // ========================================================================
    // Utility Function Tests
    // ========================================================================

    /// Each sample message maps to its expected category label.
    #[test]
    fn test_classify_error() {
        let cases = [
            ("Connection timeout", "timeout"),
            ("Connection refused", "connection"),
            ("I/O error occurred", "io"),
            ("Protocol violation", "protocol"),
            ("Invalid message format", "protocol"),
            ("Device not found", "not_found"),
            ("Permission denied", "permission"),
            ("Some random error", "unknown"),
        ];

        for &(message, expected) in &cases {
            assert_eq!(classify_error(&message), expected);
        }
    }

    // ========================================================================
    // Constants Tests
    // ========================================================================

    /// The metric prefix is the documented constant.
    #[test]
    fn test_metric_prefix() {
        assert_eq!(METRIC_PREFIX, "mabi");
    }

    /// Latency buckets are non-empty and strictly increasing.
    #[test]
    fn test_latency_buckets() {
        assert!(!LATENCY_BUCKETS.is_empty());
        for pair in LATENCY_BUCKETS.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }

    /// Tick buckets are non-empty and strictly increasing.
    #[test]
    fn test_tick_buckets() {
        assert!(!TICK_BUCKETS.is_empty());
        for pair in TICK_BUCKETS.windows(2) {
            assert!(pair[1] > pair[0]);
        }
    }
}