Skip to main content

casper_devnet/
mcp.rs

1use self::args_parser::parse_session_args;
2use crate::assets::{
3    self, AddNodesOptions, AssetSelector, AssetsLayout, SetupOptions, StageProtocolOptions,
4};
5use crate::control::{ControlRequest, ControlResponse, ControlResult, send_request};
6use crate::process::{self, StartPlan};
7use crate::state::{
8    ProcessKind, ProcessStatus, STATE_FILE_NAME, State, spawn_pid_sync_tasks,
9    spawn_pid_sync_tasks_for_ids,
10};
11use anyhow::{Context, Result, anyhow};
12use backoff::ExponentialBackoff;
13use backoff::backoff::Backoff;
14use casper_types::contracts::ContractHash;
15use casper_types::{
16    AddressableEntityHash, AsymmetricType, Digest, EntityVersion, Key, PricingMode, PublicKey,
17    SecretKey, TimeDiff, Transaction, TransactionHash, TransactionRuntimeParams, TransactionV1Hash,
18    URef,
19};
20use clap::{Args, ValueEnum};
21use futures::StreamExt;
22use nix::errno::Errno;
23use nix::sys::signal::kill;
24use nix::unistd::Pid;
25use rmcp::handler::server::{router::tool::ToolRouter, wrapper::Parameters};
26use rmcp::model::{CallToolResult, ServerCapabilities, ServerInfo};
27use rmcp::service::ServiceExt;
28use rmcp::transport::{
29    StreamableHttpServerConfig,
30    streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService},
31};
32use rmcp::{
33    ErrorData, ServerHandler, tool, tool_handler, tool_router, transport::stdio as mcp_stdio,
34};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::{HashMap, VecDeque};
38use std::path::{Path, PathBuf};
39use std::str::FromStr;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
42use std::time::{Duration, Instant};
43use tokio::fs as tokio_fs;
44use tokio::io::{AsyncReadExt, AsyncWriteExt};
45use tokio::net::UnixListener;
46use tokio::sync::{Mutex, Notify};
47use veles_casper_rust_sdk::TransactionV1Builder;
48use veles_casper_rust_sdk::jsonrpc::CasperClient;
49use veles_casper_rust_sdk::sse::event::SseEvent;
50use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
51
/// Default value of the `http_bind` CLI argument on `McpArgs`.
const DEFAULT_HTTP_BIND: &str = "127.0.0.1:32100";
/// Default value of the `http_path` CLI argument on `McpArgs`.
const DEFAULT_HTTP_PATH: &str = "/mcp";
// NOTE(review): the six defaults below are not referenced in the visible part of this
// file; presumably they back optional fields of the spawn request — confirm at use sites.
const DEFAULT_NETWORK_NAME: &str = "casper-dev";
const DEFAULT_NODE_COUNT: u32 = 4;
const DEFAULT_DELAY_SECS: u64 = 3;
const DEFAULT_LOG_LEVEL: &str = "info";
const DEFAULT_NODE_LOG_FORMAT: &str = "json";
const DEFAULT_SEED: &str = "default";
/// Fallback timeout, in seconds, used by `wait_next_sse` when the request omits
/// `timeout_seconds`.
const DEFAULT_TIMEOUT_SECS: u64 = 60;
/// Fallback page size used by `get_node_logs` when the request omits `limit`.
const DEFAULT_LOG_PAGE_SIZE: usize = 200;
/// Fallback page size used by `sse_history` when the request omits `limit`.
const DEFAULT_SSE_PAGE_SIZE: usize = 200;
// NOTE(review): not referenced in the visible part of this file; presumably the maximum
// number of SSE events retained per network — confirm at the use site.
const DEFAULT_SSE_HISTORY_CAPACITY: usize = 20_000;
/// Fallback payment amount used by the `make_transaction_*` tools when the request omits
/// `payment_amount`.
const DEFAULT_PAYMENT_AMOUNT: u64 = 100_000_000_000;
65
66mod args_parser;
67
// Transport selection for the MCP server. Parsed by clap as a kebab-case value, so the
// accepted spellings are `stdio`, `http` and `both`.
// Plain `//` comments are used on purpose: clap turns `///` doc comments on derived items
// into user-visible help text.
#[derive(Debug, Clone, Copy, ValueEnum)]
#[value(rename_all = "kebab-case")]
pub enum McpTransport {
    Stdio,
    Http,
    // `Both` is the default value of `McpArgs::transport`.
    Both,
}
75
// Command-line arguments for the MCP server.
// Plain `//` comments are used on purpose: clap turns `///` doc comments on derived items
// into user-visible help text.
#[derive(Debug, Args, Clone)]
pub struct McpArgs {
    // Which transport to use; defaults to `McpTransport::Both`.
    #[arg(long, value_enum, default_value_t = McpTransport::Both)]
    transport: McpTransport,

    // Defaults to `DEFAULT_HTTP_BIND`.
    #[arg(long, default_value = DEFAULT_HTTP_BIND)]
    http_bind: String,

    // Defaults to `DEFAULT_HTTP_PATH`.
    #[arg(long, default_value = DEFAULT_HTTP_PATH)]
    http_path: String,

