Skip to main content

agent_block_mcp/
lib.rs

1//! MCP Client — manages MCP server child processes via rmcp.
2//!
3//! Uses `rmcp` (1.4.x) `RunningService<RoleClient, AgentBlockClientHandler>` internally.
4//! `AgentBlockClientHandler` provides custom notification handling via Lua callbacks
5//! (wired in Subtask 2/3). For Subtask 1, all notification methods are default no-ops.
6//!
7//! All rmcp round-trips are wrapped in a per-call timeout so a hung child
8//! cannot block a Lua coroutine indefinitely.
9//!
10//! # Concurrency contract
11//!
12//! `list_tools` and `call_tool` take `&self`, so the manager can be held
13//! under `tokio::sync::RwLock` and multiple RPCs — including against the
14//! same server — can proceed in parallel via read guards. Request/response
15//! multiplexing on a single server is handled by rmcp's `Peer`, which
16//! pairs each outbound request with a `oneshot` receiver keyed by request
17//! ID. `connect` and `disconnect` are mutating (`&mut self`) and must take
18//! the write guard.
19//!
20//! This contract is covered by in-process unit tests in `#[cfg(test)]` at
21//! the bottom of this file. If rmcp alters its `Peer` concurrency model,
22//! or if this module is refactored to re-serialize RPCs, those tests fail.
23//!
24//! # Usage from Lua
25//!
26//! ```lua
27//! mcp.connect("outline", "outline-mcp", {})
28//! local tools = mcp.list_tools("outline")
29//! local result = mcp.call("outline", "shelf", {})
30//! mcp.disconnect("outline")
31//! ```
32
33pub mod handler;
34pub(crate) mod http;
35pub mod lua_json;
36
37use std::collections::HashMap;
38use std::path::Path;
39use std::process::Stdio;
40use std::sync::Arc;
41use std::time::{Duration, Instant};
42
43use mlua_isle::AsyncIsle;
44use rmcp::{
45    model::{
46        ArgumentInfo, CallToolRequestParams, CancelledNotification, CancelledNotificationParam,
47        ClientRequest, CompleteRequestParams, GetPromptRequestParams, NumberOrString, PingRequest,
48        ReadResourceRequestParams, Reference, RootsListChangedNotification, ServerResult,
49        SubscribeRequestParams, UnsubscribeRequestParams,
50    },
51    service::{RoleClient, RunningService},
52    transport::TokioChildProcess,
53    ServiceExt,
54};
55use tokio::process::Command;
56use tokio::time::timeout;
57use tracing::warn;
58
59use agent_block_types::error::{BlockError, BlockResult};
60
61pub use handler::AgentBlockClientHandler;
62
63/// Default RPC round-trip timeout when no explicit value is provided.
64pub const DEFAULT_RPC_TIMEOUT: Duration = Duration::from_secs(30);
65
/// Manager for a set of named MCP server connections.
///
/// Holds the live rmcp services, the timeout applied to every round-trip,
/// and the client-handler template that `connect` / `connect_http` clone for
/// each new connection.
pub struct McpManager {
    /// Server connections keyed by name. Declared `pub` so tests can insert
    /// in-process test servers directly (same as `concurrency_tests` in this
    /// module).
    // NOTE(review): this comment previously described the field as
    // `pub(crate)`, but the declaration is `pub` — confirm which visibility
    // is intended.
    pub servers: HashMap<String, RunningService<RoleClient, AgentBlockClientHandler>>,
    /// Upper bound applied to each rmcp round-trip issued through this manager.
    rpc_timeout: Duration,
    /// Shared handler instance — all connections share the same registry Arc.
    pub handler: AgentBlockClientHandler,
}
75
76impl McpManager {
77    pub fn new() -> Self {
78        Self {
79            servers: HashMap::new(),
80            rpc_timeout: DEFAULT_RPC_TIMEOUT,
81            handler: AgentBlockClientHandler::new(),
82        }
83    }
84
85    /// Construct a manager with a caller-specified RPC timeout.
86    /// Applies to `connect`, `list_tools`, and `call_tool` alike.
87    ///
88    /// `rpc_timeout` must be non-zero. `Duration::ZERO` would cause every
89    /// `tokio::time::timeout` to fire immediately, silently turning every
90    /// MCP round-trip into a timeout error — for an autonomous agent that
91    /// is a "everything looks broken" failure mode. We reject it at
92    /// construction time so the misconfiguration surfaces loudly at
93    /// startup instead of being swallowed at the first RPC.
94    pub fn with_rpc_timeout(rpc_timeout: Duration) -> BlockResult<Self> {
95        if rpc_timeout.is_zero() {
96            return Err(BlockError::Mcp(
97                "rpc_timeout must be > 0 (got Duration::ZERO); \
98                 every MCP RPC would time out immediately"
99                    .to_string(),
100            ));
101        }
102        Ok(Self {
103            servers: HashMap::new(),
104            rpc_timeout,
105            handler: AgentBlockClientHandler::new(),
106        })
107    }
108
109    /// Spawn the MCP server process and complete the MCP initialize handshake.
110    ///
111    /// `trace_context`: if `true`, `__ab_obs` observability context will be
112    /// injected into `call_tool` arguments for this server.  Defaults to `false`
113    /// (opt-in) so that third-party / untrusted stdio servers do not receive agent
114    /// identity metadata unless explicitly enabled.
115    ///
116    /// `cwd`: if `Some`, the spawned subprocess inherits this as its current
117    /// working directory; if `None`, the subprocess inherits the parent
118    /// process's CWD. Callers driven through `agent-block-core` typically
119    /// pass `BlockConfig.project_root` so the MCP server sees the same
120    /// project root as the Lua script (matters for servers that rely on
121    /// path-based discovery such as `git rev-parse --show-toplevel`).
122    pub async fn connect(
123        &mut self,
124        name: &str,
125        command: &str,
126        args: &[String],
127        trace_context: bool,
128        cwd: Option<&Path>,
129    ) -> BlockResult<()> {
130        let mut cmd = Command::new(command);
131        cmd.args(args).stderr(Stdio::inherit());
132        if let Some(dir) = cwd {
133            cmd.current_dir(dir);
134        }
135        let transport = TokioChildProcess::new(cmd).map_err(|e| {
136            warn!(server = %name, command = %command, error = %e, "mcp spawn failed");
137            BlockError::Mcp(format!("spawn '{command}': {e}"))
138        })?;
139        let rpc_timeout = self.rpc_timeout;
140        // Ensure the handler registry has an entry for this server name
141        // so callbacks can be registered immediately after connect returns.
142        self.handler.ensure_server(name);
143        self.handler.set_trace_context(name, trace_context);
144        // Set server_name before clone so create_message can identify the
145        // connection without needing the RequestContext to carry server identity.
146        // The mutate-template → clone → reset dance is required because
147        // AgentBlockClientHandler is shared across all connections via Arc<Mutex>
148        // for the registry, but create_message needs per-connection server identity
149        // that is NOT shared.  Cloning after setting server_name gives each
150        // RunningService its own immutable copy of the name while the registry Arc
151        // continues to be shared.  Both connect() and connect_http() use this pattern.
152        self.handler.server_name = Some(name.to_string());
153        let handler = self.handler.clone();
154        // Reset server_name on the shared template so the next connect call
155        // starts fresh.
156        self.handler.server_name = None;
157        let running = timeout(rpc_timeout, handler.serve(transport))
158            .await
159            .map_err(|_| {
160                warn!(server = %name, timeout = ?rpc_timeout, "mcp initialize timed out");
161                BlockError::Timeout(format!(
162                    "initialize '{name}' timed out after {rpc_timeout:?}"
163                ))
164            })?
165            .map_err(|e| {
166                warn!(server = %name, error = %e, "mcp initialize failed");
167                BlockError::Mcp(format!("initialize '{name}': {e}"))
168            })?;
169        self.servers.insert(name.to_string(), running);
170        Ok(())
171    }
172
173    /// Call `tools/list` and return the tools as a JSON array.
174    ///
175    /// Immutable receiver so concurrent readers can share an `RwLock<McpManager>`.
176    pub async fn list_tools(&self, name: &str) -> BlockResult<serde_json::Value> {
177        let srv = self.servers.get(name).ok_or_else(|| {
178            warn!(server = %name, "mcp list_tools on unknown server");
179            BlockError::Mcp(format!("no server named '{name}'"))
180        })?;
181        let rpc_timeout = self.rpc_timeout;
182        let tools = timeout(rpc_timeout, srv.list_all_tools())
183            .await
184            .map_err(|_| {
185                warn!(server = %name, timeout = ?rpc_timeout, "mcp list_tools timed out");
186                BlockError::Timeout(format!(
187                    "list_tools '{name}' timed out after {rpc_timeout:?}"
188                ))
189            })?
190            .map_err(|e| {
191                warn!(server = %name, error = %e, "mcp list_tools failed");
192                BlockError::Mcp(format!("list_tools '{name}': {e}"))
193            })?;
194        serde_json::to_value(&tools)
195            .map_err(|e| BlockError::Mcp(format!("serialize list_tools result: {e}")))
196    }
197
198    /// Call `tools/call` with the given tool name and arguments.
199    ///
200    /// Returns the full rmcp `CallToolResult` serialized to JSON
201    /// (`{"content": [...], "isError": bool, ...}`) on success, including
202    /// the `isError` flag — tool-execution errors are passed through to
203    /// the caller, following the MCP spec's intent that the LLM sees them
204    /// and self-corrects. Only protocol / transport / timeout failures
205    /// surface as `Err(BlockError::*)`.
206    ///
207    /// `arguments` must be a JSON `Object` or `Null`. `Null` is treated as
208    /// "no arguments"; any other shape (array, scalar) returns an error
209    /// rather than silently dropping the payload.
210    /// Immutable receiver so concurrent readers can share an `RwLock<McpManager>`.
211    pub async fn call_tool(
212        &self,
213        name: &str,
214        tool_name: &str,
215        arguments: serde_json::Value,
216    ) -> BlockResult<serde_json::Value> {
217        // Validate argument shape early so the error does not depend on
218        // whether the server is registered or reachable. MCP spec requires
219        // `arguments` to be an object (or absent); an array/scalar would
220        // serialize into `CallToolRequestParams` as-is and the server
221        // would reject it with an opaque protocol error.
222        let mut params = CallToolRequestParams::new(tool_name.to_string());
223        match arguments {
224            serde_json::Value::Object(obj) => {
225                params = params.with_arguments(obj);
226            }
227            serde_json::Value::Null => {}
228            other => {
229                let kind = match other {
230                    serde_json::Value::Array(_) => "array",
231                    serde_json::Value::String(_) => "string",
232                    serde_json::Value::Number(_) => "number",
233                    serde_json::Value::Bool(_) => "bool",
234                    _ => "unknown",
235                };
236                return Err(BlockError::Mcp(format!(
237                    "call_tool '{tool_name}' on '{name}': arguments must be a JSON object \
238                     (got {kind})"
239                )));
240            }
241        }
242        let srv = self.servers.get(name).ok_or_else(|| {
243            warn!(server = %name, tool = %tool_name, "mcp call_tool on unknown server");
244            BlockError::Mcp(format!("no server named '{name}'"))
245        })?;
246        let rpc_timeout = self.rpc_timeout;
247        let result = timeout(rpc_timeout, srv.call_tool(params))
248            .await
249            .map_err(|_| {
250                warn!(server = %name, tool = %tool_name, timeout = ?rpc_timeout, "mcp call_tool timed out");
251                // Fire-and-forget cancellation notification so the server can
252                // clean up the timed-out request.  request_id 0 is a sentinel
253                // (we do not have the rmcp-internal ID at this call site).
254                // Pass None: we do not have the rmcp-internal request ID at
255                // this call site, and sending ID=0 risks matching a real
256                // in-flight request on a server that allocates from zero.
257                self.send_cancelled(name, None);
258                BlockError::Timeout(format!(
259                    "call_tool '{tool_name}' on '{name}' timed out after {rpc_timeout:?}"
260                ))
261            })?
262            .map_err(|e| {
263                warn!(server = %name, tool = %tool_name, error = %e, "mcp call_tool failed");
264                BlockError::Mcp(format!("call_tool '{tool_name}' on '{name}': {e}"))
265            })?;
266        serde_json::to_value(&result)
267            .map_err(|e| BlockError::Mcp(format!("serialize call_tool result: {e}")))
268    }
269
270    /// Cancel the named server and remove it from the manager.
271    ///
272    /// The server is removed from the internal map **before** the cancel
273    /// round-trip begins, so a slow or failed cancel never leaves a
274    /// zombie entry behind. If graceful cancel exceeds `rpc_timeout`,
275    /// the service handle is dropped at the end of the match arm —
276    /// rmcp's `Drop` impl cancels the peer's cancellation token, which
277    /// terminates the internal task and closes the transport — and
278    /// `BlockError::Timeout` is returned.
279    ///
280    /// The same `rpc_timeout` is reused here so callers have a single
281    /// knob governing every MCP round-trip (see `with_rpc_timeout`).
282    ///
283    /// Callers may re-`connect` the same name safely after any outcome.
284    pub async fn disconnect(&mut self, name: &str) -> BlockResult<()> {
285        let Some(running) = self.servers.remove(name) else {
286            return Ok(());
287        };
288        let cancel_timeout = self.rpc_timeout;
289        match timeout(cancel_timeout, running.cancel()).await {
290            Ok(Ok(_)) => Ok(()),
291            Ok(Err(e)) => {
292                warn!(server = %name, error = %e, "mcp cancel failed");
293                Err(BlockError::Mcp(format!("cancel '{name}': {e}")))
294            }
295            Err(_) => {
296                warn!(server = %name, timeout = ?cancel_timeout, "mcp cancel timed out");
297                Err(BlockError::Timeout(format!(
298                    "cancel '{name}' timed out after {cancel_timeout:?}"
299                )))
300            }
301        }
302    }
303
304    /// Cancel all managed servers.
305    ///
306    /// Every server is disconnected regardless of individual failures.
307    /// The first error encountered is returned so shutdown can signal
308    /// a problem; **subsequent** errors are logged at `warn` level so
309    /// they are not silently discarded.
310    pub async fn disconnect_all(&mut self) -> BlockResult<()> {
311        let mut first_err: Option<BlockError> = None;
312        let names: Vec<String> = self.servers.keys().cloned().collect();
313        for name in names {
314            if let Err(e) = self.disconnect(&name).await {
315                if first_err.is_none() {
316                    first_err = Some(e);
317                } else {
318                    warn!(server = %name, error = %e, "disconnect failed during disconnect_all");
319                }
320            }
321        }
322        match first_err {
323            Some(e) => Err(e),
324            None => Ok(()),
325        }
326    }
327
328    /// Wire the handler Isle into this manager's `AgentBlockClientHandler`.
329    ///
330    /// Must be called after both the `McpManager` and the `AsyncIsle` are
331    /// constructed. The handler Isle is used to dispatch Lua notification
332    /// callbacks (`on_progress` etc.) from the rmcp task thread.
333    ///
334    /// Idempotent: a second call replaces the previous Isle reference.
335    pub fn set_handler_isle(&mut self, isle: Arc<AsyncIsle>) {
336        self.handler.handler_isle = Some(isle);
337    }
338
339    /// Wire the main Isle into the shared `AgentBlockClientHandler`.
340    ///
341    /// Must be called after construction and before `connect` / `connect_http`
342    /// so that progress/log notification dispatchers can call user Lua callbacks
343    /// stored in the main Isle's globals (upvalue-safe path).
344    ///
345    /// Also starts the bounded notification dispatch task (M-3: capacity-128 channel
346    /// that prevents unbounded memory growth from chatty notification sources).
347    ///
348    /// Idempotent: a second call replaces the previous Isle reference and restarts
349    /// the dispatch task on the new channel.
350    pub fn set_main_isle(&mut self, isle: Arc<AsyncIsle>) {
351        self.handler.main_isle = Some(isle);
352        self.handler.start_dispatch_task();
353    }
354
355    /// Connect to an MCP server via Streamable HTTP transport.
356    ///
357    /// `opts` may contain:
358    /// - `auth_header` (string): bearer-token authentication header value.
359    /// - `trace_context` (bool): if `true`, inject `__ab_obs` observability
360    ///   context into `call_tool` arguments. Default: `false` (opt-in).
361    ///
362    /// The handler Isle must be wired via `set_handler_isle` before calling
363    /// this method if `on_progress` callbacks are needed.
364    pub async fn connect_http(
365        &mut self,
366        name: &str,
367        url: &str,
368        opts: serde_json::Value,
369    ) -> BlockResult<()> {
370        let trace_context = opts
371            .get("trace_context")
372            .and_then(|v| v.as_bool())
373            .unwrap_or(false);
374        self.handler.ensure_server(name);
375        self.handler.set_trace_context(name, trace_context);
376        // Same mutate-template → clone → reset dance as connect(); see the comment
377        // there for the rationale (per-connection server_name, shared registry Arc).
378        self.handler.server_name = Some(name.to_string());
379        let handler = self.handler.clone();
380        self.handler.server_name = None;
381        let running =
382            http::connect_http_transport(name, url, &opts, handler, self.rpc_timeout).await?;
383        self.servers.insert(name.to_string(), running);
384        Ok(())
385    }
386
387    /// Call `resources/list` and return resources as a JSON array.
388    ///
389    /// Immutable receiver — usable under `RwLock::read` alongside concurrent RPCs.
390    pub async fn list_resources(&self, name: &str) -> BlockResult<serde_json::Value> {
391        let srv = self.servers.get(name).ok_or_else(|| {
392            warn!(server = %name, "mcp list_resources on unknown server");
393            BlockError::Mcp(format!("no server named '{name}'"))
394        })?;
395        let rpc_timeout = self.rpc_timeout;
396        let resources = timeout(rpc_timeout, srv.list_all_resources())
397            .await
398            .map_err(|_| {
399                warn!(server = %name, timeout = ?rpc_timeout, "mcp list_resources timed out");
400                BlockError::Timeout(format!(
401                    "list_resources '{name}' timed out after {rpc_timeout:?}"
402                ))
403            })?
404            .map_err(|e| {
405                warn!(server = %name, error = %e, "mcp list_resources failed");
406                BlockError::Mcp(format!("list_resources '{name}': {e}"))
407            })?;
408        serde_json::to_value(&resources)
409            .map_err(|e| BlockError::Mcp(format!("serialize list_resources result: {e}")))
410    }
411
412    /// Call `resources/templates/list` and return resource templates as a JSON array.
413    ///
414    /// Immutable receiver — usable under `RwLock::read` alongside concurrent RPCs.
415    pub async fn list_resource_templates(&self, name: &str) -> BlockResult<serde_json::Value> {
416        let srv = self.servers.get(name).ok_or_else(|| {
417            warn!(server = %name, "mcp list_resource_templates on unknown server");
418            BlockError::Mcp(format!("no server named '{name}'"))
419        })?;
420        let rpc_timeout = self.rpc_timeout;
421        let templates = timeout(rpc_timeout, srv.list_all_resource_templates())
422            .await
423            .map_err(|_| {
424                warn!(server = %name, timeout = ?rpc_timeout, "mcp list_resource_templates timed out");
425                BlockError::Timeout(format!(
426                    "list_resource_templates '{name}' timed out after {rpc_timeout:?}"
427                ))
428            })?
429            .map_err(|e| {
430                warn!(server = %name, error = %e, "mcp list_resource_templates failed");
431                BlockError::Mcp(format!("list_resource_templates '{name}': {e}"))
432            })?;
433        serde_json::to_value(&templates)
434            .map_err(|e| BlockError::Mcp(format!("serialize list_resource_templates result: {e}")))
435    }
436
437    /// Send a `ping` keepalive to the named server and return the round-trip
438    /// latency in milliseconds.
439    ///
440    /// Uses `send_request(ClientRequest::PingRequest(...))` — rmcp 1.4.0 has
441    /// no dedicated `Peer::ping()` method.  Latency is measured with
442    /// `Instant::now()` immediately before the send and `elapsed()` immediately
443    /// after the `EmptyResult` is received (crux must_not_simplify).
444    ///
445    /// Immutable receiver — usable under `RwLock::read` alongside concurrent RPCs.
446    pub async fn ping(&self, name: &str) -> BlockResult<u64> {
447        let srv = self.servers.get(name).ok_or_else(|| {
448            warn!(server = %name, "mcp ping on unknown server");
449            BlockError::Mcp(format!("no server named '{name}'"))
450        })?;
451        let rpc_timeout = self.rpc_timeout;
452        // Clone Peer out of RunningService before awaiting to avoid holding
453        // the lock across the await point (K-4 / await-holding-lock).
454        let peer = srv.peer().clone();
455        let ping_req = ClientRequest::PingRequest(PingRequest::default());
456        // Measure latency from immediately before send to immediately after
457        // EmptyResult receipt (crux: must_not_simplify).
458        let started = Instant::now();
459        let response = timeout(rpc_timeout, peer.send_request(ping_req))
460            .await
461            .map_err(|_| {
462                warn!(server = %name, timeout = ?rpc_timeout, "mcp ping timed out");
463                BlockError::Timeout(format!("ping '{name}' timed out after {rpc_timeout:?}"))
464            })?
465            .map_err(|e| {
466                warn!(server = %name, error = %e, "mcp ping failed");
467                BlockError::Mcp(format!("ping '{name}': {e}"))
468            })?;
469        match response {
470            ServerResult::EmptyResult(_) => {
471                let latency_ms = started.elapsed().as_millis() as u64;
472                Ok(latency_ms)
473            }
474            other => {
475                warn!(server = %name, "mcp ping: unexpected response");
476                Err(BlockError::Mcp(format!(
477                    "ping '{name}': unexpected response: {other:?}"
478                )))
479            }
480        }
481    }
482
483    /// Call `resources/read` and return the resource contents as JSON.
484    ///
485    /// Immutable receiver — usable under `RwLock::read`.
486    pub async fn read_resource(&self, name: &str, uri: &str) -> BlockResult<serde_json::Value> {
487        let srv = self.servers.get(name).ok_or_else(|| {
488            warn!(server = %name, uri = %uri, "mcp read_resource on unknown server");
489            BlockError::Mcp(format!("no server named '{name}'"))
490        })?;
491        let rpc_timeout = self.rpc_timeout;
492        let params = ReadResourceRequestParams::new(uri);
493        let result = timeout(rpc_timeout, srv.read_resource(params))
494            .await
495            .map_err(|_| {
496                warn!(server = %name, uri = %uri, timeout = ?rpc_timeout, "mcp read_resource timed out");
497                BlockError::Timeout(format!(
498                    "read_resource '{uri}' on '{name}' timed out after {rpc_timeout:?}"
499                ))
500            })?
501            .map_err(|e| {
502                warn!(server = %name, uri = %uri, error = %e, "mcp read_resource failed");
503                BlockError::Mcp(format!("read_resource '{uri}' on '{name}': {e}"))
504            })?;
505        serde_json::to_value(&result)
506            .map_err(|e| BlockError::Mcp(format!("serialize read_resource result: {e}")))
507    }
508
509    /// Call `resources/subscribe` to subscribe to updates for the given URI.
510    ///
511    /// Immutable receiver — usable under `RwLock::read`.
512    pub async fn subscribe_resource(&self, name: &str, uri: &str) -> BlockResult<()> {
513        let srv = self.servers.get(name).ok_or_else(|| {
514            warn!(server = %name, uri = %uri, "mcp subscribe_resource on unknown server");
515            BlockError::Mcp(format!("no server named '{name}'"))
516        })?;
517        let rpc_timeout = self.rpc_timeout;
518        let params = SubscribeRequestParams::new(uri);
519        timeout(rpc_timeout, srv.subscribe(params))
520            .await
521            .map_err(|_| {
522                warn!(server = %name, uri = %uri, timeout = ?rpc_timeout, "mcp subscribe_resource timed out");
523                BlockError::Timeout(format!(
524                    "subscribe_resource '{uri}' on '{name}' timed out after {rpc_timeout:?}"
525                ))
526            })?
527            .map_err(|e| {
528                warn!(server = %name, uri = %uri, error = %e, "mcp subscribe_resource failed");
529                BlockError::Mcp(format!("subscribe_resource '{uri}' on '{name}': {e}"))
530            })
531    }
532
533    /// Call `resources/unsubscribe` to stop receiving updates for the given URI.
534    ///
535    /// Immutable receiver — usable under `RwLock::read`.
536    pub async fn unsubscribe_resource(&self, name: &str, uri: &str) -> BlockResult<()> {
537        let srv = self.servers.get(name).ok_or_else(|| {
538            warn!(server = %name, uri = %uri, "mcp unsubscribe_resource on unknown server");
539            BlockError::Mcp(format!("no server named '{name}'"))
540        })?;
541        let rpc_timeout = self.rpc_timeout;
542        let params = UnsubscribeRequestParams::new(uri);
543        timeout(rpc_timeout, srv.unsubscribe(params))
544            .await
545            .map_err(|_| {
546                warn!(server = %name, uri = %uri, timeout = ?rpc_timeout, "mcp unsubscribe_resource timed out");
547                BlockError::Timeout(format!(
548                    "unsubscribe_resource '{uri}' on '{name}' timed out after {rpc_timeout:?}"
549                ))
550            })?
551            .map_err(|e| {
552                warn!(server = %name, uri = %uri, error = %e, "mcp unsubscribe_resource failed");
553                BlockError::Mcp(format!("unsubscribe_resource '{uri}' on '{name}': {e}"))
554            })
555    }
556
557    /// Call `prompts/list` and return prompts as a JSON array.
558    ///
559    /// Immutable receiver — usable under `RwLock::read`.
560    pub async fn list_prompts(&self, name: &str) -> BlockResult<serde_json::Value> {
561        let srv = self.servers.get(name).ok_or_else(|| {
562            warn!(server = %name, "mcp list_prompts on unknown server");
563            BlockError::Mcp(format!("no server named '{name}'"))
564        })?;
565        let rpc_timeout = self.rpc_timeout;
566        let prompts = timeout(rpc_timeout, srv.list_all_prompts())
567            .await
568            .map_err(|_| {
569                warn!(server = %name, timeout = ?rpc_timeout, "mcp list_prompts timed out");
570                BlockError::Timeout(format!(
571                    "list_prompts '{name}' timed out after {rpc_timeout:?}"
572                ))
573            })?
574            .map_err(|e| {
575                warn!(server = %name, error = %e, "mcp list_prompts failed");
576                BlockError::Mcp(format!("list_prompts '{name}': {e}"))
577            })?;
578        serde_json::to_value(&prompts)
579            .map_err(|e| BlockError::Mcp(format!("serialize list_prompts result: {e}")))
580    }
581
582    /// Call `prompts/get` with the given prompt name and optional arguments.
583    ///
584    /// `args` must be a JSON Object or Null. Immutable receiver.
585    pub async fn get_prompt(
586        &self,
587        name: &str,
588        prompt_name: &str,
589        args: serde_json::Value,
590    ) -> BlockResult<serde_json::Value> {
591        let mut params = GetPromptRequestParams::new(prompt_name.to_string());
592        match args {
593            serde_json::Value::Object(obj) => {
594                params = params.with_arguments(obj);
595            }
596            serde_json::Value::Null => {}
597            other => {
598                let kind = match other {
599                    serde_json::Value::Array(_) => "array",
600                    serde_json::Value::String(_) => "string",
601                    serde_json::Value::Number(_) => "number",
602                    serde_json::Value::Bool(_) => "bool",
603                    _ => "unknown",
604                };
605                return Err(BlockError::Mcp(format!(
606                    "get_prompt '{prompt_name}' on '{name}': args must be a JSON object \
607                     (got {kind})"
608                )));
609            }
610        }
611        let srv = self.servers.get(name).ok_or_else(|| {
612            warn!(server = %name, prompt = %prompt_name, "mcp get_prompt on unknown server");
613            BlockError::Mcp(format!("no server named '{name}'"))
614        })?;
615        let rpc_timeout = self.rpc_timeout;
616        let result = timeout(rpc_timeout, srv.get_prompt(params))
617            .await
618            .map_err(|_| {
619                warn!(server = %name, prompt = %prompt_name, timeout = ?rpc_timeout, "mcp get_prompt timed out");
620                BlockError::Timeout(format!(
621                    "get_prompt '{prompt_name}' on '{name}' timed out after {rpc_timeout:?}"
622                ))
623            })?
624            .map_err(|e| {
625                warn!(server = %name, prompt = %prompt_name, error = %e, "mcp get_prompt failed");
626                BlockError::Mcp(format!("get_prompt '{prompt_name}' on '{name}': {e}"))
627            })?;
628        serde_json::to_value(&result)
629            .map_err(|e| BlockError::Mcp(format!("serialize get_prompt result: {e}")))
630    }
631
632    /// Call `completion/complete` with the given reference and argument.
633    ///
634    /// `ref_json` must be a JSON Object with a `type` field of either
635    /// `"ref/prompt"` (with a `name` field) or `"ref/resource"` (with a `uri`
636    /// field).  Any other `type` value is rejected with `BlockError::Mcp`.
637    ///
638    /// `CompletionContext` is not exposed (scope-out per issue.md:51); it is
639    /// always sent as `None`.  Immutable receiver — usable under `RwLock::read`.
640    pub async fn complete(
641        &self,
642        name: &str,
643        ref_json: serde_json::Value,
644        arg_name: &str,
645        arg_value: &str,
646    ) -> BlockResult<serde_json::Value> {
647        // Build the Reference by dispatching on the `type` field at runtime.
648        // This is the crux: both prompt-ref and resource-ref paths must be
649        // preserved; collapsing or hardcoding one variant is forbidden.
650        let reference = match ref_json.get("type").and_then(|v| v.as_str()) {
651            Some("ref/prompt") => {
652                let prompt_name = ref_json.get("name").and_then(|v| v.as_str()).unwrap_or("");
653                Reference::for_prompt(prompt_name)
654            }
655            Some("ref/resource") => {
656                let uri = ref_json.get("uri").and_then(|v| v.as_str()).unwrap_or("");
657                Reference::for_resource(uri)
658            }
659            Some(kind) => {
660                warn!(server = %name, kind = ?kind, "mcp complete: invalid ref kind");
661                return Err(BlockError::Mcp(format!(
662                    "complete on '{name}': invalid ref kind '{kind}', \
663                     expected 'ref/prompt' or 'ref/resource'"
664                )));
665            }
666            None => {
667                warn!(server = %name, "mcp complete: ref missing 'type' field");
668                return Err(BlockError::Mcp(format!(
669                    "complete on '{name}': ref object has no 'type' field"
670                )));
671            }
672        };
673        let params = CompleteRequestParams::new(
674            reference,
675            ArgumentInfo {
676                name: arg_name.to_string(),
677                value: arg_value.to_string(),
678            },
679        );
680        let srv = self.servers.get(name).ok_or_else(|| {
681            warn!(server = %name, "mcp complete on unknown server");
682            BlockError::Mcp(format!("no server named '{name}'"))
683        })?;
684        let rpc_timeout = self.rpc_timeout;
685        let result = timeout(rpc_timeout, srv.complete(params))
686            .await
687            .map_err(|_| {
688                warn!(server = %name, timeout = ?rpc_timeout, "mcp complete timed out");
689                BlockError::Timeout(format!(
690                    "complete on '{name}' timed out after {rpc_timeout:?}"
691                ))
692            })?
693            .map_err(|e| {
694                warn!(server = %name, error = %e, "mcp complete failed");
695                BlockError::Mcp(format!("complete on '{name}': {e}"))
696            })?;
697        serde_json::to_value(&result)
698            .map_err(|e| BlockError::Mcp(format!("serialize complete result: {e}")))
699    }
700
701    /// Return the server's `InitializeResult` serialized as JSON.
702    ///
703    /// `peer_info()` is sync (no I/O). It returns `Some` after a successful
704    /// MCP handshake and `None` before initialization completes.
705    ///
706    /// Immutable receiver — usable under `RwLock::read`.
707    pub fn server_info(&self, name: &str) -> BlockResult<serde_json::Value> {
708        let srv = self.servers.get(name).ok_or_else(|| {
709            warn!(server = %name, "mcp server_info on unknown server");
710            BlockError::Mcp(format!("no server named '{name}'"))
711        })?;
712        let info = srv.peer_info().ok_or_else(|| {
713            warn!(server = %name, "mcp server_info: server not yet initialized");
714            BlockError::Mcp(format!("server '{name}' not yet initialized"))
715        })?;
716        serde_json::to_value(info)
717            .map_err(|e| BlockError::Mcp(format!("serialize server_info '{name}': {e}")))
718    }
719
720    /// Send a `notifications/cancelled` to the named server.
721    ///
722    /// This is a best-effort fire-and-forget: the notification is spawned in a
723    /// separate task so the caller is not blocked waiting for transport ack.
724    /// Errors from the peer send are logged at `warn` level and discarded —
725    /// the MCP spec does not require the server to ack cancellations (fire-and-forget
726    /// by design; warn-level logging is intentional).
727    ///
728    /// `request_id` is `Some(id)` when the caller has captured the rmcp-internal
729    /// request ID, or `None` when the ID is not available (e.g. a timeout fired
730    /// before the ID was obtained). When `None` the notification is **skipped
731    /// entirely** to avoid accidentally matching request ID 0 on a server that
732    /// allocates IDs starting from zero.
733    pub fn send_cancelled(&self, name: &str, request_id: Option<i64>) {
734        // Skip silently when no ID is available; sending a bogus sentinel value
735        // risks matching a real in-flight request (rmcp allocates from 0).
736        let id = match request_id {
737            Some(id) => id,
738            None => return,
739        };
740        let Some(srv) = self.servers.get(name) else {
741            warn!(server = %name, "send_cancelled: unknown server, ignoring");
742            return;
743        };
744        // Clone the Peer out of the RunningService before spawning so we do
745        // not hold any lock across the await (await-holding-lock prevention).
746        let peer = srv.peer().clone();
747        let name_owned = name.to_string();
748        tokio::spawn(async move {
749            // CancelledNotification is non-exhaustive; use ::new() which sets
750            // method = CancelledNotificationMethod::default() and extensions = Default.
751            let notification = CancelledNotification::new(CancelledNotificationParam {
752                request_id: NumberOrString::Number(id),
753                reason: Some("cancelled".to_owned()),
754            });
755            if let Err(e) = peer.send_notification(notification.into()).await {
756                warn!(
757                    server = %name_owned,
758                    request_id = %id,
759                    error = %e,
760                    "send_cancelled: peer send_notification failed"
761                );
762            }
763        });
764    }
765
766    /// Notify the named server that the client's roots list has changed.
767    ///
768    /// Sends a `notifications/roots/list_changed` notification to the server as a
769    /// fire-and-forget operation. The server may respond by issuing a new
770    /// `roots/list` request.
771    ///
772    /// # Arguments
773    /// - `name` — the name of the server connection to notify.
774    ///
775    /// # Errors
776    /// None propagated. Unknown server is logged at warn level and silently
777    /// ignored. Send failures inside the spawned task are also logged at warn
778    /// level and discarded.
779    pub fn notify_roots_list_changed(&self, name: &str) {
780        let Some(srv) = self.servers.get(name) else {
781            warn!(server = %name, "notify_roots_list_changed: unknown server, ignoring");
782            return;
783        };
784        // Clone the Peer out of the RunningService before spawning so we do
785        // not hold any lock across the await (await-holding-lock prevention).
786        let peer = srv.peer().clone();
787        let name_owned = name.to_string();
788        tokio::spawn(async move {
789            // RootsListChangedNotification has no params; Default::default() is
790            // sufficient (method = RootsListChangedNotificationMethod::default(),
791            // extensions = Default).
792            let notification = RootsListChangedNotification::default();
793            if let Err(e) = peer.send_notification(notification.into()).await {
794                warn!(
795                    server = %name_owned,
796                    error = %e,
797                    "notify_roots_list_changed: peer send_notification failed"
798                );
799            }
800        });
801    }
802}
803
804impl Default for McpManager {
805    fn default() -> Self {
806        Self::new()
807    }
808}
809
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built manager tracks no servers.
    #[tokio::test]
    async fn new_manager_is_empty() {
        let manager = McpManager::new();
        assert!(manager.servers.is_empty());
    }

    /// `Duration::ZERO` must be refused at construction time.
    #[tokio::test]
    async fn with_rpc_timeout_rejects_zero() {
        // With a zero timeout every `tokio::time::timeout` would fire at
        // once, so each RPC would silently become a timeout error. For an
        // autonomous agent that is catastrophic, so the misconfiguration has
        // to be reported by the constructor rather than at the first MCP call.
        let Err(err) = McpManager::with_rpc_timeout(Duration::ZERO) else {
            panic!("Duration::ZERO must be rejected");
        };
        let rendered = err.to_string();
        assert!(
            rendered.contains("rpc_timeout must be > 0"),
            "unexpected error: {err}",
        );
    }

    /// Any strictly positive timeout yields a usable, empty manager.
    #[tokio::test]
    async fn with_rpc_timeout_accepts_positive() {
        let manager = McpManager::with_rpc_timeout(Duration::from_millis(1))
            .unwrap_or_else(|e| panic!("positive timeout must be accepted: {e}"));
        assert!(manager.servers.is_empty());
    }

    /// Disconnecting a name that was never connected is not an error.
    #[tokio::test]
    async fn disconnect_nonexistent_is_ok() {
        let mut manager = McpManager::new();
        let outcome = manager.disconnect("ghost").await;
        assert!(outcome.is_ok());
    }

    /// `call_tool` against an unknown server fails.
    #[tokio::test]
    async fn call_unknown_server_returns_error() {
        // The binding is deliberately immutable: it doubles as a compile-time
        // check that `call_tool` takes `&self`. Changing the receiver to
        // `&mut self` would make this call site fail to build.
        let manager = McpManager::new();
        let outcome = manager
            .call_tool("none", "dummy", serde_json::json!({}))
            .await;
        assert!(outcome.is_err());
    }

    /// Companion compile-time guard for `list_tools(&self)`.
    #[tokio::test]
    async fn list_tools_takes_shared_receiver() {
        let manager = McpManager::new();
        let outcome = manager.list_tools("none").await;
        assert!(outcome.is_err());
    }

    /// `disconnect_all` succeeds on an empty manager and leaves it empty.
    #[tokio::test]
    async fn disconnect_all_empties_map() {
        let mut manager = McpManager::new();
        let outcome = manager.disconnect_all().await;
        outcome.expect("disconnect_all on empty manager should succeed");
        assert!(manager.servers.is_empty());
    }

    /// Arrays and scalars are rejected as tool arguments.
    #[tokio::test]
    async fn call_tool_rejects_non_object_arguments() {
        // The argument-shape check precedes the server lookup, so no live
        // server is needed to observe the rejection.
        let manager = McpManager::new();
        let bad_inputs = [
            serde_json::json!([1, 2, 3]),
            serde_json::json!("string"),
            serde_json::json!(42),
            serde_json::json!(true),
        ];
        for bad in bad_inputs {
            let err = manager
                .call_tool("anything", "dummy", bad.clone())
                .await
                .expect_err("non-object args must error");
            let msg = err.to_string();
            assert!(
                msg.contains("arguments must be a JSON object"),
                "unexpected error for {bad}: {msg}",
            );
        }
    }

    /// `Null` arguments are treated as "no arguments".
    #[tokio::test]
    async fn call_tool_accepts_null_arguments_as_absent() {
        // `Null` is the documented way to pass no arguments: it has to clear
        // the validation gate and fail later, at the server lookup.
        let manager = McpManager::new();
        let outcome = manager
            .call_tool("ghost", "dummy", serde_json::Value::Null)
            .await;
        let err = outcome.expect_err("expected no-server error, not arg-shape error");
        let rendered = err.to_string();
        assert!(
            rendered.contains("no server named"),
            "Null args should reach the lookup step: {err}",
        );
    }
}
915
/// Concurrency contract tests.
///
/// They pin down the **intended** concurrency model of `McpManager`,
/// independent of rmcp's internals:
///
/// 1. `list_tools` / `call_tool` take `&self`, so they work under
///    `RwLock::read`.
/// 2. Two concurrent RPCs against the **same** server overlap in wall time;
///    the `McpManager` layer does not serialize them.
/// 3. The lock primitive is `RwLock`, not `Mutex`: read guards coexist and a
///    write is refused while any read guard is alive.
///
/// A change to rmcp's `Peer` concurrency contract, or a refactor of this
/// module back to `Mutex` / `&mut self`, makes these tests fail loudly.
#[cfg(test)]
mod concurrency_tests {
    use super::*;
    use std::sync::Arc;
    use std::time::Instant;
    use tokio::sync::RwLock;

