Skip to main content

rust_ethernet_ip/
monitoring.rs

1use serde::{Deserialize, Serialize};
2use std::sync::Arc;
3use std::time::{Duration, Instant, SystemTime};
4use tokio::sync::RwLock;
5use tokio::time::interval;
6
7/// Production monitoring metrics for the EtherNet/IP library
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct MonitoringMetrics {
10    /// Connection statistics
11    pub connections: ConnectionMetrics,
12    /// Operation statistics
13    pub operations: OperationMetrics,
14    /// Performance statistics
15    pub performance: PerformanceMetrics,
16    /// Error statistics
17    pub errors: ErrorMetrics,
18    /// System health
19    pub health: HealthMetrics,
20}
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ConnectionMetrics {
24    pub active_connections: u32,
25    pub total_connections: u64,
26    pub failed_connections: u64,
27    pub connection_uptime_avg: Duration,
28    pub last_connection_time: Option<SystemTime>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct OperationMetrics {
33    pub total_reads: u64,
34    pub total_writes: u64,
35    pub successful_reads: u64,
36    pub successful_writes: u64,
37    pub failed_reads: u64,
38    pub failed_writes: u64,
39    pub batch_operations: u64,
40    pub subscription_updates: u64,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct PerformanceMetrics {
45    pub avg_read_latency_ms: f64,
46    pub avg_write_latency_ms: f64,
47    pub max_read_latency_ms: f64,
48    pub max_write_latency_ms: f64,
49    pub reads_per_second: f64,
50    pub writes_per_second: f64,
51    pub memory_usage_mb: f64,
52    pub cpu_usage_percent: f64,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct ErrorMetrics {
57    pub network_errors: u64,
58    pub protocol_errors: u64,
59    pub timeout_errors: u64,
60    pub tag_not_found_errors: u64,
61    pub data_type_errors: u64,
62    pub last_error_time: Option<SystemTime>,
63    pub last_error_message: Option<String>,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct HealthMetrics {
68    pub overall_health: HealthStatus,
69    pub last_health_check: SystemTime,
70    pub consecutive_failures: u32,
71    pub recovery_attempts: u32,
72    pub system_uptime: Duration,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub enum HealthStatus {
77    Healthy,
78    Warning,
79    Critical,
80    Unknown,
81}
82
83/// Production monitoring system for EtherNet/IP operations
84pub struct ProductionMonitor {
85    metrics: Arc<RwLock<MonitoringMetrics>>,
86    start_time: Instant,
87    system_start_time: SystemTime,
88}
89
90impl Default for ProductionMonitor {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96impl ProductionMonitor {
97    pub fn new() -> Self {
98        Self {
99            metrics: Arc::new(RwLock::new(MonitoringMetrics {
100                connections: ConnectionMetrics {
101                    active_connections: 0,
102                    total_connections: 0,
103                    failed_connections: 0,
104                    connection_uptime_avg: Duration::ZERO,
105                    last_connection_time: None,
106                },
107                operations: OperationMetrics {
108                    total_reads: 0,
109                    total_writes: 0,
110                    successful_reads: 0,
111                    successful_writes: 0,
112                    failed_reads: 0,
113                    failed_writes: 0,
114                    batch_operations: 0,
115                    subscription_updates: 0,
116                },
117                performance: PerformanceMetrics {
118                    avg_read_latency_ms: 0.0,
119                    avg_write_latency_ms: 0.0,
120                    max_read_latency_ms: 0.0,
121                    max_write_latency_ms: 0.0,
122                    reads_per_second: 0.0,
123                    writes_per_second: 0.0,
124                    memory_usage_mb: 0.0,
125                    cpu_usage_percent: 0.0,
126                },
127                errors: ErrorMetrics {
128                    network_errors: 0,
129                    protocol_errors: 0,
130                    timeout_errors: 0,
131                    tag_not_found_errors: 0,
132                    data_type_errors: 0,
133                    last_error_time: None,
134                    last_error_message: None,
135                },
136                health: HealthMetrics {
137                    overall_health: HealthStatus::Unknown,
138                    last_health_check: SystemTime::now(),
139                    consecutive_failures: 0,
140                    recovery_attempts: 0,
141                    system_uptime: Duration::ZERO,
142                },
143            })),
144            start_time: Instant::now(),
145            system_start_time: SystemTime::now(),
146        }
147    }
148
149    /// Record a successful read operation
150    pub async fn record_read_success(&self, latency: Duration) {
151        let mut metrics = self.metrics.write().await;
152        metrics.operations.total_reads += 1;
153        metrics.operations.successful_reads += 1;
154
155        // Update latency metrics
156        let latency_ms = latency.as_millis() as f64;
157        metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
158            * (metrics.operations.successful_reads - 1) as f64
159            + latency_ms)
160            / metrics.operations.successful_reads as f64;
161
162        if latency_ms > metrics.performance.max_read_latency_ms {
163            metrics.performance.max_read_latency_ms = latency_ms;
164        }
165    }
166
167    /// Record a failed read operation
168    pub async fn record_read_failure(&self, error_type: &str) {
169        let mut metrics = self.metrics.write().await;
170        metrics.operations.total_reads += 1;
171        metrics.operations.failed_reads += 1;
172        self.record_error(&mut metrics, error_type).await;
173    }
174
175    /// Record a successful write operation
176    pub async fn record_write_success(&self, latency: Duration) {
177        let mut metrics = self.metrics.write().await;
178        metrics.operations.total_writes += 1;
179        metrics.operations.successful_writes += 1;
180
181        // Update latency metrics
182        let latency_ms = latency.as_millis() as f64;
183        metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
184            * (metrics.operations.successful_writes - 1) as f64
185            + latency_ms)
186            / metrics.operations.successful_writes as f64;
187
188        if latency_ms > metrics.performance.max_write_latency_ms {
189            metrics.performance.max_write_latency_ms = latency_ms;
190        }
191    }
192
193    /// Record a failed write operation
194    pub async fn record_write_failure(&self, error_type: &str) {
195        let mut metrics = self.metrics.write().await;
196        metrics.operations.total_writes += 1;
197        metrics.operations.failed_writes += 1;
198        self.record_error(&mut metrics, error_type).await;
199    }
200
201    /// Record a connection event
202    pub async fn record_connection(&self, success: bool) {
203        let mut metrics = self.metrics.write().await;
204        if success {
205            metrics.connections.total_connections += 1;
206            metrics.connections.active_connections += 1;
207            metrics.connections.last_connection_time = Some(SystemTime::now());
208        } else {
209            metrics.connections.failed_connections += 1;
210        }
211    }
212
213    /// Record a disconnection event
214    pub async fn record_disconnection(&self) {
215        let mut metrics = self.metrics.write().await;
216        if metrics.connections.active_connections > 0 {
217            metrics.connections.active_connections -= 1;
218        }
219    }
220
221    /// Record an error
222    async fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
223        match error_type {
224            "network" => metrics.errors.network_errors += 1,
225            "protocol" => metrics.errors.protocol_errors += 1,
226            "timeout" => metrics.errors.timeout_errors += 1,
227            "tag_not_found" => metrics.errors.tag_not_found_errors += 1,
228            "data_type" => metrics.errors.data_type_errors += 1,
229            _ => {}
230        }
231
232        metrics.errors.last_error_time = Some(SystemTime::now());
233        metrics.errors.last_error_message = Some(error_type.to_string());
234        metrics.health.consecutive_failures += 1;
235    }
236
237    /// Get current metrics
238    pub async fn get_metrics(&self) -> MonitoringMetrics {
239        let mut metrics = self.metrics.read().await.clone();
240
241        // Update system uptime
242        metrics.health.system_uptime = self.start_time.elapsed();
243
244        // Calculate operations per second
245        let total_time = metrics.health.system_uptime.as_secs_f64();
246        if total_time > 0.0 {
247            metrics.performance.reads_per_second =
248                metrics.operations.successful_reads as f64 / total_time;
249            metrics.performance.writes_per_second =
250                metrics.operations.successful_writes as f64 / total_time;
251        }
252
253        // Update health status
254        metrics.health.overall_health = self.calculate_health_status(&metrics);
255        metrics.health.last_health_check = SystemTime::now();
256
257        metrics
258    }
259
260    /// Calculate overall health status
261    fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
262        let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
263            (metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
264                / (metrics.operations.total_reads + metrics.operations.total_writes) as f64
265        } else {
266            0.0
267        };
268
269        if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
270            HealthStatus::Critical
271        } else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
272            HealthStatus::Warning
273        } else if metrics.connections.active_connections > 0 {
274            HealthStatus::Healthy
275        } else {
276            HealthStatus::Unknown
277        }
278    }
279
280    /// Start monitoring background tasks
281    pub async fn start_monitoring(&self) {
282        let monitor = self.clone();
283        tokio::spawn(async move {
284            let mut interval = interval(Duration::from_secs(30));
285            loop {
286                interval.tick().await;
287                monitor.update_system_metrics().await;
288            }
289        });
290    }
291
292    /// Update system-level metrics
293    async fn update_system_metrics(&self) {
294        let mut metrics = self.metrics.write().await;
295
296        // Update memory usage (simplified)
297        metrics.performance.memory_usage_mb = self.get_memory_usage();
298
299        // Update CPU usage (simplified)
300        metrics.performance.cpu_usage_percent = self.get_cpu_usage();
301    }
302
303    /// Get current memory usage (simplified implementation)
304    fn get_memory_usage(&self) -> f64 {
305        // In a real implementation, you would use system APIs
306        // For now, return a placeholder
307        10.0
308    }
309
310    /// Get current CPU usage (simplified implementation)
311    fn get_cpu_usage(&self) -> f64 {
312        // In a real implementation, you would use system APIs
313        // For now, return a placeholder
314        5.0
315    }
316
317    /// Reset consecutive failures (call after successful recovery)
318    pub async fn reset_consecutive_failures(&self) {
319        let mut metrics = self.metrics.write().await;
320        metrics.health.consecutive_failures = 0;
321        metrics.health.recovery_attempts += 1;
322    }
323}
324
325impl Clone for ProductionMonitor {
326    fn clone(&self) -> Self {
327        Self {
328            metrics: Arc::clone(&self.metrics),
329            start_time: self.start_time,
330            system_start_time: self.system_start_time,
331        }
332    }
333}