hojicha_runtime/
metrics.rs

1//! Advanced performance metrics and percentile latency tracking
2//!
3//! This module provides comprehensive performance monitoring capabilities
4//! for production applications, including percentile latencies, throughput
5//! metrics, and queue utilization statistics.
6
7use hdrhistogram::Histogram;
8use log::trace;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::{Arc, Mutex};
12use std::time::{Duration, Instant};
13
/// Advanced event processing statistics with percentile tracking
///
/// A point-in-time view of everything the collector knows: counters,
/// latency percentiles, throughput, queue utilization and time-windowed
/// buckets. Values of this type are produced by `MetricsCollector::snapshot`
/// and can be rendered with `AdvancedEventStats::export`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvancedEventStats {
    /// Basic event counts
    pub basic: BasicStats,

    /// Latency percentiles by priority
    pub latency: LatencyStats,

    /// Throughput metrics
    pub throughput: ThroughputStats,

    /// Queue utilization statistics
    pub queue: QueueStats,

    /// Time-windowed statistics
    ///
    /// `MetricsCollector::snapshot` currently fills this with
    /// `WindowedStats::default()`, i.e. both bucket lists are empty.
    pub windows: WindowedStats,
}
32
/// Basic event statistics
///
/// Plain counters that start at zero (`Default`) and are replaced by a fresh
/// default value in `MetricsCollector::reset`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BasicStats {
    /// Total number of events processed
    ///
    /// Incremented once per recorded `record_event` call; calls skipped by
    /// sampling are not counted.
    pub total_events: usize,
    /// Number of high priority events processed
    pub high_priority_events: usize,
    /// Number of normal priority events processed
    pub normal_priority_events: usize,
    /// Number of low priority events processed
    pub low_priority_events: usize,
    /// Number of events dropped due to queue overflow
    ///
    /// Incremented by `MetricsCollector::record_dropped`.
    pub dropped_events: usize,
    /// Number of times backpressure was activated
    ///
    /// Incremented by `MetricsCollector::record_backpressure`.
    pub backpressure_activations: usize,
}
49
/// Latency statistics with percentiles
///
/// One `LatencyPercentiles` summary per priority class, one across all
/// priorities, and optionally one per event type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyStats {
    /// High priority event latencies
    pub high_priority: LatencyPercentiles,

    /// Normal priority event latencies
    pub normal_priority: LatencyPercentiles,

    /// Low priority event latencies
    pub low_priority: LatencyPercentiles,

    /// Overall latencies across all priorities
    pub overall: LatencyPercentiles,

    /// Event type specific latencies
    ///
    /// Keyed by the `event_type` label passed to
    /// `MetricsCollector::record_event`. Entries are only created when both
    /// `track_percentiles` and `track_by_type` are enabled and a label was
    /// supplied.
    pub by_type: HashMap<String, LatencyPercentiles>,
}
68
69/// Latency percentiles in microseconds
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct LatencyPercentiles {
72    /// Minimum latency observed
73    pub min: u64,
74    /// 50th percentile (median) latency
75    pub p50: u64,
76    /// 75th percentile latency
77    pub p75: u64,
78    /// 90th percentile latency
79    pub p90: u64,
80    /// 95th percentile latency
81    pub p95: u64,
82    /// 99th percentile latency
83    pub p99: u64,
84    /// 99.9th percentile latency
85    pub p999: u64,
86    /// Maximum latency observed
87    pub max: u64,
88    /// Mean (average) latency
89    pub mean: f64,
90    /// Standard deviation of latency
91    pub std_dev: f64,
92    /// Number of latency measurements recorded
93    pub count: u64,
94}
95
96impl Default for LatencyPercentiles {
97    fn default() -> Self {
98        Self {
99            min: 0,
100            p50: 0,
101            p75: 0,
102            p90: 0,
103            p95: 0,
104            p99: 0,
105            p999: 0,
106            max: 0,
107            mean: 0.0,
108            std_dev: 0.0,
109            count: 0,
110        }
111    }
112}
113
114/// Throughput metrics
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct ThroughputStats {
117    /// Current events per second
118    pub current_rate: f64,
119
120    /// Peak rate observed
121    pub peak_rate: f64,
122
123    /// Average rate over last minute
124    pub avg_rate_1m: f64,
125
126    /// Average rate over last 5 minutes
127    pub avg_rate_5m: f64,
128
129    /// Average processing time per event (microseconds)
130    pub avg_processing_time_us: f64,
131}
132
133impl Default for ThroughputStats {
134    fn default() -> Self {
135        Self {
136            current_rate: 0.0,
137            peak_rate: 0.0,
138            avg_rate_1m: 0.0,
139            avg_rate_5m: 0.0,
140            avg_processing_time_us: 0.0,
141        }
142    }
143}
144
145/// Queue utilization statistics
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct QueueStats {
148    /// Current queue depth
149    pub current_depth: usize,
150
151    /// Maximum depth reached
152    pub max_depth: usize,
153
154    /// Average depth over time
155    pub avg_depth: f64,
156
157    /// Percentage of time at capacity
158    pub saturation_percentage: f64,
159
160    /// Queue growth rate (events/sec)
161    pub growth_rate: f64,
162}
163
164impl Default for QueueStats {
165    fn default() -> Self {
166        Self {
167            current_depth: 0,
168            max_depth: 0,
169            avg_depth: 0.0,
170            saturation_percentage: 0.0,
171            growth_rate: 0.0,
172        }
173    }
174}
175
/// Time-windowed statistics
///
/// Two lists of `BucketStats`, one per time resolution. The `Default` value
/// has both lists empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct WindowedStats {
    /// Last 60 seconds (1-second buckets)
    pub last_minute: Vec<BucketStats>,

    /// Last hour (1-minute buckets)
    pub last_hour: Vec<BucketStats>,
}
185
/// Statistics for a time bucket
///
/// One entry of a `WindowedStats` list: the aggregate for a single fixed-size
/// slice of time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketStats {
    /// Unix timestamp in seconds for this bucket
    pub timestamp: u64,
    /// Number of events processed in this time bucket
    pub events_processed: usize,
    /// Number of events dropped in this time bucket
    pub events_dropped: usize,
    /// Average latency in microseconds for this bucket
    pub avg_latency_us: f64,
    /// 99th percentile latency in microseconds for this bucket
    pub p99_latency_us: u64,
}
200
/// Settings that control what the metrics collector records
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Feed latencies into the percentile histograms
    pub track_percentiles: bool,

    /// Additionally keep one latency histogram per event type
    pub track_by_type: bool,

    /// Fraction of events to record (0.0 to 1.0)
    ///
    /// Values below 1.0 make the collector keep only every
    /// `floor(1 / sampling_rate)`-th event.
    pub sampling_rate: f64,

    /// Maximum histogram size (limits memory usage)
    // NOTE(review): no code visible in this file reads this field - confirm
    // whether it is consumed elsewhere.
    pub max_histogram_size: u64,

    /// Length of the sliding window used for rate calculations
    pub rate_window: Duration,
}

