1use crate::imports::*;
2use kaspa_daemon::KaspadConfig;
3use workflow_core::task::sleep;
4use workflow_node::process;
5pub use workflow_node::process::Event;
6use workflow_store::fs;
7
// Keys of the persistent settings kept for the local kaspad daemon
// (stored by `Node` through `SettingsStore<KaspadSettings>`).
// Because of `rename_all = "lowercase"`, serde (de)serializes the variants
// as `location` and `mute`.
#[derive(Describe, Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "lowercase")]
pub enum KaspadSettings {
    // Path of the kaspad executable to launch.
    #[describe("Binary location")]
    Location,
    // Whether daemon log output is hidden from the terminal.
    #[describe("Mute logs")]
    Mute,
}
16
17#[async_trait]
18impl DefaultSettings for KaspadSettings {
19 async fn defaults() -> Vec<(Self, Value)> {
20 let mut settings = vec![(Self::Mute, to_value(true).unwrap())];
21
22 let root = nw_sys::app::folder();
23 if let Ok(binaries) = kaspa_daemon::locate_binaries(&root, "kaspad").await {
24 if let Some(path) = binaries.first() {
25 settings.push((Self::Location, to_value(path.to_string_lossy().to_string()).unwrap()));
26 }
27 }
28
29 settings
30 }
31}
32
/// CLI handler for the `node` command, which manages the local kaspad daemon.
pub struct Node {
    /// Persistent settings (binary location, mute flag) stored under the `kaspad` name.
    settings: SettingsStore<KaspadSettings>,
    /// When `true`, daemon stdout/stderr is not echoed to the terminal.
    mute: Arc<AtomicBool>,
    /// Tracks the daemon state as reported through `handle_event`
    /// (set on `Event::Start`, cleared on `Event::Exit` / `Event::Error`).
    is_running: Arc<AtomicBool>,
}
38
39impl Default for Node {
40 fn default() -> Self {
41 Node {
42 settings: SettingsStore::try_new("kaspad").expect("Failed to create node settings store"),
43 mute: Arc::new(AtomicBool::new(true)),
44 is_running: Arc::new(AtomicBool::new(false)),
45 }
46 }
47}
48
49#[async_trait]
50impl Handler for Node {
51 fn verb(&self, ctx: &Arc<dyn Context>) -> Option<&'static str> {
52 if let Ok(ctx) = ctx.clone().downcast_arc::<KaspaCli>() {
53 ctx.daemons().clone().kaspad.as_ref().map(|_| "node")
54 } else {
55 None
56 }
57 }
58
59 fn help(&self, _ctx: &Arc<dyn Context>) -> &'static str {
60 "Manage the local Kaspa node instance"
61 }
62
63 async fn start(self: Arc<Self>, _ctx: &Arc<dyn Context>) -> cli::Result<()> {
64 self.settings.try_load().await.ok();
65 if let Some(mute) = self.settings.get(KaspadSettings::Mute) {
66 self.mute.store(mute, Ordering::Relaxed);
67 }
68 Ok(())
69 }
70
71 async fn handle(self: Arc<Self>, ctx: &Arc<dyn Context>, argv: Vec<String>, cmd: &str) -> cli::Result<()> {
72 let ctx = ctx.clone().downcast_arc::<KaspaCli>()?;
73 self.main(ctx, argv, cmd).await.map_err(|e| e.into())
74 }
75}
76
77impl Node {
78 pub fn is_running(&self) -> bool {
79 self.is_running.load(Ordering::SeqCst)
80 }
81
82 async fn create_config(&self, ctx: &Arc<KaspaCli>) -> Result<KaspadConfig> {
83 let location: String = self
84 .settings
85 .get(KaspadSettings::Location)
86 .ok_or_else(|| Error::Custom("No miner binary specified, please use `miner select` to select a binary.".into()))?;
87 let network_id = ctx.wallet().network_id()?;
88 let mute = false;
91 let config = KaspadConfig::new(location.as_str(), network_id, mute);
92 Ok(config)
93 }
94
95 async fn main(self: Arc<Self>, ctx: Arc<KaspaCli>, mut argv: Vec<String>, cmd: &str) -> Result<()> {
96 if argv.is_empty() {
97 return self.display_help(ctx, argv).await;
98 }
99 let kaspad = ctx.daemons().kaspad();
100 match argv.remove(0).as_str() {
101 "start" => {
102 let mute = self.mute.load(Ordering::SeqCst);
103 if mute {
104 tprintln!(ctx, "starting kaspa node... {}", style("(logs are muted, use 'node mute' to toggle)").dim());
105 } else {
106 tprintln!(ctx, "starting kaspa node... {}", style("(use 'node mute' to mute logging)").dim());
107 }
108
109 let wrpc_client = ctx.wallet().try_wrpc_client().ok_or(Error::custom("Unable to start node with non-wRPC client"))?;
110
111 kaspad.configure(self.create_config(&ctx).await?).await?;
112 kaspad.start().await?;
113
114 let url = ctx.wallet().settings().get(WalletSettings::Server);
116 let network_type = ctx.wallet().network_id()?;
117 if let Some(url) = url
118 .map(|url| wrpc_client.parse_url_with_network_type(url, network_type.into()).map_err(|e| e.to_string()))
119 .transpose()?
120 {
121 if url.contains("127.0.0.1") || url.contains("localhost") {
123 spawn(async move {
124 let options = ConnectOptions {
125 block_async_connect: true,
126 strategy: ConnectStrategy::Fallback,
127 url: Some(url),
128 ..Default::default()
129 };
130 for _ in 0..5 {
131 sleep(Duration::from_millis(1000)).await;
132 if wrpc_client.connect(Some(options.clone())).await.is_ok() {
133 break;
134 }
135 }
136 });
137 }
138 }
139 }
140 "stop" => {
141 kaspad.stop().await?;
142 }
143 "restart" => {
144 kaspad.configure(self.create_config(&ctx).await?).await?;
145 kaspad.restart().await?;
146 }
147 "kill" => {
148 kaspad.kill().await?;
149 }
150 "mute" | "logs" => {
151 let mute = !self.mute.load(Ordering::SeqCst);
152 self.mute.store(mute, Ordering::SeqCst);
153 if mute {
154 tprintln!(ctx, "{}", style("node is muted").dim());
155 } else {
156 tprintln!(ctx, "{}", style("node is unmuted").dim());
157 }
158 self.settings.set(KaspadSettings::Mute, mute).await?;
160 }
161 "status" => {
162 let status = kaspad.status().await?;
163 tprintln!(ctx, "{}", status);
164 }
165 "select" => {
166 let regex = Regex::new(r"(?i)^\s*node\s+select\s+").unwrap();
167 let path = regex.replace(cmd, "").trim().to_string();
168 self.select(ctx, path.is_not_empty().then_some(path)).await?;
169 }
170 "version" => {
171 kaspad.configure(self.create_config(&ctx).await?).await?;
172 let version = kaspad.version().await?;
173 tprintln!(ctx, "{}", version);
174 }
175 v => {
176 tprintln!(ctx, "unknown command: '{v}'\r\n");
177
178 return self.display_help(ctx, argv).await;
179 }
180 }
181
182 Ok(())
183 }
184
185 async fn display_help(self: Arc<Self>, ctx: Arc<KaspaCli>, _argv: Vec<String>) -> Result<()> {
186 ctx.term().help(
187 &[
188 ("select", "Select Kaspad executable (binary) location"),
189 ("version", "Display Kaspad executable version"),
190 ("start", "Start the local Kaspa node instance"),
191 ("stop", "Stop the local Kaspa node instance"),
192 ("restart", "Restart the local Kaspa node instance"),
193 ("kill", "Kill the local Kaspa node instance"),
194 ("status", "Get the status of the local Kaspa node instance"),
195 ("mute", "Toggle log output"),
196 ],
197 None,
198 )?;
199
200 Ok(())
201 }
202
203 async fn select(self: Arc<Self>, ctx: Arc<KaspaCli>, path: Option<String>) -> Result<()> {
204 let root = nw_sys::app::folder();
205
206 match path {
207 None => {
208 let binaries = kaspa_daemon::locate_binaries(root.as_str(), "kaspad").await?;
209
210 if binaries.is_empty() {
211 tprintln!(ctx, "No kaspad binaries found");
212 } else {
213 let binaries = binaries.iter().map(|p| p.display().to_string()).collect::<Vec<_>>();
214 if let Some(selection) = ctx.term().select("Please select a kaspad binary", &binaries).await? {
215 tprintln!(ctx, "selecting: {}", selection);
216 self.settings.set(KaspadSettings::Location, selection.as_str()).await?;
217 } else {
218 tprintln!(ctx, "no selection is made");
219 }
220 }
221 }
222 Some(path) => {
223 if fs::exists(&path).await? {
224 let version = process::version(&path).await?;
225 tprintln!(ctx, "detected binary version: {}", version);
226 tprintln!(ctx, "selecting: {path}");
227 self.settings.set(KaspadSettings::Location, path.as_str()).await?;
228 } else {
229 twarnln!(ctx, "destination binary not found, please specify full path including the binary name");
230 twarnln!(ctx, "example: 'node select /home/user/testnet/kaspad'");
231 tprintln!(ctx, "no selection is made");
232 }
233 }
234 }
235
236 Ok(())
237 }
238
239 pub async fn handle_event(&self, ctx: &Arc<KaspaCli>, event: Event) -> Result<()> {
240 let term = ctx.term();
241
242 match event {
243 Event::Start => {
244 self.is_running.store(true, Ordering::SeqCst);
245 term.refresh_prompt();
246 }
247 Event::Exit(_code) => {
248 tprintln!(ctx, "Kaspad has exited");
249 self.is_running.store(false, Ordering::SeqCst);
250 term.refresh_prompt();
251 }
252 Event::Error(error) => {
253 tprintln!(ctx, "{}", style(format!("Kaspad error: {error}")).red());
254 self.is_running.store(false, Ordering::SeqCst);
255 term.refresh_prompt();
256 }
257 Event::Stdout(text) | Event::Stderr(text) => {
258 if !ctx.wallet().utxo_processor().is_synced() {
259 ctx.wallet().utxo_processor().sync_proc().handle_stdout(&text).await?;
260 }
261
262 if !self.mute.load(Ordering::SeqCst) {
263 let sanitize = true;
264 if sanitize {
265 let lines = text.split('\n').collect::<Vec<_>>();
266 lines.into_iter().for_each(|line| {
267 let line = line.trim();
268 if !line.is_empty() {
269 if line.len() < 38 || &line[30..31] != "[" {
270 term.writeln(line);
271 } else {
272 let time = &line[11..23];
273 let kind = &line[31..36];
274 let text = &line[38..];
275
276 match kind {
277 "WARN " => {
278 term.writeln(format!("{time} {}", style(text).yellow()));
279 }
280 "ERROR" => {
281 term.writeln(format!("{time} {}", style(text).red()));
282 }
283 _ => {
284 if text.starts_with("Processed") {
285 term.writeln(format!("{time} {}", style(text).blue()));
286 } else {
287 term.writeln(format!("{time} {text}"));
288 }
289 }
290 }
291 }
292 }
293 });
294 } else {
295 term.writeln(text.trim().crlf());
296 }
297 }
298 }
299 }
300 Ok(())
301 }
302}