Skip to main content

agentkit_mcp/
lib.rs

1//! Model Context Protocol integration for agentkit, built on top of [`rmcp`].
2//!
3//! This crate exposes:
4//!
5//! - [`McpServerConfig`] / [`McpTransportBinding`] / [`StdioTransportConfig`] /
6//!   [`StreamableHttpTransportConfig`] — declarative transport configuration.
7//! - [`McpConnection`] — a live, single-server connection wrapping
8//!   [`rmcp::service::RunningService`].
9//! - [`McpServerManager`] — multi-server lifecycle, discovery, catalog diffing,
10//!   and auth replay.
11//! - [`McpServerHandle`], [`McpToolExecutor`], [`McpToolAdapter`],
12//!   [`McpResourceHandle`], [`McpPromptHandle`],
13//!   [`McpCapabilityProvider`] — bridges into the agentkit `Tool` / capabilities
14//!   systems.
15//!
16//! Wire-protocol types (`CallToolResult`, `ReadResourceResult`, `Content`,
17//! `ToolAnnotations`, `Prompt`, sampling/elicitation/roots payloads, …) are
18//! re-exported from [`rmcp::model`] directly — there is no parallel
19//! agentkit-side vocabulary. As `rmcp` tracks new MCP spec revisions, those
20//! types and their fields propagate into agentkit unchanged.
21
22use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::fmt;
24use std::sync::{Arc, RwLock};
25
26use agentkit_capabilities::{
27    CapabilityContext, CapabilityError, CapabilityProvider, Invocable, PromptContents,
28    PromptDescriptor, PromptId, PromptProvider, ResourceContents, ResourceDescriptor, ResourceId,
29    ResourceProvider,
30};
31use agentkit_core::{
32    DataRef, Item, ItemKind, MediaPart, MetadataMap, Modality, Part, TextPart, ToolOutput,
33    ToolResultPart,
34};
35use agentkit_tools_core::{
36    AllowAllPermissions, CatalogReader, CatalogWriter, PermissionChecker, Tool, ToolAnnotations,
37    ToolCapabilityProvider, ToolContext, ToolError, ToolName, ToolRegistry, ToolRequest,
38    ToolResult, ToolSpec, dynamic_catalog,
39};
40use async_trait::async_trait;
41use futures_util::future::try_join_all;
42use futures_util::stream::BoxStream;
43use http::{HeaderName, HeaderValue};
44use rmcp::ServiceExt;
45use rmcp::handler::client::ClientHandler;
46use rmcp::model as rmcp_model;
47use rmcp::service::{ClientInitializeError, Peer, RoleClient, RunningService, ServiceError};
48use rmcp::transport::streamable_http_client::{
49    AuthRequiredError, InsufficientScopeError, StreamableHttpClient as RmcpStreamableHttpClient,
50    StreamableHttpClientTransportConfig as RmcpStreamableHttpClientTransportConfig,
51    StreamableHttpError, StreamableHttpPostResponse,
52};
53use rmcp::transport::{
54    ConfigureCommandExt, DynamicTransportError, StreamableHttpClientTransport, TokioChildProcess,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::{Value, json};
58use sse_stream::{Error as SseError, Sse};
59use thiserror::Error;
60use tokio::sync::{Mutex, broadcast, mpsc};
61
62/// Re-exports of the rmcp wire-protocol types this crate now surfaces directly
63/// instead of wrapping. Pull these in to pattern-match on tool annotations,
64/// content blocks, structured tool output, embedded resources, sampling /
65/// elicitation requests, progress and log notifications, etc.
66pub use rmcp::model::{
67    Annotations as McpAnnotations, AudioContent, CallToolResult,
68    CancelledNotificationParam as McpCancelledNotificationParam,
69    ClientCapabilities as McpClientCapabilities, Content,
70    CreateElicitationRequestParams as McpCreateElicitationRequestParams,
71    CreateElicitationResult as McpCreateElicitationResult,
72    CreateMessageRequestParams as McpCreateMessageRequestParams,
73    CreateMessageResult as McpCreateMessageResult, ElicitationAction as McpElicitationAction,
74    ElicitationCapability as McpElicitationCapability, EmbeddedResource,
75    FormElicitationCapability as McpFormElicitationCapability, GetPromptResult, ImageContent,
76    Implementation as McpImplementation, ListRootsResult as McpListRootsResult,
77    LoggingLevel as McpLoggingLevel,
78    LoggingMessageNotificationParam as McpLoggingMessageNotificationParam,
79    ProgressNotificationParam as McpProgressNotificationParam, Prompt as McpPrompt, PromptArgument,
80    PromptMessage, PromptMessageContent, PromptMessageRole, RawAudioContent, RawContent,
81    RawEmbeddedResource, RawImageContent, RawResource as McpRawResource, RawTextContent,
82    ReadResourceResult, Resource as McpResource, ResourceContents as McpResourceContents,
83    ResourceUpdatedNotificationParam as McpResourceUpdatedNotificationParam, Root as McpRoot,
84    RootsCapabilities as McpRootsCapabilities, SamplingCapability as McpSamplingCapability,
85    SamplingMessage as McpSamplingMessage, SetLevelRequestParams as McpSetLevelRequestParams,
86    TextContent, Tool as McpTool, ToolAnnotations as McpToolAnnotations,
87    UrlElicitationCapability as McpUrlElicitationCapability,
88};
89
90/// Re-export of the JSON-RPC client→server envelope handed to
91/// [`McpHttpClient::post_message`].
92pub use rmcp::model::ClientJsonRpcMessage;
93
94/// Re-exports of the rmcp Streamable HTTP transport types used by
95/// [`McpHttpClient`] implementations.
96pub use rmcp::transport::streamable_http_client::{
97    StreamableHttpError as McpStreamableHttpError,
98    StreamableHttpPostResponse as McpStreamableHttpPostResponse,
99};
100
101/// Re-export of the SSE event/error types referenced by [`McpHttpClient::get_stream`].
102pub use sse_stream::{Error as McpSseError, Sse as McpSse};
103
104/// Alias for [`McpTool`].
105pub type McpToolDescriptor = McpTool;
106/// Alias for [`McpResource`].
107pub type McpResourceDescriptor = McpResource;
108/// Alias for [`McpPrompt`].
109pub type McpPromptDescriptor = McpPrompt;
110
111/// An auth challenge raised by an MCP server during a tool call, resource
112/// read, prompt fetch, or connection handshake.
113///
114/// Hosts handle these via an [`McpAuthResponder`] registered on
115/// [`McpHandlerConfig::with_auth_responder`]. The responder is invoked
116/// inline by [`McpToolAdapter::invoke`] (and equivalent paths in
117/// [`McpServerManager`]) — auth never crosses the executor boundary as a
118/// loop interrupt.
119#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
120pub struct AuthRequest {
121    /// Unique identifier for this auth challenge.
122    pub id: String,
123    /// Name of the authentication provider (e.g. `"github"`, `"google"`).
124    pub provider: String,
125    /// The MCP operation that triggered the auth requirement.
126    pub operation: AuthOperation,
127    /// Provider-specific challenge data (e.g. OAuth URLs, scopes).
128    pub challenge: MetadataMap,
129}
130
131impl AuthRequest {
132    /// Convenience: returns the MCP server id this challenge targets, if any.
133    pub fn server_id(&self) -> Option<&str> {
134        self.operation.server_id()
135    }
136}
137
138/// The MCP operation that triggered an [`AuthRequest`].
139#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
140pub enum AuthOperation {
141    /// Connecting to an MCP server.
142    McpConnect {
143        server_id: String,
144        metadata: MetadataMap,
145    },
146    /// Invoking a tool on an MCP server.
147    McpToolCall {
148        server_id: String,
149        tool_name: String,
150        input: Value,
151        metadata: MetadataMap,
152    },
153    /// Reading a resource from an MCP server.
154    McpResourceRead {
155        server_id: String,
156        resource_id: String,
157        metadata: MetadataMap,
158    },
159    /// Fetching a prompt from an MCP server.
160    McpPromptGet {
161        server_id: String,
162        prompt_id: String,
163        args: Value,
164        metadata: MetadataMap,
165    },
166    /// Any other MCP method that requires auth (resource subscribe/unsubscribe,
167    /// logging level changes, future protocol additions). The typed variants
168    /// above cover the common cases; this catch-all preserves the method name
169    /// and JSON params verbatim for hosts that need to render or log them.
170    McpOther {
171        server_id: String,
172        method: String,
173        params: Value,
174        metadata: MetadataMap,
175    },
176}
177
178impl AuthOperation {
179    /// Returns the MCP server ID this operation targets.
180    pub fn server_id(&self) -> Option<&str> {
181        match self {
182            Self::McpConnect { server_id, .. }
183            | Self::McpToolCall { server_id, .. }
184            | Self::McpResourceRead { server_id, .. }
185            | Self::McpPromptGet { server_id, .. }
186            | Self::McpOther { server_id, .. } => Some(server_id.as_str()),
187        }
188    }
189}
190
191/// Outcome of an [`AuthRequest`] after the host's [`McpAuthResponder`] runs.
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum AuthResolution {
194    /// The host obtained credentials.
195    Provided {
196        request: AuthRequest,
197        credentials: MetadataMap,
198    },
199    /// The host cancelled the auth flow.
200    Cancelled { request: AuthRequest },
201}
202
203impl AuthResolution {
204    /// Builds a successful auth resolution.
205    pub fn provided(request: AuthRequest, credentials: MetadataMap) -> Self {
206        Self::Provided {
207            request,
208            credentials,
209        }
210    }
211
212    /// Builds a cancelled auth resolution.
213    pub fn cancelled(request: AuthRequest) -> Self {
214        Self::Cancelled { request }
215    }
216
217    /// Returns the underlying [`AuthRequest`] regardless of variant.
218    pub fn request(&self) -> &AuthRequest {
219        match self {
220            Self::Provided { request, .. } | Self::Cancelled { request } => request,
221        }
222    }
223}
224
225/// Host-supplied resolver for MCP auth challenges.
226///
227/// Install one via [`McpHandlerConfig::with_auth_responder`]. When an MCP
228/// server returns an auth challenge during a tool call, resource read, or
229/// prompt fetch, the adapter invokes [`McpAuthResponder::resolve`] inline,
230/// applies the resulting credentials to the [`McpConnection`], and retries
231/// the original operation. Auth never surfaces as a loop interrupt.
232///
233/// Hosts that want to interleave the auth UI with the loop's main thread
234/// implement a thin channel-bridging responder (responder sends the
235/// challenge to the UI thread on a `mpsc::Sender`, awaits a `oneshot`
236/// reply with the resolution).
237#[async_trait]
238pub trait McpAuthResponder: Send + Sync + 'static {
239    async fn resolve(&self, request: AuthRequest) -> Result<AuthResolution, McpError>;
240}
241
242/// Unique identifier for a registered MCP server.
243///
244/// Each MCP server in a [`McpServerManager`] is addressed by its `McpServerId`.
245#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
246pub struct McpServerId(pub String);
247
248impl McpServerId {
249    /// Creates a new server identifier from any string-like value.
250    pub fn new(value: impl Into<String>) -> Self {
251        Self(value.into())
252    }
253}
254
255impl fmt::Display for McpServerId {
256    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257        self.0.fmt(f)
258    }
259}
260
261/// Configuration for an MCP server that communicates over standard I/O.
262///
263/// The specified command is spawned as a child process; rmcp drives the
264/// JSON-RPC framing over its stdin/stdout.
265#[derive(Clone, Debug, PartialEq, Eq)]
266pub struct StdioTransportConfig {
267    /// The executable to launch (e.g. `"npx"`, `"python"`, `"node"`).
268    pub command: String,
269    /// Command-line arguments passed to the executable.
270    pub args: Vec<String>,
271    /// Additional environment variables set for the child process.
272    pub env: Vec<(String, String)>,
273    /// Optional working directory for the child process.
274    pub cwd: Option<std::path::PathBuf>,
275}
276
277impl StdioTransportConfig {
278    /// Creates a new stdio transport configuration for the given command.
279    pub fn new(command: impl Into<String>) -> Self {
280        Self {
281            command: command.into(),
282            args: Vec::new(),
283            env: Vec::new(),
284            cwd: None,
285        }
286    }
287
288    /// Appends a command-line argument. Returns `self` for chaining.
289    pub fn with_arg(mut self, arg: impl Into<String>) -> Self {
290        self.args.push(arg.into());
291        self
292    }
293
294    /// Adds an environment variable for the child process. Returns `self` for chaining.
295    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
296        self.env.push((key.into(), value.into()));
297        self
298    }
299
300    /// Sets the working directory for the child process. Returns `self` for chaining.
301    pub fn with_cwd(mut self, cwd: impl Into<std::path::PathBuf>) -> Self {
302        self.cwd = Some(cwd.into());
303        self
304    }
305}
306
307/// Configuration for an MCP server that communicates over the MCP Streamable HTTP transport.
308#[derive(Clone, Default)]
309pub struct StreamableHttpTransportConfig {
310    /// The MCP endpoint URL to connect to.
311    pub url: String,
312    /// Static bearer token sent as an HTTP `Authorization: Bearer ...` header.
313    ///
314    /// Ignored when [`Self::http_client`] is set, since the custom client owns
315    /// authorization for every request.
316    pub bearer_token: Option<String>,
317    /// Static custom HTTP headers sent with every Streamable HTTP request.
318    ///
319    /// Ignored when [`Self::http_client`] is set.
320    pub headers: Vec<(HeaderName, HeaderValue)>,
321    /// Optional caller-supplied HTTP client.
322    ///
323    /// When `Some`, agentkit-mcp routes every Streamable HTTP request through
324    /// the provided implementation instead of rmcp's default reqwest client.
325    /// This is the seam to inject dynamic bearers, request signing, retry
326    /// middleware, custom TLS, and so on. See [`McpHttpClient`].
327    pub http_client: Option<Arc<dyn McpHttpClient>>,
328}
329
330impl fmt::Debug for StreamableHttpTransportConfig {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        f.debug_struct("StreamableHttpTransportConfig")
333            .field("url", &self.url)
334            .field(
335                "bearer_token",
336                &self.bearer_token.as_deref().map(|_| "<redacted>"),
337            )
338            .field("headers", &self.headers)
339            .field(
340                "http_client",
341                &self.http_client.as_ref().map(|_| "<custom>"),
342            )
343            .finish()
344    }
345}
346
347impl StreamableHttpTransportConfig {
348    /// Creates a new Streamable HTTP transport configuration for the given MCP endpoint.
349    pub fn new(url: impl Into<String>) -> Self {
350        Self {
351            url: url.into(),
352            bearer_token: None,
353            headers: Vec::new(),
354            http_client: None,
355        }
356    }
357
358    /// Sets a static bearer token for Streamable HTTP authorization.
359    ///
360    /// Ignored when a custom [`McpHttpClient`] is installed via
361    /// [`Self::with_http_client`].
362    pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
363        self.bearer_token = Some(token.into());
364        self
365    }
366
367    /// Installs a caller-supplied HTTP client for every Streamable HTTP
368    /// request issued by this transport.
369    ///
370    /// This is the only seam capable of producing per-request dynamic state
371    /// (rotating bearers, request signing, distributed-tracing headers).
372    /// rmcp's default reqwest path is bypassed entirely when this is set, so
373    /// implementations are responsible for forwarding `auth_header` /
374    /// `custom_headers` if they want the static config to keep applying.
375    pub fn with_http_client(mut self, client: Arc<dyn McpHttpClient>) -> Self {
376        self.http_client = Some(client);
377        self
378    }
379
380    /// Adds a static HTTP header for every Streamable HTTP request.
381    ///
382    /// Reserved MCP session and protocol headers are still managed by RMCP.
383    /// Ignored when a custom [`McpHttpClient`] is installed.
384    pub fn with_header<N, V>(mut self, name: N, value: V) -> Result<Self, McpError>
385    where
386        N: TryInto<HeaderName>,
387        N::Error: fmt::Display,
388        V: TryInto<HeaderValue>,
389        V::Error: fmt::Display,
390    {
391        let name = name
392            .try_into()
393            .map_err(|error| McpError::Transport(format!("invalid HTTP header name: {error}")))?;
394        let value = value
395            .try_into()
396            .map_err(|error| McpError::Transport(format!("invalid HTTP header value: {error}")))?;
397        self.headers.push((name, value));
398        Ok(self)
399    }
400}
401
402/// Type alias for the SSE stream returned by [`McpHttpClient::get_stream`].
403pub type McpSseStream = BoxStream<'static, Result<Sse, SseError>>;
404
405/// Pluggable HTTP transport for the MCP Streamable HTTP client.
406///
407/// Mirrors [`rmcp::transport::streamable_http_client::StreamableHttpClient`]
408/// but is dyn-compatible (boxed via `async_trait`) so the configuration can
409/// store an `Arc<dyn McpHttpClient>` without genericizing every type that
410/// flows through [`McpServerConfig`] / [`McpTransportBinding`].
411///
412/// The associated error type is fixed to [`reqwest::Error`] so that
413/// agentkit-mcp's auth-challenge detection (which downcasts to
414/// [`StreamableHttpError<reqwest::Error>`]) keeps working — implementations
415/// that wrap a non-reqwest backend should map their failures into a
416/// `reqwest::Error` before returning.
417///
418/// All three methods are invoked by rmcp's worker on every protocol op.
419/// `auth_header` and `custom_headers` carry the values resolved from
420/// [`StreamableHttpTransportConfig`] at connection time; implementations are
421/// free to ignore them and inject their own per-call values (e.g. a fresh
422/// bearer pulled from a runtime registry).
423#[async_trait]
424pub trait McpHttpClient: Send + Sync + 'static {
425    /// POSTs a single client→server JSON-RPC message. The response carries
426    /// either a JSON body or an SSE stream depending on what the server
427    /// negotiates.
428    async fn post_message(
429        &self,
430        uri: Arc<str>,
431        message: ClientJsonRpcMessage,
432        session_id: Option<Arc<str>>,
433        auth_header: Option<String>,
434        custom_headers: HashMap<HeaderName, HeaderValue>,
435    ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>>;
436
437    /// Tears down a server-issued session (HTTP DELETE).
438    async fn delete_session(
439        &self,
440        uri: Arc<str>,
441        session_id: Arc<str>,
442        auth_header: Option<String>,
443        custom_headers: HashMap<HeaderName, HeaderValue>,
444    ) -> Result<(), StreamableHttpError<reqwest::Error>>;
445
446    /// Opens a server→client SSE stream (HTTP GET) for push notifications and
447    /// reconnect resumes.
448    async fn get_stream(
449        &self,
450        uri: Arc<str>,
451        session_id: Arc<str>,
452        last_event_id: Option<String>,
453        auth_header: Option<String>,
454        custom_headers: HashMap<HeaderName, HeaderValue>,
455    ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>>;
456}
457
458/// Internal newtype that adapts an `Arc<dyn McpHttpClient>` to rmcp's
459/// generic, non-dyn-compatible [`RmcpStreamableHttpClient`] trait.
460#[derive(Clone)]
461struct DynHttpClient(Arc<dyn McpHttpClient>);
462
463impl RmcpStreamableHttpClient for DynHttpClient {
464    type Error = reqwest::Error;
465
466    async fn post_message(
467        &self,
468        uri: Arc<str>,
469        message: ClientJsonRpcMessage,
470        session_id: Option<Arc<str>>,
471        auth_header: Option<String>,
472        custom_headers: HashMap<HeaderName, HeaderValue>,
473    ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>> {
474        self.0
475            .post_message(uri, message, session_id, auth_header, custom_headers)
476            .await
477    }
478
479    async fn delete_session(
480        &self,
481        uri: Arc<str>,
482        session_id: Arc<str>,
483        auth_header: Option<String>,
484        custom_headers: HashMap<HeaderName, HeaderValue>,
485    ) -> Result<(), StreamableHttpError<reqwest::Error>> {
486        self.0
487            .delete_session(uri, session_id, auth_header, custom_headers)
488            .await
489    }
490
491    async fn get_stream(
492        &self,
493        uri: Arc<str>,
494        session_id: Arc<str>,
495        last_event_id: Option<String>,
496        auth_header: Option<String>,
497        custom_headers: HashMap<HeaderName, HeaderValue>,
498    ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>> {
499        self.0
500            .get_stream(uri, session_id, last_event_id, auth_header, custom_headers)
501            .await
502    }
503}
504
505/// Selects which transport an MCP server should use.
506#[derive(Clone, Debug)]
507pub enum McpTransportBinding {
508    /// Communicate over the child process's stdin/stdout.
509    Stdio(StdioTransportConfig),
510    /// Communicate over the MCP Streamable HTTP transport.
511    StreamableHttp(StreamableHttpTransportConfig),
512}
513
514/// Full configuration for a single MCP server.
515#[derive(Clone, Debug)]
516pub struct McpServerConfig {
517    /// Unique identifier for this server.
518    pub id: McpServerId,
519    /// Transport binding that determines how communication happens.
520    pub transport: McpTransportBinding,
521    /// Arbitrary metadata attached to this server configuration.
522    pub metadata: MetadataMap,
523}
524
525impl McpServerConfig {
526    /// Creates a new server configuration with the given identifier and transport.
527    pub fn new(id: impl Into<String>, transport: McpTransportBinding) -> Self {
528        Self {
529            id: McpServerId::new(id),
530            transport,
531            metadata: MetadataMap::new(),
532        }
533    }
534
535    /// Creates a stdio-backed server configuration.
536    pub fn stdio(id: impl Into<String>, command: impl Into<String>) -> Self {
537        Self::new(
538            id,
539            McpTransportBinding::Stdio(StdioTransportConfig::new(command)),
540        )
541    }
542
543    /// Creates a Streamable HTTP-backed server configuration.
544    pub fn streamable_http(id: impl Into<String>, url: impl Into<String>) -> Self {
545        Self::new(
546            id,
547            McpTransportBinding::StreamableHttp(StreamableHttpTransportConfig::new(url)),
548        )
549    }
550
551    /// Replaces the configuration metadata.
552    pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
553        self.metadata = metadata;
554        self
555    }
556}
557
558type CustomNamespace = Arc<dyn Fn(&McpServerId, &str) -> String + Send + Sync>;
559
560/// Strategy used to derive the agentkit-side tool name for an MCP tool.
561///
562/// The default (`Default`) preserves agentkit's historical
563/// `mcp_<server>_<tool>` shape so that names satisfy provider validators
564/// that only allow `[a-zA-Z0-9_-]` (e.g. Anthropic on Vertex). Use
565/// [`McpToolNamespace::None`] when the calling provider already namespaces
566/// remote tools, or [`McpToolNamespace::Custom`] for a bespoke scheme.
567#[derive(Clone, Default)]
568pub enum McpToolNamespace {
569    /// Format names as `mcp_<server>_<tool>`.
570    #[default]
571    Default,
572    /// Use the raw MCP tool name with no prefix at all.
573    None,
574    /// Apply a caller-supplied function for full control.
575    Custom(CustomNamespace),
576}
577
578impl fmt::Debug for McpToolNamespace {
579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580        match self {
581            Self::Default => f.write_str("McpToolNamespace::Default"),
582            Self::None => f.write_str("McpToolNamespace::None"),
583            Self::Custom(_) => f.write_str("McpToolNamespace::Custom(<fn>)"),
584        }
585    }
586}
587
588impl McpToolNamespace {
589    /// Builds a custom namespace from a closure.
590    pub fn custom(f: impl Fn(&McpServerId, &str) -> String + Send + Sync + 'static) -> Self {
591        Self::Custom(Arc::new(f))
592    }
593
594    /// Applies the namespace strategy to produce the agentkit tool name.
595    pub fn apply(&self, server_id: &McpServerId, tool_name: &str) -> String {
596        match self {
597            Self::Default => format!("mcp_{server_id}_{tool_name}"),
598            Self::None => tool_name.to_string(),
599            Self::Custom(f) => f(server_id, tool_name),
600        }
601    }
602
603    /// Recovers the raw MCP tool name from an agentkit-side name. Returns
604    /// `None` for [`Self::Custom`] (no general inverse) or when the name
605    /// doesn't match the expected shape.
606    pub fn unapply(&self, server_id: &McpServerId, agentkit_name: &str) -> Option<String> {
607        match self {
608            Self::Default => agentkit_name
609                .strip_prefix(&format!("mcp_{server_id}_"))
610                .map(str::to_string),
611            Self::None => Some(agentkit_name.to_string()),
612            Self::Custom(_) => None,
613        }
614    }
615}
616
617/// A snapshot of all capabilities discovered from a single MCP server.
618///
619/// Tools, resources, and prompts are stored as raw rmcp wire types
620/// ([`McpTool`], [`McpResource`], [`McpPrompt`]) so that consumers see the
621/// full typed surface — `Tool::annotations`, `Tool::output_schema`,
622/// `Tool::execution`, `Tool::icons`; `Resource::title` / `mime_type` /
623/// `size`; `Prompt::arguments` (with the typed `required` flag and per-arg
624/// `description`).
625#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
626pub struct McpDiscoverySnapshot {
627    /// The server this snapshot was taken from.
628    pub server_id: McpServerId,
629    /// Tools advertised by the server.
630    pub tools: Vec<McpTool>,
631    /// Resources advertised by the server.
632    pub resources: Vec<McpResource>,
633    /// Prompts advertised by the server.
634    pub prompts: Vec<McpPrompt>,
635    /// Arbitrary metadata attached to this snapshot.
636    pub metadata: MetadataMap,
637}
638
639/// Catalog and lifecycle events emitted by [`McpServerManager`].
640#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
641pub enum McpCatalogEvent {
642    /// A server connected and completed initial discovery.
643    ServerConnected { server_id: McpServerId },
644    /// A server disconnected.
645    ServerDisconnected { server_id: McpServerId },
646    /// The server's tool list changed.
647    ToolsChanged {
648        server_id: McpServerId,
649        added: Vec<String>,
650        removed: Vec<String>,
651        changed: Vec<String>,
652    },
653    /// The server's resource list changed.
654    ResourcesChanged {
655        server_id: McpServerId,
656        added: Vec<String>,
657        removed: Vec<String>,
658        changed: Vec<String>,
659    },
660    /// The server's prompt list changed.
661    PromptsChanged {
662        server_id: McpServerId,
663        added: Vec<String>,
664        removed: Vec<String>,
665        changed: Vec<String>,
666    },
667    /// Authentication state changed for a server.
668    AuthChanged { server_id: McpServerId },
669    /// A catalog refresh failed.
670    RefreshFailed {
671        server_id: McpServerId,
672        message: String,
673    },
674}
675
676/// Capabilities advertised by an MCP server during the `initialize` handshake.
677#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
678pub struct McpServerCapabilities {
679    /// Advertised `tools` capability.
680    #[serde(default, skip_serializing_if = "Option::is_none")]
681    pub tools: Option<ToolsCapability>,
682    /// Advertised `resources` capability.
683    #[serde(default, skip_serializing_if = "Option::is_none")]
684    pub resources: Option<ResourcesCapability>,
685    /// Advertised `prompts` capability.
686    #[serde(default, skip_serializing_if = "Option::is_none")]
687    pub prompts: Option<PromptsCapability>,
688    /// Advertised `logging` capability.
689    #[serde(default, skip_serializing_if = "Option::is_none")]
690    pub logging: Option<LoggingCapability>,
691}
692
693impl McpServerCapabilities {
694    /// Returns a capabilities struct with every top-level capability
695    /// advertised. Useful for tests.
696    pub fn all() -> Self {
697        Self {
698            tools: Some(ToolsCapability::default()),
699            resources: Some(ResourcesCapability::default()),
700            prompts: Some(PromptsCapability::default()),
701            logging: Some(LoggingCapability::default()),
702        }
703    }
704}
705
706/// Tools sub-capability flags from the MCP `initialize` response.
707#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
708#[serde(rename_all = "camelCase")]
709pub struct ToolsCapability {
710    /// Server emits `notifications/tools/list_changed` when the catalog changes.
711    #[serde(default, skip_serializing_if = "Option::is_none")]
712    pub list_changed: Option<bool>,
713}
714
715/// Resources sub-capability flags from the MCP `initialize` response.
716#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
717#[serde(rename_all = "camelCase")]
718pub struct ResourcesCapability {
719    /// Server supports `resources/subscribe`.
720    #[serde(default, skip_serializing_if = "Option::is_none")]
721    pub subscribe: Option<bool>,
722    /// Server emits `notifications/resources/list_changed`.
723    #[serde(default, skip_serializing_if = "Option::is_none")]
724    pub list_changed: Option<bool>,
725}
726
727/// Prompts sub-capability flags from the MCP `initialize` response.
728#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
729#[serde(rename_all = "camelCase")]
730pub struct PromptsCapability {
731    /// Server emits `notifications/prompts/list_changed`.
732    #[serde(default, skip_serializing_if = "Option::is_none")]
733    pub list_changed: Option<bool>,
734}
735
736/// Logging sub-capability. Spec reserves the key with no defined sub-fields yet.
737#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
738pub struct LoggingCapability {}
739
740/// Server-originated catalog notifications observed by [`McpClientHandler`].
741///
742/// Drained by [`McpConnection`] inside
743/// [`McpServerManager::refresh_changed_catalogs`] to trigger re-discovery of
744/// the affected capability lists. For richer push-style consumption (progress,
745/// logging, resource updates, cancellation), subscribe via
746/// [`McpConnection::subscribe_events`] and pattern-match on
747/// [`McpServerEvent`].
748#[allow(clippy::enum_variant_names)]
749#[derive(Clone, Debug)]
750pub enum McpServerNotification {
751    /// Server announced `notifications/tools/list_changed`.
752    ToolsChanged,
753    /// Server announced `notifications/resources/list_changed`.
754    ResourcesChanged,
755    /// Server announced `notifications/prompts/list_changed`.
756    PromptsChanged,
757}
758
759/// Server-pushed events broadcast to every [`McpConnection::subscribe_events`]
760/// receiver.
761///
762/// Covers the rmcp client-handler notification surface that does not feed the
763/// catalog refresh path: progress, logging, resource updates, cancellation,
764/// plus list-changed announcements (also delivered over the legacy
765/// [`McpServerNotification`] channel).
766#[derive(Clone, Debug)]
767pub enum McpServerEvent {
768    /// `notifications/progress` from the server, scoped to a
769    /// `progress_token` issued in a previous request.
770    Progress(McpProgressNotificationParam),
771    /// `notifications/message` (server log emission). Drives the optional
772    /// log-level negotiation initiated by [`McpConnection::set_logging_level`].
773    Logging(McpLoggingMessageNotificationParam),
774    /// `notifications/resources/updated` for a resource the client previously
775    /// subscribed to via [`McpConnection::subscribe_resource`].
776    ResourceUpdated(McpResourceUpdatedNotificationParam),
777    /// `notifications/tools/list_changed`.
778    ToolListChanged,
779    /// `notifications/resources/list_changed`.
780    ResourceListChanged,
781    /// `notifications/prompts/list_changed`.
782    PromptListChanged,
783    /// `notifications/cancelled` from the server, requesting cancellation of
784    /// an in-flight client request.
785    Cancelled(McpCancelledNotificationParam),
786}
787
788/// Pluggable handler invoked when an MCP server issues `sampling/createMessage`.
789///
790/// Install one via [`McpHandlerConfig::with_sampling_responder`] to expose
791/// the host application's LLM as a sampling target for connected MCP servers.
792#[async_trait]
793pub trait McpSamplingResponder: Send + Sync + 'static {
794    /// Produces a sampled completion in response to a server-initiated
795    /// `sampling/createMessage` request.
796    async fn create_message(
797        &self,
798        params: McpCreateMessageRequestParams,
799    ) -> Result<McpCreateMessageResult, McpError>;
800}
801
802/// Pluggable handler invoked when an MCP server issues `elicitation/create`.
803///
804/// Install one via [`McpHandlerConfig::with_elicitation_responder`] to drive
805/// the host application's user-input UI.
806#[async_trait]
807pub trait McpElicitationResponder: Send + Sync + 'static {
808    /// Returns the user's response to a server-initiated elicitation request.
809    async fn create_elicitation(
810        &self,
811        params: McpCreateElicitationRequestParams,
812    ) -> Result<McpCreateElicitationResult, McpError>;
813}
814
815/// Pluggable handler invoked when an MCP server issues `roots/list`.
816///
817/// Install one via [`McpHandlerConfig::with_roots_provider`] to surface
818/// workspace roots that scope the server's filesystem access.
819#[async_trait]
820pub trait McpRootsProvider: Send + Sync + 'static {
821    /// Returns the roots the server should consider in scope.
822    async fn list_roots(&self) -> Result<Vec<McpRoot>, McpError>;
823}
824
825/// Default broadcast capacity for [`McpServerEvent`] subscribers.
826const DEFAULT_EVENTS_CAPACITY: usize = 128;
827
828/// Channels paired with an [`McpClientHandler`] returned by
829/// [`McpHandlerConfig::build`].
830///
831/// `notifications` is the legacy mpsc receiver consumed by the catalog refresh
832/// path inside [`McpServerManager::refresh_changed_catalogs`]. `events` is the
833/// broadcast sender that surfaces every [`McpServerEvent`] — clone it once and
834/// pass it into [`McpConnection::from_running_service_with_events`] when
835/// adopting an externally constructed [`rmcp::service::RunningService`].
836pub struct McpClientChannels {
837    /// Legacy mpsc receiver for catalog list-changed announcements.
838    pub notifications: mpsc::UnboundedReceiver<McpServerNotification>,
839    /// Broadcast sender that forwards every [`McpServerEvent`] to subscribers.
840    pub events: broadcast::Sender<McpServerEvent>,
841}
842
843/// rmcp [`ClientHandler`] used by [`McpConnection`].
844///
845/// You only need to construct this directly if you're wiring rmcp transports
846/// that [`McpTransportBinding`] does not cover (in-memory pipes, websockets,
847/// custom IO). Build one via [`McpHandlerConfig::build`], then pair the
848/// resulting service with [`McpConnection::from_running_service`] or
849/// [`McpConnection::from_running_service_with_events`].
850#[derive(Clone)]
851pub struct McpClientHandler {
852    info: rmcp_model::ClientInfo,
853    notifications: mpsc::UnboundedSender<McpServerNotification>,
854    events: broadcast::Sender<McpServerEvent>,
855    sampling: Option<Arc<dyn McpSamplingResponder>>,
856    elicitation: Option<Arc<dyn McpElicitationResponder>>,
857    roots: Option<Arc<dyn McpRootsProvider>>,
858}
859
860impl ClientHandler for McpClientHandler {
861    fn create_message(
862        &self,
863        params: rmcp_model::CreateMessageRequestParams,
864        _context: rmcp::service::RequestContext<RoleClient>,
865    ) -> impl Future<Output = Result<rmcp_model::CreateMessageResult, rmcp_model::ErrorData>>
866    + rmcp::service::MaybeSendFuture
867    + '_ {
868        let responder = self.sampling.clone();
869        async move {
870            match responder {
871                Some(responder) => responder.create_message(params).await.map_err(Into::into),
872                None => Err(rmcp_model::ErrorData::method_not_found::<
873                    rmcp_model::CreateMessageRequestMethod,
874                >()),
875            }
876        }
877    }
878
879    fn list_roots(
880        &self,
881        _context: rmcp::service::RequestContext<RoleClient>,
882    ) -> impl Future<Output = Result<rmcp_model::ListRootsResult, rmcp_model::ErrorData>>
883    + rmcp::service::MaybeSendFuture
884    + '_ {
885        let provider = self.roots.clone();
886        async move {
887            match provider {
888                Some(provider) => provider
889                    .list_roots()
890                    .await
891                    .map(McpListRootsResult::new)
892                    .map_err(Into::into),
893                None => Ok(McpListRootsResult::default()),
894            }
895        }
896    }
897
898    fn create_elicitation(
899        &self,
900        params: rmcp_model::CreateElicitationRequestParams,
901        _context: rmcp::service::RequestContext<RoleClient>,
902    ) -> impl Future<Output = Result<rmcp_model::CreateElicitationResult, rmcp_model::ErrorData>>
903    + rmcp::service::MaybeSendFuture
904    + '_ {
905        let responder = self.elicitation.clone();
906        async move {
907            match responder {
908                Some(responder) => responder
909                    .create_elicitation(params)
910                    .await
911                    .map_err(Into::into),
912                None => Ok(McpCreateElicitationResult::new(
913                    McpElicitationAction::Decline,
914                )),
915            }
916        }
917    }
918
919    fn on_progress(
920        &self,
921        params: rmcp_model::ProgressNotificationParam,
922        _context: rmcp::service::NotificationContext<RoleClient>,
923    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
924        let _ = self.events.send(McpServerEvent::Progress(params));
925        std::future::ready(())
926    }
927
928    fn on_logging_message(
929        &self,
930        params: rmcp_model::LoggingMessageNotificationParam,
931        _context: rmcp::service::NotificationContext<RoleClient>,
932    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
933        let _ = self.events.send(McpServerEvent::Logging(params));
934        std::future::ready(())
935    }
936
937    fn on_resource_updated(
938        &self,
939        params: rmcp_model::ResourceUpdatedNotificationParam,
940        _context: rmcp::service::NotificationContext<RoleClient>,
941    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
942        let _ = self.events.send(McpServerEvent::ResourceUpdated(params));
943        std::future::ready(())
944    }
945
946    fn on_cancelled(
947        &self,
948        params: rmcp_model::CancelledNotificationParam,
949        _context: rmcp::service::NotificationContext<RoleClient>,
950    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
951        let _ = self.events.send(McpServerEvent::Cancelled(params));
952        std::future::ready(())
953    }
954
955    fn on_tool_list_changed(
956        &self,
957        _context: rmcp::service::NotificationContext<RoleClient>,
958    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
959        let _ = self.notifications.send(McpServerNotification::ToolsChanged);
960        let _ = self.events.send(McpServerEvent::ToolListChanged);
961        std::future::ready(())
962    }
963
964    fn on_resource_list_changed(
965        &self,
966        _context: rmcp::service::NotificationContext<RoleClient>,
967    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
968        let _ = self
969            .notifications
970            .send(McpServerNotification::ResourcesChanged);
971        let _ = self.events.send(McpServerEvent::ResourceListChanged);
972        std::future::ready(())
973    }
974
975    fn on_prompt_list_changed(
976        &self,
977        _context: rmcp::service::NotificationContext<RoleClient>,
978    ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
979        let _ = self
980            .notifications
981            .send(McpServerNotification::PromptsChanged);
982        let _ = self.events.send(McpServerEvent::PromptListChanged);
983        std::future::ready(())
984    }
985
986    fn get_info(&self) -> rmcp_model::ClientInfo {
987        self.info.clone()
988    }
989}
990
991impl From<McpError> for rmcp_model::ErrorData {
992    fn from(error: McpError) -> Self {
993        rmcp_model::ErrorData::internal_error(error.to_string(), None)
994    }
995}
996
997type RmcpClientService = RunningService<RoleClient, McpClientHandler>;
998
999/// Configuration applied to every [`McpClientHandler`] this crate builds on
1000/// behalf of a connection or [`McpServerManager`].
1001///
1002/// Holds the optional sampling / elicitation / roots responders plus the
1003/// broadcast capacity for [`McpServerEvent`] subscribers. Pass an instance to
1004/// [`McpConnection::connect_with_handler`] to drive a single connection, or
1005/// install one on the manager via
1006/// [`McpServerManager::with_handler_config`] / per-trait builders.
1007#[derive(Clone, Default)]
1008pub struct McpHandlerConfig {
1009    /// Responder for server-initiated `sampling/createMessage` requests.
1010    pub sampling: Option<Arc<dyn McpSamplingResponder>>,
1011    /// Responder for server-initiated `elicitation/create` requests.
1012    pub elicitation: Option<Arc<dyn McpElicitationResponder>>,
1013    /// Provider for `roots/list`.
1014    pub roots: Option<Arc<dyn McpRootsProvider>>,
1015    /// Resolver for auth challenges raised during MCP operations. When
1016    /// installed, [`McpToolAdapter::invoke`] (and other operation paths)
1017    /// invoke the responder inline on auth challenges and retry — auth
1018    /// never surfaces as a loop interrupt.
1019    pub auth: Option<Arc<dyn McpAuthResponder>>,
1020    /// Broadcast capacity for the [`McpServerEvent`] channel. Defaults to
1021    /// `DEFAULT_EVENTS_CAPACITY` when `None`.
1022    pub events_capacity: Option<usize>,
1023}
1024
1025impl McpHandlerConfig {
1026    /// Returns an empty handler config.
1027    pub fn new() -> Self {
1028        Self::default()
1029    }
1030
1031    /// Sets the sampling responder.
1032    pub fn with_sampling_responder(mut self, responder: Arc<dyn McpSamplingResponder>) -> Self {
1033        self.sampling = Some(responder);
1034        self
1035    }
1036
1037    /// Sets the elicitation responder.
1038    pub fn with_elicitation_responder(
1039        mut self,
1040        responder: Arc<dyn McpElicitationResponder>,
1041    ) -> Self {
1042        self.elicitation = Some(responder);
1043        self
1044    }
1045
1046    /// Sets the roots provider.
1047    pub fn with_roots_provider(mut self, provider: Arc<dyn McpRootsProvider>) -> Self {
1048        self.roots = Some(provider);
1049        self
1050    }
1051
1052    /// Sets the auth responder.
1053    pub fn with_auth_responder(mut self, responder: Arc<dyn McpAuthResponder>) -> Self {
1054        self.auth = Some(responder);
1055        self
1056    }
1057
1058    /// Sets the broadcast capacity for [`McpServerEvent`] subscribers.
1059    pub fn with_events_capacity(mut self, capacity: usize) -> Self {
1060        self.events_capacity = Some(capacity);
1061        self
1062    }
1063
1064    /// Builds a handler together with a fresh [`McpClientChannels`] pair —
1065    /// the notification receiver and a new broadcast sender for
1066    /// [`McpServerEvent`].
1067    pub fn build(&self) -> (McpClientHandler, McpClientChannels) {
1068        self.build_inner(None)
1069    }
1070
1071    /// Builds a handler that publishes [`McpServerEvent`] into the provided
1072    /// broadcast sender. Use this when adopting an externally constructed
1073    /// rmcp service via [`McpConnection::from_running_service_with_events`]
1074    /// so subscribers see the same stream.
1075    pub fn build_with(
1076        &self,
1077        events: broadcast::Sender<McpServerEvent>,
1078    ) -> (McpClientHandler, McpClientChannels) {
1079        self.build_inner(Some(events))
1080    }
1081
1082    fn build_inner(
1083        &self,
1084        events: Option<broadcast::Sender<McpServerEvent>>,
1085    ) -> (McpClientHandler, McpClientChannels) {
1086        let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
1087        let events_tx = events.unwrap_or_else(|| {
1088            let capacity = self.events_capacity.unwrap_or(DEFAULT_EVENTS_CAPACITY);
1089            let (tx, _) = broadcast::channel(capacity);
1090            tx
1091        });
1092
1093        let mut capabilities = rmcp_model::ClientCapabilities::default();
1094        if self.sampling.is_some() {
1095            capabilities.sampling = Some(McpSamplingCapability::default());
1096        }
1097        if self.elicitation.is_some() {
1098            capabilities.elicitation = Some(McpElicitationCapability {
1099                form: Some(McpFormElicitationCapability::default()),
1100                url: None,
1101            });
1102        }
1103        if self.roots.is_some() {
1104            capabilities.roots = Some(McpRootsCapabilities::default());
1105        }
1106
1107        let handler = McpClientHandler {
1108            info: rmcp_model::ClientInfo::new(
1109                capabilities,
1110                rmcp_model::Implementation::new("agentkit-mcp", env!("CARGO_PKG_VERSION"))
1111                    .with_title("agentkit MCP client"),
1112            )
1113            .with_protocol_version(rmcp_model::ProtocolVersion::LATEST),
1114            notifications: notifications_tx,
1115            events: events_tx.clone(),
1116            sampling: self.sampling.clone(),
1117            elicitation: self.elicitation.clone(),
1118            roots: self.roots.clone(),
1119        };
1120
1121        (
1122            handler,
1123            McpClientChannels {
1124                notifications: notifications_rx,
1125                events: events_tx,
1126            },
1127        )
1128    }
1129}
1130
1131/// A live connection to a single MCP server, wrapping an
1132/// [`rmcp::service::RunningService`].
1133pub struct McpConnection {
1134    server_id: McpServerId,
1135    config: Option<McpServerConfig>,
1136    inner: Mutex<RmcpClientService>,
1137    peer: RwLock<Peer<RoleClient>>,
1138    auth: Mutex<Option<MetadataMap>>,
1139    notifications: Mutex<mpsc::UnboundedReceiver<McpServerNotification>>,
1140    events: broadcast::Sender<McpServerEvent>,
1141    handler_config: McpHandlerConfig,
1142    capabilities: McpServerCapabilities,
1143}
1144
1145/// The result of replaying an MCP operation after auth resolution.
1146#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1147pub enum McpOperationResult {
1148    /// The server was successfully (re)connected; contains the discovery snapshot.
1149    Connected(McpDiscoverySnapshot),
1150    /// A tool call completed; contains the typed rmcp [`CallToolResult`].
1151    Tool(CallToolResult),
1152    /// A resource was read successfully.
1153    Resource(ReadResourceResult),
1154    /// A prompt was retrieved successfully.
1155    Prompt(GetPromptResult),
1156}
1157
1158impl McpConnection {
1159    /// Connects to an MCP server, performs the rmcp `initialize` handshake,
1160    /// and returns a ready-to-use connection. No sampling / elicitation /
1161    /// roots responders are wired; use [`Self::connect_with_handler`] when
1162    /// the server may issue those requests.
1163    pub async fn connect(config: &McpServerConfig) -> Result<Self, McpError> {
1164        Self::connect_with_auth(config, None, McpHandlerConfig::default()).await
1165    }
1166
1167    /// Connects to an MCP server with a fully configured [`McpHandlerConfig`].
1168    pub async fn connect_with_handler(
1169        config: &McpServerConfig,
1170        handler_config: McpHandlerConfig,
1171    ) -> Result<Self, McpError> {
1172        Self::connect_with_auth(config, None, handler_config).await
1173    }
1174
1175    async fn connect_with_auth(
1176        config: &McpServerConfig,
1177        auth: Option<&MetadataMap>,
1178        handler_config: McpHandlerConfig,
1179    ) -> Result<Self, McpError> {
1180        let (handler, channels) = handler_config.build();
1181        let McpClientChannels {
1182            notifications: notification_rx,
1183            events: events_tx,
1184        } = channels;
1185        let (service, capabilities) = match &config.transport {
1186            McpTransportBinding::Stdio(binding) => {
1187                connect_rmcp_stdio(config, binding, handler).await?
1188            }
1189            McpTransportBinding::StreamableHttp(binding) => {
1190                connect_rmcp_streamable_http(config, binding, auth, handler).await?
1191            }
1192        };
1193
1194        let peer = service.peer().clone();
1195        Ok(Self {
1196            server_id: config.id.clone(),
1197            config: Some(config.clone()),
1198            inner: Mutex::new(service),
1199            peer: RwLock::new(peer),
1200            auth: Mutex::new(auth.cloned()),
1201            notifications: Mutex::new(notification_rx),
1202            events: events_tx,
1203            handler_config,
1204            capabilities,
1205        })
1206    }
1207
1208    /// Adopts an externally constructed [`rmcp::service::RunningService`] as
1209    /// an [`McpConnection`].
1210    ///
1211    /// Use this when you need a transport rmcp supports but
1212    /// [`McpTransportBinding`] does not (in-memory pipes for tests, websockets,
1213    /// custom IO). Pair the service with the notification receiver returned by
1214    /// [`McpHandlerConfig::build`] so list-change notifications stay
1215    /// observable.
1216    ///
1217    /// The connection has no [`McpServerConfig`] attached, so reconnect-on-auth
1218    /// is unavailable; [`resolve_auth`](Self::resolve_auth) only updates stored
1219    /// credentials in this mode. Server-pushed events from the underlying
1220    /// handler are *not* forwarded to subscribers — use
1221    /// [`Self::from_running_service_with_events`] paired with the broadcast
1222    /// sender from [`McpClientChannels`] when you need event delivery.
1223    pub fn from_running_service(
1224        server_id: impl Into<McpServerId>,
1225        service: RmcpClientService,
1226        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1227    ) -> Self {
1228        let (events_tx, _) = broadcast::channel(DEFAULT_EVENTS_CAPACITY);
1229        Self::from_running_service_with_events(server_id, service, notifications, events_tx)
1230    }
1231
1232    /// Variant of [`Self::from_running_service`] that wires the broadcast
1233    /// sender returned by [`McpHandlerConfig::build`] (or [`build_with`])
1234    /// so [`Self::subscribe_events`] receivers observe the same stream the
1235    /// handler is publishing into.
1236    ///
1237    /// [`build_with`]: McpHandlerConfig::build_with
1238    pub fn from_running_service_with_events(
1239        server_id: impl Into<McpServerId>,
1240        service: RmcpClientService,
1241        notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1242        events: broadcast::Sender<McpServerEvent>,
1243    ) -> Self {
1244        let capabilities = service
1245            .peer_info()
1246            .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1247            .unwrap_or_default();
1248        let peer = service.peer().clone();
1249        Self {
1250            server_id: server_id.into(),
1251            config: None,
1252            inner: Mutex::new(service),
1253            peer: RwLock::new(peer),
1254            auth: Mutex::new(None),
1255            notifications: Mutex::new(notifications),
1256            events,
1257            handler_config: McpHandlerConfig::default(),
1258            capabilities,
1259        }
1260    }
1261
1262    async fn reconnect_inner(&self, auth: Option<&MetadataMap>) -> Result<(), McpError> {
1263        let Some(config) = self.config.clone() else {
1264            return Ok(());
1265        };
1266        let (handler, channels) = self.handler_config.build_with(self.events.clone());
1267        let McpClientChannels {
1268            notifications: notification_rx,
1269            ..
1270        } = channels;
1271        let (service, _capabilities) = match &config.transport {
1272            McpTransportBinding::Stdio(binding) => {
1273                connect_rmcp_stdio(&config, binding, handler).await?
1274            }
1275            McpTransportBinding::StreamableHttp(binding) => {
1276                connect_rmcp_streamable_http(&config, binding, auth, handler).await?
1277            }
1278        };
1279        let new_peer = service.peer().clone();
1280        *self.notifications.lock().await = notification_rx;
1281        *self.inner.lock().await = service;
1282        *self.peer.write().expect("MCP peer lock poisoned") = new_peer;
1283        Ok(())
1284    }
1285
1286    fn peer(&self) -> Peer<RoleClient> {
1287        self.peer.read().expect("MCP peer lock poisoned").clone()
1288    }
1289
1290    /// Returns the [`McpServerId`] for this connection.
1291    pub fn server_id(&self) -> &McpServerId {
1292        &self.server_id
1293    }
1294
1295    /// Returns the capabilities advertised by the server during `initialize`.
1296    pub fn capabilities(&self) -> &McpServerCapabilities {
1297        &self.capabilities
1298    }
1299
1300    /// Returns the [`McpHandlerConfig`] this connection was built with.
1301    /// Used by [`McpToolAdapter`] to reach the registered
1302    /// [`McpAuthResponder`] when an auth challenge surfaces.
1303    pub fn handler_config(&self) -> &McpHandlerConfig {
1304        &self.handler_config
1305    }
1306
1307    /// Subscribes to the per-connection [`McpServerEvent`] broadcast.
1308    ///
1309    /// Receivers buffer up to `events_capacity` (configured via
1310    /// [`McpHandlerConfig::with_events_capacity`], defaults to
1311    /// `DEFAULT_EVENTS_CAPACITY`) before slow consumers are signalled with
1312    /// [`broadcast::error::RecvError::Lagged`]. Catalog `*ListChanged` events
1313    /// are also delivered through the legacy [`McpServerNotification`]
1314    /// receiver consumed by [`McpServerManager::refresh_changed_catalogs`].
1315    pub fn subscribe_events(&self) -> broadcast::Receiver<McpServerEvent> {
1316        self.events.subscribe()
1317    }
1318
1319    /// Subscribes to `notifications/resources/updated` for the given URI.
1320    ///
1321    /// Updates surface as [`McpServerEvent::ResourceUpdated`] on every
1322    /// receiver returned by [`Self::subscribe_events`].
1323    pub async fn subscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1324        let uri = uri.into();
1325        self.peer()
1326            .subscribe(rmcp_model::SubscribeRequestParams::new(uri.clone()))
1327            .await
1328            .map_err(|error| {
1329                rmcp_operation_error(
1330                    &self.server_id,
1331                    McpMethod::ResourcesSubscribe { uri },
1332                    error,
1333                )
1334            })
1335    }
1336
1337    /// Cancels a previous [`Self::subscribe_resource`] subscription.
1338    pub async fn unsubscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1339        let uri = uri.into();
1340        self.peer()
1341            .unsubscribe(rmcp_model::UnsubscribeRequestParams::new(uri.clone()))
1342            .await
1343            .map_err(|error| {
1344                rmcp_operation_error(
1345                    &self.server_id,
1346                    McpMethod::ResourcesUnsubscribe { uri },
1347                    error,
1348                )
1349            })
1350    }
1351
1352    /// Negotiates the minimum severity the server should emit through
1353    /// `notifications/message`. Surfaced as [`McpServerEvent::Logging`].
1354    pub async fn set_logging_level(&self, level: McpLoggingLevel) -> Result<(), McpError> {
1355        self.peer()
1356            .set_level(rmcp_model::SetLevelRequestParams::new(level))
1357            .await
1358            .map_err(|error| {
1359                rmcp_operation_error(
1360                    &self.server_id,
1361                    McpMethod::LoggingSetLevel {
1362                        level: format!("{level:?}"),
1363                    },
1364                    error,
1365                )
1366            })
1367    }
1368
1369    /// Sends a `notifications/cancelled` to the server, asking it to stop
1370    /// processing a previously issued request.
1371    pub async fn notify_cancelled(
1372        &self,
1373        params: McpCancelledNotificationParam,
1374    ) -> Result<(), McpError> {
1375        self.peer()
1376            .notify_cancelled(params)
1377            .await
1378            .map_err(rmcp_service_error)
1379    }
1380
1381    /// Notifies the server that the client's roots list has changed; servers
1382    /// may respond by re-issuing `roots/list`.
1383    pub async fn notify_roots_list_changed(&self) -> Result<(), McpError> {
1384        self.peer()
1385            .notify_roots_list_changed()
1386            .await
1387            .map_err(rmcp_service_error)
1388    }
1389
1390    /// Gracefully closes the underlying rmcp service.
1391    ///
1392    /// For Streamable HTTP this drives the rmcp transport to issue a `DELETE`
1393    /// against the negotiated session, releasing server-side state.
1394    pub async fn close(&self) -> Result<(), McpError> {
1395        let mut inner = self.inner.lock().await;
1396        inner
1397            .close()
1398            .await
1399            .map(|_| ())
1400            .map_err(|error| McpError::Transport(format!("rmcp service close failed: {error}")))
1401    }
1402
1403    /// Stores or clears authentication credentials and, when configured to do
1404    /// so via [`McpServerConfig`], reconnects to apply them.
1405    pub async fn resolve_auth(&self, resolution: AuthResolution) -> Result<(), McpError> {
1406        let mut auth_slot = self.auth.lock().await;
1407        match resolution {
1408            AuthResolution::Provided { credentials, .. } => {
1409                *auth_slot = Some(credentials);
1410            }
1411            AuthResolution::Cancelled { .. } => {
1412                *auth_slot = None;
1413            }
1414        }
1415        let snapshot = auth_slot.clone();
1416        drop(auth_slot);
1417        // Only reconnect if we have a config to reconnect with. Without one
1418        // (e.g. constructed via [`from_running_service`]) the auth is stored
1419        // but not pushed to the live transport.
1420        if self.config.is_some() {
1421            self.reconnect_inner(snapshot.as_ref()).await?;
1422        }
1423        Ok(())
1424    }
1425
1426    /// Discovers tools, resources, and prompts that the server advertised.
1427    pub async fn discover(&self) -> Result<McpDiscoverySnapshot, McpError> {
1428        let tools = async {
1429            match self.capabilities.tools {
1430                Some(_) => self.list_tools().await,
1431                None => Ok(Vec::new()),
1432            }
1433        };
1434        let resources = async {
1435            match self.capabilities.resources {
1436                Some(_) => self.list_resources().await,
1437                None => Ok(Vec::new()),
1438            }
1439        };
1440        let prompts = async {
1441            match self.capabilities.prompts {
1442                Some(_) => self.list_prompts().await,
1443                None => Ok(Vec::new()),
1444            }
1445        };
1446        let (tools, resources, prompts) = tokio::try_join!(tools, resources, prompts)?;
1447        Ok(McpDiscoverySnapshot {
1448            server_id: self.server_id.clone(),
1449            tools,
1450            resources,
1451            prompts,
1452            metadata: MetadataMap::new(),
1453        })
1454    }
1455
1456    async fn drain_notifications(&self) -> Vec<McpServerNotification> {
1457        let mut notifications = self.notifications.lock().await;
1458        let mut drained = Vec::new();
1459        while let Ok(notification) = notifications.try_recv() {
1460            drained.push(notification);
1461        }
1462        drained
1463    }
1464
1465    /// Lists all tools advertised by the connected MCP server.
1466    pub async fn list_tools(&self) -> Result<Vec<McpTool>, McpError> {
1467        self.peer()
1468            .list_all_tools()
1469            .await
1470            .map_err(rmcp_service_error)
1471    }
1472
1473    /// Lists all resources advertised by the connected MCP server.
1474    pub async fn list_resources(&self) -> Result<Vec<McpResource>, McpError> {
1475        self.peer()
1476            .list_all_resources()
1477            .await
1478            .map_err(rmcp_service_error)
1479    }
1480
1481    /// Lists all prompts advertised by the connected MCP server.
1482    pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpError> {
1483        self.peer()
1484            .list_all_prompts()
1485            .await
1486            .map_err(rmcp_service_error)
1487    }
1488
1489    /// Invokes a tool on the MCP server.
1490    ///
1491    /// Returns the typed [`CallToolResult`] — the [`Vec<Content>`] block list,
1492    /// the optional `structured_content` field, and the `is_error` flag are
1493    /// all preserved. Adapters convert this into agentkit
1494    /// [`ToolOutput`]/[`InvocableOutput`] at the boundary.
1495    pub async fn call_tool(
1496        &self,
1497        name: &str,
1498        arguments: Value,
1499    ) -> Result<CallToolResult, McpError> {
1500        let arguments_for_auth = arguments.clone();
1501        let mut params = rmcp_model::CallToolRequestParams::new(name.to_string());
1502        if !arguments.is_null() {
1503            params =
1504                params.with_arguments(value_to_json_object(arguments, "tools/call arguments")?);
1505        }
1506        let name_owned = name.to_string();
1507        self.peer().call_tool(params).await.map_err(|error| {
1508            rmcp_operation_error(
1509                &self.server_id,
1510                McpMethod::ToolsCall {
1511                    name: name_owned,
1512                    arguments: arguments_for_auth,
1513                },
1514                error,
1515            )
1516        })
1517    }
1518
1519    /// Reads a resource from the MCP server by URI.
1520    ///
1521    /// Returns the typed [`ReadResourceResult`] — the full
1522    /// [`Vec<McpResourceContents>`] is preserved (text vs blob, mime types,
1523    /// metadata). Use [`McpResourceHandle`] for the agentkit
1524    /// [`ResourceProvider`] view that collapses to a single inline `DataRef`.
1525    pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult, McpError> {
1526        let uri_owned = uri.to_string();
1527        self.peer()
1528            .read_resource(rmcp_model::ReadResourceRequestParams::new(uri))
1529            .await
1530            .map_err(|error| {
1531                rmcp_operation_error(
1532                    &self.server_id,
1533                    McpMethod::ResourcesRead { uri: uri_owned },
1534                    error,
1535                )
1536            })
1537    }
1538
1539    /// Retrieves a prompt from the MCP server, rendering it with the given
1540    /// arguments.
1541    ///
1542    /// Returns the typed [`GetPromptResult`] — message role and content
1543    /// blocks (text/image/audio/embedded resource) are preserved. Use
1544    /// [`McpPromptHandle`] for the collapsed agentkit [`PromptProvider`]
1545    /// view.
1546    pub async fn get_prompt(
1547        &self,
1548        name: &str,
1549        arguments: Value,
1550    ) -> Result<GetPromptResult, McpError> {
1551        let arguments_for_auth = arguments.clone();
1552        let name_owned = name.to_string();
1553        let mut params = rmcp_model::GetPromptRequestParams::new(name);
1554        if !arguments.is_null() {
1555            params =
1556                params.with_arguments(value_to_json_object(arguments, "prompts/get arguments")?);
1557        }
1558        self.peer().get_prompt(params).await.map_err(|error| {
1559            rmcp_operation_error(
1560                &self.server_id,
1561                McpMethod::PromptsGet {
1562                    name: name_owned,
1563                    arguments: arguments_for_auth,
1564                },
1565                error,
1566            )
1567        })
1568    }
1569}
1570
1571async fn connect_rmcp_stdio(
1572    config: &McpServerConfig,
1573    binding: &StdioTransportConfig,
1574    handler: McpClientHandler,
1575) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1576    let transport = TokioChildProcess::new(
1577        tokio::process::Command::new(&binding.command).configure(|command| {
1578            command.args(&binding.args);
1579            if let Some(cwd) = &binding.cwd {
1580                command.current_dir(cwd);
1581            }
1582            for (key, value) in &binding.env {
1583                command.env(key, value);
1584            }
1585        }),
1586    )
1587    .map_err(McpError::Io)?;
1588
1589    let service = handler
1590        .serve(transport)
1591        .await
1592        .map_err(|error| rmcp_initialize_error(config, error))?;
1593    let capabilities = service
1594        .peer_info()
1595        .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1596        .unwrap_or_default();
1597
1598    Ok((service, capabilities))
1599}
1600
1601async fn connect_rmcp_streamable_http(
1602    config: &McpServerConfig,
1603    binding: &StreamableHttpTransportConfig,
1604    auth: Option<&MetadataMap>,
1605    handler: McpClientHandler,
1606) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1607    let auth_header = auth
1608        .and_then(bearer_token_from_metadata)
1609        .or_else(|| binding.bearer_token.clone());
1610    let mut rmcp_config = RmcpStreamableHttpClientTransportConfig::with_uri(binding.url.clone());
1611    if let Some(auth_header) = auth_header {
1612        rmcp_config = rmcp_config.auth_header(auth_header);
1613    }
1614    rmcp_config = rmcp_config.custom_headers(binding.headers.iter().cloned().collect());
1615
1616    let result = match binding.http_client.as_ref() {
1617        Some(client) => {
1618            let transport = StreamableHttpClientTransport::with_client(
1619                DynHttpClient(client.clone()),
1620                rmcp_config,
1621            );
1622            handler.serve(transport).await
1623        }
1624        None => {
1625            let transport = StreamableHttpClientTransport::from_config(rmcp_config);
1626            handler.serve(transport).await
1627        }
1628    };
1629    let service = result.map_err(|error| rmcp_initialize_error(config, error))?;
1630    let capabilities = service
1631        .peer_info()
1632        .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1633        .unwrap_or_default();
1634
1635    Ok((service, capabilities))
1636}
1637
1638/// Adapter exposing a single MCP resource as a [`ResourceProvider`].
1639pub struct McpResourceHandle {
1640    connection: Arc<McpConnection>,
1641    descriptor: ResourceDescriptor,
1642}
1643
1644#[async_trait]
1645impl ResourceProvider for McpResourceHandle {
1646    async fn list_resources(&self) -> Result<Vec<ResourceDescriptor>, CapabilityError> {
1647        Ok(vec![self.descriptor.clone()])
1648    }
1649
1650    async fn read_resource(
1651        &self,
1652        id: &ResourceId,
1653        _ctx: &mut CapabilityContext<'_>,
1654    ) -> Result<ResourceContents, CapabilityError> {
1655        let result = self
1656            .connection
1657            .read_resource(&id.0)
1658            .await
1659            .map_err(|error| match error {
1660                McpError::AuthRequired(request) => {
1661                    CapabilityError::Unavailable(format!("auth required: {:?}", request))
1662                }
1663                other => CapabilityError::ExecutionFailed(other.to_string()),
1664            })?;
1665        read_resource_result_to_capabilities(result)
1666            .map_err(|error| CapabilityError::ExecutionFailed(error.to_string()))
1667    }
1668}
1669
1670/// Adapter exposing a single MCP prompt as a [`PromptProvider`].
1671pub struct McpPromptHandle {
1672    connection: Arc<McpConnection>,
1673    descriptor: PromptDescriptor,
1674}
1675
1676#[async_trait]
1677impl PromptProvider for McpPromptHandle {
1678    async fn list_prompts(&self) -> Result<Vec<PromptDescriptor>, CapabilityError> {
1679        Ok(vec![self.descriptor.clone()])
1680    }
1681
1682    async fn get_prompt(
1683        &self,
1684        id: &PromptId,
1685        args: Value,
1686        _ctx: &mut CapabilityContext<'_>,
1687    ) -> Result<PromptContents, CapabilityError> {
1688        let result =
1689            self.connection
1690                .get_prompt(&id.0, args)
1691                .await
1692                .map_err(|error| match error {
1693                    McpError::AuthRequired(request) => {
1694                        CapabilityError::Unavailable(format!("auth required: {:?}", request))
1695                    }
1696                    other => CapabilityError::ExecutionFailed(other.to_string()),
1697                })?;
1698        Ok(get_prompt_result_to_capabilities(result))
1699    }
1700}
1701
1702/// A [`CapabilityProvider`] that surfaces MCP tools, resources, and prompts.
1703///
1704/// The tool side is built by wrapping [`McpToolAdapter`]s in
1705/// [`agentkit_tools_core::ToolInvocableAdapter`], so the same
1706/// permission-check + adapter-spec plumbing the rest of agentkit uses also
1707/// applies to MCP tools — this crate no longer ships its own
1708/// `McpInvocable`.
1709pub struct McpCapabilityProvider {
1710    invocables: Vec<Arc<dyn Invocable>>,
1711    resources: Vec<Arc<dyn ResourceProvider>>,
1712    prompts: Vec<Arc<dyn PromptProvider>>,
1713}
1714
1715impl McpCapabilityProvider {
1716    /// Builds a capability provider from an existing connection and snapshot,
1717    /// using the [`McpToolNamespace::Default`] tool naming strategy.
1718    pub fn from_snapshot(connection: Arc<McpConnection>, snapshot: &McpDiscoverySnapshot) -> Self {
1719        Self::from_snapshot_with_namespace(connection, snapshot, &McpToolNamespace::Default)
1720    }
1721
1722    /// Builds a capability provider with a custom tool naming strategy.
1723    pub fn from_snapshot_with_namespace(
1724        connection: Arc<McpConnection>,
1725        snapshot: &McpDiscoverySnapshot,
1726        namespace: &McpToolNamespace,
1727    ) -> Self {
1728        let server_id = connection.server_id().clone();
1729        let registry =
1730            snapshot
1731                .tools
1732                .iter()
1733                .cloned()
1734                .fold(ToolRegistry::new(), |registry, tool| {
1735                    registry.with(McpToolAdapter::with_namespace(
1736                        &server_id,
1737                        connection.clone(),
1738                        tool,
1739                        namespace,
1740                    ))
1741                });
1742        let permissions: Arc<dyn PermissionChecker> = Arc::new(AllowAllPermissions);
1743        let resources_arc: Arc<dyn agentkit_tools_core::ToolResources> = Arc::new(());
1744        let invocables =
1745            ToolCapabilityProvider::from_registry(&registry, permissions, resources_arc)
1746                .invocables();
1747
1748        let resources = snapshot
1749            .resources
1750            .iter()
1751            .cloned()
1752            .map(|resource| {
1753                Arc::new(McpResourceHandle {
1754                    connection: connection.clone(),
1755                    descriptor: resource_descriptor_from_rmcp(resource),
1756                }) as Arc<dyn ResourceProvider>
1757            })
1758            .collect();
1759
1760        let prompts = snapshot
1761            .prompts
1762            .iter()
1763            .cloned()
1764            .map(|prompt| {
1765                Arc::new(McpPromptHandle {
1766                    connection: connection.clone(),
1767                    descriptor: prompt_descriptor_from_rmcp(prompt),
1768                }) as Arc<dyn PromptProvider>
1769            })
1770            .collect();
1771
1772        Self {
1773            invocables,
1774            resources,
1775            prompts,
1776        }
1777    }
1778
1779    /// Merges multiple capability providers into one.
1780    pub fn merge<I>(providers: I) -> Self
1781    where
1782        I: IntoIterator<Item = Self>,
1783    {
1784        let mut invocables = Vec::new();
1785        let mut resources = Vec::new();
1786        let mut prompts = Vec::new();
1787
1788        for provider in providers {
1789            invocables.extend(provider.invocables);
1790            resources.extend(provider.resources);
1791            prompts.extend(provider.prompts);
1792        }
1793
1794        Self {
1795            invocables,
1796            resources,
1797            prompts,
1798        }
1799    }
1800
1801    /// Connects to an MCP server, performs discovery, and builds a provider.
1802    pub async fn connect(
1803        config: &McpServerConfig,
1804    ) -> Result<(Arc<McpConnection>, Self, McpDiscoverySnapshot), McpError> {
1805        let connection = Arc::new(McpConnection::connect(config).await?);
1806        let snapshot = connection.discover().await?;
1807        let provider = Self::from_snapshot(connection.clone(), &snapshot);
1808
1809        Ok((connection, provider, snapshot))
1810    }
1811}
1812
1813impl CapabilityProvider for McpCapabilityProvider {
1814    fn invocables(&self) -> Vec<Arc<dyn Invocable>> {
1815        self.invocables.clone()
1816    }
1817
1818    fn resources(&self) -> Vec<Arc<dyn ResourceProvider>> {
1819        self.resources.clone()
1820    }
1821
1822    fn prompts(&self) -> Vec<Arc<dyn PromptProvider>> {
1823        self.prompts.clone()
1824    }
1825}
1826
1827/// A connected MCP server together with its configuration and snapshot.
1828#[derive(Clone)]
1829pub struct McpServerHandle {
1830    config: McpServerConfig,
1831    connection: Arc<McpConnection>,
1832    snapshot: McpDiscoverySnapshot,
1833    namespace: McpToolNamespace,
1834}
1835
1836impl McpServerHandle {
1837    /// Returns the original configuration used to connect this server.
1838    pub fn config(&self) -> &McpServerConfig {
1839        &self.config
1840    }
1841
1842    /// Returns the server's unique identifier.
1843    pub fn server_id(&self) -> &McpServerId {
1844        self.connection.server_id()
1845    }
1846
1847    /// Returns a shared reference to the underlying [`McpConnection`].
1848    pub fn connection(&self) -> Arc<McpConnection> {
1849        self.connection.clone()
1850    }
1851
1852    /// Returns the discovery snapshot captured when the server was connected.
1853    pub fn snapshot(&self) -> &McpDiscoverySnapshot {
1854        &self.snapshot
1855    }
1856
1857    /// Returns the tool naming strategy in effect for this server.
1858    pub fn namespace(&self) -> &McpToolNamespace {
1859        &self.namespace
1860    }
1861
1862    /// Builds a [`ToolRegistry`] containing an [`McpToolAdapter`] for each tool.
1863    pub fn tool_registry(&self) -> ToolRegistry {
1864        self.snapshot
1865            .tools
1866            .iter()
1867            .cloned()
1868            .fold(ToolRegistry::new(), |registry, tool| {
1869                registry.with(McpToolAdapter::with_namespace(
1870                    self.server_id(),
1871                    self.connection.clone(),
1872                    tool,
1873                    &self.namespace,
1874                ))
1875            })
1876    }
1877
1878    /// Builds an [`McpCapabilityProvider`] from this server's snapshot.
1879    pub fn capability_provider(&self) -> McpCapabilityProvider {
1880        McpCapabilityProvider::from_snapshot_with_namespace(
1881            self.connection.clone(),
1882            &self.snapshot,
1883            &self.namespace,
1884        )
1885    }
1886}
1887
1888/// Manages the lifecycle of one or more MCP servers.
1889pub struct McpServerManager {
1890    configs: BTreeMap<McpServerId, McpServerConfig>,
1891    connections: BTreeMap<McpServerId, McpServerHandle>,
1892    auth: BTreeMap<McpServerId, MetadataMap>,
1893    catalog_tx: broadcast::Sender<McpCatalogEvent>,
1894    namespace: McpToolNamespace,
1895    handler_config: McpHandlerConfig,
1896    catalog_writer: CatalogWriter,
1897    /// Agentkit-namespaced tool names this manager has registered for each
1898    /// connected server. Used to perform surgical writes against the
1899    /// [`CatalogWriter`] on connect/disconnect/refresh without rebuilding
1900    /// the whole catalog.
1901    server_tools: BTreeMap<McpServerId, BTreeSet<ToolName>>,
1902}
1903
1904impl Default for McpServerManager {
1905    fn default() -> Self {
1906        let (catalog_tx, _) = broadcast::channel(128);
1907        let (catalog_writer, _) = dynamic_catalog("mcp");
1908        Self {
1909            configs: BTreeMap::new(),
1910            connections: BTreeMap::new(),
1911            auth: BTreeMap::new(),
1912            catalog_tx,
1913            namespace: McpToolNamespace::Default,
1914            handler_config: McpHandlerConfig::default(),
1915            catalog_writer,
1916            server_tools: BTreeMap::new(),
1917        }
1918    }
1919}
1920
1921impl McpServerManager {
1922    /// Creates an empty server manager with no registered servers.
1923    pub fn new() -> Self {
1924        Self::default()
1925    }
1926
1927    /// Sets the tool naming strategy for every adapter built by this manager.
1928    pub fn with_namespace(mut self, namespace: McpToolNamespace) -> Self {
1929        self.namespace = namespace;
1930        self
1931    }
1932
1933    /// Replaces the tool naming strategy in place.
1934    pub fn set_namespace(&mut self, namespace: McpToolNamespace) -> &mut Self {
1935        self.namespace = namespace;
1936        self
1937    }
1938
1939    /// Returns the active tool naming strategy.
1940    pub fn namespace(&self) -> &McpToolNamespace {
1941        &self.namespace
1942    }
1943
1944    /// Replaces the [`McpHandlerConfig`] applied to every connection this
1945    /// manager opens.
1946    pub fn with_handler_config(mut self, handler_config: McpHandlerConfig) -> Self {
1947        self.handler_config = handler_config;
1948        self
1949    }
1950
1951    /// Sets the [`McpHandlerConfig`] in place.
1952    pub fn set_handler_config(&mut self, handler_config: McpHandlerConfig) -> &mut Self {
1953        self.handler_config = handler_config;
1954        self
1955    }
1956
1957    /// Returns the active [`McpHandlerConfig`].
1958    pub fn handler_config(&self) -> &McpHandlerConfig {
1959        &self.handler_config
1960    }
1961
1962    /// Registers a server configuration. Returns `self` for chaining.
1963    pub fn with_server(mut self, config: McpServerConfig) -> Self {
1964        self.register_server(config);
1965        self
1966    }
1967
1968    /// Registers a server configuration by mutable reference.
1969    pub fn register_server(&mut self, config: McpServerConfig) -> &mut Self {
1970        self.configs.insert(config.id.clone(), config);
1971        self
1972    }
1973
1974    /// Returns the handle for a connected server, or `None` if not connected.
1975    pub fn connected_server(&self, server_id: &McpServerId) -> Option<&McpServerHandle> {
1976        self.connections.get(server_id)
1977    }
1978
1979    /// Returns handles for all currently connected servers.
1980    pub fn connected_servers(&self) -> Vec<&McpServerHandle> {
1981        self.connections.values().collect()
1982    }
1983
1984    /// Subscribes to MCP catalog and lifecycle events.
1985    pub fn subscribe_catalog_events(&self) -> broadcast::Receiver<McpCatalogEvent> {
1986        self.catalog_tx.subscribe()
1987    }
1988
1989    fn emit_catalog_event(&self, event: McpCatalogEvent) {
1990        let _ = self.catalog_tx.send(event);
1991    }
1992
1993    /// Connects a single registered server by its identifier.
1994    pub async fn connect_server(
1995        &mut self,
1996        server_id: &McpServerId,
1997    ) -> Result<McpServerHandle, McpError> {
1998        let config = self
1999            .configs
2000            .get(server_id)
2001            .cloned()
2002            .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2003        let connection = Arc::new(
2004            McpConnection::connect_with_auth(
2005                &config,
2006                self.auth.get(server_id),
2007                self.handler_config.clone(),
2008            )
2009            .await?,
2010        );
2011        let snapshot = connection.discover().await?;
2012        let handle = McpServerHandle {
2013            config,
2014            connection,
2015            snapshot,
2016            namespace: self.namespace.clone(),
2017        };
2018        self.connections.insert(server_id.clone(), handle.clone());
2019        self.register_server_tools(server_id, &handle.snapshot);
2020        self.emit_catalog_event(McpCatalogEvent::ServerConnected {
2021            server_id: server_id.clone(),
2022        });
2023        Ok(handle)
2024    }
2025
2026    /// Connects all registered servers concurrently.
2027    pub async fn connect_all(&mut self) -> Result<Vec<McpServerHandle>, McpError> {
2028        let plans: Vec<(McpServerId, McpServerConfig, Option<MetadataMap>)> = self
2029            .configs
2030            .iter()
2031            .map(|(id, cfg)| (id.clone(), cfg.clone(), self.auth.get(id).cloned()))
2032            .collect();
2033        let handler_config = self.handler_config.clone();
2034        let namespace = self.namespace.clone();
2035
2036        let futures = plans.into_iter().map(|(server_id, config, auth)| {
2037            let handler_config = handler_config.clone();
2038            let namespace = namespace.clone();
2039            async move {
2040                let connection = Arc::new(
2041                    McpConnection::connect_with_auth(&config, auth.as_ref(), handler_config)
2042                        .await?,
2043                );
2044                let snapshot = connection.discover().await?;
2045                Ok::<(McpServerId, McpServerHandle), McpError>((
2046                    server_id,
2047                    McpServerHandle {
2048                        config,
2049                        connection,
2050                        snapshot,
2051                        namespace,
2052                    },
2053                ))
2054            }
2055        });
2056
2057        let results = try_join_all(futures).await?;
2058        let mut handles = Vec::with_capacity(results.len());
2059        let mut connected: Vec<(McpServerId, McpDiscoverySnapshot)> =
2060            Vec::with_capacity(results.len());
2061        for (server_id, handle) in results {
2062            connected.push((server_id.clone(), handle.snapshot.clone()));
2063            self.connections.insert(server_id, handle.clone());
2064            handles.push(handle);
2065        }
2066        for (server_id, snapshot) in &connected {
2067            self.register_server_tools(server_id, snapshot);
2068        }
2069        for (server_id, _) in connected {
2070            self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2071        }
2072        Ok(handles)
2073    }
2074
2075    /// Re-discovers capabilities for a connected server.
2076    pub async fn refresh_server(
2077        &mut self,
2078        server_id: &McpServerId,
2079    ) -> Result<McpDiscoverySnapshot, McpError> {
2080        let handle = self
2081            .connections
2082            .get_mut(server_id)
2083            .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2084        let previous = handle.snapshot.clone();
2085        let snapshot = match handle.connection.discover().await {
2086            Ok(snapshot) => snapshot,
2087            Err(error) => {
2088                self.emit_catalog_event(McpCatalogEvent::RefreshFailed {
2089                    server_id: server_id.clone(),
2090                    message: error.to_string(),
2091                });
2092                return Err(error);
2093            }
2094        };
2095        handle.snapshot = snapshot.clone();
2096        let events = diff_discovery_snapshots(server_id, &previous, &snapshot);
2097        if !events.is_empty() {
2098            self.apply_catalog_events(server_id, &snapshot, &events);
2099            for event in events {
2100                self.emit_catalog_event(event);
2101            }
2102        }
2103        Ok(snapshot)
2104    }
2105
2106    /// Processes pending server list-change notifications.
2107    pub async fn refresh_changed_catalogs(&mut self) -> Result<Vec<McpCatalogEvent>, McpError> {
2108        let server_ids = self.connections.keys().cloned().collect::<Vec<_>>();
2109        let mut emitted = Vec::new();
2110
2111        for server_id in server_ids {
2112            let Some(connection) = self
2113                .connections
2114                .get(&server_id)
2115                .map(McpServerHandle::connection)
2116            else {
2117                continue;
2118            };
2119            let notifications = connection.drain_notifications().await;
2120            if notifications.is_empty() {
2121                continue;
2122            }
2123
2124            let handle = self
2125                .connections
2126                .get_mut(&server_id)
2127                .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2128            let previous = handle.snapshot.clone();
2129            let snapshot = match handle.connection.discover().await {
2130                Ok(snapshot) => snapshot,
2131                Err(error) => {
2132                    let event = McpCatalogEvent::RefreshFailed {
2133                        server_id: server_id.clone(),
2134                        message: error.to_string(),
2135                    };
2136                    self.emit_catalog_event(event.clone());
2137                    emitted.push(event);
2138                    return Err(error);
2139                }
2140            };
2141            handle.snapshot = snapshot.clone();
2142            let events = diff_discovery_snapshots(&server_id, &previous, &snapshot);
2143            if !events.is_empty() {
2144                self.apply_catalog_events(&server_id, &snapshot, &events);
2145                for event in events {
2146                    self.emit_catalog_event(event.clone());
2147                    emitted.push(event);
2148                }
2149            }
2150        }
2151
2152        Ok(emitted)
2153    }
2154
2155    /// Disconnects a server and removes it from active connections.
2156    pub async fn disconnect_server(&mut self, server_id: &McpServerId) -> Result<(), McpError> {
2157        let Some(handle) = self.connections.remove(server_id) else {
2158            return Err(McpError::UnknownServer(server_id.to_string()));
2159        };
2160        handle.connection.close().await?;
2161        self.unregister_server_tools(server_id);
2162        self.emit_catalog_event(McpCatalogEvent::ServerDisconnected {
2163            server_id: server_id.clone(),
2164        });
2165        Ok(())
2166    }
2167
2168    /// Stores or clears authentication credentials for a server.
2169    pub async fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), McpError> {
2170        let server_id = resolution
2171            .request()
2172            .server_id()
2173            .ok_or_else(|| McpError::AuthResolution("auth resolution missing server id".into()))?;
2174        let server_id = McpServerId::new(server_id);
2175        match &resolution {
2176            AuthResolution::Provided { credentials, .. } => {
2177                self.auth.insert(server_id.clone(), credentials.clone());
2178            }
2179            AuthResolution::Cancelled { .. } => {
2180                self.auth.remove(&server_id);
2181            }
2182        }
2183
2184        if let Some(handle) = self.connections.get(&server_id) {
2185            handle.connection.resolve_auth(resolution).await?;
2186        } else if !self.configs.contains_key(&server_id) {
2187            return Err(McpError::UnknownServer(server_id.to_string()));
2188        }
2189        self.emit_catalog_event(McpCatalogEvent::AuthChanged { server_id });
2190        Ok(())
2191    }
2192
2193    /// Builds a one-shot snapshot [`ToolRegistry`] of every tool across all
2194    /// connected servers. Use [`source`](Self::source) instead when wiring
2195    /// the manager into an [`agentkit_loop::Agent`] so tool catalog changes
2196    /// flow through automatically.
2197    pub fn tool_registry(&self) -> ToolRegistry {
2198        self.connections
2199            .values()
2200            .fold(ToolRegistry::new(), |mut registry, handle| {
2201                for tool in handle.snapshot.tools.iter().cloned() {
2202                    registry.register(McpToolAdapter::with_namespace(
2203                        handle.server_id(),
2204                        handle.connection.clone(),
2205                        tool,
2206                        &self.namespace,
2207                    ));
2208                }
2209                registry
2210            })
2211    }
2212
2213    /// Returns the manager's federated [`CatalogReader`].
2214    ///
2215    /// The manager keeps an internal `CatalogWriter` in sync with every
2216    /// connect, disconnect, and catalog refresh; the returned reader sees
2217    /// the added/removed/changed tool sets via
2218    /// [`ToolSource::drain_catalog_events`]. Pass it to
2219    /// [`agentkit_loop::AgentBuilder::tools`] alongside any frozen native
2220    /// [`ToolRegistry`].
2221    ///
2222    /// Each call returns a fresh reader subscription — events emitted before
2223    /// this call are not replayed. Call once at agent setup time and reuse.
2224    pub fn source(&self) -> CatalogReader {
2225        self.catalog_writer.reader()
2226    }
2227
2228    /// Surgically updates the tool catalog from the diff events produced
2229    /// by [`diff_discovery_snapshots`]. Only [`McpCatalogEvent::ToolsChanged`]
2230    /// affects the catalog — resource and prompt diffs are observed by the
2231    /// caller via the broadcast stream and don't touch tool state.
2232    fn apply_catalog_events(
2233        &mut self,
2234        server_id: &McpServerId,
2235        snapshot: &McpDiscoverySnapshot,
2236        events: &[McpCatalogEvent],
2237    ) {
2238        for event in events {
2239            if let McpCatalogEvent::ToolsChanged {
2240                added,
2241                removed,
2242                changed,
2243                ..
2244            } = event
2245            {
2246                self.apply_server_tool_diff(server_id, snapshot, added, removed, changed);
2247            }
2248        }
2249    }
2250
2251    /// Registers every tool from a freshly-discovered snapshot, recording
2252    /// the agentkit-namespaced names so [`Self::unregister_server_tools`]
2253    /// can later remove exactly this set.
2254    fn register_server_tools(&mut self, server_id: &McpServerId, snapshot: &McpDiscoverySnapshot) {
2255        let connection = match self.connections.get(server_id) {
2256            Some(handle) => handle.connection.clone(),
2257            None => return,
2258        };
2259        let previous = self.server_tools.remove(server_id).unwrap_or_default();
2260        let mut names = BTreeSet::new();
2261        for tool in &snapshot.tools {
2262            let adapter = McpToolAdapter::with_namespace(
2263                server_id,
2264                connection.clone(),
2265                tool.clone(),
2266                &self.namespace,
2267            );
2268            names.insert(adapter.spec().name.clone());
2269            self.catalog_writer.upsert(Arc::new(adapter));
2270        }
2271        for stale in previous.difference(&names) {
2272            self.catalog_writer.remove(stale);
2273        }
2274        self.server_tools.insert(server_id.clone(), names);
2275    }
2276
2277    /// Removes every agentkit-namespaced tool previously registered for
2278    /// `server_id`. No-op if the server was never registered.
2279    fn unregister_server_tools(&mut self, server_id: &McpServerId) {
2280        let Some(names) = self.server_tools.remove(server_id) else {
2281            return;
2282        };
2283        for name in names {
2284            self.catalog_writer.remove(&name);
2285        }
2286    }
2287
2288    /// Applies a per-tool diff (in raw MCP names) against the current
2289    /// catalog: removes are pruned, adds and changes are upserted from the
2290    /// fresh snapshot. Updates the per-server name index accordingly.
2291    fn apply_server_tool_diff(
2292        &mut self,
2293        server_id: &McpServerId,
2294        snapshot: &McpDiscoverySnapshot,
2295        added: &[String],
2296        removed: &[String],
2297        changed: &[String],
2298    ) {
2299        let connection = match self.connections.get(server_id) {
2300            Some(handle) => handle.connection.clone(),
2301            None => return,
2302        };
2303        let names = self.server_tools.entry(server_id.clone()).or_default();
2304
2305        for raw_name in removed {
2306            let agentkit_name = ToolName::new(self.namespace.apply(server_id, raw_name));
2307            if names.remove(&agentkit_name) {
2308                self.catalog_writer.remove(&agentkit_name);
2309            }
2310        }
2311
2312        let upsert_one = |raw_name: &str| -> Option<(ToolName, McpToolAdapter)> {
2313            let tool = snapshot
2314                .tools
2315                .iter()
2316                .find(|tool| tool.name.as_ref() == raw_name)?
2317                .clone();
2318            let adapter = McpToolAdapter::with_namespace(
2319                server_id,
2320                connection.clone(),
2321                tool,
2322                &self.namespace,
2323            );
2324            Some((adapter.spec().name.clone(), adapter))
2325        };
2326
2327        for raw_name in added.iter().chain(changed.iter()) {
2328            if let Some((agentkit_name, adapter)) = upsert_one(raw_name) {
2329                names.insert(agentkit_name);
2330                self.catalog_writer.upsert(Arc::new(adapter));
2331            }
2332        }
2333    }
2334
2335    /// Builds a combined [`McpCapabilityProvider`] from all connected servers.
2336    pub fn capability_provider(&self) -> McpCapabilityProvider {
2337        McpCapabilityProvider::merge(
2338            self.connections
2339                .values()
2340                .map(McpServerHandle::capability_provider),
2341        )
2342    }
2343}
2344
2345fn diff_discovery_snapshots(
2346    server_id: &McpServerId,
2347    previous: &McpDiscoverySnapshot,
2348    current: &McpDiscoverySnapshot,
2349) -> Vec<McpCatalogEvent> {
2350    let mut events = Vec::new();
2351    let (added, removed, changed) = diff_named_items(
2352        previous.tools.iter().map(|item| (item.name.as_ref(), item)),
2353        current.tools.iter().map(|item| (item.name.as_ref(), item)),
2354    );
2355    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2356        events.push(McpCatalogEvent::ToolsChanged {
2357            server_id: server_id.clone(),
2358            added,
2359            removed,
2360            changed,
2361        });
2362    }
2363
2364    let (added, removed, changed) = diff_named_items(
2365        previous
2366            .resources
2367            .iter()
2368            .map(|item| (item.uri.as_str(), item)),
2369        current
2370            .resources
2371            .iter()
2372            .map(|item| (item.uri.as_str(), item)),
2373    );
2374    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2375        events.push(McpCatalogEvent::ResourcesChanged {
2376            server_id: server_id.clone(),
2377            added,
2378            removed,
2379            changed,
2380        });
2381    }
2382
2383    let (added, removed, changed) = diff_named_items(
2384        previous
2385            .prompts
2386            .iter()
2387            .map(|item| (item.name.as_str(), item)),
2388        current
2389            .prompts
2390            .iter()
2391            .map(|item| (item.name.as_str(), item)),
2392    );
2393    if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2394        events.push(McpCatalogEvent::PromptsChanged {
2395            server_id: server_id.clone(),
2396            added,
2397            removed,
2398            changed,
2399        });
2400    }
2401
2402    events
2403}
2404
2405/// Merge-walks two name-keyed sequences and produces added/removed/changed
2406/// name lists. Each side is sorted in place; no intermediate maps are
2407/// allocated. Names are cloned only at output time.
2408fn diff_named_items<'a, T>(
2409    previous: impl IntoIterator<Item = (&'a str, &'a T)>,
2410    current: impl IntoIterator<Item = (&'a str, &'a T)>,
2411) -> (Vec<String>, Vec<String>, Vec<String>)
2412where
2413    T: PartialEq + 'a,
2414{
2415    let mut prev: Vec<(&str, &T)> = previous.into_iter().collect();
2416    let mut curr: Vec<(&str, &T)> = current.into_iter().collect();
2417    prev.sort_unstable_by_key(|(name, _)| *name);
2418    curr.sort_unstable_by_key(|(name, _)| *name);
2419
2420    let mut added = Vec::new();
2421    let mut removed = Vec::new();
2422    let mut changed = Vec::new();
2423    let (mut i, mut j) = (0, 0);
2424    while i < prev.len() && j < curr.len() {
2425        match prev[i].0.cmp(curr[j].0) {
2426            std::cmp::Ordering::Less => {
2427                removed.push(prev[i].0.to_string());
2428                i += 1;
2429            }
2430            std::cmp::Ordering::Greater => {
2431                added.push(curr[j].0.to_string());
2432                j += 1;
2433            }
2434            std::cmp::Ordering::Equal => {
2435                if prev[i].1 != curr[j].1 {
2436                    changed.push(curr[j].0.to_string());
2437                }
2438                i += 1;
2439                j += 1;
2440            }
2441        }
2442    }
2443    while i < prev.len() {
2444        removed.push(prev[i].0.to_string());
2445        i += 1;
2446    }
2447    while j < curr.len() {
2448        added.push(curr[j].0.to_string());
2449        j += 1;
2450    }
2451
2452    (added, removed, changed)
2453}
2454
2455/// Adapter exposing an MCP tool as an agentkit [`Tool`].
2456pub struct McpToolAdapter {
2457    tool_name: String,
2458    connection: Arc<McpConnection>,
2459    spec: ToolSpec,
2460}
2461
2462impl McpToolAdapter {
2463    /// Creates a new tool adapter for the given MCP tool, using the
2464    /// [`McpToolNamespace::Default`] naming strategy.
2465    pub fn new(server_id: &McpServerId, connection: Arc<McpConnection>, tool: McpTool) -> Self {
2466        Self::with_namespace(server_id, connection, tool, &McpToolNamespace::Default)
2467    }
2468
2469    /// Creates a new tool adapter with a custom name-namespacing strategy.
2470    pub fn with_namespace(
2471        server_id: &McpServerId,
2472        connection: Arc<McpConnection>,
2473        tool: McpTool,
2474        namespace: &McpToolNamespace,
2475    ) -> Self {
2476        let spec = tool_spec_from_tool(server_id, &tool, namespace);
2477        Self {
2478            tool_name: tool.name.into_owned(),
2479            connection,
2480            spec,
2481        }
2482    }
2483}
2484
2485#[async_trait]
2486impl Tool for McpToolAdapter {
2487    fn spec(&self) -> &ToolSpec {
2488        &self.spec
2489    }
2490
2491    async fn invoke(
2492        &self,
2493        request: ToolRequest,
2494        _ctx: &mut ToolContext<'_>,
2495    ) -> Result<ToolResult, ToolError> {
2496        let input = request.input;
2497        let result = match self
2498            .connection
2499            .call_tool(&self.tool_name, input.clone())
2500            .await
2501        {
2502            Ok(result) => result,
2503            Err(McpError::AuthRequired(auth_request)) => {
2504                let responder = self
2505                    .connection
2506                    .handler_config()
2507                    .auth
2508                    .clone()
2509                    .ok_or_else(|| {
2510                        ToolError::ExecutionFailed(
2511                            "MCP server requires auth but no McpAuthResponder is registered".into(),
2512                        )
2513                    })?;
2514                let resolution = responder.resolve(*auth_request).await.map_err(|error| {
2515                    ToolError::ExecutionFailed(format!("auth responder failed: {error}"))
2516                })?;
2517                match &resolution {
2518                    AuthResolution::Provided { .. } => {
2519                        self.connection
2520                            .resolve_auth(resolution.clone())
2521                            .await
2522                            .map_err(|error| {
2523                                ToolError::ExecutionFailed(format!(
2524                                    "applying auth resolution failed: {error}"
2525                                ))
2526                            })?;
2527                    }
2528                    AuthResolution::Cancelled { .. } => {
2529                        return Err(ToolError::ExecutionFailed(
2530                            "user cancelled MCP auth flow".into(),
2531                        ));
2532                    }
2533                }
2534                self.connection
2535                    .call_tool(&self.tool_name, input)
2536                    .await
2537                    .map_err(|err| match err {
2538                        McpError::AuthRequired(req) => ToolError::ExecutionFailed(format!(
2539                            "MCP auth challenge unresolved after retry: {}",
2540                            req.id
2541                        )),
2542                        other => ToolError::ExecutionFailed(other.to_string()),
2543                    })?
2544            }
2545            Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
2546        };
2547
2548        let is_error = result.is_error.unwrap_or(false);
2549        Ok(ToolResult {
2550            result: ToolResultPart {
2551                call_id: request.call_id,
2552                output: call_tool_result_to_tool_output(result),
2553                is_error,
2554                metadata: MetadataMap::new(),
2555            },
2556            duration: None,
2557            metadata: MetadataMap::new(),
2558        })
2559    }
2560}
2561
2562fn rmcp_server_capabilities_to_agentkit(
2563    capabilities: &rmcp_model::ServerCapabilities,
2564) -> McpServerCapabilities {
2565    McpServerCapabilities {
2566        tools: capabilities.tools.as_ref().map(|tools| ToolsCapability {
2567            list_changed: tools.list_changed,
2568        }),
2569        resources: capabilities
2570            .resources
2571            .as_ref()
2572            .map(|resources| ResourcesCapability {
2573                subscribe: resources.subscribe,
2574                list_changed: resources.list_changed,
2575            }),
2576        prompts: capabilities
2577            .prompts
2578            .as_ref()
2579            .map(|prompts| PromptsCapability {
2580                list_changed: prompts.list_changed,
2581            }),
2582        logging: capabilities.logging.as_ref().map(|_| LoggingCapability {}),
2583    }
2584}
2585
2586fn tool_spec_from_tool(
2587    server_id: &McpServerId,
2588    tool: &McpTool,
2589    namespace: &McpToolNamespace,
2590) -> ToolSpec {
2591    ToolSpec {
2592        name: ToolName::new(namespace.apply(server_id, &tool.name)),
2593        description: tool
2594            .description
2595            .as_ref()
2596            .map(|d| d.to_string())
2597            .unwrap_or_else(|| tool.name.to_string()),
2598        input_schema: Value::Object((*tool.input_schema).clone()),
2599        annotations: tool_annotations_from_rmcp(tool.annotations.as_ref()),
2600        metadata: MetadataMap::new(),
2601    }
2602}
2603
2604fn tool_annotations_from_rmcp(annotations: Option<&McpToolAnnotations>) -> ToolAnnotations {
2605    let Some(annotations) = annotations else {
2606        return ToolAnnotations::default();
2607    };
2608    // rmcp expresses each hint as `Option<bool>` (advisory; absent means
2609    // unspecified). agentkit collapses absent → false. Tools that need to
2610    // distinguish "absent" from "false" should inspect the underlying
2611    // `McpTool::annotations` directly via the snapshot. MCP has no
2612    // `needs_approval` hint, so leave it unset and let the loop's permission
2613    // policy drive approval.
2614    ToolAnnotations {
2615        read_only_hint: annotations.read_only_hint.unwrap_or(false),
2616        destructive_hint: annotations.destructive_hint.unwrap_or(false),
2617        idempotent_hint: annotations.idempotent_hint.unwrap_or(false),
2618        needs_approval_hint: false,
2619        supports_streaming_hint: false,
2620    }
2621}
2622
2623fn resource_descriptor_from_rmcp(resource: McpResource) -> ResourceDescriptor {
2624    let raw = resource.raw;
2625    ResourceDescriptor {
2626        id: ResourceId::new(raw.uri),
2627        name: raw.name,
2628        description: raw.description,
2629        mime_type: raw.mime_type,
2630        metadata: MetadataMap::new(),
2631    }
2632}
2633
2634fn prompt_descriptor_from_rmcp(prompt: McpPrompt) -> PromptDescriptor {
2635    let arguments = prompt.arguments.unwrap_or_default();
2636    let mut required = Vec::new();
2637    let properties = arguments
2638        .into_iter()
2639        .map(|argument| {
2640            let mut schema = serde_json::Map::new();
2641            schema.insert("type".into(), Value::String("string".into()));
2642            if let Some(description) = argument.description {
2643                schema.insert("description".into(), Value::String(description));
2644            }
2645            if argument.required.unwrap_or(false) {
2646                required.push(Value::String(argument.name.clone()));
2647            }
2648            (argument.name, Value::Object(schema))
2649        })
2650        .collect::<serde_json::Map<String, Value>>();
2651    let mut input_schema = serde_json::Map::new();
2652    input_schema.insert("type".into(), Value::String("object".into()));
2653    input_schema.insert("properties".into(), Value::Object(properties));
2654    if !required.is_empty() {
2655        input_schema.insert("required".into(), Value::Array(required));
2656    }
2657
2658    PromptDescriptor {
2659        id: PromptId::new(prompt.name.clone()),
2660        name: prompt.name,
2661        description: prompt.description,
2662        input_schema: Value::Object(input_schema),
2663        metadata: MetadataMap::new(),
2664    }
2665}
2666
2667fn read_resource_result_to_capabilities(
2668    result: ReadResourceResult,
2669) -> Result<ResourceContents, McpError> {
2670    let content = result
2671        .contents
2672        .into_iter()
2673        .next()
2674        .ok_or_else(|| McpError::Protocol("resources/read returned no contents".into()))?;
2675    Ok(resource_contents_to_capabilities(content))
2676}
2677
2678fn resource_contents_to_capabilities(content: McpResourceContents) -> ResourceContents {
2679    let mut metadata = MetadataMap::new();
2680    let data = match content {
2681        McpResourceContents::TextResourceContents {
2682            text, mime_type, ..
2683        } => {
2684            if let Some(mime) = mime_type {
2685                metadata.insert("mime_type".into(), Value::String(mime));
2686            }
2687            DataRef::InlineText(text)
2688        }
2689        McpResourceContents::BlobResourceContents {
2690            blob,
2691            mime_type,
2692            uri,
2693            ..
2694        } => {
2695            if let Some(mime) = mime_type {
2696                metadata.insert("mime_type".into(), Value::String(mime));
2697            }
2698            metadata.insert("uri".into(), Value::String(uri));
2699            // rmcp delivers blobs as base64-encoded text on the wire.
2700            DataRef::InlineText(blob)
2701        }
2702    };
2703    ResourceContents { data, metadata }
2704}
2705
2706fn get_prompt_result_to_capabilities(result: GetPromptResult) -> PromptContents {
2707    let items = result
2708        .messages
2709        .into_iter()
2710        .map(prompt_message_to_item)
2711        .collect();
2712    let mut metadata = MetadataMap::new();
2713    if let Some(description) = result.description {
2714        metadata.insert("description".into(), Value::String(description));
2715    }
2716    PromptContents { items, metadata }
2717}
2718
2719fn prompt_message_to_item(message: PromptMessage) -> Item {
2720    let kind = match message.role {
2721        PromptMessageRole::Assistant => ItemKind::Assistant,
2722        PromptMessageRole::User => ItemKind::User,
2723    };
2724    Item {
2725        id: None,
2726        kind,
2727        parts: vec![prompt_message_content_to_part(message.content)],
2728        metadata: MetadataMap::new(),
2729        usage: None,
2730        finish_reason: None,
2731        created_at: None,
2732    }
2733}
2734
2735fn prompt_message_content_to_part(content: PromptMessageContent) -> Part {
2736    match content {
2737        PromptMessageContent::Text { text } => Part::Text(TextPart::new(text)),
2738        PromptMessageContent::Image { image } => Part::Media(MediaPart::new(
2739            Modality::Image,
2740            image.mime_type.clone(),
2741            DataRef::InlineText(image.data.clone()),
2742        )),
2743        PromptMessageContent::Resource { resource } => {
2744            let agentkit_resource = resource_contents_to_capabilities(resource.resource.clone());
2745            agentkit_part_from_resource(agentkit_resource)
2746        }
2747        PromptMessageContent::ResourceLink { link } => Part::Text(TextPart::new(link.uri.clone())),
2748    }
2749}
2750
2751fn agentkit_part_from_resource(resource: ResourceContents) -> Part {
2752    let mime = resource
2753        .metadata
2754        .get("mime_type")
2755        .and_then(Value::as_str)
2756        .unwrap_or("text/plain")
2757        .to_string();
2758    Part::Media(MediaPart::new(Modality::Binary, mime, resource.data))
2759}
2760
2761fn call_tool_result_to_tool_output(result: CallToolResult) -> ToolOutput {
2762    if let Some(structured) = result.structured_content {
2763        return ToolOutput::Structured(structured);
2764    }
2765    let parts = call_tool_content_to_parts(result.content);
2766    if parts.iter().all(|part| matches!(part, Part::Text(_))) {
2767        let text = parts
2768            .iter()
2769            .filter_map(|part| match part {
2770                Part::Text(text) => Some(text.text.clone()),
2771                _ => None,
2772            })
2773            .collect::<Vec<_>>()
2774            .join("\n");
2775        ToolOutput::Text(text)
2776    } else {
2777        ToolOutput::Parts(parts)
2778    }
2779}
2780
2781fn call_tool_content_to_parts(contents: Vec<Content>) -> Vec<Part> {
2782    contents.into_iter().map(content_to_part).collect()
2783}
2784
2785fn content_to_part(content: Content) -> Part {
2786    match content.raw {
2787        RawContent::Text(text) => Part::Text(TextPart::new(text.text)),
2788        RawContent::Image(image) => Part::Media(MediaPart::new(
2789            Modality::Image,
2790            image.mime_type,
2791            DataRef::InlineText(image.data),
2792        )),
2793        RawContent::Audio(audio) => Part::Media(MediaPart::new(
2794            Modality::Audio,
2795            audio.mime_type,
2796            DataRef::InlineText(audio.data),
2797        )),
2798        RawContent::Resource(embedded) => {
2799            agentkit_part_from_resource(resource_contents_to_capabilities(embedded.resource))
2800        }
2801        RawContent::ResourceLink(link) => Part::Text(TextPart::new(link.uri)),
2802    }
2803}
2804
2805fn value_to_json_object(value: Value, context: &str) -> Result<rmcp_model::JsonObject, McpError> {
2806    match value {
2807        Value::Object(object) => Ok(object),
2808        Value::Null => Ok(serde_json::Map::new()),
2809        other => Err(McpError::Protocol(format!(
2810            "{context} must be a JSON object, got {other}"
2811        ))),
2812    }
2813}
2814
2815fn bearer_token_from_metadata(metadata: &MetadataMap) -> Option<String> {
2816    ["bearer_token", "access_token", "token", "api_key"]
2817        .into_iter()
2818        .find_map(|key| metadata.get(key).and_then(Value::as_str).map(str::to_owned))
2819}
2820
2821fn rmcp_initialize_error(config: &McpServerConfig, error: ClientInitializeError) -> McpError {
2822    if let Some(signal) = match &error {
2823        ClientInitializeError::TransportError { error: dyn_err, .. } => {
2824            transport_auth_signal(dyn_err)
2825        }
2826        _ => None,
2827    } {
2828        return McpError::AuthRequired(Box::new(auth_request_from_signal(
2829            &config.id,
2830            McpMethod::Initialize,
2831            signal,
2832            &error.to_string(),
2833        )));
2834    }
2835    McpError::Transport(error.to_string())
2836}
2837
2838fn rmcp_service_error(error: ServiceError) -> McpError {
2839    McpError::Invocation(error.to_string())
2840}
2841
2842fn rmcp_operation_error(
2843    server_id: &McpServerId,
2844    method: McpMethod,
2845    error: ServiceError,
2846) -> McpError {
2847    if let Some(signal) = service_auth_signal(&error) {
2848        return McpError::AuthRequired(Box::new(auth_request_from_signal(
2849            server_id,
2850            method,
2851            signal,
2852            &error.to_string(),
2853        )));
2854    }
2855    McpError::Invocation(error.to_string())
2856}
2857
2858#[derive(Debug)]
2859enum AuthSignal {
2860    Required {
2861        www_authenticate: Option<String>,
2862    },
2863    InsufficientScope {
2864        www_authenticate: Option<String>,
2865        required_scope: Option<String>,
2866    },
2867}
2868
2869fn service_auth_signal(error: &ServiceError) -> Option<AuthSignal> {
2870    match error {
2871        ServiceError::TransportSend(dyn_err) => transport_auth_signal(dyn_err),
2872        _ => None,
2873    }
2874}
2875
2876fn transport_auth_signal(error: &DynamicTransportError) -> Option<AuthSignal> {
2877    let inner = error
2878        .error
2879        .downcast_ref::<StreamableHttpError<reqwest::Error>>()?;
2880    match inner {
2881        StreamableHttpError::AuthRequired(AuthRequiredError {
2882            www_authenticate_header,
2883            ..
2884        }) => Some(AuthSignal::Required {
2885            www_authenticate: Some(www_authenticate_header.clone()),
2886        }),
2887        StreamableHttpError::InsufficientScope(InsufficientScopeError {
2888            www_authenticate_header,
2889            required_scope,
2890            ..
2891        }) => Some(AuthSignal::InsufficientScope {
2892            www_authenticate: Some(www_authenticate_header.clone()),
2893            required_scope: required_scope.clone(),
2894        }),
2895        _ => None,
2896    }
2897}
2898
2899fn auth_request_from_signal(
2900    server_id: &McpServerId,
2901    method: McpMethod,
2902    signal: AuthSignal,
2903    message: &str,
2904) -> AuthRequest {
2905    let method_name = method.method_name();
2906    let mut challenge = MetadataMap::new();
2907    challenge.insert("server_id".into(), Value::String(server_id.to_string()));
2908    challenge.insert("method".into(), Value::String(method_name.into()));
2909    challenge.insert("message".into(), Value::String(message.into()));
2910    challenge.insert("flow_kind".into(), Value::String("http_bearer".into()));
2911    match signal {
2912        AuthSignal::Required { www_authenticate } => {
2913            if let Some(header) = www_authenticate {
2914                challenge.insert("www_authenticate".into(), Value::String(header));
2915            }
2916        }
2917        AuthSignal::InsufficientScope {
2918            www_authenticate,
2919            required_scope,
2920        } => {
2921            challenge.insert("insufficient_scope".into(), Value::Bool(true));
2922            if let Some(header) = www_authenticate {
2923                challenge.insert("www_authenticate".into(), Value::String(header));
2924            }
2925            if let Some(scope) = required_scope {
2926                challenge.insert("required_scope".into(), Value::String(scope));
2927            }
2928        }
2929    }
2930    AuthRequest {
2931        id: format!("mcp:{}:{}", server_id, method_name),
2932        provider: format!("mcp.{}", server_id),
2933        operation: method.into_auth_operation(server_id),
2934        challenge,
2935    }
2936}
2937
2938/// Typed dispatch for MCP requests that may surface auth challenges. Each
2939/// peer call constructs the matching variant; [`auth_request_from_signal`]
2940/// converts to a public [`AuthOperation`] (typed for the four common cases,
2941/// [`AuthOperation::McpOther`] for the long tail).
2942#[derive(Debug, Clone)]
2943enum McpMethod {
2944    Initialize,
2945    ToolsCall { name: String, arguments: Value },
2946    ResourcesRead { uri: String },
2947    ResourcesSubscribe { uri: String },
2948    ResourcesUnsubscribe { uri: String },
2949    PromptsGet { name: String, arguments: Value },
2950    LoggingSetLevel { level: String },
2951}
2952
2953impl McpMethod {
2954    fn method_name(&self) -> &'static str {
2955        match self {
2956            Self::Initialize => "initialize",
2957            Self::ToolsCall { .. } => "tools/call",
2958            Self::ResourcesRead { .. } => "resources/read",
2959            Self::ResourcesSubscribe { .. } => "resources/subscribe",
2960            Self::ResourcesUnsubscribe { .. } => "resources/unsubscribe",
2961            Self::PromptsGet { .. } => "prompts/get",
2962            Self::LoggingSetLevel { .. } => "logging/setLevel",
2963        }
2964    }
2965
2966    fn into_auth_operation(self, server_id: &McpServerId) -> AuthOperation {
2967        let server = server_id.to_string();
2968        match self {
2969            Self::Initialize => AuthOperation::McpConnect {
2970                server_id: server,
2971                metadata: MetadataMap::new(),
2972            },
2973            Self::ToolsCall { name, arguments } => AuthOperation::McpToolCall {
2974                server_id: server,
2975                tool_name: name,
2976                input: arguments,
2977                metadata: MetadataMap::new(),
2978            },
2979            Self::ResourcesRead { uri } => AuthOperation::McpResourceRead {
2980                server_id: server,
2981                resource_id: uri,
2982                metadata: MetadataMap::new(),
2983            },
2984            Self::PromptsGet { name, arguments } => AuthOperation::McpPromptGet {
2985                server_id: server,
2986                prompt_id: name,
2987                args: arguments,
2988                metadata: MetadataMap::new(),
2989            },
2990            other @ (Self::ResourcesSubscribe { .. }
2991            | Self::ResourcesUnsubscribe { .. }
2992            | Self::LoggingSetLevel { .. }) => {
2993                let method = other.method_name().to_string();
2994                AuthOperation::McpOther {
2995                    server_id: server,
2996                    method,
2997                    params: other.into_params_json(),
2998                    metadata: MetadataMap::new(),
2999                }
3000            }
3001        }
3002    }
3003
3004    fn into_params_json(self) -> Value {
3005        match self {
3006            Self::Initialize => json!({}),
3007            Self::ToolsCall { name, arguments } => json!({ "name": name, "arguments": arguments }),
3008            Self::ResourcesRead { uri } => json!({ "uri": uri }),
3009            Self::ResourcesSubscribe { uri } => json!({ "uri": uri }),
3010            Self::ResourcesUnsubscribe { uri } => json!({ "uri": uri }),
3011            Self::PromptsGet { name, arguments } => {
3012                json!({ "name": name, "arguments": arguments })
3013            }
3014            Self::LoggingSetLevel { level } => json!({ "level": level }),
3015        }
3016    }
3017}
3018
3019/// Errors produced by MCP transport, protocol, and lifecycle operations.
3020#[derive(Debug, Error)]
3021pub enum McpError {
3022    /// An underlying I/O error.
3023    #[error("io error: {0}")]
3024    Io(#[from] std::io::Error),
3025    /// A JSON serialization or deserialization error.
3026    #[error("serialization error: {0}")]
3027    Serialize(#[from] serde_json::Error),
3028    /// A transport-level error.
3029    #[error("transport error: {0}")]
3030    Transport(String),
3031    /// An MCP protocol violation.
3032    #[error("protocol error: {0}")]
3033    Protocol(String),
3034    /// The server requires authentication before the operation can proceed.
3035    #[error("MCP auth required: {0:?}")]
3036    AuthRequired(Box<AuthRequest>),
3037    /// An error occurred while resolving or replaying authentication.
3038    #[error("auth resolution error: {0}")]
3039    AuthResolution(String),
3040    /// The MCP server returned an error for the invoked method.
3041    #[error("invocation error: {0}")]
3042    Invocation(String),
3043    /// The referenced server ID is not registered in the [`McpServerManager`].
3044    #[error("unknown MCP server: {0}")]
3045    UnknownServer(String),
3046}
3047
3048impl From<&str> for McpServerId {
3049    fn from(value: &str) -> Self {
3050        Self::new(value)
3051    }
3052}
3053
3054impl From<String> for McpServerId {
3055    fn from(value: String) -> Self {
3056        Self::new(value)
3057    }
3058}