kaspa_consensus/pipeline/
monitor.rs1use super::ProcessingCounters;
2use kaspa_core::{
3 info,
4 task::{
5 service::{AsyncService, AsyncServiceFuture},
6 tick::{TickReason, TickService},
7 },
8 trace, warn,
9};
10use std::{
11 sync::Arc,
12 time::{Duration, Instant},
13};
14
15const MONITOR: &str = "consensus-monitor";
16
17pub struct ConsensusMonitor {
18 counters: Arc<ProcessingCounters>,
20
21 tick_service: Arc<TickService>,
23}
24
25impl ConsensusMonitor {
26 pub fn new(counters: Arc<ProcessingCounters>, tick_service: Arc<TickService>) -> ConsensusMonitor {
27 ConsensusMonitor { counters, tick_service }
28 }
29
30 pub async fn worker(self: &Arc<ConsensusMonitor>) {
31 let mut last_snapshot = self.counters.snapshot();
32 let mut last_log_time = Instant::now();
33 let snapshot_interval = 10;
34 loop {
35 if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(snapshot_interval)).await {
36 tokio::time::sleep(Duration::from_millis(500)).await;
38 break;
39 }
40
41 let snapshot = self.counters.snapshot();
42 if snapshot == last_snapshot {
43 last_log_time = Instant::now();
45 continue;
46 }
47
48 let delta = &snapshot - &last_snapshot;
50 let now = Instant::now();
51
52 info!(
53 "Processed {} blocks and {} headers in the last {:.2}s ({} transactions; {} UTXO-validated blocks; {:.2} parents; {:.2} mergeset; {:.2} TPB; {:.1} mass)",
54 delta.body_counts,
55 delta.header_counts,
56 (now - last_log_time).as_secs_f64(),
57 delta.txs_counts,
58 delta.chain_block_counts,
59 if delta.header_counts != 0 { delta.dep_counts as f64 / delta.header_counts as f64 } else { 0f64 },
60 if delta.header_counts != 0 { delta.mergeset_counts as f64 / delta.header_counts as f64 } else { 0f64 },
61 if delta.body_counts != 0 { delta.txs_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
62 if delta.body_counts != 0 { delta.mass_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
63 );
64
65 if delta.chain_disqualified_counts > 0 {
66 warn!(
67 "Consensus detected UTXO-invalid blocks which are disqualified from the virtual selected chain (possibly due to inheritance): {} disqualified vs. {} valid chain blocks",
68 delta.chain_disqualified_counts, delta.chain_block_counts
69 );
70 }
71
72 last_snapshot = snapshot;
73 last_log_time = now;
74 }
75
76 trace!("monitor thread exiting");
77 }
78}
79
80impl AsyncService for ConsensusMonitor {
82 fn ident(self: Arc<Self>) -> &'static str {
83 MONITOR
84 }
85
86 fn start(self: Arc<Self>) -> AsyncServiceFuture {
87 Box::pin(async move {
88 self.worker().await;
89 Ok(())
90 })
91 }
92
93 fn signal_exit(self: Arc<Self>) {
94 trace!("sending an exit signal to {}", MONITOR);
95 }
96
97 fn stop(self: Arc<Self>) -> AsyncServiceFuture {
98 Box::pin(async move {
99 trace!("{} stopped", MONITOR);
100 Ok(())
101 })
102 }
103}