1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::diagnostics_port;
3use crate::mcp::{self, McpArgs};
4use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
5use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, STATE_FILE_NAME, State};
6use anyhow::{Result, anyhow};
7use backoff::ExponentialBackoff;
8use backoff::backoff::Backoff;
9use casper_types::U512;
10use casper_types::contract_messages::MessagePayload;
11use casper_types::execution::ExecutionResult;
12use clap::{Args, Parser, Subcommand};
13use directories::BaseDirs;
14use futures::StreamExt;
15use nix::errno::Errno;
16use nix::sys::signal::kill;
17use nix::unistd::Pid;
18use serde::Deserialize;
19use spinners::{Spinner, Spinners};
20use std::collections::{HashMap, HashSet};
21use std::os::unix::process::ExitStatusExt;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::sync::atomic::Ordering;
25use std::time::Duration;
26use tokio::fs as tokio_fs;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
29use veles_casper_rust_sdk::sse::event::SseEvent;
30use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
31
// Top-level command-line interface; `run` builds it with `Cli::parse()`.
// Plain `//` comments are used on purpose: clap would turn `///` doc comments
// into help text.
#[derive(Parser)]
#[command(name = "nctl")]
#[command(
    about = "casper-devnet launcher for local Casper Network development networks",
    long_about = None
)]
pub struct Cli {
    // The subcommand selected on the command line.
    #[command(subcommand)]
    command: Command,
}
43
// Subcommands dispatched by `run`.
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Subcommand)]
enum Command {
    // Handled by `run_start`.
    Start(StartArgs),
    // Handled by `mcp::run`.
    Mcp(McpArgs),
    // Handled by `run_assets`.
    Assets(AssetsArgs),
    // Handled by `run_is_ready`.
    IsReady(IsReadyArgs),
}
56
// Arguments of the `start` subcommand (consumed by `run_start`,
// `run_setup_only` and `setup_options`).
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Args, Clone)]
struct StartArgs {
    // Network name; passed to `AssetsLayout::new` and `SetupOptions::network_name`.
    #[arg(long, default_value = "casper-dev")]
    network_name: String,

    // Assets root override; when absent `assets::default_assets_root()` is used.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,

    // Requested protocol version; when absent `resolve_protocol_version` picks
    // the greatest installed bundle version.
    #[arg(long)]
    protocol_version: Option<String>,

    // Becomes `SetupOptions::nodes`; also reachable as `--nodes` / `--validators`.
    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
    node_count: u32,

    // Becomes `SetupOptions::users`.
    #[arg(long)]
    users: Option<u32>,

    // Becomes `SetupOptions::delay_seconds`.
    #[arg(long, default_value_t = 3)]
    delay: u64,

    // Becomes `StartPlan::rust_log`.
    #[arg(long = "log-level", default_value = "info")]
    log_level: String,

    // Becomes `SetupOptions::node_log_format`.
    #[arg(long, default_value = "json")]
    node_log_format: String,

    // When set, `run_start` delegates to `run_setup_only` and returns without
    // starting any process.
    #[arg(long)]
    setup_only: bool,

    // When set, existing assets are torn down and set up again.
    #[arg(long)]
    force_setup: bool,

    // Passed to `SetupOptions::seed` and `assets::ensure_consensus_keys`.
    #[arg(long, default_value = "default")]
    seed: Arc<str>,
}
104
// Arguments of the `assets` subcommand; wraps the nested `AssetsCommand`.
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Args)]
struct AssetsArgs {
    // The nested assets action to perform (dispatched by `run_assets`).
    #[command(subcommand)]
    command: AssetsCommand,
}
111
// Actions available under the `assets` subcommand, dispatched by `run_assets`.
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Subcommand)]
enum AssetsCommand {
    // Handled by `run_assets_add`.
    Add(AssetsAddArgs),
    // Handled by `run_assets_pull`.
    Pull(AssetsPullArgs),
    // Handled by `run_assets_list`.
    List,
}
122
// Arguments of `assets add`.
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Args, Clone)]
struct AssetsAddArgs {
    // Bundle location handed to `assets::install_assets_bundle`; values that
    // look like http(s) URLs are rejected by `run_assets_add`.
    #[arg(value_name = "PATH")]
    path: PathBuf,
}
130
// Arguments of `assets pull`; both values are forwarded unchanged to
// `assets::pull_assets_bundles`.
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Args, Clone)]
struct AssetsPullArgs {
    // Optional target selector, forwarded as `Option<&str>`.
    #[arg(long)]
    target: Option<String>,

    // Force flag, forwarded as-is.
    #[arg(long)]
    force: bool,
}
142
// Arguments of the `is-ready` subcommand (consumed by `run_is_ready`).
// (`//` rather than `///` so the generated help output is unchanged.)
#[derive(Args, Clone)]
struct IsReadyArgs {
    // Network name; passed to `AssetsLayout::new`.
    #[arg(long, default_value = "casper-dev")]
    network_name: String,

