Skip to main content

do_memory_mcp/monitoring/
core.rs

1//! Core monitoring system implementation
2
3use super::types::{
4    ComponentHealth, HealthCheck, HealthStatus, MonitoringConfig, MonitoringStats,
5    PerformanceMetrics, RequestMetrics, SystemPerformance,
6};
7use parking_lot::RwLock;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::{Instant, SystemTime, UNIX_EPOCH};
11use tokio::sync::Mutex;
12use tracing::debug;
13
14/// Core monitoring system
15pub struct MonitoringSystem {
16    /// Configuration
17    config: MonitoringConfig,
18    /// Current statistics
19    stats: Arc<RwLock<MonitoringStats>>,
20    /// Performance metrics
21    performance: Arc<RwLock<PerformanceMetrics>>,
22    /// Start time for uptime calculation
23    start_time: Instant,
24    /// Active request tracking
25    active_requests: Arc<Mutex<HashMap<String, RequestMetrics>>>,
26}
27
28impl MonitoringSystem {
29    /// Create a new monitoring system
30    pub fn new(config: MonitoringConfig) -> Self {
31        let start_time = Instant::now();
32        let now = SystemTime::now()
33            .duration_since(UNIX_EPOCH)
34            .unwrap_or_default()
35            .as_secs();
36
37        let stats = MonitoringStats {
38            uptime_seconds: 0,
39            total_requests: 0,
40            successful_requests: 0,
41            failed_requests: 0,
42            avg_response_time_ms: 0.0,
43            memory_usage_mb: 0.0,
44            cpu_usage_percent: 0.0,
45            episode_metrics: Default::default(),
46            last_health_check: now,
47            health_status: HealthStatus::Healthy,
48        };
49
50        let performance = PerformanceMetrics {
51            tool_metrics: HashMap::new(),
52            system_performance: SystemPerformance::default(),
53        };
54
55        Self {
56            config,
57            stats: Arc::new(RwLock::new(stats)),
58            performance: Arc::new(RwLock::new(performance)),
59            start_time,
60            active_requests: Arc::new(Mutex::new(HashMap::new())),
61        }
62    }
63
64    /// Start request tracking
65    pub async fn start_request(&self, request_id: String, tool_name: String) -> String {
66        if !self.config.enabled {
67            return request_id.clone();
68        }
69
70        let start_time = SystemTime::now()
71            .duration_since(UNIX_EPOCH)
72            .unwrap_or_default()
73            .as_secs();
74
75        let metrics = RequestMetrics {
76            request_id: request_id.clone(),
77            tool_name,
78            start_time,
79            end_time: 0,
80            success: false,
81            response_time_ms: 0,
82            error_message: None,
83        };
84
85        let mut active = self.active_requests.lock().await;
86        active.insert(request_id.clone(), metrics);
87
88        debug!("Started tracking request: {}", request_id);
89        request_id
90    }
91
92    /// End request tracking
93    pub async fn end_request(
94        &self,
95        request_id: &str,
96        success: bool,
97        error_message: Option<String>,
98    ) {
99        if !self.config.enabled {
100            return;
101        }
102
103        let end_time = SystemTime::now()
104            .duration_since(UNIX_EPOCH)
105            .unwrap_or_default()
106            .as_secs();
107
108        let mut active = self.active_requests.lock().await;
109        if let Some(mut metrics) = active.remove(request_id) {
110            metrics.end_time = end_time;
111            metrics.success = success;
112            // Avoid underflow if system clock moved backwards; use saturating operations
113            let elapsed_secs = end_time.saturating_sub(metrics.start_time);
114            metrics.response_time_ms = elapsed_secs.saturating_mul(1000); // Convert to ms
115            metrics.error_message = error_message;
116
117            // Update stats
118            {
119                let mut stats = self.stats.write();
120                stats.record_request(success, metrics.response_time_ms);
121            }
122
123            // Update performance metrics
124            {
125                let mut perf = self.performance.write();
126                let tool_perf = perf
127                    .tool_metrics
128                    .entry(metrics.tool_name.clone())
129                    .or_default();
130
131                tool_perf.total_calls += 1;
132                if success {
133                    tool_perf.successful_calls += 1;
134                } else {
135                    tool_perf.failed_calls += 1;
136                }
137
138                // Update response time stats
139                let total_calls = tool_perf.total_calls as f64;
140                tool_perf.avg_response_time_ms = (tool_perf.avg_response_time_ms
141                    * (total_calls - 1.0)
142                    + metrics.response_time_ms as f64)
143                    / total_calls;
144
145                tool_perf.min_response_time_ms =
146                    tool_perf.min_response_time_ms.min(metrics.response_time_ms);
147                tool_perf.max_response_time_ms =
148                    tool_perf.max_response_time_ms.max(metrics.response_time_ms);
149
150                tool_perf.success_rate =
151                    (tool_perf.successful_calls as f64 / tool_perf.total_calls as f64) * 100.0;
152            }
153
154            debug!(
155                "Ended tracking request: {} (success: {}, time: {}ms)",
156                request_id, success, metrics.response_time_ms
157            );
158        }
159    }
160
161    /// Record episode creation
162    pub fn record_episode_creation(&self, success: bool) {
163        if !self.config.enabled || !self.config.enable_episode_tracking {
164            return;
165        }
166
167        let mut stats = self.stats.write();
168        stats.record_episode_creation(success);
169
170        debug!("Recorded episode creation (success: {})", success);
171    }
172
173    /// Update system metrics
174    pub fn update_system_metrics(&self, memory_mb: f64, cpu_percent: f64) {
175        if !self.config.enabled {
176            return;
177        }
178
179        let mut stats = self.stats.write();
180        stats.update_system_metrics(memory_mb, cpu_percent);
181
182        let mut perf = self.performance.write();
183        perf.system_performance.memory_usage_mb = memory_mb;
184        perf.system_performance.cpu_usage_percent = cpu_percent;
185        perf.system_performance.uptime_seconds = self.start_time.elapsed().as_secs();
186
187        debug!(
188            "Updated system metrics: memory={}MB, cpu={}%%",
189            memory_mb, cpu_percent
190        );
191    }
192
193    /// Update uptime
194    pub fn update_uptime(&self) {
195        if !self.config.enabled {
196            return;
197        }
198
199        let uptime = self.start_time.elapsed().as_secs();
200        let mut stats = self.stats.write();
201        stats.update_uptime(uptime);
202    }
203
204    /// Get current monitoring statistics
205    pub fn get_stats(&self) -> MonitoringStats {
206        self.stats.read().clone()
207    }
208
209    /// Get performance metrics
210    pub fn get_performance(&self) -> PerformanceMetrics {
211        self.performance.read().clone()
212    }
213
214    /// Perform health check
215    pub async fn health_check(&self) -> HealthCheck {
216        let now = SystemTime::now()
217            .duration_since(UNIX_EPOCH)
218            .unwrap_or_default()
219            .as_secs();
220
221        let mut components = HashMap::new();
222        let mut overall_status = HealthStatus::Healthy;
223
224        // Check memory usage
225        let memory_status = self.check_memory_health();
226        components.insert(
227            "memory".to_string(),
228            ComponentHealth {
229                name: "memory".to_string(),
230                status: memory_status.clone(),
231                details: Some(format!(
232                    "Memory usage: {:.1}MB",
233                    self.stats.read().memory_usage_mb
234                )),
235                last_check: now,
236            },
237        );
238        if matches!(memory_status, HealthStatus::Unhealthy { .. }) {
239            overall_status = memory_status;
240        }
241
242        // Check CPU usage
243        let cpu_status = self.check_cpu_health();
244        components.insert(
245            "cpu".to_string(),
246            ComponentHealth {
247                name: "cpu".to_string(),
248                status: cpu_status.clone(),
249                details: Some(format!(
250                    "CPU usage: {:.1}%",
251                    self.stats.read().cpu_usage_percent
252                )),
253                last_check: now,
254            },
255        );
256        if matches!(cpu_status, HealthStatus::Unhealthy { .. })
257            && matches!(overall_status, HealthStatus::Healthy)
258        {
259            overall_status = cpu_status;
260        }
261
262        // Check request success rate
263        let request_status = self.check_request_health();
264        components.insert(
265            "requests".to_string(),
266            ComponentHealth {
267                name: "requests".to_string(),
268                status: request_status.clone(),
269                details: Some(format!(
270                    "Success rate: {:.1}%",
271                    self.calculate_success_rate()
272                )),
273                last_check: now,
274            },
275        );
276        if matches!(request_status, HealthStatus::Unhealthy { .. })
277            && matches!(overall_status, HealthStatus::Healthy)
278        {
279            overall_status = request_status;
280        }
281
282        // Update last health check time
283        {
284            let mut stats = self.stats.write();
285            stats.last_health_check = now;
286            stats.health_status = overall_status.clone();
287        }
288
289        HealthCheck {
290            status: overall_status,
291            components,
292            timestamp: now,
293        }
294    }
295
296    /// Check memory health
297    fn check_memory_health(&self) -> HealthStatus {
298        let memory_mb = self.stats.read().memory_usage_mb;
299
300        if memory_mb > 500.0 {
301            // Over 500MB
302            HealthStatus::Unhealthy {
303                message: format!("High memory usage: {:.1}MB", memory_mb),
304            }
305        } else if memory_mb > 200.0 {
306            // Over 200MB
307            HealthStatus::Warning {
308                message: format!("Elevated memory usage: {:.1}MB", memory_mb),
309            }
310        } else {
311            HealthStatus::Healthy
312        }
313    }
314
315    /// Check CPU health
316    fn check_cpu_health(&self) -> HealthStatus {
317        let cpu_percent = self.stats.read().cpu_usage_percent;
318
319        if cpu_percent > 90.0 {
320            // Over 90%
321            HealthStatus::Unhealthy {
322                message: format!("High CPU usage: {:.1}%", cpu_percent),
323            }
324        } else if cpu_percent > 70.0 {
325            // Over 70%
326            HealthStatus::Warning {
327                message: format!("Elevated CPU usage: {:.1}%", cpu_percent),
328            }
329        } else {
330            HealthStatus::Healthy
331        }
332    }
333
334    /// Check request health
335    fn check_request_health(&self) -> HealthStatus {
336        let success_rate = self.calculate_success_rate();
337
338        if success_rate < 80.0 {
339            // Below 80%
340            HealthStatus::Unhealthy {
341                message: format!("Low success rate: {:.1}%", success_rate),
342            }
343        } else if success_rate < 95.0 {
344            // Below 95%
345            HealthStatus::Warning {
346                message: format!("Moderate success rate: {:.1}%", success_rate),
347            }
348        } else {
349            HealthStatus::Healthy
350        }
351    }
352
353    /// Calculate overall success rate
354    fn calculate_success_rate(&self) -> f64 {
355        let stats = self.stats.read();
356        if stats.total_requests == 0 {
357            100.0
358        } else {
359            (stats.successful_requests as f64 / stats.total_requests as f64) * 100.0
360        }
361    }
362
363    /// Get active request count
364    pub async fn active_request_count(&self) -> usize {
365        let active = self.active_requests.lock().await;
366        active.len()
367    }
368
369    /// Get configuration
370    pub fn config(&self) -> &MonitoringConfig {
371        &self.config
372    }
373}