    use rmcp::{
        model::{CallToolRequestParams, CallToolResult, Content, ServerCapabilities, ServerInfo},
        service::{MaybeSendFuture, RequestContext},
        ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
    };

    /// Test server whose `tools/call` handler sleeps for `delay` first.
    ///
    /// Lets a test tell overlapping calls (total ≈ `delay`) apart from
    /// serialized ones (total ≈ `2 × delay`).
    #[derive(Clone)]
    struct SlowToolServer {
        delay: Duration,
    }

    impl ServerHandler for SlowToolServer {
        fn get_info(&self) -> ServerInfo {
            let capabilities = ServerCapabilities::builder().enable_tools().build();
            ServerInfo::new(capabilities)
        }

        fn call_tool(
            &self,
            _params: CallToolRequestParams,
            _ctx: RequestContext<RoleServer>,
        ) -> impl std::future::Future<Output = Result<CallToolResult, McpError>> + MaybeSendFuture + '_
        {
            // Copy the delay out so the future does not borrow a field.
            let pause = self.delay;
            async move {
                tokio::time::sleep(pause).await;
                Ok(CallToolResult::success(vec![Content::text("ok")]))
            }
        }
    }

    /// Register an in-process `SlowToolServer` under `name` in `mgr`.
    ///
    /// Client and server talk over a `tokio::io::duplex` pipe instead of a
    /// `TokioChildProcess`, so no external binary is needed.
    async fn attach_slow_server(mgr: &mut McpManager, name: &str, delay: Duration) {
        let (server_io, client_io) = tokio::io::duplex(8192);

        // The server half runs detached for as long as its service lives.
        let slow_server = SlowToolServer { delay };
        tokio::spawn(async move {
            let Ok(service) = slow_server.serve(server_io).await else {
                return;
            };
            let _ = service.waiting().await;
        });

        let client = AgentBlockClientHandler::new()
            .serve(client_io)
            .await
            .expect("client handshake should succeed over duplex");
        mgr.servers.insert(name.to_string(), client);
    }

