pulseengine_mcp_monitoring/
collector.rs

//! Metrics collector implementation

use crate::{
    config::MonitoringConfig,
    metrics::{ServerMetrics, SystemMetrics},
};
use pulseengine_mcp_protocol::{Error, Request, Response};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use sysinfo::System;
use tokio::sync::RwLock;
use tokio::time::{Duration, Instant};

/// Simple request context for monitoring
#[derive(Debug, Clone)]
pub struct RequestContext {
    pub request_id: uuid::Uuid,
}

/// Response time histogram for percentile calculations
#[derive(Clone)]
struct ResponseTimeHistogram {
    values: Arc<Mutex<VecDeque<f64>>>,
    max_size: usize,
}

impl ResponseTimeHistogram {
    fn new(max_size: usize) -> Self {
        Self {
            values: Arc::new(Mutex::new(VecDeque::with_capacity(max_size))),
            max_size,
        }
    }

    fn record(&self, value: f64) {
        let mut values = self.values.lock().unwrap();
        values.push_back(value);
        if values.len() > self.max_size {
            values.pop_front();
        }
    }

    fn get_average(&self) -> f64 {
        let values = self.values.lock().unwrap();
        if values.is_empty() {
            0.0
        } else {
            let sum: f64 = values.iter().sum();
            sum / values.len() as f64
        }
    }
}
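
// The doc comment on `ResponseTimeHistogram` mentions percentile calculations,
// but only the average is implemented above. A minimal sketch of a percentile
// accessor over the same rolling window (hypothetical, not part of the current
// API) might look like this:
#[allow(dead_code)]
impl ResponseTimeHistogram {
    /// Approximate `p`-th percentile (0.0..=100.0) of the recorded values,
    /// computed by nearest-rank selection over a sorted copy of the window.
    #[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation, clippy::cast_sign_loss)]
    fn get_percentile(&self, p: f64) -> f64 {
        let values = self.values.lock().unwrap();
        if values.is_empty() {
            return 0.0;
        }
        let mut sorted: Vec<f64> = values.iter().copied().collect();
        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
        let rank = (p.clamp(0.0, 100.0) / 100.0) * (sorted.len() - 1) as f64;
        sorted[rank.round() as usize]
    }
}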

/// Metrics collector for MCP server
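///
/// A minimal usage sketch (the `MonitoringConfig::default()` call below is
/// illustrative only; construct the config however your application does):
///
/// ```ignore
/// let collector = MetricsCollector::new(MonitoringConfig::default());
/// collector.start_collection().await;
///
/// collector.increment_connections();
/// // ... route traffic through `process_request` / `process_response` ...
/// let metrics = collector.get_current_metrics().await;
/// println!("requests so far: {}", metrics.requests_total);
///
/// collector.decrement_connections();
/// collector.stop_collection().await;
/// ```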
pub struct MetricsCollector {
    config: MonitoringConfig,
    start_time: Instant,
    request_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
    active_connections: Arc<AtomicU64>,
    response_times: ResponseTimeHistogram,
    system: Arc<RwLock<System>>,
    collection_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
}

impl MetricsCollector {
    pub fn new(config: MonitoringConfig) -> Self {
        let mut system = System::new_all();
        system.refresh_all();

        Self {
            config,
            start_time: Instant::now(),
            request_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
            active_connections: Arc::new(AtomicU64::new(0)),
            response_times: ResponseTimeHistogram::new(1000), // Keep last 1000 response times
            system: Arc::new(RwLock::new(system)),
            collection_handle: Arc::new(RwLock::new(None)),
        }
    }

    pub async fn start_collection(&self) {
        if self.config.enabled {
            let system = self.system.clone();
            let interval_secs = self.config.collection_interval_secs;

            let handle = tokio::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));

                loop {
                    interval.tick().await;

                    // Refresh system information
                    let mut sys = system.write().await;
                    sys.refresh_all();
                }
            });

            // Store the handle
            let mut handle_guard = self.collection_handle.write().await;
            *handle_guard = Some(handle);

            tracing::info!(
                "Started metrics collection with {}s interval",
                interval_secs
            );
        } else {
            tracing::info!("Metrics collection is disabled");
        }
    }

    pub async fn stop_collection(&self) {
        let mut handle_guard = self.collection_handle.write().await;
        if let Some(handle) = handle_guard.take() {
            handle.abort();
            tracing::info!("Stopped metrics collection");
        }
    }

    /// Increment active connections
    pub fn increment_connections(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrement active connections
    pub fn decrement_connections(&self) {
        self.active_connections.fetch_sub(1, Ordering::Relaxed);
    }

    /// Process a request and update metrics
    ///
    /// # Errors
    ///
    /// This function currently never returns an error, but the signature allows for
    /// future error handling in metrics processing
    pub fn process_request(
        &self,
        request: Request,
        _context: &RequestContext,
    ) -> Result<Request, Error> {
        if self.config.enabled {
            self.request_count.fetch_add(1, Ordering::Relaxed);
        }
        Ok(request)
    }

    /// Process a response and update error metrics
    ///
    /// # Errors
    ///
    /// This function currently never returns an error, but the signature allows for
    /// future error handling in metrics processing
    pub fn process_response(
        &self,
        response: Response,
        context: &RequestContext,
    ) -> Result<Response, Error> {
        if self.config.enabled {
            if response.error.is_some() {
                self.error_count.fetch_add(1, Ordering::Relaxed);
            }

            // In a real implementation, we'd track the request start time in the
            // context and measure the actual elapsed time here (see the sketch
            // after this method); for now, record a simulated response time
            // derived from the request id.
            let simulated_response_time = 10.0 + (context.request_id.as_u128() % 50) as f64;
            self.response_times.record(simulated_response_time);
        }
        Ok(response)
    }
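
    // A more accurate measurement would carry the request's start time in the
    // context and record the real elapsed time in `process_response`. A hedged
    // sketch of that extension (the `started_at` field is hypothetical and not
    // part of `RequestContext` today):
    //
    //     pub struct RequestContext {
    //         pub request_id: uuid::Uuid,
    //         pub started_at: tokio::time::Instant,
    //     }
    //
    //     // ...and inside `process_response`:
    //     let elapsed_ms = context.started_at.elapsed().as_secs_f64() * 1000.0;
    //     self.response_times.record(elapsed_ms);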

    pub async fn get_current_metrics(&self) -> ServerMetrics {
        let uptime_seconds = self.start_time.elapsed().as_secs();
        let requests_total = self.request_count.load(Ordering::Relaxed);
        let errors_total = self.error_count.load(Ordering::Relaxed);
        let active_connections = self.active_connections.load(Ordering::Relaxed);

        // Get system metrics
        let memory_usage_bytes = if self.config.enabled {
            let sys = self.system.read().await;
            // Get total system used memory
            sys.used_memory()
        } else {
            0
        };

        ServerMetrics {
            requests_total,
            requests_per_second: if uptime_seconds > 0 {
                #[allow(clippy::cast_precision_loss)]
                {
                    requests_total as f64 / uptime_seconds as f64
                }
            } else {
                0.0
            },
            average_response_time_ms: self.response_times.get_average(),
            error_rate: if requests_total > 0 {
                #[allow(clippy::cast_precision_loss)]
                {
                    errors_total as f64 / requests_total as f64
                }
            } else {
                0.0
            },
            active_connections,
            memory_usage_bytes,
            uptime_seconds,
        }
    }

    /// Get detailed system metrics
    pub async fn get_system_metrics(&self) -> SystemMetrics {
        let sys = self.system.read().await;
        let load_avg = System::load_average();

        SystemMetrics {
            cpu_usage_percent: sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>()
                / sys.cpus().len() as f32,
            memory_total_bytes: sys.total_memory(),
            memory_used_bytes: sys.used_memory(),
            memory_available_bytes: sys.available_memory(),
            swap_total_bytes: sys.total_swap(),
            swap_used_bytes: sys.used_swap(),
            load_average: crate::metrics::LoadAverage {
                one: load_avg.one,
                five: load_avg.five,
                fifteen: load_avg.fifteen,
            },
            process_count: sys.processes().len() as u64,
        }
    }

    pub fn get_uptime_seconds(&self) -> u64 {
        self.start_time.elapsed().as_secs()
    }
}

#[cfg(test)]
#[path = "collector_tests.rs"]
mod collector_tests;