Skip to main content

mabi_core/
metrics.rs

1//! Metrics collection and export.
2//!
3//! This module provides a comprehensive metrics collection system for Mabinogion protocol simulator.
4//! It supports:
5//! - Prometheus-compatible metrics export
6//! - Global registry with lazy initialization
7//! - Protocol-specific and device-specific metrics
8//! - System metrics (CPU, memory)
9//! - Request timing with automatic observation via RAII
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use mabi_core::metrics::{MetricsCollector, GLOBAL_REGISTRY, measure_request};
15//!
16//! let metrics = MetricsCollector::global();
17//! metrics.record_read("modbus", true, Duration::from_micros(500));
18//!
19//! // Using the measure_request! macro
20//! measure_request!(metrics, "modbus", "read", {
21//!     // ... operation code
22//! });
23//! ```
24
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use once_cell::sync::Lazy;
30use parking_lot::RwLock;
31use prometheus::{
32    Encoder, Gauge, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
33    IntGaugeVec, Opts, Registry, TextEncoder,
34};
35use serde::{Deserialize, Serialize};
36
37// ============================================================================
38// Constants
39// ============================================================================
40
/// Metric name prefix for all Mabinogion metrics.
///
/// `MetricsInner::new` builds every metric name as `{METRIC_PREFIX}_<suffix>`,
/// for example `mabi_requests_total`.
pub const METRIC_PREFIX: &str = "mabi";
43
/// Standard histogram buckets for latency measurements (in seconds).
///
/// Upper bounds run from 0.1 ms to 10 s, following a roughly 1 / 2.5 / 5
/// progression per decade. Used for the request-duration, message-latency,
/// read-duration and write-duration histograms.
pub const LATENCY_BUCKETS: &[f64] = &[
    0.0001, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];
49
/// Standard histogram buckets for tick duration measurements (in seconds).
///
/// Upper bounds run from 0.1 ms to 100 ms; used for the engine tick-duration
/// histogram.
pub const TICK_BUCKETS: &[f64] = &[0.0001, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1];
53
54// ============================================================================
55// Global Registry
56// ============================================================================
57
/// Global Prometheus registry, created lazily on first access.
///
/// The collector returned by `MetricsCollector::global` is built from this
/// registry. Collectors created with `MetricsCollector::new` use their own
/// fresh registry instead, so their metrics do not appear here.
pub static GLOBAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
61
/// Global metrics collector instance (singleton), built lazily on first use.
///
/// It is constructed from a clone of `GLOBAL_REGISTRY`.
// NOTE(review): this relies on `Registry::clone` sharing the underlying
// registry state, so that the metrics registered here are visible through
// `GLOBAL_REGISTRY` itself — confirm against the `prometheus` version in use.
static GLOBAL_METRICS: Lazy<MetricsCollector> = Lazy::new(|| {
    MetricsCollector::with_registry(GLOBAL_REGISTRY.clone())
});
66
67// ============================================================================
68// Metrics Collector
69// ============================================================================
70
/// Metrics collector for the simulator.
///
/// Provides comprehensive metrics collection including:
/// - Request/response counters and timings
/// - Device and connection tracking
/// - System resource monitoring (CPU, memory)
/// - Protocol-specific metrics
///
/// Both fields are behind `Arc`, so cloning a collector yields another handle
/// to the same registry and the same metric storage.
#[derive(Clone)]
pub struct MetricsCollector {
    /// Registry the metrics in `inner` were registered with; used for export.
    registry: Arc<Registry>,
    /// Metric handles and internal bookkeeping shared between clones.
    inner: Arc<MetricsInner>,
}
83
/// Internal metrics storage.
///
/// Holds the Prometheus metric handles created in `MetricsInner::new` together
/// with the bookkeeping needed for uptime and tick-rate calculation.
struct MetricsInner {
    // === Counters ===
    /// Total requests, labelled by `protocol` and `operation`.
    requests_total: IntCounterVec,
    /// Total read operations, labelled by `protocol` and `status`.
    reads_total: IntCounterVec,
    /// Total write operations, labelled by `protocol` and `status`.
    writes_total: IntCounterVec,
    /// Total errors, labelled by `protocol` and `error_type`.
    errors_total: IntCounterVec,
    /// Total engine ticks.
    ticks_total: IntCounter,
    /// Total messages, labelled by `protocol` and `direction`.
    messages_total: IntCounterVec,
    /// Total events, labelled by `event_type`.
    events_total: IntCounterVec,

    // === Gauges ===
    /// Number of active devices.
    devices_active: IntGauge,
    /// Active connections, labelled by `protocol`.
    connections_active: IntGaugeVec,
    /// Total data points across all devices.
    points_total: IntGauge,
    /// Data points, labelled by `protocol` and `device_id`.
    points_by_device: IntGaugeVec,
    /// Memory usage in bytes.
    memory_bytes: IntGauge,
    /// CPU usage percentage (0-100); the setter clamps to this range.
    cpu_percent: Gauge,
    /// Current tick rate (ticks per second), refreshed by `record_tick`.
    tick_rate: Gauge,

    // === Histograms ===
    /// Request duration in seconds, labelled by `protocol` and `operation`.
    request_duration: HistogramVec,
    /// Message latency in seconds, labelled by `protocol`.
    message_latency: HistogramVec,
    /// Engine tick duration in seconds.
    tick_duration: Histogram,
    /// Read operation duration in seconds, labelled by `protocol`.
    read_duration: HistogramVec,
    /// Write operation duration in seconds, labelled by `protocol`.
    write_duration: HistogramVec,

