casper_devnet/
cli.rs

1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::diagnostics_port;
3use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
4use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, STATE_FILE_NAME, State};
5use anyhow::{Result, anyhow};
6use backoff::ExponentialBackoff;
7use backoff::backoff::Backoff;
8use casper_types::U512;
9use casper_types::contract_messages::MessagePayload;
10use casper_types::execution::ExecutionResult;
11use clap::{Args, Parser, Subcommand};
12use directories::BaseDirs;
13use futures::StreamExt;
14use nix::errno::Errno;
15use nix::sys::signal::kill;
16use nix::unistd::Pid;
17use serde::Deserialize;
18use spinners::{Spinner, Spinners};
19use std::collections::{HashMap, HashSet};
20use std::os::unix::process::ExitStatusExt;
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use std::sync::atomic::Ordering;
24use std::time::Duration;
25use tokio::fs as tokio_fs;
26use tokio::sync::Mutex;
27use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
28use veles_casper_rust_sdk::sse::event::SseEvent;
29use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
30
31/// CLI entrypoint for the devnet launcher.
32#[derive(Parser)]
33#[command(name = "nctl")]
34#[command(
35    about = "casper-devnet launcher for local Casper Network development networks",
36    long_about = None
37)]
38pub struct Cli {
39    #[command(subcommand)]
40    command: Command,
41}
42
43/// Top-level CLI subcommands.
44#[derive(Subcommand)]
45enum Command {
46    /// Setup assets (if needed) and start the devnet.
47    Start(StartArgs),
48    /// Manage assets bundles.
49    Assets(AssetsArgs),
50    /// Check whether a network has observed a block yet.
51    IsReady(IsReadyArgs),
52}
53
54/// Arguments for `nctl start`.
55#[derive(Args, Clone)]
56struct StartArgs {
57    /// Network name used in assets paths and configs.
58    #[arg(long, default_value = "casper-dev")]
59    network_name: String,
60
61    /// Override the base path for network runtime assets.
62    #[arg(long, value_name = "PATH")]
63    net_path: Option<PathBuf>,
64
65    /// Protocol version to use from the assets store (e.g. 2.1.1).
66    #[arg(long)]
67    protocol_version: Option<String>,
68
69    /// Number of nodes to create and start.
70    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
71    node_count: u32,
72
73    /// Number of user accounts to generate (defaults to node count).
74    #[arg(long)]
75    users: Option<u32>,
76
77    /// Genesis activation delay in seconds.
78    #[arg(long, default_value_t = 3)]
79    delay: u64,
80
81    /// Log level for child processes (passed as `RUST_LOG`).
82    #[arg(long = "log-level", default_value = "info")]
83    log_level: String,
84
85    /// Log format for node config files.
86    #[arg(long, default_value = "json")]
87    node_log_format: String,
88
89    /// Create assets and exit without starting processes.
90    #[arg(long)]
91    setup_only: bool,
92
93    /// Rebuild assets even if they already exist.
94    #[arg(long)]
95    force_setup: bool,
96
97    /// Deterministic seed for devnet key generation.
98    #[arg(long, default_value = "default")]
99    seed: Arc<str>,
100}
101
102/// Asset management arguments.
103#[derive(Args)]
104struct AssetsArgs {
105    #[command(subcommand)]
106    command: AssetsCommand,
107}
108
109/// Asset management subcommands.
110#[derive(Subcommand)]
111enum AssetsCommand {
112    /// Extract a local assets bundle into the assets store.
113    Add(AssetsAddArgs),
114    /// Download assets bundles from the upstream release.
115    Pull(AssetsPullArgs),
116    /// List available protocol versions in the assets store.
117    List,
118}
119
120/// Arguments for `nctl assets add`.
121#[derive(Args, Clone)]
122struct AssetsAddArgs {
123    /// Path to a local assets bundle (.tar.gz).
124    #[arg(value_name = "PATH")]
125    path: PathBuf,
126}
127
128/// Arguments for `nctl assets pull`.
129#[derive(Args, Clone)]
130struct AssetsPullArgs {
131    /// Target triple to select from release assets.
132    #[arg(long)]
133    target: Option<String>,
134
135    /// Re-download and replace any existing assets.
136    #[arg(long)]
137    force: bool,
138}
139
140/// Arguments for `nctl is-ready`.
141#[derive(Args, Clone)]
142struct IsReadyArgs {
143    /// Network name used in assets paths and configs.
144    #[arg(long, default_value = "casper-dev")]
145    network_name: String,
146
147    /// Override the base path for network runtime assets.
148    #[arg(long, value_name = "PATH")]
149    net_path: Option<PathBuf>,
150}
151
152/// Parses CLI and runs the selected subcommand.
153pub async fn run() -> Result<()> {
154    let cli = Cli::parse();
155    match cli.command {
156        Command::Start(args) => run_start(args).await,
157        Command::Assets(args) => run_assets(args).await,
158        Command::IsReady(args) => run_is_ready(args).await,
159    }
160}
161
162async fn run_start(args: StartArgs) -> Result<()> {
163    let assets_root = match &args.net_path {
164        Some(path) => path.clone(),
165        None => assets::default_assets_root()?,
166    };
167    let layout = AssetsLayout::new(assets_root, args.network_name.clone());
168    let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
169    println!("assets path: {}", assets_path);
170    let assets_exist = layout.exists().await;
171    if !args.setup_only && !args.force_setup && assets_exist {
172        println!("resuming network operations on {}", layout.network_name());
173    }
174    let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
175
176    if args.setup_only {
177        return run_setup_only(&layout, &args, &protocol_version).await;
178    }
179
180    if args.force_setup {
181        assets::teardown(&layout).await?;
182        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
183    } else if !assets_exist {
184        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
185    }
186
187    if !layout.exists().await {
188        return Err(anyhow!(
189            "assets missing under {}; run with --setup-only to create them",
190            shorten_home_path(&layout.net_dir().display().to_string())
191        ));
192    }
193
194    if !args.force_setup && assets_exist {
195        let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
196        if restored > 0 {
197            println!("recreated consensus keys for {} node(s)", restored);
198        }
199    }
200
201    let rust_log = args.log_level.clone();
202
203    let plan = StartPlan { rust_log };
204
205    let state_path = layout.net_dir().join(STATE_FILE_NAME);
206    let state = Arc::new(Mutex::new(State::new(state_path).await?));
207    let started = {
208        let mut state = state.lock().await;
209        process::start(&layout, &plan, &mut state).await?
210    };
211
212    print_pids(&started);
213    print_start_banner(&layout, &started).await;
214    print_derived_accounts_summary(&layout).await;
215
216    let node_ids = unique_node_ids(&started);
217    let details = format_network_details(&layout, &started).await;
218    let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
219    start_sse_spinner(&health).await;
220    spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
221    let mut diagnostics_proxy = match diagnostics_port::spawn(&layout).await {
222        Ok(proxy) => Some(proxy),
223        Err(err) => {
224            eprintln!("warning: failed to start diagnostics proxy: {}", err);
225            None
226        }
227    };
228
229    let (event_tx, mut event_rx) = unbounded_channel();
230    spawn_ctrlc_listener(event_tx.clone());
231    spawn_exit_watchers(started, event_tx);
232
233    if let Some(event) = event_rx.recv().await {
234        match event {
235            RunEvent::CtrlC => {
236                if let Some(proxy) = diagnostics_proxy.take() {
237                    proxy.shutdown();
238                }
239                let mut state = state.lock().await;
240                process::stop(&mut state).await?;
241            }
242            RunEvent::ProcessExit {
243                id,
244                pid,
245                code,
246                signal,
247            } => {
248                if let Some(proxy) = diagnostics_proxy.take() {
249                    proxy.shutdown();
250                }
251                let mut state = state.lock().await;
252                update_exited_process(&mut state, &id, code, signal).await?;
253                log_exit(&id, pid, code, signal);
254                process::stop(&mut state).await?;
255            }
256        }
257    }
258
259    Ok(())
260}
261
262async fn run_setup_only(
263    layout: &AssetsLayout,
264    args: &StartArgs,
265    protocol_version: &str,
266) -> Result<()> {
267    if args.force_setup {
268        assets::teardown(layout).await?;
269        assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
270        print_derived_accounts_summary(layout).await;
271        return Ok(());
272    }
273
274    if layout.exists().await {
275        println!(
276            "assets already exist at {}; use --force-setup to rebuild",
277            shorten_home_path(&layout.net_dir().display().to_string())
278        );
279        print_derived_accounts_summary(layout).await;
280        return Ok(());
281    }
282
283    assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
284    print_derived_accounts_summary(layout).await;
285    Ok(())
286}
287
288fn record_pid(record: &ProcessRecord) -> Option<u32> {
289    if let Some(handle) = &record.pid_handle {
290        let pid = handle.load(Ordering::SeqCst);
291        if pid != 0 {
292            return Some(pid);
293        }
294    }
295    record.pid
296}
297
298fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
299    SetupOptions {
300        nodes: args.node_count,
301        users: args.users,
302        delay_seconds: args.delay,
303        network_name: args.network_name.clone(),
304        protocol_version: protocol_version.to_string(),
305        node_log_format: args.node_log_format.clone(),
306        seed: Arc::clone(&args.seed),
307    }
308}
309
310fn print_pids(records: &[RunningProcess]) {
311    for record in records {
312        if let Some(pid) = record_pid(&record.record) {
313            println!(
314                "{} pid={} ({:?})",
315                record.record.id, pid, record.record.kind
316            );
317        }
318    }
319}
320
321async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
322    let symlink_root = layout.net_dir();
323    let mut node_pids: HashMap<u32, u32> = HashMap::new();
324    let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
325    let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
326
327    for process in processes {
328        if let Some(pid) = record_pid(&process.record) {
329            match process.record.kind {
330                ProcessKind::Node => {
331                    node_pids.insert(process.record.node_id, pid);
332                }
333                ProcessKind::Sidecar => {
334                    sidecar_pids.insert(process.record.node_id, pid);
335                }
336            }
337            process_logs
338                .entry(process.record.node_id)
339                .or_default()
340                .push((process.record.kind.clone(), pid));
341        }
342    }
343
344    let node_ids = unique_node_ids(processes);
345
346    let mut lines = Vec::new();
347    lines.push("network details".to_string());
348    for node_id in node_ids {
349        let node_pid = node_pids
350            .get(&node_id)
351            .map(|pid| pid.to_string())
352            .unwrap_or_else(|| "-".to_string());
353        let sidecar_pid = sidecar_pids
354            .get(&node_id)
355            .map(|pid| pid.to_string())
356            .unwrap_or_else(|| "-".to_string());
357        lines.push(format!("  node-{}", node_id));
358        lines.push(format!(
359            "    pids: node={} sidecar={}",
360            node_pid, sidecar_pid
361        ));
362        if let Some(entries) = process_logs.get(&node_id) {
363            let mut entries = entries.clone();
364            entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
365            lines.push("    logs".to_string());
366            for (kind, pid) in entries {
367                let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
368                lines.push(format!(
369                    "      {} pid={} stdout={} stderr={}",
370                    process_kind_label(&kind),
371                    pid,
372                    stdout_link,
373                    stderr_link
374                ));
375            }
376        }
377        lines.push("    endpoints".to_string());
378        lines.push(format!("      rest:   {}", assets::rest_endpoint(node_id)));
379        lines.push(format!("      sse:    {}", assets::sse_endpoint(node_id)));
380        lines.push(format!("      rpc:    {}", assets::rpc_endpoint(node_id)));
381        lines.push(format!("      binary: {}", assets::binary_address(node_id)));
382        lines.push(format!(
383            "      diagnostics: {}",
384            assets::diagnostics_socket_path(layout.network_name(), node_id)
385        ));
386        lines.push(format!(
387            "      diagnostics-ws: {}",
388            assets::diagnostics_ws_endpoint(node_id)
389        ));
390        lines.push(format!(
391            "      gossip: {}",
392            assets::network_address(node_id)
393        ));
394    }
395
396    lines.join("\n")
397}
398
399fn process_kind_label(kind: &ProcessKind) -> &'static str {
400    match kind {
401        ProcessKind::Node => "node",
402        ProcessKind::Sidecar => "sidecar",
403    }
404}
405
406fn shorten_home_path(path: &str) -> String {
407    let path = Path::new(path);
408    let Some(base_dirs) = BaseDirs::new() else {
409        return path.display().to_string();
410    };
411    let home = base_dirs.home_dir();
412    match path.strip_prefix(home) {
413        Ok(stripped) => {
414            if stripped.as_os_str().is_empty() {
415                return "~".to_string();
416            }
417            let mut shorthand = PathBuf::from("~");
418            shorthand.push(stripped);
419            shorthand.display().to_string()
420        }
421        Err(_) => path.display().to_string(),
422    }
423}
424
425fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
426    let base = match kind {
427        ProcessKind::Node => format!("node-{}", node_id),
428        ProcessKind::Sidecar => format!("sidecar-{}", node_id),
429    };
430    let stdout_link = symlink_root.join(format!("{}.stdout", base));
431    let stderr_link = symlink_root.join(format!("{}.stderr", base));
432    (
433        shorten_home_path(&stdout_link.display().to_string()),
434        shorten_home_path(&stderr_link.display().to_string()),
435    )
436}
437
438async fn print_derived_accounts_summary(layout: &AssetsLayout) {
439    if let Some(summary) = assets::derived_accounts_summary(layout).await {
440        if let Some(parsed) = parse_derived_accounts_csv(&summary) {
441            println!("derived accounts");
442            if !parsed.validators.is_empty() {
443                println!("  validators");
444                print_account_group(&parsed.validators);
445            }
446            if !parsed.users.is_empty() {
447                println!("  users");
448                print_account_group(&parsed.users);
449            }
450            if !parsed.other.is_empty() {
451                println!("  other");
452                print_account_group(&parsed.other);
453            }
454        } else {
455            println!("derived accounts");
456            for line in summary.lines() {
457                println!("  {}", line);
458            }
459        }
460    }
461}
462
463struct DerivedAccountRow {
464    name: String,
465    key_type: String,
466    derivation: String,
467    path: String,
468    account_hash: String,
469    balance: String,
470}
471
472struct DerivedAccountsParsed {
473    validators: Vec<DerivedAccountRow>,
474    users: Vec<DerivedAccountRow>,
475    other: Vec<DerivedAccountRow>,
476}
477
478fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
479    let mut lines = summary.lines();
480    let header = lines.next()?.trim();
481    if header != "kind,name,key_type,derivation,path,account_hash,balance" {
482        return None;
483    }
484
485    let mut parsed = DerivedAccountsParsed {
486        validators: Vec::new(),
487        users: Vec::new(),
488        other: Vec::new(),
489    };
490
491    for line in lines {
492        let line = line.trim();
493        if line.is_empty() {
494            continue;
495        }
496        let mut parts = line.splitn(7, ',');
497        let kind = parts.next()?.to_string();
498        let name = parts.next()?.to_string();
499        let key_type = parts.next()?.to_string();
500        let derivation = parts.next()?.to_string();
501        let path = parts.next()?.to_string();
502        let account_hash = parts.next()?.to_string();
503        let balance = parts.next()?.to_string();
504        let row = DerivedAccountRow {
505            name,
506            key_type,
507            derivation,
508            path,
509            account_hash,
510            balance,
511        };
512        match kind.as_str() {
513            "validator" => parsed.validators.push(row),
514            "user" => parsed.users.push(row),
515            _ => parsed.other.push(row),
516        }
517    }
518
519    Some(parsed)
520}
521
522fn print_account_group(rows: &[DerivedAccountRow]) {
523    for row in rows {
524        println!("    {}:", row.name);
525        println!("      key_type: {}", row.key_type);
526        println!("      derivation: {}", row.derivation);
527        println!("      path: {}", row.path);
528        println!("      account_hash: {}", row.account_hash);
529        println!("      balance: {}", row.balance);
530    }
531}
532
533fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
534    let mut nodes = HashSet::new();
535    for process in processes {
536        nodes.insert(process.record.node_id);
537    }
538    let mut ids: Vec<u32> = nodes.into_iter().collect();
539    ids.sort_unstable();
540    ids
541}
542
543enum RunEvent {
544    CtrlC,
545    ProcessExit {
546        id: String,
547        pid: Option<u32>,
548        code: Option<i32>,
549        signal: Option<i32>,
550    },
551}
552
553fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
554    tokio::spawn(async move {
555        if tokio::signal::ctrl_c().await.is_ok() {
556            let _ = tx.send(RunEvent::CtrlC);
557        }
558    });
559}
560
561fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
562    for running in processes {
563        let tx = tx.clone();
564        tokio::spawn(async move {
565            let id = running.record.id.clone();
566            match running.handle {
567                ProcessHandle::Child(mut child) => {
568                    if let Ok(status) = child.wait().await {
569                        let pid = record_pid(&running.record).or_else(|| child.id());
570                        let code = status.code();
571                        let signal = status.signal();
572                        let _ = tx.send(RunEvent::ProcessExit {
573                            id: id.clone(),
574                            pid,
575                            code,
576                            signal,
577                        });
578                    }
579                }
580                ProcessHandle::Task(handle) => {
581                    let status = handle.await;
582                    let pid = record_pid(&running.record);
583                    let (code, signal) = match status {
584                        Ok(Ok(())) => (Some(0), None),
585                        Ok(Err(_)) => (None, None),
586                        Err(_) => (None, None),
587                    };
588                    let _ = tx.send(RunEvent::ProcessExit {
589                        id: id.clone(),
590                        pid,
591                        code,
592                        signal,
593                    });
594                }
595            }
596        });
597    }
598}
599
600const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
601const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
602
603struct SseHealth {
604    expected_nodes: HashSet<u32>,
605    versions: HashMap<u32, String>,
606    announced: bool,
607    block_seen: bool,
608    sse_spinner: Option<Spinner>,
609    block_spinner: Option<Spinner>,
610    details: String,
611}
612
613impl SseHealth {
614    fn new(node_ids: Vec<u32>, details: String) -> Self {
615        Self {
616            expected_nodes: node_ids.into_iter().collect(),
617            versions: HashMap::new(),
618            announced: false,
619            block_seen: false,
620            sse_spinner: None,
621            block_spinner: None,
622            details,
623        }
624    }
625}
626
627async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
628    if node_id != 1 {
629        return false;
630    }
631    let state = health.lock().await;
632    state.announced
633}
634
635fn start_spinner(message: &str) -> Spinner {
636    Spinner::new(Spinners::Dots, message.to_string())
637}
638
639async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
640    let mut state = health.lock().await;
641    if state.sse_spinner.is_none() {
642        state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
643    }
644}
645
646async fn spawn_sse_listeners(
647    layout: AssetsLayout,
648    node_ids: &[u32],
649    health: Arc<Mutex<SseHealth>>,
650    state: Arc<Mutex<State>>,
651) {
652    for node_id in node_ids {
653        let node_id = *node_id;
654        let endpoint = assets::sse_endpoint(node_id);
655        let layout = layout.clone();
656        let health = Arc::clone(&health);
657        let state = Arc::clone(&state);
658        tokio::spawn(async move {
659            run_sse_listener(node_id, endpoint, health, state, layout).await;
660        });
661    }
662}
663
664async fn run_sse_listener(
665    node_id: u32,
666    endpoint: String,
667    health: Arc<Mutex<SseHealth>>,
668    state: Arc<Mutex<State>>,
669    layout: AssetsLayout,
670) {
671    let mut backoff = ExponentialBackoff::default();
672
673    loop {
674        let config = match ListenerConfig::builder()
675            .with_endpoint(endpoint.clone())
676            .build()
677        {
678            Ok(config) => config,
679            Err(_) => {
680                if !sleep_backoff(&mut backoff).await {
681                    return;
682                }
683                continue;
684            }
685        };
686
687        let stream = match sse::listener(config).await {
688            Ok(stream) => {
689                backoff.reset();
690                stream
691            }
692            Err(_) => {
693                if !sleep_backoff(&mut backoff).await {
694                    return;
695                }
696                continue;
697            }
698        };
699
700        futures::pin_mut!(stream);
701        let mut stream_failed = false;
702        while let Some(event) = stream.next().await {
703            match event {
704                Ok(sse_event) => match sse_event {
705                    SseEvent::ApiVersion(version) => {
706                        record_api_version(node_id, version.to_string(), &health).await;
707                    }
708                    SseEvent::BlockAdded { block_hash, block } => {
709                        if node_id == 1
710                            && let Err(err) = record_last_block_height(&state, block.height()).await
711                        {
712                            eprintln!("warning: failed to record last block height: {}", err);
713                        }
714                        if should_log_primary(node_id, &health).await {
715                            mark_block_seen(&health, &layout).await;
716                            let prefix = timestamp_prefix();
717                            println!(
718                                "{} Block {} added (height={} era={})",
719                                prefix,
720                                block_hash,
721                                block.height(),
722                                block.era_id().value()
723                            );
724                        }
725                    }
726                    SseEvent::TransactionAccepted(transaction) => {
727                        if node_id == 1 {
728                            let prefix = timestamp_prefix();
729                            println!("{} Transaction {} accepted", prefix, transaction.hash());
730                        }
731                    }
732                    SseEvent::TransactionProcessed {
733                        transaction_hash,
734                        execution_result,
735                        messages,
736                        ..
737                    } => {
738                        if node_id == 1 {
739                            let tx_hash = transaction_hash.to_string();
740                            let prefix = timestamp_prefix();
741                            log_transaction_processed(
742                                &prefix,
743                                &tx_hash,
744                                &execution_result,
745                                &messages,
746                            );
747                        }
748                    }
749                    _ => {}
750                },
751                Err(_) => {
752                    stream_failed = true;
753                    break;
754                }
755            }
756        }
757
758        if stream_failed && !sleep_backoff(&mut backoff).await {
759            return;
760        }
761    }
762}
763
764async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
765    let (summary, details, sse_spinner) = {
766        let mut state = health.lock().await;
767        if !state.expected_nodes.contains(&node_id) {
768            return;
769        }
770        state.versions.insert(node_id, version);
771        if state.announced || state.versions.len() != state.expected_nodes.len() {
772            return;
773        }
774
775        let summary = version_summary(&state.versions);
776        let details = state.details.clone();
777        let sse_spinner = state.sse_spinner.take();
778        if state.block_spinner.is_none() {
779            state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
780        }
781        state.announced = true;
782        state.block_seen = false;
783        (summary, details, sse_spinner)
784    };
785
786    if let Some(mut spinner) = sse_spinner {
787        spinner.stop_with_message("SSE connection established.".to_string());
788    }
789    println!("Network is healthy ({})", summary);
790    println!("{}", details);
791}
792
793async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
794    let (block_spinner, node_ids) = {
795        let mut state = health.lock().await;
796        if state.block_seen {
797            return;
798        }
799        state.block_seen = true;
800        (
801            state.block_spinner.take(),
802            state.expected_nodes.iter().copied().collect::<Vec<_>>(),
803        )
804    };
805
806    if let Some(mut spinner) = block_spinner {
807        spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
808    }
809
810    match assets::remove_consensus_keys(layout, &node_ids).await {
811        Ok(removed) => {
812            if removed > 0 {
813                println!("Consensus secret keys removed from disk.");
814            }
815        }
816        Err(err) => {
817            eprintln!("warning: failed to remove consensus secret keys: {}", err);
818        }
819    }
820}
821
822async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
823    let mut state = state.lock().await;
824    if state.last_block_height == Some(height) {
825        return Ok(());
826    }
827    state.last_block_height = Some(height);
828    state.touch().await?;
829    Ok(())
830}
831
832fn version_summary(versions: &HashMap<u32, String>) -> String {
833    let mut unique: Vec<String> = versions.values().cloned().collect();
834    unique.sort();
835    unique.dedup();
836    if unique.len() == 1 {
837        format!("version {}", unique[0])
838    } else {
839        format!("versions {}", unique.join(", "))
840    }
841}
842
843async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
844    if let Some(delay) = backoff.next_backoff() {
845        tokio::time::sleep(delay).await;
846        return true;
847    }
848    false
849}
850
851fn log_transaction_processed(
852    prefix: &str,
853    transaction_hash: &str,
854    execution_result: &ExecutionResult,
855    messages: &[casper_types::contract_messages::Message],
856) {
857    let consumed = execution_result.consumed();
858    let consumed_cspr = format_cspr_u512(&consumed);
859    if let Some(error) = execution_result.error_message() {
860        println!(
861            "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
862            prefix, transaction_hash, error, consumed, consumed_cspr
863        );
864    } else {
865        println!(
866            "{} Transaction {} processed succeeded gas={} gas_cspr={}",
867            prefix, transaction_hash, consumed, consumed_cspr
868        );
869    }
870
871    for message in messages {
872        let entity = message.entity_addr().to_formatted_string();
873        let topic = message.topic_name();
874        let payload = format_message_payload(message.payload());
875        println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
876    }
877}
878
879fn timestamp_prefix() -> String {
880    time::OffsetDateTime::now_utc()
881        .format(&time::format_description::well_known::Rfc3339)
882        .unwrap_or_else(|_| "unknown-time".to_string())
883}
884
885fn format_message_payload(payload: &MessagePayload) -> String {
886    match payload {
887        MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
888        MessagePayload::String(value) => format!("{:?}", value),
889    }
890}
891
892fn encode_hex(bytes: &[u8]) -> String {
893    let mut out = String::with_capacity(bytes.len() * 2);
894    for byte in bytes {
895        use std::fmt::Write;
896        let _ = write!(&mut out, "{:02x}", byte);
897    }
898    out
899}
900
901fn format_cspr_u512(motes: &U512) -> String {
902    let motes_str = motes.to_string();
903    let digits = motes_str.len();
904    if digits <= 9 {
905        let frac = format!("{:0>9}", motes_str);
906        let frac = frac.trim_end_matches('0');
907        if frac.is_empty() {
908            return "0".to_string();
909        }
910        return format!("0.{}", frac);
911    }
912
913    let split = digits - 9;
914    let (whole, frac) = motes_str.split_at(split);
915    let frac = frac.trim_end_matches('0');
916    if frac.is_empty() {
917        return whole.to_string();
918    }
919    format!("{}.{}", whole, frac)
920}
921
922async fn update_exited_process(
923    state: &mut State,
924    id: &str,
925    code: Option<i32>,
926    signal: Option<i32>,
927) -> Result<()> {
928    for record in &mut state.processes {
929        if record.id == id {
930            record.last_status = ProcessStatus::Exited;
931            record.exit_code = code;
932            record.exit_signal = signal;
933            record.stopped_at = Some(time::OffsetDateTime::now_utc());
934            break;
935        }
936    }
937    state.touch().await?;
938    Ok(())
939}
940
941fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
942    if let Some(pid) = pid {
943        if let Some(signal) = signal {
944            println!(
945                "process {} (pid {}) exited due to signal {}",
946                id, pid, signal
947            );
948        } else if let Some(code) = code {
949            println!("process {} (pid {}) exited with code {}", id, pid, code);
950        } else {
951            println!("process {} (pid {}) exited", id, pid);
952        }
953    } else if let Some(signal) = signal {
954        println!("process {} exited due to signal {}", id, signal);
955    } else if let Some(code) = code {
956        println!("process {} exited with code {}", id, code);
957    } else {
958        println!("process {} exited", id);
959    }
960}
961
962async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
963    let total_nodes = layout.count_nodes().await.unwrap_or(0);
964    let target = format!("all nodes ({})", total_nodes);
965    let sidecars = processes
966        .iter()
967        .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
968        .count();
969    println!(
970        "started {} process(es) for {} (sidecars: {})",
971        processes.len(),
972        target,
973        sidecars
974    );
975}
976
977fn looks_like_url(path: &Path) -> bool {
978    let value = path.to_string_lossy();
979    value.starts_with("http://") || value.starts_with("https://")
980}
981
982async fn is_dir(path: &Path) -> bool {
983    tokio_fs::metadata(path)
984        .await
985        .map(|meta| meta.is_dir())
986        .unwrap_or(false)
987}
988
989async fn run_assets(args: AssetsArgs) -> Result<()> {
990    match args.command {
991        AssetsCommand::Add(add) => run_assets_add(add).await,
992        AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
993        AssetsCommand::List => run_assets_list().await,
994    }
995}
996
997async fn run_is_ready(args: IsReadyArgs) -> Result<()> {
998    let assets_root = match &args.net_path {
999        Some(path) => path.clone(),
1000        None => assets::default_assets_root()?,
1001    };
1002    let layout = AssetsLayout::new(assets_root, args.network_name);
1003    let argv0 = std::env::args()
1004        .next()
1005        .unwrap_or_else(|| "casper-devnet".to_string());
1006    let setup_cmd = format!("{} start --setup-only", argv0);
1007    let net_dir = layout.net_dir();
1008    if !is_dir(&net_dir).await {
1009        return Err(anyhow!(
1010            "assets for {} not found; run `{}`",
1011            layout.network_name(),
1012            setup_cmd
1013        ));
1014    }
1015
1016    let state_path = net_dir.join(STATE_FILE_NAME);
1017    let contents = match tokio_fs::read_to_string(&state_path).await {
1018        Ok(contents) => contents,
1019        Err(_) => return Err(anyhow!("network is not ready yet")),
1020    };
1021    let state =
1022        match tokio::task::spawn_blocking(move || serde_json::from_str::<State>(&contents)).await {
1023            Ok(Ok(state)) => state,
1024            _ => return Err(anyhow!("network is not ready yet")),
1025        };
1026
1027    ensure_processes_running(&state)?;
1028
1029    if state.last_block_height.is_none() {
1030        return Err(anyhow!("network is not ready yet"));
1031    }
1032
1033    ensure_rest_ready(&state).await?;
1034    Ok(())
1035}
1036
1037async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
1038    if looks_like_url(&args.path) {
1039        return Err(anyhow!(
1040            "assets URL is not supported yet; provide a local .tar.gz path"
1041        ));
1042    }
1043    assets::install_assets_bundle(&args.path).await?;
1044    println!(
1045        "assets installed into {}",
1046        assets::assets_bundle_root()?.display()
1047    );
1048    Ok(())
1049}
1050
1051async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
1052    assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
1053    Ok(())
1054}
1055
1056async fn run_assets_list() -> Result<()> {
1057    let mut versions = assets::list_bundle_versions().await?;
1058    if versions.is_empty() {
1059        return Err(anyhow!("no assets bundles found"));
1060    }
1061    versions.sort_by(|a, b| b.cmp(a));
1062    for version in versions {
1063        println!("{}", version);
1064    }
1065    Ok(())
1066}
1067
1068async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
1069    if let Some(raw) = candidate {
1070        let version = assets::parse_protocol_version(raw)?;
1071        if !assets::has_bundle_version(&version).await? {
1072            let argv0 = std::env::args()
1073                .next()
1074                .unwrap_or_else(|| "casper-devnet".to_string());
1075            let pull_cmd = format!("{} assets pull", argv0);
1076            let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1077            return Err(anyhow!(
1078                "assets for version {} not found; run `{}` or `{}`",
1079                version,
1080                pull_cmd,
1081                add_cmd
1082            ));
1083        }
1084        return Ok(version.to_string());
1085    }
1086    let versions = assets::list_bundle_versions().await?;
1087    if versions.is_empty() {
1088        let argv0 = std::env::args()
1089            .next()
1090            .unwrap_or_else(|| "casper-devnet".to_string());
1091        let pull_cmd = format!("{} assets pull", argv0);
1092        let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1093        return Err(anyhow!(
1094            "no assets found; run `{}` or `{}`",
1095            pull_cmd,
1096            add_cmd
1097        ));
1098    }
1099    let version = versions
1100        .into_iter()
1101        .max()
1102        .expect("non-empty assets versions");
1103    Ok(version.to_string())
1104}
1105
1106fn ensure_processes_running(state: &State) -> Result<()> {
1107    if state.processes.is_empty() {
1108        return Err(anyhow!("network is not ready yet"));
1109    }
1110    for process in &state.processes {
1111        if !matches!(process.last_status, ProcessStatus::Running) {
1112            return Err(anyhow!("network is not ready yet"));
1113        }
1114        let pid = match process.pid {
1115            Some(pid) => pid,
1116            None => return Err(anyhow!("network is not ready yet")),
1117        };
1118        if !is_pid_running(pid) {
1119            return Err(anyhow!("network is not ready yet"));
1120        }
1121    }
1122    Ok(())
1123}
1124
1125fn is_pid_running(pid: u32) -> bool {
1126    let pid = Pid::from_raw(pid as i32);
1127    match kill(pid, None) {
1128        Ok(()) => true,
1129        Err(Errno::ESRCH) => false,
1130        Err(_) => true,
1131    }
1132}
1133
1134async fn ensure_rest_ready(state: &State) -> Result<()> {
1135    let node_ids: HashSet<u32> = state
1136        .processes
1137        .iter()
1138        .filter_map(|process| {
1139            if matches!(process.kind, ProcessKind::Node) {
1140                Some(process.node_id)
1141            } else {
1142                None
1143            }
1144        })
1145        .collect();
1146    if node_ids.is_empty() {
1147        return Err(anyhow!("network is not ready yet"));
1148    }
1149
1150    let client = reqwest::Client::builder()
1151        .no_proxy()
1152        .timeout(Duration::from_secs(2))
1153        .build()?;
1154
1155    for node_id in node_ids {
1156        let url = format!("{}/status", assets::rest_endpoint(node_id));
1157        let response = match client.get(&url).send().await {
1158            Ok(response) => response,
1159            Err(_) => return Err(anyhow!("network is not ready yet")),
1160        };
1161        if response.status() != reqwest::StatusCode::OK {
1162            return Err(anyhow!("network is not ready yet"));
1163        }
1164        let status = match response.json::<NodeStatus>().await {
1165            Ok(status) => status,
1166            Err(_) => return Err(anyhow!("network is not ready yet")),
1167        };
1168        if !status
1169            .reactor_state
1170            .as_deref()
1171            .map(is_ready_reactor_state)
1172            .unwrap_or(false)
1173        {
1174            return Err(anyhow!("network is not ready yet"));
1175        }
1176    }
1177    Ok(())
1178}
1179
1180fn is_ready_reactor_state(state: &str) -> bool {
1181    state == "Validate"
1182}
1183
1184#[derive(Deserialize)]
1185#[allow(dead_code)]
1186#[serde(rename_all = "snake_case")]
1187struct NodeStatus {
1188    peers: Option<Vec<NodePeer>>,
1189    api_version: Option<String>,
1190    build_version: Option<String>,
1191    chainspec_name: Option<String>,
1192    starting_state_root_hash: Option<String>,
1193    last_added_block_info: Option<serde_json::Value>,
1194    our_public_signing_key: Option<String>,
1195    round_length: Option<serde_json::Value>,
1196    next_upgrade: Option<serde_json::Value>,
1197    uptime: Option<String>,
1198    reactor_state: Option<String>,
1199    last_progress: Option<String>,
1200    available_block_range: Option<BlockRange>,
1201    block_sync: Option<BlockSync>,
1202    latest_switch_block_hash: Option<serde_json::Value>,
1203}
1204
1205#[derive(Deserialize)]
1206#[allow(dead_code)]
1207#[serde(rename_all = "snake_case")]
1208struct NodePeer {
1209    node_id: Option<String>,
1210    address: Option<String>,
1211}
1212
1213#[derive(Deserialize)]
1214#[allow(dead_code)]
1215#[serde(rename_all = "snake_case")]
1216struct BlockRange {
1217    low: Option<u64>,
1218    high: Option<u64>,
1219}
1220
1221#[derive(Deserialize)]
1222#[allow(dead_code)]
1223#[serde(rename_all = "snake_case")]
1224struct BlockSync {
1225    historical: Option<serde_json::Value>,
1226    forward: Option<serde_json::Value>,
1227}
1228
1229#[cfg(test)]
1230mod tests {
1231    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
1232    use casper_types::U512;
1233    use casper_types::contract_messages::MessagePayload;
1234    use directories::BaseDirs;
1235
1236    #[test]
1237    fn format_cspr_u512_handles_whole_and_fractional() {
1238        assert_eq!(format_cspr_u512(&U512::zero()), "0");
1239        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
1240        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
1241        assert_eq!(
1242            format_cspr_u512(&U512::from(1_000_000_001u64)),
1243            "1.000000001"
1244        );
1245        assert_eq!(
1246            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
1247            "123"
1248        );
1249        assert_eq!(
1250            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
1251            "123.000000456"
1252        );
1253    }
1254
1255    #[test]
1256    fn format_message_payload_renders_string_with_quotes() {
1257        let payload = MessagePayload::String("hello".to_string());
1258        assert_eq!(format_message_payload(&payload), "\"hello\"");
1259    }
1260
1261    #[test]
1262    fn encode_hex_renders_lowercase() {
1263        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
1264    }
1265
1266    #[test]
1267    fn shorten_home_path_replaces_home_prefix() {
1268        let Some(base_dirs) = BaseDirs::new() else {
1269            return;
1270        };
1271        let home = base_dirs.home_dir();
1272        let shortened = shorten_home_path(&home.to_string_lossy());
1273        assert_eq!(shortened, "~");
1274
1275        let nested = home.join("devnet/logs/stdout.log");
1276        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
1277        assert!(shortened_nested.starts_with("~"));
1278        assert!(shortened_nested.contains("devnet"));
1279    }
1280
1281    #[test]
1282    fn shorten_home_path_keeps_relative_paths() {
1283        let input = "relative/path";
1284        assert_eq!(shorten_home_path(input), input);
1285    }
1286}