1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::diagnostics_port;
3use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
4use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, STATE_FILE_NAME, State};
5use anyhow::{Result, anyhow};
6use backoff::ExponentialBackoff;
7use backoff::backoff::Backoff;
8use casper_types::U512;
9use casper_types::contract_messages::MessagePayload;
10use casper_types::execution::ExecutionResult;
11use clap::{Args, Parser, Subcommand};
12use directories::BaseDirs;
13use futures::StreamExt;
14use nix::errno::Errno;
15use nix::sys::signal::kill;
16use nix::unistd::Pid;
17use serde::Deserialize;
18use spinners::{Spinner, Spinners};
19use std::collections::{HashMap, HashSet};
20use std::os::unix::process::ExitStatusExt;
21use std::path::{Path, PathBuf};
22use std::sync::Arc;
23use std::sync::atomic::Ordering;
24use std::time::Duration;
25use tokio::fs as tokio_fs;
26use tokio::sync::Mutex;
27use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
28use veles_casper_rust_sdk::sse::event::SseEvent;
29use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
30
// Top-level command-line interface; clap derives the argument parser from this struct.
// Plain `//` comments are used on the clap-derived items in this file because clap's
// derive turns `///` doc comments into help text, which would change `--help` output.
#[derive(Parser)]
#[command(name = "nctl")]
#[command(
    about = "casper-devnet launcher for local Casper Network development networks",
    long_about = None
)]
pub struct Cli {
    // The subcommand chosen on the command line; dispatched in `run`.
    #[command(subcommand)]
    command: Command,
}
42
// Subcommands accepted by the CLI. `run` maps each variant to its handler.
#[derive(Subcommand)]
enum Command {
    // Handled by `run_start`.
    Start(StartArgs),
    // Handled by `run_assets`.
    Assets(AssetsArgs),
    // Handled by `run_is_ready`.
    IsReady(IsReadyArgs),
}
53
// Arguments of the `start` subcommand (consumed by `run_start`, `run_setup_only`
// and `setup_options`).
#[derive(Args, Clone)]
struct StartArgs {
    // Network name; passed to `AssetsLayout::new` and `SetupOptions::network_name`.
    #[arg(long, default_value = "casper-dev")]
    network_name: String,

    // Assets root override; when absent `assets::default_assets_root()` is used.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,

    // Requested protocol version; when absent `resolve_protocol_version` picks the
    // greatest installed bundle version.
    #[arg(long)]
    protocol_version: Option<String>,

    // Forwarded as `SetupOptions::nodes`; also reachable as `--nodes` / `--validators`.
    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
    node_count: u32,

    // Forwarded as `SetupOptions::users`.
    #[arg(long)]
    users: Option<u32>,

    // Forwarded as `SetupOptions::delay_seconds`.
    #[arg(long, default_value_t = 3)]
    delay: u64,

    // Becomes `StartPlan::rust_log`.
    #[arg(long = "log-level", default_value = "info")]
    log_level: String,

    // Forwarded as `SetupOptions::node_log_format`.
    #[arg(long, default_value = "json")]
    node_log_format: String,

    // When set, `run_start` only prepares assets (via `run_setup_only`) and returns
    // without starting any process.
    #[arg(long)]
    setup_only: bool,

    // When set, existing assets are torn down and set up again.
    #[arg(long)]
    force_setup: bool,

    // Forwarded as `SetupOptions::seed` and to `assets::ensure_consensus_keys`.
    #[arg(long, default_value = "default")]
    seed: Arc<str>,
}
101
// Arguments of the `assets` subcommand; it only carries a nested subcommand.
#[derive(Args)]
struct AssetsArgs {
    // The nested `assets` action; dispatched in `run_assets`.
    #[command(subcommand)]
    command: AssetsCommand,
}
108
// Actions available under the `assets` subcommand.
#[derive(Subcommand)]
enum AssetsCommand {
    // Handled by `run_assets_add`.
    Add(AssetsAddArgs),
    // Handled by `run_assets_pull`.
    Pull(AssetsPullArgs),
    // Handled by `run_assets_list`; takes no arguments.
    List,
}
119
// Arguments of `assets add`.
#[derive(Args, Clone)]
struct AssetsAddArgs {
    // Positional path handed to `assets::install_assets_bundle`; `run_assets_add`
    // rejects values that start with `http://` or `https://`.
    #[arg(value_name = "PATH")]
    path: PathBuf,
}
127
// Arguments of `assets pull`; both are forwarded to `assets::pull_assets_bundles`.
#[derive(Args, Clone)]
struct AssetsPullArgs {
    // Optional target, passed through as `Option<&str>`.
    #[arg(long)]
    target: Option<String>,

    // Passed through unchanged as the `force` flag.
    #[arg(long)]
    force: bool,
}
139
// Arguments of the `is-ready` subcommand (consumed by `run_is_ready`).
#[derive(Args, Clone)]
struct IsReadyArgs {
    // Network name; passed to `AssetsLayout::new`.
    #[arg(long, default_value = "casper-dev")]
    network_name: String,

