1use crate::{MonitoringConfig, PerformanceMetrics};
7use serde::{Deserialize, Serialize};
8use std::{
9 collections::{HashMap, VecDeque},
10 sync::{
11 atomic::{AtomicU64, Ordering},
12 Arc, RwLock,
13 },
14 time::{Duration, Instant, SystemTime},
15};
16use tokio::time::Interval;
17
18#[derive(Debug)]
20pub struct PerformanceCounters {
21 pub get_operations: AtomicU64,
23 pub put_operations: AtomicU64,
25 pub remove_operations: AtomicU64,
27 pub eviction_operations: AtomicU64,
29 pub total_get_latency_us: AtomicU64,
31 pub total_put_latency_us: AtomicU64,
33 pub total_remove_latency_us: AtomicU64,
35 pub total_eviction_latency_us: AtomicU64,
37 pub hit_count: AtomicU64,
39 pub miss_count: AtomicU64,
41 pub total_bytes_allocated: AtomicU64,
43 pub peak_memory_usage: AtomicU64,
45 pub current_memory_usage: AtomicU64,
47 pub disk_bytes_read: AtomicU64,
49 pub disk_bytes_written: AtomicU64,
51}
52
53impl Default for PerformanceCounters {
54 fn default() -> Self {
55 Self {
56 get_operations: AtomicU64::new(0),
57 put_operations: AtomicU64::new(0),
58 remove_operations: AtomicU64::new(0),
59 eviction_operations: AtomicU64::new(0),
60 total_get_latency_us: AtomicU64::new(0),
61 total_put_latency_us: AtomicU64::new(0),
62 total_remove_latency_us: AtomicU64::new(0),
63 total_eviction_latency_us: AtomicU64::new(0),
64 hit_count: AtomicU64::new(0),
65 miss_count: AtomicU64::new(0),
66 total_bytes_allocated: AtomicU64::new(0),
67 peak_memory_usage: AtomicU64::new(0),
68 current_memory_usage: AtomicU64::new(0),
69 disk_bytes_read: AtomicU64::new(0),
70 disk_bytes_written: AtomicU64::new(0),
71 }
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct PerformanceEvent {
78 pub timestamp: SystemTime,
80 pub event_type: PerformanceEventType,
82 pub latency_us: u64,
84 pub memory_usage: u64,
86 pub metadata: HashMap<String, String>,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
92pub enum PerformanceEventType {
93 CacheGet,
95 CachePut,
97 CacheRemove,
99 CacheEviction,
101 CachePromotion,
103 CacheDemotion,
105 PredictivePreheating,
107 AdaptiveTuning,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct BenchmarkResult {
114 pub name: String,
116 pub ops_per_second: f64,
118 pub avg_latency_us: f64,
120 pub p95_latency_us: f64,
122 pub p99_latency_us: f64,
124 pub memory_usage: u64,
126 pub hit_rate: f64,
128 pub duration: Duration,
130 pub timestamp: SystemTime,
132}
133
134#[derive(Debug)]
139pub struct PerformanceMonitor {
140 config: MonitoringConfig,
142 counters: Arc<PerformanceCounters>,
144 event_log: Arc<RwLock<VecDeque<PerformanceEvent>>>,
146 latency_histogram: Arc<RwLock<HashMap<u64, u64>>>,
148 benchmark_history: Arc<RwLock<Vec<BenchmarkResult>>>,
150 start_time: Instant,
152 _monitoring_interval: Option<Interval>,
154}
155
156impl PerformanceMonitor {
157 pub fn new(config: MonitoringConfig) -> Self {
159 Self {
160 config,
161 counters: Arc::new(PerformanceCounters::default()),
162 event_log: Arc::new(RwLock::new(VecDeque::new())),
163 latency_histogram: Arc::new(RwLock::new(HashMap::new())),
164 benchmark_history: Arc::new(RwLock::new(Vec::new())),
165 start_time: Instant::now(),
166 _monitoring_interval: None,
167 }
168 }
169
170 pub async fn record_get_latency(&self, latency: Duration) {
172 let latency_us = latency.as_micros() as u64;
173
174 self.counters.get_operations.fetch_add(1, Ordering::Relaxed);
175 self.counters
176 .total_get_latency_us
177 .fetch_add(latency_us, Ordering::Relaxed);
178
179 self.update_latency_histogram(latency_us).await;
180
181 if self.config.enable_event_logging {
182 self.log_event(PerformanceEvent {
183 timestamp: SystemTime::now(),
184 event_type: PerformanceEventType::CacheGet,
185 latency_us,
186 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
187 metadata: HashMap::new(),
188 })
189 .await;
190 }
191 }
192
193 pub async fn record_put_latency(&self, latency: Duration) {
195 let latency_us = latency.as_micros() as u64;
196
197 self.counters.put_operations.fetch_add(1, Ordering::Relaxed);
198 self.counters
199 .total_put_latency_us
200 .fetch_add(latency_us, Ordering::Relaxed);
201
202 self.update_latency_histogram(latency_us).await;
203
204 if self.config.enable_event_logging {
205 self.log_event(PerformanceEvent {
206 timestamp: SystemTime::now(),
207 event_type: PerformanceEventType::CachePut,
208 latency_us,
209 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
210 metadata: HashMap::new(),
211 })
212 .await;
213 }
214 }
215
216 pub async fn record_remove_latency(&self, latency: Duration) {
218 let latency_us = latency.as_micros() as u64;
219
220 self.counters
221 .remove_operations
222 .fetch_add(1, Ordering::Relaxed);
223 self.counters
224 .total_remove_latency_us
225 .fetch_add(latency_us, Ordering::Relaxed);
226
227 self.update_latency_histogram(latency_us).await;
228
229 if self.config.enable_event_logging {
230 self.log_event(PerformanceEvent {
231 timestamp: SystemTime::now(),
232 event_type: PerformanceEventType::CacheRemove,
233 latency_us,
234 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
235 metadata: HashMap::new(),
236 })
237 .await;
238 }
239 }
240
241 pub async fn update_memory_usage(&self, current_usage: u64) {
243 self.counters
244 .current_memory_usage
245 .store(current_usage, Ordering::Relaxed);
246
247 let current_peak = self.counters.peak_memory_usage.load(Ordering::Relaxed);
249 if current_usage > current_peak {
250 self.counters
251 .peak_memory_usage
252 .store(current_usage, Ordering::Relaxed);
253 }
254 }
255
256 pub async fn record_disk_io(&self, bytes_read: u64, bytes_written: u64) {
258 self.counters
259 .disk_bytes_read
260 .fetch_add(bytes_read, Ordering::Relaxed);
261 self.counters
262 .disk_bytes_written
263 .fetch_add(bytes_written, Ordering::Relaxed);
264 }
265
266 pub async fn record_eviction_latency(&self, latency: Duration) {
268 let latency_us = latency.as_micros() as u64;
269 self.counters
270 .eviction_operations
271 .fetch_add(1, Ordering::Relaxed);
272 self.counters
273 .total_eviction_latency_us
274 .fetch_add(latency_us, Ordering::Relaxed);
275
276 self.update_latency_histogram(latency_us).await;
277
278 if self.config.enable_event_logging {
279 self.log_event(PerformanceEvent {
280 timestamp: SystemTime::now(),
281 event_type: PerformanceEventType::CacheEviction,
282 latency_us,
283 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
284 metadata: HashMap::new(),
285 })
286 .await;
287 }
288 }
289
290 pub fn record_cache_hit(&self) {
292 self.counters.hit_count.fetch_add(1, Ordering::Relaxed);
293 }
294
295 pub fn record_cache_miss(&self) {
297 self.counters.miss_count.fetch_add(1, Ordering::Relaxed);
298 }
299
300 pub fn record_allocation(&self, bytes: u64) {
302 self.counters
303 .total_bytes_allocated
304 .fetch_add(bytes, Ordering::Relaxed);
305 }
306
307 pub fn hit_rate(&self) -> f64 {
309 let hits = self.counters.hit_count.load(Ordering::Relaxed);
310 let misses = self.counters.miss_count.load(Ordering::Relaxed);
311 let total = hits + misses;
312 if total > 0 {
313 hits as f64 / total as f64
314 } else {
315 0.0
316 }
317 }
318
319 async fn log_event(&self, event: PerformanceEvent) {
321 if !self.config.enable_event_logging {
322 return;
323 }
324
325 let mut log = self.event_log.write().unwrap();
326 log.push_back(event);
327
328 while log.len() > self.config.max_events_in_memory {
330 log.pop_front();
331 }
332 }
333
334 async fn update_latency_histogram(&self, latency_us: u64) {
336 let bucket = if latency_us == 0 {
338 0
339 } else {
340 1u64 << (64 - latency_us.leading_zeros())
341 };
342
343 let mut histogram = self.latency_histogram.write().unwrap();
344 *histogram.entry(bucket).or_insert(0) += 1;
345 }
346
347 pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
349 let get_ops = self.counters.get_operations.load(Ordering::Relaxed);
350 let put_ops = self.counters.put_operations.load(Ordering::Relaxed);
351 let remove_ops = self.counters.remove_operations.load(Ordering::Relaxed);
352 let eviction_ops = self.counters.eviction_operations.load(Ordering::Relaxed);
353
354 let total_get_latency = self.counters.total_get_latency_us.load(Ordering::Relaxed);
355 let total_put_latency = self.counters.total_put_latency_us.load(Ordering::Relaxed);
356 let total_eviction_latency = self
357 .counters
358 .total_eviction_latency_us
359 .load(Ordering::Relaxed);
360
361 let elapsed_secs = self.start_time.elapsed().as_secs_f64();
362 let total_ops = get_ops + put_ops + remove_ops;
363
364 let total_allocated = self.counters.total_bytes_allocated.load(Ordering::Relaxed);
366 let memory_allocation_rate = if elapsed_secs > 0.0 {
367 total_allocated as f64 / elapsed_secs
368 } else {
369 0.0
370 };
371
372 PerformanceMetrics {
373 avg_get_latency_us: if get_ops > 0 {
374 total_get_latency as f64 / get_ops as f64
375 } else {
376 0.0
377 },
378 avg_put_latency_us: if put_ops > 0 {
379 total_put_latency as f64 / put_ops as f64
380 } else {
381 0.0
382 },
383 avg_eviction_latency_us: if eviction_ops > 0 {
384 total_eviction_latency as f64 / eviction_ops as f64
385 } else {
386 0.0
387 },
388 ops_per_second: if elapsed_secs > 0.0 {
389 total_ops as f64 / elapsed_secs
390 } else {
391 0.0
392 },
393 memory_allocation_rate,
394 disk_io_rate: if elapsed_secs > 0.0 {
395 (self.counters.disk_bytes_read.load(Ordering::Relaxed)
396 + self.counters.disk_bytes_written.load(Ordering::Relaxed))
397 as f64
398 / elapsed_secs
399 } else {
400 0.0
401 },
402 cpu_usage_percent: {
405 let total_op_latency_us = total_get_latency
406 + total_put_latency
407 + total_eviction_latency
408 + self
409 .counters
410 .total_remove_latency_us
411 .load(Ordering::Relaxed);
412 let elapsed_us = self.start_time.elapsed().as_micros() as f64;
413 if elapsed_us > 0.0 {
414 (total_op_latency_us as f64 / elapsed_us * 100.0).min(100.0)
415 } else {
416 0.0
417 }
418 },
419 peak_memory_usage: self.counters.peak_memory_usage.load(Ordering::Relaxed),
420 }
421 }
422
423 pub async fn run_benchmark<F, Fut>(&self, name: &str, benchmark_fn: F) -> BenchmarkResult
425 where
426 F: Fn() -> Fut,
427 Fut: std::future::Future<Output = ()>,
428 {
429 let start_time = Instant::now();
430 let start_ops = self.counters.get_operations.load(Ordering::Relaxed)
431 + self.counters.put_operations.load(Ordering::Relaxed);
432
433 {
435 let mut histogram = self.latency_histogram.write().unwrap();
436 histogram.clear();
437 }
438
439 benchmark_fn().await;
441
442 let duration = start_time.elapsed();
443 let end_ops = self.counters.get_operations.load(Ordering::Relaxed)
444 + self.counters.put_operations.load(Ordering::Relaxed);
445
446 let ops_performed = end_ops - start_ops;
447 let ops_per_second = if duration.as_secs_f64() > 0.0 {
448 ops_performed as f64 / duration.as_secs_f64()
449 } else {
450 0.0
451 };
452
453 let (avg_latency, p95_latency, p99_latency) = self.calculate_latency_percentiles().await;
455
456 let result = BenchmarkResult {
457 name: name.to_string(),
458 ops_per_second,
459 avg_latency_us: avg_latency,
460 p95_latency_us: p95_latency,
461 p99_latency_us: p99_latency,
462 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
463 hit_rate: self.hit_rate(),
464 duration,
465 timestamp: SystemTime::now(),
466 };
467
468 {
470 let mut history = self.benchmark_history.write().unwrap();
471 history.push(result.clone());
472 }
473
474 result
475 }
476
477 async fn calculate_latency_percentiles(&self) -> (f64, f64, f64) {
479 let histogram = self.latency_histogram.read().unwrap();
480
481 if histogram.is_empty() {
482 return (0.0, 0.0, 0.0);
483 }
484
485 let mut latencies: Vec<(u64, u64)> = histogram
487 .iter()
488 .map(|(&latency, &count)| (latency, count))
489 .collect();
490 latencies.sort_by_key(|&(latency, _)| latency);
491
492 let total_samples: u64 = latencies.iter().map(|(_, count)| count).sum();
493 if total_samples == 0 {
494 return (0.0, 0.0, 0.0);
495 }
496
497 let weighted_sum: u64 = latencies
499 .iter()
500 .map(|(latency, count)| latency * count)
501 .sum();
502 let avg_latency = weighted_sum as f64 / total_samples as f64;
503
504 let p95_target = (total_samples as f64 * 0.95) as u64;
506 let p99_target = (total_samples as f64 * 0.99) as u64;
507
508 let mut cumulative = 0u64;
509 let mut p95_latency = 0.0;
510 let mut p99_latency = 0.0;
511
512 for &(latency, count) in &latencies {
513 cumulative += count;
514
515 if p95_latency == 0.0 && cumulative >= p95_target {
516 p95_latency = latency as f64;
517 }
518
519 if p99_latency == 0.0 && cumulative >= p99_target {
520 p99_latency = latency as f64;
521 break;
522 }
523 }
524
525 (avg_latency, p95_latency, p99_latency)
526 }
527
528 pub async fn get_recent_events(&self, limit: usize) -> Vec<PerformanceEvent> {
530 let log = self.event_log.read().unwrap();
531 log.iter().rev().take(limit).cloned().collect()
532 }
533
534 pub async fn get_benchmark_history(&self) -> Vec<BenchmarkResult> {
536 self.benchmark_history.read().unwrap().clone()
537 }
538
539 pub async fn reset(&self) {
541 self.counters.get_operations.store(0, Ordering::Relaxed);
543 self.counters.put_operations.store(0, Ordering::Relaxed);
544 self.counters.remove_operations.store(0, Ordering::Relaxed);
545 self.counters
546 .eviction_operations
547 .store(0, Ordering::Relaxed);
548 self.counters
549 .total_get_latency_us
550 .store(0, Ordering::Relaxed);
551 self.counters
552 .total_put_latency_us
553 .store(0, Ordering::Relaxed);
554 self.counters
555 .total_remove_latency_us
556 .store(0, Ordering::Relaxed);
557 self.counters
558 .total_eviction_latency_us
559 .store(0, Ordering::Relaxed);
560 self.counters.hit_count.store(0, Ordering::Relaxed);
561 self.counters.miss_count.store(0, Ordering::Relaxed);
562 self.counters
563 .total_bytes_allocated
564 .store(0, Ordering::Relaxed);
565 self.counters.peak_memory_usage.store(0, Ordering::Relaxed);
566 self.counters
567 .current_memory_usage
568 .store(0, Ordering::Relaxed);
569 self.counters.disk_bytes_read.store(0, Ordering::Relaxed);
570 self.counters.disk_bytes_written.store(0, Ordering::Relaxed);
571
572 {
574 let mut log = self.event_log.write().unwrap();
575 log.clear();
576 }
577
578 {
579 let mut histogram = self.latency_histogram.write().unwrap();
580 histogram.clear();
581 }
582 }
583}