1use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use once_cell::sync::Lazy;
30use parking_lot::RwLock;
31use prometheus::{
32 Encoder, Gauge, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
33 IntGaugeVec, Opts, Registry, TextEncoder,
34};
35use serde::{Deserialize, Serialize};
36
/// Prefix prepended (followed by `_`) to the name of every metric this module registers.
pub const METRIC_PREFIX: &str = "mabi";

/// Histogram bucket upper bounds, in seconds, for request, message, read and write
/// latencies: from 100 µs up to 10 s.
pub const LATENCY_BUCKETS: &[f64] = &[
    // sub-millisecond
    0.0001, 0.0005,
    // milliseconds
    0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5,
    // seconds
    1.0, 2.5, 5.0, 10.0,
];

/// Histogram bucket upper bounds, in seconds, for engine tick durations:
/// from 100 µs up to 100 ms.
pub const TICK_BUCKETS: &[f64] = &[
    0.0001, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1,
];
53
54pub static GLOBAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
61
62static GLOBAL_METRICS: Lazy<MetricsCollector> = Lazy::new(|| {
64 MetricsCollector::with_registry(GLOBAL_REGISTRY.clone())
65});
66
67#[derive(Clone)]
79pub struct MetricsCollector {
80 registry: Arc<Registry>,
81 inner: Arc<MetricsInner>,
82}
83
84struct MetricsInner {
86 requests_total: IntCounterVec,
89 reads_total: IntCounterVec,
91 writes_total: IntCounterVec,
93 errors_total: IntCounterVec,
95 ticks_total: IntCounter,
97 messages_total: IntCounterVec,
99 events_total: IntCounterVec,
101
102 devices_active: IntGauge,
105 connections_active: IntGaugeVec,
107 points_total: IntGauge,
109 points_by_device: IntGaugeVec,
111 memory_bytes: IntGauge,
113 cpu_percent: Gauge,
115 tick_rate: Gauge,
117
118 request_duration: HistogramVec,
121 message_latency: HistogramVec,
123 tick_duration: Histogram,
125 read_duration: HistogramVec,
127 write_duration: HistogramVec,
129
130 start_time: Instant,
132 last_tick_time: RwLock<Instant>,
133 tick_count_for_rate: AtomicU64,
134}
135
136impl MetricsCollector {
137 pub fn new() -> Self {
139 let registry = Registry::new();
140 Self::with_registry(registry)
141 }
142
143 pub fn global() -> &'static Self {
148 &GLOBAL_METRICS
149 }
150
151 pub fn with_registry(registry: Registry) -> Self {
153 let inner = MetricsInner::new(®istry);
154
155 Self {
156 registry: Arc::new(registry),
157 inner: Arc::new(inner),
158 }
159 }
160
161 pub fn registry(&self) -> &Registry {
163 &self.registry
164 }
165
166 pub fn record_request(&self, protocol: &str, operation: &str) {
175 self.inner
176 .requests_total
177 .with_label_values(&[protocol, operation])
178 .inc();
179 }
180
181 pub fn record_request_duration(&self, protocol: &str, operation: &str, duration: Duration) {
183 self.inner
184 .request_duration
185 .with_label_values(&[protocol, operation])
186 .observe(duration.as_secs_f64());
187 }
188
189 pub fn time_request(&self, protocol: &str, operation: &str) -> RequestTimer {
201 self.record_request(protocol, operation);
202 RequestTimer::new(
203 self.inner
204 .request_duration
205 .with_label_values(&[protocol, operation]),
206 )
207 }
208
209 pub fn record_message(&self, protocol: &str, direction: &str) {
211 self.inner
212 .messages_total
213 .with_label_values(&[protocol, direction])
214 .inc();
215 }
216
217 pub fn record_read(&self, protocol: &str, success: bool, duration: Duration) {
219 let status = if success { "success" } else { "error" };
220 self.inner
221 .reads_total
222 .with_label_values(&[protocol, status])
223 .inc();
224 self.inner
225 .read_duration
226 .with_label_values(&[protocol])
227 .observe(duration.as_secs_f64());
228
229 self.inner
231 .requests_total
232 .with_label_values(&[protocol, "read"])
233 .inc();
234 self.inner
235 .request_duration
236 .with_label_values(&[protocol, "read"])
237 .observe(duration.as_secs_f64());
238 }
239
240 pub fn record_write(&self, protocol: &str, success: bool, duration: Duration) {
242 let status = if success { "success" } else { "error" };
243 self.inner
244 .writes_total
245 .with_label_values(&[protocol, status])
246 .inc();
247 self.inner
248 .write_duration
249 .with_label_values(&[protocol])
250 .observe(duration.as_secs_f64());
251
252 self.inner
254 .requests_total
255 .with_label_values(&[protocol, "write"])
256 .inc();
257 self.inner
258 .request_duration
259 .with_label_values(&[protocol, "write"])
260 .observe(duration.as_secs_f64());
261 }
262
263 pub fn record_error(&self, protocol: &str, error_type: &str) {
269 self.inner
270 .errors_total
271 .with_label_values(&[protocol, error_type])
272 .inc();
273 }
274
275 pub fn record_tick(&self, duration: Duration) {
281 self.inner.ticks_total.inc();
282 self.inner.tick_duration.observe(duration.as_secs_f64());
283
284 let count = self.inner.tick_count_for_rate.fetch_add(1, Ordering::Relaxed) + 1;
286 let mut last_time = self.inner.last_tick_time.write();
287 let elapsed = last_time.elapsed();
288
289 if elapsed >= Duration::from_secs(1) {
291 let rate = count as f64 / elapsed.as_secs_f64();
292 self.inner.tick_rate.set(rate);
293 self.inner.tick_count_for_rate.store(0, Ordering::Relaxed);
294 *last_time = Instant::now();
295 }
296 }
297
298 pub fn record_latency(&self, protocol: &str, latency: Duration) {
300 self.inner
301 .message_latency
302 .with_label_values(&[protocol])
303 .observe(latency.as_secs_f64());
304 }
305
306 pub fn record_event(&self, event_type: &str) {
308 self.inner
309 .events_total
310 .with_label_values(&[event_type])
311 .inc();
312 }
313
314 pub fn set_devices_active(&self, count: i64) {
320 self.inner.devices_active.set(count);
321 }
322
323 pub fn set_connections_active(&self, protocol: &str, count: i64) {
325 self.inner
326 .connections_active
327 .with_label_values(&[protocol])
328 .set(count);
329 }
330
331 pub fn inc_connections(&self, protocol: &str) {
333 self.inner
334 .connections_active
335 .with_label_values(&[protocol])
336 .inc();
337 }
338
339 pub fn dec_connections(&self, protocol: &str) {
341 self.inner
342 .connections_active
343 .with_label_values(&[protocol])
344 .dec();
345 }
346
347 pub fn set_points_total(&self, count: i64) {
349 self.inner.points_total.set(count);
350 }
351
352 pub fn set_device_points(&self, protocol: &str, device_id: &str, count: i64) {
354 self.inner
355 .points_by_device
356 .with_label_values(&[protocol, device_id])
357 .set(count);
358 }
359
360 pub fn remove_device_points(&self, protocol: &str, device_id: &str) {
362 self.inner
364 .points_by_device
365 .with_label_values(&[protocol, device_id])
366 .set(0);
367 }
368
369 pub fn set_memory_bytes(&self, bytes: i64) {
375 self.inner.memory_bytes.set(bytes);
376 }
377
378 pub fn set_cpu_percent(&self, percent: f64) {
380 self.inner.cpu_percent.set(percent.clamp(0.0, 100.0));
381 }
382
383 pub fn update_system_metrics(&self, memory_bytes: i64, cpu_percent: f64) {
387 self.set_memory_bytes(memory_bytes);
388 self.set_cpu_percent(cpu_percent);
389 }
390
391 pub fn uptime(&self) -> Duration {
397 self.inner.start_time.elapsed()
398 }
399
400 pub fn tick_rate(&self) -> f64 {
402 self.inner.tick_rate.get()
403 }
404
405 pub fn snapshot(&self) -> MetricsSnapshot {
407 MetricsSnapshot {
408 uptime_secs: self.uptime().as_secs(),
409 devices_active: self.inner.devices_active.get() as u64,
410 points_total: self.inner.points_total.get() as u64,
411 memory_bytes: self.inner.memory_bytes.get() as u64,
412 cpu_percent: self.inner.cpu_percent.get(),
413 ticks_total: self.inner.ticks_total.get(),
414 tick_rate: self.inner.tick_rate.get(),
415 }
416 }
417
418 pub fn detailed_snapshot(&self) -> DetailedMetricsSnapshot {
420 DetailedMetricsSnapshot {
421 basic: self.snapshot(),
422 start_time_unix: std::time::SystemTime::now()
423 .duration_since(std::time::UNIX_EPOCH)
424 .map(|d| d.as_secs() - self.uptime().as_secs())
425 .unwrap_or(0),
426 }
427 }
428
429 pub fn export_prometheus(&self) -> String {
431 let encoder = TextEncoder::new();
432 let metric_families = self.registry.gather();
433 let mut buffer = Vec::new();
434 encoder.encode(&metric_families, &mut buffer).unwrap();
435 String::from_utf8(buffer).unwrap()
436 }
437
438 pub fn export_global_prometheus() -> String {
440 let encoder = TextEncoder::new();
441 let metric_families = GLOBAL_REGISTRY.gather();
442 let mut buffer = Vec::new();
443 encoder.encode(&metric_families, &mut buffer).unwrap();
444 String::from_utf8(buffer).unwrap()
445 }
446}
447
448impl Default for MetricsCollector {
449 fn default() -> Self {
450 Self::new()
451 }
452}
453
454impl MetricsInner {
455 fn new(registry: &Registry) -> Self {
456 let now = Instant::now();
457
458 let requests_total = IntCounterVec::new(
463 Opts::new(
464 format!("{}_requests_total", METRIC_PREFIX),
465 "Total number of protocol requests",
466 ),
467 &["protocol", "operation"],
468 )
469 .unwrap();
470 registry.register(Box::new(requests_total.clone())).unwrap();
471
472 let messages_total = IntCounterVec::new(
473 Opts::new(
474 format!("{}_messages_total", METRIC_PREFIX),
475 "Total messages processed",
476 ),
477 &["protocol", "direction"],
478 )
479 .unwrap();
480 registry.register(Box::new(messages_total.clone())).unwrap();
481
482 let reads_total = IntCounterVec::new(
483 Opts::new(
484 format!("{}_reads_total", METRIC_PREFIX),
485 "Total read operations",
486 ),
487 &["protocol", "status"],
488 )
489 .unwrap();
490 registry.register(Box::new(reads_total.clone())).unwrap();
491
492 let writes_total = IntCounterVec::new(
493 Opts::new(
494 format!("{}_writes_total", METRIC_PREFIX),
495 "Total write operations",
496 ),
497 &["protocol", "status"],
498 )
499 .unwrap();
500 registry.register(Box::new(writes_total.clone())).unwrap();
501
502 let errors_total = IntCounterVec::new(
503 Opts::new(
504 format!("{}_errors_total", METRIC_PREFIX),
505 "Total errors by protocol and type",
506 ),
507 &["protocol", "error_type"],
508 )
509 .unwrap();
510 registry.register(Box::new(errors_total.clone())).unwrap();
511
512 let ticks_total = IntCounter::new(
513 format!("{}_ticks_total", METRIC_PREFIX),
514 "Total engine ticks processed",
515 )
516 .unwrap();
517 registry.register(Box::new(ticks_total.clone())).unwrap();
518
519 let events_total = IntCounterVec::new(
520 Opts::new(
521 format!("{}_events_total", METRIC_PREFIX),
522 "Total events by type",
523 ),
524 &["event_type"],
525 )
526 .unwrap();
527 registry.register(Box::new(events_total.clone())).unwrap();
528
529 let devices_active = IntGauge::new(
534 format!("{}_devices_active", METRIC_PREFIX),
535 "Number of active simulated devices",
536 )
537 .unwrap();
538 registry.register(Box::new(devices_active.clone())).unwrap();
539
540 let connections_active = IntGaugeVec::new(
541 Opts::new(
542 format!("{}_connections_active", METRIC_PREFIX),
543 "Number of active connections per protocol",
544 ),
545 &["protocol"],
546 )
547 .unwrap();
548 registry
549 .register(Box::new(connections_active.clone()))
550 .unwrap();
551
552 let points_total = IntGauge::new(
553 format!("{}_data_points_total", METRIC_PREFIX),
554 "Total number of data points across all devices",
555 )
556 .unwrap();
557 registry.register(Box::new(points_total.clone())).unwrap();
558
559 let points_by_device = IntGaugeVec::new(
560 Opts::new(
561 format!("{}_data_points_by_device", METRIC_PREFIX),
562 "Number of data points per device",
563 ),
564 &["protocol", "device_id"],
565 )
566 .unwrap();
567 registry
568 .register(Box::new(points_by_device.clone()))
569 .unwrap();
570
571 let memory_bytes = IntGauge::new(
572 format!("{}_memory_usage_bytes", METRIC_PREFIX),
573 "Current memory usage in bytes",
574 )
575 .unwrap();
576 registry.register(Box::new(memory_bytes.clone())).unwrap();
577
578 let cpu_percent = Gauge::new(
579 format!("{}_cpu_usage_percent", METRIC_PREFIX),
580 "Current CPU usage percentage (0-100)",
581 )
582 .unwrap();
583 registry.register(Box::new(cpu_percent.clone())).unwrap();
584
585 let tick_rate = Gauge::new(
586 format!("{}_tick_rate", METRIC_PREFIX),
587 "Current tick rate (ticks per second)",
588 )
589 .unwrap();
590 registry.register(Box::new(tick_rate.clone())).unwrap();
591
592 let request_duration = HistogramVec::new(
597 HistogramOpts::new(
598 format!("{}_request_duration_seconds", METRIC_PREFIX),
599 "Request processing duration in seconds",
600 )
601 .buckets(LATENCY_BUCKETS.to_vec()),
602 &["protocol", "operation"],
603 )
604 .unwrap();
605 registry
606 .register(Box::new(request_duration.clone()))
607 .unwrap();
608
609 let message_latency = HistogramVec::new(
610 HistogramOpts::new(
611 format!("{}_message_latency_seconds", METRIC_PREFIX),
612 "Message latency in seconds",
613 )
614 .buckets(LATENCY_BUCKETS.to_vec()),
615 &["protocol"],
616 )
617 .unwrap();
618 registry
619 .register(Box::new(message_latency.clone()))
620 .unwrap();
621
622 let tick_duration = Histogram::with_opts(
623 HistogramOpts::new(
624 format!("{}_tick_duration_seconds", METRIC_PREFIX),
625 "Engine tick processing duration in seconds",
626 )
627 .buckets(TICK_BUCKETS.to_vec()),
628 )
629 .unwrap();
630 registry.register(Box::new(tick_duration.clone())).unwrap();
631
632 let read_duration = HistogramVec::new(
633 HistogramOpts::new(
634 format!("{}_read_duration_seconds", METRIC_PREFIX),
635 "Read operation duration in seconds",
636 )
637 .buckets(LATENCY_BUCKETS.to_vec()),
638 &["protocol"],
639 )
640 .unwrap();
641 registry.register(Box::new(read_duration.clone())).unwrap();
642
643 let write_duration = HistogramVec::new(
644 HistogramOpts::new(
645 format!("{}_write_duration_seconds", METRIC_PREFIX),
646 "Write operation duration in seconds",
647 )
648 .buckets(LATENCY_BUCKETS.to_vec()),
649 &["protocol"],
650 )
651 .unwrap();
652 registry.register(Box::new(write_duration.clone())).unwrap();
653
654 Self {
655 requests_total,
657 messages_total,
658 reads_total,
659 writes_total,
660 errors_total,
661 ticks_total,
662 events_total,
663
664 devices_active,
666 connections_active,
667 points_total,
668 points_by_device,
669 memory_bytes,
670 cpu_percent,
671 tick_rate,
672
673 request_duration,
675 message_latency,
676 tick_duration,
677 read_duration,
678 write_duration,
679
680 start_time: now,
682 last_tick_time: RwLock::new(now),
683 tick_count_for_rate: AtomicU64::new(0),
684 }
685 }
686}
687
688#[derive(Debug, Clone, Serialize, Deserialize)]
694pub struct MetricsSnapshot {
695 pub uptime_secs: u64,
697 pub devices_active: u64,
699 pub points_total: u64,
701 pub memory_bytes: u64,
703 pub cpu_percent: f64,
705 pub ticks_total: u64,
707 pub tick_rate: f64,
709}
710
711#[derive(Debug, Clone, Serialize, Deserialize)]
713pub struct DetailedMetricsSnapshot {
714 #[serde(flatten)]
716 pub basic: MetricsSnapshot,
717 pub start_time_unix: u64,
719}
720
721#[derive(Debug, Clone, Default, Serialize, Deserialize)]
727pub struct LatencyStats {
728 pub min_us: u64,
730 pub max_us: u64,
732 pub avg_us: u64,
734 pub p50_us: u64,
736 pub p90_us: u64,
738 pub p95_us: u64,
740 pub p99_us: u64,
742 pub count: u64,
744 pub stddev_us: u64,
746}
747
748impl LatencyStats {
749 pub fn from_samples(samples: &[Duration]) -> Self {
751 if samples.is_empty() {
752 return Self::default();
753 }
754
755 let mut sorted: Vec<u64> = samples.iter().map(|d| d.as_micros() as u64).collect();
756 sorted.sort_unstable();
757
758 let count = sorted.len();
759 let sum: u64 = sorted.iter().sum();
760 let avg = sum / count as u64;
761
762 let variance: u64 = sorted
764 .iter()
765 .map(|&x| {
766 let diff = x.abs_diff(avg);
767 diff * diff
768 })
769 .sum::<u64>()
770 / count as u64;
771 let stddev = (variance as f64).sqrt() as u64;
772
773 Self {
774 min_us: sorted[0],
775 max_us: sorted[count - 1],
776 avg_us: avg,
777 p50_us: Self::percentile(&sorted, 50),
778 p90_us: Self::percentile(&sorted, 90),
779 p95_us: Self::percentile(&sorted, 95),
780 p99_us: Self::percentile(&sorted, 99),
781 count: count as u64,
782 stddev_us: stddev,
783 }
784 }
785
786 fn percentile(sorted: &[u64], p: usize) -> u64 {
788 if sorted.is_empty() {
789 return 0;
790 }
791 let idx = (sorted.len() * p / 100).min(sorted.len() - 1);
792 sorted[idx]
793 }
794
795 pub fn is_latency_high(&self, threshold_us: u64) -> bool {
799 self.p99_us > threshold_us
800 }
801}
802
/// Simple stopwatch measuring monotonic time elapsed since an [`Instant`].
#[derive(Debug, Clone)]
pub struct Timer {
    start: Instant,
}

