// kaspa_mining/monitor.rs

1use super::MiningCounters;
2use crate::manager::MiningManagerProxy;
3use kaspa_core::{
4    debug, info,
5    task::{
6        service::{AsyncService, AsyncServiceFuture},
7        tick::{TickReason, TickService},
8    },
9    trace,
10};
11use kaspa_txscript::caches::TxScriptCacheCounters;
12use std::{sync::Arc, time::Duration};
13
/// Service identifier of the mining monitor, returned by its `AsyncService::ident` and used in its trace logs.
const MONITOR: &str = "mempool-monitor";
15
16pub struct MiningMonitor {
17    mining_manager: MiningManagerProxy,
18
19    // Counters
20    counters: Arc<MiningCounters>,
21
22    tx_script_cache_counters: Arc<TxScriptCacheCounters>,
23
24    // Tick service
25    tick_service: Arc<TickService>,
26}
27
28impl MiningMonitor {
29    pub fn new(
30        mining_manager: MiningManagerProxy,
31        counters: Arc<MiningCounters>,
32        tx_script_cache_counters: Arc<TxScriptCacheCounters>,
33        tick_service: Arc<TickService>,
34    ) -> MiningMonitor {
35        MiningMonitor { mining_manager, counters, tx_script_cache_counters, tick_service }
36    }
37
38    pub async fn worker(self: &Arc<MiningMonitor>) {
39        let mut last_snapshot = self.counters.snapshot();
40        let mut last_tx_script_cache_snapshot = self.tx_script_cache_counters.snapshot();
41        let snapshot_interval = 10;
42        loop {
43            if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(snapshot_interval)).await {
44                // Let the system print final logs before exiting
45                tokio::time::sleep(Duration::from_millis(500)).await;
46                break;
47            }
48
49            let snapshot = self.counters.snapshot();
50            let tx_script_cache_snapshot = self.tx_script_cache_counters.snapshot();
51            if snapshot == last_snapshot {
52                // No update, avoid printing useless info
53                continue;
54            }
55
56            // Subtract the snapshots
57            let delta = &snapshot - &last_snapshot;
58            let tx_script_cache_delta = &tx_script_cache_snapshot - &last_tx_script_cache_snapshot;
59
60            if delta.has_tps_activity() {
61                info!(
62                    "Tx throughput stats: {:.2} u-tps, {:.2}% e-tps (in: {} via RPC, {} via P2P, out: {} via accepted blocks)",
63                    delta.u_tps(),
64                    delta.e_tps() * 100.0,
65                    delta.high_priority_tx_counts,
66                    delta.low_priority_tx_counts,
67                    delta.tx_accepted_counts,
68                );
69                let feerate_estimations = self.mining_manager.clone().get_realtime_feerate_estimations().await;
70                debug!("Realtime feerate estimations: {}", feerate_estimations);
71            }
72            if delta.tx_evicted_counts > 0 {
73                info!(
74                    "Mempool stats: {} transactions were evicted from the mempool in favor of incoming higher feerate transactions",
75                    delta.tx_evicted_counts
76                );
77            }
78            if tx_script_cache_snapshot != last_tx_script_cache_snapshot {
79                debug!(
80                    "UTXO set stats: {} spent, {} created ({} signatures validated, {} cache hits, {:.2} hit ratio)",
81                    delta.input_counts,
82                    delta.output_counts,
83                    tx_script_cache_delta.insert_counts,
84                    tx_script_cache_delta.get_counts,
85                    tx_script_cache_delta.hit_ratio()
86                );
87            }
88            if delta.txs_sample + delta.orphans_sample > 0 {
89                debug!(
90                    "Mempool sample: {} ready out of {} txs, {} orphans, {} cached as accepted",
91                    delta.ready_txs_sample, delta.txs_sample, delta.orphans_sample, delta.accepted_sample
92                );
93            }
94
95            last_snapshot = snapshot;
96            last_tx_script_cache_snapshot = tx_script_cache_snapshot;
97        }
98
99        trace!("mempool monitor thread exiting");
100    }
101}
102
103// service trait implementation for Monitor
104impl AsyncService for MiningMonitor {
105    fn ident(self: Arc<Self>) -> &'static str {
106        MONITOR
107    }
108
109    fn start(self: Arc<Self>) -> AsyncServiceFuture {
110        Box::pin(async move {
111            self.worker().await;
112            Ok(())
113        })
114    }
115
116    fn signal_exit(self: Arc<Self>) {
117        trace!("sending an exit signal to {}", MONITOR);
118    }
119
120    fn stop(self: Arc<Self>) -> AsyncServiceFuture {
121        Box::pin(async move {
122            trace!("{} stopped", MONITOR);
123            Ok(())
124        })
125    }
126}