casper_devnet/
cli.rs

1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
3use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, STATE_FILE_NAME, State};
4use anyhow::{Result, anyhow};
5use backoff::ExponentialBackoff;
6use backoff::backoff::Backoff;
7use casper_types::U512;
8use casper_types::contract_messages::MessagePayload;
9use casper_types::execution::ExecutionResult;
10use clap::{Args, Parser, Subcommand};
11use directories::BaseDirs;
12use futures::StreamExt;
13use nix::errno::Errno;
14use nix::sys::signal::kill;
15use nix::unistd::Pid;
16use serde::Deserialize;
17use spinners::{Spinner, Spinners};
18use std::collections::{HashMap, HashSet};
19use std::os::unix::process::ExitStatusExt;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::Ordering;
23use std::time::Duration;
24use tokio::fs as tokio_fs;
25use tokio::sync::Mutex;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27use veles_casper_rust_sdk::sse::event::SseEvent;
28use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
29
// Root parser type: `run` builds it with `Cli::parse()` and dispatches on `command`.
30/// CLI entrypoint for the devnet launcher.
31#[derive(Parser)]
32#[command(name = "nctl")]
33#[command(
34    about = "casper-devnet launcher for local Casper Network development networks",
35    long_about = None
36)]
37pub struct Cli {
    // The selected subcommand; kept private because only `run` consumes it.
38    #[command(subcommand)]
39    command: Command,
40}
41
// Each variant wraps the argument struct that `run` hands to the matching handler.
42/// Top-level CLI subcommands.
43#[derive(Subcommand)]
44enum Command {
    // Handled by `run_start`.
45    /// Setup assets (if needed) and start the devnet.
46    Start(StartArgs),
    // Handled by `run_assets`.
47    /// Manage assets bundles.
48    Assets(AssetsArgs),
    // Handled by `run_is_ready`.
49    /// Check whether a network has observed a block yet.
50    IsReady(IsReadyArgs),
51}
52
// Consumed by `run_start`, `run_setup_only` and `setup_options`.
// NOTE(review): the `///` lines on the fields are presumably surfaced by clap as help text,
// so treat them as user-facing strings when editing.
53/// Arguments for `nctl start`.
54#[derive(Args, Clone)]
55struct StartArgs {
56    /// Network name used in assets paths and configs.
57    #[arg(long, default_value = "casper-dev")]
58    network_name: String,
59
    // When absent, `run_start` falls back to `assets::default_assets_root()`.
60    /// Override the base path for network runtime assets.
61    #[arg(long, value_name = "PATH")]
62    net_path: Option<PathBuf>,
63
    // Passed to `resolve_protocol_version` by `run_start`.
64    /// Protocol version to use from the assets store (e.g. 2.1.1).
65    #[arg(long)]
66    protocol_version: Option<String>,
67
    // Exposed as `--node-count`, with `nodes` and `validators` declared as aliases.
68    /// Number of nodes to create and start.
69    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
70    node_count: u32,
71
72    /// Number of user accounts to generate (defaults to node count).
73    #[arg(long)]
74    users: Option<u32>,
75
76    /// Genesis activation delay in seconds.
77    #[arg(long, default_value_t = 3)]
78    delay: u64,
79
    // Copied into `StartPlan::rust_log` by `run_start`.
80    /// Log level for child processes (passed as `RUST_LOG`).
81    #[arg(long = "log-level", default_value = "info")]
82    log_level: String,
83
84    /// Log format for node config files.
85    #[arg(long, default_value = "json")]
86    node_log_format: String,
87
    // Makes `run_start` delegate to `run_setup_only` and return without starting processes.
88    /// Create assets and exit without starting processes.
89    #[arg(long)]
90    setup_only: bool,
91
    // Triggers `assets::teardown` followed by a fresh `assets::setup_local`.
92    /// Rebuild assets even if they already exist.
93    #[arg(long)]
94    force_setup: bool,
95
    // Shared via `Arc::clone` with `SetupOptions` and `assets::ensure_consensus_keys`.
96    /// Deterministic seed for devnet key generation.
97    #[arg(long, default_value = "default")]
98    seed: Arc<str>,
99}
100
// Wrapper that lets `nctl assets` carry its own nested subcommand; consumed by `run_assets`.
101/// Asset management arguments.
102#[derive(Args)]
103struct AssetsArgs {
104    #[command(subcommand)]
105    command: AssetsCommand,
106}
107
// `run_assets` matches on this enum and calls one handler per variant.
108/// Asset management subcommands.
109#[derive(Subcommand)]
110enum AssetsCommand {
    // Handled by `run_assets_add`.
111    /// Extract a local assets bundle into the assets store.
112    Add(AssetsAddArgs),
    // Handled by `run_assets_pull`.
113    /// Download assets bundles from the upstream release.
114    Pull(AssetsPullArgs),
    // Handled by `run_assets_list`; takes no arguments.
115    /// List available protocol versions in the assets store.
116    List,
117}
118
// Passed to `run_assets_add` by `run_assets`.
119/// Arguments for `nctl assets add`.
120#[derive(Args, Clone)]
121struct AssetsAddArgs {
    // NOTE(review): no `long`/`short` is declared, so this is presumably a positional argument.
122    /// Path to a local assets bundle (.tar.gz).
123    #[arg(value_name = "PATH")]
124    path: PathBuf,
125}
126
// Passed to `run_assets_pull` by `run_assets`.
127/// Arguments for `nctl assets pull`.
128#[derive(Args, Clone)]
129struct AssetsPullArgs {
    // Optional; how a missing value is resolved is decided by the pull handler (not shown here).
130    /// Target triple to select from release assets.
131    #[arg(long)]
132    target: Option<String>,
133
134    /// Re-download and replace any existing assets.
135    #[arg(long)]
136    force: bool,
137}
138
// Consumed by `run_is_ready`; mirrors the network-selection flags of `StartArgs`.
139/// Arguments for `nctl is-ready`.
140#[derive(Args, Clone)]
141struct IsReadyArgs {
142    /// Network name used in assets paths and configs.
143    #[arg(long, default_value = "casper-dev")]
144    network_name: String,
145
    // When absent, `run_is_ready` falls back to `assets::default_assets_root()`.
146    /// Override the base path for network runtime assets.
147    #[arg(long, value_name = "PATH")]
148    net_path: Option<PathBuf>,
149}
150
151/// Parses CLI and runs the selected subcommand.
152pub async fn run() -> Result<()> {
153    let cli = Cli::parse();
154    match cli.command {
155        Command::Start(args) => run_start(args).await,
156        Command::Assets(args) => run_assets(args).await,
157        Command::IsReady(args) => run_is_ready(args).await,
158    }
159}
160
161async fn run_start(args: StartArgs) -> Result<()> {
162    let assets_root = match &args.net_path {
163        Some(path) => path.clone(),
164        None => assets::default_assets_root()?,
165    };
166    let layout = AssetsLayout::new(assets_root, args.network_name.clone());
167    let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
168    println!("assets path: {}", assets_path);
169    let assets_exist = layout.exists().await;
170    if !args.setup_only && !args.force_setup && assets_exist {
171        println!("resuming network operations on {}", layout.network_name());
172    }
173    let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
174
175    if args.setup_only {
176        return run_setup_only(&layout, &args, &protocol_version).await;
177    }
178
179    if args.force_setup {
180        assets::teardown(&layout).await?;
181        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
182    } else if !assets_exist {
183        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
184    }
185
186    if !layout.exists().await {
187        return Err(anyhow!(
188            "assets missing under {}; run with --setup-only to create them",
189            shorten_home_path(&layout.net_dir().display().to_string())
190        ));
191    }
192
193    if !args.force_setup && assets_exist {
194        let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
195        if restored > 0 {
196            println!("recreated consensus keys for {} node(s)", restored);
197        }
198    }
199
200    let rust_log = args.log_level.clone();
201
202    let plan = StartPlan { rust_log };
203
204    let state_path = layout.net_dir().join(STATE_FILE_NAME);
205    let state = Arc::new(Mutex::new(State::new(state_path).await?));
206    let started = {
207        let mut state = state.lock().await;
208        process::start(&layout, &plan, &mut state).await?
209    };
210
211    print_pids(&started);
212    print_start_banner(&layout, &started).await;
213    print_derived_accounts_summary(&layout).await;
214
215    let node_ids = unique_node_ids(&started);
216    let details = format_network_details(&layout, &started).await;
217    let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
218    start_sse_spinner(&health).await;
219    spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
220
221    let (event_tx, mut event_rx) = unbounded_channel();
222    spawn_ctrlc_listener(event_tx.clone());
223    spawn_exit_watchers(started, event_tx);
224
225    if let Some(event) = event_rx.recv().await {
226        match event {
227            RunEvent::CtrlC => {
228                let mut state = state.lock().await;
229                process::stop(&mut state).await?;
230            }
231            RunEvent::ProcessExit {
232                id,
233                pid,
234                code,
235                signal,
236            } => {
237                let mut state = state.lock().await;
238                update_exited_process(&mut state, &id, code, signal).await?;
239                log_exit(&id, pid, code, signal);
240                process::stop(&mut state).await?;
241            }
242        }
243    }
244
245    Ok(())
246}
247
248async fn run_setup_only(
249    layout: &AssetsLayout,
250    args: &StartArgs,
251    protocol_version: &str,
252) -> Result<()> {
253    if args.force_setup {
254        assets::teardown(layout).await?;
255        assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
256        print_derived_accounts_summary(layout).await;
257        return Ok(());
258    }
259
260    if layout.exists().await {
261        println!(
262            "assets already exist at {}; use --force-setup to rebuild",
263            shorten_home_path(&layout.net_dir().display().to_string())
264        );
265        print_derived_accounts_summary(layout).await;
266        return Ok(());
267    }
268
269    assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
270    print_derived_accounts_summary(layout).await;
271    Ok(())
272}
273
274fn record_pid(record: &ProcessRecord) -> Option<u32> {
275    if let Some(handle) = &record.pid_handle {
276        let pid = handle.load(Ordering::SeqCst);
277        if pid != 0 {
278            return Some(pid);
279        }
280    }
281    record.pid
282}
283
284fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
285    SetupOptions {
286        nodes: args.node_count,
287        users: args.users,
288        delay_seconds: args.delay,
289        network_name: args.network_name.clone(),
290        protocol_version: protocol_version.to_string(),
291        node_log_format: args.node_log_format.clone(),
292        seed: Arc::clone(&args.seed),
293    }
294}
295
296fn print_pids(records: &[RunningProcess]) {
297    for record in records {
298        if let Some(pid) = record_pid(&record.record) {
299            println!(
300                "{} pid={} ({:?})",
301                record.record.id, pid, record.record.kind
302            );
303        }
304    }
305}
306
307async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
308    let symlink_root = layout.net_dir();
309    let mut node_pids: HashMap<u32, u32> = HashMap::new();
310    let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
311    let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
312
313    for process in processes {
314        if let Some(pid) = record_pid(&process.record) {
315            match process.record.kind {
316                ProcessKind::Node => {
317                    node_pids.insert(process.record.node_id, pid);
318                }
319                ProcessKind::Sidecar => {
320                    sidecar_pids.insert(process.record.node_id, pid);
321                }
322            }
323            process_logs
324                .entry(process.record.node_id)
325                .or_default()
326                .push((process.record.kind.clone(), pid));
327        }
328    }
329
330    let node_ids = unique_node_ids(processes);
331
332    let mut lines = Vec::new();
333    lines.push("network details".to_string());
334    for node_id in node_ids {
335        let node_pid = node_pids
336            .get(&node_id)
337            .map(|pid| pid.to_string())
338            .unwrap_or_else(|| "-".to_string());
339        let sidecar_pid = sidecar_pids
340            .get(&node_id)
341            .map(|pid| pid.to_string())
342            .unwrap_or_else(|| "-".to_string());
343        lines.push(format!("  node-{}", node_id));
344        lines.push(format!(
345            "    pids: node={} sidecar={}",
346            node_pid, sidecar_pid
347        ));
348        if let Some(entries) = process_logs.get(&node_id) {
349            let mut entries = entries.clone();
350            entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
351            lines.push("    logs".to_string());
352            for (kind, pid) in entries {
353                let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
354                lines.push(format!(
355                    "      {} pid={} stdout={} stderr={}",
356                    process_kind_label(&kind),
357                    pid,
358                    stdout_link,
359                    stderr_link
360                ));
361            }
362        }
363        lines.push("    endpoints".to_string());
364        lines.push(format!("      rest:   {}", assets::rest_endpoint(node_id)));
365        lines.push(format!("      sse:    {}", assets::sse_endpoint(node_id)));
366        lines.push(format!("      rpc:    {}", assets::rpc_endpoint(node_id)));
367        lines.push(format!("      binary: {}", assets::binary_address(node_id)));
368        lines.push(format!(
369            "      gossip: {}",
370            assets::network_address(node_id)
371        ));
372    }
373
374    lines.join("\n")
375}
376
377fn process_kind_label(kind: &ProcessKind) -> &'static str {
378    match kind {
379        ProcessKind::Node => "node",
380        ProcessKind::Sidecar => "sidecar",
381    }
382}
383
384fn shorten_home_path(path: &str) -> String {
385    let path = Path::new(path);
386    let Some(base_dirs) = BaseDirs::new() else {
387        return path.display().to_string();
388    };
389    let home = base_dirs.home_dir();
390    match path.strip_prefix(home) {
391        Ok(stripped) => {
392            if stripped.as_os_str().is_empty() {
393                return "~".to_string();
394            }
395            let mut shorthand = PathBuf::from("~");
396            shorthand.push(stripped);
397            shorthand.display().to_string()
398        }
399        Err(_) => path.display().to_string(),
400    }
401}
402
403fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
404    let base = match kind {
405        ProcessKind::Node => format!("node-{}", node_id),
406        ProcessKind::Sidecar => format!("sidecar-{}", node_id),
407    };
408    let stdout_link = symlink_root.join(format!("{}.stdout", base));
409    let stderr_link = symlink_root.join(format!("{}.stderr", base));
410    (
411        shorten_home_path(&stdout_link.display().to_string()),
412        shorten_home_path(&stderr_link.display().to_string()),
413    )
414}
415
416async fn print_derived_accounts_summary(layout: &AssetsLayout) {
417    if let Some(summary) = assets::derived_accounts_summary(layout).await {
418        if let Some(parsed) = parse_derived_accounts_csv(&summary) {
419            println!("derived accounts");
420            if !parsed.validators.is_empty() {
421                println!("  validators");
422                print_account_group(&parsed.validators);
423            }
424            if !parsed.users.is_empty() {
425                println!("  users");
426                print_account_group(&parsed.users);
427            }
428            if !parsed.other.is_empty() {
429                println!("  other");
430                print_account_group(&parsed.other);
431            }
432        } else {
433            println!("derived accounts");
434            for line in summary.lines() {
435                println!("  {}", line);
436            }
437        }
438    }
439}
440
/// One data row of the derived-accounts CSV, holding every column except `kind`.
struct DerivedAccountRow {
    name: String,
    key_type: String,
    derivation: String,
    path: String,
    account_hash: String,
    balance: String,
}