impl Timer {
    /// Starts a new stopwatch at the current instant.
    #[must_use]
    pub fn start() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Time elapsed since the timer was started or last reset.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Consumes the timer and returns the elapsed time.
    pub fn stop(self) -> Duration {
        self.elapsed()
    }

    /// Restarts the timer from the current instant.
    pub fn reset(&mut self) {
        self.start = Instant::now();
    }
}

impl Default for Timer {
    /// Equivalent to [`Timer::start`].
    fn default() -> Self {
        Self::start()
    }
}
844
845pub struct RequestTimer {
858 histogram: Histogram,
859 start: Instant,
860 recorded: bool,
861}
862
863impl RequestTimer {
864 pub fn new(histogram: Histogram) -> Self {
866 Self {
867 histogram,
868 start: Instant::now(),
869 recorded: false,
870 }
871 }
872
873 pub fn elapsed(&self) -> Duration {
875 self.start.elapsed()
876 }
877
878 pub fn record(mut self) -> Duration {
882 let elapsed = self.elapsed();
883 self.histogram.observe(elapsed.as_secs_f64());
884 self.recorded = true;
885 elapsed
886 }
887
888 pub fn discard(mut self) {
892 self.recorded = true; }
894}
895
896impl Drop for RequestTimer {
897 fn drop(&mut self) {
898 if !self.recorded {
899 self.histogram.observe(self.start.elapsed().as_secs_f64());
900 }
901 }
902}
903
/// Periodically samples process-level system metrics and publishes them through a
/// [`MetricsCollector`].
///
/// Currently only resident memory (Linux `VmRSS`) is sampled. On other platforms
/// a sample is a no-op.
#[cfg(feature = "system-metrics")]
pub struct SystemMetricsCollector {
    metrics: MetricsCollector,
    interval: Duration,
}

