Skip to main content

agentkit_mcp/
lib.rs

1//! Model Context Protocol integration for agentkit, built on top of [`rmcp`].
2//!
3//! This crate exposes:
4//!
5//! - [`McpServerConfig`] / [`McpTransportBinding`] / [`StdioTransportConfig`] /
6//!   [`StreamableHttpTransportConfig`] — declarative transport configuration.
7//! - [`McpConnection`] — a live, single-server connection wrapping
8//!   [`rmcp::service::RunningService`].
9//! - [`McpServerManager`] — multi-server lifecycle, discovery, catalog diffing,
10//!   and auth replay.
11//! - [`McpServerHandle`], [`McpToolExecutor`], [`McpToolAdapter`],
12//!   [`McpResourceHandle`], [`McpPromptHandle`],
13//!   [`McpCapabilityProvider`] — bridges into the agentkit `Tool` / capabilities
14//!   systems.
15//!
16//! Wire-protocol types (`CallToolResult`, `ReadResourceResult`, `Content`,
17//! `ToolAnnotations`, `Prompt`, sampling/elicitation/roots payloads, …) are
18//! re-exported from [`rmcp::model`] directly — there is no parallel
19//! agentkit-side vocabulary. As `rmcp` tracks new MCP spec revisions, those
20//! types and their fields propagate into agentkit unchanged.
21
22use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::fmt;
24use std::sync::{Arc, RwLock};
25
26use agentkit_capabilities::{
27    CapabilityContext, CapabilityError, CapabilityProvider, Invocable, PromptContents,
28    PromptDescriptor, PromptId, PromptProvider, ResourceContents, ResourceDescriptor, ResourceId,
29    ResourceProvider,
30};
31use agentkit_core::{
32    DataRef, Item, ItemKind, MediaPart, MetadataMap, Modality, Part, TextPart, ToolOutput,
33    ToolResultPart,
34};
35use agentkit_tools_core::{
36    AllowAllPermissions, CatalogReader, CatalogWriter, PermissionChecker, Tool, ToolAnnotations,
37    ToolCapabilityProvider, ToolContext, ToolError, ToolName, ToolRegistry, ToolRequest,
38    ToolResult, ToolSpec, dynamic_catalog,
39};
40use async_trait::async_trait;
41use futures_util::future::try_join_all;
42use futures_util::stream::BoxStream;
43use http::{HeaderName, HeaderValue};
44use rmcp::ServiceExt;
45use rmcp::handler::client::ClientHandler;
46use rmcp::model as rmcp_model;
47use rmcp::service::{ClientInitializeError, Peer, RoleClient, RunningService, ServiceError};
48use rmcp::transport::streamable_http_client::{
49    AuthRequiredError, InsufficientScopeError, StreamableHttpClient as RmcpStreamableHttpClient,
50    StreamableHttpClientTransportConfig as RmcpStreamableHttpClientTransportConfig,
51    StreamableHttpError, StreamableHttpPostResponse,
52};
53use rmcp::transport::{
54    ConfigureCommandExt, DynamicTransportError, StreamableHttpClientTransport, TokioChildProcess,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::{Value, json};
58use sse_stream::{Error as SseError, Sse};
59use thiserror::Error;
60use tokio::sync::{Mutex, broadcast, mpsc};
61
62/// Re-exports of the rmcp wire-protocol types this crate now surfaces directly
63/// instead of wrapping. Pull these in to pattern-match on tool annotations,
64/// content blocks, structured tool output, embedded resources, sampling /
65/// elicitation requests, progress and log notifications, etc.
66pub use rmcp::model::{
67    Annotations as McpAnnotations, AudioContent, CallToolResult,
68    CancelledNotificationParam as McpCancelledNotificationParam,
69    ClientCapabilities as McpClientCapabilities, Content,
70    CreateElicitationRequestParams as McpCreateElicitationRequestParams,
71    CreateElicitationResult as McpCreateElicitationResult,
72    CreateMessageRequestParams as McpCreateMessageRequestParams,
73    CreateMessageResult as McpCreateMessageResult, ElicitationAction as McpElicitationAction,
74    ElicitationCapability as McpElicitationCapability, EmbeddedResource,
75    FormElicitationCapability as McpFormElicitationCapability, GetPromptResult, ImageContent,
76    Implementation as McpImplementation, ListRootsResult as McpListRootsResult,
77    LoggingLevel as McpLoggingLevel,
78    LoggingMessageNotificationParam as McpLoggingMessageNotificationParam,
79    ProgressNotificationParam as McpProgressNotificationParam, Prompt as McpPrompt, PromptArgument,
80    PromptMessage, PromptMessageContent, PromptMessageRole, RawAudioContent, RawContent,
81    RawEmbeddedResource, RawImageContent, RawResource as McpRawResource, RawTextContent,
82    ReadResourceResult, Resource as McpResource, ResourceContents as McpResourceContents,
83    ResourceUpdatedNotificationParam as McpResourceUpdatedNotificationParam, Root as McpRoot,
84    RootsCapabilities as McpRootsCapabilities, SamplingCapability as McpSamplingCapability,
85    SamplingMessage as McpSamplingMessage, SetLevelRequestParams as McpSetLevelRequestParams,
86    TextContent, Tool as McpTool, ToolAnnotations as McpToolAnnotations,
87    UrlElicitationCapability as McpUrlElicitationCapability,
88};
89
90/// Re-export of the JSON-RPC client→server envelope handed to
91/// [`McpHttpClient::post_message`].
92pub use rmcp::model::ClientJsonRpcMessage;
93
94/// Re-exports of the rmcp Streamable HTTP transport types used by
95/// [`McpHttpClient`] implementations.
96pub use rmcp::transport::streamable_http_client::{
97    StreamableHttpError as McpStreamableHttpError,
98    StreamableHttpPostResponse as McpStreamableHttpPostResponse,
99};
100
101/// Re-export of the SSE event/error types referenced by [`McpHttpClient::get_stream`].
102pub use sse_stream::{Error as McpSseError, Sse as McpSse};
103
104/// Alias for [`McpTool`].
105pub type McpToolDescriptor = McpTool;
106/// Alias for [`McpResource`].
107pub type McpResourceDescriptor = McpResource;
108/// Alias for [`McpPrompt`].
109pub type McpPromptDescriptor = McpPrompt;
110
111/// An auth challenge raised by an MCP server during a tool call, resource
112/// read, prompt fetch, or connection handshake.
113///
114/// Hosts handle these via an [`McpAuthResponder`] registered on
115/// [`McpHandlerConfig::with_auth_responder`]. The responder is invoked
116/// inline by [`McpToolAdapter::invoke`] (and equivalent paths in
117/// [`McpServerManager`]) — auth never crosses the executor boundary as a
118/// loop interrupt.
119#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
120pub struct AuthRequest {
121    /// Unique identifier for this auth challenge.
122    pub id: String,
123    /// Name of the authentication provider (e.g. `"github"`, `"google"`).
124    pub provider: String,
125    /// The MCP operation that triggered the auth requirement.
126    pub operation: AuthOperation,
127    /// Provider-specific challenge data (e.g. OAuth URLs, scopes).
128    pub challenge: MetadataMap,
129}
130
131impl AuthRequest {
132    /// Convenience: returns the MCP server id this challenge targets, if any.
133    pub fn server_id(&self) -> Option<&str> {
134        self.operation.server_id()
135    }
136}
137
138/// The MCP operation that triggered an [`AuthRequest`].
139#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
140pub enum AuthOperation {
141    /// Connecting to an MCP server.
142    McpConnect {
143        server_id: String,
144        metadata: MetadataMap,
145    },
146    /// Invoking a tool on an MCP server.
147    McpToolCall {
148        server_id: String,
149        tool_name: String,
150        input: Value,
151        metadata: MetadataMap,
152    },
153    /// Reading a resource from an MCP server.
154    McpResourceRead {
155        server_id: String,
156        resource_id: String,
157        metadata: MetadataMap,
158    },
159    /// Fetching a prompt from an MCP server.
160    McpPromptGet {
161        server_id: String,
162        prompt_id: String,
163        args: Value,
164        metadata: MetadataMap,
165    },
166    /// Any other MCP method that requires auth (resource subscribe/unsubscribe,
167    /// logging level changes, future protocol additions). The typed variants
168    /// above cover the common cases; this catch-all preserves the method name
169    /// and JSON params verbatim for hosts that need to render or log them.
170    McpOther {
171        server_id: String,
172        method: String,
173        params: Value,
174        metadata: MetadataMap,
175    },
176}
177
178impl AuthOperation {
179    /// Returns the MCP server ID this operation targets.
180    pub fn server_id(&self) -> Option<&str> {
181        match self {
182            Self::McpConnect { server_id, .. }
183            | Self::McpToolCall { server_id, .. }
184            | Self::McpResourceRead { server_id, .. }
185            | Self::McpPromptGet { server_id, .. }
186            | Self::McpOther { server_id, .. } => Some(server_id.as_str()),
187        }
188    }
189}
190
191/// Outcome of an [`AuthRequest`] after the host's [`McpAuthResponder`] runs.
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum AuthResolution {
194    /// The host obtained credentials.
195    Provided {
196        request: AuthRequest,
197        credentials: MetadataMap,
198    },
199    /// The host cancelled the auth flow.
200    Cancelled { request: AuthRequest },
201}
202
203impl AuthResolution {
204    /// Builds a successful auth resolution.
205    pub fn provided(request: AuthRequest, credentials: MetadataMap) -> Self {
206        Self::Provided {
207            request,
208            credentials,
209        }
210    }
211
212    /// Builds a cancelled auth resolution.
213    pub fn cancelled(request: AuthRequest) -> Self {
214        Self::Cancelled { request }
215    }
216
217    /// Returns the underlying [`AuthRequest`] regardless of variant.
218    pub fn request(&self) -> &AuthRequest {
219        match self {
220            Self::Provided { request, .. } | Self::Cancelled { request } => request,
221        }
222    }
223}
224
225/// Host-supplied resolver for MCP auth challenges.
226///
227/// Install one via [`McpHandlerConfig::with_auth_responder`]. When an MCP
228/// server returns an auth challenge during a tool call, resource read, or
229/// prompt fetch, the adapter invokes [`McpAuthResponder::resolve`] inline,
230/// applies the resulting credentials to the [`McpConnection`], and retries
231/// the original operation. Auth never surfaces as a loop interrupt.
232///
233/// Hosts that want to interleave the auth UI with the loop's main thread
234/// implement a thin channel-bridging responder (responder sends the
235/// challenge to the UI thread on a `mpsc::Sender`, awaits a `oneshot`
236/// reply with the resolution).
237#[async_trait]
238pub trait McpAuthResponder: Send + Sync + 'static {
239    async fn resolve(&self, request: AuthRequest) -> Result<AuthResolution, McpError>;
240}
241
242/// Typed view of a JSON-RPC error returned by an MCP server for an invoked
243/// method.
244///
245/// Surfaced by [`McpError::Invocation`] so callers can branch on the
246/// underlying error code without re-parsing strings. The variants cover
247/// every rmcp [`rmcp::model::ErrorCode`] constant defined at the time of
248/// writing; anything else (custom server codes, future protocol additions)
249/// lands in [`Self::Other`] with the original code preserved.
250///
251/// For the URL elicitation case ([`rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED`])
252/// the `data` payload is best-effort parsed into [`UrlElicitationData`].
253/// When the server's payload does not match the documented shape the typed
254/// `data` slot is `None` but `raw_data` always preserves the original
255/// [`serde_json::Value`] so callers can fall back to ad-hoc inspection.
256#[derive(Debug, Clone, thiserror::Error)]
257pub enum McpInvocationError {
258    /// JSON-RPC error `-32042` (URL elicitation required).
259    #[error("url elicitation required: {message}")]
260    UrlElicitation {
261        /// Human-readable message from the server.
262        message: String,
263        /// Typed view of the server's `data` payload, when it matched the
264        /// documented URL elicitation shape.
265        data: Option<UrlElicitationData>,
266        /// The original `data` value, preserved verbatim.
267        raw_data: Option<serde_json::Value>,
268    },
269    /// JSON-RPC error `-32600` (invalid request).
270    #[error("invalid request: {message}")]
271    InvalidRequest {
272        message: String,
273        data: Option<serde_json::Value>,
274    },
275    /// JSON-RPC error `-32601` (method not found).
276    #[error("method not found: {message}")]
277    MethodNotFound {
278        message: String,
279        data: Option<serde_json::Value>,
280    },
281    /// JSON-RPC error `-32602` (invalid params).
282    #[error("invalid params: {message}")]
283    InvalidParams {
284        message: String,
285        data: Option<serde_json::Value>,
286    },
287    /// JSON-RPC error `-32603` (internal error).
288    #[error("internal error: {message}")]
289    InternalError {
290        message: String,
291        data: Option<serde_json::Value>,
292    },
293    /// JSON-RPC error `-32700` (parse error).
294    #[error("parse error: {message}")]
295    ParseError {
296        message: String,
297        data: Option<serde_json::Value>,
298    },
299    /// JSON-RPC error `-32002` (resource not found).
300    #[error("resource not found: {message}")]
301    ResourceNotFound {
302        message: String,
303        data: Option<serde_json::Value>,
304    },
305    /// Forward-compat for custom server codes and any future rmcp additions
306    /// not yet recognized by this crate.
307    #[error("mcp error code {code}: {message}")]
308    Other {
309        code: i32,
310        message: String,
311        data: Option<serde_json::Value>,
312    },
313}
314
315/// Typed payload for the URL elicitation error case.
316///
317/// Mirrors the shape of [`rmcp::model::CreateElicitationRequestParams::UrlElicitationParams`]
318/// (camelCase on the wire). Server messages that include extra fields are
319/// accepted; missing required fields make typed parsing fail and the
320/// surrounding [`McpInvocationError::UrlElicitation`] preserves the raw
321/// payload instead.
322#[derive(Debug, Clone, serde::Deserialize)]
323#[serde(rename_all = "camelCase")]
324pub struct UrlElicitationData {
325    /// The URL where the user can complete the elicitation.
326    pub url: String,
327    /// The server-issued identifier for this elicitation request.
328    pub elicitation_id: String,
329    /// Optional human-readable message that accompanied the payload.
330    #[serde(default)]
331    pub message: Option<String>,
332}
333
334impl McpInvocationError {
335    /// Lifts an rmcp wire error into the typed enum. Infallible: well-known
336    /// codes attempt a typed `data` parse and degrade to a raw-only view
337    /// when parsing fails; unrecognized codes land in [`Self::Other`].
338    pub fn from_error_data(err: rmcp::model::ErrorData) -> Self {
339        let rmcp::model::ErrorData {
340            code,
341            message,
342            data,
343        } = err;
344        let message = message.into_owned();
345        match code {
346            rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED => {
347                let typed = data.as_ref().and_then(|value| {
348                    serde_json::from_value::<UrlElicitationData>(value.clone()).ok()
349                });
350                Self::UrlElicitation {
351                    message,
352                    data: typed,
353                    raw_data: data,
354                }
355            }
356            rmcp::model::ErrorCode::INVALID_REQUEST => Self::InvalidRequest { message, data },
357            rmcp::model::ErrorCode::METHOD_NOT_FOUND => Self::MethodNotFound { message, data },
358            rmcp::model::ErrorCode::INVALID_PARAMS => Self::InvalidParams { message, data },
359            rmcp::model::ErrorCode::INTERNAL_ERROR => Self::InternalError { message, data },
360            rmcp::model::ErrorCode::PARSE_ERROR => Self::ParseError { message, data },
361            rmcp::model::ErrorCode::RESOURCE_NOT_FOUND => Self::ResourceNotFound { message, data },
362            other => Self::Other {
363                code: other.0,
364                message,
365                data,
366            },
367        }
368    }
369
370    /// Returns the underlying JSON-RPC error code.
371    pub fn code(&self) -> i32 {
372        match self {
373            Self::UrlElicitation { .. } => rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED.0,
374            Self::InvalidRequest { .. } => rmcp::model::ErrorCode::INVALID_REQUEST.0,
375            Self::MethodNotFound { .. } => rmcp::model::ErrorCode::METHOD_NOT_FOUND.0,
376            Self::InvalidParams { .. } => rmcp::model::ErrorCode::INVALID_PARAMS.0,
377            Self::InternalError { .. } => rmcp::model::ErrorCode::INTERNAL_ERROR.0,
378            Self::ParseError { .. } => rmcp::model::ErrorCode::PARSE_ERROR.0,
379            Self::ResourceNotFound { .. } => rmcp::model::ErrorCode::RESOURCE_NOT_FOUND.0,
380            Self::Other { code, .. } => *code,
381        }
382    }
383}
384
385/// Userland hook invoked when an MCP server returns a JSON-RPC error for an
386/// invoked method.
387///
388/// Lets the host translate well-known errors (e.g. URL elicitation
389/// challenges) into a synthesized tool result the agent can render —
390/// without agentkit-mcp baking in any specific UX or response policy.
391///
392/// Install one via [`McpHandlerConfig::with_error_responder`]. When set,
393/// [`McpToolAdapter::invoke`] forwards every JSON-RPC error to the
394/// responder before falling back to [`ToolError::ExecutionFailed`]; the
395/// responder returns either a synthesized [`CallToolResult`] (treated as a
396/// successful call so `structured_content` flows through
397/// [`ToolOutput::Structured`]) or [`ErrorResponderOutcome::PassThrough`] to
398/// preserve the default failure path.
399#[async_trait]
400pub trait McpErrorResponder: Send + Sync + 'static {
401    /// Inspects an invocation error and decides whether to synthesize a
402    /// replacement [`CallToolResult`] or propagate the error.
403    async fn handle(
404        &self,
405        error: &McpInvocationError,
406        ctx: McpErrorContext<'_>,
407    ) -> ErrorResponderOutcome;
408}
409
410/// Context describing which server / method / input produced the
411/// invocation error currently being inspected by [`McpErrorResponder::handle`].
412pub struct McpErrorContext<'a> {
413    /// The server that returned the error.
414    pub server_id: &'a McpServerId,
415    /// The MCP method that was invoked.
416    pub method: &'a McpMethod,
417    /// The input payload supplied to the invocation, when available.
418    pub input: Option<&'a serde_json::Value>,
419}
420
421/// Decision returned by an [`McpErrorResponder`].
422pub enum ErrorResponderOutcome {
423    /// Replace the error with a synthesized successful response. The
424    /// returned [`CallToolResult`] flows through agentkit-mcp's normal
425    /// tool-result conversion: `structured_content` becomes
426    /// [`ToolOutput::Structured`], `content` becomes text / media parts,
427    /// and `is_error` is honoured.
428    SynthesizeResult(CallToolResult),
429    /// Defer to default behavior; the invocation error continues to surface
430    /// as [`ToolError::ExecutionFailed`].
431    PassThrough,
432}
433
434/// Unique identifier for a registered MCP server.
435///
436/// Each MCP server in a [`McpServerManager`] is addressed by its `McpServerId`.
437#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
438pub struct McpServerId(pub String);
439
440impl McpServerId {
441    /// Creates a new server identifier from any string-like value.
442    pub fn new(value: impl Into<String>) -> Self {
443        Self(value.into())
444    }
445}
446
447impl fmt::Display for McpServerId {
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        self.0.fmt(f)
450    }
451}
452
453/// Configuration for an MCP server that communicates over standard I/O.
454///
455/// The specified command is spawned as a child process; rmcp drives the
456/// JSON-RPC framing over its stdin/stdout.
457#[derive(Clone, Debug, PartialEq, Eq)]
458pub struct StdioTransportConfig {
459    /// The executable to launch (e.g. `"npx"`, `"python"`, `"node"`).
460    pub command: String,
461    /// Command-line arguments passed to the executable.
462    pub args: Vec<String>,
463    /// Additional environment variables set for the child process.
464    pub env: Vec<(String, String)>,
465    /// Optional working directory for the child process.
466    pub cwd: Option<std::path::PathBuf>,
467}
468
469impl StdioTransportConfig {
470    /// Creates a new stdio transport configuration for the given command.
471    pub fn new(command: impl Into<String>) -> Self {
472        Self {
473            command: command.into(),
474            args: Vec::new(),
475            env: Vec::new(),
476            cwd: None,
477        }
478    }
479
480    /// Appends a command-line argument. Returns `self` for chaining.
481    pub fn with_arg(mut self, arg: impl Into<String>) -> Self {
482        self.args.push(arg.into());
483        self
484    }
485
486    /// Adds an environment variable for the child process. Returns `self` for chaining.
487    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
488        self.env.push((key.into(), value.into()));
489        self
490    }
491
492    /// Sets the working directory for the child process. Returns `self` for chaining.
493    pub fn with_cwd(mut self, cwd: impl Into<std::path::PathBuf>) -> Self {
494        self.cwd = Some(cwd.into());
495        self
496    }
497}
498
499/// Configuration for an MCP server that communicates over the MCP Streamable HTTP transport.
500#[derive(Clone, Default)]
501pub struct StreamableHttpTransportConfig {
502    /// The MCP endpoint URL to connect to.
503    pub url: String,
504    /// Static bearer token sent as an HTTP `Authorization: Bearer ...` header.
505    ///
506    /// Ignored when [`Self::http_client`] is set, since the custom client owns
507    /// authorization for every request.
508    pub bearer_token: Option<String>,
509    /// Static custom HTTP headers sent with every Streamable HTTP request.
510    ///
511    /// Ignored when [`Self::http_client`] is set.
512    pub headers: Vec<(HeaderName, HeaderValue)>,
513    /// Optional caller-supplied HTTP client.
514    ///
515    /// When `Some`, agentkit-mcp routes every Streamable HTTP request through
516    /// the provided implementation instead of rmcp's default reqwest client.
517    /// This is the seam to inject dynamic bearers, request signing, retry
518    /// middleware, custom TLS, and so on. See [`McpHttpClient`].
519    pub http_client: Option<Arc<dyn McpHttpClient>>,
520}
521
522impl fmt::Debug for StreamableHttpTransportConfig {
523    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
524        f.debug_struct("StreamableHttpTransportConfig")
525            .field("url", &self.url)
526            .field(
527                "bearer_token",
528                &self.bearer_token.as_deref().map(|_| "<redacted>"),
529            )
530            .field("headers", &self.headers)
531            .field(
532                "http_client",
533                &self.http_client.as_ref().map(|_| "<custom>"),
534            )
535            .finish()
536    }
537}
538
539impl StreamableHttpTransportConfig {
540    /// Creates a new Streamable HTTP transport configuration for the given MCP endpoint.
541    pub fn new(url: impl Into<String>) -> Self {
542        Self {
543            url: url.into(),
544            bearer_token: None,
545            headers: Vec::new(),
546            http_client: None,
547        }
548    }
549
550    /// Sets a static bearer token for Streamable HTTP authorization.
551    ///
552    /// Ignored when a custom [`McpHttpClient`] is installed via
553    /// [`Self::with_http_client`].
554    pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
555        self.bearer_token = Some(token.into());
556        self
557    }
558
559    /// Installs a caller-supplied HTTP client for every Streamable HTTP
560    /// request issued by this transport.
561    ///
562    /// This is the only seam capable of producing per-request dynamic state
563    /// (rotating bearers, request signing, distributed-tracing headers).
564    /// rmcp's default reqwest path is bypassed entirely when this is set, so
565    /// implementations are responsible for forwarding `auth_header` /
566    /// `custom_headers` if they want the static config to keep applying.
567    pub fn with_http_client(mut self, client: Arc<dyn McpHttpClient>) -> Self {
568        self.http_client = Some(client);
569        self
570    }
571
572    /// Adds a static HTTP header for every Streamable HTTP request.
573    ///
574    /// Reserved MCP session and protocol headers are still managed by RMCP.
575    /// Ignored when a custom [`McpHttpClient`] is installed.
576    pub fn with_header<N, V>(mut self, name: N, value: V) -> Result<Self, McpError>
577    where
578        N: TryInto<HeaderName>,
579        N::Error: fmt::Display,
580        V: TryInto<HeaderValue>,
581        V::Error: fmt::Display,
582    {
583        let name = name
584            .try_into()
585            .map_err(|error| McpError::Transport(format!("invalid HTTP header name: {error}")))?;
586        let value = value
587            .try_into()
588            .map_err(|error| McpError::Transport(format!("invalid HTTP header value: {error}")))?;
589        self.headers.push((name, value));
590        Ok(self)
591    }
592}
593
594/// Type alias for the SSE stream returned by [`McpHttpClient::get_stream`].
595pub type McpSseStream = BoxStream<'static, Result<Sse, SseError>>;
596
597/// Pluggable HTTP transport for the MCP Streamable HTTP client.
598///
599/// Mirrors [`rmcp::transport::streamable_http_client::StreamableHttpClient`]
600/// but is dyn-compatible (boxed via `async_trait`) so the configuration can
601/// store an `Arc<dyn McpHttpClient>` without genericizing every type that
602/// flows through [`McpServerConfig`] / [`McpTransportBinding`].
603///
604/// The associated error type is fixed to [`reqwest::Error`] so that
605/// agentkit-mcp's auth-challenge detection (which downcasts to
606/// [`StreamableHttpError<reqwest::Error>`]) keeps working — implementations
607/// that wrap a non-reqwest backend should map their failures into a
608/// `reqwest::Error` before returning.
609///
610/// All three methods are invoked by rmcp's worker on every protocol op.
611/// `auth_header` and `custom_headers` carry the values resolved from
612/// [`StreamableHttpTransportConfig`] at connection time; implementations are
613/// free to ignore them and inject their own per-call values (e.g. a fresh
614/// bearer pulled from a runtime registry).
615#[async_trait]
616pub trait McpHttpClient: Send + Sync + 'static {
617    /// POSTs a single client→server JSON-RPC message. The response carries
618    /// either a JSON body or an SSE stream depending on what the server
619    /// negotiates.
620    async fn post_message(
621        &self,
622        uri: Arc<str>,
623        message: ClientJsonRpcMessage,
624        session_id: Option<Arc<str>>,
625        auth_header: Option<String>,
626        custom_headers: HashMap<HeaderName, HeaderValue>,
627    ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>>;
628
629    /// Tears down a server-issued session (HTTP DELETE).
630    async fn delete_session(
631        &self,
632        uri: Arc<str>,
633        session_id: Arc<str>,
634        auth_header: Option<String>,
635        custom_headers: HashMap<HeaderName, HeaderValue>,
636    ) -> Result<(), StreamableHttpError<reqwest::Error>>;
637
638    /// Opens a server→client SSE stream (HTTP GET) for push notifications and
639    /// reconnect resumes.
640    async fn get_stream(
641        &self,
642        uri: Arc<str>,
643        session_id: Arc<str>,
644        last_event_id: Option<String>,
645        auth_header: Option<String>,
646        custom_headers: HashMap<HeaderName, HeaderValue>,
647    ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>>;
648}
649
650/// Internal newtype that adapts an `Arc<dyn McpHttpClient>` to rmcp's
651/// generic, non-dyn-compatible [`RmcpStreamableHttpClient`] trait.
652#[derive(Clone)]
653struct DynHttpClient(Arc<dyn McpHttpClient>);
654
655impl RmcpStreamableHttpClient for DynHttpClient {
656    type Error = reqwest::Error;
657
658    async fn post_message(
659        &self,
660        uri: Arc<str>,
661        message: ClientJsonRpcMessage,
662        session_id: Option<Arc<str>>,
663        auth_header: Option<String>,
664        custom_headers: HashMap<HeaderName, HeaderValue>,
665    ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>> {
666        self.0
667            .post_message(uri, message, session_id, auth_header, custom_headers)
668            .await
669    }
670
671    async fn delete_session(
672        &self,
673        uri: Arc<str>,
674        session_id: Arc<str>,
675        auth_header: Option<String>,
676        custom_headers: HashMap<HeaderName, HeaderValue>,
677    ) -> Result<(), StreamableHttpError<reqwest::Error>> {
678        self.0
679            .delete_session(uri, session_id, auth_header, custom_headers)
680            .await
681    }
682
683    async fn get_stream(
684        &self,
685        uri: Arc<str>,
686        session_id: Arc<str>,
687        last_event_id: Option<String>,
688        auth_header: Option<String>,
689        custom_headers: HashMap<HeaderName, HeaderValue>,
690    ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>> {
691        self.0
692            .get_stream(uri, session_id, last_event_id, auth_header, custom_headers)
693            .await
694    }
695}
696
697/// Selects which transport an MCP server should use.
698#[derive(Clone, Debug)]
699pub enum McpTransportBinding {
700    /// Communicate over the child process's stdin/stdout.
701    Stdio(StdioTransportConfig),
702    /// Communicate over the MCP Streamable HTTP transport.
703    StreamableHttp(StreamableHttpTransportConfig),
704}
705
706/// Full configuration for a single MCP server.
707#[derive(Clone, Debug)]
708pub struct McpServerConfig {
709    /// Unique identifier for this server.
710    pub id: McpServerId,
711    /// Transport binding that determines how communication happens.
712    pub transport: McpTransportBinding,
713    /// Arbitrary metadata attached to this server configuration.
714    pub metadata: MetadataMap,
715}
716
717impl McpServerConfig {
718    /// Creates a new server configuration with the given identifier and transport.
719    pub fn new(id: impl Into<String>, transport: McpTransportBinding) -> Self {
720        Self {
721            id: McpServerId::new(id),
722            transport,
723            metadata: MetadataMap::new(),
724        }
725    }
726
727    /// Creates a stdio-backed server configuration.
728    pub fn stdio(id: impl Into<String>, command: impl Into<String>) -> Self {
729        Self::new(
730            id,
731            McpTransportBinding::Stdio(StdioTransportConfig::new(command)),
732        )
733    }
734
735    /// Creates a Streamable HTTP-backed server configuration.
736    pub fn streamable_http(id: impl Into<String>, url: impl Into<String>) -> Self {
737        Self::new(
738            id,
739            McpTransportBinding::StreamableHttp(StreamableHttpTransportConfig::new(url)),
740        )
741    }
742
743    /// Replaces the configuration metadata.
744    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
745        self.metadata = metadata;
746        self
747    }
748}
749
750type CustomNamespace = Arc<dyn Fn(&McpServerId, &str) -> String + Send + Sync>;
751
752/// Strategy used to derive the agentkit-side tool name for an MCP tool.
753///
754/// The default (`Default`) preserves agentkit's historical
755/// `mcp_<server>_<tool>` shape so that names satisfy provider validators
756/// that only allow `[a-zA-Z0-9_-]` (e.g. Anthropic on Vertex). Use
757/// [`McpToolNamespace::None`] when the calling provider already namespaces
758/// remote tools, or [`McpToolNamespace::Custom`] for a bespoke scheme.
759#[derive(Clone, Default)]
760pub enum McpToolNamespace {
761    /// Format names as `mcp_<server>_<tool>`.
762    #[default]
763    Default,
764    /// Use the raw MCP tool name with no prefix at all.
765    None,
766    /// Apply a caller-supplied function for full control.
767    Custom(CustomNamespace),
768}
769
770impl fmt::Debug for McpToolNamespace {
771    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
772        match self {
773            Self::Default => f.write_str("McpToolNamespace::Default"),
774            Self::None => f.write_str("McpToolNamespace::None"),
775            Self::Custom(_) => f.write_str("McpToolNamespace::Custom(<fn>)"),
776        }
777    }
778}
779
780impl McpToolNamespace {
781    /// Builds a custom namespace from a closure.
782    pub fn custom(f: impl Fn(&McpServerId, &str) -> String + Send + Sync + 'static) -> Self {
783        Self::Custom(Arc::new(f))
784    }
785
786    /// Applies the namespace strategy to produce the agentkit tool name.
787    pub fn apply(&self, server_id: &McpServerId, tool_name: &str) -> String {
788        match self {
789            Self::Default => format!("mcp_{server_id}_{tool_name}"),
790            Self::None => tool_name.to_string(),
791            Self::Custom(f) => f(server_id, tool_name),
792        }
793    }
794
795    /// Recovers the raw MCP tool name from an agentkit-side name. Returns
796    /// `None` for [`Self::Custom`] (no general inverse) or when the name
797    /// doesn't match the expected shape.
798    pub fn unapply(&self, server_id: &McpServerId, agentkit_name: &str) -> Option<String> {
799        match self {
800            Self::Default => agentkit_name
801                .strip_prefix(&format!("mcp_{server_id}_"))
802                .map(str::to_string),
803            Self::None => Some(agentkit_name.to_string()),
804            Self::Custom(_) => None,
805        }
806    }
807}
808
809/// A snapshot of all capabilities discovered from a single MCP server.
810///
811/// Tools, resources, and prompts are stored as raw rmcp wire types
812/// ([`McpTool`], [`McpResource`], [`McpPrompt`]) so that consumers see the
813/// full typed surface — `Tool::annotations`, `Tool::output_schema`,
814/// `Tool::execution`, `Tool::icons`; `Resource::title` / `mime_type` /
815/// `size`; `Prompt::arguments` (with the typed `required` flag and per-arg
816/// `description`).
817#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
818pub struct McpDiscoverySnapshot {
819    /// The server this snapshot was taken from.
820    pub server_id: McpServerId,
821    /// Tools advertised by the server.
822    pub tools: Vec<McpTool>,
823    /// Resources advertised by the server.
824    pub resources: Vec<McpResource>,
825    /// Prompts advertised by the server.
826    pub prompts: Vec<McpPrompt>,
827    /// Arbitrary metadata attached to this snapshot.
828    pub metadata: MetadataMap,
829}
830
831/// Catalog and lifecycle events emitted by [`McpServerManager`].
832#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
833pub enum McpCatalogEvent {
834    /// A server connected and completed initial discovery.
835    ServerConnected { server_id: McpServerId },
836    /// A server disconnected.
837    ServerDisconnected { server_id: McpServerId },
838    /// The server's tool list changed.
839    ToolsChanged {
840        server_id: McpServerId,
841        added: Vec<String>,
842        removed: Vec<String>,
843        changed: Vec<String>,
844    },
845    /// The server's resource list changed.
846    ResourcesChanged {
847        server_id: McpServerId,
848        added: Vec<String>,
849        removed: Vec<String>,
850        changed: Vec<String>,
851    },
852    /// The server's prompt list changed.
853    PromptsChanged {
854        server_id: McpServerId,
855        added: Vec<String>,
856        removed: Vec<String>,
857        changed: Vec<String>,
858    },
859    /// Authentication state changed for a server.
860    AuthChanged { server_id: McpServerId },
861    /// A catalog refresh failed.
862    RefreshFailed {
863        server_id: McpServerId,
864        message: String,
865    },
866}
867
868/// Capabilities advertised by an MCP server during the `initialize` handshake.
869#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
870pub struct McpServerCapabilities {
871    /// Advertised `tools` capability.
872    #[serde(default, skip_serializing_if = "Option::is_none")]
873    pub tools: Option<ToolsCapability>,
874    /// Advertised `resources` capability.
875    #[serde(default, skip_serializing_if = "Option::is_none")]
876    pub resources: Option<ResourcesCapability>,
877    /// Advertised `prompts` capability.
878    #[serde(default, skip_serializing_if = "Option::is_none")]
879    pub prompts: Option<PromptsCapability>,
880    /// Advertised `logging` capability.
881    #[serde(default, skip_serializing_if = "Option::is_none")]
882    pub logging: Option<LoggingCapability>,
883}
884
885impl McpServerCapabilities {
886    /// Returns a capabilities struct with every top-level capability
887    /// advertised. Useful for tests.
888    pub fn all() -> Self {
889        Self {
890            tools: Some(ToolsCapability::default()),
891            resources: Some(ResourcesCapability::default()),
892            prompts: Some(PromptsCapability::default()),
893            logging: Some(LoggingCapability::default()),
894        }
895    }
896}
897
898/// Tools sub-capability flags from the MCP `initialize` response.
899#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
900#[serde(rename_all = "camelCase")]
901pub struct ToolsCapability {
902    /// Server emits `notifications/tools/list_changed` when the catalog changes.
903    #[serde(default, skip_serializing_if = "Option::is_none")]
904    pub list_changed: Option<bool>,
905}
906
907/// Resources sub-capability flags from the MCP `initialize` response.
908#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
909#[serde(rename_all = "camelCase")]
910pub struct ResourcesCapability {
911    /// Server supports `resources/subscribe`.
912    #[serde(default, skip_serializing_if = "Option::is_none")]
913    pub subscribe: Option<bool>,
914    /// Server emits `notifications/resources/list_changed`.
915    #[serde(default, skip_serializing_if = "Option::is_none")]
916    pub list_changed: Option<bool>,
917}
918
919/// Prompts sub-capability flags from the MCP `initialize` response.
920#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
921#[serde(rename_all = "camelCase")]
922pub struct PromptsCapability {
923    /// Server emits `notifications/prompts/list_changed`.
924    #[serde(default, skip_serializing_if = "Option::is_none")]
925    pub list_changed: Option<bool>,
926}
927
928/// Logging sub-capability. Spec reserves the key with no defined sub-fields yet.
929#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
930pub struct LoggingCapability {}
931
932/// Server-originated catalog notifications observed by [`McpClientHandler`].
933///
934/// Drained by [`McpConnection`] inside
935/// [`McpServerManager::refresh_changed_catalogs`] to trigger re-discovery of
936/// the affected capability lists. For richer push-style consumption (progress,
937/// logging, resource updates, cancellation), subscribe via
938/// [`McpConnection::subscribe_events`] and pattern-match on
939/// [`McpServerEvent`].
940#[allow(clippy::enum_variant_names)]
941#[derive(Clone, Debug)]
942pub enum McpServerNotification {
943    /// Server announced `notifications/tools/list_changed`.
944    ToolsChanged,
945    /// Server announced `notifications/resources/list_changed`.
946    ResourcesChanged,
947    /// Server announced `notifications/prompts/list_changed`.
948    PromptsChanged,
949}
950
951/// Server-pushed events broadcast to every [`McpConnection::subscribe_events`]
952/// receiver.
953///
954/// Covers the rmcp client-handler notification surface that does not feed the
955/// catalog refresh path: progress, logging, resource updates, cancellation,
956/// plus list-changed announcements (also delivered over the legacy
957/// [`McpServerNotification`] channel).
958#[derive(Clone, Debug)]
959pub enum McpServerEvent {
960    /// `notifications/progress` from the server, scoped to a
961    /// `progress_token` issued in a previous request.
962    Progress(McpProgressNotificationParam),
963    /// `notifications/message` (server log emission). Drives the optional
964    /// log-level negotiation initiated by [`McpConnection::set_logging_level`].
965    Logging(McpLoggingMessageNotificationParam),
966    /// `notifications/resources/updated` for a resource the client previously
967    /// subscribed to via [`McpConnection::subscribe_resource`].
968    ResourceUpdated(McpResourceUpdatedNotificationParam),
969    /// `notifications/tools/list_changed`.
970    ToolListChanged,
971    /// `notifications/resources/list_changed`.
972    ResourceListChanged,
973    /// `notifications/prompts/list_changed`.
974    PromptListChanged,
975    /// `notifications/cancelled` from the server, requesting cancellation of
976    /// an in-flight client request.
977    Cancelled(McpCancelledNotificationParam),
978}
979
980/// Pluggable handler invoked when an MCP server issues `sampling/createMessage`.
981///
982/// Install one via [`McpHandlerConfig::with_sampling_responder`] to expose
983/// the host application's LLM as a sampling target for connected MCP servers.
984#[async_trait]
985pub trait McpSamplingResponder: Send + Sync + 'static {
986    /// Produces a sampled completion in response to a server-initiated
987    /// `sampling/createMessage` request.
988    async fn create_message(
989        &self,
990        params: McpCreateMessageRequestParams,
991    ) -> Result<McpCreateMessageResult, McpError>;
992}
993
994/// Pluggable handler invoked when an MCP server issues `elicitation/create`.
995///
996/// Install one via [`McpHandlerConfig::with_elicitation_responder`] to drive
997/// the host application's user-input UI.
998#[async_trait]
999pub trait McpElicitationResponder: Send + Sync + 'static {
1000    /// Returns the user's response to a server-initiated elicitation request.
1001    async fn create_elicitation(
1002        &self,
1003        params: McpCreateElicitationRequestParams,
1004    ) -> Result<McpCreateElicitationResult, McpError>;
1005}
1006
1007/// Pluggable handler invoked when an MCP server issues `roots/list`.
1008///
1009/// Install one via [`McpHandlerConfig::with_roots_provider`] to surface
1010/// workspace roots that scope the server's filesystem access.
1011#[async_trait]
1012pub trait McpRootsProvider: Send + Sync + 'static {
1013    /// Returns the roots the server should consider in scope.
1014    async fn list_roots(&self) -> Result<Vec<McpRoot>, McpError>;
1015}
1016
1017/// Default broadcast capacity for [`McpServerEvent`] subscribers.
1018const DEFAULT_EVENTS_CAPACITY: usize = 128;
1019
1020/// Channels paired with an [`McpClientHandler`] returned by
1021/// [`McpHandlerConfig::build`].
1022///
1023/// `notifications` is the legacy mpsc receiver consumed by the catalog refresh
1024/// path inside [`McpServerManager::refresh_changed_catalogs`]. `events` is the
1025/// broadcast sender that surfaces every [`McpServerEvent`] — clone it once and
1026/// pass it into [`McpConnection::from_running_service_with_events`] when
1027/// adopting an externally constructed [`rmcp::service::RunningService`]. If the
1028/// adopted connection also needs adapter-time hooks from the same
1029/// [`McpHandlerConfig`], use
1030/// [`McpConnection::from_running_service_with_events_and_handler_config`].
1031pub struct McpClientChannels {
1032    /// Legacy mpsc receiver for catalog list-changed announcements.
1033    pub notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1034    /// Broadcast sender that forwards every [`McpServerEvent`] to subscribers.
1035    pub events: broadcast::Sender<McpServerEvent>,
1036}
1037
1038/// rmcp [`ClientHandler`] used by [`McpConnection`].
1039///
1040/// You only need to construct this directly if you're wiring rmcp transports
1041/// that [`McpTransportBinding`] does not cover (in-memory pipes, websockets,
1042/// custom IO). Build one via [`McpHandlerConfig::build`], then pair the
1043/// resulting service with [`McpConnection::from_running_service`],
1044/// [`McpConnection::from_running_service_with_events`], or
1045/// [`McpConnection::from_running_service_with_events_and_handler_config`] when
1046/// the connection must preserve adapter-time hooks from the config.
1047#[derive(Clone)]
1048pub struct McpClientHandler {
1049    info: rmcp_model::ClientInfo,
1050    notifications: mpsc::UnboundedSender<McpServerNotification>,
1051    events: broadcast::Sender<McpServerEvent>,
1052    sampling: Option<Arc<dyn McpSamplingResponder>>,
1053    elicitation: Option<Arc<dyn McpElicitationResponder>>,
1054    roots: Option<Arc<dyn McpRootsProvider>>,
1055}
1056
1057impl ClientHandler for McpClientHandler {
1058    fn create_message(
1059        &self,
1060        params: rmcp_model::CreateMessageRequestParams,
1061        _context: rmcp::service::RequestContext<RoleClient>,
1062    ) -> impl Future<Output = Result<rmcp_model::CreateMessageResult, rmcp_model::ErrorData>>
1063    + rmcp::service::MaybeSendFuture
1064    + '_ {
1065        let responder = self.sampling.clone();
1066        async move {
1067            match responder {
1068                Some(responder) => responder.create_message(params).await.map_err(Into::into),
1069                None => Err(rmcp_model::ErrorData::method_not_found::<
1070                    rmcp_model::CreateMessageRequestMethod,
1071                >()),
1072            }
1073        }
1074    }
1075
1076    fn list_roots(
1077        &self,
1078        _context: rmcp::service::RequestContext<RoleClient>,
1079    ) -> impl Future<Output = Result<rmcp_model::ListRootsResult, rmcp_model::ErrorData>>
1080    + rmcp::service::MaybeSendFuture
1081    + '_ {
1082        let provider = self.roots.clone();
1083        async move {
1084            match provider {
1085                Some(provider) => provider
1086                    .list_roots()
1087                    .await
1088                    .map(McpListRootsResult::new)
1089                    .map_err(Into::into),
1090                None => Ok(McpListRootsResult::default()),
1091            }
1092        }
1093    }
1094
1095    fn create_elicitation(
1096        &self,
1097        params: rmcp_model::CreateElicitationRequestParams,
1098        _context: rmcp::service::RequestContext<RoleClient>,
1099    ) -> impl Future<Output = Result<rmcp_model::CreateElicitationResult, rmcp_model::ErrorData>>
1100    + rmcp::service::MaybeSendFuture
1101    + '_ {
1102        let responder = self.elicitation.clone();
1103        async move {
1104            match responder {
1105                Some(responder) => responder
1106                    .create_elicitation(params)
1107                    .await
1108                    .map_err(Into::into),
1109                None => Ok(McpCreateElicitationResult::new(
1110                    McpElicitationAction::Decline,
1111                )),
1112            }
1113        }
1114    }
1115
1116    fn on_progress(
1117        &self,
1118        params: rmcp_model::ProgressNotificationParam,
1119        _context: rmcp::service::NotificationContext<RoleClient>,
1120    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1121        let _ = self.events.send(McpServerEvent::Progress(params));
1122        std::future::ready(())
1123    }
1124
1125    fn on_logging_message(
1126        &self,
1127        params: rmcp_model::LoggingMessageNotificationParam,
1128        _context: rmcp::service::NotificationContext<RoleClient>,
1129    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1130        let _ = self.events.send(McpServerEvent::Logging(params));
1131        std::future::ready(())
1132    }
1133
1134    fn on_resource_updated(
1135        &self,
1136        params: rmcp_model::ResourceUpdatedNotificationParam,
1137        _context: rmcp::service::NotificationContext<RoleClient>,
1138    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1139        let _ = self.events.send(McpServerEvent::ResourceUpdated(params));
1140        std::future::ready(())
1141    }
1142
1143    fn on_cancelled(
1144        &self,
1145        params: rmcp_model::CancelledNotificationParam,
1146        _context: rmcp::service::NotificationContext<RoleClient>,
1147    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1148        let _ = self.events.send(McpServerEvent::Cancelled(params));
1149        std::future::ready(())
1150    }
1151
1152    fn on_tool_list_changed(
1153        &self,
1154        _context: rmcp::service::NotificationContext<RoleClient>,
1155    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1156        let _ = self.notifications.send(McpServerNotification::ToolsChanged);
1157        let _ = self.events.send(McpServerEvent::ToolListChanged);
1158        std::future::ready(())
1159    }
1160
1161    fn on_resource_list_changed(
1162        &self,
1163        _context: rmcp::service::NotificationContext<RoleClient>,
1164    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1165        let _ = self
1166            .notifications
1167            .send(McpServerNotification::ResourcesChanged);
1168        let _ = self.events.send(McpServerEvent::ResourceListChanged);
1169        std::future::ready(())
1170    }
1171
1172    fn on_prompt_list_changed(
1173        &self,
1174        _context: rmcp::service::NotificationContext<RoleClient>,
1175    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1176        let _ = self
1177            .notifications
1178            .send(McpServerNotification::PromptsChanged);
1179        let _ = self.events.send(McpServerEvent::PromptListChanged);
1180        std::future::ready(())
1181    }
1182
1183    fn get_info(&self) -> rmcp_model::ClientInfo {
1184        self.info.clone()
1185    }
1186}
1187
1188impl From<McpError> for rmcp_model::ErrorData {
1189    fn from(error: McpError) -> Self {
1190        rmcp_model::ErrorData::internal_error(error.to_string(), None)
1191    }
1192}
1193
1194type RmcpClientService = RunningService<RoleClient, McpClientHandler>;
1195
1196/// Configuration applied to every [`McpClientHandler`] this crate builds on
1197/// behalf of a connection or [`McpServerManager`].
1198///
1199/// Holds the optional sampling / elicitation / roots responders plus the
1200/// broadcast capacity for [`McpServerEvent`] subscribers. Pass an instance to
1201/// [`McpConnection::connect_with_handler`] to drive a single connection, or
1202/// install one on the manager via
1203/// [`McpServerManager::with_handler_config`] / per-trait builders.
1204#[derive(Clone, Default)]
1205pub struct McpHandlerConfig {
1206    /// Responder for server-initiated `sampling/createMessage` requests.
1207    pub sampling: Option<Arc<dyn McpSamplingResponder>>,
1208    /// Responder for server-initiated `elicitation/create` requests.
1209    pub elicitation: Option<Arc<dyn McpElicitationResponder>>,
1210    /// Provider for `roots/list`.
1211    pub roots: Option<Arc<dyn McpRootsProvider>>,
1212    /// Resolver for auth challenges raised during MCP operations. When
1213    /// installed, [`McpToolAdapter::invoke`] (and other operation paths)
1214    /// invoke the responder inline on auth challenges and retry — auth
1215    /// never surfaces as a loop interrupt.
1216    pub auth: Option<Arc<dyn McpAuthResponder>>,
1217    /// Handler invoked when an MCP server returns a JSON-RPC error for an
1218    /// invoked tool. When installed, [`McpToolAdapter::invoke`] forwards
1219    /// the typed [`McpInvocationError`] to the responder before falling
1220    /// back to [`ToolError::ExecutionFailed`]; the responder may synthesize
1221    /// a [`CallToolResult`] (the agent sees a successful tool call) or
1222    /// pass the error through unchanged.
1223    pub error_responder: Option<Arc<dyn McpErrorResponder>>,
1224    /// Broadcast capacity for the [`McpServerEvent`] channel. Defaults to
1225    /// `DEFAULT_EVENTS_CAPACITY` when `None`.
1226    pub events_capacity: Option<usize>,
1227}
1228
1229impl McpHandlerConfig {
1230    /// Returns an empty handler config.
1231    pub fn new() -> Self {
1232        Self::default()
1233    }
1234
1235    /// Sets the sampling responder.
1236    pub fn with_sampling_responder(mut self, responder: Arc<dyn McpSamplingResponder>) -> Self {
1237        self.sampling = Some(responder);
1238        self
1239    }
1240
1241    /// Sets the elicitation responder.
1242    pub fn with_elicitation_responder(
1243        mut self,
1244        responder: Arc<dyn McpElicitationResponder>,
1245    ) -> Self {
1246        self.elicitation = Some(responder);
1247        self
1248    }
1249
1250    /// Sets the roots provider.
1251    pub fn with_roots_provider(mut self, provider: Arc<dyn McpRootsProvider>) -> Self {
1252        self.roots = Some(provider);
1253        self
1254    }
1255
1256    /// Sets the auth responder.
1257    pub fn with_auth_responder(mut self, responder: Arc<dyn McpAuthResponder>) -> Self {
1258        self.auth = Some(responder);
1259        self
1260    }
1261
1262    /// Sets the invocation-error responder.
1263    pub fn with_error_responder(mut self, responder: Arc<dyn McpErrorResponder>) -> Self {
1264        self.error_responder = Some(responder);
1265        self
1266    }
1267
1268    /// Sets the broadcast capacity for [`McpServerEvent`] subscribers.
1269    pub fn with_events_capacity(mut self, capacity: usize) -> Self {
1270        self.events_capacity = Some(capacity);
1271        self
1272    }
1273
1274    /// Builds a handler together with a fresh [`McpClientChannels`] pair —
1275    /// the notification receiver and a new broadcast sender for
1276    /// [`McpServerEvent`].
1277    pub fn build(&self) -> (McpClientHandler, McpClientChannels) {
1278        self.build_inner(None)
1279    }
1280
1281    /// Builds a handler that publishes [`McpServerEvent`] into the provided
1282    /// broadcast sender. Use this when adopting an externally constructed
1283    /// rmcp service via [`McpConnection::from_running_service_with_events`]
1284    /// so subscribers see the same stream.
1285    pub fn build_with(
1286        &self,
1287        events: broadcast::Sender<McpServerEvent>,
1288    ) -> (McpClientHandler, McpClientChannels) {
1289        self.build_inner(Some(events))
1290    }
1291
1292    fn build_inner(
1293        &self,
1294        events: Option<broadcast::Sender<McpServerEvent>>,
1295    ) -> (McpClientHandler, McpClientChannels) {
1296        let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
1297        let events_tx = events.unwrap_or_else(|| {
1298            let capacity = self.events_capacity.unwrap_or(DEFAULT_EVENTS_CAPACITY);
1299            let (tx, _) = broadcast::channel(capacity);
1300            tx
1301        });
1302
1303        let mut capabilities = rmcp_model::ClientCapabilities::default();
1304        if self.sampling.is_some() {
1305            capabilities.sampling = Some(McpSamplingCapability::default());
1306        }
1307        if self.elicitation.is_some() {
1308            capabilities.elicitation = Some(McpElicitationCapability {
1309                form: Some(McpFormElicitationCapability::default()),
1310                url: None,
1311            });
1312        }
1313        if self.roots.is_some() {
1314            capabilities.roots = Some(McpRootsCapabilities::default());
1315        }
1316
1317        let handler = McpClientHandler {
1318            info: rmcp_model::ClientInfo::new(
1319                capabilities,
1320                rmcp_model::Implementation::new("agentkit-mcp", env!("CARGO_PKG_VERSION"))
1321                    .with_title("agentkit MCP client"),
1322            )
1323            .with_protocol_version(rmcp_model::ProtocolVersion::LATEST),
1324            notifications: notifications_tx,
1325            events: events_tx.clone(),
1326            sampling: self.sampling.clone(),
1327            elicitation: self.elicitation.clone(),
1328            roots: self.roots.clone(),
1329        };
1330
1331        (
1332            handler,
1333            McpClientChannels {
1334                notifications: notifications_rx,
1335                events: events_tx,
1336            },
1337        )
1338    }
1339}
1340
1341/// A live connection to a single MCP server, wrapping an
1342/// [`rmcp::service::RunningService`].
1343pub struct McpConnection {
1344    server_id: McpServerId,
1345    config: Option<McpServerConfig>,
1346    inner: Mutex<RmcpClientService>,
1347    peer: RwLock<Peer<RoleClient>>,
1348    auth: Mutex<Option<MetadataMap>>,
1349    notifications: Mutex<mpsc::UnboundedReceiver<McpServerNotification>>,
1350    events: broadcast::Sender<McpServerEvent>,
1351    handler_config: McpHandlerConfig,
1352    capabilities: McpServerCapabilities,
1353}
1354
1355/// The result of replaying an MCP operation after auth resolution.
1356#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1357pub enum McpOperationResult {
1358    /// The server was successfully (re)connected; contains the discovery snapshot.
1359    Connected(McpDiscoverySnapshot),
1360    /// A tool call completed; contains the typed rmcp [`CallToolResult`].
1361    Tool(CallToolResult),
1362    /// A resource was read successfully.
1363    Resource(ReadResourceResult),
1364    /// A prompt was retrieved successfully.
1365    Prompt(GetPromptResult),
1366}
1367
1368impl McpConnection {
1369    /// Connects to an MCP server, performs the rmcp `initialize` handshake,
1370    /// and returns a ready-to-use connection. No sampling / elicitation /
1371    /// roots responders are wired; use [`Self::connect_with_handler`] when
1372    /// the server may issue those requests.
1373    pub async fn connect(config: &McpServerConfig) -> Result<Self, McpError> {
1374        Self::connect_with_auth(config, None, McpHandlerConfig::default()).await
1375    }
1376
1377    /// Connects to an MCP server with a fully configured [`McpHandlerConfig`].
1378    pub async fn connect_with_handler(
1379        config: &McpServerConfig,
1380        handler_config: McpHandlerConfig,
1381    ) -> Result<Self, McpError> {
1382        Self::connect_with_auth(config, None, handler_config).await
1383    }
1384
1385    async fn connect_with_auth(
1386        config: &McpServerConfig,
1387        auth: Option<&MetadataMap>,
1388        handler_config: McpHandlerConfig,
1389    ) -> Result<Self, McpError> {
1390        let (handler, channels) = handler_config.build();
1391        let McpClientChannels {
1392            notifications: notification_rx,
1393            events: events_tx,
1394        } = channels;
1395        let (service, capabilities) = match &config.transport {
1396            McpTransportBinding::Stdio(binding) => {
1397                connect_rmcp_stdio(config, binding, handler).await?
1398            }
1399            McpTransportBinding::StreamableHttp(binding) => {
1400                connect_rmcp_streamable_http(config, binding, auth, handler).await?
1401            }
1402        };
1403
1404        let peer = service.peer().clone();
1405        Ok(Self {
1406            server_id: config.id.clone(),
1407            config: Some(config.clone()),
1408            inner: Mutex::new(service),
1409            peer: RwLock::new(peer),
1410            auth: Mutex::new(auth.cloned()),
1411            notifications: Mutex::new(notification_rx),
1412            events: events_tx,
1413            handler_config,
1414            capabilities,
1415        })
1416    }
1417
1418    /// Adopts an externally constructed [`rmcp::service::RunningService`] as
1419    /// an [`McpConnection`].
1420    ///
1421    /// Use this when you need a transport rmcp supports but
1422    /// [`McpTransportBinding`] does not (in-memory pipes for tests, websockets,
1423    /// custom IO). Pair the service with the notification receiver returned by
1424    /// [`McpHandlerConfig::build`] so list-change notifications stay
1425    /// observable.
1426    ///
1427    /// The connection has no [`McpServerConfig`] attached, so reconnect-on-auth
1428    /// is unavailable; [`resolve_auth`](Self::resolve_auth) only updates stored
1429    /// credentials in this mode. Server-pushed events from the underlying
1430    /// handler are *not* forwarded to subscribers — use
1431    /// [`Self::from_running_service_with_events`] paired with the broadcast
1432    /// sender from [`McpClientChannels`] when you need event delivery.
1433    pub fn from_running_service(
1434        server_id: impl Into<McpServerId>,
1435        service: RmcpClientService,
1436        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1437    ) -> Self {
1438        let (events_tx, _) = broadcast::channel(DEFAULT_EVENTS_CAPACITY);
1439        Self::from_running_service_with_events(server_id, service, notifications, events_tx)
1440    }
1441
1442    /// Variant of [`Self::from_running_service`] that wires the broadcast
1443    /// sender returned by [`McpHandlerConfig::build`] (or [`build_with`])
1444    /// so [`Self::subscribe_events`] receivers observe the same stream the
1445    /// handler is publishing into.
1446    ///
1447    /// [`build_with`]: McpHandlerConfig::build_with
1448    pub fn from_running_service_with_events(
1449        server_id: impl Into<McpServerId>,
1450        service: RmcpClientService,
1451        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1452        events: broadcast::Sender<McpServerEvent>,
1453    ) -> Self {
1454        Self::from_running_service_with_events_and_handler_config(
1455            server_id,
1456            service,
1457            notifications,
1458            events,
1459            McpHandlerConfig::default(),
1460        )
1461    }
1462
1463    /// Variant of [`Self::from_running_service_with_events`] that also
1464    /// preserves the handler config used to build the adopted service.
1465    ///
1466    /// Use this when the connection needs to reach client-side hooks that are
1467    /// not carried by the rmcp handler itself, such as [`McpAuthResponder`] and
1468    /// [`McpErrorResponder`] during [`McpToolAdapter`] invocation.
1469    pub fn from_running_service_with_events_and_handler_config(
1470        server_id: impl Into<McpServerId>,
1471        service: RmcpClientService,
1472        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1473        events: broadcast::Sender<McpServerEvent>,
1474        handler_config: McpHandlerConfig,
1475    ) -> Self {
1476        let capabilities = service
1477            .peer_info()
1478            .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1479            .unwrap_or_default();
1480        let peer = service.peer().clone();
1481        Self {
1482            server_id: server_id.into(),
1483            config: None,
1484            inner: Mutex::new(service),
1485            peer: RwLock::new(peer),
1486            auth: Mutex::new(None),
1487            notifications: Mutex::new(notifications),
1488            events,
1489            handler_config,
1490            capabilities,
1491        }
1492    }
1493
1494    async fn reconnect_inner(&self, auth: Option<&MetadataMap>) -> Result<(), McpError> {
1495        let Some(config) = self.config.clone() else {
1496            return Ok(());
1497        };
1498        let (handler, channels) = self.handler_config.build_with(self.events.clone());
1499        let McpClientChannels {
1500            notifications: notification_rx,
1501            ..
1502        } = channels;
1503        let (service, _capabilities) = match &config.transport {
1504            McpTransportBinding::Stdio(binding) => {
1505                connect_rmcp_stdio(&config, binding, handler).await?
1506            }
1507            McpTransportBinding::StreamableHttp(binding) => {
1508                connect_rmcp_streamable_http(&config, binding, auth, handler).await?
1509            }
1510        };
1511        let new_peer = service.peer().clone();
1512        *self.notifications.lock().await = notification_rx;
1513        *self.inner.lock().await = service;
1514        *self.peer.write().expect("MCP peer lock poisoned") = new_peer;
1515        Ok(())
1516    }
1517
1518    fn peer(&self) -> Peer<RoleClient> {
1519        self.peer.read().expect("MCP peer lock poisoned").clone()
1520    }
1521
1522    /// Returns the [`McpServerId`] for this connection.
1523    pub fn server_id(&self) -> &McpServerId {
1524        &self.server_id
1525    }
1526
1527    /// Returns the capabilities advertised by the server during `initialize`.
1528    pub fn capabilities(&self) -> &McpServerCapabilities {
1529        &self.capabilities
1530    }
1531
1532    /// Returns the [`McpHandlerConfig`] this connection was built with.
1533    /// Used by [`McpToolAdapter`] to reach the registered
1534    /// [`McpAuthResponder`] when an auth challenge surfaces.
1535    pub fn handler_config(&self) -> &McpHandlerConfig {
1536        &self.handler_config
1537    }
1538
1539    /// Subscribes to the per-connection [`McpServerEvent`] broadcast.
1540    ///
1541    /// Receivers buffer up to `events_capacity` (configured via
1542    /// [`McpHandlerConfig::with_events_capacity`], defaults to
1543    /// `DEFAULT_EVENTS_CAPACITY`) before slow consumers are signalled with
1544    /// [`broadcast::error::RecvError::Lagged`]. Catalog `*ListChanged` events
1545    /// are also delivered through the legacy [`McpServerNotification`]
1546    /// receiver consumed by [`McpServerManager::refresh_changed_catalogs`].
1547    pub fn subscribe_events(&self) -> broadcast::Receiver<McpServerEvent> {
1548        self.events.subscribe()
1549    }
1550
1551    /// Subscribes to `notifications/resources/updated` for the given URI.
1552    ///
1553    /// Updates surface as [`McpServerEvent::ResourceUpdated`] on every
1554    /// receiver returned by [`Self::subscribe_events`].
1555    pub async fn subscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1556        let uri = uri.into();
1557        self.peer()
1558            .subscribe(rmcp_model::SubscribeRequestParams::new(uri.clone()))
1559            .await
1560            .map_err(|error| {
1561                rmcp_operation_error(
1562                    &self.server_id,
1563                    McpMethod::ResourcesSubscribe { uri },
1564                    error,
1565                )
1566            })
1567    }
1568
1569    /// Cancels a previous [`Self::subscribe_resource`] subscription.
1570    pub async fn unsubscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1571        let uri = uri.into();
1572        self.peer()
1573            .unsubscribe(rmcp_model::UnsubscribeRequestParams::new(uri.clone()))
1574            .await
1575            .map_err(|error| {
1576                rmcp_operation_error(
1577                    &self.server_id,
1578                    McpMethod::ResourcesUnsubscribe { uri },
1579                    error,
1580                )
1581            })
1582    }
1583
1584    /// Negotiates the minimum severity the server should emit through
1585    /// `notifications/message`. Surfaced as [`McpServerEvent::Logging`].
1586    pub async fn set_logging_level(&self, level: McpLoggingLevel) -> Result<(), McpError> {
1587        self.peer()
1588            .set_level(rmcp_model::SetLevelRequestParams::new(level))
1589            .await
1590            .map_err(|error| {
1591                rmcp_operation_error(
1592                    &self.server_id,
1593                    McpMethod::LoggingSetLevel {
1594                        level: format!("{level:?}"),
1595                    },
1596                    error,
1597                )
1598            })
1599    }
1600
1601    /// Sends a `notifications/cancelled` to the server, asking it to stop
1602    /// processing a previously issued request.
1603    pub async fn notify_cancelled(
1604        &self,
1605        params: McpCancelledNotificationParam,
1606    ) -> Result<(), McpError> {
1607        self.peer()
1608            .notify_cancelled(params)
1609            .await
1610            .map_err(rmcp_service_error)
1611    }
1612
1613    /// Notifies the server that the client's roots list has changed; servers
1614    /// may respond by re-issuing `roots/list`.
1615    pub async fn notify_roots_list_changed(&self) -> Result<(), McpError> {
1616        self.peer()
1617            .notify_roots_list_changed()
1618            .await
1619            .map_err(rmcp_service_error)
1620    }
1621
1622    /// Gracefully closes the underlying rmcp service.
1623    ///
1624    /// For Streamable HTTP this drives the rmcp transport to issue a `DELETE`
1625    /// against the negotiated session, releasing server-side state.
1626    pub async fn close(&self) -> Result<(), McpError> {
1627        let mut inner = self.inner.lock().await;
1628        inner
1629            .close()
1630            .await
1631            .map(|_| ())
1632            .map_err(|error| McpError::Transport(format!("rmcp service close failed: {error}")))
1633    }
1634
1635    /// Stores or clears authentication credentials and, when configured to do
1636    /// so via [`McpServerConfig`], reconnects to apply them.
1637    pub async fn resolve_auth(&self, resolution: AuthResolution) -> Result<(), McpError> {
1638        let mut auth_slot = self.auth.lock().await;
1639        match resolution {
1640            AuthResolution::Provided { credentials, .. } => {
1641                *auth_slot = Some(credentials);
1642            }
1643            AuthResolution::Cancelled { .. } => {
1644                *auth_slot = None;
1645            }
1646        }
1647        let snapshot = auth_slot.clone();
1648        drop(auth_slot);
1649        // Only reconnect if we have a config to reconnect with. Without one
1650        // (e.g. constructed via [`from_running_service`]) the auth is stored
1651        // but not pushed to the live transport.
1652        if self.config.is_some() {
1653            self.reconnect_inner(snapshot.as_ref()).await?;
1654        }
1655        Ok(())
1656    }
1657
1658    /// Discovers tools, resources, and prompts that the server advertised.
1659    pub async fn discover(&self) -> Result<McpDiscoverySnapshot, McpError> {
1660        let tools = async {
1661            match self.capabilities.tools {
1662                Some(_) => self.list_tools().await,
1663                None => Ok(Vec::new()),
1664            }
1665        };
1666        let resources = async {
1667            match self.capabilities.resources {
1668                Some(_) => self.list_resources().await,
1669                None => Ok(Vec::new()),
1670            }
1671        };
1672        let prompts = async {
1673            match self.capabilities.prompts {
1674                Some(_) => self.list_prompts().await,
1675                None => Ok(Vec::new()),
1676            }
1677        };
1678        let (tools, resources, prompts) = tokio::try_join!(tools, resources, prompts)?;
1679        Ok(McpDiscoverySnapshot {
1680            server_id: self.server_id.clone(),
1681            tools,
1682            resources,
1683            prompts,
1684            metadata: MetadataMap::new(),
1685        })
1686    }
1687
1688    async fn drain_notifications(&self) -> Vec<McpServerNotification> {
1689        let mut notifications = self.notifications.lock().await;
1690        let mut drained = Vec::new();
1691        while let Ok(notification) = notifications.try_recv() {
1692            drained.push(notification);
1693        }
1694        drained
1695    }
1696
1697    /// Lists all tools advertised by the connected MCP server.
1698    pub async fn list_tools(&self) -> Result<Vec<McpTool>, McpError> {
1699        self.peer()
1700            .list_all_tools()
1701            .await
1702            .map_err(rmcp_service_error)
1703    }
1704
1705    /// Lists all resources advertised by the connected MCP server.
1706    pub async fn list_resources(&self) -> Result<Vec<McpResource>, McpError> {
1707        self.peer()
1708            .list_all_resources()
1709            .await
1710            .map_err(rmcp_service_error)
1711    }
1712
1713    /// Lists all prompts advertised by the connected MCP server.
1714    pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpError> {
1715        self.peer()
1716            .list_all_prompts()
1717            .await
1718            .map_err(rmcp_service_error)
1719    }
1720
1721    /// Invokes a tool on the MCP server.
1722    ///
1723    /// Returns the typed [`CallToolResult`] — the [`Vec<Content>`] block list,
1724    /// the optional `structured_content` field, and the `is_error` flag are
1725    /// all preserved. Adapters convert this into agentkit
1726    /// [`ToolOutput`]/[`InvocableOutput`] at the boundary.
1727    pub async fn call_tool(
1728        &self,
1729        name: &str,
1730        arguments: Value,
1731    ) -> Result<CallToolResult, McpError> {
1732        let arguments_for_auth = arguments.clone();
1733        let mut params = rmcp_model::CallToolRequestParams::new(name.to_string());
1734        if !arguments.is_null() {
1735            params =
1736                params.with_arguments(value_to_json_object(arguments, "tools/call arguments")?);
1737        }
1738        let name_owned = name.to_string();
1739        self.peer().call_tool(params).await.map_err(|error| {
1740            rmcp_operation_error(
1741                &self.server_id,
1742                McpMethod::ToolsCall {
1743                    name: name_owned,
1744                    arguments: arguments_for_auth,
1745                },
1746                error,
1747            )
1748        })
1749    }
1750
1751    /// Reads a resource from the MCP server by URI.
1752    ///
1753    /// Returns the typed [`ReadResourceResult`] — the full
1754    /// [`Vec<McpResourceContents>`] is preserved (text vs blob, mime types,
1755    /// metadata). Use [`McpResourceHandle`] for the agentkit
1756    /// [`ResourceProvider`] view that collapses to a single inline `DataRef`.
1757    pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult, McpError> {
1758        let uri_owned = uri.to_string();
1759        self.peer()
1760            .read_resource(rmcp_model::ReadResourceRequestParams::new(uri))
1761            .await
1762            .map_err(|error| {
1763                rmcp_operation_error(
1764                    &self.server_id,
1765                    McpMethod::ResourcesRead { uri: uri_owned },
1766                    error,
1767                )
1768            })
1769    }
1770
1771    /// Retrieves a prompt from the MCP server, rendering it with the given
1772    /// arguments.
1773    ///
1774    /// Returns the typed [`GetPromptResult`] — message role and content
1775    /// blocks (text/image/audio/embedded resource) are preserved. Use
1776    /// [`McpPromptHandle`] for the collapsed agentkit [`PromptProvider`]
1777    /// view.
1778    pub async fn get_prompt(
1779        &self,
1780        name: &str,
1781        arguments: Value,
1782    ) -> Result<GetPromptResult, McpError> {
1783        let arguments_for_auth = arguments.clone();
1784        let name_owned = name.to_string();
1785        let mut params = rmcp_model::GetPromptRequestParams::new(name);
1786        if !arguments.is_null() {
1787            params =
1788                params.with_arguments(value_to_json_object(arguments, "prompts/get arguments")?);
1789        }
1790        self.peer().get_prompt(params).await.map_err(|error| {
1791            rmcp_operation_error(
1792                &self.server_id,
1793                McpMethod::PromptsGet {
1794                    name: name_owned,
1795                    arguments: arguments_for_auth,
1796                },
1797                error,
1798            )
1799        })
1800    }
1801}
1802
1803async fn connect_rmcp_stdio(
1804    config: &McpServerConfig,
1805    binding: &StdioTransportConfig,
1806    handler: McpClientHandler,
1807) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1808    let transport = TokioChildProcess::new(
1809        tokio::process::Command::new(&binding.command).configure(|command| {
1810            command.args(&binding.args);
1811            if let Some(cwd) = &binding.cwd {
1812                command.current_dir(cwd);
1813            }
1814            for (key, value) in &binding.env {
1815                command.env(key, value);
1816            }
1817        }),
1818    )
1819    .map_err(McpError::Io)?;
1820
1821    let service = handler
1822        .serve(transport)
1823        .await
1824        .map_err(|error| rmcp_initialize_error(config, error))?;
1825    let capabilities = service
1826        .peer_info()
1827        .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1828        .unwrap_or_default();
1829
1830    Ok((service, capabilities))
1831}
1832
1833async fn connect_rmcp_streamable_http(
1834    config: &McpServerConfig,
1835    binding: &StreamableHttpTransportConfig,
1836    auth: Option<&MetadataMap>,
1837    handler: McpClientHandler,
1838) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1839    let auth_header = auth
1840        .and_then(bearer_token_from_metadata)
1841        .or_else(|| binding.bearer_token.clone());
1842    let mut rmcp_config = RmcpStreamableHttpClientTransportConfig::with_uri(binding.url.clone());
1843    if let Some(auth_header) = auth_header {
1844        rmcp_config = rmcp_config.auth_header(auth_header);
1845    }
1846    rmcp_config = rmcp_config.custom_headers(binding.headers.iter().cloned().collect());
1847
1848    let result = match binding.http_client.as_ref() {
1849        Some(client) => {
1850            let transport = StreamableHttpClientTransport::with_client(
1851                DynHttpClient(client.clone()),
1852                rmcp_config,
1853            );
1854            handler.serve(transport).await
1855        }
1856        None => {
1857            let transport = StreamableHttpClientTransport::from_config(rmcp_config);
1858            handler.serve(transport).await
1859        }
1860    };
1861    let service = result.map_err(|error| rmcp_initialize_error(config, error))?;
1862    let capabilities = service
1863        .peer_info()
1864        .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1865        .unwrap_or_default();
1866
1867    Ok((service, capabilities))
1868}
1869
1870/// Adapter exposing a single MCP resource as a [`ResourceProvider`].
1871pub struct McpResourceHandle {
1872    connection: Arc<McpConnection>,
1873    descriptor: ResourceDescriptor,
1874}
1875
1876#[async_trait]
1877impl ResourceProvider for McpResourceHandle {
1878    async fn list_resources(&self) -> Result<Vec<ResourceDescriptor>, CapabilityError> {
1879        Ok(vec![self.descriptor.clone()])
1880    }
1881
1882    async fn read_resource(
1883        &self,
1884        id: &ResourceId,
1885        _ctx: &mut CapabilityContext<'_>,
1886    ) -> Result<ResourceContents, CapabilityError> {
1887        let result = self
1888            .connection
1889            .read_resource(&id.0)
1890            .await
1891            .map_err(|error| match error {
1892                McpError::AuthRequired(request) => {
1893                    CapabilityError::Unavailable(format!("auth required: {:?}", request))
1894                }
1895                other => CapabilityError::ExecutionFailed(other.to_string()),
1896            })?;
1897        read_resource_result_to_capabilities(result)
1898            .map_err(|error| CapabilityError::ExecutionFailed(error.to_string()))
1899    }
1900}
1901
1902/// Adapter exposing a single MCP prompt as a [`PromptProvider`].
1903pub struct McpPromptHandle {
1904    connection: Arc<McpConnection>,
1905    descriptor: PromptDescriptor,
1906}
1907
1908#[async_trait]
1909impl PromptProvider for McpPromptHandle {
1910    async fn list_prompts(&self) -> Result<Vec<PromptDescriptor>, CapabilityError> {
1911        Ok(vec![self.descriptor.clone()])
1912    }
1913
1914    async fn get_prompt(
1915        &self,
1916        id: &PromptId,
1917        args: Value,
1918        _ctx: &mut CapabilityContext<'_>,
1919    ) -> Result<PromptContents, CapabilityError> {
1920        let result =
1921            self.connection
1922                .get_prompt(&id.0, args)
1923                .await
1924                .map_err(|error| match error {
1925                    McpError::AuthRequired(request) => {
1926                        CapabilityError::Unavailable(format!("auth required: {:?}", request))
1927                    }
1928                    other => CapabilityError::ExecutionFailed(other.to_string()),
1929                })?;
1930        Ok(get_prompt_result_to_capabilities(result))
1931    }
1932}
1933
1934/// A [`CapabilityProvider`] that surfaces MCP tools, resources, and prompts.
1935///
1936/// The tool side is built by wrapping [`McpToolAdapter`]s in
1937/// [`agentkit_tools_core::ToolInvocableAdapter`], so the same
1938/// permission-check + adapter-spec plumbing the rest of agentkit uses also
1939/// applies to MCP tools — this crate no longer ships its own
1940/// `McpInvocable`.
1941pub struct McpCapabilityProvider {
1942    invocables: Vec<Arc<dyn Invocable>>,
1943    resources: Vec<Arc<dyn ResourceProvider>>,
1944    prompts: Vec<Arc<dyn PromptProvider>>,
1945}
1946
1947impl McpCapabilityProvider {
1948    /// Builds a capability provider from an existing connection and snapshot,
1949    /// using the [`McpToolNamespace::Default`] tool naming strategy.
1950    pub fn from_snapshot(connection: Arc<McpConnection>, snapshot: &McpDiscoverySnapshot) -> Self {
1951        Self::from_snapshot_with_namespace(connection, snapshot, &McpToolNamespace::Default)
1952    }
1953
1954    /// Builds a capability provider with a custom tool naming strategy.
1955    pub fn from_snapshot_with_namespace(
1956        connection: Arc<McpConnection>,
1957        snapshot: &McpDiscoverySnapshot,
1958        namespace: &McpToolNamespace,
1959    ) -> Self {
1960        let server_id = connection.server_id().clone();
1961        let registry =
1962            snapshot
1963                .tools
1964                .iter()
1965                .cloned()
1966                .fold(ToolRegistry::new(), |registry, tool| {
1967                    registry.with(McpToolAdapter::with_namespace(
1968                        &server_id,
1969                        connection.clone(),
1970                        tool,
1971                        namespace,
1972                    ))
1973                });
1974        let permissions: Arc<dyn PermissionChecker> = Arc::new(AllowAllPermissions);
1975        let resources_arc: Arc<dyn agentkit_tools_core::ToolResources> = Arc::new(());
1976        let invocables =
1977            ToolCapabilityProvider::from_registry(&registry, permissions, resources_arc)
1978                .invocables();
1979
1980        let resources = snapshot
1981            .resources
1982            .iter()
1983            .cloned()
1984            .map(|resource| {
1985                Arc::new(McpResourceHandle {
1986                    connection: connection.clone(),
1987                    descriptor: resource_descriptor_from_rmcp(resource),
1988                }) as Arc<dyn ResourceProvider>
1989            })
1990            .collect();
1991
1992        let prompts = snapshot
1993            .prompts
1994            .iter()
1995            .cloned()
1996            .map(|prompt| {
1997                Arc::new(McpPromptHandle {
1998                    connection: connection.clone(),
1999                    descriptor: prompt_descriptor_from_rmcp(prompt),
2000                }) as Arc<dyn PromptProvider>
2001            })
2002            .collect();
2003
2004        Self {
2005            invocables,
2006            resources,
2007            prompts,
2008        }
2009    }
2010
2011    /// Merges multiple capability providers into one.
2012    pub fn merge<I>(providers: I) -> Self
2013    where
2014        I: IntoIterator<Item = Self>,
2015    {
2016        let mut invocables = Vec::new();
2017        let mut resources = Vec::new();
2018        let mut prompts = Vec::new();
2019
2020        for provider in providers {
2021            invocables.extend(provider.invocables);
2022            resources.extend(provider.resources);
2023            prompts.extend(provider.prompts);
2024        }
2025
2026        Self {
2027            invocables,
2028            resources,
2029            prompts,
2030        }
2031    }
2032
2033    /// Connects to an MCP server, performs discovery, and builds a provider.
2034    pub async fn connect(
2035        config: &McpServerConfig,
2036    ) -> Result<(Arc<McpConnection>, Self, McpDiscoverySnapshot), McpError> {
2037        let connection = Arc::new(McpConnection::connect(config).await?);
2038        let snapshot = connection.discover().await?;
2039        let provider = Self::from_snapshot(connection.clone(), &snapshot);
2040
2041        Ok((connection, provider, snapshot))
2042    }
2043}
2044
2045impl CapabilityProvider for McpCapabilityProvider {
2046    fn invocables(&self) -> Vec<Arc<dyn Invocable>> {
2047        self.invocables.clone()
2048    }
2049
2050    fn resources(&self) -> Vec<Arc<dyn ResourceProvider>> {
2051        self.resources.clone()
2052    }
2053
2054    fn prompts(&self) -> Vec<Arc<dyn PromptProvider>> {
2055        self.prompts.clone()
2056    }
2057}
2058
2059/// A connected MCP server together with its configuration and snapshot.
2060#[derive(Clone)]
2061pub struct McpServerHandle {
2062    config: McpServerConfig,
2063    connection: Arc<McpConnection>,
2064    snapshot: McpDiscoverySnapshot,
2065    namespace: McpToolNamespace,
2066}
2067
2068impl McpServerHandle {
2069    /// Returns the original configuration used to connect this server.
2070    pub fn config(&self) -> &McpServerConfig {
2071        &self.config
2072    }
2073
2074    /// Returns the server's unique identifier.
2075    pub fn server_id(&self) -> &McpServerId {
2076        self.connection.server_id()
2077    }
2078
2079    /// Returns a shared reference to the underlying [`McpConnection`].
2080    pub fn connection(&self) -> Arc<McpConnection> {
2081        self.connection.clone()
2082    }
2083
2084    /// Returns the discovery snapshot captured when the server was connected.
2085    pub fn snapshot(&self) -> &McpDiscoverySnapshot {
2086        &self.snapshot
2087    }
2088
2089    /// Returns the tool naming strategy in effect for this server.
2090    pub fn namespace(&self) -> &McpToolNamespace {
2091        &self.namespace
2092    }
2093
2094    /// Builds a [`ToolRegistry`] containing an [`McpToolAdapter`] for each tool.
2095    pub fn tool_registry(&self) -> ToolRegistry {
2096        self.snapshot
2097            .tools
2098            .iter()
2099            .cloned()
2100            .fold(ToolRegistry::new(), |registry, tool| {
2101                registry.with(McpToolAdapter::with_namespace(
2102                    self.server_id(),
2103                    self.connection.clone(),
2104                    tool,
2105                    &self.namespace,
2106                ))
2107            })
2108    }
2109
2110    /// Builds an [`McpCapabilityProvider`] from this server's snapshot.
2111    pub fn capability_provider(&self) -> McpCapabilityProvider {
2112        McpCapabilityProvider::from_snapshot_with_namespace(
2113            self.connection.clone(),
2114            &self.snapshot,
2115            &self.namespace,
2116        )
2117    }
2118}
2119
2120/// Manages the lifecycle of one or more MCP servers.
2121pub struct McpServerManager {
2122    configs: BTreeMap<McpServerId, McpServerConfig>,
2123    connections: BTreeMap<McpServerId, McpServerHandle>,
2124    auth: BTreeMap<McpServerId, MetadataMap>,
2125    catalog_tx: broadcast::Sender<McpCatalogEvent>,
2126    namespace: McpToolNamespace,
2127    handler_config: McpHandlerConfig,
2128    catalog_writer: CatalogWriter,
2129    /// Agentkit-namespaced tool names this manager has registered for each
2130    /// connected server. Used to perform surgical writes against the
2131    /// [`CatalogWriter`] on connect/disconnect/refresh without rebuilding
2132    /// the whole catalog.
2133    server_tools: BTreeMap<McpServerId, BTreeSet<ToolName>>,
2134}
2135
2136impl Default for McpServerManager {
2137    fn default() -> Self {
2138        let (catalog_tx, _) = broadcast::channel(128);
2139        let (catalog_writer, _) = dynamic_catalog("mcp");
2140        Self {
2141            configs: BTreeMap::new(),
2142            connections: BTreeMap::new(),
2143            auth: BTreeMap::new(),
2144            catalog_tx,
2145            namespace: McpToolNamespace::Default,
2146            handler_config: McpHandlerConfig::default(),
2147            catalog_writer,
2148            server_tools: BTreeMap::new(),
2149        }
2150    }
2151}
2152
2153impl McpServerManager {
2154    /// Creates an empty server manager with no registered servers.
2155    pub fn new() -> Self {
2156        Self::default()
2157    }
2158
2159    /// Sets the tool naming strategy for every adapter built by this manager.
2160    pub fn with_namespace(mut self, namespace: McpToolNamespace) -> Self {
2161        self.namespace = namespace;
2162        self
2163    }
2164
2165    /// Replaces the tool naming strategy in place.
2166    pub fn set_namespace(&mut self, namespace: McpToolNamespace) -> &mut Self {
2167        self.namespace = namespace;
2168        self
2169    }
2170
2171    /// Returns the active tool naming strategy.
2172    pub fn namespace(&self) -> &McpToolNamespace {
2173        &self.namespace
2174    }
2175
2176    /// Replaces the [`McpHandlerConfig`] applied to every connection this
2177    /// manager opens.
2178    pub fn with_handler_config(mut self, handler_config: McpHandlerConfig) -> Self {
2179        self.handler_config = handler_config;
2180        self
2181    }
2182
2183    /// Sets the [`McpHandlerConfig`] in place.
2184    pub fn set_handler_config(&mut self, handler_config: McpHandlerConfig) -> &mut Self {
2185        self.handler_config = handler_config;
2186        self
2187    }
2188
2189    /// Returns the active [`McpHandlerConfig`].
2190    pub fn handler_config(&self) -> &McpHandlerConfig {
2191        &self.handler_config
2192    }
2193
2194    /// Registers a server configuration. Returns `self` for chaining.
2195    pub fn with_server(mut self, config: McpServerConfig) -> Self {
2196        self.register_server(config);
2197        self
2198    }
2199
2200    /// Registers a server configuration by mutable reference.
2201    pub fn register_server(&mut self, config: McpServerConfig) -> &mut Self {
2202        self.configs.insert(config.id.clone(), config);
2203        self
2204    }
2205
2206    /// Returns the handle for a connected server, or `None` if not connected.
2207    pub fn connected_server(&self, server_id: &McpServerId) -> Option<&McpServerHandle> {
2208        self.connections.get(server_id)
2209    }
2210
2211    /// Returns handles for all currently connected servers.
2212    pub fn connected_servers(&self) -> Vec<&McpServerHandle> {
2213        self.connections.values().collect()
2214    }
2215
2216    /// Subscribes to MCP catalog and lifecycle events.
2217    pub fn subscribe_catalog_events(&self) -> broadcast::Receiver<McpCatalogEvent> {
2218        self.catalog_tx.subscribe()
2219    }
2220
2221    fn emit_catalog_event(&self, event: McpCatalogEvent) {
2222        let _ = self.catalog_tx.send(event);
2223    }
2224
2225    /// Connects a single registered server by its identifier.
2226    pub async fn connect_server(
2227        &mut self,
2228        server_id: &McpServerId,
2229    ) -> Result<McpServerHandle, McpError> {
2230        let config = self
2231            .configs
2232            .get(server_id)
2233            .cloned()
2234            .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2235        let connection = Arc::new(
2236            McpConnection::connect_with_auth(
2237                &config,
2238                self.auth.get(server_id),
2239                self.handler_config.clone(),
2240            )
2241            .await?,
2242        );
2243        let snapshot = connection.discover().await?;
2244        let handle = McpServerHandle {
2245            config,
2246            connection,
2247            snapshot,
2248            namespace: self.namespace.clone(),
2249        };
2250        self.connections.insert(server_id.clone(), handle.clone());
2251        self.register_server_tools(server_id, &handle.snapshot);
2252        self.emit_catalog_event(McpCatalogEvent::ServerConnected {
2253            server_id: server_id.clone(),
2254        });
2255        Ok(handle)
2256    }
2257
2258    /// Connects all registered servers concurrently.
2259    pub async fn connect_all(&mut self) -> Result<Vec<McpServerHandle>, McpError> {
2260        let plans: Vec<(McpServerId, McpServerConfig, Option<MetadataMap>)> = self
2261            .configs
2262            .iter()
2263            .map(|(id, cfg)| (id.clone(), cfg.clone(), self.auth.get(id).cloned()))
2264            .collect();
2265        let handler_config = self.handler_config.clone();
2266        let namespace = self.namespace.clone();
2267
2268        let futures = plans.into_iter().map(|(server_id, config, auth)| {
2269            let handler_config = handler_config.clone();
2270            let namespace = namespace.clone();
2271            async move {
2272                let connection = Arc::new(
2273                    McpConnection::connect_with_auth(&config, auth.as_ref(), handler_config)
2274                        .await?,
2275                );
2276                let snapshot = connection.discover().await?;
2277                Ok::<(McpServerId, McpServerHandle), McpError>((
2278                    server_id,
2279                    McpServerHandle {
2280                        config,
2281                        connection,
2282                        snapshot,
2283                        namespace,
2284                    },
2285                ))
2286            }
2287        });
2288
2289        let results = try_join_all(futures).await?;
2290        let mut handles = Vec::with_capacity(results.len());
2291        let mut connected: Vec<(McpServerId, McpDiscoverySnapshot)> =
2292            Vec::with_capacity(results.len());
2293        for (server_id, handle) in results {
2294            connected.push((server_id.clone(), handle.snapshot.clone()));
2295            self.connections.insert(server_id, handle.clone());
2296            handles.push(handle);
2297        }
2298        for (server_id, snapshot) in &connected {
2299            self.register_server_tools(server_id, snapshot);
2300        }
2301        for (server_id, _) in connected {
2302            self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2303        }
2304        Ok(handles)
2305    }
2306
2307    /// Re-discovers capabilities for a connected server.
2308    pub async fn refresh_server(
2309        &mut self,
2310        server_id: &McpServerId,
2311    ) -> Result<McpDiscoverySnapshot, McpError> {
2312        let handle = self
2313            .connections
2314            .get_mut(server_id)
2315            .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2316        let previous = handle.snapshot.clone();
2317        let snapshot = match handle.connection.discover().await {
2318            Ok(snapshot) => snapshot,
2319            Err(error) => {
2320                self.emit_catalog_event(McpCatalogEvent::RefreshFailed {
2321                    server_id: server_id.clone(),
2322                    message: error.to_string(),
2323                });
2324                return Err(error);
2325            }
2326        };
2327        handle.snapshot = snapshot.clone();
2328        let events = diff_discovery_snapshots(server_id, &previous, &snapshot);
2329        if !events.is_empty() {
2330            self.apply_catalog_events(server_id, &snapshot, &events);
2331            for event in events {
2332                self.emit_catalog_event(event);
2333            }
2334        }
2335        Ok(snapshot)
2336    }
2337
2338    /// Processes pending server list-change notifications.
2339    pub async fn refresh_changed_catalogs(&mut self) -> Result<Vec<McpCatalogEvent>, McpError> {
2340        let server_ids = self.connections.keys().cloned().collect::<Vec<_>>();
2341        let mut emitted = Vec::new();
2342
2343        for server_id in server_ids {
2344            let Some(connection) = self
2345                .connections
2346                .get(&server_id)
2347                .map(McpServerHandle::connection)
2348            else {
2349                continue;
2350            };
2351            let notifications = connection.drain_notifications().await;
2352            if notifications.is_empty() {
2353                continue;
2354            }
2355
2356            let handle = self
2357                .connections
2358                .get_mut(&server_id)
2359                .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2360            let previous = handle.snapshot.clone();
2361            let snapshot = match handle.connection.discover().await {
2362                Ok(snapshot) => snapshot,
2363                Err(error) => {
2364                    let event = McpCatalogEvent::RefreshFailed {
2365                        server_id: server_id.clone(),
2366                        message: error.to_string(),
2367                    };
2368                    self.emit_catalog_event(event.clone());
2369                    emitted.push(event);
2370                    return Err(error);
2371                }
2372            };
2373            handle.snapshot = snapshot.clone();
2374            let events = diff_discovery_snapshots(&server_id, &previous, &snapshot);
2375            if !events.is_empty() {
2376                self.apply_catalog_events(&server_id, &snapshot, &events);
2377                for event in events {
2378                    self.emit_catalog_event(event.clone());
2379                    emitted.push(event);
2380                }
2381            }
2382        }
2383
2384        Ok(emitted)
2385    }
2386
2387    /// Disconnects a server and removes it from active connections.
2388    pub async fn disconnect_server(&mut self, server_id: &McpServerId) -> Result<(), McpError> {
2389        let Some(handle) = self.connections.remove(server_id) else {
2390            return Err(McpError::UnknownServer(server_id.to_string()));
2391        };
2392        handle.connection.close().await?;
2393        self.unregister_server_tools(server_id);
2394        self.emit_catalog_event(McpCatalogEvent::ServerDisconnected {
2395            server_id: server_id.clone(),
2396        });
2397        Ok(())
2398    }
2399
2400    /// Stores or clears authentication credentials for a server.
2401    pub async fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), McpError> {
2402        let server_id = resolution
2403            .request()
2404            .server_id()
2405            .ok_or_else(|| McpError::AuthResolution("auth resolution missing server id".into()))?;
2406        let server_id = McpServerId::new(server_id);
2407        match &resolution {
2408            AuthResolution::Provided { credentials, .. } => {
2409                self.auth.insert(server_id.clone(), credentials.clone());
2410            }
2411            AuthResolution::Cancelled { .. } => {
2412                self.auth.remove(&server_id);
2413            }
2414        }
2415
2416        if let Some(handle) = self.connections.get(&server_id) {
2417            handle.connection.resolve_auth(resolution).await?;
2418        } else if !self.configs.contains_key(&server_id) {
2419            return Err(McpError::UnknownServer(server_id.to_string()));
2420        }
2421        self.emit_catalog_event(McpCatalogEvent::AuthChanged { server_id });
2422        Ok(())
2423    }
2424
2425    /// Builds a one-shot snapshot [`ToolRegistry`] of every tool across all
2426    /// connected servers. Use [`source`](Self::source) instead when wiring
2427    /// the manager into an [`agentkit_loop::Agent`] so tool catalog changes
2428    /// flow through automatically.
2429    pub fn tool_registry(&self) -> ToolRegistry {
2430        self.connections
2431            .values()
2432            .fold(ToolRegistry::new(), |mut registry, handle| {
2433                for tool in handle.snapshot.tools.iter().cloned() {
2434                    registry.register(McpToolAdapter::with_namespace(
2435                        handle.server_id(),
2436                        handle.connection.clone(),
2437                        tool,
2438                        &self.namespace,
2439                    ));
2440                }
2441                registry
2442            })
2443    }
2444
2445    /// Returns the manager's federated [`CatalogReader`].
2446    ///
2447    /// The manager keeps an internal `CatalogWriter` in sync with every
2448    /// connect, disconnect, and catalog refresh; the returned reader sees
2449    /// the added/removed/changed tool sets via
2450    /// [`ToolSource::drain_catalog_events`]. Pass it to
2451    /// [`agentkit_loop::AgentBuilder::tools`] alongside any frozen native
2452    /// [`ToolRegistry`].
2453    ///
2454    /// Each call returns a fresh reader subscription — events emitted before
2455    /// this call are not replayed. Call once at agent setup time and reuse.
2456    pub fn source(&self) -> CatalogReader {
2457        self.catalog_writer.reader()
2458    }
2459
2460    /// Surgically updates the tool catalog from the diff events produced
2461    /// by [`diff_discovery_snapshots`]. Only [`McpCatalogEvent::ToolsChanged`]
2462    /// affects the catalog — resource and prompt diffs are observed by the
2463    /// caller via the broadcast stream and don't touch tool state.
2464    fn apply_catalog_events(
2465        &mut self,
2466        server_id: &McpServerId,
2467        snapshot: &McpDiscoverySnapshot,
2468        events: &[McpCatalogEvent],
2469    ) {
2470        for event in events {
2471            if let McpCatalogEvent::ToolsChanged {
2472                added,
2473                removed,
2474                changed,
2475                ..
2476            } = event
2477            {
2478                self.apply_server_tool_diff(server_id, snapshot, added, removed, changed);
2479            }
2480        }
2481    }
2482
2483    /// Registers every tool from a freshly-discovered snapshot, recording
2484    /// the agentkit-namespaced names so [`Self::unregister_server_tools`]
2485    /// can later remove exactly this set.
2486    fn register_server_tools(&mut self, server_id: &McpServerId, snapshot: &McpDiscoverySnapshot) {
2487        let connection = match self.connections.get(server_id) {
2488            Some(handle) => handle.connection.clone(),
2489            None => return,
2490        };
2491        let previous = self.server_tools.remove(server_id).unwrap_or_default();
2492        let mut names = BTreeSet::new();
2493        for tool in &snapshot.tools {
2494            let adapter = McpToolAdapter::with_namespace(
2495                server_id,
2496                connection.clone(),
2497                tool.clone(),
2498                &self.namespace,
2499            );
2500            names.insert(adapter.spec().name.clone());
2501            self.catalog_writer.upsert(Arc::new(adapter));
2502        }
2503        for stale in previous.difference(&names) {
2504            self.catalog_writer.remove(stale);
2505        }
2506        self.server_tools.insert(server_id.clone(), names);
2507    }
2508
2509    /// Removes every agentkit-namespaced tool previously registered for
2510    /// `server_id`. No-op if the server was never registered.
2511    fn unregister_server_tools(&mut self, server_id: &McpServerId) {
2512        let Some(names) = self.server_tools.remove(server_id) else {
2513            return;
2514        };
2515        for name in names {
2516            self.catalog_writer.remove(&name);
2517        }
2518    }
2519
2520    /// Applies a per-tool diff (in raw MCP names) against the current
2521    /// catalog: removes are pruned, adds and changes are upserted from the
2522    /// fresh snapshot. Updates the per-server name index accordingly.
2523    fn apply_server_tool_diff(
2524        &mut self,
2525        server_id: &McpServerId,
2526        snapshot: &McpDiscoverySnapshot,
2527        added: &[String],
2528        removed: &[String],
2529        changed: &[String],
2530    ) {
2531        let connection = match self.connections.get(server_id) {
2532            Some(handle) => handle.connection.clone(),
2533            None => return,
2534        };
2535        let names = self.server_tools.entry(server_id.clone()).or_default();
2536
2537        for raw_name in removed {
2538            let agentkit_name = ToolName::new(self.namespace.apply(server_id, raw_name));
2539            if names.remove(&agentkit_name) {
2540                self.catalog_writer.remove(&agentkit_name);
2541            }
2542        }
2543
2544        let upsert_one = |raw_name: &str| -> Option<(ToolName, McpToolAdapter)> {
2545            let tool = snapshot
2546                .tools
2547                .iter()
2548                .find(|tool| tool.name.as_ref() == raw_name)?
2549                .clone();
2550            let adapter = McpToolAdapter::with_namespace(
2551                server_id,
2552                connection.clone(),
2553                tool,
2554                &self.namespace,
2555            );
2556            Some((adapter.spec().name.clone(), adapter))
2557        };
2558
2559        for raw_name in added.iter().chain(changed.iter()) {
2560            if let Some((agentkit_name, adapter)) = upsert_one(raw_name) {
2561                names.insert(agentkit_name);
2562                self.catalog_writer.upsert(Arc::new(adapter));
2563            }
2564        }
2565    }
2566
2567    /// Builds a combined [`McpCapabilityProvider`] from all connected servers.
2568    pub fn capability_provider(&self) -> McpCapabilityProvider {
2569        McpCapabilityProvider::merge(
2570            self.connections
2571                .values()
2572                .map(McpServerHandle::capability_provider),
2573        )
2574    }
2575}
2576
2577fn diff_discovery_snapshots(
2578    server_id: &McpServerId,
2579    previous: &McpDiscoverySnapshot,
2580    current: &McpDiscoverySnapshot,
2581) -> Vec<McpCatalogEvent> {
2582    let mut events = Vec::new();
2583    let (added, removed, changed) = diff_named_items(
2584        previous.tools.iter().map(|item| (item.name.as_ref(), item)),
2585        current.tools.iter().map(|item| (item.name.as_ref(), item)),
2586    );
2587    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2588        events.push(McpCatalogEvent::ToolsChanged {
2589            server_id: server_id.clone(),
2590            added,
2591            removed,
2592            changed,
2593        });
2594    }
2595
2596    let (added, removed, changed) = diff_named_items(
2597        previous
2598            .resources
2599            .iter()
2600            .map(|item| (item.uri.as_str(), item)),
2601        current
2602            .resources
2603            .iter()
2604            .map(|item| (item.uri.as_str(), item)),
2605    );
2606    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2607        events.push(McpCatalogEvent::ResourcesChanged {
2608            server_id: server_id.clone(),
2609            added,
2610            removed,
2611            changed,
2612        });
2613    }
2614
2615    let (added, removed, changed) = diff_named_items(
2616        previous
2617            .prompts
2618            .iter()
2619            .map(|item| (item.name.as_str(), item)),
2620        current
2621            .prompts
2622            .iter()
2623            .map(|item| (item.name.as_str(), item)),
2624    );
2625    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2626        events.push(McpCatalogEvent::PromptsChanged {
2627            server_id: server_id.clone(),
2628            added,
2629            removed,
2630            changed,
2631        });
2632    }
2633
2634    events
2635}
2636
2637/// Merge-walks two name-keyed sequences and produces added/removed/changed
2638/// name lists. Each side is sorted in place; no intermediate maps are
2639/// allocated. Names are cloned only at output time.
2640fn diff_named_items<'a, T>(
2641    previous: impl IntoIterator<Item = (&'a str, &'a T)>,
2642    current: impl IntoIterator<Item = (&'a str, &'a T)>,
2643) -> (Vec<String>, Vec<String>, Vec<String>)
2644where
2645    T: PartialEq + 'a,
2646{
2647    let mut prev: Vec<(&str, &T)> = previous.into_iter().collect();
2648    let mut curr: Vec<(&str, &T)> = current.into_iter().collect();
2649    prev.sort_unstable_by_key(|(name, _)| *name);
2650    curr.sort_unstable_by_key(|(name, _)| *name);
2651
2652    let mut added = Vec::new();
2653    let mut removed = Vec::new();
2654    let mut changed = Vec::new();
2655    let (mut i, mut j) = (0, 0);
2656    while i < prev.len() && j < curr.len() {
2657        match prev[i].0.cmp(curr[j].0) {
2658            std::cmp::Ordering::Less => {
2659                removed.push(prev[i].0.to_string());
2660                i += 1;
2661            }
2662            std::cmp::Ordering::Greater => {
2663                added.push(curr[j].0.to_string());
2664                j += 1;
2665            }
2666            std::cmp::Ordering::Equal => {
2667                if prev[i].1 != curr[j].1 {
2668                    changed.push(curr[j].0.to_string());
2669                }
2670                i += 1;
2671                j += 1;
2672            }
2673        }
2674    }
2675    while i < prev.len() {
2676        removed.push(prev[i].0.to_string());
2677        i += 1;
2678    }
2679    while j < curr.len() {
2680        added.push(curr[j].0.to_string());
2681        j += 1;
2682    }
2683
2684    (added, removed, changed)
2685}
2686
2687/// Adapter exposing an MCP tool as an agentkit [`Tool`].
2688pub struct McpToolAdapter {
2689    tool_name: String,
2690    connection: Arc<McpConnection>,
2691    spec: ToolSpec,
2692}
2693
2694impl McpToolAdapter {
2695    /// Creates a new tool adapter for the given MCP tool, using the
2696    /// [`McpToolNamespace::Default`] naming strategy.
2697    pub fn new(server_id: &McpServerId, connection: Arc<McpConnection>, tool: McpTool) -> Self {
2698        Self::with_namespace(server_id, connection, tool, &McpToolNamespace::Default)
2699    }
2700
2701    /// Creates a new tool adapter with a custom name-namespacing strategy.
2702    pub fn with_namespace(
2703        server_id: &McpServerId,
2704        connection: Arc<McpConnection>,
2705        tool: McpTool,
2706        namespace: &McpToolNamespace,
2707    ) -> Self {
2708        let spec = tool_spec_from_tool(server_id, &tool, namespace);
2709        Self {
2710            tool_name: tool.name.into_owned(),
2711            connection,
2712            spec,
2713        }
2714    }
2715
2716    async fn handle_invocation_error(
2717        &self,
2718        err: McpInvocationError,
2719        input: &Value,
2720    ) -> Result<CallToolResult, ToolError> {
2721        let Some(responder) = self.connection.handler_config().error_responder.clone() else {
2722            return Err(ToolError::ExecutionFailed(err.to_string()));
2723        };
2724        let method = McpMethod::ToolsCall {
2725            name: self.tool_name.clone(),
2726            arguments: input.clone(),
2727        };
2728        let ctx = McpErrorContext {
2729            server_id: self.connection.server_id(),
2730            method: &method,
2731            input: Some(input),
2732        };
2733        match responder.handle(&err, ctx).await {
2734            ErrorResponderOutcome::SynthesizeResult(result) => Ok(result),
2735            ErrorResponderOutcome::PassThrough => Err(ToolError::ExecutionFailed(err.to_string())),
2736        }
2737    }
2738}
2739
2740#[async_trait]
2741impl Tool for McpToolAdapter {
2742    fn spec(&self) -> &ToolSpec {
2743        &self.spec
2744    }
2745
2746    async fn invoke(
2747        &self,
2748        request: ToolRequest,
2749        _ctx: &mut ToolContext<'_>,
2750    ) -> Result<ToolResult, ToolError> {
2751        let input = request.input;
2752        let result = match self
2753            .connection
2754            .call_tool(&self.tool_name, input.clone())
2755            .await
2756        {
2757            Ok(result) => result,
2758            Err(McpError::AuthRequired(auth_request)) => {
2759                let responder = self
2760                    .connection
2761                    .handler_config()
2762                    .auth
2763                    .clone()
2764                    .ok_or_else(|| {
2765                        ToolError::ExecutionFailed(
2766                            "MCP server requires auth but no McpAuthResponder is registered".into(),
2767                        )
2768                    })?;
2769                let resolution = responder.resolve(*auth_request).await.map_err(|error| {
2770                    ToolError::ExecutionFailed(format!("auth responder failed: {error}"))
2771                })?;
2772                match &resolution {
2773                    AuthResolution::Provided { .. } => {
2774                        self.connection
2775                            .resolve_auth(resolution.clone())
2776                            .await
2777                            .map_err(|error| {
2778                                ToolError::ExecutionFailed(format!(
2779                                    "applying auth resolution failed: {error}"
2780                                ))
2781                            })?;
2782                    }
2783                    AuthResolution::Cancelled { .. } => {
2784                        return Err(ToolError::ExecutionFailed(
2785                            "user cancelled MCP auth flow".into(),
2786                        ));
2787                    }
2788                }
2789                match self
2790                    .connection
2791                    .call_tool(&self.tool_name, input.clone())
2792                    .await
2793                {
2794                    Ok(result) => result,
2795                    Err(McpError::AuthRequired(req)) => {
2796                        return Err(ToolError::ExecutionFailed(format!(
2797                            "MCP auth challenge unresolved after retry: {}",
2798                            req.id
2799                        )));
2800                    }
2801                    Err(McpError::Invocation(err)) => {
2802                        self.handle_invocation_error(err, &input).await?
2803                    }
2804                    Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
2805                }
2806            }
2807            Err(McpError::Invocation(err)) => self.handle_invocation_error(err, &input).await?,
2808            Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
2809        };
2810
2811        let is_error = result.is_error.unwrap_or(false);
2812        Ok(ToolResult {
2813            result: ToolResultPart {
2814                call_id: request.call_id,
2815                output: call_tool_result_to_tool_output(result),
2816                is_error,
2817                metadata: MetadataMap::new(),
2818            },
2819            duration: None,
2820            metadata: MetadataMap::new(),
2821        })
2822    }
2823}
2824
2825fn rmcp_server_capabilities_to_agentkit(
2826    capabilities: &rmcp_model::ServerCapabilities,
2827) -> McpServerCapabilities {
2828    McpServerCapabilities {
2829        tools: capabilities.tools.as_ref().map(|tools| ToolsCapability {
2830            list_changed: tools.list_changed,
2831        }),
2832        resources: capabilities
2833            .resources
2834            .as_ref()
2835            .map(|resources| ResourcesCapability {
2836                subscribe: resources.subscribe,
2837                list_changed: resources.list_changed,
2838            }),
2839        prompts: capabilities
2840            .prompts
2841            .as_ref()
2842            .map(|prompts| PromptsCapability {
2843                list_changed: prompts.list_changed,
2844            }),
2845        logging: capabilities.logging.as_ref().map(|_| LoggingCapability {}),
2846    }
2847}
2848
2849fn tool_spec_from_tool(
2850    server_id: &McpServerId,
2851    tool: &McpTool,
2852    namespace: &McpToolNamespace,
2853) -> ToolSpec {
2854    ToolSpec {
2855        name: ToolName::new(namespace.apply(server_id, &tool.name)),
2856        description: tool
2857            .description
2858            .as_ref()
2859            .map(|d| d.to_string())
2860            .unwrap_or_else(|| tool.name.to_string()),
2861        input_schema: Value::Object((*tool.input_schema).clone()),
2862        annotations: tool_annotations_from_rmcp(tool.annotations.as_ref()),
2863        metadata: MetadataMap::new(),
2864    }
2865}
2866
2867fn tool_annotations_from_rmcp(annotations: Option<&McpToolAnnotations>) -> ToolAnnotations {
2868    let Some(annotations) = annotations else {
2869        return ToolAnnotations::default();
2870    };
2871    // rmcp expresses each hint as `Option<bool>` (advisory; absent means
2872    // unspecified). agentkit collapses absent → false. Tools that need to
2873    // distinguish "absent" from "false" should inspect the underlying
2874    // `McpTool::annotations` directly via the snapshot. MCP has no
2875    // `needs_approval` hint, so leave it unset and let the loop's permission
2876    // policy drive approval.
2877    ToolAnnotations {
2878        read_only_hint: annotations.read_only_hint.unwrap_or(false),
2879        destructive_hint: annotations.destructive_hint.unwrap_or(false),
2880        idempotent_hint: annotations.idempotent_hint.unwrap_or(false),
2881        needs_approval_hint: false,
2882        supports_streaming_hint: false,
2883    }
2884}
2885
2886fn resource_descriptor_from_rmcp(resource: McpResource) -> ResourceDescriptor {
2887    let raw = resource.raw;
2888    ResourceDescriptor {
2889        id: ResourceId::new(raw.uri),
2890        name: raw.name,
2891        description: raw.description,
2892        mime_type: raw.mime_type,
2893        metadata: MetadataMap::new(),
2894    }
2895}
2896
2897fn prompt_descriptor_from_rmcp(prompt: McpPrompt) -> PromptDescriptor {
2898    let arguments = prompt.arguments.unwrap_or_default();
2899    let mut required = Vec::new();
2900    let properties = arguments
2901        .into_iter()
2902        .map(|argument| {
2903            let mut schema = serde_json::Map::new();
2904            schema.insert("type".into(), Value::String("string".into()));
2905            if let Some(description) = argument.description {
2906                schema.insert("description".into(), Value::String(description));
2907            }
2908            if argument.required.unwrap_or(false) {
2909                required.push(Value::String(argument.name.clone()));
2910            }
2911            (argument.name, Value::Object(schema))
2912        })
2913        .collect::<serde_json::Map<String, Value>>();
2914    let mut input_schema = serde_json::Map::new();
2915    input_schema.insert("type".into(), Value::String("object".into()));
2916    input_schema.insert("properties".into(), Value::Object(properties));
2917    if !required.is_empty() {
2918        input_schema.insert("required".into(), Value::Array(required));
2919    }
2920
2921    PromptDescriptor {
2922        id: PromptId::new(prompt.name.clone()),
2923        name: prompt.name,
2924        description: prompt.description,
2925        input_schema: Value::Object(input_schema),
2926        metadata: MetadataMap::new(),
2927    }
2928}
2929
2930fn read_resource_result_to_capabilities(
2931    result: ReadResourceResult,
2932) -> Result<ResourceContents, McpError> {
2933    let content = result
2934        .contents
2935        .into_iter()
2936        .next()
2937        .ok_or_else(|| McpError::Protocol("resources/read returned no contents".into()))?;
2938    Ok(resource_contents_to_capabilities(content))
2939}
2940
2941fn resource_contents_to_capabilities(content: McpResourceContents) -> ResourceContents {
2942    let mut metadata = MetadataMap::new();
2943    let data = match content {
2944        McpResourceContents::TextResourceContents {
2945            text, mime_type, ..
2946        } => {
2947            if let Some(mime) = mime_type {
2948                metadata.insert("mime_type".into(), Value::String(mime));
2949            }
2950            DataRef::InlineText(text)
2951        }
2952        McpResourceContents::BlobResourceContents {
2953            blob,
2954            mime_type,
2955            uri,
2956            ..
2957        } => {
2958            if let Some(mime) = mime_type {
2959                metadata.insert("mime_type".into(), Value::String(mime));
2960            }
2961            metadata.insert("uri".into(), Value::String(uri));
2962            // rmcp delivers blobs as base64-encoded text on the wire.
2963            DataRef::InlineText(blob)
2964        }
2965    };
2966    ResourceContents { data, metadata }
2967}
2968
2969fn get_prompt_result_to_capabilities(result: GetPromptResult) -> PromptContents {
2970    let items = result
2971        .messages
2972        .into_iter()
2973        .map(prompt_message_to_item)
2974        .collect();
2975    let mut metadata = MetadataMap::new();
2976    if let Some(description) = result.description {
2977        metadata.insert("description".into(), Value::String(description));
2978    }
2979    PromptContents { items, metadata }
2980}
2981
2982fn prompt_message_to_item(message: PromptMessage) -> Item {
2983    let kind = match message.role {
2984        PromptMessageRole::Assistant => ItemKind::Assistant,
2985        PromptMessageRole::User => ItemKind::User,
2986    };
2987    Item {
2988        id: None,
2989        kind,
2990        parts: vec![prompt_message_content_to_part(message.content)],
2991        metadata: MetadataMap::new(),
2992        usage: None,
2993        finish_reason: None,
2994        created_at: None,
2995    }
2996}
2997
2998fn prompt_message_content_to_part(content: PromptMessageContent) -> Part {
2999    match content {
3000        PromptMessageContent::Text { text } => Part::Text(TextPart::new(text)),
3001        PromptMessageContent::Image { image } => Part::Media(MediaPart::new(
3002            Modality::Image,
3003            image.mime_type.clone(),
3004            DataRef::InlineText(image.data.clone()),
3005        )),
3006        PromptMessageContent::Resource { resource } => {
3007            let agentkit_resource = resource_contents_to_capabilities(resource.resource.clone());
3008            agentkit_part_from_resource(agentkit_resource)
3009        }
3010        PromptMessageContent::ResourceLink { link } => Part::Text(TextPart::new(link.uri.clone())),
3011    }
3012}
3013
3014fn agentkit_part_from_resource(resource: ResourceContents) -> Part {
3015    let mime = resource
3016        .metadata
3017        .get("mime_type")
3018        .and_then(Value::as_str)
3019        .unwrap_or("text/plain")
3020        .to_string();
3021    Part::Media(MediaPart::new(Modality::Binary, mime, resource.data))
3022}
3023
3024fn call_tool_result_to_tool_output(result: CallToolResult) -> ToolOutput {
3025    if let Some(structured) = result.structured_content {
3026        return ToolOutput::Structured(structured);
3027    }
3028    let parts = call_tool_content_to_parts(result.content);
3029    if parts.iter().all(|part| matches!(part, Part::Text(_))) {
3030        let text = parts
3031            .iter()
3032            .filter_map(|part| match part {
3033                Part::Text(text) => Some(text.text.clone()),
3034                _ => None,
3035            })
3036            .collect::<Vec<_>>()
3037            .join("\n");
3038        ToolOutput::Text(text)
3039    } else {
3040        ToolOutput::Parts(parts)
3041    }
3042}
3043
3044fn call_tool_content_to_parts(contents: Vec<Content>) -> Vec<Part> {
3045    contents.into_iter().map(content_to_part).collect()
3046}
3047
3048fn content_to_part(content: Content) -> Part {
3049    match content.raw {
3050        RawContent::Text(text) => Part::Text(TextPart::new(text.text)),
3051        RawContent::Image(image) => Part::Media(MediaPart::new(
3052            Modality::Image,
3053            image.mime_type,
3054            DataRef::InlineText(image.data),
3055        )),
3056        RawContent::Audio(audio) => Part::Media(MediaPart::new(
3057            Modality::Audio,
3058            audio.mime_type,
3059            DataRef::InlineText(audio.data),
3060        )),
3061        RawContent::Resource(embedded) => {
3062            agentkit_part_from_resource(resource_contents_to_capabilities(embedded.resource))
3063        }
3064        RawContent::ResourceLink(link) => Part::Text(TextPart::new(link.uri)),
3065    }
3066}
3067
3068fn value_to_json_object(value: Value, context: &str) -> Result<rmcp_model::JsonObject, McpError> {
3069    match value {
3070        Value::Object(object) => Ok(object),
3071        Value::Null => Ok(serde_json::Map::new()),
3072        other => Err(McpError::Protocol(format!(
3073            "{context} must be a JSON object, got {other}"
3074        ))),
3075    }
3076}
3077
3078fn bearer_token_from_metadata(metadata: &MetadataMap) -> Option<String> {
3079    ["bearer_token", "access_token", "token", "api_key"]
3080        .into_iter()
3081        .find_map(|key| metadata.get(key).and_then(Value::as_str).map(str::to_owned))
3082}
3083
3084fn rmcp_initialize_error(config: &McpServerConfig, error: ClientInitializeError) -> McpError {
3085    if let Some(signal) = match &error {
3086        ClientInitializeError::TransportError { error: dyn_err, .. } => {
3087            transport_auth_signal(dyn_err)
3088        }
3089        _ => None,
3090    } {
3091        return McpError::AuthRequired(Box::new(auth_request_from_signal(
3092            &config.id,
3093            McpMethod::Initialize,
3094            signal,
3095            &error.to_string(),
3096        )));
3097    }
3098    McpError::Transport(error.to_string())
3099}
3100
3101fn rmcp_service_error(error: ServiceError) -> McpError {
3102    service_error_to_mcp_error(error)
3103}
3104
3105fn rmcp_operation_error(
3106    server_id: &McpServerId,
3107    method: McpMethod,
3108    error: ServiceError,
3109) -> McpError {
3110    if let Some(signal) = service_auth_signal(&error) {
3111        return McpError::AuthRequired(Box::new(auth_request_from_signal(
3112            server_id,
3113            method,
3114            signal,
3115            &error.to_string(),
3116        )));
3117    }
3118    service_error_to_mcp_error(error)
3119}
3120
3121fn service_error_to_mcp_error(error: ServiceError) -> McpError {
3122    match error {
3123        ServiceError::McpError(data) => {
3124            McpError::Invocation(McpInvocationError::from_error_data(data))
3125        }
3126        other => McpError::Transport(other.to_string()),
3127    }
3128}
3129
3130#[derive(Debug)]
3131enum AuthSignal {
3132    Required {
3133        www_authenticate: Option<String>,
3134    },
3135    InsufficientScope {
3136        www_authenticate: Option<String>,
3137        required_scope: Option<String>,
3138    },
3139}
3140
3141fn service_auth_signal(error: &ServiceError) -> Option<AuthSignal> {
3142    match error {
3143        ServiceError::TransportSend(dyn_err) => transport_auth_signal(dyn_err),
3144        _ => None,
3145    }
3146}
3147
3148fn transport_auth_signal(error: &DynamicTransportError) -> Option<AuthSignal> {
3149    let inner = error
3150        .error
3151        .downcast_ref::<StreamableHttpError<reqwest::Error>>()?;
3152    match inner {
3153        StreamableHttpError::AuthRequired(AuthRequiredError {
3154            www_authenticate_header,
3155            ..
3156        }) => Some(AuthSignal::Required {
3157            www_authenticate: Some(www_authenticate_header.clone()),
3158        }),
3159        StreamableHttpError::InsufficientScope(InsufficientScopeError {
3160            www_authenticate_header,
3161            required_scope,
3162            ..
3163        }) => Some(AuthSignal::InsufficientScope {
3164            www_authenticate: Some(www_authenticate_header.clone()),
3165            required_scope: required_scope.clone(),
3166        }),
3167        _ => None,
3168    }
3169}
3170
3171fn auth_request_from_signal(
3172    server_id: &McpServerId,
3173    method: McpMethod,
3174    signal: AuthSignal,
3175    message: &str,
3176) -> AuthRequest {
3177    let method_name = method.method_name();
3178    let mut challenge = MetadataMap::new();
3179    challenge.insert("server_id".into(), Value::String(server_id.to_string()));
3180    challenge.insert("method".into(), Value::String(method_name.into()));
3181    challenge.insert("message".into(), Value::String(message.into()));
3182    challenge.insert("flow_kind".into(), Value::String("http_bearer".into()));
3183    match signal {
3184        AuthSignal::Required { www_authenticate } => {
3185            if let Some(header) = www_authenticate {
3186                challenge.insert("www_authenticate".into(), Value::String(header));
3187            }
3188        }
3189        AuthSignal::InsufficientScope {
3190            www_authenticate,
3191            required_scope,
3192        } => {
3193            challenge.insert("insufficient_scope".into(), Value::Bool(true));
3194            if let Some(header) = www_authenticate {
3195                challenge.insert("www_authenticate".into(), Value::String(header));
3196            }
3197            if let Some(scope) = required_scope {
3198                challenge.insert("required_scope".into(), Value::String(scope));
3199            }
3200        }
3201    }
3202    AuthRequest {
3203        id: format!("mcp:{}:{}", server_id, method_name),
3204        provider: format!("mcp.{}", server_id),
3205        operation: method.into_auth_operation(server_id),
3206        challenge,
3207    }
3208}
3209
3210/// Typed dispatch for MCP requests that may surface auth or invocation
3211/// errors. Each peer call constructs the matching variant;
3212/// [`auth_request_from_signal`] converts to a public [`AuthOperation`]
3213/// (typed for the four common cases, [`AuthOperation::McpOther`] for the
3214/// long tail). The same value is also exposed to [`McpErrorResponder`]
3215/// implementations via [`McpErrorContext::method`].
3216#[derive(Debug, Clone)]
3217pub enum McpMethod {
3218    /// `initialize` — the MCP handshake.
3219    Initialize,
3220    /// `tools/call`.
3221    ToolsCall {
3222        /// The raw MCP tool name (no agentkit namespacing).
3223        name: String,
3224        /// The arguments object as sent to the server.
3225        arguments: Value,
3226    },
3227    /// `resources/read`.
3228    ResourcesRead {
3229        /// Resource URI being read.
3230        uri: String,
3231    },
3232    /// `resources/subscribe`.
3233    ResourcesSubscribe {
3234        /// Resource URI being subscribed to.
3235        uri: String,
3236    },
3237    /// `resources/unsubscribe`.
3238    ResourcesUnsubscribe {
3239        /// Resource URI being unsubscribed from.
3240        uri: String,
3241    },
3242    /// `prompts/get`.
3243    PromptsGet {
3244        /// The raw MCP prompt name.
3245        name: String,
3246        /// Arguments forwarded to the prompt.
3247        arguments: Value,
3248    },
3249    /// `logging/setLevel`.
3250    LoggingSetLevel {
3251        /// Negotiated minimum log severity, formatted for diagnostics.
3252        level: String,
3253    },
3254}
3255
3256impl McpMethod {
3257    /// Returns the JSON-RPC method name (e.g. `"tools/call"`).
3258    pub fn method_name(&self) -> &'static str {
3259        match self {
3260            Self::Initialize => "initialize",
3261            Self::ToolsCall { .. } => "tools/call",
3262            Self::ResourcesRead { .. } => "resources/read",
3263            Self::ResourcesSubscribe { .. } => "resources/subscribe",
3264            Self::ResourcesUnsubscribe { .. } => "resources/unsubscribe",
3265            Self::PromptsGet { .. } => "prompts/get",
3266            Self::LoggingSetLevel { .. } => "logging/setLevel",
3267        }
3268    }
3269
3270    fn into_auth_operation(self, server_id: &McpServerId) -> AuthOperation {
3271        let server = server_id.to_string();
3272        match self {
3273            Self::Initialize => AuthOperation::McpConnect {
3274                server_id: server,
3275                metadata: MetadataMap::new(),
3276            },
3277            Self::ToolsCall { name, arguments } => AuthOperation::McpToolCall {
3278                server_id: server,
3279                tool_name: name,
3280                input: arguments,
3281                metadata: MetadataMap::new(),
3282            },
3283            Self::ResourcesRead { uri } => AuthOperation::McpResourceRead {
3284                server_id: server,
3285                resource_id: uri,
3286                metadata: MetadataMap::new(),
3287            },
3288            Self::PromptsGet { name, arguments } => AuthOperation::McpPromptGet {
3289                server_id: server,
3290                prompt_id: name,
3291                args: arguments,
3292                metadata: MetadataMap::new(),
3293            },
3294            other @ (Self::ResourcesSubscribe { .. }
3295            | Self::ResourcesUnsubscribe { .. }
3296            | Self::LoggingSetLevel { .. }) => {
3297                let method = other.method_name().to_string();
3298                AuthOperation::McpOther {
3299                    server_id: server,
3300                    method,
3301                    params: other.into_params_json(),
3302                    metadata: MetadataMap::new(),
3303                }
3304            }
3305        }
3306    }
3307
3308    fn into_params_json(self) -> Value {
3309        match self {
3310            Self::Initialize => json!({}),
3311            Self::ToolsCall { name, arguments } => json!({ "name": name, "arguments": arguments }),
3312            Self::ResourcesRead { uri } => json!({ "uri": uri }),
3313            Self::ResourcesSubscribe { uri } => json!({ "uri": uri }),
3314            Self::ResourcesUnsubscribe { uri } => json!({ "uri": uri }),
3315            Self::PromptsGet { name, arguments } => {
3316                json!({ "name": name, "arguments": arguments })
3317            }
3318            Self::LoggingSetLevel { level } => json!({ "level": level }),
3319        }
3320    }
3321}
3322
3323/// Errors produced by MCP transport, protocol, and lifecycle operations.
3324#[derive(Debug, Error)]
3325pub enum McpError {
3326    /// An underlying I/O error.
3327    #[error("io error: {0}")]
3328    Io(#[from] std::io::Error),
3329    /// A JSON serialization or deserialization error.
3330    #[error("serialization error: {0}")]
3331    Serialize(#[from] serde_json::Error),
3332    /// A transport-level error.
3333    #[error("transport error: {0}")]
3334    Transport(String),
3335    /// An MCP protocol violation.
3336    #[error("protocol error: {0}")]
3337    Protocol(String),
3338    /// The server requires authentication before the operation can proceed.
3339    #[error("MCP auth required: {0:?}")]
3340    AuthRequired(Box<AuthRequest>),
3341    /// An error occurred while resolving or replaying authentication.
3342    #[error("auth resolution error: {0}")]
3343    AuthResolution(String),
3344    /// The MCP server returned a JSON-RPC error for the invoked method.
3345    #[error("invocation error: {0}")]
3346    Invocation(McpInvocationError),
3347    /// The referenced server ID is not registered in the [`McpServerManager`].
3348    #[error("unknown MCP server: {0}")]
3349    UnknownServer(String),
3350}
3351
3352impl From<&str> for McpServerId {
3353    fn from(value: &str) -> Self {
3354        Self::new(value)
3355    }
3356}
3357
3358impl From<String> for McpServerId {
3359    fn from(value: String) -> Self {
3360        Self::new(value)
3361    }
3362}