1use atomic_float::AtomicF64;
21use crossbeam_channel::{bounded, Receiver, Sender};
22use dashmap::DashMap;
23use serde::{Deserialize, Serialize};
24use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27use tracing::{debug, info};
28
29#[derive(Debug)]
31pub struct PerformanceMonitor {
32 operations: DashMap<String, Arc<OperationMetrics>>,
34
35 caches: DashMap<String, Arc<CacheMetrics>>,
37
38 total_operations: AtomicU64,
41 total_errors: AtomicU64,
43 active_operations: AtomicUsize,
45 started_at_timestamp: AtomicU64,
47
48 session_id: Option<String>,
50
51 metrics_sender: Sender<MetricsEvent>,
53 #[allow(dead_code)]
54 metrics_receiver: Receiver<MetricsEvent>,
55}
56
57#[derive(Debug)]
59pub struct OperationMetrics {
60 operation_name: String,
62
63 total_calls: AtomicU64,
65 error_count: AtomicU64,
67
68 total_duration_ns: AtomicU64,
70 min_duration_ns: AtomicU64,
72 max_duration_ns: AtomicU64,
74
75 last_execution_timestamp: AtomicU64,
77
78 avg_duration_ns: AtomicU64,
80 error_rate: AtomicF64,
82
83 recent_duration_sum: AtomicU64,
85 recent_operation_count: AtomicU64,
87}
88
89#[derive(Debug)]
91pub struct CacheMetrics {
92 cache_name: String,
94
95 total_requests: AtomicU64,
97 hits: AtomicU64,
99 misses: AtomicU64,
101 evictions: AtomicU64,
103
104 total_lookup_time_ns: AtomicU64,
106 avg_lookup_time_ns: AtomicU64,
108
109 hit_rate: AtomicF64,
111 miss_rate: AtomicF64,
113
114 last_updated_timestamp: AtomicU64,
116 #[allow(dead_code)]
117 created_timestamp: AtomicU64,
119}
120
121pub struct OperationTimer {
123 #[allow(dead_code)]
124 operation_name: String,
126 start_time: Instant,
128 monitor: Option<Arc<PerformanceMonitor>>,
130 is_finished: AtomicBool,
132}
133
/// Event pushed onto the monitor's internal channel for each recorded metric.
///
/// Durations are in nanoseconds; timestamps are seconds since the Unix epoch.
// The payload fields are filled in when an event is sent but are never read
// back in this file, hence the single lint exemption covering the whole enum.
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum MetricsEvent {
    /// An operation was recorded.
    OperationCompleted {
        operation_name: String,
        duration_ns: u64,
        is_error: bool,
        timestamp: u64,
    },
    /// A cache lookup was recorded as a hit.
    CacheHit {
        cache_name: String,
        lookup_time_ns: u64,
        timestamp: u64,
    },
    /// A cache lookup was recorded as a miss.
    CacheMiss {
        cache_name: String,
        lookup_time_ns: u64,
        timestamp: u64,
    },
    /// A cache eviction was recorded.
    CacheEviction {
        cache_name: String,
        timestamp: u64,
    },
}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct PerformanceSnapshot {
190 pub session_id: Option<String>,
192 pub started_at_timestamp: u64,
194 pub total_operations: u64,
196 pub total_errors: u64,
198 pub global_error_rate: f64,
200 pub active_operations: usize,
202 pub operations: Vec<OperationSnapshot>,
204 pub caches: Vec<CacheSnapshot>,
206 pub slow_operations: Vec<(String, f64)>,
208 pub error_prone_operations: Vec<(String, f64)>,
210 pub cache_issues: Vec<(String, String)>,
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct OperationSnapshot {
217 pub operation_name: String,
219 pub total_calls: u64,
221 pub error_count: u64,
223 pub error_rate: f64,
225 pub avg_duration_ms: f64,
227 pub min_duration_ms: f64,
229 pub max_duration_ms: f64,
231 pub last_execution_timestamp: u64,
233}
234
235#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct CacheSnapshot {
238 pub cache_name: String,
240 pub total_requests: u64,
242 pub hits: u64,
244 pub misses: u64,
246 pub evictions: u64,
248 pub hit_rate: f64,
250 pub miss_rate: f64,
252 pub avg_lookup_time_ns: u64,
254 pub last_updated_timestamp: u64,
256}
257
258impl PerformanceMonitor {
259 pub fn new(session_id: Option<String>) -> Self {
261 let (sender, receiver) = bounded(10000); Self {
264 operations: DashMap::new(),
265 caches: DashMap::new(),
266 total_operations: AtomicU64::new(0),
267 total_errors: AtomicU64::new(0),
268 active_operations: AtomicUsize::new(0),
269 started_at_timestamp: AtomicU64::new(
270 SystemTime::now()
271 .duration_since(UNIX_EPOCH)
272 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
273 .as_secs(),
274 ),
275 session_id,
276 metrics_sender: sender,
277 metrics_receiver: receiver,
278 }
279 }
280
281 pub fn start_timer(&self, operation_name: &str) -> OperationTimer {
283 self.active_operations.fetch_add(1, Ordering::Relaxed);
284
285 OperationTimer {
286 operation_name: operation_name.to_string(),
287 start_time: Instant::now(),
288 monitor: None, is_finished: AtomicBool::new(false),
290 }
291 }
292
293 pub fn record_operation(&self, operation_name: &str, duration: Duration, is_error: bool) {
295 let duration_ns = duration.as_nanos() as u64;
296 let timestamp = SystemTime::now()
297 .duration_since(UNIX_EPOCH)
298 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
299 .as_secs();
300
301 self.total_operations.fetch_add(1, Ordering::Relaxed);
303 if is_error {
304 self.total_errors.fetch_add(1, Ordering::Relaxed);
305 }
306
307 let metrics = self
309 .operations
310 .entry(operation_name.to_string())
311 .or_insert_with(|| Arc::new(OperationMetrics::new(operation_name)))
312 .clone();
313
314 metrics.record_operation(duration_ns, timestamp, is_error);
316
317 let _ = self
319 .metrics_sender
320 .try_send(MetricsEvent::OperationCompleted {
321 operation_name: operation_name.to_string(),
322 duration_ns,
323 is_error,
324 timestamp,
325 });
326
327 debug!(
328 "Recorded operation '{}': {:.2}ms (error: {})",
329 operation_name,
330 duration_ns as f64 / 1_000_000.0,
331 is_error
332 );
333 }
334
335 pub fn record_cache_hit(&self, cache_name: &str, lookup_time: Duration) {
337 let lookup_time_ns = lookup_time.as_nanos() as u64;
338 let timestamp = SystemTime::now()
339 .duration_since(UNIX_EPOCH)
340 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
341 .as_secs();
342
343 let metrics = self
344 .caches
345 .entry(cache_name.to_string())
346 .or_insert_with(|| Arc::new(CacheMetrics::new(cache_name)))
347 .clone();
348
349 metrics.record_hit(lookup_time_ns, timestamp);
350
351 let _ = self.metrics_sender.try_send(MetricsEvent::CacheHit {
352 cache_name: cache_name.to_string(),
353 lookup_time_ns,
354 timestamp,
355 });
356 }
357
358 pub fn record_cache_miss(&self, cache_name: &str, lookup_time: Duration) {
360 let lookup_time_ns = lookup_time.as_nanos() as u64;
361 let timestamp = SystemTime::now()
362 .duration_since(UNIX_EPOCH)
363 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
364 .as_secs();
365
366 let metrics = self
367 .caches
368 .entry(cache_name.to_string())
369 .or_insert_with(|| Arc::new(CacheMetrics::new(cache_name)))
370 .clone();
371
372 metrics.record_miss(lookup_time_ns, timestamp);
373
374 let _ = self.metrics_sender.try_send(MetricsEvent::CacheMiss {
375 cache_name: cache_name.to_string(),
376 lookup_time_ns,
377 timestamp,
378 });
379 }
380
381 pub fn record_cache_eviction(&self, cache_name: &str) {
383 let timestamp = SystemTime::now()
384 .duration_since(UNIX_EPOCH)
385 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
386 .as_secs();
387
388 let metrics = self
389 .caches
390 .entry(cache_name.to_string())
391 .or_insert_with(|| Arc::new(CacheMetrics::new(cache_name)))
392 .clone();
393
394 metrics.record_eviction(timestamp);
395
396 let _ = self.metrics_sender.try_send(MetricsEvent::CacheEviction {
397 cache_name: cache_name.to_string(),
398 timestamp,
399 });
400 }
401
402 pub fn get_snapshot(&self) -> PerformanceSnapshot {
404 let total_ops = self.total_operations.load(Ordering::Relaxed);
405 let total_errors = self.total_errors.load(Ordering::Relaxed);
406 let global_error_rate = if total_ops > 0 {
407 total_errors as f64 / total_ops as f64 * 100.0
408 } else {
409 0.0
410 };
411
412 let mut operations = Vec::new();
414 let mut slow_operations = Vec::new();
415 let mut error_prone_operations = Vec::new();
416
417 for entry in self.operations.iter() {
418 let snapshot = entry.value().snapshot();
419
420 if snapshot.avg_duration_ms > 500.0 {
421 slow_operations.push((snapshot.operation_name.clone(), snapshot.avg_duration_ms));
422 }
423
424 if snapshot.error_rate > 5.0 {
425 error_prone_operations.push((snapshot.operation_name.clone(), snapshot.error_rate));
426 }
427
428 operations.push(snapshot);
429 }
430
431 let mut caches = Vec::new();
433 let mut cache_issues = Vec::new();
434
435 for entry in self.caches.iter() {
436 let snapshot = entry.value().snapshot();
437
438 if snapshot.hit_rate < 0.5 && snapshot.total_requests > 100 {
439 cache_issues.push((
440 snapshot.cache_name.clone(),
441 format!("Low hit rate: {:.1}%", snapshot.hit_rate * 100.0),
442 ));
443 }
444
445 caches.push(snapshot);
446 }
447
448 slow_operations.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
450 error_prone_operations.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
451
452 PerformanceSnapshot {
453 session_id: self.session_id.clone(),
454 started_at_timestamp: self.started_at_timestamp.load(Ordering::Relaxed),
455 total_operations: total_ops,
456 total_errors,
457 global_error_rate,
458 active_operations: self.active_operations.load(Ordering::Relaxed),
459 operations,
460 caches,
461 slow_operations: slow_operations.into_iter().take(5).collect(),
462 error_prone_operations: error_prone_operations.into_iter().take(5).collect(),
463 cache_issues: cache_issues.into_iter().take(3).collect(),
464 }
465 }
466
467 pub fn has_performance_issues(&self) -> bool {
469 let total_ops = self.total_operations.load(Ordering::Relaxed);
471 if total_ops > 100 {
472 let total_errors = self.total_errors.load(Ordering::Relaxed);
473 let error_rate = total_errors as f64 / total_ops as f64 * 100.0;
474 if error_rate > 10.0 {
475 return true;
476 }
477 }
478
479 for entry in self.operations.iter() {
481 if entry.value().is_problematic() {
482 return true;
483 }
484 }
485
486 for entry in self.caches.iter() {
488 if entry.value().has_issues() {
489 return true;
490 }
491 }
492
493 false
494 }
495
496 pub fn reset(&self) {
498 self.total_operations.store(0, Ordering::Relaxed);
499 self.total_errors.store(0, Ordering::Relaxed);
500 self.active_operations.store(0, Ordering::Relaxed);
501 self.started_at_timestamp.store(
502 SystemTime::now()
503 .duration_since(UNIX_EPOCH)
504 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
505 .as_secs(),
506 Ordering::Relaxed,
507 );
508
509 self.operations.clear();
510 self.caches.clear();
511
512 info!("Reset all performance metrics");
513 }
514
515 pub fn active_operations(&self) -> usize {
517 self.active_operations.load(Ordering::Relaxed)
518 }
519}
520
521impl OperationMetrics {
522 pub fn new(operation_name: &str) -> Self {
524 let now = SystemTime::now()
525 .duration_since(UNIX_EPOCH)
526 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
527 .as_secs();
528
529 Self {
530 operation_name: operation_name.to_string(),
531 total_calls: AtomicU64::new(0),
532 error_count: AtomicU64::new(0),
533 total_duration_ns: AtomicU64::new(0),
534 min_duration_ns: AtomicU64::new(u64::MAX),
535 max_duration_ns: AtomicU64::new(0),
536 last_execution_timestamp: AtomicU64::new(now),
537 avg_duration_ns: AtomicU64::new(0),
538 error_rate: AtomicF64::new(0.0),
539 recent_duration_sum: AtomicU64::new(0),
540 recent_operation_count: AtomicU64::new(0),
541 }
542 }
543
544 pub fn record_operation(&self, duration_ns: u64, timestamp: u64, is_error: bool) {
546 let total_calls = self.total_calls.fetch_add(1, Ordering::Relaxed) + 1;
548 let total_duration = self
549 .total_duration_ns
550 .fetch_add(duration_ns, Ordering::Relaxed)
551 + duration_ns;
552
553 if is_error {
554 self.error_count.fetch_add(1, Ordering::Relaxed);
555 }
556
557 self.last_execution_timestamp
558 .store(timestamp, Ordering::Relaxed);
559
560 let mut current_min = self.min_duration_ns.load(Ordering::Relaxed);
562 while current_min > duration_ns {
563 match self.min_duration_ns.compare_exchange_weak(
564 current_min,
565 duration_ns,
566 Ordering::Relaxed,
567 Ordering::Relaxed,
568 ) {
569 Ok(_) => break,
570 Err(new_min) => current_min = new_min,
571 }
572 }
573
574 let mut current_max = self.max_duration_ns.load(Ordering::Relaxed);
576 while current_max < duration_ns {
577 match self.max_duration_ns.compare_exchange_weak(
578 current_max,
579 duration_ns,
580 Ordering::Relaxed,
581 Ordering::Relaxed,
582 ) {
583 Ok(_) => break,
584 Err(new_max) => current_max = new_max,
585 }
586 }
587
588 let avg = total_duration / total_calls;
590 self.avg_duration_ns.store(avg, Ordering::Relaxed);
591
592 let error_count = self.error_count.load(Ordering::Relaxed);
594 let error_rate = (error_count as f64 / total_calls as f64) * 100.0;
595 self.error_rate.store(error_rate, Ordering::Relaxed);
596
597 let recent_count = self.recent_operation_count.load(Ordering::Relaxed);
599 if recent_count < 100 {
600 self.recent_duration_sum
601 .fetch_add(duration_ns, Ordering::Relaxed);
602 self.recent_operation_count.fetch_add(1, Ordering::Relaxed);
603 } else {
604 self.recent_duration_sum
606 .store(duration_ns, Ordering::Relaxed);
607 self.recent_operation_count.store(1, Ordering::Relaxed);
608 }
609 }
610
611 pub fn snapshot(&self) -> OperationSnapshot {
613 let total_calls = self.total_calls.load(Ordering::Relaxed);
614 let avg_duration_ns = if total_calls > 0 {
615 self.avg_duration_ns.load(Ordering::Relaxed)
616 } else {
617 0
618 };
619
620 OperationSnapshot {
621 operation_name: self.operation_name.clone(),
622 total_calls,
623 error_count: self.error_count.load(Ordering::Relaxed),
624 error_rate: self.error_rate.load(Ordering::Relaxed),
625 avg_duration_ms: avg_duration_ns as f64 / 1_000_000.0,
626 min_duration_ms: self.min_duration_ns.load(Ordering::Relaxed) as f64 / 1_000_000.0,
627 max_duration_ms: self.max_duration_ns.load(Ordering::Relaxed) as f64 / 1_000_000.0,
628 last_execution_timestamp: self.last_execution_timestamp.load(Ordering::Relaxed),
629 }
630 }
631
632 pub fn is_problematic(&self) -> bool {
634 let error_rate = self.error_rate.load(Ordering::Relaxed);
635 let avg_duration_ns = self.avg_duration_ns.load(Ordering::Relaxed);
636 let max_duration_ns = self.max_duration_ns.load(Ordering::Relaxed);
637
638 error_rate > 10.0 ||
639 avg_duration_ns > 2_000_000_000 || max_duration_ns > 30_000_000_000 }
642}
643
644impl CacheMetrics {
645 pub fn new(cache_name: &str) -> Self {
647 let now = SystemTime::now()
648 .duration_since(UNIX_EPOCH)
649 .unwrap_or_else(|_| std::time::Duration::from_secs(0))
650 .as_secs();
651
652 Self {
653 cache_name: cache_name.to_string(),
654 total_requests: AtomicU64::new(0),
655 hits: AtomicU64::new(0),
656 misses: AtomicU64::new(0),
657 evictions: AtomicU64::new(0),
658 total_lookup_time_ns: AtomicU64::new(0),
659 avg_lookup_time_ns: AtomicU64::new(0),
660 hit_rate: AtomicF64::new(0.0),
661 miss_rate: AtomicF64::new(0.0),
662 last_updated_timestamp: AtomicU64::new(now),
663 created_timestamp: AtomicU64::new(now),
664 }
665 }
666
667 pub fn record_hit(&self, lookup_time_ns: u64, timestamp: u64) {
669 let total_requests = self.total_requests.fetch_add(1, Ordering::Relaxed) + 1;
670 let hits = self.hits.fetch_add(1, Ordering::Relaxed) + 1;
671 let total_lookup_time = self
672 .total_lookup_time_ns
673 .fetch_add(lookup_time_ns, Ordering::Relaxed)
674 + lookup_time_ns;
675
676 self.last_updated_timestamp
677 .store(timestamp, Ordering::Relaxed);
678
679 let hit_rate = hits as f64 / total_requests as f64;
681 self.hit_rate.store(hit_rate, Ordering::Relaxed);
682 self.miss_rate.store(1.0 - hit_rate, Ordering::Relaxed);
683
684 let avg_lookup = total_lookup_time / total_requests;
686 self.avg_lookup_time_ns.store(avg_lookup, Ordering::Relaxed);
687 }
688
689 pub fn record_miss(&self, lookup_time_ns: u64, timestamp: u64) {
691 let total_requests = self.total_requests.fetch_add(1, Ordering::Relaxed) + 1;
692 let total_lookup_time = self
693 .total_lookup_time_ns
694 .fetch_add(lookup_time_ns, Ordering::Relaxed)
695 + lookup_time_ns;
696
697 self.last_updated_timestamp
698 .store(timestamp, Ordering::Relaxed);
699
700 let hits = self.hits.load(Ordering::Relaxed);
702 let hit_rate = hits as f64 / total_requests as f64;
703 self.hit_rate.store(hit_rate, Ordering::Relaxed);
704 self.miss_rate.store(1.0 - hit_rate, Ordering::Relaxed);
705
706 let avg_lookup = total_lookup_time / total_requests;
708 self.avg_lookup_time_ns.store(avg_lookup, Ordering::Relaxed);
709 }
710
711 pub fn record_eviction(&self, timestamp: u64) {
713 self.evictions.fetch_add(1, Ordering::Relaxed);
714 self.last_updated_timestamp
715 .store(timestamp, Ordering::Relaxed);
716 }
717
718 pub fn snapshot(&self) -> CacheSnapshot {
720 CacheSnapshot {
721 cache_name: self.cache_name.clone(),
722 total_requests: self.total_requests.load(Ordering::Relaxed),
723 hits: self.hits.load(Ordering::Relaxed),
724 misses: self.misses.load(Ordering::Relaxed),
725 evictions: self.evictions.load(Ordering::Relaxed),
726 hit_rate: self.hit_rate.load(Ordering::Relaxed),
727 miss_rate: self.miss_rate.load(Ordering::Relaxed),
728 avg_lookup_time_ns: self.avg_lookup_time_ns.load(Ordering::Relaxed),
729 last_updated_timestamp: self.last_updated_timestamp.load(Ordering::Relaxed),
730 }
731 }
732
733 pub fn has_issues(&self) -> bool {
735 let hit_rate = self.hit_rate.load(Ordering::Relaxed);
736 let avg_lookup_ns = self.avg_lookup_time_ns.load(Ordering::Relaxed);
737 let total_requests = self.total_requests.load(Ordering::Relaxed);
738
739 (hit_rate < 0.3 && total_requests > 100) || avg_lookup_ns > 1_000_000 }
741}
742
743impl OperationTimer {
744 pub fn finish_with_error(self) {
746 if !self.is_finished.load(Ordering::Relaxed) {
747 self.is_finished.store(true, Ordering::Relaxed);
748 let _duration = self.start_time.elapsed();
749 if let Some(monitor) = &self.monitor {
751 monitor.active_operations.fetch_sub(1, Ordering::Relaxed);
752 }
753 }
754 }
755
756 pub fn current_duration(&self) -> Duration {
758 self.start_time.elapsed()
759 }
760
761 pub fn finish(self) {
763 }
765}
766
767impl Drop for OperationTimer {
768 fn drop(&mut self) {
769 if !self.is_finished.load(Ordering::Relaxed) {
771 let _duration = self.start_time.elapsed();
772 if let Some(monitor) = &self.monitor {
775 monitor.active_operations.fetch_sub(1, Ordering::Relaxed);
776 }
777 }
778 }
779}
780
781static GLOBAL_MONITOR: std::sync::OnceLock<Arc<PerformanceMonitor>> =
783 std::sync::OnceLock::new();
784
785pub fn init_monitoring(session_id: Option<String>) {
787 let monitor = Arc::new(PerformanceMonitor::new(session_id));
788 let _ = GLOBAL_MONITOR.set(monitor);
789}
790
791pub fn get_monitor() -> Option<&'static Arc<PerformanceMonitor>> {
793 GLOBAL_MONITOR.get()
794}
795
796pub fn start_timer(operation_name: &str) -> Option<OperationTimer> {
798 get_monitor().map(|monitor| monitor.start_timer(operation_name))
799}
800
/// Runs `$code` while a timer from the global monitor is held.
///
/// The timer is obtained via `$crate::performance::start_timer($operation)`
/// before `$code` runs and is dropped when the generated block ends. The macro
/// evaluates to the value of `$code`.
#[macro_export]
macro_rules! time_operation {
    ($operation:expr, $code:block) => {{
        // Bound to a named local so the guard lives for the whole block.
        let _operation_timer_guard = $crate::performance::start_timer($operation);
        $code
    }};
}