1use self::args_parser::parse_session_args;
2use crate::assets::{
3 self, AddNodesOptions, AssetSelector, AssetsLayout, SetupOptions, StageProtocolOptions,
4};
5use crate::control::{ControlRequest, ControlResponse, ControlResult, send_request};
6use crate::process::{self, StartPlan};
7use crate::state::{
8 ProcessKind, ProcessStatus, STATE_FILE_NAME, State, spawn_pid_sync_tasks,
9 spawn_pid_sync_tasks_for_ids,
10};
11use anyhow::{Context, Result, anyhow};
12use backoff::ExponentialBackoff;
13use backoff::backoff::Backoff;
14use casper_types::contracts::ContractHash;
15use casper_types::{
16 AddressableEntityHash, AsymmetricType, Digest, EntityVersion, Key, PricingMode, PublicKey,
17 SecretKey, TimeDiff, Transaction, TransactionHash, TransactionRuntimeParams, TransactionV1Hash,
18 URef,
19};
20use clap::{Args, ValueEnum};
21use futures::StreamExt;
22use nix::errno::Errno;
23use nix::sys::signal::kill;
24use nix::unistd::Pid;
25use rmcp::handler::server::{router::tool::ToolRouter, wrapper::Parameters};
26use rmcp::model::{CallToolResult, ServerCapabilities, ServerInfo};
27use rmcp::service::ServiceExt;
28use rmcp::transport::{
29 StreamableHttpServerConfig,
30 streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService},
31};
32use rmcp::{
33 ErrorData, ServerHandler, tool, tool_handler, tool_router, transport::stdio as mcp_stdio,
34};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::{HashMap, VecDeque};
38use std::path::{Path, PathBuf};
39use std::str::FromStr;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
42use std::time::{Duration, Instant};
43use tokio::fs as tokio_fs;
44use tokio::io::{AsyncReadExt, AsyncWriteExt};
45use tokio::net::UnixListener;
46use tokio::sync::{Mutex, Notify};
47use veles_casper_rust_sdk::TransactionV1Builder;
48use veles_casper_rust_sdk::jsonrpc::CasperClient;
49use veles_casper_rust_sdk::sse::event::SseEvent;
50use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
51
/// Default value of the `--http-bind` option on `McpArgs`.
const DEFAULT_HTTP_BIND: &str = "127.0.0.1:32100";
/// Default value of the `--http-path` option on `McpArgs`.
const DEFAULT_HTTP_PATH: &str = "/mcp";
/// Default network name. NOTE(review): not referenced in this section of the file —
/// presumably a request default; confirm at the use site.
const DEFAULT_NETWORK_NAME: &str = "casper-dev";
/// Default node count. NOTE(review): use site is outside this section.
const DEFAULT_NODE_COUNT: u32 = 4;
/// Default delay in seconds. NOTE(review): use site is outside this section.
const DEFAULT_DELAY_SECS: u64 = 3;
/// Default log level. NOTE(review): use site is outside this section.
const DEFAULT_LOG_LEVEL: &str = "info";
/// Default node log format. NOTE(review): use site is outside this section.
const DEFAULT_NODE_LOG_FORMAT: &str = "json";
/// Default seed. NOTE(review): use site is outside this section.
const DEFAULT_SEED: &str = "default";
/// Timeout in seconds applied by `wait_next_sse` when the request omits `timeout_seconds`.
const DEFAULT_TIMEOUT_SECS: u64 = 60;
/// Page size used by `get_node_logs` when the request omits `limit`.
const DEFAULT_LOG_PAGE_SIZE: usize = 200;
/// Page size used by `sse_history` when the request omits `limit`.
const DEFAULT_SSE_PAGE_SIZE: usize = 200;
/// SSE history capacity. NOTE(review): use site is outside this section —
/// presumably bounds the per-network event store; confirm.
const DEFAULT_SSE_HISTORY_CAPACITY: usize = 20_000;
/// Payment amount handed to `build_pricing_mode` by the transaction tools when the
/// request omits `payment_amount`.
const DEFAULT_PAYMENT_AMOUNT: u64 = 100_000_000_000;
65
66mod args_parser;
67
// Transport selection for the MCP server, parsed from the `--transport` CLI value
// using kebab-case variant names.
//
// Plain `//` comments are used here on purpose: clap turns `///` doc comments on
// `ValueEnum` variants into help text, which would change the CLI output.
#[derive(Debug, Clone, Copy, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum McpTransport {
    // Stdio transport only.
    Stdio,
    // HTTP transport only.
    Http,
    // Both transports (this is the default declared on `McpArgs::transport`).
    Both,
}
75
// Command-line arguments for the MCP server subcommand.
//
// Plain `//` comments are used here on purpose: clap turns `///` doc comments on
// `Args` fields into help text, which would change the CLI output.
#[derive(Debug, Args, Clone)]
pub struct McpArgs {
    // Which transport(s) to serve; defaults to `McpTransport::Both`.
    #[arg(long, value_enum, default_value_t = McpTransport::Both)]
    transport: McpTransport,

    // Bind address for the HTTP transport; defaults to `DEFAULT_HTTP_BIND`.
    #[arg(long, default_value = DEFAULT_HTTP_BIND)]
    http_bind: String,

    // URL path for the HTTP transport; defaults to `DEFAULT_HTTP_PATH`.
    #[arg(long, default_value = DEFAULT_HTTP_PATH)]
    http_path: String,

