1use anyhow::{anyhow, Result};
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31use std::collections::{HashMap, VecDeque};
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::sync::RwLock;
36use tracing::info;
37
/// Configuration knobs for the [`PerformanceProfiler`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfilerConfig {
    // Collect CPU usage samples. NOTE(review): not read in this file —
    // presumably consumed by an external sampling loop; confirm.
    pub enable_cpu_profiling: bool,
    // Collect memory usage samples. NOTE(review): not read in this file.
    pub enable_memory_profiling: bool,
    // When true, finished spans feed their duration into the latency
    // histogram (see `PerformanceProfiler::end_span`).
    pub enable_latency_tracking: bool,
    // Track event/byte throughput counters. NOTE(review): not read in this file.
    pub enable_throughput_tracking: bool,
    // Intended cadence for `collect_sample` calls. NOTE(review): not read in
    // this file — the caller is expected to drive sampling.
    pub sampling_interval: Duration,
    // Maximum number of `PerformanceSample`s kept in the history deque;
    // older samples are dropped in `collect_sample`.
    pub history_size: usize,
    // Enable tuning recommendations. NOTE(review): not consulted by
    // `generate_recommendations` in this file.
    pub enable_recommendations: bool,
    // Percentiles of interest for reporting (e.g. 50.0, 99.9).
    // NOTE(review): not read here; `HistogramStats` uses a fixed set.
    pub percentiles: Vec<f64>,
    // Threshold values that trigger `PerformanceWarning`s in `check_warnings`.
    pub warning_thresholds: WarningThresholds,
    // Build flame-graph data from spans. NOTE(review): not read in this file.
    pub enable_flame_graph: bool,
    // Maximum span nesting depth. NOTE(review): not enforced in this file.
    pub max_span_depth: usize,
}
64
65impl Default for ProfilerConfig {
66 fn default() -> Self {
67 Self {
68 enable_cpu_profiling: true,
69 enable_memory_profiling: true,
70 enable_latency_tracking: true,
71 enable_throughput_tracking: true,
72 sampling_interval: Duration::from_secs(1),
73 history_size: 3600, enable_recommendations: true,
75 percentiles: vec![50.0, 90.0, 95.0, 99.0, 99.9],
76 warning_thresholds: WarningThresholds::default(),
77 enable_flame_graph: false,
78 max_span_depth: 100,
79 }
80 }
81}
82
/// Threshold values against which samples are checked to emit
/// [`PerformanceWarning`]s.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WarningThresholds {
    // Warn when CPU usage (percent) exceeds this value.
    pub cpu_usage_percent: f64,
    // Warn when memory usage (percent) exceeds this value.
    // NOTE(review): not currently checked in `check_warnings`.
    pub memory_usage_percent: f64,
    // Warn when P99 latency (microseconds) exceeds this value.
    pub p99_latency_us: u64,
    // Warn when throughput (events/sec) drops below this value.
    pub min_throughput: f64,
    // Warn when buffer usage (percent) exceeds this value.
    // NOTE(review): not currently checked in `check_warnings`.
    pub buffer_usage_percent: f64,
}
97
98impl Default for WarningThresholds {
99 fn default() -> Self {
100 Self {
101 cpu_usage_percent: 80.0,
102 memory_usage_percent: 85.0,
103 p99_latency_us: 10000, min_throughput: 1000.0,
105 buffer_usage_percent: 90.0,
106 }
107 }
108}
109
/// One timed region of execution, optionally nested under a parent span.
#[derive(Debug, Clone)]
pub struct Span {
    /// Human-readable operation name.
    pub name: String,
    /// Instant at which the span was opened.
    pub start: Instant,
    /// Instant at which the span was closed; `None` while still open.
    pub end: Option<Instant>,
    /// Id of the enclosing span, if any.
    pub parent_id: Option<u64>,
    /// Unique id of this span.
    pub id: u64,
    /// Free-form key/value annotations.
    pub tags: HashMap<String, String>,
    /// Ids of directly nested spans.
    pub children: Vec<u64>,
}

impl Span {
    /// Opens a new root span named `name` with the given id, started now.
    pub fn new(name: &str, id: u64) -> Self {
        Span {
            name: name.to_owned(),
            start: Instant::now(),
            end: None,
            parent_id: None,
            id,
            tags: HashMap::new(),
            children: Vec::new(),
        }
    }

