kaspa_consensus/pipeline/
monitor.rs

1use super::ProcessingCounters;
2use kaspa_core::{
3    info,
4    task::{
5        service::{AsyncService, AsyncServiceFuture},
6        tick::{TickReason, TickService},
7    },
8    trace, warn,
9};
10use std::{
11    sync::Arc,
12    time::{Duration, Instant},
13};
14
15const MONITOR: &str = "consensus-monitor";
16
17pub struct ConsensusMonitor {
18    // Counters
19    counters: Arc<ProcessingCounters>,
20
21    // Tick service
22    tick_service: Arc<TickService>,
23}
24
25impl ConsensusMonitor {
26    pub fn new(counters: Arc<ProcessingCounters>, tick_service: Arc<TickService>) -> ConsensusMonitor {
27        ConsensusMonitor { counters, tick_service }
28    }
29
30    pub async fn worker(self: &Arc<ConsensusMonitor>) {
31        let mut last_snapshot = self.counters.snapshot();
32        let mut last_log_time = Instant::now();
33        let snapshot_interval = 10;
34        loop {
35            if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(snapshot_interval)).await {
36                // Let the system print final logs before exiting
37                tokio::time::sleep(Duration::from_millis(500)).await;
38                break;
39            }
40
41            let snapshot = self.counters.snapshot();
42            if snapshot == last_snapshot {
43                // No update, avoid printing useless info
44                last_log_time = Instant::now();
45                continue;
46            }
47
48            // Subtract the snapshots
49            let delta = &snapshot - &last_snapshot;
50            let now = Instant::now();
51
52            info!(
53                "Processed {} blocks and {} headers in the last {:.2}s ({} transactions; {} UTXO-validated blocks; {:.2} parents; {:.2} mergeset; {:.2} TPB; {:.1} mass)", 
54                delta.body_counts,
55                delta.header_counts,
56                (now - last_log_time).as_secs_f64(),
57                delta.txs_counts,
58                delta.chain_block_counts,
59                if delta.header_counts != 0 { delta.dep_counts as f64 / delta.header_counts as f64 } else { 0f64 },
60                if delta.header_counts != 0 { delta.mergeset_counts as f64 / delta.header_counts as f64 } else { 0f64 },
61                if delta.body_counts != 0 { delta.txs_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
62                if delta.body_counts != 0 { delta.mass_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
63            );
64
65            if delta.chain_disqualified_counts > 0 {
66                warn!(
67                    "Consensus detected UTXO-invalid blocks which are disqualified from the virtual selected chain (possibly due to inheritance): {} disqualified vs. {} valid chain blocks",
68                    delta.chain_disqualified_counts, delta.chain_block_counts
69                );
70            }
71
72            last_snapshot = snapshot;
73            last_log_time = now;
74        }
75
76        trace!("monitor thread exiting");
77    }
78}
79
80// service trait implementation for Monitor
81impl AsyncService for ConsensusMonitor {
82    fn ident(self: Arc<Self>) -> &'static str {
83        MONITOR
84    }
85
86    fn start(self: Arc<Self>) -> AsyncServiceFuture {
87        Box::pin(async move {
88            self.worker().await;
89            Ok(())
90        })
91    }
92
93    fn signal_exit(self: Arc<Self>) {
94        trace!("sending an exit signal to {}", MONITOR);
95    }
96
97    fn stop(self: Arc<Self>) -> AsyncServiceFuture {
98        Box::pin(async move {
99            trace!("{} stopped", MONITOR);
100            Ok(())
101        })
102    }
103}