    // === Internal tracking ===
    /// Instant captured when this struct was built; basis for `uptime()`.
    start_time: Instant,
    /// Start of the current tick-rate measurement window.
    last_tick_time: RwLock<Instant>,
    /// Ticks counted since the tick rate was last recomputed.
    tick_count_for_rate: AtomicU64,
}
135
136impl MetricsCollector {
137    /// Create a new metrics collector with a fresh registry.
138    pub fn new() -> Self {
139        let registry = Registry::new();
140        Self::with_registry(registry)
141    }
142
143    /// Get the global metrics collector instance.
144    ///
145    /// This returns a shared instance that uses the global registry.
146    /// Use this when you want all metrics to be collected in one place.
147    pub fn global() -> &'static Self {
148        &GLOBAL_METRICS
149    }
150
151    /// Create a metrics collector with a custom registry.
152    pub fn with_registry(registry: Registry) -> Self {
153        let inner = MetricsInner::new(&registry);
154
155        Self {
156            registry: Arc::new(registry),
157            inner: Arc::new(inner),
158        }
159    }
160
161    /// Get the Prometheus registry.
162    pub fn registry(&self) -> &Registry {
163        &self.registry
164    }
165
166    // ========================================================================
167    // Request/Operation Recording
168    // ========================================================================
169
170    /// Record a request by protocol and operation.
171    ///
172    /// This is the primary method for tracking request counts and should be
173    /// used in conjunction with `record_request_duration` or `time_request`.
174    pub fn record_request(&self, protocol: &str, operation: &str) {
175        self.inner
176            .requests_total
177            .with_label_values(&[protocol, operation])
178            .inc();
179    }
180
181    /// Record a request duration.
182    pub fn record_request_duration(&self, protocol: &str, operation: &str, duration: Duration) {
183        self.inner
184            .request_duration
185            .with_label_values(&[protocol, operation])
186            .observe(duration.as_secs_f64());
187    }
188
189    /// Create a request timer for automatic duration recording.
190    ///
191    /// The timer automatically records the duration when dropped.
192    ///
193    /// # Example
194    ///
195    /// ```rust,ignore
196    /// let _timer = metrics.time_request("modbus", "read");
197    /// // ... do work ...
198    /// // duration is automatically recorded when _timer goes out of scope
199    /// ```
200    pub fn time_request(&self, protocol: &str, operation: &str) -> RequestTimer {
201        self.record_request(protocol, operation);
202        RequestTimer::new(
203            self.inner
204                .request_duration
205                .with_label_values(&[protocol, operation]),
206        )
207    }
208
209    /// Record a message (for protocol-level message tracking).
210    pub fn record_message(&self, protocol: &str, direction: &str) {
211        self.inner
212            .messages_total
213            .with_label_values(&[protocol, direction])
214            .inc();
215    }
216
217    /// Record a read operation with timing and status.
218    pub fn record_read(&self, protocol: &str, success: bool, duration: Duration) {
219        let status = if success { "success" } else { "error" };
220        self.inner
221            .reads_total
222            .with_label_values(&[protocol, status])
223            .inc();
224        self.inner
225            .read_duration
226            .with_label_values(&[protocol])
227            .observe(duration.as_secs_f64());
228
229        // Also record as a request
230        self.inner
231            .requests_total
232            .with_label_values(&[protocol, "read"])
233            .inc();
234        self.inner
235            .request_duration
236            .with_label_values(&[protocol, "read"])
237            .observe(duration.as_secs_f64());
238    }
239
240    /// Record a write operation with timing and status.
241    pub fn record_write(&self, protocol: &str, success: bool, duration: Duration) {
242        let status = if success { "success" } else { "error" };
243        self.inner
244            .writes_total
245            .with_label_values(&[protocol, status])
246            .inc();
247        self.inner
248            .write_duration
249            .with_label_values(&[protocol])
250            .observe(duration.as_secs_f64());
251
252        // Also record as a request
253        self.inner
254            .requests_total
255            .with_label_values(&[protocol, "write"])
256            .inc();
257        self.inner
258            .request_duration
259            .with_label_values(&[protocol, "write"])
260            .observe(duration.as_secs_f64());
261    }
262
263    // ========================================================================
264    // Error Recording
265    // ========================================================================
266
267    /// Record an error by protocol and error type.
268    pub fn record_error(&self, protocol: &str, error_type: &str) {
269        self.inner
270            .errors_total
271            .with_label_values(&[protocol, error_type])
272            .inc();
273    }
274
275    // ========================================================================
276    // Engine/Tick Recording
277    // ========================================================================
278
279    /// Record an engine tick with duration.
280    pub fn record_tick(&self, duration: Duration) {
281        self.inner.ticks_total.inc();
282        self.inner.tick_duration.observe(duration.as_secs_f64());
283
284        // Update tick rate calculation
285        let count = self.inner.tick_count_for_rate.fetch_add(1, Ordering::Relaxed) + 1;
286        let mut last_time = self.inner.last_tick_time.write();
287        let elapsed = last_time.elapsed();
288
289        // Update tick rate every second
290        if elapsed >= Duration::from_secs(1) {
291            let rate = count as f64 / elapsed.as_secs_f64();
292            self.inner.tick_rate.set(rate);
293            self.inner.tick_count_for_rate.store(0, Ordering::Relaxed);
294            *last_time = Instant::now();
295        }
296    }
297
298    /// Record message latency.
299    pub fn record_latency(&self, protocol: &str, latency: Duration) {
300        self.inner
301            .message_latency
302            .with_label_values(&[protocol])
303            .observe(latency.as_secs_f64());
304    }
305
306    /// Record an event by type.
307    pub fn record_event(&self, event_type: &str) {
308        self.inner
309            .events_total
310            .with_label_values(&[event_type])
311            .inc();
312    }
313
314    // ========================================================================
315    // Gauge Setters
316    // ========================================================================
317
318    /// Set the number of active devices.
319    pub fn set_devices_active(&self, count: i64) {
320        self.inner.devices_active.set(count);
321    }
322
323    /// Set active connections for a protocol.
324    pub fn set_connections_active(&self, protocol: &str, count: i64) {
325        self.inner
326            .connections_active
327            .with_label_values(&[protocol])
328            .set(count);
329    }
330
331    /// Increment active connections for a protocol.
332    pub fn inc_connections(&self, protocol: &str) {
333        self.inner
334            .connections_active
335            .with_label_values(&[protocol])
336            .inc();
337    }
338
339    /// Decrement active connections for a protocol.
340    pub fn dec_connections(&self, protocol: &str) {
341        self.inner
342            .connections_active
343            .with_label_values(&[protocol])
344            .dec();
345    }
346
347    /// Set total data points.
348    pub fn set_points_total(&self, count: i64) {
349        self.inner.points_total.set(count);
350    }
351
352    /// Set data points for a specific device.
353    pub fn set_device_points(&self, protocol: &str, device_id: &str, count: i64) {
354        self.inner
355            .points_by_device
356            .with_label_values(&[protocol, device_id])
357            .set(count);
358    }
359
360    /// Remove device points metrics (when device is removed).
361    pub fn remove_device_points(&self, protocol: &str, device_id: &str) {
362        // Set to 0 to indicate removal (Prometheus doesn't support metric deletion)
363        self.inner
364            .points_by_device
365            .with_label_values(&[protocol, device_id])
366            .set(0);
367    }
368
369    // ========================================================================
370    // System Metrics
371    // ========================================================================
372
373    /// Set memory usage in bytes.
374    pub fn set_memory_bytes(&self, bytes: i64) {
375        self.inner.memory_bytes.set(bytes);
376    }
377
378    /// Set CPU usage percentage (0-100).
379    pub fn set_cpu_percent(&self, percent: f64) {
380        self.inner.cpu_percent.set(percent.clamp(0.0, 100.0));
381    }
382
383    /// Update system metrics (memory and CPU).
384    ///
385    /// This is a convenience method that updates both memory and CPU metrics.
386    pub fn update_system_metrics(&self, memory_bytes: i64, cpu_percent: f64) {
387        self.set_memory_bytes(memory_bytes);
388        self.set_cpu_percent(cpu_percent);
389    }
390
391    // ========================================================================
392    // Getters & Export
393    // ========================================================================
394
395    /// Get uptime since the metrics collector was created.
396    pub fn uptime(&self) -> Duration {
397        self.inner.start_time.elapsed()
398    }
399
400    /// Get the current tick rate (ticks per second).
401    pub fn tick_rate(&self) -> f64 {
402        self.inner.tick_rate.get()
403    }
404
405    /// Get a snapshot of current metrics.
406    pub fn snapshot(&self) -> MetricsSnapshot {
407        MetricsSnapshot {
408            uptime_secs: self.uptime().as_secs(),
409            devices_active: self.inner.devices_active.get() as u64,
410            points_total: self.inner.points_total.get() as u64,
411            memory_bytes: self.inner.memory_bytes.get() as u64,
412            cpu_percent: self.inner.cpu_percent.get(),
413            ticks_total: self.inner.ticks_total.get(),
414            tick_rate: self.inner.tick_rate.get(),
415        }
416    }
417
418    /// Get a detailed snapshot with additional information.
419    pub fn detailed_snapshot(&self) -> DetailedMetricsSnapshot {
420        DetailedMetricsSnapshot {
421            basic: self.snapshot(),
422            start_time_unix: std::time::SystemTime::now()
423                .duration_since(std::time::UNIX_EPOCH)
424                .map(|d| d.as_secs() - self.uptime().as_secs())
425                .unwrap_or(0),
426        }
427    }
428
429    /// Export metrics in Prometheus text format.
430    pub fn export_prometheus(&self) -> String {
431        let encoder = TextEncoder::new();
432        let metric_families = self.registry.gather();
433        let mut buffer = Vec::new();
434        encoder.encode(&metric_families, &mut buffer).unwrap();
435        String::from_utf8(buffer).unwrap()
436    }
437
438    /// Export metrics from the global registry.
439    pub fn export_global_prometheus() -> String {
440        let encoder = TextEncoder::new();
441        let metric_families = GLOBAL_REGISTRY.gather();
442        let mut buffer = Vec::new();
443        encoder.encode(&metric_families, &mut buffer).unwrap();
444        String::from_utf8(buffer).unwrap()
445    }
446}
447
448impl Default for MetricsCollector {
449    fn default() -> Self {
450        Self::new()
451    }
452}
453
454impl MetricsInner {
455    fn new(registry: &Registry) -> Self {
456        let now = Instant::now();
457
458        // ====================================================================
459        // Counters
460        // ====================================================================
461
462        let requests_total = IntCounterVec::new(
463            Opts::new(
464                format!("{}_requests_total", METRIC_PREFIX),
465                "Total number of protocol requests",
466            ),
467            &["protocol", "operation"],
468        )
469        .unwrap();
470        registry.register(Box::new(requests_total.clone())).unwrap();
471
472        let messages_total = IntCounterVec::new(
473            Opts::new(
474                format!("{}_messages_total", METRIC_PREFIX),
475                "Total messages processed",
476            ),
477            &["protocol", "direction"],
478        )
479        .unwrap();
480        registry.register(Box::new(messages_total.clone())).unwrap();
481
482        let reads_total = IntCounterVec::new(
483            Opts::new(
484                format!("{}_reads_total", METRIC_PREFIX),
485                "Total read operations",
486            ),
487            &["protocol", "status"],
488        )
489        .unwrap();
490        registry.register(Box::new(reads_total.clone())).unwrap();
491
492        let writes_total = IntCounterVec::new(
493            Opts::new(
494                format!("{}_writes_total", METRIC_PREFIX),
495                "Total write operations",
496            ),
497            &["protocol", "status"],
498        )
499        .unwrap();
500        registry.register(Box::new(writes_total.clone())).unwrap();
501
502        let errors_total = IntCounterVec::new(
503            Opts::new(
504                format!("{}_errors_total", METRIC_PREFIX),
505                "Total errors by protocol and type",
506            ),
507            &["protocol", "error_type"],
508        )
509        .unwrap();
510        registry.register(Box::new(errors_total.clone())).unwrap();
511
512        let ticks_total = IntCounter::new(
513            format!("{}_ticks_total", METRIC_PREFIX),
514            "Total engine ticks processed",
515        )
516        .unwrap();
517        registry.register(Box::new(ticks_total.clone())).unwrap();
518
519        let events_total = IntCounterVec::new(
520            Opts::new(
521                format!("{}_events_total", METRIC_PREFIX),
522                "Total events by type",
523            ),
524            &["event_type"],
525        )
526        .unwrap();
527        registry.register(Box::new(events_total.clone())).unwrap();
528
529        // ====================================================================
530        // Gauges
531        // ====================================================================
532
533        let devices_active = IntGauge::new(
534            format!("{}_devices_active", METRIC_PREFIX),
535            "Number of active simulated devices",
536        )
537        .unwrap();
538        registry.register(Box::new(devices_active.clone())).unwrap();
539
540        let connections_active = IntGaugeVec::new(
541            Opts::new(
542                format!("{}_connections_active", METRIC_PREFIX),
543                "Number of active connections per protocol",
544            ),
545            &["protocol"],
546        )
547        .unwrap();
548        registry
549            .register(Box::new(connections_active.clone()))
550            .unwrap();
551
552        let points_total = IntGauge::new(
553            format!("{}_data_points_total", METRIC_PREFIX),
554            "Total number of data points across all devices",
555        )
556        .unwrap();
557        registry.register(Box::new(points_total.clone())).unwrap();
558
559        let points_by_device = IntGaugeVec::new(
560            Opts::new(
561                format!("{}_data_points_by_device", METRIC_PREFIX),
562                "Number of data points per device",
563            ),
564            &["protocol", "device_id"],
565        )
566        .unwrap();
567        registry
568            .register(Box::new(points_by_device.clone()))
569            .unwrap();
570
571        let memory_bytes = IntGauge::new(
572            format!("{}_memory_usage_bytes", METRIC_PREFIX),
573            "Current memory usage in bytes",
574        )
575        .unwrap();
576        registry.register(Box::new(memory_bytes.clone())).unwrap();
577
578        let cpu_percent = Gauge::new(
579            format!("{}_cpu_usage_percent", METRIC_PREFIX),
580            "Current CPU usage percentage (0-100)",
581        )
582        .unwrap();
583        registry.register(Box::new(cpu_percent.clone())).unwrap();
584
585        let tick_rate = Gauge::new(
586            format!("{}_tick_rate", METRIC_PREFIX),
587            "Current tick rate (ticks per second)",
588        )
589        .unwrap();
590        registry.register(Box::new(tick_rate.clone())).unwrap();
591
592        // ====================================================================
593        // Histograms
594        // ====================================================================
595
596        let request_duration = HistogramVec::new(
597            HistogramOpts::new(
598                format!("{}_request_duration_seconds", METRIC_PREFIX),
599                "Request processing duration in seconds",
600            )
601            .buckets(LATENCY_BUCKETS.to_vec()),
602            &["protocol", "operation"],
603        )
604        .unwrap();
605        registry
606            .register(Box::new(request_duration.clone()))
607            .unwrap();
608
609        let message_latency = HistogramVec::new(
610            HistogramOpts::new(
611                format!("{}_message_latency_seconds", METRIC_PREFIX),
612                "Message latency in seconds",
613            )
614            .buckets(LATENCY_BUCKETS.to_vec()),
615            &["protocol"],
616        )
617        .unwrap();
618        registry
619            .register(Box::new(message_latency.clone()))
620            .unwrap();
621
622        let tick_duration = Histogram::with_opts(
623            HistogramOpts::new(
624                format!("{}_tick_duration_seconds", METRIC_PREFIX),
625                "Engine tick processing duration in seconds",
626            )
627            .buckets(TICK_BUCKETS.to_vec()),
628        )
629        .unwrap();
630        registry.register(Box::new(tick_duration.clone())).unwrap();
631
632        let read_duration = HistogramVec::new(
633            HistogramOpts::new(
634                format!("{}_read_duration_seconds", METRIC_PREFIX),
635                "Read operation duration in seconds",
636            )
637            .buckets(LATENCY_BUCKETS.to_vec()),
638            &["protocol"],
639        )
640        .unwrap();
641        registry.register(Box::new(read_duration.clone())).unwrap();
642
643        let write_duration = HistogramVec::new(
644            HistogramOpts::new(
645                format!("{}_write_duration_seconds", METRIC_PREFIX),
646                "Write operation duration in seconds",
647            )
648            .buckets(LATENCY_BUCKETS.to_vec()),
649            &["protocol"],
650        )
651        .unwrap();
652        registry.register(Box::new(write_duration.clone())).unwrap();
653
654        Self {
655            // Counters
656            requests_total,
657            messages_total,
658            reads_total,
659            writes_total,
660            errors_total,
661            ticks_total,
662            events_total,
663
664            // Gauges
665            devices_active,
666            connections_active,
667            points_total,
668            points_by_device,
669            memory_bytes,
670            cpu_percent,
671            tick_rate,
672
673            // Histograms
674            request_duration,
675            message_latency,
676            tick_duration,
677            read_duration,
678            write_duration,
679
680            // Internal tracking
681            start_time: now,
682            last_tick_time: RwLock::new(now),
683            tick_count_for_rate: AtomicU64::new(0),
684        }
685    }
686}
687
688// ============================================================================
689// Snapshot Types
690// ============================================================================
691
/// Snapshot of current metrics.
///
/// A plain, serializable copy of selected gauge and counter values, as
/// produced by `MetricsCollector::snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    /// Uptime in whole seconds.
    pub uptime_secs: u64,
    /// Number of active devices.
    pub devices_active: u64,
    /// Total data points across all devices.
    pub points_total: u64,
    /// Memory usage in bytes.
    pub memory_bytes: u64,
    /// CPU usage percentage (0-100).
    pub cpu_percent: f64,
    /// Total engine ticks processed.
    pub ticks_total: u64,
    /// Current tick rate (ticks per second).
    pub tick_rate: f64,
}
710
/// Detailed metrics snapshot with additional information.
///
/// Produced by `MetricsCollector::detailed_snapshot`. When serialized, the
/// fields of `basic` are flattened into the same level as `start_time_unix`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DetailedMetricsSnapshot {
    /// Basic metrics snapshot.
    #[serde(flatten)]
    pub basic: MetricsSnapshot,
    /// Unix timestamp (seconds) when the metrics collection started.
    pub start_time_unix: u64,
}
720
721// ============================================================================
722// Latency Statistics
723// ============================================================================
724
/// Latency statistics with percentile calculations.
///
/// All latency values are in whole microseconds. The `Default` value (all
/// zeros) is what `LatencyStats::from_samples` returns for an empty input.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct LatencyStats {
    /// Minimum latency in microseconds.
    pub min_us: u64,
    /// Maximum latency in microseconds.
    pub max_us: u64,
    /// Average latency in microseconds (integer division, rounded down).
    pub avg_us: u64,
    /// Median (P50) latency in microseconds.
    pub p50_us: u64,
    /// P90 latency in microseconds.
    pub p90_us: u64,
    /// P95 latency in microseconds.
    pub p95_us: u64,
    /// P99 latency in microseconds.
    pub p99_us: u64,
    /// Sample count.
    pub count: u64,
    /// Standard deviation in microseconds (rounded down).
    pub stddev_us: u64,
}
747
748impl LatencyStats {
749    /// Calculate statistics from duration samples.
750    pub fn from_samples(samples: &[Duration]) -> Self {
751        if samples.is_empty() {
752            return Self::default();
753        }
754
755        let mut sorted: Vec<u64> = samples.iter().map(|d| d.as_micros() as u64).collect();
756        sorted.sort_unstable();
757
758        let count = sorted.len();
759        let sum: u64 = sorted.iter().sum();
760        let avg = sum / count as u64;
761
762        // Calculate standard deviation
763        let variance: u64 = sorted
764            .iter()
765            .map(|&x| {
766                let diff = x.abs_diff(avg);
767                diff * diff
768            })
769            .sum::<u64>()
770            / count as u64;
771        let stddev = (variance as f64).sqrt() as u64;
772
773        Self {
774            min_us: sorted[0],
775            max_us: sorted[count - 1],
776            avg_us: avg,
777            p50_us: Self::percentile(&sorted, 50),
778            p90_us: Self::percentile(&sorted, 90),
779            p95_us: Self::percentile(&sorted, 95),
780            p99_us: Self::percentile(&sorted, 99),
781            count: count as u64,
782            stddev_us: stddev,
783        }
784    }
785
786    /// Calculate a specific percentile from sorted samples.
787    fn percentile(sorted: &[u64], p: usize) -> u64 {
788        if sorted.is_empty() {
789            return 0;
790        }
791        let idx = (sorted.len() * p / 100).min(sorted.len() - 1);
792        sorted[idx]
793    }
794
795    /// Check if the latency statistics indicate a performance issue.
796    ///
797    /// Returns true if P99 latency exceeds the threshold.
798    pub fn is_latency_high(&self, threshold_us: u64) -> bool {
799        self.p99_us > threshold_us
800    }
801}
802
803// ============================================================================
804// Timer Utilities
805// ============================================================================
806
/// Minimal stopwatch for measuring how long an operation takes.
///
/// Nothing is recorded anywhere: the caller reads the elapsed time and decides
/// what to do with it. For automatic recording to Prometheus, use
/// `RequestTimer` instead.
pub struct Timer {
    start: Instant,
}

