1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
3use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{Result, anyhow};
5use backoff::ExponentialBackoff;
6use backoff::backoff::Backoff;
7use casper_types::U512;
8use casper_types::contract_messages::MessagePayload;
9use casper_types::execution::ExecutionResult;
10use clap::{Args, Parser, Subcommand};
11use directories::BaseDirs;
12use futures::StreamExt;
13use spinners::{Spinner, Spinners};
14use std::collections::{HashMap, HashSet};
15use std::os::unix::process::ExitStatusExt;
16use std::path::{Path, PathBuf};
17use std::sync::Arc;
18use std::sync::atomic::Ordering;
19use tokio::sync::Mutex;
20use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
21use veles_casper_rust_sdk::sse::event::SseEvent;
22use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
23
// Top-level command-line interface, parsed with clap's derive API.
// Plain `//` comments are used here so the generated help text is unaffected.
#[derive(Parser)]
#[command(name = "nctl")]
#[command(
    about = "casper-devnet launcher for local Casper Network development networks",
    long_about = None
)]
pub struct Cli {
    // Subcommand selected on the command line; dispatched by `run`.
    #[command(subcommand)]
    command: Command,
}
35
// Top-level subcommands of the CLI.
#[derive(Subcommand)]
enum Command {
    // Handled by `run_start`.
    Start(StartArgs),
    // Handled by `run_assets`.
    Assets(AssetsArgs),
}
44
// Arguments of the `start` subcommand.
#[derive(Args, Clone)]
struct StartArgs {
    // Network name; handed to `AssetsLayout::new` and `SetupOptions::network_name`.
    #[arg(long, default_value = "casper-dev")]
    network_name: String,

    // Assets root override; when absent `assets::default_assets_root()` is used.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,

    // Requested protocol version; resolved by `resolve_protocol_version`.
    #[arg(long)]
    protocol_version: Option<String>,

    // Number of nodes; forwarded as `SetupOptions::nodes`.
    #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
    node_count: u32,

    // Optional user count; forwarded as `SetupOptions::users`.
    #[arg(long)]
    users: Option<u32>,

    // Forwarded as `SetupOptions::delay_seconds`.
    #[arg(long, default_value_t = 3)]
    delay: u64,

    // Forwarded as `StartPlan::rust_log`.
    #[arg(long = "log-level", default_value = "info")]
    log_level: String,

    // Forwarded as `SetupOptions::node_log_format`.
    #[arg(long, default_value = "json")]
    node_log_format: String,

    // Only create assets (via `run_setup_only`); do not start any process.
    #[arg(long)]
    setup_only: bool,

    // Tear down existing assets and set them up again.
    #[arg(long)]
    force_setup: bool,

    // Seed forwarded to `SetupOptions::seed` and `assets::ensure_consensus_keys`.
    #[arg(long, default_value = "default")]
    seed: Arc<str>,
}
92
// Arguments of the `assets` subcommand; it only wraps a nested subcommand.
#[derive(Args)]
struct AssetsArgs {
    // Nested assets operation; dispatched by `run_assets`.
    #[command(subcommand)]
    command: AssetsCommand,
}
99
// Operations available under the `assets` subcommand.
#[derive(Subcommand)]
enum AssetsCommand {
    // Handled by `run_assets_add`.
    Add(AssetsAddArgs),
    // Handled by `run_assets_pull`.
    Pull(AssetsPullArgs),
    // Handled by `run_assets_list`.
    List,
}
110
// Arguments of `assets add`.
#[derive(Args, Clone)]
struct AssetsAddArgs {
    // Bundle path passed to `assets::install_assets_bundle`; values that look
    // like http(s) URLs are rejected by `run_assets_add`.
    #[arg(value_name = "PATH")]
    path: PathBuf,
}
118
// Arguments of `assets pull`; both are forwarded to `assets::pull_assets_bundles`.
#[derive(Args, Clone)]
struct AssetsPullArgs {
    // Optional target selector (semantics defined by `assets::pull_assets_bundles`).
    #[arg(long)]
    target: Option<String>,

