codex_memory/performance/metrics.rs

use anyhow::Result;
use chrono::Utc;
use prometheus::{Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Registry};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::debug;

/// Collects operation latencies, request counters, resource gauges, cache
/// statistics, and ad-hoc custom metrics, registering the Prometheus-backed
/// metrics against a single registry.
pub struct MetricsCollector {
    registry: Registry,

    // Per-operation latency histograms, keyed by operation name.
    operation_latency: HashMap<String, Histogram>,

    // Request counters.
    requests_total: IntCounter,
    requests_success: IntCounter,
    requests_failed: IntCounter,

    // Resource gauges.
    cpu_usage: Gauge,
    memory_usage: Gauge,
    db_connections: IntGauge,

    // Cache counters.
    cache_hits: IntCounter,
    cache_misses: IntCounter,

    // Free-form metrics recorded at runtime; included in snapshots but not
    // registered with the Prometheus registry.
    custom_metrics: Arc<RwLock<HashMap<String, f64>>>,
}
impl MetricsCollector {
    pub fn new() -> Result<Self> {
        let registry = Registry::new();

        let mut operation_latency = HashMap::new();

        for operation in &["create", "read", "update", "delete", "search", "migrate"] {
            let histogram = Histogram::with_opts(
                HistogramOpts::new(
                    format!("{operation}_latency"),
                    format!("Latency for {operation} operations"),
                )
                .buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]),
            )?;

            registry.register(Box::new(histogram.clone()))?;
            operation_latency.insert(operation.to_string(), histogram);
        }

        let requests_total = IntCounter::new("requests_total", "Total number of requests")?;
        let requests_success = IntCounter::new("requests_success", "Total successful requests")?;
        let requests_failed = IntCounter::new("requests_failed", "Total failed requests")?;
        let cache_hits = IntCounter::new("cache_hits", "Total cache hits")?;
        let cache_misses = IntCounter::new("cache_misses", "Total cache misses")?;

        registry.register(Box::new(requests_total.clone()))?;
        registry.register(Box::new(requests_success.clone()))?;
        registry.register(Box::new(requests_failed.clone()))?;
        registry.register(Box::new(cache_hits.clone()))?;
        registry.register(Box::new(cache_misses.clone()))?;

        let cpu_usage = Gauge::new("cpu_usage_percent", "Current CPU usage percentage")?;
        let memory_usage = Gauge::new("memory_usage_bytes", "Current memory usage in bytes")?;
        let db_connections = IntGauge::new("db_connections_active", "Active database connections")?;

        registry.register(Box::new(cpu_usage.clone()))?;
        registry.register(Box::new(memory_usage.clone()))?;
        registry.register(Box::new(db_connections.clone()))?;

        Ok(Self {
            registry,
            operation_latency,
            requests_total,
            requests_success,
            requests_failed,
            cpu_usage,
            memory_usage,
            db_connections,
            cache_hits,
            cache_misses,
            custom_metrics: Arc::new(RwLock::new(HashMap::new())),
        })
    }

    pub fn record_latency(&self, operation: &str, duration: Duration) {
        if let Some(histogram) = self.operation_latency.get(operation) {
            histogram.observe(duration.as_secs_f64());
        }
    }

    pub fn record_success(&self) {
        self.requests_total.inc();
        self.requests_success.inc();
    }

    pub fn record_failure(&self) {
        self.requests_total.inc();
        self.requests_failed.inc();
    }

    pub fn record_cache_hit(&self) {
        self.cache_hits.inc();
    }

    pub fn record_cache_miss(&self) {
        self.cache_misses.inc();
    }

    pub fn update_cpu_usage(&self, usage: f64) {
        self.cpu_usage.set(usage);
    }

    pub fn update_memory_usage(&self, bytes: f64) {
        self.memory_usage.set(bytes);
    }

    pub fn update_db_connections(&self, connections: i64) {
        self.db_connections.set(connections);
    }

    pub async fn record_custom_metric(&self, name: &str, value: f64) {
        let mut metrics = self.custom_metrics.write().await;
        metrics.insert(name.to_string(), value);
    }

    pub async fn get_snapshot(&self) -> MetricsSnapshot {
        let custom_metrics = self.custom_metrics.read().await;

        let cache_hit_ratio = {
            let hits = self.cache_hits.get() as f64;
            let misses = self.cache_misses.get() as f64;
            let total = hits + misses;

            if total > 0.0 {
                (hits / total) * 100.0
            } else {
                0.0
            }
        };

        let success_rate = {
            let success = self.requests_success.get() as f64;
            let total = self.requests_total.get() as f64;

            if total > 0.0 {
                (success / total) * 100.0
            } else {
                100.0
            }
        };

        MetricsSnapshot {
            timestamp: Utc::now(),
            requests_total: self.requests_total.get(),
            requests_success: self.requests_success.get(),
            requests_failed: self.requests_failed.get(),
            success_rate,
            cpu_usage: self.cpu_usage.get(),
            memory_usage: self.memory_usage.get() as u64,
            db_connections: self.db_connections.get(),
            cache_hits: self.cache_hits.get(),
            cache_misses: self.cache_misses.get(),
            cache_hit_ratio,
            custom_metrics: custom_metrics.clone(),
        }
    }

    pub fn export_prometheus(&self) -> String {
        use prometheus::Encoder;
        let encoder = prometheus::TextEncoder::new();
        let metric_families = self.registry.gather();
        let mut buffer = Vec::new();
        // Encoding into an in-memory buffer and converting the text output to
        // UTF-8 should not fail for the text encoder, so unwrapping is acceptable.
        encoder.encode(&metric_families, &mut buffer).unwrap();
        String::from_utf8(buffer).unwrap()
    }

    pub async fn reset(&self) {
        self.requests_total.reset();
        self.requests_success.reset();
        self.requests_failed.reset();
        self.cache_hits.reset();
        self.cache_misses.reset();
        self.cpu_usage.set(0.0);
        self.memory_usage.set(0.0);
        self.db_connections.set(0);

        let mut custom = self.custom_metrics.write().await;
        custom.clear();
    }
}
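
// Usage sketch (not part of this module's API; the wiring below is illustrative
// and assumes a Tokio runtime is available): share the collector behind an `Arc`
// and export the Prometheus text exposition periodically.
//
//     let collector = std::sync::Arc::new(MetricsCollector::new()?);
//     let exporter = collector.clone();
//     tokio::spawn(async move {
//         let mut ticker = tokio::time::interval(Duration::from_secs(15));
//         loop {
//             ticker.tick().await;
//             let text = exporter.export_prometheus();
//             debug!("metrics exposition:\n{}", text);
//         }
//     });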

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsSnapshot {
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub requests_total: u64,
    pub requests_success: u64,
    pub requests_failed: u64,
    pub success_rate: f64,
    pub cpu_usage: f64,
    pub memory_usage: u64,
    pub db_connections: i64,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub cache_hit_ratio: f64,
    pub custom_metrics: HashMap<String, f64>,
}

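/// RAII timer that measures how long the named operation takes and records the
/// elapsed time into the parent [`MetricsCollector`] when it is dropped.
///
/// A minimal usage sketch (the operation name should match one of the histograms
/// registered in `MetricsCollector::new`, e.g. `"read"`):
///
/// ```ignore
/// let _timer = OperationTimer::new(&collector, "read");
/// // ... do the work; the elapsed time is observed when `_timer` is dropped ...
/// ```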
pub struct OperationTimer<'a> {
    collector: &'a MetricsCollector,
    operation: String,
    start: Instant,
}

impl<'a> OperationTimer<'a> {
    pub fn new(collector: &'a MetricsCollector, operation: &str) -> Self {
        Self {
            collector,
            operation: operation.to_string(),
            start: Instant::now(),
        }
    }
}

impl<'a> Drop for OperationTimer<'a> {
    fn drop(&mut self) {
        let duration = self.start.elapsed();
        self.collector.record_latency(&self.operation, duration);
        debug!("Operation '{}' took {:?}", self.operation, duration);
    }
}

pub struct PerformanceAnalyzer;

impl PerformanceAnalyzer {
    pub fn analyze(snapshot: &MetricsSnapshot) -> PerformanceAnalysis {
        let mut issues = Vec::new();
        let mut recommendations = Vec::new();

        if snapshot.success_rate < 99.0 {
            issues.push(format!(
                "Low success rate: {:.2}% (target: >99%)",
                snapshot.success_rate
            ));
            recommendations.push("Investigate error logs and improve error handling".to_string());
        }

        if snapshot.cache_hit_ratio < 80.0 && snapshot.cache_hits + snapshot.cache_misses > 100 {
            issues.push(format!(
                "Low cache hit ratio: {:.2}% (target: >80%)",
                snapshot.cache_hit_ratio
            ));
            recommendations.push("Review cache configuration and increase cache size".to_string());
        }

        if snapshot.cpu_usage > 80.0 {
            issues.push(format!(
                "High CPU usage: {:.2}% (threshold: 80%)",
                snapshot.cpu_usage
            ));
            recommendations.push("Profile CPU usage and optimize hot paths".to_string());
        }

        let memory_gb = snapshot.memory_usage as f64 / (1024.0 * 1024.0 * 1024.0);
        if memory_gb > 4.0 {
            issues.push(format!(
                "High memory usage: {memory_gb:.2} GB (threshold: 4 GB)"
            ));
            recommendations
                .push("Investigate memory leaks and optimize data structures".to_string());
        }

        if snapshot.db_connections > 80 {
            issues.push(format!(
                "High database connection count: {} (threshold: 80)",
                snapshot.db_connections
            ));
            recommendations.push("Review connection pooling and query optimization".to_string());
        }

        let health = if issues.is_empty() {
            PerformanceHealth::Good
        } else if issues.len() <= 2 {
            PerformanceHealth::Warning
        } else {
            PerformanceHealth::Critical
        };

        PerformanceAnalysis {
            timestamp: snapshot.timestamp,
            health,
            issues,
            recommendations,
            metrics_summary: format!(
                "Requests: {} | Success Rate: {:.2}% | Cache Hit: {:.2}% | CPU: {:.2}% | Memory: {:.2} GB",
                snapshot.requests_total,
                snapshot.success_rate,
                snapshot.cache_hit_ratio,
                snapshot.cpu_usage,
                memory_gb
            ),
        }
    }
}
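
// Putting the pieces together (a sketch using only this module's API; the
// surrounding async context is assumed): take a snapshot, run the analyzer,
// and surface any findings.
//
//     let snapshot = collector.get_snapshot().await;
//     let analysis = PerformanceAnalyzer::analyze(&snapshot);
//     if !matches!(analysis.health, PerformanceHealth::Good) {
//         debug!("performance issues detected: {:?}", analysis.issues);
//     }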

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceAnalysis {
    pub timestamp: chrono::DateTime<chrono::Utc>,
    pub health: PerformanceHealth,
    pub issues: Vec<String>,
    pub recommendations: Vec<String>,
    pub metrics_summary: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceHealth {
    Good,
    Warning,
    Critical,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_metrics_collector() {
        let collector = MetricsCollector::new().unwrap();

        collector.record_success();
        collector.record_success();
        collector.record_failure();
        collector.record_cache_hit();
        collector.record_cache_miss();
        collector.update_cpu_usage(50.0);
        collector.update_memory_usage(1_000_000_000.0);

        let snapshot = collector.get_snapshot().await;

        assert_eq!(snapshot.requests_total, 3);
        assert_eq!(snapshot.requests_success, 2);
        assert_eq!(snapshot.requests_failed, 1);
        assert_eq!(snapshot.cache_hits, 1);
        assert_eq!(snapshot.cache_misses, 1);
        assert_eq!(snapshot.cache_hit_ratio, 50.0);
        assert_eq!(snapshot.cpu_usage, 50.0);
    }

    #[tokio::test]
    async fn test_operation_timer() {
        let collector = MetricsCollector::new().unwrap();

        {
            let _timer = OperationTimer::new(&collector, "read");
            std::thread::sleep(std::time::Duration::from_millis(10));
        }
        // The "read" latency is recorded when the timer is dropped at the end of
        // the block above; this test only checks that dropping does not panic.
    }

    #[tokio::test]
    async fn test_performance_analyzer() {
        let mut snapshot = MetricsSnapshot {
            timestamp: Utc::now(),
            requests_total: 1000,
            requests_success: 950,
            requests_failed: 50,
            success_rate: 95.0,
            cpu_usage: 85.0,
            memory_usage: 5_000_000_000,
            db_connections: 90,
            cache_hits: 200,
            cache_misses: 100,
            cache_hit_ratio: 66.67,
            custom_metrics: HashMap::new(),
        };

        let analysis = PerformanceAnalyzer::analyze(&snapshot);

        assert!(matches!(analysis.health, PerformanceHealth::Critical));
        assert!(!analysis.issues.is_empty());
        assert!(!analysis.recommendations.is_empty());

        snapshot.success_rate = 99.5;
        snapshot.cpu_usage = 50.0;
        snapshot.memory_usage = 1_000_000_000;
        snapshot.db_connections = 20;
        snapshot.cache_hit_ratio = 90.0;

        let analysis = PerformanceAnalyzer::analyze(&snapshot);
        assert!(matches!(analysis.health, PerformanceHealth::Good));
        assert!(analysis.issues.is_empty());
    }
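
    // Added example (a sketch using only the API defined above): exercises the
    // Prometheus text export and `reset`. The metric names asserted on mirror
    // those registered in `MetricsCollector::new`; the custom-metric name is
    // illustrative.
    #[tokio::test]
    async fn test_export_and_reset() {
        let collector = MetricsCollector::new().unwrap();
        collector.record_success();
        collector.record_cache_hit();
        collector.record_custom_metric("example_queue_depth", 7.0).await;

        // The text exposition should mention the registered counter families.
        let exposition = collector.export_prometheus();
        assert!(exposition.contains("requests_total"));
        assert!(exposition.contains("cache_hits"));

        // After a reset, counters return to zero and custom metrics are cleared.
        collector.reset().await;
        let snapshot = collector.get_snapshot().await;
        assert_eq!(snapshot.requests_total, 0);
        assert!(snapshot.custom_metrics.is_empty());
    }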
}