impl Timer {
    /// Create a timer whose reference point is the current instant.
    pub fn start() -> Self {
        let start = Instant::now();
        Self { start }
    }

    /// Time passed since the timer was started (or last reset); the timer
    /// keeps running.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Consume the timer and return the time passed since it was started
    /// (or last reset).
    pub fn stop(self) -> Duration {
        self.start.elapsed()
    }

    /// Move the timer's reference point to the current instant.
    pub fn reset(&mut self) {
        let now = Instant::now();
        self.start = now;
    }
}

impl Default for Timer {
    /// Same as [`Timer::start`].
    fn default() -> Self {
        Timer::start()
    }
}
844
845/// Request timer that automatically records duration to Prometheus when dropped.
846///
847/// This implements the RAII pattern for request timing.
848/// The duration is recorded when the timer goes out of scope.
849///
850/// # Example
851///
852/// ```rust,ignore
853/// let _timer = metrics.time_request("modbus", "read");
854/// // ... perform the operation ...
855/// // duration is automatically recorded when _timer is dropped
856/// ```
857pub struct RequestTimer {
858    histogram: Histogram,
859    start: Instant,
860    recorded: bool,
861}
862
863impl RequestTimer {
864    /// Create a new request timer.
865    pub fn new(histogram: Histogram) -> Self {
866        Self {
867            histogram,
868            start: Instant::now(),
869            recorded: false,
870        }
871    }
872
873    /// Get elapsed duration without recording.
874    pub fn elapsed(&self) -> Duration {
875        self.start.elapsed()
876    }
877
878    /// Manually record the duration and mark as recorded.
879    ///
880    /// This prevents the Drop implementation from recording again.
881    pub fn record(mut self) -> Duration {
882        let elapsed = self.elapsed();
883        self.histogram.observe(elapsed.as_secs_f64());
884        self.recorded = true;
885        elapsed
886    }
887
888    /// Discard the timer without recording.
889    ///
890    /// Use this if you want to cancel timing (e.g., on error).
891    pub fn discard(mut self) {
892        self.recorded = true; // Prevent recording in Drop
893    }
894}
895
896impl Drop for RequestTimer {
897    fn drop(&mut self) {
898        if !self.recorded {
899            self.histogram.observe(self.start.elapsed().as_secs_f64());
900        }
901    }
902}
903
904// ============================================================================
905// System Metrics Collector
906// ============================================================================
907
/// Periodic sampler for process-level system metrics.
///
/// Pairs a [`MetricsCollector`] with a sampling interval; the samples taken
/// by the background task are written into that collector.
#[cfg(feature = "system-metrics")]
pub struct SystemMetricsCollector {
    /// Collector that receives the sampled values.
    metrics: MetricsCollector,
    /// Time between two consecutive samples.
    interval: Duration,
}
917
#[cfg(feature = "system-metrics")]
impl SystemMetricsCollector {
    /// Build a collector that reports into `metrics` once per `interval`.
    pub fn new(metrics: MetricsCollector, interval: Duration) -> Self {
        Self { metrics, interval }
    }