    // Force flag (semantics defined by `assets::pull_assets_bundles`).
    #[arg(long)]
    force: bool,
}
130
131pub async fn run() -> Result<()> {
133 let cli = Cli::parse();
134 match cli.command {
135 Command::Start(args) => run_start(args).await,
136 Command::Assets(args) => run_assets(args).await,
137 }
138}
139
/// Implements the `start` subcommand: prepares assets if needed, starts the
/// network processes, attaches SSE listeners, then waits for the first
/// Ctrl-C or process exit and stops everything.
///
/// # Errors
/// Propagates failures from asset resolution/setup, state loading, process
/// start/stop and state persistence.
async fn run_start(args: StartArgs) -> Result<()> {
    // An explicit --net-path wins over the default assets root.
    let assets_root = match &args.net_path {
        Some(path) => path.clone(),
        None => assets::default_assets_root()?,
    };
    let layout = AssetsLayout::new(assets_root, args.network_name.clone());
    let assets_path = shorten_home_path(&layout.net_dir().display().to_string());
    println!("assets path: {}", assets_path);
    // Snapshot taken before any setup/teardown; reused below to decide
    // whether this run is a resume of an existing network.
    let assets_exist = layout.exists().await;
    if !args.setup_only && !args.force_setup && assets_exist {
        println!("resuming network operations on {}", layout.network_name());
    }
    let protocol_version = resolve_protocol_version(&args.protocol_version).await?;

    if args.setup_only {
        return run_setup_only(&layout, &args, &protocol_version).await;
    }

    if args.force_setup {
        // Rebuild from scratch.
        assets::teardown(&layout).await?;
        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
    } else if !assets_exist {
        assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
    }

    // Re-check on disk: setup above may not have produced the layout.
    if !layout.exists().await {
        return Err(anyhow!(
            "assets missing under {}; run with --setup-only to create them",
            shorten_home_path(&layout.net_dir().display().to_string())
        ));
    }

    // Only on resume: `mark_block_seen` removes consensus keys once blocks
    // flow, so they may need to be recreated from the seed.
    if !args.force_setup && assets_exist {
        let restored = assets::ensure_consensus_keys(&layout, Arc::clone(&args.seed)).await?;
        if restored > 0 {
            println!("recreated consensus keys for {} node(s)", restored);
        }
    }

    let rust_log = args.log_level.clone();

    let plan = StartPlan { rust_log };

    let state_path = layout.net_dir().join("state.json");
    let state = Arc::new(Mutex::new(State::new(state_path).await?));
    let started = {
        // Scoped so the lock is released before the SSE listeners are spawned.
        let mut state = state.lock().await;
        process::start(&layout, &plan, &mut state).await?
    };

    print_pids(&started);
    print_start_banner(&layout, &started).await;
    print_derived_accounts_summary(&layout).await;

    let node_ids = unique_node_ids(&started);
    let details = format_network_details(&layout, &started).await;
    let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
    start_sse_spinner(&health).await;
    spawn_sse_listeners(layout.clone(), &node_ids, health, Arc::clone(&state)).await;

    // Ctrl-C and every process-exit watcher feed the same channel.
    let (event_tx, mut event_rx) = unbounded_channel();
    spawn_ctrlc_listener(event_tx.clone());
    spawn_exit_watchers(started, event_tx);

    // Only the first event is handled; either kind ends with a full stop.
    if let Some(event) = event_rx.recv().await {
        match event {
            RunEvent::CtrlC => {
                let mut state = state.lock().await;
                process::stop(&mut state).await?;
            }
            RunEvent::ProcessExit {
                id,
                pid,
                code,
                signal,
            } => {
                let mut state = state.lock().await;
                // Persist the exit details before stopping the remaining processes.
                update_exited_process(&mut state, &id, code, signal).await?;
                log_exit(&id, pid, code, signal);
                process::stop(&mut state).await?;
            }
        }
    }

    Ok(())
}
226
227async fn run_setup_only(
228 layout: &AssetsLayout,
229 args: &StartArgs,
230 protocol_version: &str,
231) -> Result<()> {
232 if args.force_setup {
233 assets::teardown(layout).await?;
234 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
235 print_derived_accounts_summary(layout).await;
236 return Ok(());
237 }
238
239 if layout.exists().await {
240 println!(
241 "assets already exist at {}; use --force-setup to rebuild",
242 shorten_home_path(&layout.net_dir().display().to_string())
243 );
244 print_derived_accounts_summary(layout).await;
245 return Ok(());
246 }
247
248 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
249 print_derived_accounts_summary(layout).await;
250 Ok(())
251}
252
253fn record_pid(record: &ProcessRecord) -> Option<u32> {
254 if let Some(handle) = &record.pid_handle {
255 let pid = handle.load(Ordering::SeqCst);
256 if pid != 0 {
257 return Some(pid);
258 }
259 }
260 record.pid
261}
262
263fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
264 SetupOptions {
265 nodes: args.node_count,
266 users: args.users,
267 delay_seconds: args.delay,
268 network_name: args.network_name.clone(),
269 protocol_version: protocol_version.to_string(),
270 node_log_format: args.node_log_format.clone(),
271 seed: Arc::clone(&args.seed),
272 }
273}
274
275fn print_pids(records: &[RunningProcess]) {
276 for record in records {
277 if let Some(pid) = record_pid(&record.record) {
278 println!(
279 "{} pid={} ({:?})",
280 record.record.id, pid, record.record.kind
281 );
282 }
283 }
284}
285
286async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
287 let symlink_root = layout.net_dir();
288 let mut node_pids: HashMap<u32, u32> = HashMap::new();
289 let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
290 let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32)>> = HashMap::new();
291
292 for process in processes {
293 if let Some(pid) = record_pid(&process.record) {
294 match process.record.kind {
295 ProcessKind::Node => {
296 node_pids.insert(process.record.node_id, pid);
297 }
298 ProcessKind::Sidecar => {
299 sidecar_pids.insert(process.record.node_id, pid);
300 }
301 }
302 process_logs
303 .entry(process.record.node_id)
304 .or_default()
305 .push((process.record.kind.clone(), pid));
306 }
307 }
308
309 let node_ids = unique_node_ids(processes);
310
311 let mut lines = Vec::new();
312 lines.push("network details".to_string());
313 for node_id in node_ids {
314 let node_pid = node_pids
315 .get(&node_id)
316 .map(|pid| pid.to_string())
317 .unwrap_or_else(|| "-".to_string());
318 let sidecar_pid = sidecar_pids
319 .get(&node_id)
320 .map(|pid| pid.to_string())
321 .unwrap_or_else(|| "-".to_string());
322 lines.push(format!(" node-{}", node_id));
323 lines.push(format!(
324 " pids: node={} sidecar={}",
325 node_pid, sidecar_pid
326 ));
327 if let Some(entries) = process_logs.get(&node_id) {
328 let mut entries = entries.clone();
329 entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
330 lines.push(" logs".to_string());
331 for (kind, pid) in entries {
332 let (stdout_link, stderr_link) = log_symlink_paths(&symlink_root, &kind, node_id);
333 lines.push(format!(
334 " {} pid={} stdout={} stderr={}",
335 process_kind_label(&kind),
336 pid,
337 stdout_link,
338 stderr_link
339 ));
340 }
341 }
342 lines.push(" endpoints".to_string());
343 lines.push(format!(" rest: {}", assets::rest_endpoint(node_id)));
344 lines.push(format!(" sse: {}", assets::sse_endpoint(node_id)));
345 lines.push(format!(" rpc: {}", assets::rpc_endpoint(node_id)));
346 lines.push(format!(" binary: {}", assets::binary_address(node_id)));
347 lines.push(format!(
348 " gossip: {}",
349 assets::network_address(node_id)
350 ));
351 }
352
353 lines.join("\n")
354}
355
356fn process_kind_label(kind: &ProcessKind) -> &'static str {
357 match kind {
358 ProcessKind::Node => "node",
359 ProcessKind::Sidecar => "sidecar",
360 }
361}
362
363fn shorten_home_path(path: &str) -> String {
364 let path = Path::new(path);
365 let Some(base_dirs) = BaseDirs::new() else {
366 return path.display().to_string();
367 };
368 let home = base_dirs.home_dir();
369 match path.strip_prefix(home) {
370 Ok(stripped) => {
371 if stripped.as_os_str().is_empty() {
372 return "~".to_string();
373 }
374 let mut shorthand = PathBuf::from("~");
375 shorthand.push(stripped);
376 shorthand.display().to_string()
377 }
378 Err(_) => path.display().to_string(),
379 }
380}
381
382fn log_symlink_paths(symlink_root: &Path, kind: &ProcessKind, node_id: u32) -> (String, String) {
383 let base = match kind {
384 ProcessKind::Node => format!("node-{}", node_id),
385 ProcessKind::Sidecar => format!("sidecar-{}", node_id),
386 };
387 let stdout_link = symlink_root.join(format!("{}.stdout", base));
388 let stderr_link = symlink_root.join(format!("{}.stderr", base));
389 (
390 shorten_home_path(&stdout_link.display().to_string()),
391 shorten_home_path(&stderr_link.display().to_string()),
392 )
393}
394
395async fn print_derived_accounts_summary(layout: &AssetsLayout) {
396 if let Some(summary) = assets::derived_accounts_summary(layout).await {
397 if let Some(parsed) = parse_derived_accounts_csv(&summary) {
398 println!("derived accounts");
399 if !parsed.validators.is_empty() {
400 println!(" validators");
401 print_account_group(&parsed.validators);
402 }
403 if !parsed.users.is_empty() {
404 println!(" users");
405 print_account_group(&parsed.users);
406 }
407 if !parsed.other.is_empty() {
408 println!(" other");
409 print_account_group(&parsed.other);
410 }
411 } else {
412 println!("derived accounts");
413 for line in summary.lines() {
414 println!(" {}", line);
415 }
416 }
417 }
418}
419
/// One data row of the derived-accounts CSV, minus its leading `kind` column
/// (the kind selects the group the row is stored in).
/// All fields hold the raw CSV text; nothing is parsed or validated.
struct DerivedAccountRow {
    /// CSV column `name`.
    name: String,
    /// CSV column `key_type`.
    key_type: String,
    /// CSV column `derivation`.
    derivation: String,
    /// CSV column `path`.
    path: String,
    /// CSV column `account_hash`.
    account_hash: String,
    /// CSV column `balance` (last column; keeps any further commas).
    balance: String,
}
428
/// Derived-account rows grouped by the CSV `kind` column.
struct DerivedAccountsParsed {
    /// Rows whose kind is exactly `validator`.
    validators: Vec<DerivedAccountRow>,
    /// Rows whose kind is exactly `user`.
    users: Vec<DerivedAccountRow>,
    /// Rows with any other kind.
    other: Vec<DerivedAccountRow>,
}
434
435fn parse_derived_accounts_csv(summary: &str) -> Option<DerivedAccountsParsed> {
436 let mut lines = summary.lines();
437 let header = lines.next()?.trim();
438 if header != "kind,name,key_type,derivation,path,account_hash,balance" {
439 return None;
440 }
441
442 let mut parsed = DerivedAccountsParsed {
443 validators: Vec::new(),
444 users: Vec::new(),
445 other: Vec::new(),
446 };
447
448 for line in lines {
449 let line = line.trim();
450 if line.is_empty() {
451 continue;
452 }
453 let mut parts = line.splitn(7, ',');
454 let kind = parts.next()?.to_string();
455 let name = parts.next()?.to_string();
456 let key_type = parts.next()?.to_string();
457 let derivation = parts.next()?.to_string();
458 let path = parts.next()?.to_string();
459 let account_hash = parts.next()?.to_string();
460 let balance = parts.next()?.to_string();
461 let row = DerivedAccountRow {
462 name,
463 key_type,
464 derivation,
465 path,
466 account_hash,
467 balance,
468 };
469 match kind.as_str() {
470 "validator" => parsed.validators.push(row),
471 "user" => parsed.users.push(row),
472 _ => parsed.other.push(row),
473 }
474 }
475
476 Some(parsed)
477}
478
479fn print_account_group(rows: &[DerivedAccountRow]) {
480 for row in rows {
481 println!(" {}:", row.name);
482 println!(" key_type: {}", row.key_type);
483 println!(" derivation: {}", row.derivation);
484 println!(" path: {}", row.path);
485 println!(" account_hash: {}", row.account_hash);
486 println!(" balance: {}", row.balance);
487 }
488}
489
490fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
491 let mut nodes = HashSet::new();
492 for process in processes {
493 nodes.insert(process.record.node_id);
494 }
495 let mut ids: Vec<u32> = nodes.into_iter().collect();
496 ids.sort_unstable();
497 ids
498}
499
/// Events that end the supervising wait in `run_start`.
enum RunEvent {
    /// Sent by `spawn_ctrlc_listener` once Ctrl-C is received.
    CtrlC,
    /// Sent by `spawn_exit_watchers` when a supervised process or task ends.
    ProcessExit {
        /// Id of the process record that exited.
        id: String,
        /// PID, when one is known.
        pid: Option<u32>,
        /// Exit code, when available.
        code: Option<i32>,
        /// Terminating signal, when available.
        signal: Option<i32>,
    },
}
509
510fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
511 tokio::spawn(async move {
512 if tokio::signal::ctrl_c().await.is_ok() {
513 let _ = tx.send(RunEvent::CtrlC);
514 }
515 });
516}
517
518fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
519 for running in processes {
520 let tx = tx.clone();
521 tokio::spawn(async move {
522 let id = running.record.id.clone();
523 match running.handle {
524 ProcessHandle::Child(mut child) => {
525 if let Ok(status) = child.wait().await {
526 let pid = record_pid(&running.record).or_else(|| child.id());
527 let code = status.code();
528 let signal = status.signal();
529 let _ = tx.send(RunEvent::ProcessExit {
530 id: id.clone(),
531 pid,
532 code,
533 signal,
534 });
535 }
536 }
537 ProcessHandle::Task(handle) => {
538 let status = handle.await;
539 let pid = record_pid(&running.record);
540 let (code, signal) = match status {
541 Ok(Ok(())) => (Some(0), None),
542 Ok(Err(_)) => (None, None),
543 Err(_) => (None, None),
544 };
545 let _ = tx.send(RunEvent::ProcessExit {
546 id: id.clone(),
547 pid,
548 code,
549 signal,
550 });
551 }
552 }
553 });
554 }
555}
556
/// Spinner text shown by `start_sse_spinner` until the network is announced healthy.
const SSE_WAIT_MESSAGE: &str = "Waiting for SSE connection...";
/// Spinner text shown after the announcement; also the message passed when
/// that spinner is stopped in `mark_block_seen`.
const BLOCK_WAIT_MESSAGE: &str = "Waiting for new blocks...";
559
/// Shared bookkeeping for the SSE listeners: which nodes have reported an API
/// version, whether the network was announced healthy, and the spinners shown
/// while waiting.
struct SseHealth {
    /// Node ids that must report a version before the network is announced.
    expected_nodes: HashSet<u32>,
    /// Latest API version reported per node id.
    versions: HashMap<u32, String>,
    /// Set once the "Network is healthy" announcement has been printed.
    announced: bool,
    /// Set once the first block has been handled by `mark_block_seen`.
    block_seen: bool,
    /// Spinner shown while waiting for SSE connections, if running.
    sse_spinner: Option<Spinner>,
    /// Spinner shown while waiting for the first block, if running.
    block_spinner: Option<Spinner>,
    /// Pre-rendered network details printed with the health announcement.
    details: String,
}
569
570impl SseHealth {
571 fn new(node_ids: Vec<u32>, details: String) -> Self {
572 Self {
573 expected_nodes: node_ids.into_iter().collect(),
574 versions: HashMap::new(),
575 announced: false,
576 block_seen: false,
577 sse_spinner: None,
578 block_spinner: None,
579 details,
580 }
581 }
582}
583
584async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
585 if node_id != 1 {
586 return false;
587 }
588 let state = health.lock().await;
589 state.announced
590}
591
592fn start_spinner(message: &str) -> Spinner {
593 Spinner::new(Spinners::Dots, message.to_string())
594}
595
596async fn start_sse_spinner(health: &Arc<Mutex<SseHealth>>) {
597 let mut state = health.lock().await;
598 if state.sse_spinner.is_none() {
599 state.sse_spinner = Some(start_spinner(SSE_WAIT_MESSAGE));
600 }
601}
602
603async fn spawn_sse_listeners(
604 layout: AssetsLayout,
605 node_ids: &[u32],
606 health: Arc<Mutex<SseHealth>>,
607 state: Arc<Mutex<State>>,
608) {
609 for node_id in node_ids {
610 let node_id = *node_id;
611 let endpoint = assets::sse_endpoint(node_id);
612 let layout = layout.clone();
613 let health = Arc::clone(&health);
614 let state = Arc::clone(&state);
615 tokio::spawn(async move {
616 run_sse_listener(node_id, endpoint, health, state, layout).await;
617 });
618 }
619}
620
/// Connects to one node's SSE endpoint and processes its events, reconnecting
/// after failures until the backoff policy stops yielding delays.
///
/// All nodes contribute API versions to the health state. Only node 1 records
/// block heights and logs blocks and transactions.
async fn run_sse_listener(
    node_id: u32,
    endpoint: String,
    health: Arc<Mutex<SseHealth>>,
    state: Arc<Mutex<State>>,
    layout: AssetsLayout,
) {
    // NOTE(review): presumably the default policy has a bounded total elapsed
    // time, after which `next_backoff` returns `None` and this listener
    // returns for good — confirm against the `backoff` crate.
    let mut backoff = ExponentialBackoff::default();

    loop {
        // NOTE(review): config and connection errors are discarded without
        // any logging; only the retry delay is observable.
        let config = match ListenerConfig::builder()
            .with_endpoint(endpoint.clone())
            .build()
        {
            Ok(config) => config,
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        let stream = match sse::listener(config).await {
            Ok(stream) => {
                // A successful connection restarts the backoff schedule.
                backoff.reset();
                stream
            }
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        futures::pin_mut!(stream);
        let mut stream_failed = false;
        while let Some(event) = stream.next().await {
            match event {
                Ok(sse_event) => match sse_event {
                    SseEvent::ApiVersion(version) => {
                        record_api_version(node_id, version.to_string(), &health).await;
                    }
                    SseEvent::BlockAdded { block_hash, block } => {
                        // Height is persisted even before the network is announced.
                        if node_id == 1
                            && let Err(err) = record_last_block_height(&state, block.height()).await
                        {
                            eprintln!("warning: failed to record last block height: {}", err);
                        }
                        if should_log_primary(node_id, &health).await {
                            mark_block_seen(&health, &layout).await;
                            let prefix = timestamp_prefix();
                            println!(
                                "{} Block {} added (height={} era={})",
                                prefix,
                                block_hash,
                                block.height(),
                                block.era_id().value()
                            );
                        }
                    }
                    SseEvent::TransactionAccepted(transaction) => {
                        if node_id == 1 {
                            let prefix = timestamp_prefix();
                            println!("{} Transaction {} accepted", prefix, transaction.hash());
                        }
                    }
                    SseEvent::TransactionProcessed {
                        transaction_hash,
                        execution_result,
                        messages,
                        ..
                    } => {
                        if node_id == 1 {
                            let tx_hash = transaction_hash.to_string();
                            let prefix = timestamp_prefix();
                            log_transaction_processed(
                                &prefix,
                                &tx_hash,
                                &execution_result,
                                &messages,
                            );
                        }
                    }
                    // Every other event kind is ignored.
                    _ => {}
                },
                Err(_) => {
                    stream_failed = true;
                    break;
                }
            }
        }

        // NOTE(review): when the stream ends without an error, the loop
        // reconnects immediately with no delay; only failed streams back off.
        if stream_failed && !sleep_backoff(&mut backoff).await {
            return;
        }
    }
}
720
721async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
722 let (summary, details, sse_spinner) = {
723 let mut state = health.lock().await;
724 if !state.expected_nodes.contains(&node_id) {
725 return;
726 }
727 state.versions.insert(node_id, version);
728 if state.announced || state.versions.len() != state.expected_nodes.len() {
729 return;
730 }
731
732 let summary = version_summary(&state.versions);
733 let details = state.details.clone();
734 let sse_spinner = state.sse_spinner.take();
735 if state.block_spinner.is_none() {
736 state.block_spinner = Some(start_spinner(BLOCK_WAIT_MESSAGE));
737 }
738 state.announced = true;
739 state.block_seen = false;
740 (summary, details, sse_spinner)
741 };
742
743 if let Some(mut spinner) = sse_spinner {
744 spinner.stop_with_message("SSE connection established.".to_string());
745 }
746 println!("Network is healthy ({})", summary);
747 println!("{}", details);
748}
749
750async fn mark_block_seen(health: &Arc<Mutex<SseHealth>>, layout: &AssetsLayout) {
751 let (block_spinner, node_ids) = {
752 let mut state = health.lock().await;
753 if state.block_seen {
754 return;
755 }
756 state.block_seen = true;
757 (
758 state.block_spinner.take(),
759 state.expected_nodes.iter().copied().collect::<Vec<_>>(),
760 )
761 };
762
763 if let Some(mut spinner) = block_spinner {
764 spinner.stop_with_message(BLOCK_WAIT_MESSAGE.to_string());
765 }
766
767 match assets::remove_consensus_keys(layout, &node_ids).await {
768 Ok(removed) => {
769 if removed > 0 {
770 println!("Consensus secret keys removed from disk.");
771 }
772 }
773 Err(err) => {
774 eprintln!("warning: failed to remove consensus secret keys: {}", err);
775 }
776 }
777}
778
779async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
780 let mut state = state.lock().await;
781 if state.last_block_height == Some(height) {
782 return Ok(());
783 }
784 state.last_block_height = Some(height);
785 state.touch().await?;
786 Ok(())
787}
788
/// Summarises the reported versions as `version X` when exactly one distinct
/// version exists, otherwise as `versions A, B, …` with the distinct versions
/// sorted lexicographically.
fn version_summary(versions: &HashMap<u32, String>) -> String {
    let mut distinct: Vec<&str> = versions.values().map(String::as_str).collect();
    distinct.sort_unstable();
    distinct.dedup();

    match distinct.as_slice() {
        [only] => format!("version {}", only),
        several => format!("versions {}", several.join(", ")),
    }
}
799
800async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
801 if let Some(delay) = backoff.next_backoff() {
802 tokio::time::sleep(delay).await;
803 return true;
804 }
805 false
806}
807
808fn log_transaction_processed(
809 prefix: &str,
810 transaction_hash: &str,
811 execution_result: &ExecutionResult,
812 messages: &[casper_types::contract_messages::Message],
813) {
814 let consumed = execution_result.consumed();
815 let consumed_cspr = format_cspr_u512(&consumed);
816 if let Some(error) = execution_result.error_message() {
817 println!(
818 "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
819 prefix, transaction_hash, error, consumed, consumed_cspr
820 );
821 } else {
822 println!(
823 "{} Transaction {} processed succeeded gas={} gas_cspr={}",
824 prefix, transaction_hash, consumed, consumed_cspr
825 );
826 }
827
828 for message in messages {
829 let entity = message.entity_addr().to_formatted_string();
830 let topic = message.topic_name();
831 let payload = format_message_payload(message.payload());
832 println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
833 }
834}
835
836fn timestamp_prefix() -> String {
837 time::OffsetDateTime::now_utc()
838 .format(&time::format_description::well_known::Rfc3339)
839 .unwrap_or_else(|_| "unknown-time".to_string())
840}
841
842fn format_message_payload(payload: &MessagePayload) -> String {
843 match payload {
844 MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
845 MessagePayload::String(value) => format!("{:?}", value),
846 }
847}
848
/// Encodes `bytes` as lower-case hexadecimal, two digits per byte, with no
/// prefix or separators.
fn encode_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
857
858fn format_cspr_u512(motes: &U512) -> String {
859 let motes_str = motes.to_string();
860 let digits = motes_str.len();
861 if digits <= 9 {
862 let frac = format!("{:0>9}", motes_str);
863 let frac = frac.trim_end_matches('0');
864 if frac.is_empty() {
865 return "0".to_string();
866 }
867 return format!("0.{}", frac);
868 }
869
870 let split = digits - 9;
871 let (whole, frac) = motes_str.split_at(split);
872 let frac = frac.trim_end_matches('0');
873 if frac.is_empty() {
874 return whole.to_string();
875 }
876 format!("{}.{}", whole, frac)
877}
878
879async fn update_exited_process(
880 state: &mut State,
881 id: &str,
882 code: Option<i32>,
883 signal: Option<i32>,
884) -> Result<()> {
885 for record in &mut state.processes {
886 if record.id == id {
887 record.last_status = ProcessStatus::Exited;
888 record.exit_code = code;
889 record.exit_signal = signal;
890 record.stopped_at = Some(time::OffsetDateTime::now_utc());
891 break;
892 }
893 }
894 state.touch().await?;
895 Ok(())
896}
897
/// Prints a one-line description of a process exit.
///
/// The PID is included when known. A signal takes precedence over an exit
/// code; with neither, only the fact of the exit is reported.
fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
    match (pid, signal, code) {
        (Some(pid), Some(signal), _) => println!(
            "process {} (pid {}) exited due to signal {}",
            id, pid, signal
        ),
        (Some(pid), None, Some(code)) => {
            println!("process {} (pid {}) exited with code {}", id, pid, code)
        }
        (Some(pid), None, None) => println!("process {} (pid {}) exited", id, pid),
        (None, Some(signal), _) => println!("process {} exited due to signal {}", id, signal),
        (None, None, Some(code)) => println!("process {} exited with code {}", id, code),
        (None, None, None) => println!("process {} exited", id),
    }
}
918
919async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
920 let total_nodes = layout.count_nodes().await.unwrap_or(0);
921 let target = format!("all nodes ({})", total_nodes);
922 let sidecars = processes
923 .iter()
924 .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
925 .count();
926 println!(
927 "started {} process(es) for {} (sidecars: {})",
928 processes.len(),
929 target,
930 sidecars
931 );
932}
933
/// Reports whether `path` looks like an `http://` or `https://` URL rather
/// than a local filesystem path.
///
/// The scheme is matched ASCII case-insensitively, so `HTTPS://host/…` is
/// recognised too; URI schemes are case-insensitive, and a missed match
/// would send the URL down the local-file path with a misleading error.
fn looks_like_url(path: &Path) -> bool {
    let value = path.to_string_lossy();
    ["http://", "https://"].iter().any(|scheme| {
        // `get` returns `None` when the text is too short or the cut would
        // split a multi-byte character, so this never panics.
        matches!(
            value.get(..scheme.len()),
            Some(prefix) if prefix.eq_ignore_ascii_case(scheme)
        )
    })
}
938
939async fn run_assets(args: AssetsArgs) -> Result<()> {
940 match args.command {
941 AssetsCommand::Add(add) => run_assets_add(add).await,
942 AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
943 AssetsCommand::List => run_assets_list().await,
944 }
945}
946
947async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
948 if looks_like_url(&args.path) {
949 return Err(anyhow!(
950 "assets URL is not supported yet; provide a local .tar.gz path"
951 ));
952 }
953 assets::install_assets_bundle(&args.path).await?;
954 println!(
955 "assets installed into {}",
956 assets::assets_bundle_root()?.display()
957 );
958 Ok(())
959}
960
961async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
962 assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
963 Ok(())
964}
965
966async fn run_assets_list() -> Result<()> {
967 let mut versions = assets::list_bundle_versions().await?;
968 if versions.is_empty() {
969 println!("no assets found");
970 return Ok(());
971 }
972 versions.sort_by(|a, b| b.cmp(a));
973 for version in versions {
974 println!("{}", version);
975 }
976 Ok(())
977}
978
979async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
980 if let Some(raw) = candidate {
981 let version = assets::parse_protocol_version(raw)?;
982 if !assets::has_bundle_version(&version).await? {
983 let argv0 = std::env::args()
984 .next()
985 .unwrap_or_else(|| "casper-devnet".to_string());
986 let pull_cmd = format!("{} assets pull", argv0);
987 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
988 return Err(anyhow!(
989 "assets for version {} not found; run `{}` or `{}`",
990 version,
991 pull_cmd,
992 add_cmd
993 ));
994 }
995 return Ok(version.to_string());
996 }
997 match assets::most_recent_bundle_version().await {
998 Ok(version) => Ok(version.to_string()),
999 Err(_) => {
1000 let argv0 = std::env::args()
1001 .next()
1002 .unwrap_or_else(|| "casper-devnet".to_string());
1003 let pull_cmd = format!("{} assets pull", argv0);
1004 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
1005 Err(anyhow!(
1006 "no assets found; run `{}` or `{}`",
1007 pull_cmd,
1008 add_cmd
1009 ))
1010 }
1011 }
1012}
1013
// Unit tests for the pure formatting helpers of this module.
#[cfg(test)]
mod tests {
    use super::{encode_hex, format_cspr_u512, format_message_payload, shorten_home_path};
    use casper_types::U512;
    use casper_types::contract_messages::MessagePayload;
    use directories::BaseDirs;

