sol_parser_sdk/common/
metrics.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::Arc;
3
4use super::constants::*;
5
/// Kind of stream event the metrics system keeps separate counters for.
///
/// The explicit discriminants double as array indices (see `as_index`), so they must stay
/// dense and start at zero.
#[derive(Debug, Clone, Copy)]
pub enum EventType {
    Transaction = 0,
    Account = 1,
    BlockMeta = 2,
}

/// Alias kept so that code written against the older `MetricsEventType` name still compiles.
pub type MetricsEventType = EventType;

impl EventType {
    /// Legacy shorthand for [`EventType::Transaction`].
    pub const TX: EventType = Self::Transaction;

    /// Returns the variant's discriminant as a `usize`, suitable for indexing per-type arrays.
    #[inline]
    const fn as_index(self) -> usize {
        self as usize
    }

    /// Returns the short human-readable label used when printing the metrics table.
    const fn name(self) -> &'static str {
        match self {
            Self::Transaction => "TX",
            Self::Account => "Account",
            Self::BlockMeta => "Block Meta",
        }
    }
}
34
35/// High-performance atomic event metrics
36#[derive(Debug)]
37struct AtomicEventMetrics {
38    process_count: AtomicU64,
39    events_processed: AtomicU64,
40    events_in_window: AtomicU64,
41    window_start_nanos: AtomicU64,
42    // Processing time statistics per event type
43    processing_stats: AtomicProcessingTimeStats,
44}
45
46impl AtomicEventMetrics {
47    fn new(now_nanos: u64) -> Self {
48        Self {
49            process_count: AtomicU64::new(0),
50            events_processed: AtomicU64::new(0),
51            events_in_window: AtomicU64::new(0),
52            window_start_nanos: AtomicU64::new(now_nanos),
53            processing_stats: AtomicProcessingTimeStats::new(),
54        }
55    }
56
57    /// Atomically increment process count
58    #[inline]
59    fn add_process_count(&self) {
60        self.process_count.fetch_add(1, Ordering::Relaxed);
61    }
62
63    /// Atomically increment event processing count
64    #[inline]
65    fn add_events_processed(&self, count: u64) {
66        self.events_processed.fetch_add(count, Ordering::Relaxed);
67        self.events_in_window.fetch_add(count, Ordering::Relaxed);
68    }
69
70    /// Get current count (non-blocking)
71    #[inline]
72    fn get_counts(&self) -> (u64, u64, u64) {
73        (
74            self.process_count.load(Ordering::Relaxed),
75            self.events_processed.load(Ordering::Relaxed),
76            self.events_in_window.load(Ordering::Relaxed),
77        )
78    }
79
80    /// Reset window count
81    #[inline]
82    fn reset_window(&self, new_start_nanos: u64) {
83        self.events_in_window.store(0, Ordering::Relaxed);
84        self.window_start_nanos.store(new_start_nanos, Ordering::Relaxed);
85    }
86
87    #[inline]
88    fn get_window_start(&self) -> u64 {
89        self.window_start_nanos.load(Ordering::Relaxed)
90    }
91
92    /// Get processing time statistics for this event type
93    #[inline]
94    fn get_processing_stats(&self) -> ProcessingTimeStats {
95        self.processing_stats.get_stats()
96    }
97
98    /// Update processing time statistics for this event type
99    #[inline]
100    fn update_processing_stats(&self, time_us: f64, event_count: u64) {
101        self.processing_stats.update(time_us, event_count);
102    }
103}
104
/// Lock-free accumulator for processing-time statistics.
///
/// The minimum and maximum are stored as raw `f64` bit patterns inside `AtomicU64`s and compared
/// as unsigned integers. That ordering matches the numeric ordering only for non-negative,
/// non-NaN floats, which is why `update` sanitises every sample before it touches the atomics.
#[derive(Debug)]
struct AtomicProcessingTimeStats {
    /// Bit pattern of the smallest recent sample (`+inf` when there is none).
    min_time_bits: AtomicU64,
    /// Bit pattern of the largest recent sample (`0`, i.e. `0.0`, when there is none).
    max_time_bits: AtomicU64,
    /// Timestamp of the last minimum update or reset (nanoseconds since the Unix epoch).
    min_time_timestamp_nanos: AtomicU64,
    /// Timestamp of the last maximum update or reset (nanoseconds since the Unix epoch).
    max_time_timestamp_nanos: AtomicU64,
    /// Sum of `time_us * event_count`, truncated to whole microseconds per sample.
    total_time_us: AtomicU64,
    /// Sum of all `event_count` values seen so far.
    total_events: AtomicU64,
}