    /// Spawn a Tokio task that samples system metrics on every interval tick.
    ///
    /// The sampling loop never finishes on its own. The returned
    /// [`tokio::task::JoinHandle`] belongs to the spawned task and can be
    /// used to stop collection.
    pub fn start(self) -> tokio::task::JoinHandle<()> {
        let sampler = async move {
            let mut ticker = tokio::time::interval(self.interval);

            loop {
                ticker.tick().await;
                self.collect_once();
            }
        };

        tokio::spawn(sampler)
    }

    /// Take a single sample of the system metrics.
    ///
    /// On Linux the resident set size is read from the `VmRSS:` line of
    /// `/proc/self/status` and stored, converted to bytes, through
    /// `set_memory_bytes`. A missing file, missing line or unparsable value
    /// is silently ignored. On other targets nothing is collected.
    fn collect_once(&self) {
        // Memory usage (process RSS)
        #[cfg(target_os = "linux")]
        {
            let rss_kb = std::fs::read_to_string("/proc/self/status")
                .ok()
                .and_then(|status| {
                    status
                        .lines()
                        .find(|line| line.starts_with("VmRSS:"))
                        .and_then(|line| line.split_whitespace().nth(1))
                        .and_then(|field| field.parse::<i64>().ok())
                });

            if let Some(kb) = rss_kb {
                self.metrics.set_memory_bytes(kb * 1024);
            }
        }

        // CPU usage is not sampled here: it would need the previous sample
        // to compute a delta, so automatic CPU collection is skipped for now.
    }
}
957
958// ============================================================================
959// Macros
960// ============================================================================
961
/// Time a block of code as a request and hand back the block's value.
///
/// Expands to a call to `$metrics.time_request($protocol, $operation)`, keeps
/// the returned timer alive while `$block` is evaluated, and yields the
/// value of `$block`.
///
/// # Example
///
/// ```rust,ignore
/// let result = measure_request!(metrics, "modbus", "read", {
///     device.read_registers(0, 10).await
/// });
/// ```
#[macro_export]
macro_rules! measure_request {
    ($metrics:expr, $protocol:expr, $operation:expr, $block:expr) => {{
        let _request_timer = $metrics.time_request($protocol, $operation);
        // Bind the value before yielding it so that temporaries created by
        // `$block` are released ahead of the timer guard.
        let outcome = $block;
        outcome
    }};
}
981
/// Classify an error and record it on the metrics object.
///
/// The error is first turned into a category label by
/// `$crate::metrics::classify_error`; that label is then passed to
/// `$metrics.record_error` together with `$protocol`.
///
/// # Example
///
/// ```rust,ignore
/// record_error!(metrics, "modbus", err);
/// ```
#[macro_export]
macro_rules! record_error {
    ($metrics:expr, $protocol:expr, $error:expr) => {{
        let category = $crate::metrics::classify_error(&$error);
        $metrics.record_error($protocol, category);
    }};
}
998
/// Classify an error into a category for metrics.
///
/// The error's `Display` text is lower-cased and searched for keywords. The
/// first matching rule wins, in this order:
///
/// 1. `"timeout"` — contains "timeout" or "timed out" (the wording used by
///    operating-system and `std::io` timeout errors, e.g.
///    "Connection timed out")
/// 2. `"connection"` — contains "connect" (which also covers "connection")
/// 3. `"protocol"` — contains "protocol" or "invalid"
/// 4. `"io"` — contains "i/o", or "io" as a separate word at the start, in
///    the middle or at the end of the message
/// 5. `"not_found"` — contains "not found"
/// 6. `"permission"` — contains "permission" or "unauthorized"
/// 7. `"unknown"` — anything else
///
/// Returns a string suitable for use as a metric label.
pub fn classify_error<E: std::fmt::Display>(error: &E) -> &'static str {
    let message = error.to_string().to_lowercase();
    let has = |needle: &str| message.contains(needle);

    // Timeouts are checked first so that "connection timed out" is reported
    // as a timeout rather than as a generic connection failure.
    if has("timeout") || has("timed out") {
        "timeout"
    } else if has("connect") {
        "connection"
    } else if has("protocol") || has("invalid") {
        "protocol"
    } else if has(" io ") || has("i/o") || message.starts_with("io ") || message.ends_with(" io") {
        "io"
    } else if has("not found") {
        "not_found"
    } else if has("permission") || has("unauthorized") {
        "permission"
    } else {
        "unknown"
    }
}
1021
#[cfg(test)]
mod tests {
    use super::*;

