//! # Performance Profiler and Optimizer
//!
//! This module provides comprehensive performance profiling and optimization
//! capabilities for stream processing applications.
//!
//! ## Features
//! - Real-time performance monitoring
//! - Bottleneck detection and analysis
//! - CPU and memory profiling
//! - Latency distribution tracking
//! - Throughput analysis
//! - Optimization recommendations
//! - Performance reports
//!
//! ## Usage
//! ```rust,ignore
//! let profiler = PerformanceProfiler::builder()
//!     .with_cpu_profiling()
//!     .with_memory_tracking()
//!     .build();
//!
//! profiler.start().await?;
//! // ... run stream processing ...
//! let report = profiler.generate_report().await;
//! ```
28use anyhow::{anyhow, Result};
29use chrono::{DateTime, Utc};
30use serde::{Deserialize, Serialize};
31use std::collections::{HashMap, VecDeque};
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant};
35use tokio::sync::RwLock;
36use tracing::info;
37
/// Configuration for the performance profiler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProfilerConfig {
    /// Enable CPU profiling
    pub enable_cpu_profiling: bool,
    /// Enable memory profiling
    pub enable_memory_profiling: bool,
    /// Enable latency tracking (spans ended via `end_span` feed the histogram)
    pub enable_latency_tracking: bool,
    /// Enable throughput tracking
    pub enable_throughput_tracking: bool,
    /// Sampling interval
    pub sampling_interval: Duration,
    /// History size (number of samples to keep; older samples are dropped)
    pub history_size: usize,
    /// Enable automatic optimization recommendations
    pub enable_recommendations: bool,
    /// Latency percentiles to track (e.g. 50.0, 99.0)
    pub percentiles: Vec<f64>,
    /// Warning thresholds used by the sample checks
    pub warning_thresholds: WarningThresholds,
    /// Enable flame graph generation
    pub enable_flame_graph: bool,
    /// Maximum span depth
    pub max_span_depth: usize,
}

impl Default for ProfilerConfig {
    /// Everything enabled except flame graphs; 1s sampling with 1h of history.
    fn default() -> Self {
        Self {
            enable_cpu_profiling: true,
            enable_memory_profiling: true,
            enable_latency_tracking: true,
            enable_throughput_tracking: true,
            sampling_interval: Duration::from_secs(1),
            history_size: 3600, // 1 hour of per-second samples
            enable_recommendations: true,
            percentiles: vec![50.0, 90.0, 95.0, 99.0, 99.9],
            warning_thresholds: WarningThresholds::default(),
            enable_flame_graph: false,
            max_span_depth: 100,
        }
    }
}
82
/// Warning thresholds for performance metrics.
///
/// A warning is generated when a sampled metric crosses its threshold
/// (above for usage/latency, below for throughput).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WarningThresholds {
    /// CPU usage threshold (percentage)
    pub cpu_usage_percent: f64,
    /// Memory usage threshold (percentage)
    pub memory_usage_percent: f64,
    /// P99 latency threshold (microseconds)
    pub p99_latency_us: u64,
    /// Minimum throughput threshold (events/sec)
    pub min_throughput: f64,
    /// Buffer usage threshold (percentage)
    pub buffer_usage_percent: f64,
}

impl Default for WarningThresholds {
    fn default() -> Self {
        Self {
            cpu_usage_percent: 80.0,
            memory_usage_percent: 85.0,
            p99_latency_us: 10000, // 10ms
            min_throughput: 1000.0,
            buffer_usage_percent: 90.0,
        }
    }
}
109
/// Performance span for tracking a single named operation.
///
/// A span starts timing on construction and stops when [`Span::finish`]
/// is called; until then [`Span::duration`] reports elapsed time so far.
#[derive(Debug, Clone)]
pub struct Span {
    /// Span name
    pub name: String,
    /// Start time
    pub start: Instant,
    /// End time (`None` while the span is still open)
    pub end: Option<Instant>,
    /// Parent span ID
    pub parent_id: Option<u64>,
    /// Span ID
    pub id: u64,
    /// Tags
    pub tags: HashMap<String, String>,
    /// Child spans
    pub children: Vec<u64>,
}

impl Span {
    /// Create a new open span with the given name and ID.
    pub fn new(name: &str, id: u64) -> Self {
        Self {
            id,
            name: name.to_owned(),
            start: Instant::now(),
            end: None,
            parent_id: None,
            tags: HashMap::new(),
            children: Vec::new(),
        }
    }

    /// Close the span, recording the end time.
    pub fn finish(&mut self) {
        self.end = Some(Instant::now());
    }

