1use crate::{MonitoringConfig, PerformanceMetrics};
7use serde::{Deserialize, Serialize};
8use std::{
9 collections::{HashMap, VecDeque},
10 sync::{
11 atomic::{AtomicU64, Ordering},
12 Arc, RwLock,
13 },
14 time::{Duration, Instant, SystemTime},
15};
16use tokio::time::Interval;
17
/// Lock-free counters shared by the performance monitor.
///
/// Every field is an [`AtomicU64`], so the counters can be updated through a
/// shared reference. `Default` yields a value with every counter set to zero.
#[derive(Debug, Default)]
pub struct PerformanceCounters {
    /// Number of recorded get operations.
    pub get_operations: AtomicU64,
    /// Number of recorded put operations.
    pub put_operations: AtomicU64,
    /// Number of recorded remove operations.
    pub remove_operations: AtomicU64,
    /// Sum of all recorded get latencies, in microseconds.
    pub total_get_latency_us: AtomicU64,
    /// Sum of all recorded put latencies, in microseconds.
    pub total_put_latency_us: AtomicU64,
    /// Sum of all recorded remove latencies, in microseconds.
    pub total_remove_latency_us: AtomicU64,
    /// Largest memory usage value reported so far.
    pub peak_memory_usage: AtomicU64,
    /// Most recently reported memory usage value.
    pub current_memory_usage: AtomicU64,
    /// Running total of bytes reported as read from disk.
    pub disk_bytes_read: AtomicU64,
    /// Running total of bytes reported as written to disk.
    pub disk_bytes_written: AtomicU64,
}
59
/// A single entry in the performance monitor's in-memory event log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceEvent {
    /// Wall-clock time attached to the event.
    pub timestamp: SystemTime,
    /// What kind of operation the event describes.
    pub event_type: PerformanceEventType,
    /// Latency of the operation, in microseconds.
    pub latency_us: u64,
    /// Memory usage value associated with the event.
    pub memory_usage: u64,
    /// Free-form key/value annotations for the event.
    pub metadata: HashMap<String, String>,
}
74
/// Category of a [`PerformanceEvent`].
///
/// NOTE(review): only `CacheGet`, `CachePut` and `CacheRemove` are produced by
/// the monitor code visible in this file; the meaning of the remaining
/// variants is inferred from their names — confirm against their producers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceEventType {
    /// A cache get operation.
    CacheGet,
    /// A cache put operation.
    CachePut,
    /// A cache remove operation.
    CacheRemove,
    /// A cache eviction.
    CacheEviction,
    /// A cache promotion.
    CachePromotion,
    /// A cache demotion.
    CacheDemotion,
    /// A predictive preheating action.
    PredictivePreheating,
    /// An adaptive tuning action.
    AdaptiveTuning,
}
95
/// Summary of one benchmark run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BenchmarkResult {
    /// Name given to the benchmark by the caller.
    pub name: String,
    /// Throughput measured over the benchmark, in operations per second.
    pub ops_per_second: f64,
    /// Average latency, in microseconds.
    pub avg_latency_us: f64,
    /// 95th-percentile latency, in microseconds.
    pub p95_latency_us: f64,
    /// 99th-percentile latency, in microseconds.
    pub p99_latency_us: f64,
    /// Memory usage value captured when the result was built.
    pub memory_usage: u64,
    /// Cache hit rate for the run.
    pub hit_rate: f64,
    /// Wall-clock duration of the benchmark.
    pub duration: Duration,
    /// Wall-clock time at which the result was created.
    pub timestamp: SystemTime,
}
118
/// Collects operation counters, a bounded event log, a latency histogram and
/// benchmark results for the cache.
#[derive(Debug)]
pub struct PerformanceMonitor {
    /// Monitoring settings (event logging switch and event log capacity are
    /// the settings read by the code in this file).
    config: MonitoringConfig,
    /// Shared atomic operation, latency, memory and disk counters.
    counters: Arc<PerformanceCounters>,
    /// Recent events in arrival order; the oldest entries are dropped once the
    /// configured capacity is exceeded.
    event_log: Arc<RwLock<VecDeque<PerformanceEvent>>>,
    /// Number of recorded samples per latency bucket; bucket keys are derived
    /// from the latency in microseconds.
    latency_histogram: Arc<RwLock<HashMap<u64, u64>>>,
    /// Results of every benchmark run so far, in execution order.
    benchmark_history: Arc<RwLock<Vec<BenchmarkResult>>>,
    /// Instant at which the monitor was created; the basis for rate metrics.
    start_time: Instant,
    /// Initialized to `None` and never read in this file.
    /// NOTE(review): presumably reserved for periodic background monitoring —
    /// confirm.
    _monitoring_interval: Option<Interval>,
}
140
141impl PerformanceMonitor {
142 pub fn new(config: MonitoringConfig) -> Self {
144 Self {
145 config,
146 counters: Arc::new(PerformanceCounters::default()),
147 event_log: Arc::new(RwLock::new(VecDeque::new())),
148 latency_histogram: Arc::new(RwLock::new(HashMap::new())),
149 benchmark_history: Arc::new(RwLock::new(Vec::new())),
150 start_time: Instant::now(),
151 _monitoring_interval: None,
152 }
153 }
154
155 pub async fn record_get_latency(&self, latency: Duration) {
157 let latency_us = latency.as_micros() as u64;
158
159 self.counters.get_operations.fetch_add(1, Ordering::Relaxed);
160 self.counters
161 .total_get_latency_us
162 .fetch_add(latency_us, Ordering::Relaxed);
163
164 self.update_latency_histogram(latency_us).await;
165
166 if self.config.enable_event_logging {
167 self.log_event(PerformanceEvent {
168 timestamp: SystemTime::now(),
169 event_type: PerformanceEventType::CacheGet,
170 latency_us,
171 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
172 metadata: HashMap::new(),
173 })
174 .await;
175 }
176 }
177
178 pub async fn record_put_latency(&self, latency: Duration) {
180 let latency_us = latency.as_micros() as u64;
181
182 self.counters.put_operations.fetch_add(1, Ordering::Relaxed);
183 self.counters
184 .total_put_latency_us
185 .fetch_add(latency_us, Ordering::Relaxed);
186
187 self.update_latency_histogram(latency_us).await;
188
189 if self.config.enable_event_logging {
190 self.log_event(PerformanceEvent {
191 timestamp: SystemTime::now(),
192 event_type: PerformanceEventType::CachePut,
193 latency_us,
194 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
195 metadata: HashMap::new(),
196 })
197 .await;
198 }
199 }
200
201 pub async fn record_remove_latency(&self, latency: Duration) {
203 let latency_us = latency.as_micros() as u64;
204
205 self.counters
206 .remove_operations
207 .fetch_add(1, Ordering::Relaxed);
208 self.counters
209 .total_remove_latency_us
210 .fetch_add(latency_us, Ordering::Relaxed);
211
212 self.update_latency_histogram(latency_us).await;
213
214 if self.config.enable_event_logging {
215 self.log_event(PerformanceEvent {
216 timestamp: SystemTime::now(),
217 event_type: PerformanceEventType::CacheRemove,
218 latency_us,
219 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
220 metadata: HashMap::new(),
221 })
222 .await;
223 }
224 }
225
226 pub async fn update_memory_usage(&self, current_usage: u64) {
228 self.counters
229 .current_memory_usage
230 .store(current_usage, Ordering::Relaxed);
231
232 let current_peak = self.counters.peak_memory_usage.load(Ordering::Relaxed);
234 if current_usage > current_peak {
235 self.counters
236 .peak_memory_usage
237 .store(current_usage, Ordering::Relaxed);
238 }
239 }
240
241 pub async fn record_disk_io(&self, bytes_read: u64, bytes_written: u64) {
243 self.counters
244 .disk_bytes_read
245 .fetch_add(bytes_read, Ordering::Relaxed);
246 self.counters
247 .disk_bytes_written
248 .fetch_add(bytes_written, Ordering::Relaxed);
249 }
250
251 async fn log_event(&self, event: PerformanceEvent) {
253 if !self.config.enable_event_logging {
254 return;
255 }
256
257 let mut log = self.event_log.write().unwrap();
258 log.push_back(event);
259
260 while log.len() > self.config.max_events_in_memory {
262 log.pop_front();
263 }
264 }
265
266 async fn update_latency_histogram(&self, latency_us: u64) {
268 let bucket = if latency_us == 0 {
270 0
271 } else {
272 1u64 << (64 - latency_us.leading_zeros())
273 };
274
275 let mut histogram = self.latency_histogram.write().unwrap();
276 *histogram.entry(bucket).or_insert(0) += 1;
277 }
278
279 pub async fn get_performance_metrics(&self) -> PerformanceMetrics {
281 let get_ops = self.counters.get_operations.load(Ordering::Relaxed);
282 let put_ops = self.counters.put_operations.load(Ordering::Relaxed);
283 let remove_ops = self.counters.remove_operations.load(Ordering::Relaxed);
284
285 let total_get_latency = self.counters.total_get_latency_us.load(Ordering::Relaxed);
286 let total_put_latency = self.counters.total_put_latency_us.load(Ordering::Relaxed);
287
288 let elapsed_secs = self.start_time.elapsed().as_secs_f64();
289 let total_ops = get_ops + put_ops + remove_ops;
290
291 PerformanceMetrics {
292 avg_get_latency_us: if get_ops > 0 {
293 total_get_latency as f64 / get_ops as f64
294 } else {
295 0.0
296 },
297 avg_put_latency_us: if put_ops > 0 {
298 total_put_latency as f64 / put_ops as f64
299 } else {
300 0.0
301 },
302 avg_eviction_latency_us: 0.0, ops_per_second: if elapsed_secs > 0.0 {
304 total_ops as f64 / elapsed_secs
305 } else {
306 0.0
307 },
308 memory_allocation_rate: 0.0, disk_io_rate: if elapsed_secs > 0.0 {
310 (self.counters.disk_bytes_read.load(Ordering::Relaxed)
311 + self.counters.disk_bytes_written.load(Ordering::Relaxed))
312 as f64
313 / elapsed_secs
314 } else {
315 0.0
316 },
317 cpu_usage_percent: 0.0, peak_memory_usage: self.counters.peak_memory_usage.load(Ordering::Relaxed),
319 }
320 }
321
322 pub async fn run_benchmark<F, Fut>(&self, name: &str, benchmark_fn: F) -> BenchmarkResult
324 where
325 F: Fn() -> Fut,
326 Fut: std::future::Future<Output = ()>,
327 {
328 let start_time = Instant::now();
329 let start_ops = self.counters.get_operations.load(Ordering::Relaxed)
330 + self.counters.put_operations.load(Ordering::Relaxed);
331
332 {
334 let mut histogram = self.latency_histogram.write().unwrap();
335 histogram.clear();
336 }
337
338 benchmark_fn().await;
340
341 let duration = start_time.elapsed();
342 let end_ops = self.counters.get_operations.load(Ordering::Relaxed)
343 + self.counters.put_operations.load(Ordering::Relaxed);
344
345 let ops_performed = end_ops - start_ops;
346 let ops_per_second = if duration.as_secs_f64() > 0.0 {
347 ops_performed as f64 / duration.as_secs_f64()
348 } else {
349 0.0
350 };
351
352 let (avg_latency, p95_latency, p99_latency) = self.calculate_latency_percentiles().await;
354
355 let result = BenchmarkResult {
356 name: name.to_string(),
357 ops_per_second,
358 avg_latency_us: avg_latency,
359 p95_latency_us: p95_latency,
360 p99_latency_us: p99_latency,
361 memory_usage: self.counters.current_memory_usage.load(Ordering::Relaxed),
362 hit_rate: 0.0, duration,
364 timestamp: SystemTime::now(),
365 };
366
367 {
369 let mut history = self.benchmark_history.write().unwrap();
370 history.push(result.clone());
371 }
372
373 result
374 }
375
376 async fn calculate_latency_percentiles(&self) -> (f64, f64, f64) {
378 let histogram = self.latency_histogram.read().unwrap();
379
380 if histogram.is_empty() {
381 return (0.0, 0.0, 0.0);
382 }
383
384 let mut latencies: Vec<(u64, u64)> = histogram
386 .iter()
387 .map(|(&latency, &count)| (latency, count))
388 .collect();
389 latencies.sort_by_key(|&(latency, _)| latency);
390
391 let total_samples: u64 = latencies.iter().map(|(_, count)| count).sum();
392 if total_samples == 0 {
393 return (0.0, 0.0, 0.0);
394 }
395
396 let weighted_sum: u64 = latencies
398 .iter()
399 .map(|(latency, count)| latency * count)
400 .sum();
401 let avg_latency = weighted_sum as f64 / total_samples as f64;
402
403 let p95_target = (total_samples as f64 * 0.95) as u64;
405 let p99_target = (total_samples as f64 * 0.99) as u64;
406
407 let mut cumulative = 0u64;
408 let mut p95_latency = 0.0;
409 let mut p99_latency = 0.0;
410
411 for &(latency, count) in &latencies {
412 cumulative += count;
413
414 if p95_latency == 0.0 && cumulative >= p95_target {
415 p95_latency = latency as f64;
416 }
417
418 if p99_latency == 0.0 && cumulative >= p99_target {
419 p99_latency = latency as f64;
420 break;
421 }
422 }
423
424 (avg_latency, p95_latency, p99_latency)
425 }
426
427 pub async fn get_recent_events(&self, limit: usize) -> Vec<PerformanceEvent> {
429 let log = self.event_log.read().unwrap();
430 log.iter().rev().take(limit).cloned().collect()
431 }
432
433 pub async fn get_benchmark_history(&self) -> Vec<BenchmarkResult> {
435 self.benchmark_history.read().unwrap().clone()
436 }
437
438 pub async fn reset(&self) {
440 self.counters.get_operations.store(0, Ordering::Relaxed);
442 self.counters.put_operations.store(0, Ordering::Relaxed);
443 self.counters.remove_operations.store(0, Ordering::Relaxed);
444 self.counters
445 .total_get_latency_us
446 .store(0, Ordering::Relaxed);
447 self.counters
448 .total_put_latency_us
449 .store(0, Ordering::Relaxed);
450 self.counters
451 .total_remove_latency_us
452 .store(0, Ordering::Relaxed);
453 self.counters.peak_memory_usage.store(0, Ordering::Relaxed);
454 self.counters
455 .current_memory_usage
456 .store(0, Ordering::Relaxed);
457 self.counters.disk_bytes_read.store(0, Ordering::Relaxed);
458 self.counters.disk_bytes_written.store(0, Ordering::Relaxed);
459
460 {
462 let mut log = self.event_log.write().unwrap();
463 log.clear();
464 }
465
466 {
467 let mut histogram = self.latency_histogram.write().unwrap();
468 histogram.clear();
469 }
470 }
471}