    // Optional path argument with no default.
    // NOTE(review): presumably the root directory for network files — confirm at use site.
    #[arg(long, value_name = "PATH")]
    net_path: Option<PathBuf>,
}
90
/// MCP server handle: a shared [`NetworkManager`] plus the tool router generated for
/// this type.
#[derive(Debug, Clone)]
struct McpServer {
    /// Shared network manager that the tool methods delegate to.
    manager: Arc<NetworkManager>,
    /// Tool router produced by `Self::tool_router()` (see `McpServer::new`).
    tool_router: ToolRouter<Self>,
}
96
97impl McpServer {
98    fn new(manager: Arc<NetworkManager>) -> Self {
99        Self {
100            manager,
101            tool_router: Self::tool_router(),
102        }
103    }
104
105    fn server_info() -> ServerInfo {
106        ServerInfo {
107            instructions: Some(
108                "Control multiple local Casper devnets. Start with spawn_network, then wait_network_ready before RPC or transaction tools. Do not use external casper-client binaries or curl; use MCP tools directly (for example get_transaction, wait_transaction, make_transaction_package_call, make_transaction_contract_call, make_transaction_session_wasm, send_transaction_signed).".to_string(),
109            ),
110            capabilities: ServerCapabilities::builder().enable_tools().build(),
111            ..Default::default()
112        }
113    }
114}
115
116#[tool_router]
117impl McpServer {
118    #[tool(
119        description = "Spawn or resume a managed network. Defaults to force_setup=true for fresh devnet startup."
120    )]
121    async fn spawn_network(
122        &self,
123        Parameters(request): Parameters<SpawnNetworkRequest>,
124    ) -> std::result::Result<CallToolResult, ErrorData> {
125        let result = self
126            .manager
127            .spawn_network(request)
128            .await
129            .map_err(to_mcp_error)?;
130        Ok(ok_value(
131            serde_json::to_value(result).map_err(internal_serde_error)?,
132        ))
133    }
134
135    #[tool(
136        description = "Wait for a managed network to be ready: processes running, /status healthy, reactor Validate, and at least one block observed."
137    )]
138    async fn wait_network_ready(
139        &self,
140        Parameters(request): Parameters<WaitNetworkReadyRequest>,
141    ) -> std::result::Result<CallToolResult, ErrorData> {
142        let result = self
143            .manager
144            .wait_network_ready(&request.network_name, request.timeout_seconds)
145            .await
146            .map_err(to_mcp_error)?;
147        Ok(ok_value(
148            serde_json::to_value(result).map_err(internal_serde_error)?,
149        ))
150    }
151
152    #[tool(
153        description = "Stage a protocol upgrade from a named custom asset. When the network is running, sidecars are restarted after staging."
154    )]
155    async fn stage_protocol(
156        &self,
157        Parameters(request): Parameters<StageProtocolRequest>,
158    ) -> std::result::Result<CallToolResult, ErrorData> {
159        let result = self
160            .manager
161            .stage_protocol(
162                &request.network_name,
163                request.asset.as_deref(),
164                request.custom_asset.as_deref(),
165                request.asset_name.as_deref(),
166                &request.protocol_version,
167                request.activation_point,
168            )
169            .await
170            .map_err(to_mcp_error)?;
171        Ok(ok_value(
172            serde_json::to_value(result).map_err(internal_serde_error)?,
173        ))
174    }
175
176    #[tool(
177        description = "Despawn a managed network. By default it stops processes and keeps files; set purge=true to remove files."
178    )]
179    async fn despawn_network(
180        &self,
181        Parameters(request): Parameters<DespawnNetworkRequest>,
182    ) -> std::result::Result<CallToolResult, ErrorData> {
183        let result = self
184            .manager
185            .despawn_network(&request.network_name, request.purge.unwrap_or(false))
186            .await
187            .map_err(to_mcp_error)?;
188        Ok(ok_value(
189            serde_json::to_value(result).map_err(internal_serde_error)?,
190        ))
191    }
192
193    #[tool(description = "List discovered network names and managed/running state.")]
194    async fn list_networks(
195        &self,
196        _: Parameters<ListNetworksRequest>,
197    ) -> std::result::Result<CallToolResult, ErrorData> {
198        let result = self.manager.list_networks().await.map_err(to_mcp_error)?;
199        Ok(ok_value(
200            serde_json::to_value(result).map_err(internal_serde_error)?,
201        ))
202    }
203
204    #[tool(
205        description = "List managed network processes, optionally filtered by process name. Defaults to running_only=true."
206    )]
207    async fn managed_processes(
208        &self,
209        Parameters(request): Parameters<ManagedProcessesRequest>,
210    ) -> std::result::Result<CallToolResult, ErrorData> {
211        let result = self
212            .manager
213            .managed_processes(request)
214            .await
215            .map_err(to_mcp_error)?;
216        Ok(ok_value(
217            serde_json::to_value(result).map_err(internal_serde_error)?,
218        ))
219    }
220
221    #[tool(description = "Call node REST /status for a managed network node.")]
222    async fn status(
223        &self,
224        Parameters(request): Parameters<NodeScopedRequest>,
225    ) -> std::result::Result<CallToolResult, ErrorData> {
226        let network = self
227            .manager
228            .get_network(&request.network_name)
229            .await
230            .map_err(to_mcp_error)?;
231        ensure_running_network(&network).await?;
232        let status = fetch_rest_status(request.node_id)
233            .await
234            .map_err(to_mcp_error)?;
235        Ok(ok_value(status))
236    }
237
238    #[tool(
239        description = "Run a raw JSON-RPC call against node sidecar endpoint. Useful as a generic query helper."
240    )]
241    async fn rpc_query(
242        &self,
243        Parameters(request): Parameters<RpcQueryRequest>,
244    ) -> std::result::Result<CallToolResult, ErrorData> {
245        let network = self
246            .manager
247            .get_network(&request.network_name)
248            .await
249            .map_err(to_mcp_error)?;
250        ensure_running_network(&network).await?;
251        let value = self
252            .manager
253            .raw_rpc_query(request.node_id, &request.method, request.params)
254            .await
255            .map_err(to_mcp_error)?;
256        Ok(ok_value(value))
257    }
258
259    #[tool(description = "Typed balance query by account/public key/uref/entity identifier.")]
260    async fn rpc_query_balance(
261        &self,
262        Parameters(request): Parameters<RpcQueryBalanceRequest>,
263    ) -> std::result::Result<CallToolResult, ErrorData> {
264        let network = self
265            .manager
266            .get_network(&request.network_name)
267            .await
268            .map_err(to_mcp_error)?;
269        ensure_running_network(&network).await?;
270
271        let block_id = normalize_optional_identifier(request.block_id.as_deref());
272        let state_root_hash = normalize_optional_identifier(request.state_root_hash.as_deref());
273        let params =
274            build_query_balance_params(&request.purse_identifier, &block_id, &state_root_hash)
275                .map_err(to_mcp_error)?;
276        let response = self
277            .manager
278            .raw_rpc_query(request.node_id, "query_balance", Some(params))
279            .await
280            .map_err(to_mcp_error)?;
281        Ok(ok_value(
282            extract_rpc_result(response).map_err(to_mcp_error)?,
283        ))
284    }
285
286    #[tool(
287        description = "Typed global state query by key + optional path + optional block/state-root identifier. If no identifier is provided, the latest block hash is used."
288    )]
289    async fn rpc_query_global_state(
290        &self,
291        Parameters(request): Parameters<RpcQueryGlobalStateRequest>,
292    ) -> std::result::Result<CallToolResult, ErrorData> {
293        let network = self
294            .manager
295            .get_network(&request.network_name)
296            .await
297            .map_err(to_mcp_error)?;
298        ensure_running_network(&network).await?;
299
300        let (block_id, state_root_hash) = resolve_global_state_identifier(
301            &request.network_name,
302            request.node_id,
303            request.block_id.as_deref(),
304            request.state_root_hash.as_deref(),
305        )
306        .await
307        .map_err(to_mcp_error)?;
308        let params = build_query_global_state_params(
309            &request.key,
310            request.path.unwrap_or_default(),
311            &block_id,
312            &state_root_hash,
313        )
314        .map_err(to_mcp_error)?;
315        let response = self
316            .manager
317            .raw_rpc_query(request.node_id, "query_global_state", Some(params))
318            .await
319            .map_err(to_mcp_error)?;
320
321        Ok(ok_value(
322            extract_rpc_result(response).map_err(to_mcp_error)?,
323        ))
324    }
325
326    #[tool(description = "Get current block height using typed chain_get_block RPC.")]
327    async fn current_block_height(
328        &self,
329        Parameters(request): Parameters<CurrentBlockRequest>,
330    ) -> std::result::Result<CallToolResult, ErrorData> {
331        let network = self
332            .manager
333            .get_network(&request.network_name)
334            .await
335            .map_err(to_mcp_error)?;
336        ensure_running_network(&network).await?;
337
338        let value = fetch_block_result(
339            &self.manager,
340            &request.network_name,
341            request.node_id,
342            request.block_id.as_deref(),
343        )
344        .await;
345        let value = match value {
346            Ok(value) => value,
347            Err(err) => {
348                if request.block_id.is_none()
349                    && let Some((low, high)) =
350                        parse_no_such_block_range_from_error(&err.to_string())
351                {
352                    return Ok(ok_value(json!({
353                        "network_name": request.network_name,
354                        "node_id": request.node_id,
355                        "height": high,
356                        "block": Value::Null,
357                        "pending": true,
358                        "available_block_range": {
359                            "low": low,
360                            "high": high,
361                        },
362                        "message": "latest block is not yet queryable; retry shortly",
363                    })));
364                }
365                return Err(to_mcp_error(err));
366            }
367        };
368        let height = extract_block_height(&value).ok_or_else(|| {
369            ErrorData::internal_error("missing block height in RPC response", None)
370        })?;
371
372        Ok(ok_value(json!({
373            "network_name": request.network_name,
374            "node_id": request.node_id,
375            "height": height,
376            "block": value,
377        })))
378    }
379
380    #[tool(description = "Get current block payload using typed chain_get_block RPC.")]
381    async fn current_block(
382        &self,
383        Parameters(request): Parameters<CurrentBlockRequest>,
384    ) -> std::result::Result<CallToolResult, ErrorData> {
385        let network = self
386            .manager
387            .get_network(&request.network_name)
388            .await
389            .map_err(to_mcp_error)?;
390        ensure_running_network(&network).await?;
391
392        let value = fetch_block_result(
393            &self.manager,
394            &request.network_name,
395            request.node_id,
396            request.block_id.as_deref(),
397        )
398        .await
399        .map_err(to_mcp_error)?;
400        Ok(ok_value(value))
401    }
402
403    #[tool(description = "Get paginated node stdout/stderr logs from on-disk files.")]
404    async fn get_node_logs(
405        &self,
406        Parameters(request): Parameters<GetNodeLogsRequest>,
407    ) -> std::result::Result<CallToolResult, ErrorData> {
408        let network = self
409            .manager
410            .get_network(&request.network_name)
411            .await
412            .map_err(to_mcp_error)?;
413
414        let limit = request.limit.unwrap_or(DEFAULT_LOG_PAGE_SIZE);
415        if limit == 0 {
416            return Err(ErrorData::invalid_params(
417                "limit must be greater than 0",
418                None,
419            ));
420        }
421
422        let file_name = match request.stream {
423            NodeLogStream::Stdout => "stdout.log",
424            NodeLogStream::Stderr => "stderr.log",
425        };
426        let path = network
427            .layout
428            .node_logs_dir(request.node_id)
429            .join(file_name);
430
431        let page = read_log_page(&path, request.before_line, limit)
432            .await
433            .map_err(to_mcp_error)?;
434        Ok(ok_value(
435            serde_json::to_value(page).map_err(internal_serde_error)?,
436        ))
437    }
438
439    #[tool(
440        description = "Wait for the next SSE event after a sequence cursor with optional event type filters."
441    )]
442    async fn wait_next_sse(
443        &self,
444        Parameters(request): Parameters<WaitNextSseRequest>,
445    ) -> std::result::Result<CallToolResult, ErrorData> {
446        let network = self
447            .manager
448            .get_network(&request.network_name)
449            .await
450            .map_err(to_mcp_error)?;
451        ensure_running_network(&network).await?;
452
453        let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
454        let filter = SseFilter {
455            include_event_types: request.include_event_types.unwrap_or_default(),
456            exclude_event_types: request.exclude_event_types.unwrap_or_default(),
457        };
458
459        let after = match request.after_sequence {
460            Some(value) => value,
461            None => network.sse_store.latest_sequence().await,
462        };
463
464        let event = network
465            .sse_store
466            .wait_next(after, &filter, timeout)
467            .await
468            .map_err(to_mcp_error)?;
469
470        Ok(ok_value(
471            serde_json::to_value(event).map_err(internal_serde_error)?,
472        ))
473    }
474
475    #[tool(
476        description = "Fetch paginated SSE history with sequence cursor and optional event type filters."
477    )]
478    async fn sse_history(
479        &self,
480        Parameters(request): Parameters<SseHistoryRequest>,
481    ) -> std::result::Result<CallToolResult, ErrorData> {
482        let network = self
483            .manager
484            .get_network(&request.network_name)
485            .await
486            .map_err(to_mcp_error)?;
487
488        let limit = request.limit.unwrap_or(DEFAULT_SSE_PAGE_SIZE);
489        if limit == 0 {
490            return Err(ErrorData::invalid_params(
491                "limit must be greater than 0",
492                None,
493            ));
494        }
495
496        let filter = SseFilter {
497            include_event_types: request.include_event_types.unwrap_or_default(),
498            exclude_event_types: request.exclude_event_types.unwrap_or_default(),
499        };
500
501        let history = network
502            .sse_store
503            .history(request.before_sequence, limit, &filter)
504            .await;
505        Ok(ok_value(
506            serde_json::to_value(history).map_err(internal_serde_error)?,
507        ))
508    }
509
510    #[tool(description = "List derived accounts from derived-accounts.csv for a network.")]
511    async fn list_derived_accounts(
512        &self,
513        Parameters(request): Parameters<ListDerivedAccountsRequest>,
514    ) -> std::result::Result<CallToolResult, ErrorData> {
515        let accounts = self
516            .manager
517            .list_derived_accounts(&request.network_name)
518            .await
519            .map_err(to_mcp_error)?;
520        Ok(ok_value(
521            serde_json::to_value(accounts).map_err(internal_serde_error)?,
522        ))
523    }
524
525    #[tool(
526        description = "Submit signed or unsigned transaction JSON. Signer key is derived from signer_path for this managed network. transaction must be a typed Transaction JSON object."
527    )]
528    async fn send_transaction_signed(
529        &self,
530        Parameters(request): Parameters<SendTransactionSignedRequest>,
531    ) -> std::result::Result<CallToolResult, ErrorData> {
532        let network = self
533            .manager
534            .get_network(&request.network_name)
535            .await
536            .map_err(to_mcp_error)?;
537        ensure_running_network(&network).await?;
538
539        let signer = self
540            .manager
541            .derived_account_for_path(&network, &request.signer_path)
542            .await
543            .map_err(to_mcp_error)?;
544        verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
545            .await
546            .map_err(to_mcp_error)?;
547
548        let mut transaction = parse_transaction_json(request.transaction)?;
549        transaction.sign(&signer.secret_key);
550
551        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
552        let response = rpc
553            .put_transaction(transaction)
554            .await
555            .map_err(to_mcp_error)?;
556
557        Ok(ok_value(json!({
558            "network_name": request.network_name,
559            "node_id": request.node_id,
560            "transaction_hash": response.transaction_hash.to_hex_string(),
561        })))
562    }
563
564    #[tool(
565        description = "Create a stored package-name call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
566    )]
567    async fn make_transaction_package_call(
568        &self,
569        Parameters(request): Parameters<MakeTransactionPackageCallRequest>,
570    ) -> std::result::Result<CallToolResult, ErrorData> {
571        let network = self
572            .manager
573            .get_network(&request.network_name)
574            .await
575            .map_err(to_mcp_error)?;
576        ensure_running_network(&network).await?;
577
578        let runtime = match (
579            request.runtime_transferred_value,
580            request.runtime_seed_hex.as_deref(),
581        ) {
582            (None, None) => TransactionRuntimeParams::VmCasperV1,
583            (transferred_value, maybe_seed_hex) => {
584                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
585                TransactionRuntimeParams::VmCasperV2 {
586                    transferred_value: transferred_value.unwrap_or(0),
587                    seed,
588                }
589            }
590        };
591
592        let mut builder = TransactionV1Builder::new_targeting_package_via_alias(
593            request.transaction_package_name.clone(),
594            request.transaction_package_version,
595            request.session_entry_point.clone(),
596            runtime,
597        );
598
599        if let Some(args_json) = request.session_args.as_ref()
600            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
601        {
602            builder = builder.with_runtime_args(runtime_args);
603        }
604
605        if let Some(ttl_millis) = request.ttl_millis {
606            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
607        }
608
609        let pricing = build_pricing_mode(
610            request.gas_price_tolerance,
611            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
612        );
613        builder = builder.with_pricing_mode(pricing);
614
615        let chain_name = match request.chain_name {
616            Some(value) => value,
617            None => fetch_chain_name(&request.network_name, request.node_id)
618                .await
619                .map_err(to_mcp_error)?,
620        };
621        builder = builder.with_chain_name(chain_name.clone());
622
623        let mut signed = false;
624        let mut signer_path = None;
625        let mut derived_signer = None;
626        if let Some(path) = request.signer_path {
627            let signer = self
628                .manager
629                .derived_account_for_path(&network, &path)
630                .await
631                .map_err(to_mcp_error)?;
632            verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
633                .await
634                .map_err(to_mcp_error)?;
635            derived_signer = Some(signer);
636            signed = true;
637            signer_path = Some(path);
638        } else if let Some(initiator_public_key) = request.initiator_public_key {
639            let public_key = PublicKey::from_hex(initiator_public_key.trim())
640                .with_context(|| "failed to parse initiator_public_key as hex public key")
641                .map_err(to_mcp_error)?;
642            builder = builder.with_initiator_addr(public_key);
643        } else {
644            return Err(ErrorData::invalid_params(
645                "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
646                None,
647            ));
648        }
649
650        if let Some(signer) = derived_signer.as_ref() {
651            builder = builder.with_secret_key(&signer.secret_key);
652        }
653
654        let tx = builder.build().map_err(to_mcp_error)?;
655        let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
656
657        Ok(ok_value(json!({
658            "network_name": request.network_name,
659            "node_id": request.node_id,
660            "chain_name": chain_name,
661            "signed": signed,
662            "signer_path": signer_path,
663            "transaction": tx_json,
664        })))
665    }
666
667    #[tool(
668        description = "Create a stored contract-hash call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
669    )]
670    async fn make_transaction_contract_call(
671        &self,
672        Parameters(request): Parameters<MakeTransactionContractCallRequest>,
673    ) -> std::result::Result<CallToolResult, ErrorData> {
674        let network = self
675            .manager
676            .get_network(&request.network_name)
677            .await
678            .map_err(to_mcp_error)?;
679        ensure_running_network(&network).await?;
680
681        let runtime = match (
682            request.runtime_transferred_value,
683            request.runtime_seed_hex.as_deref(),
684        ) {
685            (None, None) => TransactionRuntimeParams::VmCasperV1,
686            (transferred_value, maybe_seed_hex) => {
687                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
688                TransactionRuntimeParams::VmCasperV2 {
689                    transferred_value: transferred_value.unwrap_or(0),
690                    seed,
691                }
692            }
693        };
694
695        let contract_hash = parse_contract_hash_for_invocation(&request.transaction_contract_hash)
696            .map_err(to_mcp_error)?;
697        let mut builder = TransactionV1Builder::new_targeting_invocable_entity(
698            contract_hash,
699            request.session_entry_point.clone(),
700            runtime,
701        );
702
703        if let Some(args_json) = request.session_args.as_ref()
704            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
705        {
706            builder = builder.with_runtime_args(runtime_args);
707        }
708
709        if let Some(ttl_millis) = request.ttl_millis {
710            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
711        }
712
713        let pricing = build_pricing_mode(
714            request.gas_price_tolerance,
715            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
716        );
717        builder = builder.with_pricing_mode(pricing);
718
719        let chain_name = match request.chain_name {
720            Some(value) => value,
721            None => fetch_chain_name(&request.network_name, request.node_id)
722                .await
723                .map_err(to_mcp_error)?,
724        };
725        builder = builder.with_chain_name(chain_name.clone());
726
727        let mut signed = false;
728        let mut signer_path = None;
729        let mut derived_signer = None;
730        if let Some(path) = request.signer_path {
731            let signer = self
732                .manager
733                .derived_account_for_path(&network, &path)
734                .await
735                .map_err(to_mcp_error)?;
736            verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
737                .await
738                .map_err(to_mcp_error)?;
739            derived_signer = Some(signer);
740            signed = true;
741            signer_path = Some(path);
742        } else if let Some(initiator_public_key) = request.initiator_public_key {
743            let public_key = PublicKey::from_hex(initiator_public_key.trim())
744                .with_context(|| "failed to parse initiator_public_key as hex public key")
745                .map_err(to_mcp_error)?;
746            builder = builder.with_initiator_addr(public_key);
747        } else {
748            return Err(ErrorData::invalid_params(
749                "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
750                None,
751            ));
752        }
753
754        if let Some(signer) = derived_signer.as_ref() {
755            builder = builder.with_secret_key(&signer.secret_key);
756        }
757
758        let tx = builder.build().map_err(to_mcp_error)?;
759        let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
760
761        Ok(ok_value(json!({
762            "network_name": request.network_name,
763            "node_id": request.node_id,
764            "chain_name": chain_name,
765            "transaction_contract_hash": request.transaction_contract_hash,
766            "signed": signed,
767            "signer_path": signer_path,
768            "transaction": tx_json,
769        })))
770    }
771
772    #[tool(
773        description = "Create a session wasm transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
774    )]
775    async fn make_transaction_session_wasm(
776        &self,
777        Parameters(request): Parameters<MakeTransactionSessionWasmRequest>,
778    ) -> std::result::Result<CallToolResult, ErrorData> {
779        let network = self
780            .manager
781            .get_network(&request.network_name)
782            .await
783            .map_err(to_mcp_error)?;
784        ensure_running_network(&network).await?;
785
786        let wasm_path = PathBuf::from(&request.wasm_path);
787        let wasm_bytes = tokio_fs::read(&wasm_path)
788            .await
789            .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
790            .map_err(to_mcp_error)?;
791
792        let runtime = match (
793            request.runtime_transferred_value,
794            request.runtime_seed_hex.as_deref(),
795        ) {
796            (None, None) => TransactionRuntimeParams::VmCasperV1,
797            (transferred_value, maybe_seed_hex) => {
798                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
799                TransactionRuntimeParams::VmCasperV2 {
800                    transferred_value: transferred_value.unwrap_or(0),
801                    seed,
802                }
803            }
804        };
805
806        let mut builder = TransactionV1Builder::new_session(
807            request.is_install_upgrade.unwrap_or(true),
808            wasm_bytes.into(),
809            runtime,
810        );
811
812        if let Some(args_json) = request.session_args.as_ref()
813            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
814        {
815            builder = builder.with_runtime_args(runtime_args);
816        }
817
818        if let Some(ttl_millis) = request.ttl_millis {
819            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
820        }
821
822        let pricing = build_pricing_mode(
823            request.gas_price_tolerance,
824            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
825        );
826        builder = builder.with_pricing_mode(pricing);
827
828        let chain_name = match request.chain_name {
829            Some(value) => value,
830            None => fetch_chain_name(&request.network_name, request.node_id)
831                .await
832                .map_err(to_mcp_error)?,
833        };
834        builder = builder.with_chain_name(chain_name.clone());
835
836        let mut signed = false;
837        let mut signer_path = None;
838        let mut derived_signer = None;
839        if let Some(path) = request.signer_path {
840            let signer = self
841                .manager
842                .derived_account_for_path(&network, &path)
843                .await
844                .map_err(to_mcp_error)?;
845            verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
846                .await
847                .map_err(to_mcp_error)?;
848            derived_signer = Some(signer);
849            signed = true;
850            signer_path = Some(path);
851        } else if let Some(initiator_public_key) = request.initiator_public_key {
852            let public_key = PublicKey::from_hex(initiator_public_key.trim())
853                .with_context(|| "failed to parse initiator_public_key as hex public key")
854                .map_err(to_mcp_error)?;
855            builder = builder.with_initiator_addr(public_key);
856        } else {
857            return Err(ErrorData::invalid_params(
858                "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
859                None,
860            ));
861        }
862
863        if let Some(signer) = derived_signer.as_ref() {
864            builder = builder.with_secret_key(&signer.secret_key);
865        }
866
867        let tx = builder.build().map_err(to_mcp_error)?;
868        let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
869
870        Ok(ok_value(json!({
871            "network_name": request.network_name,
872            "node_id": request.node_id,
873            "chain_name": chain_name,
874            "wasm_path": request.wasm_path,
875            "signed": signed,
876            "signer_path": signer_path,
877            "transaction": tx_json,
878        })))
879    }
880
881    #[tool(
882        description = "Build, sign and submit a session wasm transaction from a derived account path. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
883    )]
884    async fn send_session_wasm(
885        &self,
886        Parameters(request): Parameters<SendSessionWasmRequest>,
887    ) -> std::result::Result<CallToolResult, ErrorData> {
888        let network = self
889            .manager
890            .get_network(&request.network_name)
891            .await
892            .map_err(to_mcp_error)?;
893        ensure_running_network(&network).await?;
894
895        let signer = self
896            .manager
897            .derived_account_for_path(&network, &request.signer_path)
898            .await
899            .map_err(to_mcp_error)?;
900        verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
901            .await
902            .map_err(to_mcp_error)?;
903
904        let wasm_path = PathBuf::from(&request.wasm_path);
905        let wasm_bytes = tokio_fs::read(&wasm_path)
906            .await
907            .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
908            .map_err(to_mcp_error)?;
909
910        let runtime = match (
911            request.runtime_transferred_value,
912            request.runtime_seed_hex.as_deref(),
913        ) {
914            (None, None) => TransactionRuntimeParams::VmCasperV1,
915            (transferred_value, maybe_seed_hex) => {
916                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
917                TransactionRuntimeParams::VmCasperV2 {
918                    transferred_value: transferred_value.unwrap_or(0),
919                    seed,
920                }
921            }
922        };
923
924        let mut builder = TransactionV1Builder::new_session(
925            request.is_install_upgrade.unwrap_or(true),
926            wasm_bytes.into(),
927            runtime,
928        );
929
930        if let Some(args_json) = request.session_args.as_ref()
931            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
932        {
933            builder = builder.with_runtime_args(runtime_args);
934        }
935
936        if let Some(ttl_millis) = request.ttl_millis {
937            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
938        }
939
940        let pricing = build_pricing_mode(
941            request.gas_price_tolerance,
942            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
943        );
944        builder = builder.with_pricing_mode(pricing);
945
946        let chain_name = match request.chain_name {
947            Some(value) => value,
948            None => fetch_chain_name(&request.network_name, request.node_id)
949                .await
950                .map_err(to_mcp_error)?,
951        };
952
953        let tx = builder
954            .with_chain_name(chain_name)
955            .with_secret_key(&signer.secret_key)
956            .build()
957            .map_err(to_mcp_error)?;
958
959        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
960        let response = rpc
961            .put_transaction(Transaction::V1(tx))
962            .await
963            .map_err(to_mcp_error)?;
964
965        Ok(ok_value(json!({
966            "network_name": request.network_name,
967            "node_id": request.node_id,
968            "transaction_hash": response.transaction_hash.to_hex_string(),
969        })))
970    }
971
972    #[tool(
973        description = "Transfer tokens between derived account paths. from_path signs, to_path resolves recipient."
974    )]
975    async fn transfer_tokens(
976        &self,
977        Parameters(request): Parameters<TransferTokensRequest>,
978    ) -> std::result::Result<CallToolResult, ErrorData> {
979        let network = self
980            .manager
981            .get_network(&request.network_name)
982            .await
983            .map_err(to_mcp_error)?;
984        ensure_running_network(&network).await?;
985
986        let from = self
987            .manager
988            .derived_account_for_path(&network, &request.from_path)
989            .await
990            .map_err(to_mcp_error)?;
991        let to = self
992            .manager
993            .derived_account_for_path(&network, &request.to_path)
994            .await
995            .map_err(to_mcp_error)?;
996
997        verify_path_hash_consistency(&network.layout, &request.from_path, &from.account_hash)
998            .await
999            .map_err(to_mcp_error)?;
1000        verify_path_hash_consistency(&network.layout, &request.to_path, &to.account_hash)
1001            .await
1002            .map_err(to_mcp_error)?;
1003
1004        let amount = casper_types::U512::from_dec_str(&request.amount)
1005            .with_context(|| "amount must be a decimal U512 string")
1006            .map_err(to_mcp_error)?;
1007        let to_public_key = PublicKey::from_hex(&to.public_key_hex)
1008            .with_context(|| "failed to parse derived recipient public key")
1009            .map_err(to_mcp_error)?;
1010
1011        let mut builder =
1012            TransactionV1Builder::new_transfer(amount, None, to_public_key, request.transfer_id)
1013                .map_err(to_mcp_error)?;
1014
1015        if let Some(ttl_millis) = request.ttl_millis {
1016            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
1017        }
1018
1019        let pricing = build_pricing_mode(
1020            request.gas_price_tolerance,
1021            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
1022        );
1023        builder = builder.with_pricing_mode(pricing);
1024
1025        let chain_name = match request.chain_name {
1026            Some(value) => value,
1027            None => fetch_chain_name(&request.network_name, request.node_id)
1028                .await
1029                .map_err(to_mcp_error)?,
1030        };
1031
1032        let tx = builder
1033            .with_chain_name(chain_name)
1034            .with_secret_key(&from.secret_key)
1035            .build()
1036            .map_err(to_mcp_error)?;
1037
1038        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1039        let response = rpc
1040            .put_transaction(Transaction::V1(tx))
1041            .await
1042            .map_err(to_mcp_error)?;
1043
1044        Ok(ok_value(json!({
1045            "network_name": request.network_name,
1046            "node_id": request.node_id,
1047            "transaction_hash": response.transaction_hash.to_hex_string(),
1048            "from_path": request.from_path,
1049            "to_path": request.to_path,
1050            "amount": request.amount,
1051        })))
1052    }
1053
1054    #[tool(
1055        description = "Wait for transaction execution result with timeout and return execution effects/result payload."
1056    )]
1057    async fn wait_transaction(
1058        &self,
1059        Parameters(request): Parameters<WaitTransactionRequest>,
1060    ) -> std::result::Result<CallToolResult, ErrorData> {
1061        let network = self
1062            .manager
1063            .get_network(&request.network_name)
1064            .await
1065            .map_err(to_mcp_error)?;
1066        ensure_running_network(&network).await?;
1067
1068        let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1069        let poll = Duration::from_millis(request.poll_interval_millis.unwrap_or(1_000));
1070        let tx_hash =
1071            parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1072        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1073        let deadline = Instant::now() + timeout;
1074
1075        loop {
1076            let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1077
1078            if let Some(exec_info) = response.execution_info.clone()
1079                && exec_info.execution_result.is_some()
1080            {
1081                return Ok(ok_value(
1082                    serde_json::to_value(response).map_err(internal_serde_error)?,
1083                ));
1084            }
1085
1086            if Instant::now() >= deadline {
1087                return Err(ErrorData::resource_not_found(
1088                    format!(
1089                        "transaction {} execution result not available before timeout",
1090                        request.transaction_hash
1091                    ),
1092                    None,
1093                ));
1094            }
1095
1096            tokio::time::sleep(poll).await;
1097        }
1098    }
1099
1100    #[tool(
1101        description = "Get transaction details via info_get_transaction (non-waiting). Returns typed JSON response from node."
1102    )]
1103    async fn get_transaction(
1104        &self,
1105        Parameters(request): Parameters<GetTransactionRequest>,
1106    ) -> std::result::Result<CallToolResult, ErrorData> {
1107        let network = self
1108            .manager
1109            .get_network(&request.network_name)
1110            .await
1111            .map_err(to_mcp_error)?;
1112        ensure_running_network(&network).await?;
1113
1114        let tx_hash =
1115            parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1116        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1117        let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1118
1119        Ok(ok_value(
1120            serde_json::to_value(response).map_err(internal_serde_error)?,
1121        ))
1122    }
1123}
1124
1125#[tool_handler]
1126impl ServerHandler for McpServer {
1127    fn get_info(&self) -> ServerInfo {
1128        Self::server_info()
1129    }
1130}
1131
1132#[derive(Debug)]
1133struct NetworkManager {
1134    assets_root: PathBuf,
1135    managed: Mutex<HashMap<String, Arc<ManagedNetwork>>>,
1136    http: reqwest::Client,
1137}
1138
1139#[derive(Debug)]
1140struct ManagedNetwork {
1141    layout: AssetsLayout,
1142    state: Arc<Mutex<State>>,
1143    node_count: AtomicU32,
1144    rust_log: String,
1145    seed: Arc<str>,
1146    sse_store: Arc<SseStore>,
1147    last_block_hook_hash: Mutex<Option<String>>,
1148    shutdown: Arc<AtomicBool>,
1149    control_shutdown: Arc<AtomicBool>,
1150    asset_mutation_lock: Mutex<()>,
1151    sse_tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
1152    control_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
1153}
1154
1155impl ManagedNetwork {
1156    async fn is_running(&self) -> bool {
1157        let state = self.state.lock().await;
1158        processes_running(&state)
1159    }
1160
1161    async fn stop(&self) -> Result<()> {
1162        self.shutdown.store(true, Ordering::SeqCst);
1163        self.control_shutdown.store(true, Ordering::SeqCst);
1164
1165        if let Some(control_task) = self.control_task.lock().await.take() {
1166            control_task.abort();
1167            let _ = control_task.await;
1168        }
1169        let _ = tokio_fs::remove_file(self.layout.control_socket_path()).await;
1170
1171        let tasks = {
1172            let mut guard = self.sse_tasks.lock().await;
1173            guard.drain(..).collect::<Vec<_>>()
1174        };
1175        for task in tasks {
1176            task.abort();
1177            let _ = task.await;
1178        }
1179
1180        let mut state = self.state.lock().await;
1181        process::stop(&mut state).await
1182    }
1183}
1184
1185impl NetworkManager {
1186    async fn new(assets_root: PathBuf) -> Result<Self> {
1187        let http = reqwest::Client::builder()
1188            .no_proxy()
1189            .timeout(Duration::from_secs(5))
1190            .build()?;
1191        Ok(Self {
1192            assets_root,
1193            managed: Mutex::new(HashMap::new()),
1194            http,
1195        })
1196    }
1197
1198    async fn spawn_network(&self, request: SpawnNetworkRequest) -> Result<SpawnNetworkResponse> {
1199        let network_name = request
1200            .network_name
1201            .unwrap_or_else(|| DEFAULT_NETWORK_NAME.to_string());
1202
1203        if network_name.trim().is_empty() {
1204            return Err(anyhow!("network_name must not be empty"));
1205        }
1206
1207        let maybe_existing = {
1208            let mut managed = self.managed.lock().await;
1209            if let Some(existing) = managed.get(&network_name)
1210                && existing.is_running().await
1211                && !request.force_setup.unwrap_or(true)
1212            {
1213                return Ok(SpawnNetworkResponse {
1214                    network_name,
1215                    node_count: existing.node_count.load(Ordering::SeqCst),
1216                    managed: true,
1217                    already_running: true,
1218                    forced_setup: false,
1219                });
1220            }
1221            managed.remove(&network_name)
1222        };
1223
1224        if let Some(existing) = maybe_existing {
1225            let _ = existing.stop().await;
1226        }
1227
1228        let force_setup = request.force_setup.unwrap_or(true);
1229        let requested_nodes = request.node_count.unwrap_or(DEFAULT_NODE_COUNT);
1230        let users = request.users;
1231        let delay = request.delay.unwrap_or(DEFAULT_DELAY_SECS);
1232        let log_level = request
1233            .log_level
1234            .unwrap_or_else(|| DEFAULT_LOG_LEVEL.to_string());
1235        let node_log_format = request
1236            .node_log_format
1237            .unwrap_or_else(|| DEFAULT_NODE_LOG_FORMAT.to_string());
1238        let seed: Arc<str> = Arc::from(request.seed.unwrap_or_else(|| DEFAULT_SEED.to_string()));
1239
1240        let layout = AssetsLayout::new(self.assets_root.clone(), network_name.clone());
1241        let assets_exist = layout.exists().await;
1242
1243        let asset_selector = assets::optional_asset_selector(
1244            request.asset.as_deref(),
1245            request.custom_asset.as_deref(),
1246        )?;
1247
1248        let setup_result = if force_setup {
1249            assets::teardown(&layout).await?;
1250            Some(
1251                assets::setup_local(
1252                    &layout,
1253                    &SetupOptions {
1254                        nodes: requested_nodes,
1255                        users,
1256                        delay_seconds: delay,
1257                        network_name: network_name.clone(),
1258                        asset: asset_selector.clone(),
1259                        protocol_version: request.protocol_version.clone(),
1260                        chainspec_overrides: Vec::new(),
1261                        node_log_format,
1262                        seed: Arc::clone(&seed),
1263                    },
1264                )
1265                .await?,
1266            )
1267        } else if !assets_exist {
1268            Some(
1269                assets::setup_local(
1270                    &layout,
1271                    &SetupOptions {
1272                        nodes: requested_nodes,
1273                        users,
1274                        delay_seconds: delay,
1275                        network_name: network_name.clone(),
1276                        asset: asset_selector,
1277                        protocol_version: request.protocol_version.clone(),
1278                        chainspec_overrides: Vec::new(),
1279                        node_log_format,
1280                        seed: Arc::clone(&seed),
1281                    },
1282                )
1283                .await?,
1284            )
1285        } else {
1286            None
1287        };
1288
1289        if !layout.exists().await {
1290            return Err(anyhow!(
1291                "assets missing under {}; call spawn_network with force_setup=true",
1292                layout.net_dir().display()
1293            ));
1294        }
1295
1296        assets::ensure_network_hook_samples(&layout).await?;
1297
1298        if !force_setup && assets_exist {
1299            let _ = assets::ensure_consensus_keys(&layout, Arc::clone(&seed)).await?;
1300        }
1301
1302        let node_count = layout.count_nodes().await?;
1303        if node_count == 0 {
1304            return Err(anyhow!("network has no nodes to start"));
1305        }
1306
1307        ensure_sidecar_available(&layout, node_count).await?;
1308
1309        let state_path = layout.net_dir().join(STATE_FILE_NAME);
1310        if !tokio_fs::try_exists(&state_path).await.unwrap_or(false) {
1311            let protocol_version = match &setup_result {
1312                Some(result) => result.protocol_version.clone(),
1313                None => latest_layout_protocol_version(&layout).await?,
1314            };
1315            assets::prepare_genesis_hooks(&layout, &protocol_version).await?;
1316        }
1317        let mut state = State::new(state_path).await?;
1318
1319        let rust_log = log_level.clone();
1320        process::start(
1321            &layout,
1322            &StartPlan {
1323                rust_log: rust_log.clone(),
1324            },
1325            &mut state,
1326        )
1327        .await?;
1328
1329        let managed = Arc::new(ManagedNetwork {
1330            layout: layout.clone(),
1331            state: Arc::new(Mutex::new(state)),
1332            node_count: AtomicU32::new(node_count),
1333            rust_log,
1334            seed,
1335            sse_store: Arc::new(SseStore::new(DEFAULT_SSE_HISTORY_CAPACITY)),
1336            last_block_hook_hash: Mutex::new(None),
1337            shutdown: Arc::new(AtomicBool::new(false)),
1338            control_shutdown: Arc::new(AtomicBool::new(false)),
1339            asset_mutation_lock: Mutex::new(()),
1340            sse_tasks: Mutex::new(Vec::new()),
1341            control_task: Mutex::new(None),
1342        });
1343        spawn_pid_sync_tasks(Arc::clone(&managed.state)).await;
1344
1345        self.spawn_sse_collectors(&managed).await;
1346        if let Err(err) = self.spawn_control_server(&managed).await {
1347            let _ = managed.stop().await;
1348            return Err(err);
1349        }
1350
1351        self.managed
1352            .lock()
1353            .await
1354            .insert(network_name.clone(), Arc::clone(&managed));
1355
1356        Ok(SpawnNetworkResponse {
1357            network_name,
1358            node_count,
1359            managed: true,
1360            already_running: false,
1361            forced_setup: force_setup,
1362        })
1363    }
1364
1365    async fn spawn_sse_collectors(&self, network: &Arc<ManagedNetwork>) {
1366        let mut tasks = Vec::new();
1367        for node_id in 1..=network.node_count.load(Ordering::SeqCst) {
1368            let endpoint = assets::sse_endpoint(node_id);
1369            let network = Arc::clone(network);
1370            let task = tokio::spawn(async move {
1371                run_sse_listener(network, node_id, endpoint).await;
1372            });
1373            tasks.push(task);
1374        }
1375        let mut guard = network.sse_tasks.lock().await;
1376        guard.extend(tasks);
1377    }
1378
1379    async fn spawn_control_server(&self, network: &Arc<ManagedNetwork>) -> Result<()> {
1380        let socket_path = network.layout.control_socket_path();
1381        if let Ok(metadata) = tokio_fs::symlink_metadata(&socket_path).await {
1382            if metadata.is_dir() {
1383                tokio_fs::remove_dir_all(&socket_path).await?;
1384            } else {
1385                tokio_fs::remove_file(&socket_path).await?;
1386            }
1387        }
1388        if let Some(parent) = socket_path.parent() {
1389            tokio_fs::create_dir_all(parent).await?;
1390        }
1391
1392        let listener = UnixListener::bind(&socket_path)?;
1393        let network_for_task = Arc::clone(network);
1394        let shutdown = Arc::clone(&network_for_task.control_shutdown);
1395        let task = tokio::spawn(async move {
1396            loop {
1397                if shutdown.load(Ordering::SeqCst) {
1398                    break;
1399                }
1400
1401                let accepted =
1402                    tokio::time::timeout(Duration::from_millis(250), listener.accept()).await;
1403                let (stream, _) = match accepted {
1404                    Ok(Ok(pair)) => pair,
1405                    Ok(Err(_)) => break,
1406                    Err(_) => continue,
1407                };
1408                let network = Arc::clone(&network_for_task);
1409                tokio::spawn(async move {
1410                    handle_managed_control_stream(stream, network).await;
1411                });
1412            }
1413
1414            let _ = tokio_fs::remove_file(&socket_path).await;
1415        });
1416        *network.control_task.lock().await = Some(task);
1417        Ok(())
1418    }
1419
1420    async fn stage_protocol(
1421        &self,
1422        network_name: &str,
1423        asset: Option<&str>,
1424        custom_asset: Option<&str>,
1425        legacy_asset_name: Option<&str>,
1426        protocol_version: &str,
1427        activation_point: u64,
1428    ) -> Result<StageProtocolResponse> {
1429        let asset_selector = stage_asset_selector(asset, custom_asset, legacy_asset_name)?;
1430        let managed = {
1431            let guard = self.managed.lock().await;
1432            guard.get(network_name).cloned()
1433        };
1434
1435        if let Some(network) = managed
1436            && network.is_running().await
1437        {
1438            if let Ok(Some(current_era)) = read_current_era_from_status(1).await
1439                && activation_point <= current_era
1440            {
1441                return Err(anyhow!(
1442                    "activation point {} must be greater than current era {}",
1443                    activation_point,
1444                    current_era
1445                ));
1446            }
1447
1448            let request = ControlRequest::StageProtocol {
1449                asset: match &asset_selector {
1450                    AssetSelector::Versioned(asset) => Some(asset.clone()),
1451                    AssetSelector::LatestVersioned | AssetSelector::Custom(_) => None,
1452                },
1453                custom_asset: match &asset_selector {
1454                    AssetSelector::Custom(asset) => Some(asset.clone()),
1455                    AssetSelector::LatestVersioned | AssetSelector::Versioned(_) => None,
1456                },
1457                asset_name: None,
1458                protocol_version: protocol_version.to_string(),
1459                activation_point,
1460                chainspec_overrides: Vec::new(),
1461                restart_sidecars: true,
1462                rust_log: Some(network.rust_log.clone()),
1463            };
1464            let socket_path = network.layout.control_socket_path();
1465            return match send_request(&socket_path, &request).await {
1466                Ok(ControlResponse::Ok { result }) => match result {
1467                    ControlResult::StageProtocol {
1468                        live_mode,
1469                        staged_nodes,
1470                        restarted_sidecars,
1471                    } => Ok(StageProtocolResponse {
1472                        network_name: network_name.to_string(),
1473                        live_mode,
1474                        staged_nodes,
1475                        restarted_sidecars,
1476                    }),
1477                    ControlResult::RuntimeStatus { .. } => Err(anyhow!(
1478                        "unexpected runtime_status response from {}",
1479                        socket_path.display()
1480                    )),
1481                    ControlResult::AddNodes { .. } => Err(anyhow!(
1482                        "unexpected add_nodes response from {}",
1483                        socket_path.display()
1484                    )),
1485                },
1486                Ok(ControlResponse::Error { error }) => Err(anyhow!(error)),
1487                Err(err) => Err(anyhow!(
1488                    "failed to reach managed control socket {}: {}",
1489                    socket_path.display(),
1490                    err
1491                )),
1492            };
1493        }
1494
1495        let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1496        if !layout.exists().await {
1497            return Err(anyhow!(
1498                "assets for {} not found under {}",
1499                network_name,
1500                layout.net_dir().display()
1501            ));
1502        }
1503
1504        let staged = assets::stage_protocol(
1505            &layout,
1506            &StageProtocolOptions {
1507                asset: asset_selector,
1508                protocol_version: protocol_version.to_string(),
1509                activation_point,
1510                chainspec_overrides: Vec::new(),
1511            },
1512        )
1513        .await?;
1514
1515        Ok(StageProtocolResponse {
1516            network_name: network_name.to_string(),
1517            live_mode: false,
1518            staged_nodes: staged.staged_nodes,
1519            restarted_sidecars: Vec::new(),
1520        })
1521    }
1522
1523    async fn wait_network_ready(
1524        &self,
1525        network_name: &str,
1526        timeout_seconds: Option<u64>,
1527    ) -> Result<WaitReadyResponse> {
1528        let network = self.get_network(network_name).await?;
1529        let timeout = Duration::from_secs(timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1530        let deadline = Instant::now() + timeout;
1531
1532        loop {
1533            let running = ensure_running_network(&network).await.is_ok();
1534            if running {
1535                let status = check_rest_ready(&network).await;
1536                if let Ok(rest_by_node) = status {
1537                    let state = network.state.lock().await;
1538                    let block_observed = state.last_block_height.is_some()
1539                        || rest_by_node.values().any(rest_has_block);
1540                    if block_observed {
1541                        return Ok(WaitReadyResponse {
1542                            network_name: network_name.to_string(),
1543                            ready: true,
1544                            node_count: network.node_count.load(Ordering::SeqCst),
1545                            rest: rest_by_node,
1546                            last_block_height: state.last_block_height,
1547                        });
1548                    }
1549                }
1550            }
1551
1552            if Instant::now() >= deadline {
1553                return Ok(WaitReadyResponse {
1554                    network_name: network_name.to_string(),
1555                    ready: false,
1556                    node_count: network.node_count.load(Ordering::SeqCst),
1557                    rest: HashMap::new(),
1558                    last_block_height: None,
1559                });
1560            }
1561
1562            tokio::time::sleep(Duration::from_millis(500)).await;
1563        }
1564    }
1565
1566    async fn despawn_network(&self, network_name: &str, purge: bool) -> Result<DespawnResponse> {
1567        let managed = {
1568            let mut guard = self.managed.lock().await;
1569            guard.remove(network_name)
1570        };
1571
1572        let Some(network) = managed else {
1573            return Err(anyhow!("network '{}' is not managed", network_name));
1574        };
1575
1576        network.stop().await?;
1577        if purge {
1578            assets::teardown(&network.layout).await?;
1579        }
1580
1581        Ok(DespawnResponse {
1582            network_name: network_name.to_string(),
1583            purged: purge,
1584        })
1585    }
1586
1587    async fn list_networks(&self) -> Result<ListNetworksResponse> {
1588        let discovered = discover_network_names(&self.assets_root).await?;
1589        let managed_snapshot = {
1590            let guard = self.managed.lock().await;
1591            guard
1592                .iter()
1593                .map(|(name, network)| (name.clone(), Arc::clone(network)))
1594                .collect::<Vec<_>>()
1595        };
1596
1597        let mut rows = Vec::new();
1598
1599        for name in &discovered {
1600            if let Some((_, network)) = managed_snapshot.iter().find(|(n, _)| n == name) {
1601                rows.push(NetworkRow {
1602                    network_name: name.clone(),
1603                    discovered: true,
1604                    managed: true,
1605                    running: network.is_running().await,
1606                    node_count: Some(network.node_count.load(Ordering::SeqCst)),
1607                });
1608            } else {
1609                let layout = AssetsLayout::new(self.assets_root.clone(), name.clone());
1610                rows.push(NetworkRow {
1611                    network_name: name.clone(),
1612                    discovered: true,
1613                    managed: false,
1614                    running: false,
1615                    node_count: layout.count_nodes().await.ok(),
1616                });
1617            }
1618        }
1619
1620        for (name, network) in managed_snapshot {
1621            if !discovered.iter().any(|candidate| candidate == &name) {
1622                rows.push(NetworkRow {
1623                    network_name: name,
1624                    discovered: false,
1625                    managed: true,
1626                    running: network.is_running().await,
1627                    node_count: Some(network.node_count.load(Ordering::SeqCst)),
1628                });
1629            }
1630        }
1631
1632        rows.sort_by(|a, b| a.network_name.cmp(&b.network_name));
1633
1634        Ok(ListNetworksResponse { networks: rows })
1635    }
1636
1637    async fn managed_processes(
1638        &self,
1639        request: ManagedProcessesRequest,
1640    ) -> Result<ManagedProcessesResponse> {
1641        let network = self.get_network(&request.network_name).await?;
1642        let running_only = request.running_only.unwrap_or(true);
1643        let process_name = request
1644            .process_name
1645            .as_deref()
1646            .map(str::trim)
1647            .filter(|name| !name.is_empty())
1648            .map(ToString::to_string);
1649
1650        let process_name_lc = process_name.as_ref().map(|name| name.to_ascii_lowercase());
1651        let state = network.state.lock().await;
1652        let mut processes = Vec::new();
1653        for process in &state.processes {
1654            if let Some(name_lc) = process_name_lc.as_deref()
1655                && !process.id.to_ascii_lowercase().contains(name_lc)
1656            {
1657                continue;
1658            }
1659
1660            let pid = process.current_pid();
1661            let running = matches!(process.last_status, ProcessStatus::Running)
1662                && pid.is_some_and(is_pid_running);
1663            if running_only && !running {
1664                continue;
1665            }
1666
1667            processes.push(ManagedProcessRow {
1668                id: process.id.clone(),
1669                node_id: process.node_id,
1670                kind: process_kind_name(&process.kind).to_string(),
1671                pid,
1672                running,
1673                last_status: process_status_name(&process.last_status).to_string(),
1674                command: process.command.clone(),
1675                args: process.args.clone(),
1676                cwd: process.cwd.clone(),
1677                stdout_path: process.stdout_path.clone(),
1678                stderr_path: process.stderr_path.clone(),
1679            });
1680        }
1681
1682        Ok(ManagedProcessesResponse {
1683            network_name: request.network_name,
1684            running_only,
1685            process_name,
1686            processes,
1687        })
1688    }
1689
1690    async fn get_network(&self, network_name: &str) -> Result<Arc<ManagedNetwork>> {
1691        let guard = self.managed.lock().await;
1692        guard.get(network_name).cloned().ok_or_else(|| {
1693            anyhow!(
1694                "network '{}' is not managed; call spawn_network first",
1695                network_name
1696            )
1697        })
1698    }
1699
1700    async fn stop_all_networks(&self) -> Result<()> {
1701        let managed = {
1702            let mut guard = self.managed.lock().await;
1703            guard
1704                .drain()
1705                .map(|(_, network)| network)
1706                .collect::<Vec<_>>()
1707        };
1708
1709        let mut errors = Vec::new();
1710        for network in managed {
1711            if let Err(err) = network.stop().await {
1712                errors.push(err.to_string());
1713            }
1714        }
1715
1716        if errors.is_empty() {
1717            Ok(())
1718        } else {
1719            Err(anyhow!(errors.join("\n")))
1720        }
1721    }
1722
1723    async fn raw_rpc_query(
1724        &self,
1725        node_id: u32,
1726        method: &str,
1727        params: Option<Value>,
1728    ) -> Result<Value> {
1729        let endpoint = assets::rpc_endpoint(node_id);
1730        let payload = match params {
1731            Some(params) => json!({
1732                "id": 1,
1733                "jsonrpc": "2.0",
1734                "method": method,
1735                "params": params,
1736            }),
1737            None => json!({
1738                "id": 1,
1739                "jsonrpc": "2.0",
1740                "method": method,
1741            }),
1742        };
1743
1744        let response = self
1745            .http
1746            .post(endpoint)
1747            .json(&payload)
1748            .send()
1749            .await?
1750            .error_for_status()?;
1751        Ok(response.json::<Value>().await?)
1752    }
1753
1754    async fn list_derived_accounts(&self, network_name: &str) -> Result<Vec<DerivedAccountRow>> {
1755        let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1756        let csv = assets::derived_accounts_summary(&layout)
1757            .await
1758            .ok_or_else(|| {
1759                anyhow!(
1760                    "missing derived-accounts.csv for network '{}'",
1761                    network_name
1762                )
1763            })?;
1764
1765        let seed = {
1766            let managed = self.managed.lock().await;
1767            managed
1768                .get(network_name)
1769                .map(|network| Arc::clone(&network.seed))
1770        };
1771
1772        parse_derived_accounts_csv(&csv, seed).await
1773    }
1774
1775    async fn derived_account_for_path(
1776        &self,
1777        network: &Arc<ManagedNetwork>,
1778        path: &str,
1779    ) -> Result<DerivedSigner> {
1780        let material =
1781            assets::derive_account_from_seed_path(Arc::clone(&network.seed), path).await?;
1782        let secret_key = SecretKey::from_pem(&material.secret_key_pem)?;
1783        Ok(DerivedSigner {
1784            public_key_hex: material.public_key_hex,
1785            account_hash: material.account_hash,
1786            secret_key,
1787        })
1788    }
1789}
1790
1791async fn handle_managed_control_stream(
1792    mut stream: tokio::net::UnixStream,
1793    network: Arc<ManagedNetwork>,
1794) {
1795    let mut request_bytes = Vec::new();
1796    let response = match stream.read_to_end(&mut request_bytes).await {
1797        Ok(_) => match serde_json::from_slice::<ControlRequest>(&request_bytes) {
1798            Ok(request) => handle_managed_control_request(&network, request).await,
1799            Err(err) => ControlResponse::Error {
1800                error: format!("invalid control request: {}", err),
1801            },
1802        },
1803        Err(err) => ControlResponse::Error {
1804            error: format!("failed to read control request: {}", err),
1805        },
1806    };
1807
1808    let response_bytes = serde_json::to_vec(&response).unwrap_or_else(|err| {
1809        format!(
1810            "{{\"status\":\"error\",\"error\":\"failed to serialize control response: {}\"}}",
1811            err
1812        )
1813        .into_bytes()
1814    });
1815    let _ = stream.write_all(&response_bytes).await;
1816    let _ = stream.shutdown().await;
1817}
1818
1819async fn spawn_added_sse_collectors(network: &Arc<ManagedNetwork>, node_ids: &[u32]) {
1820    let mut tasks = Vec::new();
1821    for node_id in node_ids {
1822        let node_id = *node_id;
1823        let endpoint = assets::sse_endpoint(node_id);
1824        let network = Arc::clone(network);
1825        let task = tokio::spawn(async move {
1826            run_sse_listener(network, node_id, endpoint).await;
1827        });
1828        tasks.push(task);
1829    }
1830    let mut guard = network.sse_tasks.lock().await;
1831    guard.extend(tasks);
1832}
1833
1834async fn handle_managed_control_request(
1835    network: &Arc<ManagedNetwork>,
1836    request: ControlRequest,
1837) -> ControlResponse {
1838    match request {
1839        ControlRequest::RuntimeStatus => {
1840            let state = network.state.lock().await;
1841            ControlResponse::Ok {
1842                result: ControlResult::RuntimeStatus {
1843                    running_node_ids: running_node_ids(&state),
1844                    last_block_height: state.last_block_height,
1845                },
1846            }
1847        }
1848        ControlRequest::StageProtocol {
1849            asset,
1850            custom_asset,
1851            asset_name,
1852            protocol_version,
1853            activation_point,
1854            chainspec_overrides,
1855            restart_sidecars,
1856            rust_log,
1857        } => {
1858            let asset_selector = match stage_asset_selector(
1859                asset.as_deref(),
1860                custom_asset.as_deref(),
1861                asset_name.as_deref(),
1862            ) {
1863                Ok(asset_selector) => asset_selector,
1864                Err(err) => {
1865                    return ControlResponse::Error {
1866                        error: err.to_string(),
1867                    };
1868                }
1869            };
1870            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
1871            if let Err(err) =
1872                assets::ensure_consensus_keys(&network.layout, Arc::clone(&network.seed)).await
1873            {
1874                return ControlResponse::Error {
1875                    error: format!("failed to recreate consensus keys: {}", err),
1876                };
1877            }
1878
1879            let staged = assets::stage_protocol(
1880                &network.layout,
1881                &StageProtocolOptions {
1882                    asset: asset_selector,
1883                    protocol_version,
1884                    activation_point,
1885                    chainspec_overrides,
1886                },
1887            )
1888            .await;
1889            let staged = match staged {
1890                Ok(staged) => staged,
1891                Err(err) => {
1892                    return ControlResponse::Error {
1893                        error: err.to_string(),
1894                    };
1895                }
1896            };
1897
1898            let mut restarted_sidecars = Vec::new();
1899            if restart_sidecars {
1900                let rust_log = rust_log.unwrap_or_else(|| network.rust_log.clone());
1901                let mut state = network.state.lock().await;
1902                match process::restart_sidecars(&network.layout, &mut state, &rust_log).await {
1903                    Ok(restarted) => {
1904                        for proc in restarted {
1905                            restarted_sidecars.push(proc.record.node_id);
1906                        }
1907                    }
1908                    Err(err) => {
1909                        return ControlResponse::Error {
1910                            error: err.to_string(),
1911                        };
1912                    }
1913                }
1914            }
1915
1916            ControlResponse::Ok {
1917                result: ControlResult::StageProtocol {
1918                    live_mode: true,
1919                    staged_nodes: staged.staged_nodes,
1920                    restarted_sidecars,
1921                },
1922            }
1923        }
1924        ControlRequest::AddNodes { count } => {
1925            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
1926            let added = match assets::add_nodes(
1927                &network.layout,
1928                &AddNodesOptions {
1929                    count,
1930                    seed: Arc::clone(&network.seed),
1931                },
1932            )
1933            .await
1934            {
1935                Ok(added) => added,
1936                Err(err) => {
1937                    return ControlResponse::Error {
1938                        error: err.to_string(),
1939                    };
1940                }
1941            };
1942
1943            let started = {
1944                let mut state = network.state.lock().await;
1945                process::start_added_nodes(
1946                    &network.layout,
1947                    &mut state,
1948                    &added.added_node_ids,
1949                    added.total_nodes,
1950                    &network.rust_log,
1951                )
1952                .await
1953            };
1954            let started = match started {
1955                Ok(started) => started,
1956                Err(err) => {
1957                    let error =
1958                        rollback_added_nodes_after_start_error(&network.layout, &added, err).await;
1959                    return ControlResponse::Error { error };
1960                }
1961            };
1962            let process_ids = started
1963                .iter()
1964                .map(|proc| proc.record.id.clone())
1965                .collect::<Vec<_>>();
1966            let started_processes = started.len() as u32;
1967            network
1968                .node_count
1969                .store(added.total_nodes, Ordering::SeqCst);
1970            spawn_added_sse_collectors(network, &added.added_node_ids).await;
1971            spawn_pid_sync_tasks_for_ids(Arc::clone(&network.state), &process_ids).await;
1972
1973            ControlResponse::Ok {
1974                result: ControlResult::AddNodes {
1975                    added_node_ids: added.added_node_ids,
1976                    total_nodes: added.total_nodes,
1977                    started_processes,
1978                },
1979            }
1980        }
1981    }
1982}
1983
1984async fn rollback_added_nodes_after_start_error(
1985    layout: &AssetsLayout,
1986    added: &assets::AddNodesResult,
1987    err: anyhow::Error,
1988) -> String {
1989    match assets::rollback_added_nodes(layout, &added.added_node_ids).await {
1990        Ok(()) => err.to_string(),
1991        Err(rollback_err) => format!(
1992            "{}; failed to roll back added node assets: {}",
1993            err, rollback_err
1994        ),
1995    }
1996}
1997
1998async fn read_current_era_from_status(node_id: u32) -> Result<Option<u64>> {
1999    let value = fetch_rest_status(node_id).await?;
2000    Ok(value
2001        .get("last_added_block_info")
2002        .and_then(|info| info.get("era_id"))
2003        .and_then(|era_id| {
2004            era_id
2005                .as_u64()
2006                .or_else(|| era_id.as_str().and_then(|raw| raw.parse::<u64>().ok()))
2007        }))
2008}
2009
/// Bounded in-memory buffer of SSE records with a running sequence counter.
#[derive(Debug)]
struct SseStore {
    /// Last sequence number handed out by `push`; starts at 0, so the first
    /// record gets sequence 1.
    sequence: AtomicU64,
    /// Buffered records: `push` appends at the back and evicts from the front
    /// once the length exceeds `capacity`.
    events: Mutex<VecDeque<SseRecord>>,
    /// Notified by `push` after a record has been stored; awaited by `wait_next`.
    notify: Notify,
    /// Maximum number of records retained in `events`.
    capacity: usize,
}
2017
2018impl SseStore {
2019    fn new(capacity: usize) -> Self {
2020        Self {
2021            sequence: AtomicU64::new(0),
2022            events: Mutex::new(VecDeque::new()),
2023            notify: Notify::new(),
2024            capacity,
2025        }
2026    }
2027
2028    async fn latest_sequence(&self) -> u64 {
2029        self.sequence.load(Ordering::SeqCst)
2030    }
2031
2032    async fn push(&self, node_id: u32, event: SseEvent) {
2033        let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) + 1;
2034        let event_type = sse_event_type(&event).to_string();
2035        let payload = sse_event_payload(&event);
2036
2037        let record = SseRecord {
2038            sequence,
2039            timestamp_rfc3339: timestamp_prefix(),
2040            node_id,
2041            event_type,
2042            payload,
2043        };
2044
2045        let mut guard = self.events.lock().await;
2046        guard.push_back(record);
2047        while guard.len() > self.capacity {
2048            let _ = guard.pop_front();
2049        }
2050        drop(guard);
2051
2052        self.notify.notify_waiters();
2053    }
2054
2055    async fn wait_next(
2056        &self,
2057        after_sequence: u64,
2058        filter: &SseFilter,
2059        timeout: Duration,
2060    ) -> Result<SseRecord> {
2061        let deadline = Instant::now() + timeout;
2062        loop {
2063            if let Some(record) = self.find_first_after(after_sequence, filter).await {
2064                return Ok(record);
2065            }
2066
2067            if Instant::now() >= deadline {
2068                return Err(anyhow!("timed out waiting for SSE event"));
2069            }
2070
2071            let remaining = deadline.saturating_duration_since(Instant::now());
2072            tokio::time::timeout(remaining, self.notify.notified()).await?;
2073        }
2074    }
2075
2076    async fn find_first_after(&self, after_sequence: u64, filter: &SseFilter) -> Option<SseRecord> {
2077        let guard = self.events.lock().await;
2078        guard
2079            .iter()
2080            .find(|event| event.sequence > after_sequence && filter.matches(event))
2081            .cloned()
2082    }
2083
2084    async fn history(
2085        &self,
2086        before_sequence: Option<u64>,
2087        limit: usize,
2088        filter: &SseFilter,
2089    ) -> SseHistoryPage {
2090        let before = before_sequence.unwrap_or(u64::MAX);
2091        let guard = self.events.lock().await;
2092
2093        let mut matched = guard
2094            .iter()
2095            .filter(|event| event.sequence < before)
2096            .filter(|event| filter.matches(event))
2097            .cloned()
2098            .collect::<Vec<_>>();
2099
2100        let total = matched.len();
2101
2102        if matched.len() > limit {
2103            matched = matched.split_off(matched.len() - limit);
2104        }
2105
2106        let next_before_sequence = matched.first().map(|event| event.sequence);
2107        SseHistoryPage {
2108            total_matching: total,
2109            returned: matched.len(),
2110            next_before_sequence,
2111            events: matched,
2112        }
2113    }
2114}
2115
/// Event-type filter for SSE records; the default (both lists empty) matches everything.
#[derive(Debug, Default)]
struct SseFilter {
    /// When non-empty, only records whose event type equals one of these names match.
    include_event_types: Vec<String>,
    /// Records whose event type equals one of these names never match.
    exclude_event_types: Vec<String>,
}
2121
2122impl SseFilter {
2123    fn matches(&self, event: &SseRecord) -> bool {
2124        let include = if self.include_event_types.is_empty() {
2125            true
2126        } else {
2127            self.include_event_types
2128                .iter()
2129                .any(|name| name == &event.event_type)
2130        };
2131        if !include {
2132            return false;
2133        }
2134        !self
2135            .exclude_event_types
2136            .iter()
2137            .any(|name| name == &event.event_type)
2138    }
2139}
2140
// One SSE event as stored by `SseStore::push`.
#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct SseRecord {
    // Store-assigned sequence number (first record is 1).
    sequence: u64,
    // Value of `timestamp_prefix()` when the record was created; presumably
    // RFC 3339, going by the field name.
    timestamp_rfc3339: String,
    // Node the event was pushed for.
    node_id: u32,
    // Result of `sse_event_type` for the event; this is what `SseFilter` compares against.
    event_type: String,
    // Result of `sse_event_payload` for the event.
    payload: Value,
}
2149
// One page of results produced by `SseStore::history`.
#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct SseHistoryPage {
    // Number of records that matched before the limit was applied.
    total_matching: usize,
    // Number of records in `events`.
    returned: usize,
    // Sequence of the first record in `events`; `None` when the page is empty.
    next_before_sequence: Option<u64>,
    // The returned records, in buffer order.
    events: Vec<SseRecord>,
}
2157
/// Signing identity produced by `derived_account_for_path` from seed-derived key material.
#[derive(Debug)]
struct DerivedSigner {
    /// Public key hex string copied from the derived material.
    public_key_hex: String,
    /// Account hash copied from the derived material.
    account_hash: String,
    /// Secret key parsed from the derived material's PEM.
    secret_key: SecretKey,
}
2164
// Deserialized parameters for spawning a network. Every field is optional; the
// defaults are applied by the handler, which is not part of this section.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SpawnNetworkRequest {
    network_name: Option<String>,
    asset: Option<String>,
    custom_asset: Option<String>,
    protocol_version: Option<String>,
    node_count: Option<u32>,
    users: Option<u32>,
    delay: Option<u64>,
    log_level: Option<String>,
    node_log_format: Option<String>,
    seed: Option<String>,
    force_setup: Option<bool>,
}
2179
// Serialized result of a spawn request: the network's name and node count plus
// flags describing what happened (managed, already running, forced setup).
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct SpawnNetworkResponse {
    network_name: String,
    node_count: u32,
    managed: bool,
    already_running: bool,
    forced_setup: bool,
}
2188
// Deserialized parameters for waiting on network readiness.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitNetworkReadyRequest {
    network_name: String,
    // Optional timeout in seconds; presumably forwarded to `wait_network_ready`,
    // which uses `DEFAULT_TIMEOUT_SECS` when it is absent.
    timeout_seconds: Option<u64>,
}
2194
// Result of `wait_network_ready`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct WaitReadyResponse {
    network_name: String,
    // `true` when a block was observed before the deadline, `false` on timeout.
    ready: bool,
    // Node counter of the managed network at the time of the response.
    node_count: u32,
    // REST status per node id from the successful readiness check; empty on timeout.
    rest: HashMap<u32, Value>,
    // Last block height recorded in the shared state; `None` on timeout.
    last_block_height: Option<u64>,
}
2203
// Deserialized parameters for staging a protocol version on a network. The
// three optional asset fields select the asset to stage; how they combine is
// decided by the handler (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct StageProtocolRequest {
    network_name: String,
    asset: Option<String>,
    custom_asset: Option<String>,
    asset_name: Option<String>,
    protocol_version: String,
    activation_point: u64,
}
2213
// Serialized result of a stage-protocol request.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct StageProtocolResponse {
    network_name: String,
    live_mode: bool,
    // Number of nodes the protocol was staged for.
    staged_nodes: u32,
    // Node ids whose sidecars were restarted; may be empty.
    restarted_sidecars: Vec<u32>,
}
2221
// Deserialized parameters for despawning a network.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct DespawnNetworkRequest {
    network_name: String,
    // Optional flag; presumably maps to the `purge` argument of
    // `despawn_network`, which tears down the network's assets when set.
    purge: Option<bool>,
}
2227
// Result of `despawn_network`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct DespawnResponse {
    network_name: String,
    // Echoes the `purge` flag the network was despawned with.
    purged: bool,
}
2233
// Parameterless request type for listing networks.
#[derive(Debug, Deserialize, Default, rmcp::schemars::JsonSchema)]
struct ListNetworksRequest {}
2236
// Result of `list_networks`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ListNetworksResponse {
    // One row per network, sorted by network name.
    networks: Vec<NetworkRow>,
}
2241
// One entry of the network listing built by `list_networks`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct NetworkRow {
    network_name: String,
    // The name was returned by `discover_network_names` for the assets root.
    discovered: bool,
    // The network is present in the managed registry.
    managed: bool,
    // Live running flag for managed networks; always `false` for unmanaged ones.
    running: bool,
    // Managed networks: their node counter. Unmanaged networks: the count from
    // the asset layout, or `None` when counting failed.
    node_count: Option<u32>,
}
2250
// Parameters of `managed_processes`.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessesRequest {
    network_name: String,
    // Optional ASCII case-insensitive substring filter on the process id;
    // blank values are treated as absent.
    process_name: Option<String>,
    // Whether to hide processes that are not running; defaults to `true`.
    running_only: Option<bool>,
}
2257
// Result of `managed_processes`; echoes the effective filter settings.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessesResponse {
    network_name: String,
    // Effective value after applying the default.
    running_only: bool,
    // Trimmed filter that was applied, or `None` when no filter was in effect.
    process_name: Option<String>,
    processes: Vec<ManagedProcessRow>,
}
2265
// One process record as reported by `managed_processes`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct ManagedProcessRow {
    id: String,
    node_id: u32,
    // Name produced by `process_kind_name` for the process kind.
    kind: String,
    // Current pid of the process, if it has one.
    pid: Option<u32>,
    // `true` only when the last status is `Running` and the pid is confirmed
    // alive by `is_pid_running`.
    running: bool,
    // Name produced by `process_status_name` for the last recorded status.
    last_status: String,
    command: String,
    args: Vec<String>,
    cwd: String,
    stdout_path: String,
    stderr_path: String,
}
2280
// Deserialized parameters identifying a single node of a named network.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct NodeScopedRequest {
    network_name: String,
    node_id: u32,
}
2286
// Deserialized parameters for an arbitrary RPC call against one node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryRequest {
    network_name: String,
    node_id: u32,
    // RPC method name.
    method: String,
    // Optional params value; presumably passed through to `raw_rpc_query`,
    // which omits the `params` member when this is absent.
    params: Option<Value>,
}
2294
// Deserialized parameters for a balance query against one node. `block_id` and
// `state_root_hash` are optional; how they are interpreted is up to the handler
// (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryBalanceRequest {
    network_name: String,
    node_id: u32,
    purse_identifier: String,
    block_id: Option<String>,
    state_root_hash: Option<String>,
}
2303
// Deserialized parameters for a global-state query against one node: a key,
// an optional path of segments, and optional `block_id` / `state_root_hash`
// whose interpretation is up to the handler (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct RpcQueryGlobalStateRequest {
    network_name: String,
    node_id: u32,
    key: String,
    path: Option<Vec<String>>,
    block_id: Option<String>,
    state_root_hash: Option<String>,
}
2313
// Deserialized parameters for a block query against one node, with an optional
// block identifier.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct CurrentBlockRequest {
    network_name: String,
    node_id: u32,
    block_id: Option<String>,
}
2320
// Selects which output stream of a node to read. The variants are
// (de)serialized in lowercase, i.e. as "stdout" and "stderr".
#[derive(Debug, Clone, Copy, Deserialize, Serialize, rmcp::schemars::JsonSchema)]
#[serde(rename_all = "lowercase")]
enum NodeLogStream {
    Stdout,
    Stderr,
}
2327
// Deserialized parameters for reading a page of a node's log output.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct GetNodeLogsRequest {
    network_name: String,
    node_id: u32,
    // Which stream to read.
    stream: NodeLogStream,
    // Optional page size; the default is chosen by the handler (not visible here).
    limit: Option<usize>,
    // Optional paging cursor; presumably the `next_before_line` of an earlier `LogPage`.
    before_line: Option<usize>,
}
2336
// A single log line together with its line number.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct LogLine {
    line_number: usize,
    content: String,
}
2342
// One page of log lines read from the file at `path`.
#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
struct LogPage {
    path: String,
    total_lines: usize,
    // Number of entries in `lines`.
    returned: usize,
    // Cursor for requesting the preceding page, when there is one
    // (presumably fed back as `before_line`; confirm against the handler).
    next_before_line: Option<usize>,
    lines: Vec<LogLine>,
}
2351
// Deserialized parameters for waiting on the next SSE event of a network. The
// include/exclude lists carry event-type names, the same shape `SseFilter`
// works with. `after_sequence` and `timeout_seconds` are optional; their
// defaults are chosen by the handler (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitNextSseRequest {
    network_name: String,
    include_event_types: Option<Vec<String>>,
    exclude_event_types: Option<Vec<String>>,
    after_sequence: Option<u64>,
    timeout_seconds: Option<u64>,
}
2360
// Deserialized parameters for paging through buffered SSE events of a network.
// The include/exclude lists carry event-type names, the same shape `SseFilter`
// works with. `before_sequence` is the paging cursor and `limit` the page size;
// both are optional, with defaults chosen by the handler (not visible here).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SseHistoryRequest {
    network_name: String,
    include_event_types: Option<Vec<String>>,
    exclude_event_types: Option<Vec<String>>,
    before_sequence: Option<u64>,
    limit: Option<usize>,
}
2369
// Deserialized parameters for listing the derived accounts of a network.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct ListDerivedAccountsRequest {
    network_name: String,
}
2374
// One derived account, as returned by `list_derived_accounts` (rows are
// produced by `parse_derived_accounts_csv`). All values are carried as strings.
#[derive(Debug, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
struct DerivedAccountRow {
    kind: String,
    name: String,
    key_type: String,
    derivation: String,
    path: String,
    account_hash: String,
    balance: String,
    // Optional; when it is populated is decided by the parser (not visible here).
    public_key: Option<String>,
}
2386
// Deserialized parameters for signing and sending a caller-supplied transaction.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SendTransactionSignedRequest {
    network_name: String,
    node_id: u32,
    // Derivation path of the signing account.
    signer_path: String,
    // Transaction as a JSON value; presumably decoded with `parse_transaction_json`.
    transaction: Value,
}
2394
// Deserialized parameters for building a transaction that calls an entry point
// of a named package. Apart from the network, node, package name and entry
// point, every field is optional; defaults are applied by the handler (not
// visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionPackageCallRequest {
    network_name: String,
    node_id: u32,
    transaction_package_name: String,
    transaction_package_version: Option<EntityVersion>,
    session_entry_point: String,
    // Also accepted under the name `session_args_json` when deserializing.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    runtime_seed_hex: Option<String>,
}
2413
// Deserialized parameters for building a transaction that calls an entry point
// of a contract identified by hash. Apart from the network, node, contract hash
// and entry point, every field is optional; defaults are applied by the handler
// (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionContractCallRequest {
    network_name: String,
    node_id: u32,
    transaction_contract_hash: String,
    session_entry_point: String,
    // Also accepted under the name `session_args_json` when deserializing.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    runtime_seed_hex: Option<String>,
}
2431
// Deserialized parameters for building a session transaction from a Wasm file.
// Apart from the network, node and Wasm path, every field is optional; defaults
// are applied by the handler (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct MakeTransactionSessionWasmRequest {
    network_name: String,
    node_id: u32,
    // Path of the Wasm module to use.
    wasm_path: String,
    // Also accepted under the name `session_args_json` when deserializing.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    is_install_upgrade: Option<bool>,
    signer_path: Option<String>,
    initiator_public_key: Option<String>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    runtime_seed_hex: Option<String>,
}
2449
// Deserialized parameters for sending a session transaction built from a Wasm
// file. Unlike the "make" request types, `signer_path` is required here. The
// remaining optional fields get their defaults from the handler (not visible in
// this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct SendSessionWasmRequest {
    network_name: String,
    node_id: u32,
    signer_path: String,
    wasm_path: String,
    // Also accepted under the name `session_args_json` when deserializing.
    #[serde(alias = "session_args_json")]
    session_args: Option<Value>,
    chain_name: Option<String>,
    is_install_upgrade: Option<bool>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
    runtime_transferred_value: Option<u64>,
    runtime_seed_hex: Option<String>,
}
2466
// Deserialized parameters for a token transfer between two accounts given by
// path. The amount is carried as a string; how it is parsed is up to the
// handler (not visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct TransferTokensRequest {
    network_name: String,
    node_id: u32,
    from_path: String,
    to_path: String,
    amount: String,
    transfer_id: Option<u64>,
    chain_name: Option<String>,
    ttl_millis: Option<u64>,
    gas_price_tolerance: Option<u8>,
    payment_amount: Option<u64>,
}
2480
// Deserialized parameters for waiting on a transaction on one node. The timeout
// and poll interval are optional; their defaults are chosen by the handler (not
// visible in this section).
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct WaitTransactionRequest {
    network_name: String,
    node_id: u32,
    // Transaction hash as text; presumably parsed by `parse_transaction_hash_input`.
    transaction_hash: String,
    timeout_seconds: Option<u64>,
    poll_interval_millis: Option<u64>,
}
2489
// Deserialized parameters for fetching a transaction from one node.
#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
struct GetTransactionRequest {
    network_name: String,
    node_id: u32,
    // Transaction hash as text; presumably parsed by `parse_transaction_hash_input`.
    transaction_hash: String,
}
2496
/// Minimal view of a REST status document: only `reactor_state` is read, and it
/// may be absent.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
struct RestStatusProbe {
    reactor_state: Option<String>,
}
2502
2503fn to_mcp_error(err: impl std::fmt::Display) -> ErrorData {
2504    ErrorData::internal_error(err.to_string(), None)
2505}
2506
2507fn internal_serde_error(err: serde_json::Error) -> ErrorData {
2508    ErrorData::internal_error(format!("serde error: {}", err), None)
2509}
2510
2511fn ok_value(value: Value) -> rmcp::model::CallToolResult {
2512    rmcp::model::CallToolResult::structured(value)
2513}
2514
2515fn parse_transaction_json(value: Value) -> std::result::Result<Transaction, ErrorData> {
2516    if let Value::String(encoded) = &value {
2517        let _ = encoded;
2518        return Err(ErrorData::invalid_params(
2519            "invalid transaction payload: encoded JSON strings are not supported. Provide transaction as a typed JSON object",
2520            None,
2521        ));
2522    }
2523
2524    if let Ok(transaction) = serde_json::from_value::<Transaction>(value.clone()) {
2525        return Ok(transaction);
2526    }
2527
2528    if let Some(inner) = value.get("transaction") {
2529        return parse_transaction_json(inner.clone());
2530    }
2531
2532    Err(ErrorData::invalid_params(
2533        "failed to parse transaction JSON; expected Transaction object or { transaction: Transaction }, with typed JSON values only.",
2534        None,
2535    ))
2536}
2537
2538fn parse_transaction_hash_input(input: &str) -> Result<TransactionHash> {
2539    let value = input.trim();
2540    if value.is_empty() {
2541        return Err(anyhow!("transaction_hash must not be empty"));
2542    }
2543
2544    let unwrapped = value
2545        .strip_prefix("transaction-v1-hash(")
2546        .and_then(|inner| inner.strip_suffix(')'))
2547        .or_else(|| {
2548            value
2549                .strip_prefix("deploy-hash(")
2550                .and_then(|inner| inner.strip_suffix(')'))
2551        })
2552        .unwrap_or(value);
2553
2554    if unwrapped.contains("..") {
2555        return Err(anyhow!(
2556            "transaction_hash appears abbreviated; provide full hex digest"
2557        ));
2558    }
2559
2560    let normalized = unwrapped.strip_prefix("0x").unwrap_or(unwrapped);
2561    let digest = Digest::from_hex(normalized)
2562        .map_err(|err| anyhow!("failed to parse transaction hash digest: {}", err))?;
2563    Ok(TransactionHash::from(TransactionV1Hash::from(digest)))
2564}
2565
2566fn mcp_rpc_client(network_name: &str, node_id: u32) -> Result<CasperClient> {
2567    CasperClient::new(
2568        network_name.to_string(),
2569        vec![assets::rpc_endpoint(node_id)],
2570    )
2571    .map_err(|err| {
2572        anyhow!(
2573            "failed to initialize rpc client for node {}: {}",
2574            node_id,
2575            err
2576        )
2577    })
2578}
2579
2580fn extract_rpc_result(response: Value) -> Result<Value> {
2581    if let Some(error) = response.get("error") {
2582        return Err(anyhow!("rpc request failed: {}", error));
2583    }
2584    response
2585        .get("result")
2586        .cloned()
2587        .ok_or_else(|| anyhow!("rpc response missing result field"))
2588}
2589
2590async fn fetch_block_result(
2591    manager: &NetworkManager,
2592    network_name: &str,
2593    node_id: u32,
2594    block_id: Option<&str>,
2595) -> Result<Value> {
2596    let block_id = normalize_optional_identifier(block_id);
2597    if block_id.is_empty() {
2598        let rpc = mcp_rpc_client(network_name, node_id)?;
2599        let result = rpc.get_block().await?;
2600        return serde_json::to_value(result).map_err(Into::into);
2601    }
2602
2603    let block_identifier = parse_block_identifier_value(&block_id)?;
2604    let response = manager
2605        .raw_rpc_query(
2606            node_id,
2607            "chain_get_block",
2608            Some(json!({
2609                "block_identifier": block_identifier
2610            })),
2611        )
2612        .await?;
2613    extract_rpc_result(response)
2614}
2615
2616fn build_query_global_state_params(
2617    key: &str,
2618    path: Vec<String>,
2619    block_id: &str,
2620    state_root_hash: &str,
2621) -> Result<Value> {
2622    let key = parse_query_key(key)?;
2623    let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2624    let mut params = serde_json::Map::new();
2625    params.insert("key".to_string(), Value::String(key));
2626    params.insert(
2627        "path".to_string(),
2628        Value::Array(path.into_iter().map(Value::String).collect()),
2629    );
2630    if let Some(state_identifier) = state_identifier {
2631        params.insert("state_identifier".to_string(), state_identifier);
2632    }
2633    Ok(Value::Object(params))
2634}
2635
2636fn build_query_balance_params(
2637    purse_identifier: &str,
2638    block_id: &str,
2639    state_root_hash: &str,
2640) -> Result<Value> {
2641    let purse_identifier = parse_purse_identifier(purse_identifier)?;
2642    let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2643    let mut params = serde_json::Map::new();
2644    params.insert("purse_identifier".to_string(), purse_identifier);
2645    if let Some(state_identifier) = state_identifier {
2646        params.insert("state_identifier".to_string(), state_identifier);
2647    }
2648    Ok(Value::Object(params))
2649}
2650
2651fn parse_state_identifier(block_id: &str, state_root_hash: &str) -> Result<Option<Value>> {
2652    let block_id = block_id.trim();
2653    if !block_id.is_empty() {
2654        if block_id.len() == Digest::LENGTH * 2 {
2655            Digest::from_hex(block_id)
2656                .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2657            return Ok(Some(json!({
2658                "BlockHash": block_id,
2659            })));
2660        }
2661
2662        let height = block_id
2663            .parse::<u64>()
2664            .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2665        return Ok(Some(json!({
2666            "BlockHeight": height,
2667        })));
2668    }
2669
2670    let state_root_hash = state_root_hash.trim();
2671    if state_root_hash.is_empty() {
2672        return Ok(None);
2673    }
2674    Digest::from_hex(state_root_hash)
2675        .map_err(|err| anyhow!("invalid state_root_hash digest: {}", err))?;
2676    Ok(Some(json!({
2677        "StateRootHash": state_root_hash,
2678    })))
2679}
2680
2681fn parse_block_identifier_value(block_id: &str) -> Result<Value> {
2682    let block_id = block_id.trim();
2683    if block_id.is_empty() {
2684        return Err(anyhow!("block identifier must not be empty"));
2685    }
2686    if block_id.len() == Digest::LENGTH * 2 {
2687        Digest::from_hex(block_id)
2688            .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2689        Ok(json!({
2690            "Hash": block_id,
2691        }))
2692    } else {
2693        let height = block_id
2694            .parse::<u64>()
2695            .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2696        Ok(json!({
2697            "Height": height,
2698        }))
2699    }
2700}
2701
2702fn parse_query_key(key: &str) -> Result<String> {
2703    let key = key.trim();
2704    if key.is_empty() {
2705        return Err(anyhow!("key must not be empty"));
2706    }
2707    if let Ok(contract_hash) = ContractHash::from_formatted_str(key) {
2708        return Ok(Key::Hash(contract_hash.value()).to_formatted_string());
2709    }
2710    if let Ok(parsed) = Key::from_formatted_str(key) {
2711        return Ok(parsed.to_formatted_string());
2712    }
2713    if let Ok(public_key) = PublicKey::from_hex(key) {
2714        return Ok(Key::Account(public_key.to_account_hash()).to_formatted_string());
2715    }
2716    Err(anyhow!(
2717        "failed to parse key for query; expected formatted Key or public key hex"
2718    ))
2719}
2720
2721fn parse_contract_hash_for_invocation(input: &str) -> Result<AddressableEntityHash> {
2722    let input = input.trim();
2723    if input.is_empty() {
2724        return Err(anyhow!("transaction_contract_hash must not be empty"));
2725    }
2726
2727    if let Ok(contract_hash) = ContractHash::from_formatted_str(input) {
2728        return Ok(contract_hash.into());
2729    }
2730
2731    let digest_hex = input
2732        .strip_prefix("hash-")
2733        .or_else(|| input.strip_prefix("contract-hash-"))
2734        .unwrap_or(input);
2735    let bytes = hex_to_bytes(digest_hex)?;
2736    if bytes.len() != Digest::LENGTH {
2737        return Err(anyhow!(
2738            "transaction_contract_hash must be {} bytes",
2739            Digest::LENGTH
2740        ));
2741    }
2742    let mut hash = [0u8; Digest::LENGTH];
2743    hash.copy_from_slice(&bytes);
2744    Ok(ContractHash::new(hash).into())
2745}
2746
2747fn parse_purse_identifier(input: &str) -> Result<Value> {
2748    let input = input.trim();
2749    if input.is_empty() {
2750        return Err(anyhow!("purse_identifier must not be empty"));
2751    }
2752
2753    if input.starts_with("account-hash-") {
2754        casper_types::account::AccountHash::from_formatted_str(input)
2755            .map_err(|err| anyhow!("invalid account hash purse identifier: {}", err))?;
2756        return Ok(json!({
2757            "main_purse_under_account_hash": input,
2758        }));
2759    }
2760
2761    if input.starts_with("entity-") {
2762        return Ok(json!({
2763            "main_purse_under_entity_addr": input,
2764        }));
2765    }
2766
2767    if input.starts_with("uref-") {
2768        URef::from_formatted_str(input)
2769            .map_err(|err| anyhow!("invalid uref purse identifier: {}", err))?;
2770        return Ok(json!({
2771            "purse_uref": input,
2772        }));
2773    }
2774
2775    let public_key = PublicKey::from_hex(input)
2776        .map_err(|err| anyhow!("invalid public key purse identifier: {}", err))?;
2777    Ok(json!({
2778        "main_purse_under_public_key": public_key.to_hex_string(),
2779    }))
2780}
2781
2782fn build_pricing_mode(gas_price_tolerance: Option<u8>, payment_amount: Option<u64>) -> PricingMode {
2783    if let Some(payment_amount) = payment_amount {
2784        PricingMode::PaymentLimited {
2785            payment_amount,
2786            gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2787            standard_payment: true,
2788        }
2789    } else {
2790        PricingMode::Fixed {
2791            gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2792            additional_computation_factor: 0,
2793        }
2794    }
2795}
2796
2797fn parse_optional_seed_hex(seed: Option<&str>) -> Result<Option<[u8; 32]>> {
2798    let Some(seed) = seed else {
2799        return Ok(None);
2800    };
2801    let seed = seed.trim();
2802    if seed.is_empty() {
2803        return Ok(None);
2804    }
2805
2806    let cleaned = seed.strip_prefix("0x").unwrap_or(seed);
2807    let bytes = hex_to_bytes(cleaned)?;
2808    if bytes.len() != 32 {
2809        return Err(anyhow!("runtime_seed_hex must be exactly 32 bytes"));
2810    }
2811
2812    let mut out = [0u8; 32];
2813    out.copy_from_slice(&bytes);
2814    Ok(Some(out))
2815}
2816
/// Returns the trimmed identifier as an owned string, or an empty string when absent.
fn normalize_optional_identifier(value: Option<&str>) -> String {
    match value {
        Some(raw) => raw.trim().to_string(),
        None => String::new(),
    }
}
2820
2821async fn resolve_global_state_identifier(
2822    network_name: &str,
2823    node_id: u32,
2824    block_id: Option<&str>,
2825    state_root_hash: Option<&str>,
2826) -> Result<(String, String)> {
2827    let block_id = normalize_optional_identifier(block_id);
2828    let state_root_hash = normalize_optional_identifier(state_root_hash);
2829
2830    if !block_id.is_empty() || !state_root_hash.is_empty() {
2831        return Ok((block_id, state_root_hash));
2832    }
2833
2834    let rpc = mcp_rpc_client(network_name, node_id)?;
2835    let response = rpc.get_block().await?;
2836    let latest_block = response
2837        .block_with_signatures
2838        .ok_or_else(|| anyhow!("latest block was not returned by chain_get_block"))?;
2839
2840    Ok((latest_block.block.hash().to_hex_string(), String::new()))
2841}
2842
2843fn hex_to_bytes(input: &str) -> Result<Vec<u8>> {
2844    if !input.len().is_multiple_of(2) {
2845        return Err(anyhow!("hex string must have even length"));
2846    }
2847    let mut out = Vec::with_capacity(input.len() / 2);
2848    let mut chars = input.chars();
2849    while let (Some(hi), Some(lo)) = (chars.next(), chars.next()) {
2850        let byte = ((hex_nibble(hi)? as u8) << 4) | (hex_nibble(lo)? as u8);
2851        out.push(byte);
2852    }
2853    Ok(out)
2854}
2855
2856fn hex_nibble(ch: char) -> Result<u32> {
2857    ch.to_digit(16)
2858        .ok_or_else(|| anyhow!("invalid hex character '{}'", ch))
2859}
2860
2861async fn ensure_running_network(
2862    network: &Arc<ManagedNetwork>,
2863) -> std::result::Result<(), ErrorData> {
2864    if network.is_running().await {
2865        Ok(())
2866    } else {
2867        Err(ErrorData::resource_not_found(
2868            "network is not running; call spawn_network then wait_network_ready",
2869            None,
2870        ))
2871    }
2872}
2873
2874async fn check_rest_ready(network: &Arc<ManagedNetwork>) -> Result<HashMap<u32, Value>> {
2875    let node_ids = {
2876        let state = network.state.lock().await;
2877        state
2878            .processes
2879            .iter()
2880            .filter_map(|process| {
2881                if matches!(process.kind, ProcessKind::Node) {
2882                    Some(process.node_id)
2883                } else {
2884                    None
2885                }
2886            })
2887            .collect::<Vec<_>>()
2888    };
2889
2890    if node_ids.is_empty() {
2891        return Err(anyhow!("network has no node processes"));
2892    }
2893
2894    let mut by_node = HashMap::new();
2895    for node_id in node_ids {
2896        let value = fetch_rest_status(node_id).await?;
2897        let probe: RestStatusProbe = serde_json::from_value(value.clone())
2898            .with_context(|| format!("invalid /status payload for node {}", node_id))?;
2899        if probe.reactor_state.as_deref() != Some("Validate") {
2900            return Err(anyhow!("node {} reactor is not Validate", node_id));
2901        }
2902        by_node.insert(node_id, value);
2903    }
2904
2905    Ok(by_node)
2906}
2907
2908fn rest_has_block(value: &Value) -> bool {
2909    value
2910        .get("last_added_block_info")
2911        .and_then(|entry| entry.get("height"))
2912        .and_then(Value::as_u64)
2913        .is_some()
2914}
2915
2916async fn fetch_rest_status(node_id: u32) -> Result<Value> {
2917    let url = format!("{}/status", assets::rest_endpoint(node_id));
2918    let client = reqwest::Client::builder()
2919        .no_proxy()
2920        .timeout(Duration::from_secs(4))
2921        .build()?;
2922    let response = client.get(url).send().await?.error_for_status()?;
2923    Ok(response.json::<Value>().await?)
2924}
2925
2926async fn claim_block_hook(network: &Arc<ManagedNetwork>, block_hash: &str) -> bool {
2927    let mut last_block_hook_hash = network.last_block_hook_hash.lock().await;
2928    if last_block_hook_hash.as_deref() == Some(block_hash) {
2929        return false;
2930    }
2931    *last_block_hook_hash = Some(block_hash.to_string());
2932    true
2933}
2934
/// Long-running task that consumes the SSE stream at `endpoint` for `node_id`, reconnecting
/// as needed.
///
/// Every received event is pushed into `network.sse_store`. `BlockAdded` events additionally
/// update the recorded block height and trigger the hook spawners in `assets`.
///
/// The task returns when `network.shutdown` is observed as set (checked before each connect
/// attempt and after each received event) or when the backoff policy yields no further delay.
async fn run_sse_listener(network: Arc<ManagedNetwork>, node_id: u32, endpoint: String) {
    let mut backoff = ExponentialBackoff::default();
    // API version announced on the current connection; cleared whenever the stream ends.
    let mut connection_version: Option<String> = None;

    loop {
        if network.shutdown.load(Ordering::SeqCst) {
            return;
        }

        let config = match ListenerConfig::builder()
            .with_endpoint(endpoint.clone())
            .build()
        {
            Ok(config) => config,
            Err(_) => {
                // Config could not be built: wait and retry, or give up once backoff is exhausted.
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        let stream = match sse::listener(config).await {
            Ok(stream) => {
                // A successful connect restarts the backoff schedule.
                backoff.reset();
                stream
            }
            Err(_) => {
                if !sleep_backoff(&mut backoff).await {
                    return;
                }
                continue;
            }
        };

        futures::pin_mut!(stream);
        // Set when the stream yields an error (as opposed to simply ending).
        let mut failed = false;

        while let Some(event) = stream.next().await {
            if network.shutdown.load(Ordering::SeqCst) {
                return;
            }

            match event {
                Ok(event) => {
                    if let SseEvent::ApiVersion(version) = &event {
                        connection_version = Some(version.to_string());
                    }
                    if let SseEvent::BlockAdded { block_hash, block } = &event {
                        // Best effort: a failure to record the height is deliberately ignored.
                        let _ = record_last_block_height(&network.state, block.height()).await;
                        assets::spawn_pending_post_genesis_hook(network.layout.clone());
                        // The block-added hook needs the version announced on this connection,
                        // and is skipped when `claim_block_hook` reports this hash was the most
                        // recently claimed one.
                        if let Some(protocol_version) = connection_version.as_deref()
                            && claim_block_hook(&network, &block_hash.to_string()).await
                        {
                            assets::spawn_block_added_hook(
                                network.layout.clone(),
                                protocol_version.to_string(),
                                json!({
                                    "block_hash": block_hash.to_string(),
                                    "height": block.height(),
                                    "era_id": block.era_id().value(),
                                }),
                            );
                        }
                    }
                    network.sse_store.push(node_id, event).await;
                }
                Err(_) => {
                    failed = true;
                    break;
                }
            }
        }

        connection_version = None;
        // Only a stream error triggers a backoff sleep; a stream that simply ended leads to an
        // immediate reconnect attempt on the next loop iteration.
        if failed && !sleep_backoff(&mut backoff).await {
            return;
        }
    }
}
3015
3016async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
3017    let mut state = state.lock().await;
3018    if state.last_block_height == Some(height) {
3019        return Ok(());
3020    }
3021    state.last_block_height = Some(height);
3022    state.touch().await
3023}
3024
3025async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
3026    if let Some(delay) = backoff.next_backoff() {
3027        tokio::time::sleep(delay).await;
3028        true
3029    } else {
3030        false
3031    }
3032}
3033
3034fn sse_event_type(event: &SseEvent) -> &'static str {
3035    match event {
3036        SseEvent::ApiVersion(_) => "ApiVersion",
3037        SseEvent::DeployAccepted(_) => "DeployAccepted",
3038        SseEvent::BlockAdded { .. } => "BlockAdded",
3039        SseEvent::DeployProcessed(_) => "DeployProcessed",
3040        SseEvent::DeployExpired(_) => "DeployExpired",
3041        SseEvent::TransactionAccepted(_) => "TransactionAccepted",
3042        SseEvent::TransactionProcessed { .. } => "TransactionProcessed",
3043        SseEvent::TransactionExpired { .. } => "TransactionExpired",
3044        SseEvent::Fault { .. } => "Fault",
3045        SseEvent::FinalitySignature(_) => "FinalitySignature",
3046        SseEvent::Step { .. } => "Step",
3047        SseEvent::Shutdown => "Shutdown",
3048    }
3049}
3050
3051fn process_kind_name(kind: &ProcessKind) -> &'static str {
3052    match kind {
3053        ProcessKind::Node => "node",
3054        ProcessKind::Sidecar => "sidecar",
3055    }
3056}
3057
3058fn process_status_name(status: &ProcessStatus) -> &'static str {
3059    match status {
3060        ProcessStatus::Running => "running",
3061        ProcessStatus::Stopped => "stopped",
3062        ProcessStatus::Exited => "exited",
3063        ProcessStatus::Unknown => "unknown",
3064        ProcessStatus::Skipped => "skipped",
3065    }
3066}
3067
3068fn sse_event_payload(event: &SseEvent) -> Value {
3069    match event {
3070        SseEvent::ApiVersion(version) => json!({ "api_version": version.to_string() }),
3071        SseEvent::DeployAccepted(payload) => payload.clone(),
3072        SseEvent::BlockAdded { block_hash, block } => json!({
3073            "block_hash": block_hash.to_string(),
3074            "height": block.height(),
3075            "era_id": block.era_id().value(),
3076        }),
3077        SseEvent::DeployProcessed(payload) => payload.clone(),
3078        SseEvent::DeployExpired(payload) => payload.clone(),
3079        SseEvent::TransactionAccepted(transaction) => {
3080            json!({ "transaction_hash": transaction.hash().to_hex_string() })
3081        }
3082        SseEvent::TransactionProcessed {
3083            transaction_hash,
3084            execution_result,
3085            messages,
3086            ..
3087        } => json!({
3088            "transaction_hash": transaction_hash.to_hex_string(),
3089            "execution_result": execution_result,
3090            "messages": messages,
3091        }),
3092        SseEvent::TransactionExpired { transaction_hash } => json!({
3093            "transaction_hash": transaction_hash.to_hex_string(),
3094        }),
3095        SseEvent::Fault {
3096            era_id,
3097            public_key,
3098            timestamp,
3099        } => json!({
3100            "era_id": era_id.value(),
3101            "public_key": public_key.to_hex(),
3102            "timestamp": timestamp,
3103        }),
3104        SseEvent::FinalitySignature(signature) => json!({
3105            "block_hash": signature.block_hash().to_string(),
3106            "era_id": signature.era_id().value(),
3107            "signature": signature.signature().to_hex(),
3108        }),
3109        SseEvent::Step {
3110            era_id,
3111            execution_effects,
3112        } => json!({
3113            "era_id": era_id.value(),
3114            "execution_effects": execution_effects.get(),
3115        }),
3116        SseEvent::Shutdown => json!({}),
3117    }
3118}
3119
3120fn timestamp_prefix() -> String {
3121    time::OffsetDateTime::now_utc()
3122        .format(&time::format_description::well_known::Rfc3339)
3123        .unwrap_or_else(|_| "unknown-time".to_string())
3124}
3125
3126fn processes_running(state: &State) -> bool {
3127    if state.processes.is_empty() {
3128        return false;
3129    }
3130
3131    state.processes.iter().all(|process| {
3132        matches!(process.last_status, ProcessStatus::Running)
3133            && process.current_pid().is_some_and(is_pid_running)
3134    })
3135}
3136
3137fn running_node_ids(state: &State) -> Vec<u32> {
3138    let mut node_ids = std::collections::BTreeSet::new();
3139    for process in &state.processes {
3140        if !matches!(process.kind, ProcessKind::Node) {
3141            continue;
3142        }
3143        if !matches!(process.last_status, ProcessStatus::Running) {
3144            continue;
3145        }
3146        let Some(pid) = process.current_pid() else {
3147            continue;
3148        };
3149        if !is_pid_running(pid) {
3150            continue;
3151        }
3152        node_ids.insert(process.node_id);
3153    }
3154    node_ids.into_iter().collect()
3155}
3156
3157fn is_pid_running(pid: u32) -> bool {
3158    let pid = Pid::from_raw(pid as i32);
3159    match kill(pid, None) {
3160        Ok(()) => true,
3161        Err(Errno::ESRCH) => false,
3162        Err(_) => true,
3163    }
3164}
3165
3166async fn discover_network_names(assets_root: &Path) -> Result<Vec<String>> {
3167    if !is_dir(assets_root).await {
3168        return Ok(Vec::new());
3169    }
3170
3171    let mut names = Vec::new();
3172    let mut entries = tokio_fs::read_dir(assets_root).await?;
3173    while let Some(entry) = entries.next_entry().await? {
3174        if !entry.file_type().await?.is_dir() {
3175            continue;
3176        }
3177        let name = entry.file_name().to_string_lossy().to_string();
3178        if !name.is_empty() {
3179            names.push(name);
3180        }
3181    }
3182
3183    names.sort();
3184    Ok(names)
3185}
3186
3187async fn ensure_sidecar_available(layout: &AssetsLayout, node_count: u32) -> Result<()> {
3188    for node_id in 1..=node_count {
3189        let version_dir = layout.latest_protocol_version_dir(node_id).await?;
3190        let sidecar_bin = layout
3191            .node_bin_dir(node_id)
3192            .join(&version_dir)
3193            .join("casper-sidecar");
3194        let sidecar_cfg = layout
3195            .node_config_root(node_id)
3196            .join(&version_dir)
3197            .join("sidecar.toml");
3198
3199        if !is_file(&sidecar_bin).await {
3200            return Err(anyhow!(
3201                "missing sidecar binary for node {}: {}",
3202                node_id,
3203                sidecar_bin.display()
3204            ));
3205        }
3206
3207        if !is_file(&sidecar_cfg).await {
3208            return Err(anyhow!(
3209                "missing sidecar.toml for node {}: {}",
3210                node_id,
3211                sidecar_cfg.display()
3212            ));
3213        }
3214    }
3215
3216    Ok(())
3217}
3218
3219async fn latest_layout_protocol_version(layout: &AssetsLayout) -> Result<String> {
3220    Ok(layout
3221        .latest_protocol_version_dir(1)
3222        .await?
3223        .replace('_', "."))
3224}
3225
3226fn stage_asset_selector(
3227    asset: Option<&str>,
3228    custom_asset: Option<&str>,
3229    legacy_asset_name: Option<&str>,
3230) -> Result<AssetSelector> {
3231    let custom_asset = match (custom_asset, legacy_asset_name) {
3232        (Some(_), Some(_)) => {
3233            return Err(anyhow!(
3234                "custom_asset and asset_name are mutually exclusive"
3235            ));
3236        }
3237        (Some(custom_asset), None) | (None, Some(custom_asset)) => Some(custom_asset),
3238        (None, None) => None,
3239    };
3240    assets::required_asset_selector(asset, custom_asset)
3241}
3242
3243async fn parse_derived_accounts_csv(
3244    csv: &str,
3245    seed: Option<Arc<str>>,
3246) -> Result<Vec<DerivedAccountRow>> {
3247    let mut lines = csv.lines();
3248    let header = lines
3249        .next()
3250        .ok_or_else(|| anyhow!("derived accounts csv is empty"))?;
3251    if header.trim() != "kind,name,key_type,derivation,path,account_hash,balance" {
3252        return Err(anyhow!("unexpected derived-accounts.csv header"));
3253    }
3254
3255    let mut rows = Vec::new();
3256    for line in lines {
3257        let line = line.trim();
3258        if line.is_empty() {
3259            continue;
3260        }
3261
3262        let parts = line.splitn(7, ',').collect::<Vec<_>>();
3263        if parts.len() != 7 {
3264            return Err(anyhow!("invalid derived account row: {}", line));
3265        }
3266
3267        let path = parts[4].to_string();
3268        let public_key = if let Some(seed) = &seed {
3269            match assets::derive_account_from_seed_path(Arc::clone(seed), &path).await {
3270                Ok(material) => Some(material.public_key_hex),
3271                Err(_) => None,
3272            }
3273        } else {
3274            None
3275        };
3276
3277        rows.push(DerivedAccountRow {
3278            kind: parts[0].to_string(),
3279            name: parts[1].to_string(),
3280            key_type: parts[2].to_string(),
3281            derivation: parts[3].to_string(),
3282            path,
3283            account_hash: parts[5].to_string(),
3284            balance: parts[6].to_string(),
3285            public_key,
3286        });
3287    }
3288
3289    Ok(rows)
3290}
3291
3292async fn verify_path_hash_consistency(
3293    layout: &AssetsLayout,
3294    path: &str,
3295    expected_account_hash: &str,
3296) -> Result<()> {
3297    let Some(csv) = assets::derived_accounts_summary(layout).await else {
3298        return Ok(());
3299    };
3300
3301    for line in csv.lines().skip(1) {
3302        let line = line.trim();
3303        if line.is_empty() {
3304            continue;
3305        }
3306        let parts = line.splitn(7, ',').collect::<Vec<_>>();
3307        if parts.len() != 7 {
3308            continue;
3309        }
3310        if parts[4] == path {
3311            if parts[5] != expected_account_hash {
3312                return Err(anyhow!(
3313                    "derived account hash mismatch for path {}: csv={} derived={}",
3314                    path,
3315                    parts[5],
3316                    expected_account_hash
3317                ));
3318            }
3319            return Ok(());
3320        }
3321    }
3322
3323    Ok(())
3324}
3325
3326async fn fetch_chain_name(network_name: &str, node_id: u32) -> Result<String> {
3327    let rpc = mcp_rpc_client(network_name, node_id)?;
3328    rpc.get_network_name().await.map_err(Into::into)
3329}
3330
3331fn extract_block_height(value: &Value) -> Option<u64> {
3332    value
3333        .pointer("/block_with_signatures/block/header/height")
3334        .and_then(Value::as_u64)
3335        .or_else(|| {
3336            value
3337                .pointer("/block/block/header/height")
3338                .and_then(Value::as_u64)
3339        })
3340        .or_else(|| {
3341            value
3342                .pointer("/block_with_signatures/Version2/block/Version2/header/height")
3343                .and_then(Value::as_u64)
3344        })
3345        .or_else(|| {
3346            value
3347                .pointer("/block_with_signatures/Version1/block/Version1/header/height")
3348                .and_then(Value::as_u64)
3349        })
3350        .or_else(|| find_first_height(value))
3351}
3352
3353fn find_first_height(value: &Value) -> Option<u64> {
3354    match value {
3355        Value::Object(map) => {
3356            if let Some(height) = map.get("height").and_then(Value::as_u64) {
3357                return Some(height);
3358            }
3359            for nested in map.values() {
3360                if let Some(height) = find_first_height(nested) {
3361                    return Some(height);
3362                }
3363            }
3364            None
3365        }
3366        Value::Array(items) => {
3367            for item in items {
3368                if let Some(height) = find_first_height(item) {
3369                    return Some(height);
3370                }
3371            }
3372            None
3373        }
3374        _ => None,
3375    }
3376}
3377
3378fn parse_no_such_block_range_from_error(error_text: &str) -> Option<(u64, u64)> {
3379    let start = error_text.find('{')?;
3380    let payload = &error_text[start..];
3381    let value: Value = serde_json::from_str(payload).ok()?;
3382
3383    let code = value.get("code").and_then(Value::as_i64)?;
3384    let message = value.get("message").and_then(Value::as_str)?;
3385    if code != -32001 || !message.eq_ignore_ascii_case("No such block") {
3386        return None;
3387    }
3388
3389    let low = value
3390        .pointer("/data/available_block_range/low")
3391        .and_then(Value::as_u64)
3392        .unwrap_or(0);
3393    let high = value
3394        .pointer("/data/available_block_range/high")
3395        .and_then(Value::as_u64)
3396        .unwrap_or(low);
3397    Some((low, high))
3398}
3399
3400async fn read_log_page(path: &Path, before_line: Option<usize>, limit: usize) -> Result<LogPage> {
3401    let contents = tokio_fs::read_to_string(path)
3402        .await
3403        .with_context(|| format!("failed to read log file {}", path.display()))?;
3404
3405    let all_lines = contents
3406        .lines()
3407        .map(ToString::to_string)
3408        .collect::<Vec<_>>();
3409    let total_lines = all_lines.len();
3410
3411    let before = before_line.unwrap_or(total_lines + 1);
3412    if before == 0 {
3413        return Err(anyhow!("before_line must be >= 1"));
3414    }
3415
3416    let end_exclusive = before.saturating_sub(1).min(total_lines);
3417    let start = end_exclusive.saturating_sub(limit);
3418
3419    let mut lines = Vec::new();
3420    for (idx, content) in all_lines[start..end_exclusive].iter().enumerate() {
3421        lines.push(LogLine {
3422            line_number: start + idx + 1,
3423            content: content.clone(),
3424        });
3425    }
3426
3427    let next_before_line = if start == 0 { None } else { Some(start + 1) };
3428
3429    Ok(LogPage {
3430        path: path.display().to_string(),
3431        total_lines,
3432        returned: lines.len(),
3433        next_before_line,
3434        lines,
3435    })
3436}
3437
3438async fn is_dir(path: &Path) -> bool {
3439    tokio_fs::metadata(path)
3440        .await
3441        .map(|meta| meta.is_dir())
3442        .unwrap_or(false)
3443}
3444
3445async fn is_file(path: &Path) -> bool {
3446    tokio_fs::metadata(path)
3447        .await
3448        .map(|meta| meta.is_file())
3449        .unwrap_or(false)
3450}
3451
3452pub async fn run(args: McpArgs) -> Result<()> {
3453    let assets_root = match args.net_path {
3454        Some(path) => path,
3455        None => assets::default_assets_root()?,
3456    };
3457
3458    let manager = Arc::new(NetworkManager::new(assets_root).await?);
3459
3460    let result = match args.transport {
3461        McpTransport::Stdio => run_stdio(manager.clone()).await,
3462        McpTransport::Http => run_http(manager.clone(), &args.http_bind, &args.http_path).await,
3463        McpTransport::Both => run_both(manager.clone(), &args.http_bind, &args.http_path).await,
3464    };
3465
3466    let stop_result = manager.stop_all_networks().await;
3467
3468    match (result, stop_result) {
3469        (Ok(()), Ok(())) => Ok(()),
3470        (Err(err), Ok(())) => Err(err),
3471        (Ok(()), Err(stop_err)) => Err(stop_err),
3472        (Err(run_err), Err(stop_err)) => Err(anyhow!(
3473            "mcp server failed: {run_err}; additionally failed to stop networks: {stop_err}"
3474        )),
3475    }
3476}
3477
3478async fn run_stdio(manager: Arc<NetworkManager>) -> Result<()> {
3479    let service = McpServer::new(manager).serve(mcp_stdio()).await?;
3480    service.waiting().await?;
3481    Ok(())
3482}
3483
3484async fn run_http(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3485    let path = normalize_http_path(path);
3486    let socket = std::net::SocketAddr::from_str(bind)
3487        .with_context(|| format!("invalid http bind address '{}'", bind))?;
3488
3489    let service: StreamableHttpService<McpServer, LocalSessionManager> = StreamableHttpService::new(
3490        {
3491            let manager = manager.clone();
3492            move || Ok(McpServer::new(manager.clone()))
3493        },
3494        Arc::new(LocalSessionManager::default()),
3495        StreamableHttpServerConfig::default(),
3496    );
3497
3498    let router = axum::Router::new().nest_service(&path, service);
3499    let listener = tokio::net::TcpListener::bind(socket).await?;
3500
3501    axum::serve(listener, router)
3502        .with_graceful_shutdown(async {
3503            let _ = tokio::signal::ctrl_c().await;
3504        })
3505        .await?;
3506
3507    Ok(())
3508}
3509
3510async fn run_both(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3511    let path = normalize_http_path(path);
3512    let socket = std::net::SocketAddr::from_str(bind)
3513        .with_context(|| format!("invalid http bind address '{}'", bind))?;
3514
3515    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
3516
3517    let mut http_task = {
3518        let manager = manager.clone();
3519        tokio::spawn(async move {
3520            let service: StreamableHttpService<McpServer, LocalSessionManager> =
3521                StreamableHttpService::new(
3522                    {
3523                        let manager = manager.clone();
3524                        move || Ok(McpServer::new(manager.clone()))
3525                    },
3526                    Arc::new(LocalSessionManager::default()),
3527                    StreamableHttpServerConfig::default(),
3528                );
3529
3530            let router = axum::Router::new().nest_service(&path, service);
3531            let listener = tokio::net::TcpListener::bind(socket).await?;
3532
3533            axum::serve(listener, router)
3534                .with_graceful_shutdown(async move {
3535                    tokio::select! {
3536                        _ = async {
3537                            loop {
3538                                if *shutdown_rx.borrow() {
3539                                    break;
3540                                }
3541                                if shutdown_rx.changed().await.is_err() {
3542                                    break;
3543                                }
3544                            }
3545                        } => {}
3546                        _ = tokio::signal::ctrl_c() => {}
3547                    }
3548                })
3549                .await
3550                .map_err(anyhow::Error::from)
3551        })
3552    };
3553
3554    let mut stdio_task = tokio::spawn(async move { run_stdio(manager).await });
3555
3556    let result = tokio::select! {
3557        res = &mut stdio_task => match res {
3558            Ok(inner) => inner,
3559            Err(join_err) => Err(anyhow!("stdio task failed: {}", join_err)),
3560        },
3561        res = &mut http_task => match res {
3562            Ok(inner) => inner,
3563            Err(join_err) => Err(anyhow!("http task failed: {}", join_err)),
3564        },
3565    };
3566
3567    let _ = shutdown_tx.send(true);
3568    let _ = tokio::time::timeout(Duration::from_secs(2), &mut http_task).await;
3569    let _ = tokio::time::timeout(Duration::from_secs(2), &mut stdio_task).await;
3570
3571    result
3572}
3573
/// Returns `path` with a guaranteed leading `/`.
///
/// A path that already begins with `/` is returned unchanged; otherwise a single `/` is
/// prepended (so an empty path becomes `/`).
fn normalize_http_path(path: &str) -> String {
    match path.strip_prefix('/') {
        Some(_) => path.to_owned(),
        None => ["/", path].concat(),
    }
}
3581
/// Unit tests for the parsing, paging and filtering helpers of this module.
#[cfg(test)]
mod tests {
    use super::*;