    // ========================================================================
    // MetricsCollector Tests
    // ========================================================================

    #[test]
    fn test_metrics_collector_new() {
        let collector = MetricsCollector::new();
        assert!(collector.uptime() >= Duration::ZERO);
    }

    #[test]
    fn test_metrics_collector_global() {
        let first = MetricsCollector::global();
        let second = MetricsCollector::global();
        // Two lookups must yield the very same instance.
        assert!(std::ptr::eq(first, second));
    }

    #[test]
    fn test_metrics_collector_basic_operations() {
        let collector = MetricsCollector::new();

        // Messages in both directions
        collector.record_message("modbus", "rx");
        collector.record_message("modbus", "tx");

        // One successful and one failed read
        collector.record_read("modbus", true, Duration::from_micros(100));
        collector.record_read("modbus", false, Duration::from_micros(200));

        // A successful write on another protocol
        collector.record_write("opcua", true, Duration::from_micros(50));

        // Active device gauge
        collector.set_devices_active(10);

        let snap = collector.snapshot();
        assert_eq!(snap.devices_active, 10);
    }

    #[test]
    fn test_metrics_collector_request_timing() {
        let collector = MetricsCollector::new();

        // Explicit request count and duration
        collector.record_request("modbus", "read");
        collector.record_request_duration("modbus", "read", Duration::from_micros(150));

        // Guard-based timing: the timer records when the scope ends
        {
            let _guard = collector.time_request("bacnet", "subscribe");
            std::thread::sleep(Duration::from_millis(5));
        }
    }