    /// Duration of the span: start-to-end when finished, otherwise
    /// start-to-now.
    pub fn duration(&self) -> Duration {
        match self.end {
            Some(end) => end.duration_since(self.start),
            None => self.start.elapsed(),
        }
    }

    /// Attach a key/value tag to the span.
    pub fn tag(&mut self, key: &str, value: &str) {
        self.tags.insert(key.to_owned(), value.to_owned());
    }
}
162
/// Latency histogram for tracking distribution.
///
/// All fields are atomics, so `record` can be called concurrently
/// through a shared reference without locking.
pub struct LatencyHistogram {
    /// Buckets for latency distribution, ordered by ascending bound;
    /// the final bound is `u64::MAX` so every value lands somewhere.
    buckets: Vec<(u64, AtomicU64)>, // (upper_bound_us, count)
    /// Total count of recorded values
    total: AtomicU64,
    /// Sum of all recorded values, for mean calculation
    sum: AtomicU64,
    /// Maximum recorded value
    max: AtomicU64,
    /// Minimum recorded value (`u64::MAX` until the first record)
    min: AtomicU64,
}
176
177impl LatencyHistogram {
178    /// Create a new histogram with default buckets
179    pub fn new() -> Self {
180        // Buckets: 1us, 10us, 50us, 100us, 500us, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, inf
181        let bucket_bounds = vec![
182            1,
183            10,
184            50,
185            100,
186            500,
187            1000,
188            5000,
189            10000,
190            50000,
191            100000,
192            500000,
193            1000000,
194            u64::MAX,
195        ];
196
197        let buckets = bucket_bounds
198            .into_iter()
199            .map(|b| (b, AtomicU64::new(0)))
200            .collect();
201
202        Self {
203            buckets,
204            total: AtomicU64::new(0),
205            sum: AtomicU64::new(0),
206            max: AtomicU64::new(0),
207            min: AtomicU64::new(u64::MAX),
208        }
209    }
210
211    /// Record a latency value
212    pub fn record(&self, latency_us: u64) {
213        self.total.fetch_add(1, Ordering::Relaxed);
214        self.sum.fetch_add(latency_us, Ordering::Relaxed);
215
216        // Update max
217        let mut current_max = self.max.load(Ordering::Relaxed);
218        while latency_us > current_max {
219            match self.max.compare_exchange_weak(
220                current_max,
221                latency_us,
222                Ordering::SeqCst,
223                Ordering::Relaxed,
224            ) {
225                Ok(_) => break,
226                Err(v) => current_max = v,
227            }
228        }
229
230        // Update min
231        let mut current_min = self.min.load(Ordering::Relaxed);
232        while latency_us < current_min {
233            match self.min.compare_exchange_weak(
234                current_min,
235                latency_us,
236                Ordering::SeqCst,
237                Ordering::Relaxed,
238            ) {
239                Ok(_) => break,
240                Err(v) => current_min = v,
241            }
242        }
243
244        // Find bucket
245        for (bound, count) in &self.buckets {
246            if latency_us <= *bound {
247                count.fetch_add(1, Ordering::Relaxed);
248                break;
249            }
250        }
251    }
252
253    /// Get percentile value
254    pub fn percentile(&self, p: f64) -> u64 {
255        let total = self.total.load(Ordering::Relaxed);
256        if total == 0 {
257            return 0;
258        }
259
260        let target = ((total as f64) * p / 100.0) as u64;
261        let mut cumulative = 0u64;
262
263        for (bound, count) in &self.buckets {
264            cumulative += count.load(Ordering::Relaxed);
265            if cumulative >= target {
266                return *bound;
267            }
268        }
269
270        self.max.load(Ordering::Relaxed)
271    }
272
273    /// Get mean latency
274    pub fn mean(&self) -> f64 {
275        let total = self.total.load(Ordering::Relaxed);
276        if total == 0 {
277            return 0.0;
278        }
279        self.sum.load(Ordering::Relaxed) as f64 / total as f64
280    }
281
282    /// Get statistics
283    pub fn stats(&self) -> HistogramStats {
284        HistogramStats {
285            count: self.total.load(Ordering::Relaxed),
286            mean: self.mean(),
287            min: self.min.load(Ordering::Relaxed),
288            max: self.max.load(Ordering::Relaxed),
289            p50: self.percentile(50.0),
290            p90: self.percentile(90.0),
291            p95: self.percentile(95.0),
292            p99: self.percentile(99.0),
293            p999: self.percentile(99.9),
294        }
295    }
296
297    /// Reset the histogram
298    pub fn reset(&self) {
299        self.total.store(0, Ordering::Relaxed);
300        self.sum.store(0, Ordering::Relaxed);
301        self.max.store(0, Ordering::Relaxed);
302        self.min.store(u64::MAX, Ordering::Relaxed);
303        for (_, count) in &self.buckets {
304            count.store(0, Ordering::Relaxed);
305        }
306    }
307}
308
impl Default for LatencyHistogram {
    /// Equivalent to [`LatencyHistogram::new`].
    fn default() -> Self {
        Self::new()
    }
}
314
/// Point-in-time snapshot of histogram statistics.
///
/// All latency values are in microseconds; `p50`..`p999` are
/// bucket-resolution percentile estimates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistogramStats {
    pub count: u64,
    pub mean: f64,
    pub min: u64,
    pub max: u64,
    pub p50: u64,
    pub p90: u64,
    pub p95: u64,
    pub p99: u64,
    pub p999: u64,
}
328
/// Performance sample at a point in time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceSample {
    /// Timestamp the sample was taken
    pub timestamp: DateTime<Utc>,
    /// CPU usage percentage
    // NOTE(review): `collect_sample` currently always fills 0.0 here
    // ("Would need system API") — treat as a placeholder.
    pub cpu_usage_percent: f64,
    /// Memory usage bytes
    // NOTE(review): also a placeholder, always 0 from `collect_sample`.
    pub memory_usage_bytes: u64,
    /// Memory usage percentage
    pub memory_usage_percent: f64,
    /// Events processed per second (cumulative average since start)
    pub events_per_second: f64,
    /// Bytes processed per second (cumulative average since start)
    pub bytes_per_second: u64,
    /// Active operations (count of tracked spans)
    pub active_operations: u64,
    /// P99 latency (microseconds)
    pub p99_latency_us: u64,
    /// Buffer usage percentage
    pub buffer_usage_percent: f64,
}
351
/// Operation timer for measuring specific operations.
///
/// Timing starts at construction; read it with [`OperationTimer::elapsed`].
pub struct OperationTimer {
    /// Operation name
    name: String,
    /// Start time
    start: Instant,
    /// Tags
    tags: HashMap<String, String>,
}

