codex_memory/monitoring/
metrics.rs

1use prometheus::{
2    exponential_buckets, linear_buckets, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge,
3    Opts, Registry,
4};
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{error, info, warn};
8use uuid::Uuid;
9
10/// Prometheus metrics collector for the memory system
11pub struct MetricsCollector {
12    registry: Arc<Registry>,
13
14    // Request metrics
15    pub requests_total: IntCounter,
16    pub requests_duration_seconds: Histogram,
17    pub requests_in_flight: IntGauge,
18
19    // Memory tier metrics
20    pub memories_by_tier: IntGauge,
21    pub memory_migrations_total: IntCounter,
22    pub memory_creation_total: IntCounter,
23    pub memory_deletion_total: IntCounter,
24
25    // Database metrics
26    pub db_connections_active: IntGauge,
27    pub db_connections_max: IntGauge,
28    pub db_query_duration_seconds: Histogram,
29    pub db_query_errors_total: IntCounter,
30
31    // Search metrics
32    pub search_requests_total: IntCounter,
33    pub search_duration_seconds: Histogram,
34    pub search_results_count: Histogram,
35    pub search_cache_hits_total: IntCounter,
36    pub search_cache_misses_total: IntCounter,
37
38    // System metrics
39    pub memory_usage_bytes: Gauge,
40    pub cpu_usage_percent: Gauge,
41    pub uptime_seconds: IntCounter,
42    pub error_rate_percent: Gauge,
43
44    // Migration metrics
45    pub migration_duration_seconds: Histogram,
46    pub migration_failures_total: IntCounter,
47    pub migration_queue_size: IntGauge,
48
49    // Performance metrics
50    pub response_time_p95: Gauge,
51    pub response_time_p99: Gauge,
52    pub memory_pressure_ratio: Gauge,
53}
54
55impl MetricsCollector {
56    pub fn new() -> anyhow::Result<Self> {
57        let registry = Arc::new(Registry::new());
58
59        let requests_total = IntCounter::with_opts(Opts::new(
60            "memory_requests_total",
61            "Total number of memory requests",
62        ))?;
63        registry.register(Box::new(requests_total.clone()))?;
64
65        let requests_duration_seconds = Histogram::with_opts(
66            HistogramOpts::new(
67                "memory_request_duration_seconds",
68                "Duration of memory requests in seconds",
69            )
70            .buckets(exponential_buckets(0.001, 2.0, 15)?),
71        )?;
72        registry.register(Box::new(requests_duration_seconds.clone()))?;
73
74        let requests_in_flight = IntGauge::with_opts(Opts::new(
75            "memory_requests_in_flight",
76            "Number of requests currently being processed",
77        ))?;
78        registry.register(Box::new(requests_in_flight.clone()))?;
79
80        let memories_by_tier = IntGauge::with_opts(Opts::new(
81            "memory_tier_count",
82            "Number of memories in each tier",
83        ))?;
84        registry.register(Box::new(memories_by_tier.clone()))?;
85
86        let memory_migrations_total = IntCounter::with_opts(Opts::new(
87            "memory_migrations_total",
88            "Total number of memory tier migrations",
89        ))?;
90        registry.register(Box::new(memory_migrations_total.clone()))?;
91
92        let memory_creation_total = IntCounter::with_opts(Opts::new(
93            "memory_creation_total",
94            "Total number of memories created",
95        ))?;
96        registry.register(Box::new(memory_creation_total.clone()))?;
97
98        let memory_deletion_total = IntCounter::with_opts(Opts::new(
99            "memory_deletion_total",
100            "Total number of memories deleted",
101        ))?;
102        registry.register(Box::new(memory_deletion_total.clone()))?;
103
104        let db_connections_active = IntGauge::with_opts(Opts::new(
105            "db_connections_active",
106            "Number of active database connections",
107        ))?;
108        registry.register(Box::new(db_connections_active.clone()))?;
109
110        let db_connections_max = IntGauge::with_opts(Opts::new(
111            "db_connections_max",
112            "Maximum number of database connections",
113        ))?;
114        registry.register(Box::new(db_connections_max.clone()))?;
115
116        let db_query_duration_seconds = Histogram::with_opts(
117            HistogramOpts::new(
118                "db_query_duration_seconds",
119                "Duration of database queries in seconds",
120            )
121            .buckets(exponential_buckets(0.001, 2.0, 15)?),
122        )?;
123        registry.register(Box::new(db_query_duration_seconds.clone()))?;
124
125        let db_query_errors_total = IntCounter::with_opts(Opts::new(
126            "db_query_errors_total",
127            "Total number of database query errors",
128        ))?;
129        registry.register(Box::new(db_query_errors_total.clone()))?;
130
131        let search_requests_total = IntCounter::with_opts(Opts::new(
132            "search_requests_total",
133            "Total number of search requests",
134        ))?;
135        registry.register(Box::new(search_requests_total.clone()))?;
136
137        let search_duration_seconds = Histogram::with_opts(
138            HistogramOpts::new(
139                "search_duration_seconds",
140                "Duration of search requests in seconds",
141            )
142            .buckets(linear_buckets(0.01, 0.01, 20)?),
143        )?;
144        registry.register(Box::new(search_duration_seconds.clone()))?;
145
146        let search_results_count = Histogram::with_opts(
147            HistogramOpts::new(
148                "search_results_count",
149                "Number of results returned by search",
150            )
151            .buckets(linear_buckets(1.0, 5.0, 20)?),
152        )?;
153        registry.register(Box::new(search_results_count.clone()))?;
154
155        let search_cache_hits_total = IntCounter::with_opts(Opts::new(
156            "search_cache_hits_total",
157            "Total number of search cache hits",
158        ))?;
159        registry.register(Box::new(search_cache_hits_total.clone()))?;
160
161        let search_cache_misses_total = IntCounter::with_opts(Opts::new(
162            "search_cache_misses_total",
163            "Total number of search cache misses",
164        ))?;
165        registry.register(Box::new(search_cache_misses_total.clone()))?;
166
167        let memory_usage_bytes = Gauge::with_opts(Opts::new(
168            "memory_usage_bytes",
169            "Current memory usage in bytes",
170        ))?;
171        registry.register(Box::new(memory_usage_bytes.clone()))?;
172
173        let cpu_usage_percent = Gauge::with_opts(Opts::new(
174            "cpu_usage_percent",
175            "Current CPU usage percentage",
176        ))?;
177        registry.register(Box::new(cpu_usage_percent.clone()))?;
178
179        let uptime_seconds =
180            IntCounter::with_opts(Opts::new("uptime_seconds_total", "Total uptime in seconds"))?;
181        registry.register(Box::new(uptime_seconds.clone()))?;
182
183        let error_rate_percent = Gauge::with_opts(Opts::new(
184            "error_rate_percent",
185            "Current error rate percentage",
186        ))?;
187        registry.register(Box::new(error_rate_percent.clone()))?;
188
189        let migration_duration_seconds = Histogram::with_opts(
190            HistogramOpts::new(
191                "migration_duration_seconds",
192                "Duration of memory migrations in seconds",
193            )
194            .buckets(exponential_buckets(0.01, 2.0, 12)?),
195        )?;
196        registry.register(Box::new(migration_duration_seconds.clone()))?;
197
198        let migration_failures_total = IntCounter::with_opts(Opts::new(
199            "migration_failures_total",
200            "Total number of migration failures",
201        ))?;
202        registry.register(Box::new(migration_failures_total.clone()))?;
203
204        let migration_queue_size = IntGauge::with_opts(Opts::new(
205            "migration_queue_size",
206            "Number of memories queued for migration",
207        ))?;
208        registry.register(Box::new(migration_queue_size.clone()))?;
209
210        let response_time_p95 = Gauge::with_opts(Opts::new(
211            "response_time_p95_seconds",
212            "95th percentile response time in seconds",
213        ))?;
214        registry.register(Box::new(response_time_p95.clone()))?;
215
216        let response_time_p99 = Gauge::with_opts(Opts::new(
217            "response_time_p99_seconds",
218            "99th percentile response time in seconds",
219        ))?;
220        registry.register(Box::new(response_time_p99.clone()))?;
221
222        let memory_pressure_ratio = Gauge::with_opts(Opts::new(
223            "memory_pressure_ratio",
224            "Ratio of memory usage indicating pressure (0-1)",
225        ))?;
226        registry.register(Box::new(memory_pressure_ratio.clone()))?;
227
228        info!("Initialized Prometheus metrics collector");
229
230        Ok(Self {
231            registry,
232            requests_total,
233            requests_duration_seconds,
234            requests_in_flight,
235            memories_by_tier,
236            memory_migrations_total,
237            memory_creation_total,
238            memory_deletion_total,
239            db_connections_active,
240            db_connections_max,
241            db_query_duration_seconds,
242            db_query_errors_total,
243            search_requests_total,
244            search_duration_seconds,
245            search_results_count,
246            search_cache_hits_total,
247            search_cache_misses_total,
248            memory_usage_bytes,
249            cpu_usage_percent,
250            uptime_seconds,
251            error_rate_percent,
252            migration_duration_seconds,
253            migration_failures_total,
254            migration_queue_size,
255            response_time_p95,
256            response_time_p99,
257            memory_pressure_ratio,
258        })
259    }
260
261    pub fn registry(&self) -> Arc<Registry> {
262        self.registry.clone()
263    }
264
265    /// Record a request with timing
266    pub fn record_request(&self, start_time: Instant) {
267        let duration = start_time.elapsed().as_secs_f64();
268        self.requests_total.inc();
269        self.requests_duration_seconds.observe(duration);
270    }
271
272    /// Record a database query with timing
273    pub fn record_db_query(&self, start_time: Instant, success: bool) {
274        let duration = start_time.elapsed().as_secs_f64();
275        self.db_query_duration_seconds.observe(duration);
276
277        if !success {
278            self.db_query_errors_total.inc();
279        }
280    }
281
282    /// Record a search operation
283    pub fn record_search(&self, start_time: Instant, results_count: usize, cache_hit: bool) {
284        let duration = start_time.elapsed().as_secs_f64();
285        self.search_requests_total.inc();
286        self.search_duration_seconds.observe(duration);
287        self.search_results_count.observe(results_count as f64);
288
289        if cache_hit {
290            self.search_cache_hits_total.inc();
291        } else {
292            self.search_cache_misses_total.inc();
293        }
294    }
295
296    /// Record a memory migration
297    pub fn record_migration(
298        &self,
299        start_time: Instant,
300        success: bool,
301        _memory_id: Uuid,
302        from_tier: &str,
303        to_tier: &str,
304    ) {
305        let duration = start_time.elapsed().as_secs_f64();
306        self.migration_duration_seconds.observe(duration);
307
308        if success {
309            self.memory_migrations_total.inc();
310            info!(
311                "Recorded successful migration from {} to {} in {:.3}s",
312                from_tier, to_tier, duration
313            );
314        } else {
315            self.migration_failures_total.inc();
316            warn!(
317                "Recorded failed migration from {} to {} after {:.3}s",
318                from_tier, to_tier, duration
319            );
320        }
321    }
322
323    /// Update system resource metrics
324    pub fn update_system_metrics(&self, memory_bytes: u64, cpu_percent: f64) {
325        self.memory_usage_bytes.set(memory_bytes as f64);
326        self.cpu_usage_percent.set(cpu_percent);
327    }
328
329    /// Update database connection pool metrics
330    pub fn update_connection_pool_metrics(&self, active: u32, max: u32) {
331        self.db_connections_active.set(active as i64);
332        self.db_connections_max.set(max as i64);
333    }
334
335    /// Update memory tier distribution
336    pub fn update_tier_metrics(&self, working: u64, warm: u64, cold: u64) {
337        // Use labels to distinguish tiers (simplified for now)
338        // In production, would use proper label support
339        info!(
340            "Memory tier distribution - Working: {}, Warm: {}, Cold: {}",
341            working, warm, cold
342        );
343    }
344
345    /// Calculate and update derived metrics
346    pub fn update_derived_metrics(&self) {
347        // Calculate cache hit ratio
348        let cache_hits = self.search_cache_hits_total.get();
349        let cache_misses = self.search_cache_misses_total.get();
350        let total_requests = cache_hits + cache_misses;
351
352        if total_requests > 0 {
353            let hit_ratio = cache_hits as f64 / total_requests as f64;
354            info!("Search cache hit ratio: {:.2}%", hit_ratio * 100.0);
355        }
356
357        // Calculate error rate
358        let total_requests = self.requests_total.get();
359        let db_errors = self.db_query_errors_total.get();
360        let migration_failures = self.migration_failures_total.get();
361
362        if total_requests > 0 {
363            let error_rate =
364                (db_errors + migration_failures) as f64 / total_requests as f64 * 100.0;
365            self.error_rate_percent.set(error_rate);
366        }
367    }
368
369    /// Get metrics in Prometheus format
370    pub fn gather_metrics(&self) -> String {
371        use prometheus::TextEncoder;
372        let encoder = TextEncoder::new();
373        let metric_families = self.registry.gather();
374        encoder
375            .encode_to_string(&metric_families)
376            .unwrap_or_else(|e| {
377                error!("Failed to encode metrics: {}", e);
378                String::new()
379            })
380    }
381}
382
383/// Request timing guard that automatically records metrics on drop
384pub struct RequestTimer {
385    start: Instant,
386    metrics: Arc<MetricsCollector>,
387    #[allow(dead_code)]
388    operation: String,
389}
390
391impl RequestTimer {
392    pub fn new(metrics: Arc<MetricsCollector>, operation: String) -> Self {
393        metrics.requests_in_flight.inc();
394        Self {
395            start: Instant::now(),
396            metrics,
397            operation,
398        }
399    }
400}
401
402impl Drop for RequestTimer {
403    fn drop(&mut self) {
404        self.metrics.requests_in_flight.dec();
405        self.metrics.record_request(self.start);
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Builds a collector backed by a fresh registry.
    fn fresh_collector() -> MetricsCollector {
        MetricsCollector::new().unwrap()
    }

    /// A new collector starts with zeroed request metrics.
    #[test]
    fn test_metrics_collector_creation() {
        let metrics = fresh_collector();

        assert_eq!(metrics.requests_total.get(), 0);
        assert_eq!(metrics.requests_in_flight.get(), 0);
    }

    /// Recording a request bumps the counter and shows up in the text export.
    #[test]
    fn test_request_timing() {
        let metrics = fresh_collector();
        let started = Instant::now();

        // Simulate some work
        thread::sleep(Duration::from_millis(10));
        metrics.record_request(started);

        assert_eq!(metrics.requests_total.get(), 1);

        let exported = metrics.gather_metrics();
        assert!(exported.contains("memory_requests_total"));
    }

    /// The guard tracks in-flight requests and records the request when dropped.
    #[test]
    fn test_request_timer() {
        let metrics = Arc::new(fresh_collector());

        let timer = RequestTimer::new(Arc::clone(&metrics), "test".to_string());
        assert_eq!(metrics.requests_in_flight.get(), 1);
        thread::sleep(Duration::from_millis(5));
        drop(timer);

        assert_eq!(metrics.requests_in_flight.get(), 0);
        assert_eq!(metrics.requests_total.get(), 1);
    }

    /// System gauges hold exactly the values they were given.
    #[test]
    fn test_system_metrics_update() {
        let metrics = fresh_collector();

        // 512MB, 75.5% CPU
        metrics.update_system_metrics(1024 * 1024 * 512, 75.5);

        assert_eq!(metrics.memory_usage_bytes.get(), 1024.0 * 1024.0 * 512.0);
        assert_eq!(metrics.cpu_usage_percent.get(), 75.5);
    }

    /// Only failed queries increment the database error counter.
    #[test]
    fn test_db_metrics() {
        let metrics = fresh_collector();
        let started = Instant::now();

        metrics.record_db_query(started, true);
        assert_eq!(metrics.db_query_errors_total.get(), 0);

        metrics.record_db_query(started, false);
        assert_eq!(metrics.db_query_errors_total.get(), 1);
    }
}