impl AtomicProcessingTimeStats {
    /// Age after which a recorded minimum/maximum is discarded and tracking starts over (10 s).
    const EXTREME_MAX_AGE_NANOS: u64 = 10_000_000_000;

    /// Wall-clock time in nanoseconds since the Unix epoch.
    ///
    /// Returns `0` instead of panicking when the system clock is set before 1970. The `u128` to
    /// `u64` narrowing is lossless for any date before the year 2554.
    fn unix_nanos_now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos() as u64)
            .unwrap_or(0)
    }

    /// Creates empty statistics whose min/max ageing clocks start now.
    fn new() -> Self {
        let now_nanos = Self::unix_nanos_now();

        Self {
            min_time_bits: AtomicU64::new(f64::INFINITY.to_bits()),
            max_time_bits: AtomicU64::new(0),
            min_time_timestamp_nanos: AtomicU64::new(now_nanos),
            max_time_timestamp_nanos: AtomicU64::new(now_nanos),
            total_time_us: AtomicU64::new(0),
            total_events: AtomicU64::new(0),
        }
    }

    /// Shared min/max bookkeeping.
    ///
    /// If the stored extreme is older than `EXTREME_MAX_AGE_NANOS` it is first reset to
    /// `neutral_bits`. Afterwards `sample_bits` replaces the stored value for as long as
    /// `improves(sample_bits, stored)` holds, retrying when another thread wins the race.
    fn track_extreme(
        extreme_bits: &AtomicU64,
        extreme_timestamp: &AtomicU64,
        neutral_bits: u64,
        sample_bits: u64,
        now_nanos: u64,
        improves: fn(u64, u64) -> bool,
    ) {
        let mut current = extreme_bits.load(Ordering::Relaxed);
        let recorded_at = extreme_timestamp.load(Ordering::Relaxed);

        if now_nanos.saturating_sub(recorded_at) > Self::EXTREME_MAX_AGE_NANOS {
            extreme_bits.store(neutral_bits, Ordering::Relaxed);
            extreme_timestamp.store(now_nanos, Ordering::Relaxed);
            current = neutral_bits;
        }

        while improves(sample_bits, current) {
            match extreme_bits.compare_exchange_weak(
                current,
                sample_bits,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // The new extreme is in place; restart its ageing clock.
                    extreme_timestamp.store(now_nanos, Ordering::Relaxed);
                    break;
                }
                Err(observed) => current = observed,
            }
        }
    }

    /// Records one timing sample of `time_us` microseconds covering `event_count` events.
    ///
    /// Non-finite samples (NaN, ±infinity) are ignored entirely. Negative samples, including
    /// `-0.0`, are clamped to `0.0`. Without this, the sign bit or a NaN payload would make the
    /// bit pattern compare as a huge unsigned integer and be stored as the maximum.
    #[inline]
    fn update(&self, time_us: f64, event_count: u64) {
        if !time_us.is_finite() {
            return;
        }
        // `> 0.0` is false for negatives and for -0.0, so both collapse to +0.0 here.
        let time_us = if time_us > 0.0 { time_us } else { 0.0 };

        let time_bits = time_us.to_bits();
        let now_nanos = Self::unix_nanos_now();

        Self::track_extreme(
            &self.min_time_bits,
            &self.min_time_timestamp_nanos,
            f64::INFINITY.to_bits(),
            time_bits,
            now_nanos,
            |sample, current| sample < current,
        );
        Self::track_extreme(
            &self.max_time_bits,
            &self.max_time_timestamp_nanos,
            0,
            time_bits,
            now_nanos,
            |sample, current| sample > current,
        );

        // Totals are kept as whole microseconds; the float-to-int cast truncates and saturates.
        let total_time_us_int = (time_us * event_count as f64) as u64;
        self.total_time_us.fetch_add(total_time_us_int, Ordering::Relaxed);
        self.total_events.fetch_add(event_count, Ordering::Relaxed);
    }

    /// Returns the current statistics without blocking.
    ///
    /// `min_us` is reported as `0.0` while no minimum is recorded, and `avg_us` is `0.0` while no
    /// events have been counted.
    #[inline]
    fn get_stats(&self) -> ProcessingTimeStats {
        let min_time = f64::from_bits(self.min_time_bits.load(Ordering::Relaxed));
        let max_time = f64::from_bits(self.max_time_bits.load(Ordering::Relaxed));
        let total_time_us_int = self.total_time_us.load(Ordering::Relaxed);
        let total_events = self.total_events.load(Ordering::Relaxed);

        let avg_time =
            if total_events > 0 { total_time_us_int as f64 / total_events as f64 } else { 0.0 };

        ProcessingTimeStats {
            min_us: if min_time == f64::INFINITY { 0.0 } else { min_time },
            max_us: max_time,
            avg_us: avg_time,
        }
    }
}