impl OperationTimer {
    /// Start timing an operation with the given name.
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            start: Instant::now(),
            tags: HashMap::new(),
        }
    }

    /// Attach a key/value tag, returning `self` for chaining.
    pub fn tag(mut self, key: &str, value: &str) -> Self {
        self.tags.insert(key.to_owned(), value.to_owned());
        self
    }

    /// Time elapsed since the timer was created.
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
383
/// Performance profiler.
///
/// All state lives behind `Arc`s, `RwLock`s, and atomics, so every method
/// takes `&self` and the profiler can be driven from concurrent tasks.
pub struct PerformanceProfiler {
    /// Configuration
    config: ProfilerConfig,
    /// Running flag (set by `start`, cleared by `stop`)
    running: Arc<AtomicBool>,
    /// Latency histogram fed by `record_latency` / `end_span`
    latency_histogram: Arc<LatencyHistogram>,
    /// Active spans keyed by span ID
    spans: Arc<RwLock<HashMap<u64, Span>>>,
    /// Performance samples, bounded by `config.history_size`
    samples: Arc<RwLock<VecDeque<PerformanceSample>>>,
    /// Warnings accumulated by `check_warnings`
    warnings: Arc<RwLock<Vec<PerformanceWarning>>>,
    /// Latest optimization recommendations
    recommendations: Arc<RwLock<Vec<Recommendation>>>,
    /// Aggregate statistics
    stats: Arc<RwLock<ProfilerStats>>,
    /// Next span ID (monotonically increasing)
    next_span_id: AtomicU64,
    /// Start time (set on `start`, cleared on `reset`)
    start_time: Arc<RwLock<Option<Instant>>>,
    /// Events counter
    events_counter: AtomicU64,
    /// Bytes counter
    bytes_counter: AtomicU64,
}
411
/// Performance warning emitted when a sampled metric crosses a threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceWarning {
    /// Warning type
    pub warning_type: WarningType,
    /// Human-readable message
    pub message: String,
    /// Severity
    pub severity: WarningSeverity,
    /// Timestamp of the offending sample
    pub timestamp: DateTime<Utc>,
    /// Observed metric value
    pub current_value: f64,
    /// Configured threshold that was crossed
    pub threshold: f64,
}

