1use prometheus::{
2 exponential_buckets, linear_buckets, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge,
3 Opts, Registry,
4};
5use std::sync::Arc;
6use std::time::Instant;
7use tracing::{error, info, warn};
8use uuid::Uuid;
9
10pub struct MetricsCollector {
12 registry: Arc<Registry>,
13
14 pub requests_total: IntCounter,
16 pub requests_duration_seconds: Histogram,
17 pub requests_in_flight: IntGauge,
18
19 pub memories_by_tier: IntGauge,
21 pub memory_migrations_total: IntCounter,
22 pub memory_creation_total: IntCounter,
23 pub memory_deletion_total: IntCounter,
24
25 pub db_connections_active: IntGauge,
27 pub db_connections_max: IntGauge,
28 pub db_query_duration_seconds: Histogram,
29 pub db_query_errors_total: IntCounter,
30
31 pub search_requests_total: IntCounter,
33 pub search_duration_seconds: Histogram,
34 pub search_results_count: Histogram,
35 pub search_cache_hits_total: IntCounter,
36 pub search_cache_misses_total: IntCounter,
37
38 pub memory_usage_bytes: Gauge,
40 pub cpu_usage_percent: Gauge,
41 pub uptime_seconds: IntCounter,
42 pub error_rate_percent: Gauge,
43
44 pub migration_duration_seconds: Histogram,
46 pub migration_failures_total: IntCounter,
47 pub migration_queue_size: IntGauge,
48
49 pub response_time_p95: Gauge,
51 pub response_time_p99: Gauge,
52 pub memory_pressure_ratio: Gauge,
53}
54
55impl MetricsCollector {
56 pub fn new() -> anyhow::Result<Self> {
57 let registry = Arc::new(Registry::new());
58
59 let requests_total = IntCounter::with_opts(Opts::new(
60 "memory_requests_total",
61 "Total number of memory requests",
62 ))?;
63 registry.register(Box::new(requests_total.clone()))?;
64
65 let requests_duration_seconds = Histogram::with_opts(
66 HistogramOpts::new(
67 "memory_request_duration_seconds",
68 "Duration of memory requests in seconds",
69 )
70 .buckets(exponential_buckets(0.001, 2.0, 15)?),
71 )?;
72 registry.register(Box::new(requests_duration_seconds.clone()))?;
73
74 let requests_in_flight = IntGauge::with_opts(Opts::new(
75 "memory_requests_in_flight",
76 "Number of requests currently being processed",
77 ))?;
78 registry.register(Box::new(requests_in_flight.clone()))?;
79
80 let memories_by_tier = IntGauge::with_opts(Opts::new(
81 "memory_tier_count",
82 "Number of memories in each tier",
83 ))?;
84 registry.register(Box::new(memories_by_tier.clone()))?;
85
86 let memory_migrations_total = IntCounter::with_opts(Opts::new(
87 "memory_migrations_total",
88 "Total number of memory tier migrations",
89 ))?;
90 registry.register(Box::new(memory_migrations_total.clone()))?;
91
92 let memory_creation_total = IntCounter::with_opts(Opts::new(
93 "memory_creation_total",
94 "Total number of memories created",
95 ))?;
96 registry.register(Box::new(memory_creation_total.clone()))?;
97
98 let memory_deletion_total = IntCounter::with_opts(Opts::new(
99 "memory_deletion_total",
100 "Total number of memories deleted",
101 ))?;
102 registry.register(Box::new(memory_deletion_total.clone()))?;
103
104 let db_connections_active = IntGauge::with_opts(Opts::new(
105 "db_connections_active",
106 "Number of active database connections",
107 ))?;
108 registry.register(Box::new(db_connections_active.clone()))?;
109
110 let db_connections_max = IntGauge::with_opts(Opts::new(
111 "db_connections_max",
112 "Maximum number of database connections",
113 ))?;
114 registry.register(Box::new(db_connections_max.clone()))?;
115
116 let db_query_duration_seconds = Histogram::with_opts(
117 HistogramOpts::new(
118 "db_query_duration_seconds",
119 "Duration of database queries in seconds",
120 )
121 .buckets(exponential_buckets(0.001, 2.0, 15)?),
122 )?;
123 registry.register(Box::new(db_query_duration_seconds.clone()))?;
124
125 let db_query_errors_total = IntCounter::with_opts(Opts::new(
126 "db_query_errors_total",
127 "Total number of database query errors",
128 ))?;
129 registry.register(Box::new(db_query_errors_total.clone()))?;
130
131 let search_requests_total = IntCounter::with_opts(Opts::new(
132 "search_requests_total",
133 "Total number of search requests",
134 ))?;
135 registry.register(Box::new(search_requests_total.clone()))?;
136
137 let search_duration_seconds = Histogram::with_opts(
138 HistogramOpts::new(
139 "search_duration_seconds",
140 "Duration of search requests in seconds",
141 )
142 .buckets(linear_buckets(0.01, 0.01, 20)?),
143 )?;
144 registry.register(Box::new(search_duration_seconds.clone()))?;
145
146 let search_results_count = Histogram::with_opts(
147 HistogramOpts::new(
148 "search_results_count",
149 "Number of results returned by search",
150 )
151 .buckets(linear_buckets(1.0, 5.0, 20)?),
152 )?;
153 registry.register(Box::new(search_results_count.clone()))?;
154
155 let search_cache_hits_total = IntCounter::with_opts(Opts::new(
156 "search_cache_hits_total",
157 "Total number of search cache hits",
158 ))?;
159 registry.register(Box::new(search_cache_hits_total.clone()))?;
160
161 let search_cache_misses_total = IntCounter::with_opts(Opts::new(
162 "search_cache_misses_total",
163 "Total number of search cache misses",
164 ))?;
165 registry.register(Box::new(search_cache_misses_total.clone()))?;
166
167 let memory_usage_bytes = Gauge::with_opts(Opts::new(
168 "memory_usage_bytes",
169 "Current memory usage in bytes",
170 ))?;
171 registry.register(Box::new(memory_usage_bytes.clone()))?;
172
173 let cpu_usage_percent = Gauge::with_opts(Opts::new(
174 "cpu_usage_percent",
175 "Current CPU usage percentage",
176 ))?;
177 registry.register(Box::new(cpu_usage_percent.clone()))?;
178
179 let uptime_seconds =
180 IntCounter::with_opts(Opts::new("uptime_seconds_total", "Total uptime in seconds"))?;
181 registry.register(Box::new(uptime_seconds.clone()))?;
182
183 let error_rate_percent = Gauge::with_opts(Opts::new(
184 "error_rate_percent",
185 "Current error rate percentage",
186 ))?;
187 registry.register(Box::new(error_rate_percent.clone()))?;
188
189 let migration_duration_seconds = Histogram::with_opts(
190 HistogramOpts::new(
191 "migration_duration_seconds",
192 "Duration of memory migrations in seconds",
193 )
194 .buckets(exponential_buckets(0.01, 2.0, 12)?),
195 )?;
196 registry.register(Box::new(migration_duration_seconds.clone()))?;
197
198 let migration_failures_total = IntCounter::with_opts(Opts::new(
199 "migration_failures_total",
200 "Total number of migration failures",
201 ))?;
202 registry.register(Box::new(migration_failures_total.clone()))?;
203
204 let migration_queue_size = IntGauge::with_opts(Opts::new(
205 "migration_queue_size",
206 "Number of memories queued for migration",
207 ))?;
208 registry.register(Box::new(migration_queue_size.clone()))?;
209
210 let response_time_p95 = Gauge::with_opts(Opts::new(
211 "response_time_p95_seconds",
212 "95th percentile response time in seconds",
213 ))?;
214 registry.register(Box::new(response_time_p95.clone()))?;
215
216 let response_time_p99 = Gauge::with_opts(Opts::new(
217 "response_time_p99_seconds",
218 "99th percentile response time in seconds",
219 ))?;
220 registry.register(Box::new(response_time_p99.clone()))?;
221
222 let memory_pressure_ratio = Gauge::with_opts(Opts::new(
223 "memory_pressure_ratio",
224 "Ratio of memory usage indicating pressure (0-1)",
225 ))?;
226 registry.register(Box::new(memory_pressure_ratio.clone()))?;
227
228 info!("Initialized Prometheus metrics collector");
229
230 Ok(Self {
231 registry,
232 requests_total,
233 requests_duration_seconds,
234 requests_in_flight,
235 memories_by_tier,
236 memory_migrations_total,
237 memory_creation_total,
238 memory_deletion_total,
239 db_connections_active,
240 db_connections_max,
241 db_query_duration_seconds,
242 db_query_errors_total,
243 search_requests_total,
244 search_duration_seconds,
245 search_results_count,
246 search_cache_hits_total,
247 search_cache_misses_total,
248 memory_usage_bytes,
249 cpu_usage_percent,
250 uptime_seconds,
251 error_rate_percent,
252 migration_duration_seconds,
253 migration_failures_total,
254 migration_queue_size,
255 response_time_p95,
256 response_time_p99,
257 memory_pressure_ratio,
258 })
259 }
260
261 pub fn registry(&self) -> Arc<Registry> {
262 self.registry.clone()
263 }
264
265 pub fn record_request(&self, start_time: Instant) {
267 let duration = start_time.elapsed().as_secs_f64();
268 self.requests_total.inc();
269 self.requests_duration_seconds.observe(duration);
270 }
271
272 pub fn record_db_query(&self, start_time: Instant, success: bool) {
274 let duration = start_time.elapsed().as_secs_f64();
275 self.db_query_duration_seconds.observe(duration);
276
277 if !success {
278 self.db_query_errors_total.inc();
279 }
280 }
281
282 pub fn record_search(&self, start_time: Instant, results_count: usize, cache_hit: bool) {
284 let duration = start_time.elapsed().as_secs_f64();
285 self.search_requests_total.inc();
286 self.search_duration_seconds.observe(duration);
287 self.search_results_count.observe(results_count as f64);
288
289 if cache_hit {
290 self.search_cache_hits_total.inc();
291 } else {
292 self.search_cache_misses_total.inc();
293 }
294 }
295
296 pub fn record_migration(
298 &self,
299 start_time: Instant,
300 success: bool,
301 _memory_id: Uuid,
302 from_tier: &str,
303 to_tier: &str,
304 ) {
305 let duration = start_time.elapsed().as_secs_f64();
306 self.migration_duration_seconds.observe(duration);
307
308 if success {
309 self.memory_migrations_total.inc();
310 info!(
311 "Recorded successful migration from {} to {} in {:.3}s",
312 from_tier, to_tier, duration
313 );
314 } else {
315 self.migration_failures_total.inc();
316 warn!(
317 "Recorded failed migration from {} to {} after {:.3}s",
318 from_tier, to_tier, duration
319 );
320 }
321 }
322
323 pub fn update_system_metrics(&self, memory_bytes: u64, cpu_percent: f64) {
325 self.memory_usage_bytes.set(memory_bytes as f64);
326 self.cpu_usage_percent.set(cpu_percent);
327 }
328
329 pub fn update_connection_pool_metrics(&self, active: u32, max: u32) {
331 self.db_connections_active.set(active as i64);
332 self.db_connections_max.set(max as i64);
333 }
334
335 pub fn update_tier_metrics(&self, working: u64, warm: u64, cold: u64) {
337 info!(
340 "Memory tier distribution - Working: {}, Warm: {}, Cold: {}",
341 working, warm, cold
342 );
343 }
344
345 pub fn update_derived_metrics(&self) {
347 let cache_hits = self.search_cache_hits_total.get();
349 let cache_misses = self.search_cache_misses_total.get();
350 let total_requests = cache_hits + cache_misses;
351
352 if total_requests > 0 {
353 let hit_ratio = cache_hits as f64 / total_requests as f64;
354 info!("Search cache hit ratio: {:.2}%", hit_ratio * 100.0);
355 }
356
357 let total_requests = self.requests_total.get();
359 let db_errors = self.db_query_errors_total.get();
360 let migration_failures = self.migration_failures_total.get();
361
362 if total_requests > 0 {
363 let error_rate =
364 (db_errors + migration_failures) as f64 / total_requests as f64 * 100.0;
365 self.error_rate_percent.set(error_rate);
366 }
367 }
368
369 pub fn gather_metrics(&self) -> String {
371 use prometheus::TextEncoder;
372 let encoder = TextEncoder::new();
373 let metric_families = self.registry.gather();
374 encoder
375 .encode_to_string(&metric_families)
376 .unwrap_or_else(|e| {
377 error!("Failed to encode metrics: {}", e);
378 String::new()
379 })
380 }
381}
382
383pub struct RequestTimer {
385 start: Instant,
386 metrics: Arc<MetricsCollector>,
387 #[allow(dead_code)]
388 operation: String,
389}
390
391impl RequestTimer {
392 pub fn new(metrics: Arc<MetricsCollector>, operation: String) -> Self {
393 metrics.requests_in_flight.inc();
394 Self {
395 start: Instant::now(),
396 metrics,
397 operation,
398 }
399 }
400}
401
402impl Drop for RequestTimer {
403 fn drop(&mut self) {
404 self.metrics.requests_in_flight.dec();
405 self.metrics.record_request(self.start);
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_metrics_collector_creation() {
        let collector = MetricsCollector::new().unwrap();
        assert_eq!(collector.requests_total.get(), 0);
        assert_eq!(collector.requests_in_flight.get(), 0);
    }

    #[test]
    fn test_request_timing() {
        let collector = MetricsCollector::new().unwrap();
        let start = Instant::now();

        // Sleep so the observed duration is measurably non-zero.
        thread::sleep(Duration::from_millis(10));

        collector.record_request(start);
        assert_eq!(collector.requests_total.get(), 1);
        assert_eq!(collector.requests_duration_seconds.get_sample_count(), 1);
        assert!(collector.requests_duration_seconds.get_sample_sum() > 0.0);

        let metrics_text = collector.gather_metrics();
        assert!(metrics_text.contains("memory_requests_total"));
    }

    #[test]
    fn test_request_timer() {
        let collector = Arc::new(MetricsCollector::new().unwrap());

        {
            let _timer = RequestTimer::new(collector.clone(), "test".to_string());
            assert_eq!(collector.requests_in_flight.get(), 1);
            thread::sleep(Duration::from_millis(5));
        }

        // Dropping the guard ends the request and records it.
        assert_eq!(collector.requests_in_flight.get(), 0);
        assert_eq!(collector.requests_total.get(), 1);
    }

    #[test]
    fn test_system_metrics_update() {
        let collector = MetricsCollector::new().unwrap();

        collector.update_system_metrics(1024 * 1024 * 512, 75.5);
        assert_eq!(collector.memory_usage_bytes.get(), 1024.0 * 1024.0 * 512.0);
        assert_eq!(collector.cpu_usage_percent.get(), 75.5);
    }

    #[test]
    fn test_connection_pool_metrics() {
        let collector = MetricsCollector::new().unwrap();

        collector.update_connection_pool_metrics(3, 10);
        assert_eq!(collector.db_connections_active.get(), 3);
        assert_eq!(collector.db_connections_max.get(), 10);
    }

    #[test]
    fn test_db_metrics() {
        let collector = MetricsCollector::new().unwrap();
        let start = Instant::now();

        collector.record_db_query(start, true);
        assert_eq!(collector.db_query_errors_total.get(), 0);

        collector.record_db_query(start, false);
        assert_eq!(collector.db_query_errors_total.get(), 1);

        // Latency is observed regardless of outcome.
        assert_eq!(collector.db_query_duration_seconds.get_sample_count(), 2);
    }

    #[test]
    fn test_search_metrics() {
        let collector = MetricsCollector::new().unwrap();
        let start = Instant::now();

        collector.record_search(start, 3, true);
        collector.record_search(start, 0, false);
        collector.record_search(start, 7, false);

        assert_eq!(collector.search_requests_total.get(), 3);
        assert_eq!(collector.search_cache_hits_total.get(), 1);
        assert_eq!(collector.search_cache_misses_total.get(), 2);
        assert_eq!(collector.search_results_count.get_sample_count(), 3);
        assert_eq!(collector.search_results_count.get_sample_sum(), 10.0);
    }

    #[test]
    fn test_migration_metrics() {
        let collector = MetricsCollector::new().unwrap();
        let start = Instant::now();

        collector.record_migration(start, true, Uuid::nil(), "working", "warm");
        collector.record_migration(start, false, Uuid::nil(), "warm", "cold");

        assert_eq!(collector.memory_migrations_total.get(), 1);
        assert_eq!(collector.migration_failures_total.get(), 1);
        // Both outcomes are timed.
        assert_eq!(collector.migration_duration_seconds.get_sample_count(), 2);
    }

    #[test]
    fn test_derived_error_rate() {
        let collector = MetricsCollector::new().unwrap();
        let start = Instant::now();

        // With no requests recorded the gauge is left untouched.
        collector.update_derived_metrics();
        assert_eq!(collector.error_rate_percent.get(), 0.0);

        for _ in 0..4 {
            collector.record_request(start);
        }
        collector.record_db_query(start, false);
        collector.update_derived_metrics();
        assert_eq!(collector.error_rate_percent.get(), 25.0);
    }
}