Skip to main content

rust_ethernet_ip/
monitoring.rs

use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tokio::time::interval;
5
6/// Production monitoring metrics for the EtherNet/IP library
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct MonitoringMetrics {
9    /// Connection statistics
10    pub connections: ConnectionMetrics,
11    /// Operation statistics
12    pub operations: OperationMetrics,
13    /// Performance statistics
14    pub performance: PerformanceMetrics,
15    /// Error statistics
16    pub errors: ErrorMetrics,
17    /// System health
18    pub health: HealthMetrics,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ConnectionMetrics {
23    pub active_connections: u32,
24    pub total_connections: u64,
25    pub failed_connections: u64,
26    pub connection_uptime_avg: Duration,
27    pub last_connection_time: Option<SystemTime>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct OperationMetrics {
32    pub total_reads: u64,
33    pub total_writes: u64,
34    pub successful_reads: u64,
35    pub successful_writes: u64,
36    pub failed_reads: u64,
37    pub failed_writes: u64,
38    pub batch_operations: u64,
39    pub subscription_updates: u64,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct PerformanceMetrics {
44    pub avg_read_latency_ms: f64,
45    pub avg_write_latency_ms: f64,
46    pub max_read_latency_ms: f64,
47    pub max_write_latency_ms: f64,
48    pub reads_per_second: f64,
49    pub writes_per_second: f64,
50    pub memory_usage_mb: f64,
51    pub cpu_usage_percent: f64,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ErrorMetrics {
56    pub network_errors: u64,
57    pub protocol_errors: u64,
58    pub timeout_errors: u64,
59    pub tag_not_found_errors: u64,
60    pub data_type_errors: u64,
61    pub last_error_time: Option<SystemTime>,
62    pub last_error_message: Option<String>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct HealthMetrics {
67    pub overall_health: HealthStatus,
68    pub last_health_check: SystemTime,
69    pub consecutive_failures: u32,
70    pub recovery_attempts: u32,
71    pub system_uptime: Duration,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub enum HealthStatus {
76    Healthy,
77    Warning,
78    Critical,
79    Unknown,
80}
81
82/// Production monitoring system for EtherNet/IP operations
83pub struct ProductionMonitor {
84    metrics: RwLock<MonitoringMetrics>,
85    start_time: Instant,
86    system_start_time: SystemTime,
87}
88
89impl Default for ProductionMonitor {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl ProductionMonitor {
96    pub fn new() -> Self {
97        Self {
98            metrics: RwLock::new(MonitoringMetrics {
99                connections: ConnectionMetrics {
100                    active_connections: 0,
101                    total_connections: 0,
102                    failed_connections: 0,
103                    connection_uptime_avg: Duration::ZERO,
104                    last_connection_time: None,
105                },
106                operations: OperationMetrics {
107                    total_reads: 0,
108                    total_writes: 0,
109                    successful_reads: 0,
110                    successful_writes: 0,
111                    failed_reads: 0,
112                    failed_writes: 0,
113                    batch_operations: 0,
114                    subscription_updates: 0,
115                },
116                performance: PerformanceMetrics {
117                    avg_read_latency_ms: 0.0,
118                    avg_write_latency_ms: 0.0,
119                    max_read_latency_ms: 0.0,
120                    max_write_latency_ms: 0.0,
121                    reads_per_second: 0.0,
122                    writes_per_second: 0.0,
123                    memory_usage_mb: 0.0,
124                    cpu_usage_percent: 0.0,
125                },
126                errors: ErrorMetrics {
127                    network_errors: 0,
128                    protocol_errors: 0,
129                    timeout_errors: 0,
130                    tag_not_found_errors: 0,
131                    data_type_errors: 0,
132                    last_error_time: None,
133                    last_error_message: None,
134                },
135                health: HealthMetrics {
136                    overall_health: HealthStatus::Unknown,
137                    last_health_check: SystemTime::now(),
138                    consecutive_failures: 0,
139                    recovery_attempts: 0,
140                    system_uptime: Duration::ZERO,
141                },
142            }),
143            start_time: Instant::now(),
144            system_start_time: SystemTime::now(),
145        }
146    }
147
148    /// Record a successful read operation
149    pub async fn record_read_success(&self, latency: Duration) {
150        let mut metrics = self.metrics.write().await;
151        metrics.operations.total_reads += 1;
152        metrics.operations.successful_reads += 1;
153
154        // Update latency metrics
155        let latency_ms = latency.as_millis() as f64;
156        metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
157            * (metrics.operations.successful_reads - 1) as f64
158            + latency_ms)
159            / metrics.operations.successful_reads as f64;
160
161        if latency_ms > metrics.performance.max_read_latency_ms {
162            metrics.performance.max_read_latency_ms = latency_ms;
163        }
164    }
165
166    /// Record a failed read operation
167    pub async fn record_read_failure(&self, error_type: &str) {
168        let mut metrics = self.metrics.write().await;
169        metrics.operations.total_reads += 1;
170        metrics.operations.failed_reads += 1;
171        self.record_error(&mut metrics, error_type).await;
172    }
173
174    /// Record a successful write operation
175    pub async fn record_write_success(&self, latency: Duration) {
176        let mut metrics = self.metrics.write().await;
177        metrics.operations.total_writes += 1;
178        metrics.operations.successful_writes += 1;
179
180        // Update latency metrics
181        let latency_ms = latency.as_millis() as f64;
182        metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
183            * (metrics.operations.successful_writes - 1) as f64
184            + latency_ms)
185            / metrics.operations.successful_writes as f64;
186
187        if latency_ms > metrics.performance.max_write_latency_ms {
188            metrics.performance.max_write_latency_ms = latency_ms;
189        }
190    }
191
192    /// Record a failed write operation
193    pub async fn record_write_failure(&self, error_type: &str) {
194        let mut metrics = self.metrics.write().await;
195        metrics.operations.total_writes += 1;
196        metrics.operations.failed_writes += 1;
197        self.record_error(&mut metrics, error_type).await;
198    }
199
200    /// Record a connection event
201    pub async fn record_connection(&self, success: bool) {
202        let mut metrics = self.metrics.write().await;
203        if success {
204            metrics.connections.total_connections += 1;
205            metrics.connections.active_connections += 1;
206            metrics.connections.last_connection_time = Some(SystemTime::now());
207        } else {
208            metrics.connections.failed_connections += 1;
209        }
210    }
211
212    /// Record a disconnection event
213    pub async fn record_disconnection(&self) {
214        let mut metrics = self.metrics.write().await;
215        if metrics.connections.active_connections > 0 {
216            metrics.connections.active_connections -= 1;
217        }
218    }
219
220    /// Record an error
221    async fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
222        match error_type {
223            "network" => metrics.errors.network_errors += 1,
224            "protocol" => metrics.errors.protocol_errors += 1,
225            "timeout" => metrics.errors.timeout_errors += 1,
226            "tag_not_found" => metrics.errors.tag_not_found_errors += 1,
227            "data_type" => metrics.errors.data_type_errors += 1,
228            _ => {}
229        }
230
231        metrics.errors.last_error_time = Some(SystemTime::now());
232        metrics.errors.last_error_message = Some(error_type.to_string());
233        metrics.health.consecutive_failures += 1;
234    }
235
236    /// Get current metrics
237    pub async fn get_metrics(&self) -> MonitoringMetrics {
238        let mut metrics = self.metrics.read().await.clone();
239
240        // Update system uptime
241        metrics.health.system_uptime = self.start_time.elapsed();
242
243        // Calculate operations per second
244        let total_time = metrics.health.system_uptime.as_secs_f64();
245        if total_time > 0.0 {
246            metrics.performance.reads_per_second =
247                metrics.operations.successful_reads as f64 / total_time;
248            metrics.performance.writes_per_second =
249                metrics.operations.successful_writes as f64 / total_time;
250        }
251
252        // Update health status
253        metrics.health.overall_health = self.calculate_health_status(&metrics);
254        metrics.health.last_health_check = SystemTime::now();
255
256        metrics
257    }
258
259    /// Calculate overall health status
260    fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
261        let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
262            (metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
263                / (metrics.operations.total_reads + metrics.operations.total_writes) as f64
264        } else {
265            0.0
266        };
267
268        if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
269            HealthStatus::Critical
270        } else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
271            HealthStatus::Warning
272        } else if metrics.connections.active_connections > 0 {
273            HealthStatus::Healthy
274        } else {
275            HealthStatus::Unknown
276        }
277    }
278
279    /// Start monitoring background tasks
280    pub async fn start_monitoring(&self) {
281        let monitor = self.clone();
282        tokio::spawn(async move {
283            let mut interval = interval(Duration::from_secs(30));
284            loop {
285                interval.tick().await;
286                monitor.update_system_metrics().await;
287            }
288        });
289    }
290
291    /// Update system-level metrics
292    async fn update_system_metrics(&self) {
293        let mut metrics = self.metrics.write().await;
294
295        // Update memory usage (simplified)
296        metrics.performance.memory_usage_mb = self.get_memory_usage();
297
298        // Update CPU usage (simplified)
299        metrics.performance.cpu_usage_percent = self.get_cpu_usage();
300    }
301
302    /// Get current memory usage (simplified implementation)
303    fn get_memory_usage(&self) -> f64 {
304        // In a real implementation, you would use system APIs
305        // For now, return a placeholder
306        10.0
307    }
308
309    /// Get current CPU usage (simplified implementation)
310    fn get_cpu_usage(&self) -> f64 {
311        // In a real implementation, you would use system APIs
312        // For now, return a placeholder
313        5.0
314    }
315
316    /// Reset consecutive failures (call after successful recovery)
317    pub async fn reset_consecutive_failures(&self) {
318        let mut metrics = self.metrics.write().await;
319        metrics.health.consecutive_failures = 0;
320        metrics.health.recovery_attempts += 1;
321    }
322}
323
324impl Clone for ProductionMonitor {
325    fn clone(&self) -> Self {
326        Self {
327            metrics: RwLock::new(MonitoringMetrics {
328                connections: ConnectionMetrics {
329                    active_connections: 0,
330                    total_connections: 0,
331                    failed_connections: 0,
332                    connection_uptime_avg: Duration::ZERO,
333                    last_connection_time: None,
334                },
335                operations: OperationMetrics {
336                    total_reads: 0,
337                    total_writes: 0,
338                    successful_reads: 0,
339                    successful_writes: 0,
340                    failed_reads: 0,
341                    failed_writes: 0,
342                    batch_operations: 0,
343                    subscription_updates: 0,
344                },
345                performance: PerformanceMetrics {
346                    avg_read_latency_ms: 0.0,
347                    avg_write_latency_ms: 0.0,
348                    max_read_latency_ms: 0.0,
349                    max_write_latency_ms: 0.0,
350                    reads_per_second: 0.0,
351                    writes_per_second: 0.0,
352                    memory_usage_mb: 0.0,
353                    cpu_usage_percent: 0.0,
354                },
355                errors: ErrorMetrics {
356                    network_errors: 0,
357                    protocol_errors: 0,
358                    timeout_errors: 0,
359                    tag_not_found_errors: 0,
360                    data_type_errors: 0,
361                    last_error_time: None,
362                    last_error_message: None,
363                },
364                health: HealthMetrics {
365                    overall_health: HealthStatus::Unknown,
366                    last_health_check: SystemTime::now(),
367                    consecutive_failures: 0,
368                    recovery_attempts: 0,
369                    system_uptime: Duration::ZERO,
370                },
371            }),
372            start_time: self.start_time,
373            system_start_time: self.system_start_time,
374        }
375    }
376}