    // Optional filesystem path override.
    // NOTE(review): presumably the root directory for network assets — confirm at the use site.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,
}
90
/// MCP server handler exposing the network-management tools.
///
/// Cloning is cheap with respect to the manager: it is held behind an `Arc`.
#[derive(Debug, Clone)]
struct McpServer {
    /// Shared network manager that every tool delegates to.
    manager: Arc<NetworkManager>,
    /// Router for the `#[tool]` methods; populated from `Self::tool_router()` in `new`.
    tool_router: ToolRouter<Self>,
}
96
97impl McpServer {
98 fn new(manager: Arc<NetworkManager>) -> Self {
99 Self {
100 manager,
101 tool_router: Self::tool_router(),
102 }
103 }
104
105 fn server_info() -> ServerInfo {
106 ServerInfo {
107 instructions: Some(
108 "Control multiple local Casper devnets. Start with spawn_network, then wait_network_ready before RPC or transaction tools. Do not use external casper-client binaries or curl; use MCP tools directly (for example get_transaction, wait_transaction, make_transaction_package_call, make_transaction_contract_call, make_transaction_session_wasm, send_transaction_signed).".to_string(),
109 ),
110 capabilities: ServerCapabilities::builder().enable_tools().build(),
111 ..Default::default()
112 }
113 }
114}
115
116#[tool_router]
117impl McpServer {
118 #[tool(
119 description = "Spawn or resume a managed network. Defaults to force_setup=true for fresh devnet startup."
120 )]
121 async fn spawn_network(
122 &self,
123 Parameters(request): Parameters<SpawnNetworkRequest>,
124 ) -> std::result::Result<CallToolResult, ErrorData> {
125 let result = self
126 .manager
127 .spawn_network(request)
128 .await
129 .map_err(to_mcp_error)?;
130 Ok(ok_value(
131 serde_json::to_value(result).map_err(internal_serde_error)?,
132 ))
133 }
134
135 #[tool(
136 description = "Wait for a managed network to be ready: processes running, /status healthy, reactor Validate, and at least one block observed."
137 )]
138 async fn wait_network_ready(
139 &self,
140 Parameters(request): Parameters<WaitNetworkReadyRequest>,
141 ) -> std::result::Result<CallToolResult, ErrorData> {
142 let result = self
143 .manager
144 .wait_network_ready(&request.network_name, request.timeout_seconds)
145 .await
146 .map_err(to_mcp_error)?;
147 Ok(ok_value(
148 serde_json::to_value(result).map_err(internal_serde_error)?,
149 ))
150 }
151
152 #[tool(
153 description = "Stage a protocol upgrade from a named custom asset. When the network is running, sidecars are restarted after staging."
154 )]
155 async fn stage_protocol(
156 &self,
157 Parameters(request): Parameters<StageProtocolRequest>,
158 ) -> std::result::Result<CallToolResult, ErrorData> {
159 let result = self
160 .manager
161 .stage_protocol(
162 &request.network_name,
163 request.asset.as_deref(),
164 request.custom_asset.as_deref(),
165 request.asset_name.as_deref(),
166 &request.protocol_version,
167 request.activation_point,
168 )
169 .await
170 .map_err(to_mcp_error)?;
171 Ok(ok_value(
172 serde_json::to_value(result).map_err(internal_serde_error)?,
173 ))
174 }
175
176 #[tool(
177 description = "Despawn a managed network. By default it stops processes and keeps files; set purge=true to remove files."
178 )]
179 async fn despawn_network(
180 &self,
181 Parameters(request): Parameters<DespawnNetworkRequest>,
182 ) -> std::result::Result<CallToolResult, ErrorData> {
183 let result = self
184 .manager
185 .despawn_network(&request.network_name, request.purge.unwrap_or(false))
186 .await
187 .map_err(to_mcp_error)?;
188 Ok(ok_value(
189 serde_json::to_value(result).map_err(internal_serde_error)?,
190 ))
191 }
192
193 #[tool(description = "List discovered network names and managed/running state.")]
194 async fn list_networks(
195 &self,
196 _: Parameters<ListNetworksRequest>,
197 ) -> std::result::Result<CallToolResult, ErrorData> {
198 let result = self.manager.list_networks().await.map_err(to_mcp_error)?;
199 Ok(ok_value(
200 serde_json::to_value(result).map_err(internal_serde_error)?,
201 ))
202 }
203
204 #[tool(
205 description = "List managed network processes, optionally filtered by process name. Defaults to running_only=true."
206 )]
207 async fn managed_processes(
208 &self,
209 Parameters(request): Parameters<ManagedProcessesRequest>,
210 ) -> std::result::Result<CallToolResult, ErrorData> {
211 let result = self
212 .manager
213 .managed_processes(request)
214 .await
215 .map_err(to_mcp_error)?;
216 Ok(ok_value(
217 serde_json::to_value(result).map_err(internal_serde_error)?,
218 ))
219 }
220
221 #[tool(description = "Call node REST /status for a managed network node.")]
222 async fn status(
223 &self,
224 Parameters(request): Parameters<NodeScopedRequest>,
225 ) -> std::result::Result<CallToolResult, ErrorData> {
226 let network = self
227 .manager
228 .get_network(&request.network_name)
229 .await
230 .map_err(to_mcp_error)?;
231 ensure_running_network(&network).await?;
232 let status = fetch_rest_status(request.node_id)
233 .await
234 .map_err(to_mcp_error)?;
235 Ok(ok_value(status))
236 }
237
238 #[tool(
239 description = "Run a raw JSON-RPC call against node sidecar endpoint. Useful as a generic query helper."
240 )]
241 async fn rpc_query(
242 &self,
243 Parameters(request): Parameters<RpcQueryRequest>,
244 ) -> std::result::Result<CallToolResult, ErrorData> {
245 let network = self
246 .manager
247 .get_network(&request.network_name)
248 .await
249 .map_err(to_mcp_error)?;
250 ensure_running_network(&network).await?;
251 let value = self
252 .manager
253 .raw_rpc_query(request.node_id, &request.method, request.params)
254 .await
255 .map_err(to_mcp_error)?;
256 Ok(ok_value(value))
257 }
258
259 #[tool(description = "Typed balance query by account/public key/uref/entity identifier.")]
260 async fn rpc_query_balance(
261 &self,
262 Parameters(request): Parameters<RpcQueryBalanceRequest>,
263 ) -> std::result::Result<CallToolResult, ErrorData> {
264 let network = self
265 .manager
266 .get_network(&request.network_name)
267 .await
268 .map_err(to_mcp_error)?;
269 ensure_running_network(&network).await?;
270
271 let block_id = normalize_optional_identifier(request.block_id.as_deref());
272 let state_root_hash = normalize_optional_identifier(request.state_root_hash.as_deref());
273 let params =
274 build_query_balance_params(&request.purse_identifier, &block_id, &state_root_hash)
275 .map_err(to_mcp_error)?;
276 let response = self
277 .manager
278 .raw_rpc_query(request.node_id, "query_balance", Some(params))
279 .await
280 .map_err(to_mcp_error)?;
281 Ok(ok_value(
282 extract_rpc_result(response).map_err(to_mcp_error)?,
283 ))
284 }
285
286 #[tool(
287 description = "Typed global state query by key + optional path + optional block/state-root identifier. If no identifier is provided, the latest block hash is used."
288 )]
289 async fn rpc_query_global_state(
290 &self,
291 Parameters(request): Parameters<RpcQueryGlobalStateRequest>,
292 ) -> std::result::Result<CallToolResult, ErrorData> {
293 let network = self
294 .manager
295 .get_network(&request.network_name)
296 .await
297 .map_err(to_mcp_error)?;
298 ensure_running_network(&network).await?;
299
300 let (block_id, state_root_hash) = resolve_global_state_identifier(
301 &request.network_name,
302 request.node_id,
303 request.block_id.as_deref(),
304 request.state_root_hash.as_deref(),
305 )
306 .await
307 .map_err(to_mcp_error)?;
308 let params = build_query_global_state_params(
309 &request.key,
310 request.path.unwrap_or_default(),
311 &block_id,
312 &state_root_hash,
313 )
314 .map_err(to_mcp_error)?;
315 let response = self
316 .manager
317 .raw_rpc_query(request.node_id, "query_global_state", Some(params))
318 .await
319 .map_err(to_mcp_error)?;
320
321 Ok(ok_value(
322 extract_rpc_result(response).map_err(to_mcp_error)?,
323 ))
324 }
325
326 #[tool(description = "Get current block height using typed chain_get_block RPC.")]
327 async fn current_block_height(
328 &self,
329 Parameters(request): Parameters<CurrentBlockRequest>,
330 ) -> std::result::Result<CallToolResult, ErrorData> {
331 let network = self
332 .manager
333 .get_network(&request.network_name)
334 .await
335 .map_err(to_mcp_error)?;
336 ensure_running_network(&network).await?;
337
338 let value = fetch_block_result(
339 &self.manager,
340 &request.network_name,
341 request.node_id,
342 request.block_id.as_deref(),
343 )
344 .await;
345 let value = match value {
346 Ok(value) => value,
347 Err(err) => {
348 if request.block_id.is_none()
349 && let Some((low, high)) =
350 parse_no_such_block_range_from_error(&err.to_string())
351 {
352 return Ok(ok_value(json!({
353 "network_name": request.network_name,
354 "node_id": request.node_id,
355 "height": high,
356 "block": Value::Null,
357 "pending": true,
358 "available_block_range": {
359 "low": low,
360 "high": high,
361 },
362 "message": "latest block is not yet queryable; retry shortly",
363 })));
364 }
365 return Err(to_mcp_error(err));
366 }
367 };
368 let height = extract_block_height(&value).ok_or_else(|| {
369 ErrorData::internal_error("missing block height in RPC response", None)
370 })?;
371
372 Ok(ok_value(json!({
373 "network_name": request.network_name,
374 "node_id": request.node_id,
375 "height": height,
376 "block": value,
377 })))
378 }
379
380 #[tool(description = "Get current block payload using typed chain_get_block RPC.")]
381 async fn current_block(
382 &self,
383 Parameters(request): Parameters<CurrentBlockRequest>,
384 ) -> std::result::Result<CallToolResult, ErrorData> {
385 let network = self
386 .manager
387 .get_network(&request.network_name)
388 .await
389 .map_err(to_mcp_error)?;
390 ensure_running_network(&network).await?;
391
392 let value = fetch_block_result(
393 &self.manager,
394 &request.network_name,
395 request.node_id,
396 request.block_id.as_deref(),
397 )
398 .await
399 .map_err(to_mcp_error)?;
400 Ok(ok_value(value))
401 }
402
403 #[tool(description = "Get paginated node stdout/stderr logs from on-disk files.")]
404 async fn get_node_logs(
405 &self,
406 Parameters(request): Parameters<GetNodeLogsRequest>,
407 ) -> std::result::Result<CallToolResult, ErrorData> {
408 let network = self
409 .manager
410 .get_network(&request.network_name)
411 .await
412 .map_err(to_mcp_error)?;
413
414 let limit = request.limit.unwrap_or(DEFAULT_LOG_PAGE_SIZE);
415 if limit == 0 {
416 return Err(ErrorData::invalid_params(
417 "limit must be greater than 0",
418 None,
419 ));
420 }
421
422 let file_name = match request.stream {
423 NodeLogStream::Stdout => "stdout.log",
424 NodeLogStream::Stderr => "stderr.log",
425 };
426 let path = network
427 .layout
428 .node_logs_dir(request.node_id)
429 .join(file_name);
430
431 let page = read_log_page(&path, request.before_line, limit)
432 .await
433 .map_err(to_mcp_error)?;
434 Ok(ok_value(
435 serde_json::to_value(page).map_err(internal_serde_error)?,
436 ))
437 }
438
439 #[tool(
440 description = "Wait for the next SSE event after a sequence cursor with optional event type filters."
441 )]
442 async fn wait_next_sse(
443 &self,
444 Parameters(request): Parameters<WaitNextSseRequest>,
445 ) -> std::result::Result<CallToolResult, ErrorData> {
446 let network = self
447 .manager
448 .get_network(&request.network_name)
449 .await
450 .map_err(to_mcp_error)?;
451 ensure_running_network(&network).await?;
452
453 let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
454 let filter = SseFilter {
455 include_event_types: request.include_event_types.unwrap_or_default(),
456 exclude_event_types: request.exclude_event_types.unwrap_or_default(),
457 };
458
459 let after = match request.after_sequence {
460 Some(value) => value,
461 None => network.sse_store.latest_sequence().await,
462 };
463
464 let event = network
465 .sse_store
466 .wait_next(after, &filter, timeout)
467 .await
468 .map_err(to_mcp_error)?;
469
470 Ok(ok_value(
471 serde_json::to_value(event).map_err(internal_serde_error)?,
472 ))
473 }
474
475 #[tool(
476 description = "Fetch paginated SSE history with sequence cursor and optional event type filters."
477 )]
478 async fn sse_history(
479 &self,
480 Parameters(request): Parameters<SseHistoryRequest>,
481 ) -> std::result::Result<CallToolResult, ErrorData> {
482 let network = self
483 .manager
484 .get_network(&request.network_name)
485 .await
486 .map_err(to_mcp_error)?;
487
488 let limit = request.limit.unwrap_or(DEFAULT_SSE_PAGE_SIZE);
489 if limit == 0 {
490 return Err(ErrorData::invalid_params(
491 "limit must be greater than 0",
492 None,
493 ));
494 }
495
496 let filter = SseFilter {
497 include_event_types: request.include_event_types.unwrap_or_default(),
498 exclude_event_types: request.exclude_event_types.unwrap_or_default(),
499 };
500
501 let history = network
502 .sse_store
503 .history(request.before_sequence, limit, &filter)
504 .await;
505 Ok(ok_value(
506 serde_json::to_value(history).map_err(internal_serde_error)?,
507 ))
508 }
509
510 #[tool(description = "List derived accounts from derived-accounts.csv for a network.")]
511 async fn list_derived_accounts(
512 &self,
513 Parameters(request): Parameters<ListDerivedAccountsRequest>,
514 ) -> std::result::Result<CallToolResult, ErrorData> {
515 let accounts = self
516 .manager
517 .list_derived_accounts(&request.network_name)
518 .await
519 .map_err(to_mcp_error)?;
520 Ok(ok_value(
521 serde_json::to_value(accounts).map_err(internal_serde_error)?,
522 ))
523 }
524
525 #[tool(
526 description = "Submit signed or unsigned transaction JSON. Signer key is derived from signer_path for this managed network. transaction must be a typed Transaction JSON object."
527 )]
528 async fn send_transaction_signed(
529 &self,
530 Parameters(request): Parameters<SendTransactionSignedRequest>,
531 ) -> std::result::Result<CallToolResult, ErrorData> {
532 let network = self
533 .manager
534 .get_network(&request.network_name)
535 .await
536 .map_err(to_mcp_error)?;
537 ensure_running_network(&network).await?;
538
539 let signer = self
540 .manager
541 .derived_account_for_path(&network, &request.signer_path)
542 .await
543 .map_err(to_mcp_error)?;
544 verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
545 .await
546 .map_err(to_mcp_error)?;
547
548 let mut transaction = parse_transaction_json(request.transaction)?;
549 transaction.sign(&signer.secret_key);
550
551 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
552 let response = rpc
553 .put_transaction(transaction)
554 .await
555 .map_err(to_mcp_error)?;
556
557 Ok(ok_value(json!({
558 "network_name": request.network_name,
559 "node_id": request.node_id,
560 "transaction_hash": response.transaction_hash.to_hex_string(),
561 })))
562 }
563
564 #[tool(
565 description = "Create a stored package-name call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
566 )]
567 async fn make_transaction_package_call(
568 &self,
569 Parameters(request): Parameters<MakeTransactionPackageCallRequest>,
570 ) -> std::result::Result<CallToolResult, ErrorData> {
571 let network = self
572 .manager
573 .get_network(&request.network_name)
574 .await
575 .map_err(to_mcp_error)?;
576 ensure_running_network(&network).await?;
577
578 let runtime = match (
579 request.runtime_transferred_value,
580 request.runtime_seed_hex.as_deref(),
581 ) {
582 (None, None) => TransactionRuntimeParams::VmCasperV1,
583 (transferred_value, maybe_seed_hex) => {
584 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
585 TransactionRuntimeParams::VmCasperV2 {
586 transferred_value: transferred_value.unwrap_or(0),
587 seed,
588 }
589 }
590 };
591
592 let mut builder = TransactionV1Builder::new_targeting_package_via_alias(
593 request.transaction_package_name.clone(),
594 request.transaction_package_version,
595 request.session_entry_point.clone(),
596 runtime,
597 );
598
599 if let Some(args_json) = request.session_args.as_ref()
600 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
601 {
602 builder = builder.with_runtime_args(runtime_args);
603 }
604
605 if let Some(ttl_millis) = request.ttl_millis {
606 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
607 }
608
609 let pricing = build_pricing_mode(
610 request.gas_price_tolerance,
611 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
612 );
613 builder = builder.with_pricing_mode(pricing);
614
615 let chain_name = match request.chain_name {
616 Some(value) => value,
617 None => fetch_chain_name(&request.network_name, request.node_id)
618 .await
619 .map_err(to_mcp_error)?,
620 };
621 builder = builder.with_chain_name(chain_name.clone());
622
623 let mut signed = false;
624 let mut signer_path = None;
625 let mut derived_signer = None;
626 if let Some(path) = request.signer_path {
627 let signer = self
628 .manager
629 .derived_account_for_path(&network, &path)
630 .await
631 .map_err(to_mcp_error)?;
632 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
633 .await
634 .map_err(to_mcp_error)?;
635 derived_signer = Some(signer);
636 signed = true;
637 signer_path = Some(path);
638 } else if let Some(initiator_public_key) = request.initiator_public_key {
639 let public_key = PublicKey::from_hex(initiator_public_key.trim())
640 .with_context(|| "failed to parse initiator_public_key as hex public key")
641 .map_err(to_mcp_error)?;
642 builder = builder.with_initiator_addr(public_key);
643 } else {
644 return Err(ErrorData::invalid_params(
645 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
646 None,
647 ));
648 }
649
650 if let Some(signer) = derived_signer.as_ref() {
651 builder = builder.with_secret_key(&signer.secret_key);
652 }
653
654 let tx = builder.build().map_err(to_mcp_error)?;
655 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
656
657 Ok(ok_value(json!({
658 "network_name": request.network_name,
659 "node_id": request.node_id,
660 "chain_name": chain_name,
661 "signed": signed,
662 "signer_path": signer_path,
663 "transaction": tx_json,
664 })))
665 }
666
667 #[tool(
668 description = "Create a stored contract-hash call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
669 )]
670 async fn make_transaction_contract_call(
671 &self,
672 Parameters(request): Parameters<MakeTransactionContractCallRequest>,
673 ) -> std::result::Result<CallToolResult, ErrorData> {
674 let network = self
675 .manager
676 .get_network(&request.network_name)
677 .await
678 .map_err(to_mcp_error)?;
679 ensure_running_network(&network).await?;
680
681 let runtime = match (
682 request.runtime_transferred_value,
683 request.runtime_seed_hex.as_deref(),
684 ) {
685 (None, None) => TransactionRuntimeParams::VmCasperV1,
686 (transferred_value, maybe_seed_hex) => {
687 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
688 TransactionRuntimeParams::VmCasperV2 {
689 transferred_value: transferred_value.unwrap_or(0),
690 seed,
691 }
692 }
693 };
694
695 let contract_hash = parse_contract_hash_for_invocation(&request.transaction_contract_hash)
696 .map_err(to_mcp_error)?;
697 let mut builder = TransactionV1Builder::new_targeting_invocable_entity(
698 contract_hash,
699 request.session_entry_point.clone(),
700 runtime,
701 );
702
703 if let Some(args_json) = request.session_args.as_ref()
704 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
705 {
706 builder = builder.with_runtime_args(runtime_args);
707 }
708
709 if let Some(ttl_millis) = request.ttl_millis {
710 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
711 }
712
713 let pricing = build_pricing_mode(
714 request.gas_price_tolerance,
715 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
716 );
717 builder = builder.with_pricing_mode(pricing);
718
719 let chain_name = match request.chain_name {
720 Some(value) => value,
721 None => fetch_chain_name(&request.network_name, request.node_id)
722 .await
723 .map_err(to_mcp_error)?,
724 };
725 builder = builder.with_chain_name(chain_name.clone());
726
727 let mut signed = false;
728 let mut signer_path = None;
729 let mut derived_signer = None;
730 if let Some(path) = request.signer_path {
731 let signer = self
732 .manager
733 .derived_account_for_path(&network, &path)
734 .await
735 .map_err(to_mcp_error)?;
736 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
737 .await
738 .map_err(to_mcp_error)?;
739 derived_signer = Some(signer);
740 signed = true;
741 signer_path = Some(path);
742 } else if let Some(initiator_public_key) = request.initiator_public_key {
743 let public_key = PublicKey::from_hex(initiator_public_key.trim())
744 .with_context(|| "failed to parse initiator_public_key as hex public key")
745 .map_err(to_mcp_error)?;
746 builder = builder.with_initiator_addr(public_key);
747 } else {
748 return Err(ErrorData::invalid_params(
749 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
750 None,
751 ));
752 }
753
754 if let Some(signer) = derived_signer.as_ref() {
755 builder = builder.with_secret_key(&signer.secret_key);
756 }
757
758 let tx = builder.build().map_err(to_mcp_error)?;
759 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
760
761 Ok(ok_value(json!({
762 "network_name": request.network_name,
763 "node_id": request.node_id,
764 "chain_name": chain_name,
765 "transaction_contract_hash": request.transaction_contract_hash,
766 "signed": signed,
767 "signer_path": signer_path,
768 "transaction": tx_json,
769 })))
770 }
771
772 #[tool(
773 description = "Create a session wasm transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
774 )]
775 async fn make_transaction_session_wasm(
776 &self,
777 Parameters(request): Parameters<MakeTransactionSessionWasmRequest>,
778 ) -> std::result::Result<CallToolResult, ErrorData> {
779 let network = self
780 .manager
781 .get_network(&request.network_name)
782 .await
783 .map_err(to_mcp_error)?;
784 ensure_running_network(&network).await?;
785
786 let wasm_path = PathBuf::from(&request.wasm_path);
787 let wasm_bytes = tokio_fs::read(&wasm_path)
788 .await
789 .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
790 .map_err(to_mcp_error)?;
791
792 let runtime = match (
793 request.runtime_transferred_value,
794 request.runtime_seed_hex.as_deref(),
795 ) {
796 (None, None) => TransactionRuntimeParams::VmCasperV1,
797 (transferred_value, maybe_seed_hex) => {
798 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
799 TransactionRuntimeParams::VmCasperV2 {
800 transferred_value: transferred_value.unwrap_or(0),
801 seed,
802 }
803 }
804 };
805
806 let mut builder = TransactionV1Builder::new_session(
807 request.is_install_upgrade.unwrap_or(true),
808 wasm_bytes.into(),
809 runtime,
810 );
811
812 if let Some(args_json) = request.session_args.as_ref()
813 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
814 {
815 builder = builder.with_runtime_args(runtime_args);
816 }
817
818 if let Some(ttl_millis) = request.ttl_millis {
819 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
820 }
821
822 let pricing = build_pricing_mode(
823 request.gas_price_tolerance,
824 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
825 );
826 builder = builder.with_pricing_mode(pricing);
827
828 let chain_name = match request.chain_name {
829 Some(value) => value,
830 None => fetch_chain_name(&request.network_name, request.node_id)
831 .await
832 .map_err(to_mcp_error)?,
833 };
834 builder = builder.with_chain_name(chain_name.clone());
835
836 let mut signed = false;
837 let mut signer_path = None;
838 let mut derived_signer = None;
839 if let Some(path) = request.signer_path {
840 let signer = self
841 .manager
842 .derived_account_for_path(&network, &path)
843 .await
844 .map_err(to_mcp_error)?;
845 verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
846 .await
847 .map_err(to_mcp_error)?;
848 derived_signer = Some(signer);
849 signed = true;
850 signer_path = Some(path);
851 } else if let Some(initiator_public_key) = request.initiator_public_key {
852 let public_key = PublicKey::from_hex(initiator_public_key.trim())
853 .with_context(|| "failed to parse initiator_public_key as hex public key")
854 .map_err(to_mcp_error)?;
855 builder = builder.with_initiator_addr(public_key);
856 } else {
857 return Err(ErrorData::invalid_params(
858 "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
859 None,
860 ));
861 }
862
863 if let Some(signer) = derived_signer.as_ref() {
864 builder = builder.with_secret_key(&signer.secret_key);
865 }
866
867 let tx = builder.build().map_err(to_mcp_error)?;
868 let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
869
870 Ok(ok_value(json!({
871 "network_name": request.network_name,
872 "node_id": request.node_id,
873 "chain_name": chain_name,
874 "wasm_path": request.wasm_path,
875 "signed": signed,
876 "signer_path": signer_path,
877 "transaction": tx_json,
878 })))
879 }
880
881 #[tool(
882 description = "Build, sign and submit a session wasm transaction from a derived account path. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
883 )]
884 async fn send_session_wasm(
885 &self,
886 Parameters(request): Parameters<SendSessionWasmRequest>,
887 ) -> std::result::Result<CallToolResult, ErrorData> {
888 let network = self
889 .manager
890 .get_network(&request.network_name)
891 .await
892 .map_err(to_mcp_error)?;
893 ensure_running_network(&network).await?;
894
895 let signer = self
896 .manager
897 .derived_account_for_path(&network, &request.signer_path)
898 .await
899 .map_err(to_mcp_error)?;
900 verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
901 .await
902 .map_err(to_mcp_error)?;
903
904 let wasm_path = PathBuf::from(&request.wasm_path);
905 let wasm_bytes = tokio_fs::read(&wasm_path)
906 .await
907 .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
908 .map_err(to_mcp_error)?;
909
910 let runtime = match (
911 request.runtime_transferred_value,
912 request.runtime_seed_hex.as_deref(),
913 ) {
914 (None, None) => TransactionRuntimeParams::VmCasperV1,
915 (transferred_value, maybe_seed_hex) => {
916 let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
917 TransactionRuntimeParams::VmCasperV2 {
918 transferred_value: transferred_value.unwrap_or(0),
919 seed,
920 }
921 }
922 };
923
924 let mut builder = TransactionV1Builder::new_session(
925 request.is_install_upgrade.unwrap_or(true),
926 wasm_bytes.into(),
927 runtime,
928 );
929
930 if let Some(args_json) = request.session_args.as_ref()
931 && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
932 {
933 builder = builder.with_runtime_args(runtime_args);
934 }
935
936 if let Some(ttl_millis) = request.ttl_millis {
937 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
938 }
939
940 let pricing = build_pricing_mode(
941 request.gas_price_tolerance,
942 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
943 );
944 builder = builder.with_pricing_mode(pricing);
945
946 let chain_name = match request.chain_name {
947 Some(value) => value,
948 None => fetch_chain_name(&request.network_name, request.node_id)
949 .await
950 .map_err(to_mcp_error)?,
951 };
952
953 let tx = builder
954 .with_chain_name(chain_name)
955 .with_secret_key(&signer.secret_key)
956 .build()
957 .map_err(to_mcp_error)?;
958
959 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
960 let response = rpc
961 .put_transaction(Transaction::V1(tx))
962 .await
963 .map_err(to_mcp_error)?;
964
965 Ok(ok_value(json!({
966 "network_name": request.network_name,
967 "node_id": request.node_id,
968 "transaction_hash": response.transaction_hash.to_hex_string(),
969 })))
970 }
971
972 #[tool(
973 description = "Transfer tokens between derived account paths. from_path signs, to_path resolves recipient."
974 )]
975 async fn transfer_tokens(
976 &self,
977 Parameters(request): Parameters<TransferTokensRequest>,
978 ) -> std::result::Result<CallToolResult, ErrorData> {
979 let network = self
980 .manager
981 .get_network(&request.network_name)
982 .await
983 .map_err(to_mcp_error)?;
984 ensure_running_network(&network).await?;
985
986 let from = self
987 .manager
988 .derived_account_for_path(&network, &request.from_path)
989 .await
990 .map_err(to_mcp_error)?;
991 let to = self
992 .manager
993 .derived_account_for_path(&network, &request.to_path)
994 .await
995 .map_err(to_mcp_error)?;
996
997 verify_path_hash_consistency(&network.layout, &request.from_path, &from.account_hash)
998 .await
999 .map_err(to_mcp_error)?;
1000 verify_path_hash_consistency(&network.layout, &request.to_path, &to.account_hash)
1001 .await
1002 .map_err(to_mcp_error)?;
1003
1004 let amount = casper_types::U512::from_dec_str(&request.amount)
1005 .with_context(|| "amount must be a decimal U512 string")
1006 .map_err(to_mcp_error)?;
1007 let to_public_key = PublicKey::from_hex(&to.public_key_hex)
1008 .with_context(|| "failed to parse derived recipient public key")
1009 .map_err(to_mcp_error)?;
1010
1011 let mut builder =
1012 TransactionV1Builder::new_transfer(amount, None, to_public_key, request.transfer_id)
1013 .map_err(to_mcp_error)?;
1014
1015 if let Some(ttl_millis) = request.ttl_millis {
1016 builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
1017 }
1018
1019 let pricing = build_pricing_mode(
1020 request.gas_price_tolerance,
1021 Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
1022 );
1023 builder = builder.with_pricing_mode(pricing);
1024
1025 let chain_name = match request.chain_name {
1026 Some(value) => value,
1027 None => fetch_chain_name(&request.network_name, request.node_id)
1028 .await
1029 .map_err(to_mcp_error)?,
1030 };
1031
1032 let tx = builder
1033 .with_chain_name(chain_name)
1034 .with_secret_key(&from.secret_key)
1035 .build()
1036 .map_err(to_mcp_error)?;
1037
1038 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1039 let response = rpc
1040 .put_transaction(Transaction::V1(tx))
1041 .await
1042 .map_err(to_mcp_error)?;
1043
1044 Ok(ok_value(json!({
1045 "network_name": request.network_name,
1046 "node_id": request.node_id,
1047 "transaction_hash": response.transaction_hash.to_hex_string(),
1048 "from_path": request.from_path,
1049 "to_path": request.to_path,
1050 "amount": request.amount,
1051 })))
1052 }
1053
1054 #[tool(
1055 description = "Wait for transaction execution result with timeout and return execution effects/result payload."
1056 )]
1057 async fn wait_transaction(
1058 &self,
1059 Parameters(request): Parameters<WaitTransactionRequest>,
1060 ) -> std::result::Result<CallToolResult, ErrorData> {
1061 let network = self
1062 .manager
1063 .get_network(&request.network_name)
1064 .await
1065 .map_err(to_mcp_error)?;
1066 ensure_running_network(&network).await?;
1067
1068 let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1069 let poll = Duration::from_millis(request.poll_interval_millis.unwrap_or(1_000));
1070 let tx_hash =
1071 parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1072 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1073 let deadline = Instant::now() + timeout;
1074
1075 loop {
1076 let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1077
1078 if let Some(exec_info) = response.execution_info.clone()
1079 && exec_info.execution_result.is_some()
1080 {
1081 return Ok(ok_value(
1082 serde_json::to_value(response).map_err(internal_serde_error)?,
1083 ));
1084 }
1085
1086 if Instant::now() >= deadline {
1087 return Err(ErrorData::resource_not_found(
1088 format!(
1089 "transaction {} execution result not available before timeout",
1090 request.transaction_hash
1091 ),
1092 None,
1093 ));
1094 }
1095
1096 tokio::time::sleep(poll).await;
1097 }
1098 }
1099
1100 #[tool(
1101 description = "Get transaction details via info_get_transaction (non-waiting). Returns typed JSON response from node."
1102 )]
1103 async fn get_transaction(
1104 &self,
1105 Parameters(request): Parameters<GetTransactionRequest>,
1106 ) -> std::result::Result<CallToolResult, ErrorData> {
1107 let network = self
1108 .manager
1109 .get_network(&request.network_name)
1110 .await
1111 .map_err(to_mcp_error)?;
1112 ensure_running_network(&network).await?;
1113
1114 let tx_hash =
1115 parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1116 let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1117 let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1118
1119 Ok(ok_value(
1120 serde_json::to_value(response).map_err(internal_serde_error)?,
1121 ))
1122 }
1123}
1124
// `ServerHandler` implementation for `McpServer`. The impl is processed by the
// rmcp `#[tool_handler]` attribute macro; what that macro adds is not visible
// in this file section.
#[tool_handler]
impl ServerHandler for McpServer {
    // Reports the server description by delegating to `McpServer::server_info`.
    fn get_info(&self) -> ServerInfo {
        Self::server_info()
    }
}
1131
/// Registry of the networks this process manages, plus shared resources used
/// to talk to them.
#[derive(Debug)]
struct NetworkManager {
    /// Root directory handed to `AssetsLayout::new` for every network.
    assets_root: PathBuf,
    /// Managed networks keyed by network name.
    managed: Mutex<HashMap<String, Arc<ManagedNetwork>>>,
    /// HTTP client used for raw JSON-RPC queries (built with no proxy and a
    /// 5-second timeout in `NetworkManager::new`).
    http: reqwest::Client,
}
1138
/// Runtime handle for one spawned network: its on-disk layout, process state
/// and the background tasks attached to it.
#[derive(Debug)]
struct ManagedNetwork {
    /// On-disk asset layout of this network.
    layout: AssetsLayout,
    /// Shared process state; locked by every operation that inspects or
    /// mutates the running processes.
    state: Arc<Mutex<State>>,
    /// Number of nodes in the network; updated when nodes are added.
    node_count: AtomicU32,
    /// Log level the network was started with; reused when sidecars are
    /// restarted or nodes are added.
    rust_log: String,
    /// Seed used for key/account derivation.
    seed: Arc<str>,
    /// Buffer of SSE events collected from the nodes.
    sse_store: Arc<SseStore>,
    /// NOTE(review): not used in this section — presumably the hash of the last
    /// block a hook ran for; confirm against the SSE listener.
    last_block_hook_hash: Mutex<Option<String>>,
    /// Set to `true` by `stop`; readers are outside this section.
    shutdown: Arc<AtomicBool>,
    /// Set to `true` by `stop`; polled by the control-socket accept loop.
    control_shutdown: Arc<AtomicBool>,
    /// Serializes control requests that mutate network assets
    /// (protocol staging, adding nodes).
    asset_mutation_lock: Mutex<()>,
    /// Handles of the per-node SSE listener tasks, aborted on `stop`.
    sse_tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
    /// Handle of the control-socket server task, aborted on `stop`.
    control_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
1154
1155impl ManagedNetwork {
1156 async fn is_running(&self) -> bool {
1157 let state = self.state.lock().await;
1158 processes_running(&state)
1159 }
1160
1161 async fn stop(&self) -> Result<()> {
1162 self.shutdown.store(true, Ordering::SeqCst);
1163 self.control_shutdown.store(true, Ordering::SeqCst);
1164
1165 if let Some(control_task) = self.control_task.lock().await.take() {
1166 control_task.abort();
1167 let _ = control_task.await;
1168 }
1169 let _ = tokio_fs::remove_file(self.layout.control_socket_path()).await;
1170
1171 let tasks = {
1172 let mut guard = self.sse_tasks.lock().await;
1173 guard.drain(..).collect::<Vec<_>>()
1174 };
1175 for task in tasks {
1176 task.abort();
1177 let _ = task.await;
1178 }
1179
1180 let mut state = self.state.lock().await;
1181 process::stop(&mut state).await
1182 }
1183}
1184
1185impl NetworkManager {
1186 async fn new(assets_root: PathBuf) -> Result<Self> {
1187 let http = reqwest::Client::builder()
1188 .no_proxy()
1189 .timeout(Duration::from_secs(5))
1190 .build()?;
1191 Ok(Self {
1192 assets_root,
1193 managed: Mutex::new(HashMap::new()),
1194 http,
1195 })
1196 }
1197
1198 async fn spawn_network(&self, request: SpawnNetworkRequest) -> Result<SpawnNetworkResponse> {
1199 let network_name = request
1200 .network_name
1201 .unwrap_or_else(|| DEFAULT_NETWORK_NAME.to_string());
1202
1203 if network_name.trim().is_empty() {
1204 return Err(anyhow!("network_name must not be empty"));
1205 }
1206
1207 let maybe_existing = {
1208 let mut managed = self.managed.lock().await;
1209 if let Some(existing) = managed.get(&network_name)
1210 && existing.is_running().await
1211 && !request.force_setup.unwrap_or(true)
1212 {
1213 return Ok(SpawnNetworkResponse {
1214 network_name,
1215 node_count: existing.node_count.load(Ordering::SeqCst),
1216 managed: true,
1217 already_running: true,
1218 forced_setup: false,
1219 });
1220 }
1221 managed.remove(&network_name)
1222 };
1223
1224 if let Some(existing) = maybe_existing {
1225 let _ = existing.stop().await;
1226 }
1227
1228 let force_setup = request.force_setup.unwrap_or(true);
1229 let requested_nodes = request.node_count.unwrap_or(DEFAULT_NODE_COUNT);
1230 let users = request.users;
1231 let delay = request.delay.unwrap_or(DEFAULT_DELAY_SECS);
1232 let log_level = request
1233 .log_level
1234 .unwrap_or_else(|| DEFAULT_LOG_LEVEL.to_string());
1235 let node_log_format = request
1236 .node_log_format
1237 .unwrap_or_else(|| DEFAULT_NODE_LOG_FORMAT.to_string());
1238 let seed: Arc<str> = Arc::from(request.seed.unwrap_or_else(|| DEFAULT_SEED.to_string()));
1239
1240 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.clone());
1241 let assets_exist = layout.exists().await;
1242
1243 let asset_selector = assets::optional_asset_selector(
1244 request.asset.as_deref(),
1245 request.custom_asset.as_deref(),
1246 )?;
1247
1248 let setup_result = if force_setup {
1249 assets::teardown(&layout).await?;
1250 Some(
1251 assets::setup_local(
1252 &layout,
1253 &SetupOptions {
1254 nodes: requested_nodes,
1255 users,
1256 delay_seconds: delay,
1257 network_name: network_name.clone(),
1258 asset: asset_selector.clone(),
1259 protocol_version: request.protocol_version.clone(),
1260 chainspec_overrides: Vec::new(),
1261 node_log_format,
1262 seed: Arc::clone(&seed),
1263 },
1264 )
1265 .await?,
1266 )
1267 } else if !assets_exist {
1268 Some(
1269 assets::setup_local(
1270 &layout,
1271 &SetupOptions {
1272 nodes: requested_nodes,
1273 users,
1274 delay_seconds: delay,
1275 network_name: network_name.clone(),
1276 asset: asset_selector,
1277 protocol_version: request.protocol_version.clone(),
1278 chainspec_overrides: Vec::new(),
1279 node_log_format,
1280 seed: Arc::clone(&seed),
1281 },
1282 )
1283 .await?,
1284 )
1285 } else {
1286 None
1287 };
1288
1289 if !layout.exists().await {
1290 return Err(anyhow!(
1291 "assets missing under {}; call spawn_network with force_setup=true",
1292 layout.net_dir().display()
1293 ));
1294 }
1295
1296 assets::ensure_network_hook_samples(&layout).await?;
1297
1298 if !force_setup && assets_exist {
1299 let _ = assets::ensure_consensus_keys(&layout, Arc::clone(&seed)).await?;
1300 }
1301
1302 let node_count = layout.count_nodes().await?;
1303 if node_count == 0 {
1304 return Err(anyhow!("network has no nodes to start"));
1305 }
1306
1307 ensure_sidecar_available(&layout, node_count).await?;
1308
1309 let state_path = layout.net_dir().join(STATE_FILE_NAME);
1310 if !tokio_fs::try_exists(&state_path).await.unwrap_or(false) {
1311 let protocol_version = match &setup_result {
1312 Some(result) => result.protocol_version.clone(),
1313 None => latest_layout_protocol_version(&layout).await?,
1314 };
1315 assets::prepare_genesis_hooks(&layout, &protocol_version).await?;
1316 }
1317 let mut state = State::new(state_path).await?;
1318
1319 let rust_log = log_level.clone();
1320 process::start(
1321 &layout,
1322 &StartPlan {
1323 rust_log: rust_log.clone(),
1324 },
1325 &mut state,
1326 )
1327 .await?;
1328
1329 let managed = Arc::new(ManagedNetwork {
1330 layout: layout.clone(),
1331 state: Arc::new(Mutex::new(state)),
1332 node_count: AtomicU32::new(node_count),
1333 rust_log,
1334 seed,
1335 sse_store: Arc::new(SseStore::new(DEFAULT_SSE_HISTORY_CAPACITY)),
1336 last_block_hook_hash: Mutex::new(None),
1337 shutdown: Arc::new(AtomicBool::new(false)),
1338 control_shutdown: Arc::new(AtomicBool::new(false)),
1339 asset_mutation_lock: Mutex::new(()),
1340 sse_tasks: Mutex::new(Vec::new()),
1341 control_task: Mutex::new(None),
1342 });
1343 spawn_pid_sync_tasks(Arc::clone(&managed.state)).await;
1344
1345 self.spawn_sse_collectors(&managed).await;
1346 if let Err(err) = self.spawn_control_server(&managed).await {
1347 let _ = managed.stop().await;
1348 return Err(err);
1349 }
1350
1351 self.managed
1352 .lock()
1353 .await
1354 .insert(network_name.clone(), Arc::clone(&managed));
1355
1356 Ok(SpawnNetworkResponse {
1357 network_name,
1358 node_count,
1359 managed: true,
1360 already_running: false,
1361 forced_setup: force_setup,
1362 })
1363 }
1364
1365 async fn spawn_sse_collectors(&self, network: &Arc<ManagedNetwork>) {
1366 let mut tasks = Vec::new();
1367 for node_id in 1..=network.node_count.load(Ordering::SeqCst) {
1368 let endpoint = assets::sse_endpoint(node_id);
1369 let network = Arc::clone(network);
1370 let task = tokio::spawn(async move {
1371 run_sse_listener(network, node_id, endpoint).await;
1372 });
1373 tasks.push(task);
1374 }
1375 let mut guard = network.sse_tasks.lock().await;
1376 guard.extend(tasks);
1377 }
1378
1379 async fn spawn_control_server(&self, network: &Arc<ManagedNetwork>) -> Result<()> {
1380 let socket_path = network.layout.control_socket_path();
1381 if let Ok(metadata) = tokio_fs::symlink_metadata(&socket_path).await {
1382 if metadata.is_dir() {
1383 tokio_fs::remove_dir_all(&socket_path).await?;
1384 } else {
1385 tokio_fs::remove_file(&socket_path).await?;
1386 }
1387 }
1388 if let Some(parent) = socket_path.parent() {
1389 tokio_fs::create_dir_all(parent).await?;
1390 }
1391
1392 let listener = UnixListener::bind(&socket_path)?;
1393 let network_for_task = Arc::clone(network);
1394 let shutdown = Arc::clone(&network_for_task.control_shutdown);
1395 let task = tokio::spawn(async move {
1396 loop {
1397 if shutdown.load(Ordering::SeqCst) {
1398 break;
1399 }
1400
1401 let accepted =
1402 tokio::time::timeout(Duration::from_millis(250), listener.accept()).await;
1403 let (stream, _) = match accepted {
1404 Ok(Ok(pair)) => pair,
1405 Ok(Err(_)) => break,
1406 Err(_) => continue,
1407 };
1408 let network = Arc::clone(&network_for_task);
1409 tokio::spawn(async move {
1410 handle_managed_control_stream(stream, network).await;
1411 });
1412 }
1413
1414 let _ = tokio_fs::remove_file(&socket_path).await;
1415 });
1416 *network.control_task.lock().await = Some(task);
1417 Ok(())
1418 }
1419
1420 async fn stage_protocol(
1421 &self,
1422 network_name: &str,
1423 asset: Option<&str>,
1424 custom_asset: Option<&str>,
1425 legacy_asset_name: Option<&str>,
1426 protocol_version: &str,
1427 activation_point: u64,
1428 ) -> Result<StageProtocolResponse> {
1429 let asset_selector = stage_asset_selector(asset, custom_asset, legacy_asset_name)?;
1430 let managed = {
1431 let guard = self.managed.lock().await;
1432 guard.get(network_name).cloned()
1433 };
1434
1435 if let Some(network) = managed
1436 && network.is_running().await
1437 {
1438 if let Ok(Some(current_era)) = read_current_era_from_status(1).await
1439 && activation_point <= current_era
1440 {
1441 return Err(anyhow!(
1442 "activation point {} must be greater than current era {}",
1443 activation_point,
1444 current_era
1445 ));
1446 }
1447
1448 let request = ControlRequest::StageProtocol {
1449 asset: match &asset_selector {
1450 AssetSelector::Versioned(asset) => Some(asset.clone()),
1451 AssetSelector::LatestVersioned | AssetSelector::Custom(_) => None,
1452 },
1453 custom_asset: match &asset_selector {
1454 AssetSelector::Custom(asset) => Some(asset.clone()),
1455 AssetSelector::LatestVersioned | AssetSelector::Versioned(_) => None,
1456 },
1457 asset_name: None,
1458 protocol_version: protocol_version.to_string(),
1459 activation_point,
1460 chainspec_overrides: Vec::new(),
1461 restart_sidecars: true,
1462 rust_log: Some(network.rust_log.clone()),
1463 };
1464 let socket_path = network.layout.control_socket_path();
1465 return match send_request(&socket_path, &request).await {
1466 Ok(ControlResponse::Ok { result }) => match result {
1467 ControlResult::StageProtocol {
1468 live_mode,
1469 staged_nodes,
1470 restarted_sidecars,
1471 } => Ok(StageProtocolResponse {
1472 network_name: network_name.to_string(),
1473 live_mode,
1474 staged_nodes,
1475 restarted_sidecars,
1476 }),
1477 ControlResult::RuntimeStatus { .. } => Err(anyhow!(
1478 "unexpected runtime_status response from {}",
1479 socket_path.display()
1480 )),
1481 ControlResult::AddNodes { .. } => Err(anyhow!(
1482 "unexpected add_nodes response from {}",
1483 socket_path.display()
1484 )),
1485 },
1486 Ok(ControlResponse::Error { error }) => Err(anyhow!(error)),
1487 Err(err) => Err(anyhow!(
1488 "failed to reach managed control socket {}: {}",
1489 socket_path.display(),
1490 err
1491 )),
1492 };
1493 }
1494
1495 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1496 if !layout.exists().await {
1497 return Err(anyhow!(
1498 "assets for {} not found under {}",
1499 network_name,
1500 layout.net_dir().display()
1501 ));
1502 }
1503
1504 let staged = assets::stage_protocol(
1505 &layout,
1506 &StageProtocolOptions {
1507 asset: asset_selector,
1508 protocol_version: protocol_version.to_string(),
1509 activation_point,
1510 chainspec_overrides: Vec::new(),
1511 },
1512 )
1513 .await?;
1514
1515 Ok(StageProtocolResponse {
1516 network_name: network_name.to_string(),
1517 live_mode: false,
1518 staged_nodes: staged.staged_nodes,
1519 restarted_sidecars: Vec::new(),
1520 })
1521 }
1522
1523 async fn wait_network_ready(
1524 &self,
1525 network_name: &str,
1526 timeout_seconds: Option<u64>,
1527 ) -> Result<WaitReadyResponse> {
1528 let network = self.get_network(network_name).await?;
1529 let timeout = Duration::from_secs(timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1530 let deadline = Instant::now() + timeout;
1531
1532 loop {
1533 let running = ensure_running_network(&network).await.is_ok();
1534 if running {
1535 let status = check_rest_ready(&network).await;
1536 if let Ok(rest_by_node) = status {
1537 let state = network.state.lock().await;
1538 let block_observed = state.last_block_height.is_some()
1539 || rest_by_node.values().any(rest_has_block);
1540 if block_observed {
1541 return Ok(WaitReadyResponse {
1542 network_name: network_name.to_string(),
1543 ready: true,
1544 node_count: network.node_count.load(Ordering::SeqCst),
1545 rest: rest_by_node,
1546 last_block_height: state.last_block_height,
1547 });
1548 }
1549 }
1550 }
1551
1552 if Instant::now() >= deadline {
1553 return Ok(WaitReadyResponse {
1554 network_name: network_name.to_string(),
1555 ready: false,
1556 node_count: network.node_count.load(Ordering::SeqCst),
1557 rest: HashMap::new(),
1558 last_block_height: None,
1559 });
1560 }
1561
1562 tokio::time::sleep(Duration::from_millis(500)).await;
1563 }
1564 }
1565
1566 async fn despawn_network(&self, network_name: &str, purge: bool) -> Result<DespawnResponse> {
1567 let managed = {
1568 let mut guard = self.managed.lock().await;
1569 guard.remove(network_name)
1570 };
1571
1572 let Some(network) = managed else {
1573 return Err(anyhow!("network '{}' is not managed", network_name));
1574 };
1575
1576 network.stop().await?;
1577 if purge {
1578 assets::teardown(&network.layout).await?;
1579 }
1580
1581 Ok(DespawnResponse {
1582 network_name: network_name.to_string(),
1583 purged: purge,
1584 })
1585 }
1586
1587 async fn list_networks(&self) -> Result<ListNetworksResponse> {
1588 let discovered = discover_network_names(&self.assets_root).await?;
1589 let managed_snapshot = {
1590 let guard = self.managed.lock().await;
1591 guard
1592 .iter()
1593 .map(|(name, network)| (name.clone(), Arc::clone(network)))
1594 .collect::<Vec<_>>()
1595 };
1596
1597 let mut rows = Vec::new();
1598
1599 for name in &discovered {
1600 if let Some((_, network)) = managed_snapshot.iter().find(|(n, _)| n == name) {
1601 rows.push(NetworkRow {
1602 network_name: name.clone(),
1603 discovered: true,
1604 managed: true,
1605 running: network.is_running().await,
1606 node_count: Some(network.node_count.load(Ordering::SeqCst)),
1607 });
1608 } else {
1609 let layout = AssetsLayout::new(self.assets_root.clone(), name.clone());
1610 rows.push(NetworkRow {
1611 network_name: name.clone(),
1612 discovered: true,
1613 managed: false,
1614 running: false,
1615 node_count: layout.count_nodes().await.ok(),
1616 });
1617 }
1618 }
1619
1620 for (name, network) in managed_snapshot {
1621 if !discovered.iter().any(|candidate| candidate == &name) {
1622 rows.push(NetworkRow {
1623 network_name: name,
1624 discovered: false,
1625 managed: true,
1626 running: network.is_running().await,
1627 node_count: Some(network.node_count.load(Ordering::SeqCst)),
1628 });
1629 }
1630 }
1631
1632 rows.sort_by(|a, b| a.network_name.cmp(&b.network_name));
1633
1634 Ok(ListNetworksResponse { networks: rows })
1635 }
1636
1637 async fn managed_processes(
1638 &self,
1639 request: ManagedProcessesRequest,
1640 ) -> Result<ManagedProcessesResponse> {
1641 let network = self.get_network(&request.network_name).await?;
1642 let running_only = request.running_only.unwrap_or(true);
1643 let process_name = request
1644 .process_name
1645 .as_deref()
1646 .map(str::trim)
1647 .filter(|name| !name.is_empty())
1648 .map(ToString::to_string);
1649
1650 let process_name_lc = process_name.as_ref().map(|name| name.to_ascii_lowercase());
1651 let state = network.state.lock().await;
1652 let mut processes = Vec::new();
1653 for process in &state.processes {
1654 if let Some(name_lc) = process_name_lc.as_deref()
1655 && !process.id.to_ascii_lowercase().contains(name_lc)
1656 {
1657 continue;
1658 }
1659
1660 let pid = process.current_pid();
1661 let running = matches!(process.last_status, ProcessStatus::Running)
1662 && pid.is_some_and(is_pid_running);
1663 if running_only && !running {
1664 continue;
1665 }
1666
1667 processes.push(ManagedProcessRow {
1668 id: process.id.clone(),
1669 node_id: process.node_id,
1670 kind: process_kind_name(&process.kind).to_string(),
1671 pid,
1672 running,
1673 last_status: process_status_name(&process.last_status).to_string(),
1674 command: process.command.clone(),
1675 args: process.args.clone(),
1676 cwd: process.cwd.clone(),
1677 stdout_path: process.stdout_path.clone(),
1678 stderr_path: process.stderr_path.clone(),
1679 });
1680 }
1681
1682 Ok(ManagedProcessesResponse {
1683 network_name: request.network_name,
1684 running_only,
1685 process_name,
1686 processes,
1687 })
1688 }
1689
1690 async fn get_network(&self, network_name: &str) -> Result<Arc<ManagedNetwork>> {
1691 let guard = self.managed.lock().await;
1692 guard.get(network_name).cloned().ok_or_else(|| {
1693 anyhow!(
1694 "network '{}' is not managed; call spawn_network first",
1695 network_name
1696 )
1697 })
1698 }
1699
1700 async fn stop_all_networks(&self) -> Result<()> {
1701 let managed = {
1702 let mut guard = self.managed.lock().await;
1703 guard
1704 .drain()
1705 .map(|(_, network)| network)
1706 .collect::<Vec<_>>()
1707 };
1708
1709 let mut errors = Vec::new();
1710 for network in managed {
1711 if let Err(err) = network.stop().await {
1712 errors.push(err.to_string());
1713 }
1714 }
1715
1716 if errors.is_empty() {
1717 Ok(())
1718 } else {
1719 Err(anyhow!(errors.join("\n")))
1720 }
1721 }
1722
1723 async fn raw_rpc_query(
1724 &self,
1725 node_id: u32,
1726 method: &str,
1727 params: Option<Value>,
1728 ) -> Result<Value> {
1729 let endpoint = assets::rpc_endpoint(node_id);
1730 let payload = match params {
1731 Some(params) => json!({
1732 "id": 1,
1733 "jsonrpc": "2.0",
1734 "method": method,
1735 "params": params,
1736 }),
1737 None => json!({
1738 "id": 1,
1739 "jsonrpc": "2.0",
1740 "method": method,
1741 }),
1742 };
1743
1744 let response = self
1745 .http
1746 .post(endpoint)
1747 .json(&payload)
1748 .send()
1749 .await?
1750 .error_for_status()?;
1751 Ok(response.json::<Value>().await?)
1752 }
1753
1754 async fn list_derived_accounts(&self, network_name: &str) -> Result<Vec<DerivedAccountRow>> {
1755 let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1756 let csv = assets::derived_accounts_summary(&layout)
1757 .await
1758 .ok_or_else(|| {
1759 anyhow!(
1760 "missing derived-accounts.csv for network '{}'",
1761 network_name
1762 )
1763 })?;
1764
1765 let seed = {
1766 let managed = self.managed.lock().await;
1767 managed
1768 .get(network_name)
1769 .map(|network| Arc::clone(&network.seed))
1770 };
1771
1772 parse_derived_accounts_csv(&csv, seed).await
1773 }
1774
1775 async fn derived_account_for_path(
1776 &self,
1777 network: &Arc<ManagedNetwork>,
1778 path: &str,
1779 ) -> Result<DerivedSigner> {
1780 let material =
1781 assets::derive_account_from_seed_path(Arc::clone(&network.seed), path).await?;
1782 let secret_key = SecretKey::from_pem(&material.secret_key_pem)?;
1783 Ok(DerivedSigner {
1784 public_key_hex: material.public_key_hex,
1785 account_hash: material.account_hash,
1786 secret_key,
1787 })
1788 }
1789}
1790
1791async fn handle_managed_control_stream(
1792 mut stream: tokio::net::UnixStream,
1793 network: Arc<ManagedNetwork>,
1794) {
1795 let mut request_bytes = Vec::new();
1796 let response = match stream.read_to_end(&mut request_bytes).await {
1797 Ok(_) => match serde_json::from_slice::<ControlRequest>(&request_bytes) {
1798 Ok(request) => handle_managed_control_request(&network, request).await,
1799 Err(err) => ControlResponse::Error {
1800 error: format!("invalid control request: {}", err),
1801 },
1802 },
1803 Err(err) => ControlResponse::Error {
1804 error: format!("failed to read control request: {}", err),
1805 },
1806 };
1807
1808 let response_bytes = serde_json::to_vec(&response).unwrap_or_else(|err| {
1809 format!(
1810 "{{\"status\":\"error\",\"error\":\"failed to serialize control response: {}\"}}",
1811 err
1812 )
1813 .into_bytes()
1814 });
1815 let _ = stream.write_all(&response_bytes).await;
1816 let _ = stream.shutdown().await;
1817}
1818
1819async fn spawn_added_sse_collectors(network: &Arc<ManagedNetwork>, node_ids: &[u32]) {
1820 let mut tasks = Vec::new();
1821 for node_id in node_ids {
1822 let node_id = *node_id;
1823 let endpoint = assets::sse_endpoint(node_id);
1824 let network = Arc::clone(network);
1825 let task = tokio::spawn(async move {
1826 run_sse_listener(network, node_id, endpoint).await;
1827 });
1828 tasks.push(task);
1829 }
1830 let mut guard = network.sse_tasks.lock().await;
1831 guard.extend(tasks);
1832}
1833
/// Executes one control request against a managed network and returns the
/// response to send back to the peer.
///
/// Failures are never propagated as Rust errors: every failure path returns
/// `ControlResponse::Error` carrying the error text.
async fn handle_managed_control_request(
    network: &Arc<ManagedNetwork>,
    request: ControlRequest,
) -> ControlResponse {
    match request {
        // Read-only snapshot of the running nodes and the last block height.
        ControlRequest::RuntimeStatus => {
            let state = network.state.lock().await;
            ControlResponse::Ok {
                result: ControlResult::RuntimeStatus {
                    running_node_ids: running_node_ids(&state),
                    last_block_height: state.last_block_height,
                },
            }
        }
        // Stage a protocol upgrade on the live network and optionally restart
        // the sidecars.
        ControlRequest::StageProtocol {
            asset,
            custom_asset,
            asset_name,
            protocol_version,
            activation_point,
            chainspec_overrides,
            restart_sidecars,
            rust_log,
        } => {
            // Resolve the asset selection before taking any lock so invalid
            // input is rejected without blocking other asset mutations.
            let asset_selector = match stage_asset_selector(
                asset.as_deref(),
                custom_asset.as_deref(),
                asset_name.as_deref(),
            ) {
                Ok(asset_selector) => asset_selector,
                Err(err) => {
                    return ControlResponse::Error {
                        error: err.to_string(),
                    };
                }
            };
            // Held until this arm returns: serializes asset mutations with
            // other StageProtocol / AddNodes requests.
            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
            if let Err(err) =
                assets::ensure_consensus_keys(&network.layout, Arc::clone(&network.seed)).await
            {
                return ControlResponse::Error {
                    error: format!("failed to recreate consensus keys: {}", err),
                };
            }

            let staged = assets::stage_protocol(
                &network.layout,
                &StageProtocolOptions {
                    asset: asset_selector,
                    protocol_version,
                    activation_point,
                    chainspec_overrides,
                },
            )
            .await;
            let staged = match staged {
                Ok(staged) => staged,
                Err(err) => {
                    return ControlResponse::Error {
                        error: err.to_string(),
                    };
                }
            };

            let mut restarted_sidecars = Vec::new();
            if restart_sidecars {
                // The request may override the log level; otherwise reuse the
                // level the network was started with.
                let rust_log = rust_log.unwrap_or_else(|| network.rust_log.clone());
                // The state lock is taken while the asset mutation lock is
                // still held.
                let mut state = network.state.lock().await;
                match process::restart_sidecars(&network.layout, &mut state, &rust_log).await {
                    Ok(restarted) => {
                        for proc in restarted {
                            restarted_sidecars.push(proc.record.node_id);
                        }
                    }
                    Err(err) => {
                        return ControlResponse::Error {
                            error: err.to_string(),
                        };
                    }
                }
            }

            ControlResponse::Ok {
                result: ControlResult::StageProtocol {
                    live_mode: true,
                    staged_nodes: staged.staged_nodes,
                    restarted_sidecars,
                },
            }
        }
        // Create assets for `count` new nodes, start them, and attach the
        // background tasks (SSE collectors, PID sync) to them.
        ControlRequest::AddNodes { count } => {
            // Held until this arm returns: serializes asset mutations with
            // other StageProtocol / AddNodes requests.
            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
            let added = match assets::add_nodes(
                &network.layout,
                &AddNodesOptions {
                    count,
                    seed: Arc::clone(&network.seed),
                },
            )
            .await
            {
                Ok(added) => added,
                Err(err) => {
                    return ControlResponse::Error {
                        error: err.to_string(),
                    };
                }
            };

            // Scope the state lock to the start call so it is released before
            // the follow-up tasks are spawned.
            let started = {
                let mut state = network.state.lock().await;
                process::start_added_nodes(
                    &network.layout,
                    &mut state,
                    &added.added_node_ids,
                    added.total_nodes,
                    &network.rust_log,
                )
                .await
            };
            let started = match started {
                Ok(started) => started,
                Err(err) => {
                    // Starting failed: remove the freshly created node assets
                    // and report the start error (plus any rollback error).
                    let error =
                        rollback_added_nodes_after_start_error(&network.layout, &added, err).await;
                    return ControlResponse::Error { error };
                }
            };
            let process_ids = started
                .iter()
                .map(|proc| proc.record.id.clone())
                .collect::<Vec<_>>();
            let started_processes = started.len() as u32;
            // Publish the new node count only after the nodes have started.
            network
                .node_count
                .store(added.total_nodes, Ordering::SeqCst);
            spawn_added_sse_collectors(network, &added.added_node_ids).await;
            spawn_pid_sync_tasks_for_ids(Arc::clone(&network.state), &process_ids).await;

            ControlResponse::Ok {
                result: ControlResult::AddNodes {
                    added_node_ids: added.added_node_ids,
                    total_nodes: added.total_nodes,
                    started_processes,
                },
            }
        }
    }
}
1983
1984async fn rollback_added_nodes_after_start_error(
1985 layout: &AssetsLayout,
1986 added: &assets::AddNodesResult,
1987 err: anyhow::Error,
1988) -> String {
1989 match assets::rollback_added_nodes(layout, &added.added_node_ids).await {
1990 Ok(()) => err.to_string(),
1991 Err(rollback_err) => format!(
1992 "{}; failed to roll back added node assets: {}",
1993 err, rollback_err
1994 ),
1995 }
1996}
1997
1998async fn read_current_era_from_status(node_id: u32) -> Result<Option<u64>> {
1999 let value = fetch_rest_status(node_id).await?;
2000 Ok(value
2001 .get("last_added_block_info")
2002 .and_then(|info| info.get("era_id"))
2003 .and_then(|era_id| {
2004 era_id
2005 .as_u64()
2006 .or_else(|| era_id.as_str().and_then(|raw| raw.parse::<u64>().ok()))
2007 }))
2008}
2009
/// Bounded in-memory buffer of SSE events with a wake-up mechanism for
/// callers waiting on new events.
#[derive(Debug)]
struct SseStore {
    /// Sequence number of the most recently pushed record (0 before any push).
    sequence: AtomicU64,
    /// Buffered records; the oldest are dropped once `capacity` is exceeded.
    events: Mutex<VecDeque<SseRecord>>,
    /// Signalled after every push to wake tasks blocked in `wait_next`.
    notify: Notify,
    /// Maximum number of records retained in `events`.
    capacity: usize,
}
2017
2018impl SseStore {
2019 fn new(capacity: usize) -> Self {
2020 Self {
2021 sequence: AtomicU64::new(0),
2022 events: Mutex::new(VecDeque::new()),
2023 notify: Notify::new(),
2024 capacity,
2025 }
2026 }
2027
2028 async fn latest_sequence(&self) -> u64 {
2029 self.sequence.load(Ordering::SeqCst)
2030 }
2031
2032 async fn push(&self, node_id: u32, event: SseEvent) {
2033 let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) + 1;
2034 let event_type = sse_event_type(&event).to_string();
2035 let payload = sse_event_payload(&event);
2036
2037 let record = SseRecord {
2038 sequence,
2039 timestamp_rfc3339: timestamp_prefix(),
2040 node_id,
2041 event_type,
2042 payload,
2043 };
2044
2045 let mut guard = self.events.lock().await;
2046 guard.push_back(record);
2047 while guard.len() > self.capacity {
2048 let _ = guard.pop_front();
2049 }
2050 drop(guard);
2051
2052 self.notify.notify_waiters();
2053 }
2054
2055 async fn wait_next(
2056 &self,
2057 after_sequence: u64,
2058 filter: &SseFilter,
2059 timeout: Duration,
2060 ) -> Result<SseRecord> {
2061 let deadline = Instant::now() + timeout;
2062 loop {
2063 if let Some(record) = self.find_first_after(after_sequence, filter).await {
2064 return Ok(record);
2065 }
2066
2067 if Instant::now() >= deadline {
2068 return Err(anyhow!("timed out waiting for SSE event"));
2069 }
2070
2071 let remaining = deadline.saturating_duration_since(Instant::now());
2072 tokio::time::timeout(remaining, self.notify.notified()).await?;
2073 }
2074 }
2075
2076 async fn find_first_after(&self, after_sequence: u64, filter: &SseFilter) -> Option<SseRecord> {
2077 let guard = self.events.lock().await;
2078 guard
2079 .iter()
2080 .find(|event| event.sequence > after_sequence && filter.matches(event))
2081 .cloned()
2082 }
2083
2084 async fn history(
2085 &self,
2086 before_sequence: Option<u64>,
2087 limit: usize,
2088 filter: &SseFilter,
2089 ) -> SseHistoryPage {
2090 let before = before_sequence.unwrap_or(u64::MAX);
2091 let guard = self.events.lock().await;
2092
2093 let mut matched = guard
2094 .iter()
2095 .filter(|event| event.sequence < before)
2096 .filter(|event| filter.matches(event))
2097 .cloned()
2098 .collect::<Vec<_>>();
2099
2100 let total = matched.len();
2101
2102 if matched.len() > limit {
2103 matched = matched.split_off(matched.len() - limit);
2104 }
2105
2106 let next_before_sequence = matched.first().map(|event| event.sequence);
2107 SseHistoryPage {
2108 total_matching: total,
2109 returned: matched.len(),
2110 next_before_sequence,
2111 events: matched,
2112 }
2113 }
2114}
2115
/// Event-type filter applied to buffered SSE records.
#[derive(Debug, Default)]
struct SseFilter {
    /// When non-empty, only records whose event type appears here match.
    include_event_types: Vec<String>,
    /// Records whose event type appears here never match.
    exclude_event_types: Vec<String>,
}
2121
2122impl SseFilter {
2123 fn matches(&self, event: &SseRecord) -> bool {
2124 let include = if self.include_event_types.is_empty() {
2125 true
2126 } else {
2127 self.include_event_types
2128 .iter()
2129 .any(|name| name == &event.event_type)
2130 };
2131 if !include {
2132 return false;
2133 }
2134 !self
2135 .exclude_event_types
2136 .iter()
2137 .any(|name| name == &event.event_type)
2138 }
2139}
2140
// One buffered SSE event as stored by `SseStore::push`.
#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct SseRecord {
    // Store-assigned sequence number; the first pushed record gets 1.
    sequence: u64,
    // Time the record was created, produced by `timestamp_prefix()`.
    timestamp_rfc3339: String,
    // Node the event was received from.
    node_id: u32,
    // Event name as returned by `sse_event_type`.
    event_type: String,
    // JSON rendering of the event as returned by `sse_event_payload`.
    payload: Value,
}
2149
// One page of results produced by `SseStore::history`.
#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct SseHistoryPage {
    // Number of buffered records that matched, before the page limit was applied.
    total_matching: usize,
    // Number of records in `events`.
    returned: usize,
    // Sequence of the first record in `events`, or `None` when the page is empty.
    next_before_sequence: Option<u64>,
    // The returned records.
    events: Vec<SseRecord>,
}
2157
/// Signing identity bundle: a secret key together with string renderings of its
/// public key and account hash.
// NOTE(review): the name suggests the key is derived (e.g. from a seed/path) — confirm at the construction site.
#[derive(Debug)]
struct DerivedSigner {
    public_key_hex: String,
    account_hash: String,
    secret_key: SecretKey,
}
2164
// Deserializable request payload; judging by the name it carries the parameters for
// spawning a network (confirm against the handler that consumes it).
// Every field is optional, so absent keys deserialize to `None`.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SpawnNetworkRequest {
    network_name: Option<String>,
    asset: Option<String>,
    custom_asset: Option<String>,
    protocol_version: Option<String>,
    node_count: Option<u32>,
    users: Option<u32>,
    // NOTE(review): unit of `delay` is not visible here — confirm.
    delay: Option<u64>,
    log_level: Option<String>,
    node_log_format: Option<String>,
    seed: Option<String>,
    force_setup: Option<bool>,
}
2179
// Serializable response payload; presumably the result of a spawn-network request
// (confirm against the handler that builds it).
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct SpawnNetworkResponse {
    network_name: String,
    node_count: u32,
    managed: bool,
    already_running: bool,
    forced_setup: bool,
}
2188
// Deserializable request payload naming a network and an optional timeout in seconds;
// presumably used to wait for network readiness (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitNetworkReadyRequest {
    network_name: String,
    timeout_seconds: Option<u64>,
}
2194
// Serializable readiness report for a network.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct WaitReadyResponse {
    network_name: String,
    ready: bool,
    node_count: u32,
    // JSON values keyed by node id; presumably the per-node REST `/status` payloads
    // as returned by `check_rest_ready` — confirm at the construction site.
    rest: HashMap<u32, Value>,
    last_block_height: Option<u64>,
}
2203
// Deserializable request payload; judging by the name it describes a protocol version
// to stage on a network (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct StageProtocolRequest {
    network_name: String,
    asset: Option<String>,
    custom_asset: Option<String>,
    asset_name: Option<String>,
    protocol_version: String,
    // NOTE(review): presumably an era id — confirm against the staging code.
    activation_point: u64,
}
2213
// Serializable response payload; presumably the outcome of a stage-protocol request
// (confirm against the handler that builds it).
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct StageProtocolResponse {
    network_name: String,
    live_mode: bool,
    staged_nodes: u32,
    // NOTE(review): presumably node ids whose sidecar was restarted — confirm.
    restarted_sidecars: Vec<u32>,
}
2221
// Deserializable request payload naming a network plus an optional `purge` flag;
// presumably used to tear a network down (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct DespawnNetworkRequest {
    network_name: String,
    purge: Option<bool>,
}
2227
// Serializable response payload reporting a network name and whether it was purged.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct DespawnResponse {
    network_name: String,
    purged: bool,
}
2233
// Deserializable request payload with no fields.
#[derive(Debug, Deserialize, Default, rmcp::schemars::JsonSchema)]
struct ListNetworksRequest {}
2236
// Serializable response payload wrapping a list of `NetworkRow` entries.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ListNetworksResponse {
    networks: Vec<NetworkRow>,
}
2241
// Serializable summary of a single network, used as an element of `ListNetworksResponse`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct NetworkRow {
    network_name: String,
    // NOTE(review): presumably "found on disk" vs. "tracked by this process" — confirm
    // how `discovered` and `managed` are populated.
    discovered: bool,
    managed: bool,
    running: bool,
    node_count: Option<u32>,
}
2250
// Deserializable request payload naming a network with optional `process_name` and
// `running_only` selectors; presumably used to list managed processes (confirm).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessesRequest {
    network_name: String,
    process_name: Option<String>,
    running_only: Option<bool>,
}
2257
// Serializable response payload listing `ManagedProcessRow` entries for a network,
// together with the `running_only` / `process_name` selectors it reports.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessesResponse {
    network_name: String,
    running_only: bool,
    process_name: Option<String>,
    processes: Vec<ManagedProcessRow>,
}
2265
// Serializable description of one managed process, used as an element of
// `ManagedProcessesResponse`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessRow {
    id: String,
    node_id: u32,
    // NOTE(review): presumably the output of `process_kind_name` ("node"/"sidecar") — confirm.
    kind: String,
    pid: Option<u32>,
    running: bool,
    // NOTE(review): presumably the output of `process_status_name` — confirm.
    last_status: String,
    command: String,
    args: Vec<String>,
    cwd: String,
    stdout_path: String,
    stderr_path: String,
}
2280
// Deserializable request payload identifying one node (`node_id`) of a named network.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct NodeScopedRequest {
    network_name: String,
    node_id: u32,
}
2286
// Deserializable request payload carrying an RPC method name and optional JSON
// parameters for one node of a named network.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryRequest {
    network_name: String,
    node_id: u32,
    method: String,
    params: Option<Value>,
}
2294
// Deserializable request payload for a balance query against one node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryBalanceRequest {
    network_name: String,
    node_id: u32,
    // NOTE(review): presumably one of the forms accepted by `parse_purse_identifier` — confirm.
    purse_identifier: String,
    // NOTE(review): presumably a block hash or height, see `parse_state_identifier` — confirm.
    block_id: Option<String>,
    state_root_hash: Option<String>,
}
2303
// Deserializable request payload for a global-state query against one node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryGlobalStateRequest {
    network_name: String,
    node_id: u32,
    // NOTE(review): presumably one of the forms accepted by `parse_query_key` — confirm.
    key: String,
    path: Option<Vec<String>>,
    // NOTE(review): presumably a block hash or height, see `parse_state_identifier` — confirm.
    block_id: Option<String>,
    state_root_hash: Option<String>,
}
2313
// Deserializable request payload identifying a node and, optionally, a block.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct CurrentBlockRequest {
    network_name: String,
    node_id: u32,
    // NOTE(review): presumably forwarded to `fetch_block_result`, where an absent or
    // blank id selects the latest block — confirm at the call site.
    block_id: Option<String>,
}
2320
// Selects one of a process's two output streams.
// Because of `rename_all = "lowercase"` the variants (de)serialize as "stdout" / "stderr".
#[derive(Debug, Clone, Copy, Deserialize, Serialize, rmcp::schemars::JsonSchema)]
#[serde(rename_all = "lowercase")]
enum NodeLogStream {
    Stdout,
    Stderr,
}
2327
// Deserializable request payload selecting a log stream of one node, with optional
// paging controls.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct GetNodeLogsRequest {
    network_name: String,
    node_id: u32,
    stream: NodeLogStream,
    limit: Option<usize>,
    // NOTE(review): presumably the paging cursor matching `LogPage::next_before_line` — confirm.
    before_line: Option<usize>,
}
2336
// Serializable pairing of a line number with that line's text; element of `LogPage`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct LogLine {
    line_number: usize,
    content: String,
}
2342
// Serializable page of log lines read from the file at `path`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct LogPage {
    path: String,
    total_lines: usize,
    returned: usize,
    // NOTE(review): presumably the cursor to pass as `before_line` for the next
    // (older) page — confirm against the code that builds this page.
    next_before_line: Option<usize>,
    lines: Vec<LogLine>,
}
2351
// Deserializable request payload with event-type filters, a sequence cursor and a
// timeout; presumably mapped onto `SseStore::wait_next` (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitNextSseRequest {
    network_name: String,
    include_event_types: Option<Vec<String>>,
    exclude_event_types: Option<Vec<String>>,
    after_sequence: Option<u64>,
    timeout_seconds: Option<u64>,
}
2360
// Deserializable request payload with event-type filters, a sequence cursor and a page
// limit; presumably mapped onto `SseStore::history` (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SseHistoryRequest {
    network_name: String,
    include_event_types: Option<Vec<String>>,
    exclude_event_types: Option<Vec<String>>,
    before_sequence: Option<u64>,
    limit: Option<usize>,
}
2369
// Deserializable request payload that only names a network.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct ListDerivedAccountsRequest {
    network_name: String,
}
2374
// (De)serializable description of one account; every field is a string except the
// optional `public_key`.
// NOTE(review): the exact format of `derivation`, `path` and `balance` is not visible
// here — confirm against the code that produces these rows.
#[derive(Debug, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct DerivedAccountRow {
    kind: String,
    name: String,
    key_type: String,
    derivation: String,
    path: String,
    account_hash: String,
    balance: String,
    public_key: Option<String>,
}
2386
// Deserializable request payload carrying a transaction as JSON plus the signer to use.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SendTransactionSignedRequest {
    network_name: String,
    node_id: u32,
    signer_path: String,
    // NOTE(review): presumably parsed with `parse_transaction_json`, which rejects
    // JSON strings and accepts a `{ "transaction": ... }` wrapper — confirm at the call site.
    transaction: Value,
}
2394
// Deserializable request payload; judging by the name it describes a transaction that
// calls an entry point of a named package (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionPackageCallRequest {
    network_name: String,
    node_id: u32,
    transaction_package_name: String,
    transaction_package_version: Option<EntityVersion>,
    session_entry_point: String,
    // Also accepted under the key `session_args_json`.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // NOTE(review): presumably parsed by `parse_optional_seed_hex` (32 bytes of hex,
    // optional `0x` prefix) — confirm at the call site.
    runtime_seed_hex: Option<String>,
}
2413
// Deserializable request payload; judging by the name it describes a transaction that
// calls an entry point of a contract identified by hash (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionContractCallRequest {
    network_name: String,
    node_id: u32,
    // NOTE(review): presumably parsed by `parse_contract_hash_for_invocation` — confirm.
    transaction_contract_hash: String,
    session_entry_point: String,
    // Also accepted under the key `session_args_json`.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // NOTE(review): presumably parsed by `parse_optional_seed_hex` — confirm at the call site.
    runtime_seed_hex: Option<String>,
}
2431
// Deserializable request payload; judging by the name it describes a transaction that
// runs session Wasm loaded from `wasm_path` (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionSessionWasmRequest {
    network_name: String,
    node_id: u32,
    wasm_path: String,
    // Also accepted under the key `session_args_json`.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    is_install_upgrade: Option<bool>,
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // NOTE(review): presumably parsed by `parse_optional_seed_hex` — confirm at the call site.
    runtime_seed_hex: Option<String>,
}
2449
// Deserializable request payload; judging by the name it describes session Wasm to be
// signed with `signer_path` and sent to a node (confirm against the handler).
// Unlike the `MakeTransaction*` requests, `signer_path` is mandatory here.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SendSessionWasmRequest {
    network_name: String,
    node_id: u32,
    signer_path: String,
    wasm_path: String,
    // Also accepted under the key `session_args_json`.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    chain_name: Option<String>,
    is_install_upgrade: Option<bool>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    // NOTE(review): presumably parsed by `parse_optional_seed_hex` — confirm at the call site.
    runtime_seed_hex: Option<String>,
}
2466
// Deserializable request payload; judging by the name it describes a token transfer
// between two accounts identified by `from_path` and `to_path` (confirm against the handler).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct TransferTokensRequest {
    network_name: String,
    node_id: u32,
    from_path: String,
    to_path: String,
    // Carried as a string; NOTE(review): unit and accepted format are not visible here — confirm.
    amount: String,
    transfer_id: Option<u64>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
}
2480
// Deserializable request payload identifying a transaction on a node, with optional
// timeout (seconds) and poll interval (milliseconds).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitTransactionRequest {
    network_name: String,
    node_id: u32,
    // NOTE(review): presumably parsed by `parse_transaction_hash_input` — confirm.
    transaction_hash: String,
    timeout_seconds: Option<u64>,
    poll_interval_millis: Option<u64>,
}
2489
// Deserializable request payload identifying a transaction on a node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct GetTransactionRequest {
    network_name: String,
    node_id: u32,
    // NOTE(review): presumably parsed by `parse_transaction_hash_input` — confirm.
    transaction_hash: String,
}
2496
/// Minimal view of a node's REST `/status` payload: only `reactor_state` is extracted.
///
/// `check_rest_ready` deserializes each status payload into this type and requires the
/// state to be `"Validate"`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RestStatusProbe {
    reactor_state: Option<String>,
}
2502
2503fn to_mcp_error(err: impl std::fmt::Display) -> ErrorData {
2504 ErrorData::internal_error(err.to_string(), None)
2505}
2506
2507fn internal_serde_error(err: serde_json::Error) -> ErrorData {
2508 ErrorData::internal_error(format!("serde error: {}", err), None)
2509}
2510
2511fn ok_value(value: Value) -> rmcp::model::CallToolResult {
2512 rmcp::model::CallToolResult::structured(value)
2513}
2514
2515fn parse_transaction_json(value: Value) -> std::result::Result<Transaction, ErrorData> {
2516 if let Value::String(encoded) = &value {
2517 let _ = encoded;
2518 return Err(ErrorData::invalid_params(
2519 "invalid transaction payload: encoded JSON strings are not supported. Provide transaction as a typed JSON object",
2520 None,
2521 ));
2522 }
2523
2524 if let Ok(transaction) = serde_json::from_value::<Transaction>(value.clone()) {
2525 return Ok(transaction);
2526 }
2527
2528 if let Some(inner) = value.get("transaction") {
2529 return parse_transaction_json(inner.clone());
2530 }
2531
2532 Err(ErrorData::invalid_params(
2533 "failed to parse transaction JSON; expected Transaction object or { transaction: Transaction }, with typed JSON values only.",
2534 None,
2535 ))
2536}
2537
2538fn parse_transaction_hash_input(input: &str) -> Result<TransactionHash> {
2539 let value = input.trim();
2540 if value.is_empty() {
2541 return Err(anyhow!("transaction_hash must not be empty"));
2542 }
2543
2544 let unwrapped = value
2545 .strip_prefix("transaction-v1-hash(")
2546 .and_then(|inner| inner.strip_suffix(')'))
2547 .or_else(|| {
2548 value
2549 .strip_prefix("deploy-hash(")
2550 .and_then(|inner| inner.strip_suffix(')'))
2551 })
2552 .unwrap_or(value);
2553
2554 if unwrapped.contains("..") {
2555 return Err(anyhow!(
2556 "transaction_hash appears abbreviated; provide full hex digest"
2557 ));
2558 }
2559
2560 let normalized = unwrapped.strip_prefix("0x").unwrap_or(unwrapped);
2561 let digest = Digest::from_hex(normalized)
2562 .map_err(|err| anyhow!("failed to parse transaction hash digest: {}", err))?;
2563 Ok(TransactionHash::from(TransactionV1Hash::from(digest)))
2564}
2565
2566fn mcp_rpc_client(network_name: &str, node_id: u32) -> Result<CasperClient> {
2567 CasperClient::new(
2568 network_name.to_string(),
2569 vec![assets::rpc_endpoint(node_id)],
2570 )
2571 .map_err(|err| {
2572 anyhow!(
2573 "failed to initialize rpc client for node {}: {}",
2574 node_id,
2575 err
2576 )
2577 })
2578}
2579
2580fn extract_rpc_result(response: Value) -> Result<Value> {
2581 if let Some(error) = response.get("error") {
2582 return Err(anyhow!("rpc request failed: {}", error));
2583 }
2584 response
2585 .get("result")
2586 .cloned()
2587 .ok_or_else(|| anyhow!("rpc response missing result field"))
2588}
2589
2590async fn fetch_block_result(
2591 manager: &NetworkManager,
2592 network_name: &str,
2593 node_id: u32,
2594 block_id: Option<&str>,
2595) -> Result<Value> {
2596 let block_id = normalize_optional_identifier(block_id);
2597 if block_id.is_empty() {
2598 let rpc = mcp_rpc_client(network_name, node_id)?;
2599 let result = rpc.get_block().await?;
2600 return serde_json::to_value(result).map_err(Into::into);
2601 }
2602
2603 let block_identifier = parse_block_identifier_value(&block_id)?;
2604 let response = manager
2605 .raw_rpc_query(
2606 node_id,
2607 "chain_get_block",
2608 Some(json!({
2609 "block_identifier": block_identifier
2610 })),
2611 )
2612 .await?;
2613 extract_rpc_result(response)
2614}
2615
2616fn build_query_global_state_params(
2617 key: &str,
2618 path: Vec<String>,
2619 block_id: &str,
2620 state_root_hash: &str,
2621) -> Result<Value> {
2622 let key = parse_query_key(key)?;
2623 let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2624 let mut params = serde_json::Map::new();
2625 params.insert("key".to_string(), Value::String(key));
2626 params.insert(
2627 "path".to_string(),
2628 Value::Array(path.into_iter().map(Value::String).collect()),
2629 );
2630 if let Some(state_identifier) = state_identifier {
2631 params.insert("state_identifier".to_string(), state_identifier);
2632 }
2633 Ok(Value::Object(params))
2634}
2635
2636fn build_query_balance_params(
2637 purse_identifier: &str,
2638 block_id: &str,
2639 state_root_hash: &str,
2640) -> Result<Value> {
2641 let purse_identifier = parse_purse_identifier(purse_identifier)?;
2642 let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2643 let mut params = serde_json::Map::new();
2644 params.insert("purse_identifier".to_string(), purse_identifier);
2645 if let Some(state_identifier) = state_identifier {
2646 params.insert("state_identifier".to_string(), state_identifier);
2647 }
2648 Ok(Value::Object(params))
2649}
2650
2651fn parse_state_identifier(block_id: &str, state_root_hash: &str) -> Result<Option<Value>> {
2652 let block_id = block_id.trim();
2653 if !block_id.is_empty() {
2654 if block_id.len() == Digest::LENGTH * 2 {
2655 Digest::from_hex(block_id)
2656 .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2657 return Ok(Some(json!({
2658 "BlockHash": block_id,
2659 })));
2660 }
2661
2662 let height = block_id
2663 .parse::<u64>()
2664 .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2665 return Ok(Some(json!({
2666 "BlockHeight": height,
2667 })));
2668 }
2669
2670 let state_root_hash = state_root_hash.trim();
2671 if state_root_hash.is_empty() {
2672 return Ok(None);
2673 }
2674 Digest::from_hex(state_root_hash)
2675 .map_err(|err| anyhow!("invalid state_root_hash digest: {}", err))?;
2676 Ok(Some(json!({
2677 "StateRootHash": state_root_hash,
2678 })))
2679}
2680
2681fn parse_block_identifier_value(block_id: &str) -> Result<Value> {
2682 let block_id = block_id.trim();
2683 if block_id.is_empty() {
2684 return Err(anyhow!("block identifier must not be empty"));
2685 }
2686 if block_id.len() == Digest::LENGTH * 2 {
2687 Digest::from_hex(block_id)
2688 .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2689 Ok(json!({
2690 "Hash": block_id,
2691 }))
2692 } else {
2693 let height = block_id
2694 .parse::<u64>()
2695 .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2696 Ok(json!({
2697 "Height": height,
2698 }))
2699 }
2700}
2701
2702fn parse_query_key(key: &str) -> Result<String> {
2703 let key = key.trim();
2704 if key.is_empty() {
2705 return Err(anyhow!("key must not be empty"));
2706 }
2707 if let Ok(contract_hash) = ContractHash::from_formatted_str(key) {
2708 return Ok(Key::Hash(contract_hash.value()).to_formatted_string());
2709 }
2710 if let Ok(parsed) = Key::from_formatted_str(key) {
2711 return Ok(parsed.to_formatted_string());
2712 }
2713 if let Ok(public_key) = PublicKey::from_hex(key) {
2714 return Ok(Key::Account(public_key.to_account_hash()).to_formatted_string());
2715 }
2716 Err(anyhow!(
2717 "failed to parse key for query; expected formatted Key or public key hex"
2718 ))
2719}
2720
2721fn parse_contract_hash_for_invocation(input: &str) -> Result<AddressableEntityHash> {
2722 let input = input.trim();
2723 if input.is_empty() {
2724 return Err(anyhow!("transaction_contract_hash must not be empty"));
2725 }
2726
2727 if let Ok(contract_hash) = ContractHash::from_formatted_str(input) {
2728 return Ok(contract_hash.into());
2729 }
2730
2731 let digest_hex = input
2732 .strip_prefix("hash-")
2733 .or_else(|| input.strip_prefix("contract-hash-"))
2734 .unwrap_or(input);
2735 let bytes = hex_to_bytes(digest_hex)?;
2736 if bytes.len() != Digest::LENGTH {
2737 return Err(anyhow!(
2738 "transaction_contract_hash must be {} bytes",
2739 Digest::LENGTH
2740 ));
2741 }
2742 let mut hash = [0u8; Digest::LENGTH];
2743 hash.copy_from_slice(&bytes);
2744 Ok(ContractHash::new(hash).into())
2745}
2746
2747fn parse_purse_identifier(input: &str) -> Result<Value> {
2748 let input = input.trim();
2749 if input.is_empty() {
2750 return Err(anyhow!("purse_identifier must not be empty"));
2751 }
2752
2753 if input.starts_with("account-hash-") {
2754 casper_types::account::AccountHash::from_formatted_str(input)
2755 .map_err(|err| anyhow!("invalid account hash purse identifier: {}", err))?;
2756 return Ok(json!({
2757 "main_purse_under_account_hash": input,
2758 }));
2759 }
2760
2761 if input.starts_with("entity-") {
2762 return Ok(json!({
2763 "main_purse_under_entity_addr": input,
2764 }));
2765 }
2766
2767 if input.starts_with("uref-") {
2768 URef::from_formatted_str(input)
2769 .map_err(|err| anyhow!("invalid uref purse identifier: {}", err))?;
2770 return Ok(json!({
2771 "purse_uref": input,
2772 }));
2773 }
2774
2775 let public_key = PublicKey::from_hex(input)
2776 .map_err(|err| anyhow!("invalid public key purse identifier: {}", err))?;
2777 Ok(json!({
2778 "main_purse_under_public_key": public_key.to_hex_string(),
2779 }))
2780}
2781
2782fn build_pricing_mode(gas_price_tolerance: Option<u8>, payment_amount: Option<u64>) -> PricingMode {
2783 if let Some(payment_amount) = payment_amount {
2784 PricingMode::PaymentLimited {
2785 payment_amount,
2786 gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2787 standard_payment: true,
2788 }
2789 } else {
2790 PricingMode::Fixed {
2791 gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2792 additional_computation_factor: 0,
2793 }
2794 }
2795}
2796
2797fn parse_optional_seed_hex(seed: Option<&str>) -> Result<Option<[u8; 32]>> {
2798 let Some(seed) = seed else {
2799 return Ok(None);
2800 };
2801 let seed = seed.trim();
2802 if seed.is_empty() {
2803 return Ok(None);
2804 }
2805
2806 let cleaned = seed.strip_prefix("0x").unwrap_or(seed);
2807 let bytes = hex_to_bytes(cleaned)?;
2808 if bytes.len() != 32 {
2809 return Err(anyhow!("runtime_seed_hex must be exactly 32 bytes"));
2810 }
2811
2812 let mut out = [0u8; 32];
2813 out.copy_from_slice(&bytes);
2814 Ok(Some(out))
2815}
2816
/// Returns the trimmed identifier as an owned string, or an empty string when absent.
fn normalize_optional_identifier(value: Option<&str>) -> String {
    match value {
        Some(raw) => raw.trim().to_owned(),
        None => String::new(),
    }
}
2820
2821async fn resolve_global_state_identifier(
2822 network_name: &str,
2823 node_id: u32,
2824 block_id: Option<&str>,
2825 state_root_hash: Option<&str>,
2826) -> Result<(String, String)> {
2827 let block_id = normalize_optional_identifier(block_id);
2828 let state_root_hash = normalize_optional_identifier(state_root_hash);
2829
2830 if !block_id.is_empty() || !state_root_hash.is_empty() {
2831 return Ok((block_id, state_root_hash));
2832 }
2833
2834 let rpc = mcp_rpc_client(network_name, node_id)?;
2835 let response = rpc.get_block().await?;
2836 let latest_block = response
2837 .block_with_signatures
2838 .ok_or_else(|| anyhow!("latest block was not returned by chain_get_block"))?;
2839
2840 Ok((latest_block.block.hash().to_hex_string(), String::new()))
2841}
2842
2843fn hex_to_bytes(input: &str) -> Result<Vec<u8>> {
2844 if !input.len().is_multiple_of(2) {
2845 return Err(anyhow!("hex string must have even length"));
2846 }
2847 let mut out = Vec::with_capacity(input.len() / 2);
2848 let mut chars = input.chars();
2849 while let (Some(hi), Some(lo)) = (chars.next(), chars.next()) {
2850 let byte = ((hex_nibble(hi)? as u8) << 4) | (hex_nibble(lo)? as u8);
2851 out.push(byte);
2852 }
2853 Ok(out)
2854}
2855
2856fn hex_nibble(ch: char) -> Result<u32> {
2857 ch.to_digit(16)
2858 .ok_or_else(|| anyhow!("invalid hex character '{}'", ch))
2859}
2860
2861async fn ensure_running_network(
2862 network: &Arc<ManagedNetwork>,
2863) -> std::result::Result<(), ErrorData> {
2864 if network.is_running().await {
2865 Ok(())
2866 } else {
2867 Err(ErrorData::resource_not_found(
2868 "network is not running; call spawn_network then wait_network_ready",
2869 None,
2870 ))
2871 }
2872}
2873
2874async fn check_rest_ready(network: &Arc<ManagedNetwork>) -> Result<HashMap<u32, Value>> {
2875 let node_ids = {
2876 let state = network.state.lock().await;
2877 state
2878 .processes
2879 .iter()
2880 .filter_map(|process| {
2881 if matches!(process.kind, ProcessKind::Node) {
2882 Some(process.node_id)
2883 } else {
2884 None
2885 }
2886 })
2887 .collect::<Vec<_>>()
2888 };
2889
2890 if node_ids.is_empty() {
2891 return Err(anyhow!("network has no node processes"));
2892 }
2893
2894 let mut by_node = HashMap::new();
2895 for node_id in node_ids {
2896 let value = fetch_rest_status(node_id).await?;
2897 let probe: RestStatusProbe = serde_json::from_value(value.clone())
2898 .with_context(|| format!("invalid /status payload for node {}", node_id))?;
2899 if probe.reactor_state.as_deref() != Some("Validate") {
2900 return Err(anyhow!("node {} reactor is not Validate", node_id));
2901 }
2902 by_node.insert(node_id, value);
2903 }
2904
2905 Ok(by_node)
2906}
2907
2908fn rest_has_block(value: &Value) -> bool {
2909 value
2910 .get("last_added_block_info")
2911 .and_then(|entry| entry.get("height"))
2912 .and_then(Value::as_u64)
2913 .is_some()
2914}
2915
2916async fn fetch_rest_status(node_id: u32) -> Result<Value> {
2917 let url = format!("{}/status", assets::rest_endpoint(node_id));
2918 let client = reqwest::Client::builder()
2919 .no_proxy()
2920 .timeout(Duration::from_secs(4))
2921 .build()?;
2922 let response = client.get(url).send().await?.error_for_status()?;
2923 Ok(response.json::<Value>().await?)
2924}
2925
2926async fn claim_block_hook(network: &Arc<ManagedNetwork>, block_hash: &str) -> bool {
2927 let mut last_block_hook_hash = network.last_block_hook_hash.lock().await;
2928 if last_block_hook_hash.as_deref() == Some(block_hash) {
2929 return false;
2930 }
2931 *last_block_hook_hash = Some(block_hash.to_string());
2932 true
2933}
2934
/// Long-running task that streams SSE events from `endpoint` into the network's store.
///
/// The task reconnects in a loop until the network's `shutdown` flag is set or the
/// backoff policy stops yielding delays. Every received event is pushed to
/// `network.sse_store` tagged with `node_id`; `BlockAdded` events additionally update
/// the recorded block height and trigger the asset hooks.
// NOTE(review): once `sleep_backoff` returns `false` the listener exits for good. With
// `ExponentialBackoff::default()` this presumably happens after the policy's maximum
// elapsed time — confirm that a permanently stopped listener is intended.
async fn run_sse_listener(network: Arc<ManagedNetwork>, node_id: u32, endpoint: String) {
    let mut backoff = ExponentialBackoff::default();
    // API version announced on the current connection; cleared whenever the stream ends.
    let mut connection_version: Option<String> = None;

    loop {
        if network.shutdown.load(Ordering::SeqCst) {
            return;
        }

        let config = match ListenerConfig::builder()
            .with_endpoint(endpoint.clone())
            .build()
        {
            Ok(config) => config,
            Err(_) => {
                // The configuration error is dropped; retry after a backoff delay.
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        let stream = match sse::listener(config).await {
            Ok(stream) => {
                // A successful connection restarts the backoff schedule.
                backoff.reset();
                stream
            }
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        futures::pin_mut!(stream);
        let mut failed = false;

        while let Some(event) = stream.next().await {
            // The shutdown flag is only observed when an event arrives.
            if network.shutdown.load(Ordering::SeqCst) {
                return;
            }

            match event {
                Ok(event) => {
                    if let SseEvent::ApiVersion(version) = &event {
                        connection_version = Some(version.to_string());
                    }
                    if let SseEvent::BlockAdded { block_hash, block } = &event {
                        // Failing to persist the height is deliberately ignored.
                        let _ = record_last_block_height(&network.state, block.height()).await;
                        assets::spawn_pending_post_genesis_hook(network.layout.clone());
                        // The block-added hook needs the connection's API version and is
                        // skipped when `claim_block_hook` reports the same hash as the
                        // previously claimed one.
                        if let Some(protocol_version) = connection_version.as_deref()
                            && claim_block_hook(&network, &block_hash.to_string()).await
                        {
                            assets::spawn_block_added_hook(
                                network.layout.clone(),
                                protocol_version.to_string(),
                                json!({
                                    "block_hash": block_hash.to_string(),
                                    "height": block.height(),
                                    "era_id": block.era_id().value(),
                                }),
                            );
                        }
                    }
                    network.sse_store.push(node_id, event).await;
                }
                Err(_) => {
                    failed = true;
                    break;
                }
            }
        }

        connection_version = None;
        // Only a stream error is followed by a backoff delay; a stream that simply ends
        // is reconnected immediately.
        // NOTE(review): confirm an immediate reconnect cannot spin when the server keeps
        // closing the stream right away.
        if failed && !sleep_backoff(&mut backoff).await {
            return;
        }
    }
}
3015
3016async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
3017 let mut state = state.lock().await;
3018 if state.last_block_height == Some(height) {
3019 return Ok(());
3020 }
3021 state.last_block_height = Some(height);
3022 state.touch().await
3023}
3024
3025async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
3026 if let Some(delay) = backoff.next_backoff() {
3027 tokio::time::sleep(delay).await;
3028 true
3029 } else {
3030 false
3031 }
3032}
3033
3034fn sse_event_type(event: &SseEvent) -> &'static str {
3035 match event {
3036 SseEvent::ApiVersion(_) => "ApiVersion",
3037 SseEvent::DeployAccepted(_) => "DeployAccepted",
3038 SseEvent::BlockAdded { .. } => "BlockAdded",
3039 SseEvent::DeployProcessed(_) => "DeployProcessed",
3040 SseEvent::DeployExpired(_) => "DeployExpired",
3041 SseEvent::TransactionAccepted(_) => "TransactionAccepted",
3042 SseEvent::TransactionProcessed { .. } => "TransactionProcessed",
3043 SseEvent::TransactionExpired { .. } => "TransactionExpired",
3044 SseEvent::Fault { .. } => "Fault",
3045 SseEvent::FinalitySignature(_) => "FinalitySignature",
3046 SseEvent::Step { .. } => "Step",
3047 SseEvent::Shutdown => "Shutdown",
3048 }
3049}
3050
3051fn process_kind_name(kind: &ProcessKind) -> &'static str {
3052 match kind {
3053 ProcessKind::Node => "node",
3054 ProcessKind::Sidecar => "sidecar",
3055 }
3056}
3057
3058fn process_status_name(status: &ProcessStatus) -> &'static str {
3059 match status {
3060 ProcessStatus::Running => "running",
3061 ProcessStatus::Stopped => "stopped",
3062 ProcessStatus::Exited => "exited",
3063 ProcessStatus::Unknown => "unknown",
3064 ProcessStatus::Skipped => "skipped",
3065 }
3066}
3067
3068fn sse_event_payload(event: &SseEvent) -> Value {
3069 match event {
3070 SseEvent::ApiVersion(version) => json!({ "api_version": version.to_string() }),
3071 SseEvent::DeployAccepted(payload) => payload.clone(),
3072 SseEvent::BlockAdded { block_hash, block } => json!({
3073 "block_hash": block_hash.to_string(),
3074 "height": block.height(),
3075 "era_id": block.era_id().value(),
3076 }),
3077 SseEvent::DeployProcessed(payload) => payload.clone(),
3078 SseEvent::DeployExpired(payload) => payload.clone(),
3079 SseEvent::TransactionAccepted(transaction) => {
3080 json!({ "transaction_hash": transaction.hash().to_hex_string() })
3081 }
3082 SseEvent::TransactionProcessed {
3083 transaction_hash,
3084 execution_result,
3085 messages,
3086 ..
3087 } => json!({
3088 "transaction_hash": transaction_hash.to_hex_string(),
3089 "execution_result": execution_result,
3090 "messages": messages,
3091 }),
3092 SseEvent::TransactionExpired { transaction_hash } => json!({
3093 "transaction_hash": transaction_hash.to_hex_string(),
3094 }),
3095 SseEvent::Fault {
3096 era_id,
3097 public_key,
3098 timestamp,
3099 } => json!({
3100 "era_id": era_id.value(),
3101 "public_key": public_key.to_hex(),
3102 "timestamp": timestamp,
3103 }),
3104 SseEvent::FinalitySignature(signature) => json!({
3105 "block_hash": signature.block_hash().to_string(),
3106 "era_id": signature.era_id().value(),
3107 "signature": signature.signature().to_hex(),
3108 }),
3109 SseEvent::Step {
3110 era_id,
3111 execution_effects,
3112 } => json!({
3113 "era_id": era_id.value(),
3114 "execution_effects": execution_effects.get(),
3115 }),
3116 SseEvent::Shutdown => json!({}),
3117 }
3118}
3119
3120fn timestamp_prefix() -> String {
3121 time::OffsetDateTime::now_utc()
3122 .format(&time::format_description::well_known::Rfc3339)
3123 .unwrap_or_else(|_| "unknown-time".to_string())
3124}
3125
3126fn processes_running(state: &State) -> bool {
3127 if state.processes.is_empty() {
3128 return false;
3129 }
3130
3131 state.processes.iter().all(|process| {
3132 matches!(process.last_status, ProcessStatus::Running)
3133 && process.current_pid().is_some_and(is_pid_running)
3134 })
3135}
3136
3137fn running_node_ids(state: &State) -> Vec<u32> {
3138 let mut node_ids = std::collections::BTreeSet::new();
3139 for process in &state.processes {
3140 if !matches!(process.kind, ProcessKind::Node) {
3141 continue;
3142 }
3143 if !matches!(process.last_status, ProcessStatus::Running) {
3144 continue;
3145 }
3146 let Some(pid) = process.current_pid() else {
3147 continue;
3148 };
3149 if !is_pid_running(pid) {
3150 continue;
3151 }
3152 node_ids.insert(process.node_id);
3153 }
3154 node_ids.into_iter().collect()
3155}
3156
3157fn is_pid_running(pid: u32) -> bool {
3158 let pid = Pid::from_raw(pid as i32);
3159 match kill(pid, None) {
3160 Ok(()) => true,
3161 Err(Errno::ESRCH) => false,
3162 Err(_) => true,
3163 }
3164}
3165
3166async fn discover_network_names(assets_root: &Path) -> Result<Vec<String>> {
3167 if !is_dir(assets_root).await {
3168 return Ok(Vec::new());
3169 }
3170
3171 let mut names = Vec::new();
3172 let mut entries = tokio_fs::read_dir(assets_root).await?;
3173 while let Some(entry) = entries.next_entry().await? {
3174 if !entry.file_type().await?.is_dir() {
3175 continue;
3176 }
3177 let name = entry.file_name().to_string_lossy().to_string();
3178 if !name.is_empty() {
3179 names.push(name);
3180 }
3181 }
3182
3183 names.sort();
3184 Ok(names)
3185}
3186
3187async fn ensure_sidecar_available(layout: &AssetsLayout, node_count: u32) -> Result<()> {
3188 for node_id in 1..=node_count {
3189 let version_dir = layout.latest_protocol_version_dir(node_id).await?;
3190 let sidecar_bin = layout
3191 .node_bin_dir(node_id)
3192 .join(&version_dir)
3193 .join("casper-sidecar");
3194 let sidecar_cfg = layout
3195 .node_config_root(node_id)
3196 .join(&version_dir)
3197 .join("sidecar.toml");
3198
3199 if !is_file(&sidecar_bin).await {
3200 return Err(anyhow!(
3201 "missing sidecar binary for node {}: {}",
3202 node_id,
3203 sidecar_bin.display()
3204 ));
3205 }
3206
3207 if !is_file(&sidecar_cfg).await {
3208 return Err(anyhow!(
3209 "missing sidecar.toml for node {}: {}",
3210 node_id,
3211 sidecar_cfg.display()
3212 ));
3213 }
3214 }
3215
3216 Ok(())
3217}
3218
3219async fn latest_layout_protocol_version(layout: &AssetsLayout) -> Result<String> {
3220 Ok(layout
3221 .latest_protocol_version_dir(1)
3222 .await?
3223 .replace('_', "."))
3224}
3225
3226fn stage_asset_selector(
3227 asset: Option<&str>,
3228 custom_asset: Option<&str>,
3229 legacy_asset_name: Option<&str>,
3230) -> Result<AssetSelector> {
3231 let custom_asset = match (custom_asset, legacy_asset_name) {
3232 (Some(_), Some(_)) => {
3233 return Err(anyhow!(
3234 "custom_asset and asset_name are mutually exclusive"
3235 ));
3236 }
3237 (Some(custom_asset), None) | (None, Some(custom_asset)) => Some(custom_asset),
3238 (None, None) => None,
3239 };
3240 assets::required_asset_selector(asset, custom_asset)
3241}
3242
3243async fn parse_derived_accounts_csv(
3244 csv: &str,
3245 seed: Option<Arc<str>>,
3246) -> Result<Vec<DerivedAccountRow>> {
3247 let mut lines = csv.lines();
3248 let header = lines
3249 .next()
3250 .ok_or_else(|| anyhow!("derived accounts csv is empty"))?;
3251 if header.trim() != "kind,name,key_type,derivation,path,account_hash,balance" {
3252 return Err(anyhow!("unexpected derived-accounts.csv header"));
3253 }
3254
3255 let mut rows = Vec::new();
3256 for line in lines {
3257 let line = line.trim();
3258 if line.is_empty() {
3259 continue;
3260 }
3261
3262 let parts = line.splitn(7, ',').collect::<Vec<_>>();
3263 if parts.len() != 7 {
3264 return Err(anyhow!("invalid derived account row: {}", line));
3265 }
3266
3267 let path = parts[4].to_string();
3268 let public_key = if let Some(seed) = &seed {
3269 match assets::derive_account_from_seed_path(Arc::clone(seed), &path).await {
3270 Ok(material) => Some(material.public_key_hex),
3271 Err(_) => None,
3272 }
3273 } else {
3274 None
3275 };
3276
3277 rows.push(DerivedAccountRow {
3278 kind: parts[0].to_string(),
3279 name: parts[1].to_string(),
3280 key_type: parts[2].to_string(),
3281 derivation: parts[3].to_string(),
3282 path,
3283 account_hash: parts[5].to_string(),
3284 balance: parts[6].to_string(),
3285 public_key,
3286 });
3287 }
3288
3289 Ok(rows)
3290}
3291
3292async fn verify_path_hash_consistency(
3293 layout: &AssetsLayout,
3294 path: &str,
3295 expected_account_hash: &str,
3296) -> Result<()> {
3297 let Some(csv) = assets::derived_accounts_summary(layout).await else {
3298 return Ok(());
3299 };
3300
3301 for line in csv.lines().skip(1) {
3302 let line = line.trim();
3303 if line.is_empty() {
3304 continue;
3305 }
3306 let parts = line.splitn(7, ',').collect::<Vec<_>>();
3307 if parts.len() != 7 {
3308 continue;
3309 }
3310 if parts[4] == path {
3311 if parts[5] != expected_account_hash {
3312 return Err(anyhow!(
3313 "derived account hash mismatch for path {}: csv={} derived={}",
3314 path,
3315 parts[5],
3316 expected_account_hash
3317 ));
3318 }
3319 return Ok(());
3320 }
3321 }
3322
3323 Ok(())
3324}
3325
3326async fn fetch_chain_name(network_name: &str, node_id: u32) -> Result<String> {
3327 let rpc = mcp_rpc_client(network_name, node_id)?;
3328 rpc.get_network_name().await.map_err(Into::into)
3329}
3330
3331fn extract_block_height(value: &Value) -> Option<u64> {
3332 value
3333 .pointer("/block_with_signatures/block/header/height")
3334 .and_then(Value::as_u64)
3335 .or_else(|| {
3336 value
3337 .pointer("/block/block/header/height")
3338 .and_then(Value::as_u64)
3339 })
3340 .or_else(|| {
3341 value
3342 .pointer("/block_with_signatures/Version2/block/Version2/header/height")
3343 .and_then(Value::as_u64)
3344 })
3345 .or_else(|| {
3346 value
3347 .pointer("/block_with_signatures/Version1/block/Version1/header/height")
3348 .and_then(Value::as_u64)
3349 })
3350 .or_else(|| find_first_height(value))
3351}
3352
3353fn find_first_height(value: &Value) -> Option<u64> {
3354 match value {
3355 Value::Object(map) => {
3356 if let Some(height) = map.get("height").and_then(Value::as_u64) {
3357 return Some(height);
3358 }
3359 for nested in map.values() {
3360 if let Some(height) = find_first_height(nested) {
3361 return Some(height);
3362 }
3363 }
3364 None
3365 }
3366 Value::Array(items) => {
3367 for item in items {
3368 if let Some(height) = find_first_height(item) {
3369 return Some(height);
3370 }
3371 }
3372 None
3373 }
3374 _ => None,
3375 }
3376}
3377
3378fn parse_no_such_block_range_from_error(error_text: &str) -> Option<(u64, u64)> {
3379 let start = error_text.find('{')?;
3380 let payload = &error_text[start..];
3381 let value: Value = serde_json::from_str(payload).ok()?;
3382
3383 let code = value.get("code").and_then(Value::as_i64)?;
3384 let message = value.get("message").and_then(Value::as_str)?;
3385 if code != -32001 || !message.eq_ignore_ascii_case("No such block") {
3386 return None;
3387 }
3388
3389 let low = value
3390 .pointer("/data/available_block_range/low")
3391 .and_then(Value::as_u64)
3392 .unwrap_or(0);
3393 let high = value
3394 .pointer("/data/available_block_range/high")
3395 .and_then(Value::as_u64)
3396 .unwrap_or(low);
3397 Some((low, high))
3398}
3399
3400async fn read_log_page(path: &Path, before_line: Option<usize>, limit: usize) -> Result<LogPage> {
3401 let contents = tokio_fs::read_to_string(path)
3402 .await
3403 .with_context(|| format!("failed to read log file {}", path.display()))?;
3404
3405 let all_lines = contents
3406 .lines()
3407 .map(ToString::to_string)
3408 .collect::<Vec<_>>();
3409 let total_lines = all_lines.len();
3410
3411 let before = before_line.unwrap_or(total_lines + 1);
3412 if before == 0 {
3413 return Err(anyhow!("before_line must be >= 1"));
3414 }
3415
3416 let end_exclusive = before.saturating_sub(1).min(total_lines);
3417 let start = end_exclusive.saturating_sub(limit);
3418
3419 let mut lines = Vec::new();
3420 for (idx, content) in all_lines[start..end_exclusive].iter().enumerate() {
3421 lines.push(LogLine {
3422 line_number: start + idx + 1,
3423 content: content.clone(),
3424 });
3425 }
3426
3427 let next_before_line = if start == 0 { None } else { Some(start + 1) };
3428
3429 Ok(LogPage {
3430 path: path.display().to_string(),
3431 total_lines,
3432 returned: lines.len(),
3433 next_before_line,
3434 lines,
3435 })
3436}
3437
3438async fn is_dir(path: &Path) -> bool {
3439 tokio_fs::metadata(path)
3440 .await
3441 .map(|meta| meta.is_dir())
3442 .unwrap_or(false)
3443}
3444
3445async fn is_file(path: &Path) -> bool {
3446 tokio_fs::metadata(path)
3447 .await
3448 .map(|meta| meta.is_file())
3449 .unwrap_or(false)
3450}
3451
3452pub async fn run(args: McpArgs) -> Result<()> {
3453 let assets_root = match args.net_path {
3454 Some(path) => path,
3455 None => assets::default_assets_root()?,
3456 };
3457
3458 let manager = Arc::new(NetworkManager::new(assets_root).await?);
3459
3460 let result = match args.transport {
3461 McpTransport::Stdio => run_stdio(manager.clone()).await,
3462 McpTransport::Http => run_http(manager.clone(), &args.http_bind, &args.http_path).await,
3463 McpTransport::Both => run_both(manager.clone(), &args.http_bind, &args.http_path).await,
3464 };
3465
3466 let stop_result = manager.stop_all_networks().await;
3467
3468 match (result, stop_result) {
3469 (Ok(()), Ok(())) => Ok(()),
3470 (Err(err), Ok(())) => Err(err),
3471 (Ok(()), Err(stop_err)) => Err(stop_err),
3472 (Err(run_err), Err(stop_err)) => Err(anyhow!(
3473 "mcp server failed: {run_err}; additionally failed to stop networks: {stop_err}"
3474 )),
3475 }
3476}
3477
3478async fn run_stdio(manager: Arc<NetworkManager>) -> Result<()> {
3479 let service = McpServer::new(manager).serve(mcp_stdio()).await?;
3480 service.waiting().await?;
3481 Ok(())
3482}
3483
3484async fn run_http(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3485 let path = normalize_http_path(path);
3486 let socket = std::net::SocketAddr::from_str(bind)
3487 .with_context(|| format!("invalid http bind address '{}'", bind))?;
3488
3489 let service: StreamableHttpService<McpServer, LocalSessionManager> = StreamableHttpService::new(
3490 {
3491 let manager = manager.clone();
3492 move || Ok(McpServer::new(manager.clone()))
3493 },
3494 Arc::new(LocalSessionManager::default()),
3495 StreamableHttpServerConfig::default(),
3496 );
3497
3498 let router = axum::Router::new().nest_service(&path, service);
3499 let listener = tokio::net::TcpListener::bind(socket).await?;
3500
3501 axum::serve(listener, router)
3502 .with_graceful_shutdown(async {
3503 let _ = tokio::signal::ctrl_c().await;
3504 })
3505 .await?;
3506
3507 Ok(())
3508}
3509
3510async fn run_both(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3511 let path = normalize_http_path(path);
3512 let socket = std::net::SocketAddr::from_str(bind)
3513 .with_context(|| format!("invalid http bind address '{}'", bind))?;
3514
3515 let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
3516
3517 let mut http_task = {
3518 let manager = manager.clone();
3519 tokio::spawn(async move {
3520 let service: StreamableHttpService<McpServer, LocalSessionManager> =
3521 StreamableHttpService::new(
3522 {
3523 let manager = manager.clone();
3524 move || Ok(McpServer::new(manager.clone()))
3525 },
3526 Arc::new(LocalSessionManager::default()),
3527 StreamableHttpServerConfig::default(),
3528 );
3529
3530 let router = axum::Router::new().nest_service(&path, service);
3531 let listener = tokio::net::TcpListener::bind(socket).await?;
3532
3533 axum::serve(listener, router)
3534 .with_graceful_shutdown(async move {
3535 tokio::select! {
3536 _ = async {
3537 loop {
3538 if *shutdown_rx.borrow() {
3539 break;
3540 }
3541 if shutdown_rx.changed().await.is_err() {
3542 break;
3543 }
3544 }
3545 } => {}
3546 _ = tokio::signal::ctrl_c() => {}
3547 }
3548 })
3549 .await
3550 .map_err(anyhow::Error::from)
3551 })
3552 };
3553
3554 let mut stdio_task = tokio::spawn(async move { run_stdio(manager).await });
3555
3556 let result = tokio::select! {
3557 res = &mut stdio_task => match res {
3558 Ok(inner) => inner,
3559 Err(join_err) => Err(anyhow!("stdio task failed: {}", join_err)),
3560 },
3561 res = &mut http_task => match res {
3562 Ok(inner) => inner,
3563 Err(join_err) => Err(anyhow!("http task failed: {}", join_err)),
3564 },
3565 };
3566
3567 let _ = shutdown_tx.send(true);
3568 let _ = tokio::time::timeout(Duration::from_secs(2), &mut http_task).await;
3569 let _ = tokio::time::timeout(Duration::from_secs(2), &mut stdio_task).await;
3570
3571 result
3572}
3573
/// Ensures an HTTP mount path starts with `/`.
///
/// Paths that already begin with `/` are returned unchanged; anything else (including the
/// empty string) gets a single `/` prepended.
fn normalize_http_path(path: &str) -> String {
    let mut normalized = String::with_capacity(path.len() + 1);
    if !path.starts_with('/') {
        normalized.push('/');
    }
    normalized.push_str(path);
    normalized
}
3581
#[cfg(test)]
mod tests {
    use super::*;

