use hdrhistogram::Histogram;
use log::trace;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

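/// A point-in-time snapshot of all collected event-processing metrics, as
/// returned by [`MetricsCollector::snapshot`].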
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvancedEventStats {
    /// Basic event counters.
    pub basic: BasicStats,
    /// Latency percentiles by priority, overall, and by event type.
    pub latency: LatencyStats,
    /// Throughput measurements.
    pub throughput: ThroughputStats,
    /// Queue depth and saturation statistics.
    pub queue: QueueStats,
    /// Time-windowed statistics.
    pub windows: WindowedStats,
}

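/// Simple event counters.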
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BasicStats {
    /// Total events recorded.
    pub total_events: usize,
    /// Events recorded at high priority.
    pub high_priority_events: usize,
    /// Events recorded at normal priority.
    pub normal_priority_events: usize,
    /// Events recorded at low priority.
    pub low_priority_events: usize,
    /// Events that were dropped.
    pub dropped_events: usize,
    /// Number of times backpressure was activated.
    pub backpressure_activations: usize,
}

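/// Latency percentiles broken down by priority, overall, and (optionally) by event type.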
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyStats {
    /// Percentiles for high-priority events.
    pub high_priority: LatencyPercentiles,
    /// Percentiles for normal-priority events.
    pub normal_priority: LatencyPercentiles,
    /// Percentiles for low-priority events.
    pub low_priority: LatencyPercentiles,
    /// Percentiles across all events.
    pub overall: LatencyPercentiles,
    /// Percentiles keyed by event type (populated when per-type tracking is enabled).
    pub by_type: HashMap<String, LatencyPercentiles>,
}

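/// Percentile summary of a latency distribution. All values are in microseconds.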
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyPercentiles {
    /// Minimum observed latency (µs).
    pub min: u64,
    /// 50th percentile (µs).
    pub p50: u64,
    /// 75th percentile (µs).
    pub p75: u64,
    /// 90th percentile (µs).
    pub p90: u64,
    /// 95th percentile (µs).
    pub p95: u64,
    /// 99th percentile (µs).
    pub p99: u64,
    /// 99.9th percentile (µs).
    pub p999: u64,
    /// Maximum observed latency (µs).
    pub max: u64,
    /// Mean latency (µs).
    pub mean: f64,
    /// Standard deviation (µs).
    pub std_dev: f64,
    /// Number of recorded samples.
    pub count: u64,
}

impl Default for LatencyPercentiles {
    fn default() -> Self {
        Self {
            min: 0,
            p50: 0,
            p75: 0,
            p90: 0,
            p95: 0,
            p99: 0,
            p999: 0,
            max: 0,
            mean: 0.0,
            std_dev: 0.0,
            count: 0,
        }
    }
}

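/// Event throughput measurements.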
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputStats {
    /// Events per second over the configured rate window.
    pub current_rate: f64,
    /// Highest rate observed since start (or since the last reset).
    pub peak_rate: f64,
    /// Average events per second over the last minute.
    pub avg_rate_1m: f64,
    /// Average events per second over the last five minutes.
    pub avg_rate_5m: f64,
    /// Mean processing time per event, in microseconds.
    pub avg_processing_time_us: f64,
}

impl Default for ThroughputStats {
    fn default() -> Self {
        Self {
            current_rate: 0.0,
            peak_rate: 0.0,
            avg_rate_1m: 0.0,
            avg_rate_5m: 0.0,
            avg_processing_time_us: 0.0,
        }
    }
}

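/// Queue depth and saturation statistics.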
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStats {
    /// Most recently observed queue depth.
    pub current_depth: usize,
    /// Highest queue depth observed.
    pub max_depth: usize,
    /// Average depth over the retained samples.
    pub avg_depth: f64,
    /// Percentage of elapsed time spent at or above capacity.
    pub saturation_percentage: f64,
    /// Change in queue depth per second over the retained samples.
    pub growth_rate: f64,
}

impl Default for QueueStats {
    fn default() -> Self {
        Self {
            current_depth: 0,
            max_depth: 0,
            avg_depth: 0.0,
            saturation_percentage: 0.0,
            growth_rate: 0.0,
        }
    }
}

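/// Time-windowed (bucketed) statistics.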
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct WindowedStats {
    /// Buckets covering the last minute.
    pub last_minute: Vec<BucketStats>,
    /// Buckets covering the last hour.
    pub last_hour: Vec<BucketStats>,
}

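/// Statistics for a single time bucket.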
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketStats {
    /// Bucket timestamp.
    pub timestamp: u64,
    /// Events processed during the bucket.
    pub events_processed: usize,
    /// Events dropped during the bucket.
    pub events_dropped: usize,
    /// Average latency in microseconds.
    pub avg_latency_us: f64,
    /// 99th percentile latency in microseconds.
    pub p99_latency_us: u64,
}

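/// Configuration for a [`MetricsCollector`].
///
/// A minimal construction sketch (illustrative only; the field values below are
/// examples, not recommendations):
///
/// ```ignore
/// // Sample roughly 10% of events and keep a separate histogram per event type.
/// let config = MetricsConfig {
///     track_by_type: true,
///     sampling_rate: 0.1,
///     ..MetricsConfig::default()
/// };
/// let collector = MetricsCollector::new(config);
/// ```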
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Whether to record latency histograms.
    pub track_percentiles: bool,
    /// Whether to keep a separate latency histogram per event type.
    pub track_by_type: bool,
    /// Fraction of events to record (1.0 records every event).
    pub sampling_rate: f64,
    /// Maximum trackable histogram value (not currently used by the collector).
    pub max_histogram_size: u64,
    /// Sliding window over which the current event rate is computed.
    pub rate_window: Duration,
}

impl Default for MetricsConfig {
    fn default() -> Self {
        Self {
            track_percentiles: true,
            track_by_type: false,
            sampling_rate: 1.0,
            max_histogram_size: 100_000,
            rate_window: Duration::from_secs(60),
        }
    }
}

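/// Internal wrapper around an HDR histogram (3 significant digits) used to
/// track latency values in microseconds.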
struct LatencyTracker {
    histogram: Histogram<u64>,
}

impl LatencyTracker {
    fn new(max_value: u64) -> Self {
        let histogram = Histogram::new_with_max(max_value, 3).expect("Failed to create histogram");
        Self { histogram }
    }

    fn record(&mut self, latency_us: u64) {
        let _ = self.histogram.record(latency_us);
    }

    fn percentiles(&self) -> LatencyPercentiles {
        if self.histogram.is_empty() {
            return LatencyPercentiles::default();
        }

        LatencyPercentiles {
            min: self.histogram.min(),
            p50: self.histogram.value_at_percentile(50.0),
            p75: self.histogram.value_at_percentile(75.0),
            p90: self.histogram.value_at_percentile(90.0),
            p95: self.histogram.value_at_percentile(95.0),
            p99: self.histogram.value_at_percentile(99.0),
            p999: self.histogram.value_at_percentile(99.9),
            max: self.histogram.max(),
            mean: self.histogram.mean(),
            std_dev: self.histogram.stdev(),
            count: self.histogram.len(),
        }
    }

    fn reset(&mut self) {
        self.histogram.reset();
    }
}

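/// Thread-safe collector for event-processing metrics: counters, latency
/// histograms, throughput, and queue depth.
///
/// A rough usage sketch (illustrative only; the `Priority` variants come from
/// this crate's priority queue module, and the queue capacity below is an
/// arbitrary example value):
///
/// ```ignore
/// use std::time::Duration;
///
/// let collector = MetricsCollector::new(MetricsConfig::default());
///
/// // Record a processed event together with its measured latency.
/// collector.record_event(Priority::High, Duration::from_micros(120), Some("key"));
///
/// // Periodically report the observed queue depth against its capacity.
/// collector.update_queue_depth(3, 1024);
///
/// // Take a snapshot and render it in one of the supported formats.
/// let stats = collector.snapshot();
/// println!("{}", stats.export(ExportFormat::PlainText));
/// ```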
pub struct MetricsCollector {
    config: MetricsConfig,
    start_time: Instant,

    // Basic counters
    basic: Arc<Mutex<BasicStats>>,

    // Latency histograms
    high_priority_latency: Arc<Mutex<LatencyTracker>>,
    normal_priority_latency: Arc<Mutex<LatencyTracker>>,
    low_priority_latency: Arc<Mutex<LatencyTracker>>,
    overall_latency: Arc<Mutex<LatencyTracker>>,
    by_type_latency: Arc<Mutex<HashMap<String, LatencyTracker>>>,

    // Throughput tracking
    event_times: Arc<Mutex<Vec<Instant>>>,
    processing_times: Arc<Mutex<Vec<Duration>>>,
    peak_rate: Arc<Mutex<f64>>,

    // Queue tracking
    queue_depths: Arc<Mutex<Vec<(Instant, usize)>>>,
    max_queue_depth: Arc<Mutex<usize>>,
    time_at_capacity: Arc<Mutex<Duration>>,
    last_capacity_check: Arc<Mutex<Instant>>,

    // Windowed buckets
    minute_buckets: Arc<Mutex<Vec<BucketStats>>>,
    hour_buckets: Arc<Mutex<Vec<BucketStats>>>,
}

impl MetricsCollector {
    /// Create a collector with the given configuration.
    pub fn new(config: MetricsConfig) -> Self {
        // Track latencies of up to 10 seconds, stored in microseconds.
        let max_latency = 10_000_000;

        Self {
            config,
            start_time: Instant::now(),
            basic: Arc::new(Mutex::new(BasicStats::default())),
            high_priority_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
            normal_priority_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
            low_priority_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
            overall_latency: Arc::new(Mutex::new(LatencyTracker::new(max_latency))),
            by_type_latency: Arc::new(Mutex::new(HashMap::new())),
            event_times: Arc::new(Mutex::new(Vec::new())),
            processing_times: Arc::new(Mutex::new(Vec::new())),
            peak_rate: Arc::new(Mutex::new(0.0)),
            queue_depths: Arc::new(Mutex::new(Vec::new())),
            max_queue_depth: Arc::new(Mutex::new(0)),
            time_at_capacity: Arc::new(Mutex::new(Duration::ZERO)),
            last_capacity_check: Arc::new(Mutex::new(Instant::now())),
            minute_buckets: Arc::new(Mutex::new(Vec::new())),
            hour_buckets: Arc::new(Mutex::new(Vec::new())),
        }
    }

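    /// Record a processed event with its priority, measured latency, and
    /// optional event type.
    ///
    /// When `sampling_rate` is below 1.0, only a corresponding fraction of
    /// calls is recorded.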
    pub fn record_event(
        &self,
        priority: crate::priority_queue::Priority,
        latency: Duration,
        event_type: Option<&str>,
    ) {
        // Apply sampling if configured.
        if self.config.sampling_rate < 1.0 {
            use std::sync::atomic::{AtomicUsize, Ordering};
            static COUNTER: AtomicUsize = AtomicUsize::new(0);
            let count = COUNTER.fetch_add(1, Ordering::Relaxed);
            let sample_every = (1.0 / self.config.sampling_rate) as usize;
            if !count.is_multiple_of(sample_every) {
                return;
            }
        }

        let latency_us = latency.as_micros() as u64;

        // Update basic counters.
        {
            let mut basic = self.basic.lock().unwrap();
            basic.total_events += 1;
            match priority {
                crate::priority_queue::Priority::High => basic.high_priority_events += 1,
                crate::priority_queue::Priority::Normal => basic.normal_priority_events += 1,
                crate::priority_queue::Priority::Low => basic.low_priority_events += 1,
            }
        }

        // Record latency histograms.
        if self.config.track_percentiles {
            self.overall_latency.lock().unwrap().record(latency_us);

            match priority {
                crate::priority_queue::Priority::High => {
                    self.high_priority_latency
                        .lock()
                        .unwrap()
                        .record(latency_us);
                }
                crate::priority_queue::Priority::Normal => {
                    self.normal_priority_latency
                        .lock()
                        .unwrap()
                        .record(latency_us);
                }
                crate::priority_queue::Priority::Low => {
                    self.low_priority_latency.lock().unwrap().record(latency_us);
                }
            }

            if self.config.track_by_type {
                if let Some(event_type) = event_type {
                    let mut by_type = self.by_type_latency.lock().unwrap();
                    by_type
                        .entry(event_type.to_string())
                        .or_insert_with(|| LatencyTracker::new(10_000_000))
                        .record(latency_us);
                }
            }
        }

        // Update the throughput window and peak rate.
        {
            let mut event_times = self.event_times.lock().unwrap();
            let now = Instant::now();
            event_times.push(now);

            // Keep only events within the configured rate window.
            let cutoff = now - self.config.rate_window;
            event_times.retain(|t| *t > cutoff);

            if event_times.len() > 1 {
                let duration = now.duration_since(event_times[0]).as_secs_f64();
                if duration > 0.0 {
                    let rate = event_times.len() as f64 / duration;
                    let mut peak = self.peak_rate.lock().unwrap();
                    if rate > *peak {
                        *peak = rate;
                    }
                }
            }
        }

        self.processing_times.lock().unwrap().push(latency);

        trace!("Recorded event: priority={priority:?}, latency={latency_us}μs");
    }

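    /// Record an event that was dropped.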
    pub fn record_dropped(&self) {
        self.basic.lock().unwrap().dropped_events += 1;
    }

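    /// Record a backpressure activation.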
    pub fn record_backpressure(&self) {
        self.basic.lock().unwrap().backpressure_activations += 1;
    }

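    /// Record the current queue depth against the queue's capacity.
    ///
    /// Samples older than five minutes are discarded; time spent at or above
    /// capacity feeds the saturation percentage.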
    pub fn update_queue_depth(&self, depth: usize, capacity: usize) {
        let now = Instant::now();

        {
            let mut depths = self.queue_depths.lock().unwrap();
            depths.push((now, depth));

            // Keep only the last five minutes of samples.
            let cutoff = now - Duration::from_secs(300);
            depths.retain(|(t, _)| *t > cutoff);
        }

        {
            let mut max_depth = self.max_queue_depth.lock().unwrap();
            if depth > *max_depth {
                *max_depth = depth;
            }
        }

        // Accumulate time spent at or above capacity.
        if depth >= capacity {
            let mut last_check = self.last_capacity_check.lock().unwrap();
            let duration = now.duration_since(*last_check);
            *self.time_at_capacity.lock().unwrap() += duration;
            *last_check = now;
        }
    }

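    /// Produce a point-in-time [`AdvancedEventStats`] snapshot from the data
    /// collected so far.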
    pub fn snapshot(&self) -> AdvancedEventStats {
        let now = Instant::now();
        let elapsed = now.duration_since(self.start_time).as_secs_f64();

        let latency = LatencyStats {
            high_priority: self.high_priority_latency.lock().unwrap().percentiles(),
            normal_priority: self.normal_priority_latency.lock().unwrap().percentiles(),
            low_priority: self.low_priority_latency.lock().unwrap().percentiles(),
            overall: self.overall_latency.lock().unwrap().percentiles(),
            by_type: self
                .by_type_latency
                .lock()
                .unwrap()
                .iter()
                .map(|(k, v)| (k.clone(), v.percentiles()))
                .collect(),
        };

        let throughput = {
            let event_times = self.event_times.lock().unwrap();
            let processing_times = self.processing_times.lock().unwrap();

            let current_rate = if event_times.len() > 1 {
                let duration = now.duration_since(event_times[0]).as_secs_f64();
                if duration > 0.0 {
                    event_times.len() as f64 / duration
                } else {
                    0.0
                }
            } else {
                0.0
            };

            let avg_processing = if !processing_times.is_empty() {
                let sum: Duration = processing_times.iter().sum();
                sum.as_micros() as f64 / processing_times.len() as f64
            } else {
                0.0
            };

            ThroughputStats {
                current_rate,
                peak_rate: *self.peak_rate.lock().unwrap(),
                // The 1m/5m averages currently reuse the windowed rate.
                avg_rate_1m: current_rate,
                avg_rate_5m: current_rate,
                avg_processing_time_us: avg_processing,
            }
        };

        let queue = {
            let depths = self.queue_depths.lock().unwrap();
            let current_depth = depths.last().map(|(_, d)| *d).unwrap_or(0);

            let avg_depth = if !depths.is_empty() {
                depths.iter().map(|(_, d)| *d).sum::<usize>() as f64 / depths.len() as f64
            } else {
                0.0
            };

            let growth_rate = if depths.len() > 1 {
                let first = depths.first().unwrap().1 as f64;
                let last = depths.last().unwrap().1 as f64;
                let duration = depths
                    .last()
                    .unwrap()
                    .0
                    .duration_since(depths.first().unwrap().0)
                    .as_secs_f64();
                if duration > 0.0 {
                    (last - first) / duration
                } else {
                    0.0
                }
            } else {
                0.0
            };

            let saturation = if elapsed > 0.0 {
                self.time_at_capacity.lock().unwrap().as_secs_f64() / elapsed * 100.0
            } else {
                0.0
            };

            QueueStats {
                current_depth,
                max_depth: *self.max_queue_depth.lock().unwrap(),
                avg_depth,
                saturation_percentage: saturation,
                growth_rate,
            }
        };

        AdvancedEventStats {
            basic: self.basic.lock().unwrap().clone(),
            latency,
            throughput,
            queue,
            // Windowed buckets are not populated in this snapshot.
            windows: WindowedStats::default(),
        }
    }

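    /// Clear all collected metrics and histograms.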
    pub fn reset(&self) {
        *self.basic.lock().unwrap() = BasicStats::default();
        self.high_priority_latency.lock().unwrap().reset();
        self.normal_priority_latency.lock().unwrap().reset();
        self.low_priority_latency.lock().unwrap().reset();
        self.overall_latency.lock().unwrap().reset();
        self.by_type_latency.lock().unwrap().clear();
        self.event_times.lock().unwrap().clear();
        self.processing_times.lock().unwrap().clear();
        *self.peak_rate.lock().unwrap() = 0.0;
        self.queue_depths.lock().unwrap().clear();
        *self.max_queue_depth.lock().unwrap() = 0;
        *self.time_at_capacity.lock().unwrap() = Duration::ZERO;
        self.minute_buckets.lock().unwrap().clear();
        self.hour_buckets.lock().unwrap().clear();
    }

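    /// Export the current snapshot as pretty-printed JSON.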
    pub fn export_json(&self) -> String {
        let stats = self.snapshot();
        stats.export(ExportFormat::Json)
    }

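    /// Export the current snapshot in Prometheus text exposition format.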
    pub fn export_prometheus(&self) -> String {
        let stats = self.snapshot();
        stats.export(ExportFormat::Prometheus)
    }

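    /// Export the current snapshot as human-readable plain text.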
    pub fn export_text(&self) -> String {
        let stats = self.snapshot();
        stats.export(ExportFormat::PlainText)
    }
}

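/// Output formats supported by [`AdvancedEventStats::export`].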
#[derive(Debug, Clone, Copy)]
pub enum ExportFormat {
    /// Pretty-printed JSON.
    Json,
    /// Prometheus text exposition format.
    Prometheus,
    /// Human-readable plain text.
    PlainText,
}

impl AdvancedEventStats {
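    /// Render these statistics in the requested format.
    ///
    /// A small sketch (illustrative; assumes `collector` is a [`MetricsCollector`]):
    ///
    /// ```ignore
    /// let stats = collector.snapshot();
    /// let prometheus = stats.export(ExportFormat::Prometheus);
    /// ```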
    pub fn export(&self, format: ExportFormat) -> String {
        match format {
            ExportFormat::Json => self.to_json(),
            ExportFormat::Prometheus => self.to_prometheus(),
            ExportFormat::PlainText => self.to_plain_text(),
        }
    }

    fn to_json(&self) -> String {
        serde_json::to_string_pretty(self)
            .unwrap_or_else(|e| format!("Failed to serialize metrics: {e}"))
    }

    fn to_prometheus(&self) -> String {
        let mut output = String::new();

        output.push_str("# HELP hojicha_events_total Total events processed\n");
        output.push_str("# TYPE hojicha_events_total counter\n");
        output.push_str(&format!(
            "hojicha_events_total {{}} {}\n",
            self.basic.total_events
        ));
        output.push_str(&format!(
            "hojicha_events_total {{priority=\"high\"}} {}\n",
            self.basic.high_priority_events
        ));
        output.push_str(&format!(
            "hojicha_events_total {{priority=\"normal\"}} {}\n",
            self.basic.normal_priority_events
        ));
        output.push_str(&format!(
            "hojicha_events_total {{priority=\"low\"}} {}\n",
            self.basic.low_priority_events
        ));

        output.push_str("# HELP hojicha_events_dropped Total events dropped\n");
        output.push_str("# TYPE hojicha_events_dropped counter\n");
        output.push_str(&format!(
            "hojicha_events_dropped {{}} {}\n",
            self.basic.dropped_events
        ));

        output.push_str("# HELP hojicha_event_latency_microseconds Event processing latency\n");
        output.push_str("# TYPE hojicha_event_latency_microseconds summary\n");

        for (priority, stats) in [
            ("high", &self.latency.high_priority),
            ("normal", &self.latency.normal_priority),
            ("low", &self.latency.low_priority),
        ] {
            output.push_str(&format!(
                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.5\"}} {}\n",
                priority, stats.p50
            ));
            output.push_str(&format!(
                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.9\"}} {}\n",
                priority, stats.p90
            ));
            output.push_str(&format!(
                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.95\"}} {}\n",
                priority, stats.p95
            ));
            output.push_str(&format!(
                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.99\"}} {}\n",
                priority, stats.p99
            ));
            output.push_str(&format!(
                "hojicha_event_latency_microseconds {{priority=\"{}\",quantile=\"0.999\"}} {}\n",
                priority, stats.p999
            ));
        }

        output.push_str("# HELP hojicha_throughput_rate Events per second\n");
        output.push_str("# TYPE hojicha_throughput_rate gauge\n");
        output.push_str(&format!(
            "hojicha_throughput_rate {{type=\"current\"}} {}\n",
            self.throughput.current_rate
        ));
        output.push_str(&format!(
            "hojicha_throughput_rate {{type=\"peak\"}} {}\n",
            self.throughput.peak_rate
        ));

        output.push_str("# HELP hojicha_queue_depth Current queue depth\n");
        output.push_str("# TYPE hojicha_queue_depth gauge\n");
        output.push_str(&format!(
            "hojicha_queue_depth {{}} {}\n",
            self.queue.current_depth
        ));

        output.push_str("# HELP hojicha_queue_saturation Queue saturation percentage\n");
        output.push_str("# TYPE hojicha_queue_saturation gauge\n");
        output.push_str(&format!(
            "hojicha_queue_saturation {{}} {}\n",
            self.queue.saturation_percentage
        ));

        output
    }

    fn to_plain_text(&self) -> String {
        format!(
            "Event Processing Metrics\n\
            ========================\n\
            Total Events: {}\n\
            - High Priority: {}\n\
            - Normal Priority: {}\n\
            - Low Priority: {}\n\
            - Dropped: {}\n\n\
            Latency (μs):\n\
            - High Priority: p50={} p95={} p99={} max={}\n\
            - Normal Priority: p50={} p95={} p99={} max={}\n\
            - Low Priority: p50={} p95={} p99={} max={}\n\n\
            Throughput:\n\
            - Current Rate: {:.1} events/sec\n\
            - Peak Rate: {:.1} events/sec\n\
            - Avg Processing Time: {:.1} μs\n\n\
            Queue:\n\
            - Current Depth: {}\n\
            - Max Depth: {}\n\
            - Saturation: {:.1}%\n\
            - Growth Rate: {:.1} events/sec",
            self.basic.total_events,
            self.basic.high_priority_events,
            self.basic.normal_priority_events,
            self.basic.low_priority_events,
            self.basic.dropped_events,
            self.latency.high_priority.p50,
            self.latency.high_priority.p95,
            self.latency.high_priority.p99,
            self.latency.high_priority.max,
            self.latency.normal_priority.p50,
            self.latency.normal_priority.p95,
            self.latency.normal_priority.p99,
            self.latency.normal_priority.max,
            self.latency.low_priority.p50,
            self.latency.low_priority.p95,
            self.latency.low_priority.p99,
            self.latency.low_priority.max,
            self.throughput.current_rate,
            self.throughput.peak_rate,
            self.throughput.avg_processing_time_us,
            self.queue.current_depth,
            self.queue.max_depth,
            self.queue.saturation_percentage,
            self.queue.growth_rate,
        )
    }
}

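/// Print a formatted metrics dashboard to stderr.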
pub fn print_metrics_dashboard(stats: &AdvancedEventStats) {
    eprintln!("╔══════════════════════════════════════════════════════════════╗");
    eprintln!("║ Event Processing Metrics Dashboard ║");
    eprintln!("╠══════════════════════════════════════════════════════════════╣");
    eprintln!("║ Throughput: ║");
    eprintln!(
        "║ Current: {:>8.1} evt/s Peak: {:>8.1} evt/s ║",
        stats.throughput.current_rate, stats.throughput.peak_rate
    );
    eprintln!(
        "║ Processing Time: {:>8.1} μs average ║",
        stats.throughput.avg_processing_time_us
    );
    eprintln!("║ ║");
    eprintln!("║ Latencies (μs): P50 P95 P99 Max ║");
    eprintln!(
        "║ High Priority: {:>7} {:>7} {:>7} {:>7} ║",
        stats.latency.high_priority.p50,
        stats.latency.high_priority.p95,
        stats.latency.high_priority.p99,
        stats.latency.high_priority.max
    );
    eprintln!(
        "║ Normal Priority:{:>7} {:>7} {:>7} {:>7} ║",
        stats.latency.normal_priority.p50,
        stats.latency.normal_priority.p95,
        stats.latency.normal_priority.p99,
        stats.latency.normal_priority.max
    );
    eprintln!(
        "║ Low Priority: {:>7} {:>7} {:>7} {:>7} ║",
        stats.latency.low_priority.p50,
        stats.latency.low_priority.p95,
        stats.latency.low_priority.p99,
        stats.latency.low_priority.max
    );
    eprintln!("║ ║");
    eprintln!("║ Queue: ║");
    eprintln!(
        "║ Depth: {:>5} (max: {:>5}) Saturation: {:>5.1}% ║",
        stats.queue.current_depth, stats.queue.max_depth, stats.queue.saturation_percentage
    );
    eprintln!(
        "║ Growth Rate: {:>8.1} events/sec ║",
        stats.queue.growth_rate
    );
    eprintln!("║ ║");
    eprintln!("║ Events: ║");
    eprintln!(
        "║ Total: {:>8} Dropped: {:>6} Backpressure: {:>6} ║",
        stats.basic.total_events, stats.basic.dropped_events, stats.basic.backpressure_activations
    );
    eprintln!("╚══════════════════════════════════════════════════════════════╝");
}

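/// Render a formatted metrics dashboard into a `String`.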
pub fn display_dashboard(stats: &AdvancedEventStats) -> String {
    use std::fmt::Write;

    let mut output = String::new();

    let _ = writeln!(
        output,
        "╔══════════════════════════════════════════════════════════════╗"
    );
    let _ = writeln!(
        output,
        "║ METRICS DASHBOARD ║"
    );
    let _ = writeln!(
        output,
        "╠══════════════════════════════════════════════════════════════╣"
    );

    let _ = writeln!(
        output,
        "║ Events Processed: {:>10} ║",
        stats.basic.total_events
    );
    let _ = writeln!(
        output,
        "║ Events Dropped: {:>10} ║",
        stats.basic.dropped_events
    );
    let _ = writeln!(
        output,
        "║ Backpressure: {:>10} ║",
        stats.basic.backpressure_activations
    );

    let _ = writeln!(
        output,
        "╠══════════════════════════════════════════════════════════════╣"
    );

    let _ = writeln!(
        output,
        "║ LATENCY (μs) - Overall ║"
    );
    let _ = writeln!(
        output,
        "║ p50: {:>8.1} p75: {:>8.1} p90: {:>8.1} ║",
        stats.latency.overall.p50, stats.latency.overall.p75, stats.latency.overall.p90
    );
    let _ = writeln!(
        output,
        "║ p95: {:>8.1} p99: {:>8.1} p999: {:>8.1} ║",
        stats.latency.overall.p95, stats.latency.overall.p99, stats.latency.overall.p999
    );
    let _ = writeln!(
        output,
        "║ min: {:>8.1} max: {:>8.1} ║",
        stats.latency.overall.min, stats.latency.overall.max
    );

    let _ = writeln!(
        output,
        "╠══════════════════════════════════════════════════════════════╣"
    );

    let _ = writeln!(
        output,
        "║ THROUGHPUT ║"
    );
    let _ = writeln!(
        output,
        "║ Current rate: {:>10.1} events/sec ║",
        stats.throughput.current_rate
    );
    let _ = writeln!(
        output,
        "║ Peak rate: {:>10.1} events/sec ║",
        stats.throughput.peak_rate
    );

    let _ = writeln!(
        output,
        "╠══════════════════════════════════════════════════════════════╣"
    );

    let _ = writeln!(
        output,
        "║ QUEUE ║"
    );
    let _ = writeln!(
        output,
        "║ Current depth: {:>6} ║",
        stats.queue.current_depth
    );
    let _ = writeln!(
        output,
        "║ Max depth: {:>6} ║",
        stats.queue.max_depth
    );
    let _ = writeln!(
        output,
        "║ Avg depth: {:>6.1} ║",
        stats.queue.avg_depth
    );
    let _ = writeln!(
        output,
        "║ Saturation: {:>6.1}% ║",
        stats.queue.saturation_percentage
    );

    let _ = writeln!(
        output,
        "╚══════════════════════════════════════════════════════════════╝"
    );

    output
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::priority_queue::Priority;

    #[test]
    fn test_metrics_collector_creation() {
        let collector = MetricsCollector::new(MetricsConfig::default());
        let stats = collector.snapshot();

        assert_eq!(stats.basic.total_events, 0);
        assert_eq!(stats.latency.high_priority.count, 0);
    }

    #[test]
    fn test_event_recording() {
        let collector = MetricsCollector::new(MetricsConfig::default());

        collector.record_event(Priority::High, Duration::from_micros(100), Some("test"));
        collector.record_event(Priority::Normal, Duration::from_micros(200), Some("test"));
        collector.record_event(Priority::Low, Duration::from_micros(300), None);

        let stats = collector.snapshot();

        assert_eq!(stats.basic.total_events, 3);
        assert_eq!(stats.basic.high_priority_events, 1);
        assert_eq!(stats.basic.normal_priority_events, 1);
        assert_eq!(stats.basic.low_priority_events, 1);

        assert!(stats.latency.high_priority.count > 0);
        assert!(stats.latency.high_priority.mean > 0.0);
    }

    #[test]
    fn test_percentile_calculation() {
        let collector = MetricsCollector::new(MetricsConfig::default());

        // Record latencies of 10, 20, ..., 1000 microseconds.
        for i in 1..=100 {
            collector.record_event(Priority::High, Duration::from_micros(i * 10), None);
        }

        let stats = collector.snapshot();
        let percentiles = &stats.latency.high_priority;

        // The median should be near 500 µs.
        assert!(percentiles.p50 >= 490 && percentiles.p50 <= 510);

        // The 99th percentile should be near the top of the range.
        assert!(percentiles.p99 >= 980 && percentiles.p99 <= 1000);

        assert_eq!(percentiles.count, 100);
    }

    #[test]
    fn test_export_formats() {
        let collector = MetricsCollector::new(MetricsConfig::default());
        collector.record_event(Priority::High, Duration::from_micros(100), None);

        let stats = collector.snapshot();

        let json = stats.export(ExportFormat::Json);
        assert!(json.contains("\"total_events\": 1"));

        let prometheus = stats.export(ExportFormat::Prometheus);
        assert!(prometheus.contains("hojicha_events_total"));
        assert!(prometheus.contains("hojicha_event_latency_microseconds"));

        let text = stats.export(ExportFormat::PlainText);
        assert!(text.contains("Total Events: 1"));
    }

    #[test]
    fn test_metrics_reset() {
        let collector = MetricsCollector::new(MetricsConfig::default());

        for _ in 0..10 {
            collector.record_event(Priority::High, Duration::from_micros(100), None);
        }

        let stats = collector.snapshot();
        assert_eq!(stats.basic.total_events, 10);

        collector.reset();

        let stats = collector.snapshot();
        assert_eq!(stats.basic.total_events, 0);
        assert_eq!(stats.latency.high_priority.count, 0);
    }
}