1use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::time::{Duration, Instant};
28
29use once_cell::sync::Lazy;
30use parking_lot::RwLock;
31use prometheus::{
32 Encoder, Gauge, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGauge,
33 IntGaugeVec, Opts, Registry, TextEncoder,
34};
35use serde::{Deserialize, Serialize};
36
/// Prefix shared by the name of every metric this module registers
/// (metric names take the form `"<prefix>_<suffix>"`).
pub const METRIC_PREFIX: &str = "mabi";

/// Histogram bucket upper bounds, in seconds, for request, message, read and write
/// latencies. Covers 100 µs up to 10 s.
pub const LATENCY_BUCKETS: &[f64] = &[
    0.0001, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
    10.0,
];

/// Histogram bucket upper bounds, in seconds, for engine tick durations. Covers 100 µs up
/// to 100 ms.
pub const TICK_BUCKETS: &[f64] = &[
    0.0001, 0.0005, 0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1,
];
53
54pub static GLOBAL_REGISTRY: Lazy<Registry> = Lazy::new(Registry::new);
61
62static GLOBAL_METRICS: Lazy<MetricsCollector> =
64 Lazy::new(|| MetricsCollector::with_registry(GLOBAL_REGISTRY.clone()));
65
66#[derive(Clone)]
78pub struct MetricsCollector {
79 registry: Arc<Registry>,
80 inner: Arc<MetricsInner>,
81}
82
83struct MetricsInner {
85 requests_total: IntCounterVec,
88 reads_total: IntCounterVec,
90 writes_total: IntCounterVec,
92 errors_total: IntCounterVec,
94 ticks_total: IntCounter,
96 messages_total: IntCounterVec,
98 events_total: IntCounterVec,
100
101 devices_active: IntGauge,
104 connections_active: IntGaugeVec,
106 points_total: IntGauge,
108 points_by_device: IntGaugeVec,
110 memory_bytes: IntGauge,
112 cpu_percent: Gauge,
114 tick_rate: Gauge,
116
117 request_duration: HistogramVec,
120 message_latency: HistogramVec,
122 tick_duration: Histogram,
124 read_duration: HistogramVec,
126 write_duration: HistogramVec,
128
129 start_time: Instant,
131 last_tick_time: RwLock<Instant>,
132 tick_count_for_rate: AtomicU64,
133}
134
135impl MetricsCollector {
136 pub fn new() -> Self {
138 let registry = Registry::new();
139 Self::with_registry(registry)
140 }
141
142 pub fn global() -> &'static Self {
147 &GLOBAL_METRICS
148 }
149
150 pub fn with_registry(registry: Registry) -> Self {
152 let inner = MetricsInner::new(®istry);
153
154 Self {
155 registry: Arc::new(registry),
156 inner: Arc::new(inner),
157 }
158 }
159
160 pub fn registry(&self) -> &Registry {
162 &self.registry
163 }
164
165 pub fn record_request(&self, protocol: &str, operation: &str) {
174 self.inner
175 .requests_total
176 .with_label_values(&[protocol, operation])
177 .inc();
178 }
179
180 pub fn record_request_duration(&self, protocol: &str, operation: &str, duration: Duration) {
182 self.inner
183 .request_duration
184 .with_label_values(&[protocol, operation])
185 .observe(duration.as_secs_f64());
186 }
187
188 pub fn time_request(&self, protocol: &str, operation: &str) -> RequestTimer {
200 self.record_request(protocol, operation);
201 RequestTimer::new(
202 self.inner
203 .request_duration
204 .with_label_values(&[protocol, operation]),
205 )
206 }
207
208 pub fn record_message(&self, protocol: &str, direction: &str) {
210 self.inner
211 .messages_total
212 .with_label_values(&[protocol, direction])
213 .inc();
214 }
215
216 pub fn record_read(&self, protocol: &str, success: bool, duration: Duration) {
218 let status = if success { "success" } else { "error" };
219 self.inner
220 .reads_total
221 .with_label_values(&[protocol, status])
222 .inc();
223 self.inner
224 .read_duration
225 .with_label_values(&[protocol])
226 .observe(duration.as_secs_f64());
227
228 self.inner
230 .requests_total
231 .with_label_values(&[protocol, "read"])
232 .inc();
233 self.inner
234 .request_duration
235 .with_label_values(&[protocol, "read"])
236 .observe(duration.as_secs_f64());
237 }
238
239 pub fn record_write(&self, protocol: &str, success: bool, duration: Duration) {
241 let status = if success { "success" } else { "error" };
242 self.inner
243 .writes_total
244 .with_label_values(&[protocol, status])
245 .inc();
246 self.inner
247 .write_duration
248 .with_label_values(&[protocol])
249 .observe(duration.as_secs_f64());
250
251 self.inner
253 .requests_total
254 .with_label_values(&[protocol, "write"])
255 .inc();
256 self.inner
257 .request_duration
258 .with_label_values(&[protocol, "write"])
259 .observe(duration.as_secs_f64());
260 }
261
262 pub fn record_error(&self, protocol: &str, error_type: &str) {
268 self.inner
269 .errors_total
270 .with_label_values(&[protocol, error_type])
271 .inc();
272 }
273
274 pub fn record_tick(&self, duration: Duration) {
280 self.inner.ticks_total.inc();
281 self.inner.tick_duration.observe(duration.as_secs_f64());
282
283 let count = self
285 .inner
286 .tick_count_for_rate
287 .fetch_add(1, Ordering::Relaxed)
288 + 1;
289 let mut last_time = self.inner.last_tick_time.write();
290 let elapsed = last_time.elapsed();
291
292 if elapsed >= Duration::from_secs(1) {
294 let rate = count as f64 / elapsed.as_secs_f64();
295 self.inner.tick_rate.set(rate);
296 self.inner.tick_count_for_rate.store(0, Ordering::Relaxed);
297 *last_time = Instant::now();
298 }
299 }
300
301 pub fn record_latency(&self, protocol: &str, latency: Duration) {
303 self.inner
304 .message_latency
305 .with_label_values(&[protocol])
306 .observe(latency.as_secs_f64());
307 }
308
309 pub fn record_event(&self, event_type: &str) {
311 self.inner
312 .events_total
313 .with_label_values(&[event_type])
314 .inc();
315 }
316
317 pub fn set_devices_active(&self, count: i64) {
323 self.inner.devices_active.set(count);
324 }
325
326 pub fn set_connections_active(&self, protocol: &str, count: i64) {
328 self.inner
329 .connections_active
330 .with_label_values(&[protocol])
331 .set(count);
332 }
333
334 pub fn inc_connections(&self, protocol: &str) {
336 self.inner
337 .connections_active
338 .with_label_values(&[protocol])
339 .inc();
340 }
341
342 pub fn dec_connections(&self, protocol: &str) {
344 self.inner
345 .connections_active
346 .with_label_values(&[protocol])
347 .dec();
348 }
349
350 pub fn set_points_total(&self, count: i64) {
352 self.inner.points_total.set(count);
353 }
354
355 pub fn set_device_points(&self, protocol: &str, device_id: &str, count: i64) {
357 self.inner
358 .points_by_device
359 .with_label_values(&[protocol, device_id])
360 .set(count);
361 }
362
363 pub fn remove_device_points(&self, protocol: &str, device_id: &str) {
365 self.inner
367 .points_by_device
368 .with_label_values(&[protocol, device_id])
369 .set(0);
370 }
371
372 pub fn set_memory_bytes(&self, bytes: i64) {
378 self.inner.memory_bytes.set(bytes);
379 }
380
381 pub fn set_cpu_percent(&self, percent: f64) {
383 self.inner.cpu_percent.set(percent.clamp(0.0, 100.0));
384 }
385
386 pub fn update_system_metrics(&self, memory_bytes: i64, cpu_percent: f64) {
390 self.set_memory_bytes(memory_bytes);
391 self.set_cpu_percent(cpu_percent);
392 }
393
394 pub fn uptime(&self) -> Duration {
400 self.inner.start_time.elapsed()
401 }
402
403 pub fn tick_rate(&self) -> f64 {
405 self.inner.tick_rate.get()
406 }
407
408 pub fn snapshot(&self) -> MetricsSnapshot {
410 MetricsSnapshot {
411 uptime_secs: self.uptime().as_secs(),
412 devices_active: self.inner.devices_active.get() as u64,
413 points_total: self.inner.points_total.get() as u64,
414 memory_bytes: self.inner.memory_bytes.get() as u64,
415 cpu_percent: self.inner.cpu_percent.get(),
416 ticks_total: self.inner.ticks_total.get(),
417 tick_rate: self.inner.tick_rate.get(),
418 }
419 }
420
421 pub fn detailed_snapshot(&self) -> DetailedMetricsSnapshot {
423 DetailedMetricsSnapshot {
424 basic: self.snapshot(),
425 start_time_unix: std::time::SystemTime::now()
426 .duration_since(std::time::UNIX_EPOCH)
427 .map(|d| d.as_secs() - self.uptime().as_secs())
428 .unwrap_or(0),
429 }
430 }
431
432 pub fn export_prometheus(&self) -> String {
434 let encoder = TextEncoder::new();
435 let metric_families = self.registry.gather();
436 let mut buffer = Vec::new();
437 encoder.encode(&metric_families, &mut buffer).unwrap();
438 String::from_utf8(buffer).unwrap()
439 }
440
441 pub fn export_global_prometheus() -> String {
443 let encoder = TextEncoder::new();
444 let metric_families = GLOBAL_REGISTRY.gather();
445 let mut buffer = Vec::new();
446 encoder.encode(&metric_families, &mut buffer).unwrap();
447 String::from_utf8(buffer).unwrap()
448 }
449}
450
451impl Default for MetricsCollector {
452 fn default() -> Self {
453 Self::new()
454 }
455}
456
457impl MetricsInner {
458 fn new(registry: &Registry) -> Self {
459 let now = Instant::now();
460
461 let requests_total = IntCounterVec::new(
466 Opts::new(
467 format!("{}_requests_total", METRIC_PREFIX),
468 "Total number of protocol requests",
469 ),
470 &["protocol", "operation"],
471 )
472 .unwrap();
473 registry.register(Box::new(requests_total.clone())).unwrap();
474
475 let messages_total = IntCounterVec::new(
476 Opts::new(
477 format!("{}_messages_total", METRIC_PREFIX),
478 "Total messages processed",
479 ),
480 &["protocol", "direction"],
481 )
482 .unwrap();
483 registry.register(Box::new(messages_total.clone())).unwrap();
484
485 let reads_total = IntCounterVec::new(
486 Opts::new(
487 format!("{}_reads_total", METRIC_PREFIX),
488 "Total read operations",
489 ),
490 &["protocol", "status"],
491 )
492 .unwrap();
493 registry.register(Box::new(reads_total.clone())).unwrap();
494
495 let writes_total = IntCounterVec::new(
496 Opts::new(
497 format!("{}_writes_total", METRIC_PREFIX),
498 "Total write operations",
499 ),
500 &["protocol", "status"],
501 )
502 .unwrap();
503 registry.register(Box::new(writes_total.clone())).unwrap();
504
505 let errors_total = IntCounterVec::new(
506 Opts::new(
507 format!("{}_errors_total", METRIC_PREFIX),
508 "Total errors by protocol and type",
509 ),
510 &["protocol", "error_type"],
511 )
512 .unwrap();
513 registry.register(Box::new(errors_total.clone())).unwrap();
514
515 let ticks_total = IntCounter::new(
516 format!("{}_ticks_total", METRIC_PREFIX),
517 "Total engine ticks processed",
518 )
519 .unwrap();
520 registry.register(Box::new(ticks_total.clone())).unwrap();
521
522 let events_total = IntCounterVec::new(
523 Opts::new(
524 format!("{}_events_total", METRIC_PREFIX),
525 "Total events by type",
526 ),
527 &["event_type"],
528 )
529 .unwrap();
530 registry.register(Box::new(events_total.clone())).unwrap();
531
532 let devices_active = IntGauge::new(
537 format!("{}_devices_active", METRIC_PREFIX),
538 "Number of active simulated devices",
539 )
540 .unwrap();
541 registry.register(Box::new(devices_active.clone())).unwrap();
542
543 let connections_active = IntGaugeVec::new(
544 Opts::new(
545 format!("{}_connections_active", METRIC_PREFIX),
546 "Number of active connections per protocol",
547 ),
548 &["protocol"],
549 )
550 .unwrap();
551 registry
552 .register(Box::new(connections_active.clone()))
553 .unwrap();
554
555 let points_total = IntGauge::new(
556 format!("{}_data_points_total", METRIC_PREFIX),
557 "Total number of data points across all devices",
558 )
559 .unwrap();
560 registry.register(Box::new(points_total.clone())).unwrap();
561
562 let points_by_device = IntGaugeVec::new(
563 Opts::new(
564 format!("{}_data_points_by_device", METRIC_PREFIX),
565 "Number of data points per device",
566 ),
567 &["protocol", "device_id"],
568 )
569 .unwrap();
570 registry
571 .register(Box::new(points_by_device.clone()))
572 .unwrap();
573
574 let memory_bytes = IntGauge::new(
575 format!("{}_memory_usage_bytes", METRIC_PREFIX),
576 "Current memory usage in bytes",
577 )
578 .unwrap();
579 registry.register(Box::new(memory_bytes.clone())).unwrap();
580
581 let cpu_percent = Gauge::new(
582 format!("{}_cpu_usage_percent", METRIC_PREFIX),
583 "Current CPU usage percentage (0-100)",
584 )
585 .unwrap();
586 registry.register(Box::new(cpu_percent.clone())).unwrap();
587
588 let tick_rate = Gauge::new(
589 format!("{}_tick_rate", METRIC_PREFIX),
590 "Current tick rate (ticks per second)",
591 )
592 .unwrap();
593 registry.register(Box::new(tick_rate.clone())).unwrap();
594
595 let request_duration = HistogramVec::new(
600 HistogramOpts::new(
601 format!("{}_request_duration_seconds", METRIC_PREFIX),
602 "Request processing duration in seconds",
603 )
604 .buckets(LATENCY_BUCKETS.to_vec()),
605 &["protocol", "operation"],
606 )
607 .unwrap();
608 registry
609 .register(Box::new(request_duration.clone()))
610 .unwrap();
611
612 let message_latency = HistogramVec::new(
613 HistogramOpts::new(
614 format!("{}_message_latency_seconds", METRIC_PREFIX),
615 "Message latency in seconds",
616 )
617 .buckets(LATENCY_BUCKETS.to_vec()),
618 &["protocol"],
619 )
620 .unwrap();
621 registry
622 .register(Box::new(message_latency.clone()))
623 .unwrap();
624
625 let tick_duration = Histogram::with_opts(
626 HistogramOpts::new(
627 format!("{}_tick_duration_seconds", METRIC_PREFIX),
628 "Engine tick processing duration in seconds",
629 )
630 .buckets(TICK_BUCKETS.to_vec()),
631 )
632 .unwrap();
633 registry.register(Box::new(tick_duration.clone())).unwrap();
634
635 let read_duration = HistogramVec::new(
636 HistogramOpts::new(
637 format!("{}_read_duration_seconds", METRIC_PREFIX),
638 "Read operation duration in seconds",
639 )
640 .buckets(LATENCY_BUCKETS.to_vec()),
641 &["protocol"],
642 )
643 .unwrap();
644 registry.register(Box::new(read_duration.clone())).unwrap();
645
646 let write_duration = HistogramVec::new(
647 HistogramOpts::new(
648 format!("{}_write_duration_seconds", METRIC_PREFIX),
649 "Write operation duration in seconds",
650 )
651 .buckets(LATENCY_BUCKETS.to_vec()),
652 &["protocol"],
653 )
654 .unwrap();
655 registry.register(Box::new(write_duration.clone())).unwrap();
656
657 Self {
658 requests_total,
660 messages_total,
661 reads_total,
662 writes_total,
663 errors_total,
664 ticks_total,
665 events_total,
666
667 devices_active,
669 connections_active,
670 points_total,
671 points_by_device,
672 memory_bytes,
673 cpu_percent,
674 tick_rate,
675
676 request_duration,
678 message_latency,
679 tick_duration,
680 read_duration,
681 write_duration,
682
683 start_time: now,
685 last_tick_time: RwLock::new(now),
686 tick_count_for_rate: AtomicU64::new(0),
687 }
688 }
689}
690
691#[derive(Debug, Clone, Serialize, Deserialize)]
697pub struct MetricsSnapshot {
698 pub uptime_secs: u64,
700 pub devices_active: u64,
702 pub points_total: u64,
704 pub memory_bytes: u64,
706 pub cpu_percent: f64,
708 pub ticks_total: u64,
710 pub tick_rate: f64,
712}
713
714#[derive(Debug, Clone, Serialize, Deserialize)]
716pub struct DetailedMetricsSnapshot {
717 #[serde(flatten)]
719 pub basic: MetricsSnapshot,
720 pub start_time_unix: u64,
722}
723
724#[derive(Debug, Clone, Default, Serialize, Deserialize)]
730pub struct LatencyStats {
731 pub min_us: u64,
733 pub max_us: u64,
735 pub avg_us: u64,
737 pub p50_us: u64,
739 pub p90_us: u64,
741 pub p95_us: u64,
743 pub p99_us: u64,
745 pub count: u64,
747 pub stddev_us: u64,
749}
750
751impl LatencyStats {
752 pub fn from_samples(samples: &[Duration]) -> Self {
754 if samples.is_empty() {
755 return Self::default();
756 }
757
758 let mut sorted: Vec<u64> = samples.iter().map(|d| d.as_micros() as u64).collect();
759 sorted.sort_unstable();
760
761 let count = sorted.len();
762 let sum: u64 = sorted.iter().sum();
763 let avg = sum / count as u64;
764
765 let variance: u64 = sorted
767 .iter()
768 .map(|&x| {
769 let diff = x.abs_diff(avg);
770 diff * diff
771 })
772 .sum::<u64>()
773 / count as u64;
774 let stddev = (variance as f64).sqrt() as u64;
775
776 Self {
777 min_us: sorted[0],
778 max_us: sorted[count - 1],
779 avg_us: avg,
780 p50_us: Self::percentile(&sorted, 50),
781 p90_us: Self::percentile(&sorted, 90),
782 p95_us: Self::percentile(&sorted, 95),
783 p99_us: Self::percentile(&sorted, 99),
784 count: count as u64,
785 stddev_us: stddev,
786 }
787 }
788
789 fn percentile(sorted: &[u64], p: usize) -> u64 {
791 if sorted.is_empty() {
792 return 0;
793 }
794 let idx = (sorted.len() * p / 100).min(sorted.len() - 1);
795 sorted[idx]
796 }
797
798 pub fn is_latency_high(&self, threshold_us: u64) -> bool {
802 self.p99_us > threshold_us
803 }
804}
805
/// Simple stopwatch measuring wall-clock time since it was started or last reset.
///
/// Derives `Debug` and `Clone`, so it can be logged and duplicated like other public types.
#[derive(Debug, Clone)]
pub struct Timer {
    /// Instant the current measurement began.
    start: Instant,
}
817
818impl Timer {
819 pub fn start() -> Self {
821 Self {
822 start: Instant::now(),
823 }
824 }
825
826 pub fn elapsed(&self) -> Duration {
828 self.start.elapsed()
829 }
830
831 pub fn stop(self) -> Duration {
833 self.elapsed()
834 }
835
836 pub fn reset(&mut self) {
838 self.start = Instant::now();
839 }
840}
841
842impl Default for Timer {
843 fn default() -> Self {
844 Self::start()
845 }
846}
847
848pub struct RequestTimer {
861 histogram: Histogram,
862 start: Instant,
863 recorded: bool,
864}
865
866impl RequestTimer {
867 pub fn new(histogram: Histogram) -> Self {
869 Self {
870 histogram,
871 start: Instant::now(),
872 recorded: false,
873 }
874 }
875
876 pub fn elapsed(&self) -> Duration {
878 self.start.elapsed()
879 }
880
881 pub fn record(mut self) -> Duration {
885 let elapsed = self.elapsed();
886 self.histogram.observe(elapsed.as_secs_f64());
887 self.recorded = true;
888 elapsed
889 }
890
891 pub fn discard(mut self) {
895 self.recorded = true; }
897}
898
899impl Drop for RequestTimer {
900 fn drop(&mut self) {
901 if !self.recorded {
902 self.histogram.observe(self.start.elapsed().as_secs_f64());
903 }
904 }
905}
906
/// Background sampler that periodically writes process-level readings into a
/// [`MetricsCollector`]. Only compiled with the `system-metrics` feature.
#[cfg(feature = "system-metrics")]
pub struct SystemMetricsCollector {
    /// Collector that receives the sampled values.
    metrics: MetricsCollector,
    /// Time between consecutive samples.
    interval: Duration,
}
920
#[cfg(feature = "system-metrics")]
impl SystemMetricsCollector {
    /// Creates a sampler that refreshes `metrics` every `interval`.
    pub fn new(metrics: MetricsCollector, interval: Duration) -> Self {
        SystemMetricsCollector { metrics, interval }
    }