#[cfg(feature = "system-metrics")]
impl SystemMetricsCollector {
    /// Shortest sampling period actually used.
    ///
    /// `tokio::time::interval` panics on a zero period, so `Duration::ZERO` is
    /// raised to this value.
    const MIN_INTERVAL: Duration = Duration::from_millis(1);

    /// Creates a sampler that publishes to `metrics` every `interval` once started.
    pub fn new(metrics: MetricsCollector, interval: Duration) -> Self {
        Self { metrics, interval }
    }

    /// Spawns the sampling loop on the current Tokio runtime and returns its handle.
    ///
    /// The loop never finishes on its own. Abort the returned handle to stop it.
    pub fn start(self) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(self.interval.max(Self::MIN_INTERVAL));
            // If a sample runs late, skip missed ticks instead of firing a burst of
            // catch-up samples (Tokio's default behavior).
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            loop {
                ticker.tick().await;
                self.collect_once();
            }
        })
    }

    /// Takes one sample and publishes whatever could be read.
    fn collect_once(&self) {
        if let Some(bytes) = Self::resident_memory_bytes() {
            self.metrics.set_memory_bytes(bytes);
        }
    }

    /// Reads this process's resident set size from `/proc/self/status`.
    ///
    /// Returns `None` on any read or parse failure.
    #[cfg(target_os = "linux")]
    fn resident_memory_bytes() -> Option<i64> {
        let status = std::fs::read_to_string("/proc/self/status").ok()?;
        // The line looks like "VmRSS:    123456 kB".
        let line = status.lines().find(|l| l.starts_with("VmRSS:"))?;
        let kb: i64 = line.split_whitespace().nth(1)?.parse().ok()?;
        kb.checked_mul(1024)
    }

    /// Resident memory is not sampled on this platform.
    #[cfg(not(target_os = "linux"))]
    fn resident_memory_bytes() -> Option<i64> {
        None
    }
}
957
/// Times an expression as one request and evaluates to the expression's value.
///
/// Expands to `$metrics.time_request($protocol, $operation)` followed by `$block`.
/// The returned guard is held until `$block` has produced its value, so the
/// guard's drop happens after the whole block has run.
///
/// ```ignore
/// let reply = measure_request!(metrics, "modbus", "read", handle(request));
/// ```
#[macro_export]
macro_rules! measure_request {
    ($metrics:expr, $protocol:expr, $operation:expr, $block:expr) => {{
        let _request_timer_guard = $metrics.time_request($protocol, $operation);
        let measured_value = $block;
        measured_value
    }};
}
981
/// Classifies `$error` with `classify_error` and counts it through
/// `$metrics.record_error($protocol, <category>)`.
///
/// `$error` is only borrowed, and it must implement `Display`.
#[macro_export]
macro_rules! record_error {
    ($metrics:expr, $protocol:expr, $error:expr) => {{
        let classified = $crate::metrics::classify_error(&$error);
        $metrics.record_error($protocol, classified);
    }};
}
998
/// Maps an error's display text onto a small, fixed set of category labels suitable
/// for the `error_type` label of the errors counter.
///
/// Matching is case-insensitive, and the first match wins, in this order:
/// `timeout`, `connection`, `protocol`, `io`, `not_found`, `permission`. Anything
/// else is `unknown`. Because order matters, "Connection timeout" is a `timeout`.
///
/// Accepts any `Display` value, including unsized ones such as `str` or
/// `dyn std::error::Error`.
pub fn classify_error<E: std::fmt::Display + ?Sized>(error: &E) -> &'static str {
    let message = error.to_string().to_lowercase();
    let mentions = |needle: &str| message.contains(needle);
    // "io" only counts as a whole word ("io error", "io:", "std::io::Error"), so
    // words such as "radio" or "ratio" are not misclassified.
    let mentions_io = || {
        mentions("i/o")
            || message
                .split(|c: char| !c.is_alphanumeric())
                .any(|word| word == "io")
    };

    if mentions("timeout") {
        "timeout"
    } else if mentions("connect") {
        // Also covers "connection", "disconnected", ...
        "connection"
    } else if mentions("protocol") || mentions("invalid") {
        "protocol"
    } else if mentions_io() {
        "io"
    } else if mentions("not found") {
        "not_found"
    } else if mentions("permission") || mentions("unauthorized") {
        "permission"
    } else {
        "unknown"
    }
}
1021
#[cfg(test)]
mod tests {
    use super::*;