/// Point-in-time processing-time statistics, all in microseconds.
#[derive(Debug, Clone)]
pub struct ProcessingTimeStats {
    /// Smallest recent sample, or `0.0` when none is recorded.
    pub min_us: f64,
    /// Largest recent sample, or `0.0` when none is recorded.
    pub max_us: f64,
    /// Accumulated time divided by accumulated event count, or `0.0` when no events were counted.
    pub avg_us: f64,
}
234
235/// Event metrics snapshot
236#[derive(Debug, Clone)]
237pub struct EventMetricsSnapshot {
238    pub process_count: u64,
239    pub events_processed: u64,
240    pub processing_stats: ProcessingTimeStats,
241}
242
243/// Compatibility structure - complete performance metrics
244#[derive(Debug, Clone)]
245pub struct PerformanceMetrics {
246    pub uptime: std::time::Duration,
247    pub tx_metrics: EventMetricsSnapshot,
248    pub account_metrics: EventMetricsSnapshot,
249    pub block_meta_metrics: EventMetricsSnapshot,
250    pub processing_stats: ProcessingTimeStats,
251    pub dropped_events_count: u64,
252}
253
254impl PerformanceMetrics {
255    /// Create default performance metrics (compatibility method)
256    pub fn new() -> Self {
257        let default_stats = ProcessingTimeStats { min_us: 0.0, max_us: 0.0, avg_us: 0.0 };
258        let default_metrics = EventMetricsSnapshot {
259            process_count: 0,
260            events_processed: 0,
261            processing_stats: default_stats.clone(),
262        };
263
264        Self {
265            uptime: std::time::Duration::ZERO,
266            tx_metrics: default_metrics.clone(),
267            account_metrics: default_metrics.clone(),
268            block_meta_metrics: default_metrics,
269            processing_stats: default_stats,
270            dropped_events_count: 0,
271        }
272    }
273}
274
275/// High-performance metrics system
276#[derive(Debug)]
277pub struct HighPerformanceMetrics {
278    start_nanos: u64,
279    event_metrics: [AtomicEventMetrics; 3],
280    processing_stats: AtomicProcessingTimeStats,
281    // 丢弃事件指标
282    dropped_events_count: AtomicU64,
283}
284
285impl HighPerformanceMetrics {
286    fn new() -> Self {
287        let now_nanos =
288            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
289                as u64;
290
291        Self {
292            start_nanos: now_nanos,
293            event_metrics: [
294                AtomicEventMetrics::new(now_nanos),
295                AtomicEventMetrics::new(now_nanos),
296                AtomicEventMetrics::new(now_nanos),
297            ],
298            processing_stats: AtomicProcessingTimeStats::new(),
299            // 初始化丢弃事件指标
300            dropped_events_count: AtomicU64::new(0),
301        }
302    }
303
304    /// 获取运行时长(秒)
305    #[inline]
306    pub fn get_uptime_seconds(&self) -> f64 {
307        let now_nanos =
308            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
309                as u64;
310        (now_nanos - self.start_nanos) as f64 / 1_000_000_000.0
311    }
312
313    /// 获取事件指标快照
314    #[inline]
315    pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
316        let index = event_type.as_index();
317        let (process_count, events_processed, _) = self.event_metrics[index].get_counts();
318        let processing_stats = self.event_metrics[index].get_processing_stats();
319
320        EventMetricsSnapshot { process_count, events_processed, processing_stats }
321    }
322
323    /// 获取处理时间统计
324    #[inline]
325    pub fn get_processing_stats(&self) -> ProcessingTimeStats {
326        self.processing_stats.get_stats()
327    }
328
329    /// 获取丢弃事件计数
330    #[inline]
331    pub fn get_dropped_events_count(&self) -> u64 {
332        self.dropped_events_count.load(Ordering::Relaxed)
333    }
334
335    /// 更新窗口指标(后台任务调用)
336    fn update_window_metrics(&self, event_type: EventType, window_duration_nanos: u64) {
337        let now_nanos =
338            std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos()
339                as u64;
340
341        let index = event_type.as_index();
342        let event_metric = &self.event_metrics[index];
343
344        let window_start = event_metric.get_window_start();
345        if now_nanos.saturating_sub(window_start) >= window_duration_nanos {
346            event_metric.reset_window(now_nanos);
347        }
348    }
349}
350
351/// 高性能指标管理器
352pub struct MetricsManager {
353    metrics: Arc<HighPerformanceMetrics>,
354    enable_metrics: bool,
355    stream_name: String,
356    background_task_running: AtomicBool,
357}
358
359impl MetricsManager {
360    /// 创建新的指标管理器
361    pub fn new(enable_metrics: bool, stream_name: String) -> Self {
362        let manager = Self {
363            metrics: Arc::new(HighPerformanceMetrics::new()),
364            enable_metrics,
365            stream_name,
366            background_task_running: AtomicBool::new(false),
367        };
368
369        // 启动后台任务
370        manager.start_background_tasks();
371        manager
372    }
373
374    /// 启动后台任务
375    fn start_background_tasks(&self) {
376        if self
377            .background_task_running
378            .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
379            .is_ok()
380        {
381            if !self.enable_metrics {
382                return;
383            }
384
385            let metrics = self.metrics.clone();
386
387            tokio::spawn(async move {
388                let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
389
390                loop {
391                    interval.tick().await;
392
393                    let window_duration_nanos = DEFAULT_METRICS_WINDOW_SECONDS * 1_000_000_000;
394
395                    // 更新所有事件类型的窗口指标
396                    metrics.update_window_metrics(EventType::Transaction, window_duration_nanos);
397                    metrics.update_window_metrics(EventType::Account, window_duration_nanos);
398                    metrics.update_window_metrics(EventType::BlockMeta, window_duration_nanos);
399                }
400            });
401        }
402    }
403
404    /// 记录处理次数(非阻塞)
405    #[inline]
406    pub fn record_process(&self, event_type: EventType) {
407        if self.enable_metrics {
408            self.metrics.event_metrics[event_type.as_index()].add_process_count();
409        }
410    }
411
412    /// 记录事件处理(非阻塞)
413    #[inline]
414    pub fn record_events(&self, event_type: EventType, count: u64, processing_time_us: f64) {
415        if !self.enable_metrics {
416            return;
417        }
418
419        let index = event_type.as_index();
420
421        // 原子更新事件计数
422        self.metrics.event_metrics[index].add_events_processed(count);
423
424        // 原子更新该事件类型的处理时间统计
425        self.metrics.event_metrics[index].update_processing_stats(processing_time_us, count);
426
427        // 保持全局处理时间统计的兼容性
428        self.metrics.processing_stats.update(processing_time_us, count);
429    }
430
431    /// 记录慢处理操作
432    #[inline]
433    pub fn log_slow_processing(&self, processing_time_us: f64, event_count: usize) {
434        if processing_time_us > SLOW_PROCESSING_THRESHOLD_US {
435            log::debug!(
436                "{} slow processing: {:.2}us for {} events",
437                self.stream_name,
438                processing_time_us,
439                event_count,
440            );
441        }
442    }
443
444    /// 获取运行时长
445    pub fn get_uptime(&self) -> std::time::Duration {
446        std::time::Duration::from_secs_f64(self.metrics.get_uptime_seconds())
447    }
448
449    /// 获取事件指标
450    pub fn get_event_metrics(&self, event_type: EventType) -> EventMetricsSnapshot {
451        self.metrics.get_event_metrics(event_type)
452    }
453
454    /// 获取处理时间统计
455    pub fn get_processing_stats(&self) -> ProcessingTimeStats {
456        self.metrics.get_processing_stats()
457    }
458
459    /// 获取丢弃事件计数
460    pub fn get_dropped_events_count(&self) -> u64 {
461        self.metrics.get_dropped_events_count()
462    }
463
464    /// 打印性能指标(非阻塞)
465    pub fn print_metrics(&self) {
466        println!("\n📊 {} Performance Metrics", self.stream_name);
467        println!("   Run Time: {:?}", self.get_uptime());
468
469        // 打印丢弃事件指标
470        let dropped_count = self.get_dropped_events_count();
471        if dropped_count > 0 {
472            println!("\n⚠️  Dropped Events: {}", dropped_count);
473        }
474
475        // 打印事件指标表格(包含处理时间统计)
476        println!("┌─────────────┬──────────────┬──────────────────┬─────────────┬─────────────┬─────────────┐");
477        println!("│ Event Type  │ Process Count│ Events Processed │ Avg Time(μs)│ Min 10s(μs) │ Max 10s(μs) │");
478        println!("├─────────────┼──────────────┼──────────────────┼─────────────┼─────────────┼─────────────┤");
479
480        for event_type in [EventType::Transaction, EventType::Account, EventType::BlockMeta] {
481            let metrics = self.get_event_metrics(event_type);
482            println!(
483                "│ {:11} │ {:12} │ {:16} │ {:9.2}   │ {:9.2}   │ {:9.2}   │",
484                event_type.name(),
485                metrics.process_count,
486                metrics.events_processed,
487                metrics.processing_stats.avg_us,
488                metrics.processing_stats.min_us,
489                metrics.processing_stats.max_us
490            );
491        }
492
493        println!("└─────────────┴──────────────┴──────────────────┴─────────────┴─────────────┴─────────────┘");
494        println!();
495    }
496
497    /// 启动自动性能监控任务
498    pub async fn start_auto_monitoring(&self) -> Option<tokio::task::JoinHandle<()>> {
499        if !self.enable_metrics {
500            return None;
501        }
502
503        let manager = self.clone();
504        let handle = tokio::spawn(async move {
505            let mut interval = tokio::time::interval(std::time::Duration::from_secs(
506                DEFAULT_METRICS_PRINT_INTERVAL_SECONDS,
507            ));
508            loop {
509                interval.tick().await;
510                manager.print_metrics();
511            }
512        });
513        Some(handle)
514    }
515
516    // === 兼容性方法 ===
517
518    /// 兼容性构造函数
519    pub fn new_with_metrics(
520        _metrics: Arc<std::sync::RwLock<PerformanceMetrics>>,
521        enable_metrics: bool,
522        stream_name: String,
523    ) -> Self {
524        Self::new(enable_metrics, stream_name)
525    }
526
527    /// 获取完整的性能指标(兼容性方法)
528    pub fn get_metrics(&self) -> PerformanceMetrics {
529        PerformanceMetrics {
530            uptime: self.get_uptime(),
531            tx_metrics: self.get_event_metrics(EventType::Transaction),
532            account_metrics: self.get_event_metrics(EventType::Account),
533            block_meta_metrics: self.get_event_metrics(EventType::BlockMeta),
534            processing_stats: self.get_processing_stats(),
535            dropped_events_count: self.metrics.get_dropped_events_count(),
536        }
537    }
538
539    /// 兼容性方法 - 添加交易处理计数
540    #[inline]
541    pub fn add_tx_process_count(&self) {
542        self.record_process(EventType::Transaction);
543    }
544
545    /// 兼容性方法 - 添加账户处理计数
546    #[inline]
547    pub fn add_account_process_count(&self) {
548        self.record_process(EventType::Account);
549    }
550
551    /// 兼容性方法 - 添加区块元数据处理计数
552    #[inline]
553    pub fn add_block_meta_process_count(&self) {
554        self.record_process(EventType::BlockMeta);
555    }
556
557    /// 兼容性方法 - 更新指标
558    #[inline]
559    pub fn update_metrics(
560        &self,
561        event_type: MetricsEventType,
562        events_processed: u64,
563        processing_time_us: f64,
564    ) {
565        self.record_events(event_type, events_processed, processing_time_us);
566        self.log_slow_processing(processing_time_us, events_processed as usize);
567    }
568
569    /// 增加丢弃事件计数
570    #[inline]
571    pub fn increment_dropped_events(&self) {
572        if !self.enable_metrics {
573            return;
574        }
575
576        // 原子地增加丢弃事件计数
577        let new_count = self.metrics.dropped_events_count.fetch_add(1, Ordering::Relaxed) + 1;
578
579        // 每丢弃1000个事件记录一次警告日志
580        if new_count % 1000 == 0 {
581            log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
582        }
583    }
584
585    /// 批量增加丢弃事件计数
586    #[inline]
587    pub fn increment_dropped_events_by(&self, count: u64) {
588        if !self.enable_metrics || count == 0 {
589            return;
590        }
591
592        // 原子地增加丢弃事件计数
593        let new_count =
594            self.metrics.dropped_events_count.fetch_add(count, Ordering::Relaxed) + count;
595
596        // 记录批量丢弃事件的日志
597        if count > 1 {
598            log::debug!(
599                "{} dropped batch of {} events, total dropped: {}",
600                self.stream_name,
601                count,
602                new_count
603            );
604        }
605
606        // 每丢弃1000个事件记录一次警告日志
607        if new_count % 1000 == 0 || (new_count / 1000) != ((new_count - count) / 1000) {
608            log::debug!("{} dropped events count reached: {}", self.stream_name, new_count);
609        }
610    }
611}
612
613impl Clone for MetricsManager {
614    fn clone(&self) -> Self {
615        Self {
616            metrics: self.metrics.clone(),
617            enable_metrics: self.enable_metrics,
618            stream_name: self.stream_name.clone(),
619            background_task_running: AtomicBool::new(false), // 新实例不自动启动后台任务
620        }
621    }
622}