    // Assets root override; when absent `assets::default_assets_root()` is used.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,
}
151
152pub async fn run() -> Result<()> {
154 let cli = Cli::parse();
155 match cli.command {
156 Command::Start(args) => run_start(args).await,
157 Command::Assets(args) => run_assets(args).await,
158 Command::IsReady(args) => run_is_ready(args).await,
159 }
160}
161
162async fn run_start(args: StartArgs) -> Result<()> {
163 let assets_root = match &args.net_path {
164 Some(path) => path.clone(),
165 None => assets::default_assets_root()?,
166 };
167 let layout = AssetsLayout::new(assets_root, args.network_name.clone());
168 let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
169 println!("assets path: {}", assets_path);
170 let assets_exist = layout.exists().await;
171 if !args.setup_only && !args.force_setup && assets_exist {
172 println!("resuming network operations on {}", layout.network_name());
173 }
174 let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
175
176 if args.setup_only {
177 return run_setup_only(&layout, &args, &protocol_version).await;
178 }
179
180 if args.force_setup {
181 assets::teardown(&layout).await?;
182 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
183 } else if !assets_exist {
184 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
185 }
186
187 if !layout.exists().await {
188 return Err(anyhow!(
189 "assets missing under {}; run with --setup-only to create them",
190 shorten_home_path(&layout.net_dir().display().to_string())
191 ));
192 }
193
194 if !args.force_setup && assets_exist {
195 let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
196 if restored > 0 {
197 println!("recreated consensus keys for {} node(s)", restored);
198 }
199 }
200
201 let rust_log = args.log_level.clone();
202
203 let plan = StartPlan { rust_log };
204
205 let state_path = layout.net_dir().join(STATE_FILE_NAME);
206 let state = Arc::new(Mutex::new(State::new(state_path).await?));
207 let started = {
208 let mut state = state.lock().await;
209 process::start(&layout, &plan, &mut state).await?
210 };
211
212 print_pids(&started);
213 print_start_banner(&layout, &started).await;
214 print_derived_accounts_summary(&layout).await;
215
216 let node_ids = unique_node_ids(&started);
217 let details = format_network_details(&layout, &started).await;
218 let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
219 start_sse_spinner(&health).await;
220 spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;
221 let mut diagnostics_proxy = match diagnostics_port::spawn(&layout).await {
222 Ok(proxy) => Some(proxy),
223 Err(err) => {
224 eprintln!("warning: failed to start diagnostics proxy: {}", err);
225 None
226 }
227 };
228
229 let (event_tx, mut event_rx) = unbounded_channel();
230 spawn_ctrlc_listener(event_tx.clone());
231 spawn_exit_watchers(started, event_tx);
232
233 if let Some(event) = event_rx.recv().await {
234 match event {
235 RunEvent::CtrlC => {
236 if let Some(proxy) = diagnostics_proxy.take() {
237 proxy.shutdown();
238 }
239 let mut state = state.lock().await;
240 process::stop(&mut state).await?;
241 }
242 RunEvent::ProcessExit {
243 id,
244 pid,
245 code,
246 signal,
247 } => {
248 if let Some(proxy) = diagnostics_proxy.take() {
249 proxy.shutdown();
250 }
251 let mut state = state.lock().await;
252 update_exited_process(&mut state, &id, code, signal).await?;
253 log_exit(&id, pid, code, signal);
254 process::stop(&mut state).await?;
255 }
256 }
257 }
258
259 Ok(())
260}
261
262async fn run_setup_only(
263 layout: &AssetsLayout,
264 args: &StartArgs,
265 protocol_version: &str,
266) -> Result<()> {
267 if args.force_setup {
268 assets::teardown(layout).await?;
269 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
270 print_derived_accounts_summary(layout).await;
271 return Ok(());
272 }
273
274 if layout.exists().await {
275 println!(
276 "assets already exist at {}; use --force-setup to rebuild",
277 shorten_home_path(&layout.net_dir().display().to_string())
278 );
279 print_derived_accounts_summary(layout).await;
280 return Ok(());
281 }
282
283 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
284 print_derived_accounts_summary(layout).await;
285 Ok(())
286}
287
288fn record_pid(record: &ProcessRecord) -> Option<u32> {
289 if let Some(handle) = &record.pid_handle {
290 let pid = handle.load(Ordering::SeqCst);
291 if pid != 0 {
292 return Some(pid);
293 }
294 }
295 record.pid
296}
297
298fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
299 SetupOptions {
300 nodes: args.node_count,
301 users: args.users,
302 delay_seconds: args.delay,
303 network_name: args.network_name.clone(),
304 protocol_version: protocol_version.to_string(),
305 node_log_format: args.node_log_format.clone(),
306 seed: Arc::clone(&args.seed),
307 }
308}
309
310fn print_pids(records: &[RunningProcess]) {
311 for record in records {
312 if let Some(pid) = record_pid(&record.record) {
313 println!(
314 "{} pid={} ({:?})",
315 record.record.id, pid, record.record.kind
316 );
317 }
318 }
319}
320
321async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
322 let symlink_root = layout.net_dir();
323 let mut node_pids: HashMap<u32, u32> = HashMap::new();
324 let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
325 let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
326
327 for process in processes {
328 if let Some(pid) = record_pid(&process.record) {
329 match process.record.kind {
330 ProcessKind::Node => {
331 node_pids.insert(process.record.node_id, pid);
332 }
333 ProcessKind::Sidecar => {
334 sidecar_pids.insert(process.record.node_id, pid);
335 }
336 }
337 process_logs
338 .entry(process.record.node_id)
339 .or_default()
340 .push((process.record.kind.clone(), pid));
341 }
342 }
343
344 let node_ids = unique_node_ids(processes);
345
346 let mut lines = Vec::new();
347 lines.push("network details".to_string());
348 for node_id in node_ids {
349 let node_pid = node_pids
350 .get(&node_id)
351 .map(|pid| pid.to_string())
352 .unwrap_or_else(|| "-".to_string());
353 let sidecar_pid = sidecar_pids
354 .get(&node_id)
355 .map(|pid| pid.to_string())
356 .unwrap_or_else(|| "-".to_string());
357 lines.push(format!(" node-{}", node_id));
358 lines.push(format!(
359 " pids: node={} sidecar={}",
360 node_pid, sidecar_pid
361 ));
362 if let Some(entries) = process_logs.get(&node_id) {
363 let mut entries = entries.clone();
364 entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
365 lines.push(" logs".to_string());
366 for (kind, pid) in entries {
367 let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
368 lines.push(format!(
369 " {} pid={} stdout={} stderr={}",
370 process_kind_label(&kind),
371 pid,
372 stdout_link,
373 stderr_link
374 ));
375 }
376 }
377 lines.push(" endpoints".to_string());
378 lines.push(format!(" rest: {}", assets::rest_endpoint(node_id)));
379 lines.push(format!(" sse: {}", assets::sse_endpoint(node_id)));
380 lines.push(format!(" rpc: {}", assets::rpc_endpoint(node_id)));
381 lines.push(format!(" binary: {}", assets::binary_address(node_id)));
382 lines.push(format!(
383 " diagnostics: {}",
384 assets::diagnostics_socket_path(layout.network_name(), node_id)
385 ));
386 lines.push(format!(
387 " diagnostics-ws: {}",
388 assets::diagnostics_ws_endpoint(node_id)
389 ));
390 lines.push(format!(
391 " gossip: {}",
392 assets::network_address(node_id)
393 ));
394 }
395
396 lines.join("\n")
397}
398
399fn process_kind_label(kind: &ProcessKind) -> &'static str {
400 match kind {
401 ProcessKind::Node => "node",
402 ProcessKind::Sidecar => "sidecar",
403 }
404}
405
406fn shorten_home_path(path: &str) -> String {
407 let path = Path::new(path);
408 let Some(base_dirs) = BaseDirs::new() else {
409 return path.display().to_string();
410 };
411 let home = base_dirs.home_dir();
412 match path.strip_prefix(home) {
413 Ok(stripped) => {
414 if stripped.as_os_str().is_empty() {
415 return "~".to_string();
416 }
417 let mut shorthand = PathBuf::from("~");
418 shorthand.push(stripped);
419 shorthand.display().to_string()
420 }
421 Err(_) => path.display().to_string(),
422 }
423}
424
425fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
426 let base = match kind {
427 ProcessKind::Node => format!("node-{}", node_id),
428 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
429 };
430 let stdout_link = symlink_root.join(format!("{}.stdout", base));
431 let stderr_link = symlink_root.join(format!("{}.stderr", base));
432 (
433 shorten_home_path(&stdout_link.display().to_string()),
434 shorten_home_path(&stderr_link.display().to_string()),
435 )
436}
437
438async fn print_derived_accounts_summary(layout: &AssetsLayout) {
439 if let Some(summary) = assets::derived_accounts_summary(layout).await {
440 if let Some(parsed) = parse_derived_accounts_csv(&summary) {
441 println!("derived accounts");
442 if !parsed.validators.is_empty() {
443 println!(" validators");
444 print_account_group(&parsed.validators);
445 }
446 if !parsed.users.is_empty() {
447 println!(" users");
448 print_account_group(&parsed.users);
449 }
450 if !parsed.other.is_empty() {
451 println!(" other");
452 print_account_group(&parsed.other);
453 }
454 } else {
455 println!("derived accounts");
456 for line in summary.lines() {
457 println!(" {}", line);
458 }
459 }
460 }
461}
462
/// One data row of the derived accounts CSV, without its leading `kind` column.
struct DerivedAccountRow {
    name: String,
    key_type: String,
    derivation: String,
    path: String,
    account_hash: String,
    balance: String,
}

