1use serde::{Deserialize, Serialize};
2use std::sync::Arc;
3use std::time::{Duration, Instant, SystemTime};
4use tokio::sync::RwLock;
5use tokio::time::interval;
6
/// Top-level container for everything the monitor tracks.
///
/// Groups the individual metric families so they can be cloned and
/// (de)serialized with serde as a single value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringMetrics {
    /// Connection counters and timestamps.
    pub connections: ConnectionMetrics,
    /// Read/write operation counters.
    pub operations: OperationMetrics,
    /// Latency, throughput and resource-usage figures.
    pub performance: PerformanceMetrics,
    /// Per-category error counters plus details of the most recent error.
    pub errors: ErrorMetrics,
    /// Health status, failure streak and uptime information.
    pub health: HealthMetrics,
}
21
/// Counters describing connection activity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionMetrics {
    /// Connections currently considered open: incremented on each successful
    /// connection and decremented (never below zero) on each disconnection.
    pub active_connections: u32,
    /// Number of successful connections recorded so far.
    pub total_connections: u64,
    /// Number of failed connection attempts; these are not counted in
    /// `total_connections`.
    pub failed_connections: u64,
    /// Average connection uptime.
    /// NOTE(review): initialised to zero and never updated by the code in this
    /// file — confirm whether it is maintained elsewhere.
    pub connection_uptime_avg: Duration,
    /// Wall-clock time of the most recent successful connection, or `None` if
    /// none has been recorded.
    pub last_connection_time: Option<SystemTime>,
}
30
/// Counters for read and write operations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMetrics {
    /// All recorded reads, successful and failed.
    pub total_reads: u64,
    /// All recorded writes, successful and failed.
    pub total_writes: u64,
    /// Reads recorded as successful.
    pub successful_reads: u64,
    /// Writes recorded as successful.
    pub successful_writes: u64,
    /// Reads recorded as failed.
    pub failed_reads: u64,
    /// Writes recorded as failed.
    pub failed_writes: u64,
    /// Batch operation count.
    /// NOTE(review): never updated by the code in this file — confirm where it
    /// is maintained.
    pub batch_operations: u64,
    /// Subscription update count.
    /// NOTE(review): never updated by the code in this file — confirm where it
    /// is maintained.
    pub subscription_updates: u64,
}
42
/// Latency, throughput and resource-usage figures.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Running mean latency of successful reads, in milliseconds.
    pub avg_read_latency_ms: f64,
    /// Running mean latency of successful writes, in milliseconds.
    pub avg_write_latency_ms: f64,
    /// Largest latency observed for a successful read, in milliseconds.
    pub max_read_latency_ms: f64,
    /// Largest latency observed for a successful write, in milliseconds.
    pub max_write_latency_ms: f64,
    /// Successful reads divided by uptime in seconds. Filled in on the
    /// snapshot returned by `ProductionMonitor::get_metrics`; the stored value
    /// keeps its initial `0.0`.
    pub reads_per_second: f64,
    /// Successful writes divided by uptime in seconds. Filled in on the
    /// snapshot returned by `ProductionMonitor::get_metrics`; the stored value
    /// keeps its initial `0.0`.
    pub writes_per_second: f64,
    /// Memory usage in megabytes, as written by the periodic system-metrics
    /// update.
    pub memory_usage_mb: f64,
    /// CPU usage in percent, as written by the periodic system-metrics update.
    pub cpu_usage_percent: f64,
}
54
/// Per-category error counters and details of the most recent error.
///
/// The counters correspond to the `error_type` strings recognised when a
/// failure is recorded: `"network"`, `"protocol"`, `"timeout"`,
/// `"tag_not_found"` and `"data_type"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorMetrics {
    /// Failures recorded with error type `"network"`.
    pub network_errors: u64,
    /// Failures recorded with error type `"protocol"`.
    pub protocol_errors: u64,
    /// Failures recorded with error type `"timeout"`.
    pub timeout_errors: u64,
    /// Failures recorded with error type `"tag_not_found"`.
    pub tag_not_found_errors: u64,
    /// Failures recorded with error type `"data_type"`.
    pub data_type_errors: u64,
    /// Wall-clock time at which the most recent failure was recorded.
    pub last_error_time: Option<SystemTime>,
    /// The `error_type` string of the most recent failure (recognised or not).
    pub last_error_message: Option<String>,
}
65
/// Health status, failure streak and uptime information.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthMetrics {
    /// Overall classification. Starts as `Unknown`; it is recomputed on each
    /// snapshot returned by `ProductionMonitor::get_metrics`, not written back
    /// to the stored metrics.
    pub overall_health: HealthStatus,
    /// Wall-clock time of the last health evaluation. Set at construction and
    /// refreshed on each snapshot returned by `get_metrics`.
    pub last_health_check: SystemTime,
    /// Failures recorded since the last explicit reset. Successful operations
    /// do not reset this counter in this file; only
    /// `reset_consecutive_failures` does.
    pub consecutive_failures: u32,
    /// Number of times `reset_consecutive_failures` has been called.
    pub recovery_attempts: u32,
    /// Time elapsed since the monitor was created, filled in on each snapshot
    /// returned by `get_metrics`.
    pub system_uptime: Duration,
}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub enum HealthStatus {
77 Healthy,
78 Warning,
79 Critical,
80 Unknown,
81}
82
/// Collects runtime metrics behind a shared asynchronous lock.
///
/// The metrics live in an `Arc<RwLock<..>>`, so handles produced by cloning
/// the `Arc` observe and update the same state.
pub struct ProductionMonitor {
    /// Shared metrics state guarded by a Tokio `RwLock`.
    metrics: Arc<RwLock<MonitoringMetrics>>,
    /// Monotonic creation time; the source of the reported uptime.
    start_time: Instant,
    /// Wall-clock creation time.
    /// NOTE(review): only copied when cloning in this file and never read
    /// otherwise — confirm it is still needed.
    system_start_time: SystemTime,
}
89
90impl Default for ProductionMonitor {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96impl ProductionMonitor {
97 pub fn new() -> Self {
98 Self {
99 metrics: Arc::new(RwLock::new(MonitoringMetrics {
100 connections: ConnectionMetrics {
101 active_connections: 0,
102 total_connections: 0,
103 failed_connections: 0,
104 connection_uptime_avg: Duration::ZERO,
105 last_connection_time: None,
106 },
107 operations: OperationMetrics {
108 total_reads: 0,
109 total_writes: 0,
110 successful_reads: 0,
111 successful_writes: 0,
112 failed_reads: 0,
113 failed_writes: 0,
114 batch_operations: 0,
115 subscription_updates: 0,
116 },
117 performance: PerformanceMetrics {
118 avg_read_latency_ms: 0.0,
119 avg_write_latency_ms: 0.0,
120 max_read_latency_ms: 0.0,
121 max_write_latency_ms: 0.0,
122 reads_per_second: 0.0,
123 writes_per_second: 0.0,
124 memory_usage_mb: 0.0,
125 cpu_usage_percent: 0.0,
126 },
127 errors: ErrorMetrics {
128 network_errors: 0,
129 protocol_errors: 0,
130 timeout_errors: 0,
131 tag_not_found_errors: 0,
132 data_type_errors: 0,
133 last_error_time: None,
134 last_error_message: None,
135 },
136 health: HealthMetrics {
137 overall_health: HealthStatus::Unknown,
138 last_health_check: SystemTime::now(),
139 consecutive_failures: 0,
140 recovery_attempts: 0,
141 system_uptime: Duration::ZERO,
142 },
143 })),
144 start_time: Instant::now(),
145 system_start_time: SystemTime::now(),
146 }
147 }
148
149 pub async fn record_read_success(&self, latency: Duration) {
151 let mut metrics = self.metrics.write().await;
152 metrics.operations.total_reads += 1;
153 metrics.operations.successful_reads += 1;
154
155 let latency_ms = latency.as_millis() as f64;
157 metrics.performance.avg_read_latency_ms = (metrics.performance.avg_read_latency_ms
158 * (metrics.operations.successful_reads - 1) as f64
159 + latency_ms)
160 / metrics.operations.successful_reads as f64;
161
162 if latency_ms > metrics.performance.max_read_latency_ms {
163 metrics.performance.max_read_latency_ms = latency_ms;
164 }
165 }
166
167 pub async fn record_read_failure(&self, error_type: &str) {
169 let mut metrics = self.metrics.write().await;
170 metrics.operations.total_reads += 1;
171 metrics.operations.failed_reads += 1;
172 self.record_error(&mut metrics, error_type).await;
173 }
174
175 pub async fn record_write_success(&self, latency: Duration) {
177 let mut metrics = self.metrics.write().await;
178 metrics.operations.total_writes += 1;
179 metrics.operations.successful_writes += 1;
180
181 let latency_ms = latency.as_millis() as f64;
183 metrics.performance.avg_write_latency_ms = (metrics.performance.avg_write_latency_ms
184 * (metrics.operations.successful_writes - 1) as f64
185 + latency_ms)
186 / metrics.operations.successful_writes as f64;
187
188 if latency_ms > metrics.performance.max_write_latency_ms {
189 metrics.performance.max_write_latency_ms = latency_ms;
190 }
191 }
192
193 pub async fn record_write_failure(&self, error_type: &str) {
195 let mut metrics = self.metrics.write().await;
196 metrics.operations.total_writes += 1;
197 metrics.operations.failed_writes += 1;
198 self.record_error(&mut metrics, error_type).await;
199 }
200
201 pub async fn record_connection(&self, success: bool) {
203 let mut metrics = self.metrics.write().await;
204 if success {
205 metrics.connections.total_connections += 1;
206 metrics.connections.active_connections += 1;
207 metrics.connections.last_connection_time = Some(SystemTime::now());
208 } else {
209 metrics.connections.failed_connections += 1;
210 }
211 }
212
213 pub async fn record_disconnection(&self) {
215 let mut metrics = self.metrics.write().await;
216 if metrics.connections.active_connections > 0 {
217 metrics.connections.active_connections -= 1;
218 }
219 }
220
221 async fn record_error(&self, metrics: &mut MonitoringMetrics, error_type: &str) {
223 match error_type {
224 "network" => metrics.errors.network_errors += 1,
225 "protocol" => metrics.errors.protocol_errors += 1,
226 "timeout" => metrics.errors.timeout_errors += 1,
227 "tag_not_found" => metrics.errors.tag_not_found_errors += 1,
228 "data_type" => metrics.errors.data_type_errors += 1,
229 _ => {}
230 }
231
232 metrics.errors.last_error_time = Some(SystemTime::now());
233 metrics.errors.last_error_message = Some(error_type.to_string());
234 metrics.health.consecutive_failures += 1;
235 }
236
237 pub async fn get_metrics(&self) -> MonitoringMetrics {
239 let mut metrics = self.metrics.read().await.clone();
240
241 metrics.health.system_uptime = self.start_time.elapsed();
243
244 let total_time = metrics.health.system_uptime.as_secs_f64();
246 if total_time > 0.0 {
247 metrics.performance.reads_per_second =
248 metrics.operations.successful_reads as f64 / total_time;
249 metrics.performance.writes_per_second =
250 metrics.operations.successful_writes as f64 / total_time;
251 }
252
253 metrics.health.overall_health = self.calculate_health_status(&metrics);
255 metrics.health.last_health_check = SystemTime::now();
256
257 metrics
258 }
259
260 fn calculate_health_status(&self, metrics: &MonitoringMetrics) -> HealthStatus {
262 let error_rate = if metrics.operations.total_reads + metrics.operations.total_writes > 0 {
263 (metrics.operations.failed_reads + metrics.operations.failed_writes) as f64
264 / (metrics.operations.total_reads + metrics.operations.total_writes) as f64
265 } else {
266 0.0
267 };
268
269 if error_rate > 0.1 || metrics.health.consecutive_failures > 10 {
270 HealthStatus::Critical
271 } else if error_rate > 0.05 || metrics.health.consecutive_failures > 5 {
272 HealthStatus::Warning
273 } else if metrics.connections.active_connections > 0 {
274 HealthStatus::Healthy
275 } else {
276 HealthStatus::Unknown
277 }
278 }
279
280 pub async fn start_monitoring(&self) {
282 let monitor = self.clone();
283 tokio::spawn(async move {
284 let mut interval = interval(Duration::from_secs(30));
285 loop {
286 interval.tick().await;
287 monitor.update_system_metrics().await;
288 }
289 });
290 }
291
292 async fn update_system_metrics(&self) {
294 let mut metrics = self.metrics.write().await;
295
296 metrics.performance.memory_usage_mb = self.get_memory_usage();
298
299 metrics.performance.cpu_usage_percent = self.get_cpu_usage();
301 }
302
303 fn get_memory_usage(&self) -> f64 {
305 10.0
308 }
309
310 fn get_cpu_usage(&self) -> f64 {
312 5.0
315 }
316
317 pub async fn reset_consecutive_failures(&self) {
319 let mut metrics = self.metrics.write().await;
320 metrics.health.consecutive_failures = 0;
321 metrics.health.recovery_attempts += 1;
322 }
323}
324
325impl Clone for ProductionMonitor {
326 fn clone(&self) -> Self {
327 Self {
328 metrics: Arc::clone(&self.metrics),
329 start_time: self.start_time,
330 system_start_time: self.system_start_time,
331 }
332 }
333}