Skip to main content

casper_devnet/
cli.rs

1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::diagnostics_port;
3use crate::mcp::{self, McpArgs};
4use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
5use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, STATE_FILE_NAME, State};
6use anyhow::{Result, anyhow};
7use backoff::ExponentialBackoff;
8use backoff::backoff::Backoff;
9use casper_types::U512;
10use casper_types::contract_messages::MessagePayload;
11use casper_types::execution::ExecutionResult;
12use clap::{Args, Parser, Subcommand};
13use directories::BaseDirs;
14use futures::StreamExt;
15use nix::errno::Errno;
16use nix::sys::signal::kill;
17use nix::unistd::Pid;
18use serde::Deserialize;
19use spinners::{Spinner, Spinners};
20use std::collections::{HashMap, HashSet};
21use std::os::unix::process::ExitStatusExt;
22use std::path::{Path, PathBuf};
23use std::sync::Arc;
24use std::sync::atomic::Ordering;
25use std::time::Duration;
26use tokio::fs as tokio_fs;
27use tokio::sync::Mutex;
28use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
29use veles_casper_rust_sdk::sse::event::SseEvent;
30use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
31
32/// CLI entrypoint for the devnet launcher.
33#[derive(Parser)]
34#[command(name = "nctl")]
35#[command(
36    about = "casper-devnet launcher for local Casper Network development networks",
37    long_about = None
38)]
39pub struct Cli {
40    #[command(subcommand)]
41    command: Command,
42}
43
44/// Top-level CLI subcommands.
45#[derive(Subcommand)]
46enum Command {
47    /// Setup assets (if needed) and start the devnet.
48    Start(StartArgs),
49    /// Run MCP control plane server.
50    Mcp(McpArgs),
51    /// Manage assets bundles.
52    Assets(AssetsArgs),
53    /// Check whether a network has observed a block yet.
54    IsReady(IsReadyArgs),
55}
56
57/// Arguments for `nctl start`.
58#[derive(Args, Clone)]
59struct StartArgs {
60    /// Network name used in assets paths and configs.
61    #[arg(long, default_value = "casper-dev")]
62    network_name: String,
63
64    /// Override the base path for network runtime assets.
65    #[arg(long, value_name = "PATH")]
66    net_path: Option<PathBuf>,
67
68    /// Protocol version to use from the assets store (e.g. 2.1.1).
69    #[arg(long)]
70    protocol_version: Option<String>,
71
72    /// Number of nodes to create and start.
73    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
74    node_count: u32,
75
76    /// Number of user accounts to generate (defaults to node count).
77    #[arg(long)]
78    users: Option<u32>,
79
80    /// Genesis activation delay in seconds.
81    #[arg(long, default_value_t = 3)]
82    delay: u64,
83
84    /// Log level for child processes (passed as `RUST_LOG`).
85    #[arg(long = "log-level", default_value = "info")]
86    log_level: String,
87
88    /// Log format for node config files.
89    #[arg(long, default_value = "json")]
90    node_log_format: String,
91
92    /// Create assets and exit without starting processes.
93    #[arg(long)]
94    setup_only: bool,
95
96    /// Rebuild assets even if they already exist.
97    #[arg(long)]
98    force_setup: bool,
99
100    /// Deterministic seed for devnet key generation.
101    #[arg(long, default_value = "default")]
102    seed: Arc<str>,
103}
104
105/// Asset management arguments.
106#[derive(Args)]
107struct AssetsArgs {
108    #[command(subcommand)]
109    command: AssetsCommand,
110}
111
112/// Asset management subcommands.
113#[derive(Subcommand)]
114enum AssetsCommand {
115    /// Extract a local assets bundle into the assets store.
116    Add(AssetsAddArgs),
117    /// Download assets bundles from the upstream release.
118    Pull(AssetsPullArgs),
119    /// List available protocol versions in the assets store.
120    List,
121}
122
123/// Arguments for `nctl assets add`.
124#[derive(Args, Clone)]
125struct AssetsAddArgs {
126    /// Path to a local assets bundle (.tar.gz).
127    #[arg(value_name = "PATH")]
128    path: PathBuf,
129}
130
131/// Arguments for `nctl assets pull`.
132#[derive(Args, Clone)]
133struct AssetsPullArgs {
134    /// Target triple to select from release assets.
135    #[arg(long)]
136    target: Option<String>,
137
138    /// Re-download and replace any existing assets.
139    #[arg(long)]
140    force: bool,
141}
142
143/// Arguments for `nctl is-ready`.
144#[derive(Args, Clone)]
145struct IsReadyArgs {
146    /// Network name used in assets paths and configs.
147    #[arg(long, default_value = "casper-dev")]
148    network_name: String,
149
150    /// Override the base path for network runtime assets.
151    #[arg(long, value_name = "PATH")]
152    net_path: Option<PathBuf>,
153}
154
155/// Parses CLI and runs the selected subcommand.
156pub async fn run() -> Result<()> {
157    let cli = Cli::parse();
158    match cli.command {
159        Command::Start(args) => run_start(args).await,
160        Command::Mcp(args) => mcp::run(args).await,
161        Command::Assets(args) => run_assets(args).await,
162        Command::IsReady(args) => run_is_ready(args).await,
163    }
164}
165
166async fn run_start(args: StartArgs) -> Result<()> {
167    let assets_root = match &args.net_path {
168        Some(path) => path.clone(),
169        None => assets::default_assets_root()?,
170    };
171    let layout = AssetsLayout::new(assets_root, args.network_name.clone());
172    let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
173    println!("assets path: {}", assets_path);
174    let assets_exist = layout.exists().await;
175    if !args.setup_only && !args.force_setup && assets_exist {
176        println!("resuming network operations on {}", layout.network_name());
177    }
178    let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
179
180    if args.setup_only {
181        return run_setup_only(&layout, &args, &protocol_version).await;
182    }
183
184    if args.force_setup {
185        assets::teardown(&layout).await?;
186        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
187    } else if !assets_exist {
188        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
189    }
190
191    if !layout.exists().await {
192        return Err(anyhow!(
193            "assets missing under {}; run with --setup-only to create them",
194            shorten_home_path(&layout.net_dir().display().to_string())
195        ));
196    }
197
198    if !args.force_setup && assets_exist {
199        let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
200        if restored > 0 {
201            println!("recreated consensus keys for {} node(s)", restored);
202        }
203    }
204
205    let rust_log = args.log_level.clone();
206
207    let plan = StartPlan { rust_log };
208
209    let state_path = layout.net_dir().join(STATE_FILE_NAME);
210    let state = Arc::new(Mutex::new(State::new(state_path).await?));
211    let started = {
212        let mut state = state.lock().await;
213        process::start(&layout, &plan, &mut state).await?
214    };
215
216    print_pids(&started);
217    print_start_banner(&layout, &started).await;
218    print_derived_accounts_summary(&layout).await;
219
220    let node_ids = unique_node_ids(&started);
221    let details = format_network_details(&layout, &started).await;
222    let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
223    start_sse_spinner(&health).await;
224    spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
225    let mut diagnostics_proxy = match diagnostics_port::spawn(&layout).await {
226        Ok(proxy) => Some(proxy),
227        Err(err) => {
228            eprintln!("warning: failed to start diagnostics proxy: {}", err);
229            None
230        }
231    };
232
233    let (event_tx, mut event_rx) = unbounded_channel();
234    spawn_ctrlc_listener(event_tx.clone());
235    spawn_exit_watchers(started, event_tx);
236
237    if let Some(event) = event_rx.recv().await {
238        match event {
239            RunEvent::CtrlC => {
240                if let Some(proxy) = diagnostics_proxy.take() {
241                    proxy.shutdown();
242                }
243                let mut state = state.lock().await;
244                process::stop(&mut state).await?;
245            }
246            RunEvent::ProcessExit {
247                id,
248                pid,
249                code,
250                signal,
251            } => {
252                if let Some(proxy) = diagnostics_proxy.take() {
253                    proxy.shutdown();
254                }
255                let mut state = state.lock().await;
256                update_exited_process(&mut state, &id, code, signal).await?;
257                log_exit(&id, pid, code, signal);
258                process::stop(&mut state).await?;
259            }
260        }
261    }
262
263    Ok(())
264}
265
266async fn run_setup_only(
267    layout: &AssetsLayout,
268    args: &StartArgs,
269    protocol_version: &str,
270) -> Result<()> {
271    if args.force_setup {
272        assets::teardown(layout).await?;
273        assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
274        print_derived_accounts_summary(layout).await;
275        return Ok(());
276    }
277
278    if layout.exists().await {
279        println!(
280            "assets already exist at {}; use --force-setup to rebuild",
281            shorten_home_path(&layout.net_dir().display().to_string())
282        );
283        print_derived_accounts_summary(layout).await;
284        return Ok(());
285    }
286
287    assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
288    print_derived_accounts_summary(layout).await;
289    Ok(())
290}
291
292fn record_pid(record: &ProcessRecord) -> Option<u32> {
293    if let Some(handle) = &record.pid_handle {
294        let pid = handle.load(Ordering::SeqCst);
295        if pid != 0 {
296            return Some(pid);
297        }
298    }
299    record.pid
300}
301
302fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
303    SetupOptions {
304        nodes: args.node_count,
305        users: args.users,
306        delay_seconds: args.delay,
307        network_name: args.network_name.clone(),
308        protocol_version: protocol_version.to_string(),
309        node_log_format: args.node_log_format.clone(),
310        seed: Arc::clone(&args.seed),
311    }
312}
313
314fn print_pids(records: &[RunningProcess]) {
315    for record in records {
316        if let Some(pid) = record_pid(&record.record) {
317            println!(
318                "{} pid={} ({:?})",
319                record.record.id, pid, record.record.kind
320            );
321        }
322    }
323}
324
325async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
326    let symlink_root = layout.net_dir();
327    let mut node_pids: HashMap<u32, u32> = HashMap::new();
328    let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
329    let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
330
331    for process in processes {
332        if let Some(pid) = record_pid(&process.record) {
333            match process.record.kind {
334                ProcessKind::Node => {
335                    node_pids.insert(process.record.node_id, pid);
336                }
337                ProcessKind::Sidecar => {
338                    sidecar_pids.insert(process.record.node_id, pid);
339                }
340            }
341            process_logs
342                .entry(process.record.node_id)
343                .or_default()
344                .push((process.record.kind.clone(), pid));
345        }
346    }
347
348    let node_ids = unique_node_ids(processes);
349
350    let mut lines = Vec::new();
351    lines.push("network details".to_string());
352    for node_id in node_ids {
353        let node_pid = node_pids
354            .get(&node_id)
355            .map(|pid| pid.to_string())
356            .unwrap_or_else(|| "-".to_string());
357        let sidecar_pid = sidecar_pids
358            .get(&node_id)
359            .map(|pid| pid.to_string())
360            .unwrap_or_else(|| "-".to_string());
361        lines.push(format!("  node-{}", node_id));
362        lines.push(format!(
363            "    pids: node={} sidecar={}",
364            node_pid, sidecar_pid
365        ));
366        if let Some(entries) = process_logs.get(&node_id) {
367            let mut entries = entries.clone();
368            entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
369            lines.push("    logs".to_string());
370            for (kind, pid) in entries {
371                let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
372                lines.push(format!(
373                    "      {} pid={} stdout={} stderr={}",
374                    process_kind_label(&kind),
375                    pid,
376                    stdout_link,
377                    stderr_link
378                ));
379            }
380        }
381        lines.push("    endpoints".to_string());
382        lines.push(format!("      rest:   {}", assets::rest_endpoint(node_id)));
383        lines.push(format!("      sse:    {}", assets::sse_endpoint(node_id)));
384        lines.push(format!("      rpc:    {}", assets::rpc_endpoint(node_id)));
385        lines.push(format!("      binary: {}", assets::binary_address(node_id)));
386        lines.push(format!(
387            "      diagnostics: {}",
388            assets::diagnostics_socket_path(layout.network_name(), node_id)
389        ));
390        lines.push(format!(
391            "      diagnostics-ws: {}",
392            assets::diagnostics_ws_endpoint(node_id)
393        ));
394        lines.push(format!(
395            "      gossip: {}",
396            assets::network_address(node_id)
397        ));
398    }
399
400    lines.join("\n")
401}
402
403fn process_kind_label(kind: &ProcessKind) -> &'static str {
404    match kind {
405        ProcessKind::Node => "node",
406        ProcessKind::Sidecar => "sidecar",
407    }
408}
409
410fn shorten_home_path(path: &str) -> String {
411    let path = Path::new(path);
412    let Some(base_dirs) = BaseDirs::new() else {
413        return path.display().to_string();
414    };
415    let home = base_dirs.home_dir();
416    match path.strip_prefix(home) {
417        Ok(stripped) => {
418            if stripped.as_os_str().is_empty() {
419                return "~".to_string();
420            }
421            let mut shorthand = PathBuf::from("~");
422            shorthand.push(stripped);
423            shorthand.display().to_string()
424        }
425        Err(_) => path.display().to_string(),
426    }
427}
428
429fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
430    let base = match kind {
431        ProcessKind::Node => format!("node-{}", node_id),
432        ProcessKind::Sidecar => format!("sidecar-{}", node_id),
433    };
434    let stdout_link = symlink_root.join(format!("{}.stdout", base));
435    let stderr_link = symlink_root.join(format!("{}.stderr", base));
436    (
437        shorten_home_path(&stdout_link.display().to_string()),
438        shorten_home_path(&stderr_link.display().to_string()),
439    )
440}
441
442async fn print_derived_accounts_summary(layout: &AssetsLayout) {
443    if let Some(summary) = assets::derived_accounts_summary(layout).await {
444        if let Some(parsed) = parse_derived_accounts_csv(&summary) {
445            println!("derived accounts");
446            if !parsed.validators.is_empty() {
447                println!("  validators");
448                print_account_group(&parsed.validators);
449            }
450            if !parsed.users.is_empty() {
451                println!("  users");
452                print_account_group(&parsed.users);
453            }
454            if !parsed.other.is_empty() {
455                println!("  other");
456                print_account_group(&parsed.other);
457            }
458        } else {
459            println!("derived accounts");
460            for line in summary.lines() {
461                println!("  {}", line);
462            }
463        }
464    }
465}
466
467struct DerivedAccountRow {
468    name: String,
469    key_type: String,
470    derivation: String,
471    path: String,
472    account_hash: String,
473    balance: String,
474}
475
476struct DerivedAccountsParsed {
477    validators: Vec<DerivedAccountRow>,
478    users: Vec<DerivedAccountRow>,
479    other: Vec<DerivedAccountRow>,
480}
481
482fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
483    let mut lines = summary.lines();
484    let header = lines.next()?.trim();
485    if header != "kind,name,key_type,derivation,path,account_hash,balance" {
486        return None;
487    }
488
489    let mut parsed = DerivedAccountsParsed {
490        validators: Vec::new(),
491        users: Vec::new(),
492        other: Vec::new(),
493    };
494
495    for line in lines {
496        let line = line.trim();
497        if line.is_empty() {
498            continue;
499        }
500        let mut parts = line.splitn(7, ',');
501        let kind = parts.next()?.to_string();
502        let name = parts.next()?.to_string();
503        let key_type = parts.next()?.to_string();
504        let derivation = parts.next()?.to_string();
505        let path = parts.next()?.to_string();
506        let account_hash = parts.next()?.to_string();
507        let balance = parts.next()?.to_string();
508        let row = DerivedAccountRow {
509            name,
510            key_type,
511            derivation,
512            path,
513            account_hash,
514            balance,
515        };
516        match kind.as_str() {
517            "validator" => parsed.validators.push(row),
518            "user" => parsed.users.push(row),
519            _ => parsed.other.push(row),
520        }
521    }
522
523    Some(parsed)
524}
525
526fn print_account_group(rows: &[DerivedAccountRow]) {
527    for row in rows {
528        println!("    {}:", row.name);
529        println!("      key_type: {}", row.key_type);
530        println!("      derivation: {}", row.derivation);
531        println!("      path: {}", row.path);
532        println!("      account_hash: {}", row.account_hash);
533        println!("      balance: {}", row.balance);
534    }
535}
536
537fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
538    let mut nodes = HashSet::new();
539    for process in processes {
540        nodes.insert(process.record.node_id);
541    }
542    let mut ids: Vec<u32> = nodes.into_iter().collect();
543    ids.sort_unstable();
544    ids
545}
546
547enum RunEvent {
548    CtrlC,
549    ProcessExit {
550        id: String,
551        pid: Option<u32>,
552        code: Option<i32>,
553        signal: Option<i32>,
554    },
555}
556
557fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
558    tokio::spawn(async move {
559        if tokio::signal::ctrl_c().await.is_ok() {
560            let _ = tx.send(RunEvent::CtrlC);
561        }
562    });
563}
564
565fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
566    for running in processes {
567        let tx = tx.clone();
568        tokio::spawn(async move {
569            let id = running.record.id.clone();
570            match running.handle {
571                ProcessHandle::Child(mut child) => {
572                    if let Ok(status) = child.wait().await {
573                        let pid = record_pid(&running.record).or_else(|| child.id());
574                        let code = status.code();
575                        let signal = status.signal();
576                        let _ = tx.send(RunEvent::ProcessExit {
577                            id: id.clone(),
578                            pid,
579                            code,
580                            signal,
581                        });
582                    }
583                }
584                ProcessHandle::Task(handle) => {
585                    let status = handle.await;
586                    let pid = record_pid(&running.record);
587                    let (code, signal) = match status {
588                        Ok(Ok(())) => (Some(0), None),
589                        Ok(Err(_)) => (None, None),
590                        Err(_) => (None, None),
591                    };
592                    let _ = tx.send(RunEvent::ProcessExit {
593                        id: id.clone(),
594                        pid,
595                        code,
596                        signal,
597                    });
598                }
599            }
600        });
601    }
602}
603
604const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
605const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
606
607struct SseHealth {
608    expected_nodes: HashSet<u32>,
609    versions: HashMap<u32, String>,
610    announced: bool,
611    block_seen: bool,
612    sse_spinner: Option<Spinner>,
613    block_spinner: Option<Spinner>,
614    details: String,
615}
616
617impl SseHealth {
618    fn new(node_ids: Vec<u32>, details: String) -> Self {
619        Self {
620            expected_nodes: node_ids.into_iter().collect(),
621            versions: HashMap::new(),
622            announced: false,
623            block_seen: false,
624            sse_spinner: None,
625            block_spinner: None,
626            details,
627        }
628    }
629}
630
631async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
632    if node_id != 1 {
633        return false;
634    }
635    let state = health.lock().await;
636    state.announced
637}
638
639fn start_spinner(message: &str) -> Spinner {
640    Spinner::new(Spinners::Dots, message.to_string())
641}
642
643async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
644    let mut state = health.lock().await;
645    if state.sse_spinner.is_none() {
646        state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
647    }
648}
649
650async fn spawn_sse_listeners(
651    layout: AssetsLayout,
652    node_ids: &[u32],
653    health: Arc<Mutex<SseHealth>>,
654    state: Arc<Mutex<State>>,
655) {
656    for node_id in node_ids {
657        let node_id = *node_id;
658        let endpoint = assets::sse_endpoint(node_id);
659        let layout = layout.clone();
660        let health = Arc::clone(&health);
661        let state = Arc::clone(&state);
662        tokio::spawn(async move {
663            run_sse_listener(node_id, endpoint, health, state, layout).await;
664        });
665    }
666}
667
668async fn run_sse_listener(
669    node_id: u32,
670    endpoint: String,
671    health: Arc<Mutex<SseHealth>>,
672    state: Arc<Mutex<State>>,
673    layout: AssetsLayout,
674) {
675    let mut backoff = ExponentialBackoff::default();
676
677    loop {
678        let config = match ListenerConfig::builder()
679            .with_endpoint(endpoint.clone())
680            .build()
681        {
682            Ok(config) => config,
683            Err(_) => {
684                if !sleep_backoff(&mut backoff).await {
685                    return;
686                }
687                continue;
688            }
689        };
690
691        let stream = match sse::listener(config).await {
692            Ok(stream) => {
693                backoff.reset();
694                stream
695            }
696            Err(_) => {
697                if !sleep_backoff(&mut backoff).await {
698                    return;
699                }
700                continue;
701            }
702        };
703
704        futures::pin_mut!(stream);
705        let mut stream_failed = false;
706        while let Some(event) = stream.next().await {
707            match event {
708                Ok(sse_event) => match sse_event {
709                    SseEvent::ApiVersion(version) => {
710                        record_api_version(node_id, version.to_string(), &health).await;
711                    }
712                    SseEvent::BlockAdded { block_hash, block } => {
713                        if node_id == 1
714                            && let Err(err) = record_last_block_height(&state, block.height()).await
715                        {
716                            eprintln!("warning: failed to record last block height: {}", err);
717                        }
718                        if should_log_primary(node_id, &health).await {
719                            mark_block_seen(&health, &layout).await;
720                            let prefix = timestamp_prefix();
721                            println!(
722                                "{} Block {} added (height={} era={})",
723                                prefix,
724                                block_hash,
725                                block.height(),
726                                block.era_id().value()
727                            );
728                        }
729                    }
730                    SseEvent::TransactionAccepted(transaction) => {
731                        if node_id == 1 {
732                            let prefix = timestamp_prefix();
733                            println!("{} Transaction {} accepted", prefix, transaction.hash());
734                        }
735                    }
736                    SseEvent::TransactionProcessed {
737                        transaction_hash,
738                        execution_result,
739                        messages,
740                        ..
741                    } => {
742                        if node_id == 1 {
743                            let tx_hash = transaction_hash.to_string();
744                            let prefix = timestamp_prefix();
745                            log_transaction_processed(
746                                &prefix,
747                                &tx_hash,
748                                &execution_result,
749                                &messages,
750                            );
751                        }
752                    }
753                    _ => {}
754                },
755                Err(_) => {
756                    stream_failed = true;
757                    break;
758                }
759            }
760        }
761
762        if stream_failed && !sleep_backoff(&mut backoff).await {
763            return;
764        }
765    }
766}
767
768async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
769    let (summary, details, sse_spinner) = {
770        let mut state = health.lock().await;
771        if !state.expected_nodes.contains(&node_id) {
772            return;
773        }
774        state.versions.insert(node_id, version);
775        if state.announced || state.versions.len() != state.expected_nodes.len() {
776            return;
777        }
778
779        let summary = version_summary(&state.versions);
780        let details = state.details.clone();
781        let sse_spinner = state.sse_spinner.take();
782        if state.block_spinner.is_none() {
783            state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
784        }
785        state.announced = true;
786        state.block_seen = false;
787        (summary, details, sse_spinner)
788    };
789
790    if let Some(mut spinner) = sse_spinner {
791        spinner.stop_with_message("SSE connection established.".to_string());
792    }
793    println!("Network is healthy ({})", summary);
794    println!("{}", details);
795}
796
797async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
798    let (block_spinner, node_ids) = {
799        let mut state = health.lock().await;
800        if state.block_seen {
801            return;
802        }
803        state.block_seen = true;
804        (
805            state.block_spinner.take(),
806            state.expected_nodes.iter().copied().collect::<Vec<_>>(),
807        )
808    };
809
810    if let Some(mut spinner) = block_spinner {
811        spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
812    }
813
814    match assets::remove_consensus_keys(layout, &node_ids).await {
815        Ok(removed) => {
816            if removed > 0 {
817                println!("Consensus secret keys removed from disk.");
818            }
819        }
820        Err(err) => {
821            eprintln!("warning: failed to remove consensus secret keys: {}", err);
822        }
823    }
824}
825
826async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
827    let mut state = state.lock().await;
828    if state.last_block_height == Some(height) {
829        return Ok(());
830    }
831    state.last_block_height = Some(height);
832    state.touch().await?;
833    Ok(())
834}
835
836fn version_summary(versions: &HashMap<u32, String>) -> String {
837    let mut unique: Vec<String> = versions.values().cloned().collect();
838    unique.sort();
839    unique.dedup();
840    if unique.len() == 1 {
841        format!("version {}", unique[0])
842    } else {
843        format!("versions {}", unique.join(", "))
844    }
845}
846
847async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
848    if let Some(delay) = backoff.next_backoff() {
849        tokio::time::sleep(delay).await;
850        return true;
851    }
852    false
853}
854
855fn log_transaction_processed(
856    prefix: &str,
857    transaction_hash: &str,
858    execution_result: &ExecutionResult,
859    messages: &[casper_types::contract_messages::Message],
860) {
861    let consumed = execution_result.consumed();
862    let consumed_cspr = format_cspr_u512(&consumed);
863    if let Some(error) = execution_result.error_message() {
864        println!(
865            "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
866            prefix, transaction_hash, error, consumed, consumed_cspr
867        );
868    } else {
869        println!(
870            "{} Transaction {} processed succeeded gas={} gas_cspr={}",
871            prefix, transaction_hash, consumed, consumed_cspr
872        );
873    }
874
875    for message in messages {
876        let entity = message.entity_addr().to_formatted_string();
877        let topic = message.topic_name();
878        let payload = format_message_payload(message.payload());
879        println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
880    }
881}
882
883fn timestamp_prefix() -> String {
884    time::OffsetDateTime::now_utc()
885        .format(&time::format_description::well_known::Rfc3339)
886        .unwrap_or_else(|_| "unknown-time".to_string())
887}
888
889fn format_message_payload(payload: &MessagePayload) -> String {
890    match payload {
891        MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
892        MessagePayload::String(value) => format!("{:?}", value),
893    }
894}
895
896fn encode_hex(bytes: &[u8]) -> String {
897    let mut out = String::with_capacity(bytes.len() * 2);
898    for byte in bytes {
899        use std::fmt::Write;
900        let _ = write!(&mut out, "{:02x}", byte);
901    }
902    out
903}
904
905fn format_cspr_u512(motes: &U512) -> String {
906    let motes_str = motes.to_string();
907    let digits = motes_str.len();
908    if digits <= 9 {
909        let frac = format!("{:0>9}", motes_str);
910        let frac = frac.trim_end_matches('0');
911        if frac.is_empty() {
912            return "0".to_string();
913        }
914        return format!("0.{}", frac);
915    }
916
917    let split = digits - 9;
918    let (whole, frac) = motes_str.split_at(split);
919    let frac = frac.trim_end_matches('0');
920    if frac.is_empty() {
921        return whole.to_string();
922    }
923    format!("{}.{}", whole, frac)
924}
925
926async fn update_exited_process(
927    state: &mut State,
928    id: &str,
929    code: Option<i32>,
930    signal: Option<i32>,
931) -> Result<()> {
932    for record in &mut state.processes {
933        if record.id == id {
934            record.last_status = ProcessStatus::Exited;
935            record.exit_code = code;
936            record.exit_signal = signal;
937            record.stopped_at = Some(time::OffsetDateTime::now_utc());
938            break;
939        }
940    }
941    state.touch().await?;
942    Ok(())
943}
944
945fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
946    if let Some(pid) = pid {
947        if let Some(signal) = signal {
948            println!(
949                "process {} (pid {}) exited due to signal {}",
950                id, pid, signal
951            );
952        } else if let Some(code) = code {
953            println!("process {} (pid {}) exited with code {}", id, pid, code);
954        } else {
955            println!("process {} (pid {}) exited", id, pid);
956        }
957    } else if let Some(signal) = signal {
958        println!("process {} exited due to signal {}", id, signal);
959    } else if let Some(code) = code {
960        println!("process {} exited with code {}", id, code);
961    } else {
962        println!("process {} exited", id);
963    }
964}
965
966async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
967    let total_nodes = layout.count_nodes().await.unwrap_or(0);
968    let target = format!("all nodes ({})", total_nodes);
969    let sidecars = processes
970        .iter()
971        .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
972        .count();
973    println!(
974        "started {} process(es) for {} (sidecars: {})",
975        processes.len(),
976        target,
977        sidecars
978    );
979}
980
981fn looks_like_url(path: &Path) -> bool {
982    let value = path.to_string_lossy();
983    value.starts_with("http://") || value.starts_with("https://")
984}
985
986async fn is_dir(path: &Path) -> bool {
987    tokio_fs::metadata(path)
988        .await
989        .map(|meta| meta.is_dir())
990        .unwrap_or(false)
991}
992
993async fn run_assets(args: AssetsArgs) -> Result<()> {
994    match args.command {
995        AssetsCommand::Add(add) => run_assets_add(add).await,
996        AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
997        AssetsCommand::List => run_assets_list().await,
998    }
999}
1000
1001async fn run_is_ready(args: IsReadyArgs) -> Result<()> {
1002    let assets_root = match &args.net_path {
1003        Some(path) => path.clone(),
1004        None => assets::default_assets_root()?,
1005    };
1006    let layout = AssetsLayout::new(assets_root, args.network_name);
1007    let argv0 = std::env::args()
1008        .next()
1009        .unwrap_or_else(|| "casper-devnet".to_string());
1010    let setup_cmd = format!("{} start --setup-only", argv0);
1011    let net_dir = layout.net_dir();
1012    if !is_dir(&net_dir).await {
1013        return Err(anyhow!(
1014            "assets for {} not found; run `{}`",
1015            layout.network_name(),
1016            setup_cmd
1017        ));
1018    }
1019
1020    let state_path = net_dir.join(STATE_FILE_NAME);
1021    let contents = match tokio_fs::read_to_string(&state_path).await {
1022        Ok(contents) => contents,
1023        Err(_) => return Err(anyhow!("network is not ready yet")),
1024    };
1025    let state =
1026        match tokio::task::spawn_blocking(move || serde_json::from_str::<State>(&contents)).await {
1027            Ok(Ok(state)) => state,
1028            _ => return Err(anyhow!("network is not ready yet")),
1029        };
1030
1031    ensure_processes_running(&state)?;
1032
1033    if state.last_block_height.is_none() {
1034        return Err(anyhow!("network is not ready yet"));
1035    }
1036
1037    ensure_rest_ready(&state).await?;
1038    Ok(())
1039}
1040
1041async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
1042    if looks_like_url(&args.path) {
1043        return Err(anyhow!(
1044            "assets URL is not supported yet; provide a local .tar.gz path"
1045        ));
1046    }
1047    assets::install_assets_bundle(&args.path).await?;
1048    println!(
1049        "assets installed into {}",
1050        assets::assets_bundle_root()?.display()
1051    );
1052    Ok(())
1053}
1054
1055async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
1056    assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
1057    Ok(())
1058}
1059
1060async fn run_assets_list() -> Result<()> {
1061    let mut versions = assets::list_bundle_versions().await?;
1062    if versions.is_empty() {
1063        return Err(anyhow!("no assets bundles found"));
1064    }
1065    versions.sort_by(|a, b| b.cmp(a));
1066    for version in versions {
1067        println!("{}", version);
1068    }
1069    Ok(())
1070}
1071
1072async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
1073    if let Some(raw) = candidate {
1074        let version = assets::parse_protocol_version(raw)?;
1075        if !assets::has_bundle_version(&version).await? {
1076            let argv0 = std::env::args()
1077                .next()
1078                .unwrap_or_else(|| "casper-devnet".to_string());
1079            let pull_cmd = format!("{} assets pull", argv0);
1080            let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1081            return Err(anyhow!(
1082                "assets for version {} not found; run `{}` or `{}`",
1083                version,
1084                pull_cmd,
1085                add_cmd
1086            ));
1087        }
1088        return Ok(version.to_string());
1089    }
1090    let versions = assets::list_bundle_versions().await?;
1091    if versions.is_empty() {
1092        let argv0 = std::env::args()
1093            .next()
1094            .unwrap_or_else(|| "casper-devnet".to_string());
1095        let pull_cmd = format!("{} assets pull", argv0);
1096        let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1097        return Err(anyhow!(
1098            "no assets found; run `{}` or `{}`",
1099            pull_cmd,
1100            add_cmd
1101        ));
1102    }
1103    let version = versions
1104        .into_iter()
1105        .max()
1106        .expect("non-empty assets versions");
1107    Ok(version.to_string())
1108}
1109
1110fn ensure_processes_running(state: &State) -> Result<()> {
1111    if state.processes.is_empty() {
1112        return Err(anyhow!("network is not ready yet"));
1113    }
1114    for process in &state.processes {
1115        if !matches!(process.last_status, ProcessStatus::Running) {
1116            return Err(anyhow!("network is not ready yet"));
1117        }
1118        let pid = match process.pid {
1119            Some(pid) => pid,
1120            None => return Err(anyhow!("network is not ready yet")),
1121        };
1122        if !is_pid_running(pid) {
1123            return Err(anyhow!("network is not ready yet"));
1124        }
1125    }
1126    Ok(())
1127}
1128
1129fn is_pid_running(pid: u32) -> bool {
1130    let pid = Pid::from_raw(pid as i32);
1131    match kill(pid, None) {
1132        Ok(()) => true,
1133        Err(Errno::ESRCH) => false,
1134        Err(_) => true,
1135    }
1136}
1137
1138async fn ensure_rest_ready(state: &State) -> Result<()> {
1139    let node_ids: HashSet<u32> = state
1140        .processes
1141        .iter()
1142        .filter_map(|process| {
1143            if matches!(process.kind, ProcessKind::Node) {
1144                Some(process.node_id)
1145            } else {
1146                None
1147            }
1148        })
1149        .collect();
1150    if node_ids.is_empty() {
1151        return Err(anyhow!("network is not ready yet"));
1152    }
1153
1154    let client = reqwest::Client::builder()
1155        .no_proxy()
1156        .timeout(Duration::from_secs(2))
1157        .build()?;
1158
1159    for node_id in node_ids {
1160        let url = format!("{}/status", assets::rest_endpoint(node_id));
1161        let response = match client.get(&url).send().await {
1162            Ok(response) => response,
1163            Err(_) => return Err(anyhow!("network is not ready yet")),
1164        };
1165        if response.status() != reqwest::StatusCode::OK {
1166            return Err(anyhow!("network is not ready yet"));
1167        }
1168        let status = match response.json::<NodeStatus>().await {
1169            Ok(status) => status,
1170            Err(_) => return Err(anyhow!("network is not ready yet")),
1171        };
1172        if !status
1173            .reactor_state
1174            .as_deref()
1175            .map(is_ready_reactor_state)
1176            .unwrap_or(false)
1177        {
1178            return Err(anyhow!("network is not ready yet"));
1179        }
1180    }
1181    Ok(())
1182}
1183
1184fn is_ready_reactor_state(state: &str) -> bool {
1185    state == "Validate"
1186}
1187
1188#[derive(Deserialize)]
1189#[allow(dead_code)]
1190#[serde(rename_all = "snake_case")]
1191struct NodeStatus {
1192    peers: Option<Vec<NodePeer>>,
1193    api_version: Option<String>,
1194    build_version: Option<String>,
1195    chainspec_name: Option<String>,
1196    starting_state_root_hash: Option<String>,
1197    last_added_block_info: Option<serde_json::Value>,
1198    our_public_signing_key: Option<String>,
1199    round_length: Option<serde_json::Value>,
1200    next_upgrade: Option<serde_json::Value>,
1201    uptime: Option<String>,
1202    reactor_state: Option<String>,
1203    last_progress: Option<String>,
1204    available_block_range: Option<BlockRange>,
1205    block_sync: Option<BlockSync>,
1206    latest_switch_block_hash: Option<serde_json::Value>,
1207}
1208
1209#[derive(Deserialize)]
1210#[allow(dead_code)]
1211#[serde(rename_all = "snake_case")]
1212struct NodePeer {
1213    node_id: Option<String>,
1214    address: Option<String>,
1215}
1216
1217#[derive(Deserialize)]
1218#[allow(dead_code)]
1219#[serde(rename_all = "snake_case")]
1220struct BlockRange {
1221    low: Option<u64>,
1222    high: Option<u64>,
1223}
1224
1225#[derive(Deserialize)]
1226#[allow(dead_code)]
1227#[serde(rename_all = "snake_case")]
1228struct BlockSync {
1229    historical: Option<serde_json::Value>,
1230    forward: Option<serde_json::Value>,
1231}
1232
1233#[cfg(test)]
1234mod tests {
1235    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
1236    use casper_types::U512;
1237    use casper_types::contract_messages::MessagePayload;
1238    use directories::BaseDirs;
1239
1240    #[test]
1241    fn format_cspr_u512_handles_whole_and_fractional() {
1242        assert_eq!(format_cspr_u512(&U512::zero()), "0");
1243        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
1244        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
1245        assert_eq!(
1246            format_cspr_u512(&U512::from(1_000_000_001u64)),
1247            "1.000000001"
1248        );
1249        assert_eq!(
1250            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
1251            "123"
1252        );
1253        assert_eq!(
1254            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
1255            "123.000000456"
1256        );
1257    }
1258
1259    #[test]
1260    fn format_message_payload_renders_string_with_quotes() {
1261        let payload = MessagePayload::String("hello".to_string());
1262        assert_eq!(format_message_payload(&payload), "\"hello\"");
1263    }
1264
1265    #[test]
1266    fn encode_hex_renders_lowercase() {
1267        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
1268    }
1269
1270    #[test]
1271    fn shorten_home_path_replaces_home_prefix() {
1272        let Some(base_dirs) = BaseDirs::new() else {
1273            return;
1274        };
1275        let home = base_dirs.home_dir();
1276        let shortened = shorten_home_path(&home.to_string_lossy());
1277        assert_eq!(shortened, "~");
1278
1279        let nested = home.join("devnet/logs/stdout.log");
1280        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
1281        assert!(shortened_nested.starts_with("~"));
1282        assert!(shortened_nested.contains("devnet"));
1283    }
1284
1285    #[test]
1286    fn shorten_home_path_keeps_relative_paths() {
1287        let input = "relative/path";
1288        assert_eq!(shorten_home_path(input), input);
1289    }
1290}