Skip to main content

rust_ethernet_ip/
monitoring.rs

1use crate::error::EtherNetIpError;
2use serde::{Deserialize, Serialize};
3use std::sync::Arc;
4use std::time::{Duration, Instant, SystemTime};
5use tokio::sync::RwLock;
6use tokio::time::interval;
7
/// Production monitoring metrics for the EtherNet/IP library.
///
/// Top-level container grouping every metric family tracked by
/// `ProductionMonitor`. The whole structure is cloneable and derives the
/// serde `Serialize`/`Deserialize` traits.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringMetrics {
    /// Connection statistics
    pub connections: ConnectionMetrics,
    /// Operation statistics
    pub operations: OperationMetrics,
    /// Performance statistics
    pub performance: PerformanceMetrics,
    /// Error statistics
    pub errors: ErrorMetrics,
    /// System health
    pub health: HealthMetrics,
}
22
/// Connection counters maintained by `ProductionMonitor::record_connection`
/// and `ProductionMonitor::record_disconnection`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionMetrics {
    /// Connections currently considered open: incremented on each successful
    /// connection event and decremented (never below zero) on disconnection.
    pub active_connections: u32,
    /// Number of successful connection events; failed attempts are not
    /// included here.
    pub total_connections: u64,
    /// Number of failed connection events.
    pub failed_connections: u64,
    /// Starts at zero.
    // NOTE(review): nothing in this file updates this field — confirm whether
    // another module maintains it.
    pub connection_uptime_avg: Duration,
    /// Wall-clock time of the most recent successful connection event.
    pub last_connection_time: Option<SystemTime>,
}
31
/// Read/write/batch operation counters and their most recent timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
    /// Every recorded read, successful or failed.
    pub total_reads: u64,
    /// Every recorded write, successful or failed.
    pub total_writes: u64,
    /// Reads recorded through `record_read_success`.
    pub successful_reads: u64,
    /// Writes recorded through `record_write_success`.
    pub successful_writes: u64,
    /// Reads recorded through `record_read_failure`.
    pub failed_reads: u64,
    /// Writes recorded through `record_write_failure`.
    pub failed_writes: u64,
    /// Batch operations; in this file it is only incremented when a partial
    /// batch failure is recorded.
    pub batch_operations: u64,
    /// Starts at zero.
    // NOTE(review): nothing in this file updates this field — confirm whether
    // another module maintains it.
    pub subscription_updates: u64,
    /// Batches recorded through `record_partial_batch_failure`.
    pub partial_batch_failures: u64,
    /// Wall-clock time of the latest successful read, if any.
    pub last_successful_read_time: Option<SystemTime>,
    /// Wall-clock time of the latest failed read, if any.
    pub last_failed_read_time: Option<SystemTime>,
    /// Wall-clock time of the latest successful write, if any.
    pub last_successful_write_time: Option<SystemTime>,
    /// Wall-clock time of the latest failed write, if any.
    pub last_failed_write_time: Option<SystemTime>,
}
48
/// Latency, throughput and (placeholder) system resource figures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Running mean of the latencies passed to `record_read_success`, in ms.
    pub avg_read_latency_ms: f64,
    /// Running mean of the latencies passed to `record_write_success`, in ms.
    pub avg_write_latency_ms: f64,
    /// Largest read latency recorded so far, in ms.
    pub max_read_latency_ms: f64,
    /// Largest write latency recorded so far, in ms.
    pub max_write_latency_ms: f64,
    /// Successful reads divided by monitor uptime in seconds; filled in by
    /// `get_metrics` on the copy it returns.
    pub reads_per_second: f64,
    /// Successful writes divided by monitor uptime in seconds; filled in by
    /// `get_metrics` on the copy it returns.
    pub writes_per_second: f64,
    /// Written by the background system-metrics update; the value currently
    /// comes from a hard-coded placeholder, not a real measurement.
    pub memory_usage_mb: f64,
    /// Written by the background system-metrics update; the value currently
    /// comes from a hard-coded placeholder, not a real measurement.
    pub cpu_usage_percent: f64,
}
60
/// Per-category error counters plus details of the most recent error.
///
/// All fields are updated by the monitor's internal `record_error` helper,
/// which first maps the reported error label to an `ErrorCategory`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    /// Errors classified as `ErrorCategory::Network`.
    pub network_errors: u64,
    /// Errors classified as `CipProtocol`, `BatchEmbeddedService` or
    /// `KnownControllerLimitation` (the latter two are counted here in
    /// addition to their dedicated counters).
    pub protocol_errors: u64,
    /// Errors classified as `ErrorCategory::Timeout`.
    pub timeout_errors: u64,
    /// Errors classified as `ErrorCategory::NotFound`.
    pub tag_not_found_errors: u64,
    /// Errors classified as `ErrorCategory::DataType`.
    pub data_type_errors: u64,
    /// Errors classified as `ErrorCategory::Session`.
    pub session_errors: u64,
    /// Errors classified as `ErrorCategory::RoutePath`.
    pub route_path_errors: u64,
    /// Errors classified as `ErrorCategory::BatchEmbeddedService`.
    pub embedded_service_errors: u64,
    /// Errors classified as `ErrorCategory::KnownControllerLimitation`.
    pub known_controller_limitation_errors: u64,
    /// Errors whose category reports `is_retriable() == true`.
    pub retriable_errors: u64,
    /// All other recorded errors, including those classified as `Unknown`.
    pub non_retriable_errors: u64,
    /// Wall-clock time of the most recently recorded error.
    pub last_error_time: Option<SystemTime>,
    /// The error label/message string passed to the most recent record call.
    pub last_error_message: Option<String>,
    /// Category assigned to the most recently recorded error.
    pub last_error_category: Option<ErrorCategory>,
    /// Wall-clock time of the most recent error with a retriable category.
    pub last_retriable_error_time: Option<SystemTime>,
}
79
/// Health indicators derived from recorded operations and health checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Starts as `Unknown`; `get_metrics` recomputes it on every copy it
    /// returns.
    pub overall_health: HealthStatus,
    /// Set at construction, by `record_verified_health_check`, and on every
    /// copy returned from `get_metrics`.
    pub last_health_check: SystemTime,
    /// `Passive` until a verified health check has been recorded.
    pub health_mode: HealthCheckMode,
    /// Time of the most recent `record_verified_health_check` call, if any.
    pub last_verified_health_check: Option<SystemTime>,
    /// Incremented for every recorded error or failed verified check; reset
    /// to zero by any recorded success or by `reset_consecutive_failures`.
    pub consecutive_failures: u32,
    /// Number of times `reset_consecutive_failures` has been called.
    pub recovery_attempts: u32,
    /// Time elapsed since the monitor was created; filled in by `get_metrics`
    /// on the copy it returns.
    pub system_uptime: Duration,
    /// Time of the latest successful read, write or verified health check.
    pub last_success_time: Option<SystemTime>,
    /// Time of the latest recorded error or failed verified health check.
    pub last_failure_time: Option<SystemTime>,
}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94#[non_exhaustive]
95pub enum HealthStatus {
96    Healthy,
97    Warning,
98    Critical,
99    Unknown,
100}
101
/// How the reported health information was obtained.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum HealthCheckMode {
    /// Initial mode; `get_metrics` also reports this mode whenever no
    /// verified health check has been recorded yet.
    Passive,
    /// Set once `record_verified_health_check` has been called.
    Verified,
}
108
/// Coarse classification assigned to every recorded error.
///
/// Produced by `ProductionMonitor::classify_error` (typed errors) and
/// `ProductionMonitor::classify_error_type` (string labels/messages).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum ErrorCategory {
    /// I/O errors and messages mentioning "io error" or "network".
    Network,
    /// Timeout errors and messages mentioning "timed out" or "timeout".
    Timeout,
    /// Connection / connection-lost errors and session-related messages.
    Session,
    /// CIP status 0x04 or messages mentioning a path segment error or "route".
    RoutePath,
    /// Any other error carrying a CIP status, or mentioning "cip error" or
    /// "protocol".
    CipProtocol,
    /// CIP status 0x1E or messages mentioning "embedded service error".
    BatchEmbeddedService,
    /// Messages matching the known controller-limitation patterns
    /// (e.g. extended status "0x2107").
    KnownControllerLimitation,
    /// Data-type mismatches and string/UTF-8 conversion problems.
    DataType,
    /// Tag-not-found errors.
    NotFound,
    /// Nothing matched.
    Unknown,
}
123
124impl ErrorCategory {
125    pub fn is_retriable(self) -> bool {
126        matches!(
127            self,
128            ErrorCategory::Network | ErrorCategory::Timeout | ErrorCategory::Session
129        )
130    }
131}
132
/// Flattened, timestamped copy of the monitor's metrics, as returned by
/// `ProductionMonitor::get_diagnostics_snapshot`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiagnosticsSnapshot {
    /// Wall-clock time at which the snapshot was assembled.
    pub captured_at: SystemTime,
    /// Connection statistics at capture time.
    pub connections: ConnectionMetrics,
    /// Operation statistics at capture time.
    pub operations: OperationMetrics,
    /// Performance statistics at capture time.
    pub performance: PerformanceMetrics,
    /// Error statistics at capture time.
    pub errors: ErrorMetrics,
    /// Health indicators at capture time.
    pub health: HealthMetrics,
    /// Flags that the memory/CPU figures are placeholders rather than real
    /// measurements; `get_diagnostics_snapshot` always sets this to `true`.
    pub system_metrics_are_placeholders: bool,
}
143
/// Production monitoring system for EtherNet/IP operations.
///
/// Cloning a monitor yields another handle onto the same shared metrics.
pub struct ProductionMonitor {
    /// Shared, lock-protected metrics state.
    metrics: Arc<RwLock<MonitoringMetrics>>,
    /// Monotonic creation time; used to compute uptime in `get_metrics`.
    start_time: Instant,
    /// Wall-clock creation time.
    // NOTE(review): only copied by `Clone` in this file — confirm whether it
    // is read anywhere else.
    system_start_time: SystemTime,
}
150
151impl Default for ProductionMonitor {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
157impl ProductionMonitor {
158    pub fn new() -> Self {
159        Self {
160            metrics: Arc::new(RwLock::new(MonitoringMetrics {
161                connections: ConnectionMetrics {
162                    active_connections: 0,
163                    total_connections: 0,
164                    failed_connections: 0,
165                    connection_uptime_avg: Duration::ZERO,
166                    last_connection_time: None,
167                },
168                operations: OperationMetrics {
169                    total_reads: 0,
170                    total_writes: 0,
171                    successful_reads: 0,
172                    successful_writes: 0,
173                    failed_reads: 0,
174                    failed_writes: 0,
175                    batch_operations: 0,
176                    subscription_updates: 0,
177                    partial_batch_failures: 0,
178                    last_successful_read_time: None,
179                    last_failed_read_time: None,
180                    last_successful_write_time: None,
181                    last_failed_write_time: None,
182                },
183                performance: PerformanceMetrics {
184                    avg_read_latency_ms: 0.0,
185                    avg_write_latency_ms: 0.0,
186                    max_read_latency_ms: 0.0,
187                    max_write_latency_ms: 0.0,
188                    reads_per_second: 0.0,
189                    writes_per_second: 0.0,
190                    memory_usage_mb: 0.0,
191                    cpu_usage_percent: 0.0,
192                },
193                errors: ErrorMetrics {
194                    network_errors: 0,
195                    protocol_errors: 0,
196                    timeout_errors: 0,
197                    tag_not_found_errors: 0,
198                    data_type_errors: 0,
199                    session_errors: 0,
200                    route_path_errors: 0,
201                    embedded_service_errors: 0,
202                    known_controller_limitation_errors: 0,
203                    retriable_errors: 0,
204                    non_retriable_errors: 0,
205                    last_error_time: None,
206                    last_error_message: None,
207                    last_error_category: None,
208                    last_retriable_error_time: None,
209                },
210                health: HealthMetrics {
211                    overall_health: HealthStatus::Unknown,
212                    last_health_check: SystemTime::now(),
213                    health_mode: HealthCheckMode::Passive,
214                    last_verified_health_check: None,
215                    consecutive_failures: 0,
216                    recovery_attempts: 0,
217                    system_uptime: Duration::ZERO,
218                    last_success_time: None,
219                    last_failure_time: None,
220                },
221            })),
222            start_time: Instant::now(),
223            system_start_time: SystemTime::now(),
224        }
225    }
226
227    /// Record a successful read operation
228    pub async fn record_read_success(&self, latency: Duration) {
229        let mut metrics = self.metrics.write().await;
230        metrics.operations.total_reads += 1;
231        metrics.operations.successful_reads += 1;
232        let now = SystemTime::now();
233        metrics.operations.last_successful_read_time = Some(now);
234        metrics.health.last_success_time = Some(now);
235        metrics.health.consecutive_failures = 0;
236
237        // Update latency metrics
238        let latency_ms = latency.as_millis() as f64;
239        metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
240            * (metrics.operations.successful_reads - 1) as f64
241            + latency_ms)
242            / metrics.operations.successful_reads as f64;
243
244        if latency_ms > metrics.performance.max_read_latency_ms {
245            metrics.performance.max_read_latency_ms = latency_ms;
246        }
247    }
248
249    /// Record a failed read operation
250    pub async fn record_read_failure(&self, error_type: &str) {
251        let mut metrics = self.metrics.write().await;
252        metrics.operations.total_reads += 1;
253        metrics.operations.failed_reads += 1;
254        metrics.operations.last_failed_read_time = Some(SystemTime::now());
255        self.record_error(&mut metrics, error_type);
256    }
257
258    /// Record a successful write operation
259    pub async fn record_write_success(&self, latency: Duration) {
260        let mut metrics = self.metrics.write().await;
261        metrics.operations.total_writes += 1;
262        metrics.operations.successful_writes += 1;
263        let now = SystemTime::now();
264        metrics.operations.last_successful_write_time = Some(now);
265        metrics.health.last_success_time = Some(now);
266        metrics.health.consecutive_failures = 0;
267
268        // Update latency metrics
269        let latency_ms = latency.as_millis() as f64;
270        metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
271            * (metrics.operations.successful_writes - 1) as f64
272            + latency_ms)
273            / metrics.operations.successful_writes as f64;
274
275        if latency_ms > metrics.performance.max_write_latency_ms {
276            metrics.performance.max_write_latency_ms = latency_ms;
277        }
278    }
279
280    /// Record a failed write operation
281    pub async fn record_write_failure(&self, error_type: &str) {
282        let mut metrics = self.metrics.write().await;
283        metrics.operations.total_writes += 1;
284        metrics.operations.failed_writes += 1;
285        metrics.operations.last_failed_write_time = Some(SystemTime::now());
286        self.record_error(&mut metrics, error_type);
287    }
288
289    /// Record a partial batch failure without losing successful values.
290    pub async fn record_partial_batch_failure(&self, error_type: &str) {
291        let mut metrics = self.metrics.write().await;
292        metrics.operations.batch_operations += 1;
293        metrics.operations.partial_batch_failures += 1;
294        self.record_error(&mut metrics, error_type);
295    }
296
297    /// Record a connection event
298    pub async fn record_connection(&self, success: bool) {
299        let mut metrics = self.metrics.write().await;
300        if success {
301            metrics.connections.total_connections += 1;
302            metrics.connections.active_connections += 1;
303            metrics.connections.last_connection_time = Some(SystemTime::now());
304        } else {
305            metrics.connections.failed_connections += 1;
306        }
307    }
308
309    /// Record a disconnection event
310    pub async fn record_disconnection(&self) {
311        let mut metrics = self.metrics.write().await;
312        if metrics.connections.active_connections > 0 {
313            metrics.connections.active_connections -= 1;
314        }
315    }
316
317    /// Record an error
318    fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
319        let category = Self::classify_error_type(error_type);
320        let now = SystemTime::now();
321
322        match category {
323            ErrorCategory::Network => metrics.errors.network_errors += 1,
324            ErrorCategory::Timeout => metrics.errors.timeout_errors += 1,
325            ErrorCategory::Session => metrics.errors.session_errors += 1,
326            ErrorCategory::RoutePath => metrics.errors.route_path_errors += 1,
327            ErrorCategory::CipProtocol => metrics.errors.protocol_errors += 1,
328            ErrorCategory::BatchEmbeddedService => {
329                metrics.errors.protocol_errors += 1;
330                metrics.errors.embedded_service_errors += 1;
331            }
332            ErrorCategory::KnownControllerLimitation => {
333                metrics.errors.protocol_errors += 1;
334                metrics.errors.known_controller_limitation_errors += 1;
335            }
336            ErrorCategory::DataType => metrics.errors.data_type_errors += 1,
337            ErrorCategory::NotFound => metrics.errors.tag_not_found_errors += 1,
338            ErrorCategory::Unknown => {}
339        }
340
341        if category.is_retriable() {
342            metrics.errors.retriable_errors += 1;
343            metrics.errors.last_retriable_error_time = Some(now);
344        } else {
345            metrics.errors.non_retriable_errors += 1;
346        }
347
348        metrics.errors.last_error_time = Some(now);
349        metrics.errors.last_error_message = Some(error_type.to_string());
350        metrics.errors.last_error_category = Some(category);
351        metrics.health.consecutive_failures += 1;
352        metrics.health.last_failure_time = Some(now);
353    }
354
355    pub fn classify_error(error: &EtherNetIpError) -> ErrorCategory {
356        match error {
357            EtherNetIpError::Io(_) => ErrorCategory::Network,
358            EtherNetIpError::Timeout(_) => ErrorCategory::Timeout,
359            EtherNetIpError::Connection(_) | EtherNetIpError::ConnectionLost(_) => {
360                ErrorCategory::Session
361            }
362            EtherNetIpError::TagNotFound(_) => ErrorCategory::NotFound,
363            EtherNetIpError::DataTypeMismatch { .. } => ErrorCategory::DataType,
364            EtherNetIpError::CipError { code, message }
365            | EtherNetIpError::ReadError {
366                status: code,
367                message,
368            }
369            | EtherNetIpError::WriteError {
370                status: code,
371                message,
372            } => Self::classify_status_and_message(Some(*code), message),
373            EtherNetIpError::Protocol(message)
374            | EtherNetIpError::InvalidResponse { reason: message }
375            | EtherNetIpError::Other(message)
376            | EtherNetIpError::Tag(message)
377            | EtherNetIpError::Subscription(message)
378            | EtherNetIpError::Udt(message)
379            | EtherNetIpError::Permission(message)
380            | EtherNetIpError::InvalidString { reason: message } => {
381                Self::classify_status_and_message(None, message)
382            }
383            EtherNetIpError::StringTooLong { .. } => ErrorCategory::DataType,
384            EtherNetIpError::Utf8(_) => ErrorCategory::DataType,
385        }
386    }
387
388    pub fn classify_error_type(error_type: &str) -> ErrorCategory {
389        match error_type {
390            "network" => ErrorCategory::Network,
391            "timeout" => ErrorCategory::Timeout,
392            "tag_not_found" => ErrorCategory::NotFound,
393            "data_type" => ErrorCategory::DataType,
394            "session" => ErrorCategory::Session,
395            "route_path" => ErrorCategory::RoutePath,
396            "embedded_service" => ErrorCategory::BatchEmbeddedService,
397            "known_controller_limitation" => ErrorCategory::KnownControllerLimitation,
398            "protocol" => ErrorCategory::CipProtocol,
399            other => Self::classify_status_and_message(None, other),
400        }
401    }
402
403    fn classify_status_and_message(status: Option<u8>, message: &str) -> ErrorCategory {
404        let lower = message.to_ascii_lowercase();
405
406        if status == Some(0x1E) || lower.contains("embedded service error") {
407            return ErrorCategory::BatchEmbeddedService;
408        }
409        if lower.contains("0x2107")
410            || lower.contains("controller rejected")
411            || lower.contains("does not support writing to udt array element members")
412        {
413            return ErrorCategory::KnownControllerLimitation;
414        }
415        if status == Some(0x04) || lower.contains("path segment error") || lower.contains("route") {
416            return ErrorCategory::RoutePath;
417        }
418        if lower.contains("timed out") || lower.contains("timeout") {
419            return ErrorCategory::Timeout;
420        }
421        if lower.contains("connection lost")
422            || lower.contains("plc unreachable")
423            || lower.contains("session")
424            || lower.contains("keep-alive")
425        {
426            return ErrorCategory::Session;
427        }
428        if lower.contains("tag not found") {
429            return ErrorCategory::NotFound;
430        }
431        if lower.contains("data type")
432            || lower.contains("invalid string")
433            || lower.contains("utf-8")
434        {
435            return ErrorCategory::DataType;
436        }
437        if lower.contains("io error") || lower.contains("network") {
438            return ErrorCategory::Network;
439        }
440        if status.is_some() || lower.contains("cip error") || lower.contains("protocol") {
441            return ErrorCategory::CipProtocol;
442        }
443
444        ErrorCategory::Unknown
445    }
446
447    /// Get current metrics
448    pub async fn get_metrics(&self) -> MonitoringMetrics {
449        let mut metrics = self.metrics.read().await.clone();
450
451        // Update system uptime
452        metrics.health.system_uptime = self.start_time.elapsed();
453
454        // Calculate operations per second
455        let total_time = metrics.health.system_uptime.as_secs_f64();
456        if total_time > 0.0 {
457            metrics.performance.reads_per_second =
458                metrics.operations.successful_reads as f64 / total_time;
459            metrics.performance.writes_per_second =
460                metrics.operations.successful_writes as f64 / total_time;
461        }
462
463        // Update health status
464        metrics.health.overall_health = self.calculate_health_status(&metrics);
465        metrics.health.last_health_check = SystemTime::now();
466        if metrics.health.last_verified_health_check.is_none() {
467            metrics.health.health_mode = HealthCheckMode::Passive;
468        }
469
470        metrics
471    }
472
473    /// Get a stable diagnostics snapshot for wrappers and service layers.
474    pub async fn get_diagnostics_snapshot(&self) -> DiagnosticsSnapshot {
475        let metrics = self.get_metrics().await;
476        DiagnosticsSnapshot {
477            captured_at: SystemTime::now(),
478            connections: metrics.connections,
479            operations: metrics.operations,
480            performance: metrics.performance,
481            errors: metrics.errors,
482            health: metrics.health,
483            system_metrics_are_placeholders: true,
484        }
485    }
486
487    /// Calculate overall health status
488    fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
489        let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
490            (metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
491                / (metrics.operations.total_reads + metrics.operations.total_writes) as f64
492        } else {
493            0.0
494        };
495
496        if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
497            HealthStatus::Critical
498        } else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
499            HealthStatus::Warning
500        } else if metrics.connections.active_connections > 0 {
501            HealthStatus::Healthy
502        } else {
503            HealthStatus::Unknown
504        }
505    }
506
507    /// Start monitoring background tasks
508    pub async fn start_monitoring(&self) {
509        let monitor = self.clone();
510        tokio::spawn(async move {
511            let mut interval = interval(Duration::from_secs(30));
512            loop {
513                interval.tick().await;
514                monitor.update_system_metrics().await;
515            }
516        });
517    }
518
519    /// Update system-level metrics
520    async fn update_system_metrics(&self) {
521        let mut metrics = self.metrics.write().await;
522
523        // Update memory usage (simplified)
524        metrics.performance.memory_usage_mb = self.get_memory_usage();
525
526        // Update CPU usage (simplified)
527        metrics.performance.cpu_usage_percent = self.get_cpu_usage();
528    }
529
530    /// Get current memory usage (simplified implementation)
531    fn get_memory_usage(&self) -> f64 {
532        // In a real implementation, you would use system APIs
533        // For now, return a placeholder
534        10.0
535    }
536
537    /// Get current CPU usage (simplified implementation)
538    fn get_cpu_usage(&self) -> f64 {
539        // In a real implementation, you would use system APIs
540        // For now, return a placeholder
541        5.0
542    }
543
544    /// Reset consecutive failures (call after successful recovery)
545    pub async fn reset_consecutive_failures(&self) {
546        let mut metrics = self.metrics.write().await;
547        metrics.health.consecutive_failures = 0;
548        metrics.health.recovery_attempts += 1;
549    }
550
551    /// Record the outcome of an active, verified health check.
552    pub async fn record_verified_health_check(&self, is_healthy: bool) {
553        let mut metrics = self.metrics.write().await;
554        let now = SystemTime::now();
555        metrics.health.health_mode = HealthCheckMode::Verified;
556        metrics.health.last_verified_health_check = Some(now);
557        metrics.health.last_health_check = now;
558
559        if is_healthy {
560            metrics.health.last_success_time = Some(now);
561            metrics.health.consecutive_failures = 0;
562        } else {
563            metrics.health.last_failure_time = Some(now);
564            metrics.health.consecutive_failures += 1;
565        }
566    }
567}
568
569impl Clone for ProductionMonitor {
570    fn clone(&self) -> Self {
571        Self {
572            metrics: Arc::clone(&self.metrics),
573            start_time: self.start_time,
574            system_start_time: self.system_start_time,
575        }
576    }
577}
578
// Unit tests for error classification and the diagnostics snapshot.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::EtherNetIpError;

    // A typed timeout maps directly to `Timeout`; a protocol message that
    // mentions a path segment error is classified as `RoutePath` by text.
    #[test]
    fn classify_timeout_and_route_path_errors() {
        assert_eq!(
            ProductionMonitor::classify_error(&EtherNetIpError::Timeout(Duration::from_secs(1))),
            ErrorCategory::Timeout
        );
        assert_eq!(
            ProductionMonitor::classify_error(&EtherNetIpError::Protocol(
                "Path segment error while resolving route".to_string()
            )),
            ErrorCategory::RoutePath
        );
    }

    // The "0x2107" marker identifies a known controller limitation, and CIP
    // status 0x1E identifies an embedded-service (batch) error.
    #[test]
    fn classify_known_controller_limitation_and_embedded_service() {
        assert_eq!(
            ProductionMonitor::classify_error(&EtherNetIpError::Protocol(
                "Vendor-specific or composite extended error: 0x2107".to_string()
            )),
            ErrorCategory::KnownControllerLimitation
        );
        assert_eq!(
            ProductionMonitor::classify_error(&EtherNetIpError::WriteError {
                status: 0x1E,
                message: "Embedded service error".to_string(),
            }),
            ErrorCategory::BatchEmbeddedService
        );
    }

    // Health mode is `Passive` until a verified health check is recorded,
    // after which snapshots report `Verified`.
    #[tokio::test]
    async fn diagnostics_snapshot_distinguishes_verified_health() {
        let monitor = ProductionMonitor::new();
        monitor.record_read_success(Duration::from_millis(10)).await;

        // Before any verified check: passive mode, no verified timestamp.
        let passive = monitor.get_diagnostics_snapshot().await;
        assert_eq!(passive.health.health_mode, HealthCheckMode::Passive);
        assert!(passive.health.last_verified_health_check.is_none());
        assert!(passive.operations.last_successful_read_time.is_some());

        // After a verified check: verified mode with a timestamp.
        monitor.record_verified_health_check(true).await;
        let verified = monitor.get_diagnostics_snapshot().await;
        assert_eq!(verified.health.health_mode, HealthCheckMode::Verified);
        assert!(verified.health.last_verified_health_check.is_some());
        assert!(verified.system_metrics_are_placeholders);
    }
}