    #[test]
    fn test_metrics_collector_error_recording() {
        let collector = MetricsCollector::new();

        collector.record_error("modbus", "timeout");
        collector.record_error("modbus", "connection");
        collector.record_error("opcua", "protocol");
    }

    #[test]
    fn test_metrics_collector_tick_recording() {
        let collector = MetricsCollector::new();

        // Five ticks of 10, 20, 30, 40 and 50 ms
        for step in 1..=5 {
            collector.record_tick(Duration::from_millis(10 * step));
        }

        let snap = collector.snapshot();
        assert_eq!(snap.ticks_total, 5);
    }

    #[test]
    fn test_metrics_collector_connections() {
        let collector = MetricsCollector::new();

        collector.set_connections_active("modbus", 5);
        collector.inc_connections("modbus");
        collector.dec_connections("modbus");
    }

    #[test]
    fn test_metrics_collector_device_points() {
        let collector = MetricsCollector::new();

        collector.set_points_total(1000);
        collector.set_device_points("modbus", "device-001", 50);
        collector.set_device_points("modbus", "device-002", 75);

        let snap = collector.snapshot();
        assert_eq!(snap.points_total, 1000);

        // Removing a per-device series must not fail
        collector.remove_device_points("modbus", "device-001");
    }

    #[test]
    fn test_metrics_collector_system_metrics() {
        let collector = MetricsCollector::new();

        collector.set_memory_bytes(1024 * 1024 * 512); // 512 MB
        collector.set_cpu_percent(45.5);

        let snap = collector.snapshot();
        assert_eq!(snap.memory_bytes, 1024 * 1024 * 512);
        assert!((snap.cpu_percent - 45.5).abs() < 0.001);

        // Both values through the convenience method
        collector.update_system_metrics(1024 * 1024 * 256, 30.0);
        let snap = collector.snapshot();
        assert_eq!(snap.memory_bytes, 1024 * 1024 * 256);
        assert!((snap.cpu_percent - 30.0).abs() < 0.001);

        // Values above 100 are expected to be clamped to 100
        collector.set_cpu_percent(150.0);
        let snap = collector.snapshot();
        assert!((snap.cpu_percent - 100.0).abs() < 0.001);

        // Values below 0 are expected to be clamped to 0
        collector.set_cpu_percent(-10.0);
        let snap = collector.snapshot();
        assert!(snap.cpu_percent.abs() < 0.001);
    }

