Skip to main content

casper_devnet/
mcp.rs

1use self::args_parser::parse_session_args;
2use crate::assets::{
3    self, AddNodesOptions, AssetSelector, AssetsLayout, SetupOptions, StageProtocolOptions,
4};
5use crate::control::{ControlRequest, ControlResponse, ControlResult, send_request};
6use crate::process::{self, StartPlan};
7use crate::state::{
8    ProcessKind, ProcessStatus, STATE_FILE_NAME, State, spawn_pid_sync_tasks,
9    spawn_pid_sync_tasks_for_ids,
10};
11use anyhow::{Context, Result, anyhow};
12use backoff::ExponentialBackoff;
13use backoff::backoff::Backoff;
14use casper_types::contracts::ContractHash;
15use casper_types::{
16    AddressableEntityHash, AsymmetricType, Digest, EntityVersion, Key, PricingMode, PublicKey,
17    SecretKey, TimeDiff, Transaction, TransactionHash, TransactionRuntimeParams, TransactionV1Hash,
18    URef,
19};
20use clap::{Args, ValueEnum};
21use futures::StreamExt;
22use nix::errno::Errno;
23use nix::sys::signal::kill;
24use nix::unistd::Pid;
25use rmcp::handler::server::{router::tool::ToolRouter, wrapper::Parameters};
26use rmcp::model::{CallToolResult, ServerCapabilities, ServerInfo};
27use rmcp::service::ServiceExt;
28use rmcp::transport::{
29    StreamableHttpServerConfig,
30    streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService},
31};
32use rmcp::{
33    ErrorData, ServerHandler, tool, tool_handler, tool_router, transport::stdio as mcp_stdio,
34};
35use serde::{Deserialize, Serialize};
36use serde_json::{Value, json};
37use std::collections::{HashMap, VecDeque};
38use std::path::{Path, PathBuf};
39use std::str::FromStr;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
42use std::time::{Duration, Instant};
43use tokio::fs as tokio_fs;
44use tokio::io::{AsyncReadExt, AsyncWriteExt};
45use tokio::net::UnixListener;
46use tokio::sync::{Mutex, Notify};
47use veles_casper_rust_sdk::TransactionV1Builder;
48use veles_casper_rust_sdk::jsonrpc::CasperClient;
49use veles_casper_rust_sdk::sse::event::SseEvent;
50use veles_casper_rust_sdk::sse::{self, config::ListenerConfig};
51
52const DEFAULT_HTTP_BIND: &str = "127.0.0.1:32100";
53const DEFAULT_HTTP_PATH: &str = "/mcp";
54const DEFAULT_NETWORK_NAME: &str = "casper-dev";
55const DEFAULT_NODE_COUNT: u32 = 4;
56const DEFAULT_DELAY_SECS: u64 = 3;
57const DEFAULT_LOG_LEVEL: &str = "info";
58const DEFAULT_NODE_LOG_FORMAT: &str = "json";
59const DEFAULT_SEED: &str = "default";
60const DEFAULT_TIMEOUT_SECS: u64 = 60;
61const DEFAULT_LOG_PAGE_SIZE: usize = 200;
62const DEFAULT_SSE_PAGE_SIZE: usize = 200;
63const DEFAULT_SSE_HISTORY_CAPACITY: usize = 20_000;
64const DEFAULT_PAYMENT_AMOUNT: u64 = 100_000_000_000;
65
66mod args_parser;
67
68#[derive(Debug, Clone, Copy, ValueEnum)]
69#[value(rename_all = "kebab-case")]
70pub enum McpTransport {
71    Stdio,
72    Http,
73    Both,
74}
75
76#[derive(Debug, Args, Clone)]
77pub struct McpArgs {
78    #[arg(long, value_enum, default_value_t = McpTransport::Both)]
79    transport: McpTransport,
80
81    #[arg(long, default_value = DEFAULT_HTTP_BIND)]
82    http_bind: String,
83
84    #[arg(long, default_value = DEFAULT_HTTP_PATH)]
85    http_path: String,
86
87    #[arg(long, value_name = "PATH")]
88    net_path: Option<PathBuf>,
89}
90
91#[derive(Debug, Clone)]
92struct McpServer {
93    manager: Arc<NetworkManager>,
94    tool_router: ToolRouter<Self>,
95}
96
97impl McpServer {
98    fn new(manager: Arc<NetworkManager>) -> Self {
99        Self {
100            manager,
101            tool_router: Self::tool_router(),
102        }
103    }
104
105    fn server_info() -> ServerInfo {
106        ServerInfo {
107            instructions: Some(
108                "Control multiple local Casper devnets. Start with spawn_network, then wait_network_ready before RPC or transaction tools. Do not use external casper-client binaries or curl; use MCP tools directly (for example get_transaction, wait_transaction, make_transaction_package_call, make_transaction_contract_call, make_transaction_session_wasm, send_transaction_signed).".to_string(),
109            ),
110            capabilities: ServerCapabilities::builder().enable_tools().build(),
111            ..Default::default()
112        }
113    }
114}
115
116#[tool_router]
117impl McpServer {
118    #[tool(
119        description = "Spawn or resume a managed network. Defaults to force_setup=true for fresh devnet startup."
120    )]
121    async fn spawn_network(
122        &self,
123        Parameters(request): Parameters<SpawnNetworkRequest>,
124    ) -> std::result::Result<CallToolResult, ErrorData> {
125        let result = self
126            .manager
127            .spawn_network(request)
128            .await
129            .map_err(to_mcp_error)?;
130        Ok(ok_value(
131            serde_json::to_value(result).map_err(internal_serde_error)?,
132        ))
133    }
134
135    #[tool(
136        description = "Wait for a managed network to be ready: processes running, /status healthy, reactor Validate, and at least one block observed."
137    )]
138    async fn wait_network_ready(
139        &self,
140        Parameters(request): Parameters<WaitNetworkReadyRequest>,
141    ) -> std::result::Result<CallToolResult, ErrorData> {
142        let result = self
143            .manager
144            .wait_network_ready(&request.network_name, request.timeout_seconds)
145            .await
146            .map_err(to_mcp_error)?;
147        Ok(ok_value(
148            serde_json::to_value(result).map_err(internal_serde_error)?,
149        ))
150    }
151
152    #[tool(
153        description = "Stage a protocol upgrade from a named custom asset. When the network is running, sidecars are restarted after staging."
154    )]
155    async fn stage_protocol(
156        &self,
157        Parameters(request): Parameters<StageProtocolRequest>,
158    ) -> std::result::Result<CallToolResult, ErrorData> {
159        let result = self
160            .manager
161            .stage_protocol(
162                &request.network_name,
163                request.asset.as_deref(),
164                request.custom_asset.as_deref(),
165                request.asset_name.as_deref(),
166                &request.protocol_version,
167                request.activation_point,
168            )
169            .await
170            .map_err(to_mcp_error)?;
171        Ok(ok_value(
172            serde_json::to_value(result).map_err(internal_serde_error)?,
173        ))
174    }
175
176    #[tool(
177        description = "Despawn a managed network. By default it stops processes and keeps files; set purge=true to remove files."
178    )]
179    async fn despawn_network(
180        &self,
181        Parameters(request): Parameters<DespawnNetworkRequest>,
182    ) -> std::result::Result<CallToolResult, ErrorData> {
183        let result = self
184            .manager
185            .despawn_network(&request.network_name, request.purge.unwrap_or(false))
186            .await
187            .map_err(to_mcp_error)?;
188        Ok(ok_value(
189            serde_json::to_value(result).map_err(internal_serde_error)?,
190        ))
191    }
192
193    #[tool(description = "List discovered network names and managed/running state.")]
194    async fn list_networks(
195        &self,
196        _: Parameters<ListNetworksRequest>,
197    ) -> std::result::Result<CallToolResult, ErrorData> {
198        let result = self.manager.list_networks().await.map_err(to_mcp_error)?;
199        Ok(ok_value(
200            serde_json::to_value(result).map_err(internal_serde_error)?,
201        ))
202    }
203
204    #[tool(
205        description = "List managed network processes, optionally filtered by process name. Defaults to running_only=true."
206    )]
207    async fn managed_processes(
208        &self,
209        Parameters(request): Parameters<ManagedProcessesRequest>,
210    ) -> std::result::Result<CallToolResult, ErrorData> {
211        let result = self
212            .manager
213            .managed_processes(request)
214            .await
215            .map_err(to_mcp_error)?;
216        Ok(ok_value(
217            serde_json::to_value(result).map_err(internal_serde_error)?,
218        ))
219    }
220
221    #[tool(description = "Call node REST /status for a managed network node.")]
222    async fn status(
223        &self,
224        Parameters(request): Parameters<NodeScopedRequest>,
225    ) -> std::result::Result<CallToolResult, ErrorData> {
226        let network = self
227            .manager
228            .get_network(&request.network_name)
229            .await
230            .map_err(to_mcp_error)?;
231        ensure_running_network(&network).await?;
232        let status = fetch_rest_status(request.node_id)
233            .await
234            .map_err(to_mcp_error)?;
235        Ok(ok_value(status))
236    }
237
238    #[tool(
239        description = "Run a raw JSON-RPC call against node sidecar endpoint. Useful as a generic query helper."
240    )]
241    async fn rpc_query(
242        &self,
243        Parameters(request): Parameters<RpcQueryRequest>,
244    ) -> std::result::Result<CallToolResult, ErrorData> {
245        let network = self
246            .manager
247            .get_network(&request.network_name)
248            .await
249            .map_err(to_mcp_error)?;
250        ensure_running_network(&network).await?;
251        let value = self
252            .manager
253            .raw_rpc_query(request.node_id, &request.method, request.params)
254            .await
255            .map_err(to_mcp_error)?;
256        Ok(ok_value(value))
257    }
258
259    #[tool(description = "Typed balance query by account/public key/uref/entity identifier.")]
260    async fn rpc_query_balance(
261        &self,
262        Parameters(request): Parameters<RpcQueryBalanceRequest>,
263    ) -> std::result::Result<CallToolResult, ErrorData> {
264        let network = self
265            .manager
266            .get_network(&request.network_name)
267            .await
268            .map_err(to_mcp_error)?;
269        ensure_running_network(&network).await?;
270
271        let block_id = normalize_optional_identifier(request.block_id.as_deref());
272        let state_root_hash = normalize_optional_identifier(request.state_root_hash.as_deref());
273        let params =
274            build_query_balance_params(&request.purse_identifier, &block_id, &state_root_hash)
275                .map_err(to_mcp_error)?;
276        let response = self
277            .manager
278            .raw_rpc_query(request.node_id, "query_balance", Some(params))
279            .await
280            .map_err(to_mcp_error)?;
281        Ok(ok_value(
282            extract_rpc_result(response).map_err(to_mcp_error)?,
283        ))
284    }
285
286    #[tool(
287        description = "Typed global state query by key + optional path + optional block/state-root identifier. If no identifier is provided, the latest block hash is used."
288    )]
289    async fn rpc_query_global_state(
290        &self,
291        Parameters(request): Parameters<RpcQueryGlobalStateRequest>,
292    ) -> std::result::Result<CallToolResult, ErrorData> {
293        let network = self
294            .manager
295            .get_network(&request.network_name)
296            .await
297            .map_err(to_mcp_error)?;
298        ensure_running_network(&network).await?;
299
300        let (block_id, state_root_hash) = resolve_global_state_identifier(
301            &request.network_name,
302            request.node_id,
303            request.block_id.as_deref(),
304            request.state_root_hash.as_deref(),
305        )
306        .await
307        .map_err(to_mcp_error)?;
308        let params = build_query_global_state_params(
309            &request.key,
310            request.path.unwrap_or_default(),
311            &block_id,
312            &state_root_hash,
313        )
314        .map_err(to_mcp_error)?;
315        let response = self
316            .manager
317            .raw_rpc_query(request.node_id, "query_global_state", Some(params))
318            .await
319            .map_err(to_mcp_error)?;
320
321        Ok(ok_value(
322            extract_rpc_result(response).map_err(to_mcp_error)?,
323        ))
324    }
325
326    #[tool(description = "Get current block height using typed chain_get_block RPC.")]
327    async fn current_block_height(
328        &self,
329        Parameters(request): Parameters<CurrentBlockRequest>,
330    ) -> std::result::Result<CallToolResult, ErrorData> {
331        let network = self
332            .manager
333            .get_network(&request.network_name)
334            .await
335            .map_err(to_mcp_error)?;
336        ensure_running_network(&network).await?;
337
338        let value = fetch_block_result(
339            &self.manager,
340            &request.network_name,
341            request.node_id,
342            request.block_id.as_deref(),
343        )
344        .await;
345        let value = match value {
346            Ok(value) => value,
347            Err(err) => {
348                if request.block_id.is_none()
349                    && let Some((low, high)) =
350                        parse_no_such_block_range_from_error(&err.to_string())
351                {
352                    return Ok(ok_value(json!({
353                        "network_name": request.network_name,
354                        "node_id": request.node_id,
355                        "height": high,
356                        "block": Value::Null,
357                        "pending": true,
358                        "available_block_range": {
359                            "low": low,
360                            "high": high,
361                        },
362                        "message": "latest block is not yet queryable; retry shortly",
363                    })));
364                }
365                return Err(to_mcp_error(err));
366            }
367        };
368        let height = extract_block_height(&value).ok_or_else(|| {
369            ErrorData::internal_error("missing block height in RPC response", None)
370        })?;
371
372        Ok(ok_value(json!({
373            "network_name": request.network_name,
374            "node_id": request.node_id,
375            "height": height,
376            "block": value,
377        })))
378    }
379
380    #[tool(description = "Get current block payload using typed chain_get_block RPC.")]
381    async fn current_block(
382        &self,
383        Parameters(request): Parameters<CurrentBlockRequest>,
384    ) -> std::result::Result<CallToolResult, ErrorData> {
385        let network = self
386            .manager
387            .get_network(&request.network_name)
388            .await
389            .map_err(to_mcp_error)?;
390        ensure_running_network(&network).await?;
391
392        let value = fetch_block_result(
393            &self.manager,
394            &request.network_name,
395            request.node_id,
396            request.block_id.as_deref(),
397        )
398        .await
399        .map_err(to_mcp_error)?;
400        Ok(ok_value(value))
401    }
402
403    #[tool(description = "Get paginated node stdout/stderr logs from on-disk files.")]
404    async fn get_node_logs(
405        &self,
406        Parameters(request): Parameters<GetNodeLogsRequest>,
407    ) -> std::result::Result<CallToolResult, ErrorData> {
408        let network = self
409            .manager
410            .get_network(&request.network_name)
411            .await
412            .map_err(to_mcp_error)?;
413
414        let limit = request.limit.unwrap_or(DEFAULT_LOG_PAGE_SIZE);
415        if limit == 0 {
416            return Err(ErrorData::invalid_params(
417                "limit must be greater than 0",
418                None,
419            ));
420        }
421
422        let file_name = match request.stream {
423            NodeLogStream::Stdout => "stdout.log",
424            NodeLogStream::Stderr => "stderr.log",
425        };
426        let path = network
427            .layout
428            .node_logs_dir(request.node_id)
429            .join(file_name);
430
431        let page = read_log_page(&path, request.before_line, limit)
432            .await
433            .map_err(to_mcp_error)?;
434        Ok(ok_value(
435            serde_json::to_value(page).map_err(internal_serde_error)?,
436        ))
437    }
438
439    #[tool(
440        description = "Wait for the next SSE event after a sequence cursor with optional event type filters."
441    )]
442    async fn wait_next_sse(
443        &self,
444        Parameters(request): Parameters<WaitNextSseRequest>,
445    ) -> std::result::Result<CallToolResult, ErrorData> {
446        let network = self
447            .manager
448            .get_network(&request.network_name)
449            .await
450            .map_err(to_mcp_error)?;
451        ensure_running_network(&network).await?;
452
453        let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
454        let filter = SseFilter {
455            include_event_types: request.include_event_types.unwrap_or_default(),
456            exclude_event_types: request.exclude_event_types.unwrap_or_default(),
457        };
458
459        let after = match request.after_sequence {
460            Some(value) => value,
461            None => network.sse_store.latest_sequence().await,
462        };
463
464        let event = network
465            .sse_store
466            .wait_next(after, &filter, timeout)
467            .await
468            .map_err(to_mcp_error)?;
469
470        Ok(ok_value(
471            serde_json::to_value(event).map_err(internal_serde_error)?,
472        ))
473    }
474
475    #[tool(
476        description = "Fetch paginated SSE history with sequence cursor and optional event type filters."
477    )]
478    async fn sse_history(
479        &self,
480        Parameters(request): Parameters<SseHistoryRequest>,
481    ) -> std::result::Result<CallToolResult, ErrorData> {
482        let network = self
483            .manager
484            .get_network(&request.network_name)
485            .await
486            .map_err(to_mcp_error)?;
487
488        let limit = request.limit.unwrap_or(DEFAULT_SSE_PAGE_SIZE);
489        if limit == 0 {
490            return Err(ErrorData::invalid_params(
491                "limit must be greater than 0",
492                None,
493            ));
494        }
495
496        let filter = SseFilter {
497            include_event_types: request.include_event_types.unwrap_or_default(),
498            exclude_event_types: request.exclude_event_types.unwrap_or_default(),
499        };
500
501        let history = network
502            .sse_store
503            .history(request.before_sequence, limit, &filter)
504            .await;
505        Ok(ok_value(
506            serde_json::to_value(history).map_err(internal_serde_error)?,
507        ))
508    }
509
510    #[tool(description = "List derived accounts from derived-accounts.csv for a network.")]
511    async fn list_derived_accounts(
512        &self,
513        Parameters(request): Parameters<ListDerivedAccountsRequest>,
514    ) -> std::result::Result<CallToolResult, ErrorData> {
515        let accounts = self
516            .manager
517            .list_derived_accounts(&request.network_name)
518            .await
519            .map_err(to_mcp_error)?;
520        Ok(ok_value(
521            serde_json::to_value(accounts).map_err(internal_serde_error)?,
522        ))
523    }
524
525    #[tool(
526        description = "Submit signed or unsigned transaction JSON. Signer key is derived from signer_path for this managed network. transaction must be a typed Transaction JSON object."
527    )]
528    async fn send_transaction_signed(
529        &self,
530        Parameters(request): Parameters<SendTransactionSignedRequest>,
531    ) -> std::result::Result<CallToolResult, ErrorData> {
532        let network = self
533            .manager
534            .get_network(&request.network_name)
535            .await
536            .map_err(to_mcp_error)?;
537        ensure_running_network(&network).await?;
538
539        let signer = self
540            .manager
541            .derived_account_for_path(&network, &request.signer_path)
542            .await
543            .map_err(to_mcp_error)?;
544        verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
545            .await
546            .map_err(to_mcp_error)?;
547
548        let mut transaction = parse_transaction_json(request.transaction)?;
549        transaction.sign(&signer.secret_key);
550
551        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
552        let response = rpc
553            .put_transaction(transaction)
554            .await
555            .map_err(to_mcp_error)?;
556
557        Ok(ok_value(json!({
558            "network_name": request.network_name,
559            "node_id": request.node_id,
560            "transaction_hash": response.transaction_hash.to_hex_string(),
561        })))
562    }
563
564    #[tool(
565        description = "Create a stored package-name call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
566    )]
567    async fn make_transaction_package_call(
568        &self,
569        Parameters(request): Parameters<MakeTransactionPackageCallRequest>,
570    ) -> std::result::Result<CallToolResult, ErrorData> {
571        let network = self
572            .manager
573            .get_network(&request.network_name)
574            .await
575            .map_err(to_mcp_error)?;
576        ensure_running_network(&network).await?;
577
578        let runtime = match (
579            request.runtime_transferred_value,
580            request.runtime_seed_hex.as_deref(),
581        ) {
582            (None, None) => TransactionRuntimeParams::VmCasperV1,
583            (transferred_value, maybe_seed_hex) => {
584                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
585                TransactionRuntimeParams::VmCasperV2 {
586                    transferred_value: transferred_value.unwrap_or(0),
587                    seed,
588                }
589            }
590        };
591
592        let mut builder = TransactionV1Builder::new_targeting_package_via_alias(
593            request.transaction_package_name.clone(),
594            request.transaction_package_version,
595            request.session_entry_point.clone(),
596            runtime,
597        );
598
599        if let Some(args_json) = request.session_args.as_ref()
600            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
601        {
602            builder = builder.with_runtime_args(runtime_args);
603        }
604
605        if let Some(ttl_millis) = request.ttl_millis {
606            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
607        }
608
609        let pricing = build_pricing_mode(
610            request.gas_price_tolerance,
611            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
612        );
613        builder = builder.with_pricing_mode(pricing);
614
615        let chain_name = match request.chain_name {
616            Some(value) => value,
617            None => fetch_chain_name(&request.network_name, request.node_id)
618                .await
619                .map_err(to_mcp_error)?,
620        };
621        builder = builder.with_chain_name(chain_name.clone());
622
623        let mut signed = false;
624        let mut signer_path = None;
625        let mut derived_signer = None;
626        if let Some(path) = request.signer_path {
627            let signer = self
628                .manager
629                .derived_account_for_path(&network, &path)
630                .await
631                .map_err(to_mcp_error)?;
632            verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
633                .await
634                .map_err(to_mcp_error)?;
635            derived_signer = Some(signer);
636            signed = true;
637            signer_path = Some(path);
638        } else if let Some(initiator_public_key) = request.initiator_public_key {
639            let public_key = PublicKey::from_hex(initiator_public_key.trim())
640                .with_context(|| "failed to parse initiator_public_key as hex public key")
641                .map_err(to_mcp_error)?;
642            builder = builder.with_initiator_addr(public_key);
643        } else {
644            return Err(ErrorData::invalid_params(
645                "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
646                None,
647            ));
648        }
649
650        if let Some(signer) = derived_signer.as_ref() {
651            builder = builder.with_secret_key(&signer.secret_key);
652        }
653
654        let tx = builder.build().map_err(to_mcp_error)?;
655        let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
656
657        Ok(ok_value(json!({
658            "network_name": request.network_name,
659            "node_id": request.node_id,
660            "chain_name": chain_name,
661            "signed": signed,
662            "signer_path": signer_path,
663            "transaction": tx_json,
664        })))
665    }
666
667    #[tool(
668        description = "Create a stored contract-hash call transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
669    )]
670    async fn make_transaction_contract_call(
671        &self,
672        Parameters(request): Parameters<MakeTransactionContractCallRequest>,
673    ) -> std::result::Result<CallToolResult, ErrorData> {
674        let network = self
675            .manager
676            .get_network(&request.network_name)
677            .await
678            .map_err(to_mcp_error)?;
679        ensure_running_network(&network).await?;
680
681        let runtime = match (
682            request.runtime_transferred_value,
683            request.runtime_seed_hex.as_deref(),
684        ) {
685            (None, None) => TransactionRuntimeParams::VmCasperV1,
686            (transferred_value, maybe_seed_hex) => {
687                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
688                TransactionRuntimeParams::VmCasperV2 {
689                    transferred_value: transferred_value.unwrap_or(0),
690                    seed,
691                }
692            }
693        };
694
695        let contract_hash = parse_contract_hash_for_invocation(&request.transaction_contract_hash)
696            .map_err(to_mcp_error)?;
697        let mut builder = TransactionV1Builder::new_targeting_invocable_entity(
698            contract_hash,
699            request.session_entry_point.clone(),
700            runtime,
701        );
702
703        if let Some(args_json) = request.session_args.as_ref()
704            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
705        {
706            builder = builder.with_runtime_args(runtime_args);
707        }
708
709        if let Some(ttl_millis) = request.ttl_millis {
710            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
711        }
712
713        let pricing = build_pricing_mode(
714            request.gas_price_tolerance,
715            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
716        );
717        builder = builder.with_pricing_mode(pricing);
718
719        let chain_name = match request.chain_name {
720            Some(value) => value,
721            None => fetch_chain_name(&request.network_name, request.node_id)
722                .await
723                .map_err(to_mcp_error)?,
724        };
725        builder = builder.with_chain_name(chain_name.clone());
726
727        let mut signed = false;
728        let mut signer_path = None;
729        let mut derived_signer = None;
730        if let Some(path) = request.signer_path {
731            let signer = self
732                .manager
733                .derived_account_for_path(&network, &path)
734                .await
735                .map_err(to_mcp_error)?;
736            verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
737                .await
738                .map_err(to_mcp_error)?;
739            derived_signer = Some(signer);
740            signed = true;
741            signer_path = Some(path);
742        } else if let Some(initiator_public_key) = request.initiator_public_key {
743            let public_key = PublicKey::from_hex(initiator_public_key.trim())
744                .with_context(|| "failed to parse initiator_public_key as hex public key")
745                .map_err(to_mcp_error)?;
746            builder = builder.with_initiator_addr(public_key);
747        } else {
748            return Err(ErrorData::invalid_params(
749                "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
750                None,
751            ));
752        }
753
754        if let Some(signer) = derived_signer.as_ref() {
755            builder = builder.with_secret_key(&signer.secret_key);
756        }
757
758        let tx = builder.build().map_err(to_mcp_error)?;
759        let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
760
761        Ok(ok_value(json!({
762            "network_name": request.network_name,
763            "node_id": request.node_id,
764            "chain_name": chain_name,
765            "transaction_contract_hash": request.transaction_contract_hash,
766            "signed": signed,
767            "signer_path": signer_path,
768            "transaction": tx_json,
769        })))
770    }
771
772    #[tool(
773        description = "Create a session wasm transaction (make-transaction style). Returns transaction JSON for follow-up submission with send_transaction_signed. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
774    )]
775    async fn make_transaction_session_wasm(
776        &self,
777        Parameters(request): Parameters<MakeTransactionSessionWasmRequest>,
778    ) -> std::result::Result<CallToolResult, ErrorData> {
779        let network = self
780            .manager
781            .get_network(&request.network_name)
782            .await
783            .map_err(to_mcp_error)?;
784        ensure_running_network(&network).await?;
785
786        let wasm_path = PathBuf::from(&request.wasm_path);
787        let wasm_bytes = tokio_fs::read(&wasm_path)
788            .await
789            .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
790            .map_err(to_mcp_error)?;
791
792        let runtime = match (
793            request.runtime_transferred_value,
794            request.runtime_seed_hex.as_deref(),
795        ) {
796            (None, None) => TransactionRuntimeParams::VmCasperV1,
797            (transferred_value, maybe_seed_hex) => {
798                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
799                TransactionRuntimeParams::VmCasperV2 {
800                    transferred_value: transferred_value.unwrap_or(0),
801                    seed,
802                }
803            }
804        };
805
806        let mut builder = TransactionV1Builder::new_session(
807            request.is_install_upgrade.unwrap_or(true),
808            wasm_bytes.into(),
809            runtime,
810        );
811
812        if let Some(args_json) = request.session_args.as_ref()
813            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
814        {
815            builder = builder.with_runtime_args(runtime_args);
816        }
817
818        if let Some(ttl_millis) = request.ttl_millis {
819            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
820        }
821
822        let pricing = build_pricing_mode(
823            request.gas_price_tolerance,
824            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
825        );
826        builder = builder.with_pricing_mode(pricing);
827
828        let chain_name = match request.chain_name {
829            Some(value) => value,
830            None => fetch_chain_name(&request.network_name, request.node_id)
831                .await
832                .map_err(to_mcp_error)?,
833        };
834        builder = builder.with_chain_name(chain_name.clone());
835
836        let mut signed = false;
837        let mut signer_path = None;
838        let mut derived_signer = None;
839        if let Some(path) = request.signer_path {
840            let signer = self
841                .manager
842                .derived_account_for_path(&network, &path)
843                .await
844                .map_err(to_mcp_error)?;
845            verify_path_hash_consistency(&network.layout, &path, &signer.account_hash)
846                .await
847                .map_err(to_mcp_error)?;
848            derived_signer = Some(signer);
849            signed = true;
850            signer_path = Some(path);
851        } else if let Some(initiator_public_key) = request.initiator_public_key {
852            let public_key = PublicKey::from_hex(initiator_public_key.trim())
853                .with_context(|| "failed to parse initiator_public_key as hex public key")
854                .map_err(to_mcp_error)?;
855            builder = builder.with_initiator_addr(public_key);
856        } else {
857            return Err(ErrorData::invalid_params(
858                "provide signer_path (for signed tx) or initiator_public_key (for unsigned tx)",
859                None,
860            ));
861        }
862
863        if let Some(signer) = derived_signer.as_ref() {
864            builder = builder.with_secret_key(&signer.secret_key);
865        }
866
867        let tx = builder.build().map_err(to_mcp_error)?;
868        let tx_json = serde_json::to_value(Transaction::V1(tx)).map_err(internal_serde_error)?;
869
870        Ok(ok_value(json!({
871            "network_name": request.network_name,
872            "node_id": request.node_id,
873            "chain_name": chain_name,
874            "wasm_path": request.wasm_path,
875            "signed": signed,
876            "signer_path": signer_path,
877            "transaction": tx_json,
878        })))
879    }
880
881    #[tool(
882        description = "Build, sign and submit a session wasm transaction from a derived account path. session_args accepts either: (1) array of {name,type,value} objects, e.g. [{\"name\":\"value\",\"type\":\"I32\",\"value\":\"1\"}], or (2) full RuntimeArgs JSON object. Legacy field name session_args_json is accepted as an alias, but values must be typed JSON (not encoded JSON strings). Not supported: object shorthand like {\"value\":1} or casper-client string args like [\"value:i32=1\"]. For composite CLTypes (List/Map/Tuple/Result/ByteArray), value must be hex bytes (0x...)."
883    )]
884    async fn send_session_wasm(
885        &self,
886        Parameters(request): Parameters<SendSessionWasmRequest>,
887    ) -> std::result::Result<CallToolResult, ErrorData> {
888        let network = self
889            .manager
890            .get_network(&request.network_name)
891            .await
892            .map_err(to_mcp_error)?;
893        ensure_running_network(&network).await?;
894
895        let signer = self
896            .manager
897            .derived_account_for_path(&network, &request.signer_path)
898            .await
899            .map_err(to_mcp_error)?;
900        verify_path_hash_consistency(&network.layout, &request.signer_path, &signer.account_hash)
901            .await
902            .map_err(to_mcp_error)?;
903
904        let wasm_path = PathBuf::from(&request.wasm_path);
905        let wasm_bytes = tokio_fs::read(&wasm_path)
906            .await
907            .with_context(|| format!("failed to read wasm at {}", wasm_path.display()))
908            .map_err(to_mcp_error)?;
909
910        let runtime = match (
911            request.runtime_transferred_value,
912            request.runtime_seed_hex.as_deref(),
913        ) {
914            (None, None) => TransactionRuntimeParams::VmCasperV1,
915            (transferred_value, maybe_seed_hex) => {
916                let seed = parse_optional_seed_hex(maybe_seed_hex).map_err(to_mcp_error)?;
917                TransactionRuntimeParams::VmCasperV2 {
918                    transferred_value: transferred_value.unwrap_or(0),
919                    seed,
920                }
921            }
922        };
923
924        let mut builder = TransactionV1Builder::new_session(
925            request.is_install_upgrade.unwrap_or(true),
926            wasm_bytes.into(),
927            runtime,
928        );
929
930        if let Some(args_json) = request.session_args.as_ref()
931            && let Some(runtime_args) = parse_session_args(args_json).map_err(to_mcp_error)?
932        {
933            builder = builder.with_runtime_args(runtime_args);
934        }
935
936        if let Some(ttl_millis) = request.ttl_millis {
937            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
938        }
939
940        let pricing = build_pricing_mode(
941            request.gas_price_tolerance,
942            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
943        );
944        builder = builder.with_pricing_mode(pricing);
945
946        let chain_name = match request.chain_name {
947            Some(value) => value,
948            None => fetch_chain_name(&request.network_name, request.node_id)
949                .await
950                .map_err(to_mcp_error)?,
951        };
952
953        let tx = builder
954            .with_chain_name(chain_name)
955            .with_secret_key(&signer.secret_key)
956            .build()
957            .map_err(to_mcp_error)?;
958
959        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
960        let response = rpc
961            .put_transaction(Transaction::V1(tx))
962            .await
963            .map_err(to_mcp_error)?;
964
965        Ok(ok_value(json!({
966            "network_name": request.network_name,
967            "node_id": request.node_id,
968            "transaction_hash": response.transaction_hash.to_hex_string(),
969        })))
970    }
971
972    #[tool(
973        description = "Transfer tokens between derived account paths. from_path signs, to_path resolves recipient."
974    )]
975    async fn transfer_tokens(
976        &self,
977        Parameters(request): Parameters<TransferTokensRequest>,
978    ) -> std::result::Result<CallToolResult, ErrorData> {
979        let network = self
980            .manager
981            .get_network(&request.network_name)
982            .await
983            .map_err(to_mcp_error)?;
984        ensure_running_network(&network).await?;
985
986        let from = self
987            .manager
988            .derived_account_for_path(&network, &request.from_path)
989            .await
990            .map_err(to_mcp_error)?;
991        let to = self
992            .manager
993            .derived_account_for_path(&network, &request.to_path)
994            .await
995            .map_err(to_mcp_error)?;
996
997        verify_path_hash_consistency(&network.layout, &request.from_path, &from.account_hash)
998            .await
999            .map_err(to_mcp_error)?;
1000        verify_path_hash_consistency(&network.layout, &request.to_path, &to.account_hash)
1001            .await
1002            .map_err(to_mcp_error)?;
1003
1004        let amount = casper_types::U512::from_dec_str(&request.amount)
1005            .with_context(|| "amount must be a decimal U512 string")
1006            .map_err(to_mcp_error)?;
1007        let to_public_key = PublicKey::from_hex(&to.public_key_hex)
1008            .with_context(|| "failed to parse derived recipient public key")
1009            .map_err(to_mcp_error)?;
1010
1011        let mut builder =
1012            TransactionV1Builder::new_transfer(amount, None, to_public_key, request.transfer_id)
1013                .map_err(to_mcp_error)?;
1014
1015        if let Some(ttl_millis) = request.ttl_millis {
1016            builder = builder.with_ttl(TimeDiff::from_millis(ttl_millis));
1017        }
1018
1019        let pricing = build_pricing_mode(
1020            request.gas_price_tolerance,
1021            Some(request.payment_amount.unwrap_or(DEFAULT_PAYMENT_AMOUNT)),
1022        );
1023        builder = builder.with_pricing_mode(pricing);
1024
1025        let chain_name = match request.chain_name {
1026            Some(value) => value,
1027            None => fetch_chain_name(&request.network_name, request.node_id)
1028                .await
1029                .map_err(to_mcp_error)?,
1030        };
1031
1032        let tx = builder
1033            .with_chain_name(chain_name)
1034            .with_secret_key(&from.secret_key)
1035            .build()
1036            .map_err(to_mcp_error)?;
1037
1038        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1039        let response = rpc
1040            .put_transaction(Transaction::V1(tx))
1041            .await
1042            .map_err(to_mcp_error)?;
1043
1044        Ok(ok_value(json!({
1045            "network_name": request.network_name,
1046            "node_id": request.node_id,
1047            "transaction_hash": response.transaction_hash.to_hex_string(),
1048            "from_path": request.from_path,
1049            "to_path": request.to_path,
1050            "amount": request.amount,
1051        })))
1052    }
1053
1054    #[tool(
1055        description = "Wait for transaction execution result with timeout and return execution effects/result payload."
1056    )]
1057    async fn wait_transaction(
1058        &self,
1059        Parameters(request): Parameters<WaitTransactionRequest>,
1060    ) -> std::result::Result<CallToolResult, ErrorData> {
1061        let network = self
1062            .manager
1063            .get_network(&request.network_name)
1064            .await
1065            .map_err(to_mcp_error)?;
1066        ensure_running_network(&network).await?;
1067
1068        let timeout = Duration::from_secs(request.timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1069        let poll = Duration::from_millis(request.poll_interval_millis.unwrap_or(1_000));
1070        let tx_hash =
1071            parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1072        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1073        let deadline = Instant::now() + timeout;
1074
1075        loop {
1076            let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1077
1078            if let Some(exec_info) = response.execution_info.clone()
1079                && exec_info.execution_result.is_some()
1080            {
1081                return Ok(ok_value(
1082                    serde_json::to_value(response).map_err(internal_serde_error)?,
1083                ));
1084            }
1085
1086            if Instant::now() >= deadline {
1087                return Err(ErrorData::resource_not_found(
1088                    format!(
1089                        "transaction {} execution result not available before timeout",
1090                        request.transaction_hash
1091                    ),
1092                    None,
1093                ));
1094            }
1095
1096            tokio::time::sleep(poll).await;
1097        }
1098    }
1099
1100    #[tool(
1101        description = "Get transaction details via info_get_transaction (non-waiting). Returns typed JSON response from node."
1102    )]
1103    async fn get_transaction(
1104        &self,
1105        Parameters(request): Parameters<GetTransactionRequest>,
1106    ) -> std::result::Result<CallToolResult, ErrorData> {
1107        let network = self
1108            .manager
1109            .get_network(&request.network_name)
1110            .await
1111            .map_err(to_mcp_error)?;
1112        ensure_running_network(&network).await?;
1113
1114        let tx_hash =
1115            parse_transaction_hash_input(&request.transaction_hash).map_err(to_mcp_error)?;
1116        let rpc = mcp_rpc_client(&request.network_name, request.node_id).map_err(to_mcp_error)?;
1117        let response = rpc.get_transaction(tx_hash).await.map_err(to_mcp_error)?;
1118
1119        Ok(ok_value(
1120            serde_json::to_value(response).map_err(internal_serde_error)?,
1121        ))
1122    }
1123}
1124
1125#[tool_handler]
1126impl ServerHandler for McpServer {
1127    fn get_info(&self) -> ServerInfo {
1128        Self::server_info()
1129    }
1130}
1131
1132#[derive(Debug)]
1133struct NetworkManager {
1134    assets_root: PathBuf,
1135    managed: Mutex<HashMap<String, Arc<ManagedNetwork>>>,
1136    http: reqwest::Client,
1137}
1138
1139#[derive(Debug)]
1140struct ManagedNetwork {
1141    layout: AssetsLayout,
1142    state: Arc<Mutex<State>>,
1143    node_count: AtomicU32,
1144    rust_log: String,
1145    seed: Arc<str>,
1146    sse_store: Arc<SseStore>,
1147    last_block_hook_hash: Mutex<Option<String>>,
1148    shutdown: Arc<AtomicBool>,
1149    control_shutdown: Arc<AtomicBool>,
1150    asset_mutation_lock: Mutex<()>,
1151    sse_tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
1152    control_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
1153}
1154
1155impl ManagedNetwork {
1156    async fn is_running(&self) -> bool {
1157        let state = self.state.lock().await;
1158        processes_running(&state)
1159    }
1160
1161    async fn stop(&self) -> Result<()> {
1162        self.shutdown.store(true, Ordering::SeqCst);
1163        self.control_shutdown.store(true, Ordering::SeqCst);
1164
1165        if let Some(control_task) = self.control_task.lock().await.take() {
1166            control_task.abort();
1167            let _ = control_task.await;
1168        }
1169        let _ = tokio_fs::remove_file(self.layout.control_socket_path()).await;
1170
1171        let tasks = {
1172            let mut guard = self.sse_tasks.lock().await;
1173            guard.drain(..).collect::<Vec<_>>()
1174        };
1175        for task in tasks {
1176            task.abort();
1177            let _ = task.await;
1178        }
1179
1180        let mut state = self.state.lock().await;
1181        process::stop(&mut state).await
1182    }
1183}
1184
1185impl NetworkManager {
1186    async fn new(assets_root: PathBuf) -> Result<Self> {
1187        let http = reqwest::Client::builder()
1188            .no_proxy()
1189            .timeout(Duration::from_secs(5))
1190            .build()?;
1191        Ok(Self {
1192            assets_root,
1193            managed: Mutex::new(HashMap::new()),
1194            http,
1195        })
1196    }
1197
1198    async fn spawn_network(&self, request: SpawnNetworkRequest) -> Result<SpawnNetworkResponse> {
1199        let network_name = request
1200            .network_name
1201            .unwrap_or_else(|| DEFAULT_NETWORK_NAME.to_string());
1202
1203        if network_name.trim().is_empty() {
1204            return Err(anyhow!("network_name must not be empty"));
1205        }
1206
1207        let maybe_existing = {
1208            let mut managed = self.managed.lock().await;
1209            if let Some(existing) = managed.get(&network_name)
1210                && existing.is_running().await
1211                && !request.force_setup.unwrap_or(true)
1212            {
1213                return Ok(SpawnNetworkResponse {
1214                    network_name,
1215                    node_count: existing.node_count.load(Ordering::SeqCst),
1216                    managed: true,
1217                    already_running: true,
1218                    forced_setup: false,
1219                });
1220            }
1221            managed.remove(&network_name)
1222        };
1223
1224        if let Some(existing) = maybe_existing {
1225            let _ = existing.stop().await;
1226        }
1227
1228        let force_setup = request.force_setup.unwrap_or(true);
1229        let requested_nodes = request.node_count.unwrap_or(DEFAULT_NODE_COUNT);
1230        let users = request.users;
1231        let delay = request.delay.unwrap_or(DEFAULT_DELAY_SECS);
1232        let log_level = request
1233            .log_level
1234            .unwrap_or_else(|| DEFAULT_LOG_LEVEL.to_string());
1235        let node_log_format = request
1236            .node_log_format
1237            .unwrap_or_else(|| DEFAULT_NODE_LOG_FORMAT.to_string());
1238        let seed: Arc<str> = Arc::from(request.seed.unwrap_or_else(|| DEFAULT_SEED.to_string()));
1239
1240        let layout = AssetsLayout::new(self.assets_root.clone(), network_name.clone());
1241        let assets_exist = layout.exists().await;
1242
1243        let asset_selector = assets::optional_asset_selector(
1244            request.asset.as_deref(),
1245            request.custom_asset.as_deref(),
1246        )?;
1247
1248        let setup_result = if force_setup {
1249            assets::teardown(&layout).await?;
1250            Some(
1251                assets::setup_local(
1252                    &layout,
1253                    &SetupOptions {
1254                        nodes: requested_nodes,
1255                        users,
1256                        delay_seconds: delay,
1257                        network_name: network_name.clone(),
1258                        asset: asset_selector.clone(),
1259                        protocol_version: request.protocol_version.clone(),
1260                        chainspec_overrides: Vec::new(),
1261                        node_log_format,
1262                        seed: Arc::clone(&seed),
1263                    },
1264                )
1265                .await?,
1266            )
1267        } else if !assets_exist {
1268            Some(
1269                assets::setup_local(
1270                    &layout,
1271                    &SetupOptions {
1272                        nodes: requested_nodes,
1273                        users,
1274                        delay_seconds: delay,
1275                        network_name: network_name.clone(),
1276                        asset: asset_selector,
1277                        protocol_version: request.protocol_version.clone(),
1278                        chainspec_overrides: Vec::new(),
1279                        node_log_format,
1280                        seed: Arc::clone(&seed),
1281                    },
1282                )
1283                .await?,
1284            )
1285        } else {
1286            None
1287        };
1288
1289        if !layout.exists().await {
1290            return Err(anyhow!(
1291                "assets missing under {}; call spawn_network with force_setup=true",
1292                layout.net_dir().display()
1293            ));
1294        }
1295
1296        assets::ensure_network_hook_samples(&layout).await?;
1297
1298        let node_count = layout.count_nodes().await?;
1299        if node_count == 0 {
1300            return Err(anyhow!("network has no nodes to start"));
1301        }
1302
1303        ensure_sidecar_available(&layout, node_count).await?;
1304
1305        let state_path = layout.net_dir().join(STATE_FILE_NAME);
1306        if !tokio_fs::try_exists(&state_path).await.unwrap_or(false) {
1307            let protocol_version = match &setup_result {
1308                Some(result) => result.protocol_version.clone(),
1309                None => latest_layout_protocol_version(&layout).await?,
1310            };
1311            assets::prepare_genesis_hooks(&layout, &protocol_version).await?;
1312        }
1313        let mut state = State::new(state_path).await?;
1314
1315        let rust_log = log_level.clone();
1316        process::start(
1317            &layout,
1318            &StartPlan {
1319                rust_log: rust_log.clone(),
1320                seed: Arc::clone(&seed),
1321            },
1322            &mut state,
1323        )
1324        .await?;
1325
1326        let managed = Arc::new(ManagedNetwork {
1327            layout: layout.clone(),
1328            state: Arc::new(Mutex::new(state)),
1329            node_count: AtomicU32::new(node_count),
1330            rust_log,
1331            seed,
1332            sse_store: Arc::new(SseStore::new(DEFAULT_SSE_HISTORY_CAPACITY)),
1333            last_block_hook_hash: Mutex::new(None),
1334            shutdown: Arc::new(AtomicBool::new(false)),
1335            control_shutdown: Arc::new(AtomicBool::new(false)),
1336            asset_mutation_lock: Mutex::new(()),
1337            sse_tasks: Mutex::new(Vec::new()),
1338            control_task: Mutex::new(None),
1339        });
1340        spawn_pid_sync_tasks(Arc::clone(&managed.state)).await;
1341
1342        self.spawn_sse_collectors(&managed).await;
1343        if let Err(err) = self.spawn_control_server(&managed).await {
1344            let _ = managed.stop().await;
1345            return Err(err);
1346        }
1347
1348        self.managed
1349            .lock()
1350            .await
1351            .insert(network_name.clone(), Arc::clone(&managed));
1352
1353        Ok(SpawnNetworkResponse {
1354            network_name,
1355            node_count,
1356            managed: true,
1357            already_running: false,
1358            forced_setup: force_setup,
1359        })
1360    }
1361
1362    async fn spawn_sse_collectors(&self, network: &Arc<ManagedNetwork>) {
1363        let mut tasks = Vec::new();
1364        for node_id in 1..=network.node_count.load(Ordering::SeqCst) {
1365            let endpoint = assets::sse_endpoint(node_id);
1366            let network = Arc::clone(network);
1367            let task = tokio::spawn(async move {
1368                run_sse_listener(network, node_id, endpoint).await;
1369            });
1370            tasks.push(task);
1371        }
1372        let mut guard = network.sse_tasks.lock().await;
1373        guard.extend(tasks);
1374    }
1375
1376    async fn spawn_control_server(&self, network: &Arc<ManagedNetwork>) -> Result<()> {
1377        let socket_path = network.layout.control_socket_path();
1378        if let Ok(metadata) = tokio_fs::symlink_metadata(&socket_path).await {
1379            if metadata.is_dir() {
1380                tokio_fs::remove_dir_all(&socket_path).await?;
1381            } else {
1382                tokio_fs::remove_file(&socket_path).await?;
1383            }
1384        }
1385        if let Some(parent) = socket_path.parent() {
1386            tokio_fs::create_dir_all(parent).await?;
1387        }
1388
1389        let listener = UnixListener::bind(&socket_path)?;
1390        let network_for_task = Arc::clone(network);
1391        let shutdown = Arc::clone(&network_for_task.control_shutdown);
1392        let task = tokio::spawn(async move {
1393            loop {
1394                if shutdown.load(Ordering::SeqCst) {
1395                    break;
1396                }
1397
1398                let accepted =
1399                    tokio::time::timeout(Duration::from_millis(250), listener.accept()).await;
1400                let (stream, _) = match accepted {
1401                    Ok(Ok(pair)) => pair,
1402                    Ok(Err(_)) => break,
1403                    Err(_) => continue,
1404                };
1405                let network = Arc::clone(&network_for_task);
1406                tokio::spawn(async move {
1407                    handle_managed_control_stream(stream, network).await;
1408                });
1409            }
1410
1411            let _ = tokio_fs::remove_file(&socket_path).await;
1412        });
1413        *network.control_task.lock().await = Some(task);
1414        Ok(())
1415    }
1416
1417    async fn stage_protocol(
1418        &self,
1419        network_name: &str,
1420        asset: Option<&str>,
1421        custom_asset: Option<&str>,
1422        legacy_asset_name: Option<&str>,
1423        protocol_version: &str,
1424        activation_point: u64,
1425    ) -> Result<StageProtocolResponse> {
1426        let asset_selector = stage_asset_selector(asset, custom_asset, legacy_asset_name)?;
1427        let managed = {
1428            let guard = self.managed.lock().await;
1429            guard.get(network_name).cloned()
1430        };
1431
1432        if let Some(network) = managed
1433            && network.is_running().await
1434        {
1435            if let Ok(Some(current_era)) = read_current_era_from_status(1).await
1436                && activation_point <= current_era
1437            {
1438                return Err(anyhow!(
1439                    "activation point {} must be greater than current era {}",
1440                    activation_point,
1441                    current_era
1442                ));
1443            }
1444
1445            let request = ControlRequest::StageProtocol {
1446                asset: match &asset_selector {
1447                    AssetSelector::Versioned(asset) => Some(asset.clone()),
1448                    AssetSelector::LatestVersioned | AssetSelector::Custom(_) => None,
1449                },
1450                custom_asset: match &asset_selector {
1451                    AssetSelector::Custom(asset) => Some(asset.clone()),
1452                    AssetSelector::LatestVersioned | AssetSelector::Versioned(_) => None,
1453                },
1454                asset_name: None,
1455                protocol_version: protocol_version.to_string(),
1456                activation_point,
1457                chainspec_overrides: Vec::new(),
1458                restart_sidecars: true,
1459                rust_log: Some(network.rust_log.clone()),
1460            };
1461            let socket_path = network.layout.control_socket_path();
1462            return match send_request(&socket_path, &request).await {
1463                Ok(ControlResponse::Ok { result }) => match result {
1464                    ControlResult::StageProtocol {
1465                        live_mode,
1466                        staged_nodes,
1467                        restarted_sidecars,
1468                    } => Ok(StageProtocolResponse {
1469                        network_name: network_name.to_string(),
1470                        live_mode,
1471                        staged_nodes,
1472                        restarted_sidecars,
1473                    }),
1474                    ControlResult::RuntimeStatus { .. } => Err(anyhow!(
1475                        "unexpected runtime_status response from {}",
1476                        socket_path.display()
1477                    )),
1478                    ControlResult::AddNodes { .. } => Err(anyhow!(
1479                        "unexpected add_nodes response from {}",
1480                        socket_path.display()
1481                    )),
1482                },
1483                Ok(ControlResponse::Error { error }) => Err(anyhow!(error)),
1484                Err(err) => Err(anyhow!(
1485                    "failed to reach managed control socket {}: {}",
1486                    socket_path.display(),
1487                    err
1488                )),
1489            };
1490        }
1491
1492        let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1493        if !layout.exists().await {
1494            return Err(anyhow!(
1495                "assets for {} not found under {}",
1496                network_name,
1497                layout.net_dir().display()
1498            ));
1499        }
1500
1501        let staged = assets::stage_protocol(
1502            &layout,
1503            &StageProtocolOptions {
1504                asset: asset_selector,
1505                protocol_version: protocol_version.to_string(),
1506                activation_point,
1507                chainspec_overrides: Vec::new(),
1508            },
1509        )
1510        .await?;
1511
1512        Ok(StageProtocolResponse {
1513            network_name: network_name.to_string(),
1514            live_mode: false,
1515            staged_nodes: staged.staged_nodes,
1516            restarted_sidecars: Vec::new(),
1517        })
1518    }
1519
1520    async fn wait_network_ready(
1521        &self,
1522        network_name: &str,
1523        timeout_seconds: Option<u64>,
1524    ) -> Result<WaitReadyResponse> {
1525        let network = self.get_network(network_name).await?;
1526        let timeout = Duration::from_secs(timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS));
1527        let deadline = Instant::now() + timeout;
1528
1529        loop {
1530            let running = ensure_running_network(&network).await.is_ok();
1531            if running {
1532                let status = check_rest_ready(&network).await;
1533                if let Ok(rest_by_node) = status {
1534                    let state = network.state.lock().await;
1535                    let block_observed = state.last_block_height.is_some()
1536                        || rest_by_node.values().any(rest_has_block);
1537                    if block_observed {
1538                        return Ok(WaitReadyResponse {
1539                            network_name: network_name.to_string(),
1540                            ready: true,
1541                            node_count: network.node_count.load(Ordering::SeqCst),
1542                            rest: rest_by_node,
1543                            last_block_height: state.last_block_height,
1544                        });
1545                    }
1546                }
1547            }
1548
1549            if Instant::now() >= deadline {
1550                return Ok(WaitReadyResponse {
1551                    network_name: network_name.to_string(),
1552                    ready: false,
1553                    node_count: network.node_count.load(Ordering::SeqCst),
1554                    rest: HashMap::new(),
1555                    last_block_height: None,
1556                });
1557            }
1558
1559            tokio::time::sleep(Duration::from_millis(500)).await;
1560        }
1561    }
1562
1563    async fn despawn_network(&self, network_name: &str, purge: bool) -> Result<DespawnResponse> {
1564        let managed = {
1565            let mut guard = self.managed.lock().await;
1566            guard.remove(network_name)
1567        };
1568
1569        let Some(network) = managed else {
1570            return Err(anyhow!("network '{}' is not managed", network_name));
1571        };
1572
1573        network.stop().await?;
1574        if purge {
1575            assets::teardown(&network.layout).await?;
1576        }
1577
1578        Ok(DespawnResponse {
1579            network_name: network_name.to_string(),
1580            purged: purge,
1581        })
1582    }
1583
1584    async fn list_networks(&self) -> Result<ListNetworksResponse> {
1585        let discovered = discover_network_names(&self.assets_root).await?;
1586        let managed_snapshot = {
1587            let guard = self.managed.lock().await;
1588            guard
1589                .iter()
1590                .map(|(name, network)| (name.clone(), Arc::clone(network)))
1591                .collect::<Vec<_>>()
1592        };
1593
1594        let mut rows = Vec::new();
1595
1596        for name in &discovered {
1597            if let Some((_, network)) = managed_snapshot.iter().find(|(n, _)| n == name) {
1598                rows.push(NetworkRow {
1599                    network_name: name.clone(),
1600                    discovered: true,
1601                    managed: true,
1602                    running: network.is_running().await,
1603                    node_count: Some(network.node_count.load(Ordering::SeqCst)),
1604                });
1605            } else {
1606                let layout = AssetsLayout::new(self.assets_root.clone(), name.clone());
1607                rows.push(NetworkRow {
1608                    network_name: name.clone(),
1609                    discovered: true,
1610                    managed: false,
1611                    running: false,
1612                    node_count: layout.count_nodes().await.ok(),
1613                });
1614            }
1615        }
1616
1617        for (name, network) in managed_snapshot {
1618            if !discovered.iter().any(|candidate| candidate == &name) {
1619                rows.push(NetworkRow {
1620                    network_name: name,
1621                    discovered: false,
1622                    managed: true,
1623                    running: network.is_running().await,
1624                    node_count: Some(network.node_count.load(Ordering::SeqCst)),
1625                });
1626            }
1627        }
1628
1629        rows.sort_by(|a, b| a.network_name.cmp(&b.network_name));
1630
1631        Ok(ListNetworksResponse { networks: rows })
1632    }
1633
1634    async fn managed_processes(
1635        &self,
1636        request: ManagedProcessesRequest,
1637    ) -> Result<ManagedProcessesResponse> {
1638        let network = self.get_network(&request.network_name).await?;
1639        let running_only = request.running_only.unwrap_or(true);
1640        let process_name = request
1641            .process_name
1642            .as_deref()
1643            .map(str::trim)
1644            .filter(|name| !name.is_empty())
1645            .map(ToString::to_string);
1646
1647        let process_name_lc = process_name.as_ref().map(|name| name.to_ascii_lowercase());
1648        let state = network.state.lock().await;
1649        let mut processes = Vec::new();
1650        for process in &state.processes {
1651            if let Some(name_lc) = process_name_lc.as_deref()
1652                && !process.id.to_ascii_lowercase().contains(name_lc)
1653            {
1654                continue;
1655            }
1656
1657            let pid = process.current_pid();
1658            let running = matches!(process.last_status, ProcessStatus::Running)
1659                && pid.is_some_and(is_pid_running);
1660            if running_only && !running {
1661                continue;
1662            }
1663
1664            processes.push(ManagedProcessRow {
1665                id: process.id.clone(),
1666                node_id: process.node_id,
1667                kind: process_kind_name(&process.kind).to_string(),
1668                pid,
1669                running,
1670                last_status: process_status_name(&process.last_status).to_string(),
1671                command: process.command.clone(),
1672                args: process.args.clone(),
1673                cwd: process.cwd.clone(),
1674                stdout_path: process.stdout_path.clone(),
1675                stderr_path: process.stderr_path.clone(),
1676            });
1677        }
1678
1679        Ok(ManagedProcessesResponse {
1680            network_name: request.network_name,
1681            running_only,
1682            process_name,
1683            processes,
1684        })
1685    }
1686
1687    async fn get_network(&self, network_name: &str) -> Result<Arc<ManagedNetwork>> {
1688        let guard = self.managed.lock().await;
1689        guard.get(network_name).cloned().ok_or_else(|| {
1690            anyhow!(
1691                "network '{}' is not managed; call spawn_network first",
1692                network_name
1693            )
1694        })
1695    }
1696
1697    async fn stop_all_networks(&self) -> Result<()> {
1698        let managed = {
1699            let mut guard = self.managed.lock().await;
1700            guard
1701                .drain()
1702                .map(|(_, network)| network)
1703                .collect::<Vec<_>>()
1704        };
1705
1706        let mut errors = Vec::new();
1707        for network in managed {
1708            if let Err(err) = network.stop().await {
1709                errors.push(err.to_string());
1710            }
1711        }
1712
1713        if errors.is_empty() {
1714            Ok(())
1715        } else {
1716            Err(anyhow!(errors.join("\n")))
1717        }
1718    }
1719
1720    async fn raw_rpc_query(
1721        &self,
1722        node_id: u32,
1723        method: &str,
1724        params: Option<Value>,
1725    ) -> Result<Value> {
1726        let endpoint = assets::rpc_endpoint(node_id);
1727        let payload = match params {
1728            Some(params) => json!({
1729                "id": 1,
1730                "jsonrpc": "2.0",
1731                "method": method,
1732                "params": params,
1733            }),
1734            None => json!({
1735                "id": 1,
1736                "jsonrpc": "2.0",
1737                "method": method,
1738            }),
1739        };
1740
1741        let response = self
1742            .http
1743            .post(endpoint)
1744            .json(&payload)
1745            .send()
1746            .await?
1747            .error_for_status()?;
1748        Ok(response.json::<Value>().await?)
1749    }
1750
1751    async fn list_derived_accounts(&self, network_name: &str) -> Result<Vec<DerivedAccountRow>> {
1752        let layout = AssetsLayout::new(self.assets_root.clone(), network_name.to_string());
1753        let csv = assets::derived_accounts_summary(&layout)
1754            .await
1755            .ok_or_else(|| {
1756                anyhow!(
1757                    "missing derived-accounts.csv for network '{}'",
1758                    network_name
1759                )
1760            })?;
1761
1762        let seed = {
1763            let managed = self.managed.lock().await;
1764            managed
1765                .get(network_name)
1766                .map(|network| Arc::clone(&network.seed))
1767        };
1768
1769        parse_derived_accounts_csv(&csv, seed).await
1770    }
1771
1772    async fn derived_account_for_path(
1773        &self,
1774        network: &Arc<ManagedNetwork>,
1775        path: &str,
1776    ) -> Result<DerivedSigner> {
1777        let material =
1778            assets::derive_account_from_seed_path(Arc::clone(&network.seed), path).await?;
1779        let secret_key = SecretKey::from_pem(&material.secret_key_pem)?;
1780        Ok(DerivedSigner {
1781            public_key_hex: material.public_key_hex,
1782            account_hash: material.account_hash,
1783            secret_key,
1784        })
1785    }
1786}
1787
1788async fn handle_managed_control_stream(
1789    mut stream: tokio::net::UnixStream,
1790    network: Arc<ManagedNetwork>,
1791) {
1792    let mut request_bytes = Vec::new();
1793    let response = match stream.read_to_end(&mut request_bytes).await {
1794        Ok(_) => match serde_json::from_slice::<ControlRequest>(&request_bytes) {
1795            Ok(request) => handle_managed_control_request(&network, request).await,
1796            Err(err) => ControlResponse::Error {
1797                error: format!("invalid control request: {}", err),
1798            },
1799        },
1800        Err(err) => ControlResponse::Error {
1801            error: format!("failed to read control request: {}", err),
1802        },
1803    };
1804
1805    let response_bytes = serde_json::to_vec(&response).unwrap_or_else(|err| {
1806        format!(
1807            "{{\"status\":\"error\",\"error\":\"failed to serialize control response: {}\"}}",
1808            err
1809        )
1810        .into_bytes()
1811    });
1812    let _ = stream.write_all(&response_bytes).await;
1813    let _ = stream.shutdown().await;
1814}
1815
1816async fn spawn_added_sse_collectors(network: &Arc<ManagedNetwork>, node_ids: &[u32]) {
1817    let mut tasks = Vec::new();
1818    for node_id in node_ids {
1819        let node_id = *node_id;
1820        let endpoint = assets::sse_endpoint(node_id);
1821        let network = Arc::clone(network);
1822        let task = tokio::spawn(async move {
1823            run_sse_listener(network, node_id, endpoint).await;
1824        });
1825        tasks.push(task);
1826    }
1827    let mut guard = network.sse_tasks.lock().await;
1828    guard.extend(tasks);
1829}
1830
1831async fn handle_managed_control_request(
1832    network: &Arc<ManagedNetwork>,
1833    request: ControlRequest,
1834) -> ControlResponse {
1835    match request {
1836        ControlRequest::RuntimeStatus => {
1837            let state = network.state.lock().await;
1838            ControlResponse::Ok {
1839                result: ControlResult::RuntimeStatus {
1840                    running_node_ids: running_node_ids(&state),
1841                    last_block_height: state.last_block_height,
1842                },
1843            }
1844        }
1845        ControlRequest::StageProtocol {
1846            asset,
1847            custom_asset,
1848            asset_name,
1849            protocol_version,
1850            activation_point,
1851            chainspec_overrides,
1852            restart_sidecars,
1853            rust_log,
1854        } => {
1855            let asset_selector = match stage_asset_selector(
1856                asset.as_deref(),
1857                custom_asset.as_deref(),
1858                asset_name.as_deref(),
1859            ) {
1860                Ok(asset_selector) => asset_selector,
1861                Err(err) => {
1862                    return ControlResponse::Error {
1863                        error: err.to_string(),
1864                    };
1865                }
1866            };
1867            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
1868            let staged = assets::stage_protocol(
1869                &network.layout,
1870                &StageProtocolOptions {
1871                    asset: asset_selector,
1872                    protocol_version,
1873                    activation_point,
1874                    chainspec_overrides,
1875                },
1876            )
1877            .await;
1878            let staged = match staged {
1879                Ok(staged) => staged,
1880                Err(err) => {
1881                    return ControlResponse::Error {
1882                        error: err.to_string(),
1883                    };
1884                }
1885            };
1886
1887            let mut restarted_sidecars = Vec::new();
1888            if restart_sidecars {
1889                let rust_log = rust_log.unwrap_or_else(|| network.rust_log.clone());
1890                let mut state = network.state.lock().await;
1891                match process::restart_sidecars(&network.layout, &mut state, &rust_log).await {
1892                    Ok(restarted) => {
1893                        for proc in restarted {
1894                            restarted_sidecars.push(proc.record.node_id);
1895                        }
1896                    }
1897                    Err(err) => {
1898                        return ControlResponse::Error {
1899                            error: err.to_string(),
1900                        };
1901                    }
1902                }
1903            }
1904
1905            ControlResponse::Ok {
1906                result: ControlResult::StageProtocol {
1907                    live_mode: true,
1908                    staged_nodes: staged.staged_nodes,
1909                    restarted_sidecars,
1910                },
1911            }
1912        }
1913        ControlRequest::AddNodes { count } => {
1914            let _asset_mutation_guard = network.asset_mutation_lock.lock().await;
1915            let added = match assets::add_nodes(
1916                &network.layout,
1917                &AddNodesOptions {
1918                    count,
1919                    seed: Arc::clone(&network.seed),
1920                },
1921            )
1922            .await
1923            {
1924                Ok(added) => added,
1925                Err(err) => {
1926                    return ControlResponse::Error {
1927                        error: err.to_string(),
1928                    };
1929                }
1930            };
1931
1932            let started = {
1933                let mut state = network.state.lock().await;
1934                process::start_added_nodes(
1935                    &network.layout,
1936                    &mut state,
1937                    &added.added_node_ids,
1938                    added.total_nodes,
1939                    &network.rust_log,
1940                    Arc::clone(&network.seed),
1941                )
1942                .await
1943            };
1944            let started = match started {
1945                Ok(started) => started,
1946                Err(err) => {
1947                    let error =
1948                        rollback_added_nodes_after_start_error(&network.layout, &added, err).await;
1949                    return ControlResponse::Error { error };
1950                }
1951            };
1952            let process_ids = started
1953                .iter()
1954                .map(|proc| proc.record.id.clone())
1955                .collect::<Vec<_>>();
1956            let started_processes = started.len() as u32;
1957            network
1958                .node_count
1959                .store(added.total_nodes, Ordering::SeqCst);
1960            spawn_added_sse_collectors(network, &added.added_node_ids).await;
1961            spawn_pid_sync_tasks_for_ids(Arc::clone(&network.state), &process_ids).await;
1962
1963            ControlResponse::Ok {
1964                result: ControlResult::AddNodes {
1965                    added_node_ids: added.added_node_ids,
1966                    total_nodes: added.total_nodes,
1967                    started_processes,
1968                },
1969            }
1970        }
1971    }
1972}
1973
1974async fn rollback_added_nodes_after_start_error(
1975    layout: &AssetsLayout,
1976    added: &assets::AddNodesResult,
1977    err: anyhow::Error,
1978) -> String {
1979    match assets::rollback_added_nodes(layout, &added.added_node_ids).await {
1980        Ok(()) => err.to_string(),
1981        Err(rollback_err) => format!(
1982            "{}; failed to roll back added node assets: {}",
1983            err, rollback_err
1984        ),
1985    }
1986}
1987
1988async fn read_current_era_from_status(node_id: u32) -> Result<Option<u64>> {
1989    let value = fetch_rest_status(node_id).await?;
1990    Ok(value
1991        .get("last_added_block_info")
1992        .and_then(|info| info.get("era_id"))
1993        .and_then(|era_id| {
1994            era_id
1995                .as_u64()
1996                .or_else(|| era_id.as_str().and_then(|raw| raw.parse::<u64>().ok()))
1997        }))
1998}
1999
2000#[derive(Debug)]
2001struct SseStore {
2002    sequence: AtomicU64,
2003    events: Mutex<VecDeque<SseRecord>>,
2004    notify: Notify,
2005    capacity: usize,
2006}
2007
2008impl SseStore {
2009    fn new(capacity: usize) -> Self {
2010        Self {
2011            sequence: AtomicU64::new(0),
2012            events: Mutex::new(VecDeque::new()),
2013            notify: Notify::new(),
2014            capacity,
2015        }
2016    }
2017
2018    async fn latest_sequence(&self) -> u64 {
2019        self.sequence.load(Ordering::SeqCst)
2020    }
2021
2022    async fn push(&self, node_id: u32, event: SseEvent) {
2023        let sequence = self.sequence.fetch_add(1, Ordering::SeqCst) + 1;
2024        let event_type = sse_event_type(&event).to_string();
2025        let payload = sse_event_payload(&event);
2026
2027        let record = SseRecord {
2028            sequence,
2029            timestamp_rfc3339: timestamp_prefix(),
2030            node_id,
2031            event_type,
2032            payload,
2033        };
2034
2035        let mut guard = self.events.lock().await;
2036        guard.push_back(record);
2037        while guard.len() > self.capacity {
2038            let _ = guard.pop_front();
2039        }
2040        drop(guard);
2041
2042        self.notify.notify_waiters();
2043    }
2044
2045    async fn wait_next(
2046        &self,
2047        after_sequence: u64,
2048        filter: &SseFilter,
2049        timeout: Duration,
2050    ) -> Result<SseRecord> {
2051        let deadline = Instant::now() + timeout;
2052        loop {
2053            if let Some(record) = self.find_first_after(after_sequence, filter).await {
2054                return Ok(record);
2055            }
2056
2057            if Instant::now() >= deadline {
2058                return Err(anyhow!("timed out waiting for SSE event"));
2059            }
2060
2061            let remaining = deadline.saturating_duration_since(Instant::now());
2062            tokio::time::timeout(remaining, self.notify.notified()).await?;
2063        }
2064    }
2065
2066    async fn find_first_after(&self, after_sequence: u64, filter: &SseFilter) -> Option<SseRecord> {
2067        let guard = self.events.lock().await;
2068        guard
2069            .iter()
2070            .find(|event| event.sequence > after_sequence && filter.matches(event))
2071            .cloned()
2072    }
2073
2074    async fn history(
2075        &self,
2076        before_sequence: Option<u64>,
2077        limit: usize,
2078        filter: &SseFilter,
2079    ) -> SseHistoryPage {
2080        let before = before_sequence.unwrap_or(u64::MAX);
2081        let guard = self.events.lock().await;
2082
2083        let mut matched = guard
2084            .iter()
2085            .filter(|event| event.sequence < before)
2086            .filter(|event| filter.matches(event))
2087            .cloned()
2088            .collect::<Vec<_>>();
2089
2090        let total = matched.len();
2091
2092        if matched.len() > limit {
2093            matched = matched.split_off(matched.len() - limit);
2094        }
2095
2096        let next_before_sequence = matched.first().map(|event| event.sequence);
2097        SseHistoryPage {
2098            total_matching: total,
2099            returned: matched.len(),
2100            next_before_sequence,
2101            events: matched,
2102        }
2103    }
2104}
2105
2106#[derive(Debug, Default)]
2107struct SseFilter {
2108    include_event_types: Vec<String>,
2109    exclude_event_types: Vec<String>,
2110}
2111
2112impl SseFilter {
2113    fn matches(&self, event: &SseRecord) -> bool {
2114        let include = if self.include_event_types.is_empty() {
2115            true
2116        } else {
2117            self.include_event_types
2118                .iter()
2119                .any(|name| name == &event.event_type)
2120        };
2121        if !include {
2122            return false;
2123        }
2124        !self
2125            .exclude_event_types
2126            .iter()
2127            .any(|name| name == &event.event_type)
2128    }
2129}
2130
2131#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
2132struct SseRecord {
2133    sequence: u64,
2134    timestamp_rfc3339: String,
2135    node_id: u32,
2136    event_type: String,
2137    payload: Value,
2138}
2139
2140#[derive(Debug, Clone, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
2141struct SseHistoryPage {
2142    total_matching: usize,
2143    returned: usize,
2144    next_before_sequence: Option<u64>,
2145    events: Vec<SseRecord>,
2146}
2147
2148#[derive(Debug)]
2149struct DerivedSigner {
2150    public_key_hex: String,
2151    account_hash: String,
2152    secret_key: SecretKey,
2153}
2154
2155#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2156struct SpawnNetworkRequest {
2157    network_name: Option<String>,
2158    asset: Option<String>,
2159    custom_asset: Option<String>,
2160    protocol_version: Option<String>,
2161    node_count: Option<u32>,
2162    users: Option<u32>,
2163    delay: Option<u64>,
2164    log_level: Option<String>,
2165    node_log_format: Option<String>,
2166    seed: Option<String>,
2167    force_setup: Option<bool>,
2168}
2169
2170#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2171struct SpawnNetworkResponse {
2172    network_name: String,
2173    node_count: u32,
2174    managed: bool,
2175    already_running: bool,
2176    forced_setup: bool,
2177}
2178
2179#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2180struct WaitNetworkReadyRequest {
2181    network_name: String,
2182    timeout_seconds: Option<u64>,
2183}
2184
2185#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2186struct WaitReadyResponse {
2187    network_name: String,
2188    ready: bool,
2189    node_count: u32,
2190    rest: HashMap<u32, Value>,
2191    last_block_height: Option<u64>,
2192}
2193
2194#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2195struct StageProtocolRequest {
2196    network_name: String,
2197    asset: Option<String>,
2198    custom_asset: Option<String>,
2199    asset_name: Option<String>,
2200    protocol_version: String,
2201    activation_point: u64,
2202}
2203
2204#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2205struct StageProtocolResponse {
2206    network_name: String,
2207    live_mode: bool,
2208    staged_nodes: u32,
2209    restarted_sidecars: Vec<u32>,
2210}
2211
2212#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2213struct DespawnNetworkRequest {
2214    network_name: String,
2215    purge: Option<bool>,
2216}
2217
2218#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2219struct DespawnResponse {
2220    network_name: String,
2221    purged: bool,
2222}
2223
2224#[derive(Debug, Deserialize, Default, rmcp::schemars::JsonSchema)]
2225struct ListNetworksRequest {}
2226
2227#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2228struct ListNetworksResponse {
2229    networks: Vec<NetworkRow>,
2230}
2231
2232#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2233struct NetworkRow {
2234    network_name: String,
2235    discovered: bool,
2236    managed: bool,
2237    running: bool,
2238    node_count: Option<u32>,
2239}
2240
2241#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2242struct ManagedProcessesRequest {
2243    network_name: String,
2244    process_name: Option<String>,
2245    running_only: Option<bool>,
2246}
2247
2248#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2249struct ManagedProcessesResponse {
2250    network_name: String,
2251    running_only: bool,
2252    process_name: Option<String>,
2253    processes: Vec<ManagedProcessRow>,
2254}
2255
2256#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2257struct ManagedProcessRow {
2258    id: String,
2259    node_id: u32,
2260    kind: String,
2261    pid: Option<u32>,
2262    running: bool,
2263    last_status: String,
2264    command: String,
2265    args: Vec<String>,
2266    cwd: String,
2267    stdout_path: String,
2268    stderr_path: String,
2269}
2270
2271#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2272struct NodeScopedRequest {
2273    network_name: String,
2274    node_id: u32,
2275}
2276
2277#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2278struct RpcQueryRequest {
2279    network_name: String,
2280    node_id: u32,
2281    method: String,
2282    params: Option<Value>,
2283}
2284
2285#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2286struct RpcQueryBalanceRequest {
2287    network_name: String,
2288    node_id: u32,
2289    purse_identifier: String,
2290    block_id: Option<String>,
2291    state_root_hash: Option<String>,
2292}
2293
2294#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2295struct RpcQueryGlobalStateRequest {
2296    network_name: String,
2297    node_id: u32,
2298    key: String,
2299    path: Option<Vec<String>>,
2300    block_id: Option<String>,
2301    state_root_hash: Option<String>,
2302}
2303
2304#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2305struct CurrentBlockRequest {
2306    network_name: String,
2307    node_id: u32,
2308    block_id: Option<String>,
2309}
2310
2311#[derive(Debug, Clone, Copy, Deserialize, Serialize, rmcp::schemars::JsonSchema)]
2312#[serde(rename_all = "lowercase")]
2313enum NodeLogStream {
2314    Stdout,
2315    Stderr,
2316}
2317
2318#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2319struct GetNodeLogsRequest {
2320    network_name: String,
2321    node_id: u32,
2322    stream: NodeLogStream,
2323    limit: Option<usize>,
2324    before_line: Option<usize>,
2325}
2326
2327#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2328struct LogLine {
2329    line_number: usize,
2330    content: String,
2331}
2332
2333#[derive(Debug, Serialize, rmcp::schemars::JsonSchema)]
2334struct LogPage {
2335    path: String,
2336    total_lines: usize,
2337    returned: usize,
2338    next_before_line: Option<usize>,
2339    lines: Vec<LogLine>,
2340}
2341
2342#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2343struct WaitNextSseRequest {
2344    network_name: String,
2345    include_event_types: Option<Vec<String>>,
2346    exclude_event_types: Option<Vec<String>>,
2347    after_sequence: Option<u64>,
2348    timeout_seconds: Option<u64>,
2349}
2350
2351#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2352struct SseHistoryRequest {
2353    network_name: String,
2354    include_event_types: Option<Vec<String>>,
2355    exclude_event_types: Option<Vec<String>>,
2356    before_sequence: Option<u64>,
2357    limit: Option<usize>,
2358}
2359
2360#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2361struct ListDerivedAccountsRequest {
2362    network_name: String,
2363}
2364
2365#[derive(Debug, Serialize, Deserialize, rmcp::schemars::JsonSchema)]
2366struct DerivedAccountRow {
2367    kind: String,
2368    name: String,
2369    key_type: String,
2370    derivation: String,
2371    path: String,
2372    account_hash: String,
2373    balance: String,
2374    public_key: Option<String>,
2375}
2376
2377#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2378struct SendTransactionSignedRequest {
2379    network_name: String,
2380    node_id: u32,
2381    signer_path: String,
2382    transaction: Value,
2383}
2384
2385#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2386struct MakeTransactionPackageCallRequest {
2387    network_name: String,
2388    node_id: u32,
2389    transaction_package_name: String,
2390    transaction_package_version: Option<EntityVersion>,
2391    session_entry_point: String,
2392    #[serde(alias = "session_args_json")]
2393    session_args: Option<Value>,
2394    signer_path: Option<String>,
2395    initiator_public_key: Option<String>,
2396    chain_name: Option<String>,
2397    ttl_millis: Option<u64>,
2398    gas_price_tolerance: Option<u8>,
2399    payment_amount: Option<u64>,
2400    runtime_transferred_value: Option<u64>,
2401    runtime_seed_hex: Option<String>,
2402}
2403
2404#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2405struct MakeTransactionContractCallRequest {
2406    network_name: String,
2407    node_id: u32,
2408    transaction_contract_hash: String,
2409    session_entry_point: String,
2410    #[serde(alias = "session_args_json")]
2411    session_args: Option<Value>,
2412    signer_path: Option<String>,
2413    initiator_public_key: Option<String>,
2414    chain_name: Option<String>,
2415    ttl_millis: Option<u64>,
2416    gas_price_tolerance: Option<u8>,
2417    payment_amount: Option<u64>,
2418    runtime_transferred_value: Option<u64>,
2419    runtime_seed_hex: Option<String>,
2420}
2421
2422#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2423struct MakeTransactionSessionWasmRequest {
2424    network_name: String,
2425    node_id: u32,
2426    wasm_path: String,
2427    #[serde(alias = "session_args_json")]
2428    session_args: Option<Value>,
2429    is_install_upgrade: Option<bool>,
2430    signer_path: Option<String>,
2431    initiator_public_key: Option<String>,
2432    chain_name: Option<String>,
2433    ttl_millis: Option<u64>,
2434    gas_price_tolerance: Option<u8>,
2435    payment_amount: Option<u64>,
2436    runtime_transferred_value: Option<u64>,
2437    runtime_seed_hex: Option<String>,
2438}
2439
2440#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2441struct SendSessionWasmRequest {
2442    network_name: String,
2443    node_id: u32,
2444    signer_path: String,
2445    wasm_path: String,
2446    #[serde(alias = "session_args_json")]
2447    session_args: Option<Value>,
2448    chain_name: Option<String>,
2449    is_install_upgrade: Option<bool>,
2450    ttl_millis: Option<u64>,
2451    gas_price_tolerance: Option<u8>,
2452    payment_amount: Option<u64>,
2453    runtime_transferred_value: Option<u64>,
2454    runtime_seed_hex: Option<String>,
2455}
2456
2457#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2458struct TransferTokensRequest {
2459    network_name: String,
2460    node_id: u32,
2461    from_path: String,
2462    to_path: String,
2463    amount: String,
2464    transfer_id: Option<u64>,
2465    chain_name: Option<String>,
2466    ttl_millis: Option<u64>,
2467    gas_price_tolerance: Option<u8>,
2468    payment_amount: Option<u64>,
2469}
2470
2471#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2472struct WaitTransactionRequest {
2473    network_name: String,
2474    node_id: u32,
2475    transaction_hash: String,
2476    timeout_seconds: Option<u64>,
2477    poll_interval_millis: Option<u64>,
2478}
2479
2480#[derive(Debug, Deserialize, rmcp::schemars::JsonSchema)]
2481struct GetTransactionRequest {
2482    network_name: String,
2483    node_id: u32,
2484    transaction_hash: String,
2485}
2486
2487#[derive(Debug, Deserialize)]
2488#[serde(rename_all = "snake_case")]
2489struct RestStatusProbe {
2490    reactor_state: Option<String>,
2491}
2492
2493fn to_mcp_error(err: impl std::fmt::Display) -> ErrorData {
2494    ErrorData::internal_error(err.to_string(), None)
2495}
2496
2497fn internal_serde_error(err: serde_json::Error) -> ErrorData {
2498    ErrorData::internal_error(format!("serde error: {}", err), None)
2499}
2500
2501fn ok_value(value: Value) -> rmcp::model::CallToolResult {
2502    rmcp::model::CallToolResult::structured(value)
2503}
2504
2505fn parse_transaction_json(value: Value) -> std::result::Result<Transaction, ErrorData> {
2506    if let Value::String(encoded) = &value {
2507        let _ = encoded;
2508        return Err(ErrorData::invalid_params(
2509            "invalid transaction payload: encoded JSON strings are not supported. Provide transaction as a typed JSON object",
2510            None,
2511        ));
2512    }
2513
2514    if let Ok(transaction) = serde_json::from_value::<Transaction>(value.clone()) {
2515        return Ok(transaction);
2516    }
2517
2518    if let Some(inner) = value.get("transaction") {
2519        return parse_transaction_json(inner.clone());
2520    }
2521
2522    Err(ErrorData::invalid_params(
2523        "failed to parse transaction JSON; expected Transaction object or { transaction: Transaction }, with typed JSON values only.",
2524        None,
2525    ))
2526}
2527
2528fn parse_transaction_hash_input(input: &str) -> Result<TransactionHash> {
2529    let value = input.trim();
2530    if value.is_empty() {
2531        return Err(anyhow!("transaction_hash must not be empty"));
2532    }
2533
2534    let unwrapped = value
2535        .strip_prefix("transaction-v1-hash(")
2536        .and_then(|inner| inner.strip_suffix(')'))
2537        .or_else(|| {
2538            value
2539                .strip_prefix("deploy-hash(")
2540                .and_then(|inner| inner.strip_suffix(')'))
2541        })
2542        .unwrap_or(value);
2543
2544    if unwrapped.contains("..") {
2545        return Err(anyhow!(
2546            "transaction_hash appears abbreviated; provide full hex digest"
2547        ));
2548    }
2549
2550    let normalized = unwrapped.strip_prefix("0x").unwrap_or(unwrapped);
2551    let digest = Digest::from_hex(normalized)
2552        .map_err(|err| anyhow!("failed to parse transaction hash digest: {}", err))?;
2553    Ok(TransactionHash::from(TransactionV1Hash::from(digest)))
2554}
2555
2556fn mcp_rpc_client(network_name: &str, node_id: u32) -> Result<CasperClient> {
2557    CasperClient::new(
2558        network_name.to_string(),
2559        vec![assets::rpc_endpoint(node_id)],
2560    )
2561    .map_err(|err| {
2562        anyhow!(
2563            "failed to initialize rpc client for node {}: {}",
2564            node_id,
2565            err
2566        )
2567    })
2568}
2569
2570fn extract_rpc_result(response: Value) -> Result<Value> {
2571    if let Some(error) = response.get("error") {
2572        return Err(anyhow!("rpc request failed: {}", error));
2573    }
2574    response
2575        .get("result")
2576        .cloned()
2577        .ok_or_else(|| anyhow!("rpc response missing result field"))
2578}
2579
2580async fn fetch_block_result(
2581    manager: &NetworkManager,
2582    network_name: &str,
2583    node_id: u32,
2584    block_id: Option<&str>,
2585) -> Result<Value> {
2586    let block_id = normalize_optional_identifier(block_id);
2587    if block_id.is_empty() {
2588        let rpc = mcp_rpc_client(network_name, node_id)?;
2589        let result = rpc.get_block().await?;
2590        return serde_json::to_value(result).map_err(Into::into);
2591    }
2592
2593    let block_identifier = parse_block_identifier_value(&block_id)?;
2594    let response = manager
2595        .raw_rpc_query(
2596            node_id,
2597            "chain_get_block",
2598            Some(json!({
2599                "block_identifier": block_identifier
2600            })),
2601        )
2602        .await?;
2603    extract_rpc_result(response)
2604}
2605
2606fn build_query_global_state_params(
2607    key: &str,
2608    path: Vec<String>,
2609    block_id: &str,
2610    state_root_hash: &str,
2611) -> Result<Value> {
2612    let key = parse_query_key(key)?;
2613    let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2614    let mut params = serde_json::Map::new();
2615    params.insert("key".to_string(), Value::String(key));
2616    params.insert(
2617        "path".to_string(),
2618        Value::Array(path.into_iter().map(Value::String).collect()),
2619    );
2620    if let Some(state_identifier) = state_identifier {
2621        params.insert("state_identifier".to_string(), state_identifier);
2622    }
2623    Ok(Value::Object(params))
2624}
2625
2626fn build_query_balance_params(
2627    purse_identifier: &str,
2628    block_id: &str,
2629    state_root_hash: &str,
2630) -> Result<Value> {
2631    let purse_identifier = parse_purse_identifier(purse_identifier)?;
2632    let state_identifier = parse_state_identifier(block_id, state_root_hash)?;
2633    let mut params = serde_json::Map::new();
2634    params.insert("purse_identifier".to_string(), purse_identifier);
2635    if let Some(state_identifier) = state_identifier {
2636        params.insert("state_identifier".to_string(), state_identifier);
2637    }
2638    Ok(Value::Object(params))
2639}
2640
2641fn parse_state_identifier(block_id: &str, state_root_hash: &str) -> Result<Option<Value>> {
2642    let block_id = block_id.trim();
2643    if !block_id.is_empty() {
2644        if block_id.len() == Digest::LENGTH * 2 {
2645            Digest::from_hex(block_id)
2646                .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2647            return Ok(Some(json!({
2648                "BlockHash": block_id,
2649            })));
2650        }
2651
2652        let height = block_id
2653            .parse::<u64>()
2654            .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2655        return Ok(Some(json!({
2656            "BlockHeight": height,
2657        })));
2658    }
2659
2660    let state_root_hash = state_root_hash.trim();
2661    if state_root_hash.is_empty() {
2662        return Ok(None);
2663    }
2664    Digest::from_hex(state_root_hash)
2665        .map_err(|err| anyhow!("invalid state_root_hash digest: {}", err))?;
2666    Ok(Some(json!({
2667        "StateRootHash": state_root_hash,
2668    })))
2669}
2670
2671fn parse_block_identifier_value(block_id: &str) -> Result<Value> {
2672    let block_id = block_id.trim();
2673    if block_id.is_empty() {
2674        return Err(anyhow!("block identifier must not be empty"));
2675    }
2676    if block_id.len() == Digest::LENGTH * 2 {
2677        Digest::from_hex(block_id)
2678            .map_err(|err| anyhow!("invalid block hash digest in block_id: {}", err))?;
2679        Ok(json!({
2680            "Hash": block_id,
2681        }))
2682    } else {
2683        let height = block_id
2684            .parse::<u64>()
2685            .map_err(|err| anyhow!("invalid block height in block_id: {}", err))?;
2686        Ok(json!({
2687            "Height": height,
2688        }))
2689    }
2690}
2691
2692fn parse_query_key(key: &str) -> Result<String> {
2693    let key = key.trim();
2694    if key.is_empty() {
2695        return Err(anyhow!("key must not be empty"));
2696    }
2697    if let Ok(contract_hash) = ContractHash::from_formatted_str(key) {
2698        return Ok(Key::Hash(contract_hash.value()).to_formatted_string());
2699    }
2700    if let Ok(parsed) = Key::from_formatted_str(key) {
2701        return Ok(parsed.to_formatted_string());
2702    }
2703    if let Ok(public_key) = PublicKey::from_hex(key) {
2704        return Ok(Key::Account(public_key.to_account_hash()).to_formatted_string());
2705    }
2706    Err(anyhow!(
2707        "failed to parse key for query; expected formatted Key or public key hex"
2708    ))
2709}
2710
2711fn parse_contract_hash_for_invocation(input: &str) -> Result<AddressableEntityHash> {
2712    let input = input.trim();
2713    if input.is_empty() {
2714        return Err(anyhow!("transaction_contract_hash must not be empty"));
2715    }
2716
2717    if let Ok(contract_hash) = ContractHash::from_formatted_str(input) {
2718        return Ok(contract_hash.into());
2719    }
2720
2721    let digest_hex = input
2722        .strip_prefix("hash-")
2723        .or_else(|| input.strip_prefix("contract-hash-"))
2724        .unwrap_or(input);
2725    let bytes = hex_to_bytes(digest_hex)?;
2726    if bytes.len() != Digest::LENGTH {
2727        return Err(anyhow!(
2728            "transaction_contract_hash must be {} bytes",
2729            Digest::LENGTH
2730        ));
2731    }
2732    let mut hash = [0u8; Digest::LENGTH];
2733    hash.copy_from_slice(&bytes);
2734    Ok(ContractHash::new(hash).into())
2735}
2736
2737fn parse_purse_identifier(input: &str) -> Result<Value> {
2738    let input = input.trim();
2739    if input.is_empty() {
2740        return Err(anyhow!("purse_identifier must not be empty"));
2741    }
2742
2743    if input.starts_with("account-hash-") {
2744        casper_types::account::AccountHash::from_formatted_str(input)
2745            .map_err(|err| anyhow!("invalid account hash purse identifier: {}", err))?;
2746        return Ok(json!({
2747            "main_purse_under_account_hash": input,
2748        }));
2749    }
2750
2751    if input.starts_with("entity-") {
2752        return Ok(json!({
2753            "main_purse_under_entity_addr": input,
2754        }));
2755    }
2756
2757    if input.starts_with("uref-") {
2758        URef::from_formatted_str(input)
2759            .map_err(|err| anyhow!("invalid uref purse identifier: {}", err))?;
2760        return Ok(json!({
2761            "purse_uref": input,
2762        }));
2763    }
2764
2765    let public_key = PublicKey::from_hex(input)
2766        .map_err(|err| anyhow!("invalid public key purse identifier: {}", err))?;
2767    Ok(json!({
2768        "main_purse_under_public_key": public_key.to_hex_string(),
2769    }))
2770}
2771
2772fn build_pricing_mode(gas_price_tolerance: Option<u8>, payment_amount: Option<u64>) -> PricingMode {
2773    if let Some(payment_amount) = payment_amount {
2774        PricingMode::PaymentLimited {
2775            payment_amount,
2776            gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2777            standard_payment: true,
2778        }
2779    } else {
2780        PricingMode::Fixed {
2781            gas_price_tolerance: gas_price_tolerance.unwrap_or(5),
2782            additional_computation_factor: 0,
2783        }
2784    }
2785}
2786
2787fn parse_optional_seed_hex(seed: Option<&str>) -> Result<Option<[u8; 32]>> {
2788    let Some(seed) = seed else {
2789        return Ok(None);
2790    };
2791    let seed = seed.trim();
2792    if seed.is_empty() {
2793        return Ok(None);
2794    }
2795
2796    let cleaned = seed.strip_prefix("0x").unwrap_or(seed);
2797    let bytes = hex_to_bytes(cleaned)?;
2798    if bytes.len() != 32 {
2799        return Err(anyhow!("runtime_seed_hex must be exactly 32 bytes"));
2800    }
2801
2802    let mut out = [0u8; 32];
2803    out.copy_from_slice(&bytes);
2804    Ok(Some(out))
2805}
2806
2807fn normalize_optional_identifier(value: Option<&str>) -> String {
2808    value.map(str::trim).unwrap_or_default().to_string()
2809}
2810
2811async fn resolve_global_state_identifier(
2812    network_name: &str,
2813    node_id: u32,
2814    block_id: Option<&str>,
2815    state_root_hash: Option<&str>,
2816) -> Result<(String, String)> {
2817    let block_id = normalize_optional_identifier(block_id);
2818    let state_root_hash = normalize_optional_identifier(state_root_hash);
2819
2820    if !block_id.is_empty() || !state_root_hash.is_empty() {
2821        return Ok((block_id, state_root_hash));
2822    }
2823
2824    let rpc = mcp_rpc_client(network_name, node_id)?;
2825    let response = rpc.get_block().await?;
2826    let latest_block = response
2827        .block_with_signatures
2828        .ok_or_else(|| anyhow!("latest block was not returned by chain_get_block"))?;
2829
2830    Ok((latest_block.block.hash().to_hex_string(), String::new()))
2831}
2832
2833fn hex_to_bytes(input: &str) -> Result<Vec<u8>> {
2834    if !input.len().is_multiple_of(2) {
2835        return Err(anyhow!("hex string must have even length"));
2836    }
2837    let mut out = Vec::with_capacity(input.len() / 2);
2838    let mut chars = input.chars();
2839    while let (Some(hi), Some(lo)) = (chars.next(), chars.next()) {
2840        let byte = ((hex_nibble(hi)? as u8) << 4) | (hex_nibble(lo)? as u8);
2841        out.push(byte);
2842    }
2843    Ok(out)
2844}
2845
2846fn hex_nibble(ch: char) -> Result<u32> {
2847    ch.to_digit(16)
2848        .ok_or_else(|| anyhow!("invalid hex character '{}'", ch))
2849}
2850
2851async fn ensure_running_network(
2852    network: &Arc<ManagedNetwork>,
2853) -> std::result::Result<(), ErrorData> {
2854    if network.is_running().await {
2855        Ok(())
2856    } else {
2857        Err(ErrorData::resource_not_found(
2858            "network is not running; call spawn_network then wait_network_ready",
2859            None,
2860        ))
2861    }
2862}
2863
2864async fn check_rest_ready(network: &Arc<ManagedNetwork>) -> Result<HashMap<u32, Value>> {
2865    let node_ids = {
2866        let state = network.state.lock().await;
2867        state
2868            .processes
2869            .iter()
2870            .filter_map(|process| {
2871                if matches!(process.kind, ProcessKind::Node) {
2872                    Some(process.node_id)
2873                } else {
2874                    None
2875                }
2876            })
2877            .collect::<Vec<_>>()
2878    };
2879
2880    if node_ids.is_empty() {
2881        return Err(anyhow!("network has no node processes"));
2882    }
2883
2884    let mut by_node = HashMap::new();
2885    for node_id in node_ids {
2886        let value = fetch_rest_status(node_id).await?;
2887        let probe: RestStatusProbe = serde_json::from_value(value.clone())
2888            .with_context(|| format!("invalid /status payload for node {}", node_id))?;
2889        if probe.reactor_state.as_deref() != Some("Validate") {
2890            return Err(anyhow!("node {} reactor is not Validate", node_id));
2891        }
2892        by_node.insert(node_id, value);
2893    }
2894
2895    Ok(by_node)
2896}
2897
2898fn rest_has_block(value: &Value) -> bool {
2899    value
2900        .get("last_added_block_info")
2901        .and_then(|entry| entry.get("height"))
2902        .and_then(Value::as_u64)
2903        .is_some()
2904}
2905
2906async fn fetch_rest_status(node_id: u32) -> Result<Value> {
2907    let url = format!("{}/status", assets::rest_endpoint(node_id));
2908    let client = reqwest::Client::builder()
2909        .no_proxy()
2910        .timeout(Duration::from_secs(4))
2911        .build()?;
2912    let response = client.get(url).send().await?.error_for_status()?;
2913    Ok(response.json::<Value>().await?)
2914}
2915
2916async fn claim_block_hook(network: &Arc<ManagedNetwork>, block_hash: &str) -> bool {
2917    let mut last_block_hook_hash = network.last_block_hook_hash.lock().await;
2918    if last_block_hook_hash.as_deref() == Some(block_hash) {
2919        return false;
2920    }
2921    *last_block_hook_hash = Some(block_hash.to_string());
2922    true
2923}
2924
2925async fn run_sse_listener(network: Arc<ManagedNetwork>, node_id: u32, endpoint: String) {
2926    let mut backoff = ExponentialBackoff::default();
2927    let mut connection_version: Option<String> = None;
2928
2929    loop {
2930        if network.shutdown.load(Ordering::SeqCst) {
2931            return;
2932        }
2933
2934        let config = match ListenerConfig::builder()
2935            .with_endpoint(endpoint.clone())
2936            .build()
2937        {
2938            Ok(config) => config,
2939            Err(_) => {
2940                if !sleep_backoff(&mut backoff).await {
2941                    return;
2942                }
2943                continue;
2944            }
2945        };
2946
2947        let stream = match sse::listener(config).await {
2948            Ok(stream) => {
2949                backoff.reset();
2950                stream
2951            }
2952            Err(_) => {
2953                if !sleep_backoff(&mut backoff).await {
2954                    return;
2955                }
2956                continue;
2957            }
2958        };
2959
2960        futures::pin_mut!(stream);
2961        let mut failed = false;
2962
2963        while let Some(event) = stream.next().await {
2964            if network.shutdown.load(Ordering::SeqCst) {
2965                return;
2966            }
2967
2968            match event {
2969                Ok(event) => {
2970                    if let SseEvent::ApiVersion(version) = &event {
2971                        connection_version = Some(version.to_string());
2972                    }
2973                    if let SseEvent::BlockAdded { block_hash, block } = &event {
2974                        let _ = record_last_block_height(&network.state, block.height()).await;
2975                        assets::spawn_pending_post_genesis_hook(network.layout.clone());
2976                        if let Some(protocol_version) = connection_version.as_deref()
2977                            && claim_block_hook(&network, &block_hash.to_string()).await
2978                        {
2979                            assets::spawn_block_added_hook(
2980                                network.layout.clone(),
2981                                protocol_version.to_string(),
2982                                json!({
2983                                    "block_hash": block_hash.to_string(),
2984                                    "height": block.height(),
2985                                    "era_id": block.era_id().value(),
2986                                }),
2987                            );
2988                        }
2989                    }
2990                    network.sse_store.push(node_id, event).await;
2991                }
2992                Err(_) => {
2993                    failed = true;
2994                    break;
2995                }
2996            }
2997        }
2998
2999        connection_version = None;
3000        if failed && !sleep_backoff(&mut backoff).await {
3001            return;
3002        }
3003    }
3004}
3005
3006async fn record_last_block_height(state: &Arc<Mutex<State>>, height: u64) -> Result<()> {
3007    let mut state = state.lock().await;
3008    if state.last_block_height == Some(height) {
3009        return Ok(());
3010    }
3011    state.last_block_height = Some(height);
3012    state.touch().await
3013}
3014
3015async fn sleep_backoff(backoff: &mut ExponentialBackoff) -> bool {
3016    if let Some(delay) = backoff.next_backoff() {
3017        tokio::time::sleep(delay).await;
3018        true
3019    } else {
3020        false
3021    }
3022}
3023
3024fn sse_event_type(event: &SseEvent) -> &'static str {
3025    match event {
3026        SseEvent::ApiVersion(_) => "ApiVersion",
3027        SseEvent::DeployAccepted(_) => "DeployAccepted",
3028        SseEvent::BlockAdded { .. } => "BlockAdded",
3029        SseEvent::DeployProcessed(_) => "DeployProcessed",
3030        SseEvent::DeployExpired(_) => "DeployExpired",
3031        SseEvent::TransactionAccepted(_) => "TransactionAccepted",
3032        SseEvent::TransactionProcessed { .. } => "TransactionProcessed",
3033        SseEvent::TransactionExpired { .. } => "TransactionExpired",
3034        SseEvent::Fault { .. } => "Fault",
3035        SseEvent::FinalitySignature(_) => "FinalitySignature",
3036        SseEvent::Step { .. } => "Step",
3037        SseEvent::Shutdown => "Shutdown",
3038    }
3039}
3040
3041fn process_kind_name(kind: &ProcessKind) -> &'static str {
3042    match kind {
3043        ProcessKind::Node => "node",
3044        ProcessKind::Sidecar => "sidecar",
3045    }
3046}
3047
3048fn process_status_name(status: &ProcessStatus) -> &'static str {
3049    match status {
3050        ProcessStatus::Running => "running",
3051        ProcessStatus::Stopped => "stopped",
3052        ProcessStatus::Exited => "exited",
3053        ProcessStatus::Unknown => "unknown",
3054        ProcessStatus::Skipped => "skipped",
3055    }
3056}
3057
3058fn sse_event_payload(event: &SseEvent) -> Value {
3059    match event {
3060        SseEvent::ApiVersion(version) => json!({ "api_version": version.to_string() }),
3061        SseEvent::DeployAccepted(payload) => payload.clone(),
3062        SseEvent::BlockAdded { block_hash, block } => json!({
3063            "block_hash": block_hash.to_string(),
3064            "height": block.height(),
3065            "era_id": block.era_id().value(),
3066        }),
3067        SseEvent::DeployProcessed(payload) => payload.clone(),
3068        SseEvent::DeployExpired(payload) => payload.clone(),
3069        SseEvent::TransactionAccepted(transaction) => {
3070            json!({ "transaction_hash": transaction.hash().to_hex_string() })
3071        }
3072        SseEvent::TransactionProcessed {
3073            transaction_hash,
3074            execution_result,
3075            messages,
3076            ..
3077        } => json!({
3078            "transaction_hash": transaction_hash.to_hex_string(),
3079            "execution_result": execution_result,
3080            "messages": messages,
3081        }),
3082        SseEvent::TransactionExpired { transaction_hash } => json!({
3083            "transaction_hash": transaction_hash.to_hex_string(),
3084        }),
3085        SseEvent::Fault {
3086            era_id,
3087            public_key,
3088            timestamp,
3089        } => json!({
3090            "era_id": era_id.value(),
3091            "public_key": public_key.to_hex(),
3092            "timestamp": timestamp,
3093        }),
3094        SseEvent::FinalitySignature(signature) => json!({
3095            "block_hash": signature.block_hash().to_string(),
3096            "era_id": signature.era_id().value(),
3097            "signature": signature.signature().to_hex(),
3098        }),
3099        SseEvent::Step {
3100            era_id,
3101            execution_effects,
3102        } => json!({
3103            "era_id": era_id.value(),
3104            "execution_effects": execution_effects.get(),
3105        }),
3106        SseEvent::Shutdown => json!({}),
3107    }
3108}
3109
3110fn timestamp_prefix() -> String {
3111    time::OffsetDateTime::now_utc()
3112        .format(&time::format_description::well_known::Rfc3339)
3113        .unwrap_or_else(|_| "unknown-time".to_string())
3114}
3115
3116fn processes_running(state: &State) -> bool {
3117    if state.processes.is_empty() {
3118        return false;
3119    }
3120
3121    state.processes.iter().all(|process| {
3122        matches!(process.last_status, ProcessStatus::Running)
3123            && process.current_pid().is_some_and(is_pid_running)
3124    })
3125}
3126
3127fn running_node_ids(state: &State) -> Vec<u32> {
3128    let mut node_ids = std::collections::BTreeSet::new();
3129    for process in &state.processes {
3130        if !matches!(process.kind, ProcessKind::Node) {
3131            continue;
3132        }
3133        if !matches!(process.last_status, ProcessStatus::Running) {
3134            continue;
3135        }
3136        let Some(pid) = process.current_pid() else {
3137            continue;
3138        };
3139        if !is_pid_running(pid) {
3140            continue;
3141        }
3142        node_ids.insert(process.node_id);
3143    }
3144    node_ids.into_iter().collect()
3145}
3146
3147fn is_pid_running(pid: u32) -> bool {
3148    let pid = Pid::from_raw(pid as i32);
3149    match kill(pid, None) {
3150        Ok(()) => true,
3151        Err(Errno::ESRCH) => false,
3152        Err(_) => true,
3153    }
3154}
3155
3156async fn discover_network_names(assets_root: &Path) -> Result<Vec<String>> {
3157    if !is_dir(assets_root).await {
3158        return Ok(Vec::new());
3159    }
3160
3161    let mut names = Vec::new();
3162    let mut entries = tokio_fs::read_dir(assets_root).await?;
3163    while let Some(entry) = entries.next_entry().await? {
3164        if !entry.file_type().await?.is_dir() {
3165            continue;
3166        }
3167        let name = entry.file_name().to_string_lossy().to_string();
3168        if !name.is_empty() {
3169            names.push(name);
3170        }
3171    }
3172
3173    names.sort();
3174    Ok(names)
3175}
3176
3177async fn ensure_sidecar_available(layout: &AssetsLayout, node_count: u32) -> Result<()> {
3178    for node_id in 1..=node_count {
3179        let version_dir = layout.latest_protocol_version_dir(node_id).await?;
3180        let sidecar_bin = layout
3181            .node_bin_dir(node_id)
3182            .join(&version_dir)
3183            .join("casper-sidecar");
3184        let sidecar_cfg = layout
3185            .node_config_root(node_id)
3186            .join(&version_dir)
3187            .join("sidecar.toml");
3188
3189        if !is_file(&sidecar_bin).await {
3190            return Err(anyhow!(
3191                "missing sidecar binary for node {}: {}",
3192                node_id,
3193                sidecar_bin.display()
3194            ));
3195        }
3196
3197        if !is_file(&sidecar_cfg).await {
3198            return Err(anyhow!(
3199                "missing sidecar.toml for node {}: {}",
3200                node_id,
3201                sidecar_cfg.display()
3202            ));
3203        }
3204    }
3205
3206    Ok(())
3207}
3208
3209async fn latest_layout_protocol_version(layout: &AssetsLayout) -> Result<String> {
3210    Ok(layout
3211        .latest_protocol_version_dir(1)
3212        .await?
3213        .replace('_', "."))
3214}
3215
3216fn stage_asset_selector(
3217    asset: Option<&str>,
3218    custom_asset: Option<&str>,
3219    legacy_asset_name: Option<&str>,
3220) -> Result<AssetSelector> {
3221    let custom_asset = match (custom_asset, legacy_asset_name) {
3222        (Some(_), Some(_)) => {
3223            return Err(anyhow!(
3224                "custom_asset and asset_name are mutually exclusive"
3225            ));
3226        }
3227        (Some(custom_asset), None) | (None, Some(custom_asset)) => Some(custom_asset),
3228        (None, None) => None,
3229    };
3230    assets::required_asset_selector(asset, custom_asset)
3231}
3232
3233async fn parse_derived_accounts_csv(
3234    csv: &str,
3235    seed: Option<Arc<str>>,
3236) -> Result<Vec<DerivedAccountRow>> {
3237    let mut lines = csv.lines();
3238    let header = lines
3239        .next()
3240        .ok_or_else(|| anyhow!("derived accounts csv is empty"))?;
3241    if header.trim() != "kind,name,key_type,derivation,path,account_hash,balance" {
3242        return Err(anyhow!("unexpected derived-accounts.csv header"));
3243    }
3244
3245    let mut rows = Vec::new();
3246    for line in lines {
3247        let line = line.trim();
3248        if line.is_empty() {
3249            continue;
3250        }
3251
3252        let parts = line.splitn(7, ',').collect::<Vec<_>>();
3253        if parts.len() != 7 {
3254            return Err(anyhow!("invalid derived account row: {}", line));
3255        }
3256
3257        let path = parts[4].to_string();
3258        let public_key = if let Some(seed) = &seed {
3259            match assets::derive_account_from_seed_path(Arc::clone(seed), &path).await {
3260                Ok(material) => Some(material.public_key_hex),
3261                Err(_) => None,
3262            }
3263        } else {
3264            None
3265        };
3266
3267        rows.push(DerivedAccountRow {
3268            kind: parts[0].to_string(),
3269            name: parts[1].to_string(),
3270            key_type: parts[2].to_string(),
3271            derivation: parts[3].to_string(),
3272            path,
3273            account_hash: parts[5].to_string(),
3274            balance: parts[6].to_string(),
3275            public_key,
3276        });
3277    }
3278
3279    Ok(rows)
3280}
3281
3282async fn verify_path_hash_consistency(
3283    layout: &AssetsLayout,
3284    path: &str,
3285    expected_account_hash: &str,
3286) -> Result<()> {
3287    let Some(csv) = assets::derived_accounts_summary(layout).await else {
3288        return Ok(());
3289    };
3290
3291    for line in csv.lines().skip(1) {
3292        let line = line.trim();
3293        if line.is_empty() {
3294            continue;
3295        }
3296        let parts = line.splitn(7, ',').collect::<Vec<_>>();
3297        if parts.len() != 7 {
3298            continue;
3299        }
3300        if parts[4] == path {
3301            if parts[5] != expected_account_hash {
3302                return Err(anyhow!(
3303                    "derived account hash mismatch for path {}: csv={} derived={}",
3304                    path,
3305                    parts[5],
3306                    expected_account_hash
3307                ));
3308            }
3309            return Ok(());
3310        }
3311    }
3312
3313    Ok(())
3314}
3315
3316async fn fetch_chain_name(network_name: &str, node_id: u32) -> Result<String> {
3317    let rpc = mcp_rpc_client(network_name, node_id)?;
3318    rpc.get_network_name().await.map_err(Into::into)
3319}
3320
3321fn extract_block_height(value: &Value) -> Option<u64> {
3322    value
3323        .pointer("/block_with_signatures/block/header/height")
3324        .and_then(Value::as_u64)
3325        .or_else(|| {
3326            value
3327                .pointer("/block/block/header/height")
3328                .and_then(Value::as_u64)
3329        })
3330        .or_else(|| {
3331            value
3332                .pointer("/block_with_signatures/Version2/block/Version2/header/height")
3333                .and_then(Value::as_u64)
3334        })
3335        .or_else(|| {
3336            value
3337                .pointer("/block_with_signatures/Version1/block/Version1/header/height")
3338                .and_then(Value::as_u64)
3339        })
3340        .or_else(|| find_first_height(value))
3341}
3342
3343fn find_first_height(value: &Value) -> Option<u64> {
3344    match value {
3345        Value::Object(map) => {
3346            if let Some(height) = map.get("height").and_then(Value::as_u64) {
3347                return Some(height);
3348            }
3349            for nested in map.values() {
3350                if let Some(height) = find_first_height(nested) {
3351                    return Some(height);
3352                }
3353            }
3354            None
3355        }
3356        Value::Array(items) => {
3357            for item in items {
3358                if let Some(height) = find_first_height(item) {
3359                    return Some(height);
3360                }
3361            }
3362            None
3363        }
3364        _ => None,
3365    }
3366}
3367
3368fn parse_no_such_block_range_from_error(error_text: &str) -> Option<(u64, u64)> {
3369    let start = error_text.find('{')?;
3370    let payload = &error_text[start..];
3371    let value: Value = serde_json::from_str(payload).ok()?;
3372
3373    let code = value.get("code").and_then(Value::as_i64)?;
3374    let message = value.get("message").and_then(Value::as_str)?;
3375    if code != -32001 || !message.eq_ignore_ascii_case("No such block") {
3376        return None;
3377    }
3378
3379    let low = value
3380        .pointer("/data/available_block_range/low")
3381        .and_then(Value::as_u64)
3382        .unwrap_or(0);
3383    let high = value
3384        .pointer("/data/available_block_range/high")
3385        .and_then(Value::as_u64)
3386        .unwrap_or(low);
3387    Some((low, high))
3388}
3389
3390async fn read_log_page(path: &Path, before_line: Option<usize>, limit: usize) -> Result<LogPage> {
3391    let contents = tokio_fs::read_to_string(path)
3392        .await
3393        .with_context(|| format!("failed to read log file {}", path.display()))?;
3394
3395    let all_lines = contents
3396        .lines()
3397        .map(ToString::to_string)
3398        .collect::<Vec<_>>();
3399    let total_lines = all_lines.len();
3400
3401    let before = before_line.unwrap_or(total_lines + 1);
3402    if before == 0 {
3403        return Err(anyhow!("before_line must be >= 1"));
3404    }
3405
3406    let end_exclusive = before.saturating_sub(1).min(total_lines);
3407    let start = end_exclusive.saturating_sub(limit);
3408
3409    let mut lines = Vec::new();
3410    for (idx, content) in all_lines[start..end_exclusive].iter().enumerate() {
3411        lines.push(LogLine {
3412            line_number: start + idx + 1,
3413            content: content.clone(),
3414        });
3415    }
3416
3417    let next_before_line = if start == 0 { None } else { Some(start + 1) };
3418
3419    Ok(LogPage {
3420        path: path.display().to_string(),
3421        total_lines,
3422        returned: lines.len(),
3423        next_before_line,
3424        lines,
3425    })
3426}
3427
3428async fn is_dir(path: &Path) -> bool {
3429    tokio_fs::metadata(path)
3430        .await
3431        .map(|meta| meta.is_dir())
3432        .unwrap_or(false)
3433}
3434
3435async fn is_file(path: &Path) -> bool {
3436    tokio_fs::metadata(path)
3437        .await
3438        .map(|meta| meta.is_file())
3439        .unwrap_or(false)
3440}
3441
3442pub async fn run(args: McpArgs) -> Result<()> {
3443    let assets_root = match args.net_path {
3444        Some(path) => path,
3445        None => assets::default_assets_root()?,
3446    };
3447
3448    let manager = Arc::new(NetworkManager::new(assets_root).await?);
3449
3450    let result = match args.transport {
3451        McpTransport::Stdio => run_stdio(manager.clone()).await,
3452        McpTransport::Http => run_http(manager.clone(), &args.http_bind, &args.http_path).await,
3453        McpTransport::Both => run_both(manager.clone(), &args.http_bind, &args.http_path).await,
3454    };
3455
3456    let stop_result = manager.stop_all_networks().await;
3457
3458    match (result, stop_result) {
3459        (Ok(()), Ok(())) => Ok(()),
3460        (Err(err), Ok(())) => Err(err),
3461        (Ok(()), Err(stop_err)) => Err(stop_err),
3462        (Err(run_err), Err(stop_err)) => Err(anyhow!(
3463            "mcp server failed: {run_err}; additionally failed to stop networks: {stop_err}"
3464        )),
3465    }
3466}
3467
3468async fn run_stdio(manager: Arc<NetworkManager>) -> Result<()> {
3469    let service = McpServer::new(manager).serve(mcp_stdio()).await?;
3470    service.waiting().await?;
3471    Ok(())
3472}
3473
3474async fn run_http(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3475    let path = normalize_http_path(path);
3476    let socket = std::net::SocketAddr::from_str(bind)
3477        .with_context(|| format!("invalid http bind address '{}'", bind))?;
3478
3479    let service: StreamableHttpService<McpServer, LocalSessionManager> = StreamableHttpService::new(
3480        {
3481            let manager = manager.clone();
3482            move || Ok(McpServer::new(manager.clone()))
3483        },
3484        Arc::new(LocalSessionManager::default()),
3485        StreamableHttpServerConfig::default(),
3486    );
3487
3488    let router = axum::Router::new().nest_service(&path, service);
3489    let listener = tokio::net::TcpListener::bind(socket).await?;
3490
3491    axum::serve(listener, router)
3492        .with_graceful_shutdown(async {
3493            let _ = tokio::signal::ctrl_c().await;
3494        })
3495        .await?;
3496
3497    Ok(())
3498}
3499
3500async fn run_both(manager: Arc<NetworkManager>, bind: &str, path: &str) -> Result<()> {
3501    let path = normalize_http_path(path);
3502    let socket = std::net::SocketAddr::from_str(bind)
3503        .with_context(|| format!("invalid http bind address '{}'", bind))?;
3504
3505    let (shutdown_tx, mut shutdown_rx) = tokio::sync::watch::channel(false);
3506
3507    let mut http_task = {
3508        let manager = manager.clone();
3509        tokio::spawn(async move {
3510            let service: StreamableHttpService<McpServer, LocalSessionManager> =
3511                StreamableHttpService::new(
3512                    {
3513                        let manager = manager.clone();
3514                        move || Ok(McpServer::new(manager.clone()))
3515                    },
3516                    Arc::new(LocalSessionManager::default()),
3517                    StreamableHttpServerConfig::default(),
3518                );
3519
3520            let router = axum::Router::new().nest_service(&path, service);
3521            let listener = tokio::net::TcpListener::bind(socket).await?;
3522
3523            axum::serve(listener, router)
3524                .with_graceful_shutdown(async move {
3525                    tokio::select! {
3526                        _ = async {
3527                            loop {
3528                                if *shutdown_rx.borrow() {
3529                                    break;
3530                                }
3531                                if shutdown_rx.changed().await.is_err() {
3532                                    break;
3533                                }
3534                            }
3535                        } => {}
3536                        _ = tokio::signal::ctrl_c() => {}
3537                    }
3538                })
3539                .await
3540                .map_err(anyhow::Error::from)
3541        })
3542    };
3543
3544    let mut stdio_task = tokio::spawn(async move { run_stdio(manager).await });
3545
3546    let result = tokio::select! {
3547        res = &mut stdio_task => match res {
3548            Ok(inner) => inner,
3549            Err(join_err) => Err(anyhow!("stdio task failed: {}", join_err)),
3550        },
3551        res = &mut http_task => match res {
3552            Ok(inner) => inner,
3553            Err(join_err) => Err(anyhow!("http task failed: {}", join_err)),
3554        },
3555    };
3556
3557    let _ = shutdown_tx.send(true);
3558    let _ = tokio::time::timeout(Duration::from_secs(2), &mut http_task).await;
3559    let _ = tokio::time::timeout(Duration::from_secs(2), &mut stdio_task).await;
3560
3561    result
3562}
3563
3564fn normalize_http_path(path: &str) -> String {
3565    if path.starts_with('/') {
3566        path.to_string()
3567    } else {
3568        format!("/{}", path)
3569    }
3570}
3571
3572#[cfg(test)]
3573mod tests {
3574    use super::*;
3575
3576    #[tokio::test]
3577    async fn parse_derived_accounts_rows() {
3578        let csv = "kind,name,key_type,derivation,path,account_hash,balance\nvalidator,node-1,secp256k1,bip32,m/44'/506'/0'/0/0,account-hash-a,100\nuser,user-1,secp256k1,bip32,m/44'/506'/0'/0/100,account-hash-b,200";
3579        let rows = parse_derived_accounts_csv(csv, None).await.unwrap();
3580        assert_eq!(rows.len(), 2);
3581        assert_eq!(rows[0].path, "m/44'/506'/0'/0/0");
3582        assert_eq!(rows[1].account_hash, "account-hash-b");
3583    }
3584
3585    #[test]
3586    fn sse_filter_include_exclude() {
3587        let filter = SseFilter {
3588            include_event_types: vec!["BlockAdded".to_string()],
3589            exclude_event_types: vec!["TransactionAccepted".to_string()],
3590        };
3591
3592        let block = SseRecord {
3593            sequence: 1,
3594            timestamp_rfc3339: "t".to_string(),
3595            node_id: 1,
3596            event_type: "BlockAdded".to_string(),
3597            payload: json!({}),
3598        };
3599        let tx = SseRecord {
3600            sequence: 2,
3601            timestamp_rfc3339: "t".to_string(),
3602            node_id: 1,
3603            event_type: "TransactionAccepted".to_string(),
3604            payload: json!({}),
3605        };
3606
3607        assert!(filter.matches(&block));
3608        assert!(!filter.matches(&tx));
3609    }
3610
3611    #[tokio::test]
3612    async fn sse_history_paginates_by_sequence() {
3613        let store = SseStore::new(100);
3614        for sequence in 1..=5 {
3615            let mut guard = store.events.lock().await;
3616            guard.push_back(SseRecord {
3617                sequence,
3618                timestamp_rfc3339: "t".to_string(),
3619                node_id: 1,
3620                event_type: "BlockAdded".to_string(),
3621                payload: json!({ "sequence": sequence }),
3622            });
3623            drop(guard);
3624        }
3625        let filter = SseFilter::default();
3626        let page = store.history(Some(6), 2, &filter).await;
3627        assert_eq!(page.returned, 2);
3628        assert_eq!(page.events[0].sequence, 4);
3629        assert_eq!(page.events[1].sequence, 5);
3630        assert_eq!(page.next_before_sequence, Some(4));
3631    }
3632
3633    #[tokio::test]
3634    async fn read_log_page_uses_before_cursor() {
3635        let temp = tempfile::NamedTempFile::new().unwrap();
3636        tokio_fs::write(temp.path(), "a\nb\nc\nd\n").await.unwrap();
3637
3638        let page = read_log_page(temp.path(), Some(5), 2).await.unwrap();
3639        assert_eq!(page.lines.len(), 2);
3640        assert_eq!(page.lines[0].line_number, 3);
3641        assert_eq!(page.lines[1].line_number, 4);
3642        assert_eq!(page.next_before_line, Some(3));
3643    }
3644
3645    #[tokio::test]
3646    async fn sidecar_preflight_fails_when_missing() {
3647        let root = tempfile::tempdir().unwrap();
3648        let layout = AssetsLayout::new(root.path().to_path_buf(), "test-net".to_string());
3649        let node_bin = layout.node_bin_dir(1).join("2_0_0");
3650        let node_cfg = layout.node_config_root(1).join("2_0_0");
3651        tokio_fs::create_dir_all(&node_bin).await.unwrap();
3652        tokio_fs::create_dir_all(&node_cfg).await.unwrap();
3653        tokio_fs::write(node_bin.join("casper-node"), "bin")
3654            .await
3655            .unwrap();
3656        tokio_fs::write(node_cfg.join("sidecar.toml"), "cfg")
3657            .await
3658            .unwrap();
3659
3660        let result = ensure_sidecar_available(&layout, 1).await;
3661        assert!(result.is_err());
3662    }
3663
3664    #[test]
3665    fn normalize_optional_identifier_trims_and_defaults() {
3666        assert_eq!(normalize_optional_identifier(None), "");
3667        assert_eq!(normalize_optional_identifier(Some("   ")), "");
3668        assert_eq!(normalize_optional_identifier(Some(" 123 ")), "123");
3669    }
3670
3671    #[test]
3672    fn parse_state_identifier_variants() {
3673        assert_eq!(
3674            parse_state_identifier("42", "").unwrap(),
3675            Some(json!({ "BlockHeight": 42u64 }))
3676        );
3677        assert_eq!(
3678            parse_state_identifier(
3679                "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234",
3680                ""
3681            )
3682            .unwrap(),
3683            Some(json!({
3684                "BlockHash": "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234"
3685            }))
3686        );
3687        assert_eq!(parse_state_identifier("", "").unwrap(), None,);
3688    }
3689
3690    #[test]
3691    fn parse_contract_hash_for_invocation_accepts_common_formats() {
3692        let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
3693        let contract = parse_contract_hash_for_invocation(&format!("contract-{}", hash)).unwrap();
3694        let key_hash = parse_contract_hash_for_invocation(&format!("hash-{}", hash)).unwrap();
3695        let raw = parse_contract_hash_for_invocation(hash).unwrap();
3696        assert_eq!(contract.to_hex_string(), hash);
3697        assert_eq!(key_hash.to_hex_string(), hash);
3698        assert_eq!(raw.to_hex_string(), hash);
3699    }
3700
3701    #[test]
3702    fn parse_query_key_accepts_contract_hash_format() {
3703        let hash = "2f6fbeebbe1bdf6f8ff05880edfa4e4f79849d2b4f0ecf65482177e4fabc1234";
3704        let key = parse_query_key(&format!("contract-{}", hash)).unwrap();
3705        assert_eq!(key, format!("hash-{}", hash));
3706    }
3707
3708    #[test]
3709    fn parse_transaction_json_rejects_escaped_string_payload() {
3710        let tx_json = json!({
3711            "Version1": {
3712                "hash": "7eeb092361e31b4cc9885e3621f1470f29631338ecc703643c22da1d38fd81a9",
3713                "payload": {
3714                    "initiator_addr": {
3715                        "PublicKey": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f"
3716                    },
3717                    "timestamp": "2026-02-27T18:03:18.541Z",
3718                    "ttl": "30m",
3719                    "chain_name": "casper-devnet",
3720                    "pricing_mode": {
3721                        "PaymentLimited": {
3722                            "payment_amount": 100000000000u64,
3723                            "gas_price_tolerance": 5,
3724                            "standard_payment": true
3725                        }
3726                    },
3727                    "fields": {
3728                        "args": {"Named": []},
3729                        "entry_point": {"Custom": "counter_inc"},
3730                        "scheduling": "Standard",
3731                        "target": {
3732                            "Stored": {
3733                                "id": {
3734                                    "ByPackageName": {
3735                                        "name": "counter_package_name",
3736                                        "version": null
3737                                    }
3738                                },
3739                                "runtime": "VmCasperV1"
3740                            }
3741                        }
3742                    }
3743                },
3744                "approvals": [{
3745                    "signer": "0202f9bae6a6c5a8345c2aa8339b54ff3fcf82d2f6a9cce1732e765c2cc403b3be9f",
3746                    "signature": "02c64336e5ed2832bdb84adb3f334d585548ee096066aa9d0797c11ab3f074ec9d7bd396994bc9b9c239342be801bc385a9c5083779bace4dfe0b400d4a13c07db"
3747                }]
3748            }
3749        });
3750        let direct = parse_transaction_json(tx_json.clone()).unwrap();
3751        let wrapped = parse_transaction_json(json!({ "transaction": tx_json })).unwrap();
3752        let encoded = serde_json::to_string(&direct).unwrap();
3753        let from_string_err = parse_transaction_json(Value::String(encoded.clone()));
3754        let wrapped_string_err = parse_transaction_json(json!({
3755            "transaction": encoded
3756        }));
3757
3758        assert_eq!(
3759            direct.hash().to_hex_string(),
3760            wrapped.hash().to_hex_string()
3761        );
3762        assert!(from_string_err.is_err());
3763        assert!(wrapped_string_err.is_err());
3764    }
3765
3766    #[test]
3767    fn parse_no_such_block_range_from_error_extracts_bounds() {
3768        let error = "casper client error: response for rpc-id 1 chain_get_block is json-rpc error: {\"code\":-32001,\"message\":\"No such block\",\"data\":{\"message\":\"no block found for the provided identifier\",\"available_block_range\":{\"low\":0,\"high\":0}}}";
3769        let range = parse_no_such_block_range_from_error(error).unwrap();
3770        assert_eq!(range, (0, 0));
3771    }
3772
3773    #[test]
3774    fn send_transaction_signed_request_requires_transaction_field() {
3775        let payload = json!({
3776            "network_name": "casper-devnet",
3777            "node_id": 1,
3778            "signer_path": "m/44'/506'/0'/0/100",
3779            "transaction": { "Version1": { "hash": "abc" } }
3780        });
3781        let request: SendTransactionSignedRequest = serde_json::from_value(payload).unwrap();
3782        assert!(request.transaction.get("Version1").is_some());
3783
3784        let legacy_payload = json!({
3785            "network_name": "casper-devnet",
3786            "node_id": 1,
3787            "signer_path": "m/44'/506'/0'/0/100",
3788            "transaction_json": { "Version1": { "hash": "def" } }
3789        });
3790        let legacy_err = serde_json::from_value::<SendTransactionSignedRequest>(legacy_payload)
3791            .unwrap_err()
3792            .to_string();
3793        assert!(legacy_err.contains("missing field `transaction`"));
3794    }
3795}