    /// 64-character hex digest shared by the hash-parsing tests.
    const SAMPLE_HASH: &str = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";

    /// Two data rows after the header are parsed into two rows with the expected columns.
    #[tokio::test]
    async fn parse_derived_accounts_rows() {
        let csv = "kind,name,key_type,derivation,path,account_hash,balance\nvalidator,node-1,secp256k1,bip32,m/44'/506'/0'/0/0,account-hash-a,100\nuser,user-1,secp256k1,bip32,m/44'/506'/0'/0/100,account-hash-b,200";
        let parsed = parse_derived_accounts_csv(csv, None).await.unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].path, "m/44'/506'/0'/0/0");
        assert_eq!(parsed[1].account_hash, "account-hash-b");
    }

    /// An included event type matches; an excluded one does not.
    #[test]
    fn sse_filter_include_exclude() {
        let filter = SseFilter {
            include_event_types: vec!["BlockAdded".to_string()],
            exclude_event_types: vec!["TransactionAccepted".to_string()],
        };

        let record_of = |sequence, event_type: &str| SseRecord {
            sequence,
            timestamp_rfc3339: "t".to_string(),
            node_id: 1,
            event_type: event_type.to_string(),
            payload: json!({}),
        };
        let block = record_of(1, "BlockAdded");
        let tx = record_of(2, "TransactionAccepted");

        assert!(filter.matches(&block));
        assert!(!filter.matches(&tx));
    }

    /// With five stored events, a page of two before sequence 6 holds sequences 4 and 5.
    #[tokio::test]
    async fn sse_history_paginates_by_sequence() {
        let store = SseStore::new(100);
        {
            let mut guard = store.events.lock().await;
            for sequence in 1..=5 {
                guard.push_back(SseRecord {
                    sequence,
                    timestamp_rfc3339: "t".to_string(),
                    node_id: 1,
                    event_type: "BlockAdded".to_string(),
                    payload: json!({ "sequence": sequence }),
                });
            }
        }

        let page = store.history(Some(6), 2, &SseFilter::default()).await;
        let sequences: Vec<_> = page.events.iter().map(|event| event.sequence).collect();
        assert_eq!(page.returned, 2);
        assert_eq!(sequences, vec![4, 5]);
        assert_eq!(page.next_before_sequence, Some(4));
    }

    /// A cursor of 5 with limit 2 on a four-line file returns lines 3 and 4.
    #[tokio::test]
    async fn read_log_page_uses_before_cursor() {
        let log_file = tempfile::NamedTempFile::new().unwrap();
        tokio_fs::write(log_file.path(), "a\nb\nc\nd\n").await.unwrap();

        let page = read_log_page(log_file.path(), Some(5), 2).await.unwrap();
        let numbers: Vec<_> = page.lines.iter().map(|line| line.line_number).collect();
        assert_eq!(numbers, vec![3, 4]);
        assert_eq!(page.next_before_line, Some(3));
    }

    /// The preflight fails when `sidecar.toml` exists but the sidecar binary does not.
    #[tokio::test]
    async fn sidecar_preflight_fails_when_missing() {
        let root = tempfile::tempdir().unwrap();
        let layout = AssetsLayout::new(root.path().to_path_buf(), "test-net".to_string());
        let node_bin = layout.node_bin_dir(1).join("2_0_0");
        let node_cfg = layout.node_config_root(1).join("2_0_0");
        for dir in [&node_bin, &node_cfg] {
            tokio_fs::create_dir_all(dir).await.unwrap();
        }
        // Only the node binary and the sidecar config are created; `casper-sidecar` is absent.
        tokio_fs::write(node_bin.join("casper-node"), "bin")
            .await
            .unwrap();
        tokio_fs::write(node_cfg.join("sidecar.toml"), "cfg")
            .await
            .unwrap();

        let outcome = ensure_sidecar_available(&layout, 1).await;
        assert!(outcome.is_err());
    }

    #[test]
    fn normalize_optional_identifier_trims_and_defaults() {
        let cases = [(None, ""), (Some(" "), ""), (Some(" 123 "), "123")];
        for (input, expected) in cases {
            assert_eq!(normalize_optional_identifier(input), expected);
        }
    }

    #[test]
    fn parse_state_identifier_variants() {
        let by_height = parse_state_identifier("42", "").unwrap();
        assert_eq!(by_height, Some(json!({ "BlockHeight": 42u64 })));

        let by_hash = parse_state_identifier(SAMPLE_HASH, "").unwrap();
        assert_eq!(by_hash, Some(json!({ "BlockHash": SAMPLE_HASH })));

        let unspecified = parse_state_identifier("", "").unwrap();
        assert!(unspecified.is_none());
    }

    #[test]
    fn parse_contract_hash_for_invocation_accepts_common_formats() {
        let hash = SAMPLE_HASH;
        let candidates = [
            format!("contract-{}", hash),
            format!("hash-{}", hash),
            hash.to_string(),
        ];
        for candidate in candidates {
            let parsed = parse_contract_hash_for_invocation(&candidate).unwrap();
            assert_eq!(parsed.to_hex_string(), hash);
        }
    }

    #[test]
    fn parse_query_key_accepts_contract_hash_format() {
        let hash = SAMPLE_HASH;
        let key = parse_query_key(&format!("contract-{}", hash)).unwrap();
        assert_eq!(key, format!("hash-{}", hash));
    }

    /// A transaction object parses both bare and under a `transaction` key, while the same
    /// transaction encoded as a JSON string is rejected in both positions.
    #[test]
    fn parse_transaction_json_rejects_escaped_string_payload() {
        let tx_json = json!({
            "Version1": {
                "hash": "7eeb092361e31b4cc9885e3621f1470f29631338ecc703643c22da1d38fd81a9",
                "payload": {
                    "initiator_addr": {
                        "PublicKey": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f"
                    },
                    "timestamp": "2026-02-27T18:03:18.541Z",
                    "ttl": "30m",
                    "chain_name": "casper-devnet",
                    "pricing_mode": {
                        "PaymentLimited": {
                            "payment_amount": 100000000000u64,
                            "gas_price_tolerance": 5,
                            "standard_payment": true
                        }
                    },
                    "fields": {
                        "args": {"Named": []},
                        "entry_point": {"Custom": "counter_inc"},
                        "scheduling": "Standard",
                        "target": {
                            "Stored": {
                                "id": {
                                    "ByPackageName": {
                                        "name": "counter_package_name",
                                        "version": null
                                    }
                                },
                                "runtime": "VmCasperV1"
                            }
                        }
                    }
                },
                "approvals": [{
                    "signer": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f",
                    "signature": "02c64336e5ed2832bdb84adb3f334d585548ee096066aa9d0797c11ab3f074ec9d7bd396994bc9b9c239342be801bc385a9c5083779bace4dfe0b400d4a13c07db"
                }]
            }
        });

        let bare = parse_transaction_json(tx_json.clone()).unwrap();
        let nested = parse_transaction_json(json!({ "transaction": tx_json })).unwrap();
        assert_eq!(bare.hash().to_hex_string(), nested.hash().to_hex_string());

        let encoded = serde_json::to_string(&bare).unwrap();
        let bare_string = parse_transaction_json(Value::String(encoded.clone()));
        let nested_string = parse_transaction_json(json!({
            "transaction": encoded
        }));
        assert!(bare_string.is_err());
        assert!(nested_string.is_err());
    }

    #[test]
    fn parse_no_such_block_range_from_error_extracts_bounds() {
        let error = "casper client error: response for rpc-id 1 chain_get_block is json-rpc error: {\"code\":-32001,\"message\":\"No such block\",\"data\":{\"message\":\"no block found for the provided identifier\",\"available_block_range\":{\"low\":0,\"high\":0}}}";
        let bounds = parse_no_such_block_range_from_error(error);
        assert_eq!(bounds.unwrap(), (0, 0));
    }

    /// The request deserializes with a `transaction` field and rejects the older
    /// `transaction_json` spelling.
    #[test]
    fn send_transaction_signed_request_requires_transaction_field() {
        let current = json!({
            "network_name": "casper-devnet",
            "node_id": 1,
            "signer_path": "m/44'/506'/0'/0/100",
            "transaction": { "Version1": { "hash": "abc" } }
        });
        let request: SendTransactionSignedRequest = serde_json::from_value(current).unwrap();
        assert!(request.transaction.get("Version1").is_some());

        let legacy = json!({
            "network_name": "casper-devnet",
            "node_id": 1,
            "signer_path": "m/44'/506'/0'/0/100",
            "transaction_json": { "Version1": { "hash": "def" } }
        });
        let legacy_err = serde_json::from_value::<SendTransactionSignedRequest>(legacy)
            .unwrap_err()
            .to_string();
        assert!(legacy_err.contains("missing field `transaction`"));
    }
}