1use self::args_parser::parse_session_args;
2use crate::assets::{
3 self, AddNodesOptions, AssetSelector, AssetsLayout, SetupOptions, StageProtocolOptions,
4};
5use crate::control::{ControlRequest, ControlResponse, ControlResult, send_request};
6use crate::process::{self, StartPlan};
7use crate::state::{
8 ProcessKind, ProcessStatus, STATE_FILE_NAME, State, spawn_pid_sync_tasks,
9 spawn_pid_sync_tasks_for_ids,
10};
11use anyhow::{Context, Result, anyhow};
12use backoff::ExponentialBackoff;
13use backoff::backoff::Backoff;
14use casper_types::contracts::ContractHash;
15use casper_types::{
16 AddressableEntityHash, AsymmetricType, Digest, EntityVersion, Key, PricingMode, PublicKey,
17 SecretKey, TimeDiff, Transaction, TransactionHash, TransactionRuntimeParams, TransactionV1Hash,
18 URef,
19};
20use clap::{Args, ValueEnum};
21use futures::StreamExt;
22use nix::errno::Errno;
23use nix::sys::signal::kill;
24use nix::unistd::Pid;
25use rmcp::handler::server::{router::tool::ToolRouter, wrapper::Parameters};
26use rmcp::model::{CallToolResult, ServerCapabilities, ServerInfo};
27use rmcp::service::ServiceExt;
28use rmcp::transport::{
29 StreamableHttpServerConfig,
30 streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService},
31};
32use rmcp::{
33 ErrorData, ServerHandler, tool, tool_handler, tool_router, transport::stdio as mcp_stdio,
34};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::{HashMap, VecDeque};
38use std::path::{Path, PathBuf};
39use std::str::FromStr;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
42use std::time::{Duration, Instant};
43use tokio::fs as tokio_fs;
44use tokio::io::{AsyncReadExt, AsyncWriteExt};
45use tokio::net::UnixListener;
46use tokio::sync::{Mutex, Notify};
47use veles_casper_rust_sdk::TransactionV1Builder;
48use veles_casper_rust_sdk::jsonrpc::CasperClient;
49use veles_casper_rust_sdk::sse::event::SseEvent;
50use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
51
// Defaults used by the MCP server CLI and by tool requests that omit optional fields.