    /// Current value of one series of an `IntCounterVec`.
    fn counter(vec: &IntCounterVec, labels: &[&str]) -> u64 {
        vec.with_label_values(labels).get()
    }

    /// Number of observations recorded in one series of a `HistogramVec`.
    fn samples(vec: &HistogramVec, labels: &[&str]) -> u64 {
        vec.with_label_values(labels).get_sample_count()
    }

    // ---- MetricsCollector ----

    #[test]
    fn test_metrics_collector_new() {
        let metrics = MetricsCollector::new();
        assert!(metrics.uptime() >= Duration::ZERO);
        assert_eq!(metrics.snapshot().ticks_total, 0);
    }

    #[test]
    fn test_metrics_collector_global() {
        let global1 = MetricsCollector::global();
        let global2 = MetricsCollector::global();
        assert!(std::ptr::eq(global1, global2));
    }

    #[test]
    fn test_metrics_collector_basic_operations() {
        let metrics = MetricsCollector::new();

        metrics.record_message("modbus", "rx");
        metrics.record_message("modbus", "tx");
        metrics.record_read("modbus", true, Duration::from_micros(100));
        metrics.record_read("modbus", false, Duration::from_micros(200));
        metrics.record_write("opcua", true, Duration::from_micros(50));
        metrics.set_devices_active(10);

        let inner = &metrics.inner;
        assert_eq!(counter(&inner.messages_total, &["modbus", "rx"]), 1);
        assert_eq!(counter(&inner.messages_total, &["modbus", "tx"]), 1);
        assert_eq!(counter(&inner.reads_total, &["modbus", "success"]), 1);
        assert_eq!(counter(&inner.reads_total, &["modbus", "error"]), 1);
        assert_eq!(counter(&inner.writes_total, &["opcua", "success"]), 1);
        assert_eq!(samples(&inner.read_duration, &["modbus"]), 2);
        assert_eq!(samples(&inner.write_duration, &["opcua"]), 1);
        // Reads and writes are mirrored into the generic request metrics.
        assert_eq!(counter(&inner.requests_total, &["modbus", "read"]), 2);
        assert_eq!(counter(&inner.requests_total, &["opcua", "write"]), 1);
        assert_eq!(samples(&inner.request_duration, &["opcua", "write"]), 1);

        assert_eq!(metrics.snapshot().devices_active, 10);
    }