    #[test]
    fn test_metrics_collector_prometheus_export() {
        let collector = MetricsCollector::new();

        collector.record_read("modbus", true, Duration::from_micros(100));
        collector.set_devices_active(5);

        let exported = collector.export_prometheus();
        assert!(!exported.is_empty());
        assert!(exported.contains(METRIC_PREFIX));
        assert!(exported.contains("devices_active"));
    }

    #[test]
    fn test_metrics_collector_event_recording() {
        let collector = MetricsCollector::new();

        collector.record_event("device_added");
        collector.record_event("device_removed");
        collector.record_event("engine_started");
    }

    #[test]
    fn test_metrics_collector_snapshot() {
        let collector = MetricsCollector::new();

        collector.set_devices_active(10);
        collector.set_points_total(500);
        collector.set_memory_bytes(1024 * 1024);
        collector.set_cpu_percent(25.0);

        let snap = collector.snapshot();
        assert_eq!(snap.devices_active, 10);
        assert_eq!(snap.points_total, 500);
        assert_eq!(snap.memory_bytes, 1024 * 1024);
        assert!((snap.cpu_percent - 25.0).abs() < 0.001);

        let detailed_snap = collector.detailed_snapshot();
        assert_eq!(detailed_snap.basic.devices_active, 10);
        assert!(detailed_snap.start_time_unix > 0);
    }

    // ========================================================================
    // LatencyStats Tests
    // ========================================================================

