1use super::MiningCounters;
2use crate::manager::MiningManagerProxy;
3use kaspa_core::{
4 debug, info,
5 task::{
6 service::{AsyncService, AsyncServiceFuture},
7 tick::{TickReason, TickService},
8 },
9 trace,
10};
11use kaspa_txscript::caches::TxScriptCacheCounters;
12use std::{sync::Arc, time::Duration};
13
/// Service identifier of the mining monitor: returned by `AsyncService::ident`
/// and interpolated into the monitor's exit/stop trace messages.
const MONITOR: &str = "mempool-monitor";
15
16pub struct MiningMonitor {
17 mining_manager: MiningManagerProxy,
18
19 counters: Arc<MiningCounters>,
21
22 tx_script_cache_counters: Arc<TxScriptCacheCounters>,
23
24 tick_service: Arc<TickService>,
26}
27
28impl MiningMonitor {
29 pub fn new(
30 mining_manager: MiningManagerProxy,
31 counters: Arc<MiningCounters>,
32 tx_script_cache_counters: Arc<TxScriptCacheCounters>,
33 tick_service: Arc<TickService>,
34 ) -> MiningMonitor {
35 MiningMonitor { mining_manager, counters, tx_script_cache_counters, tick_service }
36 }
37
38 pub async fn worker(self: &Arc<MiningMonitor>) {
39 let mut last_snapshot = self.counters.snapshot();
40 let mut last_tx_script_cache_snapshot = self.tx_script_cache_counters.snapshot();
41 let snapshot_interval = 10;
42 loop {
43 if let TickReason::Shutdown = self.tick_service.tick(Duration::from_secs(snapshot_interval)).await {
44 tokio::time::sleep(Duration::from_millis(500)).await;
46 break;
47 }
48
49 let snapshot = self.counters.snapshot();
50 let tx_script_cache_snapshot = self.tx_script_cache_counters.snapshot();
51 if snapshot == last_snapshot {
52 continue;
54 }
55
56 let delta = &snapshot - &last_snapshot;
58 let tx_script_cache_delta = &tx_script_cache_snapshot - &last_tx_script_cache_snapshot;
59
60 if delta.has_tps_activity() {
61 info!(
62 "Tx throughput stats: {:.2} u-tps, {:.2}% e-tps (in: {} via RPC, {} via P2P, out: {} via accepted blocks)",
63 delta.u_tps(),
64 delta.e_tps() * 100.0,
65 delta.high_priority_tx_counts,
66 delta.low_priority_tx_counts,
67 delta.tx_accepted_counts,
68 );
69 let feerate_estimations = self.mining_manager.clone().get_realtime_feerate_estimations().await;
70 debug!("Realtime feerate estimations: {}", feerate_estimations);
71 }
72 if delta.tx_evicted_counts > 0 {
73 info!(
74 "Mempool stats: {} transactions were evicted from the mempool in favor of incoming higher feerate transactions",
75 delta.tx_evicted_counts
76 );
77 }
78 if tx_script_cache_snapshot != last_tx_script_cache_snapshot {
79 debug!(
80 "UTXO set stats: {} spent, {} created ({} signatures validated, {} cache hits, {:.2} hit ratio)",
81 delta.input_counts,
82 delta.output_counts,
83 tx_script_cache_delta.insert_counts,
84 tx_script_cache_delta.get_counts,
85 tx_script_cache_delta.hit_ratio()
86 );
87 }
88 if delta.txs_sample + delta.orphans_sample > 0 {
89 debug!(
90 "Mempool sample: {} ready out of {} txs, {} orphans, {} cached as accepted",
91 delta.ready_txs_sample, delta.txs_sample, delta.orphans_sample, delta.accepted_sample
92 );
93 }
94
95 last_snapshot = snapshot;
96 last_tx_script_cache_snapshot = tx_script_cache_snapshot;
97 }
98
99 trace!("mempool monitor thread exiting");
100 }
101}
102
103impl AsyncService for MiningMonitor {
105 fn ident(self: Arc<Self>) -> &'static str {
106 MONITOR
107 }
108
109 fn start(self: Arc<Self>) -> AsyncServiceFuture {
110 Box::pin(async move {
111 self.worker().await;
112 Ok(())
113 })
114 }
115
116 fn signal_exit(self: Arc<Self>) {
117 trace!("sending an exit signal to {}", MONITOR);
118 }
119
120 fn stop(self: Arc<Self>) -> AsyncServiceFuture {
121 Box::pin(async move {
122 trace!("{} stopped", MONITOR);
123 Ok(())
124 })
125 }
126}