/// Warning types.
// NOTE(review): `check_warnings` currently emits only HighCpuUsage,
// HighLatency, and LowThroughput; the other variants are reserved.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WarningType {
    HighCpuUsage,
    HighMemoryUsage,
    HighLatency,
    LowThroughput,
    BufferOverflow,
    MemoryLeak,
    GarbageCollection,
    ThreadContention,
}

/// Warning severity, ordered from least to most severe.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WarningSeverity {
    Info,
    Warning,
    Critical,
}
449
/// Optimization recommendation produced by `generate_recommendations`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    /// Category
    pub category: RecommendationCategory,
    /// Short title
    pub title: String,
    /// Longer description of the suggested change
    pub description: String,
    /// Expected impact if applied
    pub impact: RecommendationImpact,
    /// Estimated effort to apply
    pub effort: RecommendationEffort,
    /// Priority score (higher = more urgent)
    pub priority: u8,
}

/// Recommendation categories.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecommendationCategory {
    BatchSize,
    BufferSize,
    Parallelism,
    MemoryManagement,
    CpuOptimization,
    NetworkOptimization,
    QueryOptimization,
    Configuration,
}

/// Recommendation impact.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecommendationImpact {
    Low,
    Medium,
    High,
}

/// Recommendation effort.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum RecommendationEffort {
    Low,
    Medium,
    High,
}
495
/// Profiler statistics.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProfilerStats {
    /// Total events processed
    pub total_events: u64,
    /// Total bytes processed
    pub total_bytes: u64,
    /// Total duration in seconds (finalized on `stop`)
    pub total_duration_secs: f64,
    /// Average throughput in events/sec (finalized on `stop`)
    pub avg_throughput: f64,
    /// Peak throughput in events/sec
    pub peak_throughput: f64,
    /// Warnings generated
    pub warnings_generated: u64,
    /// Spans recorded
    pub spans_recorded: u64,
    /// Samples collected
    pub samples_collected: u64,
}
516
517impl PerformanceProfiler {
518    /// Create a new profiler builder
519    pub fn builder() -> ProfilerBuilder {
520        ProfilerBuilder::new()
521    }
522
523    /// Create a new profiler with config
524    pub fn new(config: ProfilerConfig) -> Self {
525        Self {
526            config,
527            running: Arc::new(AtomicBool::new(false)),
528            latency_histogram: Arc::new(LatencyHistogram::new()),
529            spans: Arc::new(RwLock::new(HashMap::new())),
530            samples: Arc::new(RwLock::new(VecDeque::new())),
531            warnings: Arc::new(RwLock::new(Vec::new())),
532            recommendations: Arc::new(RwLock::new(Vec::new())),
533            stats: Arc::new(RwLock::new(ProfilerStats::default())),
534            next_span_id: AtomicU64::new(0),
535            start_time: Arc::new(RwLock::new(None)),
536            events_counter: AtomicU64::new(0),
537            bytes_counter: AtomicU64::new(0),
538        }
539    }
540
541    /// Start profiling
542    pub async fn start(&self) -> Result<()> {
543        if self.running.load(Ordering::Acquire) {
544            return Err(anyhow!("Profiler already running"));
545        }
546
547        self.running.store(true, Ordering::Release);
548        *self.start_time.write().await = Some(Instant::now());
549
550        info!("Performance profiler started");
551        Ok(())
552    }
553
554    /// Stop profiling
555    pub async fn stop(&self) -> Result<()> {
556        self.running.store(false, Ordering::Release);
557
558        // Update final stats
559        if let Some(start) = *self.start_time.read().await {
560            let duration = start.elapsed();
561            let mut stats = self.stats.write().await;
562            stats.total_duration_secs = duration.as_secs_f64();
563            stats.total_events = self.events_counter.load(Ordering::Relaxed);
564            stats.total_bytes = self.bytes_counter.load(Ordering::Relaxed);
565
566            if duration.as_secs_f64() > 0.0 {
567                stats.avg_throughput = stats.total_events as f64 / duration.as_secs_f64();
568            }
569        }
570
571        info!("Performance profiler stopped");
572        Ok(())
573    }
574
575    /// Check if running
576    pub fn is_running(&self) -> bool {
577        self.running.load(Ordering::Acquire)
578    }
579
580    /// Record an event
581    pub fn record_event(&self, bytes: u64) {
582        self.events_counter.fetch_add(1, Ordering::Relaxed);
583        self.bytes_counter.fetch_add(bytes, Ordering::Relaxed);
584    }
585
586    /// Record latency
587    pub fn record_latency(&self, latency: Duration) {
588        self.latency_histogram.record(latency.as_micros() as u64);
589    }
590
591    /// Start a new span
592    pub async fn start_span(&self, name: &str) -> u64 {
593        let id = self.next_span_id.fetch_add(1, Ordering::SeqCst);
594        let span = Span::new(name, id);
595
596        let mut spans = self.spans.write().await;
597        spans.insert(id, span);
598
599        let mut stats = self.stats.write().await;
600        stats.spans_recorded += 1;
601
602        id
603    }
604
605    /// End a span
606    pub async fn end_span(&self, id: u64) -> Option<Duration> {
607        let mut spans = self.spans.write().await;
608        if let Some(span) = spans.get_mut(&id) {
609            span.finish();
610            let duration = span.duration();
611
612            // Record latency if latency tracking is enabled
613            if self.config.enable_latency_tracking {
614                self.record_latency(duration);
615            }
616
617            Some(duration)
618        } else {
619            None
620        }
621    }
622
623    /// Create an operation timer
624    pub fn time_operation(&self, name: &str) -> OperationTimer {
625        OperationTimer::new(name)
626    }
627
628    /// Record operation completion
629    pub fn record_operation(&self, timer: OperationTimer) {
630        let duration = timer.elapsed();
631        self.record_latency(duration);
632    }
633
634    /// Collect a performance sample
635    pub async fn collect_sample(&self) -> PerformanceSample {
636        let now = Utc::now();
637
638        // Calculate throughput
639        let events = self.events_counter.load(Ordering::Relaxed);
640        let bytes = self.bytes_counter.load(Ordering::Relaxed);
641
642        let (events_per_second, bytes_per_second) =
643            if let Some(start) = *self.start_time.read().await {
644                let duration = start.elapsed().as_secs_f64();
645                if duration > 0.0 {
646                    (events as f64 / duration, (bytes as f64 / duration) as u64)
647                } else {
648                    (0.0, 0)
649                }
650            } else {
651                (0.0, 0)
652            };
653
654        let latency_stats = self.latency_histogram.stats();
655
656        let sample = PerformanceSample {
657            timestamp: now,
658            cpu_usage_percent: 0.0, // Would need system API
659            memory_usage_bytes: 0,  // Would need system API
660            memory_usage_percent: 0.0,
661            events_per_second,
662            bytes_per_second,
663            active_operations: self.spans.read().await.len() as u64,
664            p99_latency_us: latency_stats.p99,
665            buffer_usage_percent: 0.0,
666        };
667
668        // Store sample
669        let mut samples = self.samples.write().await;
670        samples.push_back(sample.clone());
671        while samples.len() > self.config.history_size {
672            samples.pop_front();
673        }
674        drop(samples); // Release samples lock
675
676        let mut stats = self.stats.write().await;
677        stats.samples_collected += 1;
678        drop(stats); // Release stats lock before calling check_warnings
679
680        // Check for warnings (needs to acquire stats lock)
681        self.check_warnings(&sample).await;
682
683        sample
684    }
685
686    /// Check for performance warnings
687    async fn check_warnings(&self, sample: &PerformanceSample) {
688        let mut warnings = self.warnings.write().await;
689
690        // Check CPU usage
691        if sample.cpu_usage_percent > self.config.warning_thresholds.cpu_usage_percent {
692            warnings.push(PerformanceWarning {
693                warning_type: WarningType::HighCpuUsage,
694                message: format!(
695                    "CPU usage {}% exceeds threshold {}%",
696                    sample.cpu_usage_percent, self.config.warning_thresholds.cpu_usage_percent
697                ),
698                severity: if sample.cpu_usage_percent > 95.0 {
699                    WarningSeverity::Critical
700                } else {
701                    WarningSeverity::Warning
702                },
703                timestamp: sample.timestamp,
704                current_value: sample.cpu_usage_percent,
705                threshold: self.config.warning_thresholds.cpu_usage_percent,
706            });
707        }
708
709        // Check latency
710        if sample.p99_latency_us > self.config.warning_thresholds.p99_latency_us {
711            warnings.push(PerformanceWarning {
712                warning_type: WarningType::HighLatency,
713                message: format!(
714                    "P99 latency {}us exceeds threshold {}us",
715                    sample.p99_latency_us, self.config.warning_thresholds.p99_latency_us
716                ),
717                severity: if sample.p99_latency_us
718                    > self.config.warning_thresholds.p99_latency_us * 2
719                {
720                    WarningSeverity::Critical
721                } else {
722                    WarningSeverity::Warning
723                },
724                timestamp: sample.timestamp,
725                current_value: sample.p99_latency_us as f64,
726                threshold: self.config.warning_thresholds.p99_latency_us as f64,
727            });
728        }
729
730        // Check throughput
731        if sample.events_per_second < self.config.warning_thresholds.min_throughput {
732            warnings.push(PerformanceWarning {
733                warning_type: WarningType::LowThroughput,
734                message: format!(
735                    "Throughput {:.2} events/sec below threshold {:.2}",
736                    sample.events_per_second, self.config.warning_thresholds.min_throughput
737                ),
738                severity: WarningSeverity::Warning,
739                timestamp: sample.timestamp,
740                current_value: sample.events_per_second,
741                threshold: self.config.warning_thresholds.min_throughput,
742            });
743        }
744
745        let mut stats = self.stats.write().await;
746        stats.warnings_generated = warnings.len() as u64;
747    }
748
749    /// Generate optimization recommendations
750    pub async fn generate_recommendations(&self) -> Vec<Recommendation> {
751        let mut recommendations = Vec::new();
752        let latency_stats = self.latency_histogram.stats();
753        let stats = self.stats.read().await;
754
755        // Check latency
756        if latency_stats.p99 > 10000 {
757            recommendations.push(Recommendation {
758                category: RecommendationCategory::BatchSize,
759                title: "Increase batch size".to_string(),
760                description: "High P99 latency detected. Consider increasing batch size to amortize overhead.".to_string(),
761                impact: RecommendationImpact::High,
762                effort: RecommendationEffort::Low,
763                priority: 9,
764            });
765        }
766
767        // Check throughput
768        if stats.avg_throughput < 1000.0 && stats.total_events > 100 {
769            recommendations.push(Recommendation {
770                category: RecommendationCategory::Parallelism,
771                title: "Increase parallelism".to_string(),
772                description:
773                    "Low throughput detected. Consider increasing worker threads or partitions."
774                        .to_string(),
775                impact: RecommendationImpact::High,
776                effort: RecommendationEffort::Medium,
777                priority: 8,
778            });
779        }
780
781        // Check variance
782        if latency_stats.max > latency_stats.p99 * 10 {
783            recommendations.push(Recommendation {
784                category: RecommendationCategory::MemoryManagement,
785                title: "Investigate latency spikes".to_string(),
786                description: "Large variance in latency detected. May indicate GC pressure or resource contention.".to_string(),
787                impact: RecommendationImpact::Medium,
788                effort: RecommendationEffort::High,
789                priority: 7,
790            });
791        }
792
793        // Store recommendations
794        *self.recommendations.write().await = recommendations.clone();
795
796        recommendations
797    }
798
799    /// Get latency statistics
800    pub fn get_latency_stats(&self) -> HistogramStats {
801        self.latency_histogram.stats()
802    }
803
804    /// Get all warnings
805    pub async fn get_warnings(&self) -> Vec<PerformanceWarning> {
806        self.warnings.read().await.clone()
807    }
808
809    /// Get samples
810    pub async fn get_samples(&self) -> Vec<PerformanceSample> {
811        self.samples.read().await.iter().cloned().collect()
812    }
813
814    /// Get statistics
815    pub async fn get_stats(&self) -> ProfilerStats {
816        self.stats.read().await.clone()
817    }
818
819    /// Generate performance report
820    pub async fn generate_report(&self) -> PerformanceReport {
821        let stats = self.stats.read().await.clone();
822        let latency_stats = self.latency_histogram.stats();
823        let warnings = self.warnings.read().await.clone();
824        let recommendations = self.generate_recommendations().await;
825        let samples = self.samples.read().await.iter().cloned().collect();
826        let summary = self.generate_summary(&stats, &latency_stats).await;
827
828        PerformanceReport {
829            generated_at: Utc::now(),
830            duration_secs: stats.total_duration_secs,
831            total_events: stats.total_events,
832            total_bytes: stats.total_bytes,
833            avg_throughput: stats.avg_throughput,
834            peak_throughput: stats.peak_throughput,
835            latency_stats,
836            warnings,
837            recommendations,
838            samples,
839            summary,
840        }
841    }
842
843    /// Generate summary
844    async fn generate_summary(&self, stats: &ProfilerStats, latency: &HistogramStats) -> String {
845        let mut summary = String::new();
846
847        summary.push_str(&format!("Performance Summary\n{}\n", "=".repeat(50)));
848        summary.push_str(&format!("Duration: {:.2}s\n", stats.total_duration_secs));
849        summary.push_str(&format!("Events processed: {}\n", stats.total_events));
850        summary.push_str(&format!(
851            "Throughput: {:.2} events/sec\n",
852            stats.avg_throughput
853        ));
854        summary.push_str(&format!(
855            "Latency P50/P99/Max: {}us / {}us / {}us\n",
856            latency.p50, latency.p99, latency.max
857        ));
858        summary.push_str(&format!("Warnings: {}\n", stats.warnings_generated));
859
860        summary
861    }
862
863    /// Reset profiler
864    pub async fn reset(&self) {
865        self.latency_histogram.reset();
866        self.spans.write().await.clear();
867        self.samples.write().await.clear();
868        self.warnings.write().await.clear();
869        self.recommendations.write().await.clear();
870        *self.stats.write().await = ProfilerStats::default();
871        self.events_counter.store(0, Ordering::Relaxed);
872        self.bytes_counter.store(0, Ordering::Relaxed);
873        *self.start_time.write().await = None;
874
875        info!("Performance profiler reset");
876    }
877}
878
/// Performance report produced by `PerformanceProfiler::generate_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
    /// When the report was generated
    pub generated_at: DateTime<Utc>,
    /// Profiled duration in seconds
    pub duration_secs: f64,
    /// Total events processed
    pub total_events: u64,
    /// Total bytes processed
    pub total_bytes: u64,
    /// Average throughput (events/sec)
    pub avg_throughput: f64,
    /// Peak throughput (events/sec)
    pub peak_throughput: f64,
    /// Latency statistics snapshot
    pub latency_stats: HistogramStats,
    /// Warnings recorded during the run
    pub warnings: Vec<PerformanceWarning>,
    /// Optimization recommendations
    pub recommendations: Vec<Recommendation>,
    /// Sample history (oldest first)
    pub samples: Vec<PerformanceSample>,
    /// Human-readable summary text
    pub summary: String,
}
905
906impl PerformanceReport {
907    /// Convert to JSON
908    pub fn to_json(&self) -> Result<String> {
909        serde_json::to_string_pretty(self).map_err(|e| anyhow!("JSON error: {}", e))
910    }
911
912    /// Print report
913    pub fn print(&self) {
914        println!("{}", self.summary);
915
916        if !self.warnings.is_empty() {
917            println!("\nWarnings:");
918            for warning in &self.warnings {
919                println!("  [{:?}] {}", warning.severity, warning.message);
920            }
921        }
922
923        if !self.recommendations.is_empty() {
924            println!("\nRecommendations:");
925            for rec in &self.recommendations {
926                println!(
927                    "  [Priority {}] {} - {}",
928                    rec.priority, rec.title, rec.description
929                );
930            }
931        }
932    }
933}
934
/// Builder for constructing a [`PerformanceProfiler`].
pub struct ProfilerBuilder {
    // Accumulated configuration; starts from ProfilerConfig::default().
    config: ProfilerConfig,
}

