1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::Arc;
3
4use super::constants::*;
5
/// Category of event tracked by the metrics in this module.
///
/// The explicit discriminants double as array indices (see `as_index`), so
/// they must stay dense and zero-based.
#[derive(Debug, Clone, Copy)]
pub enum EventType {
    /// Transaction events (index 0).
    Transaction = 0,
    /// Account events (index 1).
    Account = 1,
    /// Block-meta events (index 2).
    BlockMeta = 2,
}
13
/// Alternate name for [`EventType`]; `MetricsManager::update_metrics` takes
/// its event type under this name.
pub type MetricsEventType = EventType;
16
17impl EventType {
18 #[inline]
19 const fn as_index(self) -> usize {
20 self as usize
21 }
22
23 const fn name(self) -> &'static str {
24 match self {
25 EventType::Transaction => "TX",
26 EventType::Account => "Account",
27 EventType::BlockMeta => "Block Meta",
28 }
29 }
30
31 pub const TX: EventType = EventType::Transaction;
33}
34
/// Lock-free counters and timing statistics for a single event type.
#[derive(Debug)]
struct AtomicEventMetrics {
    /// Number of processing calls; bumped by one per `add_process_count`.
    process_count: AtomicU64,
    /// Cumulative number of events processed.
    events_processed: AtomicU64,
    /// Events processed since the current window started; zeroed by
    /// `reset_window`.
    events_in_window: AtomicU64,
    /// Timestamp (nanoseconds) at which the current window started.
    window_start_nanos: AtomicU64,
    /// Min / max / average processing time for this event type.
    processing_stats: AtomicProcessingTimeStats,
}
45
46impl AtomicEventMetrics {
47 fn new(now_nanos: u64) -> Self {
48 Self {
49 process_count: AtomicU64::new(0),
50 events_processed: AtomicU64::new(0),
51 events_in_window: AtomicU64::new(0),
52 window_start_nanos: AtomicU64::new(now_nanos),
53 processing_stats: AtomicProcessingTimeStats::new(),
54 }
55 }
56
57 #[inline]
59 fn add_process_count(&self) {
60 self.process_count.fetch_add(1, Ordering::Relaxed);
61 }
62
63 #[inline]
65 fn add_events_processed(&self, count: u64) {
66 self.events_processed.fetch_add(count, Ordering::Relaxed);
67 self.events_in_window.fetch_add(count, Ordering::Relaxed);
68 }
69
70 #[inline]
72 fn get_counts(&self) -> (u64, u64, u64) {
73 (
74 self.process_count.load(Ordering::Relaxed),
75 self.events_processed.load(Ordering::Relaxed),
76 self.events_in_window.load(Ordering::Relaxed),
77 )
78 }
79
80 #[inline]
82 fn reset_window(&self, new_start_nanos: u64) {
83 self.events_in_window.store(0, Ordering::Relaxed);
84 self.window_start_nanos.store(new_start_nanos, Ordering::Relaxed);
85 }
86
87 #[inline]
88 fn get_window_start(&self) -> u64 {
89 self.window_start_nanos.load(Ordering::Relaxed)
90 }
91
92 #[inline]
94 fn get_processing_stats(&self) -> ProcessingTimeStats {
95 self.processing_stats.get_stats()
96 }
97
98 #[inline]
100 fn update_processing_stats(&self, time_us: f64, event_count: u64) {
101 self.processing_stats.update(time_us, event_count);
102 }
103}
104
/// Lock-free min / max / average processing-time tracker.
///
/// `f64` values are stored as their raw bit patterns in `AtomicU64`s.
#[derive(Debug)]
struct AtomicProcessingTimeStats {
    /// Bit pattern of the smallest recorded time; starts as `f64::INFINITY`.
    min_time_bits: AtomicU64,
    /// Bit pattern of the largest recorded time; starts as `0` (i.e. `0.0`).
    max_time_bits: AtomicU64,
    /// Timestamp (nanoseconds since the Unix epoch) at which the minimum was
    /// last set or reset.
    min_time_timestamp_nanos: AtomicU64,
    /// Timestamp (nanoseconds since the Unix epoch) at which the maximum was
    /// last set or reset.
    max_time_timestamp_nanos: AtomicU64,
    /// Running sum of `time_us * event_count`, truncated to whole units.
    total_time_us: AtomicU64,
    /// Running sum of `event_count`; the divisor for the average.
    total_events: AtomicU64,
}
115
116impl AtomicProcessingTimeStats {
117 fn new() -> Self {
118 let now_nanos =
119 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
120 as u64;
121
122 Self {
123 min_time_bits: AtomicU64::new(f64::INFINITY.to_bits()),
124 max_time_bits: AtomicU64::new(0),
125 min_time_timestamp_nanos: AtomicU64::new(now_nanos),
126 max_time_timestamp_nanos: AtomicU64::new(now_nanos),
127 total_time_us: AtomicU64::new(0),
128 total_events: AtomicU64::new(0),
129 }
130 }
131
132 #[inline]
134 fn update(&self, time_us: f64, event_count: u64) {
135 let time_bits = time_us.to_bits();
136 let now_nanos =
137 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
138 as u64;
139
140 let mut current_min = self.min_time_bits.load(Ordering::Relaxed);
142 let min_timestamp = self.min_time_timestamp_nanos.load(Ordering::Relaxed);
143
144 let min_time_diff_nanos = now_nanos.saturating_sub(min_timestamp);
146 if min_time_diff_nanos > 10_000_000_000 {
147 self.min_time_bits.store(f64::INFINITY.to_bits(), Ordering::Relaxed);
149 self.min_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
150 current_min = f64::INFINITY.to_bits();
151 }
152
153 while time_bits < current_min {
155 match self.min_time_bits.compare_exchange_weak(
156 current_min,
157 time_bits,
158 Ordering::Relaxed,
159 Ordering::Relaxed,
160 ) {
161 Ok(_) => {
162 self.min_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
164 break;
165 }
166 Err(x) => current_min = x,
167 }
168 }
169
170 let mut current_max = self.max_time_bits.load(Ordering::Relaxed);
172 let max_timestamp = self.max_time_timestamp_nanos.load(Ordering::Relaxed);
173
174 let time_diff_nanos = now_nanos.saturating_sub(max_timestamp);
176 if time_diff_nanos > 10_000_000_000 {
177 self.max_time_bits.store(0, Ordering::Relaxed);
179 self.max_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
180 current_max = 0;
181 }
182
183 while time_bits > current_max {
185 match self.max_time_bits.compare_exchange_weak(
186 current_max,
187 time_bits,
188 Ordering::Relaxed,
189 Ordering::Relaxed,
190 ) {
191 Ok(_) => {
192 self.max_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
194 break;
195 }
196 Err(x) => current_max = x,
197 }
198 }
199
200 let total_time_us_int = (time_us * event_count as f64) as u64;
202 self.total_time_us.fetch_add(total_time_us_int, Ordering::Relaxed);
203 self.total_events.fetch_add(event_count, Ordering::Relaxed);
204 }
205
206 #[inline]
208 fn get_stats(&self) -> ProcessingTimeStats {
209 let min_bits = self.min_time_bits.load(Ordering::Relaxed);
210 let max_bits = self.max_time_bits.load(Ordering::Relaxed);
211 let total_time_us_int = self.total_time_us.load(Ordering::Relaxed);
212 let total_events = self.total_events.load(Ordering::Relaxed);
213
214 let min_time = f64::from_bits(min_bits);
215 let max_time = f64::from_bits(max_bits);
216 let avg_time =
217 if total_events > 0 { total_time_us_int as f64 / total_events as f64 } else { 0.0 };
218
219 ProcessingTimeStats {
220 min_us: if min_time == f64::INFINITY { 0.0 } else { min_time },
221 max_us: max_time,
222 avg_us: avg_time,
223 }
224 }
225}
226
/// Plain-value snapshot of processing-time statistics, in microseconds.
#[derive(Debug, Clone)]
pub struct ProcessingTimeStats {
    /// Smallest recorded processing time; `0.0` when none is recorded.
    pub min_us: f64,
    /// Largest recorded processing time.
    pub max_us: f64,
    /// Accumulated time divided by accumulated event count; `0.0` when no
    /// events have been recorded.
    pub avg_us: f64,
}
234
/// Plain-value snapshot of the metrics for one event type.
#[derive(Debug, Clone)]
pub struct EventMetricsSnapshot {
    /// Number of processing calls recorded.
    pub process_count: u64,
    /// Cumulative number of events processed.
    pub events_processed: u64,
    /// Processing-time statistics for this event type.
    pub processing_stats: ProcessingTimeStats,
}
242
/// Plain-value snapshot of all metrics exposed by `MetricsManager`.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Time elapsed since the metrics were created.
    pub uptime: std::time::Duration,
    /// Metrics for transaction events.
    pub tx_metrics: EventMetricsSnapshot,
    /// Metrics for account events.
    pub account_metrics: EventMetricsSnapshot,
    /// Metrics for block-meta events.
    pub block_meta_metrics: EventMetricsSnapshot,
    /// Processing-time statistics aggregated over every event type.
    pub processing_stats: ProcessingTimeStats,
    /// Number of events recorded as dropped.
    pub dropped_events_count: u64,
}
253
254impl PerformanceMetrics {
255 pub fn new() -> Self {
257 let default_stats = ProcessingTimeStats { min_us: 0.0, max_us: 0.0, avg_us: 0.0 };
258 let default_metrics = EventMetricsSnapshot {
259 process_count: 0,
260 events_processed: 0,
261 processing_stats: default_stats.clone(),
262 };
263
264 Self {
265 uptime: std::time::Duration::ZERO,
266 tx_metrics: default_metrics.clone(),
267 account_metrics: default_metrics.clone(),
268 block_meta_metrics: default_metrics,
269 processing_stats: default_stats,
270 dropped_events_count: 0,
271 }
272 }
273}
274
275impl Default for PerformanceMetrics {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
/// Shared, lock-free metrics storage backing `MetricsManager`.
#[derive(Debug)]
pub struct HighPerformanceMetrics {
    /// Creation time, in nanoseconds since the Unix epoch.
    start_nanos: u64,
    /// Per-event-type metrics, indexed by `EventType::as_index`.
    event_metrics: [AtomicEventMetrics; 3],
    /// Processing-time statistics aggregated over every event type.
    processing_stats: AtomicProcessingTimeStats,
    /// Number of events recorded as dropped.
    dropped_events_count: AtomicU64,
}
290
291impl HighPerformanceMetrics {
292 fn new() -> Self {
293 let now_nanos =
294 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
295 as u64;
296
297 Self {
298 start_nanos: now_nanos,
299 event_metrics: [
300 AtomicEventMetrics::new(now_nanos),
301 AtomicEventMetrics::new(now_nanos),
302 AtomicEventMetrics::new(now_nanos),
303 ],
304 processing_stats: AtomicProcessingTimeStats::new(),
305 dropped_events_count: AtomicU64::new(0),
307 }
308 }
309
310 #[inline]
312 pub fn get_uptime_seconds(&self) -> f64 {
313 let now_nanos =
314 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
315 as u64;
316 (now_nanos - self.start_nanos) as f64 / 1_000_000_000.0
317 }
318
319 #[inline]
321 pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
322 let index = event_type.as_index();
323 let (process_count, events_processed, _) = self.event_metrics[index].get_counts();
324 let processing_stats = self.event_metrics[index].get_processing_stats();
325
326 EventMetricsSnapshot { process_count, events_processed, processing_stats }
327 }
328
329 #[inline]
331 pub fn get_processing_stats(&self) -> ProcessingTimeStats {
332 self.processing_stats.get_stats()
333 }
334
335 #[inline]
337 pub fn get_dropped_events_count(&self) -> u64 {
338 self.dropped_events_count.load(Ordering::Relaxed)
339 }
340
341 fn update_window_metrics(&self, event_type: EventType, window_duration_nanos: u64) {
343 let now_nanos =
344 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
345 as u64;
346
347 let index = event_type.as_index();
348 let event_metric = &self.event_metrics[index];
349
350 let window_start = event_metric.get_window_start();
351 if now_nanos.saturating_sub(window_start) >= window_duration_nanos {
352 event_metric.reset_window(now_nanos);
353 }
354 }
355}
356
/// Front end for recording and reporting metrics of one named stream.
pub struct MetricsManager {
    /// Metrics storage; shared with clones and with the spawned tasks.
    metrics: Arc<HighPerformanceMetrics>,
    /// When `false`, the recording methods return without touching the
    /// counters.
    enable_metrics: bool,
    /// Name used as a prefix in log lines and printed reports.
    stream_name: String,
    /// Guard used by `start_background_tasks` so the window-reset task is
    /// spawned at most once per instance. Clones start with `false`.
    background_task_running: AtomicBool,
}
364
365impl MetricsManager {
366 pub fn new(enable_metrics: bool, stream_name: String) -> Self {
368 let manager = Self {
369 metrics: Arc::new(HighPerformanceMetrics::new()),
370 enable_metrics,
371 stream_name,
372 background_task_running: AtomicBool::new(false),
373 };
374
375 manager.start_background_tasks();
377 manager
378 }
379
380 fn start_background_tasks(&self) {
382 if self
383 .background_task_running
384 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
385 .is_ok()
386 {
387 if !self.enable_metrics {
388 return;
389 }
390
391 let metrics = self.metrics.clone();
392
393 tokio::spawn(async move {
394 let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
395
396 loop {
397 interval.tick().await;
398
399 let window_duration_nanos = DEFAULT_METRICS_WINDOW_SECONDS * 1_000_000_000;
400
401 metrics.update_window_metrics(EventType::Transaction, window_duration_nanos);
403 metrics.update_window_metrics(EventType::Account, window_duration_nanos);
404 metrics.update_window_metrics(EventType::BlockMeta, window_duration_nanos);
405 }
406 });
407 }
408 }
409
410 #[inline]
412 pub fn record_process(&self, event_type: EventType) {
413 if self.enable_metrics {
414 self.metrics.event_metrics[event_type.as_index()].add_process_count();
415 }
416 }
417
418 #[inline]
420 pub fn record_events(&self, event_type: EventType, count: u64, processing_time_us: f64) {
421 if !self.enable_metrics {
422 return;
423 }
424
425 let index = event_type.as_index();
426
427 self.metrics.event_metrics[index].add_events_processed(count);
429
430 self.metrics.event_metrics[index].update_processing_stats(processing_time_us, count);
432
433 self.metrics.processing_stats.update(processing_time_us, count);
435 }
436
437 #[inline]
439 pub fn log_slow_processing(&self, processing_time_us: f64, event_count: usize) {
440 if processing_time_us > SLOW_PROCESSING_THRESHOLD_US {
441 log::debug!(
442 "{} slow processing: {:.2}us for {} events",
443 self.stream_name,
444 processing_time_us,
445 event_count,
446 );
447 }
448 }
449
450 pub fn get_uptime(&self) -> std::time::Duration {
452 std::time::Duration::from_secs_f64(self.metrics.get_uptime_seconds())
453 }
454
455 pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
457 self.metrics.get_event_metrics(event_type)
458 }
459
460 pub fn get_processing_stats(&self) -> ProcessingTimeStats {
462 self.metrics.get_processing_stats()
463 }
464
465 pub fn get_dropped_events_count(&self) -> u64 {
467 self.metrics.get_dropped_events_count()
468 }
469
470 pub fn print_metrics(&self) {
472 println!("\n📊 {} Performance Metrics", self.stream_name);
473 println!(" Run Time: {:?}", self.get_uptime());
474
475 let dropped_count = self.get_dropped_events_count();
477 if dropped_count > 0 {
478 println!("\n⚠️ Dropped Events: {}", dropped_count);
479 }
480
481 println!("┌─────────────┬──────────────┬──────────────────┬─────────────┬─────────────┬─────────────┐");
483 println!("│ Event Type │ Process Count│ Events Processed │ Avg Time(μs)│ Min 10s(μs) │ Max 10s(μs) │");
484 println!("├─────────────┼──────────────┼──────────────────┼─────────────┼─────────────┼─────────────┤");
485
486 for event_type in [EventType::Transaction, EventType::Account, EventType::BlockMeta] {
487 let metrics = self.get_event_metrics(event_type);
488 println!(
489 "│ {:11} │ {:12} │ {:16} │ {:9.2} │ {:9.2} │ {:9.2} │",
490 event_type.name(),
491 metrics.process_count,
492 metrics.events_processed,
493 metrics.processing_stats.avg_us,
494 metrics.processing_stats.min_us,
495 metrics.processing_stats.max_us
496 );
497 }
498
499 println!("└─────────────┴──────────────┴──────────────────┴─────────────┴─────────────┴─────────────┘");
500 println!();
501 }
502
503 pub async fn start_auto_monitoring(&self) -> Option<tokio::task::JoinHandle<()>> {
505 if !self.enable_metrics {
506 return None;
507 }
508
509 let manager = self.clone();
510 let handle = tokio::spawn(async move {
511 let mut interval = tokio::time::interval(std::time::Duration::from_secs(
512 DEFAULT_METRICS_PRINT_INTERVAL_SECONDS,
513 ));
514 loop {
515 interval.tick().await;
516 manager.print_metrics();
517 }
518 });
519 Some(handle)
520 }
521
522 pub fn new_with_metrics(
526 _metrics: Arc<std::sync::RwLock<PerformanceMetrics>>,
527 enable_metrics: bool,
528 stream_name: String,
529 ) -> Self {
530 Self::new(enable_metrics, stream_name)
531 }
532
533 pub fn get_metrics(&self) -> PerformanceMetrics {
535 PerformanceMetrics {
536 uptime: self.get_uptime(),
537 tx_metrics: self.get_event_metrics(EventType::Transaction),
538 account_metrics: self.get_event_metrics(EventType::Account),
539 block_meta_metrics: self.get_event_metrics(EventType::BlockMeta),
540 processing_stats: self.get_processing_stats(),
541 dropped_events_count: self.metrics.get_dropped_events_count(),
542 }
543 }
544
545 #[inline]
547 pub fn add_tx_process_count(&self) {
548 self.record_process(EventType::Transaction);
549 }
550
551 #[inline]
553 pub fn add_account_process_count(&self) {
554 self.record_process(EventType::Account);
555 }
556
557 #[inline]
559 pub fn add_block_meta_process_count(&self) {
560 self.record_process(EventType::BlockMeta);
561 }
562
563 #[inline]
565 pub fn update_metrics(
566 &self,
567 event_type: MetricsEventType,
568 events_processed: u64,
569 processing_time_us: f64,
570 ) {
571 self.record_events(event_type, events_processed, processing_time_us);
572 self.log_slow_processing(processing_time_us, events_processed as usize);
573 }
574
575 #[inline]
577 pub fn increment_dropped_events(&self) {
578 if !self.enable_metrics {
579 return;
580 }
581
582 let new_count = self.metrics.dropped_events_count.fetch_add(1, Ordering::Relaxed) + 1;
584
585 if new_count.is_multiple_of(1000) {
587 log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
588 }
589 }
590
591 #[inline]
593 pub fn increment_dropped_events_by(&self, count: u64) {
594 if !self.enable_metrics || count == 0 {
595 return;
596 }
597
598 let new_count =
600 self.metrics.dropped_events_count.fetch_add(count, Ordering::Relaxed) + count;
601
602 if count > 1 {
604 log::debug!(
605 "{} dropped batch of {} events, total dropped: {}",
606 self.stream_name,
607 count,
608 new_count
609 );
610 }
611
612 if new_count.is_multiple_of(1000) || (new_count / 1000) != ((new_count - count) / 1000) {
614 log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
615 }
616 }
617}
618
619impl Clone for MetricsManager {
620 fn clone(&self) -> Self {
621 Self {
622 metrics: self.metrics.clone(),
623 enable_metrics: self.enable_metrics,
624 stream_name: self.stream_name.clone(),
625 background_task_running: AtomicBool::new(false), }
627 }
628}