    #[test]
    fn test_metrics_collector_request_timing() {
        let metrics = MetricsCollector::new();

        metrics.record_request("modbus", "read");
        metrics.record_request_duration("modbus", "read", Duration::from_micros(150));
        {
            let _timer = metrics.time_request("bacnet", "subscribe");
            std::thread::sleep(Duration::from_millis(5));
        }

        let inner = &metrics.inner;
        assert_eq!(counter(&inner.requests_total, &["modbus", "read"]), 1);
        assert_eq!(samples(&inner.request_duration, &["modbus", "read"]), 1);
        assert_eq!(counter(&inner.requests_total, &["bacnet", "subscribe"]), 1);
        assert_eq!(samples(&inner.request_duration, &["bacnet", "subscribe"]), 1);
    }

    #[test]
    fn test_metrics_collector_error_recording() {
        let metrics = MetricsCollector::new();

        metrics.record_error("modbus", "timeout");
        metrics.record_error("modbus", "connection");
        metrics.record_error("opcua", "protocol");

        let errors = &metrics.inner.errors_total;
        assert_eq!(counter(errors, &["modbus", "timeout"]), 1);
        assert_eq!(counter(errors, &["modbus", "connection"]), 1);
        assert_eq!(counter(errors, &["opcua", "protocol"]), 1);
    }