    // Assets root override; when absent `assets::default_assets_root()` is used.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,
}
154
155pub async fn run() -> Result<()> {
157 let cli = Cli::parse();
158 match cli.command {
159 Command::Start(args) => run_start(args).await,
160 Command::Mcp(args) => mcp::run(args).await,
161 Command::Assets(args) => run_assets(args).await,
162 Command::IsReady(args) => run_is_ready(args).await,
163 }
164}
165
166async fn run_start(args: StartArgs) -> Result<()> {
167 let assets_root = match &args.net_path {
168 Some(path) => path.clone(),
169 None => assets::default_assets_root()?,
170 };
171 let layout = AssetsLayout::new(assets_root, args.network_name.clone());
172 let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
173 println!("assets path: {}", assets_path);
174 let assets_exist = layout.exists().await;
175 if !args.setup_only && !args.force_setup && assets_exist {
176 println!("resuming network operations on {}", layout.network_name());
177 }
178 let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
179
180 if args.setup_only {
181 return run_setup_only(&layout, &args, &protocol_version).await;
182 }
183
184 if args.force_setup {
185 assets::teardown(&layout).await?;
186 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
187 } else if !assets_exist {
188 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
189 }
190
191 if !layout.exists().await {
192 return Err(anyhow!(
193 "assets missing under {}; run with --setup-only to create them",
194 shorten_home_path(&layout.net_dir().display().to_string())
195 ));
196 }
197
198 if !args.force_setup && assets_exist {
199 let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
200 if restored > 0 {
201 println!("recreated consensus keys for {} node(s)", restored);
202 }
203 }
204
205 let rust_log = args.log_level.clone();
206
207 let plan = StartPlan { rust_log };
208
209 let state_path = layout.net_dir().join(STATE_FILE_NAME);
210 let state = Arc::new(Mutex::new(State::new(state_path).await?));
211 let started = {
212 let mut state = state.lock().await;
213 process::start(&layout, &plan, &mut state).await?
214 };
215
216 print_pids(&started);
217 print_start_banner(&layout, &started).await;
218 print_derived_accounts_summary(&layout).await;
219
220 let node_ids = unique_node_ids(&started);
221 let details = format_network_details(&layout, &started).await;
222 let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
223 start_sse_spinner(&health).await;
224 spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
225 let mut diagnostics_proxy = match diagnostics_port::spawn(&layout).await {
226 Ok(proxy) => Some(proxy),
227 Err(err) => {
228 eprintln!("warning: failed to start diagnostics proxy: {}", err);
229 None
230 }
231 };
232
233 let (event_tx, mut event_rx) = unbounded_channel();
234 spawn_ctrlc_listener(event_tx.clone());
235 spawn_exit_watchers(started, event_tx);
236
237 if let Some(event) = event_rx.recv().await {
238 match event {
239 RunEvent::CtrlC => {
240 if let Some(proxy) = diagnostics_proxy.take() {
241 proxy.shutdown();
242 }
243 let mut state = state.lock().await;
244 process::stop(&mut state).await?;
245 }
246 RunEvent::ProcessExit {
247 id,
248 pid,
249 code,
250 signal,
251 } => {
252 if let Some(proxy) = diagnostics_proxy.take() {
253 proxy.shutdown();
254 }
255 let mut state = state.lock().await;
256 update_exited_process(&mut state, &id, code, signal).await?;
257 log_exit(&id, pid, code, signal);
258 process::stop(&mut state).await?;
259 }
260 }
261 }
262
263 Ok(())
264}
265
266async fn run_setup_only(
267 layout: &AssetsLayout,
268 args: &StartArgs,
269 protocol_version: &str,
270) -> Result<()> {
271 if args.force_setup {
272 assets::teardown(layout).await?;
273 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
274 print_derived_accounts_summary(layout).await;
275 return Ok(());
276 }
277
278 if layout.exists().await {
279 println!(
280 "assets already exist at {}; use --force-setup to rebuild",
281 shorten_home_path(&layout.net_dir().display().to_string())
282 );
283 print_derived_accounts_summary(layout).await;
284 return Ok(());
285 }
286
287 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
288 print_derived_accounts_summary(layout).await;
289 Ok(())
290}
291
292fn record_pid(record: &ProcessRecord) -> Option<u32> {
293 if let Some(handle) = &record.pid_handle {
294 let pid = handle.load(Ordering::SeqCst);
295 if pid != 0 {
296 return Some(pid);
297 }
298 }
299 record.pid
300}
301
302fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
303 SetupOptions {
304 nodes: args.node_count,
305 users: args.users,
306 delay_seconds: args.delay,
307 network_name: args.network_name.clone(),
308 protocol_version: protocol_version.to_string(),
309 node_log_format: args.node_log_format.clone(),
310 seed: Arc::clone(&args.seed),
311 }
312}
313
314fn print_pids(records: &[RunningProcess]) {
315 for record in records {
316 if let Some(pid) = record_pid(&record.record) {
317 println!(
318 "{} pid={} ({:?})",
319 record.record.id, pid, record.record.kind
320 );
321 }
322 }
323}
324
325async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
326 let symlink_root = layout.net_dir();
327 let mut node_pids: HashMap<u32, u32> = HashMap::new();
328 let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
329 let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
330
331 for process in processes {
332 if let Some(pid) = record_pid(&process.record) {
333 match process.record.kind {
334 ProcessKind::Node => {
335 node_pids.insert(process.record.node_id, pid);
336 }
337 ProcessKind::Sidecar => {
338 sidecar_pids.insert(process.record.node_id, pid);
339 }
340 }
341 process_logs
342 .entry(process.record.node_id)
343 .or_default()
344 .push((process.record.kind.clone(), pid));
345 }
346 }
347
348 let node_ids = unique_node_ids(processes);
349
350 let mut lines = Vec::new();
351 lines.push("network details".to_string());
352 for node_id in node_ids {
353 let node_pid = node_pids
354 .get(&node_id)
355 .map(|pid| pid.to_string())
356 .unwrap_or_else(|| "-".to_string());
357 let sidecar_pid = sidecar_pids
358 .get(&node_id)
359 .map(|pid| pid.to_string())
360 .unwrap_or_else(|| "-".to_string());
361 lines.push(format!(" node-{}", node_id));
362 lines.push(format!(
363 " pids: node={} sidecar={}",
364 node_pid, sidecar_pid
365 ));
366 if let Some(entries) = process_logs.get(&node_id) {
367 let mut entries = entries.clone();
368 entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
369 lines.push(" logs".to_string());
370 for (kind, pid) in entries {
371 let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
372 lines.push(format!(
373 " {} pid={} stdout={} stderr={}",
374 process_kind_label(&kind),
375 pid,
376 stdout_link,
377 stderr_link
378 ));
379 }
380 }
381 lines.push(" endpoints".to_string());
382 lines.push(format!(" rest: {}", assets::rest_endpoint(node_id)));
383 lines.push(format!(" sse: {}", assets::sse_endpoint(node_id)));
384 lines.push(format!(" rpc: {}", assets::rpc_endpoint(node_id)));
385 lines.push(format!(" binary: {}", assets::binary_address(node_id)));
386 lines.push(format!(
387 " diagnostics: {}",
388 assets::diagnostics_socket_path(layout.network_name(), node_id)
389 ));
390 lines.push(format!(
391 " diagnostics-ws: {}",
392 assets::diagnostics_ws_endpoint(node_id)
393 ));
394 lines.push(format!(
395 " gossip: {}",
396 assets::network_address(node_id)
397 ));
398 }
399
400 lines.join("\n")
401}
402
403fn process_kind_label(kind: &ProcessKind) -> &'static str {
404 match kind {
405 ProcessKind::Node => "node",
406 ProcessKind::Sidecar => "sidecar",
407 }
408}
409
410fn shorten_home_path(path: &str) -> String {
411 let path = Path::new(path);
412 let Some(base_dirs) = BaseDirs::new() else {
413 return path.display().to_string();
414 };
415 let home = base_dirs.home_dir();
416 match path.strip_prefix(home) {
417 Ok(stripped) => {
418 if stripped.as_os_str().is_empty() {
419 return "~".to_string();
420 }
421 let mut shorthand = PathBuf::from("~");
422 shorthand.push(stripped);
423 shorthand.display().to_string()
424 }
425 Err(_) => path.display().to_string(),
426 }
427}
428
429fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
430 let base = match kind {
431 ProcessKind::Node => format!("node-{}", node_id),
432 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
433 };
434 let stdout_link = symlink_root.join(format!("{}.stdout", base));
435 let stderr_link = symlink_root.join(format!("{}.stderr", base));
436 (
437 shorten_home_path(&stdout_link.display().to_string()),
438 shorten_home_path(&stderr_link.display().to_string()),
439 )
440}
441
442async fn print_derived_accounts_summary(layout: &AssetsLayout) {
443 if let Some(summary) = assets::derived_accounts_summary(layout).await {
444 if let Some(parsed) = parse_derived_accounts_csv(&summary) {
445 println!("derived accounts");
446 if !parsed.validators.is_empty() {
447 println!(" validators");
448 print_account_group(&parsed.validators);
449 }
450 if !parsed.users.is_empty() {
451 println!(" users");
452 print_account_group(&parsed.users);
453 }
454 if !parsed.other.is_empty() {
455 println!(" other");
456 print_account_group(&parsed.other);
457 }
458 } else {
459 println!("derived accounts");
460 for line in summary.lines() {
461 println!(" {}", line);
462 }
463 }
464 }
465}
466
/// One data row of the derived-accounts CSV, minus its leading `kind` column
/// (the kind decides which group the row is placed in).
///
/// Every field holds the raw text of the CSV column with the same name.
struct DerivedAccountRow {
    /// `name` column.
    name: String,
    /// `key_type` column.
    key_type: String,
    /// `derivation` column.
    derivation: String,
    /// `path` column.
    path: String,
    /// `account_hash` column.
    account_hash: String,
    /// `balance` column (everything after the sixth comma of the row).
    balance: String,
}
475
/// Derived-account rows grouped by the value of their `kind` column, as
/// produced by `parse_derived_accounts_csv`.
struct DerivedAccountsParsed {
    /// Rows whose kind is `validator`.
    validators: Vec<DerivedAccountRow>,
    /// Rows whose kind is `user`.
    users: Vec<DerivedAccountRow>,
    /// Rows with any other kind.
    other: Vec<DerivedAccountRow>,
}
481
482fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
483 let mut lines = summary.lines();
484 let header = lines.next()?.trim();
485 if header != "kind,name,key_type,derivation,path,account_hash,balance" {
486 return None;
487 }
488
489 let mut parsed = DerivedAccountsParsed {
490 validators: Vec::new(),
491 users: Vec::new(),
492 other: Vec::new(),
493 };
494
495 for line in lines {
496 let line = line.trim();
497 if line.is_empty() {
498 continue;
499 }
500 let mut parts = line.splitn(7, ',');
501 let kind = parts.next()?.to_string();
502 let name = parts.next()?.to_string();
503 let key_type = parts.next()?.to_string();
504 let derivation = parts.next()?.to_string();
505 let path = parts.next()?.to_string();
506 let account_hash = parts.next()?.to_string();
507 let balance = parts.next()?.to_string();
508 let row = DerivedAccountRow {
509 name,
510 key_type,
511 derivation,
512 path,
513 account_hash,
514 balance,
515 };
516 match kind.as_str() {
517 "validator" => parsed.validators.push(row),
518 "user" => parsed.users.push(row),
519 _ => parsed.other.push(row),
520 }
521 }
522
523 Some(parsed)
524}
525
526fn print_account_group(rows: &[DerivedAccountRow]) {
527 for row in rows {
528 println!(" {}:", row.name);
529 println!(" key_type: {}", row.key_type);
530 println!(" derivation: {}", row.derivation);
531 println!(" path: {}", row.path);
532 println!(" account_hash: {}", row.account_hash);
533 println!(" balance: {}", row.balance);
534 }
535}
536
537fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
538 let mut nodes = HashSet::new();
539 for process in processes {
540 nodes.insert(process.record.node_id);
541 }
542 let mut ids: Vec<u32> = nodes.into_iter().collect();
543 ids.sort_unstable();
544 ids
545}
546
/// Events that end the wait in `run_start`.
enum RunEvent {
    /// Ctrl-C was received (sent by `spawn_ctrlc_listener`).
    CtrlC,
    /// A watched process or task finished (sent by `spawn_exit_watchers`).
    ProcessExit {
        /// Id of the process record that exited.
        id: String,
        /// Pid of the process, when known.
        pid: Option<u32>,
        /// Exit code, when available.
        code: Option<i32>,
        /// Terminating signal, when available.
        signal: Option<i32>,
    },
}
556
557fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
558 tokio::spawn(async move {
559 if tokio::signal::ctrl_c().await.is_ok() {
560 let _ = tx.send(RunEvent::CtrlC);
561 }
562 });
563}
564
565fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
566 for running in processes {
567 let tx = tx.clone();
568 tokio::spawn(async move {
569 let id = running.record.id.clone();
570 match running.handle {
571 ProcessHandle::Child(mut child) => {
572 if let Ok(status) = child.wait().await {
573 let pid = record_pid(&running.record).or_else(|| child.id());
574 let code = status.code();
575 let signal = status.signal();
576 let _ = tx.send(RunEvent::ProcessExit {
577 id: id.clone(),
578 pid,
579 code,
580 signal,
581 });
582 }
583 }
584 ProcessHandle::Task(handle) => {
585 let status = handle.await;
586 let pid = record_pid(&running.record);
587 let (code, signal) = match status {
588 Ok(Ok(())) => (Some(0), None),
589 Ok(Err(_)) => (None, None),
590 Err(_) => (None, None),
591 };
592 let _ = tx.send(RunEvent::ProcessExit {
593 id: id.clone(),
594 pid,
595 code,
596 signal,
597 });
598 }
599 }
600 });
601 }
602}
603
/// Spinner text shown by `start_sse_spinner` until the SSE spinner is stopped.
const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
/// Text of the block spinner; also passed as its stop message in `mark_block_seen`.
const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
606
/// Shared bookkeeping for the SSE listeners: which nodes have reported an API
/// version, whether the network was announced healthy, and the spinners that
/// are currently displayed.
struct SseHealth {
    /// Node ids that must report an API version before the announcement.
    expected_nodes: HashSet<u32>,
    /// Latest API version string reported per node id.
    versions: HashMap<u32, String>,
    /// Set once `record_api_version` has announced the network as healthy.
    announced: bool,
    /// Set by `mark_block_seen`; reset to `false` at announcement time.
    block_seen: bool,
    /// Spinner shown while waiting for the SSE connections.
    sse_spinner: Option<Spinner>,
    /// Spinner shown while waiting for blocks.
    block_spinner: Option<Spinner>,
    /// Pre-rendered network details printed with the health announcement.
    details: String,
}