impl ProfilerBuilder {
    /// Create a new builder with the default configuration.
    pub fn new() -> Self {
        Self {
            config: ProfilerConfig::default(),
        }
    }

    /// Enable CPU profiling.
    // NOTE(review): the default config already enables this; the with_*
    // methods exist for explicitness at call sites.
    pub fn with_cpu_profiling(mut self) -> Self {
        self.config.enable_cpu_profiling = true;
        self
    }

    /// Enable memory tracking.
    pub fn with_memory_tracking(mut self) -> Self {
        self.config.enable_memory_profiling = true;
        self
    }

    /// Set the sampling interval.
    pub fn sampling_interval(mut self, interval: Duration) -> Self {
        self.config.sampling_interval = interval;
        self
    }

    /// Set the sample history size.
    pub fn history_size(mut self, size: usize) -> Self {
        self.config.history_size = size;
        self
    }

    /// Set the warning thresholds.
    pub fn warning_thresholds(mut self, thresholds: WarningThresholds) -> Self {
        self.config.warning_thresholds = thresholds;
        self
    }

    /// Build the profiler, consuming the builder.
    pub fn build(self) -> PerformanceProfiler {
        PerformanceProfiler::new(self.config)
    }
}

impl Default for ProfilerBuilder {
    /// Equivalent to [`ProfilerBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
989
#[cfg(test)]
mod tests {
    use super::*;

