pulseengine_mcp_monitoring/collector.rs

use crate::{
    config::MonitoringConfig,
    metrics::{ServerMetrics, SystemMetrics},
};
use pulseengine_mcp_protocol::{Error, Request, Response};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use sysinfo::System;
use tokio::sync::RwLock;
use tokio::time::{Duration, Instant};

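/// Context identifying a request as it passes through the monitoring hooks.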
#[derive(Debug, Clone)]
pub struct RequestContext {
    pub request_id: uuid::Uuid,
}

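/// Sliding window of recent response times (milliseconds) used to compute a rolling average.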
#[derive(Clone)]
struct ResponseTimeHistogram {
    values: Arc<Mutex<VecDeque<f64>>>,
    max_size: usize,
}

impl ResponseTimeHistogram {
    fn new(max_size: usize) -> Self {
        Self {
            values: Arc::new(Mutex::new(VecDeque::with_capacity(max_size))),
            max_size,
        }
    }

    fn record(&self, value: f64) {
        let mut values = self.values.lock().unwrap();
        values.push_back(value);
        // Drop the oldest sample once the window is full.
        if values.len() > self.max_size {
            values.pop_front();
        }
    }

    fn get_average(&self) -> f64 {
        let values = self.values.lock().unwrap();
        if values.is_empty() {
            0.0
        } else {
            let sum: f64 = values.iter().sum();
            sum / values.len() as f64
        }
    }
}

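/// Collects server and system metrics for the MCP server.
///
/// Request and error counters are updated via the `process_request` and
/// `process_response` hooks; system information is refreshed by a background
/// task started with `start_collection`.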
pub struct MetricsCollector {
    config: MonitoringConfig,
    start_time: Instant,
    request_count: Arc<AtomicU64>,
    error_count: Arc<AtomicU64>,
    active_connections: Arc<AtomicU64>,
    response_times: ResponseTimeHistogram,
    system: Arc<RwLock<System>>,
    collection_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
}

impl MetricsCollector {
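    /// Creates a new collector and takes an initial snapshot of system information.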
    pub fn new(config: MonitoringConfig) -> Self {
        let mut system = System::new_all();
        system.refresh_all();

        Self {
            config,
            start_time: Instant::now(),
            request_count: Arc::new(AtomicU64::new(0)),
            error_count: Arc::new(AtomicU64::new(0)),
            active_connections: Arc::new(AtomicU64::new(0)),
            response_times: ResponseTimeHistogram::new(1000),
            system: Arc::new(RwLock::new(system)),
            collection_handle: Arc::new(RwLock::new(None)),
        }
    }

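    /// Starts the background task that periodically refreshes system information.
    ///
    /// Does nothing if monitoring is disabled in the configuration.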
    pub async fn start_collection(&self) {
        if self.config.enabled {
            let system = self.system.clone();
            let interval_secs = self.config.collection_interval_secs;

            let handle = tokio::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));

                loop {
                    interval.tick().await;

                    let mut sys = system.write().await;
                    sys.refresh_all();
                }
            });

            let mut handle_guard = self.collection_handle.write().await;
            *handle_guard = Some(handle);

            tracing::info!(
                "Started metrics collection with {}s interval",
                interval_secs
            );
        } else {
            tracing::info!("Metrics collection is disabled");
        }
    }

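    /// Stops the background collection task, if one is running.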
    pub async fn stop_collection(&self) {
        let mut handle_guard = self.collection_handle.write().await;
        if let Some(handle) = handle_guard.take() {
            handle.abort();
            tracing::info!("Stopped metrics collection");
        }
    }

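    /// Records a newly opened connection.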
    pub fn increment_connections(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }

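    /// Records a closed connection.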
    pub fn decrement_connections(&self) {
        self.active_connections.fetch_sub(1, Ordering::Relaxed);
    }

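    /// Request-side monitoring hook: counts the request and passes it through unchanged.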
    pub fn process_request(
        &self,
        request: Request,
        _context: &RequestContext,
    ) -> Result<Request, Error> {
        if self.config.enabled {
            self.request_count.fetch_add(1, Ordering::Relaxed);
        }
        Ok(request)
    }

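    /// Response-side monitoring hook: counts errors and records a response time,
    /// then passes the response through unchanged.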
    pub fn process_response(
        &self,
        response: Response,
        context: &RequestContext,
    ) -> Result<Response, Error> {
        if self.config.enabled {
            if response.error.is_some() {
                self.error_count.fetch_add(1, Ordering::Relaxed);
            }

            // The actual elapsed time is not tracked in this context, so record a
            // placeholder value derived from the request id.
            let simulated_response_time = 10.0 + (context.request_id.as_u128() % 50) as f64;
            self.response_times.record(simulated_response_time);
        }
        Ok(response)
    }

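    /// Returns a snapshot of server-level metrics: request totals, error rate,
    /// average response time, active connections, memory usage, and uptime.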
    pub async fn get_current_metrics(&self) -> ServerMetrics {
        let uptime_seconds = self.start_time.elapsed().as_secs();
        let requests_total = self.request_count.load(Ordering::Relaxed);
        let errors_total = self.error_count.load(Ordering::Relaxed);
        let active_connections = self.active_connections.load(Ordering::Relaxed);

        let memory_usage_bytes = if self.config.enabled {
            let sys = self.system.read().await;
            sys.used_memory()
        } else {
            0
        };

        ServerMetrics {
            requests_total,
            requests_per_second: if uptime_seconds > 0 {
                #[allow(clippy::cast_precision_loss)]
                {
                    requests_total as f64 / uptime_seconds as f64
                }
            } else {
                0.0
            },
            average_response_time_ms: self.response_times.get_average(),
            error_rate: if requests_total > 0 {
                #[allow(clippy::cast_precision_loss)]
                {
                    errors_total as f64 / requests_total as f64
                }
            } else {
                0.0
            },
            active_connections,
            memory_usage_bytes,
            uptime_seconds,
        }
    }

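    /// Returns a snapshot of host-level metrics: CPU usage, memory, swap,
    /// load average, and process count.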
    pub async fn get_system_metrics(&self) -> SystemMetrics {
        let sys = self.system.read().await;
        let load_avg = System::load_average();

        SystemMetrics {
            cpu_usage_percent: sys.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::<f32>()
                / sys.cpus().len() as f32,
            memory_total_bytes: sys.total_memory(),
            memory_used_bytes: sys.used_memory(),
            memory_available_bytes: sys.available_memory(),
            swap_total_bytes: sys.total_swap(),
            swap_used_bytes: sys.used_swap(),
            load_average: crate::metrics::LoadAverage {
                one: load_avg.one,
                five: load_avg.five,
                fifteen: load_avg.fifteen,
            },
            process_count: sys.processes().len() as u64,
        }
    }

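    /// Returns how long the collector has been running, in seconds.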
    pub fn get_uptime_seconds(&self) -> u64 {
        self.start_time.elapsed().as_secs()
    }
}

#[cfg(test)]
#[path = "collector_tests.rs"]
mod collector_tests;