impl SseHealth {
    /// Creates the initial state: nothing reported, nothing announced, and no
    /// spinner running.
    fn new(node_ids: Vec<u32>, details: String) -> Self {
        Self {
            expected_nodes: node_ids.into_iter().collect(),
            versions: HashMap::new(),
            announced: false,
            block_seen: false,
            sse_spinner: None,
            block_spinner: None,
            details,
        }
    }
}
630
631async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
632 if node_id != 1 {
633 return false;
634 }
635 let state = health.lock().await;
636 state.announced
637}
638
639fn start_spinner(message: &str) -> Spinner {
640 Spinner::new(Spinners::Dots, message.to_string())
641}
642
643async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
644 let mut state = health.lock().await;
645 if state.sse_spinner.is_none() {
646 state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
647 }
648}
649
650async fn spawn_sse_listeners(
651 layout: AssetsLayout,
652 node_ids: &[u32],
653 health: Arc<Mutex<SseHealth>>,
654 state: Arc<Mutex<State>>,
655) {
656 for node_id in node_ids {
657 let node_id = *node_id;
658 let endpoint = assets::sse_endpoint(node_id);
659 let layout = layout.clone();
660 let health = Arc::clone(&health);
661 let state = Arc::clone(&state);
662 tokio::spawn(async move {
663 run_sse_listener(node_id, endpoint, health, state, layout).await;
664 });
665 }
666}
667
668async fn run_sse_listener(
669 node_id: u32,
670 endpoint: String,
671 health: Arc<Mutex<SseHealth>>,
672 state: Arc<Mutex<State>>,
673 layout: AssetsLayout,
674) {
675 let mut backoff = ExponentialBackoff::default();
676
677 loop {
678 let config = match ListenerConfig::builder()
679 .with_endpoint(endpoint.clone())
680 .build()
681 {
682 Ok(config) => config,
683 Err(_) => {
684 if !sleep_backoff(&mut backoff).await {
685 return;
686 }
687 continue;
688 }
689 };
690
691 let stream = match sse::listener(config).await {
692 Ok(stream) => {
693 backoff.reset();
694 stream
695 }
696 Err(_) => {
697 if !sleep_backoff(&mut backoff).await {
698 return;
699 }
700 continue;
701 }
702 };
703
704 futures::pin_mut!(stream);
705 let mut stream_failed = false;
706 while let Some(event) = stream.next().await {
707 match event {
708 Ok(sse_event) => match sse_event {
709 SseEvent::ApiVersion(version) => {
710 record_api_version(node_id, version.to_string(), &health).await;
711 }
712 SseEvent::BlockAdded { block_hash, block } => {
713 if node_id == 1
714 && let Err(err) = record_last_block_height(&state, block.height()).await
715 {
716 eprintln!("warning: failed to record last block height: {}", err);
717 }
718 if should_log_primary(node_id, &health).await {
719 mark_block_seen(&health, &layout).await;
720 let prefix = timestamp_prefix();
721 println!(
722 "{} Block {} added (height={} era={})",
723 prefix,
724 block_hash,
725 block.height(),
726 block.era_id().value()
727 );
728 }
729 }
730 SseEvent::TransactionAccepted(transaction) => {
731 if node_id == 1 {
732 let prefix = timestamp_prefix();
733 println!("{} Transaction {} accepted", prefix, transaction.hash());
734 }
735 }
736 SseEvent::TransactionProcessed {
737 transaction_hash,
738 execution_result,
739 messages,
740 ..
741 } => {
742 if node_id == 1 {
743 let tx_hash = transaction_hash.to_string();
744 let prefix = timestamp_prefix();
745 log_transaction_processed(
746 &prefix,
747 &tx_hash,
748 &execution_result,
749 &messages,
750 );
751 }
752 }
753 _ => {}
754 },
755 Err(_) => {
756 stream_failed = true;
757 break;
758 }
759 }
760 }
761
762 if stream_failed && !sleep_backoff(&mut backoff).await {
763 return;
764 }
765 }
766}
767
768async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
769 let (summary, details, sse_spinner) = {
770 let mut state = health.lock().await;
771 if !state.expected_nodes.contains(&node_id) {
772 return;
773 }
774 state.versions.insert(node_id, version);
775 if state.announced || state.versions.len() != state.expected_nodes.len() {
776 return;
777 }
778
779 let summary = version_summary(&state.versions);
780 let details = state.details.clone();
781 let sse_spinner = state.sse_spinner.take();
782 if state.block_spinner.is_none() {
783 state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
784 }
785 state.announced = true;
786 state.block_seen = false;
787 (summary, details, sse_spinner)
788 };
789
790 if let Some(mut spinner) = sse_spinner {
791 spinner.stop_with_message("SSE connection established.".to_string());
792 }
793 println!("Network is healthy ({})", summary);
794 println!("{}", details);
795}
796
797async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
798 let (block_spinner, node_ids) = {
799 let mut state = health.lock().await;
800 if state.block_seen {
801 return;
802 }
803 state.block_seen = true;
804 (
805 state.block_spinner.take(),
806 state.expected_nodes.iter().copied().collect::<Vec<_>>(),
807 )
808 };
809
810 if let Some(mut spinner) = block_spinner {
811 spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
812 }
813
814 match assets::remove_consensus_keys(layout, &node_ids).await {
815 Ok(removed) => {
816 if removed > 0 {
817 println!("Consensus secret keys removed from disk.");
818 }
819 }
820 Err(err) => {
821 eprintln!("warning: failed to remove consensus secret keys: {}", err);
822 }
823 }
824}
825
826async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
827 let mut state = state.lock().await;
828 if state.last_block_height == Some(height) {
829 return Ok(());
830 }
831 state.last_block_height = Some(height);
832 state.touch().await?;
833 Ok(())
834}
835
/// Summarizes the distinct version strings in `versions`.
///
/// Returns `version X` when exactly one distinct version is present, and
/// `versions A, B, ...` (sorted, comma-separated) otherwise.
fn version_summary(versions: &HashMap<u32, String>) -> String {
    let mut distinct: Vec<&str> = versions.values().map(String::as_str).collect();
    distinct.sort_unstable();
    distinct.dedup();

    match distinct.as_slice() {
        [only] => format!("version {}", only),
        several => format!("versions {}", several.join(", ")),
    }
}
846
847async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
848 if let Some(delay) = backoff.next_backoff() {
849 tokio::time::sleep(delay).await;
850 return true;
851 }
852 false
853}
854
855fn log_transaction_processed(
856 prefix: &str,
857 transaction_hash: &str,
858 execution_result: &ExecutionResult,
859 messages: &[casper_types::contract_messages::Message],
860) {
861 let consumed = execution_result.consumed();
862 let consumed_cspr = format_cspr_u512(&consumed);
863 if let Some(error) = execution_result.error_message() {
864 println!(
865 "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
866 prefix, transaction_hash, error, consumed, consumed_cspr
867 );
868 } else {
869 println!(
870 "{} Transaction {} processed succeeded gas={} gas_cspr={}",
871 prefix, transaction_hash, consumed, consumed_cspr
872 );
873 }
874
875 for message in messages {
876 let entity = message.entity_addr().to_formatted_string();
877 let topic = message.topic_name();
878 let payload = format_message_payload(message.payload());
879 println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
880 }
881}
882
883fn timestamp_prefix() -> String {
884 time::OffsetDateTime::now_utc()
885 .format(&time::format_description::well_known::Rfc3339)
886 .unwrap_or_else(|_| "unknown-time".to_string())
887}
888
889fn format_message_payload(payload: &MessagePayload) -> String {
890 match payload {
891 MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
892 MessagePayload::String(value) => format!("{:?}", value),
893 }
894}
895
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte.
fn encode_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
904
905fn format_cspr_u512(motes: &U512) -> String {
906 let motes_str = motes.to_string();
907 let digits = motes_str.len();
908 if digits <= 9 {
909 let frac = format!("{:0>9}", motes_str);
910 let frac = frac.trim_end_matches('0');
911 if frac.is_empty() {
912 return "0".to_string();
913 }
914 return format!("0.{}", frac);
915 }
916
917 let split = digits - 9;
918 let (whole, frac) = motes_str.split_at(split);
919 let frac = frac.trim_end_matches('0');
920 if frac.is_empty() {
921 return whole.to_string();
922 }
923 format!("{}.{}", whole, frac)
924}
925
926async fn update_exited_process(
927 state: &mut State,
928 id: &str,
929 code: Option<i32>,
930 signal: Option<i32>,
931) -> Result<()> {
932 for record in &mut state.processes {
933 if record.id == id {
934 record.last_status = ProcessStatus::Exited;
935 record.exit_code = code;
936 record.exit_signal = signal;
937 record.stopped_at = Some(time::OffsetDateTime::now_utc());
938 break;
939 }
940 }
941 state.touch().await?;
942 Ok(())
943}
944
/// Prints a one-line description of a process exit.
///
/// The pid is included when known. A signal takes precedence over an exit
/// code; with neither, a plain "exited" line is printed.
fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
    let subject = match pid {
        Some(pid) => format!("process {} (pid {})", id, pid),
        None => format!("process {}", id),
    };

    match (signal, code) {
        (Some(signal), _) => println!("{} exited due to signal {}", subject, signal),
        (None, Some(code)) => println!("{} exited with code {}", subject, code),
        (None, None) => println!("{} exited", subject),
    }
}
965
966async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
967 let total_nodes = layout.count_nodes().await.unwrap_or(0);
968 let target = format!("all nodes ({})", total_nodes);
969 let sidecars = processes
970 .iter()
971 .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
972 .count();
973 println!(
974 "started {} process(es) for {} (sidecars: {})",
975 processes.len(),
976 target,
977 sidecars
978 );
979}
980
/// Returns `true` when `path` starts with an `http://` or `https://` scheme.
///
/// The scheme is compared case-insensitively (URL schemes are
/// case-insensitive), so `HTTPS://host/x` is recognized as a URL instead of
/// being treated as a local path.
fn looks_like_url(path: &Path) -> bool {
    let value = path.to_string_lossy();
    ["http://", "https://"].iter().any(|scheme| {
        // `get` returns `None` when the value is shorter than the scheme or the
        // cut would fall inside a multi-byte character.
        value
            .get(..scheme.len())
            .is_some_and(|prefix| prefix.eq_ignore_ascii_case(scheme))
    })
}
985
986async fn is_dir(path: &Path) -> bool {
987 tokio_fs::metadata(path)
988 .await
989 .map(|meta| meta.is_dir())
990 .unwrap_or(false)
991}
992
993async fn run_assets(args: AssetsArgs) -> Result<()> {
994 match args.command {
995 AssetsCommand::Add(add) => run_assets_add(add).await,
996 AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
997 AssetsCommand::List => run_assets_list().await,
998 }
999}
1000
1001async fn run_is_ready(args: IsReadyArgs) -> Result<()> {
1002 let assets_root = match &args.net_path {
1003 Some(path) => path.clone(),
1004 None => assets::default_assets_root()?,
1005 };
1006 let layout = AssetsLayout::new(assets_root, args.network_name);
1007 let argv0 = std::env::args()
1008 .next()
1009 .unwrap_or_else(|| "casper-devnet".to_string());
1010 let setup_cmd = format!("{} start --setup-only", argv0);
1011 let net_dir = layout.net_dir();
1012 if !is_dir(&net_dir).await {
1013 return Err(anyhow!(
1014 "assets for {} not found; run `{}`",
1015 layout.network_name(),
1016 setup_cmd
1017 ));
1018 }
1019
1020 let state_path = net_dir.join(STATE_FILE_NAME);
1021 let contents = match tokio_fs::read_to_string(&state_path).await {
1022 Ok(contents) => contents,
1023 Err(_) => return Err(anyhow!("network is not ready yet")),
1024 };
1025 let state =
1026 match tokio::task::spawn_blocking(move || serde_json::from_str::<State>(&contents)).await {
1027 Ok(Ok(state)) => state,
1028 _ => return Err(anyhow!("network is not ready yet")),
1029 };
1030
1031 ensure_processes_running(&state)?;
1032
1033 if state.last_block_height.is_none() {
1034 return Err(anyhow!("network is not ready yet"));
1035 }
1036
1037 ensure_rest_ready(&state).await?;
1038 Ok(())
1039}
1040
1041async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
1042 if looks_like_url(&args.path) {
1043 return Err(anyhow!(
1044 "assets URL is not supported yet; provide a local .tar.gz path"
1045 ));
1046 }
1047 assets::install_assets_bundle(&args.path).await?;
1048 println!(
1049 "assets installed into {}",
1050 assets::assets_bundle_root()?.display()
1051 );
1052 Ok(())
1053}
1054
1055async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
1056 assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
1057 Ok(())
1058}
1059
1060async fn run_assets_list() -> Result<()> {
1061 let mut versions = assets::list_bundle_versions().await?;
1062 if versions.is_empty() {
1063 return Err(anyhow!("no assets bundles found"));
1064 }
1065 versions.sort_by(|a, b| b.cmp(a));
1066 for version in versions {
1067 println!("{}", version);
1068 }
1069 Ok(())
1070}
1071
1072async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
1073 if let Some(raw) = candidate {
1074 let version = assets::parse_protocol_version(raw)?;
1075 if !assets::has_bundle_version(&version).await? {
1076 let argv0 = std::env::args()
1077 .next()
1078 .unwrap_or_else(|| "casper-devnet".to_string());
1079 let pull_cmd = format!("{} assets pull", argv0);
1080 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1081 return Err(anyhow!(
1082 "assets for version {} not found; run `{}` or `{}`",
1083 version,
1084 pull_cmd,
1085 add_cmd
1086 ));
1087 }
1088 return Ok(version.to_string());
1089 }
1090 let versions = assets::list_bundle_versions().await?;
1091 if versions.is_empty() {
1092 let argv0 = std::env::args()
1093 .next()
1094 .unwrap_or_else(|| "casper-devnet".to_string());
1095 let pull_cmd = format!("{} assets pull", argv0);
1096 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1097 return Err(anyhow!(
1098 "no assets found; run `{}` or `{}`",
1099 pull_cmd,
1100 add_cmd
1101 ));
1102 }
1103 let version = versions
1104 .into_iter()
1105 .max()
1106 .expect("non-empty assets versions");
1107 Ok(version.to_string())
1108}
1109
1110fn ensure_processes_running(state: &State) -> Result<()> {
1111 if state.processes.is_empty() {
1112 return Err(anyhow!("network is not ready yet"));
1113 }
1114 for process in &state.processes {
1115 if !matches!(process.last_status, ProcessStatus::Running) {
1116 return Err(anyhow!("network is not ready yet"));
1117 }
1118 let pid = match process.pid {
1119 Some(pid) => pid,
1120 None => return Err(anyhow!("network is not ready yet")),
1121 };
1122 if !is_pid_running(pid) {
1123 return Err(anyhow!("network is not ready yet"));
1124 }
1125 }
1126 Ok(())
1127}
1128
1129fn is_pid_running(pid: u32) -> bool {
1130 let pid = Pid::from_raw(pid as i32);
1131 match kill(pid, None) {
1132 Ok(()) => true,
1133 Err(Errno::ESRCH) => false,
1134 Err(_) => true,
1135 }
1136}
1137
1138async fn ensure_rest_ready(state: &State) -> Result<()> {
1139 let node_ids: HashSet<u32> = state
1140 .processes
1141 .iter()
1142 .filter_map(|process| {
1143 if matches!(process.kind, ProcessKind::Node) {
1144 Some(process.node_id)
1145 } else {
1146 None
1147 }
1148 })
1149 .collect();
1150 if node_ids.is_empty() {
1151 return Err(anyhow!("network is not ready yet"));
1152 }
1153
1154 let client = reqwest::Client::builder()
1155 .no_proxy()
1156 .timeout(Duration::from_secs(2))
1157 .build()?;
1158
1159 for node_id in node_ids {
1160 let url = format!("{}/status", assets::rest_endpoint(node_id));
1161 let response = match client.get(&url).send().await {
1162 Ok(response) => response,
1163 Err(_) => return Err(anyhow!("network is not ready yet")),
1164 };
1165 if response.status() != reqwest::StatusCode::OK {
1166 return Err(anyhow!("network is not ready yet"));
1167 }
1168 let status = match response.json::<NodeStatus>().await {
1169 Ok(status) => status,
1170 Err(_) => return Err(anyhow!("network is not ready yet")),
1171 };
1172 if !status
1173 .reactor_state
1174 .as_deref()
1175 .map(is_ready_reactor_state)
1176 .unwrap_or(false)
1177 {
1178 return Err(anyhow!("network is not ready yet"));
1179 }
1180 }
1181 Ok(())
1182}
1183
/// Returns `true` only for the reactor state `Validate` (exact,
/// case-sensitive match).
fn is_ready_reactor_state(state: &str) -> bool {
    matches!(state, "Validate")
}
1187
/// JSON body returned by a node's REST `/status` endpoint, as decoded in
/// `ensure_rest_ready`.
///
/// Every field is optional. Only `reactor_state` is read in this file; the
/// remaining fields are never read, hence `allow(dead_code)`.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct NodeStatus {
    peers: Option<Vec<NodePeer>>,
    api_version: Option<String>,
    build_version: Option<String>,
    chainspec_name: Option<String>,
    starting_state_root_hash: Option<String>,
    last_added_block_info: Option<serde_json::Value>,
    our_public_signing_key: Option<String>,
    round_length: Option<serde_json::Value>,
    next_upgrade: Option<serde_json::Value>,
    uptime: Option<String>,
    /// Compared against the ready state by `is_ready_reactor_state`.
    reactor_state: Option<String>,
    last_progress: Option<String>,
    available_block_range: Option<BlockRange>,
    block_sync: Option<BlockSync>,
    latest_switch_block_hash: Option<serde_json::Value>,
}
1208
/// One entry of `NodeStatus::peers`. Its fields are decoded but never read in
/// this file, hence `allow(dead_code)`.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct NodePeer {
    node_id: Option<String>,
    address: Option<String>,
}
1216
/// Value of `NodeStatus::available_block_range`. Its fields are decoded but
/// never read in this file, hence `allow(dead_code)`.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct BlockRange {
    low: Option<u64>,
    high: Option<u64>,
}
1224
/// Value of `NodeStatus::block_sync`; both parts are kept as untyped JSON.
/// Its fields are decoded but never read in this file, hence
/// `allow(dead_code)`.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct BlockSync {
    historical: Option<serde_json::Value>,
    forward: Option<serde_json::Value>,
}
1232
// Unit tests for the pure formatting helpers of this module.
#[cfg(test)]
mod tests {
    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
    use casper_types::U512;
    use casper_types::contract_messages::MessagePayload;
    use directories::BaseDirs;

