Skip to main content

agentkit_mcp/
lib.rs

1//! Model Context Protocol integration for agentkit, built on top of [`rmcp`].
2//!
3//! This crate exposes:
4//!
5//! - [`McpServerConfig`] / [`McpTransportBinding`] / [`StdioTransportConfig`] /
6//!   [`StreamableHttpTransportConfig`] — declarative transport configuration.
7//! - [`McpConnection`] — a live, single-server connection wrapping
8//!   [`rmcp::service::RunningService`].
9//! - [`McpServerManager`] — multi-server lifecycle, discovery, catalog diffing,
10//!   and auth replay.
11//! - [`McpServerHandle`], [`McpToolExecutor`], [`McpToolAdapter`],
12//!   [`McpResourceHandle`], [`McpPromptHandle`],
13//!   [`McpCapabilityProvider`] — bridges into the agentkit `Tool` / capabilities
14//!   systems.
15//!
16//! Wire-protocol types (`CallToolResult`, `ReadResourceResult`, `Content`,
17//! `ToolAnnotations`, `Prompt`, sampling/elicitation/roots payloads, …) are
18//! re-exported from [`rmcp::model`] directly — there is no parallel
19//! agentkit-side vocabulary. As `rmcp` tracks new MCP spec revisions, those
20//! types and their fields propagate into agentkit unchanged.
21
22use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::fmt;
24use std::sync::{Arc, RwLock};
25use std::time::Duration;
26
27use agentkit_capabilities::{
28    CapabilityContext, CapabilityError, CapabilityProvider, Invocable, PromptContents,
29    PromptDescriptor, PromptId, PromptProvider, ResourceContents, ResourceDescriptor, ResourceId,
30    ResourceProvider,
31};
32use agentkit_core::{
33    DataRef, Item, ItemKind, MediaPart, MetadataMap, Modality, Part, TextPart, ToolOutput,
34    ToolResultPart,
35};
36use agentkit_tools_core::{
37    AllowAllPermissions, CatalogReader, CatalogWriter, PermissionChecker, Tool, ToolAnnotations,
38    ToolCapabilityProvider, ToolContext, ToolError, ToolName, ToolRegistry, ToolRequest,
39    ToolResult, ToolSpec, dynamic_catalog,
40};
41use async_trait::async_trait;
42use futures_util::future::{join_all, try_join_all};
43use futures_util::stream::BoxStream;
44use http::{HeaderName, HeaderValue};
45use rmcp::ServiceExt;
46use rmcp::handler::client::ClientHandler;
47use rmcp::model as rmcp_model;
48use rmcp::service::{ClientInitializeError, Peer, RoleClient, RunningService, ServiceError};
49use rmcp::transport::streamable_http_client::{
50    AuthRequiredError, InsufficientScopeError, StreamableHttpClient as RmcpStreamableHttpClient,
51    StreamableHttpClientTransportConfig as RmcpStreamableHttpClientTransportConfig,
52    StreamableHttpError, StreamableHttpPostResponse,
53};
54use rmcp::transport::{
55    ConfigureCommandExt, DynamicTransportError, StreamableHttpClientTransport, TokioChildProcess,
56};
57use serde::{Deserialize, Serialize};
58use serde_json::{Value, json};
59use sse_stream::{Error as SseError, Sse};
60use thiserror::Error;
61use tokio::sync::{Mutex, broadcast, mpsc};
62
63/// Re-exports of the rmcp wire-protocol types this crate now surfaces directly
64/// instead of wrapping. Pull these in to pattern-match on tool annotations,
65/// content blocks, structured tool output, embedded resources, sampling /
66/// elicitation requests, progress and log notifications, etc.
67pub use rmcp::model::{
68    Annotations as McpAnnotations, AudioContent, CallToolResult,
69    CancelledNotificationParam as McpCancelledNotificationParam,
70    ClientCapabilities as McpClientCapabilities, Content,
71    CreateElicitationRequestParams as McpCreateElicitationRequestParams,
72    CreateElicitationResult as McpCreateElicitationResult,
73    CreateMessageRequestParams as McpCreateMessageRequestParams,
74    CreateMessageResult as McpCreateMessageResult, ElicitationAction as McpElicitationAction,
75    ElicitationCapability as McpElicitationCapability, EmbeddedResource,
76    FormElicitationCapability as McpFormElicitationCapability, GetPromptResult, ImageContent,
77    Implementation as McpImplementation, ListRootsResult as McpListRootsResult,
78    LoggingLevel as McpLoggingLevel,
79    LoggingMessageNotificationParam as McpLoggingMessageNotificationParam,
80    ProgressNotificationParam as McpProgressNotificationParam, Prompt as McpPrompt, PromptArgument,
81    PromptMessage, PromptMessageContent, PromptMessageRole, RawAudioContent, RawContent,
82    RawEmbeddedResource, RawImageContent, RawResource as McpRawResource, RawTextContent,
83    ReadResourceResult, Resource as McpResource, ResourceContents as McpResourceContents,
84    ResourceUpdatedNotificationParam as McpResourceUpdatedNotificationParam, Root as McpRoot,
85    RootsCapabilities as McpRootsCapabilities, SamplingCapability as McpSamplingCapability,
86    SamplingMessage as McpSamplingMessage, SetLevelRequestParams as McpSetLevelRequestParams,
87    TextContent, Tool as McpTool, ToolAnnotations as McpToolAnnotations,
88    UrlElicitationCapability as McpUrlElicitationCapability,
89};
90
91/// Re-export of the JSON-RPC client→server envelope handed to
92/// [`McpHttpClient::post_message`].
93pub use rmcp::model::ClientJsonRpcMessage;
94
95/// Re-exports of the rmcp Streamable HTTP transport types used by
96/// [`McpHttpClient`] implementations.
97pub use rmcp::transport::streamable_http_client::{
98    StreamableHttpError as McpStreamableHttpError,
99    StreamableHttpPostResponse as McpStreamableHttpPostResponse,
100};
101
102/// Re-export of the SSE event/error types referenced by [`McpHttpClient::get_stream`].
103pub use sse_stream::{Error as McpSseError, Sse as McpSse};
104
105/// Alias for [`McpTool`].
106pub type McpToolDescriptor = McpTool;
107/// Alias for [`McpResource`].
108pub type McpResourceDescriptor = McpResource;
109/// Alias for [`McpPrompt`].
110pub type McpPromptDescriptor = McpPrompt;
111
112/// An auth challenge raised by an MCP server during a tool call, resource
113/// read, prompt fetch, or connection handshake.
114///
115/// Hosts handle these via an [`McpAuthResponder`] registered on
116/// [`McpHandlerConfig::with_auth_responder`]. The responder is invoked
117/// inline by [`McpToolAdapter::invoke`] (and equivalent paths in
118/// [`McpServerManager`]) — auth never crosses the executor boundary as a
119/// loop interrupt.
120#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
121pub struct AuthRequest {
122    /// Unique identifier for this auth challenge.
123    pub id: String,
124    /// Name of the authentication provider (e.g. `"github"`, `"google"`).
125    pub provider: String,
126    /// The MCP operation that triggered the auth requirement.
127    pub operation: AuthOperation,
128    /// Provider-specific challenge data (e.g. OAuth URLs, scopes).
129    pub challenge: MetadataMap,
130}
131
132impl AuthRequest {
133    /// Convenience: returns the MCP server id this challenge targets, if any.
134    pub fn server_id(&self) -> Option<&str> {
135        self.operation.server_id()
136    }
137}
138
139/// The MCP operation that triggered an [`AuthRequest`].
140#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
141pub enum AuthOperation {
142    /// Connecting to an MCP server.
143    McpConnect {
144        server_id: String,
145        metadata: MetadataMap,
146    },
147    /// Invoking a tool on an MCP server.
148    McpToolCall {
149        server_id: String,
150        tool_name: String,
151        input: Value,
152        metadata: MetadataMap,
153    },
154    /// Reading a resource from an MCP server.
155    McpResourceRead {
156        server_id: String,
157        resource_id: String,
158        metadata: MetadataMap,
159    },
160    /// Fetching a prompt from an MCP server.
161    McpPromptGet {
162        server_id: String,
163        prompt_id: String,
164        args: Value,
165        metadata: MetadataMap,
166    },
167    /// Any other MCP method that requires auth (resource subscribe/unsubscribe,
168    /// logging level changes, future protocol additions). The typed variants
169    /// above cover the common cases; this catch-all preserves the method name
170    /// and JSON params verbatim for hosts that need to render or log them.
171    McpOther {
172        server_id: String,
173        method: String,
174        params: Value,
175        metadata: MetadataMap,
176    },
177}
178
179impl AuthOperation {
180    /// Returns the MCP server ID this operation targets.
181    pub fn server_id(&self) -> Option<&str> {
182        match self {
183            Self::McpConnect { server_id, .. }
184            | Self::McpToolCall { server_id, .. }
185            | Self::McpResourceRead { server_id, .. }
186            | Self::McpPromptGet { server_id, .. }
187            | Self::McpOther { server_id, .. } => Some(server_id.as_str()),
188        }
189    }
190}
191
192/// Outcome of an [`AuthRequest`] after the host's [`McpAuthResponder`] runs.
193#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
194pub enum AuthResolution {
195    /// The host obtained credentials.
196    Provided {
197        request: AuthRequest,
198        credentials: MetadataMap,
199    },
200    /// The host cancelled the auth flow.
201    Cancelled { request: AuthRequest },
202}
203
204impl AuthResolution {
205    /// Builds a successful auth resolution.
206    pub fn provided(request: AuthRequest, credentials: MetadataMap) -> Self {
207        Self::Provided {
208            request,
209            credentials,
210        }
211    }
212
213    /// Builds a cancelled auth resolution.
214    pub fn cancelled(request: AuthRequest) -> Self {
215        Self::Cancelled { request }
216    }
217
218    /// Returns the underlying [`AuthRequest`] regardless of variant.
219    pub fn request(&self) -> &AuthRequest {
220        match self {
221            Self::Provided { request, .. } | Self::Cancelled { request } => request,
222        }
223    }
224}
225
226/// Host-supplied resolver for MCP auth challenges.
227///
228/// Install one via [`McpHandlerConfig::with_auth_responder`]. When an MCP
229/// server returns an auth challenge during a tool call, resource read, or
230/// prompt fetch, the adapter invokes [`McpAuthResponder::resolve`] inline,
231/// applies the resulting credentials to the [`McpConnection`], and retries
232/// the original operation. Auth never surfaces as a loop interrupt.
233///
234/// Hosts that want to interleave the auth UI with the loop's main thread
235/// implement a thin channel-bridging responder (responder sends the
236/// challenge to the UI thread on a `mpsc::Sender`, awaits a `oneshot`
237/// reply with the resolution).
238#[async_trait]
239pub trait McpAuthResponder: Send + Sync + 'static {
240    async fn resolve(&self, request: AuthRequest) -> Result<AuthResolution, McpError>;
241}
242
243/// Typed view of a JSON-RPC error returned by an MCP server for an invoked
244/// method.
245///
246/// Surfaced by [`McpError::Invocation`] so callers can branch on the
247/// underlying error code without re-parsing strings. The variants cover
248/// every rmcp [`rmcp::model::ErrorCode`] constant defined at the time of
249/// writing; anything else (custom server codes, future protocol additions)
250/// lands in [`Self::Other`] with the original code preserved.
251///
252/// For the URL elicitation case ([`rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED`])
253/// the `data` payload is best-effort parsed into [`UrlElicitationData`].
254/// When the server's payload does not match the documented shape the typed
255/// `data` slot is `None` but `raw_data` always preserves the original
256/// [`serde_json::Value`] so callers can fall back to ad-hoc inspection.
257#[derive(Debug, Clone, thiserror::Error)]
258pub enum McpInvocationError {
259    /// JSON-RPC error `-32042` (URL elicitation required).
260    #[error("url elicitation required: {message}")]
261    UrlElicitation {
262        /// Human-readable message from the server.
263        message: String,
264        /// Typed view of the server's `data` payload, when it matched the
265        /// documented URL elicitation shape.
266        data: Option<UrlElicitationData>,
267        /// The original `data` value, preserved verbatim.
268        raw_data: Option<serde_json::Value>,
269    },
270    /// JSON-RPC error `-32600` (invalid request).
271    #[error("invalid request: {message}")]
272    InvalidRequest {
273        message: String,
274        data: Option<serde_json::Value>,
275    },
276    /// JSON-RPC error `-32601` (method not found).
277    #[error("method not found: {message}")]
278    MethodNotFound {
279        message: String,
280        data: Option<serde_json::Value>,
281    },
282    /// JSON-RPC error `-32602` (invalid params).
283    #[error("invalid params: {message}")]
284    InvalidParams {
285        message: String,
286        data: Option<serde_json::Value>,
287    },
288    /// JSON-RPC error `-32603` (internal error).
289    #[error("internal error: {message}")]
290    InternalError {
291        message: String,
292        data: Option<serde_json::Value>,
293    },
294    /// JSON-RPC error `-32700` (parse error).
295    #[error("parse error: {message}")]
296    ParseError {
297        message: String,
298        data: Option<serde_json::Value>,
299    },
300    /// JSON-RPC error `-32002` (resource not found).
301    #[error("resource not found: {message}")]
302    ResourceNotFound {
303        message: String,
304        data: Option<serde_json::Value>,
305    },
306    /// Forward-compat for custom server codes and any future rmcp additions
307    /// not yet recognized by this crate.
308    #[error("mcp error code {code}: {message}")]
309    Other {
310        code: i32,
311        message: String,
312        data: Option<serde_json::Value>,
313    },
314}
315
316/// Typed payload for the URL elicitation error case.
317///
318/// Mirrors the shape of [`rmcp::model::CreateElicitationRequestParams::UrlElicitationParams`]
319/// (camelCase on the wire). Server messages that include extra fields are
320/// accepted; missing required fields make typed parsing fail and the
321/// surrounding [`McpInvocationError::UrlElicitation`] preserves the raw
322/// payload instead.
323#[derive(Debug, Clone, serde::Deserialize)]
324#[serde(rename_all = "camelCase")]
325pub struct UrlElicitationData {
326    /// The URL where the user can complete the elicitation.
327    pub url: String,
328    /// The server-issued identifier for this elicitation request.
329    pub elicitation_id: String,
330    /// Optional human-readable message that accompanied the payload.
331    #[serde(default)]
332    pub message: Option<String>,
333}
334
335impl McpInvocationError {
336    /// Lifts an rmcp wire error into the typed enum. Infallible: well-known
337    /// codes attempt a typed `data` parse and degrade to a raw-only view
338    /// when parsing fails; unrecognized codes land in [`Self::Other`].
339    pub fn from_error_data(err: rmcp::model::ErrorData) -> Self {
340        let rmcp::model::ErrorData {
341            code,
342            message,
343            data,
344        } = err;
345        let message = message.into_owned();
346        match code {
347            rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED => {
348                let typed = data.as_ref().and_then(|value| {
349                    serde_json::from_value::<UrlElicitationData>(value.clone()).ok()
350                });
351                Self::UrlElicitation {
352                    message,
353                    data: typed,
354                    raw_data: data,
355                }
356            }
357            rmcp::model::ErrorCode::INVALID_REQUEST => Self::InvalidRequest { message, data },
358            rmcp::model::ErrorCode::METHOD_NOT_FOUND => Self::MethodNotFound { message, data },
359            rmcp::model::ErrorCode::INVALID_PARAMS => Self::InvalidParams { message, data },
360            rmcp::model::ErrorCode::INTERNAL_ERROR => Self::InternalError { message, data },
361            rmcp::model::ErrorCode::PARSE_ERROR => Self::ParseError { message, data },
362            rmcp::model::ErrorCode::RESOURCE_NOT_FOUND => Self::ResourceNotFound { message, data },
363            other => Self::Other {
364                code: other.0,
365                message,
366                data,
367            },
368        }
369    }
370
371    /// Returns the underlying JSON-RPC error code.
372    pub fn code(&self) -> i32 {
373        match self {
374            Self::UrlElicitation { .. } => rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED.0,
375            Self::InvalidRequest { .. } => rmcp::model::ErrorCode::INVALID_REQUEST.0,
376            Self::MethodNotFound { .. } => rmcp::model::ErrorCode::METHOD_NOT_FOUND.0,
377            Self::InvalidParams { .. } => rmcp::model::ErrorCode::INVALID_PARAMS.0,
378            Self::InternalError { .. } => rmcp::model::ErrorCode::INTERNAL_ERROR.0,
379            Self::ParseError { .. } => rmcp::model::ErrorCode::PARSE_ERROR.0,
380            Self::ResourceNotFound { .. } => rmcp::model::ErrorCode::RESOURCE_NOT_FOUND.0,
381            Self::Other { code, .. } => *code,
382        }
383    }
384}
385
386/// Userland hook invoked when an MCP server returns a JSON-RPC error for an
387/// invoked method.
388///
389/// Lets the host translate well-known errors (e.g. URL elicitation
390/// challenges) into a synthesized tool result the agent can render —
391/// without agentkit-mcp baking in any specific UX or response policy.
392///
393/// Install one via [`McpHandlerConfig::with_error_responder`]. When set,
394/// [`McpToolAdapter::invoke`] forwards every JSON-RPC error to the
395/// responder before falling back to [`ToolError::ExecutionFailed`]; the
396/// responder returns either a synthesized [`CallToolResult`] (treated as a
397/// successful call so `structured_content` flows through
398/// [`ToolOutput::Structured`]) or [`ErrorResponderOutcome::PassThrough`] to
399/// preserve the default failure path.
400#[async_trait]
401pub trait McpErrorResponder: Send + Sync + 'static {
402    /// Inspects an invocation error and decides whether to synthesize a
403    /// replacement [`CallToolResult`] or propagate the error.
404    async fn handle(
405        &self,
406        error: &McpInvocationError,
407        ctx: McpErrorContext<'_>,
408    ) -> ErrorResponderOutcome;
409}
410
411/// Context describing which server / method / input produced the
412/// invocation error currently being inspected by [`McpErrorResponder::handle`].
413pub struct McpErrorContext<'a> {
414    /// The server that returned the error.
415    pub server_id: &'a McpServerId,
416    /// The MCP method that was invoked.
417    pub method: &'a McpMethod,
418    /// The input payload supplied to the invocation, when available.
419    pub input: Option<&'a serde_json::Value>,
420}
421
422/// Decision returned by an [`McpErrorResponder`].
423pub enum ErrorResponderOutcome {
424    /// Replace the error with a synthesized successful response. The
425    /// returned [`CallToolResult`] flows through agentkit-mcp's normal
426    /// tool-result conversion: `structured_content` becomes
427    /// [`ToolOutput::Structured`], `content` becomes text / media parts,
428    /// and `is_error` is honoured.
429    SynthesizeResult(CallToolResult),
430    /// Defer to default behavior; the invocation error continues to surface
431    /// as [`ToolError::ExecutionFailed`].
432    PassThrough,
433}
434
435/// Unique identifier for a registered MCP server.
436///
437/// Each MCP server in a [`McpServerManager`] is addressed by its `McpServerId`.
438#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
439pub struct McpServerId(pub String);
440
441impl McpServerId {
442    /// Creates a new server identifier from any string-like value.
443    pub fn new(value: impl Into<String>) -> Self {
444        Self(value.into())
445    }
446}
447
448impl fmt::Display for McpServerId {
449    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450        self.0.fmt(f)
451    }
452}
453
/// Launch settings for an MCP server that is reached over standard I/O.
///
/// The configured command is spawned as a child process, and rmcp drives
/// the JSON-RPC framing over that process's stdin/stdout.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StdioTransportConfig {
    /// Executable to launch, e.g. `"npx"`, `"python"`, or `"node"`.
    pub command: String,
    /// Arguments handed to the executable on its command line.
    pub args: Vec<String>,
    /// Extra environment variables to set for the child process.
    pub env: Vec<(String, String)>,
    /// Working directory for the child process, if one is requested.
    pub cwd: Option<std::path::PathBuf>,
}

