kaspa_cli_lib/modules/
monitor.rs1use crate::imports::*;
2use workflow_core::channel::*;
3use workflow_terminal::clear::*;
4use workflow_terminal::cursor::*;
5
6pub struct Monitor {
7 shutdown_tx: Arc<Mutex<Option<Sender<()>>>>,
8}
9
10impl Default for Monitor {
11 fn default() -> Self {
12 Monitor { shutdown_tx: Arc::new(Mutex::new(None)) }
13 }
14}
15
16#[async_trait]
17impl Handler for Monitor {
18 fn verb(&self, _ctx: &Arc<dyn Context>) -> Option<&'static str> {
19 Some("monitor")
20 }
21
22 fn help(&self, _ctx: &Arc<dyn Context>) -> &'static str {
23 "Balance monitor"
24 }
25
26 async fn stop(self: Arc<Self>, _ctx: &Arc<dyn Context>) -> cli::Result<()> {
27 let shutdown_tx = self.shutdown_tx.lock().unwrap().take();
28 if let Some(shutdown_tx) = shutdown_tx {
29 shutdown_tx.send(()).await.ok();
30 }
31 Ok(())
32 }
33
34 async fn handle(self: Arc<Self>, ctx: &Arc<dyn Context>, argv: Vec<String>, cmd: &str) -> cli::Result<()> {
35 let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
36 self.main(&ctx, argv, cmd).await.map_err(|e| e.into())
37 }
38}
39
40impl Monitor {
41 async fn main(self: Arc<Self>, ctx: &Arc<KaspaCli>, _argv: Vec<String>, _cmd: &str) -> Result<()> {
42 let max_events = 16;
43 let events = Arc::new(Mutex::new(VecDeque::new()));
44 let events_rx = ctx.wallet().multiplexer().channel();
45
46 let (shutdown_tx, shutdown_rx) = oneshot();
47 self.shutdown_tx.lock().unwrap().replace(shutdown_tx.clone());
48 let mut interval = interval(Duration::from_millis(1000));
49
50 let term = ctx.term();
51 spawn(async move {
52 term.kbhit(None).await.ok();
53 shutdown_tx.send(()).await.ok();
54 });
55
56 let ctx = ctx.clone();
57 let this = self.clone();
58 spawn(async move {
59 loop {
60 select! {
61
62 event = events_rx.recv().fuse() => {
63 if let Ok(event) = event {
64 let mut events = events.lock().unwrap();
65 events.push_front(event);
66 while events.len() > max_events {
67 events.pop_back();
68 }
69 }
70 }
71
72 _ = interval.next().fuse() => {
73 this.redraw(&ctx, &events).await.ok();
74 yield_executor().await;
75 }
76
77 _ = shutdown_rx.recv().fuse() => {
78 break;
79 }
80
81 }
82 }
83
84 tprint!(ctx, "{}", ClearScreen);
85 tprint!(ctx, "{}", Goto(1, 1));
86 this.shutdown_tx.lock().unwrap().take();
87 ctx.term().refresh_prompt();
88 });
89
90 Ok(())
91 }
92
93 async fn redraw(self: &Arc<Self>, ctx: &Arc<KaspaCli>, events: &Arc<Mutex<VecDeque<Box<Events>>>>) -> Result<()> {
94 tprint!(ctx, "{}", ClearScreen);
95 tprint!(ctx, "{}", Goto(1, 1));
96
97 let wallet = ctx.wallet();
98
99 if !wallet.is_connected() {
100 tprintln!(ctx, "{}", style("Wallet is not connected to the network").magenta());
101 tprintln!(ctx);
102 } else if !wallet.is_synced() {
103 tprintln!(ctx, "{}", style("Kaspa node is currently syncing").magenta());
104 tprintln!(ctx);
105 }
106
107 ctx.list().await?;
108
109 let events = events.lock().unwrap();
110 events.iter().for_each(|event| match event.deref() {
111 Events::DaaScoreChange { .. } => {}
112 Events::Balance { balance, id } => {
113 let network_id = wallet.network_id().expect("missing network type");
114 let network_type = NetworkType::from(network_id);
115 let balance_strings = BalanceStrings::from((balance.as_ref(), &network_type, None));
116 let id = id.short();
117
118 let mature_utxo_count =
119 balance.as_ref().map(|balance| balance.mature_utxo_count.separated_string()).unwrap_or("N/A".to_string());
120 let pending_utxo_count = balance.as_ref().map(|balance| balance.pending_utxo_count).unwrap_or(0);
121
122 let pending_utxo_info =
123 if pending_utxo_count > 0 { format!("({pending_utxo_count} pending)") } else { "".to_string() };
124 let utxo_info = style(format!("{mature_utxo_count} UTXOs {pending_utxo_info}")).dim();
125
126 tprintln!(ctx, "{} {id}: {balance_strings} {utxo_info}", style("balance".pad_to_width(8)).blue());
127 }
128 _ => {}
129 });
130
131 Ok(())
132 }
133}