    #[test]
    fn test_latency_stats_from_samples() {
        let samples: Vec<Duration> = [100u64, 200, 300, 400, 500]
            .iter()
            .copied()
            .map(Duration::from_micros)
            .collect();

        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.min_us, 100);
        assert_eq!(stats.max_us, 500);
        assert_eq!(stats.avg_us, 300);
        assert_eq!(stats.count, 5);
        assert!((200..=300).contains(&stats.p50_us));
    }

    #[test]
    fn test_latency_stats_empty() {
        let samples: Vec<Duration> = Vec::new();
        let stats = LatencyStats::from_samples(&samples);

        assert_eq!(stats.min_us, 0);
        assert_eq!(stats.max_us, 0);
        assert_eq!(stats.avg_us, 0);
        assert_eq!(stats.count, 0);
    }

    #[test]
    fn test_latency_stats_single_sample() {
        let samples = vec![Duration::from_micros(150)];
        let stats = LatencyStats::from_samples(&samples);

        assert_eq!(stats.min_us, 150);
        assert_eq!(stats.max_us, 150);
        assert_eq!(stats.avg_us, 150);
        assert_eq!(stats.count, 1);
    }

    #[test]
    fn test_latency_stats_percentiles() {
        // 100 samples (1 us ..= 100 us) for percentile checks
        let samples: Vec<Duration> = (1..=100).map(Duration::from_micros).collect();

        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.min_us, 1);
        assert_eq!(stats.max_us, 100);
        // Each percentile is accepted within a one-microsecond window, since
        // the exact value may vary slightly with the rounding used.
        assert!((50..=51).contains(&stats.p50_us));
        assert!((90..=91).contains(&stats.p90_us));
        assert!((95..=96).contains(&stats.p95_us));
        assert!((99..=100).contains(&stats.p99_us));
    }

    #[test]
    fn test_latency_stats_is_latency_high() {
        let samples: Vec<Duration> = (1..=100).map(Duration::from_micros).collect();
        let stats = LatencyStats::from_samples(&samples);

        // A threshold of 50 is reported as exceeded, a threshold of 100 is not.
        assert!(stats.is_latency_high(50));
        assert!(!stats.is_latency_high(100));
    }

    #[test]
    fn test_latency_stats_stddev() {
        let samples: Vec<Duration> = vec![Duration::from_micros(100); 3];
        let stats = LatencyStats::from_samples(&samples);
        // Identical samples have no deviation
        assert_eq!(stats.stddev_us, 0);
    }

    // ========================================================================
    // Timer Tests
    // ========================================================================

    #[test]
    fn test_timer_start_stop() {
        let stopwatch = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        let measured = stopwatch.stop();
        assert!(measured >= Duration::from_millis(10));
    }

    #[test]
    fn test_timer_elapsed() {
        let stopwatch = Timer::start();
        std::thread::sleep(Duration::from_millis(5));
        let earlier = stopwatch.elapsed();
        std::thread::sleep(Duration::from_millis(5));
        let later = stopwatch.elapsed();
        assert!(later > earlier);
    }

    #[test]
    fn test_timer_reset() {
        let mut stopwatch = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        stopwatch.reset();
        let after_reset = stopwatch.elapsed();
        assert!(after_reset < Duration::from_millis(10));
    }

    #[test]
    fn test_timer_default() {
        let stopwatch = Timer::default();
        assert!(stopwatch.elapsed() >= Duration::ZERO);
    }

    // ========================================================================
    // RequestTimer Tests
    // ========================================================================

    #[test]
    fn test_request_timer_drop_records() {
        let collector = MetricsCollector::new();
        {
            let _guard = collector.time_request("test_proto", "test_op");
            std::thread::sleep(Duration::from_millis(5));
        }
        // The guard recorded on drop; the export must now mention the
        // request duration metric.
        let exported = collector.export_prometheus();
        let metric_name = format!("{}_request_duration_seconds", METRIC_PREFIX);
        assert!(exported.contains(&metric_name));
    }

    #[test]
    fn test_request_timer_manual_record() {
        let collector = MetricsCollector::new();
        let guard = collector.time_request("test_proto", "manual_test");
        std::thread::sleep(Duration::from_millis(5));
        let measured = guard.record();
        assert!(measured >= Duration::from_millis(5));
    }

    #[test]
    fn test_request_timer_discard() {
        let collector = MetricsCollector::new();
        let guard = collector.time_request("test_proto", "discard_test");
        // Discarding consumes the timer without recording
        guard.discard();
    }

    // ========================================================================
    // Utility Function Tests
    // ========================================================================

    #[test]
    fn test_classify_error() {
        let cases = [
            ("Connection timeout", "timeout"),
            ("Connection refused", "connection"),
            ("I/O error occurred", "io"),
            ("Protocol violation", "protocol"),
            ("Invalid message format", "protocol"),
            ("Device not found", "not_found"),
            ("Permission denied", "permission"),
            ("Some random error", "unknown"),
        ];

        for (message, expected) in cases.iter() {
            assert_eq!(classify_error(message), *expected);
        }
    }

    // ========================================================================
    // Constants Tests
    // ========================================================================

    #[test]
    fn test_metric_prefix() {
        assert_eq!(METRIC_PREFIX, "mabi");
    }

    #[test]
    fn test_latency_buckets() {
        assert!(!LATENCY_BUCKETS.is_empty());
        // Every bucket bound must be strictly greater than the previous one
        assert!(LATENCY_BUCKETS.windows(2).all(|pair| pair[1] > pair[0]));
    }

    #[test]
    fn test_tick_buckets() {
        assert!(!TICK_BUCKETS.is_empty());
        // Every bucket bound must be strictly greater than the previous one
        assert!(TICK_BUCKETS.windows(2).all(|pair| pair[1] > pair[0]));
    }
}