use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use tokio::time::interval;
5
6#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct MonitoringMetrics {
9 pub connections: ConnectionMetrics,
11 pub operations: OperationMetrics,
13 pub performance: PerformanceMetrics,
15 pub errors: ErrorMetrics,
17 pub health: HealthMetrics,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ConnectionMetrics {
23 pub active_connections: u32,
24 pub total_connections: u64,
25 pub failed_connections: u64,
26 pub connection_uptime_avg: Duration,
27 pub last_connection_time: Option<SystemTime>,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct OperationMetrics {
32 pub total_reads: u64,
33 pub total_writes: u64,
34 pub successful_reads: u64,
35 pub successful_writes: u64,
36 pub failed_reads: u64,
37 pub failed_writes: u64,
38 pub batch_operations: u64,
39 pub subscription_updates: u64,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct PerformanceMetrics {
44 pub avg_read_latency_ms: f64,
45 pub avg_write_latency_ms: f64,
46 pub max_read_latency_ms: f64,
47 pub max_write_latency_ms: f64,
48 pub reads_per_second: f64,
49 pub writes_per_second: f64,
50 pub memory_usage_mb: f64,
51 pub cpu_usage_percent: f64,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct ErrorMetrics {
56 pub network_errors: u64,
57 pub protocol_errors: u64,
58 pub timeout_errors: u64,
59 pub tag_not_found_errors: u64,
60 pub data_type_errors: u64,
61 pub last_error_time: Option<SystemTime>,
62 pub last_error_message: Option<String>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct HealthMetrics {
67 pub overall_health: HealthStatus,
68 pub last_health_check: SystemTime,
69 pub consecutive_failures: u32,
70 pub recovery_attempts: u32,
71 pub system_uptime: Duration,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
75pub enum HealthStatus {
76 Healthy,
77 Warning,
78 Critical,
79 Unknown,
80}
81
82pub struct ProductionMonitor {
84 metrics: RwLock<MonitoringMetrics>,
85 start_time: Instant,
86 system_start_time: SystemTime,
87}
88
89impl Default for ProductionMonitor {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl ProductionMonitor {
96 pub fn new() -> Self {
97 Self {
98 metrics: RwLock::new(MonitoringMetrics {
99 connections: ConnectionMetrics {
100 active_connections: 0,
101 total_connections: 0,
102 failed_connections: 0,
103 connection_uptime_avg: Duration::ZERO,
104 last_connection_time: None,
105 },
106 operations: OperationMetrics {
107 total_reads: 0,
108 total_writes: 0,
109 successful_reads: 0,
110 successful_writes: 0,
111 failed_reads: 0,
112 failed_writes: 0,
113 batch_operations: 0,
114 subscription_updates: 0,
115 },
116 performance: PerformanceMetrics {
117 avg_read_latency_ms: 0.0,
118 avg_write_latency_ms: 0.0,
119 max_read_latency_ms: 0.0,
120 max_write_latency_ms: 0.0,
121 reads_per_second: 0.0,
122 writes_per_second: 0.0,
123 memory_usage_mb: 0.0,
124 cpu_usage_percent: 0.0,
125 },
126 errors: ErrorMetrics {
127 network_errors: 0,
128 protocol_errors: 0,
129 timeout_errors: 0,
130 tag_not_found_errors: 0,
131 data_type_errors: 0,
132 last_error_time: None,
133 last_error_message: None,
134 },
135 health: HealthMetrics {
136 overall_health: HealthStatus::Unknown,
137 last_health_check: SystemTime::now(),
138 consecutive_failures: 0,
139 recovery_attempts: 0,
140 system_uptime: Duration::ZERO,
141 },
142 }),
143 start_time: Instant::now(),
144 system_start_time: SystemTime::now(),
145 }
146 }
147
148 pub async fn record_read_success(&self, latency: Duration) {
150 let mut metrics = self.metrics.write().await;
151 metrics.operations.total_reads += 1;
152 metrics.operations.successful_reads += 1;
153
154 let latency_ms = latency.as_millis() as f64;
156 metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
157 * (metrics.operations.successful_reads - 1) as f64
158 + latency_ms)
159 / metrics.operations.successful_reads as f64;
160
161 if latency_ms > metrics.performance.max_read_latency_ms {
162 metrics.performance.max_read_latency_ms = latency_ms;
163 }
164 }
165
166 pub async fn record_read_failure(&self, error_type: &str) {
168 let mut metrics = self.metrics.write().await;
169 metrics.operations.total_reads += 1;
170 metrics.operations.failed_reads += 1;
171 self.record_error(&mut metrics, error_type).await;
172 }
173
174 pub async fn record_write_success(&self, latency: Duration) {
176 let mut metrics = self.metrics.write().await;
177 metrics.operations.total_writes += 1;
178 metrics.operations.successful_writes += 1;
179
180 let latency_ms = latency.as_millis() as f64;
182 metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
183 * (metrics.operations.successful_writes - 1) as f64
184 + latency_ms)
185 / metrics.operations.successful_writes as f64;
186
187 if latency_ms > metrics.performance.max_write_latency_ms {
188 metrics.performance.max_write_latency_ms = latency_ms;
189 }
190 }
191
192 pub async fn record_write_failure(&self, error_type: &str) {
194 let mut metrics = self.metrics.write().await;
195 metrics.operations.total_writes += 1;
196 metrics.operations.failed_writes += 1;
197 self.record_error(&mut metrics, error_type).await;
198 }
199
200 pub async fn record_connection(&self, success: bool) {
202 let mut metrics = self.metrics.write().await;
203 if success {
204 metrics.connections.total_connections += 1;
205 metrics.connections.active_connections += 1;
206 metrics.connections.last_connection_time = Some(SystemTime::now());
207 } else {
208 metrics.connections.failed_connections += 1;
209 }
210 }
211
212 pub async fn record_disconnection(&self) {
214 let mut metrics = self.metrics.write().await;
215 if metrics.connections.active_connections > 0 {
216 metrics.connections.active_connections -= 1;
217 }
218 }
219
220 async fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
222 match error_type {
223 "network" => metrics.errors.network_errors += 1,
224 "protocol" => metrics.errors.protocol_errors += 1,
225 "timeout" => metrics.errors.timeout_errors += 1,
226 "tag_not_found" => metrics.errors.tag_not_found_errors += 1,
227 "data_type" => metrics.errors.data_type_errors += 1,
228 _ => {}
229 }
230
231 metrics.errors.last_error_time = Some(SystemTime::now());
232 metrics.errors.last_error_message = Some(error_type.to_string());
233 metrics.health.consecutive_failures += 1;
234 }
235
236 pub async fn get_metrics(&self) -> MonitoringMetrics {
238 let mut metrics = self.metrics.read().await.clone();
239
240 metrics.health.system_uptime = self.start_time.elapsed();
242
243 let total_time = metrics.health.system_uptime.as_secs_f64();
245 if total_time > 0.0 {
246 metrics.performance.reads_per_second =
247 metrics.operations.successful_reads as f64 / total_time;
248 metrics.performance.writes_per_second =
249 metrics.operations.successful_writes as f64 / total_time;
250 }
251
252 metrics.health.overall_health = self.calculate_health_status(&metrics);
254 metrics.health.last_health_check = SystemTime::now();
255
256 metrics
257 }
258
259 fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
261 let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
262 (metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
263 / (metrics.operations.total_reads + metrics.operations.total_writes) as f64
264 } else {
265 0.0
266 };
267
268 if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
269 HealthStatus::Critical
270 } else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
271 HealthStatus::Warning
272 } else if metrics.connections.active_connections > 0 {
273 HealthStatus::Healthy
274 } else {
275 HealthStatus::Unknown
276 }
277 }
278
279 pub async fn start_monitoring(&self) {
281 let monitor = self.clone();
282 tokio::spawn(async move {
283 let mut interval = interval(Duration::from_secs(30));
284 loop {
285 interval.tick().await;
286 monitor.update_system_metrics().await;
287 }
288 });
289 }
290
291 async fn update_system_metrics(&self) {
293 let mut metrics = self.metrics.write().await;
294
295 metrics.performance.memory_usage_mb = self.get_memory_usage();
297
298 metrics.performance.cpu_usage_percent = self.get_cpu_usage();
300 }
301
302 fn get_memory_usage(&self) -> f64 {
304 10.0
307 }
308
309 fn get_cpu_usage(&self) -> f64 {
311 5.0
314 }
315
316 pub async fn reset_consecutive_failures(&self) {
318 let mut metrics = self.metrics.write().await;
319 metrics.health.consecutive_failures = 0;
320 metrics.health.recovery_attempts += 1;
321 }
322}
323
324impl Clone for ProductionMonitor {
325 fn clone(&self) -> Self {
326 Self {
327 metrics: RwLock::new(MonitoringMetrics {
328 connections: ConnectionMetrics {
329 active_connections: 0,
330 total_connections: 0,
331 failed_connections: 0,
332 connection_uptime_avg: Duration::ZERO,
333 last_connection_time: None,
334 },
335 operations: OperationMetrics {
336 total_reads: 0,
337 total_writes: 0,
338 successful_reads: 0,
339 successful_writes: 0,
340 failed_reads: 0,
341 failed_writes: 0,
342 batch_operations: 0,
343 subscription_updates: 0,
344 },
345 performance: PerformanceMetrics {
346 avg_read_latency_ms: 0.0,
347 avg_write_latency_ms: 0.0,
348 max_read_latency_ms: 0.0,
349 max_write_latency_ms: 0.0,
350 reads_per_second: 0.0,
351 writes_per_second: 0.0,
352 memory_usage_mb: 0.0,
353 cpu_usage_percent: 0.0,
354 },
355 errors: ErrorMetrics {
356 network_errors: 0,
357 protocol_errors: 0,
358 timeout_errors: 0,
359 tag_not_found_errors: 0,
360 data_type_errors: 0,
361 last_error_time: None,
362 last_error_message: None,
363 },
364 health: HealthMetrics {
365 overall_health: HealthStatus::Unknown,
366 last_health_check: SystemTime::now(),
367 consecutive_failures: 0,
368 recovery_attempts: 0,
369 system_uptime: Duration::ZERO,
370 },
371 }),
372 start_time: self.start_time,
373 system_start_time: self.system_start_time,
374 }
375 }
376}