    // Covers zero, sub-CSPR amounts, exact whole amounts and mixed values.
    #[test]
    fn format_cspr_u512_handles_whole_and_fractional() {
        assert_eq!(format_cspr_u512(&U512::zero()), "0");
        assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
        assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
        assert_eq!(
            format_cspr_u512(&U512::from(1_000_000_001u64)),
            "1.000000001"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
            "123"
        );
        assert_eq!(
            format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
            "123.000000456"
        );
    }

    // String payloads are rendered in `Debug` form, i.e. with surrounding quotes.
    #[test]
    fn format_message_payload_renders_string_with_quotes() {
        let payload = MessagePayload::String("hello".to_string());
        assert_eq!(format_message_payload(&payload), "\"hello\"");
    }

    // Hex digits are lower-case and zero-padded to two characters per byte.
    #[test]
    fn encode_hex_renders_lowercase() {
        assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
    }

    // The home directory itself becomes `~`; nested paths keep their tail.
    #[test]
    fn shorten_home_path_replaces_home_prefix() {
        // Skipped when no home directory can be determined.
        let Some(base_dirs) = BaseDirs::new() else {
            return;
        };
        let home = base_dirs.home_dir();
        let shortened = shorten_home_path(&home.to_string_lossy());
        assert_eq!(shortened, "~");

        let nested = home.join("devnet/logs/stdout.log");
        let shortened_nested = shorten_home_path(&nested.to_string_lossy());
        assert!(shortened_nested.starts_with("~"));
        assert!(shortened_nested.contains("devnet"));
    }

    // Paths outside the home directory are returned unchanged.
    #[test]
    fn shorten_home_path_keeps_relative_paths() {
        let input = "relative/path";
        assert_eq!(shorten_home_path(input), input);
    }
}