/// Rows of the derived accounts CSV grouped by their `kind` column.
struct DerivedAccountsParsed {
    /// Rows whose kind is `validator`.
    validators: Vec<DerivedAccountRow>,
    /// Rows whose kind is `user`.
    users: Vec<DerivedAccountRow>,
    /// Rows with any other kind.
    other: Vec<DerivedAccountRow>,
}

/// Parses the derived accounts summary.
///
/// The first line (trimmed) must be exactly
/// `kind,name,key_type,derivation,path,account_hash,balance`. Blank lines are skipped.
/// Every other line is split on commas into at most seven fields, so commas after the
/// sixth separator stay inside `balance`.
///
/// Returns `None` when the input is empty, the header differs, or any data line has
/// fewer than seven fields.
fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
    let mut rows = summary.lines();
    if rows.next()?.trim() != "kind,name,key_type,derivation,path,account_hash,balance" {
        return None;
    }

    let mut parsed = DerivedAccountsParsed {
        validators: Vec::new(),
        users: Vec::new(),
        other: Vec::new(),
    };

    for raw in rows.map(str::trim).filter(|raw| !raw.is_empty()) {
        let fields: Vec<&str> = raw.splitn(7, ',').collect();
        let [kind, name, key_type, derivation, path, account_hash, balance] = fields.as_slice()
        else {
            // A short row invalidates the whole summary.
            return None;
        };

        let bucket = match *kind {
            "validator" => &mut parsed.validators,
            "user" => &mut parsed.users,
            _ => &mut parsed.other,
        };
        bucket.push(DerivedAccountRow {
            name: (*name).to_owned(),
            key_type: (*key_type).to_owned(),
            derivation: (*derivation).to_owned(),
            path: (*path).to_owned(),
            account_hash: (*account_hash).to_owned(),
            balance: (*balance).to_owned(),
        });
    }

    Some(parsed)
}
521
522fn print_account_group(rows: &[DerivedAccountRow]) {
523 for row in rows {
524 println!(" {}:", row.name);
525 println!(" key_type: {}", row.key_type);
526 println!(" derivation: {}", row.derivation);
527 println!(" path: {}", row.path);
528 println!(" account_hash: {}", row.account_hash);
529 println!(" balance: {}", row.balance);
530 }
531}
532
533fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
534 let mut nodes = HashSet::new();
535 for process in processes {
536 nodes.insert(process.record.node_id);
537 }
538 let mut ids: Vec<u32> = nodes.into_iter().collect();
539 ids.sort_unstable();
540 ids
541}
542
/// Events that end the wait loop in `run_start`.
enum RunEvent {
    /// Sent by `spawn_ctrlc_listener` once `tokio::signal::ctrl_c()` completes successfully.
    CtrlC,
    /// Sent by `spawn_exit_watchers` when a watched process or task finishes.
    ProcessExit {
        /// Id of the process record that exited.
        id: String,
        /// Pid of the process, when one is known.
        pid: Option<u32>,
        /// Exit code, when available.
        code: Option<i32>,
        /// Terminating signal, when available.
        signal: Option<i32>,
    },
}
552
553fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
554 tokio::spawn(async move {
555 if tokio::signal::ctrl_c().await.is_ok() {
556 let _ = tx.send(RunEvent::CtrlC);
557 }
558 });
559}
560
561fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
562 for running in processes {
563 let tx = tx.clone();
564 tokio::spawn(async move {
565 let id = running.record.id.clone();
566 match running.handle {
567 ProcessHandle::Child(mut child) => {
568 if let Ok(status) = child.wait().await {
569 let pid = record_pid(&running.record).or_else(|| child.id());
570 let code = status.code();
571 let signal = status.signal();
572 let _ = tx.send(RunEvent::ProcessExit {
573 id: id.clone(),
574 pid,
575 code,
576 signal,
577 });
578 }
579 }
580 ProcessHandle::Task(handle) => {
581 let status = handle.await;
582 let pid = record_pid(&running.record);
583 let (code, signal) = match status {
584 Ok(Ok(())) => (Some(0), None),
585 Ok(Err(_)) => (None, None),
586 Err(_) => (None, None),
587 };
588 let _ = tx.send(RunEvent::ProcessExit {
589 id: id.clone(),
590 pid,
591 code,
592 signal,
593 });
594 }
595 }
596 });
597 }
598}
599
/// Spinner text shown by `start_sse_spinner` until `record_api_version` announces the network.
const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
/// Spinner text started in `record_api_version`; also the message the spinner is
/// stopped with in `mark_block_seen`.
const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
602
/// Shared progress state of the SSE listeners (kept behind an `Arc<Mutex<_>>`).
struct SseHealth {
    /// Node ids that must report an API version before the network is announced.
    expected_nodes: HashSet<u32>,
    /// Latest API version reported per node id.
    versions: HashMap<u32, String>,
    /// Set once every expected node has reported a version (see `record_api_version`).
    announced: bool,
    /// Set by `mark_block_seen` the first time it runs after the announcement.
    block_seen: bool,
    /// Spinner shown while waiting for the SSE connections, if running.
    sse_spinner: Option<Spinner>,
    /// Spinner shown while waiting for the first block, if running.
    block_spinner: Option<Spinner>,
    /// Pre-rendered network details printed together with the announcement.
    details: String,
}
612
613impl SseHealth {
614 fn new(node_ids: Vec<u32>, details: String) -> Self {
615 Self {
616 expected_nodes: node_ids.into_iter().collect(),
617 versions: HashMap::new(),
618 announced: false,
619 block_seen: false,
620 sse_spinner: None,
621 block_spinner: None,
622 details,
623 }
624 }
625}
626
627async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
628 if node_id != 1 {
629 return false;
630 }
631 let state = health.lock().await;
632 state.announced
633}
634
635fn start_spinner(message: &str) -> Spinner {
636 Spinner::new(Spinners::Dots, message.to_string())
637}
638
639async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
640 let mut state = health.lock().await;
641 if state.sse_spinner.is_none() {
642 state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
643 }
644}
645
646async fn spawn_sse_listeners(
647 layout: AssetsLayout,
648 node_ids: &[u32],
649 health: Arc<Mutex<SseHealth>>,
650 state: Arc<Mutex<State>>,
651) {
652 for node_id in node_ids {
653 let node_id = *node_id;
654 let endpoint = assets::sse_endpoint(node_id);
655 let layout = layout.clone();
656 let health = Arc::clone(&health);
657 let state = Arc::clone(&state);
658 tokio::spawn(async move {
659 run_sse_listener(node_id, endpoint, health, state, layout).await;
660 });
661 }
662}
663
/// Listens to the SSE stream of one node until the backoff gives up.
///
/// The outer loop (re)builds the listener configuration and (re)connects. Failures to
/// build the configuration, to connect, or a stream item that is an error all wait for
/// the next backoff delay before retrying; the function returns once `sleep_backoff`
/// reports that no further delay is available. A successful connection resets the
/// backoff.
///
/// Event handling:
/// * `ApiVersion` — recorded for every node via `record_api_version`.
/// * `BlockAdded` — node `1` persists the block height; the block is logged only when
///   `should_log_primary` allows it.
/// * `TransactionAccepted` / `TransactionProcessed` — logged for node `1` only.
/// * Every other event is ignored.
async fn run_sse_listener(
    node_id: u32,
    endpoint: String,
    health: Arc<Mutex<SseHealth>>,
    state: Arc<Mutex<State>>,
    layout: AssetsLayout,
) {
    let mut backoff = ExponentialBackoff::default();

    loop {
        let config = match ListenerConfig::builder()
            .with_endpoint(endpoint.clone())
            .build()
        {
            Ok(config) => config,
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        let stream = match sse::listener(config).await {
            Ok(stream) => {
                // Connected: start the next failure sequence from the initial delay.
                backoff.reset();
                stream
            }
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        futures::pin_mut!(stream);
        let mut stream_failed = false;
        while let Some(event) = stream.next().await {
            match event {
                Ok(sse_event) => match sse_event {
                    SseEvent::ApiVersion(version) => {
                        record_api_version(node_id, version.to_string(), &health).await;
                    }
                    SseEvent::BlockAdded { block_hash, block } => {
                        // Persisting the height is best-effort; a failure is only reported.
                        if node_id == 1
                            && let Err(err) = record_last_block_height(&state, block.height()).await
                        {
                            eprintln!("warning: failed to record last block height: {}", err);
                        }
                        if should_log_primary(node_id, &health).await {
                            mark_block_seen(&health, &layout).await;
                            let prefix = timestamp_prefix();
                            println!(
                                "{} Block {} added (height={} era={})",
                                prefix,
                                block_hash,
                                block.height(),
                                block.era_id().value()
                            );
                        }
                    }
                    SseEvent::TransactionAccepted(transaction) => {
                        if node_id == 1 {
                            let prefix = timestamp_prefix();
                            println!("{} Transaction {} accepted", prefix, transaction.hash());
                        }
                    }
                    SseEvent::TransactionProcessed {
                        transaction_hash,
                        execution_result,
                        messages,
                        ..
                    } => {
                        if node_id == 1 {
                            let tx_hash = transaction_hash.to_string();
                            let prefix = timestamp_prefix();
                            log_transaction_processed(
                                &prefix,
                                &tx_hash,
                                &execution_result,
                                &messages,
                            );
                        }
                    }
                    _ => {}
                },
                Err(_) => {
                    stream_failed = true;
                    break;
                }
            }
        }

        // NOTE(review): when the stream ends without an error item, the loop reconnects
        // immediately with no delay — confirm that a cleanly closing endpoint cannot
        // turn this into a busy reconnect loop.
        if stream_failed && !sleep_backoff(&mut backoff).await {
            return;
        }
    }
}
763
764async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
765 let (summary, details, sse_spinner) = {
766 let mut state = health.lock().await;
767 if !state.expected_nodes.contains(&node_id) {
768 return;
769 }
770 state.versions.insert(node_id, version);
771 if state.announced || state.versions.len() != state.expected_nodes.len() {
772 return;
773 }
774
775 let summary = version_summary(&state.versions);
776 let details = state.details.clone();
777 let sse_spinner = state.sse_spinner.take();
778 if state.block_spinner.is_none() {
779 state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
780 }
781 state.announced = true;
782 state.block_seen = false;
783 (summary, details, sse_spinner)
784 };
785
786 if let Some(mut spinner) = sse_spinner {
787 spinner.stop_with_message("SSE connection established.".to_string());
788 }
789 println!("Network is healthy ({})", summary);
790 println!("{}", details);
791}
792
793async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
794 let (block_spinner, node_ids) = {
795 let mut state = health.lock().await;
796 if state.block_seen {
797 return;
798 }
799 state.block_seen = true;
800 (
801 state.block_spinner.take(),
802 state.expected_nodes.iter().copied().collect::<Vec<_>>(),
803 )
804 };
805
806 if let Some(mut spinner) = block_spinner {
807 spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
808 }
809
810 match assets::remove_consensus_keys(layout, &node_ids).await {
811 Ok(removed) => {
812 if removed > 0 {
813 println!("Consensus secret keys removed from disk.");
814 }
815 }
816 Err(err) => {
817 eprintln!("warning: failed to remove consensus secret keys: {}", err);
818 }
819 }
820}
821
822async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
823 let mut state = state.lock().await;
824 if state.last_block_height == Some(height) {
825 return Ok(());
826 }
827 state.last_block_height = Some(height);
828 state.touch().await?;
829 Ok(())
830}
831
/// Summarises the reported versions: `version X` when all nodes agree, otherwise
/// `versions A, B, ...` with the distinct versions in ascending string order.
fn version_summary(versions: &HashMap<u32, String>) -> String {
    let mut distinct: Vec<&str> = versions.values().map(String::as_str).collect();
    distinct.sort_unstable();
    distinct.dedup();

    match distinct.as_slice() {
        [only] => format!("version {}", only),
        several => format!("versions {}", several.join(", ")),
    }
}
842
843async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
844 if let Some(delay) = backoff.next_backoff() {
845 tokio::time::sleep(delay).await;
846 return true;
847 }
848 false
849}
850
851fn log_transaction_processed(
852 prefix: &str,
853 transaction_hash: &str,
854 execution_result: &ExecutionResult,
855 messages: &[casper_types::contract_messages::Message],
856) {
857 let consumed = execution_result.consumed();
858 let consumed_cspr = format_cspr_u512(&consumed);
859 if let Some(error) = execution_result.error_message() {
860 println!(
861 "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
862 prefix, transaction_hash, error, consumed, consumed_cspr
863 );
864 } else {
865 println!(
866 "{} Transaction {} processed succeeded gas={} gas_cspr={}",
867 prefix, transaction_hash, consumed, consumed_cspr
868 );
869 }
870
871 for message in messages {
872 let entity = message.entity_addr().to_formatted_string();
873 let topic = message.topic_name();
874 let payload = format_message_payload(message.payload());
875 println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
876 }
877}
878
879fn timestamp_prefix() -> String {
880 time::OffsetDateTime::now_utc()
881 .format(&time::format_description::well_known::Rfc3339)
882 .unwrap_or_else(|_| "unknown-time".to_string())
883}
884
885fn format_message_payload(payload: &MessagePayload) -> String {
886 match payload {
887 MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
888 MessagePayload::String(value) => format!("{:?}", value),
889 }
890}
891
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte and no separators.
fn encode_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(DIGITS[usize::from(byte >> 4)] as char);
        out.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    out
}
900
901fn format_cspr_u512(motes: &U512) -> String {
902 let motes_str = motes.to_string();
903 let digits = motes_str.len();
904 if digits <= 9 {
905 let frac = format!("{:0>9}", motes_str);
906 let frac = frac.trim_end_matches('0');
907 if frac.is_empty() {
908 return "0".to_string();
909 }
910 return format!("0.{}", frac);
911 }
912
913 let split = digits - 9;
914 let (whole, frac) = motes_str.split_at(split);
915 let frac = frac.trim_end_matches('0');
916 if frac.is_empty() {
917 return whole.to_string();
918 }
919 format!("{}.{}", whole, frac)
920}
921
922async fn update_exited_process(
923 state: &mut State,
924 id: &str,
925 code: Option<i32>,
926 signal: Option<i32>,
927) -> Result<()> {
928 for record in &mut state.processes {
929 if record.id == id {
930 record.last_status = ProcessStatus::Exited;
931 record.exit_code = code;
932 record.exit_signal = signal;
933 record.stopped_at = Some(time::OffsetDateTime::now_utc());
934 break;
935 }
936 }
937 state.touch().await?;
938 Ok(())
939}
940
/// Prints why a process exited.
///
/// The pid is mentioned when known. A signal takes precedence over an exit code; when
/// neither is known only the bare fact of the exit is printed.
fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
    let subject = match pid {
        Some(pid) => format!("process {} (pid {})", id, pid),
        None => format!("process {}", id),
    };

    match (signal, code) {
        (Some(signal), _) => println!("{} exited due to signal {}", subject, signal),
        (None, Some(code)) => println!("{} exited with code {}", subject, code),
        (None, None) => println!("{} exited", subject),
    }
}
961
962async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
963 let total_nodes = layout.count_nodes().await.unwrap_or(0);
964 let target = format!("all nodes ({})", total_nodes);
965 let sidecars = processes
966 .iter()
967 .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
968 .count();
969 println!(
970 "started {} process(es) for {} (sidecars: {})",
971 processes.len(),
972 target,
973 sidecars
974 );
975}
976
/// Tells whether `path` is really an HTTP(S) URL, i.e. its lossy string form starts
/// with `http://` or `https://` (case-sensitive).
fn looks_like_url(path: &Path) -> bool {
    let text = path.to_string_lossy();
    ["http://", "https://"]
        .iter()
        .any(|scheme| text.starts_with(*scheme))
}
981
982async fn is_dir(path: &Path) -> bool {
983 tokio_fs::metadata(path)
984 .await
985 .map(|meta| meta.is_dir())
986 .unwrap_or(false)
987}
988
989async fn run_assets(args: AssetsArgs) -> Result<()> {
990 match args.command {
991 AssetsCommand::Add(add) => run_assets_add(add).await,
992 AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
993 AssetsCommand::List => run_assets_list().await,
994 }
995}
996
997async fn run_is_ready(args: IsReadyArgs) -> Result<()> {
998 let assets_root = match &args.net_path {
999 Some(path) => path.clone(),
1000 None => assets::default_assets_root()?,
1001 };
1002 let layout = AssetsLayout::new(assets_root, args.network_name);
1003 let argv0 = std::env::args()
1004 .next()
1005 .unwrap_or_else(|| "casper-devnet".to_string());
1006 let setup_cmd = format!("{} start --setup-only", argv0);
1007 let net_dir = layout.net_dir();
1008 if !is_dir(&net_dir).await {
1009 return Err(anyhow!(
1010 "assets for {} not found; run `{}`",
1011 layout.network_name(),
1012 setup_cmd
1013 ));
1014 }
1015
1016 let state_path = net_dir.join(STATE_FILE_NAME);
1017 let contents = match tokio_fs::read_to_string(&state_path).await {
1018 Ok(contents) => contents,
1019 Err(_) => return Err(anyhow!("network is not ready yet")),
1020 };
1021 let state =
1022 match tokio::task::spawn_blocking(move || serde_json::from_str::<State>(&contents)).await {
1023 Ok(Ok(state)) => state,
1024 _ => return Err(anyhow!("network is not ready yet")),
1025 };
1026
1027 ensure_processes_running(&state)?;
1028
1029 if state.last_block_height.is_none() {
1030 return Err(anyhow!("network is not ready yet"));
1031 }
1032
1033 ensure_rest_ready(&state).await?;
1034 Ok(())
1035}
1036
1037async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
1038 if looks_like_url(&args.path) {
1039 return Err(anyhow!(
1040 "assets URL is not supported yet; provide a local .tar.gz path"
1041 ));
1042 }
1043 assets::install_assets_bundle(&args.path).await?;
1044 println!(
1045 "assets installed into {}",
1046 assets::assets_bundle_root()?.display()
1047 );
1048 Ok(())
1049}
1050
1051async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
1052 assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
1053 Ok(())
1054}
1055
1056async fn run_assets_list() -> Result<()> {
1057 let mut versions = assets::list_bundle_versions().await?;
1058 if versions.is_empty() {
1059 return Err(anyhow!("no assets bundles found"));
1060 }
1061 versions.sort_by(|a, b| b.cmp(a));
1062 for version in versions {
1063 println!("{}", version);
1064 }
1065 Ok(())
1066}
1067
1068async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
1069 if let Some(raw) = candidate {
1070 let version = assets::parse_protocol_version(raw)?;
1071 if !assets::has_bundle_version(&version).await? {
1072 let argv0 = std::env::args()
1073 .next()
1074 .unwrap_or_else(|| "casper-devnet".to_string());
1075 let pull_cmd = format!("{} assets pull", argv0);
1076 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1077 return Err(anyhow!(
1078 "assets for version {} not found; run `{}` or `{}`",
1079 version,
1080 pull_cmd,
1081 add_cmd
1082 ));
1083 }
1084 return Ok(version.to_string());
1085 }
1086 let versions = assets::list_bundle_versions().await?;
1087 if versions.is_empty() {
1088 let argv0 = std::env::args()
1089 .next()
1090 .unwrap_or_else(|| "casper-devnet".to_string());
1091 let pull_cmd = format!("{} assets pull", argv0);
1092 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1093 return Err(anyhow!(
1094 "no assets found; run `{}` or `{}`",
1095 pull_cmd,
1096 add_cmd
1097 ));
1098 }
1099 let version = versions
1100 .into_iter()
1101 .max()
1102 .expect("non-empty assets versions");
1103 Ok(version.to_string())
1104}
1105
1106fn ensure_processes_running(state: &State) -> Result<()> {
1107 if state.processes.is_empty() {
1108 return Err(anyhow!("network is not ready yet"));
1109 }
1110 for process in &state.processes {
1111 if !matches!(process.last_status, ProcessStatus::Running) {
1112 return Err(anyhow!("network is not ready yet"));
1113 }
1114 let pid = match process.pid {
1115 Some(pid) => pid,
1116 None => return Err(anyhow!("network is not ready yet")),
1117 };
1118 if !is_pid_running(pid) {
1119 return Err(anyhow!("network is not ready yet"));
1120 }
1121 }
1122 Ok(())
1123}
1124
1125fn is_pid_running(pid: u32) -> bool {
1126 let pid = Pid::from_raw(pid as i32);
1127 match kill(pid, None) {
1128 Ok(()) => true,
1129 Err(Errno::ESRCH) => false,
1130 Err(_) => true,
1131 }
1132}
1133
1134async fn ensure_rest_ready(state: &State) -> Result<()> {
1135 let node_ids: HashSet<u32> = state
1136 .processes
1137 .iter()
1138 .filter_map(|process| {
1139 if matches!(process.kind, ProcessKind::Node) {
1140 Some(process.node_id)
1141 } else {
1142 None
1143 }
1144 })
1145 .collect();
1146 if node_ids.is_empty() {
1147 return Err(anyhow!("network is not ready yet"));
1148 }
1149
1150 let client = reqwest::Client::builder()
1151 .no_proxy()
1152 .timeout(Duration::from_secs(2))
1153 .build()?;
1154
1155 for node_id in node_ids {
1156 let url = format!("{}/status", assets::rest_endpoint(node_id));
1157 let response = match client.get(&url).send().await {
1158 Ok(response) => response,
1159 Err(_) => return Err(anyhow!("network is not ready yet")),
1160 };
1161 if response.status() != reqwest::StatusCode::OK {
1162 return Err(anyhow!("network is not ready yet"));
1163 }
1164 let status = match response.json::<NodeStatus>().await {
1165 Ok(status) => status,
1166 Err(_) => return Err(anyhow!("network is not ready yet")),
1167 };
1168 if !status
1169 .reactor_state
1170 .as_deref()
1171 .map(is_ready_reactor_state)
1172 .unwrap_or(false)
1173 {
1174 return Err(anyhow!("network is not ready yet"));
1175 }
1176 }
1177 Ok(())
1178}
1179
/// Tells whether a reported reactor state counts as ready; only the exact string
/// `Validate` does.
fn is_ready_reactor_state(state: &str) -> bool {
    matches!(state, "Validate")
}
1183
/// Body of a node's `/status` REST response as deserialized by `ensure_rest_ready`.
///
/// Every field is optional. Within this file only `reactor_state` is read, which is why
/// dead-code warnings are silenced for the rest.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct NodeStatus {
    peers: Option<Vec<NodePeer>>,
    api_version: Option<String>,
    build_version: Option<String>,
    chainspec_name: Option<String>,
    starting_state_root_hash: Option<String>,
    last_added_block_info: Option<serde_json::Value>,
    our_public_signing_key: Option<String>,
    round_length: Option<serde_json::Value>,
    next_upgrade: Option<serde_json::Value>,
    uptime: Option<String>,
    /// Compared against the ready state in `is_ready_reactor_state`.
    reactor_state: Option<String>,
    last_progress: Option<String>,
    available_block_range: Option<BlockRange>,
    block_sync: Option<BlockSync>,
    latest_switch_block_hash: Option<serde_json::Value>,
}
1204
/// One entry of `NodeStatus::peers`. Deserialized but not read in this file, hence the
/// dead-code allowance.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct NodePeer {
    node_id: Option<String>,
    address: Option<String>,
}
1212
/// Value of `NodeStatus::available_block_range`. Deserialized but not read in this
/// file, hence the dead-code allowance.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct BlockRange {
    low: Option<u64>,
    high: Option<u64>,
}
1220
/// Value of `NodeStatus::block_sync`; both parts are kept as untyped JSON. Deserialized
/// but not read in this file, hence the dead-code allowance.
#[derive(Deserialize)]
#[allow(dead_code)]
#[serde(rename_all = "snake_case")]
struct BlockSync {
    historical: Option<serde_json::Value>,
    forward: Option<serde_json::Value>,
}
1228
// Unit tests for the pure formatting helpers of this module.
#[cfg(test)]
mod tests {
    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
    use casper_types::U512;
    use casper_types::contract_messages::MessagePayload;
    use directories::BaseDirs;

