casper_devnet/
cli.rs

1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
3use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{Result, anyhow};
5use backoff::ExponentialBackoff;
6use backoff::backoff::Backoff;
7use casper_types::U512;
8use casper_types::contract_messages::MessagePayload;
9use casper_types::execution::ExecutionResult;
10use clap::{Args, Parser, Subcommand};
11use directories::BaseDirs;
12use futures::StreamExt;
13use spinners::{Spinner, Spinners};
14use std::collections::{HashMap, HashSet};
15use std::os::unix::process::ExitStatusExt;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::sync::atomic::Ordering;
19use tokio::sync::Mutex;
20use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
21use veles_casper_rust_sdk::sse::event::SseEvent;
22use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
23
24/// CLI entrypoint for the devnet launcher.
25#[derive(Parser)]
26#[command(name = "nctl")]
27#[command(
28    about = "casper-devnet launcher for local Casper Network development networks",
29    long_about = None
30)]
31pub struct Cli {
32    #[command(subcommand)]
33    command: Command,
34}
35
36/// Top-level CLI subcommands.
37#[derive(Subcommand)]
38enum Command {
39    /// Setup assets (if needed) and start the devnet.
40    Start(StartArgs),
41    /// Manage assets bundles.
42    Assets(AssetsArgs),
43}
44
45/// Arguments for `nctl start`.
46#[derive(Args, Clone)]
47struct StartArgs {
48    /// Network name used in assets paths and configs.
49    #[arg(long, default_value = "casper-dev")]
50    network_name: String,
51
52    /// Override the base path for network runtime assets.
53    #[arg(long, value_name = "PATH")]
54    net_path: Option<PathBuf>,
55
56    /// Protocol version to use from the assets store (e.g. 2.1.1).
57    #[arg(long)]
58    protocol_version: Option<String>,
59
60    /// Number of nodes to create and start.
61    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
62    node_count: u32,
63
64    /// Number of user accounts to generate (defaults to node count).
65    #[arg(long)]
66    users: Option<u32>,
67
68    /// Genesis activation delay in seconds.
69    #[arg(long, default_value_t = 3)]
70    delay: u64,
71
72    /// Log level for child processes (passed as `RUST_LOG`).
73    #[arg(long = "log-level", default_value = "info")]
74    log_level: String,
75
76    /// Log format for node config files.
77    #[arg(long, default_value = "json")]
78    node_log_format: String,
79
80    /// Create assets and exit without starting processes.
81    #[arg(long)]
82    setup_only: bool,
83
84    /// Rebuild assets even if they already exist.
85    #[arg(long)]
86    force_setup: bool,
87
88    /// Deterministic seed for devnet key generation.
89    #[arg(long, default_value = "default")]
90    seed: Arc<str>,
91}
92
93/// Asset management arguments.
94#[derive(Args)]
95struct AssetsArgs {
96    #[command(subcommand)]
97    command: AssetsCommand,
98}
99
100/// Asset management subcommands.
101#[derive(Subcommand)]
102enum AssetsCommand {
103    /// Extract a local assets bundle into the assets store.
104    Add(AssetsAddArgs),
105    /// Download assets bundles from the upstream release.
106    Pull(AssetsPullArgs),
107    /// List available protocol versions in the assets store.
108    List,
109}
110
111/// Arguments for `nctl assets add`.
112#[derive(Args, Clone)]
113struct AssetsAddArgs {
114    /// Path to a local assets bundle (.tar.gz).
115    #[arg(value_name = "PATH")]
116    path: PathBuf,
117}
118
119/// Arguments for `nctl assets pull`.
120#[derive(Args, Clone)]
121struct AssetsPullArgs {
122    /// Target triple to select from release assets.
123    #[arg(long)]
124    target: Option<String>,
125
126    /// Re-download and replace any existing assets.
127    #[arg(long)]
128    force: bool,
129}
130
131/// Parses CLI and runs the selected subcommand.
132pub async fn run() -> Result<()> {
133    let cli = Cli::parse();
134    match cli.command {
135        Command::Start(args) => run_start(args).await,
136        Command::Assets(args) => run_assets(args).await,
137    }
138}
139
140async fn run_start(args: StartArgs) -> Result<()> {
141    let assets_root = match &args.net_path {
142        Some(path) => path.clone(),
143        None => assets::default_assets_root()?,
144    };
145    let layout = AssetsLayout::new(assets_root, args.network_name.clone());
146    let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
147    println!("assets path: {}", assets_path);
148    let assets_exist = layout.exists().await;
149    if !args.setup_only && !args.force_setup && assets_exist {
150        println!("resuming network operations on {}", layout.network_name());
151    }
152    let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
153
154    if args.setup_only {
155        return run_setup_only(&layout, &args, &protocol_version).await;
156    }
157
158    if args.force_setup {
159        assets::teardown(&layout).await?;
160        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
161    } else if !assets_exist {
162        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
163    }
164
165    if !layout.exists().await {
166        return Err(anyhow!(
167            "assets missing under {}; run with --setup-only to create them",
168            shorten_home_path(&layout.net_dir().display().to_string())
169        ));
170    }
171
172    if !args.force_setup && assets_exist {
173        let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
174        if restored > 0 {
175            println!("recreated consensus keys for {} node(s)", restored);
176        }
177    }
178
179    let rust_log = args.log_level.clone();
180
181    let plan = StartPlan { rust_log };
182
183    let state_path = layout.net_dir().join("state.json");
184    let state = Arc::new(Mutex::new(State::new(state_path).await?));
185    let started = {
186        let mut state = state.lock().await;
187        process::start(&layout, &plan, &mut state).await?
188    };
189
190    print_pids(&started);
191    print_start_banner(&layout, &started).await;
192    print_derived_accounts_summary(&layout).await;
193
194    let node_ids = unique_node_ids(&started);
195    let details = format_network_details(&layout, &started).await;
196    let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
197    start_sse_spinner(&health).await;
198    spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
199
200    let (event_tx, mut event_rx) = unbounded_channel();
201    spawn_ctrlc_listener(event_tx.clone());
202    spawn_exit_watchers(started, event_tx);
203
204    if let Some(event) = event_rx.recv().await {
205        match event {
206            RunEvent::CtrlC => {
207                let mut state = state.lock().await;
208                process::stop(&mut state).await?;
209            }
210            RunEvent::ProcessExit {
211                id,
212                pid,
213                code,
214                signal,
215            } => {
216                let mut state = state.lock().await;
217                update_exited_process(&mut state, &id, code, signal).await?;
218                log_exit(&id, pid, code, signal);
219                process::stop(&mut state).await?;
220            }
221        }
222    }
223
224    Ok(())
225}
226
227async fn run_setup_only(
228    layout: &AssetsLayout,
229    args: &StartArgs,
230    protocol_version: &str,
231) -> Result<()> {
232    if args.force_setup {
233        assets::teardown(layout).await?;
234        assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
235        print_derived_accounts_summary(layout).await;
236        return Ok(());
237    }
238
239    if layout.exists().await {
240        println!(
241            "assets already exist at {}; use --force-setup to rebuild",
242            shorten_home_path(&layout.net_dir().display().to_string())
243        );
244        print_derived_accounts_summary(layout).await;
245        return Ok(());
246    }
247
248    assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
249    print_derived_accounts_summary(layout).await;
250    Ok(())
251}
252
253fn record_pid(record: &ProcessRecord) -> Option<u32> {
254    if let Some(handle) = &record.pid_handle {
255        let pid = handle.load(Ordering::SeqCst);
256        if pid != 0 {
257            return Some(pid);
258        }
259    }
260    record.pid
261}
262
263fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
264    SetupOptions {
265        nodes: args.node_count,
266        users: args.users,
267        delay_seconds: args.delay,
268        network_name: args.network_name.clone(),
269        protocol_version: protocol_version.to_string(),
270        node_log_format: args.node_log_format.clone(),
271        seed: Arc::clone(&args.seed),
272    }
273}
274
275fn print_pids(records: &[RunningProcess]) {
276    for record in records {
277        if let Some(pid) = record_pid(&record.record) {
278            println!(
279                "{} pid={} ({:?})",
280                record.record.id, pid, record.record.kind
281            );
282        }
283    }
284}
285
286async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
287    let symlink_root = layout.net_dir();
288    let mut node_pids: HashMap<u32, u32> = HashMap::new();
289    let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
290    let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
291
292    for process in processes {
293        if let Some(pid) = record_pid(&process.record) {
294            match process.record.kind {
295                ProcessKind::Node => {
296                    node_pids.insert(process.record.node_id, pid);
297                }
298                ProcessKind::Sidecar => {
299                    sidecar_pids.insert(process.record.node_id, pid);
300                }
301            }
302            process_logs
303                .entry(process.record.node_id)
304                .or_default()
305                .push((process.record.kind.clone(), pid));
306        }
307    }
308
309    let node_ids = unique_node_ids(processes);
310
311    let mut lines = Vec::new();
312    lines.push("network details".to_string());
313    for node_id in node_ids {
314        let node_pid = node_pids
315            .get(&node_id)
316            .map(|pid| pid.to_string())
317            .unwrap_or_else(|| "-".to_string());
318        let sidecar_pid = sidecar_pids
319            .get(&node_id)
320            .map(|pid| pid.to_string())
321            .unwrap_or_else(|| "-".to_string());
322        lines.push(format!("  node-{}", node_id));
323        lines.push(format!(
324            "    pids: node={} sidecar={}",
325            node_pid, sidecar_pid
326        ));
327        if let Some(entries) = process_logs.get(&node_id) {
328            let mut entries = entries.clone();
329            entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
330            lines.push("    logs".to_string());
331            for (kind, pid) in entries {
332                let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
333                lines.push(format!(
334                    "      {} pid={} stdout={} stderr={}",
335                    process_kind_label(&kind),
336                    pid,
337                    stdout_link,
338                    stderr_link
339                ));
340            }
341        }
342        lines.push("    endpoints".to_string());
343        lines.push(format!("      rest:   {}", assets::rest_endpoint(node_id)));
344        lines.push(format!("      sse:    {}", assets::sse_endpoint(node_id)));
345        lines.push(format!("      rpc:    {}", assets::rpc_endpoint(node_id)));
346        lines.push(format!("      binary: {}", assets::binary_address(node_id)));
347        lines.push(format!(
348            "      gossip: {}",
349            assets::network_address(node_id)
350        ));
351    }
352
353    lines.join("\n")
354}
355
356fn process_kind_label(kind: &ProcessKind) -> &'static str {
357    match kind {
358        ProcessKind::Node => "node",
359        ProcessKind::Sidecar => "sidecar",
360    }
361}
362
363fn shorten_home_path(path: &str) -> String {
364    let path = Path::new(path);
365    let Some(base_dirs) = BaseDirs::new() else {
366        return path.display().to_string();
367    };
368    let home = base_dirs.home_dir();
369    match path.strip_prefix(home) {
370        Ok(stripped) => {
371            if stripped.as_os_str().is_empty() {
372                return "~".to_string();
373            }
374            let mut shorthand = PathBuf::from("~");
375            shorthand.push(stripped);
376            shorthand.display().to_string()
377        }
378        Err(_) => path.display().to_string(),
379    }
380}
381
382fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
383    let base = match kind {
384        ProcessKind::Node => format!("node-{}", node_id),
385        ProcessKind::Sidecar => format!("sidecar-{}", node_id),
386    };
387    let stdout_link = symlink_root.join(format!("{}.stdout", base));
388    let stderr_link = symlink_root.join(format!("{}.stderr", base));
389    (
390        shorten_home_path(&stdout_link.display().to_string()),
391        shorten_home_path(&stderr_link.display().to_string()),
392    )
393}
394
395async fn print_derived_accounts_summary(layout: &AssetsLayout) {
396    if let Some(summary) = assets::derived_accounts_summary(layout).await {
397        if let Some(parsed) = parse_derived_accounts_csv(&summary) {
398            println!("derived accounts");
399            if !parsed.validators.is_empty() {
400                println!("  validators");
401                print_account_group(&parsed.validators);
402            }
403            if !parsed.users.is_empty() {
404                println!("  users");
405                print_account_group(&parsed.users);
406            }
407            if !parsed.other.is_empty() {
408                println!("  other");
409                print_account_group(&parsed.other);
410            }
411        } else {
412            println!("derived accounts");
413            for line in summary.lines() {
414                println!("  {}", line);
415            }
416        }
417    }
418}
419
420struct DerivedAccountRow {
421    name: String,
422    key_type: String,
423    derivation: String,
424    path: String,
425    account_hash: String,
426    balance: String,
427}
428
429struct DerivedAccountsParsed {
430    validators: Vec<DerivedAccountRow>,
431    users: Vec<DerivedAccountRow>,
432    other: Vec<DerivedAccountRow>,
433}
434
435fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
436    let mut lines = summary.lines();
437    let header = lines.next()?.trim();
438    if header != "kind,name,key_type,derivation,path,account_hash,balance" {
439        return None;
440    }
441
442    let mut parsed = DerivedAccountsParsed {
443        validators: Vec::new(),
444        users: Vec::new(),
445        other: Vec::new(),
446    };
447
448    for line in lines {
449        let line = line.trim();
450        if line.is_empty() {
451            continue;
452        }
453        let mut parts = line.splitn(7, ',');
454        let kind = parts.next()?.to_string();
455        let name = parts.next()?.to_string();
456        let key_type = parts.next()?.to_string();
457        let derivation = parts.next()?.to_string();
458        let path = parts.next()?.to_string();
459        let account_hash = parts.next()?.to_string();
460        let balance = parts.next()?.to_string();
461        let row = DerivedAccountRow {
462            name,
463            key_type,
464            derivation,
465            path,
466            account_hash,
467            balance,
468        };
469        match kind.as_str() {
470            "validator" => parsed.validators.push(row),
471            "user" => parsed.users.push(row),
472            _ => parsed.other.push(row),
473        }
474    }
475
476    Some(parsed)
477}
478
479fn print_account_group(rows: &[DerivedAccountRow]) {
480    for row in rows {
481        println!("    {}:", row.name);
482        println!("      key_type: {}", row.key_type);
483        println!("      derivation: {}", row.derivation);
484        println!("      path: {}", row.path);
485        println!("      account_hash: {}", row.account_hash);
486        println!("      balance: {}", row.balance);
487    }
488}
489
490fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
491    let mut nodes = HashSet::new();
492    for process in processes {
493        nodes.insert(process.record.node_id);
494    }
495    let mut ids: Vec<u32> = nodes.into_iter().collect();
496    ids.sort_unstable();
497    ids
498}
499
500enum RunEvent {
501    CtrlC,
502    ProcessExit {
503        id: String,
504        pid: Option<u32>,
505        code: Option<i32>,
506        signal: Option<i32>,
507    },
508}
509
510fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
511    tokio::spawn(async move {
512        if tokio::signal::ctrl_c().await.is_ok() {
513            let _ = tx.send(RunEvent::CtrlC);
514        }
515    });
516}
517
518fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
519    for running in processes {
520        let tx = tx.clone();
521        tokio::spawn(async move {
522            let id = running.record.id.clone();
523            match running.handle {
524                ProcessHandle::Child(mut child) => {
525                    if let Ok(status) = child.wait().await {
526                        let pid = record_pid(&running.record).or_else(|| child.id());
527                        let code = status.code();
528                        let signal = status.signal();
529                        let _ = tx.send(RunEvent::ProcessExit {
530                            id: id.clone(),
531                            pid,
532                            code,
533                            signal,
534                        });
535                    }
536                }
537                ProcessHandle::Task(handle) => {
538                    let status = handle.await;
539                    let pid = record_pid(&running.record);
540                    let (code, signal) = match status {
541                        Ok(Ok(())) => (Some(0), None),
542                        Ok(Err(_)) => (None, None),
543                        Err(_) => (None, None),
544                    };
545                    let _ = tx.send(RunEvent::ProcessExit {
546                        id: id.clone(),
547                        pid,
548                        code,
549                        signal,
550                    });
551                }
552            }
553        });
554    }
555}
556
557const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
558const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
559
560struct SseHealth {
561    expected_nodes: HashSet<u32>,
562    versions: HashMap<u32, String>,
563    announced: bool,
564    block_seen: bool,
565    sse_spinner: Option<Spinner>,
566    block_spinner: Option<Spinner>,
567    details: String,
568}
569
570impl SseHealth {
571    fn new(node_ids: Vec<u32>, details: String) -> Self {
572        Self {
573            expected_nodes: node_ids.into_iter().collect(),
574            versions: HashMap::new(),
575            announced: false,
576            block_seen: false,
577            sse_spinner: None,
578            block_spinner: None,
579            details,
580        }
581    }
582}
583
584async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
585    if node_id != 1 {
586        return false;
587    }
588    let state = health.lock().await;
589    state.announced
590}
591
592fn start_spinner(message: &str) -> Spinner {
593    Spinner::new(Spinners::Dots, message.to_string())
594}
595
596async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
597    let mut state = health.lock().await;
598    if state.sse_spinner.is_none() {
599        state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
600    }
601}
602
603async fn spawn_sse_listeners(
604    layout: AssetsLayout,
605    node_ids: &[u32],
606    health: Arc<Mutex<SseHealth>>,
607    state: Arc<Mutex<State>>,
608) {
609    for node_id in node_ids {
610        let node_id = *node_id;
611        let endpoint = assets::sse_endpoint(node_id);
612        let layout = layout.clone();
613        let health = Arc::clone(&health);
614        let state = Arc::clone(&state);
615        tokio::spawn(async move {
616            run_sse_listener(node_id, endpoint, health, state, layout).await;
617        });
618    }
619}
620
621async fn run_sse_listener(
622    node_id: u32,
623    endpoint: String,
624    health: Arc<Mutex<SseHealth>>,
625    state: Arc<Mutex<State>>,
626    layout: AssetsLayout,
627) {
628    let mut backoff = ExponentialBackoff::default();
629
630    loop {
631        let config = match ListenerConfig::builder()
632            .with_endpoint(endpoint.clone())
633            .build()
634        {
635            Ok(config) => config,
636            Err(_) => {
637                if !sleep_backoff(&mut backoff).await {
638                    return;
639                }
640                continue;
641            }
642        };
643
644        let stream = match sse::listener(config).await {
645            Ok(stream) => {
646                backoff.reset();
647                stream
648            }
649            Err(_) => {
650                if !sleep_backoff(&mut backoff).await {
651                    return;
652                }
653                continue;
654            }
655        };
656
657        futures::pin_mut!(stream);
658        let mut stream_failed = false;
659        while let Some(event) = stream.next().await {
660            match event {
661                Ok(sse_event) => match sse_event {
662                    SseEvent::ApiVersion(version) => {
663                        record_api_version(node_id, version.to_string(), &health).await;
664                    }
665                    SseEvent::BlockAdded { block_hash, block } => {
666                        if node_id == 1
667                            && let Err(err) = record_last_block_height(&state, block.height()).await
668                        {
669                            eprintln!("warning: failed to record last block height: {}", err);
670                        }
671                        if should_log_primary(node_id, &health).await {
672                            mark_block_seen(&health, &layout).await;
673                            let prefix = timestamp_prefix();
674                            println!(
675                                "{} Block {} added (height={} era={})",
676                                prefix,
677                                block_hash,
678                                block.height(),
679                                block.era_id().value()
680                            );
681                        }
682                    }
683                    SseEvent::TransactionAccepted(transaction) => {
684                        if node_id == 1 {
685                            let prefix = timestamp_prefix();
686                            println!("{} Transaction {} accepted", prefix, transaction.hash());
687                        }
688                    }
689                    SseEvent::TransactionProcessed {
690                        transaction_hash,
691                        execution_result,
692                        messages,
693                        ..
694                    } => {
695                        if node_id == 1 {
696                            let tx_hash = transaction_hash.to_string();
697                            let prefix = timestamp_prefix();
698                            log_transaction_processed(
699                                &prefix,
700                                &tx_hash,
701                                &execution_result,
702                                &messages,
703                            );
704                        }
705                    }
706                    _ => {}
707                },
708                Err(_) => {
709                    stream_failed = true;
710                    break;
711                }
712            }
713        }
714
715        if stream_failed && !sleep_backoff(&mut backoff).await {
716            return;
717        }
718    }
719}
720
721async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
722    let (summary, details, sse_spinner) = {
723        let mut state = health.lock().await;
724        if !state.expected_nodes.contains(&node_id) {
725            return;
726        }
727        state.versions.insert(node_id, version);
728        if state.announced || state.versions.len() != state.expected_nodes.len() {
729            return;
730        }
731
732        let summary = version_summary(&state.versions);
733        let details = state.details.clone();
734        let sse_spinner = state.sse_spinner.take();
735        if state.block_spinner.is_none() {
736            state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
737        }
738        state.announced = true;
739        state.block_seen = false;
740        (summary, details, sse_spinner)
741    };
742
743    if let Some(mut spinner) = sse_spinner {
744        spinner.stop_with_message("SSE connection established.".to_string());
745    }
746    println!("Network is healthy ({})", summary);
747    println!("{}", details);
748}
749
750async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
751    let (block_spinner, node_ids) = {
752        let mut state = health.lock().await;
753        if state.block_seen {
754            return;
755        }
756        state.block_seen = true;
757        (
758            state.block_spinner.take(),
759            state.expected_nodes.iter().copied().collect::<Vec<_>>(),
760        )
761    };
762
763    if let Some(mut spinner) = block_spinner {
764        spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
765    }
766
767    match assets::remove_consensus_keys(layout, &node_ids).await {
768        Ok(removed) => {
769            if removed > 0 {
770                println!("Consensus secret keys removed from disk.");
771            }
772        }
773        Err(err) => {
774            eprintln!("warning: failed to remove consensus secret keys: {}", err);
775        }
776    }
777}
778
779async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
780    let mut state = state.lock().await;
781    if state.last_block_height == Some(height) {
782        return Ok(());
783    }
784    state.last_block_height = Some(height);
785    state.touch().await?;
786    Ok(())
787}
788
789fn version_summary(versions: &HashMap<u32, String>) -> String {
790    let mut unique: Vec<String> = versions.values().cloned().collect();
791    unique.sort();
792    unique.dedup();
793    if unique.len() == 1 {
794        format!("version {}", unique[0])
795    } else {
796        format!("versions {}", unique.join(", "))
797    }
798}
799
800async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
801    if let Some(delay) = backoff.next_backoff() {
802        tokio::time::sleep(delay).await;
803        return true;
804    }
805    false
806}
807
808fn log_transaction_processed(
809    prefix: &str,
810    transaction_hash: &str,
811    execution_result: &ExecutionResult,
812    messages: &[casper_types::contract_messages::Message],
813) {
814    let consumed = execution_result.consumed();
815    let consumed_cspr = format_cspr_u512(&consumed);
816    if let Some(error) = execution_result.error_message() {
817        println!(
818            "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
819            prefix, transaction_hash, error, consumed, consumed_cspr
820        );
821    } else {
822        println!(
823            "{} Transaction {} processed succeeded gas={} gas_cspr={}",
824            prefix, transaction_hash, consumed, consumed_cspr
825        );
826    }
827
828    for message in messages {
829        let entity = message.entity_addr().to_formatted_string();
830        let topic = message.topic_name();
831        let payload = format_message_payload(message.payload());
832        println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
833    }
834}
835
836fn timestamp_prefix() -> String {
837    time::OffsetDateTime::now_utc()
838        .format(&time::format_description::well_known::Rfc3339)
839        .unwrap_or_else(|_| "unknown-time".to_string())
840}
841
842fn format_message_payload(payload: &MessagePayload) -> String {
843    match payload {
844        MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
845        MessagePayload::String(value) => format!("{:?}", value),
846    }
847}
848
849fn encode_hex(bytes: &[u8]) -> String {
850    let mut out = String::with_capacity(bytes.len() * 2);
851    for byte in bytes {
852        use std::fmt::Write;
853        let _ = write!(&mut out, "{:02x}", byte);
854    }
855    out
856}
857
858fn format_cspr_u512(motes: &U512) -> String {
859    let motes_str = motes.to_string();
860    let digits = motes_str.len();
861    if digits <= 9 {
862        let frac = format!("{:0>9}", motes_str);
863        let frac = frac.trim_end_matches('0');
864        if frac.is_empty() {
865            return "0".to_string();
866        }
867        return format!("0.{}", frac);
868    }
869
870    let split = digits - 9;
871    let (whole, frac) = motes_str.split_at(split);
872    let frac = frac.trim_end_matches('0');
873    if frac.is_empty() {
874        return whole.to_string();
875    }
876    format!("{}.{}", whole, frac)
877}
878
879async fn update_exited_process(
880    state: &mut State,
881    id: &str,
882    code: Option<i32>,
883    signal: Option<i32>,
884) -> Result<()> {
885    for record in &mut state.processes {
886        if record.id == id {
887            record.last_status = ProcessStatus::Exited;
888            record.exit_code = code;
889            record.exit_signal = signal;
890            record.stopped_at = Some(time::OffsetDateTime::now_utc());
891            break;
892        }
893    }
894    state.touch().await?;
895    Ok(())
896}
897
898fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
899    if let Some(pid) = pid {
900        if let Some(signal) = signal {
901            println!(
902                "process {} (pid {}) exited due to signal {}",
903                id, pid, signal
904            );
905        } else if let Some(code) = code {
906            println!("process {} (pid {}) exited with code {}", id, pid, code);
907        } else {
908            println!("process {} (pid {}) exited", id, pid);
909        }
910    } else if let Some(signal) = signal {
911        println!("process {} exited due to signal {}", id, signal);
912    } else if let Some(code) = code {
913        println!("process {} exited with code {}", id, code);
914    } else {
915        println!("process {} exited", id);
916    }
917}
918
919async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
920    let total_nodes = layout.count_nodes().await.unwrap_or(0);
921    let target = format!("all nodes ({})", total_nodes);
922    let sidecars = processes
923        .iter()
924        .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
925        .count();
926    println!(
927        "started {} process(es) for {} (sidecars: {})",
928        processes.len(),
929        target,
930        sidecars
931    );
932}
933
934fn looks_like_url(path: &Path) -> bool {
935    let value = path.to_string_lossy();
936    value.starts_with("http://") || value.starts_with("https://")
937}
938
939async fn run_assets(args: AssetsArgs) -> Result<()> {
940    match args.command {
941        AssetsCommand::Add(add) => run_assets_add(add).await,
942        AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
943        AssetsCommand::List => run_assets_list().await,
944    }
945}
946
947async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
948    if looks_like_url(&args.path) {
949        return Err(anyhow!(
950            "assets URL is not supported yet; provide a local .tar.gz path"
951        ));
952    }
953    assets::install_assets_bundle(&args.path).await?;
954    println!(
955        "assets installed into {}",
956        assets::assets_bundle_root()?.display()
957    );
958    Ok(())
959}
960
961async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
962    assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
963    Ok(())
964}
965
966async fn run_assets_list() -> Result<()> {
967    let mut versions = assets::list_bundle_versions().await?;
968    if versions.is_empty() {
969        println!("no assets found");
970        return Ok(());
971    }
972    versions.sort_by(|a, b| b.cmp(a));
973    for version in versions {
974        println!("{}", version);
975    }
976    Ok(())
977}
978
979async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
980    if let Some(raw) = candidate {
981        let version = assets::parse_protocol_version(raw)?;
982        if !assets::has_bundle_version(&version).await? {
983            let argv0 = std::env::args()
984                .next()
985                .unwrap_or_else(|| "casper-devnet".to_string());
986            let pull_cmd = format!("{} assets pull", argv0);
987            let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
988            return Err(anyhow!(
989                "assets for version {} not found; run `{}` or `{}`",
990                version,
991                pull_cmd,
992                add_cmd
993            ));
994        }
995        return Ok(version.to_string());
996    }
997    match assets::most_recent_bundle_version().await {
998        Ok(version) => Ok(version.to_string()),
999        Err(_) => {
1000            let argv0 = std::env::args()
1001                .next()
1002                .unwrap_or_else(|| "casper-devnet".to_string());
1003            let pull_cmd = format!("{} assets pull", argv0);
1004            let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1005            Err(anyhow!(
1006                "no assets found; run `{}` or `{}`",
1007                pull_cmd,
1008                add_cmd
1009            ))
1010        }
1011    }
1012}
1013
1014#[cfg(test)]
1015mod tests {
1016    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
1017    use casper_types::U512;
1018    use casper_types::contract_messages::MessagePayload;
1019    use directories::BaseDirs;
1020
1021    #[test]
1022    fn format_cspr_u512_handles_whole_and_fractional() {
1023        assert_eq!(format_cspr_u512(&U512::zero()), "0");
1024        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
1025        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
1026        assert_eq!(
1027            format_cspr_u512(&U512::from(1_000_000_001u64)),
1028            "1.000000001"
1029        );
1030        assert_eq!(
1031            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
1032            "123"
1033        );
1034        assert_eq!(
1035            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
1036            "123.000000456"
1037        );
1038    }
1039
1040    #[test]
1041    fn format_message_payload_renders_string_with_quotes() {
1042        let payload = MessagePayload::String("hello".to_string());
1043        assert_eq!(format_message_payload(&payload), "\"hello\"");
1044    }
1045
1046    #[test]
1047    fn encode_hex_renders_lowercase() {
1048        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
1049    }
1050
1051    #[test]
1052    fn shorten_home_path_replaces_home_prefix() {
1053        let Some(base_dirs) = BaseDirs::new() else {
1054            return;
1055        };
1056        let home = base_dirs.home_dir();
1057        let shortened = shorten_home_path(&home.to_string_lossy());
1058        assert_eq!(shortened, "~");
1059
1060        let nested = home.join("devnet/logs/stdout.log");
1061        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
1062        assert!(shortened_nested.starts_with("~"));
1063        assert!(shortened_nested.contains("devnet"));
1064    }
1065
1066    #[test]
1067    fn shorten_home_path_keeps_relative_paths() {
1068        let input = "relative/path";
1069        assert_eq!(shorten_home_path(input), input);
1070    }
1071}