    #[test]
    fn test_metrics_collector_tick_recording() {
        let metrics = MetricsCollector::new();

        for i in 0..5 {
            metrics.record_tick(Duration::from_millis(10 * (i + 1)));
        }

        assert_eq!(metrics.snapshot().ticks_total, 5);
        assert_eq!(metrics.inner.tick_duration.get_sample_count(), 5);
        // 10 + 20 + 30 + 40 + 50 ms
        assert!((metrics.inner.tick_duration.get_sample_sum() - 0.15).abs() < 1e-9);
    }

    #[test]
    fn test_metrics_collector_connections() {
        let metrics = MetricsCollector::new();
        let gauge = metrics.inner.connections_active.with_label_values(&["modbus"]);

        metrics.set_connections_active("modbus", 5);
        assert_eq!(gauge.get(), 5);
        metrics.inc_connections("modbus");
        assert_eq!(gauge.get(), 6);
        metrics.dec_connections("modbus");
        assert_eq!(gauge.get(), 5);
    }

    #[test]
    fn test_metrics_collector_device_points() {
        let metrics = MetricsCollector::new();
        let by_device = &metrics.inner.points_by_device;

        metrics.set_points_total(1000);
        metrics.set_device_points("modbus", "device-001", 50);
        metrics.set_device_points("modbus", "device-002", 75);

        assert_eq!(metrics.snapshot().points_total, 1000);
        assert_eq!(by_device.with_label_values(&["modbus", "device-001"]).get(), 50);

        metrics.remove_device_points("modbus", "device-001");
        // Whether the series is zeroed or dropped, reading it back yields 0 and
        // other devices are untouched.
        assert_eq!(by_device.with_label_values(&["modbus", "device-001"]).get(), 0);
        assert_eq!(by_device.with_label_values(&["modbus", "device-002"]).get(), 75);
    }