impl Default for MetricsConfig {
    /// Record every event with percentile tracking on, per-type tracking
    /// off, a histogram size of 100 000 and a one-minute rate window.
    fn default() -> Self {
        let one_minute = Duration::from_secs(60);

        MetricsConfig {
            rate_window: one_minute,
            max_histogram_size: 100_000,
            sampling_rate: 1.0,
            track_by_type: false,
            track_percentiles: true,
        }
    }
}
231
/// Latency tracker using HDR Histogram for efficient percentile calculation
///
/// Thin private wrapper around one `Histogram<u64>`; the values recorded
/// into it are latencies in microseconds.
struct LatencyTracker {
    /// Backing histogram of recorded latencies (microseconds)
    histogram: Histogram<u64>,
}
236
237impl LatencyTracker {
238    fn new(max_value: u64) -> Self {
239        let histogram = Histogram::new_with_max(max_value, 3).expect("Failed to create histogram");
240        Self { histogram }
241    }
242
243    fn record(&mut self, latency_us: u64) {
244        let _ = self.histogram.record(latency_us);
245    }
246
247    fn percentiles(&self) -> LatencyPercentiles {
248        if self.histogram.is_empty() {
249            return LatencyPercentiles::default();
250        }
251
252        LatencyPercentiles {
253            min: self.histogram.min(),
254            p50: self.histogram.value_at_percentile(50.0),
255            p75: self.histogram.value_at_percentile(75.0),
256            p90: self.histogram.value_at_percentile(90.0),
257            p95: self.histogram.value_at_percentile(95.0),
258            p99: self.histogram.value_at_percentile(99.0),
259            p999: self.histogram.value_at_percentile(99.9),
260            max: self.histogram.max(),
261            mean: self.histogram.mean(),
262            std_dev: self.histogram.stdev(),
263            count: self.histogram.len(),
264        }
265    }
266
267    fn reset(&mut self) {
268        self.histogram.reset();
269    }
270}
271
/// Metrics collector for event processing
///
/// Every piece of mutable state sits behind its own `Arc<Mutex<..>>`, which
/// is why all recording methods take `&self`.
pub struct MetricsCollector {
    /// Settings supplied at construction time
    config: MetricsConfig,
    /// Creation time; `snapshot` uses it as the baseline for the saturation
    /// percentage. It is not changed by `reset`.
    start_time: Instant,