/// Rows of the derived-accounts CSV bucketed by their `kind` column.
struct DerivedAccountsParsed {
    /// Rows whose kind is exactly `validator`.
    validators: Vec<DerivedAccountRow>,
    /// Rows whose kind is exactly `user`.
    users: Vec<DerivedAccountRow>,
    /// Rows with any other kind.
    other: Vec<DerivedAccountRow>,
}

/// Parses the derived-accounts summary CSV.
///
/// The first line (trimmed) must be exactly
/// `kind,name,key_type,derivation,path,account_hash,balance`. Blank lines are skipped. Each
/// remaining line is split on at most six commas, so the last column keeps any further commas.
///
/// Returns `None` when the input is empty, the header differs, or any row has fewer than seven
/// columns.
fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
    const EXPECTED_HEADER: &str = "kind,name,key_type,derivation,path,account_hash,balance";

    let mut rows = summary.lines();
    if rows.next()?.trim() != EXPECTED_HEADER {
        return None;
    }

    let mut validators = Vec::new();
    let mut users = Vec::new();
    let mut other = Vec::new();

    for raw in rows.map(str::trim).filter(|raw| !raw.is_empty()) {
        let mut columns = raw.splitn(7, ',').map(str::to_string);
        let kind = columns.next()?;
        // Field initialisers run in source order, which matches the column order.
        let row = DerivedAccountRow {
            name: columns.next()?,
            key_type: columns.next()?,
            derivation: columns.next()?,
            path: columns.next()?,
            account_hash: columns.next()?,
            balance: columns.next()?,
        };
        let bucket = match kind.as_str() {
            "validator" => &mut validators,
            "user" => &mut users,
            _ => &mut other,
        };
        bucket.push(row);
    }

    Some(DerivedAccountsParsed {
        validators,
        users,
        other,
    })
}
499
500fn print_account_group(rows: &[DerivedAccountRow]) {
501    for row in rows {
502        println!("    {}:", row.name);
503        println!("      key_type: {}", row.key_type);
504        println!("      derivation: {}", row.derivation);
505        println!("      path: {}", row.path);
506        println!("      account_hash: {}", row.account_hash);
507        println!("      balance: {}", row.balance);
508    }
509}
510
511fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
512    let mut nodes = HashSet::new();
513    for process in processes {
514        nodes.insert(process.record.node_id);
515    }
516    let mut ids: Vec<u32> = nodes.into_iter().collect();
517    ids.sort_unstable();
518    ids
519}
520
/// Event that ends the wait in `run_start`; the first one received triggers `process::stop`.
521enum RunEvent {
    /// Sent by `spawn_ctrlc_listener` once `tokio::signal::ctrl_c()` resolves successfully.
522    CtrlC,
    /// Sent by `spawn_exit_watchers` when a started process (or process task) finishes.
523    ProcessExit {
        /// `id` of the `ProcessRecord` that exited.
524        id: String,
        /// Pid of the process, when one is known.
525        pid: Option<u32>,
        /// Exit code, when one is available.
526        code: Option<i32>,
        /// Terminating signal, when the exit status reports one.
527        signal: Option<i32>,
528    },
529}
530
531fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
532    tokio::spawn(async move {
533        if tokio::signal::ctrl_c().await.is_ok() {
534            let _ = tx.send(RunEvent::CtrlC);
535        }
536    });
537}
538
539fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
540    for running in processes {
541        let tx = tx.clone();
542        tokio::spawn(async move {
543            let id = running.record.id.clone();
544            match running.handle {
545                ProcessHandle::Child(mut child) => {
546                    if let Ok(status) = child.wait().await {
547                        let pid = record_pid(&running.record).or_else(|| child.id());
548                        let code = status.code();
549                        let signal = status.signal();
550                        let _ = tx.send(RunEvent::ProcessExit {
551                            id: id.clone(),
552                            pid,
553                            code,
554                            signal,
555                        });
556                    }
557                }
558                ProcessHandle::Task(handle) => {
559                    let status = handle.await;
560                    let pid = record_pid(&running.record);
561                    let (code, signal) = match status {
562                        Ok(Ok(())) => (Some(0), None),
563                        Ok(Err(_)) => (None, None),
564                        Err(_) => (None, None),
565                    };
566                    let _ = tx.send(RunEvent::ProcessExit {
567                        id: id.clone(),
568                        pid,
569                        code,
570                        signal,
571                    });
572                }
573            }
574        });
575    }
576}
577
/// Spinner text used by `start_sse_spinner` for the first wait phase.
578const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
/// Spinner text started by `record_api_version`; `mark_block_seen` reuses it as the stop message.
579const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
580
581struct SseHealth {
582    expected_nodes: HashSet<u32>,
583    versions: HashMap<u32, String>,
584    announced: bool,
585    block_seen: bool,
586    sse_spinner: Option<Spinner>,
587    block_spinner: Option<Spinner>,
588    details: String,
589}
590
591impl SseHealth {
592    fn new(node_ids: Vec<u32>, details: String) -> Self {
593        Self {
594            expected_nodes: node_ids.into_iter().collect(),
595            versions: HashMap::new(),
596            announced: false,
597            block_seen: false,
598            sse_spinner: None,
599            block_spinner: None,
600            details,
601        }
602    }
603}
604
605async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
606    if node_id != 1 {
607        return false;
608    }
609    let state = health.lock().await;
610    state.announced
611}
612
613fn start_spinner(message: &str) -> Spinner {
614    Spinner::new(Spinners::Dots, message.to_string())
615}
616
617async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
618    let mut state = health.lock().await;
619    if state.sse_spinner.is_none() {
620        state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
621    }
622}
623
624async fn spawn_sse_listeners(
625    layout: AssetsLayout,
626    node_ids: &[u32],
627    health: Arc<Mutex<SseHealth>>,
628    state: Arc<Mutex<State>>,
629) {
630    for node_id in node_ids {
631        let node_id = *node_id;
632        let endpoint = assets::sse_endpoint(node_id);
633        let layout = layout.clone();
634        let health = Arc::clone(&health);
635        let state = Arc::clone(&state);
636        tokio::spawn(async move {
637            run_sse_listener(node_id, endpoint, health, state, layout).await;
638        });
639    }
640}
641
642async fn run_sse_listener(
643    node_id: u32,
644    endpoint: String,
645    health: Arc<Mutex<SseHealth>>,
646    state: Arc<Mutex<State>>,
647    layout: AssetsLayout,
648) {
649    let mut backoff = ExponentialBackoff::default();
650
651    loop {
652        let config = match ListenerConfig::builder()
653            .with_endpoint(endpoint.clone())
654            .build()
655        {
656            Ok(config) => config,
657            Err(_) => {
658                if !sleep_backoff(&mut backoff).await {
659                    return;
660                }
661                continue;
662            }
663        };
664
665        let stream = match sse::listener(config).await {
666            Ok(stream) => {
667                backoff.reset();
668                stream
669            }
670            Err(_) => {
671                if !sleep_backoff(&mut backoff).await {
672                    return;
673                }
674                continue;
675            }
676        };
677
678        futures::pin_mut!(stream);
679        let mut stream_failed = false;
680        while let Some(event) = stream.next().await {
681            match event {
682                Ok(sse_event) => match sse_event {
683                    SseEvent::ApiVersion(version) => {
684                        record_api_version(node_id, version.to_string(), &health).await;
685                    }
686                    SseEvent::BlockAdded { block_hash, block } => {
687                        if node_id == 1
688                            && let Err(err) = record_last_block_height(&state, block.height()).await
689                        {
690                            eprintln!("warning: failed to record last block height: {}", err);
691                        }
692                        if should_log_primary(node_id, &health).await {
693                            mark_block_seen(&health, &layout).await;
694                            let prefix = timestamp_prefix();
695                            println!(
696                                "{} Block {} added (height={} era={})",
697                                prefix,
698                                block_hash,
699                                block.height(),
700                                block.era_id().value()
701                            );
702                        }
703                    }
704                    SseEvent::TransactionAccepted(transaction) => {
705                        if node_id == 1 {
706                            let prefix = timestamp_prefix();
707                            println!("{} Transaction {} accepted", prefix, transaction.hash());
708                        }
709                    }
710                    SseEvent::TransactionProcessed {
711                        transaction_hash,
712                        execution_result,
713                        messages,
714                        ..
715                    } => {
716                        if node_id == 1 {
717                            let tx_hash = transaction_hash.to_string();
718                            let prefix = timestamp_prefix();
719                            log_transaction_processed(
720                                &prefix,
721                                &tx_hash,
722                                &execution_result,
723                                &messages,
724                            );
725                        }
726                    }
727                    _ => {}
728                },
729                Err(_) => {
730                    stream_failed = true;
731                    break;
732                }
733            }
734        }
735
736        if stream_failed && !sleep_backoff(&mut backoff).await {
737            return;
738        }
739    }
740}
741
742async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
743    let (summary, details, sse_spinner) = {
744        let mut state = health.lock().await;
745        if !state.expected_nodes.contains(&node_id) {
746            return;
747        }
748        state.versions.insert(node_id, version);
749        if state.announced || state.versions.len() != state.expected_nodes.len() {
750            return;
751        }
752
753        let summary = version_summary(&state.versions);
754        let details = state.details.clone();
755        let sse_spinner = state.sse_spinner.take();
756        if state.block_spinner.is_none() {
757            state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
758        }
759        state.announced = true;
760        state.block_seen = false;
761        (summary, details, sse_spinner)
762    };
763
764    if let Some(mut spinner) = sse_spinner {
765        spinner.stop_with_message("SSE connection established.".to_string());
766    }
767    println!("Network is healthy ({})", summary);
768    println!("{}", details);
769}
770
771async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
772    let (block_spinner, node_ids) = {
773        let mut state = health.lock().await;
774        if state.block_seen {
775            return;
776        }
777        state.block_seen = true;
778        (
779            state.block_spinner.take(),
780            state.expected_nodes.iter().copied().collect::<Vec<_>>(),
781        )
782    };
783
784    if let Some(mut spinner) = block_spinner {
785        spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
786    }
787
788    match assets::remove_consensus_keys(layout, &node_ids).await {
789        Ok(removed) => {
790            if removed > 0 {
791                println!("Consensus secret keys removed from disk.");
792            }
793        }
794        Err(err) => {
795            eprintln!("warning: failed to remove consensus secret keys: {}", err);
796        }
797    }
798}
799
800async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
801    let mut state = state.lock().await;
802    if state.last_block_height == Some(height) {
803        return Ok(());
804    }
805    state.last_block_height = Some(height);
806    state.touch().await?;
807    Ok(())
808}
809
/// Summarises the reported API versions: `version X` when every node agrees, otherwise
/// `versions A, B, ...` listing the distinct versions in sorted order.
fn version_summary(versions: &HashMap<u32, String>) -> String {
    let mut distinct: Vec<&str> = versions.values().map(String::as_str).collect();
    distinct.sort_unstable();
    distinct.dedup();

    match distinct.as_slice() {
        [only] => format!("version {}", only),
        several => format!("versions {}", several.join(", ")),
    }
}
820
821async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
822    if let Some(delay) = backoff.next_backoff() {
823        tokio::time::sleep(delay).await;
824        return true;
825    }
826    false
827}
828
829fn log_transaction_processed(
830    prefix: &str,
831    transaction_hash: &str,
832    execution_result: &ExecutionResult,
833    messages: &[casper_types::contract_messages::Message],
834) {
835    let consumed = execution_result.consumed();
836    let consumed_cspr = format_cspr_u512(&consumed);
837    if let Some(error) = execution_result.error_message() {
838        println!(
839            "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
840            prefix, transaction_hash, error, consumed, consumed_cspr
841        );
842    } else {
843        println!(
844            "{} Transaction {} processed succeeded gas={} gas_cspr={}",
845            prefix, transaction_hash, consumed, consumed_cspr
846        );
847    }
848
849    for message in messages {
850        let entity = message.entity_addr().to_formatted_string();
851        let topic = message.topic_name();
852        let payload = format_message_payload(message.payload());
853        println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
854    }
855}
856
857fn timestamp_prefix() -> String {
858    time::OffsetDateTime::now_utc()
859        .format(&time::format_description::well_known::Rfc3339)
860        .unwrap_or_else(|_| "unknown-time".to_string())
861}
862
863fn format_message_payload(payload: &MessagePayload) -> String {
864    match payload {
865        MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
866        MessagePayload::String(value) => format!("{:?}", value),
867    }
868}
869
/// Encodes `bytes` as lower-case hexadecimal, two digits per byte and no separators.
fn encode_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
878
879fn format_cspr_u512(motes: &U512) -> String {
880    let motes_str = motes.to_string();
881    let digits = motes_str.len();
882    if digits <= 9 {
883        let frac = format!("{:0>9}", motes_str);
884        let frac = frac.trim_end_matches('0');
885        if frac.is_empty() {
886            return "0".to_string();
887        }
888        return format!("0.{}", frac);
889    }
890
891    let split = digits - 9;
892    let (whole, frac) = motes_str.split_at(split);
893    let frac = frac.trim_end_matches('0');
894    if frac.is_empty() {
895        return whole.to_string();
896    }
897    format!("{}.{}", whole, frac)
898}
899
900async fn update_exited_process(
901    state: &mut State,
902    id: &str,
903    code: Option<i32>,
904    signal: Option<i32>,
905) -> Result<()> {
906    for record in &mut state.processes {
907        if record.id == id {
908            record.last_status = ProcessStatus::Exited;
909            record.exit_code = code;
910            record.exit_signal = signal;
911            record.stopped_at = Some(time::OffsetDateTime::now_utc());
912            break;
913        }
914    }
915    state.touch().await?;
916    Ok(())
917}
918
/// Prints a one-line description of a process exit.
///
/// The pid is included when known. A signal takes precedence over an exit code; when neither is
/// known the line just says the process exited.
fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
    let subject = match pid {
        Some(pid) => format!("process {} (pid {})", id, pid),
        None => format!("process {}", id),
    };

    match (signal, code) {
        (Some(signal), _) => println!("{} exited due to signal {}", subject, signal),
        (None, Some(code)) => println!("{} exited with code {}", subject, code),
        (None, None) => println!("{} exited", subject),
    }
}
939
940async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
941    let total_nodes = layout.count_nodes().await.unwrap_or(0);
942    let target = format!("all nodes ({})", total_nodes);
943    let sidecars = processes
944        .iter()
945        .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
946        .count();
947    println!(
948        "started {} process(es) for {} (sidecars: {})",
949        processes.len(),
950        target,
951        sidecars
952    );
953}
954
/// Reports whether `path` is really an `http://` or `https://` URL rather
/// than a filesystem path.
///
/// URI schemes are case-insensitive, so the prefix is compared without regard
/// to ASCII case (`HTTPS://…` is detected just like `https://…`). The path is
/// converted lossily to text first, so non-UTF-8 paths are handled without
/// panicking.
fn looks_like_url(path: &Path) -> bool {
    let value = path.to_string_lossy();
    ["http://", "https://"].iter().any(|scheme| {
        // `get` returns `None` when the text is shorter than the scheme or the
        // cut would land inside a multi-byte character; neither can be a match.
        value
            .get(..scheme.len())
            .map_or(false, |prefix| prefix.eq_ignore_ascii_case(scheme))
    })
}
959
960async fn is_dir(path: &Path) -> bool {
961    tokio_fs::metadata(path)
962        .await
963        .map(|meta| meta.is_dir())
964        .unwrap_or(false)
965}
966
967async fn run_assets(args: AssetsArgs) -> Result<()> {
968    match args.command {
969        AssetsCommand::Add(add) => run_assets_add(add).await,
970        AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
971        AssetsCommand::List => run_assets_list().await,
972    }
973}
974
975async fn run_is_ready(args: IsReadyArgs) -> Result<()> {
976    let assets_root = match &args.net_path {
977        Some(path) => path.clone(),
978        None => assets::default_assets_root()?,
979    };
980    let layout = AssetsLayout::new(assets_root, args.network_name);
981    let argv0 = std::env::args()
982        .next()
983        .unwrap_or_else(|| "casper-devnet".to_string());
984    let setup_cmd = format!("{} start --setup-only", argv0);
985    let net_dir = layout.net_dir();
986    if !is_dir(&net_dir).await {
987        return Err(anyhow!(
988            "assets for {} not found; run `{}`",
989            layout.network_name(),
990            setup_cmd
991        ));
992    }
993
994    let state_path = net_dir.join(STATE_FILE_NAME);
995    let contents = match tokio_fs::read_to_string(&state_path).await {
996        Ok(contents) => contents,
997        Err(_) => return Err(anyhow!("network is not ready yet")),
998    };
999    let state =
1000        match tokio::task::spawn_blocking(move || serde_json::from_str::<State>(&contents)).await {
1001            Ok(Ok(state)) => state,
1002            _ => return Err(anyhow!("network is not ready yet")),
1003        };
1004
1005    ensure_processes_running(&state)?;
1006
1007    if state.last_block_height.is_none() {
1008        return Err(anyhow!("network is not ready yet"));
1009    }
1010
1011    ensure_rest_ready(&state).await?;
1012    Ok(())
1013}
1014
1015async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
1016    if looks_like_url(&args.path) {
1017        return Err(anyhow!(
1018            "assets URL is not supported yet; provide a local .tar.gz path"
1019        ));
1020    }
1021    assets::install_assets_bundle(&args.path).await?;
1022    println!(
1023        "assets installed into {}",
1024        assets::assets_bundle_root()?.display()
1025    );
1026    Ok(())
1027}
1028
1029async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
1030    assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
1031    Ok(())
1032}
1033
1034async fn run_assets_list() -> Result<()> {
1035    let mut versions = assets::list_bundle_versions().await?;
1036    if versions.is_empty() {
1037        return Err(anyhow!("no assets bundles found"));
1038    }
1039    versions.sort_by(|a, b| b.cmp(a));
1040    for version in versions {
1041        println!("{}", version);
1042    }
1043    Ok(())
1044}
1045
1046async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
1047    if let Some(raw) = candidate {
1048        let version = assets::parse_protocol_version(raw)?;
1049        if !assets::has_bundle_version(&version).await? {
1050            let argv0 = std::env::args()
1051                .next()
1052                .unwrap_or_else(|| "casper-devnet".to_string());
1053            let pull_cmd = format!("{} assets pull", argv0);
1054            let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1055            return Err(anyhow!(
1056                "assets for version {} not found; run `{}` or `{}`",
1057                version,
1058                pull_cmd,
1059                add_cmd
1060            ));
1061        }
1062        return Ok(version.to_string());
1063    }
1064    let versions = assets::list_bundle_versions().await?;
1065    if versions.is_empty() {
1066        let argv0 = std::env::args()
1067            .next()
1068            .unwrap_or_else(|| "casper-devnet".to_string());
1069        let pull_cmd = format!("{} assets pull", argv0);
1070        let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1071        return Err(anyhow!(
1072            "no assets found; run `{}` or `{}`",
1073            pull_cmd,
1074            add_cmd
1075        ));
1076    }
1077    let version = versions
1078        .into_iter()
1079        .max()
1080        .expect("non-empty assets versions");
1081    Ok(version.to_string())
1082}
1083
1084fn ensure_processes_running(state: &State) -> Result<()> {
1085    if state.processes.is_empty() {
1086        return Err(anyhow!("network is not ready yet"));
1087    }
1088    for process in &state.processes {
1089        if !matches!(process.last_status, ProcessStatus::Running) {
1090            return Err(anyhow!("network is not ready yet"));
1091        }
1092        let pid = match process.pid {
1093            Some(pid) => pid,
1094            None => return Err(anyhow!("network is not ready yet")),
1095        };
1096        if !is_pid_running(pid) {
1097            return Err(anyhow!("network is not ready yet"));
1098        }
1099    }
1100    Ok(())
1101}
1102
1103fn is_pid_running(pid: u32) -> bool {
1104    let pid = Pid::from_raw(pid as i32);
1105    match kill(pid, None) {
1106        Ok(()) => true,
1107        Err(Errno::ESRCH) => false,
1108        Err(_) => true,
1109    }
1110}
1111
1112async fn ensure_rest_ready(state: &State) -> Result<()> {
1113    let node_ids: HashSet<u32> = state
1114        .processes
1115        .iter()
1116        .filter_map(|process| {
1117            if matches!(process.kind, ProcessKind::Node) {
1118                Some(process.node_id)
1119            } else {
1120                None
1121            }
1122        })
1123        .collect();
1124    if node_ids.is_empty() {
1125        return Err(anyhow!("network is not ready yet"));
1126    }
1127
1128    let client = reqwest::Client::builder()
1129        .no_proxy()
1130        .timeout(Duration::from_secs(2))
1131        .build()?;
1132
1133    for node_id in node_ids {
1134        let url = format!("{}/status", assets::rest_endpoint(node_id));
1135        let response = match client.get(&url).send().await {
1136            Ok(response) => response,
1137            Err(_) => return Err(anyhow!("network is not ready yet")),
1138        };
1139        if response.status() != reqwest::StatusCode::OK {
1140            return Err(anyhow!("network is not ready yet"));
1141        }
1142        let status = match response.json::<NodeStatus>().await {
1143            Ok(status) => status,
1144            Err(_) => return Err(anyhow!("network is not ready yet")),
1145        };
1146        if !status
1147            .reactor_state
1148            .as_deref()
1149            .map(is_ready_reactor_state)
1150            .unwrap_or(false)
1151        {
1152            return Err(anyhow!("network is not ready yet"));
1153        }
1154    }
1155    Ok(())
1156}
1157
/// Returns `true` only for the exact, case-sensitive reactor state name
/// `Validate`.
fn is_ready_reactor_state(state: &str) -> bool {
    matches!(state, "Validate")
}
1161
/// JSON body deserialized from a node's REST `/status` response (see
/// `ensure_rest_ready`).
///
/// Every field is an `Option`. Fields whose shape is not modelled here are
/// kept as raw `serde_json::Value`s.
#[derive(Deserialize)]
// Of these fields, the readiness check in `ensure_rest_ready` reads only
// `reactor_state`; the rest are deserialized but not otherwise used by the
// code visible here, hence the lint allowance.
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct NodeStatus {
    peers: Option<Vec<NodePeer>>,
    api_version: Option<String>,
    build_version: Option<String>,
    chainspec_name: Option<String>,
    starting_state_root_hash: Option<String>,
    last_added_block_info: Option<serde_json::Value>,
    our_public_signing_key: Option<String>,
    round_length: Option<serde_json::Value>,
    next_upgrade: Option<serde_json::Value>,
    uptime: Option<String>,
    /// Reactor state name; passed to `is_ready_reactor_state` by the readiness check.
    reactor_state: Option<String>,
    last_progress: Option<String>,
    available_block_range: Option<BlockRange>,
    block_sync: Option<BlockSync>,
    latest_switch_block_hash: Option<serde_json::Value>,
}
1182
/// One entry of the `peers` list in `NodeStatus`.
#[derive(Deserialize)]
// Fields are deserialized but not read by the code visible here.
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct NodePeer {
    node_id: Option<String>,
    address: Option<String>,
}
1190
/// Value of the `available_block_range` field in `NodeStatus`: optional
/// `low` and `high` bounds.
#[derive(Deserialize)]
// Fields are deserialized but not read by the code visible here.
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct BlockRange {
    low: Option<u64>,
    high: Option<u64>,
}
1198
/// Value of the `block_sync` field in `NodeStatus`; both parts are kept as
/// raw JSON values.
#[derive(Deserialize)]
// Fields are deserialized but not read by the code visible here.
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct BlockSync {
    historical: Option<serde_json::Value>,
    forward: Option<serde_json::Value>,
}
1206
// Unit tests for the formatting helpers and for `shorten_home_path`.
#[cfg(test)]
mod tests {
    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
    use casper_types::U512;
    use casper_types::contract_messages::MessagePayload;
    use directories::BaseDirs;

