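//! `sol_parser_sdk/common/metrics.rs`
//!
//! Atomic-counter based performance metrics: per-event-type counters,
//! processing-time statistics and dropped-event tracking.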

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;

use super::constants::*;

/// Category of event tracked by the metrics system.
///
/// The explicit discriminants double as indices into the per-type metrics
/// array (see `EventType::as_index`), so they must stay dense and start at 0.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EventType {
    Transaction = 0,
    Account = 1,
    BlockMeta = 2,
}

/// Compatibility alias for [`EventType`].
pub type MetricsEventType = EventType;

17impl EventType {
18    #[inline]
19    const fn as_index(self) -> usize {
20        self as usize
21    }
22
23    const fn name(self) -> &'static str {
24        match self {
25            EventType::Transaction => "TX",
26            EventType::Account => "Account",
27            EventType::BlockMeta => "Block Meta",
28        }
29    }
30
31    // Compatibility constants
32    pub const TX: EventType = EventType::Transaction;
33}
34
35/// High-performance atomic event metrics
36#[derive(Debug)]
37struct AtomicEventMetrics {
38    process_count: AtomicU64,
39    events_processed: AtomicU64,
40    events_in_window: AtomicU64,
41    window_start_nanos: AtomicU64,
42    // Processing time statistics per event type
43    processing_stats: AtomicProcessingTimeStats,
44}
45
46impl AtomicEventMetrics {
47    fn new(now_nanos: u64) -> Self {
48        Self {
49            process_count: AtomicU64::new(0),
50            events_processed: AtomicU64::new(0),
51            events_in_window: AtomicU64::new(0),
52            window_start_nanos: AtomicU64::new(now_nanos),
53            processing_stats: AtomicProcessingTimeStats::new(),
54        }
55    }
56
57    /// Atomically increment process count
58    #[inline]
59    fn add_process_count(&self) {
60        self.process_count.fetch_add(1, Ordering::Relaxed);
61    }
62
63    /// Atomically increment event processing count
64    #[inline]
65    fn add_events_processed(&self, count: u64) {
66        self.events_processed.fetch_add(count, Ordering::Relaxed);
67        self.events_in_window.fetch_add(count, Ordering::Relaxed);
68    }
69
70    /// Get current count (non-blocking)
71    #[inline]
72    fn get_counts(&self) -> (u64, u64, u64) {
73        (
74            self.process_count.load(Ordering::Relaxed),
75            self.events_processed.load(Ordering::Relaxed),
76            self.events_in_window.load(Ordering::Relaxed),
77        )
78    }
79
80    /// Reset window count
81    #[inline]
82    fn reset_window(&self, new_start_nanos: u64) {
83        self.events_in_window.store(0, Ordering::Relaxed);
84        self.window_start_nanos.store(new_start_nanos, Ordering::Relaxed);
85    }
86
87    #[inline]
88    fn get_window_start(&self) -> u64 {
89        self.window_start_nanos.load(Ordering::Relaxed)
90    }
91
92    /// Get processing time statistics for this event type
93    #[inline]
94    fn get_processing_stats(&self) -> ProcessingTimeStats {
95        self.processing_stats.get_stats()
96    }
97
98    /// Update processing time statistics for this event type
99    #[inline]
100    fn update_processing_stats(&self, time_us: f64, event_count: u64) {
101        self.processing_stats.update(time_us, event_count);
102    }
103}
104
/// Lock-free processing-time statistics.
///
/// The min/max values are stored as the raw bit patterns of non-negative `f64`
/// values. For finite non-negative IEEE-754 doubles the bit pattern orders the
/// same way as the value, which lets a plain `u64` compare-and-swap maintain
/// them. Each extreme is reset when it has not been refreshed for
/// `EXTREME_WINDOW_NANOS` (10 s); the reset only happens on the next `update`.
#[derive(Debug)]
struct AtomicProcessingTimeStats {
    /// Bits of the smallest sample in the current window (`f64::INFINITY` when empty).
    min_time_bits: AtomicU64,
    /// Bits of the largest sample in the current window (`0.0` when empty).
    max_time_bits: AtomicU64,
    /// Wall-clock time (UNIX nanoseconds) at which the minimum last changed or was reset.
    min_time_timestamp_nanos: AtomicU64,
    /// Wall-clock time (UNIX nanoseconds) at which the maximum last changed or was reset.
    max_time_timestamp_nanos: AtomicU64,
    /// Sum of `time_us * event_count` over all samples, in whole nanoseconds.
    /// Integer storage avoids floating-point accumulation drift, and nanosecond
    /// resolution keeps sub-microsecond samples from being truncated to zero.
    total_time_ns: AtomicU64,
    /// Sum of `event_count` over all samples.
    total_events: AtomicU64,
}

impl AtomicProcessingTimeStats {
    /// Age after which a min/max value is considered stale and reset (10 s).
    const EXTREME_WINDOW_NANOS: u64 = 10_000_000_000;

    /// Current wall-clock time in nanoseconds since the UNIX epoch, or 0 if the
    /// system clock reads earlier than the epoch (instead of panicking).
    fn unix_now_nanos() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos() as u64)
            .unwrap_or_default()
    }

    fn new() -> Self {
        let now_nanos = Self::unix_now_nanos();

        Self {
            min_time_bits: AtomicU64::new(f64::INFINITY.to_bits()),
            max_time_bits: AtomicU64::new(0),
            min_time_timestamp_nanos: AtomicU64::new(now_nanos),
            max_time_timestamp_nanos: AtomicU64::new(now_nanos),
            total_time_ns: AtomicU64::new(0),
            total_events: AtomicU64::new(0),
        }
    }

    /// Records one timing sample.
    ///
    /// `time_us` is a processing time in microseconds. Negative, NaN and
    /// infinite values are clamped to `0.0`, because the bit-pattern ordering
    /// used for min/max only holds for finite non-negative values (a negative
    /// value's sign bit would otherwise make it compare as the largest).
    /// `event_count` weights the sample in the running average.
    #[inline]
    fn update(&self, time_us: f64, event_count: u64) {
        // `> 0.0` is false for NaN and -0.0, so both collapse to +0.0 here.
        let time_us = if time_us.is_finite() && time_us > 0.0 { time_us } else { 0.0 };
        let time_bits = time_us.to_bits();
        let now_nanos = Self::unix_now_nanos();

        Self::update_extreme(
            &self.min_time_bits,
            &self.min_time_timestamp_nanos,
            time_bits,
            f64::INFINITY.to_bits(),
            now_nanos,
            |sample, current| sample < current,
        );
        Self::update_extreme(
            &self.max_time_bits,
            &self.max_time_timestamp_nanos,
            time_bits,
            0,
            now_nanos,
            |sample, current| sample > current,
        );

        let weighted_ns = (time_us * event_count as f64 * 1_000.0).round() as u64;
        self.total_time_ns.fetch_add(weighted_ns, Ordering::Relaxed);
        self.total_events.fetch_add(event_count, Ordering::Relaxed);
    }

    /// Moves one extreme (min or max) towards `sample_bits`.
    ///
    /// If the stored extreme is older than `EXTREME_WINDOW_NANOS`, it is first
    /// reset to `reset_bits`. The CAS loop then replaces it while
    /// `is_better(sample, current)` holds, and refreshes the timestamp on success.
    #[inline]
    fn update_extreme(
        value_bits: &AtomicU64,
        timestamp_nanos: &AtomicU64,
        sample_bits: u64,
        reset_bits: u64,
        now_nanos: u64,
        is_better: impl Fn(u64, u64) -> bool,
    ) {
        let mut current = value_bits.load(Ordering::Relaxed);
        let age_nanos = now_nanos.saturating_sub(timestamp_nanos.load(Ordering::Relaxed));
        if age_nanos > Self::EXTREME_WINDOW_NANOS {
            value_bits.store(reset_bits, Ordering::Relaxed);
            timestamp_nanos.store(now_nanos, Ordering::Relaxed);
            current = reset_bits;
        }

        while is_better(sample_bits, current) {
            match value_bits.compare_exchange_weak(
                current,
                sample_bits,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    timestamp_nanos.store(now_nanos, Ordering::Relaxed);
                    break;
                }
                Err(actual) => current = actual,
            }
        }
    }

    /// Returns a snapshot of min/max/average in microseconds.
    ///
    /// An empty minimum (`INFINITY`) is reported as `0.0`; the average is `0.0`
    /// until at least one event has been recorded.
    #[inline]
    fn get_stats(&self) -> ProcessingTimeStats {
        let min_us = f64::from_bits(self.min_time_bits.load(Ordering::Relaxed));
        let max_us = f64::from_bits(self.max_time_bits.load(Ordering::Relaxed));
        let total_time_ns = self.total_time_ns.load(Ordering::Relaxed);
        let total_events = self.total_events.load(Ordering::Relaxed);

        let avg_us = if total_events == 0 {
            0.0
        } else {
            total_time_ns as f64 / 1_000.0 / total_events as f64
        };

        ProcessingTimeStats {
            min_us: if min_us == f64::INFINITY { 0.0 } else { min_us },
            max_us,
            avg_us,
        }
    }
}

/// Processing time statistics result, in microseconds.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ProcessingTimeStats {
    pub min_us: f64,
    pub max_us: f64,
    pub avg_us: f64,
}

235/// Event metrics snapshot
236#[derive(Debug, Clone)]
237pub struct EventMetricsSnapshot {
238    pub process_count: u64,
239    pub events_processed: u64,
240    pub processing_stats: ProcessingTimeStats,
241}
242
243/// Compatibility structure - complete performance metrics
244#[derive(Debug, Clone)]
245pub struct PerformanceMetrics {
246    pub uptime: std::time::Duration,
247    pub tx_metrics: EventMetricsSnapshot,
248    pub account_metrics: EventMetricsSnapshot,
249    pub block_meta_metrics: EventMetricsSnapshot,
250    pub processing_stats: ProcessingTimeStats,
251    pub dropped_events_count: u64,
252}
253
254impl PerformanceMetrics {
255    /// Create default performance metrics (compatibility method)
256    pub fn new() -> Self {
257        let default_stats = ProcessingTimeStats { min_us: 0.0, max_us: 0.0, avg_us: 0.0 };
258        let default_metrics = EventMetricsSnapshot {
259            process_count: 0,
260            events_processed: 0,
261            processing_stats: default_stats.clone(),
262        };
263
264        Self {
265            uptime: std::time::Duration::ZERO,
266            tx_metrics: default_metrics.clone(),
267            account_metrics: default_metrics.clone(),
268            block_meta_metrics: default_metrics,
269            processing_stats: default_stats,
270            dropped_events_count: 0,
271        }
272    }
273}
274
275impl Default for PerformanceMetrics {
276    fn default() -> Self {
277        Self::new()
278    }
279}
280
281/// High-performance metrics system
282#[derive(Debug)]
283pub struct HighPerformanceMetrics {
284    start_nanos: u64,
285    event_metrics: [AtomicEventMetrics; 3],
286    processing_stats: AtomicProcessingTimeStats,
287    // 丢弃事件指标
288    dropped_events_count: AtomicU64,
289}
290
291impl HighPerformanceMetrics {
292    fn new() -> Self {
293        let now_nanos =
294            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
295                as u64;
296
297        Self {
298            start_nanos: now_nanos,
299            event_metrics: [
300                AtomicEventMetrics::new(now_nanos),
301                AtomicEventMetrics::new(now_nanos),
302                AtomicEventMetrics::new(now_nanos),
303            ],
304            processing_stats: AtomicProcessingTimeStats::new(),
305            // 初始化丢弃事件指标
306            dropped_events_count: AtomicU64::new(0),
307        }
308    }
309
310    /// 获取运行时长(秒)
311    #[inline]
312    pub fn get_uptime_seconds(&self) -> f64 {
313        let now_nanos =
314            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
315                as u64;
316        (now_nanos - self.start_nanos) as f64 / 1_000_000_000.0
317    }
318
319    /// 获取事件指标快照
320    #[inline]
321    pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
322        let index = event_type.as_index();
323        let (process_count, events_processed, _) = self.event_metrics[index].get_counts();
324        let processing_stats = self.event_metrics[index].get_processing_stats();
325
326        EventMetricsSnapshot { process_count, events_processed, processing_stats }
327    }
328
329    /// 获取处理时间统计
330    #[inline]
331    pub fn get_processing_stats(&self) -> ProcessingTimeStats {
332        self.processing_stats.get_stats()
333    }
334
335    /// 获取丢弃事件计数
336    #[inline]
337    pub fn get_dropped_events_count(&self) -> u64 {
338        self.dropped_events_count.load(Ordering::Relaxed)
339    }
340
341    /// 更新窗口指标(后台任务调用)
342    fn update_window_metrics(&self, event_type: EventType, window_duration_nanos: u64) {
343        let now_nanos =
344            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
345                as u64;
346
347        let index = event_type.as_index();
348        let event_metric = &self.event_metrics[index];
349
350        let window_start = event_metric.get_window_start();
351        if now_nanos.saturating_sub(window_start) >= window_duration_nanos {
352            event_metric.reset_window(now_nanos);
353        }
354    }
355}
356
357/// 高性能指标管理器
358pub struct MetricsManager {
359    metrics: Arc<HighPerformanceMetrics>,
360    enable_metrics: bool,
361    stream_name: String,
362    background_task_running: AtomicBool,
363}
364
365impl MetricsManager {
366    /// 创建新的指标管理器
367    pub fn new(enable_metrics: bool, stream_name: String) -> Self {
368        let manager = Self {
369            metrics: Arc::new(HighPerformanceMetrics::new()),
370            enable_metrics,
371            stream_name,
372            background_task_running: AtomicBool::new(false),
373        };
374
375        // 启动后台任务
376        manager.start_background_tasks();
377        manager
378    }
379
380    /// 启动后台任务
381    fn start_background_tasks(&self) {
382        if self
383            .background_task_running
384            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
385            .is_ok()
386        {
387            if !self.enable_metrics {
388                return;
389            }
390
391            let metrics = self.metrics.clone();
392
393            tokio::spawn(async move {
394                let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
395
396                loop {
397                    interval.tick().await;
398
399                    let window_duration_nanos = DEFAULT_METRICS_WINDOW_SECONDS * 1_000_000_000;
400
401                    // 更新所有事件类型的窗口指标
402                    metrics.update_window_metrics(EventType::Transaction, window_duration_nanos);
403                    metrics.update_window_metrics(EventType::Account, window_duration_nanos);
404                    metrics.update_window_metrics(EventType::BlockMeta, window_duration_nanos);
405                }
406            });
407        }
408    }
409
410    /// 记录处理次数(非阻塞)
411    #[inline]
412    pub fn record_process(&self, event_type: EventType) {
413        if self.enable_metrics {
414            self.metrics.event_metrics[event_type.as_index()].add_process_count();
415        }
416    }
417
418    /// 记录事件处理(非阻塞)
419    #[inline]
420    pub fn record_events(&self, event_type: EventType, count: u64, processing_time_us: f64) {
421        if !self.enable_metrics {
422            return;
423        }
424
425        let index = event_type.as_index();
426
427        // 原子更新事件计数
428        self.metrics.event_metrics[index].add_events_processed(count);
429
430        // 原子更新该事件类型的处理时间统计
431        self.metrics.event_metrics[index].update_processing_stats(processing_time_us, count);
432
433        // 保持全局处理时间统计的兼容性
434        self.metrics.processing_stats.update(processing_time_us, count);
435    }
436
437    /// 记录慢处理操作
438    #[inline]
439    pub fn log_slow_processing(&self, processing_time_us: f64, event_count: usize) {
440        if processing_time_us > SLOW_PROCESSING_THRESHOLD_US {
441            log::debug!(
442                "{} slow processing: {:.2}us for {} events",
443                self.stream_name,
444                processing_time_us,
445                event_count,
446            );
447        }
448    }
449
450    /// 获取运行时长
451    pub fn get_uptime(&self) -> std::time::Duration {
452        std::time::Duration::from_secs_f64(self.metrics.get_uptime_seconds())
453    }
454
455    /// 获取事件指标
456    pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
457        self.metrics.get_event_metrics(event_type)
458    }
459
460    /// 获取处理时间统计
461    pub fn get_processing_stats(&self) -> ProcessingTimeStats {
462        self.metrics.get_processing_stats()
463    }
464
465    /// 获取丢弃事件计数
466    pub fn get_dropped_events_count(&self) -> u64 {
467        self.metrics.get_dropped_events_count()
468    }
469
470    /// 打印性能指标(非阻塞)
471    pub fn print_metrics(&self) {
472        println!("\n📊 {} Performance Metrics", self.stream_name);
473        println!("   Run Time: {:?}", self.get_uptime());
474
475        // 打印丢弃事件指标
476        let dropped_count = self.get_dropped_events_count();
477        if dropped_count > 0 {
478            println!("\n⚠️  Dropped Events: {}", dropped_count);
479        }
480
481        // 打印事件指标表格(包含处理时间统计)
482        println!("┌─────────────┬──────────────┬──────────────────┬─────────────┬─────────────┬─────────────┐");
483        println!("│ Event Type  │ Process Count│ Events Processed │ Avg Time(μs)│ Min 10s(μs) │ Max 10s(μs) │");
484        println!("├─────────────┼──────────────┼──────────────────┼─────────────┼─────────────┼─────────────┤");
485
486        for event_type in [EventType::Transaction, EventType::Account, EventType::BlockMeta] {
487            let metrics = self.get_event_metrics(event_type);
488            println!(
489                "│ {:11} │ {:12} │ {:16} │ {:9.2}   │ {:9.2}   │ {:9.2}   │",
490                event_type.name(),
491                metrics.process_count,
492                metrics.events_processed,
493                metrics.processing_stats.avg_us,
494                metrics.processing_stats.min_us,
495                metrics.processing_stats.max_us
496            );
497        }
498
499        println!("└─────────────┴──────────────┴──────────────────┴─────────────┴─────────────┴─────────────┘");
500        println!();
501    }
502
503    /// 启动自动性能监控任务
504    pub async fn start_auto_monitoring(&self) -> Option<tokio::task::JoinHandle<()>> {
505        if !self.enable_metrics {
506            return None;
507        }
508
509        let manager = self.clone();
510        let handle = tokio::spawn(async move {
511            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
512                DEFAULT_METRICS_PRINT_INTERVAL_SECONDS,
513            ));
514            loop {
515                interval.tick().await;
516                manager.print_metrics();
517            }
518        });
519        Some(handle)
520    }
521
522    // === 兼容性方法 ===
523
524    /// 兼容性构造函数
525    pub fn new_with_metrics(
526        _metrics: Arc<std::sync::RwLock<PerformanceMetrics>>,
527        enable_metrics: bool,
528        stream_name: String,
529    ) -> Self {
530        Self::new(enable_metrics, stream_name)
531    }
532
533    /// 获取完整的性能指标(兼容性方法)
534    pub fn get_metrics(&self) -> PerformanceMetrics {
535        PerformanceMetrics {
536            uptime: self.get_uptime(),
537            tx_metrics: self.get_event_metrics(EventType::Transaction),
538            account_metrics: self.get_event_metrics(EventType::Account),
539            block_meta_metrics: self.get_event_metrics(EventType::BlockMeta),
540            processing_stats: self.get_processing_stats(),
541            dropped_events_count: self.metrics.get_dropped_events_count(),
542        }
543    }
544
545    /// 兼容性方法 - 添加交易处理计数
546    #[inline]
547    pub fn add_tx_process_count(&self) {
548        self.record_process(EventType::Transaction);
549    }
550
551    /// 兼容性方法 - 添加账户处理计数
552    #[inline]
553    pub fn add_account_process_count(&self) {
554        self.record_process(EventType::Account);
555    }
556
557    /// 兼容性方法 - 添加区块元数据处理计数
558    #[inline]
559    pub fn add_block_meta_process_count(&self) {
560        self.record_process(EventType::BlockMeta);
561    }
562
563    /// 兼容性方法 - 更新指标
564    #[inline]
565    pub fn update_metrics(
566        &self,
567        event_type: MetricsEventType,
568        events_processed: u64,
569        processing_time_us: f64,
570    ) {
571        self.record_events(event_type, events_processed, processing_time_us);
572        self.log_slow_processing(processing_time_us, events_processed as usize);
573    }
574
575    /// 增加丢弃事件计数
576    #[inline]
577    pub fn increment_dropped_events(&self) {
578        if !self.enable_metrics {
579            return;
580        }
581
582        // 原子地增加丢弃事件计数
583        let new_count = self.metrics.dropped_events_count.fetch_add(1, Ordering::Relaxed) + 1;
584
585        // 每丢弃1000个事件记录一次警告日志
586        if new_count.is_multiple_of(1000) {
587            log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
588        }
589    }
590
591    /// 批量增加丢弃事件计数
592    #[inline]
593    pub fn increment_dropped_events_by(&self, count: u64) {
594        if !self.enable_metrics || count == 0 {
595            return;
596        }
597
598        // 原子地增加丢弃事件计数
599        let new_count =
600            self.metrics.dropped_events_count.fetch_add(count, Ordering::Relaxed) + count;
601
602        // 记录批量丢弃事件的日志
603        if count > 1 {
604            log::debug!(
605                "{} dropped batch of {} events, total dropped: {}",
606                self.stream_name,
607                count,
608                new_count
609            );
610        }
611
612        // 每丢弃1000个事件记录一次警告日志
613        if new_count.is_multiple_of(1000) || (new_count / 1000) != ((new_count - count) / 1000) {
614            log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
615        }
616    }
617}
618
619impl Clone for MetricsManager {
620    fn clone(&self) -> Self {
621        Self {
622            metrics: self.metrics.clone(),
623            enable_metrics: self.enable_metrics,
624            stream_name: self.stream_name.clone(),
625            background_task_running: AtomicBool::new(false), // 新实例不自动启动后台任务
626        }
627    }
628}