kaspa_cli_lib/modules/
node.rs

1use crate::imports::*;
2use kaspa_daemon::KaspadConfig;
3use workflow_core::task::sleep;
4use workflow_node::process;
5pub use workflow_node::process::Event;
6use workflow_store::fs;
7
8#[derive(Describe, Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, Ord, PartialOrd)]
9#[serde(rename_all = "lowercase")]
10pub enum KaspadSettings {
11    #[describe("Binary location")]
12    Location,
13    #[describe("Mute logs")]
14    Mute,
15}
16
17#[async_trait]
18impl DefaultSettings for KaspadSettings {
19    async fn defaults() -> Vec<(Self, Value)> {
20        let mut settings = vec![(Self::Mute, to_value(true).unwrap())];
21
22        let root = nw_sys::app::folder();
23        if let Ok(binaries) = kaspa_daemon::locate_binaries(&root, "kaspad").await {
24            if let Some(path) = binaries.first() {
25                settings.push((Self::Location, to_value(path.to_string_lossy().to_string()).unwrap()));
26            }
27        }
28
29        settings
30    }
31}
32
33pub struct Node {
34    settings: SettingsStore<KaspadSettings>,
35    mute: Arc<AtomicBool>,
36    is_running: Arc<AtomicBool>,
37}
38
39impl Default for Node {
40    fn default() -> Self {
41        Node {
42            settings: SettingsStore::try_new("kaspad").expect("Failed to create node settings store"),
43            mute: Arc::new(AtomicBool::new(true)),
44            is_running: Arc::new(AtomicBool::new(false)),
45        }
46    }
47}
48
49#[async_trait]
50impl Handler for Node {
51    fn verb(&self, ctx: &Arc<dyn Context>) -> Option<&'static str> {
52        if let Ok(ctx) = ctx.clone().downcast_arc::<KaspaCli>() {
53            ctx.daemons().clone().kaspad.as_ref().map(|_| "node")
54        } else {
55            None
56        }
57    }
58
59    fn help(&self, _ctx: &Arc<dyn Context>) -> &'static str {
60        "Manage the local Kaspa node instance"
61    }
62
63    async fn start(self: Arc<Self>, _ctx: &Arc<dyn Context>) -> cli::Result<()> {
64        self.settings.try_load().await.ok();
65        if let Some(mute) = self.settings.get(KaspadSettings::Mute) {
66            self.mute.store(mute, Ordering::Relaxed);
67        }
68        Ok(())
69    }
70
71    async fn handle(self: Arc<Self>, ctx: &Arc<dyn Context>, argv: Vec<String>, cmd: &str) -> cli::Result<()> {
72        let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
73        self.main(ctx, argv, cmd).await.map_err(|e| e.into())
74    }
75}
76
77impl Node {
78    pub fn is_running(&self) -> bool {
79        self.is_running.load(Ordering::SeqCst)
80    }
81
82    async fn create_config(&self, ctx: &Arc<KaspaCli>) -> Result<KaspadConfig> {
83        let location: String = self
84            .settings
85            .get(KaspadSettings::Location)
86            .ok_or_else(|| Error::Custom("No miner binary specified, please use `miner select` to select a binary.".into()))?;
87        let network_id = ctx.wallet().network_id()?;
88        // disabled for prompt update (until progress events are implemented)
89        // let mute = self.mute.load(Ordering::SeqCst);
90        let mute = false;
91        let config = KaspadConfig::new(location.as_str(), network_id, mute);
92        Ok(config)
93    }
94
95    async fn main(self: Arc<Self>, ctx: Arc<KaspaCli>, mut argv: Vec<String>, cmd: &str) -> Result<()> {
96        if argv.is_empty() {
97            return self.display_help(ctx, argv).await;
98        }
99        let kaspad = ctx.daemons().kaspad();
100        match argv.remove(0).as_str() {
101            "start" => {
102                let mute = self.mute.load(Ordering::SeqCst);
103                if mute {
104                    tprintln!(ctx, "starting kaspa node... {}", style("(logs are muted, use 'node mute' to toggle)").dim());
105                } else {
106                    tprintln!(ctx, "starting kaspa node... {}", style("(use 'node mute' to mute logging)").dim());
107                }
108
109                let wrpc_client = ctx.wallet().try_wrpc_client().ok_or(Error::custom("Unable to start node with non-wRPC client"))?;
110
111                kaspad.configure(self.create_config(&ctx).await?).await?;
112                kaspad.start().await?;
113
114                // temporary setup for auto-connect
115                let url = ctx.wallet().settings().get(WalletSettings::Server);
116                let network_type = ctx.wallet().network_id()?;
117                if let Some(url) = url
118                    .map(|url| wrpc_client.parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string()))
119                    .transpose()?
120                {
121                    // log_info!("connecting to url: {}", url);
122                    if url.contains("127.0.0.1") || url.contains("localhost") {
123                        spawn(async move {
124                            let options = ConnectOptions {
125                                block_async_connect: true,
126                                strategy: ConnectStrategy::Fallback,
127                                url: Some(url),
128                                ..Default::default()
129                            };
130                            for _ in 0..5 {
131                                sleep(Duration::from_millis(1000)).await;
132                                if wrpc_client.connect(Some(options.clone())).await.is_ok() {
133                                    break;
134                                }
135                            }
136                        });
137                    }
138                }
139            }
140            "stop" => {
141                kaspad.stop().await?;
142            }
143            "restart" => {
144                kaspad.configure(self.create_config(&ctx).await?).await?;
145                kaspad.restart().await?;
146            }
147            "kill" => {
148                kaspad.kill().await?;
149            }
150            "mute" | "logs" => {
151                let mute = !self.mute.load(Ordering::SeqCst);
152                self.mute.store(mute, Ordering::SeqCst);
153                if mute {
154                    tprintln!(ctx, "{}", style("node is muted").dim());
155                } else {
156                    tprintln!(ctx, "{}", style("node is unmuted").dim());
157                }
158                // kaspad.mute(mute).await?;
159                self.settings.set(KaspadSettings::Mute, mute).await?;
160            }
161            "status" => {
162                let status = kaspad.status().await?;
163                tprintln!(ctx, "{}", status);
164            }
165            "select" => {
166                let regex = Regex::new(r"(?i)^\s*node\s+select\s+").unwrap();
167                let path = regex.replace(cmd, "").trim().to_string();
168                self.select(ctx, path.is_not_empty().then_some(path)).await?;
169            }
170            "version" => {
171                kaspad.configure(self.create_config(&ctx).await?).await?;
172                let version = kaspad.version().await?;
173                tprintln!(ctx, "{}", version);
174            }
175            v => {
176                tprintln!(ctx, "unknown command: '{v}'\r\n");
177
178                return self.display_help(ctx, argv).await;
179            }
180        }
181
182        Ok(())
183    }
184
185    async fn display_help(self: Arc<Self>, ctx: Arc<KaspaCli>, _argv: Vec<String>) -> Result<()> {
186        ctx.term().help(
187            &[
188                ("select", "Select Kaspad executable (binary) location"),
189                ("version", "Display Kaspad executable version"),
190                ("start", "Start the local Kaspa node instance"),
191                ("stop", "Stop the local Kaspa node instance"),
192                ("restart", "Restart the local Kaspa node instance"),
193                ("kill", "Kill the local Kaspa node instance"),
194                ("status", "Get the status of the local Kaspa node instance"),
195                ("mute", "Toggle log output"),
196            ],
197            None,
198        )?;
199
200        Ok(())
201    }
202
203    async fn select(self: Arc<Self>, ctx: Arc<KaspaCli>, path: Option<String>) -> Result<()> {
204        let root = nw_sys::app::folder();
205
206        match path {
207            None => {
208                let binaries = kaspa_daemon::locate_binaries(root.as_str(), "kaspad").await?;
209
210                if binaries.is_empty() {
211                    tprintln!(ctx, "No kaspad binaries found");
212                } else {
213                    let binaries = binaries.iter().map(|p| p.display().to_string()).collect::<Vec<_>>();
214                    if let Some(selection) = ctx.term().select("Please select a kaspad binary", &binaries).await? {
215                        tprintln!(ctx, "selecting: {}", selection);
216                        self.settings.set(KaspadSettings::Location, selection.as_str()).await?;
217                    } else {
218                        tprintln!(ctx, "no selection is made");
219                    }
220                }
221            }
222            Some(path) => {
223                if fs::exists(&path).await? {
224                    let version = process::version(&path).await?;
225                    tprintln!(ctx, "detected binary version: {}", version);
226                    tprintln!(ctx, "selecting: {path}");
227                    self.settings.set(KaspadSettings::Location, path.as_str()).await?;
228                } else {
229                    twarnln!(ctx, "destination binary not found, please specify full path including the binary name");
230                    twarnln!(ctx, "example: 'node select /home/user/testnet/kaspad'");
231                    tprintln!(ctx, "no selection is made");
232                }
233            }
234        }
235
236        Ok(())
237    }
238
239    pub async fn handle_event(&self, ctx: &Arc<KaspaCli>, event: Event) -> Result<()> {
240        let term = ctx.term();
241
242        match event {
243            Event::Start => {
244                self.is_running.store(true, Ordering::SeqCst);
245                term.refresh_prompt();
246            }
247            Event::Exit(_code) => {
248                tprintln!(ctx, "Kaspad has exited");
249                self.is_running.store(false, Ordering::SeqCst);
250                term.refresh_prompt();
251            }
252            Event::Error(error) => {
253                tprintln!(ctx, "{}", style(format!("Kaspad error: {error}")).red());
254                self.is_running.store(false, Ordering::SeqCst);
255                term.refresh_prompt();
256            }
257            Event::Stdout(text) | Event::Stderr(text) => {
258                if !ctx.wallet().utxo_processor().is_synced() {
259                    ctx.wallet().utxo_processor().sync_proc().handle_stdout(&text).await?;
260                }
261
262                if !self.mute.load(Ordering::SeqCst) {
263                    let sanitize = true;
264                    if sanitize {
265                        let lines = text.split('\n').collect::<Vec<_>>();
266                        lines.into_iter().for_each(|line| {
267                            let line = line.trim();
268                            if !line.is_empty() {
269                                if line.len() < 38 || &line[30..31] != "[" {
270                                    term.writeln(line);
271                                } else {
272                                    let time = &line[11..23];
273                                    let kind = &line[31..36];
274                                    let text = &line[38..];
275
276                                    match kind {
277                                        "WARN " => {
278                                            term.writeln(format!("{time} {}", style(text).yellow()));
279                                        }
280                                        "ERROR" => {
281                                            term.writeln(format!("{time} {}", style(text).red()));
282                                        }
283                                        _ => {
284                                            if text.starts_with("Processed") {
285                                                term.writeln(format!("{time} {}", style(text).blue()));
286                                            } else {
287                                                term.writeln(format!("{time} {text}"));
288                                            }
289                                        }
290                                    }
291                                }
292                            }
293                        });
294                    } else {
295                        term.writeln(text.trim().crlf());
296                    }
297                }
298            }
299        }
300        Ok(())
301    }
302}