    // A freshly built profiler must start in the stopped state.
    #[tokio::test]
    async fn test_profiler_creation() {
        let profiler = PerformanceProfiler::builder().build();
        assert!(!profiler.is_running());
    }

    // start()/stop() toggle the running flag and both succeed in sequence.
    #[tokio::test]
    async fn test_start_stop() {
        let profiler = PerformanceProfiler::builder().build();

        profiler.start().await.unwrap();
        assert!(profiler.is_running());

        profiler.stop().await.unwrap();
        assert!(!profiler.is_running());
    }

    // record_event(bytes) accumulates both an event count and a byte total.
    #[tokio::test]
    async fn test_record_event() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        profiler.record_event(100);
        profiler.record_event(200);

        profiler.stop().await.unwrap();
        let stats = profiler.get_stats().await;
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.total_bytes, 300);
    }

    // Histogram tracks count/min/max across a spread of recorded values.
    #[tokio::test]
    async fn test_latency_histogram() {
        let histogram = LatencyHistogram::new();

        histogram.record(100);
        histogram.record(200);
        histogram.record(1000);
        histogram.record(5000);
        histogram.record(10000);

        let stats = histogram.stats();
        assert_eq!(stats.count, 5);
        assert!(stats.min <= 100);
        assert!(stats.max >= 10000);
    }

    // A started span measures at least the wall time elapsed before end_span.
    #[tokio::test]
    async fn test_spans() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        let span_id = profiler.start_span("test_operation").await;
        tokio::time::sleep(Duration::from_millis(10)).await;
        let duration = profiler.end_span(span_id).await;

        assert!(duration.is_some());
        assert!(duration.unwrap() >= Duration::from_millis(10));
    }

    // Timing an operation via time_operation/record_operation feeds the
    // latency statistics.
    #[tokio::test]
    async fn test_operation_timer() {
        let profiler = PerformanceProfiler::builder().build();

        let timer = profiler.time_operation("test");
        tokio::time::sleep(Duration::from_millis(5)).await;
        profiler.record_operation(timer);

        let stats = profiler.get_latency_stats();
        assert!(stats.count > 0);
    }

    // collect_sample() produces a sample after events are recorded.
    // NOTE(review): `events_per_second >= 0.0` is trivially true for any
    // non-NaN float — consider asserting a strictly positive rate instead.
    #[tokio::test]
    async fn test_collect_sample() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        profiler.record_event(100);
        let sample = profiler.collect_sample().await;

        assert!(sample.events_per_second >= 0.0);
    }

    // Sustained high-latency recordings should trigger at least one
    // optimization recommendation.
    #[tokio::test]
    async fn test_recommendations() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        // Record some high latency operations
        for _ in 0..100 {
            profiler.record_latency(Duration::from_millis(50));
        }

        let recommendations = profiler.generate_recommendations().await;
        assert!(!recommendations.is_empty());
    }

    // generate_report() reflects recorded event totals and includes a
    // non-empty human-readable summary.
    #[tokio::test]
    async fn test_generate_report() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        for _ in 0..10 {
            profiler.record_event(100);
            profiler.record_latency(Duration::from_micros(500));
        }

        profiler.stop().await.unwrap();
        let report = profiler.generate_report().await;

        assert_eq!(report.total_events, 10);
        assert!(!report.summary.is_empty());
    }

    // An unreachably high min_throughput threshold forces a LowThroughput
    // warning once a sample is collected. The sleep matters: throughput is
    // computed from elapsed time, so some wall time must pass first.
    #[tokio::test]
    async fn test_warnings() {
        let thresholds = WarningThresholds {
            min_throughput: 10000.0, // Very high threshold
            ..Default::default()
        };

        let profiler = PerformanceProfiler::builder()
            .warning_thresholds(thresholds)
            .build();

        profiler.start().await.unwrap();

        // Wait a bit to ensure time passes for throughput calculation
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        profiler.record_event(100);

        // Collect sample which checks warnings
        profiler.collect_sample().await;

        let warnings = profiler.get_warnings().await;
        // Should have low throughput warning
        assert!(warnings
            .iter()
            .any(|w| w.warning_type == WarningType::LowThroughput));
    }

    // reset() clears both the event counters and the latency histogram.
    #[tokio::test]
    async fn test_reset() {
        let profiler = PerformanceProfiler::builder().build();
        profiler.start().await.unwrap();

        profiler.record_event(100);
        profiler.record_latency(Duration::from_micros(100));

        profiler.reset().await;

        let stats = profiler.get_stats().await;
        assert_eq!(stats.total_events, 0);

        let latency = profiler.get_latency_stats();
        assert_eq!(latency.count, 0);
    }

    // Percentiles must be monotonically non-decreasing (p50 <= p99) over a
    // uniform spread of samples.
    #[test]
    fn test_histogram_percentiles() {
        let histogram = LatencyHistogram::new();

        // Add 100 samples
        for i in 1..=100 {
            histogram.record(i * 10);
        }

        let p50 = histogram.percentile(50.0);
        let p99 = histogram.percentile(99.0);

        assert!(p50 <= p99);
    }
}