    /// A CSV with a header and two data rows parses into two rows with the expected fields.
    #[tokio::test]
    async fn parse_derived_accounts_rows() {
        let csv = "kind,name,key_type,derivation,path,account_hash,balance\nvalidator,node-1,secp256k1,bip32,m/44'/506'/0'/0/0,account-hash-a,100\nuser,user-1,secp256k1,bip32,m/44'/506'/0'/0/100,account-hash-b,200";
        let rows = parse_derived_accounts_csv(csv, None).await.unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].path, "m/44'/506'/0'/0/0");
        assert_eq!(rows[1].account_hash, "account-hash-b");
    }

    /// An included event type matches the filter; an excluded one does not.
    #[test]
    fn sse_filter_include_exclude() {
        let filter = SseFilter {
            include_event_types: vec!["BlockAdded".to_string()],
            exclude_event_types: vec!["TransactionAccepted".to_string()],
        };

        let block = SseRecord {
            sequence: 1,
            timestamp_rfc3339: "t".to_string(),
            node_id: 1,
            event_type: "BlockAdded".to_string(),
            payload: json!({}),
        };
        let tx = SseRecord {
            sequence: 2,
            timestamp_rfc3339: "t".to_string(),
            node_id: 1,
            event_type: "TransactionAccepted".to_string(),
            payload: json!({}),
        };

        assert!(filter.matches(&block));
        assert!(!filter.matches(&tx));
    }

    /// With sequences 1..=5 stored, asking for 2 events before sequence 6 yields 4 and 5,
    /// and the cursor for the next page is 4.
    #[tokio::test]
    async fn sse_history_paginates_by_sequence() {
        let store = SseStore::new(100);
        for sequence in 1..=5 {
            let mut guard = store.events.lock().await;
            guard.push_back(SseRecord {
                sequence,
                timestamp_rfc3339: "t".to_string(),
                node_id: 1,
                event_type: "BlockAdded".to_string(),
                payload: json!({ "sequence": sequence }),
            });
            drop(guard);
        }
        let filter = SseFilter::default();
        let page = store.history(Some(6), 2, &filter).await;
        assert_eq!(page.returned, 2);
        assert_eq!(page.events[0].sequence, 4);
        assert_eq!(page.events[1].sequence, 5);
        assert_eq!(page.next_before_sequence, Some(4));
    }

    /// For a four-line file, a cursor of 5 with limit 2 returns lines 3 and 4, and the
    /// cursor for the next page is 3.
    #[tokio::test]
    async fn read_log_page_uses_before_cursor() {
        let temp = tempfile::NamedTempFile::new().unwrap();
        tokio_fs::write(temp.path(), "a\nb\nc\nd\n").await.unwrap();

        let page = read_log_page(temp.path(), Some(5), 2).await.unwrap();
        assert_eq!(page.lines.len(), 2);
        assert_eq!(page.lines[0].line_number, 3);
        assert_eq!(page.lines[1].line_number, 4);
        assert_eq!(page.next_before_line, Some(3));
    }

    /// Only a `casper-node` file and a `sidecar.toml` are created for node 1; the sidecar
    /// availability check is expected to fail (presumably because no sidecar binary exists).
    #[tokio::test]
    async fn sidecar_preflight_fails_when_missing() {
        let root = tempfile::tempdir().unwrap();
        let layout = AssetsLayout::new(root.path().to_path_buf(), "test-net".to_string());
        let node_bin = layout.node_bin_dir(1).join("2_0_0");
        let node_cfg = layout.node_config_root(1).join("2_0_0");
        tokio_fs::create_dir_all(&node_bin).await.unwrap();
        tokio_fs::create_dir_all(&node_cfg).await.unwrap();
        tokio_fs::write(node_bin.join("casper-node"), "bin")
            .await
            .unwrap();
        tokio_fs::write(node_cfg.join("sidecar.toml"), "cfg")
            .await
            .unwrap();

        let result = ensure_sidecar_available(&layout, 1).await;
        assert!(result.is_err());
    }

    /// `None` and whitespace-only input normalize to an empty string; other input is trimmed.
    #[test]
    fn normalize_optional_identifier_trims_and_defaults() {
        assert_eq!(normalize_optional_identifier(None), "");
        assert_eq!(normalize_optional_identifier(Some("   ")), "");
        assert_eq!(normalize_optional_identifier(Some(" 123 ")), "123");
    }

    /// A decimal string maps to `BlockHeight`, a 64-character hex string to `BlockHash`,
    /// and an empty string to no identifier.
    #[test]
    fn parse_state_identifier_variants() {
        assert_eq!(
            parse_state_identifier("42", "").unwrap(),
            Some(json!({ "BlockHeight": 42u64 }))
        );
        assert_eq!(
            parse_state_identifier(
                "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234",
                ""
            )
            .unwrap(),
            Some(json!({
                "BlockHash": "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234"
            }))
        );
        assert_eq!(parse_state_identifier("", "").unwrap(), None,);
    }

    /// The `contract-` prefix, the `hash-` prefix and a bare hex string all parse to the
    /// same hash.
    #[test]
    fn parse_contract_hash_for_invocation_accepts_common_formats() {
        let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
        let contract = parse_contract_hash_for_invocation(&format!("contract-{}", hash)).unwrap();
        let key_hash = parse_contract_hash_for_invocation(&format!("hash-{}", hash)).unwrap();
        let raw = parse_contract_hash_for_invocation(hash).unwrap();
        assert_eq!(contract.to_hex_string(), hash);
        assert_eq!(key_hash.to_hex_string(), hash);
        assert_eq!(raw.to_hex_string(), hash);
    }

    /// A `contract-<hex>` query key is rewritten to the `hash-<hex>` form.
    #[test]
    fn parse_query_key_accepts_contract_hash_format() {
        let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
        let key = parse_query_key(&format!("contract-{}", hash)).unwrap();
        assert_eq!(key, format!("hash-{}", hash));
    }

    /// A transaction given as a JSON object parses both directly and when wrapped in a
    /// `transaction` field; the same transaction serialized into a JSON string is rejected
    /// in both positions.
    #[test]
    fn parse_transaction_json_rejects_escaped_string_payload() {
        let tx_json = json!({
            "Version1": {
                "hash": "7eeb092361e31b4cc9885e3621f1470f29631338ecc703643c22da1d38fd81a9",
                "payload": {
                    "initiator_addr": {
                        "PublicKey": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f"
                    },
                    "timestamp": "2026-02-27T18:03:18.541Z",
                    "ttl": "30m",
                    "chain_name": "casper-devnet",
                    "pricing_mode": {
                        "PaymentLimited": {
                            "payment_amount": 100000000000u64,
                            "gas_price_tolerance": 5,
                            "standard_payment": true
                        }
                    },
                    "fields": {
                        "args": {"Named": []},
                        "entry_point": {"Custom": "counter_inc"},
                        "scheduling": "Standard",
                        "target": {
                            "Stored": {
                                "id": {
                                    "ByPackageName": {
                                        "name": "counter_package_name",
                                        "version": null
                                    }
                                },
                                "runtime": "VmCasperV1"
                            }
                        }
                    }
                },
                "approvals": [{
                    "signer": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f",
                    "signature": "02c64336e5ed2832bdb84adb3f334d585548ee096066aa9d0797c11ab3f074ec9d7bd396994bc9b9c239342be801bc385a9c5083779bace4dfe0b400d4a13c07db"
                }]
            }
        });
        let direct = parse_transaction_json(tx_json.clone()).unwrap();
        let wrapped = parse_transaction_json(json!({ "transaction": tx_json })).unwrap();
        // Re-encode the parsed transaction as a JSON string to build the rejected inputs.
        let encoded = serde_json::to_string(&direct).unwrap();
        let from_string_err = parse_transaction_json(Value::String(encoded.clone()));
        let wrapped_string_err = parse_transaction_json(json!({
            "transaction": encoded
        }));

        assert_eq!(
            direct.hash().to_hex_string(),
            wrapped.hash().to_hex_string()
        );
        assert!(from_string_err.is_err());
        assert!(wrapped_string_err.is_err());
    }

    /// The `low`/`high` bounds are read from a "No such block" error that has a textual
    /// prefix in front of the JSON object.
    #[test]
    fn parse_no_such_block_range_from_error_extracts_bounds() {
        let error = "casper client error: response for rpc-id 1 chain_get_block is json-rpc error: {\"code\":-32001,\"message\":\"No such block\",\"data\":{\"message\":\"no block found for the provided identifier\",\"available_block_range\":{\"low\":0,\"high\":0}}}";
        let range = parse_no_such_block_range_from_error(error).unwrap();
        assert_eq!(range, (0, 0));
    }

    /// The request deserializes when the payload carries a `transaction` field; a payload
    /// using `transaction_json` instead fails with a missing-field error.
    #[test]
    fn send_transaction_signed_request_requires_transaction_field() {
        let payload = json!({
            "network_name": "casper-devnet",
            "node_id": 1,
            "signer_path": "m/44'/506'/0'/0/100",
            "transaction": { "Version1": { "hash": "abc" } }
        });
        let request: SendTransactionSignedRequest = serde_json::from_value(payload).unwrap();
        assert!(request.transaction.get("Version1").is_some());

        let legacy_payload = json!({
            "network_name": "casper-devnet",
            "node_id": 1,
            "signer_path": "m/44'/506'/0'/0/100",
            "transaction_json": { "Version1": { "hash": "def" } }
        });
        let legacy_err = serde_json::from_value::<SendTransactionSignedRequest>(legacy_payload)
            .unwrap_err()
            .to_string();
        assert!(legacy_err.contains("missing field `transaction`"));
    }
}