impl StdioTransportConfig {
    /// Starts a configuration for `command` with no arguments, no extra
    /// environment variables, and no working directory.
    pub fn new(command: impl Into<String>) -> Self {
        let command = command.into();
        Self {
            command,
            args: Vec::new(),
            env: Vec::new(),
            cwd: None,
        }
    }

    /// Pushes one more command-line argument and hands `self` back for
    /// chaining.
    pub fn with_arg(mut self, arg: impl Into<String>) -> Self {
        let arg = arg.into();
        self.args.push(arg);
        self
    }

    /// Pushes one more `(key, value)` environment variable for the child
    /// process and hands `self` back for chaining.
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let pair = (key.into(), value.into());
        self.env.push(pair);
        self
    }

    /// Records the child process's working directory and hands `self` back
    /// for chaining.
    pub fn with_cwd(self, cwd: impl Into<std::path::PathBuf>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }
}
499
500/// Configuration for an MCP server that communicates over the MCP Streamable HTTP transport.
501#[derive(Clone, Default)]
502pub struct StreamableHttpTransportConfig {
503    /// The MCP endpoint URL to connect to.
504    pub url: String,
505    /// Static bearer token sent as an HTTP `Authorization: Bearer ...` header.
506    ///
507    /// Ignored when [`Self::http_client`] is set, since the custom client owns
508    /// authorization for every request.
509    pub bearer_token: Option<String>,
510    /// Static custom HTTP headers sent with every Streamable HTTP request.
511    ///
512    /// Ignored when [`Self::http_client`] is set.
513    pub headers: Vec<(HeaderName, HeaderValue)>,
514    /// Optional caller-supplied HTTP client.
515    ///
516    /// When `Some`, agentkit-mcp routes every Streamable HTTP request through
517    /// the provided implementation instead of rmcp's default reqwest client.
518    /// This is the seam to inject dynamic bearers, request signing, retry
519    /// middleware, custom TLS, and so on. See [`McpHttpClient`].
520    pub http_client: Option<Arc<dyn McpHttpClient>>,
521}
522
523impl fmt::Debug for StreamableHttpTransportConfig {
524    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525        f.debug_struct("StreamableHttpTransportConfig")
526            .field("url", &self.url)
527            .field(
528                "bearer_token",
529                &self.bearer_token.as_deref().map(|_| "<redacted>"),
530            )
531            .field("headers", &self.headers)
532            .field(
533                "http_client",
534                &self.http_client.as_ref().map(|_| "<custom>"),
535            )
536            .finish()
537    }
538}
539
540impl StreamableHttpTransportConfig {
541    /// Creates a new Streamable HTTP transport configuration for the given MCP endpoint.
542    pub fn new(url: impl Into<String>) -> Self {
543        Self {
544            url: url.into(),
545            bearer_token: None,
546            headers: Vec::new(),
547            http_client: None,
548        }
549    }
550
551    /// Sets a static bearer token for Streamable HTTP authorization.
552    ///
553    /// Ignored when a custom [`McpHttpClient`] is installed via
554    /// [`Self::with_http_client`].
555    pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
556        self.bearer_token = Some(token.into());
557        self
558    }
559
560    /// Installs a caller-supplied HTTP client for every Streamable HTTP
561    /// request issued by this transport.
562    ///
563    /// This is the only seam capable of producing per-request dynamic state
564    /// (rotating bearers, request signing, distributed-tracing headers).
565    /// rmcp's default reqwest path is bypassed entirely when this is set, so
566    /// implementations are responsible for forwarding `auth_header` /
567    /// `custom_headers` if they want the static config to keep applying.
568    pub fn with_http_client(mut self, client: Arc<dyn McpHttpClient>) -> Self {
569        self.http_client = Some(client);
570        self
571    }
572
573    /// Adds a static HTTP header for every Streamable HTTP request.
574    ///
575    /// Reserved MCP session and protocol headers are still managed by RMCP.
576    /// Ignored when a custom [`McpHttpClient`] is installed.
577    pub fn with_header<N, V>(mut self, name: N, value: V) -> Result<Self, McpError>
578    where
579        N: TryInto<HeaderName>,
580        N::Error: fmt::Display,
581        V: TryInto<HeaderValue>,
582        V::Error: fmt::Display,
583    {
584        let name = name
585            .try_into()
586            .map_err(|error| McpError::Transport(format!("invalid HTTP header name: {error}")))?;
587        let value = value
588            .try_into()
589            .map_err(|error| McpError::Transport(format!("invalid HTTP header value: {error}")))?;
590        self.headers.push((name, value));
591        Ok(self)
592    }
593}
594
595/// Type alias for the SSE stream returned by [`McpHttpClient::get_stream`].
596pub type McpSseStream = BoxStream<'static, Result<Sse, SseError>>;
597
598/// Pluggable HTTP transport for the MCP Streamable HTTP client.
599///
600/// Mirrors [`rmcp::transport::streamable_http_client::StreamableHttpClient`]
601/// but is dyn-compatible (boxed via `async_trait`) so the configuration can
602/// store an `Arc<dyn McpHttpClient>` without genericizing every type that
603/// flows through [`McpServerConfig`] / [`McpTransportBinding`].
604///
605/// The associated error type is fixed to [`reqwest::Error`] so that
606/// agentkit-mcp's auth-challenge detection (which downcasts to
607/// [`StreamableHttpError<reqwest::Error>`]) keeps working — implementations
608/// that wrap a non-reqwest backend should map their failures into a
609/// `reqwest::Error` before returning.
610///
611/// All three methods are invoked by rmcp's worker on every protocol op.
612/// `auth_header` and `custom_headers` carry the values resolved from
613/// [`StreamableHttpTransportConfig`] at connection time; implementations are
614/// free to ignore them and inject their own per-call values (e.g. a fresh
615/// bearer pulled from a runtime registry).
616#[async_trait]
617pub trait McpHttpClient: Send + Sync + 'static {
618    /// POSTs a single client→server JSON-RPC message. The response carries
619    /// either a JSON body or an SSE stream depending on what the server
620    /// negotiates.
621    async fn post_message(
622        &self,
623        uri: Arc<str>,
624        message: ClientJsonRpcMessage,
625        session_id: Option<Arc<str>>,
626        auth_header: Option<String>,
627        custom_headers: HashMap<HeaderName, HeaderValue>,
628    ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>>;
629
630    /// Tears down a server-issued session (HTTP DELETE).
631    async fn delete_session(
632        &self,
633        uri: Arc<str>,
634        session_id: Arc<str>,
635        auth_header: Option<String>,
636        custom_headers: HashMap<HeaderName, HeaderValue>,
637    ) -> Result<(), StreamableHttpError<reqwest::Error>>;
638
639    /// Opens a server→client SSE stream (HTTP GET) for push notifications and
640    /// reconnect resumes.
641    async fn get_stream(
642        &self,
643        uri: Arc<str>,
644        session_id: Arc<str>,
645        last_event_id: Option<String>,
646        auth_header: Option<String>,
647        custom_headers: HashMap<HeaderName, HeaderValue>,
648    ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>>;
649}
650
651/// Internal newtype that adapts an `Arc<dyn McpHttpClient>` to rmcp's
652/// generic, non-dyn-compatible [`RmcpStreamableHttpClient`] trait.
653#[derive(Clone)]
654struct DynHttpClient(Arc<dyn McpHttpClient>);
655
656impl RmcpStreamableHttpClient for DynHttpClient {
657    type Error = reqwest::Error;
658
659    async fn post_message(
660        &self,
661        uri: Arc<str>,
662        message: ClientJsonRpcMessage,
663        session_id: Option<Arc<str>>,
664        auth_header: Option<String>,
665        custom_headers: HashMap<HeaderName, HeaderValue>,
666    ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>> {
667        self.0
668            .post_message(uri, message, session_id, auth_header, custom_headers)
669            .await
670    }
671
672    async fn delete_session(
673        &self,
674        uri: Arc<str>,
675        session_id: Arc<str>,
676        auth_header: Option<String>,
677        custom_headers: HashMap<HeaderName, HeaderValue>,
678    ) -> Result<(), StreamableHttpError<reqwest::Error>> {
679        self.0
680            .delete_session(uri, session_id, auth_header, custom_headers)
681            .await
682    }
683
684    async fn get_stream(
685        &self,
686        uri: Arc<str>,
687        session_id: Arc<str>,
688        last_event_id: Option<String>,
689        auth_header: Option<String>,
690        custom_headers: HashMap<HeaderName, HeaderValue>,
691    ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>> {
692        self.0
693            .get_stream(uri, session_id, last_event_id, auth_header, custom_headers)
694            .await
695    }
696}
697
698/// Selects which transport an MCP server should use.
699#[derive(Clone, Debug)]
700pub enum McpTransportBinding {
701    /// Communicate over the child process's stdin/stdout.
702    Stdio(StdioTransportConfig),
703    /// Communicate over the MCP Streamable HTTP transport.
704    StreamableHttp(StreamableHttpTransportConfig),
705}
706
707/// Full configuration for a single MCP server.
708#[derive(Clone, Debug)]
709pub struct McpServerConfig {
710    /// Unique identifier for this server.
711    pub id: McpServerId,
712    /// Transport binding that determines how communication happens.
713    pub transport: McpTransportBinding,
714    /// Arbitrary metadata attached to this server configuration.
715    pub metadata: MetadataMap,
716}
717
718impl McpServerConfig {
719    /// Creates a new server configuration with the given identifier and transport.
720    pub fn new(id: impl Into<String>, transport: McpTransportBinding) -> Self {
721        Self {
722            id: McpServerId::new(id),
723            transport,
724            metadata: MetadataMap::new(),
725        }
726    }
727
728    /// Creates a stdio-backed server configuration.
729    pub fn stdio(id: impl Into<String>, command: impl Into<String>) -> Self {
730        Self::new(
731            id,
732            McpTransportBinding::Stdio(StdioTransportConfig::new(command)),
733        )
734    }
735
736    /// Creates a Streamable HTTP-backed server configuration.
737    pub fn streamable_http(id: impl Into<String>, url: impl Into<String>) -> Self {
738        Self::new(
739            id,
740            McpTransportBinding::StreamableHttp(StreamableHttpTransportConfig::new(url)),
741        )
742    }
743
744    /// Replaces the configuration metadata.
745    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
746        self.metadata = metadata;
747        self
748    }
749}
750
751type CustomNamespace = Arc<dyn Fn(&McpServerId, &str) -> String + Send + Sync>;
752
753/// Strategy used to derive the agentkit-side tool name for an MCP tool.
754///
755/// The default (`Default`) preserves agentkit's historical
756/// `mcp_<server>_<tool>` shape so that names satisfy provider validators
757/// that only allow `[a-zA-Z0-9_-]` (e.g. Anthropic on Vertex). Use
758/// [`McpToolNamespace::None`] when the calling provider already namespaces
759/// remote tools, or [`McpToolNamespace::Custom`] for a bespoke scheme.
760#[derive(Clone, Default)]
761pub enum McpToolNamespace {
762    /// Format names as `mcp_<server>_<tool>`.
763    #[default]
764    Default,
765    /// Use the raw MCP tool name with no prefix at all.
766    None,
767    /// Apply a caller-supplied function for full control.
768    Custom(CustomNamespace),
769}
770
771impl fmt::Debug for McpToolNamespace {
772    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773        match self {
774            Self::Default => f.write_str("McpToolNamespace::Default"),
775            Self::None => f.write_str("McpToolNamespace::None"),
776            Self::Custom(_) => f.write_str("McpToolNamespace::Custom(<fn>)"),
777        }
778    }
779}
780
781impl McpToolNamespace {
782    /// Builds a custom namespace from a closure.
783    pub fn custom(f: impl Fn(&McpServerId, &str) -> String + Send + Sync + 'static) -> Self {
784        Self::Custom(Arc::new(f))
785    }
786
787    /// Applies the namespace strategy to produce the agentkit tool name.
788    pub fn apply(&self, server_id: &McpServerId, tool_name: &str) -> String {
789        match self {
790            Self::Default => format!("mcp_{server_id}_{tool_name}"),
791            Self::None => tool_name.to_string(),
792            Self::Custom(f) => f(server_id, tool_name),
793        }
794    }
795
796    /// Recovers the raw MCP tool name from an agentkit-side name. Returns
797    /// `None` for [`Self::Custom`] (no general inverse) or when the name
798    /// doesn't match the expected shape.
799    pub fn unapply(&self, server_id: &McpServerId, agentkit_name: &str) -> Option<String> {
800        match self {
801            Self::Default => agentkit_name
802                .strip_prefix(&format!("mcp_{server_id}_"))
803                .map(str::to_string),
804            Self::None => Some(agentkit_name.to_string()),
805            Self::Custom(_) => None,
806        }
807    }
808}
809
810/// A snapshot of all capabilities discovered from a single MCP server.
811///
812/// Tools, resources, and prompts are stored as raw rmcp wire types
813/// ([`McpTool`], [`McpResource`], [`McpPrompt`]) so that consumers see the
814/// full typed surface — `Tool::annotations`, `Tool::output_schema`,
815/// `Tool::execution`, `Tool::icons`; `Resource::title` / `mime_type` /
816/// `size`; `Prompt::arguments` (with the typed `required` flag and per-arg
817/// `description`).
818#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
819pub struct McpDiscoverySnapshot {
820    /// The server this snapshot was taken from.
821    pub server_id: McpServerId,
822    /// Tools advertised by the server.
823    pub tools: Vec<McpTool>,
824    /// Resources advertised by the server.
825    pub resources: Vec<McpResource>,
826    /// Prompts advertised by the server.
827    pub prompts: Vec<McpPrompt>,
828    /// Arbitrary metadata attached to this snapshot.
829    pub metadata: MetadataMap,
830}
831
832/// Catalog and lifecycle events emitted by [`McpServerManager`].
833#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
834pub enum McpCatalogEvent {
835    /// A server connected and completed initial discovery.
836    ServerConnected { server_id: McpServerId },
837    /// A server disconnected.
838    ServerDisconnected { server_id: McpServerId },
839    /// The server's tool list changed.
840    ToolsChanged {
841        server_id: McpServerId,
842        added: Vec<String>,
843        removed: Vec<String>,
844        changed: Vec<String>,
845    },
846    /// The server's resource list changed.
847    ResourcesChanged {
848        server_id: McpServerId,
849        added: Vec<String>,
850        removed: Vec<String>,
851        changed: Vec<String>,
852    },
853    /// The server's prompt list changed.
854    PromptsChanged {
855        server_id: McpServerId,
856        added: Vec<String>,
857        removed: Vec<String>,
858        changed: Vec<String>,
859    },
860    /// Authentication state changed for a server.
861    AuthChanged { server_id: McpServerId },
862    /// A catalog refresh failed.
863    RefreshFailed {
864        server_id: McpServerId,
865        message: String,
866    },
867}
868
869/// Capabilities advertised by an MCP server during the `initialize` handshake.
870#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
871pub struct McpServerCapabilities {
872    /// Advertised `tools` capability.
873    #[serde(default, skip_serializing_if = "Option::is_none")]
874    pub tools: Option<ToolsCapability>,
875    /// Advertised `resources` capability.
876    #[serde(default, skip_serializing_if = "Option::is_none")]
877    pub resources: Option<ResourcesCapability>,
878    /// Advertised `prompts` capability.
879    #[serde(default, skip_serializing_if = "Option::is_none")]
880    pub prompts: Option<PromptsCapability>,
881    /// Advertised `logging` capability.
882    #[serde(default, skip_serializing_if = "Option::is_none")]
883    pub logging: Option<LoggingCapability>,
884}
885
886impl McpServerCapabilities {
887    /// Returns a capabilities struct with every top-level capability
888    /// advertised. Useful for tests.
889    pub fn all() -> Self {
890        Self {
891            tools: Some(ToolsCapability::default()),
892            resources: Some(ResourcesCapability::default()),
893            prompts: Some(PromptsCapability::default()),
894            logging: Some(LoggingCapability::default()),
895        }
896    }
897}
898
899/// Tools sub-capability flags from the MCP `initialize` response.
900#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
901#[serde(rename_all = "camelCase")]
902pub struct ToolsCapability {
903    /// Server emits `notifications/tools/list_changed` when the catalog changes.
904    #[serde(default, skip_serializing_if = "Option::is_none")]
905    pub list_changed: Option<bool>,
906}
907
908/// Resources sub-capability flags from the MCP `initialize` response.
909#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
910#[serde(rename_all = "camelCase")]
911pub struct ResourcesCapability {
912    /// Server supports `resources/subscribe`.
913    #[serde(default, skip_serializing_if = "Option::is_none")]
914    pub subscribe: Option<bool>,
915    /// Server emits `notifications/resources/list_changed`.
916    #[serde(default, skip_serializing_if = "Option::is_none")]
917    pub list_changed: Option<bool>,
918}
919
920/// Prompts sub-capability flags from the MCP `initialize` response.
921#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
922#[serde(rename_all = "camelCase")]
923pub struct PromptsCapability {
924    /// Server emits `notifications/prompts/list_changed`.
925    #[serde(default, skip_serializing_if = "Option::is_none")]
926    pub list_changed: Option<bool>,
927}
928
929/// Logging sub-capability. Spec reserves the key with no defined sub-fields yet.
930#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
931pub struct LoggingCapability {}
932
/// Catalog list-changed announcements from a server, as seen by
/// [`McpClientHandler`].
///
/// [`McpConnection`] drains these inside
/// [`McpServerManager::refresh_changed_catalogs`] so that the affected
/// capability lists get re-discovered. For richer, push-style consumption
/// (progress, logging, resource updates, cancellation) subscribe through
/// [`McpConnection::subscribe_events`] and match on [`McpServerEvent`]
/// instead.
// Every variant ends in `Changed` on purpose: each one mirrors a
// `*/list_changed` notification, so the shared suffix is kept.
#[allow(clippy::enum_variant_names)]
#[derive(Clone, Debug)]
pub enum McpServerNotification {
    /// The server sent `notifications/tools/list_changed`.
    ToolsChanged,
    /// The server sent `notifications/resources/list_changed`.
    ResourcesChanged,
    /// The server sent `notifications/prompts/list_changed`.
    PromptsChanged,
}
951
952/// Server-pushed events broadcast to every [`McpConnection::subscribe_events`]
953/// receiver.
954///
955/// Covers the rmcp client-handler notification surface that does not feed the
956/// catalog refresh path: progress, logging, resource updates, cancellation,
957/// plus list-changed announcements (also delivered over the legacy
958/// [`McpServerNotification`] channel).
959#[derive(Clone, Debug)]
960pub enum McpServerEvent {
961    /// `notifications/progress` from the server, scoped to a
962    /// `progress_token` issued in a previous request.
963    Progress(McpProgressNotificationParam),
964    /// `notifications/message` (server log emission). Drives the optional
965    /// log-level negotiation initiated by [`McpConnection::set_logging_level`].
966    Logging(McpLoggingMessageNotificationParam),
967    /// `notifications/resources/updated` for a resource the client previously
968    /// subscribed to via [`McpConnection::subscribe_resource`].
969    ResourceUpdated(McpResourceUpdatedNotificationParam),
970    /// `notifications/tools/list_changed`.
971    ToolListChanged,
972    /// `notifications/resources/list_changed`.
973    ResourceListChanged,
974    /// `notifications/prompts/list_changed`.
975    PromptListChanged,
976    /// `notifications/cancelled` from the server, requesting cancellation of
977    /// an in-flight client request.
978    Cancelled(McpCancelledNotificationParam),
979}
980
981/// Pluggable handler invoked when an MCP server issues `sampling/createMessage`.
982///
983/// Install one via [`McpHandlerConfig::with_sampling_responder`] to expose
984/// the host application's LLM as a sampling target for connected MCP servers.
985#[async_trait]
986pub trait McpSamplingResponder: Send + Sync + 'static {
987    /// Produces a sampled completion in response to a server-initiated
988    /// `sampling/createMessage` request.
989    async fn create_message(
990        &self,
991        params: McpCreateMessageRequestParams,
992    ) -> Result<McpCreateMessageResult, McpError>;
993}
994
995/// Pluggable handler invoked when an MCP server issues `elicitation/create`.
996///
997/// Install one via [`McpHandlerConfig::with_elicitation_responder`] to drive
998/// the host application's user-input UI.
999#[async_trait]
1000pub trait McpElicitationResponder: Send + Sync + 'static {
1001    /// Returns the user's response to a server-initiated elicitation request.
1002    async fn create_elicitation(
1003        &self,
1004        params: McpCreateElicitationRequestParams,
1005    ) -> Result<McpCreateElicitationResult, McpError>;
1006}
1007
1008/// Pluggable handler invoked when an MCP server issues `roots/list`.
1009///
1010/// Install one via [`McpHandlerConfig::with_roots_provider`] to surface
1011/// workspace roots that scope the server's filesystem access.
1012#[async_trait]
1013pub trait McpRootsProvider: Send + Sync + 'static {
1014    /// Returns the roots the server should consider in scope.
1015    async fn list_roots(&self) -> Result<Vec<McpRoot>, McpError>;
1016}
1017
/// Capacity used for the [`McpServerEvent`] broadcast channel when the caller
/// does not pick one.
const DEFAULT_EVENTS_CAPACITY: usize = 128;
1020
1021/// Channels paired with an [`McpClientHandler`] returned by
1022/// [`McpHandlerConfig::build`].
1023///
1024/// `notifications` is the legacy mpsc receiver consumed by the catalog refresh
1025/// path inside [`McpServerManager::refresh_changed_catalogs`]. `events` is the
1026/// broadcast sender that surfaces every [`McpServerEvent`] — clone it once and
1027/// pass it into [`McpConnection::from_running_service_with_events`] when
1028/// adopting an externally constructed [`rmcp::service::RunningService`]. If the
1029/// adopted connection also needs adapter-time hooks from the same
1030/// [`McpHandlerConfig`], use
1031/// [`McpConnection::from_running_service_with_events_and_handler_config`].
1032pub struct McpClientChannels {
1033    /// Legacy mpsc receiver for catalog list-changed announcements.
1034    pub notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1035    /// Broadcast sender that forwards every [`McpServerEvent`] to subscribers.
1036    pub events: broadcast::Sender<McpServerEvent>,
1037}
1038
1039/// rmcp [`ClientHandler`] used by [`McpConnection`].
1040///
1041/// You only need to construct this directly if you're wiring rmcp transports
1042/// that [`McpTransportBinding`] does not cover (in-memory pipes, websockets,
1043/// custom IO). Build one via [`McpHandlerConfig::build`], then pair the
1044/// resulting service with [`McpConnection::from_running_service`],
1045/// [`McpConnection::from_running_service_with_events`], or
1046/// [`McpConnection::from_running_service_with_events_and_handler_config`] when
1047/// the connection must preserve adapter-time hooks from the config.
1048#[derive(Clone)]
1049pub struct McpClientHandler {
1050    info: rmcp_model::ClientInfo,
1051    notifications: mpsc::UnboundedSender<McpServerNotification>,
1052    events: broadcast::Sender<McpServerEvent>,
1053    sampling: Option<Arc<dyn McpSamplingResponder>>,
1054    elicitation: Option<Arc<dyn McpElicitationResponder>>,
1055    roots: Option<Arc<dyn McpRootsProvider>>,
1056}
1057
1058impl ClientHandler for McpClientHandler {
1059    fn create_message(
1060        &self,
1061        params: rmcp_model::CreateMessageRequestParams,
1062        _context: rmcp::service::RequestContext<RoleClient>,
1063    ) -> impl Future<Output = Result<rmcp_model::CreateMessageResult, rmcp_model::ErrorData>>
1064    + rmcp::service::MaybeSendFuture
1065    + '_ {
1066        let responder = self.sampling.clone();
1067        async move {
1068            match responder {
1069                Some(responder) => responder.create_message(params).await.map_err(Into::into),
1070                None => Err(rmcp_model::ErrorData::method_not_found::<
1071                    rmcp_model::CreateMessageRequestMethod,
1072                >()),
1073            }
1074        }
1075    }
1076
1077    fn list_roots(
1078        &self,
1079        _context: rmcp::service::RequestContext<RoleClient>,
1080    ) -> impl Future<Output = Result<rmcp_model::ListRootsResult, rmcp_model::ErrorData>>
1081    + rmcp::service::MaybeSendFuture
1082    + '_ {
1083        let provider = self.roots.clone();
1084        async move {
1085            match provider {
1086                Some(provider) => provider
1087                    .list_roots()
1088                    .await
1089                    .map(McpListRootsResult::new)
1090                    .map_err(Into::into),
1091                None => Ok(McpListRootsResult::default()),
1092            }
1093        }
1094    }
1095
1096    fn create_elicitation(
1097        &self,
1098        params: rmcp_model::CreateElicitationRequestParams,
1099        _context: rmcp::service::RequestContext<RoleClient>,
1100    ) -> impl Future<Output = Result<rmcp_model::CreateElicitationResult, rmcp_model::ErrorData>>
1101    + rmcp::service::MaybeSendFuture
1102    + '_ {
1103        let responder = self.elicitation.clone();
1104        async move {
1105            match responder {
1106                Some(responder) => responder
1107                    .create_elicitation(params)
1108                    .await
1109                    .map_err(Into::into),
1110                None => Ok(McpCreateElicitationResult::new(
1111                    McpElicitationAction::Decline,
1112                )),
1113            }
1114        }
1115    }
1116
1117    fn on_progress(
1118        &self,
1119        params: rmcp_model::ProgressNotificationParam,
1120        _context: rmcp::service::NotificationContext<RoleClient>,
1121    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1122        let _ = self.events.send(McpServerEvent::Progress(params));
1123        std::future::ready(())
1124    }
1125
1126    fn on_logging_message(
1127        &self,
1128        params: rmcp_model::LoggingMessageNotificationParam,
1129        _context: rmcp::service::NotificationContext<RoleClient>,
1130    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1131        let _ = self.events.send(McpServerEvent::Logging(params));
1132        std::future::ready(())
1133    }
1134
1135    fn on_resource_updated(
1136        &self,
1137        params: rmcp_model::ResourceUpdatedNotificationParam,
1138        _context: rmcp::service::NotificationContext<RoleClient>,
1139    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1140        let _ = self.events.send(McpServerEvent::ResourceUpdated(params));
1141        std::future::ready(())
1142    }
1143
1144    fn on_cancelled(
1145        &self,
1146        params: rmcp_model::CancelledNotificationParam,
1147        _context: rmcp::service::NotificationContext<RoleClient>,
1148    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1149        let _ = self.events.send(McpServerEvent::Cancelled(params));
1150        std::future::ready(())
1151    }
1152
1153    fn on_tool_list_changed(
1154        &self,
1155        _context: rmcp::service::NotificationContext<RoleClient>,
1156    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1157        let _ = self.notifications.send(McpServerNotification::ToolsChanged);
1158        let _ = self.events.send(McpServerEvent::ToolListChanged);
1159        std::future::ready(())
1160    }
1161
1162    fn on_resource_list_changed(
1163        &self,
1164        _context: rmcp::service::NotificationContext<RoleClient>,
1165    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1166        let _ = self
1167            .notifications
1168            .send(McpServerNotification::ResourcesChanged);
1169        let _ = self.events.send(McpServerEvent::ResourceListChanged);
1170        std::future::ready(())
1171    }
1172
1173    fn on_prompt_list_changed(
1174        &self,
1175        _context: rmcp::service::NotificationContext<RoleClient>,
1176    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1177        let _ = self
1178            .notifications
1179            .send(McpServerNotification::PromptsChanged);
1180        let _ = self.events.send(McpServerEvent::PromptListChanged);
1181        std::future::ready(())
1182    }
1183
1184    fn get_info(&self) -> rmcp_model::ClientInfo {
1185        self.info.clone()
1186    }
1187}
1188
1189impl From<McpError> for rmcp_model::ErrorData {
1190    fn from(error: McpError) -> Self {
1191        rmcp_model::ErrorData::internal_error(error.to_string(), None)
1192    }
1193}
1194
1195type RmcpClientService = RunningService<RoleClient, McpClientHandler>;
1196
1197/// Configuration applied to every [`McpClientHandler`] this crate builds on
1198/// behalf of a connection or [`McpServerManager`].
1199///
1200/// Holds the optional sampling / elicitation / roots responders plus the
1201/// broadcast capacity for [`McpServerEvent`] subscribers. Pass an instance to
1202/// [`McpConnection::connect_with_handler`] to drive a single connection, or
1203/// install one on the manager via
1204/// [`McpServerManager::with_handler_config`] / per-trait builders.
1205#[derive(Clone, Default)]
1206pub struct McpHandlerConfig {
1207    /// Responder for server-initiated `sampling/createMessage` requests.
1208    pub sampling: Option<Arc<dyn McpSamplingResponder>>,
1209    /// Responder for server-initiated `elicitation/create` requests.
1210    pub elicitation: Option<Arc<dyn McpElicitationResponder>>,
1211    /// Provider for `roots/list`.
1212    pub roots: Option<Arc<dyn McpRootsProvider>>,
1213    /// Resolver for auth challenges raised during MCP operations. When
1214    /// installed, [`McpToolAdapter::invoke`] (and other operation paths)
1215    /// invoke the responder inline on auth challenges and retry — auth
1216    /// never surfaces as a loop interrupt.
1217    pub auth: Option<Arc<dyn McpAuthResponder>>,
1218    /// Handler invoked when an MCP server returns a JSON-RPC error for an
1219    /// invoked tool. When installed, [`McpToolAdapter::invoke`] forwards
1220    /// the typed [`McpInvocationError`] to the responder before falling
1221    /// back to [`ToolError::ExecutionFailed`]; the responder may synthesize
1222    /// a [`CallToolResult`] (the agent sees a successful tool call) or
1223    /// pass the error through unchanged.
1224    pub error_responder: Option<Arc<dyn McpErrorResponder>>,
1225    /// Broadcast capacity for the [`McpServerEvent`] channel. Defaults to
1226    /// `DEFAULT_EVENTS_CAPACITY` when `None`.
1227    pub events_capacity: Option<usize>,
1228}
1229
1230impl McpHandlerConfig {
1231    /// Returns an empty handler config.
1232    pub fn new() -> Self {
1233        Self::default()
1234    }
1235
1236    /// Sets the sampling responder.
1237    pub fn with_sampling_responder(mut self, responder: Arc<dyn McpSamplingResponder>) -> Self {
1238        self.sampling = Some(responder);
1239        self
1240    }
1241
1242    /// Sets the elicitation responder.
1243    pub fn with_elicitation_responder(
1244        mut self,
1245        responder: Arc<dyn McpElicitationResponder>,
1246    ) -> Self {
1247        self.elicitation = Some(responder);
1248        self
1249    }
1250
1251    /// Sets the roots provider.
1252    pub fn with_roots_provider(mut self, provider: Arc<dyn McpRootsProvider>) -> Self {
1253        self.roots = Some(provider);
1254        self
1255    }
1256
1257    /// Sets the auth responder.
1258    pub fn with_auth_responder(mut self, responder: Arc<dyn McpAuthResponder>) -> Self {
1259        self.auth = Some(responder);
1260        self
1261    }
1262
1263    /// Sets the invocation-error responder.
1264    pub fn with_error_responder(mut self, responder: Arc<dyn McpErrorResponder>) -> Self {
1265        self.error_responder = Some(responder);
1266        self
1267    }
1268
1269    /// Sets the broadcast capacity for [`McpServerEvent`] subscribers.
1270    pub fn with_events_capacity(mut self, capacity: usize) -> Self {
1271        self.events_capacity = Some(capacity);
1272        self
1273    }
1274
1275    /// Builds a handler together with a fresh [`McpClientChannels`] pair —
1276    /// the notification receiver and a new broadcast sender for
1277    /// [`McpServerEvent`].
1278    pub fn build(&self) -> (McpClientHandler, McpClientChannels) {
1279        self.build_inner(None)
1280    }
1281
1282    /// Builds a handler that publishes [`McpServerEvent`] into the provided
1283    /// broadcast sender. Use this when adopting an externally constructed
1284    /// rmcp service via [`McpConnection::from_running_service_with_events`]
1285    /// so subscribers see the same stream.
1286    pub fn build_with(
1287        &self,
1288        events: broadcast::Sender<McpServerEvent>,
1289    ) -> (McpClientHandler, McpClientChannels) {
1290        self.build_inner(Some(events))
1291    }
1292
1293    fn build_inner(
1294        &self,
1295        events: Option<broadcast::Sender<McpServerEvent>>,
1296    ) -> (McpClientHandler, McpClientChannels) {
1297        let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
1298        let events_tx = events.unwrap_or_else(|| {
1299            let capacity = self.events_capacity.unwrap_or(DEFAULT_EVENTS_CAPACITY);
1300            let (tx, _) = broadcast::channel(capacity);
1301            tx
1302        });
1303
1304        let mut capabilities = rmcp_model::ClientCapabilities::default();
1305        if self.sampling.is_some() {
1306            capabilities.sampling = Some(McpSamplingCapability::default());
1307        }
1308        if self.elicitation.is_some() {
1309            capabilities.elicitation = Some(McpElicitationCapability {
1310                form: Some(McpFormElicitationCapability::default()),
1311                url: None,
1312            });
1313        }
1314        if self.roots.is_some() {
1315            capabilities.roots = Some(McpRootsCapabilities::default());
1316        }
1317
1318        let handler = McpClientHandler {
1319            info: rmcp_model::ClientInfo::new(
1320                capabilities,
1321                rmcp_model::Implementation::new("agentkit-mcp", env!("CARGO_PKG_VERSION"))
1322                    .with_title("agentkit MCP client"),
1323            )
1324            .with_protocol_version(rmcp_model::ProtocolVersion::LATEST),
1325            notifications: notifications_tx,
1326            events: events_tx.clone(),
1327            sampling: self.sampling.clone(),
1328            elicitation: self.elicitation.clone(),
1329            roots: self.roots.clone(),
1330        };
1331
1332        (
1333            handler,
1334            McpClientChannels {
1335                notifications: notifications_rx,
1336                events: events_tx,
1337            },
1338        )
1339    }
1340}
1341
1342/// A live connection to a single MCP server, wrapping an
1343/// [`rmcp::service::RunningService`].
1344pub struct McpConnection {
1345    server_id: McpServerId,
1346    config: Option<McpServerConfig>,
1347    inner: Mutex<RmcpClientService>,
1348    peer: RwLock<Peer<RoleClient>>,
1349    auth: Mutex<Option<MetadataMap>>,
1350    notifications: Mutex<mpsc::UnboundedReceiver<McpServerNotification>>,
1351    events: broadcast::Sender<McpServerEvent>,
1352    handler_config: McpHandlerConfig,
1353    capabilities: McpServerCapabilities,
1354}
1355
1356/// The result of replaying an MCP operation after auth resolution.
1357#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1358pub enum McpOperationResult {
1359    /// The server was successfully (re)connected; contains the discovery snapshot.
1360    Connected(McpDiscoverySnapshot),
1361    /// A tool call completed; contains the typed rmcp [`CallToolResult`].
1362    Tool(CallToolResult),
1363    /// A resource was read successfully.
1364    Resource(ReadResourceResult),
1365    /// A prompt was retrieved successfully.
1366    Prompt(GetPromptResult),
1367}
1368
1369impl McpConnection {
1370    /// Connects to an MCP server, performs the rmcp `initialize` handshake,
1371    /// and returns a ready-to-use connection. No sampling / elicitation /
1372    /// roots responders are wired; use [`Self::connect_with_handler`] when
1373    /// the server may issue those requests.
1374    pub async fn connect(config: &McpServerConfig) -> Result<Self, McpError> {
1375        Self::connect_with_auth(config, None, McpHandlerConfig::default()).await
1376    }
1377
1378    /// Connects to an MCP server with a fully configured [`McpHandlerConfig`].
1379    pub async fn connect_with_handler(
1380        config: &McpServerConfig,
1381        handler_config: McpHandlerConfig,
1382    ) -> Result<Self, McpError> {
1383        Self::connect_with_auth(config, None, handler_config).await
1384    }
1385
1386    async fn connect_with_auth(
1387        config: &McpServerConfig,
1388        auth: Option<&MetadataMap>,
1389        handler_config: McpHandlerConfig,
1390    ) -> Result<Self, McpError> {
1391        let (handler, channels) = handler_config.build();
1392        let McpClientChannels {
1393            notifications: notification_rx,
1394            events: events_tx,
1395        } = channels;
1396        let (service, capabilities) = match &config.transport {
1397            McpTransportBinding::Stdio(binding) => {
1398                connect_rmcp_stdio(config, binding, handler).await?
1399            }
1400            McpTransportBinding::StreamableHttp(binding) => {
1401                connect_rmcp_streamable_http(config, binding, auth, handler).await?
1402            }
1403        };
1404
1405        let peer = service.peer().clone();
1406        Ok(Self {
1407            server_id: config.id.clone(),
1408            config: Some(config.clone()),
1409            inner: Mutex::new(service),
1410            peer: RwLock::new(peer),
1411            auth: Mutex::new(auth.cloned()),
1412            notifications: Mutex::new(notification_rx),
1413            events: events_tx,
1414            handler_config,
1415            capabilities,
1416        })
1417    }
1418
1419    /// Adopts an externally constructed [`rmcp::service::RunningService`] as
1420    /// an [`McpConnection`].
1421    ///
1422    /// Use this when you need a transport rmcp supports but
1423    /// [`McpTransportBinding`] does not (in-memory pipes for tests, websockets,
1424    /// custom IO). Pair the service with the notification receiver returned by
1425    /// [`McpHandlerConfig::build`] so list-change notifications stay
1426    /// observable.
1427    ///
1428    /// The connection has no [`McpServerConfig`] attached, so reconnect-on-auth
1429    /// is unavailable; [`resolve_auth`](Self::resolve_auth) only updates stored
1430    /// credentials in this mode. Server-pushed events from the underlying
1431    /// handler are *not* forwarded to subscribers — use
1432    /// [`Self::from_running_service_with_events`] paired with the broadcast
1433    /// sender from [`McpClientChannels`] when you need event delivery.
1434    pub fn from_running_service(
1435        server_id: impl Into<McpServerId>,
1436        service: RmcpClientService,
1437        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1438    ) -> Self {
1439        let (events_tx, _) = broadcast::channel(DEFAULT_EVENTS_CAPACITY);
1440        Self::from_running_service_with_events(server_id, service, notifications, events_tx)
1441    }
1442
1443    /// Variant of [`Self::from_running_service`] that wires the broadcast
1444    /// sender returned by [`McpHandlerConfig::build`] (or [`build_with`])
1445    /// so [`Self::subscribe_events`] receivers observe the same stream the
1446    /// handler is publishing into.
1447    ///
1448    /// [`build_with`]: McpHandlerConfig::build_with
1449    pub fn from_running_service_with_events(
1450        server_id: impl Into<McpServerId>,
1451        service: RmcpClientService,
1452        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1453        events: broadcast::Sender<McpServerEvent>,
1454    ) -> Self {
1455        Self::from_running_service_with_events_and_handler_config(
1456            server_id,
1457            service,
1458            notifications,
1459            events,
1460            McpHandlerConfig::default(),
1461        )
1462    }
1463
1464    /// Variant of [`Self::from_running_service_with_events`] that also
1465    /// preserves the handler config used to build the adopted service.
1466    ///
1467    /// Use this when the connection needs to reach client-side hooks that are
1468    /// not carried by the rmcp handler itself, such as [`McpAuthResponder`] and
1469    /// [`McpErrorResponder`] during [`McpToolAdapter`] invocation.
1470    pub fn from_running_service_with_events_and_handler_config(
1471        server_id: impl Into<McpServerId>,
1472        service: RmcpClientService,
1473        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1474        events: broadcast::Sender<McpServerEvent>,
1475        handler_config: McpHandlerConfig,
1476    ) -> Self {
1477        let capabilities = service
1478            .peer_info()
1479            .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1480            .unwrap_or_default();
1481        let peer = service.peer().clone();
1482        Self {
1483            server_id: server_id.into(),
1484            config: None,
1485            inner: Mutex::new(service),
1486            peer: RwLock::new(peer),
1487            auth: Mutex::new(None),
1488            notifications: Mutex::new(notifications),
1489            events,
1490            handler_config,
1491            capabilities,
1492        }
1493    }
1494
1495    async fn reconnect_inner(&self, auth: Option<&MetadataMap>) -> Result<(), McpError> {
1496        let Some(config) = self.config.clone() else {
1497            return Ok(());
1498        };
1499        let (handler, channels) = self.handler_config.build_with(self.events.clone());
1500        let McpClientChannels {
1501            notifications: notification_rx,
1502            ..
1503        } = channels;
1504        let (service, _capabilities) = match &config.transport {
1505            McpTransportBinding::Stdio(binding) => {
1506                connect_rmcp_stdio(&config, binding, handler).await?
1507            }
1508            McpTransportBinding::StreamableHttp(binding) => {
1509                connect_rmcp_streamable_http(&config, binding, auth, handler).await?
1510            }
1511        };
1512        let new_peer = service.peer().clone();
1513        *self.notifications.lock().await = notification_rx;
1514        *self.inner.lock().await = service;
1515        *self.peer.write().expect("MCP peer lock poisoned") = new_peer;
1516        Ok(())
1517    }
1518
1519    fn peer(&self) -> Peer<RoleClient> {
1520        self.peer.read().expect("MCP peer lock poisoned").clone()
1521    }
1522
1523    /// Returns the [`McpServerId`] for this connection.
1524    pub fn server_id(&self) -> &McpServerId {
1525        &self.server_id
1526    }
1527
1528    /// Returns the capabilities advertised by the server during `initialize`.
1529    pub fn capabilities(&self) -> &McpServerCapabilities {
1530        &self.capabilities
1531    }
1532
1533    /// Returns the [`McpHandlerConfig`] this connection was built with.
1534    /// Used by [`McpToolAdapter`] to reach the registered
1535    /// [`McpAuthResponder`] when an auth challenge surfaces.
1536    pub fn handler_config(&self) -> &McpHandlerConfig {
1537        &self.handler_config
1538    }
1539
1540    /// Subscribes to the per-connection [`McpServerEvent`] broadcast.
1541    ///
1542    /// Receivers buffer up to `events_capacity` (configured via
1543    /// [`McpHandlerConfig::with_events_capacity`], defaults to
1544    /// `DEFAULT_EVENTS_CAPACITY`) before slow consumers are signalled with
1545    /// [`broadcast::error::RecvError::Lagged`]. Catalog `*ListChanged` events
1546    /// are also delivered through the legacy [`McpServerNotification`]
1547    /// receiver consumed by [`McpServerManager::refresh_changed_catalogs`].
1548    pub fn subscribe_events(&self) -> broadcast::Receiver<McpServerEvent> {
1549        self.events.subscribe()
1550    }
1551
1552    /// Subscribes to `notifications/resources/updated` for the given URI.
1553    ///
1554    /// Updates surface as [`McpServerEvent::ResourceUpdated`] on every
1555    /// receiver returned by [`Self::subscribe_events`].
1556    pub async fn subscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1557        let uri = uri.into();
1558        self.peer()
1559            .subscribe(rmcp_model::SubscribeRequestParams::new(uri.clone()))
1560            .await
1561            .map_err(|error| {
1562                rmcp_operation_error(
1563                    &self.server_id,
1564                    McpMethod::ResourcesSubscribe { uri },
1565                    error,
1566                )
1567            })
1568    }
1569
1570    /// Cancels a previous [`Self::subscribe_resource`] subscription.
1571    pub async fn unsubscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1572        let uri = uri.into();
1573        self.peer()
1574            .unsubscribe(rmcp_model::UnsubscribeRequestParams::new(uri.clone()))
1575            .await
1576            .map_err(|error| {
1577                rmcp_operation_error(
1578                    &self.server_id,
1579                    McpMethod::ResourcesUnsubscribe { uri },
1580                    error,
1581                )
1582            })
1583    }
1584
1585    /// Negotiates the minimum severity the server should emit through
1586    /// `notifications/message`. Surfaced as [`McpServerEvent::Logging`].
1587    pub async fn set_logging_level(&self, level: McpLoggingLevel) -> Result<(), McpError> {
1588        self.peer()
1589            .set_level(rmcp_model::SetLevelRequestParams::new(level))
1590            .await
1591            .map_err(|error| {
1592                rmcp_operation_error(
1593                    &self.server_id,
1594                    McpMethod::LoggingSetLevel {
1595                        level: format!("{level:?}"),
1596                    },
1597                    error,
1598                )
1599            })
1600    }
1601
1602    /// Sends a `notifications/cancelled` to the server, asking it to stop
1603    /// processing a previously issued request.
1604    pub async fn notify_cancelled(
1605        &self,
1606        params: McpCancelledNotificationParam,
1607    ) -> Result<(), McpError> {
1608        self.peer()
1609            .notify_cancelled(params)
1610            .await
1611            .map_err(rmcp_service_error)
1612    }
1613
1614    /// Notifies the server that the client's roots list has changed; servers
1615    /// may respond by re-issuing `roots/list`.
1616    pub async fn notify_roots_list_changed(&self) -> Result<(), McpError> {
1617        self.peer()
1618            .notify_roots_list_changed()
1619            .await
1620            .map_err(rmcp_service_error)
1621    }
1622
1623    /// Gracefully closes the underlying rmcp service.
1624    ///
1625    /// For Streamable HTTP this drives the rmcp transport to issue a `DELETE`
1626    /// against the negotiated session, releasing server-side state.
1627    pub async fn close(&self) -> Result<(), McpError> {
1628        let mut inner = self.inner.lock().await;
1629        inner
1630            .close()
1631            .await
1632            .map(|_| ())
1633            .map_err(|error| McpError::Transport(format!("rmcp service close failed: {error}")))
1634    }
1635
1636    /// Stores or clears authentication credentials and, when configured to do
1637    /// so via [`McpServerConfig`], reconnects to apply them.
1638    pub async fn resolve_auth(&self, resolution: AuthResolution) -> Result<(), McpError> {
1639        let mut auth_slot = self.auth.lock().await;
1640        match resolution {
1641            AuthResolution::Provided { credentials, .. } => {
1642                *auth_slot = Some(credentials);
1643            }
1644            AuthResolution::Cancelled { .. } => {
1645                *auth_slot = None;
1646            }
1647        }
1648        let snapshot = auth_slot.clone();
1649        drop(auth_slot);
1650        // Only reconnect if we have a config to reconnect with. Without one
1651        // (e.g. constructed via [`from_running_service`]) the auth is stored
1652        // but not pushed to the live transport.
1653        if self.config.is_some() {
1654            self.reconnect_inner(snapshot.as_ref()).await?;
1655        }
1656        Ok(())
1657    }
1658
1659    /// Discovers tools, resources, and prompts that the server advertised.
1660    pub async fn discover(&self) -> Result<McpDiscoverySnapshot, McpError> {
1661        let tools = async {
1662            match self.capabilities.tools {
1663                Some(_) => self.list_tools().await,
1664                None => Ok(Vec::new()),
1665            }
1666        };
1667        let resources = async {
1668            match self.capabilities.resources {
1669                Some(_) => self.list_resources().await,
1670                None => Ok(Vec::new()),
1671            }
1672        };
1673        let prompts = async {
1674            match self.capabilities.prompts {
1675                Some(_) => self.list_prompts().await,
1676                None => Ok(Vec::new()),
1677            }
1678        };
1679        let (tools, resources, prompts) = tokio::try_join!(tools, resources, prompts)?;
1680        Ok(McpDiscoverySnapshot {
1681            server_id: self.server_id.clone(),
1682            tools,
1683            resources,
1684            prompts,
1685            metadata: MetadataMap::new(),
1686        })
1687    }
1688
1689    async fn drain_notifications(&self) -> Vec<McpServerNotification> {
1690        let mut notifications = self.notifications.lock().await;
1691        let mut drained = Vec::new();
1692        while let Ok(notification) = notifications.try_recv() {
1693            drained.push(notification);
1694        }
1695        drained
1696    }
1697
1698    /// Lists all tools advertised by the connected MCP server.
1699    pub async fn list_tools(&self) -> Result<Vec<McpTool>, McpError> {
1700        self.peer()
1701            .list_all_tools()
1702            .await
1703            .map_err(rmcp_service_error)
1704    }
1705
1706    /// Lists all resources advertised by the connected MCP server.
1707    pub async fn list_resources(&self) -> Result<Vec<McpResource>, McpError> {
1708        self.peer()
1709            .list_all_resources()
1710            .await
1711            .map_err(rmcp_service_error)
1712    }
1713
1714    /// Lists all prompts advertised by the connected MCP server.
1715    pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpError> {
1716        self.peer()
1717            .list_all_prompts()
1718            .await
1719            .map_err(rmcp_service_error)
1720    }
1721
1722    /// Invokes a tool on the MCP server.
1723    ///
1724    /// Returns the typed [`CallToolResult`] — the [`Vec<Content>`] block list,
1725    /// the optional `structured_content` field, and the `is_error` flag are
1726    /// all preserved. Adapters convert this into agentkit
1727    /// [`ToolOutput`]/[`InvocableOutput`] at the boundary.
1728    pub async fn call_tool(
1729        &self,
1730        name: &str,
1731        arguments: Value,
1732    ) -> Result<CallToolResult, McpError> {
1733        let arguments_for_auth = arguments.clone();
1734        let mut params = rmcp_model::CallToolRequestParams::new(name.to_string());
1735        if !arguments.is_null() {
1736            params =
1737                params.with_arguments(value_to_json_object(arguments, "tools/call arguments")?);
1738        }
1739        let name_owned = name.to_string();
1740
1741        // Wire-call span so tool latency on `agent.execute_tool` can be
1742        // broken down into MCP round-trip vs dispatch overhead. Parents
1743        // under the loop's execute_tool span via the tracing hierarchy.
1744        let span = tracing::info_span!(
1745            "mcp.call_tool",
1746            "otel.name" = %format!("mcp.call_tool {name}"),
1747            "mcp.server.id" = %self.server_id,
1748            "mcp.tool.name" = %name,
1749            "error.type" = tracing::field::Empty,
1750        );
1751        use tracing::Instrument;
1752        let result = self.peer().call_tool(params).instrument(span.clone()).await;
1753        match result {
1754            Ok(result) => {
1755                if result.is_error == Some(true) {
1756                    span.record("error.type", "tool_error");
1757                }
1758                Ok(result)
1759            }
1760            Err(error) => {
1761                span.record("error.type", "mcp_error");
1762                Err(rmcp_operation_error(
1763                    &self.server_id,
1764                    McpMethod::ToolsCall {
1765                        name: name_owned,
1766                        arguments: arguments_for_auth,
1767                    },
1768                    error,
1769                ))
1770            }
1771        }
1772    }
1773
1774    /// Reads a resource from the MCP server by URI.
1775    ///
1776    /// Returns the typed [`ReadResourceResult`] — the full
1777    /// [`Vec<McpResourceContents>`] is preserved (text vs blob, mime types,
1778    /// metadata). Use [`McpResourceHandle`] for the agentkit
1779    /// [`ResourceProvider`] view that collapses to a single inline `DataRef`.
1780    pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult, McpError> {
1781        let uri_owned = uri.to_string();
1782        self.peer()
1783            .read_resource(rmcp_model::ReadResourceRequestParams::new(uri))
1784            .await
1785            .map_err(|error| {
1786                rmcp_operation_error(
1787                    &self.server_id,
1788                    McpMethod::ResourcesRead { uri: uri_owned },
1789                    error,
1790                )
1791            })
1792    }
1793
1794    /// Retrieves a prompt from the MCP server, rendering it with the given
1795    /// arguments.
1796    ///
1797    /// Returns the typed [`GetPromptResult`] — message role and content
1798    /// blocks (text/image/audio/embedded resource) are preserved. Use
1799    /// [`McpPromptHandle`] for the collapsed agentkit [`PromptProvider`]
1800    /// view.
1801    pub async fn get_prompt(
1802        &self,
1803        name: &str,
1804        arguments: Value,
1805    ) -> Result<GetPromptResult, McpError> {
1806        let arguments_for_auth = arguments.clone();
1807        let name_owned = name.to_string();
1808        let mut params = rmcp_model::GetPromptRequestParams::new(name);
1809        if !arguments.is_null() {
1810            params =
1811                params.with_arguments(value_to_json_object(arguments, "prompts/get arguments")?);
1812        }
1813        self.peer().get_prompt(params).await.map_err(|error| {
1814            rmcp_operation_error(
1815                &self.server_id,
1816                McpMethod::PromptsGet {
1817                    name: name_owned,
1818                    arguments: arguments_for_auth,
1819                },
1820                error,
1821            )
1822        })
1823    }
1824}
1825
1826async fn connect_rmcp_stdio(
1827    config: &McpServerConfig,
1828    binding: &StdioTransportConfig,
1829    handler: McpClientHandler,
1830) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1831    let transport = TokioChildProcess::new(
1832        tokio::process::Command::new(&binding.command).configure(|command| {
1833            command.args(&binding.args);
1834            if let Some(cwd) = &binding.cwd {
1835                command.current_dir(cwd);
1836            }
1837            for (key, value) in &binding.env {
1838                command.env(key, value);
1839            }
1840        }),
1841    )
1842    .map_err(McpError::Io)?;
1843
1844    let service = handler
1845        .serve(transport)
1846        .await
1847        .map_err(|error| rmcp_initialize_error(config, error))?;
1848    let capabilities = service
1849        .peer_info()
1850        .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1851        .unwrap_or_default();
1852
1853    Ok((service, capabilities))
1854}
1855
1856async fn connect_rmcp_streamable_http(
1857    config: &McpServerConfig,
1858    binding: &StreamableHttpTransportConfig,
1859    auth: Option<&MetadataMap>,
1860    handler: McpClientHandler,
1861) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1862    let auth_header = auth
1863        .and_then(bearer_token_from_metadata)
1864        .or_else(|| binding.bearer_token.clone());
1865    let mut rmcp_config = RmcpStreamableHttpClientTransportConfig::with_uri(binding.url.clone());
1866    if let Some(auth_header) = auth_header {
1867        rmcp_config = rmcp_config.auth_header(auth_header);
1868    }
1869    rmcp_config = rmcp_config.custom_headers(binding.headers.iter().cloned().collect());
1870
1871    let result = match binding.http_client.as_ref() {
1872        Some(client) => {
1873            let transport = StreamableHttpClientTransport::with_client(
1874                DynHttpClient(client.clone()),
1875                rmcp_config,
1876            );
1877            handler.serve(transport).await
1878        }
1879        None => {
1880            let transport = StreamableHttpClientTransport::from_config(rmcp_config);
1881            handler.serve(transport).await
1882        }
1883    };
1884    let service = result.map_err(|error| rmcp_initialize_error(config, error))?;
1885    let capabilities = service
1886        .peer_info()
1887        .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1888        .unwrap_or_default();
1889
1890    Ok((service, capabilities))
1891}
1892
1893/// Adapter exposing a single MCP resource as a [`ResourceProvider`].
1894pub struct McpResourceHandle {
1895    connection: Arc<McpConnection>,
1896    descriptor: ResourceDescriptor,
1897}
1898
1899#[async_trait]
1900impl ResourceProvider for McpResourceHandle {
1901    async fn list_resources(&self) -> Result<Vec<ResourceDescriptor>, CapabilityError> {
1902        Ok(vec![self.descriptor.clone()])
1903    }
1904
1905    async fn read_resource(
1906        &self,
1907        id: &ResourceId,
1908        _ctx: &mut CapabilityContext<'_>,
1909    ) -> Result<ResourceContents, CapabilityError> {
1910        let result = self
1911            .connection
1912            .read_resource(&id.0)
1913            .await
1914            .map_err(|error| match error {
1915                McpError::AuthRequired(request) => {
1916                    CapabilityError::Unavailable(format!("auth required: {:?}", request))
1917                }
1918                other => CapabilityError::ExecutionFailed(other.to_string()),
1919            })?;
1920        read_resource_result_to_capabilities(result)
1921            .map_err(|error| CapabilityError::ExecutionFailed(error.to_string()))
1922    }
1923}
1924
1925/// Adapter exposing a single MCP prompt as a [`PromptProvider`].
1926pub struct McpPromptHandle {
1927    connection: Arc<McpConnection>,
1928    descriptor: PromptDescriptor,
1929}
1930
1931#[async_trait]
1932impl PromptProvider for McpPromptHandle {
1933    async fn list_prompts(&self) -> Result<Vec<PromptDescriptor>, CapabilityError> {
1934        Ok(vec![self.descriptor.clone()])
1935    }
1936
1937    async fn get_prompt(
1938        &self,
1939        id: &PromptId,
1940        args: Value,
1941        _ctx: &mut CapabilityContext<'_>,
1942    ) -> Result<PromptContents, CapabilityError> {
1943        let result =
1944            self.connection
1945                .get_prompt(&id.0, args)
1946                .await
1947                .map_err(|error| match error {
1948                    McpError::AuthRequired(request) => {
1949                        CapabilityError::Unavailable(format!("auth required: {:?}", request))
1950                    }
1951                    other => CapabilityError::ExecutionFailed(other.to_string()),
1952                })?;
1953        Ok(get_prompt_result_to_capabilities(result))
1954    }
1955}
1956
1957/// A [`CapabilityProvider`] that surfaces MCP tools, resources, and prompts.
1958///
1959/// The tool side is built by wrapping [`McpToolAdapter`]s in
1960/// [`agentkit_tools_core::ToolInvocableAdapter`], so the same
1961/// permission-check + adapter-spec plumbing the rest of agentkit uses also
1962/// applies to MCP tools — this crate no longer ships its own
1963/// `McpInvocable`.
1964pub struct McpCapabilityProvider {
1965    invocables: Vec<Arc<dyn Invocable>>,
1966    resources: Vec<Arc<dyn ResourceProvider>>,
1967    prompts: Vec<Arc<dyn PromptProvider>>,
1968}
1969
1970impl McpCapabilityProvider {
1971    /// Builds a capability provider from an existing connection and snapshot,
1972    /// using the [`McpToolNamespace::Default`] tool naming strategy.
1973    pub fn from_snapshot(connection: Arc<McpConnection>, snapshot: &McpDiscoverySnapshot) -> Self {
1974        Self::from_snapshot_with_namespace(connection, snapshot, &McpToolNamespace::Default)
1975    }
1976
1977    /// Builds a capability provider with a custom tool naming strategy.
1978    pub fn from_snapshot_with_namespace(
1979        connection: Arc<McpConnection>,
1980        snapshot: &McpDiscoverySnapshot,
1981        namespace: &McpToolNamespace,
1982    ) -> Self {
1983        let server_id = connection.server_id().clone();
1984        let registry =
1985            snapshot
1986                .tools
1987                .iter()
1988                .cloned()
1989                .fold(ToolRegistry::new(), |registry, tool| {
1990                    registry.with(McpToolAdapter::with_namespace(
1991                        &server_id,
1992                        connection.clone(),
1993                        tool,
1994                        namespace,
1995                    ))
1996                });
1997        let permissions: Arc<dyn PermissionChecker> = Arc::new(AllowAllPermissions);
1998        let resources_arc: Arc<dyn agentkit_tools_core::ToolResources> = Arc::new(());
1999        let invocables =
2000            ToolCapabilityProvider::from_registry(&registry, permissions, resources_arc)
2001                .invocables();
2002
2003        let resources = snapshot
2004            .resources
2005            .iter()
2006            .cloned()
2007            .map(|resource| {
2008                Arc::new(McpResourceHandle {
2009                    connection: connection.clone(),
2010                    descriptor: resource_descriptor_from_rmcp(resource),
2011                }) as Arc<dyn ResourceProvider>
2012            })
2013            .collect();
2014
2015        let prompts = snapshot
2016            .prompts
2017            .iter()
2018            .cloned()
2019            .map(|prompt| {
2020                Arc::new(McpPromptHandle {
2021                    connection: connection.clone(),
2022                    descriptor: prompt_descriptor_from_rmcp(prompt),
2023                }) as Arc<dyn PromptProvider>
2024            })
2025            .collect();
2026
2027        Self {
2028            invocables,
2029            resources,
2030            prompts,
2031        }
2032    }
2033
2034    /// Merges multiple capability providers into one.
2035    pub fn merge<I>(providers: I) -> Self
2036    where
2037        I: IntoIterator<Item = Self>,
2038    {
2039        let mut invocables = Vec::new();
2040        let mut resources = Vec::new();
2041        let mut prompts = Vec::new();
2042
2043        for provider in providers {
2044            invocables.extend(provider.invocables);
2045            resources.extend(provider.resources);
2046            prompts.extend(provider.prompts);
2047        }
2048
2049        Self {
2050            invocables,
2051            resources,
2052            prompts,
2053        }
2054    }
2055
2056    /// Connects to an MCP server, performs discovery, and builds a provider.
2057    pub async fn connect(
2058        config: &McpServerConfig,
2059    ) -> Result<(Arc<McpConnection>, Self, McpDiscoverySnapshot), McpError> {
2060        let connection = Arc::new(McpConnection::connect(config).await?);
2061        let snapshot = connection.discover().await?;
2062        let provider = Self::from_snapshot(connection.clone(), &snapshot);
2063
2064        Ok((connection, provider, snapshot))
2065    }
2066}
2067
2068impl CapabilityProvider for McpCapabilityProvider {
2069    fn invocables(&self) -> Vec<Arc<dyn Invocable>> {
2070        self.invocables.clone()
2071    }
2072
2073    fn resources(&self) -> Vec<Arc<dyn ResourceProvider>> {
2074        self.resources.clone()
2075    }
2076
2077    fn prompts(&self) -> Vec<Arc<dyn PromptProvider>> {
2078        self.prompts.clone()
2079    }
2080}
2081
2082/// A connected MCP server together with its configuration and snapshot.
2083#[derive(Clone)]
2084pub struct McpServerHandle {
2085    config: McpServerConfig,
2086    connection: Arc<McpConnection>,
2087    snapshot: McpDiscoverySnapshot,
2088    namespace: McpToolNamespace,
2089}
2090
2091impl McpServerHandle {
2092    /// Returns the original configuration used to connect this server.
2093    pub fn config(&self) -> &McpServerConfig {
2094        &self.config
2095    }
2096
2097    /// Returns the server's unique identifier.
2098    pub fn server_id(&self) -> &McpServerId {
2099        self.connection.server_id()
2100    }
2101
2102    /// Returns a shared reference to the underlying [`McpConnection`].
2103    pub fn connection(&self) -> Arc<McpConnection> {
2104        self.connection.clone()
2105    }
2106
2107    /// Returns the discovery snapshot captured when the server was connected.
2108    pub fn snapshot(&self) -> &McpDiscoverySnapshot {
2109        &self.snapshot
2110    }
2111
2112    /// Returns the tool naming strategy in effect for this server.
2113    pub fn namespace(&self) -> &McpToolNamespace {
2114        &self.namespace
2115    }
2116
2117    /// Builds a [`ToolRegistry`] containing an [`McpToolAdapter`] for each tool.
2118    pub fn tool_registry(&self) -> ToolRegistry {
2119        self.snapshot
2120            .tools
2121            .iter()
2122            .cloned()
2123            .fold(ToolRegistry::new(), |registry, tool| {
2124                registry.with(McpToolAdapter::with_namespace(
2125                    self.server_id(),
2126                    self.connection.clone(),
2127                    tool,
2128                    &self.namespace,
2129                ))
2130            })
2131    }
2132
2133    /// Builds an [`McpCapabilityProvider`] from this server's snapshot.
2134    pub fn capability_provider(&self) -> McpCapabilityProvider {
2135        McpCapabilityProvider::from_snapshot_with_namespace(
2136            self.connection.clone(),
2137            &self.snapshot,
2138            &self.namespace,
2139        )
2140    }
2141}
2142
2143/// Connection failure for one server returned by
2144/// [`McpServerManager::connect_all_settled`].
2145#[derive(Debug)]
2146pub struct McpServerConnectionError {
2147    /// The registered server that failed to connect or complete discovery.
2148    pub server_id: McpServerId,
2149    /// The underlying connection or discovery error for this server.
2150    pub error: McpError,
2151}
2152
2153/// Best-effort outcome returned by [`McpServerManager::connect_all_settled`].
2154///
2155/// Successful connections are installed into the manager and its tool catalog.
2156/// Failed entries leave any existing connection for that server untouched.
2157#[must_use = "inspect `failed` before ignoring the settled MCP connection result"]
2158pub struct McpConnectAllSettled {
2159    /// Handles for servers that connected and completed discovery.
2160    pub connected: Vec<McpServerHandle>,
2161    /// Per-server failures for servers that did not connect or discover.
2162    pub failed: Vec<McpServerConnectionError>,
2163}
2164
2165impl McpConnectAllSettled {
2166    /// Returns `true` when every registered server connected successfully.
2167    pub fn all_connected(&self) -> bool {
2168        self.failed.is_empty()
2169    }
2170
2171    /// Returns `true` when at least one server failed to connect.
2172    pub fn has_failures(&self) -> bool {
2173        !self.failed.is_empty()
2174    }
2175
2176    /// Borrows the successful connection handles.
2177    pub fn connected(&self) -> &[McpServerHandle] {
2178        &self.connected
2179    }
2180
2181    /// Borrows the per-server connection failures.
2182    pub fn failed(&self) -> &[McpServerConnectionError] {
2183        &self.failed
2184    }
2185
2186    /// Consumes the result into successful handles and failures.
2187    pub fn into_parts(self) -> (Vec<McpServerHandle>, Vec<McpServerConnectionError>) {
2188        (self.connected, self.failed)
2189    }
2190}
2191
2192impl fmt::Debug for McpConnectAllSettled {
2193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2194        let connected = self
2195            .connected
2196            .iter()
2197            .map(|handle| handle.server_id())
2198            .collect::<Vec<_>>();
2199        f.debug_struct("McpConnectAllSettled")
2200            .field("connected", &connected)
2201            .field("failed", &self.failed)
2202            .finish()
2203    }
2204}
2205
/// Lifecycle settings [`McpServerManager`] applies to a single server.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct McpServerOptions {
    /// Upper bound on establishing a server connection — transport setup and
    /// the MCP initialize handshake — plus initial discovery (`tools/list`,
    /// `resources/list`, and `prompts/list`). Refresh discovery is bounded
    /// by the same duration. `None` leaves it unbounded.
    pub connect_timeout: Option<Duration>,
}