    // Covers zero, a pure fraction, an exact whole amount and mixed amounts.
    #[test]
    fn format_cspr_u512_handles_whole_and_fractional() {
        assert_eq!(format_cspr_u512(&U512::zero()), "0");
        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
        assert_eq!(
            format_cspr_u512(&U512::from(1_000_000_001u64)),
            "1.000000001"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
            "123"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
            "123.000000456"
        );
    }

    // String payloads are rendered in their quoted `Debug` form.
    #[test]
    fn format_message_payload_renders_string_with_quotes() {
        let payload = MessagePayload::String("hello".to_string());
        assert_eq!(format_message_payload(&payload), "\"hello\"");
    }

    // Hex output is lowercase and zero-padded to two digits per byte.
    #[test]
    fn encode_hex_renders_lowercase() {
        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
    }

    // Skipped (early return) when no home directory can be determined.
    #[test]
    fn shorten_home_path_replaces_home_prefix() {
        let Some(base_dirs) = BaseDirs::new() else {
            return;
        };
        let home = base_dirs.home_dir();
        let shortened = shorten_home_path(&home.to_string_lossy());
        assert_eq!(shortened, "~");

        let nested = home.join("devnet/logs/stdout.log");
        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
        assert!(shortened_nested.starts_with("~"));
        assert!(shortened_nested.contains("devnet"));
    }

    // A relative path cannot be under the home directory and is returned unchanged.
    #[test]
    fn shorten_home_path_keeps_relative_paths() {
        let input = "relative/path";
        assert_eq!(shorten_home_path(input), input);
    }
}