1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::Arc;
3
4use super::constants::*;
5
6#[derive(Debug, Clone, Copy)]
8pub enum EventType {
9 Transaction = 0,
10 Account = 1,
11 BlockMeta = 2,
12}
13
14pub type MetricsEventType = EventType;
16
17impl EventType {
18 #[inline]
19 const fn as_index(self) -> usize {
20 self as usize
21 }
22
23 const fn name(self) -> &'static str {
24 match self {
25 EventType::Transaction => "TX",
26 EventType::Account => "Account",
27 EventType::BlockMeta => "Block Meta",
28 }
29 }
30
31 pub const TX: EventType = EventType::Transaction;
33}
34
35#[derive(Debug)]
37struct AtomicEventMetrics {
38 process_count: AtomicU64,
39 events_processed: AtomicU64,
40 events_in_window: AtomicU64,
41 window_start_nanos: AtomicU64,
42 processing_stats: AtomicProcessingTimeStats,
44}
45
46impl AtomicEventMetrics {
47 fn new(now_nanos: u64) -> Self {
48 Self {
49 process_count: AtomicU64::new(0),
50 events_processed: AtomicU64::new(0),
51 events_in_window: AtomicU64::new(0),
52 window_start_nanos: AtomicU64::new(now_nanos),
53 processing_stats: AtomicProcessingTimeStats::new(),
54 }
55 }
56
57 #[inline]
59 fn add_process_count(&self) {
60 self.process_count.fetch_add(1, Ordering::Relaxed);
61 }
62
63 #[inline]
65 fn add_events_processed(&self, count: u64) {
66 self.events_processed.fetch_add(count, Ordering::Relaxed);
67 self.events_in_window.fetch_add(count, Ordering::Relaxed);
68 }
69
70 #[inline]
72 fn get_counts(&self) -> (u64, u64, u64) {
73 (
74 self.process_count.load(Ordering::Relaxed),
75 self.events_processed.load(Ordering::Relaxed),
76 self.events_in_window.load(Ordering::Relaxed),
77 )
78 }
79
80 #[inline]
82 fn reset_window(&self, new_start_nanos: u64) {
83 self.events_in_window.store(0, Ordering::Relaxed);
84 self.window_start_nanos.store(new_start_nanos, Ordering::Relaxed);
85 }
86
87 #[inline]
88 fn get_window_start(&self) -> u64 {
89 self.window_start_nanos.load(Ordering::Relaxed)
90 }
91
92 #[inline]
94 fn get_processing_stats(&self) -> ProcessingTimeStats {
95 self.processing_stats.get_stats()
96 }
97
98 #[inline]
100 fn update_processing_stats(&self, time_us: f64, event_count: u64) {
101 self.processing_stats.update(time_us, event_count);
102 }
103}
104
105#[derive(Debug)]
107struct AtomicProcessingTimeStats {
108 min_time_bits: AtomicU64,
109 max_time_bits: AtomicU64,
110 min_time_timestamp_nanos: AtomicU64, max_time_timestamp_nanos: AtomicU64, total_time_us: AtomicU64, total_events: AtomicU64,
114}
115
116impl AtomicProcessingTimeStats {
117 fn new() -> Self {
118 let now_nanos =
119 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
120 as u64;
121
122 Self {
123 min_time_bits: AtomicU64::new(f64::INFINITY.to_bits()),
124 max_time_bits: AtomicU64::new(0),
125 min_time_timestamp_nanos: AtomicU64::new(now_nanos),
126 max_time_timestamp_nanos: AtomicU64::new(now_nanos),
127 total_time_us: AtomicU64::new(0),
128 total_events: AtomicU64::new(0),
129 }
130 }
131
132 #[inline]
134 fn update(&self, time_us: f64, event_count: u64) {
135 let time_bits = time_us.to_bits();
136 let now_nanos =
137 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
138 as u64;
139
140 let mut current_min = self.min_time_bits.load(Ordering::Relaxed);
142 let min_timestamp = self.min_time_timestamp_nanos.load(Ordering::Relaxed);
143
144 let min_time_diff_nanos = now_nanos.saturating_sub(min_timestamp);
146 if min_time_diff_nanos > 10_000_000_000 {
147 self.min_time_bits.store(f64::INFINITY.to_bits(), Ordering::Relaxed);
149 self.min_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
150 current_min = f64::INFINITY.to_bits();
151 }
152
153 while time_bits < current_min {
155 match self.min_time_bits.compare_exchange_weak(
156 current_min,
157 time_bits,
158 Ordering::Relaxed,
159 Ordering::Relaxed,
160 ) {
161 Ok(_) => {
162 self.min_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
164 break;
165 }
166 Err(x) => current_min = x,
167 }
168 }
169
170 let mut current_max = self.max_time_bits.load(Ordering::Relaxed);
172 let max_timestamp = self.max_time_timestamp_nanos.load(Ordering::Relaxed);
173
174 let time_diff_nanos = now_nanos.saturating_sub(max_timestamp);
176 if time_diff_nanos > 10_000_000_000 {
177 self.max_time_bits.store(0, Ordering::Relaxed);
179 self.max_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
180 current_max = 0;
181 }
182
183 while time_bits > current_max {
185 match self.max_time_bits.compare_exchange_weak(
186 current_max,
187 time_bits,
188 Ordering::Relaxed,
189 Ordering::Relaxed,
190 ) {
191 Ok(_) => {
192 self.max_time_timestamp_nanos.store(now_nanos, Ordering::Relaxed);
194 break;
195 }
196 Err(x) => current_max = x,
197 }
198 }
199
200 let total_time_us_int = (time_us * event_count as f64) as u64;
202 self.total_time_us.fetch_add(total_time_us_int, Ordering::Relaxed);
203 self.total_events.fetch_add(event_count, Ordering::Relaxed);
204 }
205
206 #[inline]
208 fn get_stats(&self) -> ProcessingTimeStats {
209 let min_bits = self.min_time_bits.load(Ordering::Relaxed);
210 let max_bits = self.max_time_bits.load(Ordering::Relaxed);
211 let total_time_us_int = self.total_time_us.load(Ordering::Relaxed);
212 let total_events = self.total_events.load(Ordering::Relaxed);
213
214 let min_time = f64::from_bits(min_bits);
215 let max_time = f64::from_bits(max_bits);
216 let avg_time =
217 if total_events > 0 { total_time_us_int as f64 / total_events as f64 } else { 0.0 };
218
219 ProcessingTimeStats {
220 min_us: if min_time == f64::INFINITY { 0.0 } else { min_time },
221 max_us: max_time,
222 avg_us: avg_time,
223 }
224 }
225}
226
227#[derive(Debug, Clone)]
229pub struct ProcessingTimeStats {
230 pub min_us: f64,
231 pub max_us: f64,
232 pub avg_us: f64,
233}
234
235#[derive(Debug, Clone)]
237pub struct EventMetricsSnapshot {
238 pub process_count: u64,
239 pub events_processed: u64,
240 pub processing_stats: ProcessingTimeStats,
241}
242
243#[derive(Debug, Clone)]
245pub struct PerformanceMetrics {
246 pub uptime: std::time::Duration,
247 pub tx_metrics: EventMetricsSnapshot,
248 pub account_metrics: EventMetricsSnapshot,
249 pub block_meta_metrics: EventMetricsSnapshot,
250 pub processing_stats: ProcessingTimeStats,
251 pub dropped_events_count: u64,
252}
253
254impl PerformanceMetrics {
255 pub fn new() -> Self {
257 let default_stats = ProcessingTimeStats { min_us: 0.0, max_us: 0.0, avg_us: 0.0 };
258 let default_metrics = EventMetricsSnapshot {
259 process_count: 0,
260 events_processed: 0,
261 processing_stats: default_stats.clone(),
262 };
263
264 Self {
265 uptime: std::time::Duration::ZERO,
266 tx_metrics: default_metrics.clone(),
267 account_metrics: default_metrics.clone(),
268 block_meta_metrics: default_metrics,
269 processing_stats: default_stats,
270 dropped_events_count: 0,
271 }
272 }
273}
274
275#[derive(Debug)]
277pub struct HighPerformanceMetrics {
278 start_nanos: u64,
279 event_metrics: [AtomicEventMetrics; 3],
280 processing_stats: AtomicProcessingTimeStats,
281 dropped_events_count: AtomicU64,
283}
284
285impl HighPerformanceMetrics {
286 fn new() -> Self {
287 let now_nanos =
288 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
289 as u64;
290
291 Self {
292 start_nanos: now_nanos,
293 event_metrics: [
294 AtomicEventMetrics::new(now_nanos),
295 AtomicEventMetrics::new(now_nanos),
296 AtomicEventMetrics::new(now_nanos),
297 ],
298 processing_stats: AtomicProcessingTimeStats::new(),
299 dropped_events_count: AtomicU64::new(0),
301 }
302 }
303
304 #[inline]
306 pub fn get_uptime_seconds(&self) -> f64 {
307 let now_nanos =
308 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
309 as u64;
310 (now_nanos - self.start_nanos) as f64 / 1_000_000_000.0
311 }
312
313 #[inline]
315 pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
316 let index = event_type.as_index();
317 let (process_count, events_processed, _) = self.event_metrics[index].get_counts();
318 let processing_stats = self.event_metrics[index].get_processing_stats();
319
320 EventMetricsSnapshot { process_count, events_processed, processing_stats }
321 }
322
323 #[inline]
325 pub fn get_processing_stats(&self) -> ProcessingTimeStats {
326 self.processing_stats.get_stats()
327 }
328
329 #[inline]
331 pub fn get_dropped_events_count(&self) -> u64 {
332 self.dropped_events_count.load(Ordering::Relaxed)
333 }
334
335 fn update_window_metrics(&self, event_type: EventType, window_duration_nanos: u64) {
337 let now_nanos =
338 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
339 as u64;
340
341 let index = event_type.as_index();
342 let event_metric = &self.event_metrics[index];
343
344 let window_start = event_metric.get_window_start();
345 if now_nanos.saturating_sub(window_start) >= window_duration_nanos {
346 event_metric.reset_window(now_nanos);
347 }
348 }
349}
350
351pub struct MetricsManager {
353 metrics: Arc<HighPerformanceMetrics>,
354 enable_metrics: bool,
355 stream_name: String,
356 background_task_running: AtomicBool,
357}
358
359impl MetricsManager {
360 pub fn new(enable_metrics: bool, stream_name: String) -> Self {
362 let manager = Self {
363 metrics: Arc::new(HighPerformanceMetrics::new()),
364 enable_metrics,
365 stream_name,
366 background_task_running: AtomicBool::new(false),
367 };
368
369 manager.start_background_tasks();
371 manager
372 }
373
374 fn start_background_tasks(&self) {
376 if self
377 .background_task_running
378 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
379 .is_ok()
380 {
381 if !self.enable_metrics {
382 return;
383 }
384
385 let metrics = self.metrics.clone();
386
387 tokio::spawn(async move {
388 let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
389
390 loop {
391 interval.tick().await;
392
393 let window_duration_nanos = DEFAULT_METRICS_WINDOW_SECONDS * 1_000_000_000;
394
395 metrics.update_window_metrics(EventType::Transaction, window_duration_nanos);
397 metrics.update_window_metrics(EventType::Account, window_duration_nanos);
398 metrics.update_window_metrics(EventType::BlockMeta, window_duration_nanos);
399 }
400 });
401 }
402 }
403
404 #[inline]
406 pub fn record_process(&self, event_type: EventType) {
407 if self.enable_metrics {
408 self.metrics.event_metrics[event_type.as_index()].add_process_count();
409 }
410 }
411
412 #[inline]
414 pub fn record_events(&self, event_type: EventType, count: u64, processing_time_us: f64) {
415 if !self.enable_metrics {
416 return;
417 }
418
419 let index = event_type.as_index();
420
421 self.metrics.event_metrics[index].add_events_processed(count);
423
424 self.metrics.event_metrics[index].update_processing_stats(processing_time_us, count);
426
427 self.metrics.processing_stats.update(processing_time_us, count);
429 }
430
431 #[inline]
433 pub fn log_slow_processing(&self, processing_time_us: f64, event_count: usize) {
434 if processing_time_us > SLOW_PROCESSING_THRESHOLD_US {
435 log::debug!(
436 "{} slow processing: {:.2}us for {} events",
437 self.stream_name,
438 processing_time_us,
439 event_count,
440 );
441 }
442 }
443
444 pub fn get_uptime(&self) -> std::time::Duration {
446 std::time::Duration::from_secs_f64(self.metrics.get_uptime_seconds())
447 }
448
449 pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
451 self.metrics.get_event_metrics(event_type)
452 }
453
454 pub fn get_processing_stats(&self) -> ProcessingTimeStats {
456 self.metrics.get_processing_stats()
457 }
458
459 pub fn get_dropped_events_count(&self) -> u64 {
461 self.metrics.get_dropped_events_count()
462 }
463
464 pub fn print_metrics(&self) {
466 println!("\n📊 {} Performance Metrics", self.stream_name);
467 println!(" Run Time: {:?}", self.get_uptime());
468
469 let dropped_count = self.get_dropped_events_count();
471 if dropped_count > 0 {
472 println!("\n⚠️ Dropped Events: {}", dropped_count);
473 }
474
475 println!("┌─────────────┬──────────────┬──────────────────┬─────────────┬─────────────┬─────────────┐");
477 println!("│ Event Type │ Process Count│ Events Processed │ Avg Time(μs)│ Min 10s(μs) │ Max 10s(μs) │");
478 println!("├─────────────┼──────────────┼──────────────────┼─────────────┼─────────────┼─────────────┤");
479
480 for event_type in [EventType::Transaction, EventType::Account, EventType::BlockMeta] {
481 let metrics = self.get_event_metrics(event_type);
482 println!(
483 "│ {:11} │ {:12} │ {:16} │ {:9.2} │ {:9.2} │ {:9.2} │",
484 event_type.name(),
485 metrics.process_count,
486 metrics.events_processed,
487 metrics.processing_stats.avg_us,
488 metrics.processing_stats.min_us,
489 metrics.processing_stats.max_us
490 );
491 }
492
493 println!("└─────────────┴──────────────┴──────────────────┴─────────────┴─────────────┴─────────────┘");
494 println!();
495 }
496
497 pub async fn start_auto_monitoring(&self) -> Option<tokio::task::JoinHandle<()>> {
499 if !self.enable_metrics {
500 return None;
501 }
502
503 let manager = self.clone();
504 let handle = tokio::spawn(async move {
505 let mut interval = tokio::time::interval(std::time::Duration::from_secs(
506 DEFAULT_METRICS_PRINT_INTERVAL_SECONDS,
507 ));
508 loop {
509 interval.tick().await;
510 manager.print_metrics();
511 }
512 });
513 Some(handle)
514 }
515
516 pub fn new_with_metrics(
520 _metrics: Arc<std::sync::RwLock<PerformanceMetrics>>,
521 enable_metrics: bool,
522 stream_name: String,
523 ) -> Self {
524 Self::new(enable_metrics, stream_name)
525 }
526
527 pub fn get_metrics(&self) -> PerformanceMetrics {
529 PerformanceMetrics {
530 uptime: self.get_uptime(),
531 tx_metrics: self.get_event_metrics(EventType::Transaction),
532 account_metrics: self.get_event_metrics(EventType::Account),
533 block_meta_metrics: self.get_event_metrics(EventType::BlockMeta),
534 processing_stats: self.get_processing_stats(),
535 dropped_events_count: self.metrics.get_dropped_events_count(),
536 }
537 }
538
539 #[inline]
541 pub fn add_tx_process_count(&self) {
542 self.record_process(EventType::Transaction);
543 }
544
545 #[inline]
547 pub fn add_account_process_count(&self) {
548 self.record_process(EventType::Account);
549 }
550
551 #[inline]
553 pub fn add_block_meta_process_count(&self) {
554 self.record_process(EventType::BlockMeta);
555 }
556
557 #[inline]
559 pub fn update_metrics(
560 &self,
561 event_type: MetricsEventType,
562 events_processed: u64,
563 processing_time_us: f64,
564 ) {
565 self.record_events(event_type, events_processed, processing_time_us);
566 self.log_slow_processing(processing_time_us, events_processed as usize);
567 }
568
569 #[inline]
571 pub fn increment_dropped_events(&self) {
572 if !self.enable_metrics {
573 return;
574 }
575
576 let new_count = self.metrics.dropped_events_count.fetch_add(1, Ordering::Relaxed) + 1;
578
579 if new_count % 1000 == 0 {
581 log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
582 }
583 }
584
585 #[inline]
587 pub fn increment_dropped_events_by(&self, count: u64) {
588 if !self.enable_metrics || count == 0 {
589 return;
590 }
591
592 let new_count =
594 self.metrics.dropped_events_count.fetch_add(count, Ordering::Relaxed) + count;
595
596 if count > 1 {
598 log::debug!(
599 "{} dropped batch of {} events, total dropped: {}",
600 self.stream_name,
601 count,
602 new_count
603 );
604 }
605
606 if new_count % 1000 == 0 || (new_count / 1000) != ((new_count - count) / 1000) {
608 log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
609 }
610 }
611}
612
613impl Clone for MetricsManager {
614 fn clone(&self) -> Self {
615 Self {
616 metrics: self.metrics.clone(),
617 enable_metrics: self.enable_metrics,
618 stream_name: self.stream_name.clone(),
619 background_task_running: AtomicBool::new(false), }
621 }
622}