    // Basic counters
    basic: Arc<Mutex<BasicStats>>,

    // Latency tracking
    high_priority_latency: Arc<Mutex<LatencyTracker>>,
    normal_priority_latency: Arc<Mutex<LatencyTracker>>,
    low_priority_latency: Arc<Mutex<LatencyTracker>>,
    overall_latency: Arc<Mutex<LatencyTracker>>,
    /// One tracker per event-type label, created on first use
    by_type_latency: Arc<Mutex<HashMap<String, LatencyTracker>>>,

    // Throughput tracking
    /// Timestamps of recorded events, pruned to the configured rate window
    event_times: Arc<Mutex<Vec<Instant>>>,
    /// Latency of every recorded event; only emptied by `reset`
    processing_times: Arc<Mutex<Vec<Duration>>>,
    /// Highest event rate computed so far
    peak_rate: Arc<Mutex<f64>>,

    // Queue tracking
    /// `(timestamp, depth)` samples, pruned to the last five minutes
    queue_depths: Arc<Mutex<Vec<(Instant, usize)>>>,
    /// Largest depth reported so far
    max_queue_depth: Arc<Mutex<usize>>,
    /// Accumulated time during which the queue was reported at capacity
    time_at_capacity: Arc<Mutex<Duration>>,
    /// Reference instant for the next `time_at_capacity` increment
    last_capacity_check: Arc<Mutex<Instant>>,

