1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use crate::imports::*;
use workflow_core::channel::*;
use workflow_terminal::clear::*;
use workflow_terminal::cursor::*;

pub struct Monitor {
    shutdown_tx: Arc<Mutex<Option<Sender<()>>>>,
}

impl Default for Monitor {
    fn default() -> Self {
        Monitor { shutdown_tx: Arc::new(Mutex::new(None)) }
    }
}

#[async_trait]
impl Handler for Monitor {
    fn verb(&self, _ctx: &Arc<dyn Context>) -> Option<&'static str> {
        Some("monitor")
    }

    fn help(&self, _ctx: &Arc<dyn Context>) -> &'static str {
        "Balance monitor"
    }

    async fn stop(self: Arc<Self>, _ctx: &Arc<dyn Context>) -> cli::Result<()> {
        let shutdown_tx = self.shutdown_tx.lock().unwrap().take();
        if let Some(shutdown_tx) = shutdown_tx {
            shutdown_tx.send(()).await.ok();
        }
        Ok(())
    }

    async fn handle(self: Arc<Self>, ctx: &Arc<dyn Context>, argv: Vec<String>, cmd: &str) -> cli::Result<()> {
        let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
        self.main(&ctx, argv, cmd).await.map_err(|e| e.into())
    }
}

impl Monitor {
    async fn main(self: Arc<Self>, ctx: &Arc<KaspaCli>, _argv: Vec<String>, _cmd: &str) -> Result<()> {
        let max_events = 16;
        let events = Arc::new(Mutex::new(VecDeque::new()));
        let events_rx = ctx.wallet().multiplexer().channel();

        let (shutdown_tx, shutdown_rx) = oneshot();
        self.shutdown_tx.lock().unwrap().replace(shutdown_tx.clone());
        let mut interval = interval(Duration::from_millis(1000));

        let term = ctx.term();
        spawn(async move {
            term.kbhit(None).await.ok();
            shutdown_tx.send(()).await.ok();
        });

        let ctx = ctx.clone();
        let this = self.clone();
        spawn(async move {
            loop {
                select! {

                    event = events_rx.recv().fuse() => {
                        if let Ok(event) = event {
                            let mut events = events.lock().unwrap();
                            events.push_front(event);
                            while events.len() > max_events {
                                events.pop_back();
                            }
                        }
                    }

                    _ = interval.next().fuse() => {
                        this.redraw(&ctx, &events).await.ok();
                        yield_executor().await;
                    }

                    _ = shutdown_rx.recv().fuse() => {
                        break;
                    }

                }
            }

            tprint!(ctx, "{}", ClearScreen);
            tprint!(ctx, "{}", Goto(1, 1));
            this.shutdown_tx.lock().unwrap().take();
            ctx.term().refresh_prompt();
        });

        Ok(())
    }

    async fn redraw(self: &Arc<Self>, ctx: &Arc<KaspaCli>, events: &Arc<Mutex<VecDeque<Box<Events>>>>) -> Result<()> {
        tprint!(ctx, "{}", ClearScreen);
        tprint!(ctx, "{}", Goto(1, 1));

        let wallet = ctx.wallet();

        if !wallet.is_connected() {
            tprintln!(ctx, "{}", style("Wallet is not connected to the network").magenta());
            tprintln!(ctx);
        } else if !wallet.is_synced() {
            tprintln!(ctx, "{}", style("Kaspa node is currently syncing").magenta());
            tprintln!(ctx);
        }

        ctx.list().await?;

        let events = events.lock().unwrap();
        events.iter().for_each(|event| match event.deref() {
            Events::DAAScoreChange { .. } => {}
            Events::Balance { balance, id } => {
                let network_id = wallet.network_id().expect("missing network type");
                let network_type = NetworkType::from(network_id);
                let balance_strings = BalanceStrings::from((balance, &network_type, None));
                let id = id.short();

                let mature_utxo_count =
                    balance.as_ref().map(|balance| balance.mature_utxo_count.separated_string()).unwrap_or("N/A".to_string());
                let pending_utxo_count = balance.as_ref().map(|balance| balance.pending_utxo_count).unwrap_or(0);

                let pending_utxo_info =
                    if pending_utxo_count > 0 { format!("({pending_utxo_count} pending)") } else { "".to_string() };
                let utxo_info = style(format!("{mature_utxo_count} UTXOs {pending_utxo_info}")).dim();

                tprintln!(ctx, "{} {id}: {balance_strings}   {utxo_info}", style("balance".pad_to_width(8)).blue());
            }
            _ => {}
        });

        Ok(())
    }
}