// Default value of the `--http-bind` argument (see `McpArgs::http_bind`).
52const DEFAULT_HTTP_BIND: &str = "127.0.0.1:32100";
// Default value of the `--http-path` argument (see `McpArgs::http_path`).
53const DEFAULT_HTTP_PATH: &str = "/mcp";
// NOTE(review): the next six defaults are not referenced in this part of the file;
// presumably they fill omitted network spawn/setup request fields — confirm at the use sites.
54const DEFAULT_NETWORK_NAME: &str = "casper-dev";
55const DEFAULT_NODE_COUNT: u32 = 4;
56const DEFAULT_DELAY_SECS: u64 = 3;
57const DEFAULT_LOG_LEVEL: &str = "info";
58const DEFAULT_NODE_LOG_FORMAT: &str = "json";
59const DEFAULT_SEED: &str = "default";
// Timeout in seconds used by `wait_next_sse` when the request omits `timeout_seconds`.
60const DEFAULT_TIMEOUT_SECS: u64 = 60;
// Page size used by `get_node_logs` when the request omits `limit`.
61const DEFAULT_LOG_PAGE_SIZE: usize = 200;
// Page size used by `sse_history` when the request omits `limit`.
62const DEFAULT_SSE_PAGE_SIZE: usize = 200;
// NOTE(review): not referenced in this part of the file; presumably the number of SSE
// events retained per network — confirm at the use site.
63const DEFAULT_SSE_HISTORY_CAPACITY: usize = 20_000;
// Payment amount handed to `build_pricing_mode` by the transaction-building tools when the
// request omits `payment_amount` (presumably denominated in motes — confirm).
64const DEFAULT_PAYMENT_AMOUNT: u64 = 100_000_000_000;
65
66mod args_parser;
67
68#[derive(Debug, Clone, Copy, ValueEnum)]
69#[value(rename_all = "kebab-case")]
70pub enum McpTransport {
71 Stdio,
72 Http,
73 Both,
74}
75
76#[derive(Debug, Args, Clone)]
77pub struct McpArgs {
78 #[arg(long, value_enum, default_value_t = McpTransport::Both)]
79 transport: McpTransport,
80
81 #[arg(long, default_value = DEFAULT_HTTP_BIND)]
82 http_bind: String,
83
84 #[arg(long, default_value = DEFAULT_HTTP_PATH)]
85 http_path: String,
86
87 #[arg(long, value_name = "PATH")]
88 net_path: Option<PathBuf>,
89}
90
91#[derive(Debug, Clone)]
92struct McpServer {
93 manager: Arc<NetworkManager>,
94 tool_router: ToolRouter<Self>,
95}
96
97impl McpServer {
98 fn new(manager: Arc<NetworkManager>) -> Self {
99 Self {
100 manager,
101 tool_router: Self::tool_router(),
102 }
103 }
104
105 fn server_info() -> ServerInfo {
106 ServerInfo {
107 instructions: Some(
108 "Control multiple local Casper devnets. Start with spawn_network, then wait_network_ready before RPC or transaction tools. Do not use external casper-client binaries or curl; use MCP tools directly (for example get_transaction, wait_transaction, make_transaction_package_call, make_transaction_contract_call, make_transaction_session_wasm, send_transaction_signed).".to_string(),
109 ),
110 capabilities: ServerCapabilities::builder().enable_tools().build(),
111 ..Default::default()
112 }
113 }
114}
115
116#[tool_router]
117impl McpServer {
118 #[tool(
119 description = "Spawn or resume a managed network. Defaults to force_setup=true for fresh devnet startup."
120 )]
121 async fn spawn_network(
122 &self,
123 Parameters(request): Parameters<SpawnNetworkRequest>,
124 ) -> std::result::Result<CallToolResult, ErrorData> {
125 let result = self
126 .manager
127 .spawn_network(request)
128 .await
129 .map_err(to_mcp_error)?;
130 Ok(ok_value(
131 serde_json::to_value(result).map_err(internal_serde_error)?,
132 ))
133 }
134
135 #[tool(
136 description = "Wait for a managed network to be ready: processes running, /status healthy, reactor Validate, and at least one block observed."
137 )]
138 async fn wait_network_ready(
139 &self,
140 Parameters(request): Parameters<WaitNetworkReadyRequest>,
141 ) -> std::result::Result<CallToolResult, ErrorData> {
142 let result = self
143 .manager
144 .wait_network_ready(&request.network_name, request.timeout_seconds)
145 .await
146 .map_err(to_mcp_error)?;
147 Ok(ok_value(
148 serde_json::to_value(result).map_err(internal_serde_error)?,
149 ))
150 }
151
152 #[tool(
153 description = "Stage a protocol upgrade from a named custom asset. When the network is running, sidecars are restarted after staging."
154 )]
155 async fn stage_protocol(
156 &self,
157 Parameters(request): Parameters<StageProtocolRequest>,
158 ) -> std::result::Result<CallToolResult, ErrorData> {
159 let result = self
160 .manager
161 .stage_protocol(
162 &request.network_name,
163 request.asset.as_deref(),
164 request.custom_asset.as_deref(),
165 request.asset_name.as_deref(),
166 &request.protocol_version,
167 request.activation_point,
168 )
169 .await
170 .map_err(to_mcp_error)?;
171 Ok(ok_value(
172 serde_json::to_value(result).map_err(internal_serde_error)?,
173 ))
174 }
175
176 #[tool(
177 description = "Despawn a managed network. By default it stops processes and keeps files; set purge=true to remove files."
178 )]
179 async fn despawn_network(
180 &self,
181 Parameters(request): Parameters<DespawnNetworkRequest>,
182 ) -> std::result::Result<CallToolResult, ErrorData> {
183 let result = self
184 .manager
185 .despawn_network(&request.network_name, request.purge.unwrap_or(false))
186 .await
187 .map_err(to_mcp_error)?;
188 Ok(ok_value(
189 serde_json::to_value(result).map_err(internal_serde_error)?,
190 ))
191 }
192
193 #[tool(description = "List discovered network names and managed/running state.")]
194 async fn list_networks(
195 &self,
196 _: Parameters<ListNetworksRequest>,
197 ) -> std::result::Result<CallToolResult, ErrorData> {
198 let result = self.manager.list_networks().await.map_err(to_mcp_error)?;
199 Ok(ok_value(
200 serde_json::to_value(result).map_err(internal_serde_error)?,
201 ))
202 }
203
204 #[tool(
205 description = "List managed network processes, optionally filtered by process name. Defaults to running_only=true."
206 )]
207 async fn managed_processes(
208 &self,
209 Parameters(request): Parameters<ManagedProcessesRequest>,
210 ) -> std::result::Result<CallToolResult, ErrorData> {
211 let result = self
212 .manager
213 .managed_processes(request)
214 .await
215 .map_err(to_mcp_error)?;
216 Ok(ok_value(
217 serde_json::to_value(result).map_err(internal_serde_error)?,
218 ))
219 }
220
221 #[tool(description = "Call node REST /status for a managed network node.")]
222 async fn status(
223 &self,
224 Parameters(request): Parameters<NodeScopedRequest>,
225 ) -> std::result::Result<CallToolResult, ErrorData> {
226 let network = self
227 .manager
228 .get_network(&request.network_name)
229 .await
230 .map_err(to_mcp_error)?;
231 ensure_running_network(&network).await?;
232 let status = fetch_rest_status(request.node_id)
233 .await
234 .map_err(to_mcp_error)?;
235 Ok(ok_value(status))
236 }
237
238 #[tool(
239 description = "Run a raw JSON-RPC call against node sidecar endpoint. Useful as a generic query helper."
240 )]
241 async fn rpc_query(
242 &self,
243 Parameters(request): Parameters<RpcQueryRequest>,
244 ) -> std::result::Result<CallToolResult, ErrorData> {
245 let network = self
246 .manager
247 .get_network(&request.network_name)
248 .await
249 .map_err(to_mcp_error)?;
250 ensure_running_network(&network).await?;
251 let value = self
252 .manager
253 .raw_rpc_query(request.node_id, &request.method, request.params)
254 .await
255 .map_err(to_mcp_error)?;
256 Ok(ok_value(value))
257 }
258
259 #[tool(description = "Typed balance query by account/public key/uref/entity identifier.")]
260 async fn rpc_query_balance(
261 &self,
262 Parameters(request): Parameters<RpcQueryBalanceRequest>,
263 ) -> std::result::Result<CallToolResult, ErrorData> {
264 let network = self
265 .manager
266 .get_network(&request.network_name)
267 .await
268 .map_err(to_mcp_error)?;
269 ensure_running_network(&network).await?;
270
271 let block_id = normalize_optional_identifier(request.block_id.as_deref());
272 let state_root_hash = normalize_optional_identifier(request.state_root_hash.as_deref());
273 let params =
274 build_query_balance_params(&request.purse_identifier, &block_id, &state_root_hash)
275 .map_err(to_mcp_error)?;
276 let response = self
277 .manager
278 .raw_rpc_query(request.node_id, "query_balance", Some(params))
279 .await
280 .map_err(to_mcp_error)?;
281 Ok(ok_value(
282 extract_rpc_result(response).map_err(to_mcp_error)?,
283 ))
284 }
285
286 #[tool(
287 description = "Typed global state query by key + optional path + optional block/state-root identifier. If no identifier is provided, the latest block hash is used."
288 )]
289 async fn rpc_query_global_state(
290 &self,
291 Parameters(request): Parameters<RpcQueryGlobalStateRequest>,
292 ) -> std::result::Result<CallToolResult, ErrorData> {
293 let network = self
294 .manager
295 .get_network(&request.network_name)
296 .await
297 .map_err(to_mcp_error)?;
298 ensure_running_network(&network).await?;
299
300 let (block_id, state_root_hash) = resolve_global_state_identifier(
301 &request.network_name,
302 request.node_id,
303 request.block_id.as_deref(),
304 request.state_root_hash.as_deref(),
305 )
306 .await
307 .map_err(to_mcp_error)?;
308 let params = build_query_global_state_params(
309 &request.key,
310 request.path.unwrap_or_default(),
311 &block_id,
312 &state_root_hash,
313 )
314 .map_err(to_mcp_error)?;
315 let response = self
316 .manager
317 .raw_rpc_query(request.node_id, "query_global_state", Some(params))
318 .await
319 .map_err(to_mcp_error)?;
320
321 Ok(ok_value(
322 extract_rpc_result(response).map_err(to_mcp_error)?,
323 ))
324 }
325
326 #[tool(description = "Get current block height using typed chain_get_block RPC.")]
327 async fn current_block_height(
328 &self,
329 Parameters(request): Parameters<CurrentBlockRequest>,
330 ) -> std::result::Result<CallToolResult, ErrorData> {
331 let network = self
332 .manager
333 .get_network(&request.network_name)
334 .await
335 .map_err(to_mcp_error)?;
336 ensure_running_network(&network).await?;
337
338 let value = fetch_block_result(
339 &self.manager,
340 &request.network_name,
341 request.node_id,
342 request.block_id.as_deref(),
343 )
344 .await;
345 let value = match value {
346 Ok(value) => value,
347 Err(err) => {
348 if request.block_id.is_none()
349 && let Some((low, high)) =
350 parse_no_such_block_range_from_error(&err.to_string())
351 {
352 return Ok(ok_value(json!({
353 "network_name": request.network_name,
354 "node_id": request.node_id,
355 "height": high,
356 "block": Value::Null,
357 "pending": true,
358 "available_block_range": {
359 "low": low,
360 "high": high,
361 },
362 "message": "latest block is not yet queryable; retry shortly",
363 })));
364 }
365 return Err(to_mcp_error(err));
366 }
367 };
368 let height = extract_block_height(&value).ok_or_else(|| {
369 ErrorData::internal_error("missing block height in RPC response", None)
370 })?;
371
372 Ok(ok_value(json!({
373 "network_name": request.network_name,
374 "node_id": request.node_id,
375 "height": height,
376 "block": value,
377 })))
378 }
379
380 #[tool(description = "Get current block payload using typed chain_get_block RPC.")]
381 async fn current_block(
382 &self,
383 Parameters(request): Parameters<CurrentBlockRequest>,
384 ) -> std::result::Result<CallToolResult, ErrorData> {
385 let network = self
386 .manager
387 .get_network(&request.network_name)
388 .await
389 .map_err(to_mcp_error)?;
390 ensure_running_network(&network).await?;
391
392 let value = fetch_block_result(
393 &self.manager,
394 &request.network_name,
395 request.node_id,
396 request.block_id.as_deref(),
397 )
398 .await
399 .map_err(to_mcp_error)?;
400 Ok(ok_value(value))
401 }
402
403 #[tool(description = "Get paginated node stdout/stderr logs from on-disk files.")]
404 async fn get_node_logs(
405 &self,
406 Parameters(request): Parameters<GetNodeLogsRequest>,
407 ) -> std::result::Result<CallToolResult, ErrorData> {
408 let network = self
409 .manager
410 .get_network(&request.network_name)
411 .await
412 .map_err(to_mcp_error)?;
413
414 let limit = request.limit.unwrap_or(DEFAULT_LOG_PAGE_SIZE);
415 if limit == 0 {
416 return Err(ErrorData::invalid_params(
417 "limit must be greater than 0",
418 None,
419 ));
420 }
421
422 let file_name = match request.stream {
423 NodeLogStream::Stdout => "stdout.log",
424 NodeLogStream::Stderr => "stderr.log",
425 };
426 let path = network
427 .layout
428 .node_logs_dir(request.node_id)
429 .join(file_name);
430
431 let page = read_log_page(&path, request.before_line, limit)
432 .await
433 .map_err(to_mcp_error)?;
434 Ok(ok_value(
435 serde_json::to_value(page).map_err(internal_serde_error)?,
436 ))
437 }
438
439 #[tool(
440 description = "Wait for the next SSE event after a sequence cursor with optional event type filters."
441 )]
442 async fn wait_next_sse(
443 &self,
444 Parameters(request): Parameters<WaitNextSseRequest>,
445 ) -> std::result::Result<CallToolResult, ErrorData> {
446 let network = self
447 .manager
448 .get_network(&request.network_name)
449 .await
450 .map_err(to_mcp_error)?;
451 ensure_running_network(&network).await?;
452
453 let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
454 let filter = SseFilter {
455 include_event_types: request.include_event_types.unwrap_or_default(),
456 exclude_event_types: request.exclude_event_types.unwrap_or_default(),
457 };
458
459 let after = match request.after_sequence {
460 Some(value) => value,
461 None => network.sse_store.latest_sequence().await,
462 };
463
464 let event = network
465 .sse_store
466 .wait_next(after, &filter, timeout)
467 .await
468 .map_err(to_mcp_error)?;
469
470 Ok(ok_value(
471 serde_json::to_value(event).map_err(internal_serde_error)?,
472 ))
473 }
474
475 #[tool(
476 description = "Fetch paginated SSE history with sequence cursor and optional event type filters."
477 )]
478 async fn sse_history(
479 &self,
480 Parameters(request): Parameters<SseHistoryRequest>,
481 ) -> std::result::Result<CallToolResult, ErrorData> {
482 let network = self
483 .manager
484 .get_network(&request.network_name)
485 .await
486 .map_err(to_mcp_error)?;
487
488 let limit = request.limit.unwrap_or(DEFAULT_SSE_PAGE_SIZE);
489 if limit == 0 {
490 return Err(ErrorData::invalid_params(
491 "limit must be greater than 0",
492 None,
493 ));
494 }
495
496 let filter = SseFilter {
497 include_event_types: request.include_event_types.unwrap_or_default(),
498 exclude_event_types: request.exclude_event_types.unwrap_or_default(),
499 };
500
501 let history = network
502 .sse_store
503 .history(request.before_sequence, limit, &filter)
504 .await;
505 Ok(ok_value(
506 serde_json::to_value(history).map_err(internal_serde_error)?,
507 ))
508 }
509
510 #[tool(description = "List derived accounts from derived-accounts.csv for a network.")]
511 async fn list_derived_accounts(
512 &self,
513 Parameters(request): Parameters<ListDerivedAccountsRequest>,
514 ) -> std::result::Result<CallToolResult, ErrorData> {
515 let accounts = self
516 .manager
517 .list_derived_accounts(&request.network_name)
518 .await
519 .map_err(to_mcp_error)?;
520 Ok(ok_value(
521 serde_json::to_value(accounts).map_err(internal_serde_error)?,
522 ))
523 }
524
525 #[tool(
526 description = "Submit signed or unsigned transaction JSON. Signer key is derived from signer_path for this managed network. transaction must be a typed Transaction JSON object."
527 )]
528 async fn send_transaction_signed(
529 &self,
530 Parameters(request): Parameters<SendTransactionSignedRequest>,
531 ) -> std::result::Result<CallToolResult, ErrorData> {
532 let network = self
533 .manager
534 .get_network(&request.network_name)
535 .await
536 .map_err(to_mcp_error)?;
537 ensure_running_network(&network).await?;
538
539 let signer = self
540 .manager
541 .derived_account_for_path(&network, &request.signer_path)
542 .await
543 .map_err(to_mcp_error)?;
544 verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
545 .await
546 .map_err(to_mcp_error)?;
547
548 let mut transaction = parse_transaction_json(request.transaction)?;
549 transaction.sign(&signer.secret_key);
550
551 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
552 let response = rpc
553 .put_transaction(transaction)
554 .await
555 .map_err(to_mcp_error)?;
556
557 Ok(ok_value(json!({
558 "network_name": request.network_name,
559 "node_id": request.node_id,
560 "transaction_hash": response.transaction_hash.to_hex_string(),
561 })))
562 }
563
564 #[tool(
565 description = "Create a stored package-name call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
566 )]
567 async fn make_transaction_package_call(
568 &self,
569 Parameters(request): Parameters<MakeTransactionPackageCallRequest>,
570 ) -> std::result::Result<CallToolResult, ErrorData> {
571 let network = self
572 .manager
573 .get_network(&request.network_name)
574 .await
575 .map_err(to_mcp_error)?;
576 ensure_running_network(&network).await?;
577
578 let runtime = match (
579 request.runtime_transferred_value,
580 request.runtime_seed_hex.as_deref(),
581 ) {
582 (None, None) => TransactionRuntimeParams::VmCasperV1,
583 (transferred_value, maybe_seed_hex) => {
584 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
585 TransactionRuntimeParams::VmCasperV2 {
586 transferred_value: transferred_value.unwrap_or(0),
587 seed,
588 }
589 }
590 };
591
592 let mut builder = TransactionV1Builder::new_targeting_package_via_alias(
593 request.transaction_package_name.clone(),
594 request.transaction_package_version,
595 request.session_entry_point.clone(),
596 runtime,
597 );
598
599 if let Some(args_json) = request.session_args.as_ref()
600 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
601 {
602 builder = builder.with_runtime_args(runtime_args);
603 }
604
605 if let Some(ttl_millis) = request.ttl_millis {
606 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
607 }
608
609 let pricing = build_pricing_mode(
610 request.gas_price_tolerance,
611 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
612 );
613 builder = builder.with_pricing_mode(pricing);
614
615 let chain_name = match request.chain_name {
616 Some(value) => value,
617 None => fetch_chain_name(&request.network_name, request.node_id)
618 .await
619 .map_err(to_mcp_error)?,
620 };
621 builder = builder.with_chain_name(chain_name.clone());
622
623 let mut signed = false;
624 let mut signer_path = None;
625 let mut derived_signer = None;
626 if let Some(path) = request.signer_path {
627 let signer = self
628 .manager
629 .derived_account_for_path(&network, &path)
630 .await
631 .map_err(to_mcp_error)?;
632 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
633 .await
634 .map_err(to_mcp_error)?;
635 derived_signer = Some(signer);
636 signed = true;
637 signer_path = Some(path);
638 } else if let Some(initiator_public_key) = request.initiator_public_key {
639 let public_key = PublicKey::from_hex(initiator_public_key.trim())
640 .with_context(|| "failed to parse initiator_public_key as hex public key")
641 .map_err(to_mcp_error)?;
642 builder = builder.with_initiator_addr(public_key);
643 } else {
644 return Err(ErrorData::invalid_params(
645 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
646 None,
647 ));
648 }
649
650 if let Some(signer) = derived_signer.as_ref() {
651 builder = builder.with_secret_key(&signer.secret_key);
652 }
653
654 let tx = builder.build().map_err(to_mcp_error)?;
655 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
656
657 Ok(ok_value(json!({
658 "network_name": request.network_name,
659 "node_id": request.node_id,
660 "chain_name": chain_name,
661 "signed": signed,
662 "signer_path": signer_path,
663 "transaction": tx_json,
664 })))
665 }
666
667 #[tool(
668 description = "Create a stored contract-hash call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
669 )]
670 async fn make_transaction_contract_call(
671 &self,
672 Parameters(request): Parameters<MakeTransactionContractCallRequest>,
673 ) -> std::result::Result<CallToolResult, ErrorData> {
674 let network = self
675 .manager
676 .get_network(&request.network_name)
677 .await
678 .map_err(to_mcp_error)?;
679 ensure_running_network(&network).await?;
680
681 let runtime = match (
682 request.runtime_transferred_value,
683 request.runtime_seed_hex.as_deref(),
684 ) {
685 (None, None) => TransactionRuntimeParams::VmCasperV1,
686 (transferred_value, maybe_seed_hex) => {
687 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
688 TransactionRuntimeParams::VmCasperV2 {
689 transferred_value: transferred_value.unwrap_or(0),
690 seed,
691 }
692 }
693 };
694
695 let contract_hash = parse_contract_hash_for_invocation(&request.transaction_contract_hash)
696 .map_err(to_mcp_error)?;
697 let mut builder = TransactionV1Builder::new_targeting_invocable_entity(
698 contract_hash,
699 request.session_entry_point.clone(),
700 runtime,
701 );
702
703 if let Some(args_json) = request.session_args.as_ref()
704 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
705 {
706 builder = builder.with_runtime_args(runtime_args);
707 }
708
709 if let Some(ttl_millis) = request.ttl_millis {
710 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
711 }
712
713 let pricing = build_pricing_mode(
714 request.gas_price_tolerance,
715 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
716 );
717 builder = builder.with_pricing_mode(pricing);
718
719 let chain_name = match request.chain_name {
720 Some(value) => value,
721 None => fetch_chain_name(&request.network_name, request.node_id)
722 .await
723 .map_err(to_mcp_error)?,
724 };
725 builder = builder.with_chain_name(chain_name.clone());
726
727 let mut signed = false;
728 let mut signer_path = None;
729 let mut derived_signer = None;
730 if let Some(path) = request.signer_path {
731 let signer = self
732 .manager
733 .derived_account_for_path(&network, &path)
734 .await
735 .map_err(to_mcp_error)?;
736 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
737 .await
738 .map_err(to_mcp_error)?;
739 derived_signer = Some(signer);
740 signed = true;
741 signer_path = Some(path);
742 } else if let Some(initiator_public_key) = request.initiator_public_key {
743 let public_key = PublicKey::from_hex(initiator_public_key.trim())
744 .with_context(|| "failed to parse initiator_public_key as hex public key")
745 .map_err(to_mcp_error)?;
746 builder = builder.with_initiator_addr(public_key);
747 } else {
748 return Err(ErrorData::invalid_params(
749 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
750 None,
751 ));
752 }
753
754 if let Some(signer) = derived_signer.as_ref() {
755 builder = builder.with_secret_key(&signer.secret_key);
756 }
757
758 let tx = builder.build().map_err(to_mcp_error)?;
759 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
760
761 Ok(ok_value(json!({
762 "network_name": request.network_name,
763 "node_id": request.node_id,
764 "chain_name": chain_name,
765 "transaction_contract_hash": request.transaction_contract_hash,
766 "signed": signed,
767 "signer_path": signer_path,
768 "transaction": tx_json,
769 })))
770 }
771
772 #[tool(
773 description = "Create a session wasm transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
774 )]
775 async fn make_transaction_session_wasm(
776 &self,
777 Parameters(request): Parameters<MakeTransactionSessionWasmRequest>,
778 ) -> std::result::Result<CallToolResult, ErrorData> {
779 let network = self
780 .manager
781 .get_network(&request.network_name)
782 .await
783 .map_err(to_mcp_error)?;
784 ensure_running_network(&network).await?;
785
786 let wasm_path = PathBuf::from(&request.wasm_path);
787 let wasm_bytes = tokio_fs::read(&wasm_path)
788 .await
789 .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
790 .map_err(to_mcp_error)?;
791
792 let runtime = match (
793 request.runtime_transferred_value,
794 request.runtime_seed_hex.as_deref(),
795 ) {
796 (None, None) => TransactionRuntimeParams::VmCasperV1,
797 (transferred_value, maybe_seed_hex) => {
798 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
799 TransactionRuntimeParams::VmCasperV2 {
800 transferred_value: transferred_value.unwrap_or(0),
801 seed,
802 }
803 }
804 };
805
806 let mut builder = TransactionV1Builder::new_session(
807 request.is_install_upgrade.unwrap_or(true),
808 wasm_bytes.into(),
809 runtime,
810 );
811
812 if let Some(args_json) = request.session_args.as_ref()
813 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
814 {
815 builder = builder.with_runtime_args(runtime_args);
816 }
817
818 if let Some(ttl_millis) = request.ttl_millis {
819 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
820 }
821
822 let pricing = build_pricing_mode(
823 request.gas_price_tolerance,
824 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
825 );
826 builder = builder.with_pricing_mode(pricing);
827
828 let chain_name = match request.chain_name {
829 Some(value) => value,
830 None => fetch_chain_name(&request.network_name, request.node_id)
831 .await
832 .map_err(to_mcp_error)?,
833 };
834 builder = builder.with_chain_name(chain_name.clone());
835
836 let mut signed = false;
837 let mut signer_path = None;
838 let mut derived_signer = None;
839 if let Some(path) = request.signer_path {
840 let signer = self
841 .manager
842 .derived_account_for_path(&network, &path)
843 .await
844 .map_err(to_mcp_error)?;
845 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
846 .await
847 .map_err(to_mcp_error)?;
848 derived_signer = Some(signer);
849 signed = true;
850 signer_path = Some(path);
851 } else if let Some(initiator_public_key) = request.initiator_public_key {
852 let public_key = PublicKey::from_hex(initiator_public_key.trim())
853 .with_context(|| "failed to parse initiator_public_key as hex public key")
854 .map_err(to_mcp_error)?;
855 builder = builder.with_initiator_addr(public_key);
856 } else {
857 return Err(ErrorData::invalid_params(
858 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
859 None,
860 ));
861 }
862
863 if let Some(signer) = derived_signer.as_ref() {
864 builder = builder.with_secret_key(&signer.secret_key);
865 }
866
867 let tx = builder.build().map_err(to_mcp_error)?;
868 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
869
870 Ok(ok_value(json!({
871 "network_name": request.network_name,
872 "node_id": request.node_id,
873 "chain_name": chain_name,
874 "wasm_path": request.wasm_path,
875 "signed": signed,
876 "signer_path": signer_path,
877 "transaction": tx_json,
878 })))
879 }
880
881 #[tool(
882 description = "Build, sign and submit a session wasm transaction from a derived account path. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
883 )]
884 async fn send_session_wasm(
885 &self,
886 Parameters(request): Parameters<SendSessionWasmRequest>,
887 ) -> std::result::Result<CallToolResult, ErrorData> {
888 let network = self
889 .manager
890 .get_network(&request.network_name)
891 .await
892 .map_err(to_mcp_error)?;
893 ensure_running_network(&network).await?;
894
895 let signer = self
896 .manager
897 .derived_account_for_path(&network, &request.signer_path)
898 .await
899 .map_err(to_mcp_error)?;
900 verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
901 .await
902 .map_err(to_mcp_error)?;
903
904 let wasm_path = PathBuf::from(&request.wasm_path);
905 let wasm_bytes = tokio_fs::read(&wasm_path)
906 .await
907 .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
908 .map_err(to_mcp_error)?;
909
910 let runtime = match (
911 request.runtime_transferred_value,
912 request.runtime_seed_hex.as_deref(),
913 ) {
914 (None, None) => TransactionRuntimeParams::VmCasperV1,
915 (transferred_value, maybe_seed_hex) => {
916 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
917 TransactionRuntimeParams::VmCasperV2 {
918 transferred_value: transferred_value.unwrap_or(0),
919 seed,
920 }
921 }
922 };
923
924 let mut builder = TransactionV1Builder::new_session(
925 request.is_install_upgrade.unwrap_or(true),
926 wasm_bytes.into(),
927 runtime,
928 );
929
930 if let Some(args_json) = request.session_args.as_ref()
931 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
932 {
933 builder = builder.with_runtime_args(runtime_args);
934 }
935
936 if let Some(ttl_millis) = request.ttl_millis {
937 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
938 }
939
940 let pricing = build_pricing_mode(
941 request.gas_price_tolerance,
942 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
943 );
944 builder = builder.with_pricing_mode(pricing);
945
946 let chain_name = match request.chain_name {
947 Some(value) => value,
948 None => fetch_chain_name(&request.network_name, request.node_id)
949 .await
950 .map_err(to_mcp_error)?,
951 };
952
953 let tx = builder
954 .with_chain_name(chain_name)
955 .with_secret_key(&signer.secret_key)
956 .build()
957 .map_err(to_mcp_error)?;
958
959 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
960 let response = rpc
961 .put_transaction(Transaction::V1(tx))
962 .await
963 .map_err(to_mcp_error)?;
964
965 Ok(ok_value(json!({
966 "network_name": request.network_name,
967 "node_id": request.node_id,
968 "transaction_hash": response.transaction_hash.to_hex_string(),
969 })))
970 }
971
972 #[tool(
973 description = "Transfer tokens between derived account paths. from_path signs, to_path resolves recipient."
974 )]
975 async fn transfer_tokens(
976 &self,
977 Parameters(request): Parameters<TransferTokensRequest>,
978 ) -> std::result::Result<CallToolResult, ErrorData> {
979 let network = self
980 .manager
981 .get_network(&request.network_name)
982 .await
983 .map_err(to_mcp_error)?;
984 ensure_running_network(&network).await?;
985
986 let from = self
987 .manager
988 .derived_account_for_path(&network, &request.from_path)
989 .await
990 .map_err(to_mcp_error)?;
991 let to = self
992 .manager
993 .derived_account_for_path(&network, &request.to_path)
994 .await
995 .map_err(to_mcp_error)?;
996
997 verify_path_hash_consistency(&network.layout, &request.from_path, &from.account_hash)
998 .await
999 .map_err(to_mcp_error)?;
1000 verify_path_hash_consistency(&network.layout, &request.to_path, &to.account_hash)
1001 .await
1002 .map_err(to_mcp_error)?;
1003
1004 let amount = casper_types::U512::from_dec_str(&request.amount)
1005 .with_context(|| "amount must be a decimal U512 string")
1006 .map_err(to_mcp_error)?;
1007 let to_public_key = PublicKey::from_hex(&to.public_key_hex)
1008 .with_context(|| "failed to parse derived recipient public key")
1009 .map_err(to_mcp_error)?;
1010
1011 let mut builder =
1012 TransactionV1Builder::new_transfer(amount, None, to_public_key, request.transfer_id)
1013 .map_err(to_mcp_error)?;
1014
1015 if let Some(ttl_millis) = request.ttl_millis {
1016 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
1017 }
1018
1019 let pricing = build_pricing_mode(
1020 request.gas_price_tolerance,
1021 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
1022 );
1023 builder = builder.with_pricing_mode(pricing);
1024
1025 let chain_name = match request.chain_name {
1026 Some(value) => value,
1027 None => fetch_chain_name(&request.network_name, request.node_id)
1028 .await
1029 .map_err(to_mcp_error)?,
1030 };
1031
1032 let tx = builder
1033 .with_chain_name(chain_name)
1034 .with_secret_key(&from.secret_key)
1035 .build()
1036 .map_err(to_mcp_error)?;
1037
1038 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1039 let response = rpc
1040 .put_transaction(Transaction::V1(tx))
1041 .await
1042 .map_err(to_mcp_error)?;
1043
1044 Ok(ok_value(json!({
1045 "network_name": request.network_name,
1046 "node_id": request.node_id,
1047 "transaction_hash": response.transaction_hash.to_hex_string(),
1048 "from_path": request.from_path,
1049 "to_path": request.to_path,
1050 "amount": request.amount,
1051 })))
1052 }
1053
1054 #[tool(
1055 description = "Wait for transaction execution result with timeout and return execution effects/result payload."
1056 )]
1057 async fn wait_transaction(
1058 &self,
1059 Parameters(request): Parameters<WaitTransactionRequest>,
1060 ) -> std::result::Result<CallToolResult, ErrorData> {
1061 let network = self
1062 .manager
1063 .get_network(&request.network_name)
1064 .await
1065 .map_err(to_mcp_error)?;
1066 ensure_running_network(&network).await?;
1067
1068 let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1069 let poll = Duration::from_millis(request.poll_interval_millis.unwrap_or(1_000));
1070 let tx_hash =
1071 parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1072 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1073 let deadline = Instant::now() + timeout;
1074
1075 loop {
1076 let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1077
1078 if let Some(exec_info) = response.execution_info.clone()
1079 && exec_info.execution_result.is_some()
1080 {
1081 return Ok(ok_value(
1082 serde_json::to_value(response).map_err(internal_serde_error)?,
1083 ));
1084 }
1085
1086 if Instant::now() >= deadline {
1087 return Err(ErrorData::resource_not_found(
1088 format!(
1089 "transaction {} execution result not available before timeout",
1090 request.transaction_hash
1091 ),
1092 None,
1093 ));
1094 }
1095
1096 tokio::time::sleep(poll).await;
1097 }
1098 }
1099
1100 #[tool(
1101 description = "Get transaction details via info_get_transaction (non-waiting). Returns typed JSON response from node."
1102 )]
1103 async fn get_transaction(
1104 &self,
1105 Parameters(request): Parameters<GetTransactionRequest>,
1106 ) -> std::result::Result<CallToolResult, ErrorData> {
1107 let network = self
1108 .manager
1109 .get_network(&request.network_name)
1110 .await
1111 .map_err(to_mcp_error)?;
1112 ensure_running_network(&network).await?;
1113
1114 let tx_hash =
1115 parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1116 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1117 let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1118
1119 Ok(ok_value(
1120 serde_json::to_value(response).map_err(internal_serde_error)?,
1121 ))
1122 }
1123}
1124
1125#[tool_handler]
1126impl ServerHandler for McpServer {
1127 fn get_info(&self) -> ServerInfo {
1128 Self::server_info()
1129 }
1130}
1131
/// Registry of the networks managed by this server process.
#[derive(Debug)]
struct NetworkManager {
    /// Root directory under which each network's `AssetsLayout` is created.
    assets_root: PathBuf,
    /// Managed networks keyed by network name.
    managed: Mutex<HashMap<String, Arc<ManagedNetwork>>>,
    /// Shared HTTP client, used for raw JSON-RPC queries against node endpoints.
    http: reqwest::Client,
}
1138
/// Runtime handle for a single spawned network: its on-disk layout, process state
/// and the background tasks that belong to it.
#[derive(Debug)]
struct ManagedNetwork {
    /// On-disk asset layout of this network.
    layout: AssetsLayout,
    /// Process state; shared with the pid-sync tasks spawned for this network.
    state: Arc<Mutex<State>>,
    /// Current number of nodes; updated when nodes are added at runtime.
    node_count: AtomicU32,
    /// Log filter handed to the process module when starting or restarting processes.
    rust_log: String,
    /// Seed from which account keys are derived for this network.
    seed: Arc<str>,
    /// Bounded in-memory store of SSE events collected from the nodes.
    sse_store: Arc<SseStore>,
    /// NOTE(review): presumably the hash of the last block passed to the block hook;
    /// its readers/writers are outside this section — confirm.
    last_block_hook_hash: Mutex<Option<String>>,
    /// Set to `true` by `stop()`. NOTE(review): consumers are outside this section.
    shutdown: Arc<AtomicBool>,
    /// Set to `true` by `stop()`; polled by the control-socket accept loop.
    control_shutdown: Arc<AtomicBool>,
    /// Serializes asset mutations (protocol staging, adding nodes) issued over the
    /// control socket.
    asset_mutation_lock: Mutex<()>,
    /// Handles of the per-node SSE listener tasks; aborted by `stop()`.
    sse_tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Handle of the control-socket accept loop, if one was started.
    control_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
1154
1155impl ManagedNetwork {
1156 async fn is_running(&self) -> bool {
1157 let state = self.state.lock().await;
1158 processes_running(&state)
1159 }
1160
1161 async fn stop(&self) -> Result<()> {
1162 self.shutdown.store(true, Ordering::SeqCst);
1163 self.control_shutdown.store(true, Ordering::SeqCst);
1164
1165 if let Some(control_task) = self.control_task.lock().await.take() {
1166 control_task.abort();
1167 let _ = control_task.await;
1168 }
1169 let _ = tokio_fs::remove_file(self.layout.control_socket_path()).await;
1170
1171 let tasks = {
1172 let mut guard = self.sse_tasks.lock().await;
1173 guard.drain(..).collect::<Vec<_>>()
1174 };
1175 for task in tasks {
1176 task.abort();
1177 let _ = task.await;
1178 }
1179
1180 let mut state = self.state.lock().await;
1181 process::stop(&mut state).await
1182 }
1183}
1184
1185impl NetworkManager {
1186 async fn new(assets_root: PathBuf) -> Result<Self> {
1187 let http = reqwest::Client::builder()
1188 .no_proxy()
1189 .timeout(Duration::from_secs(5))
1190 .build()?;
1191 Ok(Self {
1192 assets_root,
1193 managed: Mutex::new(HashMap::new()),
1194 http,
1195 })
1196 }
1197
1198 async fn spawn_network(&self, request: SpawnNetworkRequest) -> Result<SpawnNetworkResponse> {
1199 let network_name = request
1200 .network_name
1201 .unwrap_or_else(|| DEFAULT_NETWORK_NAME.to_string());
1202
1203 if network_name.trim().is_empty() {
1204 return Err(anyhow!("network_name must not be empty"));
1205 }
1206
1207 let maybe_existing = {
1208 let mut managed = self.managed.lock().await;
1209 if let Some(existing) = managed.get(&network_name)
1210 && existing.is_running().await
1211 && !request.force_setup.unwrap_or(true)
1212 {
1213 return Ok(SpawnNetworkResponse {
1214 network_name,
1215 node_count: existing.node_count.load(Ordering::SeqCst),
1216 managed: true,
1217 already_running: true,
1218 forced_setup: false,
1219 });
1220 }
1221 managed.remove(&network_name)
1222 };
1223
1224 if let Some(existing) = maybe_existing {
1225 let _ = existing.stop().await;
1226 }
1227
1228 let force_setup = request.force_setup.unwrap_or(true);
1229 let requested_nodes = request.node_count.unwrap_or(DEFAULT_NODE_COUNT);
1230 let users = request.users;
1231 let delay = request.delay.unwrap_or(DEFAULT_DELAY_SECS);
1232 let log_level = request
1233 .log_level
1234 .unwrap_or_else(|| DEFAULT_LOG_LEVEL.to_string());
1235 let node_log_format = request
1236 .node_log_format
1237 .unwrap_or_else(|| DEFAULT_NODE_LOG_FORMAT.to_string());
1238 let seed: Arc<str> = Arc::from(request.seed.unwrap_or_else(|| DEFAULT_SEED.to_string()));
1239
1240 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.clone());
1241 let assets_exist = layout.exists().await;
1242
1243 let asset_selector = assets::optional_asset_selector(
1244 request.asset.as_deref(),
1245 request.custom_asset.as_deref(),
1246 )?;
1247
1248 let setup_result = if force_setup {
1249 assets::teardown(&layout).await?;
1250 Some(
1251 assets::setup_local(
1252 &layout,
1253 &SetupOptions {
1254 nodes: requested_nodes,
1255 users,
1256 delay_seconds: delay,
1257 network_name: network_name.clone(),
1258 asset: asset_selector.clone(),
1259 protocol_version: request.protocol_version.clone(),
1260 chainspec_overrides: Vec::new(),
1261 node_log_format,
1262 seed: Arc::clone(&seed),
1263 },
1264 )
1265 .await?,
1266 )
1267 } else if !assets_exist {
1268 Some(
1269 assets::setup_local(
1270 &layout,
1271 &SetupOptions {
1272 nodes: requested_nodes,
1273 users,
1274 delay_seconds: delay,
1275 network_name: network_name.clone(),
1276 asset: asset_selector,
1277 protocol_version: request.protocol_version.clone(),
1278 chainspec_overrides: Vec::new(),
1279 node_log_format,
1280 seed: Arc::clone(&seed),
1281 },
1282 )
1283 .await?,
1284 )
1285 } else {
1286 None
1287 };
1288
1289 if !layout.exists().await {
1290 return Err(anyhow!(
1291 "assets missing under {}; call spawn_network with force_setup=true",
1292 layout.net_dir().display()
1293 ));
1294 }
1295
1296 assets::ensure_network_hook_samples(&layout).await?;
1297
1298 let node_count = layout.count_nodes().await?;
1299 if node_count == 0 {
1300 return Err(anyhow!("network has no nodes to start"));
1301 }
1302
1303 ensure_sidecar_available(&layout, node_count).await?;
1304
1305 let state_path = layout.net_dir().join(STATE_FILE_NAME);
1306 if !tokio_fs::try_exists(&state_path).await.unwrap_or(false) {
1307 let protocol_version = match &setup_result {
1308 Some(result) => result.protocol_version.clone(),
1309 None => latest_layout_protocol_version(&layout).await?,
1310 };
1311 assets::prepare_genesis_hooks(&layout, &protocol_version).await?;
1312 }
1313 let mut state = State::new(state_path).await?;
1314
1315 let rust_log = log_level.clone();
1316 process::start(
1317 &layout,
1318 &StartPlan {
1319 rust_log: rust_log.clone(),
1320 seed: Arc::clone(&seed),
1321 },
1322 &mut state,
1323 )
1324 .await?;
1325
1326 let managed = Arc::new(ManagedNetwork {
1327 layout: layout.clone(),
1328 state: Arc::new(Mutex::new(state)),
1329 node_count: AtomicU32::new(node_count),
1330 rust_log,
1331 seed,
1332 sse_store: Arc::new(SseStore::new(DEFAULT_SSE_HISTORY_CAPACITY)),
1333 last_block_hook_hash: Mutex::new(None),
1334 shutdown: Arc::new(AtomicBool::new(false)),
1335 control_shutdown: Arc::new(AtomicBool::new(false)),
1336 asset_mutation_lock: Mutex::new(()),
1337 sse_tasks: Mutex::new(Vec::new()),
1338 control_task: Mutex::new(None),
1339 });
1340 spawn_pid_sync_tasks(Arc::clone(&managed.state)).await;
1341
1342 self.spawn_sse_collectors(&managed).await;
1343 if let Err(err) = self.spawn_control_server(&managed).await {
1344 let _ = managed.stop().await;
1345 return Err(err);
1346 }
1347
1348 self.managed
1349 .lock()
1350 .await
1351 .insert(network_name.clone(), Arc::clone(&managed));
1352
1353 Ok(SpawnNetworkResponse {
1354 network_name,
1355 node_count,
1356 managed: true,
1357 already_running: false,
1358 forced_setup: force_setup,
1359 })
1360 }
1361
1362 async fn spawn_sse_collectors(&self, network: &Arc<ManagedNetwork>) {
1363 let mut tasks = Vec::new();
1364 for node_id in 1..=network.node_count.load(Ordering::SeqCst) {
1365 let endpoint = assets::sse_endpoint(node_id);
1366 let network = Arc::clone(network);
1367 let task = tokio::spawn(async move {
1368 run_sse_listener(network, node_id, endpoint).await;
1369 });
1370 tasks.push(task);
1371 }
1372 let mut guard = network.sse_tasks.lock().await;
1373 guard.extend(tasks);
1374 }
1375
1376 async fn spawn_control_server(&self, network: &Arc<ManagedNetwork>) -> Result<()> {
1377 let socket_path = network.layout.control_socket_path();
1378 if let Ok(metadata) = tokio_fs::symlink_metadata(&socket_path).await {
1379 if metadata.is_dir() {
1380 tokio_fs::remove_dir_all(&socket_path).await?;
1381 } else {
1382 tokio_fs::remove_file(&socket_path).await?;
1383 }
1384 }
1385 if let Some(parent) = socket_path.parent() {
1386 tokio_fs::create_dir_all(parent).await?;
1387 }
1388
1389 let listener = UnixListener::bind(&socket_path)?;
1390 let network_for_task = Arc::clone(network);
1391 let shutdown = Arc::clone(&network_for_task.control_shutdown);
1392 let task = tokio::spawn(async move {
1393 loop {
1394 if shutdown.load(Ordering::SeqCst) {
1395 break;
1396 }
1397
1398 let accepted =
1399 tokio::time::timeout(Duration::from_millis(250), listener.accept()).await;
1400 let (stream, _) = match accepted {
1401 Ok(Ok(pair)) => pair,
1402 Ok(Err(_)) => break,
1403 Err(_) => continue,
1404 };
1405 let network = Arc::clone(&network_for_task);
1406 tokio::spawn(async move {
1407 handle_managed_control_stream(stream, network).await;
1408 });
1409 }
1410
1411 let _ = tokio_fs::remove_file(&socket_path).await;
1412 });
1413 *network.control_task.lock().await = Some(task);
1414 Ok(())
1415 }
1416
1417 async fn stage_protocol(
1418 &self,
1419 network_name: &str,
1420 asset: Option<&str>,
1421 custom_asset: Option<&str>,
1422 legacy_asset_name: Option<&str>,
1423 protocol_version: &str,
1424 activation_point: u64,
1425 ) -> Result<StageProtocolResponse> {
1426 let asset_selector = stage_asset_selector(asset, custom_asset, legacy_asset_name)?;
1427 let managed = {
1428 let guard = self.managed.lock().await;
1429 guard.get(network_name).cloned()
1430 };
1431
1432 if let Some(network) = managed
1433 && network.is_running().await
1434 {
1435 if let Ok(Some(current_era)) = read_current_era_from_status(1).await
1436 && activation_point <= current_era
1437 {
1438 return Err(anyhow!(
1439 "activation point {} must be greater than current era {}",
1440 activation_point,
1441 current_era
1442 ));
1443 }
1444
1445 let request = ControlRequest::StageProtocol {
1446 asset: match &asset_selector {
1447 AssetSelector::Versioned(asset) => Some(asset.clone()),
1448 AssetSelector::LatestVersioned | AssetSelector::Custom(_) => None,
1449 },
1450 custom_asset: match &asset_selector {
1451 AssetSelector::Custom(asset) => Some(asset.clone()),
1452 AssetSelector::LatestVersioned | AssetSelector::Versioned(_) => None,
1453 },
1454 asset_name: None,
1455 protocol_version: protocol_version.to_string(),
1456 activation_point,
1457 chainspec_overrides: Vec::new(),
1458 restart_sidecars: true,
1459 rust_log: Some(network.rust_log.clone()),
1460 };
1461 let socket_path = network.layout.control_socket_path();
1462 return match send_request(&socket_path, &request).await {
1463 Ok(ControlResponse::Ok { result }) => match result {
1464 ControlResult::StageProtocol {
1465 live_mode,
1466 staged_nodes,
1467 restarted_sidecars,
1468 } => Ok(StageProtocolResponse {
1469 network_name: network_name.to_string(),
1470 live_mode,
1471 staged_nodes,
1472 restarted_sidecars,
1473 }),
1474 ControlResult::RuntimeStatus { .. } => Err(anyhow!(
1475 "unexpected runtime_status response from {}",
1476 socket_path.display()
1477 )),
1478 ControlResult::AddNodes { .. } => Err(anyhow!(
1479 "unexpected add_nodes response from {}",
1480 socket_path.display()
1481 )),
1482 },
1483 Ok(ControlResponse::Error { error }) => Err(anyhow!(error)),
1484 Err(err) => Err(anyhow!(
1485 "failed to reach managed control socket {}: {}",
1486 socket_path.display(),
1487 err
1488 )),
1489 };
1490 }
1491
1492 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1493 if !layout.exists().await {
1494 return Err(anyhow!(
1495 "assets for {} not found under {}",
1496 network_name,
1497 layout.net_dir().display()
1498 ));
1499 }
1500
1501 let staged = assets::stage_protocol(
1502 &layout,
1503 &StageProtocolOptions {
1504 asset: asset_selector,
1505 protocol_version: protocol_version.to_string(),
1506 activation_point,
1507 chainspec_overrides: Vec::new(),
1508 },
1509 )
1510 .await?;
1511
1512 Ok(StageProtocolResponse {
1513 network_name: network_name.to_string(),
1514 live_mode: false,
1515 staged_nodes: staged.staged_nodes,
1516 restarted_sidecars: Vec::new(),
1517 })
1518 }
1519
1520 async fn wait_network_ready(
1521 &self,
1522 network_name: &str,
1523 timeout_seconds: Option<u64>,
1524 ) -> Result<WaitReadyResponse> {
1525 let network = self.get_network(network_name).await?;
1526 let timeout = Duration::from_secs(timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1527 let deadline = Instant::now() + timeout;
1528
1529 loop {
1530 let running = ensure_running_network(&network).await.is_ok();
1531 if running {
1532 let status = check_rest_ready(&network).await;
1533 if let Ok(rest_by_node) = status {
1534 let state = network.state.lock().await;
1535 let block_observed = state.last_block_height.is_some()
1536 || rest_by_node.values().any(rest_has_block);
1537 if block_observed {
1538 return Ok(WaitReadyResponse {
1539 network_name: network_name.to_string(),
1540 ready: true,
1541 node_count: network.node_count.load(Ordering::SeqCst),
1542 rest: rest_by_node,
1543 last_block_height: state.last_block_height,
1544 });
1545 }
1546 }
1547 }
1548
1549 if Instant::now() >= deadline {
1550 return Ok(WaitReadyResponse {
1551 network_name: network_name.to_string(),
1552 ready: false,
1553 node_count: network.node_count.load(Ordering::SeqCst),
1554 rest: HashMap::new(),
1555 last_block_height: None,
1556 });
1557 }
1558
1559 tokio::time::sleep(Duration::from_millis(500)).await;
1560 }
1561 }
1562
1563 async fn despawn_network(&self, network_name: &str, purge: bool) -> Result<DespawnResponse> {
1564 let managed = {
1565 let mut guard = self.managed.lock().await;
1566 guard.remove(network_name)
1567 };
1568
1569 let Some(network) = managed else {
1570 return Err(anyhow!("network '{}' is not managed", network_name));
1571 };
1572
1573 network.stop().await?;
1574 if purge {
1575 assets::teardown(&network.layout).await?;
1576 }
1577
1578 Ok(DespawnResponse {
1579 network_name: network_name.to_string(),
1580 purged: purge,
1581 })
1582 }
1583
1584 async fn list_networks(&self) -> Result<ListNetworksResponse> {
1585 let discovered = discover_network_names(&self.assets_root).await?;
1586 let managed_snapshot = {
1587 let guard = self.managed.lock().await;
1588 guard
1589 .iter()
1590 .map(|(name, network)| (name.clone(), Arc::clone(network)))
1591 .collect::<Vec<_>>()
1592 };
1593
1594 let mut rows = Vec::new();
1595
1596 for name in &discovered {
1597 if let Some((_, network)) = managed_snapshot.iter().find(|(n, _)| n == name) {
1598 rows.push(NetworkRow {
1599 network_name: name.clone(),
1600 discovered: true,
1601 managed: true,
1602 running: network.is_running().await,
1603 node_count: Some(network.node_count.load(Ordering::SeqCst)),
1604 });
1605 } else {
1606 let layout = AssetsLayout::new(self.assets_root.clone(), name.clone());
1607 rows.push(NetworkRow {
1608 network_name: name.clone(),
1609 discovered: true,
1610 managed: false,
1611 running: false,
1612 node_count: layout.count_nodes().await.ok(),
1613 });
1614 }
1615 }
1616
1617 for (name, network) in managed_snapshot {
1618 if !discovered.iter().any(|candidate| candidate == &name) {
1619 rows.push(NetworkRow {
1620 network_name: name,
1621 discovered: false,
1622 managed: true,
1623 running: network.is_running().await,
1624 node_count: Some(network.node_count.load(Ordering::SeqCst)),
1625 });
1626 }
1627 }
1628
1629 rows.sort_by(|a, b| a.network_name.cmp(&b.network_name));
1630
1631 Ok(ListNetworksResponse { networks: rows })
1632 }
1633
1634 async fn managed_processes(
1635 &self,
1636 request: ManagedProcessesRequest,
1637 ) -> Result<ManagedProcessesResponse> {
1638 let network = self.get_network(&request.network_name).await?;
1639 let running_only = request.running_only.unwrap_or(true);
1640 let process_name = request
1641 .process_name
1642 .as_deref()
1643 .map(str::trim)
1644 .filter(|name| !name.is_empty())
1645 .map(ToString::to_string);
1646
1647 let process_name_lc = process_name.as_ref().map(|name| name.to_ascii_lowercase());
1648 let state = network.state.lock().await;
1649 let mut processes = Vec::new();
1650 for process in &state.processes {
1651 if let Some(name_lc) = process_name_lc.as_deref()
1652 && !process.id.to_ascii_lowercase().contains(name_lc)
1653 {
1654 continue;
1655 }
1656
1657 let pid = process.current_pid();
1658 let running = matches!(process.last_status, ProcessStatus::Running)
1659 && pid.is_some_and(is_pid_running);
1660 if running_only && !running {
1661 continue;
1662 }
1663
1664 processes.push(ManagedProcessRow {
1665 id: process.id.clone(),
1666 node_id: process.node_id,
1667 kind: process_kind_name(&process.kind).to_string(),
1668 pid,
1669 running,
1670 last_status: process_status_name(&process.last_status).to_string(),
1671 command: process.command.clone(),
1672 args: process.args.clone(),
1673 cwd: process.cwd.clone(),
1674 stdout_path: process.stdout_path.clone(),
1675 stderr_path: process.stderr_path.clone(),
1676 });
1677 }
1678
1679 Ok(ManagedProcessesResponse {
1680 network_name: request.network_name,
1681 running_only,
1682 process_name,
1683 processes,
1684 })
1685 }
1686
1687 async fn get_network(&self, network_name: &str) -> Result<Arc<ManagedNetwork>> {
1688 let guard = self.managed.lock().await;
1689 guard.get(network_name).cloned().ok_or_else(|| {
1690 anyhow!(
1691 "network '{}' is not managed; call spawn_network first",
1692 network_name
1693 )
1694 })
1695 }
1696
1697 async fn stop_all_networks(&self) -> Result<()> {
1698 let managed = {
1699 let mut guard = self.managed.lock().await;
1700 guard
1701 .drain()
1702 .map(|(_, network)| network)
1703 .collect::<Vec<_>>()
1704 };
1705
1706 let mut errors = Vec::new();
1707 for network in managed {
1708 if let Err(err) = network.stop().await {
1709 errors.push(err.to_string());
1710 }
1711 }
1712
1713 if errors.is_empty() {
1714 Ok(())
1715 } else {
1716 Err(anyhow!(errors.join("\n")))
1717 }
1718 }
1719
1720 async fn raw_rpc_query(
1721 &self,
1722 node_id: u32,
1723 method: &str,
1724 params: Option<Value>,
1725 ) -> Result<Value> {
1726 let endpoint = assets::rpc_endpoint(node_id);
1727 let payload = match params {
1728 Some(params) => json!({
1729 "id": 1,
1730 "jsonrpc": "2.0",
1731 "method": method,
1732 "params": params,
1733 }),
1734 None => json!({
1735 "id": 1,
1736 "jsonrpc": "2.0",
1737 "method": method,
1738 }),
1739 };
1740
1741 let response = self
1742 .http
1743 .post(endpoint)
1744 .json(&payload)
1745 .send()
1746 .await?
1747 .error_for_status()?;
1748 Ok(response.json::<Value>().await?)
1749 }
1750
1751 async fn list_derived_accounts(&self, network_name: &str) -> Result<Vec<DerivedAccountRow>> {
1752 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1753 let csv = assets::derived_accounts_summary(&layout)
1754 .await
1755 .ok_or_else(|| {
1756 anyhow!(
1757 "missing derived-accounts.csv for network '{}'",
1758 network_name
1759 )
1760 })?;
1761
1762 let seed = {
1763 let managed = self.managed.lock().await;
1764 managed
1765 .get(network_name)
1766 .map(|network| Arc::clone(&network.seed))
1767 };
1768
1769 parse_derived_accounts_csv(&csv, seed).await
1770 }
1771
1772 async fn derived_account_for_path(
1773 &self,
1774 network: &Arc<ManagedNetwork>,
1775 path: &str,
1776 ) -> Result<DerivedSigner> {
1777 let material =
1778 assets::derive_account_from_seed_path(Arc::clone(&network.seed), path).await?;
1779 let secret_key = SecretKey::from_pem(&material.secret_key_pem)?;
1780 Ok(DerivedSigner {
1781 public_key_hex: material.public_key_hex,
1782 account_hash: material.account_hash,
1783 secret_key,
1784 })
1785 }
1786}
1787
1788async fn handle_managed_control_stream(
1789 mut stream: tokio::net::UnixStream,
1790 network: Arc<ManagedNetwork>,
1791) {
1792 let mut request_bytes = Vec::new();
1793 let response = match stream.read_to_end(&mut request_bytes).await {
1794 Ok(_) => match serde_json::from_slice::<ControlRequest>(&request_bytes) {
1795 Ok(request) => handle_managed_control_request(&network, request).await,
1796 Err(err) => ControlResponse::Error {
1797 error: format!("invalid control request: {}", err),
1798 },
1799 },
1800 Err(err) => ControlResponse::Error {
1801 error: format!("failed to read control request: {}", err),
1802 },
1803 };
1804
1805 let response_bytes = serde_json::to_vec(&response).unwrap_or_else(|err| {
1806 format!(
1807 "{{\"status\":\"error\",\"error\":\"failed to serialize control response: {}\"}}",
1808 err
1809 )
1810 .into_bytes()
1811 });
1812 let _ = stream.write_all(&response_bytes).await;
1813 let _ = stream.shutdown().await;
1814}
1815
1816async fn spawn_added_sse_collectors(network: &Arc<ManagedNetwork>, node_ids: &[u32]) {
1817 let mut tasks = Vec::new();
1818 for node_id in node_ids {
1819 let node_id = *node_id;
1820 let endpoint = assets::sse_endpoint(node_id);
1821 let network = Arc::clone(network);
1822 let task = tokio::spawn(async move {
1823 run_sse_listener(network, node_id, endpoint).await;
1824 });
1825 tasks.push(task);
1826 }
1827 let mut guard = network.sse_tasks.lock().await;
1828 guard.extend(tasks);
1829}
1830
/// Executes one control request against a managed network and returns the response
/// to send back over the control socket.
///
/// Failures never propagate as Rust errors; they are reported as
/// `ControlResponse::Error` carrying the error's string form.
async fn handle_managed_control_request(
    network: &Arc<ManagedNetwork>,
    request: ControlRequest,
) -> ControlResponse {
    match request {
        // Snapshot of the running node ids and the last observed block height.
        ControlRequest::RuntimeStatus => {
            let state = network.state.lock().await;
            ControlResponse::Ok {
                result: ControlResult::RuntimeStatus {
                    running_node_ids: running_node_ids(&state),
                    last_block_height: state.last_block_height,
                },
            }
        }
        // Stage a protocol upgrade on the live network, optionally restarting sidecars.
        ControlRequest::StageProtocol {
            asset,
            custom_asset,
            asset_name,
            protocol_version,
            activation_point,
            chainspec_overrides,
            restart_sidecars,
            rust_log,
        } => {
            // Resolve the asset selection before taking any lock.
            let asset_selector = match stage_asset_selector(
                asset.as_deref(),
                custom_asset.as_deref(),
                asset_name.as_deref(),
            ) {
                Ok(asset_selector) => asset_selector,
                Err(err) => {
                    return ControlResponse::Error {
                        error: err.to_string(),
                    };
                }
            };
            // Held until this arm returns so asset mutations do not interleave.
            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
            let staged = assets::stage_protocol(
                &network.layout,
                &StageProtocolOptions {
                    asset: asset_selector,
                    protocol_version,
                    activation_point,
                    chainspec_overrides,
                },
            )
            .await;
            let staged = match staged {
                Ok(staged) => staged,
                Err(err) => {
                    return ControlResponse::Error {
                        error: err.to_string(),
                    };
                }
            };

            let mut restarted_sidecars = Vec::new();
            if restart_sidecars {
                // A per-request log filter overrides the network default.
                let rust_log = rust_log.unwrap_or_else(|| network.rust_log.clone());
                let mut state = network.state.lock().await;
                match process::restart_sidecars(&network.layout, &mut state, &rust_log).await {
                    Ok(restarted) => {
                        for proc in restarted {
                            restarted_sidecars.push(proc.record.node_id);
                        }
                    }
                    Err(err) => {
                        return ControlResponse::Error {
                            error: err.to_string(),
                        };
                    }
                }
            }

            ControlResponse::Ok {
                result: ControlResult::StageProtocol {
                    live_mode: true,
                    staged_nodes: staged.staged_nodes,
                    restarted_sidecars,
                },
            }
        }
        // Create assets for `count` new nodes and start their processes.
        ControlRequest::AddNodes { count } => {
            // Held until this arm returns so asset mutations do not interleave.
            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
            let added = match assets::add_nodes(
                &network.layout,
                &AddNodesOptions {
                    count,
                    seed: Arc::clone(&network.seed),
                },
            )
            .await
            {
                Ok(added) => added,
                Err(err) => {
                    return ControlResponse::Error {
                        error: err.to_string(),
                    };
                }
            };

            // The state lock is scoped to the start call only.
            let started = {
                let mut state = network.state.lock().await;
                process::start_added_nodes(
                    &network.layout,
                    &mut state,
                    &added.added_node_ids,
                    added.total_nodes,
                    &network.rust_log,
                    Arc::clone(&network.seed),
                )
                .await
            };
            let started = match started {
                Ok(started) => started,
                Err(err) => {
                    // Starting failed: remove the freshly created node assets again.
                    let error =
                        rollback_added_nodes_after_start_error(&network.layout, &added, err).await;
                    return ControlResponse::Error { error };
                }
            };
            let process_ids = started
                .iter()
                .map(|proc| proc.record.id.clone())
                .collect::<Vec<_>>();
            let started_processes = started.len() as u32;
            // Publish the new node count, then attach SSE collectors and pid-sync
            // tasks for the added nodes and processes.
            network
                .node_count
                .store(added.total_nodes, Ordering::SeqCst);
            spawn_added_sse_collectors(network, &added.added_node_ids).await;
            spawn_pid_sync_tasks_for_ids(Arc::clone(&network.state), &process_ids).await;

            ControlResponse::Ok {
                result: ControlResult::AddNodes {
                    added_node_ids: added.added_node_ids,
                    total_nodes: added.total_nodes,
                    started_processes,
                },
            }
        }
    }
}
1973
1974async fn rollback_added_nodes_after_start_error(
1975 layout: &AssetsLayout,
1976 added: &assets::AddNodesResult,
1977 err: anyhow::Error,
1978) -> String {
1979 match assets::rollback_added_nodes(layout, &added.added_node_ids).await {
1980 Ok(()) => err.to_string(),
1981 Err(rollback_err) => format!(
1982 "{}; failed to roll back added node assets: {}",
1983 err, rollback_err
1984 ),
1985 }
1986}
1987
1988async fn read_current_era_from_status(node_id: u32) -> Result<Option<u64>> {
1989 let value = fetch_rest_status(node_id).await?;
1990 Ok(value
1991 .get("last_added_block_info")
1992 .and_then(|info| info.get("era_id"))
1993 .and_then(|era_id| {
1994 era_id
1995 .as_u64()
1996 .or_else(|| era_id.as_str().and_then(|raw| raw.parse::<u64>().ok()))
1997 }))
1998}
1999
/// Bounded in-memory history of SSE events with support for waiting on new ones.
#[derive(Debug)]
struct SseStore {
    /// Last sequence number handed out; starts at 0, so the first record gets 1.
    sequence: AtomicU64,
    /// Stored records; new records are appended at the back and the oldest are
    /// dropped from the front once `capacity` is exceeded.
    events: Mutex<VecDeque<SseRecord>>,
    /// Signalled after every push to wake tasks waiting for new events.
    notify: Notify,
    /// Maximum number of records kept in `events`.
    capacity: usize,
}
2007
2008impl SseStore {
2009 fn new(capacity: usize) -> Self {
2010 Self {
2011 sequence: AtomicU64::new(0),
2012 events: Mutex::new(VecDeque::new()),
2013 notify: Notify::new(),
2014 capacity,
2015 }
2016 }
2017
2018 async fn latest_sequence(&self) -> u64 {
2019 self.sequence.load(Ordering::SeqCst)
2020 }
2021
2022 async fn push(&self, node_id: u32, event: SseEvent) {
2023 let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) + 1;
2024 let event_type = sse_event_type(&event).to_string();
2025 let payload = sse_event_payload(&event);
2026
2027 let record = SseRecord {
2028 sequence,
2029 timestamp_rfc3339: timestamp_prefix(),
2030 node_id,
2031 event_type,
2032 payload,
2033 };
2034
2035 let mut guard = self.events.lock().await;
2036 guard.push_back(record);
2037 while guard.len() > self.capacity {
2038 let _ = guard.pop_front();
2039 }
2040 drop(guard);
2041
2042 self.notify.notify_waiters();
2043 }
2044
2045 async fn wait_next(
2046 &self,
2047 after_sequence: u64,
2048 filter: &SseFilter,
2049 timeout: Duration,
2050 ) -> Result<SseRecord> {
2051 let deadline = Instant::now() + timeout;
2052 loop {
2053 if let Some(record) = self.find_first_after(after_sequence, filter).await {
2054 return Ok(record);
2055 }
2056
2057 if Instant::now() >= deadline {
2058 return Err(anyhow!("timed out waiting for SSE event"));
2059 }
2060
2061 let remaining = deadline.saturating_duration_since(Instant::now());
2062 tokio::time::timeout(remaining, self.notify.notified()).await?;
2063 }
2064 }
2065
2066 async fn find_first_after(&self, after_sequence: u64, filter: &SseFilter) -> Option<SseRecord> {
2067 let guard = self.events.lock().await;
2068 guard
2069 .iter()
2070 .find(|event| event.sequence > after_sequence && filter.matches(event))
2071 .cloned()
2072 }
2073
2074 async fn history(
2075 &self,
2076 before_sequence: Option<u64>,
2077 limit: usize,
2078 filter: &SseFilter,
2079 ) -> SseHistoryPage {
2080 let before = before_sequence.unwrap_or(u64::MAX);
2081 let guard = self.events.lock().await;
2082
2083 let mut matched = guard
2084 .iter()
2085 .filter(|event| event.sequence < before)
2086 .filter(|event| filter.matches(event))
2087 .cloned()
2088 .collect::<Vec<_>>();
2089
2090 let total = matched.len();
2091
2092 if matched.len() > limit {
2093 matched = matched.split_off(matched.len() - limit);
2094 }
2095
2096 let next_before_sequence = matched.first().map(|event| event.sequence);
2097 SseHistoryPage {
2098 total_matching: total,
2099 returned: matched.len(),
2100 next_before_sequence,
2101 events: matched,
2102 }
2103 }
2104}
2105
2106#[derive(Debug, Default)]
2107struct SseFilter {
2108 include_event_types: Vec<String>,
2109 exclude_event_types: Vec<String>,
2110}
2111
2112impl SseFilter {
2113 fn matches(&self, event: &SseRecord) -> bool {
2114 let include = if self.include_event_types.is_empty() {
2115 true
2116 } else {
2117 self.include_event_types
2118 .iter()
2119 .any(|name| name == &event.event_type)
2120 };
2121 if !include {
2122 return false;
2123 }
2124 !self
2125 .exclude_event_types
2126 .iter()
2127 .any(|name| name == &event.event_type)
2128 }
2129}
2130
// One SSE event as retained by `SseStore` and handed back to callers.
#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct SseRecord {
    // Store-assigned sequence number; `SseStore::push` hands out 1, 2, 3, ...
    sequence: u64,
    // Creation time from `timestamp_prefix()`: RFC 3339 in UTC, or the literal
    // "unknown-time" if formatting failed.
    timestamp_rfc3339: String,
    // Node id that was passed to `SseStore::push` for this event.
    node_id: u32,
    // Event name as produced by `sse_event_type` (e.g. "BlockAdded").
    event_type: String,
    // JSON rendering of the event as produced by `sse_event_payload`.
    payload: Value,
}
2139
// One page of SSE history as built by `SseStore::history`.
#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct SseHistoryPage {
    // Number of retained records that matched, before `limit` was applied.
    total_matching: usize,
    // Number of records in `events`.
    returned: usize,
    // Sequence of the oldest record in `events`; pass it as `before_sequence`
    // to fetch the next (older) page. `None` when `events` is empty.
    next_before_sequence: Option<u64>,
    // Matching records, oldest first.
    events: Vec<SseRecord>,
}
2147
/// Key material and identifiers for one signer.
///
/// NOTE(review): the name suggests the key is derived (e.g. from a seed and a
/// path); the derivation itself is not visible in this part of the file.
#[derive(Debug)]
struct DerivedSigner {
    /// Public key as a hex string.
    public_key_hex: String,
    /// Account hash as a string; the exact formatting is decided by the
    /// producer of this struct.
    account_hash: String,
    /// Secret key used for signing.
    secret_key: SecretKey,
}
2154
// Deserialized parameters of a spawn-network request. Every field is optional;
// how missing values are defaulted is decided by the consumer of this struct.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SpawnNetworkRequest {
    network_name: Option<String>,
    // `asset` / `custom_asset`: presumably alternative ways to select the asset
    // bundle — TODO confirm against the handler.
    asset: Option<String>,
    custom_asset: Option<String>,
    protocol_version: Option<String>,
    node_count: Option<u32>,
    users: Option<u32>,
    // NOTE(review): unit of `delay` is not visible here — confirm.
    delay: Option<u64>,
    log_level: Option<String>,
    node_log_format: Option<String>,
    seed: Option<String>,
    force_setup: Option<bool>,
}
2169
// Serialized result of a spawn-network request.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct SpawnNetworkResponse {
    network_name: String,
    node_count: u32,
    managed: bool,
    // Presumably true when the network was already up and nothing was started
    // — TODO confirm against the handler.
    already_running: bool,
    forced_setup: bool,
}
2178
// Deserialized parameters of a wait-for-network-ready request.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitNetworkReadyRequest {
    network_name: String,
    // Optional wait limit in seconds; the default is chosen by the handler.
    timeout_seconds: Option<u64>,
}
2184
// Serialized result of a readiness wait.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct WaitReadyResponse {
    network_name: String,
    ready: bool,
    node_count: u32,
    // Presumably the per-node `/status` payloads keyed by node id, i.e. the
    // shape returned by `check_rest_ready` — TODO confirm against the handler.
    rest: HashMap<u32, Value>,
    last_block_height: Option<u64>,
}
2193
// Deserialized parameters of a stage-protocol (upgrade staging) request.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct StageProtocolRequest {
    network_name: String,
    // `asset` / `custom_asset` / `asset_name`: optional asset selectors; their
    // precedence is decided by the handler (not visible here).
    asset: Option<String>,
    custom_asset: Option<String>,
    asset_name: Option<String>,
    protocol_version: String,
    // NOTE(review): presumably an era id — confirm against the handler.
    activation_point: u64,
}
2203
// Serialized result of a stage-protocol request.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct StageProtocolResponse {
    network_name: String,
    live_mode: bool,
    // Count of nodes that were staged.
    staged_nodes: u32,
    // Presumably the node ids whose sidecar was restarted — TODO confirm.
    restarted_sidecars: Vec<u32>,
}
2211
// Deserialized parameters of a despawn-network request.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct DespawnNetworkRequest {
    network_name: String,
    // Optional flag; presumably also removes the network's assets when true —
    // TODO confirm against the handler.
    purge: Option<bool>,
}
2217
// Serialized result of a despawn-network request.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct DespawnResponse {
    network_name: String,
    purged: bool,
}
2223
// Parameters of a list-networks request; intentionally carries no fields.
#[derive(Debug, Deserialize, Default, rmcp::schemars::JsonSchema)]
struct ListNetworksRequest {}
2226
// Serialized result of a list-networks request: one row per network.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ListNetworksResponse {
    networks: Vec<NetworkRow>,
}
2231
// One network entry in `ListNetworksResponse`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct NetworkRow {
    network_name: String,
    // `discovered` / `managed`: presumably "found on disk" vs. "tracked by this
    // server" — TODO confirm against the code that builds the rows.
    discovered: bool,
    managed: bool,
    running: bool,
    // `None` when the node count is not known.
    node_count: Option<u32>,
}
2240
// Deserialized parameters of a managed-processes listing request.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessesRequest {
    network_name: String,
    // Optional filter by process name.
    process_name: Option<String>,
    // Optional flag to restrict the listing to running processes.
    running_only: Option<bool>,
}
2247
// Serialized result of a managed-processes listing. `running_only` and
// `process_name` presumably echo the effective request filters — TODO confirm.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessesResponse {
    network_name: String,
    running_only: bool,
    process_name: Option<String>,
    processes: Vec<ManagedProcessRow>,
}
2255
// One process entry in `ManagedProcessesResponse`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessRow {
    id: String,
    node_id: u32,
    // Presumably one of the strings from `process_kind_name` ("node" or
    // "sidecar") — TODO confirm against the code that builds the rows.
    kind: String,
    // `None` when no pid is recorded for the process.
    pid: Option<u32>,
    running: bool,
    // Presumably one of the strings from `process_status_name` — TODO confirm.
    last_status: String,
    command: String,
    args: Vec<String>,
    cwd: String,
    stdout_path: String,
    stderr_path: String,
}
2270
// Deserialized parameters for requests that only need a network and a node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct NodeScopedRequest {
    network_name: String,
    node_id: u32,
}
2276
// Deserialized parameters of a raw JSON-RPC query against one node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryRequest {
    network_name: String,
    node_id: u32,
    // JSON-RPC method name.
    method: String,
    // Optional JSON-RPC params, passed as arbitrary JSON.
    params: Option<Value>,
}
2284
// Deserialized parameters of a balance query.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryBalanceRequest {
    network_name: String,
    node_id: u32,
    // Presumably parsed by `parse_purse_identifier` (account-hash-..., entity-...,
    // uref-..., or a public key hex) — TODO confirm against the handler.
    purse_identifier: String,
    // `block_id` / `state_root_hash`: optional state selectors; presumably fed
    // to `parse_state_identifier`, where a non-empty `block_id` takes precedence.
    block_id: Option<String>,
    state_root_hash: Option<String>,
}
2293
// Deserialized parameters of a global-state query.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryGlobalStateRequest {
    network_name: String,
    node_id: u32,
    // Presumably parsed by `parse_query_key` (formatted key or public key hex)
    // — TODO confirm against the handler.
    key: String,
    // Optional path segments below `key`.
    path: Option<Vec<String>>,
    // `block_id` / `state_root_hash`: optional state selectors; presumably fed
    // to `parse_state_identifier`, where a non-empty `block_id` takes precedence.
    block_id: Option<String>,
    state_root_hash: Option<String>,
}
2303
// Deserialized parameters of a block lookup.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct CurrentBlockRequest {
    network_name: String,
    node_id: u32,
    // Optional block hash or height; presumably forwarded to
    // `fetch_block_result`, which returns the latest block when this is
    // absent or blank — TODO confirm against the handler.
    block_id: Option<String>,
}
2310
// Which log stream of a node to read. Because of `rename_all = "lowercase"`
// the variants are (de)serialized as "stdout" and "stderr".
#[derive(Debug, Clone, Copy, Deserialize, Serialize, rmcp::schemars::JsonSchema)]
#[serde(rename_all = "lowercase")]
enum NodeLogStream {
    Stdout,
    Stderr,
}
2317
// Deserialized parameters of a node-log read.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct GetNodeLogsRequest {
    network_name: String,
    node_id: u32,
    stream: NodeLogStream,
    // Optional maximum number of lines to return.
    limit: Option<usize>,
    // Optional paging cursor; presumably the `next_before_line` of a previous
    // `LogPage` — TODO confirm against the handler.
    before_line: Option<usize>,
}
2326
// A single log line together with its position in the log file.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct LogLine {
    // NOTE(review): whether numbering is 0- or 1-based is not visible here.
    line_number: usize,
    content: String,
}
2332
// One page of log lines returned for a node-log read.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct LogPage {
    // Path of the log file the lines came from.
    path: String,
    total_lines: usize,
    // Number of entries in `lines`.
    returned: usize,
    // Paging cursor for the next page; `None` presumably means there is nothing
    // older to fetch — TODO confirm against the code that builds the page.
    next_before_line: Option<usize>,
    lines: Vec<LogLine>,
}
2341
// Deserialized parameters of a wait-for-next-SSE-event request. The fields
// mirror the arguments of `SseStore::wait_next` and the lists of `SseFilter`.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitNextSseRequest {
    network_name: String,
    include_event_types: Option<Vec<String>>,
    exclude_event_types: Option<Vec<String>>,
    // Only events with a larger sequence are of interest; the default when
    // absent is chosen by the handler (not visible here).
    after_sequence: Option<u64>,
    timeout_seconds: Option<u64>,
}
2350
// Deserialized parameters of an SSE-history request. The fields mirror the
// arguments of `SseStore::history` and the lists of `SseFilter`.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SseHistoryRequest {
    network_name: String,
    include_event_types: Option<Vec<String>>,
    exclude_event_types: Option<Vec<String>>,
    // Paging cursor: only events with a smaller sequence are returned.
    before_sequence: Option<u64>,
    // Optional page size; the default is chosen by the handler.
    limit: Option<usize>,
}
2359
// Deserialized parameters of a list-derived-accounts request.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct ListDerivedAccountsRequest {
    network_name: String,
}
2364
// One account entry in a derived-accounts listing. All values are carried as
// strings; their exact formats are decided by the code that builds the rows.
#[derive(Debug, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct DerivedAccountRow {
    kind: String,
    name: String,
    key_type: String,
    derivation: String,
    // NOTE(review): presumably the derivation path that other requests accept
    // as `signer_path` / `from_path` / `to_path` — confirm.
    path: String,
    account_hash: String,
    // Balance as a string (presumably to avoid precision loss for large
    // amounts — TODO confirm).
    balance: String,
    // `None` when no public key is available for the entry.
    public_key: Option<String>,
}
2376
// Deserialized parameters for signing and submitting a caller-supplied
// transaction.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SendTransactionSignedRequest {
    network_name: String,
    node_id: u32,
    // Identifies the signer; presumably a derivation path — TODO confirm.
    signer_path: String,
    // Transaction as JSON; presumably decoded by `parse_transaction_json`,
    // which rejects string-encoded payloads — TODO confirm against the handler.
    transaction: Value,
}
2384
// Deserialized parameters for building a transaction that calls an entry point
// of a contract package.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionPackageCallRequest {
    network_name: String,
    node_id: u32,
    transaction_package_name: String,
    // Optional package version to target.
    transaction_package_version: Option<EntityVersion>,
    session_entry_point: String,
    // Also accepted under the key `session_args_json` (serde alias).
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    // `signer_path` / `initiator_public_key`: optional ways to identify the
    // initiator; how they interact is decided by the handler (not visible here).
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    // `gas_price_tolerance` / `payment_amount`: presumably fed to
    // `build_pricing_mode` — TODO confirm against the handler.
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // Presumably parsed by `parse_optional_seed_hex` (32 bytes of hex, optional
    // `0x` prefix) — TODO confirm against the handler.
    runtime_seed_hex: Option<String>,
}
2403
// Deserialized parameters for building a transaction that calls an entry point
// of a contract identified by hash.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionContractCallRequest {
    network_name: String,
    node_id: u32,
    // Presumably parsed by `parse_contract_hash_for_invocation` — TODO confirm.
    transaction_contract_hash: String,
    session_entry_point: String,
    // Also accepted under the key `session_args_json` (serde alias).
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    // `signer_path` / `initiator_public_key`: optional ways to identify the
    // initiator; how they interact is decided by the handler (not visible here).
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    // `gas_price_tolerance` / `payment_amount`: presumably fed to
    // `build_pricing_mode` — TODO confirm against the handler.
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // Presumably parsed by `parse_optional_seed_hex` (32 bytes of hex, optional
    // `0x` prefix) — TODO confirm against the handler.
    runtime_seed_hex: Option<String>,
}
2421
// Deserialized parameters for building a transaction that runs session Wasm
// loaded from a file.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionSessionWasmRequest {
    network_name: String,
    node_id: u32,
    // Path of the Wasm module to load.
    wasm_path: String,
    // Also accepted under the key `session_args_json` (serde alias).
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    is_install_upgrade: Option<bool>,
    // `signer_path` / `initiator_public_key`: optional ways to identify the
    // initiator; how they interact is decided by the handler (not visible here).
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    // `gas_price_tolerance` / `payment_amount`: presumably fed to
    // `build_pricing_mode` — TODO confirm against the handler.
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // Presumably parsed by `parse_optional_seed_hex` (32 bytes of hex, optional
    // `0x` prefix) — TODO confirm against the handler.
    runtime_seed_hex: Option<String>,
}
2439
// Deserialized parameters for signing and submitting a session-Wasm
// transaction; unlike the "make" request, `signer_path` is required here.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SendSessionWasmRequest {
    network_name: String,
    node_id: u32,
    signer_path: String,
    // Path of the Wasm module to load.
    wasm_path: String,
    // Also accepted under the key `session_args_json` (serde alias).
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    chain_name: Option<String>,
    is_install_upgrade: Option<bool>,
    ttl_millis: Option<u64>,
    // `gas_price_tolerance` / `payment_amount`: presumably fed to
    // `build_pricing_mode` — TODO confirm against the handler.
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // Presumably parsed by `parse_optional_seed_hex` (32 bytes of hex, optional
    // `0x` prefix) — TODO confirm against the handler.
    runtime_seed_hex: Option<String>,
}
2456
// Deserialized parameters of a token transfer between two accounts.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct TransferTokensRequest {
    network_name: String,
    node_id: u32,
    // `from_path` / `to_path`: identify sender and recipient; presumably
    // derivation paths — TODO confirm against the handler.
    from_path: String,
    to_path: String,
    // Amount as a string; the unit and parsing are decided by the handler.
    amount: String,
    transfer_id: Option<u64>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    // `gas_price_tolerance` / `payment_amount`: presumably fed to
    // `build_pricing_mode` — TODO confirm against the handler.
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
}
2470
// Deserialized parameters for waiting on a transaction.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitTransactionRequest {
    network_name: String,
    node_id: u32,
    // Presumably parsed by `parse_transaction_hash_input` (full hex digest,
    // optionally wrapped or `0x`-prefixed) — TODO confirm against the handler.
    transaction_hash: String,
    // Optional wait limit and polling interval; defaults are chosen by the
    // handler (not visible here).
    timeout_seconds: Option<u64>,
    poll_interval_millis: Option<u64>,
}
2479
// Deserialized parameters of a transaction lookup.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct GetTransactionRequest {
    network_name: String,
    node_id: u32,
    // Presumably parsed by `parse_transaction_hash_input` (full hex digest,
    // optionally wrapped or `0x`-prefixed) — TODO confirm against the handler.
    transaction_hash: String,
}
2486
/// Minimal view of a node's REST `/status` payload: only the reactor state is
/// read (see `check_rest_ready`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RestStatusProbe {
    /// Value of the `reactor_state` key; `None` when the key is absent or null.
    reactor_state: Option<String>,
}
2492
2493fn to_mcp_error(err: impl std::fmt::Display) -> ErrorData {
2494 ErrorData::internal_error(err.to_string(), None)
2495}
2496
2497fn internal_serde_error(err: serde_json::Error) -> ErrorData {
2498 ErrorData::internal_error(format!("serde error: {}", err), None)
2499}
2500
2501fn ok_value(value: Value) -> rmcp::model::CallToolResult {
2502 rmcp::model::CallToolResult::structured(value)
2503}
2504
2505fn parse_transaction_json(value: Value) -> std::result::Result<Transaction, ErrorData> {
2506 if let Value::String(encoded) = &value {
2507 let _ = encoded;
2508 return Err(ErrorData::invalid_params(
2509 "invalid transaction payload: encoded JSON strings are not supported. Provide transaction as a typed JSON object",
2510 None,
2511 ));
2512 }
2513
2514 if let Ok(transaction) = serde_json::from_value::<Transaction>(value.clone()) {
2515 return Ok(transaction);
2516 }
2517
2518 if let Some(inner) = value.get("transaction") {
2519 return parse_transaction_json(inner.clone());
2520 }
2521
2522 Err(ErrorData::invalid_params(
2523 "failed to parse transaction JSON; expected Transaction object or { transaction: Transaction }, with typed JSON values only.",
2524 None,
2525 ))
2526}
2527
2528fn parse_transaction_hash_input(input: &str) -> Result<TransactionHash> {
2529 let value = input.trim();
2530 if value.is_empty() {
2531 return Err(anyhow!("transaction_hash must not be empty"));
2532 }
2533
2534 let unwrapped = value
2535 .strip_prefix("transaction-v1-hash(")
2536 .and_then(|inner| inner.strip_suffix(')'))
2537 .or_else(|| {
2538 value
2539 .strip_prefix("deploy-hash(")
2540 .and_then(|inner| inner.strip_suffix(')'))
2541 })
2542 .unwrap_or(value);
2543
2544 if unwrapped.contains("..") {
2545 return Err(anyhow!(
2546 "transaction_hash appears abbreviated; provide full hex digest"
2547 ));
2548 }
2549
2550 let normalized = unwrapped.strip_prefix("0x").unwrap_or(unwrapped);
2551 let digest = Digest::from_hex(normalized)
2552 .map_err(|err| anyhow!("failed to parse transaction hash digest: {}", err))?;
2553 Ok(TransactionHash::from(TransactionV1Hash::from(digest)))
2554}
2555
2556fn mcp_rpc_client(network_name: &str, node_id: u32) -> Result<CasperClient> {
2557 CasperClient::new(
2558 network_name.to_string(),
2559 vec![assets::rpc_endpoint(node_id)],
2560 )
2561 .map_err(|err| {
2562 anyhow!(
2563 "failed to initialize rpc client for node {}: {}",
2564 node_id,
2565 err
2566 )
2567 })
2568}
2569
2570fn extract_rpc_result(response: Value) -> Result<Value> {
2571 if let Some(error) = response.get("error") {
2572 return Err(anyhow!("rpc request failed: {}", error));
2573 }
2574 response
2575 .get("result")
2576 .cloned()
2577 .ok_or_else(|| anyhow!("rpc response missing result field"))
2578}
2579
2580async fn fetch_block_result(
2581 manager: &NetworkManager,
2582 network_name: &str,
2583 node_id: u32,
2584 block_id: Option<&str>,
2585) -> Result<Value> {
2586 let block_id = normalize_optional_identifier(block_id);
2587 if block_id.is_empty() {
2588 let rpc = mcp_rpc_client(network_name, node_id)?;
2589 let result = rpc.get_block().await?;
2590 return serde_json::to_value(result).map_err(Into::into);
2591 }
2592
2593 let block_identifier = parse_block_identifier_value(&block_id)?;
2594 let response = manager
2595 .raw_rpc_query(
2596 node_id,
2597 "chain_get_block",
2598 Some(json!({
2599 "block_identifier": block_identifier
2600 })),
2601 )
2602 .await?;
2603 extract_rpc_result(response)
2604}
2605
2606fn build_query_global_state_params(
2607 key: &str,
2608 path: Vec<String>,
2609 block_id: &str,
2610 state_root_hash: &str,
2611) -> Result<Value> {
2612 let key = parse_query_key(key)?;
2613 let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2614 let mut params = serde_json::Map::new();
2615 params.insert("key".to_string(), Value::String(key));
2616 params.insert(
2617 "path".to_string(),
2618 Value::Array(path.into_iter().map(Value::String).collect()),
2619 );
2620 if let Some(state_identifier) = state_identifier {
2621 params.insert("state_identifier".to_string(), state_identifier);
2622 }
2623 Ok(Value::Object(params))
2624}
2625
2626fn build_query_balance_params(
2627 purse_identifier: &str,
2628 block_id: &str,
2629 state_root_hash: &str,
2630) -> Result<Value> {
2631 let purse_identifier = parse_purse_identifier(purse_identifier)?;
2632 let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2633 let mut params = serde_json::Map::new();
2634 params.insert("purse_identifier".to_string(), purse_identifier);
2635 if let Some(state_identifier) = state_identifier {
2636 params.insert("state_identifier".to_string(), state_identifier);
2637 }
2638 Ok(Value::Object(params))
2639}
2640
2641fn parse_state_identifier(block_id: &str, state_root_hash: &str) -> Result<Option<Value>> {
2642 let block_id = block_id.trim();
2643 if !block_id.is_empty() {
2644 if block_id.len() == Digest::LENGTH * 2 {
2645 Digest::from_hex(block_id)
2646 .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2647 return Ok(Some(json!({
2648 "BlockHash": block_id,
2649 })));
2650 }
2651
2652 let height = block_id
2653 .parse::<u64>()
2654 .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2655 return Ok(Some(json!({
2656 "BlockHeight": height,
2657 })));
2658 }
2659
2660 let state_root_hash = state_root_hash.trim();
2661 if state_root_hash.is_empty() {
2662 return Ok(None);
2663 }
2664 Digest::from_hex(state_root_hash)
2665 .map_err(|err| anyhow!("invalid state_root_hash digest: {}", err))?;
2666 Ok(Some(json!({
2667 "StateRootHash": state_root_hash,
2668 })))
2669}
2670
2671fn parse_block_identifier_value(block_id: &str) -> Result<Value> {
2672 let block_id = block_id.trim();
2673 if block_id.is_empty() {
2674 return Err(anyhow!("block identifier must not be empty"));
2675 }
2676 if block_id.len() == Digest::LENGTH * 2 {
2677 Digest::from_hex(block_id)
2678 .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2679 Ok(json!({
2680 "Hash": block_id,
2681 }))
2682 } else {
2683 let height = block_id
2684 .parse::<u64>()
2685 .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2686 Ok(json!({
2687 "Height": height,
2688 }))
2689 }
2690}
2691
2692fn parse_query_key(key: &str) -> Result<String> {
2693 let key = key.trim();
2694 if key.is_empty() {
2695 return Err(anyhow!("key must not be empty"));
2696 }
2697 if let Ok(contract_hash) = ContractHash::from_formatted_str(key) {
2698 return Ok(Key::Hash(contract_hash.value()).to_formatted_string());
2699 }
2700 if let Ok(parsed) = Key::from_formatted_str(key) {
2701 return Ok(parsed.to_formatted_string());
2702 }
2703 if let Ok(public_key) = PublicKey::from_hex(key) {
2704 return Ok(Key::Account(public_key.to_account_hash()).to_formatted_string());
2705 }
2706 Err(anyhow!(
2707 "failed to parse key for query; expected formatted Key or public key hex"
2708 ))
2709}
2710
2711fn parse_contract_hash_for_invocation(input: &str) -> Result<AddressableEntityHash> {
2712 let input = input.trim();
2713 if input.is_empty() {
2714 return Err(anyhow!("transaction_contract_hash must not be empty"));
2715 }
2716
2717 if let Ok(contract_hash) = ContractHash::from_formatted_str(input) {
2718 return Ok(contract_hash.into());
2719 }
2720
2721 let digest_hex = input
2722 .strip_prefix("hash-")
2723 .or_else(|| input.strip_prefix("contract-hash-"))
2724 .unwrap_or(input);
2725 let bytes = hex_to_bytes(digest_hex)?;
2726 if bytes.len() != Digest::LENGTH {
2727 return Err(anyhow!(
2728 "transaction_contract_hash must be {} bytes",
2729 Digest::LENGTH
2730 ));
2731 }
2732 let mut hash = [0u8; Digest::LENGTH];
2733 hash.copy_from_slice(&bytes);
2734 Ok(ContractHash::new(hash).into())
2735}
2736
2737fn parse_purse_identifier(input: &str) -> Result<Value> {
2738 let input = input.trim();
2739 if input.is_empty() {
2740 return Err(anyhow!("purse_identifier must not be empty"));
2741 }
2742
2743 if input.starts_with("account-hash-") {
2744 casper_types::account::AccountHash::from_formatted_str(input)
2745 .map_err(|err| anyhow!("invalid account hash purse identifier: {}", err))?;
2746 return Ok(json!({
2747 "main_purse_under_account_hash": input,
2748 }));
2749 }
2750
2751 if input.starts_with("entity-") {
2752 return Ok(json!({
2753 "main_purse_under_entity_addr": input,
2754 }));
2755 }
2756
2757 if input.starts_with("uref-") {
2758 URef::from_formatted_str(input)
2759 .map_err(|err| anyhow!("invalid uref purse identifier: {}", err))?;
2760 return Ok(json!({
2761 "purse_uref": input,
2762 }));
2763 }
2764
2765 let public_key = PublicKey::from_hex(input)
2766 .map_err(|err| anyhow!("invalid public key purse identifier: {}", err))?;
2767 Ok(json!({
2768 "main_purse_under_public_key": public_key.to_hex_string(),
2769 }))
2770}
2771
2772fn build_pricing_mode(gas_price_tolerance: Option<u8>, payment_amount: Option<u64>) -> PricingMode {
2773 if let Some(payment_amount) = payment_amount {
2774 PricingMode::PaymentLimited {
2775 payment_amount,
2776 gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2777 standard_payment: true,
2778 }
2779 } else {
2780 PricingMode::Fixed {
2781 gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2782 additional_computation_factor: 0,
2783 }
2784 }
2785}
2786
2787fn parse_optional_seed_hex(seed: Option<&str>) -> Result<Option<[u8; 32]>> {
2788 let Some(seed) = seed else {
2789 return Ok(None);
2790 };
2791 let seed = seed.trim();
2792 if seed.is_empty() {
2793 return Ok(None);
2794 }
2795
2796 let cleaned = seed.strip_prefix("0x").unwrap_or(seed);
2797 let bytes = hex_to_bytes(cleaned)?;
2798 if bytes.len() != 32 {
2799 return Err(anyhow!("runtime_seed_hex must be exactly 32 bytes"));
2800 }
2801
2802 let mut out = [0u8; 32];
2803 out.copy_from_slice(&bytes);
2804 Ok(Some(out))
2805}
2806
/// Returns the trimmed identifier, or an empty string when none was given.
fn normalize_optional_identifier(value: Option<&str>) -> String {
    match value {
        Some(raw) => raw.trim().to_owned(),
        None => String::new(),
    }
}
2810
2811async fn resolve_global_state_identifier(
2812 network_name: &str,
2813 node_id: u32,
2814 block_id: Option<&str>,
2815 state_root_hash: Option<&str>,
2816) -> Result<(String, String)> {
2817 let block_id = normalize_optional_identifier(block_id);
2818 let state_root_hash = normalize_optional_identifier(state_root_hash);
2819
2820 if !block_id.is_empty() || !state_root_hash.is_empty() {
2821 return Ok((block_id, state_root_hash));
2822 }
2823
2824 let rpc = mcp_rpc_client(network_name, node_id)?;
2825 let response = rpc.get_block().await?;
2826 let latest_block = response
2827 .block_with_signatures
2828 .ok_or_else(|| anyhow!("latest block was not returned by chain_get_block"))?;
2829
2830 Ok((latest_block.block.hash().to_hex_string(), String::new()))
2831}
2832
2833fn hex_to_bytes(input: &str) -> Result<Vec<u8>> {
2834 if !input.len().is_multiple_of(2) {
2835 return Err(anyhow!("hex string must have even length"));
2836 }
2837 let mut out = Vec::with_capacity(input.len() / 2);
2838 let mut chars = input.chars();
2839 while let (Some(hi), Some(lo)) = (chars.next(), chars.next()) {
2840 let byte = ((hex_nibble(hi)? as u8) << 4) | (hex_nibble(lo)? as u8);
2841 out.push(byte);
2842 }
2843 Ok(out)
2844}
2845
2846fn hex_nibble(ch: char) -> Result<u32> {
2847 ch.to_digit(16)
2848 .ok_or_else(|| anyhow!("invalid hex character '{}'", ch))
2849}
2850
2851async fn ensure_running_network(
2852 network: &Arc<ManagedNetwork>,
2853) -> std::result::Result<(), ErrorData> {
2854 if network.is_running().await {
2855 Ok(())
2856 } else {
2857 Err(ErrorData::resource_not_found(
2858 "network is not running; call spawn_network then wait_network_ready",
2859 None,
2860 ))
2861 }
2862}
2863
2864async fn check_rest_ready(network: &Arc<ManagedNetwork>) -> Result<HashMap<u32, Value>> {
2865 let node_ids = {
2866 let state = network.state.lock().await;
2867 state
2868 .processes
2869 .iter()
2870 .filter_map(|process| {
2871 if matches!(process.kind, ProcessKind::Node) {
2872 Some(process.node_id)
2873 } else {
2874 None
2875 }
2876 })
2877 .collect::<Vec<_>>()
2878 };
2879
2880 if node_ids.is_empty() {
2881 return Err(anyhow!("network has no node processes"));
2882 }
2883
2884 let mut by_node = HashMap::new();
2885 for node_id in node_ids {
2886 let value = fetch_rest_status(node_id).await?;
2887 let probe: RestStatusProbe = serde_json::from_value(value.clone())
2888 .with_context(|| format!("invalid /status payload for node {}", node_id))?;
2889 if probe.reactor_state.as_deref() != Some("Validate") {
2890 return Err(anyhow!("node {} reactor is not Validate", node_id));
2891 }
2892 by_node.insert(node_id, value);
2893 }
2894
2895 Ok(by_node)
2896}
2897
2898fn rest_has_block(value: &Value) -> bool {
2899 value
2900 .get("last_added_block_info")
2901 .and_then(|entry| entry.get("height"))
2902 .and_then(Value::as_u64)
2903 .is_some()
2904}
2905
2906async fn fetch_rest_status(node_id: u32) -> Result<Value> {
2907 let url = format!("{}/status", assets::rest_endpoint(node_id));
2908 let client = reqwest::Client::builder()
2909 .no_proxy()
2910 .timeout(Duration::from_secs(4))
2911 .build()?;
2912 let response = client.get(url).send().await?.error_for_status()?;
2913 Ok(response.json::<Value>().await?)
2914}
2915
2916async fn claim_block_hook(network: &Arc<ManagedNetwork>, block_hash: &str) -> bool {
2917 let mut last_block_hook_hash = network.last_block_hook_hash.lock().await;
2918 if last_block_hook_hash.as_deref() == Some(block_hash) {
2919 return false;
2920 }
2921 *last_block_hook_hash = Some(block_hash.to_string());
2922 true
2923}
2924
/// Listens to the SSE stream of one node and feeds every event into the
/// network's `sse_store`, reconnecting when the stream ends or fails.
///
/// The task returns when the network's `shutdown` flag is observed or when the
/// backoff policy stops yielding delays (`sleep_backoff` returns `false`). The
/// flag is only checked before each connection attempt and after each item
/// received from the stream.
async fn run_sse_listener(network: Arc<ManagedNetwork>, node_id: u32, endpoint: String) {
    let mut backoff = ExponentialBackoff::default();
    // API version announced on the current connection; cleared on reconnect.
    let mut connection_version: Option<String> = None;

    loop {
        if network.shutdown.load(Ordering::SeqCst) {
            return;
        }

        // A config that fails to build is retried after a backoff delay.
        let config = match ListenerConfig::builder()
            .with_endpoint(endpoint.clone())
            .build()
        {
            Ok(config) => config,
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        let stream = match sse::listener(config).await {
            Ok(stream) => {
                // A successful connection starts the backoff schedule over.
                backoff.reset();
                stream
            }
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        futures::pin_mut!(stream);
        let mut failed = false;

        while let Some(event) = stream.next().await {
            if network.shutdown.load(Ordering::SeqCst) {
                return;
            }

            match event {
                Ok(event) => {
                    if let SseEvent::ApiVersion(version) = &event {
                        connection_version = Some(version.to_string());
                    }
                    if let SseEvent::BlockAdded { block_hash, block } = &event {
                        // Best effort: a failure to persist the height is ignored.
                        let _ = record_last_block_height(&network.state, block.height()).await;
                        assets::spawn_pending_post_genesis_hook(network.layout.clone());
                        // The block-added hook needs the API version of this
                        // connection and is claimed by block hash, so a block
                        // hash identical to the last claimed one does not
                        // trigger it again.
                        if let Some(protocol_version) = connection_version.as_deref()
                            && claim_block_hook(&network, &block_hash.to_string()).await
                        {
                            assets::spawn_block_added_hook(
                                network.layout.clone(),
                                protocol_version.to_string(),
                                json!({
                                    "block_hash": block_hash.to_string(),
                                    "height": block.height(),
                                    "era_id": block.era_id().value(),
                                }),
                            );
                        }
                    }
                    network.sse_store.push(node_id, event).await;
                }
                Err(_) => {
                    failed = true;
                    break;
                }
            }
        }

        connection_version = None;
        // Only a stream error is followed by a backoff delay; a stream that
        // simply ends is reconnected on the next loop iteration right away.
        if failed && !sleep_backoff(&mut backoff).await {
            return;
        }
    }
}
3005
3006async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
3007 let mut state = state.lock().await;
3008 if state.last_block_height == Some(height) {
3009 return Ok(());
3010 }
3011 state.last_block_height = Some(height);
3012 state.touch().await
3013}
3014
3015async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
3016 if let Some(delay) = backoff.next_backoff() {
3017 tokio::time::sleep(delay).await;
3018 true
3019 } else {
3020 false
3021 }
3022}
3023
3024fn sse_event_type(event: &SseEvent) -> &'static str {
3025 match event {
3026 SseEvent::ApiVersion(_) => "ApiVersion",
3027 SseEvent::DeployAccepted(_) => "DeployAccepted",
3028 SseEvent::BlockAdded { .. } => "BlockAdded",
3029 SseEvent::DeployProcessed(_) => "DeployProcessed",
3030 SseEvent::DeployExpired(_) => "DeployExpired",
3031 SseEvent::TransactionAccepted(_) => "TransactionAccepted",
3032 SseEvent::TransactionProcessed { .. } => "TransactionProcessed",
3033 SseEvent::TransactionExpired { .. } => "TransactionExpired",
3034 SseEvent::Fault { .. } => "Fault",
3035 SseEvent::FinalitySignature(_) => "FinalitySignature",
3036 SseEvent::Step { .. } => "Step",
3037 SseEvent::Shutdown => "Shutdown",
3038 }
3039}
3040
3041fn process_kind_name(kind: &ProcessKind) -> &'static str {
3042 match kind {
3043 ProcessKind::Node => "node",
3044 ProcessKind::Sidecar => "sidecar",
3045 }
3046}
3047
3048fn process_status_name(status: &ProcessStatus) -> &'static str {
3049 match status {
3050 ProcessStatus::Running => "running",
3051 ProcessStatus::Stopped => "stopped",
3052 ProcessStatus::Exited => "exited",
3053 ProcessStatus::Unknown => "unknown",
3054 ProcessStatus::Skipped => "skipped",
3055 }
3056}
3057
3058fn sse_event_payload(event: &SseEvent) -> Value {
3059 match event {
3060 SseEvent::ApiVersion(version) => json!({ "api_version": version.to_string() }),
3061 SseEvent::DeployAccepted(payload) => payload.clone(),
3062 SseEvent::BlockAdded { block_hash, block } => json!({
3063 "block_hash": block_hash.to_string(),
3064 "height": block.height(),
3065 "era_id": block.era_id().value(),
3066 }),
3067 SseEvent::DeployProcessed(payload) => payload.clone(),
3068 SseEvent::DeployExpired(payload) => payload.clone(),
3069 SseEvent::TransactionAccepted(transaction) => {
3070 json!({ "transaction_hash": transaction.hash().to_hex_string() })
3071 }
3072 SseEvent::TransactionProcessed {
3073 transaction_hash,
3074 execution_result,
3075 messages,
3076 ..
3077 } => json!({
3078 "transaction_hash": transaction_hash.to_hex_string(),
3079 "execution_result": execution_result,
3080 "messages": messages,
3081 }),
3082 SseEvent::TransactionExpired { transaction_hash } => json!({
3083 "transaction_hash": transaction_hash.to_hex_string(),
3084 }),
3085 SseEvent::Fault {
3086 era_id,
3087 public_key,
3088 timestamp,
3089 } => json!({
3090 "era_id": era_id.value(),
3091 "public_key": public_key.to_hex(),
3092 "timestamp": timestamp,
3093 }),
3094 SseEvent::FinalitySignature(signature) => json!({
3095 "block_hash": signature.block_hash().to_string(),
3096 "era_id": signature.era_id().value(),
3097 "signature": signature.signature().to_hex(),
3098 }),
3099 SseEvent::Step {
3100 era_id,
3101 execution_effects,
3102 } => json!({
3103 "era_id": era_id.value(),
3104 "execution_effects": execution_effects.get(),
3105 }),
3106 SseEvent::Shutdown => json!({}),
3107 }
3108}
3109
3110fn timestamp_prefix() -> String {
3111 time::OffsetDateTime::now_utc()
3112 .format(&time::format_description::well_known::Rfc3339)
3113 .unwrap_or_else(|_| "unknown-time".to_string())
3114}
3115
3116fn processes_running(state: &State) -> bool {
3117 if state.processes.is_empty() {
3118 return false;
3119 }
3120
3121 state.processes.iter().all(|process| {
3122 matches!(process.last_status, ProcessStatus::Running)
3123 && process.current_pid().is_some_and(is_pid_running)
3124 })
3125}
3126
3127fn running_node_ids(state: &State) -> Vec<u32> {
3128 let mut node_ids = std::collections::BTreeSet::new();
3129 for process in &state.processes {
3130 if !matches!(process.kind, ProcessKind::Node) {
3131 continue;
3132 }
3133 if !matches!(process.last_status, ProcessStatus::Running) {
3134 continue;
3135 }
3136 let Some(pid) = process.current_pid() else {
3137 continue;
3138 };
3139 if !is_pid_running(pid) {
3140 continue;
3141 }
3142 node_ids.insert(process.node_id);
3143 }
3144 node_ids.into_iter().collect()
3145}
3146
3147fn is_pid_running(pid: u32) -> bool {
3148 let pid = Pid::from_raw(pid as i32);
3149 match kill(pid, None) {
3150 Ok(()) => true,
3151 Err(Errno::ESRCH) => false,
3152 Err(_) => true,
3153 }
3154}
3155
3156async fn discover_network_names(assets_root: &Path) -> Result<Vec<String>> {
3157 if !is_dir(assets_root).await {
3158 return Ok(Vec::new());
3159 }
3160
3161 let mut names = Vec::new();
3162 let mut entries = tokio_fs::read_dir(assets_root).await?;
3163 while let Some(entry) = entries.next_entry().await? {
3164 if !entry.file_type().await?.is_dir() {
3165 continue;
3166 }
3167 let name = entry.file_name().to_string_lossy().to_string();
3168 if !name.is_empty() {
3169 names.push(name);
3170 }
3171 }
3172
3173 names.sort();
3174 Ok(names)
3175}
3176
3177async fn ensure_sidecar_available(layout: &AssetsLayout, node_count: u32) -> Result<()> {
3178 for node_id in 1..=node_count {
3179 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
3180 let sidecar_bin = layout
3181 .node_bin_dir(node_id)
3182 .join(&version_dir)
3183 .join("casper-sidecar");
3184 let sidecar_cfg = layout
3185 .node_config_root(node_id)
3186 .join(&version_dir)
3187 .join("sidecar.toml");
3188
3189 if !is_file(&sidecar_bin).await {
3190 return Err(anyhow!(
3191 "missing sidecar binary for node {}: {}",
3192 node_id,
3193 sidecar_bin.display()
3194 ));
3195 }
3196
3197 if !is_file(&sidecar_cfg).await {
3198 return Err(anyhow!(
3199 "missing sidecar.toml for node {}: {}",
3200 node_id,
3201 sidecar_cfg.display()
3202 ));
3203 }
3204 }
3205
3206 Ok(())
3207}
3208
3209async fn latest_layout_protocol_version(layout: &AssetsLayout) -> Result<String> {
3210 Ok(layout
3211 .latest_protocol_version_dir(1)
3212 .await?
3213 .replace('_', "."))
3214}
3215
3216fn stage_asset_selector(
3217 asset: Option<&str>,
3218 custom_asset: Option<&str>,
3219 legacy_asset_name: Option<&str>,
3220) -> Result<AssetSelector> {
3221 let custom_asset = match (custom_asset, legacy_asset_name) {
3222 (Some(_), Some(_)) => {
3223 return Err(anyhow!(
3224 "custom_asset and asset_name are mutually exclusive"
3225 ));
3226 }
3227 (Some(custom_asset), None) | (None, Some(custom_asset)) => Some(custom_asset),
3228 (None, None) => None,
3229 };
3230 assets::required_asset_selector(asset, custom_asset)
3231}
3232
3233async fn parse_derived_accounts_csv(
3234 csv: &str,
3235 seed: Option<Arc<str>>,
3236) -> Result<Vec<DerivedAccountRow>> {
3237 let mut lines = csv.lines();
3238 let header = lines
3239 .next()
3240 .ok_or_else(|| anyhow!("derived accounts csv is empty"))?;
3241 if header.trim() != "kind,name,key_type,derivation,path,account_hash,balance" {
3242 return Err(anyhow!("unexpected derived-accounts.csv header"));
3243 }
3244
3245 let mut rows = Vec::new();
3246 for line in lines {
3247 let line = line.trim();
3248 if line.is_empty() {
3249 continue;
3250 }
3251
3252 let parts = line.splitn(7, ',').collect::<Vec<_>>();
3253 if parts.len() != 7 {
3254 return Err(anyhow!("invalid derived account row: {}", line));
3255 }
3256
3257 let path = parts[4].to_string();
3258 let public_key = if let Some(seed) = &seed {
3259 match assets::derive_account_from_seed_path(Arc::clone(seed), &path).await {
3260 Ok(material) => Some(material.public_key_hex),
3261 Err(_) => None,
3262 }
3263 } else {
3264 None
3265 };
3266
3267 rows.push(DerivedAccountRow {
3268 kind: parts[0].to_string(),
3269 name: parts[1].to_string(),
3270 key_type: parts[2].to_string(),
3271 derivation: parts[3].to_string(),
3272 path,
3273 account_hash: parts[5].to_string(),
3274 balance: parts[6].to_string(),
3275 public_key,
3276 });
3277 }
3278
3279 Ok(rows)
3280}
3281
3282async fn verify_path_hash_consistency(
3283 layout: &AssetsLayout,
3284 path: &str,
3285 expected_account_hash: &str,
3286) -> Result<()> {
3287 let Some(csv) = assets::derived_accounts_summary(layout).await else {
3288 return Ok(());
3289 };
3290
3291 for line in csv.lines().skip(1) {
3292 let line = line.trim();
3293 if line.is_empty() {
3294 continue;
3295 }
3296 let parts = line.splitn(7, ',').collect::<Vec<_>>();
3297 if parts.len() != 7 {
3298 continue;
3299 }
3300 if parts[4] == path {
3301 if parts[5] != expected_account_hash {
3302 return Err(anyhow!(
3303 "derived account hash mismatch for path {}: csv={} derived={}",
3304 path,
3305 parts[5],
3306 expected_account_hash
3307 ));
3308 }
3309 return Ok(());
3310 }
3311 }
3312
3313 Ok(())
3314}
3315
3316async fn fetch_chain_name(network_name: &str, node_id: u32) -> Result<String> {
3317 let rpc = mcp_rpc_client(network_name, node_id)?;
3318 rpc.get_network_name().await.map_err(Into::into)
3319}
3320
3321fn extract_block_height(value: &Value) -> Option<u64> {
3322 value
3323 .pointer("/block_with_signatures/block/header/height")
3324 .and_then(Value::as_u64)
3325 .or_else(|| {
3326 value
3327 .pointer("/block/block/header/height")
3328 .and_then(Value::as_u64)
3329 })
3330 .or_else(|| {
3331 value
3332 .pointer("/block_with_signatures/Version2/block/Version2/header/height")
3333 .and_then(Value::as_u64)
3334 })
3335 .or_else(|| {
3336 value
3337 .pointer("/block_with_signatures/Version1/block/Version1/header/height")
3338 .and_then(Value::as_u64)
3339 })
3340 .or_else(|| find_first_height(value))
3341}
3342
3343fn find_first_height(value: &Value) -> Option<u64> {
3344 match value {
3345 Value::Object(map) => {
3346 if let Some(height) = map.get("height").and_then(Value::as_u64) {
3347 return Some(height);
3348 }
3349 for nested in map.values() {
3350 if let Some(height) = find_first_height(nested) {
3351 return Some(height);
3352 }
3353 }
3354 None
3355 }
3356 Value::Array(items) => {
3357 for item in items {
3358 if let Some(height) = find_first_height(item) {
3359 return Some(height);
3360 }
3361 }
3362 None
3363 }
3364 _ => None,
3365 }
3366}
3367
3368fn parse_no_such_block_range_from_error(error_text: &str) -> Option<(u64, u64)> {
3369 let start = error_text.find('{')?;
3370 let payload = &error_text[start..];
3371 let value: Value = serde_json::from_str(payload).ok()?;
3372
3373 let code = value.get("code").and_then(Value::as_i64)?;
3374 let message = value.get("message").and_then(Value::as_str)?;
3375 if code != -32001 || !message.eq_ignore_ascii_case("No such block") {
3376 return None;
3377 }
3378
3379 let low = value
3380 .pointer("/data/available_block_range/low")
3381 .and_then(Value::as_u64)
3382 .unwrap_or(0);
3383 let high = value
3384 .pointer("/data/available_block_range/high")
3385 .and_then(Value::as_u64)
3386 .unwrap_or(low);
3387 Some((low, high))
3388}
3389
3390async fn read_log_page(path: &Path, before_line: Option<usize>, limit: usize) -> Result<LogPage> {
3391 let contents = tokio_fs::read_to_string(path)
3392 .await
3393 .with_context(|| format!("failed to read log file {}", path.display()))?;
3394
3395 let all_lines = contents
3396 .lines()
3397 .map(ToString::to_string)
3398 .collect::<Vec<_>>();
3399 let total_lines = all_lines.len();
3400
3401 let before = before_line.unwrap_or(total_lines + 1);
3402 if before == 0 {
3403 return Err(anyhow!("before_line must be >= 1"));
3404 }
3405
3406 let end_exclusive = before.saturating_sub(1).min(total_lines);
3407 let start = end_exclusive.saturating_sub(limit);
3408
3409 let mut lines = Vec::new();
3410 for (idx, content) in all_lines[start..end_exclusive].iter().enumerate() {
3411 lines.push(LogLine {
3412 line_number: start + idx + 1,
3413 content: content.clone(),
3414 });
3415 }
3416
3417 let next_before_line = if start == 0 { None } else { Some(start + 1) };
3418
3419 Ok(LogPage {
3420 path: path.display().to_string(),
3421 total_lines,
3422 returned: lines.len(),
3423 next_before_line,
3424 lines,
3425 })
3426}
3427
3428async fn is_dir(path: &Path) -> bool {
3429 tokio_fs::metadata(path)
3430 .await
3431 .map(|meta| meta.is_dir())
3432 .unwrap_or(false)
3433}
3434
3435async fn is_file(path: &Path) -> bool {
3436 tokio_fs::metadata(path)
3437 .await
3438 .map(|meta| meta.is_file())
3439 .unwrap_or(false)
3440}
3441
3442pub async fn run(args: McpArgs) -> Result<()> {
3443 let assets_root = match args.net_path {
3444 Some(path) => path,
3445 None => assets::default_assets_root()?,
3446 };
3447
3448 let manager = Arc::new(NetworkManager::new(assets_root).await?);
3449
3450 let result = match args.transport {
3451 McpTransport::Stdio => run_stdio(manager.clone()).await,
3452 McpTransport::Http => run_http(manager.clone(), &args.http_bind, &args.http_path).await,
3453 McpTransport::Both => run_both(manager.clone(), &args.http_bind, &args.http_path).await,
3454 };
3455
3456 let stop_result = manager.stop_all_networks().await;
3457
3458 match (result, stop_result) {
3459 (Ok(()), Ok(())) => Ok(()),
3460 (Err(err), Ok(())) => Err(err),
3461 (Ok(()), Err(stop_err)) => Err(stop_err),
3462 (Err(run_err), Err(stop_err)) => Err(anyhow!(
3463 "mcp server failed: {run_err}; additionally failed to stop networks: {stop_err}"
3464 )),
3465 }
3466}
3467
3468async fn run_stdio(manager: Arc<NetworkManager>) -> Result<()> {
3469 let service = McpServer::new(manager).serve(mcp_stdio()).await?;
3470 service.waiting().await?;
3471 Ok(())
3472}
3473
3474async fn run_http(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3475 let path = normalize_http_path(path);
3476 let socket = std::net::SocketAddr::from_str(bind)
3477 .with_context(|| format!("invalid http bind address '{}'", bind))?;
3478
3479 let service: StreamableHttpService<McpServer, LocalSessionManager> = StreamableHttpService::new(
3480 {
3481 let manager = manager.clone();
3482 move || Ok(McpServer::new(manager.clone()))
3483 },
3484 Arc::new(LocalSessionManager::default()),
3485 StreamableHttpServerConfig::default(),
3486 );
3487
3488 let router = axum::Router::new().nest_service(&path, service);
3489 let listener = tokio::net::TcpListener::bind(socket).await?;
3490
3491 axum::serve(listener, router)
3492 .with_graceful_shutdown(async {
3493 let _ = tokio::signal::ctrl_c().await;
3494 })
3495 .await?;
3496
3497 Ok(())
3498}
3499
3500async fn run_both(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3501 let path = normalize_http_path(path);
3502 let socket = std::net::SocketAddr::from_str(bind)
3503 .with_context(|| format!("invalid http bind address '{}'", bind))?;
3504
3505 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
3506
3507 let mut http_task = {
3508 let manager = manager.clone();
3509 tokio::spawn(async move {
3510 let service: StreamableHttpService<McpServer, LocalSessionManager> =
3511 StreamableHttpService::new(
3512 {
3513 let manager = manager.clone();
3514 move || Ok(McpServer::new(manager.clone()))
3515 },
3516 Arc::new(LocalSessionManager::default()),
3517 StreamableHttpServerConfig::default(),
3518 );
3519
3520 let router = axum::Router::new().nest_service(&path, service);
3521 let listener = tokio::net::TcpListener::bind(socket).await?;
3522
3523 axum::serve(listener, router)
3524 .with_graceful_shutdown(async move {
3525 tokio::select! {
3526 _ = async {
3527 loop {
3528 if *shutdown_rx.borrow() {
3529 break;
3530 }
3531 if shutdown_rx.changed().await.is_err() {
3532 break;
3533 }
3534 }
3535 } => {}
3536 _ = tokio::signal::ctrl_c() => {}
3537 }
3538 })
3539 .await
3540 .map_err(anyhow::Error::from)
3541 })
3542 };
3543
3544 let mut stdio_task = tokio::spawn(async move { run_stdio(manager).await });
3545
3546 let result = tokio::select! {
3547 res = &mut stdio_task => match res {
3548 Ok(inner) => inner,
3549 Err(join_err) => Err(anyhow!("stdio task failed: {}", join_err)),
3550 },
3551 res = &mut http_task => match res {
3552 Ok(inner) => inner,
3553 Err(join_err) => Err(anyhow!("http task failed: {}", join_err)),
3554 },
3555 };
3556
3557 let _ = shutdown_tx.send(true);
3558 let _ = tokio::time::timeout(Duration::from_secs(2), &mut http_task).await;
3559 let _ = tokio::time::timeout(Duration::from_secs(2), &mut stdio_task).await;
3560
3561 result
3562}
3563
/// Ensures an HTTP route path starts with a leading `/`.
///
/// Paths that already begin with `/` are returned unchanged; otherwise a
/// single `/` is prepended (so the empty string becomes `"/"`).
fn normalize_http_path(path: &str) -> String {
    let mut normalized = String::with_capacity(path.len() + 1);
    if !path.starts_with('/') {
        normalized.push('/');
    }
    normalized.push_str(path);
    normalized
}
3571
// Unit tests for the parsing, pagination, and preflight helpers in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // A header plus two data rows parse into two rows; no seed is supplied,
    // so no public keys are derived.
    #[tokio::test]
    async fn parse_derived_accounts_rows() {
        let csv = "kind,name,key_type,derivation,path,account_hash,balance\nvalidator,node-1,secp256k1,bip32,m/44'/506'/0'/0/0,account-hash-a,100\nuser,user-1,secp256k1,bip32,m/44'/506'/0'/0/100,account-hash-b,200";
        let rows = parse_derived_accounts_csv(csv, None).await.unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].path, "m/44'/506'/0'/0/0");
        assert_eq!(rows[1].account_hash, "account-hash-b");
    }

    // A record whose event type is in the include list matches; a record
    // whose event type is in the exclude list does not.
    #[test]
    fn sse_filter_include_exclude() {
        let filter = SseFilter {
            include_event_types: vec!["BlockAdded".to_string()],
            exclude_event_types: vec!["TransactionAccepted".to_string()],
        };

        let block = SseRecord {
            sequence: 1,
            timestamp_rfc3339: "t".to_string(),
            node_id: 1,
            event_type: "BlockAdded".to_string(),
            payload: json!({}),
        };
        let tx = SseRecord {
            sequence: 2,
            timestamp_rfc3339: "t".to_string(),
            node_id: 1,
            event_type: "TransactionAccepted".to_string(),
            payload: json!({}),
        };

        assert!(filter.matches(&block));
        assert!(!filter.matches(&tx));
    }

    // With five stored events, asking for 2 events before sequence 6 returns
    // sequences 4 and 5 and a cursor of 4 for the next page.
    #[tokio::test]
    async fn sse_history_paginates_by_sequence() {
        let store = SseStore::new(100);
        for sequence in 1..=5 {
            let mut guard = store.events.lock().await;
            guard.push_back(SseRecord {
                sequence,
                timestamp_rfc3339: "t".to_string(),
                node_id: 1,
                event_type: "BlockAdded".to_string(),
                payload: json!({ "sequence": sequence }),
            });
            drop(guard);
        }
        let filter = SseFilter::default();
        let page = store.history(Some(6), 2, &filter).await;
        assert_eq!(page.returned, 2);
        assert_eq!(page.events[0].sequence, 4);
        assert_eq!(page.events[1].sequence, 5);
        assert_eq!(page.next_before_sequence, Some(4));
    }

    // For a four-line file, a cursor of 5 with limit 2 returns lines 3 and 4
    // and a cursor of 3 for the next (older) page.
    #[tokio::test]
    async fn read_log_page_uses_before_cursor() {
        let temp = tempfile::NamedTempFile::new().unwrap();
        tokio_fs::write(temp.path(), "a\nb\nc\nd\n").await.unwrap();

        let page = read_log_page(temp.path(), Some(5), 2).await.unwrap();
        assert_eq!(page.lines.len(), 2);
        assert_eq!(page.lines[0].line_number, 3);
        assert_eq!(page.lines[1].line_number, 4);
        assert_eq!(page.next_before_line, Some(3));
    }

    // The layout contains `casper-node` and `sidecar.toml` but no
    // `casper-sidecar` binary, so the preflight check must fail.
    #[tokio::test]
    async fn sidecar_preflight_fails_when_missing() {
        let root = tempfile::tempdir().unwrap();
        let layout = AssetsLayout::new(root.path().to_path_buf(), "test-net".to_string());
        let node_bin = layout.node_bin_dir(1).join("2_0_0");
        let node_cfg = layout.node_config_root(1).join("2_0_0");
        tokio_fs::create_dir_all(&node_bin).await.unwrap();
        tokio_fs::create_dir_all(&node_cfg).await.unwrap();
        tokio_fs::write(node_bin.join("casper-node"), "bin")
            .await
            .unwrap();
        tokio_fs::write(node_cfg.join("sidecar.toml"), "cfg")
            .await
            .unwrap();

        let result = ensure_sidecar_available(&layout, 1).await;
        assert!(result.is_err());
    }

    // `None` and whitespace-only input normalize to the empty string;
    // surrounding whitespace is trimmed from real values.
    #[test]
    fn normalize_optional_identifier_trims_and_defaults() {
        assert_eq!(normalize_optional_identifier(None), "");
        assert_eq!(normalize_optional_identifier(Some(" ")), "");
        assert_eq!(normalize_optional_identifier(Some(" 123 ")), "123");
    }

    // A decimal string maps to `BlockHeight`, a 64-character hex string maps
    // to `BlockHash`, and an empty string maps to no identifier.
    #[test]
    fn parse_state_identifier_variants() {
        assert_eq!(
            parse_state_identifier("42", "").unwrap(),
            Some(json!({ "BlockHeight": 42u64 }))
        );
        assert_eq!(
            parse_state_identifier(
                "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234",
                ""
            )
            .unwrap(),
            Some(json!({
                "BlockHash": "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234"
            }))
        );
        assert_eq!(parse_state_identifier("", "").unwrap(), None,);
    }

    // The `contract-` prefix, the `hash-` prefix, and the bare hex form all
    // resolve to the same contract hash.
    #[test]
    fn parse_contract_hash_for_invocation_accepts_common_formats() {
        let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
        let contract = parse_contract_hash_for_invocation(&format!("contract-{}", hash)).unwrap();
        let key_hash = parse_contract_hash_for_invocation(&format!("hash-{}", hash)).unwrap();
        let raw = parse_contract_hash_for_invocation(hash).unwrap();
        assert_eq!(contract.to_hex_string(), hash);
        assert_eq!(key_hash.to_hex_string(), hash);
        assert_eq!(raw.to_hex_string(), hash);
    }

    // A `contract-<hex>` query key is rewritten to the `hash-<hex>` form.
    #[test]
    fn parse_query_key_accepts_contract_hash_format() {
        let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
        let key = parse_query_key(&format!("contract-{}", hash)).unwrap();
        assert_eq!(key, format!("hash-{}", hash));
    }

    // A transaction given as a JSON object parses both directly and when
    // wrapped under a `transaction` key; the same transaction encoded as a
    // JSON string is rejected in both positions.
    #[test]
    fn parse_transaction_json_rejects_escaped_string_payload() {
        let tx_json = json!({
            "Version1": {
                "hash": "7eeb092361e31b4cc9885e3621f1470f29631338ecc703643c22da1d38fd81a9",
                "payload": {
                    "initiator_addr": {
                        "PublicKey": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f"
                    },
                    "timestamp": "2026-02-27T18:03:18.541Z",
                    "ttl": "30m",
                    "chain_name": "casper-devnet",
                    "pricing_mode": {
                        "PaymentLimited": {
                            "payment_amount": 100000000000u64,
                            "gas_price_tolerance": 5,
                            "standard_payment": true
                        }
                    },
                    "fields": {
                        "args": {"Named": []},
                        "entry_point": {"Custom": "counter_inc"},
                        "scheduling": "Standard",
                        "target": {
                            "Stored": {
                                "id": {
                                    "ByPackageName": {
                                        "name": "counter_package_name",
                                        "version": null
                                    }
                                },
                                "runtime": "VmCasperV1"
                            }
                        }
                    }
                },
                "approvals": [{
                    "signer": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f",
                    "signature": "02c64336e5ed2832bdb84adb3f334d585548ee096066aa9d0797c11ab3f074ec9d7bd396994bc9b9c239342be801bc385a9c5083779bace4dfe0b400d4a13c07db"
                }]
            }
        });
        let direct = parse_transaction_json(tx_json.clone()).unwrap();
        let wrapped = parse_transaction_json(json!({ "transaction": tx_json })).unwrap();
        // Re-encode the parsed transaction as a string to build the rejected inputs.
        let encoded = serde_json::to_string(&direct).unwrap();
        let from_string_err = parse_transaction_json(Value::String(encoded.clone()));
        let wrapped_string_err = parse_transaction_json(json!({
            "transaction": encoded
        }));

        assert_eq!(
            direct.hash().to_hex_string(),
            wrapped.hash().to_hex_string()
        );
        assert!(from_string_err.is_err());
        assert!(wrapped_string_err.is_err());
    }

    // The JSON payload embedded after the textual prefix is parsed and the
    // `available_block_range` bounds are returned.
    #[test]
    fn parse_no_such_block_range_from_error_extracts_bounds() {
        let error = "casper client error: response for rpc-id 1 chain_get_block is json-rpc error: {\"code\":-32001,\"message\":\"No such block\",\"data\":{\"message\":\"no block found for the provided identifier\",\"available_block_range\":{\"low\":0,\"high\":0}}}";
        let range = parse_no_such_block_range_from_error(error).unwrap();
        assert_eq!(range, (0, 0));
    }

    // The request deserializes when the `transaction` field is present and
    // fails with a missing-field error when only `transaction_json` is given.
    #[test]
    fn send_transaction_signed_request_requires_transaction_field() {
        let payload = json!({
            "network_name": "casper-devnet",
            "node_id": 1,
            "signer_path": "m/44'/506'/0'/0/100",
            "transaction": { "Version1": { "hash": "abc" } }
        });
        let request: SendTransactionSignedRequest = serde_json::from_value(payload).unwrap();
        assert!(request.transaction.get("Version1").is_some());

        let legacy_payload = json!({
            "network_name": "casper-devnet",
            "node_id": 1,
            "signer_path": "m/44'/506'/0'/0/100",
            "transaction_json": { "Version1": { "hash": "def" } }
        });
        let legacy_err = serde_json::from_value::<SendTransactionSignedRequest>(legacy_payload)
            .unwrap_err()
            .to_string();
        assert!(legacy_err.contains("missing field `transaction`"));
    }
}