    // Covers zero, a purely fractional amount, exact whole amounts, and
    // amounts with both a whole and a fractional part.
    #[test]
    fn format_cspr_u512_handles_whole_and_fractional() {
        assert_eq!(format_cspr_u512(&U512::zero()), "0");
        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
        assert_eq!(
            format_cspr_u512(&U512::from(1_000_000_001u64)),
            "1.000000001"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
            "123"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
            "123.000000456"
        );
    }

    // String payloads are rendered with surrounding double quotes.
    #[test]
    fn format_message_payload_renders_string_with_quotes() {
        let payload = MessagePayload::String("hello".to_string());
        assert_eq!(format_message_payload(&payload), "\"hello\"");
    }

    // Hex digits are lowercase and every byte is zero-padded to two digits.
    #[test]
    fn encode_hex_renders_lowercase() {
        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
    }

    // The home directory itself shortens to `~`, and a path nested under it
    // keeps its remainder after the `~` prefix.
    #[test]
    fn shorten_home_path_replaces_home_prefix() {
        // Without a resolvable home directory there is nothing to check.
        let Some(base_dirs) = BaseDirs::new() else {
            return;
        };
        let home = base_dirs.home_dir();
        let shortened = shorten_home_path(&home.to_string_lossy());
        assert_eq!(shortened, "~");

        let nested = home.join("devnet/logs/stdout.log");
        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
        assert!(shortened_nested.starts_with("~"));
        assert!(shortened_nested.contains("devnet"));
    }

    // A relative path is returned unchanged.
    #[test]
    fn shorten_home_path_keeps_relative_paths() {
        let input = "relative/path";
        assert_eq!(shorten_home_path(input), input);
    }
}