1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
3use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, STATE_FILE_NAME, State};
4use anyhow::{Result, anyhow};
5use backoff::ExponentialBackoff;
6use backoff::backoff::Backoff;
7use casper_types::U512;
8use casper_types::contract_messages::MessagePayload;
9use casper_types::execution::ExecutionResult;
10use clap::{Args, Parser, Subcommand};
11use directories::BaseDirs;
12use futures::StreamExt;
13use nix::errno::Errno;
14use nix::sys::signal::kill;
15use nix::unistd::Pid;
16use serde::Deserialize;
17use spinners::{Spinner, Spinners};
18use std::collections::{HashMap, HashSet};
19use std::os::unix::process::ExitStatusExt;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::Ordering;
23use std::time::Duration;
24use tokio::fs as tokio_fs;
25use tokio::sync::Mutex;
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27use veles_casper_rust_sdk::sse::event::SseEvent;
28use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
29
30#[derive(Parser)]
32#[command(name = "nctl")]
33#[command(
34 about = "casper-devnet launcher for local Casper Network development networks",
35 long_about = None
36)]
37pub struct Cli {
38 #[command(subcommand)]
39 command: Command,
40}
41
42#[derive(Subcommand)]
44enum Command {
45 Start(StartArgs),
47 Assets(AssetsArgs),
49 IsReady(IsReadyArgs),
51}
52
53#[derive(Args, Clone)]
55struct StartArgs {
56 #[arg(long, default_value = "casper-dev")]
58 network_name: String,
59
60 #[arg(long, value_name = "PATH")]
62 net_path: Option<PathBuf>,
63
64 #[arg(long)]
66 protocol_version: Option<String>,
67
68 #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
70 node_count: u32,
71
72 #[arg(long)]
74 users: Option<u32>,
75
76 #[arg(long, default_value_t = 3)]
78 delay: u64,
79
80 #[arg(long = "log-level", default_value = "info")]
82 log_level: String,
83
84 #[arg(long, default_value = "json")]
86 node_log_format: String,
87
88 #[arg(long)]
90 setup_only: bool,
91
92 #[arg(long)]
94 force_setup: bool,
95
96 #[arg(long, default_value = "default")]
98 seed: Arc<str>,
99}
100
101#[derive(Args)]
103struct AssetsArgs {
104 #[command(subcommand)]
105 command: AssetsCommand,
106}
107
108#[derive(Subcommand)]
110enum AssetsCommand {
111 Add(AssetsAddArgs),
113 Pull(AssetsPullArgs),
115 List,
117}
118
119#[derive(Args, Clone)]
121struct AssetsAddArgs {
122 #[arg(value_name = "PATH")]
124 path: PathBuf,
125}
126
127#[derive(Args, Clone)]
129struct AssetsPullArgs {
130 #[arg(long)]
132 target: Option<String>,
133
134 #[arg(long)]
136 force: bool,
137}
138
139#[derive(Args, Clone)]
141struct IsReadyArgs {
142 #[arg(long, default_value = "casper-dev")]
144 network_name: String,
145
146 #[arg(long, value_name = "PATH")]
148 net_path: Option<PathBuf>,
149}
150
151pub async fn run() -> Result<()> {
153 let cli = Cli::parse();
154 match cli.command {
155 Command::Start(args) => run_start(args).await,
156 Command::Assets(args) => run_assets(args).await,
157 Command::IsReady(args) => run_is_ready(args).await,
158 }
159}
160
161async fn run_start(args: StartArgs) -> Result<()> {
162 let assets_root = match &args.net_path {
163 Some(path) => path.clone(),
164 None => assets::default_assets_root()?,
165 };
166 let layout = AssetsLayout::new(assets_root, args.network_name.clone());
167 let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
168 println!("assets path: {}", assets_path);
169 let assets_exist = layout.exists().await;
170 if !args.setup_only && !args.force_setup && assets_exist {
171 println!("resuming network operations on {}", layout.network_name());
172 }
173 let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
174
175 if args.setup_only {
176 return run_setup_only(&layout, &args, &protocol_version).await;
177 }
178
179 if args.force_setup {
180 assets::teardown(&layout).await?;
181 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
182 } else if !assets_exist {
183 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
184 }
185
186 if !layout.exists().await {
187 return Err(anyhow!(
188 "assets missing under {}; run with --setup-only to create them",
189 shorten_home_path(&layout.net_dir().display().to_string())
190 ));
191 }
192
193 if !args.force_setup && assets_exist {
194 let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
195 if restored > 0 {
196 println!("recreated consensus keys for {} node(s)", restored);
197 }
198 }
199
200 let rust_log = args.log_level.clone();
201
202 let plan = StartPlan { rust_log };
203
204 let state_path = layout.net_dir().join(STATE_FILE_NAME);
205 let state = Arc::new(Mutex::new(State::new(state_path).await?));
206 let started = {
207 let mut state = state.lock().await;
208 process::start(&layout, &plan, &mut state).await?
209 };
210
211 print_pids(&started);
212 print_start_banner(&layout, &started).await;
213 print_derived_accounts_summary(&layout).await;
214
215 let node_ids = unique_node_ids(&started);
216 let details = format_network_details(&layout, &started).await;
217 let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
218 start_sse_spinner(&health).await;
219 spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
220
221 let (event_tx, mut event_rx) = unbounded_channel();
222 spawn_ctrlc_listener(event_tx.clone());
223 spawn_exit_watchers(started, event_tx);
224
225 if let Some(event) = event_rx.recv().await {
226 match event {
227 RunEvent::CtrlC => {
228 let mut state = state.lock().await;
229 process::stop(&mut state).await?;
230 }
231 RunEvent::ProcessExit {
232 id,
233 pid,
234 code,
235 signal,
236 } => {
237 let mut state = state.lock().await;
238 update_exited_process(&mut state, &id, code, signal).await?;
239 log_exit(&id, pid, code, signal);
240 process::stop(&mut state).await?;
241 }
242 }
243 }
244
245 Ok(())
246}
247
248async fn run_setup_only(
249 layout: &AssetsLayout,
250 args: &StartArgs,
251 protocol_version: &str,
252) -> Result<()> {
253 if args.force_setup {
254 assets::teardown(layout).await?;
255 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
256 print_derived_accounts_summary(layout).await;
257 return Ok(());
258 }
259
260 if layout.exists().await {
261 println!(
262 "assets already exist at {}; use --force-setup to rebuild",
263 shorten_home_path(&layout.net_dir().display().to_string())
264 );
265 print_derived_accounts_summary(layout).await;
266 return Ok(());
267 }
268
269 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
270 print_derived_accounts_summary(layout).await;
271 Ok(())
272}
273
274fn record_pid(record: &ProcessRecord) -> Option<u32> {
275 if let Some(handle) = &record.pid_handle {
276 let pid = handle.load(Ordering::SeqCst);
277 if pid != 0 {
278 return Some(pid);
279 }
280 }
281 record.pid
282}
283
284fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
285 SetupOptions {
286 nodes: args.node_count,
287 users: args.users,
288 delay_seconds: args.delay,
289 network_name: args.network_name.clone(),
290 protocol_version: protocol_version.to_string(),
291 node_log_format: args.node_log_format.clone(),
292 seed: Arc::clone(&args.seed),
293 }
294}
295
296fn print_pids(records: &[RunningProcess]) {
297 for record in records {
298 if let Some(pid) = record_pid(&record.record) {
299 println!(
300 "{} pid={} ({:?})",
301 record.record.id, pid, record.record.kind
302 );
303 }
304 }
305}
306
307async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
308 let symlink_root = layout.net_dir();
309 let mut node_pids: HashMap<u32, u32> = HashMap::new();
310 let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
311 let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
312
313 for process in processes {
314 if let Some(pid) = record_pid(&process.record) {
315 match process.record.kind {
316 ProcessKind::Node => {
317 node_pids.insert(process.record.node_id, pid);
318 }
319 ProcessKind::Sidecar => {
320 sidecar_pids.insert(process.record.node_id, pid);
321 }
322 }
323 process_logs
324 .entry(process.record.node_id)
325 .or_default()
326 .push((process.record.kind.clone(), pid));
327 }
328 }
329
330 let node_ids = unique_node_ids(processes);
331
332 let mut lines = Vec::new();
333 lines.push("network details".to_string());
334 for node_id in node_ids {
335 let node_pid = node_pids
336 .get(&node_id)
337 .map(|pid| pid.to_string())
338 .unwrap_or_else(|| "-".to_string());
339 let sidecar_pid = sidecar_pids
340 .get(&node_id)
341 .map(|pid| pid.to_string())
342 .unwrap_or_else(|| "-".to_string());
343 lines.push(format!(" node-{}", node_id));
344 lines.push(format!(
345 " pids: node={} sidecar={}",
346 node_pid, sidecar_pid
347 ));
348 if let Some(entries) = process_logs.get(&node_id) {
349 let mut entries = entries.clone();
350 entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
351 lines.push(" logs".to_string());
352 for (kind, pid) in entries {
353 let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
354 lines.push(format!(
355 " {} pid={} stdout={} stderr={}",
356 process_kind_label(&kind),
357 pid,
358 stdout_link,
359 stderr_link
360 ));
361 }
362 }
363 lines.push(" endpoints".to_string());
364 lines.push(format!(" rest: {}", assets::rest_endpoint(node_id)));
365 lines.push(format!(" sse: {}", assets::sse_endpoint(node_id)));
366 lines.push(format!(" rpc: {}", assets::rpc_endpoint(node_id)));
367 lines.push(format!(" binary: {}", assets::binary_address(node_id)));
368 lines.push(format!(
369 " gossip: {}",
370 assets::network_address(node_id)
371 ));
372 }
373
374 lines.join("\n")
375}
376
377fn process_kind_label(kind: &ProcessKind) -> &'static str {
378 match kind {
379 ProcessKind::Node => "node",
380 ProcessKind::Sidecar => "sidecar",
381 }
382}
383
384fn shorten_home_path(path: &str) -> String {
385 let path = Path::new(path);
386 let Some(base_dirs) = BaseDirs::new() else {
387 return path.display().to_string();
388 };
389 let home = base_dirs.home_dir();
390 match path.strip_prefix(home) {
391 Ok(stripped) => {
392 if stripped.as_os_str().is_empty() {
393 return "~".to_string();
394 }
395 let mut shorthand = PathBuf::from("~");
396 shorthand.push(stripped);
397 shorthand.display().to_string()
398 }
399 Err(_) => path.display().to_string(),
400 }
401}
402
403fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
404 let base = match kind {
405 ProcessKind::Node => format!("node-{}", node_id),
406 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
407 };
408 let stdout_link = symlink_root.join(format!("{}.stdout", base));
409 let stderr_link = symlink_root.join(format!("{}.stderr", base));
410 (
411 shorten_home_path(&stdout_link.display().to_string()),
412 shorten_home_path(&stderr_link.display().to_string()),
413 )
414}
415
416async fn print_derived_accounts_summary(layout: &AssetsLayout) {
417 if let Some(summary) = assets::derived_accounts_summary(layout).await {
418 if let Some(parsed) = parse_derived_accounts_csv(&summary) {
419 println!("derived accounts");
420 if !parsed.validators.is_empty() {
421 println!(" validators");
422 print_account_group(&parsed.validators);
423 }
424 if !parsed.users.is_empty() {
425 println!(" users");
426 print_account_group(&parsed.users);
427 }
428 if !parsed.other.is_empty() {
429 println!(" other");
430 print_account_group(&parsed.other);
431 }
432 } else {
433 println!("derived accounts");
434 for line in summary.lines() {
435 println!(" {}", line);
436 }
437 }
438 }
439}
440
441struct DerivedAccountRow {
442 name: String,
443 key_type: String,
444 derivation: String,
445 path: String,
446 account_hash: String,
447 balance: String,
448}
449
450struct DerivedAccountsParsed {
451 validators: Vec<DerivedAccountRow>,
452 users: Vec<DerivedAccountRow>,
453 other: Vec<DerivedAccountRow>,
454}
455
456fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
457 let mut lines = summary.lines();
458 let header = lines.next()?.trim();
459 if header != "kind,name,key_type,derivation,path,account_hash,balance" {
460 return None;
461 }
462
463 let mut parsed = DerivedAccountsParsed {
464 validators: Vec::new(),
465 users: Vec::new(),
466 other: Vec::new(),
467 };
468
469 for line in lines {
470 let line = line.trim();
471 if line.is_empty() {
472 continue;
473 }
474 let mut parts = line.splitn(7, ',');
475 let kind = parts.next()?.to_string();
476 let name = parts.next()?.to_string();
477 let key_type = parts.next()?.to_string();
478 let derivation = parts.next()?.to_string();
479 let path = parts.next()?.to_string();
480 let account_hash = parts.next()?.to_string();
481 let balance = parts.next()?.to_string();
482 let row = DerivedAccountRow {
483 name,
484 key_type,
485 derivation,
486 path,
487 account_hash,
488 balance,
489 };
490 match kind.as_str() {
491 "validator" => parsed.validators.push(row),
492 "user" => parsed.users.push(row),
493 _ => parsed.other.push(row),
494 }
495 }
496
497 Some(parsed)
498}
499
500fn print_account_group(rows: &[DerivedAccountRow]) {
501 for row in rows {
502 println!(" {}:", row.name);
503 println!(" key_type: {}", row.key_type);
504 println!(" derivation: {}", row.derivation);
505 println!(" path: {}", row.path);
506 println!(" account_hash: {}", row.account_hash);
507 println!(" balance: {}", row.balance);
508 }
509}
510
511fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
512 let mut nodes = HashSet::new();
513 for process in processes {
514 nodes.insert(process.record.node_id);
515 }
516 let mut ids: Vec<u32> = nodes.into_iter().collect();
517 ids.sort_unstable();
518 ids
519}
520
521enum RunEvent {
522 CtrlC,
523 ProcessExit {
524 id: String,
525 pid: Option<u32>,
526 code: Option<i32>,
527 signal: Option<i32>,
528 },
529}
530
531fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
532 tokio::spawn(async move {
533 if tokio::signal::ctrl_c().await.is_ok() {
534 let _ = tx.send(RunEvent::CtrlC);
535 }
536 });
537}
538
539fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
540 for running in processes {
541 let tx = tx.clone();
542 tokio::spawn(async move {
543 let id = running.record.id.clone();
544 match running.handle {
545 ProcessHandle::Child(mut child) => {
546 if let Ok(status) = child.wait().await {
547 let pid = record_pid(&running.record).or_else(|| child.id());
548 let code = status.code();
549 let signal = status.signal();
550 let _ = tx.send(RunEvent::ProcessExit {
551 id: id.clone(),
552 pid,
553 code,
554 signal,
555 });
556 }
557 }
558 ProcessHandle::Task(handle) => {
559 let status = handle.await;
560 let pid = record_pid(&running.record);
561 let (code, signal) = match status {
562 Ok(Ok(())) => (Some(0), None),
563 Ok(Err(_)) => (None, None),
564 Err(_) => (None, None),
565 };
566 let _ = tx.send(RunEvent::ProcessExit {
567 id: id.clone(),
568 pid,
569 code,
570 signal,
571 });
572 }
573 }
574 });
575 }
576}
577
578const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
579const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
580
581struct SseHealth {
582 expected_nodes: HashSet<u32>,
583 versions: HashMap<u32, String>,
584 announced: bool,
585 block_seen: bool,
586 sse_spinner: Option<Spinner>,
587 block_spinner: Option<Spinner>,
588 details: String,
589}
590
591impl SseHealth {
592 fn new(node_ids: Vec<u32>, details: String) -> Self {
593 Self {
594 expected_nodes: node_ids.into_iter().collect(),
595 versions: HashMap::new(),
596 announced: false,
597 block_seen: false,
598 sse_spinner: None,
599 block_spinner: None,
600 details,
601 }
602 }
603}
604
605async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
606 if node_id != 1 {
607 return false;
608 }
609 let state = health.lock().await;
610 state.announced
611}
612
613fn start_spinner(message: &str) -> Spinner {
614 Spinner::new(Spinners::Dots, message.to_string())
615}
616
617async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
618 let mut state = health.lock().await;
619 if state.sse_spinner.is_none() {
620 state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
621 }
622}
623
624async fn spawn_sse_listeners(
625 layout: AssetsLayout,
626 node_ids: &[u32],
627 health: Arc<Mutex<SseHealth>>,
628 state: Arc<Mutex<State>>,
629) {
630 for node_id in node_ids {
631 let node_id = *node_id;
632 let endpoint = assets::sse_endpoint(node_id);
633 let layout = layout.clone();
634 let health = Arc::clone(&health);
635 let state = Arc::clone(&state);
636 tokio::spawn(async move {
637 run_sse_listener(node_id, endpoint, health, state, layout).await;
638 });
639 }
640}
641
642async fn run_sse_listener(
643 node_id: u32,
644 endpoint: String,
645 health: Arc<Mutex<SseHealth>>,
646 state: Arc<Mutex<State>>,
647 layout: AssetsLayout,
648) {
649 let mut backoff = ExponentialBackoff::default();
650
651 loop {
652 let config = match ListenerConfig::builder()
653 .with_endpoint(endpoint.clone())
654 .build()
655 {
656 Ok(config) => config,
657 Err(_) => {
658 if !sleep_backoff(&mut backoff).await {
659 return;
660 }
661 continue;
662 }
663 };
664
665 let stream = match sse::listener(config).await {
666 Ok(stream) => {
667 backoff.reset();
668 stream
669 }
670 Err(_) => {
671 if !sleep_backoff(&mut backoff).await {
672 return;
673 }
674 continue;
675 }
676 };
677
678 futures::pin_mut!(stream);
679 let mut stream_failed = false;
680 while let Some(event) = stream.next().await {
681 match event {
682 Ok(sse_event) => match sse_event {
683 SseEvent::ApiVersion(version) => {
684 record_api_version(node_id, version.to_string(), &health).await;
685 }
686 SseEvent::BlockAdded { block_hash, block } => {
687 if node_id == 1
688 && let Err(err) = record_last_block_height(&state, block.height()).await
689 {
690 eprintln!("warning: failed to record last block height: {}", err);
691 }
692 if should_log_primary(node_id, &health).await {
693 mark_block_seen(&health, &layout).await;
694 let prefix = timestamp_prefix();
695 println!(
696 "{} Block {} added (height={} era={})",
697 prefix,
698 block_hash,
699 block.height(),
700 block.era_id().value()
701 );
702 }
703 }
704 SseEvent::TransactionAccepted(transaction) => {
705 if node_id == 1 {
706 let prefix = timestamp_prefix();
707 println!("{} Transaction {} accepted", prefix, transaction.hash());
708 }
709 }
710 SseEvent::TransactionProcessed {
711 transaction_hash,
712 execution_result,
713 messages,
714 ..
715 } => {
716 if node_id == 1 {
717 let tx_hash = transaction_hash.to_string();
718 let prefix = timestamp_prefix();
719 log_transaction_processed(
720 &prefix,
721 &tx_hash,
722 &execution_result,
723 &messages,
724 );
725 }
726 }
727 _ => {}
728 },
729 Err(_) => {
730 stream_failed = true;
731 break;
732 }
733 }
734 }
735
736 if stream_failed && !sleep_backoff(&mut backoff).await {
737 return;
738 }
739 }
740}
741
742async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
743 let (summary, details, sse_spinner) = {
744 let mut state = health.lock().await;
745 if !state.expected_nodes.contains(&node_id) {
746 return;
747 }
748 state.versions.insert(node_id, version);
749 if state.announced || state.versions.len() != state.expected_nodes.len() {
750 return;
751 }
752
753 let summary = version_summary(&state.versions);
754 let details = state.details.clone();
755 let sse_spinner = state.sse_spinner.take();
756 if state.block_spinner.is_none() {
757 state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
758 }
759 state.announced = true;
760 state.block_seen = false;
761 (summary, details, sse_spinner)
762 };
763
764 if let Some(mut spinner) = sse_spinner {
765 spinner.stop_with_message("SSE connection established.".to_string());
766 }
767 println!("Network is healthy ({})", summary);
768 println!("{}", details);
769}
770
771async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
772 let (block_spinner, node_ids) = {
773 let mut state = health.lock().await;
774 if state.block_seen {
775 return;
776 }
777 state.block_seen = true;
778 (
779 state.block_spinner.take(),
780 state.expected_nodes.iter().copied().collect::<Vec<_>>(),
781 )
782 };
783
784 if let Some(mut spinner) = block_spinner {
785 spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
786 }
787
788 match assets::remove_consensus_keys(layout, &node_ids).await {
789 Ok(removed) => {
790 if removed > 0 {
791 println!("Consensus secret keys removed from disk.");
792 }
793 }
794 Err(err) => {
795 eprintln!("warning: failed to remove consensus secret keys: {}", err);
796 }
797 }
798}
799
800async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
801 let mut state = state.lock().await;
802 if state.last_block_height == Some(height) {
803 return Ok(());
804 }
805 state.last_block_height = Some(height);
806 state.touch().await?;
807 Ok(())
808}
809
810fn version_summary(versions: &HashMap<u32, String>) -> String {
811 let mut unique: Vec<String> = versions.values().cloned().collect();
812 unique.sort();
813 unique.dedup();
814 if unique.len() == 1 {
815 format!("version {}", unique[0])
816 } else {
817 format!("versions {}", unique.join(", "))
818 }
819}
820
821async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
822 if let Some(delay) = backoff.next_backoff() {
823 tokio::time::sleep(delay).await;
824 return true;
825 }
826 false
827}
828
829fn log_transaction_processed(
830 prefix: &str,
831 transaction_hash: &str,
832 execution_result: &ExecutionResult,
833 messages: &[casper_types::contract_messages::Message],
834) {
835 let consumed = execution_result.consumed();
836 let consumed_cspr = format_cspr_u512(&consumed);
837 if let Some(error) = execution_result.error_message() {
838 println!(
839 "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
840 prefix, transaction_hash, error, consumed, consumed_cspr
841 );
842 } else {
843 println!(
844 "{} Transaction {} processed succeeded gas={} gas_cspr={}",
845 prefix, transaction_hash, consumed, consumed_cspr
846 );
847 }
848
849 for message in messages {
850 let entity = message.entity_addr().to_formatted_string();
851 let topic = message.topic_name();
852 let payload = format_message_payload(message.payload());
853 println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
854 }
855}
856
857fn timestamp_prefix() -> String {
858 time::OffsetDateTime::now_utc()
859 .format(&time::format_description::well_known::Rfc3339)
860 .unwrap_or_else(|_| "unknown-time".to_string())
861}
862
863fn format_message_payload(payload: &MessagePayload) -> String {
864 match payload {
865 MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
866 MessagePayload::String(value) => format!("{:?}", value),
867 }
868}
869
870fn encode_hex(bytes: &[u8]) -> String {
871 let mut out = String::with_capacity(bytes.len() * 2);
872 for byte in bytes {
873 use std::fmt::Write;
874 let _ = write!(&mut out, "{:02x}", byte);
875 }
876 out
877}
878
879fn format_cspr_u512(motes: &U512) -> String {
880 let motes_str = motes.to_string();
881 let digits = motes_str.len();
882 if digits <= 9 {
883 let frac = format!("{:0>9}", motes_str);
884 let frac = frac.trim_end_matches('0');
885 if frac.is_empty() {
886 return "0".to_string();
887 }
888 return format!("0.{}", frac);
889 }
890
891 let split = digits - 9;
892 let (whole, frac) = motes_str.split_at(split);
893 let frac = frac.trim_end_matches('0');
894 if frac.is_empty() {
895 return whole.to_string();
896 }
897 format!("{}.{}", whole, frac)
898}
899
900async fn update_exited_process(
901 state: &mut State,
902 id: &str,
903 code: Option<i32>,
904 signal: Option<i32>,
905) -> Result<()> {
906 for record in &mut state.processes {
907 if record.id == id {
908 record.last_status = ProcessStatus::Exited;
909 record.exit_code = code;
910 record.exit_signal = signal;
911 record.stopped_at = Some(time::OffsetDateTime::now_utc());
912 break;
913 }
914 }
915 state.touch().await?;
916 Ok(())
917}
918
919fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
920 if let Some(pid) = pid {
921 if let Some(signal) = signal {
922 println!(
923 "process {} (pid {}) exited due to signal {}",
924 id, pid, signal
925 );
926 } else if let Some(code) = code {
927 println!("process {} (pid {}) exited with code {}", id, pid, code);
928 } else {
929 println!("process {} (pid {}) exited", id, pid);
930 }
931 } else if let Some(signal) = signal {
932 println!("process {} exited due to signal {}", id, signal);
933 } else if let Some(code) = code {
934 println!("process {} exited with code {}", id, code);
935 } else {
936 println!("process {} exited", id);
937 }
938}
939
940async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
941 let total_nodes = layout.count_nodes().await.unwrap_or(0);
942 let target = format!("all nodes ({})", total_nodes);
943 let sidecars = processes
944 .iter()
945 .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
946 .count();
947 println!(
948 "started {} process(es) for {} (sidecars: {})",
949 processes.len(),
950 target,
951 sidecars
952 );
953}
954
955fn looks_like_url(path: &Path) -> bool {
956 let value = path.to_string_lossy();
957 value.starts_with("http://") || value.starts_with("https://")
958}
959
960async fn is_dir(path: &Path) -> bool {
961 tokio_fs::metadata(path)
962 .await
963 .map(|meta| meta.is_dir())
964 .unwrap_or(false)
965}
966
967async fn run_assets(args: AssetsArgs) -> Result<()> {
968 match args.command {
969 AssetsCommand::Add(add) => run_assets_add(add).await,
970 AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
971 AssetsCommand::List => run_assets_list().await,
972 }
973}
974
975async fn run_is_ready(args: IsReadyArgs) -> Result<()> {
976 let assets_root = match &args.net_path {
977 Some(path) => path.clone(),
978 None => assets::default_assets_root()?,
979 };
980 let layout = AssetsLayout::new(assets_root, args.network_name);
981 let argv0 = std::env::args()
982 .next()
983 .unwrap_or_else(|| "casper-devnet".to_string());
984 let setup_cmd = format!("{} start --setup-only", argv0);
985 let net_dir = layout.net_dir();
986 if !is_dir(&net_dir).await {
987 return Err(anyhow!(
988 "assets for {} not found; run `{}`",
989 layout.network_name(),
990 setup_cmd
991 ));
992 }
993
994 let state_path = net_dir.join(STATE_FILE_NAME);
995 let contents = match tokio_fs::read_to_string(&state_path).await {
996 Ok(contents) => contents,
997 Err(_) => return Err(anyhow!("network is not ready yet")),
998 };
999 let state =
1000 match tokio::task::spawn_blocking(move || serde_json::from_str::<State>(&contents)).await {
1001 Ok(Ok(state)) => state,
1002 _ => return Err(anyhow!("network is not ready yet")),
1003 };
1004
1005 ensure_processes_running(&state)?;
1006
1007 if state.last_block_height.is_none() {
1008 return Err(anyhow!("network is not ready yet"));
1009 }
1010
1011 ensure_rest_ready(&state).await?;
1012 Ok(())
1013}
1014
1015async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
1016 if looks_like_url(&args.path) {
1017 return Err(anyhow!(
1018 "assets URL is not supported yet; provide a local .tar.gz path"
1019 ));
1020 }
1021 assets::install_assets_bundle(&args.path).await?;
1022 println!(
1023 "assets installed into {}",
1024 assets::assets_bundle_root()?.display()
1025 );
1026 Ok(())
1027}
1028
1029async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
1030 assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
1031 Ok(())
1032}
1033
1034async fn run_assets_list() -> Result<()> {
1035 let mut versions = assets::list_bundle_versions().await?;
1036 if versions.is_empty() {
1037 return Err(anyhow!("no assets bundles found"));
1038 }
1039 versions.sort_by(|a, b| b.cmp(a));
1040 for version in versions {
1041 println!("{}", version);
1042 }
1043 Ok(())
1044}
1045
1046async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
1047 if let Some(raw) = candidate {
1048 let version = assets::parse_protocol_version(raw)?;
1049 if !assets::has_bundle_version(&version).await? {
1050 let argv0 = std::env::args()
1051 .next()
1052 .unwrap_or_else(|| "casper-devnet".to_string());
1053 let pull_cmd = format!("{} assets pull", argv0);
1054 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1055 return Err(anyhow!(
1056 "assets for version {} not found; run `{}` or `{}`",
1057 version,
1058 pull_cmd,
1059 add_cmd
1060 ));
1061 }
1062 return Ok(version.to_string());
1063 }
1064 let versions = assets::list_bundle_versions().await?;
1065 if versions.is_empty() {
1066 let argv0 = std::env::args()
1067 .next()
1068 .unwrap_or_else(|| "casper-devnet".to_string());
1069 let pull_cmd = format!("{} assets pull", argv0);
1070 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1071 return Err(anyhow!(
1072 "no assets found; run `{}` or `{}`",
1073 pull_cmd,
1074 add_cmd
1075 ));
1076 }
1077 let version = versions
1078 .into_iter()
1079 .max()
1080 .expect("non-empty assets versions");
1081 Ok(version.to_string())
1082}
1083
1084fn ensure_processes_running(state: &State) -> Result<()> {
1085 if state.processes.is_empty() {
1086 return Err(anyhow!("network is not ready yet"));
1087 }
1088 for process in &state.processes {
1089 if !matches!(process.last_status, ProcessStatus::Running) {
1090 return Err(anyhow!("network is not ready yet"));
1091 }
1092 let pid = match process.pid {
1093 Some(pid) => pid,
1094 None => return Err(anyhow!("network is not ready yet")),
1095 };
1096 if !is_pid_running(pid) {
1097 return Err(anyhow!("network is not ready yet"));
1098 }
1099 }
1100 Ok(())
1101}
1102
1103fn is_pid_running(pid: u32) -> bool {
1104 let pid = Pid::from_raw(pid as i32);
1105 match kill(pid, None) {
1106 Ok(()) => true,
1107 Err(Errno::ESRCH) => false,
1108 Err(_) => true,
1109 }
1110}
1111
1112async fn ensure_rest_ready(state: &State) -> Result<()> {
1113 let node_ids: HashSet<u32> = state
1114 .processes
1115 .iter()
1116 .filter_map(|process| {
1117 if matches!(process.kind, ProcessKind::Node) {
1118 Some(process.node_id)
1119 } else {
1120 None
1121 }
1122 })
1123 .collect();
1124 if node_ids.is_empty() {
1125 return Err(anyhow!("network is not ready yet"));
1126 }
1127
1128 let client = reqwest::Client::builder()
1129 .no_proxy()
1130 .timeout(Duration::from_secs(2))
1131 .build()?;
1132
1133 for node_id in node_ids {
1134 let url = format!("{}/status", assets::rest_endpoint(node_id));
1135 let response = match client.get(&url).send().await {
1136 Ok(response) => response,
1137 Err(_) => return Err(anyhow!("network is not ready yet")),
1138 };
1139 if response.status() != reqwest::StatusCode::OK {
1140 return Err(anyhow!("network is not ready yet"));
1141 }
1142 let status = match response.json::<NodeStatus>().await {
1143 Ok(status) => status,
1144 Err(_) => return Err(anyhow!("network is not ready yet")),
1145 };
1146 if !status
1147 .reactor_state
1148 .as_deref()
1149 .map(is_ready_reactor_state)
1150 .unwrap_or(false)
1151 {
1152 return Err(anyhow!("network is not ready yet"));
1153 }
1154 }
1155 Ok(())
1156}
1157
1158fn is_ready_reactor_state(state: &str) -> bool {
1159 state == "Validate"
1160}
1161
1162#[derive(Deserialize)]
1163#[allow(dead_code)]
1164#[serde(rename_all = "snake_case")]
1165struct NodeStatus {
1166 peers: Option<Vec<NodePeer>>,
1167 api_version: Option<String>,
1168 build_version: Option<String>,
1169 chainspec_name: Option<String>,
1170 starting_state_root_hash: Option<String>,
1171 last_added_block_info: Option<serde_json::Value>,
1172 our_public_signing_key: Option<String>,
1173 round_length: Option<serde_json::Value>,
1174 next_upgrade: Option<serde_json::Value>,
1175 uptime: Option<String>,
1176 reactor_state: Option<String>,
1177 last_progress: Option<String>,
1178 available_block_range: Option<BlockRange>,
1179 block_sync: Option<BlockSync>,
1180 latest_switch_block_hash: Option<serde_json::Value>,
1181}
1182
1183#[derive(Deserialize)]
1184#[allow(dead_code)]
1185#[serde(rename_all = "snake_case")]
1186struct NodePeer {
1187 node_id: Option<String>,
1188 address: Option<String>,
1189}
1190
1191#[derive(Deserialize)]
1192#[allow(dead_code)]
1193#[serde(rename_all = "snake_case")]
1194struct BlockRange {
1195 low: Option<u64>,
1196 high: Option<u64>,
1197}
1198
1199#[derive(Deserialize)]
1200#[allow(dead_code)]
1201#[serde(rename_all = "snake_case")]
1202struct BlockSync {
1203 historical: Option<serde_json::Value>,
1204 forward: Option<serde_json::Value>,
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
1210 use casper_types::U512;
1211 use casper_types::contract_messages::MessagePayload;
1212 use directories::BaseDirs;
1213
1214 #[test]
1215 fn format_cspr_u512_handles_whole_and_fractional() {
1216 assert_eq!(format_cspr_u512(&U512::zero()), "0");
1217 assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
1218 assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
1219 assert_eq!(
1220 format_cspr_u512(&U512::from(1_000_000_001u64)),
1221 "1.000000001"
1222 );
1223 assert_eq!(
1224 format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
1225 "123"
1226 );
1227 assert_eq!(
1228 format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
1229 "123.000000456"
1230 );
1231 }
1232
1233 #[test]
1234 fn format_message_payload_renders_string_with_quotes() {
1235 let payload = MessagePayload::String("hello".to_string());
1236 assert_eq!(format_message_payload(&payload), "\"hello\"");
1237 }
1238
1239 #[test]
1240 fn encode_hex_renders_lowercase() {
1241 assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
1242 }
1243
1244 #[test]
1245 fn shorten_home_path_replaces_home_prefix() {
1246 let Some(base_dirs) = BaseDirs::new() else {
1247 return;
1248 };
1249 let home = base_dirs.home_dir();
1250 let shortened = shorten_home_path(&home.to_string_lossy());
1251 assert_eq!(shortened, "~");
1252
1253 let nested = home.join("devnet/logs/stdout.log");
1254 let shortened_nested = shorten_home_path(&nested.to_string_lossy());
1255 assert!(shortened_nested.starts_with("~"));
1256 assert!(shortened_nested.contains("devnet"));
1257 }
1258
1259 #[test]
1260 fn shorten_home_path_keeps_relative_paths() {
1261 let input = "relative/path";
1262 assert_eq!(shorten_home_path(input), input);
1263 }
1264}