    /// Spawns a Tokio task that samples forever, once per interval.
    ///
    /// The task never finishes on its own; abort the returned handle to stop it. Must be
    /// called from within a Tokio runtime, and `tokio::time::interval` panics on a zero
    /// period.
    pub fn start(self) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(self.interval);
            loop {
                ticker.tick().await;
                self.collect_once();
            }
        })
    }

    /// Takes one sample of whatever this platform supports.
    ///
    /// On Linux, the resident set size is stored in the memory gauge. If it cannot be
    /// read or parsed, the gauge is left untouched. Other platforms sample nothing here.
    fn collect_once(&self) {
        #[cfg(target_os = "linux")]
        if let Some(rss_bytes) = Self::linux_rss_bytes() {
            self.metrics.set_memory_bytes(rss_bytes);
        }
    }

    /// Reads `VmRSS` (reported in kB) from `/proc/self/status` and converts it to bytes.
    #[cfg(target_os = "linux")]
    fn linux_rss_bytes() -> Option<i64> {
        let status = std::fs::read_to_string("/proc/self/status").ok()?;
        let rss_line = status.lines().find(|line| line.starts_with("VmRSS:"))?;
        let kib: i64 = rss_line.split_whitespace().nth(1)?.parse().ok()?;
        Some(kib * 1024)
    }
}
960
/// Times `$block` as a `$protocol` / `$operation` request and evaluates to the block's value.
///
/// Expands to a `time_request` call on `$metrics`. The returned guard is held until
/// `$block` has been fully evaluated and is then dropped, so its drop-time observation
/// covers the block's execution.
#[macro_export]
macro_rules! measure_request {
    ($metrics:expr, $protocol:expr, $operation:expr, $block:expr) => {{
        let _request_timer = $metrics.time_request($protocol, $operation);
        let block_value = $block;
        block_value
    }};
}
984
/// Classifies `$error` with `classify_error` and counts it against `$protocol`
/// via `$metrics.record_error`.
#[macro_export]
macro_rules! record_error {
    ($metrics:expr, $protocol:expr, $error:expr) => {{
        let classified = $crate::metrics::classify_error(&$error);
        $metrics.record_error($protocol, classified);
    }};
}
1001
/// Maps an error to a coarse, low-cardinality category for the `error_type` metric label.
///
/// Matching is case-insensitive and based on keywords in the error's `Display` text.
/// Categories are tried in this order, so e.g. "Connection timeout" is `"timeout"`:
///
/// 1. `"timeout"`: contains "timeout"
/// 2. `"connection"`: contains "connect" (covers "connection", "disconnected", ...)
/// 3. `"protocol"`: contains "protocol" or "invalid"
/// 4. `"io"`: contains "i/o", contains "(os error" (the suffix `std::io::Error` appends to
///    OS errors), or has "io" as a standalone word (e.g. "io error", "IO: broken pipe")
/// 5. `"not_found"`: contains "not found"
/// 6. `"permission"`: contains "permission" or "unauthorized"
///
/// Anything else is `"unknown"`.
pub fn classify_error<E: std::fmt::Display>(error: &E) -> &'static str {
    let message = error.to_string().to_lowercase();

    // "io" only counts as a whole word, so e.g. "ratio" or "io-link" do not match. Leading
    // and trailing punctuation such as ':' or '(' is ignored when comparing.
    let mentions_io_word = || {
        message
            .split_whitespace()
            .any(|word| word.trim_matches(|c: char| c.is_ascii_punctuation()) == "io")
    };

    if message.contains("timeout") {
        "timeout"
    } else if message.contains("connect") {
        "connection"
    } else if message.contains("protocol") || message.contains("invalid") {
        "protocol"
    } else if message.contains("i/o") || message.contains("(os error") || mentions_io_word() {
        "io"
    } else if message.contains("not found") {
        "not_found"
    } else if message.contains("permission") || message.contains("unauthorized") {
        "permission"
    } else {
        "unknown"
    }
}
1028
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing floating-point gauge readings.
    const EPSILON: f64 = 0.001;

    /// Returns `true` when `actual` is within [`EPSILON`] of `expected`.
    fn approx_eq(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < EPSILON
    }

    /// Latency samples 1 µs, 2 µs, ..., 100 µs.
    fn micros_one_to_hundred() -> Vec<Duration> {
        (1..=100).map(Duration::from_micros).collect()
    }

    // ----- MetricsCollector ------------------------------------------------------

    #[test]
    fn test_metrics_collector_new() {
        let collector = MetricsCollector::new();
        assert!(collector.uptime() >= Duration::ZERO);
    }

    #[test]
    fn test_metrics_collector_global() {
        let first = MetricsCollector::global();
        let second = MetricsCollector::global();
        assert!(std::ptr::eq(first, second));
    }

    #[test]
    fn test_metrics_collector_basic_operations() {
        let collector = MetricsCollector::new();

        for direction in ["rx", "tx"] {
            collector.record_message("modbus", direction);
        }
        for (success, micros) in [(true, 100), (false, 200)] {
            collector.record_read("modbus", success, Duration::from_micros(micros));
        }
        collector.record_write("opcua", true, Duration::from_micros(50));
        collector.set_devices_active(10);

        assert_eq!(collector.snapshot().devices_active, 10);
    }

    #[test]
    fn test_metrics_collector_request_timing() {
        let collector = MetricsCollector::new();
        collector.record_request("modbus", "read");
        collector.record_request_duration("modbus", "read", Duration::from_micros(150));

        // The guard observes its duration when dropped.
        let timer = collector.time_request("bacnet", "subscribe");
        std::thread::sleep(Duration::from_millis(5));
        drop(timer);
    }

    #[test]
    fn test_metrics_collector_error_recording() {
        let collector = MetricsCollector::new();
        let errors = [
            ("modbus", "timeout"),
            ("modbus", "connection"),
            ("opcua", "protocol"),
        ];
        for (protocol, error_type) in errors {
            collector.record_error(protocol, error_type);
        }
    }

    #[test]
    fn test_metrics_collector_tick_recording() {
        let collector = MetricsCollector::new();
        for millis in (10..=50).step_by(10) {
            collector.record_tick(Duration::from_millis(millis));
        }
        assert_eq!(collector.snapshot().ticks_total, 5);
    }

    #[test]
    fn test_metrics_collector_connections() {
        let collector = MetricsCollector::new();
        collector.set_connections_active("modbus", 5);
        collector.inc_connections("modbus");
        collector.dec_connections("modbus");
    }

    #[test]
    fn test_metrics_collector_device_points() {
        let collector = MetricsCollector::new();
        collector.set_points_total(1000);
        for (device, points) in [("device-001", 50), ("device-002", 75)] {
            collector.set_device_points("modbus", device, points);
        }

        assert_eq!(collector.snapshot().points_total, 1000);

        collector.remove_device_points("modbus", "device-001");
    }

    #[test]
    fn test_metrics_collector_system_metrics() {
        const HALF_GIB: i64 = 512 * 1024 * 1024;
        const QUARTER_GIB: i64 = 256 * 1024 * 1024;
        let collector = MetricsCollector::new();

        collector.set_memory_bytes(HALF_GIB);
        collector.set_cpu_percent(45.5);
        let snapshot = collector.snapshot();
        assert_eq!(snapshot.memory_bytes, HALF_GIB as u64);
        assert!(approx_eq(snapshot.cpu_percent, 45.5));

        collector.update_system_metrics(QUARTER_GIB, 30.0);
        let snapshot = collector.snapshot();
        assert_eq!(snapshot.memory_bytes, QUARTER_GIB as u64);
        assert!(approx_eq(snapshot.cpu_percent, 30.0));

        // Out-of-range CPU readings are clamped into 0..=100.
        collector.set_cpu_percent(150.0);
        assert!(approx_eq(collector.snapshot().cpu_percent, 100.0));
        collector.set_cpu_percent(-10.0);
        assert!(approx_eq(collector.snapshot().cpu_percent, 0.0));
    }

    #[test]
    fn test_metrics_collector_prometheus_export() {
        let collector = MetricsCollector::new();
        collector.record_read("modbus", true, Duration::from_micros(100));
        collector.set_devices_active(5);

        let exported = collector.export_prometheus();
        assert!(!exported.is_empty());
        for needle in [METRIC_PREFIX, "devices_active"] {
            assert!(exported.contains(needle));
        }
    }

    #[test]
    fn test_metrics_collector_event_recording() {
        let collector = MetricsCollector::new();
        for event in ["device_added", "device_removed", "engine_started"] {
            collector.record_event(event);
        }
    }

    #[test]
    fn test_metrics_collector_snapshot() {
        let collector = MetricsCollector::new();
        collector.set_devices_active(10);
        collector.set_points_total(500);
        collector.set_memory_bytes(1024 * 1024);
        collector.set_cpu_percent(25.0);

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.devices_active, 10);
        assert_eq!(snapshot.points_total, 500);
        assert_eq!(snapshot.memory_bytes, 1024 * 1024);
        assert!(approx_eq(snapshot.cpu_percent, 25.0));

        let detailed = collector.detailed_snapshot();
        assert_eq!(detailed.basic.devices_active, 10);
        assert!(detailed.start_time_unix > 0);
    }

    // ----- LatencyStats ----------------------------------------------------------

    #[test]
    fn test_latency_stats_from_samples() {
        let samples: Vec<Duration> = (1..=5)
            .map(|step| Duration::from_micros(step * 100))
            .collect();
        let stats = LatencyStats::from_samples(&samples);

        assert_eq!(
            (stats.min_us, stats.max_us, stats.avg_us, stats.count),
            (100, 500, 300, 5)
        );
        assert!((200..=300).contains(&stats.p50_us));
    }

    #[test]
    fn test_latency_stats_empty() {
        let stats = LatencyStats::from_samples(&[]);
        assert_eq!(
            (stats.min_us, stats.max_us, stats.avg_us, stats.count),
            (0, 0, 0, 0)
        );
    }

    #[test]
    fn test_latency_stats_single_sample() {
        let stats = LatencyStats::from_samples(&[Duration::from_micros(150)]);
        assert_eq!(
            (stats.min_us, stats.max_us, stats.avg_us, stats.count),
            (150, 150, 150, 1)
        );
    }

    #[test]
    fn test_latency_stats_percentiles() {
        let stats = LatencyStats::from_samples(&micros_one_to_hundred());

        assert_eq!(stats.min_us, 1);
        assert_eq!(stats.max_us, 100);
        assert!((50..=51).contains(&stats.p50_us));
        assert!((90..=91).contains(&stats.p90_us));
        assert!((95..=96).contains(&stats.p95_us));
        assert!((99..=100).contains(&stats.p99_us));
    }

    #[test]
    fn test_latency_stats_is_latency_high() {
        let stats = LatencyStats::from_samples(&micros_one_to_hundred());
        assert!(stats.is_latency_high(50));
        assert!(!stats.is_latency_high(100));
    }

    #[test]
    fn test_latency_stats_stddev() {
        let stats = LatencyStats::from_samples(&[Duration::from_micros(100); 3]);
        assert_eq!(stats.stddev_us, 0);
    }

    // ----- Timer -----------------------------------------------------------------

    #[test]
    fn test_timer_start_stop() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        assert!(timer.stop() >= Duration::from_millis(10));
    }

    #[test]
    fn test_timer_elapsed() {
        let timer = Timer::start();
        std::thread::sleep(Duration::from_millis(5));
        let earlier = timer.elapsed();
        std::thread::sleep(Duration::from_millis(5));
        let later = timer.elapsed();
        assert!(later > earlier);
    }

    #[test]
    fn test_timer_reset() {
        let mut timer = Timer::start();
        std::thread::sleep(Duration::from_millis(10));
        timer.reset();
        assert!(timer.elapsed() < Duration::from_millis(10));
    }

    #[test]
    fn test_timer_default() {
        assert!(Timer::default().elapsed() >= Duration::ZERO);
    }

    // ----- RequestTimer ----------------------------------------------------------

    #[test]
    fn test_request_timer_drop_records() {
        let collector = MetricsCollector::new();
        let timer = collector.time_request("test_proto", "test_op");
        std::thread::sleep(Duration::from_millis(5));
        drop(timer);

        let exported = collector.export_prometheus();
        let histogram_name = format!("{}_request_duration_seconds", METRIC_PREFIX);
        assert!(exported.contains(&histogram_name));
    }

    #[test]
    fn test_request_timer_manual_record() {
        let collector = MetricsCollector::new();
        let timer = collector.time_request("test_proto", "manual_test");
        std::thread::sleep(Duration::from_millis(5));
        assert!(timer.record() >= Duration::from_millis(5));
    }

    #[test]
    fn test_request_timer_discard() {
        let collector = MetricsCollector::new();
        collector
            .time_request("test_proto", "discard_test")
            .discard();
    }

    // ----- helpers and constants -------------------------------------------------

    #[test]
    fn test_classify_error() {
        let cases = [
            ("Connection timeout", "timeout"),
            ("Connection refused", "connection"),
            ("I/O error occurred", "io"),
            ("Protocol violation", "protocol"),
            ("Invalid message format", "protocol"),
            ("Device not found", "not_found"),
            ("Permission denied", "permission"),
            ("Some random error", "unknown"),
        ];
        for (message, expected) in cases {
            assert_eq!(classify_error(&message), expected, "message: {}", message);
        }
    }

    #[test]
    fn test_metric_prefix() {
        assert_eq!(METRIC_PREFIX, "mabi");
    }

    #[test]
    fn test_latency_buckets() {
        assert!(!LATENCY_BUCKETS.is_empty());
        assert!(LATENCY_BUCKETS.windows(2).all(|pair| pair[1] > pair[0]));
    }

    #[test]
    fn test_tick_buckets() {
        assert!(!TICK_BUCKETS.is_empty());
        assert!(TICK_BUCKETS.windows(2).all(|pair| pair[1] > pair[0]));
    }
}