    // Windowed stats
    // NOTE(review): within the visible code these two lists are only ever
    // cleared (in `reset`), never filled.
    minute_buckets: Arc<Mutex<Vec<BucketStats>>>,
    hour_buckets: Arc<Mutex<Vec<BucketStats>>>,
}
302
303impl MetricsCollector {
304    /// Create a new metrics collector
305    pub fn new(config: MetricsConfig) -> Self {
306        let max_latency = 10_000_000; // 10 seconds in microseconds
307
308        Self {
309            config,
310            start_time: Instant::now(),
311            basic: Arc::new(Mutex::new(BasicStats::default())),
312            high_priority_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
313            normal_priority_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
314            low_priority_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
315            overall_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
316            by_type_latency: Arc::new(Mutex::new(HashMap::new())),
317            event_times: Arc::new(Mutex::new(Vec::new())),
318            processing_times: Arc::new(Mutex::new(Vec::new())),
319            peak_rate: Arc::new(Mutex::new(0.0)),
320            queue_depths: Arc::new(Mutex::new(Vec::new())),
321            max_queue_depth: Arc::new(Mutex::new(0)),
322            time_at_capacity: Arc::new(Mutex::new(Duration::ZERO)),
323            last_capacity_check: Arc::new(Mutex::new(Instant::now())),
324            minute_buckets: Arc::new(Mutex::new(Vec::new())),
325            hour_buckets: Arc::new(Mutex::new(Vec::new())),
326        }
327    }
328
329    /// Record an event being processed
330    pub fn record_event(
331        &self,
332        priority: crate::priority_queue::Priority,
333        latency: Duration,
334        event_type: Option<&str>,
335    ) {
336        // Apply sampling
337        if self.config.sampling_rate < 1.0 {
338            // Simplified sampling: skip based on a simple counter
339            use std::sync::atomic::{AtomicUsize, Ordering};
340            static COUNTER: AtomicUsize = AtomicUsize::new(0);
341            let count = COUNTER.fetch_add(1, Ordering::Relaxed);
342            let sample_every = (1.0 / self.config.sampling_rate) as usize;
343            if !count.is_multiple_of(sample_every) {
344                return;
345            }
346        }
347
348        let latency_us = latency.as_micros() as u64;
349
350        // Update basic stats
351        {
352            let mut basic = self.basic.lock().unwrap();
353            basic.total_events += 1;
354            match priority {
355                crate::priority_queue::Priority::High => basic.high_priority_events += 1,
356                crate::priority_queue::Priority::Normal => basic.normal_priority_events += 1,
357                crate::priority_queue::Priority::Low => basic.low_priority_events += 1,
358            }
359        }
360
361        // Update latency histograms
362        if self.config.track_percentiles {
363            self.overall_latency.lock().unwrap().record(latency_us);
364
365            match priority {
366                crate::priority_queue::Priority::High => {
367                    self.high_priority_latency
368                        .lock()
369                        .unwrap()
370                        .record(latency_us);
371                }
372                crate::priority_queue::Priority::Normal => {
373                    self.normal_priority_latency
374                        .lock()
375                        .unwrap()
376                        .record(latency_us);
377                }
378                crate::priority_queue::Priority::Low => {
379                    self.low_priority_latency.lock().unwrap().record(latency_us);
380                }
381            }
382
383            if self.config.track_by_type {
384                if let Some(event_type) = event_type {
385                    let mut by_type = self.by_type_latency.lock().unwrap();
386                    by_type
387                        .entry(event_type.to_string())
388                        .or_insert_with(|| LatencyTracker::new(10_000_000))
389                        .record(latency_us);
390                }
391            }
392        }
393
394        // Update throughput tracking
395        {
396            let mut event_times = self.event_times.lock().unwrap();
397            let now = Instant::now();
398            event_times.push(now);
399
400            // Keep only events in the rate window
401            let cutoff = now - self.config.rate_window;
402            event_times.retain(|t| *t > cutoff);
403
404            // Calculate current rate
405            if event_times.len() > 1 {
406                let duration = now.duration_since(event_times[0]).as_secs_f64();
407                if duration > 0.0 {
408                    let rate = event_times.len() as f64 / duration;
409                    let mut peak = self.peak_rate.lock().unwrap();
410                    if rate > *peak {
411                        *peak = rate;
412                    }
413                }
414            }
415        }
416
417        // Record processing time
418        self.processing_times.lock().unwrap().push(latency);
419
420        trace!("Recorded event: priority={priority:?}, latency={latency_us}μs");
421    }
422
423    /// Record a dropped event
424    pub fn record_dropped(&self) {
425        self.basic.lock().unwrap().dropped_events += 1;
426    }
427
428    /// Record backpressure activation
429    pub fn record_backpressure(&self) {
430        self.basic.lock().unwrap().backpressure_activations += 1;
431    }
432
433    /// Update queue depth
434    pub fn update_queue_depth(&self, depth: usize, capacity: usize) {
435        let now = Instant::now();
436
437        // Track queue depths over time
438        {
439            let mut depths = self.queue_depths.lock().unwrap();
440            depths.push((now, depth));
441
442            // Keep only recent depths
443            let cutoff = now - Duration::from_secs(300); // 5 minutes
444            depths.retain(|(t, _)| *t > cutoff);
445        }
446
447        // Update max depth
448        {
449            let mut max_depth = self.max_queue_depth.lock().unwrap();
450            if depth > *max_depth {
451                *max_depth = depth;
452            }
453        }
454
455        // Track time at capacity
456        if depth >= capacity {
457            let mut last_check = self.last_capacity_check.lock().unwrap();
458            let duration = now.duration_since(*last_check);
459            *self.time_at_capacity.lock().unwrap() += duration;
460            *last_check = now;
461        }
462    }
463
464    /// Take a snapshot of current metrics
465    pub fn snapshot(&self) -> AdvancedEventStats {
466        let now = Instant::now();
467        let elapsed = now.duration_since(self.start_time).as_secs_f64();
468
469        // Calculate latency stats
470        let latency = LatencyStats {
471            high_priority: self.high_priority_latency.lock().unwrap().percentiles(),
472            normal_priority: self.normal_priority_latency.lock().unwrap().percentiles(),
473            low_priority: self.low_priority_latency.lock().unwrap().percentiles(),
474            overall: self.overall_latency.lock().unwrap().percentiles(),
475            by_type: self
476                .by_type_latency
477                .lock()
478                .unwrap()
479                .iter()
480                .map(|(k, v)| (k.clone(), v.percentiles()))
481                .collect(),
482        };
483
484        // Calculate throughput stats
485        let throughput = {
486            let event_times = self.event_times.lock().unwrap();
487            let processing_times = self.processing_times.lock().unwrap();
488
489            let current_rate = if event_times.len() > 1 {
490                let duration = now.duration_since(event_times[0]).as_secs_f64();
491                if duration > 0.0 {
492                    event_times.len() as f64 / duration
493                } else {
494                    0.0
495                }
496            } else {
497                0.0
498            };
499
500            let avg_processing = if !processing_times.is_empty() {
501                let sum: Duration = processing_times.iter().sum();
502                sum.as_micros() as f64 / processing_times.len() as f64
503            } else {
504                0.0
505            };
506
507            ThroughputStats {
508                current_rate,
509                peak_rate: *self.peak_rate.lock().unwrap(),
510                avg_rate_1m: current_rate, // Simplified for now
511                avg_rate_5m: current_rate, // Simplified for now
512                avg_processing_time_us: avg_processing,
513            }
514        };
515
516        // Calculate queue stats
517        let queue = {
518            let depths = self.queue_depths.lock().unwrap();
519            let current_depth = depths.last().map(|(_, d)| *d).unwrap_or(0);
520
521            let avg_depth = if !depths.is_empty() {
522                depths.iter().map(|(_, d)| *d).sum::<usize>() as f64 / depths.len() as f64
523            } else {
524                0.0
525            };
526
527            let growth_rate = if depths.len() > 1 {
528                let first = depths.first().unwrap().1 as f64;
529                let last = depths.last().unwrap().1 as f64;
530                let duration = depths
531                    .last()
532                    .unwrap()
533                    .0
534                    .duration_since(depths.first().unwrap().0)
535                    .as_secs_f64();
536                if duration > 0.0 {
537                    (last - first) / duration
538                } else {
539                    0.0
540                }
541            } else {
542                0.0
543            };
544
545            let saturation = if elapsed > 0.0 {
546                self.time_at_capacity.lock().unwrap().as_secs_f64() / elapsed * 100.0
547            } else {
548                0.0
549            };
550
551            QueueStats {
552                current_depth,
553                max_depth: *self.max_queue_depth.lock().unwrap(),
554                avg_depth,
555                saturation_percentage: saturation,
556                growth_rate,
557            }
558        };
559
560        AdvancedEventStats {
561            basic: self.basic.lock().unwrap().clone(),
562            latency,
563            throughput,
564            queue,
565            windows: WindowedStats::default(), // Simplified for now
566        }
567    }
568
569    /// Reset all metrics
570    pub fn reset(&self) {
571        *self.basic.lock().unwrap() = BasicStats::default();
572        self.high_priority_latency.lock().unwrap().reset();
573        self.normal_priority_latency.lock().unwrap().reset();
574        self.low_priority_latency.lock().unwrap().reset();
575        self.overall_latency.lock().unwrap().reset();
576        self.by_type_latency.lock().unwrap().clear();
577        self.event_times.lock().unwrap().clear();
578        self.processing_times.lock().unwrap().clear();
579        *self.peak_rate.lock().unwrap() = 0.0;
580        self.queue_depths.lock().unwrap().clear();
581        *self.max_queue_depth.lock().unwrap() = 0;
582        *self.time_at_capacity.lock().unwrap() = Duration::ZERO;
583        self.minute_buckets.lock().unwrap().clear();
584        self.hour_buckets.lock().unwrap().clear();
585    }
586
587    /// Export metrics in JSON format
588    pub fn export_json(&self) -> String {
589        let stats = self.snapshot();
590        stats.export(ExportFormat::Json)
591    }
592
593    /// Export metrics in Prometheus format
594    pub fn export_prometheus(&self) -> String {
595        let stats = self.snapshot();
596        stats.export(ExportFormat::Prometheus)
597    }
598
599    /// Export metrics in plain text format
600    pub fn export_text(&self) -> String {
601        let stats = self.snapshot();
602        stats.export(ExportFormat::PlainText)
603    }
604}
605
/// Export format for metrics
///
/// Selects the renderer used by `AdvancedEventStats::export`.
#[derive(Debug, Clone, Copy)]
pub enum ExportFormat {
    /// Export as JSON format (pretty-printed via `serde_json`)
    Json,
    /// Export in Prometheus metrics format
    Prometheus,
    /// Export as plain text
    PlainText,
}
616
617impl AdvancedEventStats {
618    /// Export metrics in the specified format
619    pub fn export(&self, format: ExportFormat) -> String {
620        match format {
621            ExportFormat::Json => self.to_json(),
622            ExportFormat::Prometheus => self.to_prometheus(),
623            ExportFormat::PlainText => self.to_plain_text(),
624        }
625    }
626
627    fn to_json(&self) -> String {
628        serde_json::to_string_pretty(self)
629            .unwrap_or_else(|e| format!("Failed to serialize metrics: {e}"))
630    }
631
632    fn to_prometheus(&self) -> String {
633        let mut output = String::new();
634
635        // Basic metrics
636        output.push_str("# HELP hojicha_events_total Total events processed\n");
637        output.push_str("# TYPE hojicha_events_total counter\n");
638        output.push_str(&format!(
639            "hojicha_events_total {{}} {}\n",
640            self.basic.total_events
641        ));
642        output.push_str(&format!(
643            "hojicha_events_total {{priority=\"high\"}} {}\n",
644            self.basic.high_priority_events
645        ));
646        output.push_str(&format!(
647            "hojicha_events_total {{priority=\"normal\"}} {}\n",
648            self.basic.normal_priority_events
649        ));
650        output.push_str(&format!(
651            "hojicha_events_total {{priority=\"low\"}} {}\n",
652            self.basic.low_priority_events
653        ));
654
655        // Dropped events
656        output.push_str("# HELP hojicha_events_dropped Total events dropped\n");
657        output.push_str("# TYPE hojicha_events_dropped counter\n");
658        output.push_str(&format!(
659            "hojicha_events_dropped {{}} {}\n",
660            self.basic.dropped_events
661        ));
662
663        // Latency metrics
664        output.push_str("# HELP hojicha_event_latency_microseconds Event processing latency\n");
665        output.push_str("# TYPE hojicha_event_latency_microseconds summary\n");
666
667        for (priority, stats) in [
668            ("high", &self.latency.high_priority),
669            ("normal", &self.latency.normal_priority),
670            ("low", &self.latency.low_priority),
671        ] {
672            output.push_str(&format!(
673                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.5\"}} {}\n",
674                priority, stats.p50
675            ));
676            output.push_str(&format!(
677                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.9\"}} {}\n",
678                priority, stats.p90
679            ));
680            output.push_str(&format!(
681                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.95\"}} {}\n",
682                priority, stats.p95
683            ));
684            output.push_str(&format!(
685                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.99\"}} {}\n",
686                priority, stats.p99
687            ));
688            output.push_str(&format!(
689                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.999\"}} {}\n",
690                priority, stats.p999
691            ));
692        }
693
694        // Throughput metrics
695        output.push_str("# HELP hojicha_throughput_rate Events per second\n");
696        output.push_str("# TYPE hojicha_throughput_rate gauge\n");
697        output.push_str(&format!(
698            "hojicha_throughput_rate {{type=\"current\"}} {}\n",
699            self.throughput.current_rate
700        ));
701        output.push_str(&format!(
702            "hojicha_throughput_rate {{type=\"peak\"}} {}\n",
703            self.throughput.peak_rate
704        ));
705
706        // Queue metrics
707        output.push_str("# HELP hojicha_queue_depth Current queue depth\n");
708        output.push_str("# TYPE hojicha_queue_depth gauge\n");
709        output.push_str(&format!(
710            "hojicha_queue_depth {{}} {}\n",
711            self.queue.current_depth
712        ));
713
714        output.push_str("# HELP hojicha_queue_saturation Queue saturation percentage\n");
715        output.push_str("# TYPE hojicha_queue_saturation gauge\n");
716        output.push_str(&format!(
717            "hojicha_queue_saturation {{}} {}\n",
718            self.queue.saturation_percentage
719        ));
720
721        output
722    }
723
724    fn to_plain_text(&self) -> String {
725        format!(
726            "Event Processing Metrics\n\
727            ========================\n\
728            Total Events: {}\n\
729            - High Priority: {}\n\
730            - Normal Priority: {}\n\
731            - Low Priority: {}\n\
732            - Dropped: {}\n\n\
733            Latency (μs):\n\
734            - High Priority:   p50={} p95={} p99={} max={}\n\
735            - Normal Priority: p50={} p95={} p99={} max={}\n\
736            - Low Priority:    p50={} p95={} p99={} max={}\n\n\
737            Throughput:\n\
738            - Current Rate: {:.1} events/sec\n\
739            - Peak Rate: {:.1} events/sec\n\
740            - Avg Processing Time: {:.1} μs\n\n\
741            Queue:\n\
742            - Current Depth: {}\n\
743            - Max Depth: {}\n\
744            - Saturation: {:.1}%\n\
745            - Growth Rate: {:.1} events/sec",
746            self.basic.total_events,
747            self.basic.high_priority_events,
748            self.basic.normal_priority_events,
749            self.basic.low_priority_events,
750            self.basic.dropped_events,
751            self.latency.high_priority.p50,
752            self.latency.high_priority.p95,
753            self.latency.high_priority.p99,
754            self.latency.high_priority.max,
755            self.latency.normal_priority.p50,
756            self.latency.normal_priority.p95,
757            self.latency.normal_priority.p99,
758            self.latency.normal_priority.max,
759            self.latency.low_priority.p50,
760            self.latency.low_priority.p95,
761            self.latency.low_priority.p99,
762            self.latency.low_priority.max,
763            self.throughput.current_rate,
764            self.throughput.peak_rate,
765            self.throughput.avg_processing_time_us,
766            self.queue.current_depth,
767            self.queue.max_depth,
768            self.queue.saturation_percentage,
769            self.queue.growth_rate,
770        )
771    }
772}
773
774/// Print a formatted metrics dashboard
775pub fn print_metrics_dashboard(stats: &AdvancedEventStats) {
776    eprintln!("╔══════════════════════════════════════════════════════════════╗");
777    eprintln!("║                  Event Processing Metrics Dashboard          ║");
778    eprintln!("╠══════════════════════════════════════════════════════════════╣");
779    eprintln!("║ Throughput:                                                  ║");
780    eprintln!(
781        "║   Current: {:>8.1} evt/s   Peak: {:>8.1} evt/s           ║",
782        stats.throughput.current_rate, stats.throughput.peak_rate
783    );
784    eprintln!(
785        "║   Processing Time: {:>8.1} μs average                       ║",
786        stats.throughput.avg_processing_time_us
787    );
788    eprintln!("║                                                              ║");
789    eprintln!("║ Latencies (μs):      P50      P95      P99      Max         ║");
790    eprintln!(
791        "║   High Priority:  {:>7} {:>7} {:>7} {:>7}          ║",
792        stats.latency.high_priority.p50,
793        stats.latency.high_priority.p95,
794        stats.latency.high_priority.p99,
795        stats.latency.high_priority.max
796    );
797    eprintln!(
798        "║   Normal Priority:{:>7} {:>7} {:>7} {:>7}          ║",
799        stats.latency.normal_priority.p50,
800        stats.latency.normal_priority.p95,
801        stats.latency.normal_priority.p99,
802        stats.latency.normal_priority.max
803    );
804    eprintln!(
805        "║   Low Priority:   {:>7} {:>7} {:>7} {:>7}          ║",
806        stats.latency.low_priority.p50,
807        stats.latency.low_priority.p95,
808        stats.latency.low_priority.p99,
809        stats.latency.low_priority.max
810    );
811    eprintln!("║                                                              ║");
812    eprintln!("║ Queue:                                                       ║");
813    eprintln!(
814        "║   Depth: {:>5} (max: {:>5})   Saturation: {:>5.1}%         ║",
815        stats.queue.current_depth, stats.queue.max_depth, stats.queue.saturation_percentage
816    );
817    eprintln!(
818        "║   Growth Rate: {:>8.1} events/sec                          ║",
819        stats.queue.growth_rate
820    );
821    eprintln!("║                                                              ║");
822    eprintln!("║ Events:                                                      ║");
823    eprintln!(
824        "║   Total: {:>8}  Dropped: {:>6}  Backpressure: {:>6}  ║",
825        stats.basic.total_events, stats.basic.dropped_events, stats.basic.backpressure_activations
826    );
827    eprintln!("╚══════════════════════════════════════════════════════════════╝");
828}
829
830/// Display metrics dashboard  
831pub fn display_dashboard(stats: &AdvancedEventStats) -> String {
832    let mut output = String::new();
833    use std::fmt::Write;
834
835    let _ = writeln!(
836        output,
837        "╔══════════════════════════════════════════════════════════════╗"
838    );
839    let _ = writeln!(
840        output,
841        "║                    METRICS DASHBOARD                         ║"
842    );
843    let _ = writeln!(
844        output,
845        "╠══════════════════════════════════════════════════════════════╣"
846    );
847
848    // Event counts
849    let _ = writeln!(
850        output,
851        "║ Events Processed:  {:>10}                                ║",
852        stats.basic.total_events
853    );
854    let _ = writeln!(
855        output,
856        "║ Events Dropped:    {:>10}                                ║",
857        stats.basic.dropped_events
858    );
859    let _ = writeln!(
860        output,
861        "║ Backpressure:      {:>10}                                ║",
862        stats.basic.backpressure_activations
863    );
864
865    let _ = writeln!(
866        output,
867        "╠══════════════════════════════════════════════════════════════╣"
868    );
869
870    // Latency percentiles (overall)
871    let _ = writeln!(
872        output,
873        "║ LATENCY (μs) - Overall                                       ║"
874    );
875    let _ = writeln!(
876        output,
877        "║   p50:  {:>8.1}    p75:  {:>8.1}    p90:  {:>8.1}      ║",
878        stats.latency.overall.p50, stats.latency.overall.p75, stats.latency.overall.p90
879    );
880    let _ = writeln!(
881        output,
882        "║   p95:  {:>8.1}    p99:  {:>8.1}    p999: {:>8.1}      ║",
883        stats.latency.overall.p95, stats.latency.overall.p99, stats.latency.overall.p999
884    );
885    let _ = writeln!(
886        output,
887        "║   min:  {:>8.1}    max:  {:>8.1}                        ║",
888        stats.latency.overall.min, stats.latency.overall.max
889    );
890
891    let _ = writeln!(
892        output,
893        "╠══════════════════════════════════════════════════════════════╣"
894    );
895
896    // Throughput
897    let _ = writeln!(
898        output,
899        "║ THROUGHPUT                                                   ║"
900    );
901    let _ = writeln!(
902        output,
903        "║   Current rate:   {:>10.1} events/sec                   ║",
904        stats.throughput.current_rate
905    );
906    let _ = writeln!(
907        output,
908        "║   Peak rate:      {:>10.1} events/sec                   ║",
909        stats.throughput.peak_rate
910    );
911
912    let _ = writeln!(
913        output,
914        "╠══════════════════════════════════════════════════════════════╣"
915    );
916
917    // Queue stats
918    let _ = writeln!(
919        output,
920        "║ QUEUE                                                        ║"
921    );
922    let _ = writeln!(
923        output,
924        "║   Current depth:    {:>6}                                  ║",
925        stats.queue.current_depth
926    );
927    let _ = writeln!(
928        output,
929        "║   Max depth:        {:>6}                                  ║",
930        stats.queue.max_depth
931    );
932    let _ = writeln!(
933        output,
934        "║   Avg depth:        {:>6.1}                                ║",
935        stats.queue.avg_depth
936    );
937    let _ = writeln!(
938        output,
939        "║   Saturation:       {:>6.1}%                                ║",
940        stats.queue.saturation_percentage
941    );
942
943    let _ = writeln!(
944        output,
945        "╚══════════════════════════════════════════════════════════════╝"
946    );
947
948    output
949}
950
#[cfg(test)]
mod tests {
    use super::*;
    use crate::priority_queue::Priority;