    /// Closes the span by stamping the end instant.
    pub fn finish(&mut self) {
        self.end = Some(Instant::now());
    }

    /// Elapsed time: start-to-end when finished, start-to-now otherwise.
    pub fn duration(&self) -> Duration {
        match self.end {
            Some(end) => end.duration_since(self.start),
            None => self.start.elapsed(),
        }
    }

    /// Attaches (or overwrites) a key/value annotation.
    pub fn tag(&mut self, key: &str, value: &str) {
        self.tags.insert(key.to_owned(), value.to_owned());
    }
}
162
/// Lock-free, fixed-bucket histogram of latency samples in microseconds.
pub struct LatencyHistogram {
    // (upper bound in microseconds, count) pairs; the final bound is
    // `u64::MAX` so every sample lands in some bucket.
    buckets: Vec<(u64, AtomicU64)>,
    // Total number of samples recorded.
    total: AtomicU64,
    // Sum of all samples, used to compute the mean.
    sum: AtomicU64,
    // Largest sample observed.
    max: AtomicU64,
    // Smallest sample observed (`u64::MAX` until the first record).
    min: AtomicU64,
}
176
177impl LatencyHistogram {
178 pub fn new() -> Self {
180 let bucket_bounds = vec![
182 1,
183 10,
184 50,
185 100,
186 500,
187 1000,
188 5000,
189 10000,
190 50000,
191 100000,
192 500000,
193 1000000,
194 u64::MAX,
195 ];
196
197 let buckets = bucket_bounds
198 .into_iter()
199 .map(|b| (b, AtomicU64::new(0)))
200 .collect();
201
202 Self {
203 buckets,
204 total: AtomicU64::new(0),
205 sum: AtomicU64::new(0),
206 max: AtomicU64::new(0),
207 min: AtomicU64::new(u64::MAX),
208 }
209 }
210
211 pub fn record(&self, latency_us: u64) {
213 self.total.fetch_add(1, Ordering::Relaxed);
214 self.sum.fetch_add(latency_us, Ordering::Relaxed);
215
216 let mut current_max = self.max.load(Ordering::Relaxed);
218 while latency_us > current_max {
219 match self.max.compare_exchange_weak(
220 current_max,
221 latency_us,
222 Ordering::SeqCst,
223 Ordering::Relaxed,
224 ) {
225 Ok(_) => break,
226 Err(v) => current_max = v,
227 }
228 }
229
230 let mut current_min = self.min.load(Ordering::Relaxed);
232 while latency_us < current_min {
233 match self.min.compare_exchange_weak(
234 current_min,
235 latency_us,
236 Ordering::SeqCst,
237 Ordering::Relaxed,
238 ) {
239 Ok(_) => break,
240 Err(v) => current_min = v,
241 }
242 }
243
244 for (bound, count) in &self.buckets {
246 if latency_us <= *bound {
247 count.fetch_add(1, Ordering::Relaxed);
248 break;
249 }
250 }
251 }
252
253 pub fn percentile(&self, p: f64) -> u64 {
255 let total = self.total.load(Ordering::Relaxed);
256 if total == 0 {
257 return 0;
258 }
259
260 let target = ((total as f64) * p / 100.0) as u64;
261 let mut cumulative = 0u64;
262
263 for (bound, count) in &self.buckets {
264 cumulative += count.load(Ordering::Relaxed);
265 if cumulative >= target {
266 return *bound;
267 }
268 }
269
270 self.max.load(Ordering::Relaxed)
271 }
272
273 pub fn mean(&self) -> f64 {
275 let total = self.total.load(Ordering::Relaxed);
276 if total == 0 {
277 return 0.0;
278 }
279 self.sum.load(Ordering::Relaxed) as f64 / total as f64
280 }
281
282 pub fn stats(&self) -> HistogramStats {
284 HistogramStats {
285 count: self.total.load(Ordering::Relaxed),
286 mean: self.mean(),
287 min: self.min.load(Ordering::Relaxed),
288 max: self.max.load(Ordering::Relaxed),
289 p50: self.percentile(50.0),
290 p90: self.percentile(90.0),
291 p95: self.percentile(95.0),
292 p99: self.percentile(99.0),
293 p999: self.percentile(99.9),
294 }
295 }
296
297 pub fn reset(&self) {
299 self.total.store(0, Ordering::Relaxed);
300 self.sum.store(0, Ordering::Relaxed);
301 self.max.store(0, Ordering::Relaxed);
302 self.min.store(u64::MAX, Ordering::Relaxed);
303 for (_, count) in &self.buckets {
304 count.store(0, Ordering::Relaxed);
305 }
306 }
307}
308
309impl Default for LatencyHistogram {
310 fn default() -> Self {
311 Self::new()
312 }
313}
314
/// Point-in-time snapshot of a [`LatencyHistogram`]. All latency values are
/// in microseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramStats {
    // Number of samples recorded.
    pub count: u64,
    // Arithmetic mean of all samples.
    pub mean: f64,
    // Smallest observed sample.
    pub min: u64,
    // Largest observed sample.
    pub max: u64,
    // Bucketed percentile estimates (50th, 90th, 95th, 99th, 99.9th).
    pub p50: u64,
    pub p90: u64,
    pub p95: u64,
    pub p99: u64,
    pub p999: u64,
}
328
/// One periodic measurement produced by `PerformanceProfiler::collect_sample`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceSample {
    // Wall-clock time of the sample.
    pub timestamp: DateTime<Utc>,
    // CPU usage percent. NOTE(review): `collect_sample` currently fills
    // this with 0.0 — real collection is not implemented in this file.
    pub cpu_usage_percent: f64,
    // Memory usage in bytes. NOTE(review): currently always 0.
    pub memory_usage_bytes: u64,
    // Memory usage percent. NOTE(review): currently always 0.0.
    pub memory_usage_percent: f64,
    // Average events/sec since the profiler was started.
    pub events_per_second: f64,
    // Average bytes/sec since the profiler was started.
    pub bytes_per_second: u64,
    // Number of spans currently held in the span map.
    pub active_operations: u64,
    // P99 latency (microseconds) from the histogram at sample time.
    pub p99_latency_us: u64,
    // Buffer usage percent. NOTE(review): currently always 0.0.
    pub buffer_usage_percent: f64,
}
351
/// Lightweight stopwatch for a single operation: create it at the start,
/// hand it back to the profiler when the operation completes.
pub struct OperationTimer {
    /// Operation name (not read by `elapsed`; kept for reporting).
    name: String,
    /// Instant the timer was created.
    start: Instant,
    /// Free-form annotations attached via [`OperationTimer::tag`].
    tags: HashMap<String, String>,
}

