1use crate::error::EtherNetIpError;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime};
5use tokio::sync::RwLock;
6use tokio::time::interval;
7
/// Complete set of metrics tracked by `ProductionMonitor`, grouped by concern.
///
/// A clone of this struct (with derived fields such as uptime, rates and overall health
/// filled in) is what `ProductionMonitor::get_metrics` returns.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringMetrics {
    /// Connection establishment / teardown counters.
    pub connections: ConnectionMetrics,
    /// Read, write and batch operation counters and timestamps.
    pub operations: OperationMetrics,
    /// Latency, throughput and (placeholder) host resource figures.
    pub performance: PerformanceMetrics,
    /// Error counters broken down by `ErrorCategory`.
    pub errors: ErrorMetrics,
    /// Health verdict and failure/success tracking.
    pub health: HealthMetrics,
}
22
/// Connection-level counters maintained by `ProductionMonitor::record_connection` and
/// `ProductionMonitor::record_disconnection`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionMetrics {
    /// Successful connections minus disconnections; never decremented below zero.
    pub active_connections: u32,
    /// Number of successful connection attempts recorded (failures are not included).
    pub total_connections: u64,
    /// Number of failed connection attempts recorded.
    pub failed_connections: u64,
    /// NOTE(review): initialised to `Duration::ZERO` and not updated anywhere in this
    /// file — confirm whether another module is expected to maintain it.
    pub connection_uptime_avg: Duration,
    /// Wall-clock time of the most recent successful connection, if any.
    pub last_connection_time: Option<SystemTime>,
}
31
/// Read/write/batch operation counters and the timestamps of the latest outcomes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
    /// All recorded reads, successful and failed.
    pub total_reads: u64,
    /// All recorded writes, successful and failed.
    pub total_writes: u64,
    /// Reads recorded via `record_read_success`.
    pub successful_reads: u64,
    /// Writes recorded via `record_write_success`.
    pub successful_writes: u64,
    /// Reads recorded via `record_read_failure`.
    pub failed_reads: u64,
    /// Writes recorded via `record_write_failure`.
    pub failed_writes: u64,
    /// NOTE(review): in this file only `record_partial_batch_failure` increments this,
    /// so it currently counts batches with partial failures, not all batches.
    pub batch_operations: u64,
    /// NOTE(review): not updated anywhere in this file.
    pub subscription_updates: u64,
    /// Batches that completed with at least one failed item.
    pub partial_batch_failures: u64,
    /// Wall-clock time of the most recent successful read.
    pub last_successful_read_time: Option<SystemTime>,
    /// Wall-clock time of the most recent failed read.
    pub last_failed_read_time: Option<SystemTime>,
    /// Wall-clock time of the most recent successful write.
    pub last_successful_write_time: Option<SystemTime>,
    /// Wall-clock time of the most recent failed write.
    pub last_failed_write_time: Option<SystemTime>,
}
48
/// Latency, throughput and host-resource figures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Running mean latency of successful reads, in milliseconds.
    pub avg_read_latency_ms: f64,
    /// Running mean latency of successful writes, in milliseconds.
    pub avg_write_latency_ms: f64,
    /// Largest successful-read latency seen, in milliseconds.
    pub max_read_latency_ms: f64,
    /// Largest successful-write latency seen, in milliseconds.
    pub max_write_latency_ms: f64,
    /// Successful reads divided by monitor uptime in seconds (computed in `get_metrics`).
    pub reads_per_second: f64,
    /// Successful writes divided by monitor uptime in seconds (computed in `get_metrics`).
    pub writes_per_second: f64,
    /// Placeholder: the background task writes a fixed value, not a real measurement.
    pub memory_usage_mb: f64,
    /// Placeholder: the background task writes a fixed value, not a real measurement.
    pub cpu_usage_percent: f64,
}
60
/// Error counters keyed by `ErrorCategory`, plus details of the most recent error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    /// `ErrorCategory::Network` errors.
    pub network_errors: u64,
    /// `CipProtocol` errors, plus every `BatchEmbeddedService` and
    /// `KnownControllerLimitation` error (those also bump their own counters).
    pub protocol_errors: u64,
    /// `ErrorCategory::Timeout` errors.
    pub timeout_errors: u64,
    /// `ErrorCategory::NotFound` errors.
    pub tag_not_found_errors: u64,
    /// `ErrorCategory::DataType` errors.
    pub data_type_errors: u64,
    /// `ErrorCategory::Session` errors.
    pub session_errors: u64,
    /// `ErrorCategory::RoutePath` errors.
    pub route_path_errors: u64,
    /// `ErrorCategory::BatchEmbeddedService` errors.
    pub embedded_service_errors: u64,
    /// `ErrorCategory::KnownControllerLimitation` errors.
    pub known_controller_limitation_errors: u64,
    /// Errors whose category reports `is_retriable() == true`.
    pub retriable_errors: u64,
    /// All other errors, including `ErrorCategory::Unknown`.
    pub non_retriable_errors: u64,
    /// Wall-clock time of the most recent recorded error.
    pub last_error_time: Option<SystemTime>,
    /// The `error_type` string passed to the most recent `record_*_failure` call.
    pub last_error_message: Option<String>,
    /// Category assigned to the most recent recorded error.
    pub last_error_category: Option<ErrorCategory>,
    /// Wall-clock time of the most recent retriable error.
    pub last_retriable_error_time: Option<SystemTime>,
}
79
/// Health verdict and the success/failure bookkeeping it is derived from.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Verdict computed in `get_metrics`; the stored copy stays at its initial value.
    pub overall_health: HealthStatus,
    /// Time of the last `get_metrics` call or verified health check.
    pub last_health_check: SystemTime,
    /// `Verified` once `record_verified_health_check` has run, otherwise `Passive`.
    pub health_mode: HealthCheckMode,
    /// Time of the most recent verified health check, if any.
    pub last_verified_health_check: Option<SystemTime>,
    /// Incremented per recorded error or failed verified check; reset to zero on any
    /// successful read/write, healthy verified check, or `reset_consecutive_failures`.
    pub consecutive_failures: u32,
    /// Number of `reset_consecutive_failures` calls.
    pub recovery_attempts: u32,
    /// Monotonic time since the monitor was created (filled in by `get_metrics`).
    pub system_uptime: Duration,
    /// Time of the most recent success (operation or verified check).
    pub last_success_time: Option<SystemTime>,
    /// Time of the most recent failure (error or verified check).
    pub last_failure_time: Option<SystemTime>,
}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[non_exhaustive]
95pub enum HealthStatus {
96 Healthy,
97 Warning,
98 Critical,
99 Unknown,
100}
101
/// Describes how the current health information was obtained.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum HealthCheckMode {
    /// Health is inferred only from recorded operations; no verified check has run.
    Passive,
    /// `record_verified_health_check` has been called at least once.
    Verified,
}
108
/// Category assigned to an error by `ProductionMonitor::classify_error` /
/// `ProductionMonitor::classify_error_type`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorCategory {
    /// I/O-level failure (retriable).
    Network,
    /// Operation timed out (retriable).
    Timeout,
    /// Connection/session problem such as a lost connection (retriable).
    Session,
    /// Path segment / routing failure.
    RoutePath,
    /// Other CIP or protocol-level failure.
    CipProtocol,
    /// Embedded service error reported for a batched request.
    BatchEmbeddedService,
    /// Failure matching a known controller limitation (e.g. extended status 0x2107).
    KnownControllerLimitation,
    /// Data type mismatch or string/UTF-8 problem.
    DataType,
    /// Tag does not exist on the controller.
    NotFound,
    /// Nothing in the error matched a known pattern.
    Unknown,
}
123
124impl ErrorCategory {
125 pub fn is_retriable(self) -> bool {
126 matches!(
127 self,
128 ErrorCategory::Network | ErrorCategory::Timeout | ErrorCategory::Session
129 )
130 }
131}
132
/// Point-in-time copy of all metrics, produced by `ProductionMonitor::get_diagnostics_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiagnosticsSnapshot {
    /// Wall-clock time the snapshot was taken.
    pub captured_at: SystemTime,
    pub connections: ConnectionMetrics,
    pub operations: OperationMetrics,
    pub performance: PerformanceMetrics,
    pub errors: ErrorMetrics,
    pub health: HealthMetrics,
    /// Always `true` from `get_diagnostics_snapshot`: the memory/CPU figures in
    /// `performance` are fixed placeholder values, not real measurements.
    pub system_metrics_are_placeholders: bool,
}
143
/// Collects operation, error, connection and health metrics for a client.
///
/// Clones share the same metrics store (the `Arc` is cloned, not the data).
pub struct ProductionMonitor {
    /// Shared, lock-protected metrics; updated by the `record_*` methods.
    metrics: Arc<RwLock<MonitoringMetrics>>,
    /// Monotonic creation time; used for uptime and per-second rates.
    start_time: Instant,
    /// Wall-clock creation time.
    /// NOTE(review): only copied in `Clone`, never otherwise read in this file.
    system_start_time: SystemTime,
}
150
151impl Default for ProductionMonitor {
152 fn default() -> Self {
153 Self::new()
154 }
155}
156
157impl ProductionMonitor {
158 pub fn new() -> Self {
159 Self {
160 metrics: Arc::new(RwLock::new(MonitoringMetrics {
161 connections: ConnectionMetrics {
162 active_connections: 0,
163 total_connections: 0,
164 failed_connections: 0,
165 connection_uptime_avg: Duration::ZERO,
166 last_connection_time: None,
167 },
168 operations: OperationMetrics {
169 total_reads: 0,
170 total_writes: 0,
171 successful_reads: 0,
172 successful_writes: 0,
173 failed_reads: 0,
174 failed_writes: 0,
175 batch_operations: 0,
176 subscription_updates: 0,
177 partial_batch_failures: 0,
178 last_successful_read_time: None,
179 last_failed_read_time: None,
180 last_successful_write_time: None,
181 last_failed_write_time: None,
182 },
183 performance: PerformanceMetrics {
184 avg_read_latency_ms: 0.0,
185 avg_write_latency_ms: 0.0,
186 max_read_latency_ms: 0.0,
187 max_write_latency_ms: 0.0,
188 reads_per_second: 0.0,
189 writes_per_second: 0.0,
190 memory_usage_mb: 0.0,
191 cpu_usage_percent: 0.0,
192 },
193 errors: ErrorMetrics {
194 network_errors: 0,
195 protocol_errors: 0,
196 timeout_errors: 0,
197 tag_not_found_errors: 0,
198 data_type_errors: 0,
199 session_errors: 0,
200 route_path_errors: 0,
201 embedded_service_errors: 0,
202 known_controller_limitation_errors: 0,
203 retriable_errors: 0,
204 non_retriable_errors: 0,
205 last_error_time: None,
206 last_error_message: None,
207 last_error_category: None,
208 last_retriable_error_time: None,
209 },
210 health: HealthMetrics {
211 overall_health: HealthStatus::Unknown,
212 last_health_check: SystemTime::now(),
213 health_mode: HealthCheckMode::Passive,
214 last_verified_health_check: None,
215 consecutive_failures: 0,
216 recovery_attempts: 0,
217 system_uptime: Duration::ZERO,
218 last_success_time: None,
219 last_failure_time: None,
220 },
221 })),
222 start_time: Instant::now(),
223 system_start_time: SystemTime::now(),
224 }
225 }
226
227 pub async fn record_read_success(&self, latency: Duration) {
229 let mut metrics = self.metrics.write().await;
230 metrics.operations.total_reads += 1;
231 metrics.operations.successful_reads += 1;
232 let now = SystemTime::now();
233 metrics.operations.last_successful_read_time = Some(now);
234 metrics.health.last_success_time = Some(now);
235 metrics.health.consecutive_failures = 0;
236
237 let latency_ms = latency.as_millis() as f64;
239 metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
240 * (metrics.operations.successful_reads - 1) as f64
241 + latency_ms)
242 / metrics.operations.successful_reads as f64;
243
244 if latency_ms > metrics.performance.max_read_latency_ms {
245 metrics.performance.max_read_latency_ms = latency_ms;
246 }
247 }
248
249 pub async fn record_read_failure(&self, error_type: &str) {
251 let mut metrics = self.metrics.write().await;
252 metrics.operations.total_reads += 1;
253 metrics.operations.failed_reads += 1;
254 metrics.operations.last_failed_read_time = Some(SystemTime::now());
255 self.record_error(&mut metrics, error_type);
256 }
257
258 pub async fn record_write_success(&self, latency: Duration) {
260 let mut metrics = self.metrics.write().await;
261 metrics.operations.total_writes += 1;
262 metrics.operations.successful_writes += 1;
263 let now = SystemTime::now();
264 metrics.operations.last_successful_write_time = Some(now);
265 metrics.health.last_success_time = Some(now);
266 metrics.health.consecutive_failures = 0;
267
268 let latency_ms = latency.as_millis() as f64;
270 metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
271 * (metrics.operations.successful_writes - 1) as f64
272 + latency_ms)
273 / metrics.operations.successful_writes as f64;
274
275 if latency_ms > metrics.performance.max_write_latency_ms {
276 metrics.performance.max_write_latency_ms = latency_ms;
277 }
278 }
279
280 pub async fn record_write_failure(&self, error_type: &str) {
282 let mut metrics = self.metrics.write().await;
283 metrics.operations.total_writes += 1;
284 metrics.operations.failed_writes += 1;
285 metrics.operations.last_failed_write_time = Some(SystemTime::now());
286 self.record_error(&mut metrics, error_type);
287 }
288
289 pub async fn record_partial_batch_failure(&self, error_type: &str) {
291 let mut metrics = self.metrics.write().await;
292 metrics.operations.batch_operations += 1;
293 metrics.operations.partial_batch_failures += 1;
294 self.record_error(&mut metrics, error_type);
295 }
296
297 pub async fn record_connection(&self, success: bool) {
299 let mut metrics = self.metrics.write().await;
300 if success {
301 metrics.connections.total_connections += 1;
302 metrics.connections.active_connections += 1;
303 metrics.connections.last_connection_time = Some(SystemTime::now());
304 } else {
305 metrics.connections.failed_connections += 1;
306 }
307 }
308
309 pub async fn record_disconnection(&self) {
311 let mut metrics = self.metrics.write().await;
312 if metrics.connections.active_connections > 0 {
313 metrics.connections.active_connections -= 1;
314 }
315 }
316
317 fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
319 let category = Self::classify_error_type(error_type);
320 let now = SystemTime::now();
321
322 match category {
323 ErrorCategory::Network => metrics.errors.network_errors += 1,
324 ErrorCategory::Timeout => metrics.errors.timeout_errors += 1,
325 ErrorCategory::Session => metrics.errors.session_errors += 1,
326 ErrorCategory::RoutePath => metrics.errors.route_path_errors += 1,
327 ErrorCategory::CipProtocol => metrics.errors.protocol_errors += 1,
328 ErrorCategory::BatchEmbeddedService => {
329 metrics.errors.protocol_errors += 1;
330 metrics.errors.embedded_service_errors += 1;
331 }
332 ErrorCategory::KnownControllerLimitation => {
333 metrics.errors.protocol_errors += 1;
334 metrics.errors.known_controller_limitation_errors += 1;
335 }
336 ErrorCategory::DataType => metrics.errors.data_type_errors += 1,
337 ErrorCategory::NotFound => metrics.errors.tag_not_found_errors += 1,
338 ErrorCategory::Unknown => {}
339 }
340
341 if category.is_retriable() {
342 metrics.errors.retriable_errors += 1;
343 metrics.errors.last_retriable_error_time = Some(now);
344 } else {
345 metrics.errors.non_retriable_errors += 1;
346 }
347
348 metrics.errors.last_error_time = Some(now);
349 metrics.errors.last_error_message = Some(error_type.to_string());
350 metrics.errors.last_error_category = Some(category);
351 metrics.health.consecutive_failures += 1;
352 metrics.health.last_failure_time = Some(now);
353 }
354
355 pub fn classify_error(error: &EtherNetIpError) -> ErrorCategory {
356 match error {
357 EtherNetIpError::Io(_) => ErrorCategory::Network,
358 EtherNetIpError::Timeout(_) => ErrorCategory::Timeout,
359 EtherNetIpError::Connection(_) | EtherNetIpError::ConnectionLost(_) => {
360 ErrorCategory::Session
361 }
362 EtherNetIpError::TagNotFound(_) => ErrorCategory::NotFound,
363 EtherNetIpError::DataTypeMismatch { .. } => ErrorCategory::DataType,
364 EtherNetIpError::CipError { code, message }
365 | EtherNetIpError::ReadError {
366 status: code,
367 message,
368 }
369 | EtherNetIpError::WriteError {
370 status: code,
371 message,
372 } => Self::classify_status_and_message(Some(*code), message),
373 EtherNetIpError::Protocol(message)
374 | EtherNetIpError::InvalidResponse { reason: message }
375 | EtherNetIpError::Other(message)
376 | EtherNetIpError::Tag(message)
377 | EtherNetIpError::Subscription(message)
378 | EtherNetIpError::Udt(message)
379 | EtherNetIpError::Permission(message)
380 | EtherNetIpError::InvalidString { reason: message } => {
381 Self::classify_status_and_message(None, message)
382 }
383 EtherNetIpError::StringTooLong { .. } => ErrorCategory::DataType,
384 EtherNetIpError::Utf8(_) => ErrorCategory::DataType,
385 }
386 }
387
388 pub fn classify_error_type(error_type: &str) -> ErrorCategory {
389 match error_type {
390 "network" => ErrorCategory::Network,
391 "timeout" => ErrorCategory::Timeout,
392 "tag_not_found" => ErrorCategory::NotFound,
393 "data_type" => ErrorCategory::DataType,
394 "session" => ErrorCategory::Session,
395 "route_path" => ErrorCategory::RoutePath,
396 "embedded_service" => ErrorCategory::BatchEmbeddedService,
397 "known_controller_limitation" => ErrorCategory::KnownControllerLimitation,
398 "protocol" => ErrorCategory::CipProtocol,
399 other => Self::classify_status_and_message(None, other),
400 }
401 }
402
403 fn classify_status_and_message(status: Option<u8>, message: &str) -> ErrorCategory {
404 let lower = message.to_ascii_lowercase();
405
406 if status == Some(0x1E) || lower.contains("embedded service error") {
407 return ErrorCategory::BatchEmbeddedService;
408 }
409 if lower.contains("0x2107")
410 || lower.contains("controller rejected")
411 || lower.contains("does not support writing to udt array element members")
412 {
413 return ErrorCategory::KnownControllerLimitation;
414 }
415 if status == Some(0x04) || lower.contains("path segment error") || lower.contains("route") {
416 return ErrorCategory::RoutePath;
417 }
418 if lower.contains("timed out") || lower.contains("timeout") {
419 return ErrorCategory::Timeout;
420 }
421 if lower.contains("connection lost")
422 || lower.contains("plc unreachable")
423 || lower.contains("session")
424 || lower.contains("keep-alive")
425 {
426 return ErrorCategory::Session;
427 }
428 if lower.contains("tag not found") {
429 return ErrorCategory::NotFound;
430 }
431 if lower.contains("data type")
432 || lower.contains("invalid string")
433 || lower.contains("utf-8")
434 {
435 return ErrorCategory::DataType;
436 }
437 if lower.contains("io error") || lower.contains("network") {
438 return ErrorCategory::Network;
439 }
440 if status.is_some() || lower.contains("cip error") || lower.contains("protocol") {
441 return ErrorCategory::CipProtocol;
442 }
443
444 ErrorCategory::Unknown
445 }
446
447 pub async fn get_metrics(&self) -> MonitoringMetrics {
449 let mut metrics = self.metrics.read().await.clone();
450
451 metrics.health.system_uptime = self.start_time.elapsed();
453
454 let total_time = metrics.health.system_uptime.as_secs_f64();
456 if total_time > 0.0 {
457 metrics.performance.reads_per_second =
458 metrics.operations.successful_reads as f64 / total_time;
459 metrics.performance.writes_per_second =
460 metrics.operations.successful_writes as f64 / total_time;
461 }
462
463 metrics.health.overall_health = self.calculate_health_status(&metrics);
465 metrics.health.last_health_check = SystemTime::now();
466 if metrics.health.last_verified_health_check.is_none() {
467 metrics.health.health_mode = HealthCheckMode::Passive;
468 }
469
470 metrics
471 }
472
473 pub async fn get_diagnostics_snapshot(&self) -> DiagnosticsSnapshot {
475 let metrics = self.get_metrics().await;
476 DiagnosticsSnapshot {
477 captured_at: SystemTime::now(),
478 connections: metrics.connections,
479 operations: metrics.operations,
480 performance: metrics.performance,
481 errors: metrics.errors,
482 health: metrics.health,
483 system_metrics_are_placeholders: true,
484 }
485 }
486
487 fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
489 let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
490 (metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
491 / (metrics.operations.total_reads + metrics.operations.total_writes) as f64
492 } else {
493 0.0
494 };
495
496 if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
497 HealthStatus::Critical
498 } else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
499 HealthStatus::Warning
500 } else if metrics.connections.active_connections > 0 {
501 HealthStatus::Healthy
502 } else {
503 HealthStatus::Unknown
504 }
505 }
506
507 pub async fn start_monitoring(&self) {
509 let monitor = self.clone();
510 tokio::spawn(async move {
511 let mut interval = interval(Duration::from_secs(30));
512 loop {
513 interval.tick().await;
514 monitor.update_system_metrics().await;
515 }
516 });
517 }
518
519 async fn update_system_metrics(&self) {
521 let mut metrics = self.metrics.write().await;
522
523 metrics.performance.memory_usage_mb = self.get_memory_usage();
525
526 metrics.performance.cpu_usage_percent = self.get_cpu_usage();
528 }
529
530 fn get_memory_usage(&self) -> f64 {
532 10.0
535 }
536
537 fn get_cpu_usage(&self) -> f64 {
539 5.0
542 }
543
544 pub async fn reset_consecutive_failures(&self) {
546 let mut metrics = self.metrics.write().await;
547 metrics.health.consecutive_failures = 0;
548 metrics.health.recovery_attempts += 1;
549 }
550
551 pub async fn record_verified_health_check(&self, is_healthy: bool) {
553 let mut metrics = self.metrics.write().await;
554 let now = SystemTime::now();
555 metrics.health.health_mode = HealthCheckMode::Verified;
556 metrics.health.last_verified_health_check = Some(now);
557 metrics.health.last_health_check = now;
558
559 if is_healthy {
560 metrics.health.last_success_time = Some(now);
561 metrics.health.consecutive_failures = 0;
562 } else {
563 metrics.health.last_failure_time = Some(now);
564 metrics.health.consecutive_failures += 1;
565 }
566 }
567}
568
569impl Clone for ProductionMonitor {
570 fn clone(&self) -> Self {
571 Self {
572 metrics: Arc::clone(&self.metrics),
573 start_time: self.start_time,
574 system_start_time: self.system_start_time,
575 }
576 }
577}
578
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::EtherNetIpError;

    /// Classifies `error` and asserts it lands in `expected`.
    fn assert_classified_as(error: EtherNetIpError, expected: ErrorCategory) {
        let actual = ProductionMonitor::classify_error(&error);
        assert_eq!(actual, expected);
    }

    #[test]
    fn classify_timeout_and_route_path_errors() {
        assert_classified_as(
            EtherNetIpError::Timeout(Duration::from_secs(1)),
            ErrorCategory::Timeout,
        );
        assert_classified_as(
            EtherNetIpError::Protocol("Path segment error while resolving route".to_string()),
            ErrorCategory::RoutePath,
        );
    }

    #[test]
    fn classify_known_controller_limitation_and_embedded_service() {
        assert_classified_as(
            EtherNetIpError::Protocol(
                "Vendor-specific or composite extended error: 0x2107".to_string(),
            ),
            ErrorCategory::KnownControllerLimitation,
        );
        assert_classified_as(
            EtherNetIpError::WriteError {
                status: 0x1E,
                message: "Embedded service error".to_string(),
            },
            ErrorCategory::BatchEmbeddedService,
        );
    }

    #[tokio::test]
    async fn diagnostics_snapshot_distinguishes_verified_health() {
        let monitor = ProductionMonitor::new();
        monitor.record_read_success(Duration::from_millis(10)).await;

        // Before any verified check, health must be reported as passive.
        let before = monitor.get_diagnostics_snapshot().await;
        assert_eq!(before.health.health_mode, HealthCheckMode::Passive);
        assert!(before.health.last_verified_health_check.is_none());
        assert!(before.operations.last_successful_read_time.is_some());

        // A verified check flips the mode and stamps the check time.
        monitor.record_verified_health_check(true).await;
        let after = monitor.get_diagnostics_snapshot().await;
        assert_eq!(after.health.health_mode, HealthCheckMode::Verified);
        assert!(after.health.last_verified_health_check.is_some());
        assert!(after.system_metrics_are_placeholders);
    }
}