    #[test]
    fn test_metrics_collector_system_metrics() {
        let metrics = MetricsCollector::new();

        metrics.set_memory_bytes(1024 * 1024 * 512);
        metrics.set_cpu_percent(45.5);
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.memory_bytes, 1024 * 1024 * 512);
        assert!((snapshot.cpu_percent - 45.5).abs() < 0.001);

        metrics.update_system_metrics(1024 * 1024 * 256, 30.0);
        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.memory_bytes, 1024 * 1024 * 256);
        assert!((snapshot.cpu_percent - 30.0).abs() < 0.001);

        // Out-of-range CPU values are clamped to 0..=100.
        metrics.set_cpu_percent(150.0);
        assert!((metrics.snapshot().cpu_percent - 100.0).abs() < 0.001);

        metrics.set_cpu_percent(-10.0);
        assert!(metrics.snapshot().cpu_percent.abs() < 0.001);
    }

    #[test]
    fn test_metrics_collector_prometheus_export() {
        let metrics = MetricsCollector::new();

        metrics.record_read("modbus", true, Duration::from_micros(100));
        metrics.set_devices_active(5);

        let prometheus_output = metrics.export_prometheus();
        assert!(!prometheus_output.is_empty());
        assert!(prometheus_output.contains(METRIC_PREFIX));
        assert!(prometheus_output.contains("devices_active"));
        assert!(prometheus_output.contains(&format!("{}_reads_total", METRIC_PREFIX)));
        assert!(prometheus_output.contains("protocol=\"modbus\""));
    }

    #[test]
    fn test_metrics_collector_event_recording() {
        let metrics = MetricsCollector::new();

        metrics.record_event("device_added");
        metrics.record_event("device_removed");
        metrics.record_event("engine_started");
        metrics.record_event("device_added");

        let events = &metrics.inner.events_total;
        assert_eq!(counter(events, &["device_added"]), 2);
        assert_eq!(counter(events, &["device_removed"]), 1);
        assert_eq!(counter(events, &["engine_started"]), 1);
    }

    #[test]
    fn test_metrics_collector_snapshot() {
        let metrics = MetricsCollector::new();

        metrics.set_devices_active(10);
        metrics.set_points_total(500);
        metrics.set_memory_bytes(1024 * 1024);
        metrics.set_cpu_percent(25.0);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.devices_active, 10);
        assert_eq!(snapshot.points_total, 500);
        assert_eq!(snapshot.memory_bytes, 1024 * 1024);
        assert!((snapshot.cpu_percent - 25.0).abs() < 0.001);

        let detailed = metrics.detailed_snapshot();
        assert_eq!(detailed.basic.devices_active, 10);
        assert!(detailed.start_time_unix > 0);
    }

    #[test]
    fn test_measure_request_macro() {
        let metrics = MetricsCollector::new();

        let value = measure_request!(metrics, "modbus", "macro_test", 21 * 2);

        assert_eq!(value, 42);
        let inner = &metrics.inner;
        assert_eq!(counter(&inner.requests_total, &["modbus", "macro_test"]), 1);
        assert_eq!(samples(&inner.request_duration, &["modbus", "macro_test"]), 1);
    }

    // ---- LatencyStats ----

    #[test]
    fn test_latency_stats_from_samples() {
        let samples: Vec<Duration> = [100, 200, 300, 400, 500]
            .iter()
            .map(|&us| Duration::from_micros(us))
            .collect();

        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.min_us, 100);
        assert_eq!(stats.max_us, 500);
        assert_eq!(stats.avg_us, 300);
        assert_eq!(stats.count, 5);
        assert!(stats.p50_us >= 200 && stats.p50_us <= 300);
    }

    #[test]
    fn test_latency_stats_empty() {
        let stats = LatencyStats::from_samples(&[]);

        assert_eq!(stats.min_us, 0);
        assert_eq!(stats.max_us, 0);
        assert_eq!(stats.avg_us, 0);
        assert_eq!(stats.count, 0);
    }

    #[test]
    fn test_latency_stats_single_sample() {
        let stats = LatencyStats::from_samples(&[Duration::from_micros(150)]);

        assert_eq!(stats.min_us, 150);
        assert_eq!(stats.max_us, 150);
        assert_eq!(stats.avg_us, 150);
        assert_eq!(stats.count, 1);
    }

    #[test]
    fn test_latency_stats_percentiles() {
        let samples: Vec<Duration> = (1..=100).map(Duration::from_micros).collect();

        let stats = LatencyStats::from_samples(&samples);
        assert_eq!(stats.min_us, 1);
        assert_eq!(stats.max_us, 100);
        assert!(stats.p50_us >= 50 && stats.p50_us <= 51);
        assert!(stats.p90_us >= 90 && stats.p90_us <= 91);
        assert!(stats.p95_us >= 95 && stats.p95_us <= 96);
        assert!(stats.p99_us >= 99 && stats.p99_us <= 100);
    }

    #[test]
    fn test_latency_stats_is_latency_high() {
        let samples: Vec<Duration> = (1..=100).map(Duration::from_micros).collect();
        let stats = LatencyStats::from_samples(&samples);

        assert!(stats.is_latency_high(50));
        assert!(!stats.is_latency_high(100));
    }

    #[test]
    fn test_latency_stats_stddev() {
        let samples = vec![Duration::from_micros(100); 3];
        let stats = LatencyStats::from_samples(&samples);
        // Identical samples have no spread.
        assert_eq!(stats.stddev_us, 0);
    }

    // ---- Timer ----

    #[test]
    fn test_timer_start_stop() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        let elapsed = timer.stop();
        assert!(elapsed >= Duration::from_millis(10));
    }

    #[test]
    fn test_timer_elapsed() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_millis(5));
        let elapsed1 = timer.elapsed();
        std::thread::sleep(Duration::from_millis(5));
        let elapsed2 = timer.elapsed();
        assert!(elapsed2 > elapsed1);
    }

    #[test]
    fn test_timer_reset() {
        let mut timer = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        timer.reset();
        assert!(timer.elapsed() < Duration::from_millis(10));
    }

    #[test]
    fn test_timer_default() {
        let timer = Timer::default();
        assert!(timer.elapsed() >= Duration::ZERO);
    }

    // ---- RequestTimer ----

    #[test]
    fn test_request_timer_drop_records() {
        let metrics = MetricsCollector::new();
        {
            let _timer = metrics.time_request("test_proto", "test_op");
            std::thread::sleep(Duration::from_millis(5));
        }

        let histogram = metrics
            .inner
            .request_duration
            .with_label_values(&["test_proto", "test_op"]);
        assert_eq!(histogram.get_sample_count(), 1);
        assert!(histogram.get_sample_sum() >= 0.004);

        let output = metrics.export_prometheus();
        assert!(output.contains(&format!("{}_request_duration_seconds", METRIC_PREFIX)));
    }

    #[test]
    fn test_request_timer_manual_record() {
        let metrics = MetricsCollector::new();
        let timer = metrics.time_request("test_proto", "manual_test");
        std::thread::sleep(Duration::from_millis(5));

        let elapsed = timer.record();

        assert!(elapsed >= Duration::from_millis(5));
        // `record` followed by the implicit drop must observe exactly once.
        let duration = &metrics.inner.request_duration;
        assert_eq!(samples(duration, &["test_proto", "manual_test"]), 1);
    }

    #[test]
    fn test_request_timer_discard() {
        let metrics = MetricsCollector::new();
        let timer = metrics.time_request("test_proto", "discard_test");

        timer.discard();

        let inner = &metrics.inner;
        // The request is still counted, but no duration is observed.
        assert_eq!(counter(&inner.requests_total, &["test_proto", "discard_test"]), 1);
        assert_eq!(samples(&inner.request_duration, &["test_proto", "discard_test"]), 0);
    }

    // ---- helpers & constants ----

    #[test]
    fn test_classify_error() {
        assert_eq!(classify_error(&"Connection timeout"), "timeout");
        assert_eq!(classify_error(&"Connection refused"), "connection");
        assert_eq!(classify_error(&"I/O error occurred"), "io");
        assert_eq!(classify_error(&"Protocol violation"), "protocol");
        assert_eq!(classify_error(&"Invalid message format"), "protocol");
        assert_eq!(classify_error(&"Device not found"), "not_found");
        assert_eq!(classify_error(&"Permission denied"), "permission");
        assert_eq!(classify_error(&"Some random error"), "unknown");
    }

    #[test]
    fn test_metric_prefix() {
        assert_eq!(METRIC_PREFIX, "mabi");
    }

    #[test]
    fn test_latency_buckets() {
        assert!(!LATENCY_BUCKETS.is_empty());
        assert!(LATENCY_BUCKETS.windows(2).all(|pair| pair[1] > pair[0]));
    }

    #[test]
    fn test_tick_buckets() {
        assert!(!TICK_BUCKETS.is_empty());
        assert!(TICK_BUCKETS.windows(2).all(|pair| pair[1] > pair[0]));
    }
}