impl OperationTimer {
    /// Starts timing an operation named `name`.
    pub fn new(name: &str) -> Self {
        OperationTimer {
            name: name.to_owned(),
            start: Instant::now(),
            tags: HashMap::new(),
        }
    }

    /// Builder-style annotation; consumes and returns the timer so calls
    /// can be chained.
    pub fn tag(mut self, key: &str, value: &str) -> Self {
        self.tags.insert(key.to_owned(), value.to_owned());
        self
    }

    /// Time elapsed since the timer was created.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
383
/// Async performance profiler: counts events/bytes, records latencies into a
/// lock-free histogram, tracks spans, collects periodic samples, and emits
/// warnings and tuning recommendations.
pub struct PerformanceProfiler {
    // Static configuration (thresholds, history size, feature toggles).
    config: ProfilerConfig,
    // Whether the profiler is currently started.
    running: Arc<AtomicBool>,
    // Shared latency histogram fed by `record_latency`.
    latency_histogram: Arc<LatencyHistogram>,
    // All spans by id; entries are kept after `end_span` finishes them.
    spans: Arc<RwLock<HashMap<u64, Span>>>,
    // Bounded history of periodic samples (capped at `config.history_size`).
    samples: Arc<RwLock<VecDeque<PerformanceSample>>>,
    // Accumulated threshold-violation warnings.
    warnings: Arc<RwLock<Vec<PerformanceWarning>>>,
    // Latest set of generated tuning recommendations.
    recommendations: Arc<RwLock<Vec<Recommendation>>>,
    // Aggregate run statistics (totals, throughput, counters).
    stats: Arc<RwLock<ProfilerStats>>,
    // Monotonic id generator for spans.
    next_span_id: AtomicU64,
    // Instant at which `start` was called; `None` before the first start.
    start_time: Arc<RwLock<Option<Instant>>>,
    // Total events recorded via `record_event`.
    events_counter: AtomicU64,
    // Total bytes recorded via `record_event`.
    bytes_counter: AtomicU64,
}
411
/// A recorded threshold violation, produced by `check_warnings`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceWarning {
    // Which metric crossed its threshold.
    pub warning_type: WarningType,
    // Human-readable description including the values involved.
    pub message: String,
    // How serious the violation is.
    pub severity: WarningSeverity,
    // Timestamp of the sample that triggered the warning.
    pub timestamp: DateTime<Utc>,
    // Observed metric value at the time of the warning.
    pub current_value: f64,
    // Configured threshold that was violated.
    pub threshold: f64,
}
428
/// Category of a [`PerformanceWarning`].
///
/// NOTE(review): only `HighCpuUsage`, `HighLatency`, and `LowThroughput`
/// are emitted by `check_warnings` in this file; the remaining variants are
/// presently unused here.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WarningType {
    HighCpuUsage,
    HighMemoryUsage,
    HighLatency,
    LowThroughput,
    BufferOverflow,
    MemoryLeak,
    GarbageCollection,
    ThreadContention,
}
441
/// Severity of a [`PerformanceWarning`], from informational to critical.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WarningSeverity {
    Info,
    Warning,
    Critical,
}
449
/// A tuning suggestion produced by `generate_recommendations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    // Area of the system the suggestion applies to.
    pub category: RecommendationCategory,
    // Short headline.
    pub title: String,
    // Longer explanation of the observation and suggested change.
    pub description: String,
    // Expected benefit if applied.
    pub impact: RecommendationImpact,
    // Estimated effort to apply.
    pub effort: RecommendationEffort,
    // Relative priority; higher numbers are more urgent (0-9 in this file).
    pub priority: u8,
}
466
/// Area of the system a [`Recommendation`] targets.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecommendationCategory {
    BatchSize,
    BufferSize,
    Parallelism,
    MemoryManagement,
    CpuOptimization,
    NetworkOptimization,
    QueryOptimization,
    Configuration,
}
479
/// Expected benefit of applying a [`Recommendation`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecommendationImpact {
    Low,
    Medium,
    High,
}
487
/// Estimated effort required to apply a [`Recommendation`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecommendationEffort {
    Low,
    Medium,
    High,
}
495
/// Aggregate statistics for a profiling run.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProfilerStats {
    // Total events recorded via `record_event`.
    pub total_events: u64,
    // Total bytes recorded via `record_event`.
    pub total_bytes: u64,
    // Wall-clock duration of the run, finalized by `stop`.
    pub total_duration_secs: f64,
    // Average events/sec over the whole run, finalized by `stop`.
    pub avg_throughput: f64,
    // Highest throughput observed.
    pub peak_throughput: f64,
    // Number of warnings currently held.
    pub warnings_generated: u64,
    // Number of spans opened via `start_span`.
    pub spans_recorded: u64,
    // Number of samples collected via `collect_sample`.
    pub samples_collected: u64,
}
516
517impl PerformanceProfiler {
518 pub fn builder() -> ProfilerBuilder {
520 ProfilerBuilder::new()
521 }
522
523 pub fn new(config: ProfilerConfig) -> Self {
525 Self {
526 config,
527 running: Arc::new(AtomicBool::new(false)),
528 latency_histogram: Arc::new(LatencyHistogram::new()),
529 spans: Arc::new(RwLock::new(HashMap::new())),
530 samples: Arc::new(RwLock::new(VecDeque::new())),
531 warnings: Arc::new(RwLock::new(Vec::new())),
532 recommendations: Arc::new(RwLock::new(Vec::new())),
533 stats: Arc::new(RwLock::new(ProfilerStats::default())),
534 next_span_id: AtomicU64::new(0),
535 start_time: Arc::new(RwLock::new(None)),
536 events_counter: AtomicU64::new(0),
537 bytes_counter: AtomicU64::new(0),
538 }
539 }
540
541 pub async fn start(&self) -> Result<()> {
543 if self.running.load(Ordering::Acquire) {
544 return Err(anyhow!("Profiler already running"));
545 }
546
547 self.running.store(true, Ordering::Release);
548 *self.start_time.write().await = Some(Instant::now());
549
550 info!("Performance profiler started");
551 Ok(())
552 }
553
554 pub async fn stop(&self) -> Result<()> {
556 self.running.store(false, Ordering::Release);
557
558 if let Some(start) = *self.start_time.read().await {
560 let duration = start.elapsed();
561 let mut stats = self.stats.write().await;
562 stats.total_duration_secs = duration.as_secs_f64();
563 stats.total_events = self.events_counter.load(Ordering::Relaxed);
564 stats.total_bytes = self.bytes_counter.load(Ordering::Relaxed);
565
566 if duration.as_secs_f64() > 0.0 {
567 stats.avg_throughput = stats.total_events as f64 / duration.as_secs_f64();
568 }
569 }
570
571 info!("Performance profiler stopped");
572 Ok(())
573 }
574
575 pub fn is_running(&self) -> bool {
577 self.running.load(Ordering::Acquire)
578 }
579
580 pub fn record_event(&self, bytes: u64) {
582 self.events_counter.fetch_add(1, Ordering::Relaxed);
583 self.bytes_counter.fetch_add(bytes, Ordering::Relaxed);
584 }
585
586 pub fn record_latency(&self, latency: Duration) {
588 self.latency_histogram.record(latency.as_micros() as u64);
589 }
590
591 pub async fn start_span(&self, name: &str) -> u64 {
593 let id = self.next_span_id.fetch_add(1, Ordering::SeqCst);
594 let span = Span::new(name, id);
595
596 let mut spans = self.spans.write().await;
597 spans.insert(id, span);
598
599 let mut stats = self.stats.write().await;
600 stats.spans_recorded += 1;
601
602 id
603 }
604
605 pub async fn end_span(&self, id: u64) -> Option<Duration> {
607 let mut spans = self.spans.write().await;
608 if let Some(span) = spans.get_mut(&id) {
609 span.finish();
610 let duration = span.duration();
611
612 if self.config.enable_latency_tracking {
614 self.record_latency(duration);
615 }
616
617 Some(duration)
618 } else {
619 None
620 }
621 }
622
623 pub fn time_operation(&self, name: &str) -> OperationTimer {
625 OperationTimer::new(name)
626 }
627
628 pub fn record_operation(&self, timer: OperationTimer) {
630 let duration = timer.elapsed();
631 self.record_latency(duration);
632 }
633
634 pub async fn collect_sample(&self) -> PerformanceSample {
636 let now = Utc::now();
637
638 let events = self.events_counter.load(Ordering::Relaxed);
640 let bytes = self.bytes_counter.load(Ordering::Relaxed);
641
642 let (events_per_second, bytes_per_second) =
643 if let Some(start) = *self.start_time.read().await {
644 let duration = start.elapsed().as_secs_f64();
645 if duration > 0.0 {
646 (events as f64 / duration, (bytes as f64 / duration) as u64)
647 } else {
648 (0.0, 0)
649 }
650 } else {
651 (0.0, 0)
652 };
653
654 let latency_stats = self.latency_histogram.stats();
655
656 let sample = PerformanceSample {
657 timestamp: now,
658 cpu_usage_percent: 0.0, memory_usage_bytes: 0, memory_usage_percent: 0.0,
661 events_per_second,
662 bytes_per_second,
663 active_operations: self.spans.read().await.len() as u64,
664 p99_latency_us: latency_stats.p99,
665 buffer_usage_percent: 0.0,
666 };
667
668 let mut samples = self.samples.write().await;
670 samples.push_back(sample.clone());
671 while samples.len() > self.config.history_size {
672 samples.pop_front();
673 }
674 drop(samples); let mut stats = self.stats.write().await;
677 stats.samples_collected += 1;
678 drop(stats); self.check_warnings(&sample).await;
682
683 sample
684 }
685
686 async fn check_warnings(&self, sample: &PerformanceSample) {
688 let mut warnings = self.warnings.write().await;
689
690 if sample.cpu_usage_percent > self.config.warning_thresholds.cpu_usage_percent {
692 warnings.push(PerformanceWarning {
693 warning_type: WarningType::HighCpuUsage,
694 message: format!(
695 "CPU usage {}% exceeds threshold {}%",
696 sample.cpu_usage_percent, self.config.warning_thresholds.cpu_usage_percent
697 ),
698 severity: if sample.cpu_usage_percent > 95.0 {
699 WarningSeverity::Critical
700 } else {
701 WarningSeverity::Warning
702 },
703 timestamp: sample.timestamp,
704 current_value: sample.cpu_usage_percent,
705 threshold: self.config.warning_thresholds.cpu_usage_percent,
706 });
707 }
708
709 if sample.p99_latency_us > self.config.warning_thresholds.p99_latency_us {
711 warnings.push(PerformanceWarning {
712 warning_type: WarningType::HighLatency,
713 message: format!(
714 "P99 latency {}us exceeds threshold {}us",
715 sample.p99_latency_us, self.config.warning_thresholds.p99_latency_us
716 ),
717 severity: if sample.p99_latency_us
718 > self.config.warning_thresholds.p99_latency_us * 2
719 {
720 WarningSeverity::Critical
721 } else {
722 WarningSeverity::Warning
723 },
724 timestamp: sample.timestamp,
725 current_value: sample.p99_latency_us as f64,
726 threshold: self.config.warning_thresholds.p99_latency_us as f64,
727 });
728 }
729
730 if sample.events_per_second < self.config.warning_thresholds.min_throughput {
732 warnings.push(PerformanceWarning {
733 warning_type: WarningType::LowThroughput,
734 message: format!(
735 "Throughput {:.2} events/sec below threshold {:.2}",
736 sample.events_per_second, self.config.warning_thresholds.min_throughput
737 ),
738 severity: WarningSeverity::Warning,
739 timestamp: sample.timestamp,
740 current_value: sample.events_per_second,
741 threshold: self.config.warning_thresholds.min_throughput,
742 });
743 }
744
745 let mut stats = self.stats.write().await;
746 stats.warnings_generated = warnings.len() as u64;
747 }
748
749 pub async fn generate_recommendations(&self) -> Vec<Recommendation> {
751 let mut recommendations = Vec::new();
752 let latency_stats = self.latency_histogram.stats();
753 let stats = self.stats.read().await;
754
755 if latency_stats.p99 > 10000 {
757 recommendations.push(Recommendation {
758 category: RecommendationCategory::BatchSize,
759 title: "Increase batch size".to_string(),
760 description: "High P99 latency detected. Consider increasing batch size to amortize overhead.".to_string(),
761 impact: RecommendationImpact::High,
762 effort: RecommendationEffort::Low,
763 priority: 9,
764 });
765 }
766
767 if stats.avg_throughput < 1000.0 && stats.total_events > 100 {
769 recommendations.push(Recommendation {
770 category: RecommendationCategory::Parallelism,
771 title: "Increase parallelism".to_string(),
772 description:
773 "Low throughput detected. Consider increasing worker threads or partitions."
774 .to_string(),
775 impact: RecommendationImpact::High,
776 effort: RecommendationEffort::Medium,
777 priority: 8,
778 });
779 }
780
781 if latency_stats.max > latency_stats.p99 * 10 {
783 recommendations.push(Recommendation {
784 category: RecommendationCategory::MemoryManagement,
785 title: "Investigate latency spikes".to_string(),
786 description: "Large variance in latency detected. May indicate GC pressure or resource contention.".to_string(),
787 impact: RecommendationImpact::Medium,
788 effort: RecommendationEffort::High,
789 priority: 7,
790 });
791 }
792
793 *self.recommendations.write().await = recommendations.clone();
795
796 recommendations
797 }
798
799 pub fn get_latency_stats(&self) -> HistogramStats {
801 self.latency_histogram.stats()
802 }
803
804 pub async fn get_warnings(&self) -> Vec<PerformanceWarning> {
806 self.warnings.read().await.clone()
807 }
808
809 pub async fn get_samples(&self) -> Vec<PerformanceSample> {
811 self.samples.read().await.iter().cloned().collect()
812 }
813
814 pub async fn get_stats(&self) -> ProfilerStats {
816 self.stats.read().await.clone()
817 }
818
819 pub async fn generate_report(&self) -> PerformanceReport {
821 let stats = self.stats.read().await.clone();
822 let latency_stats = self.latency_histogram.stats();
823 let warnings = self.warnings.read().await.clone();
824 let recommendations = self.generate_recommendations().await;
825 let samples = self.samples.read().await.iter().cloned().collect();
826 let summary = self.generate_summary(&stats, &latency_stats).await;
827
828 PerformanceReport {
829 generated_at: Utc::now(),
830 duration_secs: stats.total_duration_secs,
831 total_events: stats.total_events,
832 total_bytes: stats.total_bytes,
833 avg_throughput: stats.avg_throughput,
834 peak_throughput: stats.peak_throughput,
835 latency_stats,
836 warnings,
837 recommendations,
838 samples,
839 summary,
840 }
841 }
842
843 async fn generate_summary(&self, stats: &ProfilerStats, latency: &HistogramStats) -> String {
845 let mut summary = String::new();
846
847 summary.push_str(&format!("Performance Summary\n{}\n", "=".repeat(50)));
848 summary.push_str(&format!("Duration: {:.2}s\n", stats.total_duration_secs));
849 summary.push_str(&format!("Events processed: {}\n", stats.total_events));
850 summary.push_str(&format!(
851 "Throughput: {:.2} events/sec\n",
852 stats.avg_throughput
853 ));
854 summary.push_str(&format!(
855 "Latency P50/P99/Max: {}us / {}us / {}us\n",
856 latency.p50, latency.p99, latency.max
857 ));
858 summary.push_str(&format!("Warnings: {}\n", stats.warnings_generated));
859
860 summary
861 }
862
863 pub async fn reset(&self) {
865 self.latency_histogram.reset();
866 self.spans.write().await.clear();
867 self.samples.write().await.clear();
868 self.warnings.write().await.clear();
869 self.recommendations.write().await.clear();
870 *self.stats.write().await = ProfilerStats::default();
871 self.events_counter.store(0, Ordering::Relaxed);
872 self.bytes_counter.store(0, Ordering::Relaxed);
873 *self.start_time.write().await = None;
874
875 info!("Performance profiler reset");
876 }
877}
878
/// A complete profiling report, assembled by
/// `PerformanceProfiler::generate_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
    // Wall-clock time the report was generated.
    pub generated_at: DateTime<Utc>,
    // Duration of the profiled run in seconds.
    pub duration_secs: f64,
    // Total events recorded during the run.
    pub total_events: u64,
    // Total bytes recorded during the run.
    pub total_bytes: u64,
    // Average throughput (events/sec) over the run.
    pub avg_throughput: f64,
    // Highest throughput observed.
    pub peak_throughput: f64,
    // Latency histogram snapshot at report time.
    pub latency_stats: HistogramStats,
    // All warnings accumulated during the run.
    pub warnings: Vec<PerformanceWarning>,
    // Tuning recommendations generated at report time.
    pub recommendations: Vec<Recommendation>,
    // Retained sample history, oldest first.
    pub samples: Vec<PerformanceSample>,
    // Pre-rendered human-readable summary text.
    pub summary: String,
}
905
906impl PerformanceReport {
907 pub fn to_json(&self) -> Result<String> {
909 serde_json::to_string_pretty(self).map_err(|e| anyhow!("JSON error: {}", e))
910 }
911
912 pub fn print(&self) {
914 println!("{}", self.summary);
915
916 if !self.warnings.is_empty() {
917 println!("\nWarnings:");
918 for warning in &self.warnings {
919 println!(" [{:?}] {}", warning.severity, warning.message);
920 }
921 }
922
923 if !self.recommendations.is_empty() {
924 println!("\nRecommendations:");
925 for rec in &self.recommendations {
926 println!(
927 " [Priority {}] {} - {}",
928 rec.priority, rec.title, rec.description
929 );
930 }
931 }
932 }
933}
934
/// Fluent builder for [`PerformanceProfiler`], starting from
/// [`ProfilerConfig::default`].
pub struct ProfilerBuilder {
    // Configuration being assembled; consumed by `build`.
    config: ProfilerConfig,
}
939
940impl ProfilerBuilder {
941 pub fn new() -> Self {
943 Self {
944 config: ProfilerConfig::default(),
945 }
946 }
947
948 pub fn with_cpu_profiling(mut self) -> Self {
950 self.config.enable_cpu_profiling = true;
951 self
952 }
953
954 pub fn with_memory_tracking(mut self) -> Self {
956 self.config.enable_memory_profiling = true;
957 self
958 }
959
960 pub fn sampling_interval(mut self, interval: Duration) -> Self {
962 self.config.sampling_interval = interval;
963 self
964 }
965
966 pub fn history_size(mut self, size: usize) -> Self {
968 self.config.history_size = size;
969 self
970 }
971
972 pub fn warning_thresholds(mut self, thresholds: WarningThresholds) -> Self {
974 self.config.warning_thresholds = thresholds;
975 self
976 }
977
978 pub fn build(self) -> PerformanceProfiler {
980 PerformanceProfiler::new(self.config)
981 }
982}
983
984impl Default for ProfilerBuilder {
985 fn default() -> Self {
986 Self::new()
987 }
988}
989
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly built profiler starts out idle.
    #[tokio::test]
    async fn test_profiler_creation() {
        let profiler = PerformanceProfiler::builder().build();
        assert!(!profiler.is_running());
    }

    // start/stop toggle the running flag.
    #[tokio::test]
    async fn test_start_stop() {
        let profiler = PerformanceProfiler::builder().build();

        profiler.start().await.unwrap();
        assert!(profiler.is_running());

        profiler.stop().await.unwrap();
        assert!(!profiler.is_running());
    }

    // Event and byte counters accumulate and are finalized on stop.
    #[tokio::test]
    async fn test_record_event() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        profiler.record_event(100);
        profiler.record_event(200);

        profiler.stop().await.unwrap();
        let stats = profiler.get_stats().await;
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_bytes, 300);
    }

    // Histogram tracks count, min, and max across samples.
    #[tokio::test]
    async fn test_latency_histogram() {
        let histogram = LatencyHistogram::new();

        histogram.record(100);
        histogram.record(200);
        histogram.record(1000);
        histogram.record(5000);
        histogram.record(10000);

        let stats = histogram.stats();
        assert_eq!(stats.count, 5);
        assert!(stats.min <= 100);
        assert!(stats.max >= 10000);
    }

    // A started-then-ended span reports at least the slept duration.
    #[tokio::test]
    async fn test_spans() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        let span_id = profiler.start_span("test_operation").await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        let duration = profiler.end_span(span_id).await;

        assert!(duration.is_some());
        assert!(duration.unwrap() >= Duration::from_millis(10));
    }

    // OperationTimer feeds the latency histogram via record_operation.
    #[tokio::test]
    async fn test_operation_timer() {
        let profiler = PerformanceProfiler::builder().build();

        let timer = profiler.time_operation("test");
        tokio::time::sleep(Duration::from_millis(5)).await;
        profiler.record_operation(timer);

        let stats = profiler.get_latency_stats();
        assert!(stats.count > 0);
    }

    // collect_sample produces a non-negative throughput reading.
    #[tokio::test]
    async fn test_collect_sample() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        profiler.record_event(100);
        let sample = profiler.collect_sample().await;

        assert!(sample.events_per_second >= 0.0);
    }

    // 50ms latencies exceed the 10ms P99 heuristic, so at least the
    // batch-size recommendation should be generated.
    #[tokio::test]
    async fn test_recommendations() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        for _ in 0..100 {
            profiler.record_latency(Duration::from_millis(50));
        }

        let recommendations = profiler.generate_recommendations().await;
        assert!(!recommendations.is_empty());
    }

    // A full report carries the event totals and a rendered summary.
    #[tokio::test]
    async fn test_generate_report() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        for _ in 0..10 {
            profiler.record_event(100);
            profiler.record_latency(Duration::from_micros(500));
        }

        profiler.stop().await.unwrap();
        let report = profiler.generate_report().await;

        assert_eq!(report.total_events, 10);
        assert!(!report.summary.is_empty());
    }

    // With an unreachable throughput floor, sampling after a short sleep
    // must produce a LowThroughput warning.
    #[tokio::test]
    async fn test_warnings() {
        let thresholds = WarningThresholds {
            min_throughput: 10000.0,
            ..Default::default()
        };

        let profiler = PerformanceProfiler::builder()
            .warning_thresholds(thresholds)
            .build();

        profiler.start().await.unwrap();

        // Let some wall-clock time pass so events/sec is well-defined.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        profiler.record_event(100);

        profiler.collect_sample().await;

        let warnings = profiler.get_warnings().await;
        assert!(warnings
            .iter()
            .any(|w| w.warning_type == WarningType::LowThroughput));
    }

    // reset clears both the aggregate stats and the histogram.
    #[tokio::test]
    async fn test_reset() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        profiler.record_event(100);
        profiler.record_latency(Duration::from_micros(100));

        profiler.reset().await;

        let stats = profiler.get_stats().await;
        assert_eq!(stats.total_events, 0);

        let latency = profiler.get_latency_stats();
        assert_eq!(latency.count, 0);
    }

    // Percentiles must be monotonic: p50 never exceeds p99.
    #[test]
    fn test_histogram_percentiles() {
        let histogram = LatencyHistogram::new();

        for i in 1..=100 {
            histogram.record(i * 10);
        }

        let p50 = histogram.percentile(50.0);
        let p99 = histogram.percentile(99.0);

        assert!(p50 <= p99);
    }
}