    /// A freshly built collector reports empty statistics.
    #[test]
    fn test_metrics_collector_creation() {
        let metrics = MetricsCollector::new(MetricsConfig::default());
        let snapshot = metrics.snapshot();

        assert_eq!(snapshot.basic.total_events, 0);
        assert_eq!(snapshot.latency.high_priority.count, 0);
    }

    /// One event per priority is reflected in the per-priority counters.
    #[test]
    fn test_event_recording() {
        let metrics = MetricsCollector::new(MetricsConfig::default());

        // (priority, latency in microseconds, optional label)
        let samples = [
            (Priority::High, 100, Some("test")),
            (Priority::Normal, 200, Some("test")),
            (Priority::Low, 300, None),
        ];
        for (priority, micros, label) in samples {
            metrics.record_event(priority, Duration::from_micros(micros), label);
        }

        let snapshot = metrics.snapshot();
        let counts = &snapshot.basic;

        assert_eq!(counts.total_events, 3);
        assert_eq!(counts.high_priority_events, 1);
        assert_eq!(counts.normal_priority_events, 1);
        assert_eq!(counts.low_priority_events, 1);

        // The high-priority latency bucket must have received its sample.
        let high = &snapshot.latency.high_priority;
        assert!(high.count > 0);
        assert!(high.mean > 0.0);
    }