impl McpServerOptions {
    /// Creates options with every setting at its default (no timeout).
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns these options with the connect timeout set to `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        let mut updated = self;
        updated.connect_timeout = Some(timeout);
        updated
    }
}
2228
2229/// Manages the lifecycle of one or more MCP servers.
2230pub struct McpServerManager {
2231    configs: BTreeMap<McpServerId, McpServerConfig>,
2232    options: BTreeMap<McpServerId, McpServerOptions>,
2233    connections: BTreeMap<McpServerId, McpServerHandle>,
2234    auth: BTreeMap<McpServerId, MetadataMap>,
2235    catalog_tx: broadcast::Sender<McpCatalogEvent>,
2236    namespace: McpToolNamespace,
2237    handler_config: McpHandlerConfig,
2238    catalog_writer: CatalogWriter,
2239    /// Agentkit-namespaced tool names this manager has registered for each
2240    /// connected server. Used to perform surgical writes against the
2241    /// [`CatalogWriter`] on connect/disconnect/refresh without rebuilding
2242    /// the whole catalog.
2243    server_tools: BTreeMap<McpServerId, BTreeSet<ToolName>>,
2244}
2245
2246impl Default for McpServerManager {
2247    fn default() -> Self {
2248        let (catalog_tx, _) = broadcast::channel(128);
2249        let (catalog_writer, _) = dynamic_catalog("mcp");
2250        Self {
2251            configs: BTreeMap::new(),
2252            options: BTreeMap::new(),
2253            connections: BTreeMap::new(),
2254            auth: BTreeMap::new(),
2255            catalog_tx,
2256            namespace: McpToolNamespace::Default,
2257            handler_config: McpHandlerConfig::default(),
2258            catalog_writer,
2259            server_tools: BTreeMap::new(),
2260        }
2261    }
2262}
2263
2264impl McpServerManager {
2265    /// Creates an empty server manager with no registered servers.
2266    pub fn new() -> Self {
2267        Self::default()
2268    }
2269
2270    /// Sets the tool naming strategy for every adapter built by this manager.
2271    pub fn with_namespace(mut self, namespace: McpToolNamespace) -> Self {
2272        self.namespace = namespace;
2273        self
2274    }
2275
2276    /// Replaces the tool naming strategy in place.
2277    pub fn set_namespace(&mut self, namespace: McpToolNamespace) -> &mut Self {
2278        self.namespace = namespace;
2279        self
2280    }
2281
2282    /// Returns the active tool naming strategy.
2283    pub fn namespace(&self) -> &McpToolNamespace {
2284        &self.namespace
2285    }
2286
2287    /// Replaces the [`McpHandlerConfig`] applied to every connection this
2288    /// manager opens.
2289    pub fn with_handler_config(mut self, handler_config: McpHandlerConfig) -> Self {
2290        self.handler_config = handler_config;
2291        self
2292    }
2293
2294    /// Sets the [`McpHandlerConfig`] in place.
2295    pub fn set_handler_config(&mut self, handler_config: McpHandlerConfig) -> &mut Self {
2296        self.handler_config = handler_config;
2297        self
2298    }
2299
2300    /// Returns the active [`McpHandlerConfig`].
2301    pub fn handler_config(&self) -> &McpHandlerConfig {
2302        &self.handler_config
2303    }
2304
2305    /// Registers a server configuration. Returns `self` for chaining.
2306    pub fn with_server(mut self, config: McpServerConfig) -> Self {
2307        self.register_server(config);
2308        self
2309    }
2310
2311    /// Registers a server configuration with lifecycle options. Returns
2312    /// `self` for chaining.
2313    pub fn with_server_options(
2314        mut self,
2315        config: McpServerConfig,
2316        options: McpServerOptions,
2317    ) -> Self {
2318        self.register_server_with_options(config, options);
2319        self
2320    }
2321
2322    /// Registers a server configuration by mutable reference.
2323    pub fn register_server(&mut self, config: McpServerConfig) -> &mut Self {
2324        let id = config.id.clone();
2325        self.configs.insert(id.clone(), config);
2326        self.options.entry(id).or_default();
2327        self
2328    }
2329
2330    /// Registers a server configuration and lifecycle options by mutable
2331    /// reference.
2332    pub fn register_server_with_options(
2333        &mut self,
2334        config: McpServerConfig,
2335        options: McpServerOptions,
2336    ) -> &mut Self {
2337        let id = config.id.clone();
2338        self.configs.insert(id.clone(), config);
2339        self.options.insert(id, options);
2340        self
2341    }
2342
2343    /// Returns the handle for a connected server, or `None` if not connected.
2344    pub fn connected_server(&self, server_id: &McpServerId) -> Option<&McpServerHandle> {
2345        self.connections.get(server_id)
2346    }
2347
2348    /// Returns handles for all currently connected servers.
2349    pub fn connected_servers(&self) -> Vec<&McpServerHandle> {
2350        self.connections.values().collect()
2351    }
2352
2353    /// Subscribes to MCP catalog and lifecycle events.
2354    pub fn subscribe_catalog_events(&self) -> broadcast::Receiver<McpCatalogEvent> {
2355        self.catalog_tx.subscribe()
2356    }
2357
2358    fn emit_catalog_event(&self, event: McpCatalogEvent) {
2359        let _ = self.catalog_tx.send(event);
2360    }
2361
2362    async fn discover_with_options(
2363        connection: &McpConnection,
2364        options: &McpServerOptions,
2365    ) -> Result<McpDiscoverySnapshot, McpError> {
2366        match options.connect_timeout {
2367            Some(timeout) => tokio::time::timeout(timeout, connection.discover())
2368                .await
2369                .map_err(|_| McpError::Timeout {
2370                    operation: "discover",
2371                    duration: timeout,
2372                })?,
2373            None => connection.discover().await,
2374        }
2375    }
2376
2377    async fn connect_and_discover(
2378        config: &McpServerConfig,
2379        auth: Option<&MetadataMap>,
2380        handler_config: McpHandlerConfig,
2381        options: &McpServerOptions,
2382    ) -> Result<(Arc<McpConnection>, McpDiscoverySnapshot), McpError> {
2383        let connect = async {
2384            let connection =
2385                Arc::new(McpConnection::connect_with_auth(config, auth, handler_config).await?);
2386            let snapshot = connection.discover().await?;
2387            Ok((connection, snapshot))
2388        };
2389        match options.connect_timeout {
2390            Some(timeout) => {
2391                tokio::time::timeout(timeout, connect)
2392                    .await
2393                    .map_err(|_| McpError::Timeout {
2394                        operation: "connect",
2395                        duration: timeout,
2396                    })?
2397            }
2398            None => connect.await,
2399        }
2400    }
2401
2402    /// Connects a single registered server by its identifier.
2403    pub async fn connect_server(
2404        &mut self,
2405        server_id: &McpServerId,
2406    ) -> Result<McpServerHandle, McpError> {
2407        let config = self
2408            .configs
2409            .get(server_id)
2410            .cloned()
2411            .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2412        let options = self.options.get(server_id).cloned().unwrap_or_default();
2413        let (connection, snapshot) = Self::connect_and_discover(
2414            &config,
2415            self.auth.get(server_id),
2416            self.handler_config.clone(),
2417            &options,
2418        )
2419        .await?;
2420        let handle = McpServerHandle {
2421            config,
2422            connection,
2423            snapshot,
2424            namespace: self.namespace.clone(),
2425        };
2426        self.connections.insert(server_id.clone(), handle.clone());
2427        self.register_server_tools(server_id, &handle.snapshot);
2428        self.emit_catalog_event(McpCatalogEvent::ServerConnected {
2429            server_id: server_id.clone(),
2430        });
2431        Ok(handle)
2432    }
2433
2434    /// Connects all registered servers concurrently.
2435    pub async fn connect_all(&mut self) -> Result<Vec<McpServerHandle>, McpError> {
2436        let plans: Vec<(
2437            McpServerId,
2438            McpServerConfig,
2439            McpServerOptions,
2440            Option<MetadataMap>,
2441        )> = self
2442            .configs
2443            .iter()
2444            .map(|(id, cfg)| {
2445                (
2446                    id.clone(),
2447                    cfg.clone(),
2448                    self.options.get(id).cloned().unwrap_or_default(),
2449                    self.auth.get(id).cloned(),
2450                )
2451            })
2452            .collect();
2453        let handler_config = self.handler_config.clone();
2454        let namespace = self.namespace.clone();
2455
2456        let futures = plans.into_iter().map(|(server_id, config, options, auth)| {
2457            let handler_config = handler_config.clone();
2458            let namespace = namespace.clone();
2459            async move {
2460                let (connection, snapshot) =
2461                    Self::connect_and_discover(&config, auth.as_ref(), handler_config, &options)
2462                        .await?;
2463                Ok::<(McpServerId, McpServerHandle), McpError>((
2464                    server_id,
2465                    McpServerHandle {
2466                        config,
2467                        connection,
2468                        snapshot,
2469                        namespace,
2470                    },
2471                ))
2472            }
2473        });
2474
2475        let results = try_join_all(futures).await?;
2476        let mut handles = Vec::with_capacity(results.len());
2477        let mut connected: Vec<(McpServerId, McpDiscoverySnapshot)> =
2478            Vec::with_capacity(results.len());
2479        for (server_id, handle) in results {
2480            connected.push((server_id.clone(), handle.snapshot.clone()));
2481            self.connections.insert(server_id, handle.clone());
2482            handles.push(handle);
2483        }
2484        for (server_id, snapshot) in &connected {
2485            self.register_server_tools(server_id, snapshot);
2486        }
2487        for (server_id, _) in connected {
2488            self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2489        }
2490        Ok(handles)
2491    }
2492
2493    /// Connects all registered servers concurrently and waits for every
2494    /// connection attempt to settle.
2495    ///
2496    /// Unlike [`Self::connect_all`], this method does not fail fast. Every
2497    /// server is attempted in parallel; successful connections are installed
2498    /// into the manager and tool catalog, while each failed connection is
2499    /// returned with its [`McpServerId`] and [`McpError`].
2500    pub async fn connect_all_settled(&mut self) -> McpConnectAllSettled {
2501        let plans: Vec<(
2502            McpServerId,
2503            McpServerConfig,
2504            McpServerOptions,
2505            Option<MetadataMap>,
2506        )> = self
2507            .configs
2508            .iter()
2509            .map(|(id, cfg)| {
2510                (
2511                    id.clone(),
2512                    cfg.clone(),
2513                    self.options.get(id).cloned().unwrap_or_default(),
2514                    self.auth.get(id).cloned(),
2515                )
2516            })
2517            .collect();
2518        let handler_config = self.handler_config.clone();
2519        let namespace = self.namespace.clone();
2520
2521        let futures = plans.into_iter().map(|(server_id, config, options, auth)| {
2522            let handler_config = handler_config.clone();
2523            let namespace = namespace.clone();
2524            async move {
2525                let result = async {
2526                    let (connection, snapshot) = Self::connect_and_discover(
2527                        &config,
2528                        auth.as_ref(),
2529                        handler_config,
2530                        &options,
2531                    )
2532                    .await?;
2533                    Ok::<McpServerHandle, McpError>(McpServerHandle {
2534                        config,
2535                        connection,
2536                        snapshot,
2537                        namespace,
2538                    })
2539                }
2540                .await;
2541                (server_id, result)
2542            }
2543        });
2544
2545        let results = join_all(futures).await;
2546        let mut connected = Vec::new();
2547        let mut failures = Vec::new();
2548        let mut connected_snapshots = Vec::new();
2549
2550        for (server_id, result) in results {
2551            match result {
2552                Ok(handle) => {
2553                    connected_snapshots.push((server_id.clone(), handle.snapshot.clone()));
2554                    self.connections.insert(server_id, handle.clone());
2555                    connected.push(handle);
2556                }
2557                Err(error) => {
2558                    failures.push(McpServerConnectionError { server_id, error });
2559                }
2560            }
2561        }
2562
2563        for (server_id, snapshot) in &connected_snapshots {
2564            self.register_server_tools(server_id, snapshot);
2565        }
2566        for (server_id, _) in connected_snapshots {
2567            self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2568        }
2569
2570        McpConnectAllSettled {
2571            connected,
2572            failed: failures,
2573        }
2574    }
2575
2576    /// Re-discovers capabilities for a connected server.
2577    pub async fn refresh_server(
2578        &mut self,
2579        server_id: &McpServerId,
2580    ) -> Result<McpDiscoverySnapshot, McpError> {
2581        let handle = self
2582            .connections
2583            .get_mut(server_id)
2584            .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2585        let options = self.options.get(server_id).cloned().unwrap_or_default();
2586        let previous = handle.snapshot.clone();
2587        let snapshot = match Self::discover_with_options(&handle.connection, &options).await {
2588            Ok(snapshot) => snapshot,
2589            Err(error) => {
2590                self.emit_catalog_event(McpCatalogEvent::RefreshFailed {
2591                    server_id: server_id.clone(),
2592                    message: error.to_string(),
2593                });
2594                return Err(error);
2595            }
2596        };
2597        handle.snapshot = snapshot.clone();
2598        let events = diff_discovery_snapshots(server_id, &previous, &snapshot);
2599        if !events.is_empty() {
2600            self.apply_catalog_events(server_id, &snapshot, &events);
2601            for event in events {
2602                self.emit_catalog_event(event);
2603            }
2604        }
2605        Ok(snapshot)
2606    }
2607
2608    /// Processes pending server list-change notifications.
2609    pub async fn refresh_changed_catalogs(&mut self) -> Result<Vec<McpCatalogEvent>, McpError> {
2610        let server_ids = self.connections.keys().cloned().collect::<Vec<_>>();
2611        let mut emitted = Vec::new();
2612
2613        for server_id in server_ids {
2614            let Some(connection) = self
2615                .connections
2616                .get(&server_id)
2617                .map(McpServerHandle::connection)
2618            else {
2619                continue;
2620            };
2621            let notifications = connection.drain_notifications().await;
2622            if notifications.is_empty() {
2623                continue;
2624            }
2625
2626            let handle = self
2627                .connections
2628                .get_mut(&server_id)
2629                .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2630            let options = self.options.get(&server_id).cloned().unwrap_or_default();
2631            let previous = handle.snapshot.clone();
2632            let snapshot = match Self::discover_with_options(&handle.connection, &options).await {
2633                Ok(snapshot) => snapshot,
2634                Err(error) => {
2635                    let event = McpCatalogEvent::RefreshFailed {
2636                        server_id: server_id.clone(),
2637                        message: error.to_string(),
2638                    };
2639                    self.emit_catalog_event(event.clone());
2640                    emitted.push(event);
2641                    return Err(error);
2642                }
2643            };
2644            handle.snapshot = snapshot.clone();
2645            let events = diff_discovery_snapshots(&server_id, &previous, &snapshot);
2646            if !events.is_empty() {
2647                self.apply_catalog_events(&server_id, &snapshot, &events);
2648                for event in events {
2649                    self.emit_catalog_event(event.clone());
2650                    emitted.push(event);
2651                }
2652            }
2653        }
2654
2655        Ok(emitted)
2656    }
2657
2658    /// Disconnects a server and removes it from active connections.
2659    pub async fn disconnect_server(&mut self, server_id: &McpServerId) -> Result<(), McpError> {
2660        let Some(handle) = self.connections.remove(server_id) else {
2661            return Err(McpError::UnknownServer(server_id.to_string()));
2662        };
2663        handle.connection.close().await?;
2664        self.unregister_server_tools(server_id);
2665        self.emit_catalog_event(McpCatalogEvent::ServerDisconnected {
2666            server_id: server_id.clone(),
2667        });
2668        Ok(())
2669    }
2670
2671    /// Stores or clears authentication credentials for a server.
2672    pub async fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), McpError> {
2673        let server_id = resolution
2674            .request()
2675            .server_id()
2676            .ok_or_else(|| McpError::AuthResolution("auth resolution missing server id".into()))?;
2677        let server_id = McpServerId::new(server_id);
2678        match &resolution {
2679            AuthResolution::Provided { credentials, .. } => {
2680                self.auth.insert(server_id.clone(), credentials.clone());
2681            }
2682            AuthResolution::Cancelled { .. } => {
2683                self.auth.remove(&server_id);
2684            }
2685        }
2686
2687        if let Some(handle) = self.connections.get(&server_id) {
2688            handle.connection.resolve_auth(resolution).await?;
2689        } else if !self.configs.contains_key(&server_id) {
2690            return Err(McpError::UnknownServer(server_id.to_string()));
2691        }
2692        self.emit_catalog_event(McpCatalogEvent::AuthChanged { server_id });
2693        Ok(())
2694    }
2695
2696    /// Builds a one-shot snapshot [`ToolRegistry`] of every tool across all
2697    /// connected servers. Use [`source`](Self::source) instead when wiring
2698    /// the manager into an [`agentkit_loop::Agent`] so tool catalog changes
2699    /// flow through automatically.
2700    pub fn tool_registry(&self) -> ToolRegistry {
2701        self.connections
2702            .values()
2703            .fold(ToolRegistry::new(), |mut registry, handle| {
2704                for tool in handle.snapshot.tools.iter().cloned() {
2705                    registry.register(McpToolAdapter::with_namespace(
2706                        handle.server_id(),
2707                        handle.connection.clone(),
2708                        tool,
2709                        &self.namespace,
2710                    ));
2711                }
2712                registry
2713            })
2714    }
2715
2716    /// Returns the manager's federated [`CatalogReader`].
2717    ///
2718    /// The manager keeps an internal `CatalogWriter` in sync with every
2719    /// connect, disconnect, and catalog refresh; the returned reader sees
2720    /// the added/removed/changed tool sets via
2721    /// [`ToolSource::drain_catalog_events`]. Pass it to
2722    /// [`agentkit_loop::AgentBuilder::tools`] alongside any frozen native
2723    /// [`ToolRegistry`].
2724    ///
2725    /// Each call returns a fresh reader subscription — events emitted before
2726    /// this call are not replayed. Call once at agent setup time and reuse.
2727    pub fn source(&self) -> CatalogReader {
2728        self.catalog_writer.reader()
2729    }
2730
2731    /// Surgically updates the tool catalog from the diff events produced
2732    /// by [`diff_discovery_snapshots`]. Only [`McpCatalogEvent::ToolsChanged`]
2733    /// affects the catalog — resource and prompt diffs are observed by the
2734    /// caller via the broadcast stream and don't touch tool state.
2735    fn apply_catalog_events(
2736        &mut self,
2737        server_id: &McpServerId,
2738        snapshot: &McpDiscoverySnapshot,
2739        events: &[McpCatalogEvent],
2740    ) {
2741        for event in events {
2742            if let McpCatalogEvent::ToolsChanged {
2743                added,
2744                removed,
2745                changed,
2746                ..
2747            } = event
2748            {
2749                self.apply_server_tool_diff(server_id, snapshot, added, removed, changed);
2750            }
2751        }
2752    }
2753
2754    /// Registers every tool from a freshly-discovered snapshot, recording
2755    /// the agentkit-namespaced names so [`Self::unregister_server_tools`]
2756    /// can later remove exactly this set.
2757    fn register_server_tools(&mut self, server_id: &McpServerId, snapshot: &McpDiscoverySnapshot) {
2758        let connection = match self.connections.get(server_id) {
2759            Some(handle) => handle.connection.clone(),
2760            None => return,
2761        };
2762        let previous = self.server_tools.remove(server_id).unwrap_or_default();
2763        let mut names = BTreeSet::new();
2764        for tool in &snapshot.tools {
2765            let adapter = McpToolAdapter::with_namespace(
2766                server_id,
2767                connection.clone(),
2768                tool.clone(),
2769                &self.namespace,
2770            );
2771            names.insert(adapter.spec().name.clone());
2772            self.catalog_writer.upsert(Arc::new(adapter));
2773        }
2774        for stale in previous.difference(&names) {
2775            self.catalog_writer.remove(stale);
2776        }
2777        self.server_tools.insert(server_id.clone(), names);
2778    }
2779
2780    /// Removes every agentkit-namespaced tool previously registered for
2781    /// `server_id`. No-op if the server was never registered.
2782    fn unregister_server_tools(&mut self, server_id: &McpServerId) {
2783        let Some(names) = self.server_tools.remove(server_id) else {
2784            return;
2785        };
2786        for name in names {
2787            self.catalog_writer.remove(&name);
2788        }
2789    }
2790
2791    /// Applies a per-tool diff (in raw MCP names) against the current
2792    /// catalog: removes are pruned, adds and changes are upserted from the
2793    /// fresh snapshot. Updates the per-server name index accordingly.
2794    fn apply_server_tool_diff(
2795        &mut self,
2796        server_id: &McpServerId,
2797        snapshot: &McpDiscoverySnapshot,
2798        added: &[String],
2799        removed: &[String],
2800        changed: &[String],
2801    ) {
2802        let connection = match self.connections.get(server_id) {
2803            Some(handle) => handle.connection.clone(),
2804            None => return,
2805        };
2806        let names = self.server_tools.entry(server_id.clone()).or_default();
2807
2808        for raw_name in removed {
2809            let agentkit_name = ToolName::new(self.namespace.apply(server_id, raw_name));
2810            if names.remove(&agentkit_name) {
2811                self.catalog_writer.remove(&agentkit_name);
2812            }
2813        }
2814
2815        let upsert_one = |raw_name: &str| -> Option<(ToolName, McpToolAdapter)> {
2816            let tool = snapshot
2817                .tools
2818                .iter()
2819                .find(|tool| tool.name.as_ref() == raw_name)?
2820                .clone();
2821            let adapter = McpToolAdapter::with_namespace(
2822                server_id,
2823                connection.clone(),
2824                tool,
2825                &self.namespace,
2826            );
2827            Some((adapter.spec().name.clone(), adapter))
2828        };
2829
2830        for raw_name in added.iter().chain(changed.iter()) {
2831            if let Some((agentkit_name, adapter)) = upsert_one(raw_name) {
2832                names.insert(agentkit_name);
2833                self.catalog_writer.upsert(Arc::new(adapter));
2834            }
2835        }
2836    }
2837
2838    /// Builds a combined [`McpCapabilityProvider`] from all connected servers.
2839    pub fn capability_provider(&self) -> McpCapabilityProvider {
2840        McpCapabilityProvider::merge(
2841            self.connections
2842                .values()
2843                .map(McpServerHandle::capability_provider),
2844        )
2845    }
2846}
2847
2848fn diff_discovery_snapshots(
2849    server_id: &McpServerId,
2850    previous: &McpDiscoverySnapshot,
2851    current: &McpDiscoverySnapshot,
2852) -> Vec<McpCatalogEvent> {
2853    let mut events = Vec::new();
2854    let (added, removed, changed) = diff_named_items(
2855        previous.tools.iter().map(|item| (item.name.as_ref(), item)),
2856        current.tools.iter().map(|item| (item.name.as_ref(), item)),
2857    );
2858    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2859        events.push(McpCatalogEvent::ToolsChanged {
2860            server_id: server_id.clone(),
2861            added,
2862            removed,
2863            changed,
2864        });
2865    }
2866
2867    let (added, removed, changed) = diff_named_items(
2868        previous
2869            .resources
2870            .iter()
2871            .map(|item| (item.uri.as_str(), item)),
2872        current
2873            .resources
2874            .iter()
2875            .map(|item| (item.uri.as_str(), item)),
2876    );
2877    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2878        events.push(McpCatalogEvent::ResourcesChanged {
2879            server_id: server_id.clone(),
2880            added,
2881            removed,
2882            changed,
2883        });
2884    }
2885
2886    let (added, removed, changed) = diff_named_items(
2887        previous
2888            .prompts
2889            .iter()
2890            .map(|item| (item.name.as_str(), item)),
2891        current
2892            .prompts
2893            .iter()
2894            .map(|item| (item.name.as_str(), item)),
2895    );
2896    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2897        events.push(McpCatalogEvent::PromptsChanged {
2898            server_id: server_id.clone(),
2899            added,
2900            removed,
2901            changed,
2902        });
2903    }
2904
2905    events
2906}
2907
/// Diffs two name-keyed sequences and reports which names were added,
/// removed, or changed, in that tuple order.
///
/// Both inputs are collected into vectors and sorted by name, then walked in
/// lockstep: a name only in `previous` is removed, a name only in `current`
/// is added, and a name in both whose items compare unequal is changed. No
/// maps are built, and names are turned into `String`s only when emitted, so
/// each output list comes out in ascending name order.
fn diff_named_items<'a, T>(
    previous: impl IntoIterator<Item = (&'a str, &'a T)>,
    current: impl IntoIterator<Item = (&'a str, &'a T)>,
) -> (Vec<String>, Vec<String>, Vec<String>)
where
    T: PartialEq + 'a,
{
    use std::cmp::Ordering;

    let mut old_items: Vec<(&str, &T)> = previous.into_iter().collect();
    let mut new_items: Vec<(&str, &T)> = current.into_iter().collect();
    old_items.sort_unstable_by(|lhs, rhs| lhs.0.cmp(rhs.0));
    new_items.sort_unstable_by(|lhs, rhs| lhs.0.cmp(rhs.0));

    let (mut added, mut removed, mut changed) = (Vec::new(), Vec::new(), Vec::new());
    let mut old_rest = old_items.as_slice();
    let mut new_rest = new_items.as_slice();

    // Walk both sorted sides while each still has a head element.
    while let (
        Some((&(old_name, old_item), old_tail)),
        Some((&(new_name, new_item), new_tail)),
    ) = (old_rest.split_first(), new_rest.split_first())
    {
        match old_name.cmp(new_name) {
            Ordering::Less => {
                removed.push(old_name.to_string());
                old_rest = old_tail;
            }
            Ordering::Greater => {
                added.push(new_name.to_string());
                new_rest = new_tail;
            }
            Ordering::Equal => {
                if old_item != new_item {
                    changed.push(new_name.to_string());
                }
                old_rest = old_tail;
                new_rest = new_tail;
            }
        }
    }

    // Whatever is left on one side has no counterpart on the other.
    removed.extend(old_rest.iter().map(|(name, _)| name.to_string()));
    added.extend(new_rest.iter().map(|(name, _)| name.to_string()));

    (added, removed, changed)
}
2957
/// Adapter exposing an MCP tool as an agentkit [`Tool`].
///
/// Each adapter pairs one MCP tool with the connection it is invoked
/// through and the agentkit-facing [`ToolSpec`] derived from it.
pub struct McpToolAdapter {
    /// Raw MCP tool name, taken from the tool definition; this is the name
    /// sent to the server when the tool is called.
    tool_name: String,
    /// Shared connection to the server that owns the tool.
    connection: Arc<McpConnection>,
    /// Spec returned from [`Tool::spec`], built from the MCP tool and the
    /// naming strategy given at construction.
    spec: ToolSpec,
}
2964
2965impl McpToolAdapter {
2966    /// Creates a new tool adapter for the given MCP tool, using the
2967    /// [`McpToolNamespace::Default`] naming strategy.
2968    pub fn new(server_id: &McpServerId, connection: Arc<McpConnection>, tool: McpTool) -> Self {
2969        Self::with_namespace(server_id, connection, tool, &McpToolNamespace::Default)
2970    }
2971
2972    /// Creates a new tool adapter with a custom name-namespacing strategy.
2973    pub fn with_namespace(
2974        server_id: &McpServerId,
2975        connection: Arc<McpConnection>,
2976        tool: McpTool,
2977        namespace: &McpToolNamespace,
2978    ) -> Self {
2979        let spec = tool_spec_from_tool(server_id, &tool, namespace);
2980        Self {
2981            tool_name: tool.name.into_owned(),
2982            connection,
2983            spec,
2984        }
2985    }
2986
2987    async fn handle_invocation_error(
2988        &self,
2989        err: McpInvocationError,
2990        input: &Value,
2991    ) -> Result<CallToolResult, ToolError> {
2992        let Some(responder) = self.connection.handler_config().error_responder.clone() else {
2993            return Err(ToolError::ExecutionFailed(err.to_string()));
2994        };
2995        let method = McpMethod::ToolsCall {
2996            name: self.tool_name.clone(),
2997            arguments: input.clone(),
2998        };
2999        let ctx = McpErrorContext {
3000            server_id: self.connection.server_id(),
3001            method: &method,
3002            input: Some(input),
3003        };
3004        match responder.handle(&err, ctx).await {
3005            ErrorResponderOutcome::SynthesizeResult(result) => Ok(result),
3006            ErrorResponderOutcome::PassThrough => Err(ToolError::ExecutionFailed(err.to_string())),
3007        }
3008    }
3009}
3010
3011#[async_trait]
3012impl Tool for McpToolAdapter {
3013    fn spec(&self) -> &ToolSpec {
3014        &self.spec
3015    }
3016
3017    async fn invoke(
3018        &self,
3019        request: ToolRequest,
3020        _ctx: &mut ToolContext<'_>,
3021    ) -> Result<ToolResult, ToolError> {
3022        let input = request.input;
3023        let result = match self
3024            .connection
3025            .call_tool(&self.tool_name, input.clone())
3026            .await
3027        {
3028            Ok(result) => result,
3029            Err(McpError::AuthRequired(auth_request)) => {
3030                let responder = self
3031                    .connection
3032                    .handler_config()
3033                    .auth
3034                    .clone()
3035                    .ok_or_else(|| {
3036                        ToolError::ExecutionFailed(
3037                            "MCP server requires auth but no McpAuthResponder is registered".into(),
3038                        )
3039                    })?;
3040                let resolution = responder.resolve(*auth_request).await.map_err(|error| {
3041                    ToolError::ExecutionFailed(format!("auth responder failed: {error}"))
3042                })?;
3043                match &resolution {
3044                    AuthResolution::Provided { .. } => {
3045                        self.connection
3046                            .resolve_auth(resolution.clone())
3047                            .await
3048                            .map_err(|error| {
3049                                ToolError::ExecutionFailed(format!(
3050                                    "applying auth resolution failed: {error}"
3051                                ))
3052                            })?;
3053                    }
3054                    AuthResolution::Cancelled { .. } => {
3055                        return Err(ToolError::ExecutionFailed(
3056                            "user cancelled MCP auth flow".into(),
3057                        ));
3058                    }
3059                }
3060                match self
3061                    .connection
3062                    .call_tool(&self.tool_name, input.clone())
3063                    .await
3064                {
3065                    Ok(result) => result,
3066                    Err(McpError::AuthRequired(req)) => {
3067                        return Err(ToolError::ExecutionFailed(format!(
3068                            "MCP auth challenge unresolved after retry: {}",
3069                            req.id
3070                        )));
3071                    }
3072                    Err(McpError::Invocation(err)) => {
3073                        self.handle_invocation_error(err, &input).await?
3074                    }
3075                    Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
3076                }
3077            }
3078            Err(McpError::Invocation(err)) => self.handle_invocation_error(err, &input).await?,
3079            Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
3080        };
3081
3082        let is_error = result.is_error.unwrap_or(false);
3083        Ok(ToolResult {
3084            result: ToolResultPart {
3085                call_id: request.call_id,
3086                output: call_tool_result_to_tool_output(result),
3087                is_error,
3088                metadata: MetadataMap::new(),
3089            },
3090            duration: None,
3091            metadata: MetadataMap::new(),
3092        })
3093    }
3094}
3095
3096fn rmcp_server_capabilities_to_agentkit(
3097    capabilities: &rmcp_model::ServerCapabilities,
3098) -> McpServerCapabilities {
3099    McpServerCapabilities {
3100        tools: capabilities.tools.as_ref().map(|tools| ToolsCapability {
3101            list_changed: tools.list_changed,
3102        }),
3103        resources: capabilities
3104            .resources
3105            .as_ref()
3106            .map(|resources| ResourcesCapability {
3107                subscribe: resources.subscribe,
3108                list_changed: resources.list_changed,
3109            }),
3110        prompts: capabilities
3111            .prompts
3112            .as_ref()
3113            .map(|prompts| PromptsCapability {
3114                list_changed: prompts.list_changed,
3115            }),
3116        logging: capabilities.logging.as_ref().map(|_| LoggingCapability {}),
3117    }
3118}
3119
3120fn tool_spec_from_tool(
3121    server_id: &McpServerId,
3122    tool: &McpTool,
3123    namespace: &McpToolNamespace,
3124) -> ToolSpec {
3125    ToolSpec {
3126        name: ToolName::new(namespace.apply(server_id, &tool.name)),
3127        description: tool
3128            .description
3129            .as_ref()
3130            .map(|d| d.to_string())
3131            .unwrap_or_else(|| tool.name.to_string()),
3132        input_schema: Value::Object((*tool.input_schema).clone()),
3133        output_schema: tool
3134            .output_schema
3135            .as_ref()
3136            .map(|schema| Value::Object((**schema).clone())),
3137        annotations: tool_annotations_from_rmcp(tool.annotations.as_ref()),
3138        metadata: MetadataMap::new(),
3139    }
3140}
3141
3142fn tool_annotations_from_rmcp(annotations: Option<&McpToolAnnotations>) -> ToolAnnotations {
3143    let Some(annotations) = annotations else {
3144        return ToolAnnotations::default();
3145    };
3146    // rmcp expresses each hint as `Option<bool>` (advisory; absent means
3147    // unspecified). agentkit collapses absent → false. Tools that need to
3148    // distinguish "absent" from "false" should inspect the underlying
3149    // `McpTool::annotations` directly via the snapshot. MCP has no
3150    // `needs_approval` hint, so leave it unset and let the loop's permission
3151    // policy drive approval.
3152    ToolAnnotations {
3153        read_only_hint: annotations.read_only_hint.unwrap_or(false),
3154        destructive_hint: annotations.destructive_hint.unwrap_or(false),
3155        idempotent_hint: annotations.idempotent_hint.unwrap_or(false),
3156        needs_approval_hint: false,
3157        supports_streaming_hint: false,
3158    }
3159}
3160
3161fn resource_descriptor_from_rmcp(resource: McpResource) -> ResourceDescriptor {
3162    let raw = resource.raw;
3163    ResourceDescriptor {
3164        id: ResourceId::new(raw.uri),
3165        name: raw.name,
3166        description: raw.description,
3167        mime_type: raw.mime_type,
3168        metadata: MetadataMap::new(),
3169    }
3170}
3171
3172fn prompt_descriptor_from_rmcp(prompt: McpPrompt) -> PromptDescriptor {
3173    let arguments = prompt.arguments.unwrap_or_default();
3174    let mut required = Vec::new();
3175    let properties = arguments
3176        .into_iter()
3177        .map(|argument| {
3178            let mut schema = serde_json::Map::new();
3179            schema.insert("type".into(), Value::String("string".into()));
3180            if let Some(description) = argument.description {
3181                schema.insert("description".into(), Value::String(description));
3182            }
3183            if argument.required.unwrap_or(false) {
3184                required.push(Value::String(argument.name.clone()));
3185            }
3186            (argument.name, Value::Object(schema))
3187        })
3188        .collect::<serde_json::Map<String, Value>>();
3189    let mut input_schema = serde_json::Map::new();
3190    input_schema.insert("type".into(), Value::String("object".into()));
3191    input_schema.insert("properties".into(), Value::Object(properties));
3192    if !required.is_empty() {
3193        input_schema.insert("required".into(), Value::Array(required));
3194    }
3195
3196    PromptDescriptor {
3197        id: PromptId::new(prompt.name.clone()),
3198        name: prompt.name,
3199        description: prompt.description,
3200        input_schema: Value::Object(input_schema),
3201        metadata: MetadataMap::new(),
3202    }
3203}
3204
3205fn read_resource_result_to_capabilities(
3206    result: ReadResourceResult,
3207) -> Result<ResourceContents, McpError> {
3208    let content = result
3209        .contents
3210        .into_iter()
3211        .next()
3212        .ok_or_else(|| McpError::Protocol("resources/read returned no contents".into()))?;
3213    Ok(resource_contents_to_capabilities(content))
3214}
3215
3216fn resource_contents_to_capabilities(content: McpResourceContents) -> ResourceContents {
3217    let mut metadata = MetadataMap::new();
3218    let data = match content {
3219        McpResourceContents::TextResourceContents {
3220            text, mime_type, ..
3221        } => {
3222            if let Some(mime) = mime_type {
3223                metadata.insert("mime_type".into(), Value::String(mime));
3224            }
3225            DataRef::InlineText(text)
3226        }
3227        McpResourceContents::BlobResourceContents {
3228            blob,
3229            mime_type,
3230            uri,
3231            ..
3232        } => {
3233            if let Some(mime) = mime_type {
3234                metadata.insert("mime_type".into(), Value::String(mime));
3235            }
3236            metadata.insert("uri".into(), Value::String(uri));
3237            // rmcp delivers blobs as base64-encoded text on the wire.
3238            DataRef::InlineText(blob)
3239        }
3240    };
3241    ResourceContents { data, metadata }
3242}
3243
3244fn get_prompt_result_to_capabilities(result: GetPromptResult) -> PromptContents {
3245    let items = result
3246        .messages
3247        .into_iter()
3248        .map(prompt_message_to_item)
3249        .collect();
3250    let mut metadata = MetadataMap::new();
3251    if let Some(description) = result.description {
3252        metadata.insert("description".into(), Value::String(description));
3253    }
3254    PromptContents { items, metadata }
3255}
3256
3257fn prompt_message_to_item(message: PromptMessage) -> Item {
3258    let kind = match message.role {
3259        PromptMessageRole::Assistant => ItemKind::Assistant,
3260        PromptMessageRole::User => ItemKind::User,
3261    };
3262    Item {
3263        id: None,
3264        kind,
3265        parts: vec![prompt_message_content_to_part(message.content)],
3266        metadata: MetadataMap::new(),
3267        usage: None,
3268        finish_reason: None,
3269        created_at: None,
3270    }
3271}
3272
3273fn prompt_message_content_to_part(content: PromptMessageContent) -> Part {
3274    match content {
3275        PromptMessageContent::Text { text } => Part::Text(TextPart::new(text)),
3276        PromptMessageContent::Image { image } => Part::Media(MediaPart::new(
3277            Modality::Image,
3278            image.mime_type.clone(),
3279            DataRef::InlineText(image.data.clone()),
3280        )),
3281        PromptMessageContent::Resource { resource } => {
3282            let agentkit_resource = resource_contents_to_capabilities(resource.resource.clone());
3283            agentkit_part_from_resource(agentkit_resource)
3284        }
3285        PromptMessageContent::ResourceLink { link } => Part::Text(TextPart::new(link.uri.clone())),
3286    }
3287}
3288
3289fn agentkit_part_from_resource(resource: ResourceContents) -> Part {
3290    let mime = resource
3291        .metadata
3292        .get("mime_type")
3293        .and_then(Value::as_str)
3294        .unwrap_or("text/plain")
3295        .to_string();
3296    Part::Media(MediaPart::new(Modality::Binary, mime, resource.data))
3297}
3298
3299fn call_tool_result_to_tool_output(result: CallToolResult) -> ToolOutput {
3300    if let Some(structured) = result.structured_content {
3301        return ToolOutput::Structured(structured);
3302    }
3303    let parts = call_tool_content_to_parts(result.content);
3304    if parts.iter().all(|part| matches!(part, Part::Text(_))) {
3305        let text = parts
3306            .iter()
3307            .filter_map(|part| match part {
3308                Part::Text(text) => Some(text.text.clone()),
3309                _ => None,
3310            })
3311            .collect::<Vec<_>>()
3312            .join("\n");
3313        ToolOutput::Text(text)
3314    } else {
3315        ToolOutput::Parts(parts)
3316    }
3317}
3318
3319fn call_tool_content_to_parts(contents: Vec<Content>) -> Vec<Part> {
3320    contents.into_iter().map(content_to_part).collect()
3321}
3322
3323fn content_to_part(content: Content) -> Part {
3324    match content.raw {
3325        RawContent::Text(text) => Part::Text(TextPart::new(text.text)),
3326        RawContent::Image(image) => Part::Media(MediaPart::new(
3327            Modality::Image,
3328            image.mime_type,
3329            DataRef::InlineText(image.data),
3330        )),
3331        RawContent::Audio(audio) => Part::Media(MediaPart::new(
3332            Modality::Audio,
3333            audio.mime_type,
3334            DataRef::InlineText(audio.data),
3335        )),
3336        RawContent::Resource(embedded) => {
3337            agentkit_part_from_resource(resource_contents_to_capabilities(embedded.resource))
3338        }
3339        RawContent::ResourceLink(link) => Part::Text(TextPart::new(link.uri)),
3340    }
3341}
3342
3343fn value_to_json_object(value: Value, context: &str) -> Result<rmcp_model::JsonObject, McpError> {
3344    match value {
3345        Value::Object(object) => Ok(object),
3346        Value::Null => Ok(serde_json::Map::new()),
3347        other => Err(McpError::Protocol(format!(
3348            "{context} must be a JSON object, got {other}"
3349        ))),
3350    }
3351}
3352
3353fn bearer_token_from_metadata(metadata: &MetadataMap) -> Option<String> {
3354    ["bearer_token", "access_token", "token", "api_key"]
3355        .into_iter()
3356        .find_map(|key| metadata.get(key).and_then(Value::as_str).map(str::to_owned))
3357}
3358
3359fn rmcp_initialize_error(config: &McpServerConfig, error: ClientInitializeError) -> McpError {
3360    if let Some(signal) = match &error {
3361        ClientInitializeError::TransportError { error: dyn_err, .. } => {
3362            transport_auth_signal(dyn_err)
3363        }
3364        _ => None,
3365    } {
3366        return McpError::AuthRequired(Box::new(auth_request_from_signal(
3367            &config.id,
3368            McpMethod::Initialize,
3369            signal,
3370            &error.to_string(),
3371        )));
3372    }
3373    McpError::Transport(error.to_string())
3374}
3375
/// Converts an rmcp [`ServiceError`] into an [`McpError`] without checking it
/// for auth challenges; the conversion is delegated unchanged to
/// [`service_error_to_mcp_error`].
fn rmcp_service_error(error: ServiceError) -> McpError {
    service_error_to_mcp_error(error)
}
3379
3380fn rmcp_operation_error(
3381    server_id: &McpServerId,
3382    method: McpMethod,
3383    error: ServiceError,
3384) -> McpError {
3385    if let Some(signal) = service_auth_signal(&error) {
3386        return McpError::AuthRequired(Box::new(auth_request_from_signal(
3387            server_id,
3388            method,
3389            signal,
3390            &error.to_string(),
3391        )));
3392    }
3393    service_error_to_mcp_error(error)
3394}
3395
3396fn service_error_to_mcp_error(error: ServiceError) -> McpError {
3397    match error {
3398        ServiceError::McpError(data) => {
3399            McpError::Invocation(McpInvocationError::from_error_data(data))
3400        }
3401        other => McpError::Transport(other.to_string()),
3402    }
3403}
3404
/// Auth-related condition extracted from a transport error, before it is
/// turned into an [`AuthRequest`] by `auth_request_from_signal`.
#[derive(Debug)]
enum AuthSignal {
    /// The transport reported that authentication is required.
    Required {
        /// Challenge header reported by the transport, when available.
        www_authenticate: Option<String>,
    },
    /// The transport reported that the presented credentials lack a
    /// required scope.
    InsufficientScope {
        /// Challenge header reported by the transport, when available.
        www_authenticate: Option<String>,
        /// Scope the transport reported as required, when it named one.
        required_scope: Option<String>,
    },
}
3415
3416fn service_auth_signal(error: &ServiceError) -> Option<AuthSignal> {
3417    match error {
3418        ServiceError::TransportSend(dyn_err) => transport_auth_signal(dyn_err),
3419        _ => None,
3420    }
3421}
3422
3423fn transport_auth_signal(error: &DynamicTransportError) -> Option<AuthSignal> {
3424    let inner = error
3425        .error
3426        .downcast_ref::<StreamableHttpError<reqwest::Error>>()?;
3427    match inner {
3428        StreamableHttpError::AuthRequired(AuthRequiredError {
3429            www_authenticate_header,
3430            ..
3431        }) => Some(AuthSignal::Required {
3432            www_authenticate: Some(www_authenticate_header.clone()),
3433        }),
3434        StreamableHttpError::InsufficientScope(InsufficientScopeError {
3435            www_authenticate_header,
3436            required_scope,
3437            ..
3438        }) => Some(AuthSignal::InsufficientScope {
3439            www_authenticate: Some(www_authenticate_header.clone()),
3440            required_scope: required_scope.clone(),
3441        }),
3442        _ => None,
3443    }
3444}
3445
3446fn auth_request_from_signal(
3447    server_id: &McpServerId,
3448    method: McpMethod,
3449    signal: AuthSignal,
3450    message: &str,
3451) -> AuthRequest {
3452    let method_name = method.method_name();
3453    let mut challenge = MetadataMap::new();
3454    challenge.insert("server_id".into(), Value::String(server_id.to_string()));
3455    challenge.insert("method".into(), Value::String(method_name.into()));
3456    challenge.insert("message".into(), Value::String(message.into()));
3457    challenge.insert("flow_kind".into(), Value::String("http_bearer".into()));
3458    match signal {
3459        AuthSignal::Required { www_authenticate } => {
3460            if let Some(header) = www_authenticate {
3461                challenge.insert("www_authenticate".into(), Value::String(header));
3462            }
3463        }
3464        AuthSignal::InsufficientScope {
3465            www_authenticate,
3466            required_scope,
3467        } => {
3468            challenge.insert("insufficient_scope".into(), Value::Bool(true));
3469            if let Some(header) = www_authenticate {
3470                challenge.insert("www_authenticate".into(), Value::String(header));
3471            }
3472            if let Some(scope) = required_scope {
3473                challenge.insert("required_scope".into(), Value::String(scope));
3474            }
3475        }
3476    }
3477    AuthRequest {
3478        id: format!("mcp:{}:{}", server_id, method_name),
3479        provider: format!("mcp.{}", server_id),
3480        operation: method.into_auth_operation(server_id),
3481        challenge,
3482    }
3483}
3484
/// Typed description of an MCP request that may surface auth or invocation
/// errors.
///
/// Each variant carries the parameters of one JSON-RPC method; each peer call
/// constructs the matching variant. When a request runs into an auth
/// challenge, the variant is converted into a public [`AuthOperation`]:
/// `initialize`, `tools/call`, `resources/read` and `prompts/get` map to
/// dedicated operations, and every other method falls back to
/// [`AuthOperation::McpOther`]. The same value is also exposed to
/// [`McpErrorResponder`] implementations via [`McpErrorContext::method`].
#[derive(Debug, Clone)]
pub enum McpMethod {
    /// `initialize` — the MCP handshake. Carries no parameters.
    Initialize,
    /// `tools/call` — invoke a tool.
    ToolsCall {
        /// The raw MCP tool name (no agentkit namespacing).
        name: String,
        /// The arguments object as sent to the server.
        arguments: Value,
    },
    /// `resources/read` — read a resource.
    ResourcesRead {
        /// URI of the resource being read.
        uri: String,
    },
    /// `resources/subscribe` — subscribe to a resource.
    ResourcesSubscribe {
        /// URI of the resource being subscribed to.
        uri: String,
    },
    /// `resources/unsubscribe` — cancel a resource subscription.
    ResourcesUnsubscribe {
        /// URI of the resource being unsubscribed from.
        uri: String,
    },
    /// `prompts/get` — fetch a prompt.
    PromptsGet {
        /// The raw MCP prompt name.
        name: String,
        /// Arguments forwarded to the prompt.
        arguments: Value,
    },
    /// `logging/setLevel` — set the server's log level.
    LoggingSetLevel {
        /// Negotiated minimum log severity, formatted for diagnostics.
        level: String,
    },
}
3530
3531impl McpMethod {
3532    /// Returns the JSON-RPC method name (e.g. `"tools/call"`).
3533    pub fn method_name(&self) -> &'static str {
3534        match self {
3535            Self::Initialize => "initialize",
3536            Self::ToolsCall { .. } => "tools/call",
3537            Self::ResourcesRead { .. } => "resources/read",
3538            Self::ResourcesSubscribe { .. } => "resources/subscribe",
3539            Self::ResourcesUnsubscribe { .. } => "resources/unsubscribe",
3540            Self::PromptsGet { .. } => "prompts/get",
3541            Self::LoggingSetLevel { .. } => "logging/setLevel",
3542        }
3543    }
3544
3545    fn into_auth_operation(self, server_id: &McpServerId) -> AuthOperation {
3546        let server = server_id.to_string();
3547        match self {
3548            Self::Initialize => AuthOperation::McpConnect {
3549                server_id: server,
3550                metadata: MetadataMap::new(),
3551            },
3552            Self::ToolsCall { name, arguments } => AuthOperation::McpToolCall {
3553                server_id: server,
3554                tool_name: name,
3555                input: arguments,
3556                metadata: MetadataMap::new(),
3557            },
3558            Self::ResourcesRead { uri } => AuthOperation::McpResourceRead {
3559                server_id: server,
3560                resource_id: uri,
3561                metadata: MetadataMap::new(),
3562            },
3563            Self::PromptsGet { name, arguments } => AuthOperation::McpPromptGet {
3564                server_id: server,
3565                prompt_id: name,
3566                args: arguments,
3567                metadata: MetadataMap::new(),
3568            },
3569            other @ (Self::ResourcesSubscribe { .. }
3570            | Self::ResourcesUnsubscribe { .. }
3571            | Self::LoggingSetLevel { .. }) => {
3572                let method = other.method_name().to_string();
3573                AuthOperation::McpOther {
3574                    server_id: server,
3575                    method,
3576                    params: other.into_params_json(),
3577                    metadata: MetadataMap::new(),
3578                }
3579            }
3580        }
3581    }
3582
3583    fn into_params_json(self) -> Value {
3584        match self {
3585            Self::Initialize => json!({}),
3586            Self::ToolsCall { name, arguments } => json!({ "name": name, "arguments": arguments }),
3587            Self::ResourcesRead { uri } => json!({ "uri": uri }),
3588            Self::ResourcesSubscribe { uri } => json!({ "uri": uri }),
3589            Self::ResourcesUnsubscribe { uri } => json!({ "uri": uri }),
3590            Self::PromptsGet { name, arguments } => {
3591                json!({ "name": name, "arguments": arguments })
3592            }
3593            Self::LoggingSetLevel { level } => json!({ "level": level }),
3594        }
3595    }
3596}
3597
/// Errors produced by MCP transport, protocol, and lifecycle operations.
///
/// I/O and JSON errors convert automatically through `From`; the remaining
/// variants are constructed explicitly.
#[derive(Debug, Error)]
pub enum McpError {
    /// An underlying I/O error.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// A JSON serialization or deserialization error.
    #[error("serialization error: {0}")]
    Serialize(#[from] serde_json::Error),
    /// A transport-level error, carried as its display text.
    #[error("transport error: {0}")]
    Transport(String),
    /// A manager lifecycle operation exceeded its configured timeout.
    #[error("{operation} timed out after {duration:?}")]
    Timeout {
        /// Operation that timed out.
        operation: &'static str,
        /// Configured timeout duration.
        duration: Duration,
    },
    /// An MCP protocol violation, such as a response missing required
    /// content.
    #[error("protocol error: {0}")]
    Protocol(String),
    /// The server requires authentication before the operation can proceed.
    /// The boxed [`AuthRequest`] describes the challenge.
    #[error("MCP auth required: {0:?}")]
    AuthRequired(Box<AuthRequest>),
    /// An error occurred while resolving or replaying authentication.
    #[error("auth resolution error: {0}")]
    AuthResolution(String),
    /// The MCP server returned a JSON-RPC error for the invoked method.
    #[error("invocation error: {0}")]
    Invocation(McpInvocationError),
    /// The referenced server ID is not registered in the [`McpServerManager`].
    #[error("unknown MCP server: {0}")]
    UnknownServer(String),
}
3634
/// Allows a borrowed string to be used wherever an [`McpServerId`] is
/// expected.
impl From<&str> for McpServerId {
    /// Builds the id by delegating to [`McpServerId::new`].
    fn from(value: &str) -> Self {
        Self::new(value)
    }
}
3640
/// Allows an owned string to be used wherever an [`McpServerId`] is expected.
impl From<String> for McpServerId {
    /// Builds the id by delegating to [`McpServerId::new`].
    fn from(value: String) -> Self {
        Self::new(value)
    }
}