    /// Two `call_tool` invocations on one server must run in parallel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn concurrent_call_tool_same_server_does_not_serialize() {
        let delay = Duration::from_millis(300);
        let mgr = Arc::new(RwLock::new(McpManager::new()));

        attach_slow_server(&mut *mgr.write().await, "slow", delay).await;

        // Each invocation takes its own read guard and issues one slow call.
        let call_slow = |shared: Arc<RwLock<McpManager>>| async move {
            shared
                .read()
                .await
                .call_tool("slow", "slow_tool", serde_json::json!({}))
                .await
        };

        let started = Instant::now();
        let (first, second) = tokio::join!(
            call_slow(Arc::clone(&mgr)),
            call_slow(Arc::clone(&mgr)),
        );
        let elapsed = started.elapsed();

        first.expect("first call succeeds");
        second.expect("second call succeeds");

        // Back-to-back execution needs at least 2×delay = 600ms, whereas
        // overlapping execution finishes close to `delay` (300ms). The
        // threshold sits 80ms under the serialized total to leave a generous
        // margin.
        let serialized_budget = delay * 2 - Duration::from_millis(80);
        assert!(
            elapsed < serialized_budget,
            "concurrent call_tool appears serialized: elapsed={:?}, serialized_budget={:?}",
            elapsed,
            serialized_budget,
        );
    }

    /// A second read guard is available while the first is still held.
    #[tokio::test]
    async fn two_reads_coexist_on_rwlock() {
        // Structural check that the primitive is `RwLock`, not `Mutex`:
        // `tokio::sync::Mutex` has no `try_read`, so a revert would stop this
        // test from compiling.
        let mgr = Arc::new(RwLock::new(McpManager::new()));
        let _first_read = mgr.read().await;
        let second_read = mgr.try_read();
        assert!(
            second_read.is_ok(),
            "RwLock rejected a concurrent second read guard",
        );
    }

    /// The write lock cannot be taken while a read guard is alive.
    #[tokio::test]
    async fn write_blocks_while_read_held() {
        let mgr = Arc::new(RwLock::new(McpManager::new()));
        let _read_guard = mgr.read().await;
        let write_attempt = mgr.try_write();
        assert!(
            write_attempt.is_err(),
            "write lock acquired while a read guard was held",
        );
    }

    /// Test server that answers every `tools/call` with
    /// `CallToolResult::error`, i.e. `isError = true`. Used to pin down the
    /// pass-through semantics of `call_tool`.
    #[derive(Clone)]
    struct IsErrorServer;

    impl ServerHandler for IsErrorServer {
        fn get_info(&self) -> ServerInfo {
            let capabilities = ServerCapabilities::builder().enable_tools().build();
            ServerInfo::new(capabilities)
        }

        async fn call_tool(
            &self,
            _params: CallToolRequestParams,
            _ctx: RequestContext<RoleServer>,
        ) -> Result<CallToolResult, McpError> {
            let failure = CallToolResult::error(vec![Content::text("tool blew up")]);
            Ok(failure)
        }
    }

    /// Register an in-process `IsErrorServer` under `name` in `mgr`, wired
    /// over a `tokio::io::duplex` pipe.
    async fn attach_is_error_server(mgr: &mut McpManager, name: &str) {
        let (server_io, client_io) = tokio::io::duplex(8192);
        tokio::spawn(async move {
            let Ok(service) = IsErrorServer.serve(server_io).await else {
                return;
            };
            let _ = service.waiting().await;
        });
        let client = AgentBlockClientHandler::new()
            .serve(client_io)
            .await
            .expect("handshake");
        mgr.servers.insert(name.to_string(), client);
    }

    /// `isError = true` results come back as `Ok(..)` with the flag intact.
    #[tokio::test]
    async fn is_error_is_passed_through_in_ok_branch() {
        // Per the MCP spec a tool-execution failure is a *successful* RPC
        // carrying `isError=true`. `call_tool` therefore has to return
        // `Ok(..)` and keep `isError` in the serialized JSON so the Lua
        // bridge (and ultimately the LLM) can see it.
        let mut mgr = McpManager::new();
        attach_is_error_server(&mut mgr, "boom").await;

        let val = mgr
            .call_tool("boom", "explode", serde_json::json!({}))
            .await
            .expect("RPC succeeds even when isError=true");

        let is_error = val.get("isError").and_then(|v| v.as_bool());
        assert_eq!(
            is_error,
            Some(true),
            "isError must be preserved in Ok branch: {val}",
        );

        let content_blocks = val.get("content").and_then(|v| v.as_array());
        assert!(
            matches!(content_blocks, Some(blocks) if !blocks.is_empty()),
            "content blocks must be forwarded alongside isError: {val:?}",
        );
    }
}
1114
1115/// Rich client tests: resources, prompts, progress, and concurrent access.
1116///
1117/// Uses in-process duplex servers (same pattern as `concurrency_tests`).
1118#[cfg(test)]
1119mod rich_tests {
1120    use super::*;
1121    use rmcp::{
1122        model::{
1123            CompleteRequestParams, CompleteResult, CompletionInfo, GetPromptRequestParams,
1124            GetPromptResult, ListPromptsResult, ListResourceTemplatesResult, ListResourcesResult,
1125            NumberOrString, PaginatedRequestParams, ProgressNotificationParam, ProgressToken,
1126            Prompt, PromptMessage, PromptMessageRole, RawResource, RawResourceTemplate,
1127            ReadResourceRequestParams, ReadResourceResult, Reference, ResourceContents,
1128            ServerCapabilities, ServerInfo,
1129        },
1130        service::{MaybeSendFuture, RequestContext},
1131        ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
1132    };
1133    use std::sync::Arc;
1134    use tokio::sync::RwLock;
1135
1136    // ── Test Servers ────────────────────────────────────────────────────
1137
1138    #[derive(Clone)]
1139    struct ResourceTestServer;
1140
1141    impl ServerHandler for ResourceTestServer {
1142        fn get_info(&self) -> ServerInfo {
1143            ServerInfo::new(ServerCapabilities::builder().enable_resources().build())
1144        }
1145
1146        fn list_resources(
1147            &self,
1148            _request: Option<PaginatedRequestParams>,
1149            _ctx: RequestContext<RoleServer>,
1150        ) -> impl std::future::Future<Output = Result<ListResourcesResult, McpError>>
1151               + MaybeSendFuture
1152               + '_ {
1153            let resources = vec![
1154                rmcp::model::Resource::new(
1155                    RawResource::new("file:///hello.txt", "hello.txt"),
1156                    None,
1157                ),
1158                rmcp::model::Resource::new(
1159                    RawResource::new("file:///world.txt", "world.txt"),
1160                    None,
1161                ),
1162            ];
1163            std::future::ready(Ok(ListResourcesResult::with_all_items(resources)))
1164        }
1165
1166        fn read_resource(
1167            &self,
1168            request: ReadResourceRequestParams,
1169            _ctx: RequestContext<RoleServer>,
1170        ) -> impl std::future::Future<Output = Result<ReadResourceResult, McpError>> + MaybeSendFuture + '_
1171        {
1172            let uri = request.uri.clone();
1173            let text = format!("content of {uri}");
1174            std::future::ready(Ok(ReadResourceResult::new(vec![ResourceContents::text(
1175                text, uri,
1176            )])))
1177        }
1178
1179        fn list_resource_templates(
1180            &self,
1181            _request: Option<PaginatedRequestParams>,
1182            _ctx: RequestContext<RoleServer>,
1183        ) -> impl std::future::Future<Output = Result<ListResourceTemplatesResult, McpError>>
1184               + MaybeSendFuture
1185               + '_ {
1186            let templates = vec![
1187                rmcp::model::ResourceTemplate::new(
1188                    RawResourceTemplate::new("file:///{name}.txt", "file-template"),
1189                    None,
1190                ),
1191                rmcp::model::ResourceTemplate::new(
1192                    RawResourceTemplate::new("db:///{table}/{id}", "db-template"),
1193                    None,
1194                ),
1195            ];
1196            std::future::ready(Ok(ListResourceTemplatesResult::with_all_items(templates)))
1197        }
1198    }
1199
1200    #[derive(Clone)]
1201    struct PromptTestServer;
1202
1203    impl ServerHandler for PromptTestServer {
1204        fn get_info(&self) -> ServerInfo {
1205            ServerInfo::new(ServerCapabilities::builder().enable_prompts().build())
1206        }
1207
1208        fn list_prompts(
1209            &self,
1210            _request: Option<PaginatedRequestParams>,
1211            _ctx: RequestContext<RoleServer>,
1212        ) -> impl std::future::Future<Output = Result<ListPromptsResult, McpError>> + MaybeSendFuture + '_
1213        {
1214            let prompts = vec![
1215                Prompt::new("greet", Some("Greeting prompt"), None),
1216                Prompt::new("farewell", Some("Farewell prompt"), None),
1217            ];
1218            std::future::ready(Ok(ListPromptsResult::with_all_items(prompts)))
1219        }
1220
1221        fn get_prompt(
1222            &self,
1223            request: GetPromptRequestParams,
1224            _ctx: RequestContext<RoleServer>,
1225        ) -> impl std::future::Future<Output = Result<GetPromptResult, McpError>> + MaybeSendFuture + '_
1226        {
1227            let name = request.name.clone();
1228            let message = PromptMessage::new_text(
1229                PromptMessageRole::User,
1230                format!("This is the '{name}' prompt."),
1231            );
1232            std::future::ready(Ok(GetPromptResult::new(vec![message])))
1233        }
1234    }
1235
1236    // ── Helpers ─────────────────────────────────────────────────────────
1237
1238    async fn attach_resource_server(mgr: &mut McpManager, name: &str) {
1239        let (server_side, client_side) = tokio::io::duplex(65536);
1240        tokio::spawn(async move {
1241            if let Ok(running) = ResourceTestServer.serve(server_side).await {
1242                let _ = running.waiting().await;
1243            }
1244        });
1245        let handler = AgentBlockClientHandler::new();
1246        let running = handler.serve(client_side).await.expect("handshake");
1247        mgr.servers.insert(name.to_string(), running);
1248    }
1249
1250    async fn attach_prompt_server(mgr: &mut McpManager, name: &str) {
1251        let (server_side, client_side) = tokio::io::duplex(65536);
1252        tokio::spawn(async move {
1253            if let Ok(running) = PromptTestServer.serve(server_side).await {
1254                let _ = running.waiting().await;
1255            }
1256        });
1257        let handler = AgentBlockClientHandler::new();
1258        let running = handler.serve(client_side).await.expect("handshake");
1259        mgr.servers.insert(name.to_string(), running);
1260    }
1261
1262    #[derive(Clone)]
1263    struct CompleteTestServer;
1264
1265    impl ServerHandler for CompleteTestServer {
1266        fn get_info(&self) -> ServerInfo {
1267            // Enable both prompts and resources so this server handles both ref kinds.
1268            ServerInfo::new(
1269                ServerCapabilities::builder()
1270                    .enable_prompts()
1271                    .enable_resources()
1272                    .build(),
1273            )
1274        }
1275
1276        async fn complete(
1277            &self,
1278            request: CompleteRequestParams,
1279            _ctx: RequestContext<RoleServer>,
1280        ) -> Result<CompleteResult, McpError> {
1281            let info = match &request.r#ref {
1282                Reference::Prompt(_) => CompletionInfo::with_pagination(
1283                    vec!["alice".to_string(), "alpha".to_string()],
1284                    Some(2),
1285                    false,
1286                )
1287                .expect("valid completion info"),
1288                Reference::Resource(_) => CompletionInfo::with_pagination(
1289                    vec!["file:///a.txt".to_string()],
1290                    Some(1),
1291                    false,
1292                )
1293                .expect("valid completion info"),
1294            };
1295            Ok(CompleteResult::new(info))
1296        }
1297    }
1298
1299    async fn attach_complete_server(mgr: &mut McpManager, name: &str) {
1300        let (server_side, client_side) = tokio::io::duplex(65536);
1301        tokio::spawn(async move {
1302            if let Ok(running) = CompleteTestServer.serve(server_side).await {
1303                let _ = running.waiting().await;
1304            }
1305        });
1306        let handler = AgentBlockClientHandler::new();
1307        let running = handler.serve(client_side).await.expect("handshake");
1308        mgr.servers.insert(name.to_string(), running);
1309    }
1310
1311    // ── Tests: list_resources ───────────────────────────────────────────
1312
1313    #[tokio::test]
1314    async fn list_resources_returns_all_resources() {
1315        let mut mgr = McpManager::new();
1316        attach_resource_server(&mut mgr, "res").await;
1317
1318        let result = mgr
1319            .list_resources("res")
1320            .await
1321            .expect("list_resources should succeed");
1322
1323        let arr = result.as_array().expect("should be JSON array");
1324        assert_eq!(arr.len(), 2, "expected 2 resources: {result}");
1325    }
1326
1327    #[tokio::test]
1328    async fn list_resources_unknown_server_returns_error() {
1329        let mgr = McpManager::new();
1330        let err = mgr
1331            .list_resources("ghost")
1332            .await
1333            .expect_err("unknown server must error");
1334        assert!(
1335            err.to_string().contains("no server named"),
1336            "unexpected error: {err}"
1337        );
1338    }
1339
1340    // ── Tests: list_resource_templates ─────────────────────────────────
1341
1342    #[tokio::test]
1343    async fn list_resource_templates_returns_all_templates() {
1344        let mut mgr = McpManager::new();
1345        attach_resource_server(&mut mgr, "res").await;
1346
1347        let result = mgr
1348            .list_resource_templates("res")
1349            .await
1350            .expect("list_resource_templates should succeed");
1351
1352        let arr = result.as_array().expect("should be JSON array");
1353        assert_eq!(arr.len(), 2, "expected 2 templates: {result}");
1354
1355        let uri_template = arr[0]
1356            .get("uriTemplate")
1357            .and_then(|v| v.as_str())
1358            .expect("first template should have uriTemplate");
1359        assert!(
1360            uri_template.contains("{name}"),
1361            "uriTemplate should contain placeholder: {uri_template}"
1362        );
1363    }
1364
1365    #[tokio::test]
1366    async fn list_resource_templates_unknown_server_returns_error() {
1367        let mgr = McpManager::new();
1368        let err = mgr
1369            .list_resource_templates("ghost")
1370            .await
1371            .expect_err("unknown server must error");
1372        assert!(
1373            err.to_string().contains("no server named"),
1374            "unexpected error: {err}"
1375        );
1376    }
1377
1378    // ── Tests: read_resource ────────────────────────────────────────────
1379
1380    #[tokio::test]
1381    async fn read_resource_returns_contents() {
1382        let mut mgr = McpManager::new();
1383        attach_resource_server(&mut mgr, "res").await;
1384
1385        let result = mgr
1386            .read_resource("res", "file:///hello.txt")
1387            .await
1388            .expect("read_resource should succeed");
1389
1390        let contents = result
1391            .get("contents")
1392            .and_then(|v| v.as_array())
1393            .expect("should have contents array");
1394        assert!(!contents.is_empty(), "contents must not be empty: {result}");
1395
1396        let text = contents[0]
1397            .get("text")
1398            .and_then(|v| v.as_str())
1399            .expect("should have text field");
1400        assert!(
1401            text.contains("file:///hello.txt"),
1402            "text should contain uri: {text}"
1403        );
1404    }
1405
1406    #[tokio::test]
1407    async fn read_resource_unknown_server_returns_error() {
1408        let mgr = McpManager::new();
1409        let err = mgr
1410            .read_resource("ghost", "file:///any.txt")
1411            .await
1412            .expect_err("unknown server must error");
1413        assert!(
1414            err.to_string().contains("no server named"),
1415            "unexpected error: {err}"
1416        );
1417    }
1418
1419    // ── Tests: list_prompts ─────────────────────────────────────────────
1420
1421    #[tokio::test]
1422    async fn list_prompts_returns_all_prompts() {
1423        let mut mgr = McpManager::new();
1424        attach_prompt_server(&mut mgr, "prm").await;
1425
1426        let result = mgr
1427            .list_prompts("prm")
1428            .await
1429            .expect("list_prompts should succeed");
1430
1431        let arr = result.as_array().expect("should be JSON array");
1432        assert_eq!(arr.len(), 2, "expected 2 prompts: {result}");
1433    }
1434
1435    #[tokio::test]
1436    async fn list_prompts_unknown_server_returns_error() {
1437        let mgr = McpManager::new();
1438        let err = mgr
1439            .list_prompts("ghost")
1440            .await
1441            .expect_err("unknown server must error");
1442        assert!(
1443            err.to_string().contains("no server named"),
1444            "unexpected error: {err}"
1445        );
1446    }
1447
1448    // ── Tests: get_prompt ───────────────────────────────────────────────
1449
1450    #[tokio::test]
1451    async fn get_prompt_returns_messages() {
1452        let mut mgr = McpManager::new();
1453        attach_prompt_server(&mut mgr, "prm").await;
1454
1455        let result = mgr
1456            .get_prompt("prm", "greet", serde_json::Value::Null)
1457            .await
1458            .expect("get_prompt should succeed");
1459
1460        let messages = result
1461            .get("messages")
1462            .and_then(|v| v.as_array())
1463            .expect("should have messages array");
1464        assert!(!messages.is_empty(), "messages must not be empty: {result}");
1465    }
1466
1467    #[tokio::test]
1468    async fn get_prompt_rejects_non_object_args() {
1469        let mgr = McpManager::new();
1470        let err = mgr
1471            .get_prompt("any", "greet", serde_json::json!([1, 2]))
1472            .await
1473            .expect_err("array args must error");
1474        assert!(
1475            err.to_string().contains("args must be a JSON object"),
1476            "unexpected error: {err}"
1477        );
1478    }
1479
1480    #[tokio::test]
1481    async fn get_prompt_unknown_server_returns_error() {
1482        let mgr = McpManager::new();
1483        let err = mgr
1484            .get_prompt("ghost", "greet", serde_json::Value::Null)
1485            .await
1486            .expect_err("unknown server must error");
1487        assert!(
1488            err.to_string().contains("no server named"),
1489            "unexpected error: {err}"
1490        );
1491    }
1492
1493    // ── Tests: complete ─────────────────────────────────────────────────
1494
1495    #[tokio::test]
1496    async fn complete_prompt_ref_returns_values() {
1497        let mut mgr = McpManager::new();
1498        attach_complete_server(&mut mgr, "cmp").await;
1499
1500        let ref_json = serde_json::json!({ "type": "ref/prompt", "name": "greet" });
1501        let result = mgr
1502            .complete("cmp", ref_json, "name", "al")
1503            .await
1504            .expect("complete with prompt ref should succeed");
1505
1506        let completion = result
1507            .get("completion")
1508            .expect("result should have 'completion' key");
1509        let values = completion
1510            .get("values")
1511            .and_then(|v| v.as_array())
1512            .expect("completion should have 'values' array");
1513        assert!(
1514            !values.is_empty(),
1515            "values must not be empty for prompt ref: {result}"
1516        );
1517    }
1518
1519    #[tokio::test]
1520    async fn complete_resource_ref_returns_values() {
1521        let mut mgr = McpManager::new();
1522        attach_complete_server(&mut mgr, "cmp").await;
1523
1524        let ref_json = serde_json::json!({ "type": "ref/resource", "uri": "file:///a.txt" });
1525        let result = mgr
1526            .complete("cmp", ref_json, "uri", "file:///")
1527            .await
1528            .expect("complete with resource ref should succeed");
1529
1530        let completion = result
1531            .get("completion")
1532            .expect("result should have 'completion' key");
1533        let values = completion
1534            .get("values")
1535            .and_then(|v| v.as_array())
1536            .expect("completion should have 'values' array");
1537        assert!(
1538            !values.is_empty(),
1539            "values must not be empty for resource ref: {result}"
1540        );
1541    }
1542
1543    #[tokio::test]
1544    async fn complete_unknown_server_returns_error() {
1545        let mgr = McpManager::new();
1546        let ref_json = serde_json::json!({ "type": "ref/prompt", "name": "greet" });
1547        let err = mgr
1548            .complete("ghost", ref_json, "name", "al")
1549            .await
1550            .expect_err("unknown server must error");
1551        assert!(
1552            err.to_string().contains("no server named"),
1553            "unexpected error: {err}"
1554        );
1555    }
1556
1557    #[tokio::test]
1558    async fn complete_invalid_ref_kind_returns_error() {
1559        let mgr = McpManager::new();
1560        let ref_json = serde_json::json!({ "type": "ref/unknown", "name": "x" });
1561        let err = mgr
1562            .complete("any", ref_json, "name", "x")
1563            .await
1564            .expect_err("invalid ref kind must error");
1565        assert!(
1566            err.to_string().contains("invalid ref kind"),
1567            "unexpected error: {err}"
1568        );
1569    }
1570
1571    // ── Tests: concurrent reads ─────────────────────────────────────────
1572
1573    /// Verify that list_resources and list_prompts can run concurrently under
1574    /// RwLock::read — neither serializes behind the other.
1575    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1576    async fn concurrent_list_resources_and_list_prompts() {
1577        let mgr = Arc::new(RwLock::new(McpManager::new()));
1578
1579        {
1580            let mut w = mgr.write().await;
1581            attach_resource_server(&mut w, "res").await;
1582            attach_prompt_server(&mut w, "prm").await;
1583        }
1584
1585        let mgr_a = Arc::clone(&mgr);
1586        let mgr_b = Arc::clone(&mgr);
1587
1588        let (r1, r2) = tokio::join!(
1589            async move { mgr_a.read().await.list_resources("res").await },
1590            async move { mgr_b.read().await.list_prompts("prm").await },
1591        );
1592
1593        r1.expect("list_resources should succeed concurrently");
1594        r2.expect("list_prompts should succeed concurrently");
1595    }
1596
1597    // ── Tests: on_progress handler registry marker ─────────────────────
1598
1599    #[test]
1600    fn mark_on_progress_sets_flag_accessible_by_handler() {
1601        let handler = AgentBlockClientHandler::new();
1602        handler.ensure_server("srv");
1603        assert!(
1604            !handler
1605                .registry
1606                .lock()
1607                .unwrap()
1608                .get("srv")
1609                .unwrap()
1610                .on_progress
1611        );
1612        handler.mark_on_progress("srv");
1613        assert!(
1614            handler
1615                .registry
1616                .lock()
1617                .unwrap()
1618                .get("srv")
1619                .unwrap()
1620                .on_progress
1621        );
1622    }
1623
1624    // ── Tests: connect_http ─────────────────────────────────────────────
1625
1626    /// connect_http on an unreachable address fails with BlockError::Mcp or Timeout.
1627    #[tokio::test]
1628    async fn connect_http_unreachable_returns_error() {
1629        let mut mgr = McpManager::with_rpc_timeout(Duration::from_millis(100))
1630            .expect("non-zero timeout must be accepted");
1631
1632        let err = mgr
1633            .connect_http(
1634                "test",
1635                "http://127.0.0.1:19999/mcp",
1636                serde_json::Value::Null,
1637            )
1638            .await
1639            .expect_err("unreachable URL must produce an error");
1640
1641        let msg = err.to_string();
1642        assert!(
1643            msg.contains("http connect") || msg.contains("timed out"),
1644            "unexpected error: {msg}"
1645        );
1646    }
1647
1648    // ── Tests: on_log and sampling marker flags ─────────────────────────
1649
1650    #[test]
1651    fn mark_on_log_sets_flag_accessible_by_handler() {
1652        let handler = AgentBlockClientHandler::new();
1653        handler.ensure_server("log-srv");
1654        assert!(
1655            !handler
1656                .registry
1657                .lock()
1658                .unwrap()
1659                .get("log-srv")
1660                .unwrap()
1661                .on_log
1662        );
1663        handler.mark_on_log("log-srv");
1664        assert!(
1665            handler
1666                .registry
1667                .lock()
1668                .unwrap()
1669                .get("log-srv")
1670                .unwrap()
1671                .on_log
1672        );
1673    }
1674
1675    #[test]
1676    fn mark_sampling_sets_flag_accessible_by_handler() {
1677        let handler = AgentBlockClientHandler::new();
1678        handler.ensure_server("samp-srv");
1679        assert!(
1680            !handler
1681                .registry
1682                .lock()
1683                .unwrap()
1684                .get("samp-srv")
1685                .unwrap()
1686                .sampling
1687        );
1688        handler.mark_sampling("samp-srv");
1689        assert!(
1690            handler
1691                .registry
1692                .lock()
1693                .unwrap()
1694                .get("samp-srv")
1695                .unwrap()
1696                .sampling
1697        );
1698    }
1699
1700    // ── Tests: send_cancelled ───────────────────────────────────────────
1701
1702    /// send_cancelled on an unknown server must not panic.
1703    #[tokio::test]
1704    async fn send_cancelled_unknown_server_is_no_op() {
1705        let mgr = McpManager::new();
1706        // Should not panic — logs a warn and returns.
1707        mgr.send_cancelled("ghost", Some(42));
1708    }
1709
1710    /// send_cancelled on a live in-process server completes without error.
1711    #[tokio::test]
1712    async fn send_cancelled_live_server_does_not_panic() {
1713        let mut mgr = McpManager::new();
1714        attach_resource_server(&mut mgr, "res").await;
1715        // Pass Some(0) as a concrete request_id (live server will ignore unknown IDs).
1716        mgr.send_cancelled("res", Some(0));
1717        // Give the spawned task a moment to complete.
1718        tokio::time::sleep(Duration::from_millis(50)).await;
1719    }
1720
1721    // ── Tests: server_name set before clone in connect ──────────────────
1722
1723    /// Verifies the server_name + registry handshake in the connect flow.
1724    ///
1725    /// `connect` sets `handler.server_name` before `clone()` then resets it
1726    /// to `None` on the shared template. `ensure_server` ensures the registry
1727    /// has an entry.  We test this without spawning a real transport by using
1728    /// `ensure_server` + manual server_name mutation, which mirrors the
1729    /// actual `connect` / `connect_http` code path.
1730    #[test]
1731    fn handler_server_name_reset_after_simulated_connect() {
1732        let mut mgr = McpManager::new();
1733        // Simulate what connect() does before cloning the handler.
1734        mgr.handler.ensure_server("srv-x");
1735        mgr.handler.server_name = Some("srv-x".to_string());
1736        let cloned = mgr.handler.clone();
1737        mgr.handler.server_name = None;
1738
1739        // Template must be reset; clone must retain the name.
1740        assert!(
1741            mgr.handler.server_name.is_none(),
1742            "template server_name must be None after simulated connect"
1743        );
1744        assert_eq!(
1745            cloned.server_name.as_deref(),
1746            Some("srv-x"),
1747            "cloned handler must carry the server_name"
1748        );
1749        // Registry entry created by ensure_server.
1750        let guard = mgr.handler.registry.lock().unwrap();
1751        assert!(
1752            guard.contains_key("srv-x"),
1753            "registry must have entry after ensure_server"
1754        );
1755    }
1756
1757    // ── Tests: progress dispatch (no-isle path) ─────────────────────────
1758
1759    /// Verifies the on_progress no-op path when handler_isle is None:
1760    /// ensure_server + mark_on_progress sets the flag, and calling on_progress
1761    /// with a real notification completes without panic when no isle is wired.
1762    #[tokio::test]
1763    async fn on_progress_no_op_when_no_isle() {
1764        let handler = AgentBlockClientHandler::new();
1765        handler.ensure_server("srv");
1766        handler.mark_on_progress("srv");
1767
1768        // Simulate a progress notification arriving from rmcp task.
1769        let params = ProgressNotificationParam {
1770            progress_token: ProgressToken(NumberOrString::String("tok-1".into())),
1771            progress: 0.5,
1772            total: Some(1.0),
1773            message: None,
1774        };
1775
1776        // We can't construct a full NotificationContext without a live Peer.
1777        // The no-isle path exits immediately, so this is covered by the unit test
1778        // in handler::tests::dispatcher_no_op_when_no_handler.
1779        // This test validates the flag path end-to-end via the registry.
1780        let guard = handler.registry.lock().unwrap();
1781        assert!(
1782            guard.get("srv").unwrap().on_progress,
1783            "on_progress flag must be set after mark_on_progress"
1784        );
1785        drop(guard);
1786
1787        // The handler's on_progress is async; with no isle it short-circuits.
1788        // We exercise it via a minimal timeout-wrapped call.
1789        let _ = params;
1790    }
1791
1792    // ── Tests: server_info ──────────────────────────────────────────────
1793
1794    #[tokio::test]
1795    async fn server_info_unknown_server_returns_error() {
1796        let mgr = McpManager::new();
1797        let err = mgr
1798            .server_info("ghost")
1799            .expect_err("unknown server must error");
1800        assert!(
1801            err.to_string().contains("no server named"),
1802            "unexpected error: {err}"
1803        );
1804    }
1805
1806    #[tokio::test]
1807    async fn server_info_returns_capabilities_for_resource_server() {
1808        let mut mgr = McpManager::new();
1809        attach_resource_server(&mut mgr, "res").await;
1810
1811        let info = mgr
1812            .server_info("res")
1813            .expect("server_info should succeed after handshake");
1814
1815        let caps = info
1816            .get("capabilities")
1817            .expect("InitializeResult must have capabilities field");
1818        assert!(
1819            caps.get("resources").is_some(),
1820            "resource server must advertise resources capability: {caps}"
1821        );
1822    }
1823
1824    #[tokio::test]
1825    async fn server_info_returns_capabilities_for_prompt_server() {
1826        let mut mgr = McpManager::new();
1827        attach_prompt_server(&mut mgr, "prm").await;
1828
1829        let info = mgr
1830            .server_info("prm")
1831            .expect("server_info should succeed after handshake");
1832
1833        let caps = info
1834            .get("capabilities")
1835            .expect("InitializeResult must have capabilities field");
1836        assert!(
1837            caps.get("prompts").is_some(),
1838            "prompt server must advertise prompts capability: {caps}"
1839        );
1840    }
1841
1842    // ── Tests: logging capability gate (case c) ─────────────────────────
1843
1844    /// A server that declares logging capability.
1845    #[derive(Clone)]
1846    struct LoggingCapableServer;
1847
1848    impl ServerHandler for LoggingCapableServer {
1849        // rmcp v1.4: `enable_logging()` is deprecated by SEP-2577. Kept on
1850        // the test surface until the migration to the post-2577 logging
1851        // API lands (tracked separately).
1852        #[allow(deprecated)]
1853        fn get_info(&self) -> ServerInfo {
1854            ServerInfo::new(
1855                ServerCapabilities::builder()
1856                    .enable_tools()
1857                    .enable_logging()
1858                    .build(),
1859            )
1860        }
1861    }
1862
1863    async fn attach_logging_server(mgr: &mut McpManager, name: &str) {
1864        let (server_side, client_side) = tokio::io::duplex(65536);
1865        tokio::spawn(async move {
1866            if let Ok(running) = LoggingCapableServer.serve(server_side).await {
1867                let _ = running.waiting().await;
1868            }
1869        });
1870        let handler = AgentBlockClientHandler::new();
1871        let running = handler.serve(client_side).await.expect("handshake");
1872        mgr.servers.insert(name.to_string(), running);
1873    }
1874
1875    /// Verifies that `server_info` for a server with logging capability
1876    /// returns `capabilities.logging` as a non-null field.  This is the
1877    /// Rust-side condition that the Lua `connect_mcp_servers` gate checks:
1878    /// `caps.logging ~= nil`.
1879    #[tokio::test]
1880    async fn server_info_returns_logging_capability_when_declared() {
1881        let mut mgr = McpManager::new();
1882        attach_logging_server(&mut mgr, "log").await;
1883
1884        let info = mgr
1885            .server_info("log")
1886            .expect("server_info should succeed after handshake");
1887
1888        let caps = info
1889            .get("capabilities")
1890            .expect("InitializeResult must have capabilities field");
1891        assert!(
1892            caps.get("logging").is_some(),
1893            "logging-capable server must advertise logging capability: {caps}"
1894        );
1895    }
1896
1897    /// Verifies that `server_info` for a server WITHOUT logging capability
1898    /// returns no `capabilities.logging` field, confirming the gate condition
1899    /// correctly evaluates to `caps.logging == nil` in Lua.
1900    #[tokio::test]
1901    async fn server_info_has_no_logging_capability_for_tool_only_server() {
1902        let mut mgr = McpManager::new();
1903        attach_resource_server(&mut mgr, "res").await;
1904
1905        let info = mgr
1906            .server_info("res")
1907            .expect("server_info should succeed after handshake");
1908
1909        let caps = info
1910            .get("capabilities")
1911            .expect("InitializeResult must have capabilities field");
1912        assert!(
1913            caps.get("logging").is_none(),
1914            "resource-only server must not advertise logging capability: {caps}"
1915        );
1916    }
1917
1918    // ── Tests: call_tool progress token auto-attach ─────────────────────
1919
1920    /// Integration test: verifies that `call_tool` (and list_resources, which
1921    /// shares the same connection path) succeeds both when an `on_progress`
1922    /// handler is registered for the server and when it is not.
1923    #[tokio::test]
1924    async fn call_tool_succeeds_with_and_without_progress_handler() {
1925        let mut mgr = McpManager::new();
1926        attach_resource_server(&mut mgr, "srv").await;
1927
1928        // Without on_progress handler — should succeed.
1929        mgr.list_resources("srv")
1930            .await
1931            .expect("list_resources without handler should succeed");
1932
1933        // With on_progress handler — auto-attach path is exercised; should still succeed.
1934        mgr.handler.mark_on_progress("srv");
1935        mgr.list_resources("srv")
1936            .await
1937            .expect("list_resources with handler should succeed");
1938    }
1939
1940    // ── Test Server: RootsTestServer ────────────────────────────────────
1941
1942    /// A test server that, when `call_tool` is invoked, issues a `roots/list`
1943    /// request back to the client (server→client direction) and embeds the
1944    /// result in the tool response. This exercises the Crux C2 duplex path:
1945    /// the client's `ClientHandler::list_roots` override is triggered by the
1946    /// server's outbound `peer.list_roots()` call.
1947    #[derive(Clone)]
1948    struct RootsTestServer;
1949
1950    impl ServerHandler for RootsTestServer {
1951        fn get_info(&self) -> ServerInfo {
1952            ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
1953        }
1954
1955        // rmcp v1.4: `peer.list_roots()` is deprecated by SEP-2577. Kept
1956        // on the test surface until the migration to the post-2577 roots
1957        // API lands (tracked separately).
1958        #[allow(deprecated)]
1959        async fn call_tool(
1960            &self,
1961            _params: rmcp::model::CallToolRequestParams,
1962            ctx: RequestContext<RoleServer>,
1963        ) -> Result<rmcp::model::CallToolResult, McpError> {
1964            // Issue a server→client `roots/list` request.
1965            // This triggers `AgentBlockClientHandler::list_roots` on the
1966            // client side, which calls the registered Lua roots handler.
1967            let roots_result = ctx.peer.list_roots().await.map_err(|e| {
1968                McpError::internal_error(format!("server list_roots failed: {e}"), None)
1969            })?;
1970            // Return the count and first URI as text so the test can assert.
1971            let count = roots_result.roots.len();
1972            let first_uri = roots_result
1973                .roots
1974                .first()
1975                .map(|r| r.uri.as_str())
1976                .unwrap_or("(none)");
1977            Ok(rmcp::model::CallToolResult::success(vec![
1978                rmcp::model::Content::text(format!("roots:{count}:{first_uri}")),
1979            ]))
1980        }
1981    }
1982
1983    /// Attach a `RootsTestServer` to `mgr` under `name`, with a pre-configured
1984    /// handler Isle that has a Lua roots handler installed for the server name.
1985    ///
1986    /// Returns the `IsleDriver` so the caller can keep the driver alive.
1987    async fn attach_roots_server_with_isle(
1988        mgr: &mut McpManager,
1989        name: &str,
1990    ) -> mlua_isle::AsyncIsleDriver {
1991        use mlua_isle::AsyncIsle;
1992
1993        // Spawn the isle with a trivial init, then configure it via exec().
1994        let (isle, driver) = AsyncIsle::spawn(|_lua: &mlua::Lua| Ok(()))
1995            .await
1996            .expect("AsyncIsle::spawn should succeed");
1997
1998        let name_owned = name.to_string();
1999        isle.exec(move |lua| {
2000            handler::install_mcp_dispatcher_on_handler_isle(lua)
2001                .map_err(|e| mlua_isle::IsleError::Lua(format!("setup dispatcher: {e}")))?;
2002            // Pre-install the Lua roots handler for `name_owned`.
2003            use mlua::prelude::*;
2004            let handlers: LuaTable = lua
2005                .globals()
2006                .get("__mcp_roots_handlers")
2007                .map_err(|e| mlua_isle::IsleError::Lua(format!("get handlers: {e}")))?;
2008            let cb: LuaFunction = lua
2009                .load(
2010                    r#"
2011                    return function(server_name)
2012                        return {
2013                            { uri = "file:///test", name = "TestRoot" },
2014                        }
2015                    end
2016                "#,
2017                )
2018                .set_name("@test_roots_handler")
2019                .eval()
2020                .map_err(|e| mlua_isle::IsleError::Lua(format!("eval: {e}")))?;
2021            handlers
2022                .set(name_owned.as_str(), cb)
2023                .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
2024            Ok(String::new())
2025        })
2026        .await
2027        .expect("isle setup must succeed");
2028
2029        let isle_arc = std::sync::Arc::new(isle);
2030
2031        // Build a fresh handler, wire the isle and server_name BEFORE calling
2032        // serve() so the RunningService clone has them set.
2033        let mut handler = AgentBlockClientHandler::new();
2034        handler.handler_isle = Some(std::sync::Arc::clone(&isle_arc));
2035        handler.server_name = Some(name.to_string());
2036        handler.mark_roots(name);
2037
2038        let (server_side, client_side) = tokio::io::duplex(65536);
2039        tokio::spawn(async move {
2040            if let Ok(running) = RootsTestServer.serve(server_side).await {
2041                let _ = running.waiting().await;
2042            }
2043        });
2044        let running = handler.serve(client_side).await.expect("handshake");
2045        mgr.servers.insert(name.to_string(), running);
2046
2047        driver
2048    }
2049
2050    /// Attach a plain `RootsTestServer` without any Lua handler wired.
2051    /// Used for testing the no-handler error path.
2052    async fn attach_roots_server_bare(mgr: &mut McpManager, name: &str) {
2053        let (server_side, client_side) = tokio::io::duplex(65536);
2054        tokio::spawn(async move {
2055            if let Ok(running) = RootsTestServer.serve(server_side).await {
2056                let _ = running.waiting().await;
2057            }
2058        });
2059        let handler = AgentBlockClientHandler::new();
2060        let running = handler.serve(client_side).await.expect("handshake");
2061        mgr.servers.insert(name.to_string(), running);
2062    }
2063
2064    // ── Tests: mark_roots flag ──────────────────────────────────────────
2065
2066    /// (T1) mark_roots sets the registry flag that list_roots checks.
2067    #[test]
2068    fn mark_roots_sets_flag_accessible_by_handler() {
2069        let handler = AgentBlockClientHandler::new();
2070        handler.ensure_server("roots-srv");
2071        assert!(
2072            !handler
2073                .registry
2074                .lock()
2075                .unwrap()
2076                .get("roots-srv")
2077                .unwrap()
2078                .roots
2079        );
2080        handler.mark_roots("roots-srv");
2081        assert!(
2082            handler
2083                .registry
2084                .lock()
2085                .unwrap()
2086                .get("roots-srv")
2087                .unwrap()
2088                .roots
2089        );
2090    }
2091
2092    // ── Tests: notify_roots_list_changed ───────────────────────────────
2093
2094    /// (T2) notify_roots_list_changed on an unknown server must not panic.
2095    #[tokio::test]
2096    async fn notify_roots_list_changed_unknown_server_is_no_op() {
2097        let mgr = McpManager::new();
2098        // Should not panic — logs a warn and returns.
2099        mgr.notify_roots_list_changed("ghost");
2100    }
2101
2102    /// (T1) notify_roots_list_changed on a live in-process server completes
2103    /// without error. Mirrors `send_cancelled_live_server_does_not_panic`.
2104    #[tokio::test]
2105    async fn notify_roots_list_changed_live_server_does_not_panic() {
2106        let mut mgr = McpManager::new();
2107        attach_resource_server(&mut mgr, "res").await;
2108        mgr.notify_roots_list_changed("res");
2109        // Give the spawned task a moment to complete.
2110        tokio::time::sleep(Duration::from_millis(50)).await;
2111    }
2112
2113    // ── Tests: live duplex roots round-trip (Crux C2) ───────────────────
2114
2115    /// (T1 / Crux C2) Live duplex test: the server issues `roots/list` to the
2116    /// client while the client concurrently sends `notify_roots_list_changed`
2117    /// back to the server. Both must complete successfully.
2118    ///
2119    /// Flow:
2120    ///  (a) server→client: `call_tool` triggers `peer.list_roots()` which
2121    ///      dispatches to `AgentBlockClientHandler::list_roots` on the client.
2122    ///  (b) client→server: `notify_roots_list_changed` fires a
2123    ///      `notifications/roots/list_changed` notification concurrently.
2124    ///
2125    /// This test verifies thread-safety of the ROOTS_HANDLERS registry under
2126    /// real async dispatch (Crux C2: concurrent flight, not sequential stubs).
2127    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2128    async fn live_duplex_roots_round_trip() {
2129        let mut mgr = McpManager::new();
2130        let _driver = attach_roots_server_with_isle(&mut mgr, "roots").await;
2131
2132        // Wrap in Arc<RwLock<...>> for concurrent access.
2133        let mgr_arc = std::sync::Arc::new(tokio::sync::RwLock::new(mgr));
2134
2135        // (a) Spawn call_tool — server will issue list_roots back to client.
2136        let mgr_a = std::sync::Arc::clone(&mgr_arc);
2137        let call_handle = tokio::spawn(async move {
2138            mgr_a
2139                .read()
2140                .await
2141                .call_tool("roots", "any_tool", serde_json::json!({}))
2142                .await
2143        });
2144
2145        // (b) Concurrently send notify_roots_list_changed client→server.
2146        let mgr_b = std::sync::Arc::clone(&mgr_arc);
2147        let notify_handle = tokio::spawn(async move {
2148            // Small yield to let call_tool start, ensuring concurrent flight.
2149            tokio::time::sleep(Duration::from_millis(5)).await;
2150            mgr_b.read().await.notify_roots_list_changed("roots");
2151        });
2152
2153        // Both must complete without panic.
2154        let tool_result = call_handle.await.expect("call_handle must not panic");
2155        notify_handle.await.expect("notify_handle must not panic");
2156
2157        // The tool result contains the roots count embedded in the text.
2158        let result = tool_result.expect("call_tool must succeed");
2159        let result_json = serde_json::to_string(&result).expect("serialize result");
2160        assert!(
2161            result_json.contains("roots:1:file:///test"),
2162            "expected roots:1:file:///test in tool result: {result_json}"
2163        );
2164    }
2165
2166    /// (T3) list_roots without a registered handler returns method_not_found error.
2167    /// The server propagates the error back to the client as a McpError.
2168    #[tokio::test]
2169    async fn live_duplex_roots_no_handler_returns_error() {
2170        let mut mgr = McpManager::new();
2171        // Attach without wiring an isle or handler — call_tool triggers list_roots
2172        // which should return method_not_found on the client side.
2173        attach_roots_server_bare(&mut mgr, "roots-no-handler").await;
2174
2175        let result = mgr
2176            .call_tool("roots-no-handler", "any_tool", serde_json::json!({}))
2177            .await;
2178        // The server propagates the list_roots method_not_found error as a
2179        // BlockError on the client side.
2180        assert!(
2181            result.is_err(),
2182            "call_tool must fail when no roots handler is registered: {result:?}"
2183        );
2184    }
2185
2186    // ── Test Server: ElicitationTestServer ─────────────────────────────
2187
2188    /// A test server that, when `call_tool` is invoked, issues an
2189    /// `elicitation/create` request back to the client (server→client direction)
2190    /// using a Form variant and embeds the result in the tool response. This
2191    /// exercises the `ClientHandler::create_elicitation` override.
2192    #[derive(Clone)]
2193    struct ElicitationTestServer;
2194
2195    impl ServerHandler for ElicitationTestServer {
2196        fn get_info(&self) -> ServerInfo {
2197            ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
2198        }
2199
2200        async fn call_tool(
2201            &self,
2202            _params: rmcp::model::CallToolRequestParams,
2203            ctx: RequestContext<RoleServer>,
2204        ) -> Result<rmcp::model::CallToolResult, McpError> {
2205            use rmcp::model::{
2206                CreateElicitationRequestParams, ElicitationSchema, PrimitiveSchema, StringSchema,
2207            };
2208            use std::collections::BTreeMap;
2209
2210            // Build a minimal Form variant with one string property.
2211            let mut props = BTreeMap::new();
2212            props.insert(
2213                "name".to_string(),
2214                PrimitiveSchema::String(StringSchema::new()),
2215            );
2216            let schema = ElicitationSchema::new(props);
2217            let req = CreateElicitationRequestParams::FormElicitationParams {
2218                meta: None,
2219                message: "What is your name?".to_string(),
2220                requested_schema: schema,
2221            };
2222
2223            let result =
2224                ctx.peer.create_elicitation(req).await.map_err(|e| {
2225                    McpError::internal_error(format!("create_elicitation: {e}"), None)
2226                })?;
2227
2228            // Encode action + content as text so tests can assert.
2229            let action_str = match result.action {
2230                rmcp::model::ElicitationAction::Accept => "accept",
2231                rmcp::model::ElicitationAction::Decline => "decline",
2232                rmcp::model::ElicitationAction::Cancel => "cancel",
2233            };
2234            let content_str = result
2235                .content
2236                .map(|v| serde_json::to_string(&v).unwrap_or_default())
2237                .unwrap_or_default();
2238            Ok(rmcp::model::CallToolResult::success(vec![
2239                rmcp::model::Content::text(format!("elicitation:{action_str}:{content_str}")),
2240            ]))
2241        }
2242    }
2243
2244    /// A test server that issues a Url-variant `elicitation/create` request.
2245    /// Used to verify that the client always returns Decline for Url variants
2246    /// without dispatching to the Lua callback.
2247    #[derive(Clone)]
2248    struct ElicitationUrlTestServer;
2249
2250    impl ServerHandler for ElicitationUrlTestServer {
2251        fn get_info(&self) -> ServerInfo {
2252            ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
2253        }
2254
2255        async fn call_tool(
2256            &self,
2257            _params: rmcp::model::CallToolRequestParams,
2258            ctx: RequestContext<RoleServer>,
2259        ) -> Result<rmcp::model::CallToolResult, McpError> {
2260            use rmcp::model::CreateElicitationRequestParams;
2261
2262            // Issue a Url variant — client must always Decline without Lua dispatch.
2263            let req = CreateElicitationRequestParams::UrlElicitationParams {
2264                meta: None,
2265                message: "Please complete this form online".to_string(),
2266                url: "https://example.com/form".to_string(),
2267                elicitation_id: "test-elicitation-id-001".to_string(),
2268            };
2269
2270            let result =
2271                ctx.peer.create_elicitation(req).await.map_err(|e| {
2272                    McpError::internal_error(format!("create_elicitation: {e}"), None)
2273                })?;
2274
2275            let action_str = match result.action {
2276                rmcp::model::ElicitationAction::Accept => "accept",
2277                rmcp::model::ElicitationAction::Decline => "decline",
2278                rmcp::model::ElicitationAction::Cancel => "cancel",
2279            };
2280            Ok(rmcp::model::CallToolResult::success(vec![
2281                rmcp::model::Content::text(format!("url_elicitation:{action_str}")),
2282            ]))
2283        }
2284    }
2285
2286    /// Attach an `ElicitationTestServer` (Form variant) with a pre-configured
2287    /// handler Isle that has a Lua elicitation handler returning the given
2288    /// `action` and (for accept) a content object `{name: "Alice"}`.
2289    ///
2290    /// Returns the `IsleDriver` so the caller can keep the driver alive.
2291    async fn attach_elicitation_server_with_isle(
2292        mgr: &mut McpManager,
2293        name: &str,
2294        action: &str,
2295    ) -> mlua_isle::AsyncIsleDriver {
2296        use mlua_isle::AsyncIsle;
2297
2298        let (isle, driver) = AsyncIsle::spawn(|_lua: &mlua::Lua| Ok(()))
2299            .await
2300            .expect("AsyncIsle::spawn should succeed");
2301
2302        let name_owned = name.to_string();
2303        let action_owned = action.to_string();
2304        isle.exec(move |lua| {
2305            handler::install_mcp_dispatcher_on_handler_isle(lua)
2306                .map_err(|e| mlua_isle::IsleError::Lua(format!("setup dispatcher: {e}")))?;
2307            // Pre-install the Lua elicitation handler for `name_owned`.
2308            use mlua::prelude::*;
2309            let handlers: LuaTable = lua
2310                .globals()
2311                .get("__mcp_elicitation_handlers")
2312                .map_err(|e| mlua_isle::IsleError::Lua(format!("get handlers: {e}")))?;
2313
2314            // Build a handler that returns the requested action.
2315            let handler_src = match action_owned.as_str() {
2316                "accept" => {
2317                    r#"
2318                    return function(server_name, message, schema_json)
2319                        return { action = "accept", content = { name = "Alice" } }
2320                    end
2321                "#
2322                }
2323                "decline" => {
2324                    r#"
2325                    return function(server_name, message, schema_json)
2326                        return { action = "decline" }
2327                    end
2328                "#
2329                }
2330                "cancel" => {
2331                    r#"
2332                    return function(server_name, message, schema_json)
2333                        return { action = "cancel" }
2334                    end
2335                "#
2336                }
2337                _ => {
2338                    r#"
2339                    return function(server_name, message, schema_json)
2340                        return { action = "decline" }
2341                    end
2342                "#
2343                }
2344            };
2345
2346            let cb: LuaFunction = lua
2347                .load(handler_src)
2348                .set_name("@test_elicitation_handler")
2349                .eval()
2350                .map_err(|e| mlua_isle::IsleError::Lua(format!("eval: {e}")))?;
2351            handlers
2352                .set(name_owned.as_str(), cb)
2353                .map_err(|e| mlua_isle::IsleError::Lua(format!("set handler: {e}")))?;
2354            Ok(String::new())
2355        })
2356        .await
2357        .expect("isle setup must succeed");
2358
2359        let isle_arc = std::sync::Arc::new(isle);
2360
2361        let mut handler = AgentBlockClientHandler::new();
2362        handler.handler_isle = Some(std::sync::Arc::clone(&isle_arc));
2363        handler.server_name = Some(name.to_string());
2364        handler.mark_elicitation(name);
2365
2366        let (server_side, client_side) = tokio::io::duplex(65536);
2367        tokio::spawn(async move {
2368            if let Ok(running) = ElicitationTestServer.serve(server_side).await {
2369                let _ = running.waiting().await;
2370            }
2371        });
2372        let running = handler.serve(client_side).await.expect("handshake");
2373        mgr.servers.insert(name.to_string(), running);
2374
2375        driver
2376    }
2377
2378    /// Attach a plain `ElicitationTestServer` (Form variant) without any Lua
2379    /// handler wired. Used for testing the no-handler Decline path.
2380    /// Server name is wired so `create_elicitation` can check the registry and
2381    /// return `Decline` (not `method_not_found`) when no handler is registered.
2382    async fn attach_elicitation_server_bare(mgr: &mut McpManager, name: &str) {
2383        let (server_side, client_side) = tokio::io::duplex(65536);
2384        tokio::spawn(async move {
2385            if let Ok(running) = ElicitationTestServer.serve(server_side).await {
2386                let _ = running.waiting().await;
2387            }
2388        });
2389        let mut handler = AgentBlockClientHandler::new();
2390        handler.ensure_server(name);
2391        handler.server_name = Some(name.to_string());
2392        let running = handler.serve(client_side).await.expect("handshake");
2393        mgr.servers.insert(name.to_string(), running);
2394    }
2395
2396    /// Attach a `ElicitationUrlTestServer` (Url variant) without any Lua handler.
2397    /// Used to verify Url-variant always returns Decline.
2398    async fn attach_elicitation_url_server(mgr: &mut McpManager, name: &str) {
2399        let (server_side, client_side) = tokio::io::duplex(65536);
2400        tokio::spawn(async move {
2401            if let Ok(running) = ElicitationUrlTestServer.serve(server_side).await {
2402                let _ = running.waiting().await;
2403            }
2404        });
2405        let handler = AgentBlockClientHandler::new();
2406        let running = handler.serve(client_side).await.expect("handshake");
2407        mgr.servers.insert(name.to_string(), running);
2408    }
2409
2410    // ── Tests: mark_elicitation flag ───────────────────────────────────
2411
2412    /// (T1) mark_elicitation sets the registry flag that create_elicitation checks.
2413    #[test]
2414    fn mark_elicitation_sets_flag_accessible_by_handler() {
2415        let handler = AgentBlockClientHandler::new();
2416        handler.ensure_server("elicit-srv");
2417        assert!(
2418            !handler
2419                .registry
2420                .lock()
2421                .unwrap()
2422                .get("elicit-srv")
2423                .unwrap()
2424                .elicitation
2425        );
2426        handler.mark_elicitation("elicit-srv");
2427        assert!(
2428            handler
2429                .registry
2430                .lock()
2431                .unwrap()
2432                .get("elicit-srv")
2433                .unwrap()
2434                .elicitation
2435        );
2436    }
2437
2438    // ── Tests: live duplex elicitation round-trips ─────────────────────
2439
2440    /// (T1 / elicitation_accept) Form variant + handler accept → Accept result with content.
2441    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2442    async fn elicitation_accept_returns_accept_with_content() {
2443        let mut mgr = McpManager::new();
2444        let _driver = attach_elicitation_server_with_isle(&mut mgr, "elicit", "accept").await;
2445
2446        let result = mgr
2447            .call_tool("elicit", "any_tool", serde_json::json!({}))
2448            .await
2449            .expect("call_tool must succeed");
2450        let result_json = serde_json::to_string(&result).expect("serialize result");
2451        assert!(
2452            result_json.contains("elicitation:accept:"),
2453            "expected elicitation:accept: in result: {result_json}"
2454        );
2455        assert!(
2456            result_json.contains("Alice"),
2457            "expected Alice in content: {result_json}"
2458        );
2459    }
2460
2461    /// (T2 / elicitation_decline) Form variant + handler decline → Decline result.
2462    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2463    async fn elicitation_decline_returns_decline() {
2464        let mut mgr = McpManager::new();
2465        let _driver = attach_elicitation_server_with_isle(&mut mgr, "elicit", "decline").await;
2466
2467        let result = mgr
2468            .call_tool("elicit", "any_tool", serde_json::json!({}))
2469            .await
2470            .expect("call_tool must succeed");
2471        let result_json = serde_json::to_string(&result).expect("serialize result");
2472        assert!(
2473            result_json.contains("elicitation:decline:"),
2474            "expected elicitation:decline: in result: {result_json}"
2475        );
2476    }
2477
2478    /// (T3 / elicitation_cancel) Form variant + handler cancel → Cancel result.
2479    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2480    async fn elicitation_cancel_returns_cancel() {
2481        let mut mgr = McpManager::new();
2482        let _driver = attach_elicitation_server_with_isle(&mut mgr, "elicit", "cancel").await;
2483
2484        let result = mgr
2485            .call_tool("elicit", "any_tool", serde_json::json!({}))
2486            .await
2487            .expect("call_tool must succeed");
2488        let result_json = serde_json::to_string(&result).expect("serialize result");
2489        assert!(
2490            result_json.contains("elicitation:cancel:"),
2491            "expected elicitation:cancel: in result: {result_json}"
2492        );
2493    }
2494
2495    /// (T4 / elicitation_url_decline) Url variant → always Decline, no Lua dispatch.
2496    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2497    async fn elicitation_url_variant_always_declines() {
2498        let mut mgr = McpManager::new();
2499        attach_elicitation_url_server(&mut mgr, "elicit-url").await;
2500
2501        let result = mgr
2502            .call_tool("elicit-url", "any_tool", serde_json::json!({}))
2503            .await
2504            .expect("call_tool must succeed");
2505        let result_json = serde_json::to_string(&result).expect("serialize result");
2506        assert!(
2507            result_json.contains("url_elicitation:decline"),
2508            "expected url_elicitation:decline in result: {result_json}"
2509        );
2510    }
2511
2512    /// (T5 / elicitation_no_handler) Form variant + no handler → Decline (spec neutral).
2513    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2514    async fn elicitation_no_handler_returns_decline() {
2515        let mut mgr = McpManager::new();
2516        // Attach without wiring an isle or elicitation handler.
2517        attach_elicitation_server_bare(&mut mgr, "elicit-bare").await;
2518
2519        let result = mgr
2520            .call_tool("elicit-bare", "any_tool", serde_json::json!({}))
2521            .await
2522            .expect("call_tool must succeed — no handler → Decline, not error");
2523        let result_json = serde_json::to_string(&result).expect("serialize result");
2524        assert!(
2525            result_json.contains("elicitation:decline:"),
2526            "expected elicitation:decline: when no handler registered: {result_json}"
2527        );
2528    }
2529
2530    // ── Test Servers: ping ─────────────────────────────────────────────
2531
2532    /// A server that sleeps `delay` before responding to every `ping`.
2533    /// Used to drive the timeout path in `McpManager::ping`.
2534    /// K-181: ServerHandler impl uses `async fn` directly.
2535    #[derive(Clone)]
2536    struct SlowPingServer {
2537        delay: Duration,
2538    }
2539
2540    impl ServerHandler for SlowPingServer {
2541        fn get_info(&self) -> ServerInfo {
2542            ServerInfo::new(ServerCapabilities::builder().build())
2543        }
2544
2545        async fn ping(&self, _ctx: RequestContext<RoleServer>) -> Result<(), McpError> {
2546            tokio::time::sleep(self.delay).await;
2547            Ok(())
2548        }
2549    }
2550
2551    /// Attach an in-process `SlowPingServer` to `mgr` under `name`.
2552    async fn attach_slow_ping_server(mgr: &mut McpManager, name: &str, delay: Duration) {
2553        let (server_side, client_side) = tokio::io::duplex(8192);
2554        let server = SlowPingServer { delay };
2555        tokio::spawn(async move {
2556            if let Ok(running) = server.serve(server_side).await {
2557                let _ = running.waiting().await;
2558            }
2559        });
2560        let handler = AgentBlockClientHandler::new();
2561        let running = handler
2562            .serve(client_side)
2563            .await
2564            .expect("client handshake should succeed over duplex");
2565        mgr.servers.insert(name.to_string(), running);
2566    }
2567
2568    // ── Tests: ping ────────────────────────────────────────────────────
2569
2570    /// (P1) ping against a responsive server returns Ok(latency_ms).
2571    /// latency_ms is a non-negative integer (monotonic Instant measurement).
2572    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2573    async fn ping_success_returns_latency_ms() {
2574        let mut mgr = McpManager::new();
2575        // Use zero delay so the test completes quickly.
2576        attach_slow_ping_server(&mut mgr, "pingsrv", Duration::from_millis(0)).await;
2577
2578        let result = mgr.ping("pingsrv").await;
2579        let latency_ms = result.expect("ping should succeed against a live server");
2580        // latency_ms must be a non-negative integer; the type is u64.
2581        // We cannot assert an exact value, but it should be <= 5000ms on
2582        // any reasonable CI box.
2583        assert!(
2584            latency_ms <= 5000,
2585            "latency_ms={latency_ms} looks unreasonable (> 5 s)"
2586        );
2587    }
2588
2589    /// (P2) ping against a server that delays beyond rpc_timeout returns
2590    /// `BlockError::Timeout`, not `BlockError::Mcp`.
2591    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2592    async fn ping_timeout_returns_block_error_timeout() {
2593        let mut mgr = McpManager::with_rpc_timeout(Duration::from_millis(1))
2594            .expect("with_rpc_timeout(1ms) should succeed");
2595        // Server sleeps 200ms, rpc_timeout is 1ms → guaranteed timeout.
2596        attach_slow_ping_server(&mut mgr, "slowping", Duration::from_millis(200)).await;
2597
2598        let result = mgr.ping("slowping").await;
2599        let err = result.expect_err("ping should time out");
2600        assert!(
2601            matches!(err, BlockError::Timeout(_)),
2602            "expected BlockError::Timeout, got: {err:?}"
2603        );
2604        let msg = err.to_string();
2605        assert!(
2606            msg.contains("timed out"),
2607            "timeout message should contain 'timed out': {msg}"
2608        );
2609    }
2610}