1use self::args_parser::parse_session_args;
2use crate::assets::{self, AssetsLayout, SetupOptions};
3use crate::process::{self, StartPlan};
4use crate::state::{ProcessKind, ProcessStatus, STATE_FILE_NAME, State};
5use anyhow::{Context, Result, anyhow};
6use backoff::ExponentialBackoff;
7use backoff::backoff::Backoff;
8use casper_types::contracts::ContractHash;
9use casper_types::{
10 AddressableEntityHash, AsymmetricType, Digest, EntityVersion, Key, PricingMode, PublicKey,
11 SecretKey, TimeDiff, Transaction, TransactionHash, TransactionRuntimeParams, TransactionV1Hash,
12 URef,
13};
14use clap::{Args, ValueEnum};
15use futures::StreamExt;
16use nix::errno::Errno;
17use nix::sys::signal::kill;
18use nix::unistd::Pid;
19use rmcp::handler::server::{router::tool::ToolRouter, wrapper::Parameters};
20use rmcp::model::{CallToolResult, ServerCapabilities, ServerInfo};
21use rmcp::service::ServiceExt;
22use rmcp::transport::{
23 StreamableHttpServerConfig,
24 streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService},
25};
26use rmcp::{
27 ErrorData, ServerHandler, tool, tool_handler, tool_router, transport::stdio as mcp_stdio,
28};
29use serde::{Deserialize, Serialize};
30use serde_json::{Value, json};
31use std::collections::{HashMap, VecDeque};
32use std::path::{Path, PathBuf};
33use std::str::FromStr;
34use std::sync::Arc;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use std::time::{Duration, Instant};
37use tokio::fs as tokio_fs;
38use tokio::sync::{Mutex, Notify};
39use veles_casper_rust_sdk::TransactionV1Builder;
40use veles_casper_rust_sdk::jsonrpc::CasperClient;
41use veles_casper_rust_sdk::sse::event::SseEvent;
42use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
43
44const DEFAULT_HTTP_BIND: &str = "127.0.0.1:32100";
45const DEFAULT_HTTP_PATH: &str = "/mcp";
46const DEFAULT_NETWORK_NAME: &str = "casper-dev";
47const DEFAULT_NODE_COUNT: u32 = 4;
48const DEFAULT_DELAY_SECS: u64 = 3;
49const DEFAULT_LOG_LEVEL: &str = "info";
50const DEFAULT_NODE_LOG_FORMAT: &str = "json";
51const DEFAULT_SEED: &str = "default";
52const DEFAULT_TIMEOUT_SECS: u64 = 60;
53const DEFAULT_LOG_PAGE_SIZE: usize = 200;
54const DEFAULT_SSE_PAGE_SIZE: usize = 200;
55const DEFAULT_SSE_HISTORY_CAPACITY: usize = 20_000;
56const DEFAULT_PAYMENT_AMOUNT: u64 = 100_000_000_000;
57
58mod args_parser;
59
60#[derive(Debug, Clone, Copy, ValueEnum)]
61#[value(rename_all = "kebab-case")]
62pub enum McpTransport {
63 Stdio,
64 Http,
65 Both,
66}
67
68#[derive(Debug, Args, Clone)]
69pub struct McpArgs {
70 #[arg(long, value_enum, default_value_t = McpTransport::Both)]
71 transport: McpTransport,
72
73 #[arg(long, default_value = DEFAULT_HTTP_BIND)]
74 http_bind: String,
75
76 #[arg(long, default_value = DEFAULT_HTTP_PATH)]
77 http_path: String,
78
79 #[arg(long, value_name = "PATH")]
80 net_path: Option<PathBuf>,
81}
82
83#[derive(Debug, Clone)]
84struct McpServer {
85 manager: Arc<NetworkManager>,
86 tool_router: ToolRouter<Self>,
87}
88
89impl McpServer {
90 fn new(manager: Arc<NetworkManager>) -> Self {
91 Self {
92 manager,
93 tool_router: Self::tool_router(),
94 }
95 }
96
97 fn server_info() -> ServerInfo {
98 ServerInfo {
99 instructions: Some(
100 "Control multiple local Casper devnets. Start with spawn_network, then wait_network_ready before RPC or transaction tools. Do not use external casper-client binaries or curl; use MCP tools directly (for example get_transaction, wait_transaction, make_transaction_package_call, make_transaction_contract_call, make_transaction_session_wasm, send_transaction_signed).".to_string(),
101 ),
102 capabilities: ServerCapabilities::builder().enable_tools().build(),
103 ..Default::default()
104 }
105 }
106}
107
108#[tool_router]
109impl McpServer {
110 #[tool(
111 description = "Spawn or resume a managed network. Defaults to force_setup=true for fresh devnet startup."
112 )]
113 async fn spawn_network(
114 &self,
115 Parameters(request): Parameters<SpawnNetworkRequest>,
116 ) -> std::result::Result<CallToolResult, ErrorData> {
117 let result = self
118 .manager
119 .spawn_network(request)
120 .await
121 .map_err(to_mcp_error)?;
122 Ok(ok_value(
123 serde_json::to_value(result).map_err(internal_serde_error)?,
124 ))
125 }
126
127 #[tool(
128 description = "Wait for a managed network to be ready: processes running, /status healthy, reactor Validate, and at least one block observed."
129 )]
130 async fn wait_network_ready(
131 &self,
132 Parameters(request): Parameters<WaitNetworkReadyRequest>,
133 ) -> std::result::Result<CallToolResult, ErrorData> {
134 let result = self
135 .manager
136 .wait_network_ready(&request.network_name, request.timeout_seconds)
137 .await
138 .map_err(to_mcp_error)?;
139 Ok(ok_value(
140 serde_json::to_value(result).map_err(internal_serde_error)?,
141 ))
142 }
143
144 #[tool(
145 description = "Despawn a managed network. By default it stops processes and keeps files; set purge=true to remove files."
146 )]
147 async fn despawn_network(
148 &self,
149 Parameters(request): Parameters<DespawnNetworkRequest>,
150 ) -> std::result::Result<CallToolResult, ErrorData> {
151 let result = self
152 .manager
153 .despawn_network(&request.network_name, request.purge.unwrap_or(false))
154 .await
155 .map_err(to_mcp_error)?;
156 Ok(ok_value(
157 serde_json::to_value(result).map_err(internal_serde_error)?,
158 ))
159 }
160
161 #[tool(description = "List discovered network names and managed/running state.")]
162 async fn list_networks(
163 &self,
164 _: Parameters<ListNetworksRequest>,
165 ) -> std::result::Result<CallToolResult, ErrorData> {
166 let result = self.manager.list_networks().await.map_err(to_mcp_error)?;
167 Ok(ok_value(
168 serde_json::to_value(result).map_err(internal_serde_error)?,
169 ))
170 }
171
172 #[tool(
173 description = "List managed network processes, optionally filtered by process name. Defaults to running_only=true."
174 )]
175 async fn managed_processes(
176 &self,
177 Parameters(request): Parameters<ManagedProcessesRequest>,
178 ) -> std::result::Result<CallToolResult, ErrorData> {
179 let result = self
180 .manager
181 .managed_processes(request)
182 .await
183 .map_err(to_mcp_error)?;
184 Ok(ok_value(
185 serde_json::to_value(result).map_err(internal_serde_error)?,
186 ))
187 }
188
189 #[tool(description = "Call node REST /status for a managed network node.")]
190 async fn status(
191 &self,
192 Parameters(request): Parameters<NodeScopedRequest>,
193 ) -> std::result::Result<CallToolResult, ErrorData> {
194 let network = self
195 .manager
196 .get_network(&request.network_name)
197 .await
198 .map_err(to_mcp_error)?;
199 ensure_running_network(&network).await?;
200 let status = fetch_rest_status(request.node_id)
201 .await
202 .map_err(to_mcp_error)?;
203 Ok(ok_value(status))
204 }
205
206 #[tool(
207 description = "Run a raw JSON-RPC call against node sidecar endpoint. Useful as a generic query helper."
208 )]
209 async fn rpc_query(
210 &self,
211 Parameters(request): Parameters<RpcQueryRequest>,
212 ) -> std::result::Result<CallToolResult, ErrorData> {
213 let network = self
214 .manager
215 .get_network(&request.network_name)
216 .await
217 .map_err(to_mcp_error)?;
218 ensure_running_network(&network).await?;
219 let value = self
220 .manager
221 .raw_rpc_query(request.node_id, &request.method, request.params)
222 .await
223 .map_err(to_mcp_error)?;
224 Ok(ok_value(value))
225 }
226
227 #[tool(description = "Typed balance query by account/public key/uref/entity identifier.")]
228 async fn rpc_query_balance(
229 &self,
230 Parameters(request): Parameters<RpcQueryBalanceRequest>,
231 ) -> std::result::Result<CallToolResult, ErrorData> {
232 let network = self
233 .manager
234 .get_network(&request.network_name)
235 .await
236 .map_err(to_mcp_error)?;
237 ensure_running_network(&network).await?;
238
239 let block_id = normalize_optional_identifier(request.block_id.as_deref());
240 let state_root_hash = normalize_optional_identifier(request.state_root_hash.as_deref());
241 let params =
242 build_query_balance_params(&request.purse_identifier, &block_id, &state_root_hash)
243 .map_err(to_mcp_error)?;
244 let response = self
245 .manager
246 .raw_rpc_query(request.node_id, "query_balance", Some(params))
247 .await
248 .map_err(to_mcp_error)?;
249 Ok(ok_value(
250 extract_rpc_result(response).map_err(to_mcp_error)?,
251 ))
252 }
253
254 #[tool(
255 description = "Typed global state query by key + optional path + optional block/state-root identifier. If no identifier is provided, the latest block hash is used."
256 )]
257 async fn rpc_query_global_state(
258 &self,
259 Parameters(request): Parameters<RpcQueryGlobalStateRequest>,
260 ) -> std::result::Result<CallToolResult, ErrorData> {
261 let network = self
262 .manager
263 .get_network(&request.network_name)
264 .await
265 .map_err(to_mcp_error)?;
266 ensure_running_network(&network).await?;
267
268 let (block_id, state_root_hash) = resolve_global_state_identifier(
269 &request.network_name,
270 request.node_id,
271 request.block_id.as_deref(),
272 request.state_root_hash.as_deref(),
273 )
274 .await
275 .map_err(to_mcp_error)?;
276 let params = build_query_global_state_params(
277 &request.key,
278 request.path.unwrap_or_default(),
279 &block_id,
280 &state_root_hash,
281 )
282 .map_err(to_mcp_error)?;
283 let response = self
284 .manager
285 .raw_rpc_query(request.node_id, "query_global_state", Some(params))
286 .await
287 .map_err(to_mcp_error)?;
288
289 Ok(ok_value(
290 extract_rpc_result(response).map_err(to_mcp_error)?,
291 ))
292 }
293
294 #[tool(description = "Get current block height using typed chain_get_block RPC.")]
295 async fn current_block_height(
296 &self,
297 Parameters(request): Parameters<CurrentBlockRequest>,
298 ) -> std::result::Result<CallToolResult, ErrorData> {
299 let network = self
300 .manager
301 .get_network(&request.network_name)
302 .await
303 .map_err(to_mcp_error)?;
304 ensure_running_network(&network).await?;
305
306 let value = fetch_block_result(
307 &self.manager,
308 &request.network_name,
309 request.node_id,
310 request.block_id.as_deref(),
311 )
312 .await;
313 let value = match value {
314 Ok(value) => value,
315 Err(err) => {
316 if request.block_id.is_none()
317 && let Some((low, high)) =
318 parse_no_such_block_range_from_error(&err.to_string())
319 {
320 return Ok(ok_value(json!({
321 "network_name": request.network_name,
322 "node_id": request.node_id,
323 "height": high,
324 "block": Value::Null,
325 "pending": true,
326 "available_block_range": {
327 "low": low,
328 "high": high,
329 },
330 "message": "latest block is not yet queryable; retry shortly",
331 })));
332 }
333 return Err(to_mcp_error(err));
334 }
335 };
336 let height = extract_block_height(&value).ok_or_else(|| {
337 ErrorData::internal_error("missing block height in RPC response", None)
338 })?;
339
340 Ok(ok_value(json!({
341 "network_name": request.network_name,
342 "node_id": request.node_id,
343 "height": height,
344 "block": value,
345 })))
346 }
347
348 #[tool(description = "Get current block payload using typed chain_get_block RPC.")]
349 async fn current_block(
350 &self,
351 Parameters(request): Parameters<CurrentBlockRequest>,
352 ) -> std::result::Result<CallToolResult, ErrorData> {
353 let network = self
354 .manager
355 .get_network(&request.network_name)
356 .await
357 .map_err(to_mcp_error)?;
358 ensure_running_network(&network).await?;
359
360 let value = fetch_block_result(
361 &self.manager,
362 &request.network_name,
363 request.node_id,
364 request.block_id.as_deref(),
365 )
366 .await
367 .map_err(to_mcp_error)?;
368 Ok(ok_value(value))
369 }
370
371 #[tool(description = "Get paginated node stdout/stderr logs from on-disk files.")]
372 async fn get_node_logs(
373 &self,
374 Parameters(request): Parameters<GetNodeLogsRequest>,
375 ) -> std::result::Result<CallToolResult, ErrorData> {
376 let network = self
377 .manager
378 .get_network(&request.network_name)
379 .await
380 .map_err(to_mcp_error)?;
381
382 let limit = request.limit.unwrap_or(DEFAULT_LOG_PAGE_SIZE);
383 if limit == 0 {
384 return Err(ErrorData::invalid_params(
385 "limit must be greater than 0",
386 None,
387 ));
388 }
389
390 let file_name = match request.stream {
391 NodeLogStream::Stdout => "stdout.log",
392 NodeLogStream::Stderr => "stderr.log",
393 };
394 let path = network
395 .layout
396 .node_logs_dir(request.node_id)
397 .join(file_name);
398
399 let page = read_log_page(&path, request.before_line, limit)
400 .await
401 .map_err(to_mcp_error)?;
402 Ok(ok_value(
403 serde_json::to_value(page).map_err(internal_serde_error)?,
404 ))
405 }
406
407 #[tool(
408 description = "Wait for the next SSE event after a sequence cursor with optional event type filters."
409 )]
410 async fn wait_next_sse(
411 &self,
412 Parameters(request): Parameters<WaitNextSseRequest>,
413 ) -> std::result::Result<CallToolResult, ErrorData> {
414 let network = self
415 .manager
416 .get_network(&request.network_name)
417 .await
418 .map_err(to_mcp_error)?;
419 ensure_running_network(&network).await?;
420
421 let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
422 let filter = SseFilter {
423 include_event_types: request.include_event_types.unwrap_or_default(),
424 exclude_event_types: request.exclude_event_types.unwrap_or_default(),
425 };
426
427 let after = match request.after_sequence {
428 Some(value) => value,
429 None => network.sse_store.latest_sequence().await,
430 };
431
432 let event = network
433 .sse_store
434 .wait_next(after, &filter, timeout)
435 .await
436 .map_err(to_mcp_error)?;
437
438 Ok(ok_value(
439 serde_json::to_value(event).map_err(internal_serde_error)?,
440 ))
441 }
442
443 #[tool(
444 description = "Fetch paginated SSE history with sequence cursor and optional event type filters."
445 )]
446 async fn sse_history(
447 &self,
448 Parameters(request): Parameters<SseHistoryRequest>,
449 ) -> std::result::Result<CallToolResult, ErrorData> {
450 let network = self
451 .manager
452 .get_network(&request.network_name)
453 .await
454 .map_err(to_mcp_error)?;
455
456 let limit = request.limit.unwrap_or(DEFAULT_SSE_PAGE_SIZE);
457 if limit == 0 {
458 return Err(ErrorData::invalid_params(
459 "limit must be greater than 0",
460 None,
461 ));
462 }
463
464 let filter = SseFilter {
465 include_event_types: request.include_event_types.unwrap_or_default(),
466 exclude_event_types: request.exclude_event_types.unwrap_or_default(),
467 };
468
469 let history = network
470 .sse_store
471 .history(request.before_sequence, limit, &filter)
472 .await;
473 Ok(ok_value(
474 serde_json::to_value(history).map_err(internal_serde_error)?,
475 ))
476 }
477
478 #[tool(description = "List derived accounts from derived-accounts.csv for a network.")]
479 async fn list_derived_accounts(
480 &self,
481 Parameters(request): Parameters<ListDerivedAccountsRequest>,
482 ) -> std::result::Result<CallToolResult, ErrorData> {
483 let accounts = self
484 .manager
485 .list_derived_accounts(&request.network_name)
486 .await
487 .map_err(to_mcp_error)?;
488 Ok(ok_value(
489 serde_json::to_value(accounts).map_err(internal_serde_error)?,
490 ))
491 }
492
493 #[tool(
494 description = "Submit signed or unsigned transaction JSON. Signer key is derived from signer_path for this managed network. transaction must be a typed Transaction JSON object."
495 )]
496 async fn send_transaction_signed(
497 &self,
498 Parameters(request): Parameters<SendTransactionSignedRequest>,
499 ) -> std::result::Result<CallToolResult, ErrorData> {
500 let network = self
501 .manager
502 .get_network(&request.network_name)
503 .await
504 .map_err(to_mcp_error)?;
505 ensure_running_network(&network).await?;
506
507 let signer = self
508 .manager
509 .derived_account_for_path(&network, &request.signer_path)
510 .await
511 .map_err(to_mcp_error)?;
512 verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
513 .await
514 .map_err(to_mcp_error)?;
515
516 let mut transaction = parse_transaction_json(request.transaction)?;
517 transaction.sign(&signer.secret_key);
518
519 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
520 let response = rpc
521 .put_transaction(transaction)
522 .await
523 .map_err(to_mcp_error)?;
524
525 Ok(ok_value(json!({
526 "network_name": request.network_name,
527 "node_id": request.node_id,
528 "transaction_hash": response.transaction_hash.to_hex_string(),
529 })))
530 }
531
532 #[tool(
533 description = "Create a stored package-name call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
534 )]
535 async fn make_transaction_package_call(
536 &self,
537 Parameters(request): Parameters<MakeTransactionPackageCallRequest>,
538 ) -> std::result::Result<CallToolResult, ErrorData> {
539 let network = self
540 .manager
541 .get_network(&request.network_name)
542 .await
543 .map_err(to_mcp_error)?;
544 ensure_running_network(&network).await?;
545
546 let runtime = match (
547 request.runtime_transferred_value,
548 request.runtime_seed_hex.as_deref(),
549 ) {
550 (None, None) => TransactionRuntimeParams::VmCasperV1,
551 (transferred_value, maybe_seed_hex) => {
552 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
553 TransactionRuntimeParams::VmCasperV2 {
554 transferred_value: transferred_value.unwrap_or(0),
555 seed,
556 }
557 }
558 };
559
560 let mut builder = TransactionV1Builder::new_targeting_package_via_alias(
561 request.transaction_package_name.clone(),
562 request.transaction_package_version,
563 request.session_entry_point.clone(),
564 runtime,
565 );
566
567 if let Some(args_json) = request.session_args.as_ref()
568 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
569 {
570 builder = builder.with_runtime_args(runtime_args);
571 }
572
573 if let Some(ttl_millis) = request.ttl_millis {
574 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
575 }
576
577 let pricing = build_pricing_mode(
578 request.gas_price_tolerance,
579 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
580 );
581 builder = builder.with_pricing_mode(pricing);
582
583 let chain_name = match request.chain_name {
584 Some(value) => value,
585 None => fetch_chain_name(&request.network_name, request.node_id)
586 .await
587 .map_err(to_mcp_error)?,
588 };
589 builder = builder.with_chain_name(chain_name.clone());
590
591 let mut signed = false;
592 let mut signer_path = None;
593 let mut derived_signer = None;
594 if let Some(path) = request.signer_path {
595 let signer = self
596 .manager
597 .derived_account_for_path(&network, &path)
598 .await
599 .map_err(to_mcp_error)?;
600 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
601 .await
602 .map_err(to_mcp_error)?;
603 derived_signer = Some(signer);
604 signed = true;
605 signer_path = Some(path);
606 } else if let Some(initiator_public_key) = request.initiator_public_key {
607 let public_key = PublicKey::from_hex(initiator_public_key.trim())
608 .with_context(|| "failed to parse initiator_public_key as hex public key")
609 .map_err(to_mcp_error)?;
610 builder = builder.with_initiator_addr(public_key);
611 } else {
612 return Err(ErrorData::invalid_params(
613 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
614 None,
615 ));
616 }
617
618 if let Some(signer) = derived_signer.as_ref() {
619 builder = builder.with_secret_key(&signer.secret_key);
620 }
621
622 let tx = builder.build().map_err(to_mcp_error)?;
623 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
624
625 Ok(ok_value(json!({
626 "network_name": request.network_name,
627 "node_id": request.node_id,
628 "chain_name": chain_name,
629 "signed": signed,
630 "signer_path": signer_path,
631 "transaction": tx_json,
632 })))
633 }
634
635 #[tool(
636 description = "Create a stored contract-hash call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
637 )]
638 async fn make_transaction_contract_call(
639 &self,
640 Parameters(request): Parameters<MakeTransactionContractCallRequest>,
641 ) -> std::result::Result<CallToolResult, ErrorData> {
642 let network = self
643 .manager
644 .get_network(&request.network_name)
645 .await
646 .map_err(to_mcp_error)?;
647 ensure_running_network(&network).await?;
648
649 let runtime = match (
650 request.runtime_transferred_value,
651 request.runtime_seed_hex.as_deref(),
652 ) {
653 (None, None) => TransactionRuntimeParams::VmCasperV1,
654 (transferred_value, maybe_seed_hex) => {
655 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
656 TransactionRuntimeParams::VmCasperV2 {
657 transferred_value: transferred_value.unwrap_or(0),
658 seed,
659 }
660 }
661 };
662
663 let contract_hash = parse_contract_hash_for_invocation(&request.transaction_contract_hash)
664 .map_err(to_mcp_error)?;
665 let mut builder = TransactionV1Builder::new_targeting_invocable_entity(
666 contract_hash,
667 request.session_entry_point.clone(),
668 runtime,
669 );
670
671 if let Some(args_json) = request.session_args.as_ref()
672 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
673 {
674 builder = builder.with_runtime_args(runtime_args);
675 }
676
677 if let Some(ttl_millis) = request.ttl_millis {
678 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
679 }
680
681 let pricing = build_pricing_mode(
682 request.gas_price_tolerance,
683 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
684 );
685 builder = builder.with_pricing_mode(pricing);
686
687 let chain_name = match request.chain_name {
688 Some(value) => value,
689 None => fetch_chain_name(&request.network_name, request.node_id)
690 .await
691 .map_err(to_mcp_error)?,
692 };
693 builder = builder.with_chain_name(chain_name.clone());
694
695 let mut signed = false;
696 let mut signer_path = None;
697 let mut derived_signer = None;
698 if let Some(path) = request.signer_path {
699 let signer = self
700 .manager
701 .derived_account_for_path(&network, &path)
702 .await
703 .map_err(to_mcp_error)?;
704 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
705 .await
706 .map_err(to_mcp_error)?;
707 derived_signer = Some(signer);
708 signed = true;
709 signer_path = Some(path);
710 } else if let Some(initiator_public_key) = request.initiator_public_key {
711 let public_key = PublicKey::from_hex(initiator_public_key.trim())
712 .with_context(|| "failed to parse initiator_public_key as hex public key")
713 .map_err(to_mcp_error)?;
714 builder = builder.with_initiator_addr(public_key);
715 } else {
716 return Err(ErrorData::invalid_params(
717 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
718 None,
719 ));
720 }
721
722 if let Some(signer) = derived_signer.as_ref() {
723 builder = builder.with_secret_key(&signer.secret_key);
724 }
725
726 let tx = builder.build().map_err(to_mcp_error)?;
727 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
728
729 Ok(ok_value(json!({
730 "network_name": request.network_name,
731 "node_id": request.node_id,
732 "chain_name": chain_name,
733 "transaction_contract_hash": request.transaction_contract_hash,
734 "signed": signed,
735 "signer_path": signer_path,
736 "transaction": tx_json,
737 })))
738 }
739
740 #[tool(
741 description = "Create a session wasm transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
742 )]
743 async fn make_transaction_session_wasm(
744 &self,
745 Parameters(request): Parameters<MakeTransactionSessionWasmRequest>,
746 ) -> std::result::Result<CallToolResult, ErrorData> {
747 let network = self
748 .manager
749 .get_network(&request.network_name)
750 .await
751 .map_err(to_mcp_error)?;
752 ensure_running_network(&network).await?;
753
754 let wasm_path = PathBuf::from(&request.wasm_path);
755 let wasm_bytes = tokio_fs::read(&wasm_path)
756 .await
757 .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
758 .map_err(to_mcp_error)?;
759
760 let runtime = match (
761 request.runtime_transferred_value,
762 request.runtime_seed_hex.as_deref(),
763 ) {
764 (None, None) => TransactionRuntimeParams::VmCasperV1,
765 (transferred_value, maybe_seed_hex) => {
766 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
767 TransactionRuntimeParams::VmCasperV2 {
768 transferred_value: transferred_value.unwrap_or(0),
769 seed,
770 }
771 }
772 };
773
774 let mut builder = TransactionV1Builder::new_session(
775 request.is_install_upgrade.unwrap_or(true),
776 wasm_bytes.into(),
777 runtime,
778 );
779
780 if let Some(args_json) = request.session_args.as_ref()
781 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
782 {
783 builder = builder.with_runtime_args(runtime_args);
784 }
785
786 if let Some(ttl_millis) = request.ttl_millis {
787 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
788 }
789
790 let pricing = build_pricing_mode(
791 request.gas_price_tolerance,
792 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
793 );
794 builder = builder.with_pricing_mode(pricing);
795
796 let chain_name = match request.chain_name {
797 Some(value) => value,
798 None => fetch_chain_name(&request.network_name, request.node_id)
799 .await
800 .map_err(to_mcp_error)?,
801 };
802 builder = builder.with_chain_name(chain_name.clone());
803
804 let mut signed = false;
805 let mut signer_path = None;
806 let mut derived_signer = None;
807 if let Some(path) = request.signer_path {
808 let signer = self
809 .manager
810 .derived_account_for_path(&network, &path)
811 .await
812 .map_err(to_mcp_error)?;
813 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
814 .await
815 .map_err(to_mcp_error)?;
816 derived_signer = Some(signer);
817 signed = true;
818 signer_path = Some(path);
819 } else if let Some(initiator_public_key) = request.initiator_public_key {
820 let public_key = PublicKey::from_hex(initiator_public_key.trim())
821 .with_context(|| "failed to parse initiator_public_key as hex public key")
822 .map_err(to_mcp_error)?;
823 builder = builder.with_initiator_addr(public_key);
824 } else {
825 return Err(ErrorData::invalid_params(
826 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
827 None,
828 ));
829 }
830
831 if let Some(signer) = derived_signer.as_ref() {
832 builder = builder.with_secret_key(&signer.secret_key);
833 }
834
835 let tx = builder.build().map_err(to_mcp_error)?;
836 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
837
838 Ok(ok_value(json!({
839 "network_name": request.network_name,
840 "node_id": request.node_id,
841 "chain_name": chain_name,
842 "wasm_path": request.wasm_path,
843 "signed": signed,
844 "signer_path": signer_path,
845 "transaction": tx_json,
846 })))
847 }
848
849 #[tool(
850 description = "Build, sign and submit a session wasm transaction from a derived account path. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
851 )]
852 async fn send_session_wasm(
853 &self,
854 Parameters(request): Parameters<SendSessionWasmRequest>,
855 ) -> std::result::Result<CallToolResult, ErrorData> {
856 let network = self
857 .manager
858 .get_network(&request.network_name)
859 .await
860 .map_err(to_mcp_error)?;
861 ensure_running_network(&network).await?;
862
863 let signer = self
864 .manager
865 .derived_account_for_path(&network, &request.signer_path)
866 .await
867 .map_err(to_mcp_error)?;
868 verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
869 .await
870 .map_err(to_mcp_error)?;
871
872 let wasm_path = PathBuf::from(&request.wasm_path);
873 let wasm_bytes = tokio_fs::read(&wasm_path)
874 .await
875 .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
876 .map_err(to_mcp_error)?;
877
878 let runtime = match (
879 request.runtime_transferred_value,
880 request.runtime_seed_hex.as_deref(),
881 ) {
882 (None, None) => TransactionRuntimeParams::VmCasperV1,
883 (transferred_value, maybe_seed_hex) => {
884 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
885 TransactionRuntimeParams::VmCasperV2 {
886 transferred_value: transferred_value.unwrap_or(0),
887 seed,
888 }
889 }
890 };
891
892 let mut builder = TransactionV1Builder::new_session(
893 request.is_install_upgrade.unwrap_or(true),
894 wasm_bytes.into(),
895 runtime,
896 );
897
898 if let Some(args_json) = request.session_args.as_ref()
899 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
900 {
901 builder = builder.with_runtime_args(runtime_args);
902 }
903
904 if let Some(ttl_millis) = request.ttl_millis {
905 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
906 }
907
908 let pricing = build_pricing_mode(
909 request.gas_price_tolerance,
910 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
911 );
912 builder = builder.with_pricing_mode(pricing);
913
914 let chain_name = match request.chain_name {
915 Some(value) => value,
916 None => fetch_chain_name(&request.network_name, request.node_id)
917 .await
918 .map_err(to_mcp_error)?,
919 };
920
921 let tx = builder
922 .with_chain_name(chain_name)
923 .with_secret_key(&signer.secret_key)
924 .build()
925 .map_err(to_mcp_error)?;
926
927 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
928 let response = rpc
929 .put_transaction(Transaction::V1(tx))
930 .await
931 .map_err(to_mcp_error)?;
932
933 Ok(ok_value(json!({
934 "network_name": request.network_name,
935 "node_id": request.node_id,
936 "transaction_hash": response.transaction_hash.to_hex_string(),
937 })))
938 }
939
940 #[tool(
941 description = "Transfer tokens between derived account paths. from_path signs, to_path resolves recipient."
942 )]
943 async fn transfer_tokens(
944 &self,
945 Parameters(request): Parameters<TransferTokensRequest>,
946 ) -> std::result::Result<CallToolResult, ErrorData> {
947 let network = self
948 .manager
949 .get_network(&request.network_name)
950 .await
951 .map_err(to_mcp_error)?;
952 ensure_running_network(&network).await?;
953
954 let from = self
955 .manager
956 .derived_account_for_path(&network, &request.from_path)
957 .await
958 .map_err(to_mcp_error)?;
959 let to = self
960 .manager
961 .derived_account_for_path(&network, &request.to_path)
962 .await
963 .map_err(to_mcp_error)?;
964
965 verify_path_hash_consistency(&network.layout, &request.from_path, &from.account_hash)
966 .await
967 .map_err(to_mcp_error)?;
968 verify_path_hash_consistency(&network.layout, &request.to_path, &to.account_hash)
969 .await
970 .map_err(to_mcp_error)?;
971
972 let amount = casper_types::U512::from_dec_str(&request.amount)
973 .with_context(|| "amount must be a decimal U512 string")
974 .map_err(to_mcp_error)?;
975 let to_public_key = PublicKey::from_hex(&to.public_key_hex)
976 .with_context(|| "failed to parse derived recipient public key")
977 .map_err(to_mcp_error)?;
978
979 let mut builder =
980 TransactionV1Builder::new_transfer(amount, None, to_public_key, request.transfer_id)
981 .map_err(to_mcp_error)?;
982
983 if let Some(ttl_millis) = request.ttl_millis {
984 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
985 }
986
987 let pricing = build_pricing_mode(
988 request.gas_price_tolerance,
989 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
990 );
991 builder = builder.with_pricing_mode(pricing);
992
993 let chain_name = match request.chain_name {
994 Some(value) => value,
995 None => fetch_chain_name(&request.network_name, request.node_id)
996 .await
997 .map_err(to_mcp_error)?,
998 };
999
1000 let tx = builder
1001 .with_chain_name(chain_name)
1002 .with_secret_key(&from.secret_key)
1003 .build()
1004 .map_err(to_mcp_error)?;
1005
1006 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1007 let response = rpc
1008 .put_transaction(Transaction::V1(tx))
1009 .await
1010 .map_err(to_mcp_error)?;
1011
1012 Ok(ok_value(json!({
1013 "network_name": request.network_name,
1014 "node_id": request.node_id,
1015 "transaction_hash": response.transaction_hash.to_hex_string(),
1016 "from_path": request.from_path,
1017 "to_path": request.to_path,
1018 "amount": request.amount,
1019 })))
1020 }
1021
1022 #[tool(
1023 description = "Wait for transaction execution result with timeout and return execution effects/result payload."
1024 )]
1025 async fn wait_transaction(
1026 &self,
1027 Parameters(request): Parameters<WaitTransactionRequest>,
1028 ) -> std::result::Result<CallToolResult, ErrorData> {
1029 let network = self
1030 .manager
1031 .get_network(&request.network_name)
1032 .await
1033 .map_err(to_mcp_error)?;
1034 ensure_running_network(&network).await?;
1035
1036 let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1037 let poll = Duration::from_millis(request.poll_interval_millis.unwrap_or(1_000));
1038 let tx_hash =
1039 parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1040 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1041 let deadline = Instant::now() + timeout;
1042
1043 loop {
1044 let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1045
1046 if let Some(exec_info) = response.execution_info.clone()
1047 && exec_info.execution_result.is_some()
1048 {
1049 return Ok(ok_value(
1050 serde_json::to_value(response).map_err(internal_serde_error)?,
1051 ));
1052 }
1053
1054 if Instant::now() >= deadline {
1055 return Err(ErrorData::resource_not_found(
1056 format!(
1057 "transaction {} execution result not available before timeout",
1058 request.transaction_hash
1059 ),
1060 None,
1061 ));
1062 }
1063
1064 tokio::time::sleep(poll).await;
1065 }
1066 }
1067
1068 #[tool(
1069 description = "Get transaction details via info_get_transaction (non-waiting). Returns typed JSON response from node."
1070 )]
1071 async fn get_transaction(
1072 &self,
1073 Parameters(request): Parameters<GetTransactionRequest>,
1074 ) -> std::result::Result<CallToolResult, ErrorData> {
1075 let network = self
1076 .manager
1077 .get_network(&request.network_name)
1078 .await
1079 .map_err(to_mcp_error)?;
1080 ensure_running_network(&network).await?;
1081
1082 let tx_hash =
1083 parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1084 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1085 let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1086
1087 Ok(ok_value(
1088 serde_json::to_value(response).map_err(internal_serde_error)?,
1089 ))
1090 }
1091}
1092
1093#[tool_handler]
1094impl ServerHandler for McpServer {
1095 fn get_info(&self) -> ServerInfo {
1096 Self::server_info()
1097 }
1098}
1099
1100#[derive(Debug)]
1101struct NetworkManager {
1102 assets_root: PathBuf,
1103 managed: Mutex<HashMap<String, Arc<ManagedNetwork>>>,
1104 http: reqwest::Client,
1105}
1106
1107#[derive(Debug)]
1108struct ManagedNetwork {
1109 layout: AssetsLayout,
1110 state: Arc<Mutex<State>>,
1111 node_count: u32,
1112 seed: Arc<str>,
1113 sse_store: Arc<SseStore>,
1114 shutdown: Arc<AtomicBool>,
1115 sse_tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
1116}
1117
1118impl ManagedNetwork {
1119 async fn is_running(&self) -> bool {
1120 let state = self.state.lock().await;
1121 processes_running(&state)
1122 }
1123
1124 async fn stop(&self) -> Result<()> {
1125 self.shutdown.store(true, Ordering::SeqCst);
1126
1127 let tasks = {
1128 let mut guard = self.sse_tasks.lock().await;
1129 guard.drain(..).collect::<Vec<_>>()
1130 };
1131 for task in tasks {
1132 task.abort();
1133 let _ = task.await;
1134 }
1135
1136 let mut state = self.state.lock().await;
1137 process::stop(&mut state).await
1138 }
1139}
1140
1141impl NetworkManager {
1142 async fn new(assets_root: PathBuf) -> Result<Self> {
1143 let http = reqwest::Client::builder()
1144 .no_proxy()
1145 .timeout(Duration::from_secs(5))
1146 .build()?;
1147 Ok(Self {
1148 assets_root,
1149 managed: Mutex::new(HashMap::new()),
1150 http,
1151 })
1152 }
1153
1154 async fn spawn_network(&self, request: SpawnNetworkRequest) -> Result<SpawnNetworkResponse> {
1155 let network_name = request
1156 .network_name
1157 .unwrap_or_else(|| DEFAULT_NETWORK_NAME.to_string());
1158
1159 if network_name.trim().is_empty() {
1160 return Err(anyhow!("network_name must not be empty"));
1161 }
1162
1163 let maybe_existing = {
1164 let mut managed = self.managed.lock().await;
1165 if let Some(existing) = managed.get(&network_name)
1166 && existing.is_running().await
1167 && !request.force_setup.unwrap_or(true)
1168 {
1169 return Ok(SpawnNetworkResponse {
1170 network_name,
1171 node_count: existing.node_count,
1172 managed: true,
1173 already_running: true,
1174 forced_setup: false,
1175 });
1176 }
1177 managed.remove(&network_name)
1178 };
1179
1180 if let Some(existing) = maybe_existing {
1181 let _ = existing.stop().await;
1182 }
1183
1184 let force_setup = request.force_setup.unwrap_or(true);
1185 let requested_nodes = request.node_count.unwrap_or(DEFAULT_NODE_COUNT);
1186 let users = request.users;
1187 let delay = request.delay.unwrap_or(DEFAULT_DELAY_SECS);
1188 let log_level = request
1189 .log_level
1190 .unwrap_or_else(|| DEFAULT_LOG_LEVEL.to_string());
1191 let node_log_format = request
1192 .node_log_format
1193 .unwrap_or_else(|| DEFAULT_NODE_LOG_FORMAT.to_string());
1194 let seed: Arc<str> = Arc::from(request.seed.unwrap_or_else(|| DEFAULT_SEED.to_string()));
1195
1196 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.clone());
1197 let assets_exist = layout.exists().await;
1198
1199 let protocol_version =
1200 resolve_protocol_version(request.protocol_version.as_deref()).await?;
1201
1202 if force_setup {
1203 assets::teardown(&layout).await?;
1204 assets::setup_local(
1205 &layout,
1206 &SetupOptions {
1207 nodes: requested_nodes,
1208 users,
1209 delay_seconds: delay,
1210 network_name: network_name.clone(),
1211 protocol_version: protocol_version.clone(),
1212 node_log_format,
1213 seed: Arc::clone(&seed),
1214 },
1215 )
1216 .await?;
1217 } else if !assets_exist {
1218 assets::setup_local(
1219 &layout,
1220 &SetupOptions {
1221 nodes: requested_nodes,
1222 users,
1223 delay_seconds: delay,
1224 network_name: network_name.clone(),
1225 protocol_version: protocol_version.clone(),
1226 node_log_format,
1227 seed: Arc::clone(&seed),
1228 },
1229 )
1230 .await?;
1231 }
1232
1233 if !layout.exists().await {
1234 return Err(anyhow!(
1235 "assets missing under {}; call spawn_network with force_setup=true",
1236 layout.net_dir().display()
1237 ));
1238 }
1239
1240 if !force_setup && assets_exist {
1241 let _ = assets::ensure_consensus_keys(&layout, Arc::clone(&seed)).await?;
1242 }
1243
1244 let node_count = layout.count_nodes().await?;
1245 if node_count == 0 {
1246 return Err(anyhow!("network has no nodes to start"));
1247 }
1248
1249 ensure_sidecar_available(&layout, node_count).await?;
1250
1251 let state_path = layout.net_dir().join(STATE_FILE_NAME);
1252 let mut state = State::new(state_path).await?;
1253
1254 process::start(
1255 &layout,
1256 &StartPlan {
1257 rust_log: log_level,
1258 },
1259 &mut state,
1260 )
1261 .await?;
1262
1263 let managed = Arc::new(ManagedNetwork {
1264 layout: layout.clone(),
1265 state: Arc::new(Mutex::new(state)),
1266 node_count,
1267 seed,
1268 sse_store: Arc::new(SseStore::new(DEFAULT_SSE_HISTORY_CAPACITY)),
1269 shutdown: Arc::new(AtomicBool::new(false)),
1270 sse_tasks: Mutex::new(Vec::new()),
1271 });
1272
1273 self.spawn_sse_collectors(&managed).await;
1274
1275 self.managed
1276 .lock()
1277 .await
1278 .insert(network_name.clone(), Arc::clone(&managed));
1279
1280 Ok(SpawnNetworkResponse {
1281 network_name,
1282 node_count,
1283 managed: true,
1284 already_running: false,
1285 forced_setup: force_setup,
1286 })
1287 }
1288
1289 async fn spawn_sse_collectors(&self, network: &Arc<ManagedNetwork>) {
1290 let mut tasks = Vec::new();
1291 for node_id in 1..=network.node_count {
1292 let endpoint = assets::sse_endpoint(node_id);
1293 let network = Arc::clone(network);
1294 let task = tokio::spawn(async move {
1295 run_sse_listener(network, node_id, endpoint).await;
1296 });
1297 tasks.push(task);
1298 }
1299 let mut guard = network.sse_tasks.lock().await;
1300 guard.extend(tasks);
1301 }
1302
1303 async fn wait_network_ready(
1304 &self,
1305 network_name: &str,
1306 timeout_seconds: Option<u64>,
1307 ) -> Result<WaitReadyResponse> {
1308 let network = self.get_network(network_name).await?;
1309 let timeout = Duration::from_secs(timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1310 let deadline = Instant::now() + timeout;
1311
1312 loop {
1313 let running = ensure_running_network(&network).await.is_ok();
1314 if running {
1315 let status = check_rest_ready(&network).await;
1316 if let Ok(rest_by_node) = status {
1317 let state = network.state.lock().await;
1318 let block_observed = state.last_block_height.is_some()
1319 || rest_by_node.values().any(rest_has_block);
1320 if block_observed {
1321 return Ok(WaitReadyResponse {
1322 network_name: network_name.to_string(),
1323 ready: true,
1324 node_count: network.node_count,
1325 rest: rest_by_node,
1326 last_block_height: state.last_block_height,
1327 });
1328 }
1329 }
1330 }
1331
1332 if Instant::now() >= deadline {
1333 return Ok(WaitReadyResponse {
1334 network_name: network_name.to_string(),
1335 ready: false,
1336 node_count: network.node_count,
1337 rest: HashMap::new(),
1338 last_block_height: None,
1339 });
1340 }
1341
1342 tokio::time::sleep(Duration::from_millis(500)).await;
1343 }
1344 }
1345
1346 async fn despawn_network(&self, network_name: &str, purge: bool) -> Result<DespawnResponse> {
1347 let managed = {
1348 let mut guard = self.managed.lock().await;
1349 guard.remove(network_name)
1350 };
1351
1352 let Some(network) = managed else {
1353 return Err(anyhow!("network '{}' is not managed", network_name));
1354 };
1355
1356 network.stop().await?;
1357 if purge {
1358 assets::teardown(&network.layout).await?;
1359 }
1360
1361 Ok(DespawnResponse {
1362 network_name: network_name.to_string(),
1363 purged: purge,
1364 })
1365 }
1366
1367 async fn list_networks(&self) -> Result<ListNetworksResponse> {
1368 let discovered = discover_network_names(&self.assets_root).await?;
1369 let managed_snapshot = {
1370 let guard = self.managed.lock().await;
1371 guard
1372 .iter()
1373 .map(|(name, network)| (name.clone(), Arc::clone(network)))
1374 .collect::<Vec<_>>()
1375 };
1376
1377 let mut rows = Vec::new();
1378
1379 for name in &discovered {
1380 if let Some((_, network)) = managed_snapshot.iter().find(|(n, _)| n == name) {
1381 rows.push(NetworkRow {
1382 network_name: name.clone(),
1383 discovered: true,
1384 managed: true,
1385 running: network.is_running().await,
1386 node_count: Some(network.node_count),
1387 });
1388 } else {
1389 let layout = AssetsLayout::new(self.assets_root.clone(), name.clone());
1390 rows.push(NetworkRow {
1391 network_name: name.clone(),
1392 discovered: true,
1393 managed: false,
1394 running: false,
1395 node_count: layout.count_nodes().await.ok(),
1396 });
1397 }
1398 }
1399
1400 for (name, network) in managed_snapshot {
1401 if !discovered.iter().any(|candidate| candidate == &name) {
1402 rows.push(NetworkRow {
1403 network_name: name,
1404 discovered: false,
1405 managed: true,
1406 running: network.is_running().await,
1407 node_count: Some(network.node_count),
1408 });
1409 }
1410 }
1411
1412 rows.sort_by(|a, b| a.network_name.cmp(&b.network_name));
1413
1414 Ok(ListNetworksResponse { networks: rows })
1415 }
1416
1417 async fn managed_processes(
1418 &self,
1419 request: ManagedProcessesRequest,
1420 ) -> Result<ManagedProcessesResponse> {
1421 let network = self.get_network(&request.network_name).await?;
1422 let running_only = request.running_only.unwrap_or(true);
1423 let process_name = request
1424 .process_name
1425 .as_deref()
1426 .map(str::trim)
1427 .filter(|name| !name.is_empty())
1428 .map(ToString::to_string);
1429
1430 let process_name_lc = process_name.as_ref().map(|name| name.to_ascii_lowercase());
1431 let state = network.state.lock().await;
1432 let mut processes = Vec::new();
1433 for process in &state.processes {
1434 if let Some(name_lc) = process_name_lc.as_deref()
1435 && !process.id.to_ascii_lowercase().contains(name_lc)
1436 {
1437 continue;
1438 }
1439
1440 let running = matches!(process.last_status, ProcessStatus::Running)
1441 && process.pid.is_some_and(is_pid_running);
1442 if running_only && !running {
1443 continue;
1444 }
1445
1446 processes.push(ManagedProcessRow {
1447 id: process.id.clone(),
1448 node_id: process.node_id,
1449 kind: process_kind_name(&process.kind).to_string(),
1450 pid: process.pid,
1451 running,
1452 last_status: process_status_name(&process.last_status).to_string(),
1453 command: process.command.clone(),
1454 args: process.args.clone(),
1455 cwd: process.cwd.clone(),
1456 stdout_path: process.stdout_path.clone(),
1457 stderr_path: process.stderr_path.clone(),
1458 });
1459 }
1460
1461 Ok(ManagedProcessesResponse {
1462 network_name: request.network_name,
1463 running_only,
1464 process_name,
1465 processes,
1466 })
1467 }
1468
1469 async fn get_network(&self, network_name: &str) -> Result<Arc<ManagedNetwork>> {
1470 let guard = self.managed.lock().await;
1471 guard.get(network_name).cloned().ok_or_else(|| {
1472 anyhow!(
1473 "network '{}' is not managed; call spawn_network first",
1474 network_name
1475 )
1476 })
1477 }
1478
1479 async fn stop_all_networks(&self) -> Result<()> {
1480 let managed = {
1481 let mut guard = self.managed.lock().await;
1482 guard
1483 .drain()
1484 .map(|(_, network)| network)
1485 .collect::<Vec<_>>()
1486 };
1487
1488 let mut errors = Vec::new();
1489 for network in managed {
1490 if let Err(err) = network.stop().await {
1491 errors.push(err.to_string());
1492 }
1493 }
1494
1495 if errors.is_empty() {
1496 Ok(())
1497 } else {
1498 Err(anyhow!(errors.join("\n")))
1499 }
1500 }
1501
1502 async fn raw_rpc_query(
1503 &self,
1504 node_id: u32,
1505 method: &str,
1506 params: Option<Value>,
1507 ) -> Result<Value> {
1508 let endpoint = assets::rpc_endpoint(node_id);
1509 let payload = match params {
1510 Some(params) => json!({
1511 "id": 1,
1512 "jsonrpc": "2.0",
1513 "method": method,
1514 "params": params,
1515 }),
1516 None => json!({
1517 "id": 1,
1518 "jsonrpc": "2.0",
1519 "method": method,
1520 }),
1521 };
1522
1523 let response = self
1524 .http
1525 .post(endpoint)
1526 .json(&payload)
1527 .send()
1528 .await?
1529 .error_for_status()?;
1530 Ok(response.json::<Value>().await?)
1531 }
1532
1533 async fn list_derived_accounts(&self, network_name: &str) -> Result<Vec<DerivedAccountRow>> {
1534 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1535 let csv = assets::derived_accounts_summary(&layout)
1536 .await
1537 .ok_or_else(|| {
1538 anyhow!(
1539 "missing derived-accounts.csv for network '{}'",
1540 network_name
1541 )
1542 })?;
1543
1544 let seed = {
1545 let managed = self.managed.lock().await;
1546 managed
1547 .get(network_name)
1548 .map(|network| Arc::clone(&network.seed))
1549 };
1550
1551 parse_derived_accounts_csv(&csv, seed).await
1552 }
1553
1554 async fn derived_account_for_path(
1555 &self,
1556 network: &Arc<ManagedNetwork>,
1557 path: &str,
1558 ) -> Result<DerivedSigner> {
1559 let material =
1560 assets::derive_account_from_seed_path(Arc::clone(&network.seed), path).await?;
1561 let secret_key = SecretKey::from_pem(&material.secret_key_pem)?;
1562 Ok(DerivedSigner {
1563 public_key_hex: material.public_key_hex,
1564 account_hash: material.account_hash,
1565 secret_key,
1566 })
1567 }
1568}
1569
1570#[derive(Debug)]
1571struct SseStore {
1572 sequence: AtomicU64,
1573 events: Mutex<VecDeque<SseRecord>>,
1574 notify: Notify,
1575 capacity: usize,
1576}
1577
1578impl SseStore {
1579 fn new(capacity: usize) -> Self {
1580 Self {
1581 sequence: AtomicU64::new(0),
1582 events: Mutex::new(VecDeque::new()),
1583 notify: Notify::new(),
1584 capacity,
1585 }
1586 }
1587
1588 async fn latest_sequence(&self) -> u64 {
1589 self.sequence.load(Ordering::SeqCst)
1590 }
1591
1592 async fn push(&self, node_id: u32, event: SseEvent) {
1593 let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) + 1;
1594 let event_type = sse_event_type(&event).to_string();
1595 let payload = sse_event_payload(&event);
1596
1597 let record = SseRecord {
1598 sequence,
1599 timestamp_rfc3339: timestamp_prefix(),
1600 node_id,
1601 event_type,
1602 payload,
1603 };
1604
1605 let mut guard = self.events.lock().await;
1606 guard.push_back(record);
1607 while guard.len() > self.capacity {
1608 let _ = guard.pop_front();
1609 }
1610 drop(guard);
1611
1612 self.notify.notify_waiters();
1613 }
1614
1615 async fn wait_next(
1616 &self,
1617 after_sequence: u64,
1618 filter: &SseFilter,
1619 timeout: Duration,
1620 ) -> Result<SseRecord> {
1621 let deadline = Instant::now() + timeout;
1622 loop {
1623 if let Some(record) = self.find_first_after(after_sequence, filter).await {
1624 return Ok(record);
1625 }
1626
1627 if Instant::now() >= deadline {
1628 return Err(anyhow!("timed out waiting for SSE event"));
1629 }
1630
1631 let remaining = deadline.saturating_duration_since(Instant::now());
1632 tokio::time::timeout(remaining, self.notify.notified()).await?;
1633 }
1634 }
1635
1636 async fn find_first_after(&self, after_sequence: u64, filter: &SseFilter) -> Option<SseRecord> {
1637 let guard = self.events.lock().await;
1638 guard
1639 .iter()
1640 .find(|event| event.sequence > after_sequence && filter.matches(event))
1641 .cloned()
1642 }
1643
1644 async fn history(
1645 &self,
1646 before_sequence: Option<u64>,
1647 limit: usize,
1648 filter: &SseFilter,
1649 ) -> SseHistoryPage {
1650 let before = before_sequence.unwrap_or(u64::MAX);
1651 let guard = self.events.lock().await;
1652
1653 let mut matched = guard
1654 .iter()
1655 .filter(|event| event.sequence < before)
1656 .filter(|event| filter.matches(event))
1657 .cloned()
1658 .collect::<Vec<_>>();
1659
1660 let total = matched.len();
1661
1662 if matched.len() > limit {
1663 matched = matched.split_off(matched.len() - limit);
1664 }
1665
1666 let next_before_sequence = matched.first().map(|event| event.sequence);
1667 SseHistoryPage {
1668 total_matching: total,
1669 returned: matched.len(),
1670 next_before_sequence,
1671 events: matched,
1672 }
1673 }
1674}
1675
1676#[derive(Debug, Default)]
1677struct SseFilter {
1678 include_event_types: Vec<String>,
1679 exclude_event_types: Vec<String>,
1680}
1681
1682impl SseFilter {
1683 fn matches(&self, event: &SseRecord) -> bool {
1684 let include = if self.include_event_types.is_empty() {
1685 true
1686 } else {
1687 self.include_event_types
1688 .iter()
1689 .any(|name| name == &event.event_type)
1690 };
1691 if !include {
1692 return false;
1693 }
1694 !self
1695 .exclude_event_types
1696 .iter()
1697 .any(|name| name == &event.event_type)
1698 }
1699}
1700
1701#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
1702struct SseRecord {
1703 sequence: u64,
1704 timestamp_rfc3339: String,
1705 node_id: u32,
1706 event_type: String,
1707 payload: Value,
1708}
1709
1710#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
1711struct SseHistoryPage {
1712 total_matching: usize,
1713 returned: usize,
1714 next_before_sequence: Option<u64>,
1715 events: Vec<SseRecord>,
1716}
1717
1718#[derive(Debug)]
1719struct DerivedSigner {
1720 public_key_hex: String,
1721 account_hash: String,
1722 secret_key: SecretKey,
1723}
1724
1725#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1726struct SpawnNetworkRequest {
1727 network_name: Option<String>,
1728 protocol_version: Option<String>,
1729 node_count: Option<u32>,
1730 users: Option<u32>,
1731 delay: Option<u64>,
1732 log_level: Option<String>,
1733 node_log_format: Option<String>,
1734 seed: Option<String>,
1735 force_setup: Option<bool>,
1736}
1737
1738#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1739struct SpawnNetworkResponse {
1740 network_name: String,
1741 node_count: u32,
1742 managed: bool,
1743 already_running: bool,
1744 forced_setup: bool,
1745}
1746
1747#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1748struct WaitNetworkReadyRequest {
1749 network_name: String,
1750 timeout_seconds: Option<u64>,
1751}
1752
1753#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1754struct WaitReadyResponse {
1755 network_name: String,
1756 ready: bool,
1757 node_count: u32,
1758 rest: HashMap<u32, Value>,
1759 last_block_height: Option<u64>,
1760}
1761
1762#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1763struct DespawnNetworkRequest {
1764 network_name: String,
1765 purge: Option<bool>,
1766}
1767
1768#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1769struct DespawnResponse {
1770 network_name: String,
1771 purged: bool,
1772}
1773
1774#[derive(Debug, Deserialize, Default, rmcp::schemars::JsonSchema)]
1775struct ListNetworksRequest {}
1776
1777#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1778struct ListNetworksResponse {
1779 networks: Vec<NetworkRow>,
1780}
1781
1782#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1783struct NetworkRow {
1784 network_name: String,
1785 discovered: bool,
1786 managed: bool,
1787 running: bool,
1788 node_count: Option<u32>,
1789}
1790
1791#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1792struct ManagedProcessesRequest {
1793 network_name: String,
1794 process_name: Option<String>,
1795 running_only: Option<bool>,
1796}
1797
1798#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1799struct ManagedProcessesResponse {
1800 network_name: String,
1801 running_only: bool,
1802 process_name: Option<String>,
1803 processes: Vec<ManagedProcessRow>,
1804}
1805
1806#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1807struct ManagedProcessRow {
1808 id: String,
1809 node_id: u32,
1810 kind: String,
1811 pid: Option<u32>,
1812 running: bool,
1813 last_status: String,
1814 command: String,
1815 args: Vec<String>,
1816 cwd: String,
1817 stdout_path: String,
1818 stderr_path: String,
1819}
1820
1821#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1822struct NodeScopedRequest {
1823 network_name: String,
1824 node_id: u32,
1825}
1826
1827#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1828struct RpcQueryRequest {
1829 network_name: String,
1830 node_id: u32,
1831 method: String,
1832 params: Option<Value>,
1833}
1834
1835#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1836struct RpcQueryBalanceRequest {
1837 network_name: String,
1838 node_id: u32,
1839 purse_identifier: String,
1840 block_id: Option<String>,
1841 state_root_hash: Option<String>,
1842}
1843
1844#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1845struct RpcQueryGlobalStateRequest {
1846 network_name: String,
1847 node_id: u32,
1848 key: String,
1849 path: Option<Vec<String>>,
1850 block_id: Option<String>,
1851 state_root_hash: Option<String>,
1852}
1853
1854#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1855struct CurrentBlockRequest {
1856 network_name: String,
1857 node_id: u32,
1858 block_id: Option<String>,
1859}
1860
1861#[derive(Debug, Clone, Copy, Deserialize, Serialize, rmcp::schemars::JsonSchema)]
1862#[serde(rename_all = "lowercase")]
1863enum NodeLogStream {
1864 Stdout,
1865 Stderr,
1866}
1867
1868#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1869struct GetNodeLogsRequest {
1870 network_name: String,
1871 node_id: u32,
1872 stream: NodeLogStream,
1873 limit: Option<usize>,
1874 before_line: Option<usize>,
1875}
1876
1877#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1878struct LogLine {
1879 line_number: usize,
1880 content: String,
1881}
1882
1883#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
1884struct LogPage {
1885 path: String,
1886 total_lines: usize,
1887 returned: usize,
1888 next_before_line: Option<usize>,
1889 lines: Vec<LogLine>,
1890}
1891
1892#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1893struct WaitNextSseRequest {
1894 network_name: String,
1895 include_event_types: Option<Vec<String>>,
1896 exclude_event_types: Option<Vec<String>>,
1897 after_sequence: Option<u64>,
1898 timeout_seconds: Option<u64>,
1899}
1900
1901#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1902struct SseHistoryRequest {
1903 network_name: String,
1904 include_event_types: Option<Vec<String>>,
1905 exclude_event_types: Option<Vec<String>>,
1906 before_sequence: Option<u64>,
1907 limit: Option<usize>,
1908}
1909
1910#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1911struct ListDerivedAccountsRequest {
1912 network_name: String,
1913}
1914
1915#[derive(Debug, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
1916struct DerivedAccountRow {
1917 kind: String,
1918 name: String,
1919 key_type: String,
1920 derivation: String,
1921 path: String,
1922 account_hash: String,
1923 balance: String,
1924 public_key: Option<String>,
1925}
1926
1927#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1928struct SendTransactionSignedRequest {
1929 network_name: String,
1930 node_id: u32,
1931 signer_path: String,
1932 transaction: Value,
1933}
1934
1935#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1936struct MakeTransactionPackageCallRequest {
1937 network_name: String,
1938 node_id: u32,
1939 transaction_package_name: String,
1940 transaction_package_version: Option<EntityVersion>,
1941 session_entry_point: String,
1942 #[serde(alias = "session_args_json")]
1943 session_args: Option<Value>,
1944 signer_path: Option<String>,
1945 initiator_public_key: Option<String>,
1946 chain_name: Option<String>,
1947 ttl_millis: Option<u64>,
1948 gas_price_tolerance: Option<u8>,
1949 payment_amount: Option<u64>,
1950 runtime_transferred_value: Option<u64>,
1951 runtime_seed_hex: Option<String>,
1952}
1953
1954#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1955struct MakeTransactionContractCallRequest {
1956 network_name: String,
1957 node_id: u32,
1958 transaction_contract_hash: String,
1959 session_entry_point: String,
1960 #[serde(alias = "session_args_json")]
1961 session_args: Option<Value>,
1962 signer_path: Option<String>,
1963 initiator_public_key: Option<String>,
1964 chain_name: Option<String>,
1965 ttl_millis: Option<u64>,
1966 gas_price_tolerance: Option<u8>,
1967 payment_amount: Option<u64>,
1968 runtime_transferred_value: Option<u64>,
1969 runtime_seed_hex: Option<String>,
1970}
1971
1972#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1973struct MakeTransactionSessionWasmRequest {
1974 network_name: String,
1975 node_id: u32,
1976 wasm_path: String,
1977 #[serde(alias = "session_args_json")]
1978 session_args: Option<Value>,
1979 is_install_upgrade: Option<bool>,
1980 signer_path: Option<String>,
1981 initiator_public_key: Option<String>,
1982 chain_name: Option<String>,
1983 ttl_millis: Option<u64>,
1984 gas_price_tolerance: Option<u8>,
1985 payment_amount: Option<u64>,
1986 runtime_transferred_value: Option<u64>,
1987 runtime_seed_hex: Option<String>,
1988}
1989
1990#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
1991struct SendSessionWasmRequest {
1992 network_name: String,
1993 node_id: u32,
1994 signer_path: String,
1995 wasm_path: String,
1996 #[serde(alias = "session_args_json")]
1997 session_args: Option<Value>,
1998 chain_name: Option<String>,
1999 is_install_upgrade: Option<bool>,
2000 ttl_millis: Option<u64>,
2001 gas_price_tolerance: Option<u8>,
2002 payment_amount: Option<u64>,
2003 runtime_transferred_value: Option<u64>,
2004 runtime_seed_hex: Option<String>,
2005}
2006
2007#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2008struct TransferTokensRequest {
2009 network_name: String,
2010 node_id: u32,
2011 from_path: String,
2012 to_path: String,
2013 amount: String,
2014 transfer_id: Option<u64>,
2015 chain_name: Option<String>,
2016 ttl_millis: Option<u64>,
2017 gas_price_tolerance: Option<u8>,
2018 payment_amount: Option<u64>,
2019}
2020
2021#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2022struct WaitTransactionRequest {
2023 network_name: String,
2024 node_id: u32,
2025 transaction_hash: String,
2026 timeout_seconds: Option<u64>,
2027 poll_interval_millis: Option<u64>,
2028}
2029
2030#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2031struct GetTransactionRequest {
2032 network_name: String,
2033 node_id: u32,
2034 transaction_hash: String,
2035}
2036
2037#[derive(Debug, Deserialize)]
2038#[serde(rename_all = "snake_case")]
2039struct RestStatusProbe {
2040 reactor_state: Option<String>,
2041}
2042
2043fn to_mcp_error(err: impl std::fmt::Display) -> ErrorData {
2044 ErrorData::internal_error(err.to_string(), None)
2045}
2046
2047fn internal_serde_error(err: serde_json::Error) -> ErrorData {
2048 ErrorData::internal_error(format!("serde error: {}", err), None)
2049}
2050
2051fn ok_value(value: Value) -> rmcp::model::CallToolResult {
2052 rmcp::model::CallToolResult::structured(value)
2053}
2054
2055fn parse_transaction_json(value: Value) -> std::result::Result<Transaction, ErrorData> {
2056 if let Value::String(encoded) = &value {
2057 let _ = encoded;
2058 return Err(ErrorData::invalid_params(
2059 "invalid transaction payload: encoded JSON strings are not supported. Provide transaction as a typed JSON object",
2060 None,
2061 ));
2062 }
2063
2064 if let Ok(transaction) = serde_json::from_value::<Transaction>(value.clone()) {
2065 return Ok(transaction);
2066 }
2067
2068 if let Some(inner) = value.get("transaction") {
2069 return parse_transaction_json(inner.clone());
2070 }
2071
2072 Err(ErrorData::invalid_params(
2073 "failed to parse transaction JSON; expected Transaction object or { transaction: Transaction }, with typed JSON values only.",
2074 None,
2075 ))
2076}
2077
2078fn parse_transaction_hash_input(input: &str) -> Result<TransactionHash> {
2079 let value = input.trim();
2080 if value.is_empty() {
2081 return Err(anyhow!("transaction_hash must not be empty"));
2082 }
2083
2084 let unwrapped = value
2085 .strip_prefix("transaction-v1-hash(")
2086 .and_then(|inner| inner.strip_suffix(')'))
2087 .or_else(|| {
2088 value
2089 .strip_prefix("deploy-hash(")
2090 .and_then(|inner| inner.strip_suffix(')'))
2091 })
2092 .unwrap_or(value);
2093
2094 if unwrapped.contains("..") {
2095 return Err(anyhow!(
2096 "transaction_hash appears abbreviated; provide full hex digest"
2097 ));
2098 }
2099
2100 let normalized = unwrapped.strip_prefix("0x").unwrap_or(unwrapped);
2101 let digest = Digest::from_hex(normalized)
2102 .map_err(|err| anyhow!("failed to parse transaction hash digest: {}", err))?;
2103 Ok(TransactionHash::from(TransactionV1Hash::from(digest)))
2104}
2105
2106fn mcp_rpc_client(network_name: &str, node_id: u32) -> Result<CasperClient> {
2107 CasperClient::new(
2108 network_name.to_string(),
2109 vec![assets::rpc_endpoint(node_id)],
2110 )
2111 .map_err(|err| {
2112 anyhow!(
2113 "failed to initialize rpc client for node {}: {}",
2114 node_id,
2115 err
2116 )
2117 })
2118}
2119
2120fn extract_rpc_result(response: Value) -> Result<Value> {
2121 if let Some(error) = response.get("error") {
2122 return Err(anyhow!("rpc request failed: {}", error));
2123 }
2124 response
2125 .get("result")
2126 .cloned()
2127 .ok_or_else(|| anyhow!("rpc response missing result field"))
2128}
2129
2130async fn fetch_block_result(
2131 manager: &NetworkManager,
2132 network_name: &str,
2133 node_id: u32,
2134 block_id: Option<&str>,
2135) -> Result<Value> {
2136 let block_id = normalize_optional_identifier(block_id);
2137 if block_id.is_empty() {
2138 let rpc = mcp_rpc_client(network_name, node_id)?;
2139 let result = rpc.get_block().await?;
2140 return serde_json::to_value(result).map_err(Into::into);
2141 }
2142
2143 let block_identifier = parse_block_identifier_value(&block_id)?;
2144 let response = manager
2145 .raw_rpc_query(
2146 node_id,
2147 "chain_get_block",
2148 Some(json!({
2149 "block_identifier": block_identifier
2150 })),
2151 )
2152 .await?;
2153 extract_rpc_result(response)
2154}
2155
2156fn build_query_global_state_params(
2157 key: &str,
2158 path: Vec<String>,
2159 block_id: &str,
2160 state_root_hash: &str,
2161) -> Result<Value> {
2162 let key = parse_query_key(key)?;
2163 let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2164 let mut params = serde_json::Map::new();
2165 params.insert("key".to_string(), Value::String(key));
2166 params.insert(
2167 "path".to_string(),
2168 Value::Array(path.into_iter().map(Value::String).collect()),
2169 );
2170 if let Some(state_identifier) = state_identifier {
2171 params.insert("state_identifier".to_string(), state_identifier);
2172 }
2173 Ok(Value::Object(params))
2174}
2175
2176fn build_query_balance_params(
2177 purse_identifier: &str,
2178 block_id: &str,
2179 state_root_hash: &str,
2180) -> Result<Value> {
2181 let purse_identifier = parse_purse_identifier(purse_identifier)?;
2182 let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2183 let mut params = serde_json::Map::new();
2184 params.insert("purse_identifier".to_string(), purse_identifier);
2185 if let Some(state_identifier) = state_identifier {
2186 params.insert("state_identifier".to_string(), state_identifier);
2187 }
2188 Ok(Value::Object(params))
2189}
2190
2191fn parse_state_identifier(block_id: &str, state_root_hash: &str) -> Result<Option<Value>> {
2192 let block_id = block_id.trim();
2193 if !block_id.is_empty() {
2194 if block_id.len() == Digest::LENGTH * 2 {
2195 Digest::from_hex(block_id)
2196 .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2197 return Ok(Some(json!({
2198 "BlockHash": block_id,
2199 })));
2200 }
2201
2202 let height = block_id
2203 .parse::<u64>()
2204 .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2205 return Ok(Some(json!({
2206 "BlockHeight": height,
2207 })));
2208 }
2209
2210 let state_root_hash = state_root_hash.trim();
2211 if state_root_hash.is_empty() {
2212 return Ok(None);
2213 }
2214 Digest::from_hex(state_root_hash)
2215 .map_err(|err| anyhow!("invalid state_root_hash digest: {}", err))?;
2216 Ok(Some(json!({
2217 "StateRootHash": state_root_hash,
2218 })))
2219}
2220
2221fn parse_block_identifier_value(block_id: &str) -> Result<Value> {
2222 let block_id = block_id.trim();
2223 if block_id.is_empty() {
2224 return Err(anyhow!("block identifier must not be empty"));
2225 }
2226 if block_id.len() == Digest::LENGTH * 2 {
2227 Digest::from_hex(block_id)
2228 .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2229 Ok(json!({
2230 "Hash": block_id,
2231 }))
2232 } else {
2233 let height = block_id
2234 .parse::<u64>()
2235 .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2236 Ok(json!({
2237 "Height": height,
2238 }))
2239 }
2240}
2241
2242fn parse_query_key(key: &str) -> Result<String> {
2243 let key = key.trim();
2244 if key.is_empty() {
2245 return Err(anyhow!("key must not be empty"));
2246 }
2247 if let Ok(contract_hash) = ContractHash::from_formatted_str(key) {
2248 return Ok(Key::Hash(contract_hash.value()).to_formatted_string());
2249 }
2250 if let Ok(parsed) = Key::from_formatted_str(key) {
2251 return Ok(parsed.to_formatted_string());
2252 }
2253 if let Ok(public_key) = PublicKey::from_hex(key) {
2254 return Ok(Key::Account(public_key.to_account_hash()).to_formatted_string());
2255 }
2256 Err(anyhow!(
2257 "failed to parse key for query; expected formatted Key or public key hex"
2258 ))
2259}
2260
2261fn parse_contract_hash_for_invocation(input: &str) -> Result<AddressableEntityHash> {
2262 let input = input.trim();
2263 if input.is_empty() {
2264 return Err(anyhow!("transaction_contract_hash must not be empty"));
2265 }
2266
2267 if let Ok(contract_hash) = ContractHash::from_formatted_str(input) {
2268 return Ok(contract_hash.into());
2269 }
2270
2271 let digest_hex = input
2272 .strip_prefix("hash-")
2273 .or_else(|| input.strip_prefix("contract-hash-"))
2274 .unwrap_or(input);
2275 let bytes = hex_to_bytes(digest_hex)?;
2276 if bytes.len() != Digest::LENGTH {
2277 return Err(anyhow!(
2278 "transaction_contract_hash must be {} bytes",
2279 Digest::LENGTH
2280 ));
2281 }
2282 let mut hash = [0u8; Digest::LENGTH];
2283 hash.copy_from_slice(&bytes);
2284 Ok(ContractHash::new(hash).into())
2285}
2286
2287fn parse_purse_identifier(input: &str) -> Result<Value> {
2288 let input = input.trim();
2289 if input.is_empty() {
2290 return Err(anyhow!("purse_identifier must not be empty"));
2291 }
2292
2293 if input.starts_with("account-hash-") {
2294 casper_types::account::AccountHash::from_formatted_str(input)
2295 .map_err(|err| anyhow!("invalid account hash purse identifier: {}", err))?;
2296 return Ok(json!({
2297 "main_purse_under_account_hash": input,
2298 }));
2299 }
2300
2301 if input.starts_with("entity-") {
2302 return Ok(json!({
2303 "main_purse_under_entity_addr": input,
2304 }));
2305 }
2306
2307 if input.starts_with("uref-") {
2308 URef::from_formatted_str(input)
2309 .map_err(|err| anyhow!("invalid uref purse identifier: {}", err))?;
2310 return Ok(json!({
2311 "purse_uref": input,
2312 }));
2313 }
2314
2315 let public_key = PublicKey::from_hex(input)
2316 .map_err(|err| anyhow!("invalid public key purse identifier: {}", err))?;
2317 Ok(json!({
2318 "main_purse_under_public_key": public_key.to_hex_string(),
2319 }))
2320}
2321
2322fn build_pricing_mode(gas_price_tolerance: Option<u8>, payment_amount: Option<u64>) -> PricingMode {
2323 if let Some(payment_amount) = payment_amount {
2324 PricingMode::PaymentLimited {
2325 payment_amount,
2326 gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2327 standard_payment: true,
2328 }
2329 } else {
2330 PricingMode::Fixed {
2331 gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2332 additional_computation_factor: 0,
2333 }
2334 }
2335}
2336
2337fn parse_optional_seed_hex(seed: Option<&str>) -> Result<Option<[u8; 32]>> {
2338 let Some(seed) = seed else {
2339 return Ok(None);
2340 };
2341 let seed = seed.trim();
2342 if seed.is_empty() {
2343 return Ok(None);
2344 }
2345
2346 let cleaned = seed.strip_prefix("0x").unwrap_or(seed);
2347 let bytes = hex_to_bytes(cleaned)?;
2348 if bytes.len() != 32 {
2349 return Err(anyhow!("runtime_seed_hex must be exactly 32 bytes"));
2350 }
2351
2352 let mut out = [0u8; 32];
2353 out.copy_from_slice(&bytes);
2354 Ok(Some(out))
2355}
2356
2357fn normalize_optional_identifier(value: Option<&str>) -> String {
2358 value.map(str::trim).unwrap_or_default().to_string()
2359}
2360
2361async fn resolve_global_state_identifier(
2362 network_name: &str,
2363 node_id: u32,
2364 block_id: Option<&str>,
2365 state_root_hash: Option<&str>,
2366) -> Result<(String, String)> {
2367 let block_id = normalize_optional_identifier(block_id);
2368 let state_root_hash = normalize_optional_identifier(state_root_hash);
2369
2370 if !block_id.is_empty() || !state_root_hash.is_empty() {
2371 return Ok((block_id, state_root_hash));
2372 }
2373
2374 let rpc = mcp_rpc_client(network_name, node_id)?;
2375 let response = rpc.get_block().await?;
2376 let latest_block = response
2377 .block_with_signatures
2378 .ok_or_else(|| anyhow!("latest block was not returned by chain_get_block"))?;
2379
2380 Ok((latest_block.block.hash().to_hex_string(), String::new()))
2381}
2382
2383fn hex_to_bytes(input: &str) -> Result<Vec<u8>> {
2384 if !input.len().is_multiple_of(2) {
2385 return Err(anyhow!("hex string must have even length"));
2386 }
2387 let mut out = Vec::with_capacity(input.len() / 2);
2388 let mut chars = input.chars();
2389 while let (Some(hi), Some(lo)) = (chars.next(), chars.next()) {
2390 let byte = ((hex_nibble(hi)? as u8) << 4) | (hex_nibble(lo)? as u8);
2391 out.push(byte);
2392 }
2393 Ok(out)
2394}
2395
2396fn hex_nibble(ch: char) -> Result<u32> {
2397 ch.to_digit(16)
2398 .ok_or_else(|| anyhow!("invalid hex character '{}'", ch))
2399}
2400
2401async fn ensure_running_network(
2402 network: &Arc<ManagedNetwork>,
2403) -> std::result::Result<(), ErrorData> {
2404 if network.is_running().await {
2405 Ok(())
2406 } else {
2407 Err(ErrorData::resource_not_found(
2408 "network is not running; call spawn_network then wait_network_ready",
2409 None,
2410 ))
2411 }
2412}
2413
2414async fn check_rest_ready(network: &Arc<ManagedNetwork>) -> Result<HashMap<u32, Value>> {
2415 let node_ids = {
2416 let state = network.state.lock().await;
2417 state
2418 .processes
2419 .iter()
2420 .filter_map(|process| {
2421 if matches!(process.kind, ProcessKind::Node) {
2422 Some(process.node_id)
2423 } else {
2424 None
2425 }
2426 })
2427 .collect::<Vec<_>>()
2428 };
2429
2430 if node_ids.is_empty() {
2431 return Err(anyhow!("network has no node processes"));
2432 }
2433
2434 let mut by_node = HashMap::new();
2435 for node_id in node_ids {
2436 let value = fetch_rest_status(node_id).await?;
2437 let probe: RestStatusProbe = serde_json::from_value(value.clone())
2438 .with_context(|| format!("invalid /status payload for node {}", node_id))?;
2439 if probe.reactor_state.as_deref() != Some("Validate") {
2440 return Err(anyhow!("node {} reactor is not Validate", node_id));
2441 }
2442 by_node.insert(node_id, value);
2443 }
2444
2445 Ok(by_node)
2446}
2447
2448fn rest_has_block(value: &Value) -> bool {
2449 value
2450 .get("last_added_block_info")
2451 .and_then(|entry| entry.get("height"))
2452 .and_then(Value::as_u64)
2453 .is_some()
2454}
2455
2456async fn fetch_rest_status(node_id: u32) -> Result<Value> {
2457 let url = format!("{}/status", assets::rest_endpoint(node_id));
2458 let client = reqwest::Client::builder()
2459 .no_proxy()
2460 .timeout(Duration::from_secs(4))
2461 .build()?;
2462 let response = client.get(url).send().await?.error_for_status()?;
2463 Ok(response.json::<Value>().await?)
2464}
2465
2466async fn run_sse_listener(network: Arc<ManagedNetwork>, node_id: u32, endpoint: String) {
2467 let mut backoff = ExponentialBackoff::default();
2468
2469 loop {
2470 if network.shutdown.load(Ordering::SeqCst) {
2471 return;
2472 }
2473
2474 let config = match ListenerConfig::builder()
2475 .with_endpoint(endpoint.clone())
2476 .build()
2477 {
2478 Ok(config) => config,
2479 Err(_) => {
2480 if !sleep_backoff(&mut backoff).await {
2481 return;
2482 }
2483 continue;
2484 }
2485 };
2486
2487 let stream = match sse::listener(config).await {
2488 Ok(stream) => {
2489 backoff.reset();
2490 stream
2491 }
2492 Err(_) => {
2493 if !sleep_backoff(&mut backoff).await {
2494 return;
2495 }
2496 continue;
2497 }
2498 };
2499
2500 futures::pin_mut!(stream);
2501 let mut failed = false;
2502
2503 while let Some(event) = stream.next().await {
2504 if network.shutdown.load(Ordering::SeqCst) {
2505 return;
2506 }
2507
2508 match event {
2509 Ok(event) => {
2510 if let SseEvent::BlockAdded { block, .. } = &event {
2511 let _ = record_last_block_height(&network.state, block.height()).await;
2512 }
2513 network.sse_store.push(node_id, event).await;
2514 }
2515 Err(_) => {
2516 failed = true;
2517 break;
2518 }
2519 }
2520 }
2521
2522 if failed && !sleep_backoff(&mut backoff).await {
2523 return;
2524 }
2525 }
2526}
2527
2528async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
2529 let mut state = state.lock().await;
2530 if state.last_block_height == Some(height) {
2531 return Ok(());
2532 }
2533 state.last_block_height = Some(height);
2534 state.touch().await
2535}
2536
2537async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
2538 if let Some(delay) = backoff.next_backoff() {
2539 tokio::time::sleep(delay).await;
2540 true
2541 } else {
2542 false
2543 }
2544}
2545
2546fn sse_event_type(event: &SseEvent) -> &'static str {
2547 match event {
2548 SseEvent::ApiVersion(_) => "ApiVersion",
2549 SseEvent::DeployAccepted(_) => "DeployAccepted",
2550 SseEvent::BlockAdded { .. } => "BlockAdded",
2551 SseEvent::DeployProcessed(_) => "DeployProcessed",
2552 SseEvent::DeployExpired(_) => "DeployExpired",
2553 SseEvent::TransactionAccepted(_) => "TransactionAccepted",
2554 SseEvent::TransactionProcessed { .. } => "TransactionProcessed",
2555 SseEvent::TransactionExpired { .. } => "TransactionExpired",
2556 SseEvent::Fault { .. } => "Fault",
2557 SseEvent::FinalitySignature(_) => "FinalitySignature",
2558 SseEvent::Step { .. } => "Step",
2559 SseEvent::Shutdown => "Shutdown",
2560 }
2561}
2562
2563fn process_kind_name(kind: &ProcessKind) -> &'static str {
2564 match kind {
2565 ProcessKind::Node => "node",
2566 ProcessKind::Sidecar => "sidecar",
2567 }
2568}
2569
2570fn process_status_name(status: &ProcessStatus) -> &'static str {
2571 match status {
2572 ProcessStatus::Running => "running",
2573 ProcessStatus::Stopped => "stopped",
2574 ProcessStatus::Exited => "exited",
2575 ProcessStatus::Unknown => "unknown",
2576 ProcessStatus::Skipped => "skipped",
2577 }
2578}
2579
2580fn sse_event_payload(event: &SseEvent) -> Value {
2581 match event {
2582 SseEvent::ApiVersion(version) => json!({ "api_version": version.to_string() }),
2583 SseEvent::DeployAccepted(payload) => payload.clone(),
2584 SseEvent::BlockAdded { block_hash, block } => json!({
2585 "block_hash": block_hash.to_string(),
2586 "height": block.height(),
2587 "era_id": block.era_id().value(),
2588 }),
2589 SseEvent::DeployProcessed(payload) => payload.clone(),
2590 SseEvent::DeployExpired(payload) => payload.clone(),
2591 SseEvent::TransactionAccepted(transaction) => {
2592 json!({ "transaction_hash": transaction.hash().to_hex_string() })
2593 }
2594 SseEvent::TransactionProcessed {
2595 transaction_hash,
2596 execution_result,
2597 messages,
2598 ..
2599 } => json!({
2600 "transaction_hash": transaction_hash.to_hex_string(),
2601 "execution_result": execution_result,
2602 "messages": messages,
2603 }),
2604 SseEvent::TransactionExpired { transaction_hash } => json!({
2605 "transaction_hash": transaction_hash.to_hex_string(),
2606 }),
2607 SseEvent::Fault {
2608 era_id,
2609 public_key,
2610 timestamp,
2611 } => json!({
2612 "era_id": era_id.value(),
2613 "public_key": public_key.to_hex(),
2614 "timestamp": timestamp,
2615 }),
2616 SseEvent::FinalitySignature(signature) => json!({
2617 "block_hash": signature.block_hash().to_string(),
2618 "era_id": signature.era_id().value(),
2619 "signature": signature.signature().to_hex(),
2620 }),
2621 SseEvent::Step {
2622 era_id,
2623 execution_effects,
2624 } => json!({
2625 "era_id": era_id.value(),
2626 "execution_effects": execution_effects.get(),
2627 }),
2628 SseEvent::Shutdown => json!({}),
2629 }
2630}
2631
2632fn timestamp_prefix() -> String {
2633 time::OffsetDateTime::now_utc()
2634 .format(&time::format_description::well_known::Rfc3339)
2635 .unwrap_or_else(|_| "unknown-time".to_string())
2636}
2637
2638fn processes_running(state: &State) -> bool {
2639 if state.processes.is_empty() {
2640 return false;
2641 }
2642
2643 state.processes.iter().all(|process| {
2644 matches!(process.last_status, ProcessStatus::Running)
2645 && process.pid.is_some_and(is_pid_running)
2646 })
2647}
2648
2649fn is_pid_running(pid: u32) -> bool {
2650 let pid = Pid::from_raw(pid as i32);
2651 match kill(pid, None) {
2652 Ok(()) => true,
2653 Err(Errno::ESRCH) => false,
2654 Err(_) => true,
2655 }
2656}
2657
2658async fn discover_network_names(assets_root: &Path) -> Result<Vec<String>> {
2659 if !is_dir(assets_root).await {
2660 return Ok(Vec::new());
2661 }
2662
2663 let mut names = Vec::new();
2664 let mut entries = tokio_fs::read_dir(assets_root).await?;
2665 while let Some(entry) = entries.next_entry().await? {
2666 if !entry.file_type().await?.is_dir() {
2667 continue;
2668 }
2669 let name = entry.file_name().to_string_lossy().to_string();
2670 if !name.is_empty() {
2671 names.push(name);
2672 }
2673 }
2674
2675 names.sort();
2676 Ok(names)
2677}
2678
2679async fn ensure_sidecar_available(layout: &AssetsLayout, node_count: u32) -> Result<()> {
2680 for node_id in 1..=node_count {
2681 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
2682 let sidecar_bin = layout
2683 .node_bin_dir(node_id)
2684 .join(&version_dir)
2685 .join("casper-sidecar");
2686 let sidecar_cfg = layout
2687 .node_config_root(node_id)
2688 .join(&version_dir)
2689 .join("sidecar.toml");
2690
2691 if !is_file(&sidecar_bin).await {
2692 return Err(anyhow!(
2693 "missing sidecar binary for node {}: {}",
2694 node_id,
2695 sidecar_bin.display()
2696 ));
2697 }
2698
2699 if !is_file(&sidecar_cfg).await {
2700 return Err(anyhow!(
2701 "missing sidecar.toml for node {}: {}",
2702 node_id,
2703 sidecar_cfg.display()
2704 ));
2705 }
2706 }
2707
2708 Ok(())
2709}
2710
2711async fn resolve_protocol_version(candidate: Option<&str>) -> Result<String> {
2712 if let Some(raw) = candidate {
2713 let version = assets::parse_protocol_version(raw)?;
2714 if !assets::has_bundle_version(&version).await? {
2715 return Err(anyhow!("assets for version {} not found", version));
2716 }
2717 return Ok(version.to_string());
2718 }
2719
2720 let version = assets::most_recent_bundle_version()
2721 .await?
2722 .ok_or_else(|| anyhow!("no assets bundles found"))?;
2723 Ok(version.to_string())
2724}
2725
2726async fn parse_derived_accounts_csv(
2727 csv: &str,
2728 seed: Option<Arc<str>>,
2729) -> Result<Vec<DerivedAccountRow>> {
2730 let mut lines = csv.lines();
2731 let header = lines
2732 .next()
2733 .ok_or_else(|| anyhow!("derived accounts csv is empty"))?;
2734 if header.trim() != "kind,name,key_type,derivation,path,account_hash,balance" {
2735 return Err(anyhow!("unexpected derived-accounts.csv header"));
2736 }
2737
2738 let mut rows = Vec::new();
2739 for line in lines {
2740 let line = line.trim();
2741 if line.is_empty() {
2742 continue;
2743 }
2744
2745 let parts = line.splitn(7, ',').collect::<Vec<_>>();
2746 if parts.len() != 7 {
2747 return Err(anyhow!("invalid derived account row: {}", line));
2748 }
2749
2750 let path = parts[4].to_string();
2751 let public_key = if let Some(seed) = &seed {
2752 match assets::derive_account_from_seed_path(Arc::clone(seed), &path).await {
2753 Ok(material) => Some(material.public_key_hex),
2754 Err(_) => None,
2755 }
2756 } else {
2757 None
2758 };
2759
2760 rows.push(DerivedAccountRow {
2761 kind: parts[0].to_string(),
2762 name: parts[1].to_string(),
2763 key_type: parts[2].to_string(),
2764 derivation: parts[3].to_string(),
2765 path,
2766 account_hash: parts[5].to_string(),
2767 balance: parts[6].to_string(),
2768 public_key,
2769 });
2770 }
2771
2772 Ok(rows)
2773}
2774
2775async fn verify_path_hash_consistency(
2776 layout: &AssetsLayout,
2777 path: &str,
2778 expected_account_hash: &str,
2779) -> Result<()> {
2780 let Some(csv) = assets::derived_accounts_summary(layout).await else {
2781 return Ok(());
2782 };
2783
2784 for line in csv.lines().skip(1) {
2785 let line = line.trim();
2786 if line.is_empty() {
2787 continue;
2788 }
2789 let parts = line.splitn(7, ',').collect::<Vec<_>>();
2790 if parts.len() != 7 {
2791 continue;
2792 }
2793 if parts[4] == path {
2794 if parts[5] != expected_account_hash {
2795 return Err(anyhow!(
2796 "derived account hash mismatch for path {}: csv={} derived={}",
2797 path,
2798 parts[5],
2799 expected_account_hash
2800 ));
2801 }
2802 return Ok(());
2803 }
2804 }
2805
2806 Ok(())
2807}
2808
2809async fn fetch_chain_name(network_name: &str, node_id: u32) -> Result<String> {
2810 let rpc = mcp_rpc_client(network_name, node_id)?;
2811 rpc.get_network_name().await.map_err(Into::into)
2812}
2813
2814fn extract_block_height(value: &Value) -> Option<u64> {
2815 value
2816 .pointer("/block_with_signatures/block/header/height")
2817 .and_then(Value::as_u64)
2818 .or_else(|| {
2819 value
2820 .pointer("/block/block/header/height")
2821 .and_then(Value::as_u64)
2822 })
2823 .or_else(|| {
2824 value
2825 .pointer("/block_with_signatures/Version2/block/Version2/header/height")
2826 .and_then(Value::as_u64)
2827 })
2828 .or_else(|| {
2829 value
2830 .pointer("/block_with_signatures/Version1/block/Version1/header/height")
2831 .and_then(Value::as_u64)
2832 })
2833 .or_else(|| find_first_height(value))
2834}
2835
2836fn find_first_height(value: &Value) -> Option<u64> {
2837 match value {
2838 Value::Object(map) => {
2839 if let Some(height) = map.get("height").and_then(Value::as_u64) {
2840 return Some(height);
2841 }
2842 for nested in map.values() {
2843 if let Some(height) = find_first_height(nested) {
2844 return Some(height);
2845 }
2846 }
2847 None
2848 }
2849 Value::Array(items) => {
2850 for item in items {
2851 if let Some(height) = find_first_height(item) {
2852 return Some(height);
2853 }
2854 }
2855 None
2856 }
2857 _ => None,
2858 }
2859}
2860
2861fn parse_no_such_block_range_from_error(error_text: &str) -> Option<(u64, u64)> {
2862 let start = error_text.find('{')?;
2863 let payload = &error_text[start..];
2864 let value: Value = serde_json::from_str(payload).ok()?;
2865
2866 let code = value.get("code").and_then(Value::as_i64)?;
2867 let message = value.get("message").and_then(Value::as_str)?;
2868 if code != -32001 || !message.eq_ignore_ascii_case("No such block") {
2869 return None;
2870 }
2871
2872 let low = value
2873 .pointer("/data/available_block_range/low")
2874 .and_then(Value::as_u64)
2875 .unwrap_or(0);
2876 let high = value
2877 .pointer("/data/available_block_range/high")
2878 .and_then(Value::as_u64)
2879 .unwrap_or(low);
2880 Some((low, high))
2881}
2882
2883async fn read_log_page(path: &Path, before_line: Option<usize>, limit: usize) -> Result<LogPage> {
2884 let contents = tokio_fs::read_to_string(path)
2885 .await
2886 .with_context(|| format!("failed to read log file {}", path.display()))?;
2887
2888 let all_lines = contents
2889 .lines()
2890 .map(ToString::to_string)
2891 .collect::<Vec<_>>();
2892 let total_lines = all_lines.len();
2893
2894 let before = before_line.unwrap_or(total_lines + 1);
2895 if before == 0 {
2896 return Err(anyhow!("before_line must be >= 1"));
2897 }
2898
2899 let end_exclusive = before.saturating_sub(1).min(total_lines);
2900 let start = end_exclusive.saturating_sub(limit);
2901
2902 let mut lines = Vec::new();
2903 for (idx, content) in all_lines[start..end_exclusive].iter().enumerate() {
2904 lines.push(LogLine {
2905 line_number: start + idx + 1,
2906 content: content.clone(),
2907 });
2908 }
2909
2910 let next_before_line = if start == 0 { None } else { Some(start + 1) };
2911
2912 Ok(LogPage {
2913 path: path.display().to_string(),
2914 total_lines,
2915 returned: lines.len(),
2916 next_before_line,
2917 lines,
2918 })
2919}
2920
2921async fn is_dir(path: &Path) -> bool {
2922 tokio_fs::metadata(path)
2923 .await
2924 .map(|meta| meta.is_dir())
2925 .unwrap_or(false)
2926}
2927
2928async fn is_file(path: &Path) -> bool {
2929 tokio_fs::metadata(path)
2930 .await
2931 .map(|meta| meta.is_file())
2932 .unwrap_or(false)
2933}
2934
2935pub async fn run(args: McpArgs) -> Result<()> {
2936 let assets_root = match args.net_path {
2937 Some(path) => path,
2938 None => assets::default_assets_root()?,
2939 };
2940
2941 let manager = Arc::new(NetworkManager::new(assets_root).await?);
2942
2943 let result = match args.transport {
2944 McpTransport::Stdio => run_stdio(manager.clone()).await,
2945 McpTransport::Http => run_http(manager.clone(), &args.http_bind, &args.http_path).await,
2946 McpTransport::Both => run_both(manager.clone(), &args.http_bind, &args.http_path).await,
2947 };
2948
2949 let stop_result = manager.stop_all_networks().await;
2950
2951 match (result, stop_result) {
2952 (Ok(()), Ok(())) => Ok(()),
2953 (Err(err), Ok(())) => Err(err),
2954 (Ok(()), Err(stop_err)) => Err(stop_err),
2955 (Err(run_err), Err(stop_err)) => Err(anyhow!(
2956 "mcp server failed: {run_err}; additionally failed to stop networks: {stop_err}"
2957 )),
2958 }
2959}
2960
2961async fn run_stdio(manager: Arc<NetworkManager>) -> Result<()> {
2962 let service = McpServer::new(manager).serve(mcp_stdio()).await?;
2963 service.waiting().await?;
2964 Ok(())
2965}
2966
2967async fn run_http(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
2968 let path = normalize_http_path(path);
2969 let socket = std::net::SocketAddr::from_str(bind)
2970 .with_context(|| format!("invalid http bind address '{}'", bind))?;
2971
2972 let service: StreamableHttpService<McpServer, LocalSessionManager> = StreamableHttpService::new(
2973 {
2974 let manager = manager.clone();
2975 move || Ok(McpServer::new(manager.clone()))
2976 },
2977 Arc::new(LocalSessionManager::default()),
2978 StreamableHttpServerConfig::default(),
2979 );
2980
2981 let router = axum::Router::new().nest_service(&path, service);
2982 let listener = tokio::net::TcpListener::bind(socket).await?;
2983
2984 axum::serve(listener, router)
2985 .with_graceful_shutdown(async {
2986 let _ = tokio::signal::ctrl_c().await;
2987 })
2988 .await?;
2989
2990 Ok(())
2991}
2992
2993async fn run_both(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
2994 let path = normalize_http_path(path);
2995 let socket = std::net::SocketAddr::from_str(bind)
2996 .with_context(|| format!("invalid http bind address '{}'", bind))?;
2997
2998 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
2999
3000 let mut http_task = {
3001 let manager = manager.clone();
3002 tokio::spawn(async move {
3003 let service: StreamableHttpService<McpServer, LocalSessionManager> =
3004 StreamableHttpService::new(
3005 {
3006 let manager = manager.clone();
3007 move || Ok(McpServer::new(manager.clone()))
3008 },
3009 Arc::new(LocalSessionManager::default()),
3010 StreamableHttpServerConfig::default(),
3011 );
3012
3013 let router = axum::Router::new().nest_service(&path, service);
3014 let listener = tokio::net::TcpListener::bind(socket).await?;
3015
3016 axum::serve(listener, router)
3017 .with_graceful_shutdown(async move {
3018 tokio::select! {
3019 _ = async {
3020 loop {
3021 if *shutdown_rx.borrow() {
3022 break;
3023 }
3024 if shutdown_rx.changed().await.is_err() {
3025 break;
3026 }
3027 }
3028 } => {}
3029 _ = tokio::signal::ctrl_c() => {}
3030 }
3031 })
3032 .await
3033 .map_err(anyhow::Error::from)
3034 })
3035 };
3036
3037 let mut stdio_task = tokio::spawn(async move { run_stdio(manager).await });
3038
3039 let result = tokio::select! {
3040 res = &mut stdio_task => match res {
3041 Ok(inner) => inner,
3042 Err(join_err) => Err(anyhow!("stdio task failed: {}", join_err)),
3043 },
3044 res = &mut http_task => match res {
3045 Ok(inner) => inner,
3046 Err(join_err) => Err(anyhow!("http task failed: {}", join_err)),
3047 },
3048 };
3049
3050 let _ = shutdown_tx.send(true);
3051 let _ = tokio::time::timeout(Duration::from_secs(2), &mut http_task).await;
3052 let _ = tokio::time::timeout(Duration::from_secs(2), &mut stdio_task).await;
3053
3054 result
3055}
3056
3057fn normalize_http_path(path: &str) -> String {
3058 if path.starts_with('/') {
3059 path.to_string()
3060 } else {
3061 format!("/{}", path)
3062 }
3063}
3064
3065#[cfg(test)]
3066mod tests {
3067 use super::*;
3068
3069 #[tokio::test]
3070 async fn parse_derived_accounts_rows() {
3071 let csv = "kind,name,key_type,derivation,path,account_hash,balance\nvalidator,node-1,secp256k1,bip32,m/44'/506'/0'/0/0,account-hash-a,100\nuser,user-1,secp256k1,bip32,m/44'/506'/0'/0/100,account-hash-b,200";
3072 let rows = parse_derived_accounts_csv(csv, None).await.unwrap();
3073 assert_eq!(rows.len(), 2);
3074 assert_eq!(rows[0].path, "m/44'/506'/0'/0/0");
3075 assert_eq!(rows[1].account_hash, "account-hash-b");
3076 }
3077
3078 #[test]
3079 fn sse_filter_include_exclude() {
3080 let filter = SseFilter {
3081 include_event_types: vec!["BlockAdded".to_string()],
3082 exclude_event_types: vec!["TransactionAccepted".to_string()],
3083 };
3084
3085 let block = SseRecord {
3086 sequence: 1,
3087 timestamp_rfc3339: "t".to_string(),
3088 node_id: 1,
3089 event_type: "BlockAdded".to_string(),
3090 payload: json!({}),
3091 };
3092 let tx = SseRecord {
3093 sequence: 2,
3094 timestamp_rfc3339: "t".to_string(),
3095 node_id: 1,
3096 event_type: "TransactionAccepted".to_string(),
3097 payload: json!({}),
3098 };
3099
3100 assert!(filter.matches(&block));
3101 assert!(!filter.matches(&tx));
3102 }
3103
3104 #[tokio::test]
3105 async fn sse_history_paginates_by_sequence() {
3106 let store = SseStore::new(100);
3107 for sequence in 1..=5 {
3108 let mut guard = store.events.lock().await;
3109 guard.push_back(SseRecord {
3110 sequence,
3111 timestamp_rfc3339: "t".to_string(),
3112 node_id: 1,
3113 event_type: "BlockAdded".to_string(),
3114 payload: json!({ "sequence": sequence }),
3115 });
3116 drop(guard);
3117 }
3118 let filter = SseFilter::default();
3119 let page = store.history(Some(6), 2, &filter).await;
3120 assert_eq!(page.returned, 2);
3121 assert_eq!(page.events[0].sequence, 4);
3122 assert_eq!(page.events[1].sequence, 5);
3123 assert_eq!(page.next_before_sequence, Some(4));
3124 }
3125
3126 #[tokio::test]
3127 async fn read_log_page_uses_before_cursor() {
3128 let temp = tempfile::NamedTempFile::new().unwrap();
3129 tokio_fs::write(temp.path(), "a\nb\nc\nd\n").await.unwrap();
3130
3131 let page = read_log_page(temp.path(), Some(5), 2).await.unwrap();
3132 assert_eq!(page.lines.len(), 2);
3133 assert_eq!(page.lines[0].line_number, 3);
3134 assert_eq!(page.lines[1].line_number, 4);
3135 assert_eq!(page.next_before_line, Some(3));
3136 }
3137
3138 #[tokio::test]
3139 async fn sidecar_preflight_fails_when_missing() {
3140 let root = tempfile::tempdir().unwrap();
3141 let layout = AssetsLayout::new(root.path().to_path_buf(), "test-net".to_string());
3142 let node_bin = layout.node_bin_dir(1).join("2_0_0");
3143 let node_cfg = layout.node_config_root(1).join("2_0_0");
3144 tokio_fs::create_dir_all(&node_bin).await.unwrap();
3145 tokio_fs::create_dir_all(&node_cfg).await.unwrap();
3146 tokio_fs::write(node_bin.join("casper-node"), "bin")
3147 .await
3148 .unwrap();
3149 tokio_fs::write(node_cfg.join("sidecar.toml"), "cfg")
3150 .await
3151 .unwrap();
3152
3153 let result = ensure_sidecar_available(&layout, 1).await;
3154 assert!(result.is_err());
3155 }
3156
3157 #[test]
3158 fn normalize_optional_identifier_trims_and_defaults() {
3159 assert_eq!(normalize_optional_identifier(None), "");
3160 assert_eq!(normalize_optional_identifier(Some(" ")), "");
3161 assert_eq!(normalize_optional_identifier(Some(" 123 ")), "123");
3162 }
3163
3164 #[test]
3165 fn parse_state_identifier_variants() {
3166 assert_eq!(
3167 parse_state_identifier("42", "").unwrap(),
3168 Some(json!({ "BlockHeight": 42u64 }))
3169 );
3170 assert_eq!(
3171 parse_state_identifier(
3172 "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234",
3173 ""
3174 )
3175 .unwrap(),
3176 Some(json!({
3177 "BlockHash": "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234"
3178 }))
3179 );
3180 assert_eq!(parse_state_identifier("", "").unwrap(), None,);
3181 }
3182
3183 #[test]
3184 fn parse_contract_hash_for_invocation_accepts_common_formats() {
3185 let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
3186 let contract = parse_contract_hash_for_invocation(&format!("contract-{}", hash)).unwrap();
3187 let key_hash = parse_contract_hash_for_invocation(&format!("hash-{}", hash)).unwrap();
3188 let raw = parse_contract_hash_for_invocation(hash).unwrap();
3189 assert_eq!(contract.to_hex_string(), hash);
3190 assert_eq!(key_hash.to_hex_string(), hash);
3191 assert_eq!(raw.to_hex_string(), hash);
3192 }
3193
3194 #[test]
3195 fn parse_query_key_accepts_contract_hash_format() {
3196 let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
3197 let key = parse_query_key(&format!("contract-{}", hash)).unwrap();
3198 assert_eq!(key, format!("hash-{}", hash));
3199 }
3200
3201 #[test]
3202 fn parse_transaction_json_rejects_escaped_string_payload() {
3203 let tx_json = json!({
3204 "Version1": {
3205 "hash": "7eeb092361e31b4cc9885e3621f1470f29631338ecc703643c22da1d38fd81a9",
3206 "payload": {
3207 "initiator_addr": {
3208 "PublicKey": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f"
3209 },
3210 "timestamp": "2026-02-27T18:03:18.541Z",
3211 "ttl": "30m",
3212 "chain_name": "casper-devnet",
3213 "pricing_mode": {
3214 "PaymentLimited": {
3215 "payment_amount": 100000000000u64,
3216 "gas_price_tolerance": 5,
3217 "standard_payment": true
3218 }
3219 },
3220 "fields": {
3221 "args": {"Named": []},
3222 "entry_point": {"Custom": "counter_inc"},
3223 "scheduling": "Standard",
3224 "target": {
3225 "Stored": {
3226 "id": {
3227 "ByPackageName": {
3228 "name": "counter_package_name",
3229 "version": null
3230 }
3231 },
3232 "runtime": "VmCasperV1"
3233 }
3234 }
3235 }
3236 },
3237 "approvals": [{
3238 "signer": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f",
3239 "signature": "02c64336e5ed2832bdb84adb3f334d585548ee096066aa9d0797c11ab3f074ec9d7bd396994bc9b9c239342be801bc385a9c5083779bace4dfe0b400d4a13c07db"
3240 }]
3241 }
3242 });
3243 let direct = parse_transaction_json(tx_json.clone()).unwrap();
3244 let wrapped = parse_transaction_json(json!({ "transaction": tx_json })).unwrap();
3245 let encoded = serde_json::to_string(&direct).unwrap();
3246 let from_string_err = parse_transaction_json(Value::String(encoded.clone()));
3247 let wrapped_string_err = parse_transaction_json(json!({
3248 "transaction": encoded
3249 }));
3250
3251 assert_eq!(
3252 direct.hash().to_hex_string(),
3253 wrapped.hash().to_hex_string()
3254 );
3255 assert!(from_string_err.is_err());
3256 assert!(wrapped_string_err.is_err());
3257 }
3258
3259 #[test]
3260 fn parse_no_such_block_range_from_error_extracts_bounds() {
3261 let error = "casper client error: response for rpc-id 1 chain_get_block is json-rpc error: {\"code\":-32001,\"message\":\"No such block\",\"data\":{\"message\":\"no block found for the provided identifier\",\"available_block_range\":{\"low\":0,\"high\":0}}}";
3262 let range = parse_no_such_block_range_from_error(error).unwrap();
3263 assert_eq!(range, (0, 0));
3264 }
3265
3266 #[test]
3267 fn send_transaction_signed_request_requires_transaction_field() {
3268 let payload = json!({
3269 "network_name": "casper-devnet",
3270 "node_id": 1,
3271 "signer_path": "m/44'/506'/0'/0/100",
3272 "transaction": { "Version1": { "hash": "abc" } }
3273 });
3274 let request: SendTransactionSignedRequest = serde_json::from_value(payload).unwrap();
3275 assert!(request.transaction.get("Version1").is_some());
3276
3277 let legacy_payload = json!({
3278 "network_name": "casper-devnet",
3279 "node_id": 1,
3280 "signer_path": "m/44'/506'/0'/0/100",
3281 "transaction_json": { "Version1": { "hash": "def" } }
3282 });
3283 let legacy_err = serde_json::from_value::<SendTransactionSignedRequest>(legacy_payload)
3284 .unwrap_err()
3285 .to_string();
3286 assert!(legacy_err.contains("missing field `transaction`"));
3287 }
3288}