    /// Percentiles over a known 10μs..=1000μs ramp land near the expected values.
    #[test]
    fn test_percentile_calculation() {
        let metrics = MetricsCollector::new(MetricsConfig::default());

        // 100 high-priority samples: 10μs, 20μs, ..., 1000μs.
        let latencies = (1..=100u64).map(|step| Duration::from_micros(step * 10));
        for latency in latencies {
            metrics.record_event(Priority::High, latency, None);
        }

        let snapshot = metrics.snapshot();
        let high = &snapshot.latency.high_priority;

        // The median sits at roughly the 50th sample (500μs).
        assert!((490..=510).contains(&high.p50));

        // The 99th percentile sits at roughly the 99th sample (990μs).
        assert!((980..=1000).contains(&high.p99));

        assert_eq!(high.count, 100);
    }

    /// Each export format contains its expected marker text.
    #[test]
    fn test_export_formats() {
        let metrics = MetricsCollector::new(MetricsConfig::default());
        metrics.record_event(Priority::High, Duration::from_micros(100), None);

        let snapshot = metrics.snapshot();

        // JSON
        let as_json = snapshot.export(ExportFormat::Json);
        assert!(as_json.contains(r#""total_events": 1"#));

        // Prometheus
        let as_prometheus = snapshot.export(ExportFormat::Prometheus);
        assert!(as_prometheus.contains("hojicha_events_total"));
        assert!(as_prometheus.contains("hojicha_event_latency_microseconds"));

        // Plain text
        let as_text = snapshot.export(ExportFormat::PlainText);
        assert!(as_text.contains("Total Events: 1"));
    }

    /// `reset` clears both the event counters and the latency data.
    #[test]
    fn test_metrics_reset() {
        let metrics = MetricsCollector::new(MetricsConfig::default());

        // Ten identical high-priority events.
        (0..10).for_each(|_| {
            metrics.record_event(Priority::High, Duration::from_micros(100), None);
        });

        let before = metrics.snapshot();
        assert_eq!(before.basic.total_events, 10);

        // After a reset the next snapshot must be empty again.
        metrics.reset();
        let after = metrics.snapshot();
        assert_eq!(after.basic.total_events, 0);
        assert_eq!(after.latency.high_priority.count, 0);
    }
}