1use crate::assets::{self, AssetsLayout, SetupOptions};
2use crate::process::{self, ProcessHandle, RunningProcess, StartPlan};
3use crate::state::{ProcessKind, ProcessRecord, ProcessStatus, State};
4use anyhow::{anyhow, Result};
5use backoff::backoff::Backoff;
6use backoff::ExponentialBackoff;
7use casper_types::contract_messages::MessagePayload;
8use casper_types::execution::ExecutionResult;
9use casper_types::U512;
10use clap::{Args, Parser, Subcommand};
11use futures::StreamExt;
12use std::collections::{HashMap, HashSet};
13use std::os::unix::process::ExitStatusExt;
14use std::path::{Path, PathBuf};
15use std::sync::atomic::Ordering;
16use std::sync::Arc;
17use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
18use tokio::sync::Mutex;
19use veles_casper_rust_sdk::sse::event::SseEvent;
20use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
21
22#[derive(Parser)]
24#[command(name = "nctl")]
25#[command(
26 about = "casper-devnet launcher for local Casper Network development networks",
27 long_about = None
28)]
29pub struct Cli {
30 #[command(subcommand)]
31 command: Command,
32}
33
34#[derive(Subcommand)]
36enum Command {
37 Start(StartArgs),
39 Assets(AssetsArgs),
41}
42
43#[derive(Args, Clone)]
45struct StartArgs {
46 #[arg(long, default_value = "casper-dev")]
48 network_name: String,
49
50 #[arg(long)]
52 protocol_version: Option<String>,
53
54 #[arg(long = "node-count", aliases = ["nodes", "validators"], default_value_t = 4)]
56 node_count: u32,
57
58 #[arg(long)]
60 users: Option<u32>,
61
62 #[arg(long, default_value_t = 30)]
64 delay: u64,
65
66 #[arg(long, default_value = "info")]
68 loglevel: String,
69
70 #[arg(long, default_value = "json")]
72 node_log_format: String,
73
74 #[arg(long)]
76 setup_only: bool,
77
78 #[arg(long)]
80 force_setup: bool,
81
82 #[arg(long, default_value = "default")]
84 seed: Arc<str>,
85}
86
87#[derive(Args)]
89struct AssetsArgs {
90 #[command(subcommand)]
91 command: AssetsCommand,
92}
93
94#[derive(Subcommand)]
96enum AssetsCommand {
97 Add(AssetsAddArgs),
99 Pull(AssetsPullArgs),
101 List,
103}
104
105#[derive(Args, Clone)]
107struct AssetsAddArgs {
108 #[arg(value_name = "PATH")]
110 path: PathBuf,
111}
112
113#[derive(Args, Clone)]
115struct AssetsPullArgs {
116 #[arg(long)]
118 target: Option<String>,
119
120 #[arg(long)]
122 force: bool,
123}
124
125pub async fn run() -> Result<()> {
127 let cli = Cli::parse();
128 match cli.command {
129 Command::Start(args) => run_start(args).await,
130 Command::Assets(args) => run_assets(args).await,
131 }
132}
133
134async fn run_start(args: StartArgs) -> Result<()> {
135 let assets_root = assets::default_assets_root()?;
136 let layout = AssetsLayout::new(assets_root, args.network_name.clone());
137 println!("assets path: {}", layout.net_dir().display());
138 let assets_exist = layout.exists().await;
139 if !args.setup_only && !args.force_setup && assets_exist {
140 println!("resuming network operations on {}", layout.network_name());
141 }
142 let protocol_version = resolve_protocol_version(&args.protocol_version).await?;
143
144 if args.setup_only {
145 return run_setup_only(&layout, &args, &protocol_version).await;
146 }
147
148 if args.force_setup {
149 assets::teardown(&layout).await?;
150 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
151 } else if !assets_exist {
152 assets::setup_local(&layout, &setup_options(&args, &protocol_version)).await?;
153 }
154
155 if !layout.exists().await {
156 return Err(anyhow!(
157 "assets missing under {}; run with --setup-only to create them",
158 layout.net_dir().display()
159 ));
160 }
161
162 let rust_log = args.loglevel.clone();
163
164 let plan = StartPlan { rust_log };
165
166 let state_path = layout.net_dir().join("state.json");
167 let mut state = State::new(state_path).await?;
168 let started = process::start(&layout, &plan, &mut state).await?;
169
170 print_pids(&started);
171 print_start_banner(&layout, &started).await;
172 print_derived_accounts_summary(&layout).await;
173
174 let node_ids = unique_node_ids(&started);
175 let details = format_network_details(&layout, &started).await;
176 let health = Arc::new(Mutex::new(SseHealth::new(node_ids.clone(), details)));
177 println!("Waiting for SSE connection...");
178 spawn_sse_listeners(&layout, &node_ids, health).await;
179
180 let (event_tx, mut event_rx) = unbounded_channel();
181 spawn_ctrlc_listener(event_tx.clone());
182 spawn_exit_watchers(started, event_tx);
183
184 if let Some(event) = event_rx.recv().await {
185 match event {
186 RunEvent::CtrlC => {
187 process::stop(&mut state).await?;
188 }
189 RunEvent::ProcessExit {
190 id,
191 pid,
192 code,
193 signal,
194 } => {
195 update_exited_process(&mut state, &id, code, signal).await?;
196 log_exit(&id, pid, code, signal);
197 process::stop(&mut state).await?;
198 }
199 }
200 }
201
202 Ok(())
203}
204
205async fn run_setup_only(
206 layout: &AssetsLayout,
207 args: &StartArgs,
208 protocol_version: &str,
209) -> Result<()> {
210 if args.force_setup {
211 assets::teardown(layout).await?;
212 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
213 print_derived_accounts_summary(layout).await;
214 return Ok(());
215 }
216
217 if layout.exists().await {
218 println!(
219 "assets already exist at {}; use --force-setup to rebuild",
220 layout.net_dir().display()
221 );
222 print_derived_accounts_summary(layout).await;
223 return Ok(());
224 }
225
226 assets::setup_local(layout, &setup_options(args, protocol_version)).await?;
227 print_derived_accounts_summary(layout).await;
228 Ok(())
229}
230
231fn record_pid(record: &ProcessRecord) -> Option<u32> {
232 if let Some(handle) = &record.pid_handle {
233 let pid = handle.load(Ordering::SeqCst);
234 if pid != 0 {
235 return Some(pid);
236 }
237 }
238 record.pid
239}
240
241fn setup_options(args: &StartArgs, protocol_version: &str) -> SetupOptions {
242 SetupOptions {
243 nodes: args.node_count,
244 users: args.users,
245 delay_seconds: args.delay,
246 network_name: args.network_name.clone(),
247 protocol_version: protocol_version.to_string(),
248 node_log_format: args.node_log_format.clone(),
249 seed: Arc::clone(&args.seed),
250 }
251}
252
253fn print_pids(records: &[RunningProcess]) {
254 for record in records {
255 if let Some(pid) = record_pid(&record.record) {
256 println!(
257 "{} pid={} ({:?})",
258 record.record.id, pid, record.record.kind
259 );
260 }
261 }
262}
263
264async fn format_network_details(layout: &AssetsLayout, processes: &[RunningProcess]) -> String {
265 let mut node_pids: HashMap<u32, u32> = HashMap::new();
266 let mut sidecar_pids: HashMap<u32, u32> = HashMap::new();
267 let mut process_logs: HashMap<u32, Vec<(ProcessKind, u32, String, String)>> = HashMap::new();
268
269 for process in processes {
270 if let Some(pid) = record_pid(&process.record) {
271 match process.record.kind {
272 ProcessKind::Node => {
273 node_pids.insert(process.record.node_id, pid);
274 }
275 ProcessKind::Sidecar => {
276 sidecar_pids.insert(process.record.node_id, pid);
277 }
278 }
279 process_logs
280 .entry(process.record.node_id)
281 .or_default()
282 .push((
283 process.record.kind.clone(),
284 pid,
285 process.record.stdout_path.clone(),
286 process.record.stderr_path.clone(),
287 ));
288 }
289 }
290
291 let node_ids = unique_node_ids(processes);
292
293 let mut lines = Vec::new();
294 lines.push("network details".to_string());
295 lines.push(format!("assets: {}", layout.net_dir().display()));
296 for node_id in node_ids {
297 let node_pid = node_pids
298 .get(&node_id)
299 .map(|pid| pid.to_string())
300 .unwrap_or_else(|| "-".to_string());
301 let sidecar_pid = sidecar_pids
302 .get(&node_id)
303 .map(|pid| pid.to_string())
304 .unwrap_or_else(|| "-".to_string());
305 lines.push(format!(
306 "node-{} pid={} sidecar pid={}",
307 node_id, node_pid, sidecar_pid
308 ));
309 if let Some(entries) = process_logs.get(&node_id) {
310 let mut entries = entries.clone();
311 entries.sort_by_key(|entry| process_kind_label(&entry.0).to_string());
312 for (kind, pid, stdout, stderr) in entries {
313 lines.push(format!(
314 " {} pid={} stdout={} stderr={}",
315 process_kind_label(&kind),
316 pid,
317 stdout,
318 stderr
319 ));
320 }
321 }
322 lines.push(format!(" rest: {}", assets::rest_endpoint(node_id)));
323 lines.push(format!(" sse: {}", assets::sse_endpoint(node_id)));
324 lines.push(format!(" rpc: {}", assets::rpc_endpoint(node_id)));
325 lines.push(format!(" binary: {}", assets::binary_address(node_id)));
326 lines.push(format!(" gossip: {}", assets::network_address(node_id)));
327 }
328
329 lines.join("\n")
330}
331
332async fn print_derived_accounts_summary(layout: &AssetsLayout) {
333 if let Some(summary) = assets::derived_accounts_summary(layout).await {
334 println!("derived accounts");
335 for line in summary.lines() {
336 println!(" {}", line);
337 }
338 }
339}
340
341fn process_kind_label(kind: &ProcessKind) -> &'static str {
342 match kind {
343 ProcessKind::Node => "node",
344 ProcessKind::Sidecar => "sidecar",
345 }
346}
347
348fn unique_node_ids(processes: &[RunningProcess]) -> Vec<u32> {
349 let mut nodes = HashSet::new();
350 for process in processes {
351 nodes.insert(process.record.node_id);
352 }
353 let mut ids: Vec<u32> = nodes.into_iter().collect();
354 ids.sort_unstable();
355 ids
356}
357
358enum RunEvent {
359 CtrlC,
360 ProcessExit {
361 id: String,
362 pid: Option<u32>,
363 code: Option<i32>,
364 signal: Option<i32>,
365 },
366}
367
368fn spawn_ctrlc_listener(tx: UnboundedSender<RunEvent>) {
369 tokio::spawn(async move {
370 if tokio::signal::ctrl_c().await.is_ok() {
371 let _ = tx.send(RunEvent::CtrlC);
372 }
373 });
374}
375
376fn spawn_exit_watchers(processes: Vec<RunningProcess>, tx: UnboundedSender<RunEvent>) {
377 for running in processes {
378 let tx = tx.clone();
379 tokio::spawn(async move {
380 let id = running.record.id.clone();
381 match running.handle {
382 ProcessHandle::Child(mut child) => {
383 if let Ok(status) = child.wait().await {
384 let pid = record_pid(&running.record).or_else(|| child.id());
385 let code = status.code();
386 let signal = status.signal();
387 let _ = tx.send(RunEvent::ProcessExit {
388 id: id.clone(),
389 pid,
390 code,
391 signal,
392 });
393 }
394 }
395 ProcessHandle::Task(handle) => {
396 let status = handle.await;
397 let pid = record_pid(&running.record);
398 let (code, signal) = match status {
399 Ok(Ok(())) => (Some(0), None),
400 Ok(Err(_)) => (None, None),
401 Err(_) => (None, None),
402 };
403 let _ = tx.send(RunEvent::ProcessExit {
404 id: id.clone(),
405 pid,
406 code,
407 signal,
408 });
409 }
410 }
411 });
412 }
413}
414
415struct SseHealth {
416 expected_nodes: HashSet<u32>,
417 versions: HashMap<u32, String>,
418 announced: bool,
419 details: String,
420}
421
422impl SseHealth {
423 fn new(node_ids: Vec<u32>, details: String) -> Self {
424 Self {
425 expected_nodes: node_ids.into_iter().collect(),
426 versions: HashMap::new(),
427 announced: false,
428 details,
429 }
430 }
431}
432
433async fn should_log_primary(node_id: u32, health: &Arc<Mutex<SseHealth>>) -> bool {
434 if node_id != 1 {
435 return false;
436 }
437 let state = health.lock().await;
438 state.announced
439}
440
441async fn spawn_sse_listeners(
442 _layout: &AssetsLayout,
443 node_ids: &[u32],
444 health: Arc<Mutex<SseHealth>>,
445) {
446 for node_id in node_ids {
447 let node_id = *node_id;
448 let endpoint = assets::sse_endpoint(node_id);
449 let health = Arc::clone(&health);
450 tokio::spawn(async move {
451 run_sse_listener(node_id, endpoint, health).await;
452 });
453 }
454}
455
456async fn run_sse_listener(node_id: u32, endpoint: String, health: Arc<Mutex<SseHealth>>) {
457 let mut backoff = ExponentialBackoff::default();
458
459 loop {
460 let config = match ListenerConfig::builder()
461 .with_endpoint(endpoint.clone())
462 .build()
463 {
464 Ok(config) => config,
465 Err(_) => {
466 if !sleep_backoff(&mut backoff).await {
467 return;
468 }
469 continue;
470 }
471 };
472
473 let stream = match sse::listener(config).await {
474 Ok(stream) => {
475 backoff.reset();
476 stream
477 }
478 Err(_) => {
479 if !sleep_backoff(&mut backoff).await {
480 return;
481 }
482 continue;
483 }
484 };
485
486 futures::pin_mut!(stream);
487 let mut stream_failed = false;
488 while let Some(event) = stream.next().await {
489 match event {
490 Ok(sse_event) => match sse_event {
491 SseEvent::ApiVersion(version) => {
492 record_api_version(node_id, version.to_string(), &health).await;
493 }
494 SseEvent::BlockAdded { block_hash, block } => {
495 if should_log_primary(node_id, &health).await {
496 let prefix = timestamp_prefix();
497 println!(
498 "{} Block {} added (height={} era={})",
499 prefix,
500 block_hash,
501 block.height(),
502 block.era_id().value()
503 );
504 }
505 }
506 SseEvent::TransactionAccepted(transaction) => {
507 if node_id == 1 {
508 let prefix = timestamp_prefix();
509 println!("{} Transaction {} accepted", prefix, transaction.hash());
510 }
511 }
512 SseEvent::TransactionProcessed {
513 transaction_hash,
514 execution_result,
515 messages,
516 ..
517 } => {
518 if node_id == 1 {
519 let tx_hash = transaction_hash.to_string();
520 let prefix = timestamp_prefix();
521 log_transaction_processed(
522 &prefix,
523 &tx_hash,
524 &execution_result,
525 &messages,
526 );
527 }
528 }
529 _ => {}
530 },
531 Err(_) => {
532 stream_failed = true;
533 break;
534 }
535 }
536 }
537
538 if stream_failed && !sleep_backoff(&mut backoff).await {
539 return;
540 }
541 }
542}
543
544async fn record_api_version(node_id: u32, version: String, health: &Arc<Mutex<SseHealth>>) {
545 let mut state = health.lock().await;
546 if !state.expected_nodes.contains(&node_id) {
547 return;
548 }
549 state.versions.insert(node_id, version);
550 if state.announced || state.versions.len() != state.expected_nodes.len() {
551 return;
552 }
553
554 let summary = version_summary(&state.versions);
555 println!("Network is healthy ({})", summary);
556 println!("{}", state.details);
557 println!("Waiting for new blocks...");
558 state.announced = true;
559}
560
561fn version_summary(versions: &HashMap<u32, String>) -> String {
562 let mut unique: Vec<String> = versions.values().cloned().collect();
563 unique.sort();
564 unique.dedup();
565 if unique.len() == 1 {
566 format!("version {}", unique[0])
567 } else {
568 format!("versions {}", unique.join(", "))
569 }
570}
571
572async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
573 if let Some(delay) = backoff.next_backoff() {
574 tokio::time::sleep(delay).await;
575 return true;
576 }
577 false
578}
579
580fn log_transaction_processed(
581 prefix: &str,
582 transaction_hash: &str,
583 execution_result: &ExecutionResult,
584 messages: &[casper_types::contract_messages::Message],
585) {
586 let consumed = execution_result.consumed();
587 let consumed_cspr = format_cspr_u512(&consumed);
588 if let Some(error) = execution_result.error_message() {
589 println!(
590 "{} Transaction {} processed failed ({}) gas={} gas_cspr={}",
591 prefix, transaction_hash, error, consumed, consumed_cspr
592 );
593 } else {
594 println!(
595 "{} Transaction {} processed succeeded gas={} gas_cspr={}",
596 prefix, transaction_hash, consumed, consumed_cspr
597 );
598 }
599
600 for message in messages {
601 let entity = message.entity_addr().to_formatted_string();
602 let topic = message.topic_name();
603 let payload = format_message_payload(message.payload());
604 println!("{} 📨 {} {}: {}", prefix, entity, topic, payload);
605 }
606}
607
608fn timestamp_prefix() -> String {
609 time::OffsetDateTime::now_utc()
610 .format(&time::format_description::well_known::Rfc3339)
611 .unwrap_or_else(|_| "unknown-time".to_string())
612}
613
614fn format_message_payload(payload: &MessagePayload) -> String {
615 match payload {
616 MessagePayload::Bytes(bytes) => format!("0x{}", encode_hex(bytes.as_ref())),
617 MessagePayload::String(value) => format!("{:?}", value),
618 }
619}
620
621fn encode_hex(bytes: &[u8]) -> String {
622 let mut out = String::with_capacity(bytes.len() * 2);
623 for byte in bytes {
624 use std::fmt::Write;
625 let _ = write!(&mut out, "{:02x}", byte);
626 }
627 out
628}
629
630fn format_cspr_u512(motes: &U512) -> String {
631 let motes_str = motes.to_string();
632 let digits = motes_str.len();
633 if digits <= 9 {
634 let frac = format!("{:0>9}", motes_str);
635 let frac = frac.trim_end_matches('0');
636 if frac.is_empty() {
637 return "0".to_string();
638 }
639 return format!("0.{}", frac);
640 }
641
642 let split = digits - 9;
643 let (whole, frac) = motes_str.split_at(split);
644 let frac = frac.trim_end_matches('0');
645 if frac.is_empty() {
646 return whole.to_string();
647 }
648 format!("{}.{}", whole, frac)
649}
650
651async fn update_exited_process(
652 state: &mut State,
653 id: &str,
654 code: Option<i32>,
655 signal: Option<i32>,
656) -> Result<()> {
657 for record in &mut state.processes {
658 if record.id == id {
659 record.last_status = ProcessStatus::Exited;
660 record.exit_code = code;
661 record.exit_signal = signal;
662 record.stopped_at = Some(time::OffsetDateTime::now_utc());
663 break;
664 }
665 }
666 state.touch().await?;
667 Ok(())
668}
669
670fn log_exit(id: &str, pid: Option<u32>, code: Option<i32>, signal: Option<i32>) {
671 if let Some(pid) = pid {
672 if let Some(signal) = signal {
673 println!(
674 "process {} (pid {}) exited due to signal {}",
675 id, pid, signal
676 );
677 } else if let Some(code) = code {
678 println!("process {} (pid {}) exited with code {}", id, pid, code);
679 } else {
680 println!("process {} (pid {}) exited", id, pid);
681 }
682 } else if let Some(signal) = signal {
683 println!("process {} exited due to signal {}", id, signal);
684 } else if let Some(code) = code {
685 println!("process {} exited with code {}", id, code);
686 } else {
687 println!("process {} exited", id);
688 }
689}
690
691async fn print_start_banner(layout: &AssetsLayout, processes: &[RunningProcess]) {
692 let total_nodes = layout.count_nodes().await.unwrap_or(0);
693 let target = format!("all nodes ({})", total_nodes);
694 let sidecars = processes
695 .iter()
696 .filter(|proc| matches!(proc.record.kind, crate::state::ProcessKind::Sidecar))
697 .count();
698 println!(
699 "started {} process(es) for {} (sidecars: {})",
700 processes.len(),
701 target,
702 sidecars
703 );
704}
705
706fn looks_like_url(path: &Path) -> bool {
707 let value = path.to_string_lossy();
708 value.starts_with("http://") || value.starts_with("https://")
709}
710
711async fn run_assets(args: AssetsArgs) -> Result<()> {
712 match args.command {
713 AssetsCommand::Add(add) => run_assets_add(add).await,
714 AssetsCommand::Pull(pull) => run_assets_pull(pull).await,
715 AssetsCommand::List => run_assets_list().await,
716 }
717}
718
719async fn run_assets_add(args: AssetsAddArgs) -> Result<()> {
720 if looks_like_url(&args.path) {
721 return Err(anyhow!(
722 "assets URL is not supported yet; provide a local .tar.gz path"
723 ));
724 }
725 assets::install_assets_bundle(&args.path).await?;
726 println!(
727 "assets installed into {}",
728 assets::assets_bundle_root()?.display()
729 );
730 Ok(())
731}
732
733async fn run_assets_pull(args: AssetsPullArgs) -> Result<()> {
734 assets::pull_assets_bundles(args.target.as_deref(), args.force).await?;
735 Ok(())
736}
737
738async fn run_assets_list() -> Result<()> {
739 let mut versions = assets::list_bundle_versions().await?;
740 if versions.is_empty() {
741 println!("no assets found");
742 return Ok(());
743 }
744 versions.sort_by(|a, b| b.cmp(a));
745 for version in versions {
746 println!("{}", version);
747 }
748 Ok(())
749}
750
751async fn resolve_protocol_version(candidate: &Option<String>) -> Result<String> {
752 if let Some(raw) = candidate {
753 let version = assets::parse_protocol_version(raw)?;
754 if !assets::has_bundle_version(&version).await? {
755 let argv0 = std::env::args()
756 .next()
757 .unwrap_or_else(|| "casper-devnet".to_string());
758 let pull_cmd = format!("{} assets pull", argv0);
759 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
760 return Err(anyhow!(
761 "assets for version {} not found; run `{}` or `{}`",
762 version,
763 pull_cmd,
764 add_cmd
765 ));
766 }
767 return Ok(version.to_string());
768 }
769 match assets::most_recent_bundle_version().await {
770 Ok(version) => Ok(version.to_string()),
771 Err(_) => {
772 let argv0 = std::env::args()
773 .next()
774 .unwrap_or_else(|| "casper-devnet".to_string());
775 let pull_cmd = format!("{} assets pull", argv0);
776 let add_cmd = format!("{} assets add <path-to-assets.tar.gz>", argv0);
777 Err(anyhow!(
778 "no assets found; run `{}` or `{}`",
779 pull_cmd,
780 add_cmd
781 ))
782 }
783 }
784}
785
786#[cfg(test)]
787mod tests {
788 use super::{encode_hex, format_cspr_u512, format_message_payload};
789 use casper_types::contract_messages::MessagePayload;
790 use casper_types::U512;
791
792 #[test]
793 fn format_cspr_u512_handles_whole_and_fractional() {
794 assert_eq!(format_cspr_u512(&U512::zero()), "0");
795 assert_eq!(format_cspr_u512(&U512::from(1u64)), "0.000000001");
796 assert_eq!(format_cspr_u512(&U512::from(1_000_000_000u64)), "1");
797 assert_eq!(
798 format_cspr_u512(&U512::from(1_000_000_001u64)),
799 "1.000000001"
800 );
801 assert_eq!(
802 format_cspr_u512(&U512::from_dec_str("123000000000").unwrap()),
803 "123"
804 );
805 assert_eq!(
806 format_cspr_u512(&U512::from_dec_str("123000000456").unwrap()),
807 "123.000000456"
808 );
809 }
810
811 #[test]
812 fn format_message_payload_renders_string_with_quotes() {
813 let payload = MessagePayload::String("hello".to_string());
814 assert_eq!(format_message_payload(&payload), "\"hello\"");
815 }
816
817 #[test]
818 fn encode_hex_renders_lowercase() {
819 assert_eq!(encode_hex(&[0x00, 0xAB, 0x0f]), "00ab0f");
820 }
821}