    // Covers zero, sub-CSPR amounts, exact whole amounts, and mixed
    // whole/fractional amounts (1 CSPR = 10^9 motes).
    #[test]
    fn format_cspr_u512_handles_whole_and_fractional() {
        assert_eq!(format_cspr_u512(&U512::zero()), "0");
        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
        assert_eq!(
            format_cspr_u512(&U512::from(1_000_000_001u64)),
            "1.000000001"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
            "123"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
            "123.000000456"
        );
    }

    // String payloads are rendered in their quoted `Debug` form.
    #[test]
    fn format_message_payload_renders_string_with_quotes() {
        let payload = MessagePayload::String("hello".to_string());
        assert_eq!(format_message_payload(&payload), "\"hello\"");
    }

    // Hex output is lowercase and zero-padded to two digits per byte.
    #[test]
    fn encode_hex_renders_lowercase() {
        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
    }

    // The home directory itself becomes "~" and nested paths start with "~".
    #[test]
    fn shorten_home_path_replaces_home_prefix() {
        // Without a resolvable home directory there is nothing to verify.
        let Some(base_dirs) = BaseDirs::new() else {
            return;
        };
        let home = base_dirs.home_dir();
        let shortened = shorten_home_path(&home.to_string_lossy());
        assert_eq!(shortened, "~");

        let nested = home.join("devnet/logs/stdout.log");
        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
        assert!(shortened_nested.starts_with("~"));
        assert!(shortened_nested.contains("devnet"));
    }

    // Paths outside the home directory are returned unchanged.
    #[test]
    fn shorten_home_path_keeps_relative_paths() {
        let input = "relative/path";
        assert_eq!(shorten_home_path(input), input);
    }
}