1use atomic_float::AtomicF64;
21use crossbeam_channel::{Receiver, Sender, bounded};
22use dashmap::DashMap;
23use serde::{Deserialize, Serialize};
24use std::sync::Arc;
25use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27use tracing::{debug, info};
28
29#[derive(Debug)]
31pub struct PerformanceMonitor {
32 operations: DashMap<String, Arc<OperationMetrics>>,
34
35 caches: DashMap<String, Arc<CacheMetrics>>,
37
38 total_operations: AtomicU64,
41 total_errors: AtomicU64,
43 active_operations: AtomicUsize,
45 started_at_timestamp: AtomicU64,
47
48 session_id: Option<String>,
50
51 metrics_sender: Sender<MetricsEvent>,
53 #[allow(dead_code)]
54 metrics_receiver: Receiver<MetricsEvent>,
55}
56
57#[derive(Debug)]
59pub struct OperationMetrics {
60 operation_name: String,
62
63 total_calls: AtomicU64,
65 error_count: AtomicU64,
67
68 total_duration_ns: AtomicU64,
70 min_duration_ns: AtomicU64,
72 max_duration_ns: AtomicU64,
74
75 last_execution_timestamp: AtomicU64,
77
78 avg_duration_ns: AtomicU64,
80 error_rate: AtomicF64,
82
83 recent_duration_sum: AtomicU64,
85 recent_operation_count: AtomicU64,
87}
88
89#[derive(Debug)]
91pub struct CacheMetrics {
92 cache_name: String,
94
95 total_requests: AtomicU64,
97 hits: AtomicU64,
99 misses: AtomicU64,
101 evictions: AtomicU64,
103
104 total_lookup_time_ns: AtomicU64,
106 avg_lookup_time_ns: AtomicU64,
108
109 hit_rate: AtomicF64,
111 miss_rate: AtomicF64,
113
114 last_updated_timestamp: AtomicU64,
116 #[allow(dead_code)]
117 created_timestamp: AtomicU64,
119}
120
121pub struct OperationTimer {
123 #[allow(dead_code)]
124 operation_name: String,
126 start_time: Instant,
128 monitor: Option<Arc<PerformanceMonitor>>,
130 is_finished: AtomicBool,
132}
133
/// Event pushed onto the monitor's internal channel whenever a metric is recorded.
///
/// All timestamps are seconds since the Unix epoch.
// The payload fields are populated when an event is queued but are not read
// by the visible code, so dead-code warnings are silenced for the whole enum.
#[allow(dead_code)]
#[derive(Debug, Clone)]
enum MetricsEvent {
    /// An operation finished and was recorded.
    OperationCompleted {
        operation_name: String,
        duration_ns: u64,
        is_error: bool,
        timestamp: u64,
    },
    /// A cache lookup found an entry.
    CacheHit {
        cache_name: String,
        lookup_time_ns: u64,
        timestamp: u64,
    },
    /// A cache lookup did not find an entry.
    CacheMiss {
        cache_name: String,
        lookup_time_ns: u64,
        timestamp: u64,
    },
    /// A cache evicted an entry.
    CacheEviction {
        cache_name: String,
        timestamp: u64,
    },
}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct PerformanceSnapshot {
190 pub session_id: Option<String>,
192 pub started_at_timestamp: u64,
194 pub total_operations: u64,
196 pub total_errors: u64,
198 pub global_error_rate: f64,
200 pub active_operations: usize,
202 pub operations: Vec<OperationSnapshot>,
204 pub caches: Vec<CacheSnapshot>,
206 pub slow_operations: Vec<(String, f64)>,
208 pub error_prone_operations: Vec<(String, f64)>,
210 pub cache_issues: Vec<(String, String)>,
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct OperationSnapshot {
217 pub operation_name: String,
219 pub total_calls: u64,
221 pub error_count: u64,
223 pub error_rate: f64,
225 pub avg_duration_ms: f64,
227 pub min_duration_ms: f64,
229 pub max_duration_ms: f64,
231 pub last_execution_timestamp: u64,
233}
234
235#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct CacheSnapshot {
238 pub cache_name: String,
240 pub total_requests: u64,
242 pub hits: u64,
244 pub misses: u64,
246 pub evictions: u64,
248 pub hit_rate: f64,
250 pub miss_rate: f64,
252 pub avg_lookup_time_ns: u64,
254 pub last_updated_timestamp: u64,
256}
257
258impl PerformanceMonitor {
259 pub fn new(session_id: Option<String>) -> Self {
261 let (sender, receiver) = bounded(10000); Self {
264 operations: DashMap::new(),
265 caches: DashMap::new(),
266 total_operations: AtomicU64::new(0),
267 total_errors: AtomicU64::new(0),
268 active_operations: AtomicUsize::new(0),
269 started_at_timestamp: AtomicU64::new(
270 SystemTime::now()
271 .duration_since(UNIX_EPOCH)
272 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
273 .as_secs(),
274 ),
275 session_id,
276 metrics_sender: sender,
277 metrics_receiver: receiver,
278 }
279 }
280
281 pub fn start_timer(&self, operation_name: &str) -> OperationTimer {
283 self.active_operations.fetch_add(1, Ordering::Relaxed);
284
285 OperationTimer {
286 operation_name: operation_name.to_string(),
287 start_time: Instant::now(),
288 monitor: None, is_finished: AtomicBool::new(false),
290 }
291 }
292
293 pub fn record_operation(&self, operation_name: &str, duration: Duration, is_error: bool) {
295 let duration_ns = duration.as_nanos() as u64;
296 let timestamp = SystemTime::now()
297 .duration_since(UNIX_EPOCH)
298 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
299 .as_secs();
300
301 self.total_operations.fetch_add(1, Ordering::Relaxed);
303 if is_error {
304 self.total_errors.fetch_add(1, Ordering::Relaxed);
305 }
306
307 let metrics = self
309 .operations
310 .entry(operation_name.to_string())
311 .or_insert_with(|| Arc::new(OperationMetrics::new(operation_name)))
312 .clone();
313
314 metrics.record_operation(duration_ns, timestamp, is_error);
316
317 let _ = self
319 .metrics_sender
320 .try_send(MetricsEvent::OperationCompleted {
321 operation_name: operation_name.to_string(),
322 duration_ns,
323 is_error,
324 timestamp,
325 });
326
327 debug!(
328 "Recorded operation '{}': {:.2}ms (error: {})",
329 operation_name,
330 duration_ns as f64 / 1_000_000.0,
331 is_error
332 );
333 }
334
335 pub fn record_cache_hit(&self, cache_name: &str, lookup_time: Duration) {
337 let lookup_time_ns = lookup_time.as_nanos() as u64;
338 let timestamp = SystemTime::now()
339 .duration_since(UNIX_EPOCH)
340 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
341 .as_secs();
342
343 let metrics = self
344 .caches
345 .entry(cache_name.to_string())
346 .or_insert_with(|| Arc::new(CacheMetrics::new(cache_name)))
347 .clone();
348
349 metrics.record_hit(lookup_time_ns, timestamp);
350
351 let _ = self.metrics_sender.try_send(MetricsEvent::CacheHit {
352 cache_name: cache_name.to_string(),
353 lookup_time_ns,
354 timestamp,
355 });
356 }
357
358 pub fn record_cache_miss(&self, cache_name: &str, lookup_time: Duration) {
360 let lookup_time_ns = lookup_time.as_nanos() as u64;
361 let timestamp = SystemTime::now()
362 .duration_since(UNIX_EPOCH)
363 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
364 .as_secs();
365
366 let metrics = self
367 .caches
368 .entry(cache_name.to_string())
369 .or_insert_with(|| Arc::new(CacheMetrics::new(cache_name)))
370 .clone();
371
372 metrics.record_miss(lookup_time_ns, timestamp);
373
374 let _ = self.metrics_sender.try_send(MetricsEvent::CacheMiss {
375 cache_name: cache_name.to_string(),
376 lookup_time_ns,
377 timestamp,
378 });
379 }
380
381 pub fn record_cache_eviction(&self, cache_name: &str) {
383 let timestamp = SystemTime::now()
384 .duration_since(UNIX_EPOCH)
385 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
386 .as_secs();
387
388 let metrics = self
389 .caches
390 .entry(cache_name.to_string())
391 .or_insert_with(|| Arc::new(CacheMetrics::new(cache_name)))
392 .clone();
393
394 metrics.record_eviction(timestamp);
395
396 let _ = self.metrics_sender.try_send(MetricsEvent::CacheEviction {
397 cache_name: cache_name.to_string(),
398 timestamp,
399 });
400 }
401
402 pub fn get_snapshot(&self) -> PerformanceSnapshot {
404 let total_ops = self.total_operations.load(Ordering::Relaxed);
405 let total_errors = self.total_errors.load(Ordering::Relaxed);
406 let global_error_rate = if total_ops > 0 {
407 total_errors as f64 / total_ops as f64 * 100.0
408 } else {
409 0.0
410 };
411
412 let mut operations = Vec::new();
414 let mut slow_operations = Vec::new();
415 let mut error_prone_operations = Vec::new();
416
417 for entry in self.operations.iter() {
418 let snapshot = entry.value().snapshot();
419
420 if snapshot.avg_duration_ms > 500.0 {
421 slow_operations.push((snapshot.operation_name.clone(), snapshot.avg_duration_ms));
422 }
423
424 if snapshot.error_rate > 5.0 {
425 error_prone_operations.push((snapshot.operation_name.clone(), snapshot.error_rate));
426 }
427
428 operations.push(snapshot);
429 }
430
431 let mut caches = Vec::new();
433 let mut cache_issues = Vec::new();
434
435 for entry in self.caches.iter() {
436 let snapshot = entry.value().snapshot();
437
438 if snapshot.hit_rate < 0.5 && snapshot.total_requests > 100 {
439 cache_issues.push((
440 snapshot.cache_name.clone(),
441 format!("Low hit rate: {:.1}%", snapshot.hit_rate * 100.0),
442 ));
443 }
444
445 caches.push(snapshot);
446 }
447
448 slow_operations.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
450 error_prone_operations
451 .sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
452
453 PerformanceSnapshot {
454 session_id: self.session_id.clone(),
455 started_at_timestamp: self.started_at_timestamp.load(Ordering::Relaxed),
456 total_operations: total_ops,
457 total_errors,
458 global_error_rate,
459 active_operations: self.active_operations.load(Ordering::Relaxed),
460 operations,
461 caches,
462 slow_operations: slow_operations.into_iter().take(5).collect(),
463 error_prone_operations: error_prone_operations.into_iter().take(5).collect(),
464 cache_issues: cache_issues.into_iter().take(3).collect(),
465 }
466 }
467
468 pub fn has_performance_issues(&self) -> bool {
470 let total_ops = self.total_operations.load(Ordering::Relaxed);
472 if total_ops > 100 {
473 let total_errors = self.total_errors.load(Ordering::Relaxed);
474 let error_rate = total_errors as f64 / total_ops as f64 * 100.0;
475 if error_rate > 10.0 {
476 return true;
477 }
478 }
479
480 for entry in self.operations.iter() {
482 if entry.value().is_problematic() {
483 return true;
484 }
485 }
486
487 for entry in self.caches.iter() {
489 if entry.value().has_issues() {
490 return true;
491 }
492 }
493
494 false
495 }
496
497 pub fn reset(&self) {
499 self.total_operations.store(0, Ordering::Relaxed);
500 self.total_errors.store(0, Ordering::Relaxed);
501 self.active_operations.store(0, Ordering::Relaxed);
502 self.started_at_timestamp.store(
503 SystemTime::now()
504 .duration_since(UNIX_EPOCH)
505 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
506 .as_secs(),
507 Ordering::Relaxed,
508 );
509
510 self.operations.clear();
511 self.caches.clear();
512
513 info!("Reset all performance metrics");
514 }
515
516 pub fn active_operations(&self) -> usize {
518 self.active_operations.load(Ordering::Relaxed)
519 }
520}
521
522impl OperationMetrics {
523 pub fn new(operation_name: &str) -> Self {
525 let now = SystemTime::now()
526 .duration_since(UNIX_EPOCH)
527 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
528 .as_secs();
529
530 Self {
531 operation_name: operation_name.to_string(),
532 total_calls: AtomicU64::new(0),
533 error_count: AtomicU64::new(0),
534 total_duration_ns: AtomicU64::new(0),
535 min_duration_ns: AtomicU64::new(u64::MAX),
536 max_duration_ns: AtomicU64::new(0),
537 last_execution_timestamp: AtomicU64::new(now),
538 avg_duration_ns: AtomicU64::new(0),
539 error_rate: AtomicF64::new(0.0),
540 recent_duration_sum: AtomicU64::new(0),
541 recent_operation_count: AtomicU64::new(0),
542 }
543 }
544
545 pub fn record_operation(&self, duration_ns: u64, timestamp: u64, is_error: bool) {
547 let total_calls = self.total_calls.fetch_add(1, Ordering::Relaxed) + 1;
549 let total_duration = self
550 .total_duration_ns
551 .fetch_add(duration_ns, Ordering::Relaxed)
552 + duration_ns;
553
554 if is_error {
555 self.error_count.fetch_add(1, Ordering::Relaxed);
556 }
557
558 self.last_execution_timestamp
559 .store(timestamp, Ordering::Relaxed);
560
561 let mut current_min = self.min_duration_ns.load(Ordering::Relaxed);
563 while current_min > duration_ns {
564 match self.min_duration_ns.compare_exchange_weak(
565 current_min,
566 duration_ns,
567 Ordering::Relaxed,
568 Ordering::Relaxed,
569 ) {
570 Ok(_) => break,
571 Err(new_min) => current_min = new_min,
572 }
573 }
574
575 let mut current_max = self.max_duration_ns.load(Ordering::Relaxed);
577 while current_max < duration_ns {
578 match self.max_duration_ns.compare_exchange_weak(
579 current_max,
580 duration_ns,
581 Ordering::Relaxed,
582 Ordering::Relaxed,
583 ) {
584 Ok(_) => break,
585 Err(new_max) => current_max = new_max,
586 }
587 }
588
589 let avg = total_duration / total_calls;
591 self.avg_duration_ns.store(avg, Ordering::Relaxed);
592
593 let error_count = self.error_count.load(Ordering::Relaxed);
595 let error_rate = (error_count as f64 / total_calls as f64) * 100.0;
596 self.error_rate.store(error_rate, Ordering::Relaxed);
597
598 let recent_count = self.recent_operation_count.load(Ordering::Relaxed);
600 if recent_count < 100 {
601 self.recent_duration_sum
602 .fetch_add(duration_ns, Ordering::Relaxed);
603 self.recent_operation_count.fetch_add(1, Ordering::Relaxed);
604 } else {
605 self.recent_duration_sum
607 .store(duration_ns, Ordering::Relaxed);
608 self.recent_operation_count.store(1, Ordering::Relaxed);
609 }
610 }
611
612 pub fn snapshot(&self) -> OperationSnapshot {
614 let total_calls = self.total_calls.load(Ordering::Relaxed);
615 let avg_duration_ns = if total_calls > 0 {
616 self.avg_duration_ns.load(Ordering::Relaxed)
617 } else {
618 0
619 };
620
621 OperationSnapshot {
622 operation_name: self.operation_name.clone(),
623 total_calls,
624 error_count: self.error_count.load(Ordering::Relaxed),
625 error_rate: self.error_rate.load(Ordering::Relaxed),
626 avg_duration_ms: avg_duration_ns as f64 / 1_000_000.0,
627 min_duration_ms: self.min_duration_ns.load(Ordering::Relaxed) as f64 / 1_000_000.0,
628 max_duration_ms: self.max_duration_ns.load(Ordering::Relaxed) as f64 / 1_000_000.0,
629 last_execution_timestamp: self.last_execution_timestamp.load(Ordering::Relaxed),
630 }
631 }
632
633 pub fn is_problematic(&self) -> bool {
635 let error_rate = self.error_rate.load(Ordering::Relaxed);
636 let avg_duration_ns = self.avg_duration_ns.load(Ordering::Relaxed);
637 let max_duration_ns = self.max_duration_ns.load(Ordering::Relaxed);
638
639 error_rate > 10.0 ||
640 avg_duration_ns > 2_000_000_000 || max_duration_ns > 30_000_000_000 }
643}
644
645impl CacheMetrics {
646 pub fn new(cache_name: &str) -> Self {
648 let now = SystemTime::now()
649 .duration_since(UNIX_EPOCH)
650 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
651 .as_secs();
652
653 Self {
654 cache_name: cache_name.to_string(),
655 total_requests: AtomicU64::new(0),
656 hits: AtomicU64::new(0),
657 misses: AtomicU64::new(0),
658 evictions: AtomicU64::new(0),
659 total_lookup_time_ns: AtomicU64::new(0),
660 avg_lookup_time_ns: AtomicU64::new(0),
661 hit_rate: AtomicF64::new(0.0),
662 miss_rate: AtomicF64::new(0.0),
663 last_updated_timestamp: AtomicU64::new(now),
664 created_timestamp: AtomicU64::new(now),
665 }
666 }
667
668 pub fn record_hit(&self, lookup_time_ns: u64, timestamp: u64) {
670 let total_requests = self.total_requests.fetch_add(1, Ordering::Relaxed) + 1;
671 let hits = self.hits.fetch_add(1, Ordering::Relaxed) + 1;
672 let total_lookup_time = self
673 .total_lookup_time_ns
674 .fetch_add(lookup_time_ns, Ordering::Relaxed)
675 + lookup_time_ns;
676
677 self.last_updated_timestamp
678 .store(timestamp, Ordering::Relaxed);
679
680 let hit_rate = hits as f64 / total_requests as f64;
682 self.hit_rate.store(hit_rate, Ordering::Relaxed);
683 self.miss_rate.store(1.0 - hit_rate, Ordering::Relaxed);
684
685 let avg_lookup = total_lookup_time / total_requests;
687 self.avg_lookup_time_ns.store(avg_lookup, Ordering::Relaxed);
688 }
689
690 pub fn record_miss(&self, lookup_time_ns: u64, timestamp: u64) {
692 let total_requests = self.total_requests.fetch_add(1, Ordering::Relaxed) + 1;
693 let total_lookup_time = self
694 .total_lookup_time_ns
695 .fetch_add(lookup_time_ns, Ordering::Relaxed)
696 + lookup_time_ns;
697
698 self.last_updated_timestamp
699 .store(timestamp, Ordering::Relaxed);
700
701 let hits = self.hits.load(Ordering::Relaxed);
703 let hit_rate = hits as f64 / total_requests as f64;
704 self.hit_rate.store(hit_rate, Ordering::Relaxed);
705 self.miss_rate.store(1.0 - hit_rate, Ordering::Relaxed);
706
707 let avg_lookup = total_lookup_time / total_requests;
709 self.avg_lookup_time_ns.store(avg_lookup, Ordering::Relaxed);
710 }
711
712 pub fn record_eviction(&self, timestamp: u64) {
714 self.evictions.fetch_add(1, Ordering::Relaxed);
715 self.last_updated_timestamp
716 .store(timestamp, Ordering::Relaxed);
717 }
718
719 pub fn snapshot(&self) -> CacheSnapshot {
721 CacheSnapshot {
722 cache_name: self.cache_name.clone(),
723 total_requests: self.total_requests.load(Ordering::Relaxed),
724 hits: self.hits.load(Ordering::Relaxed),
725 misses: self.misses.load(Ordering::Relaxed),
726 evictions: self.evictions.load(Ordering::Relaxed),
727 hit_rate: self.hit_rate.load(Ordering::Relaxed),
728 miss_rate: self.miss_rate.load(Ordering::Relaxed),
729 avg_lookup_time_ns: self.avg_lookup_time_ns.load(Ordering::Relaxed),
730 last_updated_timestamp: self.last_updated_timestamp.load(Ordering::Relaxed),
731 }
732 }
733
734 pub fn has_issues(&self) -> bool {
736 let hit_rate = self.hit_rate.load(Ordering::Relaxed);
737 let avg_lookup_ns = self.avg_lookup_time_ns.load(Ordering::Relaxed);
738 let total_requests = self.total_requests.load(Ordering::Relaxed);
739
740 (hit_rate < 0.3 && total_requests > 100) || avg_lookup_ns > 1_000_000 }
742}
743
744impl OperationTimer {
745 pub fn finish_with_error(self) {
747 if !self.is_finished.load(Ordering::Relaxed) {
748 self.is_finished.store(true, Ordering::Relaxed);
749 let _duration = self.start_time.elapsed();
750 if let Some(monitor) = &self.monitor {
752 monitor.active_operations.fetch_sub(1, Ordering::Relaxed);
753 }
754 }
755 }
756
757 pub fn current_duration(&self) -> Duration {
759 self.start_time.elapsed()
760 }
761
762 pub fn finish(self) {
764 }
766}
767
768impl Drop for OperationTimer {
769 fn drop(&mut self) {
770 if !self.is_finished.load(Ordering::Relaxed) {
772 let _duration = self.start_time.elapsed();
773 if let Some(monitor) = &self.monitor {
776 monitor.active_operations.fetch_sub(1, Ordering::Relaxed);
777 }
778 }
779 }
780}
781
782static GLOBAL_MONITOR: std::sync::OnceLock<Arc<PerformanceMonitor>> = std::sync::OnceLock::new();
784
785pub fn init_monitoring(session_id: Option<String>) {
787 let monitor = Arc::new(PerformanceMonitor::new(session_id));
788 let _ = GLOBAL_MONITOR.set(monitor);
789}
790
791pub fn get_monitor() -> Option<&'static Arc<PerformanceMonitor>> {
793 GLOBAL_MONITOR.get()
794}
795
796pub fn start_timer(operation_name: &str) -> Option<OperationTimer> {
798 get_monitor().map(|monitor| monitor.start_timer(operation_name))
799}
800
/// Runs a block while a timer from the process-wide monitor is alive.
///
/// Expands to a block that starts a timer via
/// `$crate::performance::start_timer`, evaluates the given block, and yields
/// its value; the timer is dropped when the expanded block ends.
#[macro_export]
macro_rules! time_operation {
    ($name:expr, $body:block) => {{
        let _guard = $crate::performance::start_timer($name);
        $body
    }};
}