1use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::fmt;
24use std::sync::{Arc, RwLock};
25use std::time::Duration;
26
27use agentkit_capabilities::{
28 CapabilityContext, CapabilityError, CapabilityProvider, Invocable, PromptContents,
29 PromptDescriptor, PromptId, PromptProvider, ResourceContents, ResourceDescriptor, ResourceId,
30 ResourceProvider,
31};
32use agentkit_core::{
33 DataRef, Item, ItemKind, MediaPart, MetadataMap, Modality, Part, TextPart, ToolOutput,
34 ToolResultPart,
35};
36use agentkit_tools_core::{
37 AllowAllPermissions, CatalogReader, CatalogWriter, PermissionChecker, Tool, ToolAnnotations,
38 ToolCapabilityProvider, ToolContext, ToolError, ToolName, ToolRegistry, ToolRequest,
39 ToolResult, ToolSpec, dynamic_catalog,
40};
41use async_trait::async_trait;
42use futures_util::future::{join_all, try_join_all};
43use futures_util::stream::BoxStream;
44use http::{HeaderName, HeaderValue};
45use rmcp::ServiceExt;
46use rmcp::handler::client::ClientHandler;
47use rmcp::model as rmcp_model;
48use rmcp::service::{ClientInitializeError, Peer, RoleClient, RunningService, ServiceError};
49use rmcp::transport::streamable_http_client::{
50 AuthRequiredError, InsufficientScopeError, StreamableHttpClient as RmcpStreamableHttpClient,
51 StreamableHttpClientTransportConfig as RmcpStreamableHttpClientTransportConfig,
52 StreamableHttpError, StreamableHttpPostResponse,
53};
54use rmcp::transport::{
55 ConfigureCommandExt, DynamicTransportError, StreamableHttpClientTransport, TokioChildProcess,
56};
57use serde::{Deserialize, Serialize};
58use serde_json::{Value, json};
59use sse_stream::{Error as SseError, Sse};
60use thiserror::Error;
61use tokio::sync::{Mutex, broadcast, mpsc};
62
63pub use rmcp::model::{
68 Annotations as McpAnnotations, AudioContent, CallToolResult,
69 CancelledNotificationParam as McpCancelledNotificationParam,
70 ClientCapabilities as McpClientCapabilities, Content,
71 CreateElicitationRequestParams as McpCreateElicitationRequestParams,
72 CreateElicitationResult as McpCreateElicitationResult,
73 CreateMessageRequestParams as McpCreateMessageRequestParams,
74 CreateMessageResult as McpCreateMessageResult, ElicitationAction as McpElicitationAction,
75 ElicitationCapability as McpElicitationCapability, EmbeddedResource,
76 FormElicitationCapability as McpFormElicitationCapability, GetPromptResult, ImageContent,
77 Implementation as McpImplementation, ListRootsResult as McpListRootsResult,
78 LoggingLevel as McpLoggingLevel,
79 LoggingMessageNotificationParam as McpLoggingMessageNotificationParam,
80 ProgressNotificationParam as McpProgressNotificationParam, Prompt as McpPrompt, PromptArgument,
81 PromptMessage, PromptMessageContent, PromptMessageRole, RawAudioContent, RawContent,
82 RawEmbeddedResource, RawImageContent, RawResource as McpRawResource, RawTextContent,
83 ReadResourceResult, Resource as McpResource, ResourceContents as McpResourceContents,
84 ResourceUpdatedNotificationParam as McpResourceUpdatedNotificationParam, Root as McpRoot,
85 RootsCapabilities as McpRootsCapabilities, SamplingCapability as McpSamplingCapability,
86 SamplingMessage as McpSamplingMessage, SetLevelRequestParams as McpSetLevelRequestParams,
87 TextContent, Tool as McpTool, ToolAnnotations as McpToolAnnotations,
88 UrlElicitationCapability as McpUrlElicitationCapability,
89};
90
91pub use rmcp::model::ClientJsonRpcMessage;
94
95pub use rmcp::transport::streamable_http_client::{
98 StreamableHttpError as McpStreamableHttpError,
99 StreamableHttpPostResponse as McpStreamableHttpPostResponse,
100};
101
102pub use sse_stream::{Error as McpSseError, Sse as McpSse};
104
105pub type McpToolDescriptor = McpTool;
107pub type McpResourceDescriptor = McpResource;
109pub type McpPromptDescriptor = McpPrompt;
111
112#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
121pub struct AuthRequest {
122 pub id: String,
124 pub provider: String,
126 pub operation: AuthOperation,
128 pub challenge: MetadataMap,
130}
131
132impl AuthRequest {
133 pub fn server_id(&self) -> Option<&str> {
135 self.operation.server_id()
136 }
137}
138
139#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
141pub enum AuthOperation {
142 McpConnect {
144 server_id: String,
145 metadata: MetadataMap,
146 },
147 McpToolCall {
149 server_id: String,
150 tool_name: String,
151 input: Value,
152 metadata: MetadataMap,
153 },
154 McpResourceRead {
156 server_id: String,
157 resource_id: String,
158 metadata: MetadataMap,
159 },
160 McpPromptGet {
162 server_id: String,
163 prompt_id: String,
164 args: Value,
165 metadata: MetadataMap,
166 },
167 McpOther {
172 server_id: String,
173 method: String,
174 params: Value,
175 metadata: MetadataMap,
176 },
177}
178
179impl AuthOperation {
180 pub fn server_id(&self) -> Option<&str> {
182 match self {
183 Self::McpConnect { server_id, .. }
184 | Self::McpToolCall { server_id, .. }
185 | Self::McpResourceRead { server_id, .. }
186 | Self::McpPromptGet { server_id, .. }
187 | Self::McpOther { server_id, .. } => Some(server_id.as_str()),
188 }
189 }
190}
191
192#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
194pub enum AuthResolution {
195 Provided {
197 request: AuthRequest,
198 credentials: MetadataMap,
199 },
200 Cancelled { request: AuthRequest },
202}
203
204impl AuthResolution {
205 pub fn provided(request: AuthRequest, credentials: MetadataMap) -> Self {
207 Self::Provided {
208 request,
209 credentials,
210 }
211 }
212
213 pub fn cancelled(request: AuthRequest) -> Self {
215 Self::Cancelled { request }
216 }
217
218 pub fn request(&self) -> &AuthRequest {
220 match self {
221 Self::Provided { request, .. } | Self::Cancelled { request } => request,
222 }
223 }
224}
225
226#[async_trait]
239pub trait McpAuthResponder: Send + Sync + 'static {
240 async fn resolve(&self, request: AuthRequest) -> Result<AuthResolution, McpError>;
241}
242
243#[derive(Debug, Clone, thiserror::Error)]
258pub enum McpInvocationError {
259 #[error("url elicitation required: {message}")]
261 UrlElicitation {
262 message: String,
264 data: Option<UrlElicitationData>,
267 raw_data: Option<serde_json::Value>,
269 },
270 #[error("invalid request: {message}")]
272 InvalidRequest {
273 message: String,
274 data: Option<serde_json::Value>,
275 },
276 #[error("method not found: {message}")]
278 MethodNotFound {
279 message: String,
280 data: Option<serde_json::Value>,
281 },
282 #[error("invalid params: {message}")]
284 InvalidParams {
285 message: String,
286 data: Option<serde_json::Value>,
287 },
288 #[error("internal error: {message}")]
290 InternalError {
291 message: String,
292 data: Option<serde_json::Value>,
293 },
294 #[error("parse error: {message}")]
296 ParseError {
297 message: String,
298 data: Option<serde_json::Value>,
299 },
300 #[error("resource not found: {message}")]
302 ResourceNotFound {
303 message: String,
304 data: Option<serde_json::Value>,
305 },
306 #[error("mcp error code {code}: {message}")]
309 Other {
310 code: i32,
311 message: String,
312 data: Option<serde_json::Value>,
313 },
314}
315
316#[derive(Debug, Clone, serde::Deserialize)]
324#[serde(rename_all = "camelCase")]
325pub struct UrlElicitationData {
326 pub url: String,
328 pub elicitation_id: String,
330 #[serde(default)]
332 pub message: Option<String>,
333}
334
335impl McpInvocationError {
336 pub fn from_error_data(err: rmcp::model::ErrorData) -> Self {
340 let rmcp::model::ErrorData {
341 code,
342 message,
343 data,
344 } = err;
345 let message = message.into_owned();
346 match code {
347 rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED => {
348 let typed = data.as_ref().and_then(|value| {
349 serde_json::from_value::<UrlElicitationData>(value.clone()).ok()
350 });
351 Self::UrlElicitation {
352 message,
353 data: typed,
354 raw_data: data,
355 }
356 }
357 rmcp::model::ErrorCode::INVALID_REQUEST => Self::InvalidRequest { message, data },
358 rmcp::model::ErrorCode::METHOD_NOT_FOUND => Self::MethodNotFound { message, data },
359 rmcp::model::ErrorCode::INVALID_PARAMS => Self::InvalidParams { message, data },
360 rmcp::model::ErrorCode::INTERNAL_ERROR => Self::InternalError { message, data },
361 rmcp::model::ErrorCode::PARSE_ERROR => Self::ParseError { message, data },
362 rmcp::model::ErrorCode::RESOURCE_NOT_FOUND => Self::ResourceNotFound { message, data },
363 other => Self::Other {
364 code: other.0,
365 message,
366 data,
367 },
368 }
369 }
370
371 pub fn code(&self) -> i32 {
373 match self {
374 Self::UrlElicitation { .. } => rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED.0,
375 Self::InvalidRequest { .. } => rmcp::model::ErrorCode::INVALID_REQUEST.0,
376 Self::MethodNotFound { .. } => rmcp::model::ErrorCode::METHOD_NOT_FOUND.0,
377 Self::InvalidParams { .. } => rmcp::model::ErrorCode::INVALID_PARAMS.0,
378 Self::InternalError { .. } => rmcp::model::ErrorCode::INTERNAL_ERROR.0,
379 Self::ParseError { .. } => rmcp::model::ErrorCode::PARSE_ERROR.0,
380 Self::ResourceNotFound { .. } => rmcp::model::ErrorCode::RESOURCE_NOT_FOUND.0,
381 Self::Other { code, .. } => *code,
382 }
383 }
384}
385
386#[async_trait]
401pub trait McpErrorResponder: Send + Sync + 'static {
402 async fn handle(
405 &self,
406 error: &McpInvocationError,
407 ctx: McpErrorContext<'_>,
408 ) -> ErrorResponderOutcome;
409}
410
411pub struct McpErrorContext<'a> {
414 pub server_id: &'a McpServerId,
416 pub method: &'a McpMethod,
418 pub input: Option<&'a serde_json::Value>,
420}
421
422pub enum ErrorResponderOutcome {
424 SynthesizeResult(CallToolResult),
430 PassThrough,
433}
434
435#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
439pub struct McpServerId(pub String);
440
441impl McpServerId {
442 pub fn new(value: impl Into<String>) -> Self {
444 Self(value.into())
445 }
446}
447
448impl fmt::Display for McpServerId {
449 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
450 self.0.fmt(f)
451 }
452}
453
454#[derive(Clone, Debug, PartialEq, Eq)]
459pub struct StdioTransportConfig {
460 pub command: String,
462 pub args: Vec<String>,
464 pub env: Vec<(String, String)>,
466 pub cwd: Option<std::path::PathBuf>,
468}
469
470impl StdioTransportConfig {
471 pub fn new(command: impl Into<String>) -> Self {
473 Self {
474 command: command.into(),
475 args: Vec::new(),
476 env: Vec::new(),
477 cwd: None,
478 }
479 }
480
481 pub fn with_arg(mut self, arg: impl Into<String>) -> Self {
483 self.args.push(arg.into());
484 self
485 }
486
487 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
489 self.env.push((key.into(), value.into()));
490 self
491 }
492
493 pub fn with_cwd(mut self, cwd: impl Into<std::path::PathBuf>) -> Self {
495 self.cwd = Some(cwd.into());
496 self
497 }
498}
499
500#[derive(Clone, Default)]
502pub struct StreamableHttpTransportConfig {
503 pub url: String,
505 pub bearer_token: Option<String>,
510 pub headers: Vec<(HeaderName, HeaderValue)>,
514 pub http_client: Option<Arc<dyn McpHttpClient>>,
521}
522
523impl fmt::Debug for StreamableHttpTransportConfig {
524 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
525 f.debug_struct("StreamableHttpTransportConfig")
526 .field("url", &self.url)
527 .field(
528 "bearer_token",
529 &self.bearer_token.as_deref().map(|_| "<redacted>"),
530 )
531 .field("headers", &self.headers)
532 .field(
533 "http_client",
534 &self.http_client.as_ref().map(|_| "<custom>"),
535 )
536 .finish()
537 }
538}
539
540impl StreamableHttpTransportConfig {
541 pub fn new(url: impl Into<String>) -> Self {
543 Self {
544 url: url.into(),
545 bearer_token: None,
546 headers: Vec::new(),
547 http_client: None,
548 }
549 }
550
551 pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
556 self.bearer_token = Some(token.into());
557 self
558 }
559
560 pub fn with_http_client(mut self, client: Arc<dyn McpHttpClient>) -> Self {
569 self.http_client = Some(client);
570 self
571 }
572
573 pub fn with_header<N, V>(mut self, name: N, value: V) -> Result<Self, McpError>
578 where
579 N: TryInto<HeaderName>,
580 N::Error: fmt::Display,
581 V: TryInto<HeaderValue>,
582 V::Error: fmt::Display,
583 {
584 let name = name
585 .try_into()
586 .map_err(|error| McpError::Transport(format!("invalid HTTP header name: {error}")))?;
587 let value = value
588 .try_into()
589 .map_err(|error| McpError::Transport(format!("invalid HTTP header value: {error}")))?;
590 self.headers.push((name, value));
591 Ok(self)
592 }
593}
594
595pub type McpSseStream = BoxStream<'static, Result<Sse, SseError>>;
597
598#[async_trait]
617pub trait McpHttpClient: Send + Sync + 'static {
618 async fn post_message(
622 &self,
623 uri: Arc<str>,
624 message: ClientJsonRpcMessage,
625 session_id: Option<Arc<str>>,
626 auth_header: Option<String>,
627 custom_headers: HashMap<HeaderName, HeaderValue>,
628 ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>>;
629
630 async fn delete_session(
632 &self,
633 uri: Arc<str>,
634 session_id: Arc<str>,
635 auth_header: Option<String>,
636 custom_headers: HashMap<HeaderName, HeaderValue>,
637 ) -> Result<(), StreamableHttpError<reqwest::Error>>;
638
639 async fn get_stream(
642 &self,
643 uri: Arc<str>,
644 session_id: Arc<str>,
645 last_event_id: Option<String>,
646 auth_header: Option<String>,
647 custom_headers: HashMap<HeaderName, HeaderValue>,
648 ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>>;
649}
650
651#[derive(Clone)]
654struct DynHttpClient(Arc<dyn McpHttpClient>);
655
656impl RmcpStreamableHttpClient for DynHttpClient {
657 type Error = reqwest::Error;
658
659 async fn post_message(
660 &self,
661 uri: Arc<str>,
662 message: ClientJsonRpcMessage,
663 session_id: Option<Arc<str>>,
664 auth_header: Option<String>,
665 custom_headers: HashMap<HeaderName, HeaderValue>,
666 ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>> {
667 self.0
668 .post_message(uri, message, session_id, auth_header, custom_headers)
669 .await
670 }
671
672 async fn delete_session(
673 &self,
674 uri: Arc<str>,
675 session_id: Arc<str>,
676 auth_header: Option<String>,
677 custom_headers: HashMap<HeaderName, HeaderValue>,
678 ) -> Result<(), StreamableHttpError<reqwest::Error>> {
679 self.0
680 .delete_session(uri, session_id, auth_header, custom_headers)
681 .await
682 }
683
684 async fn get_stream(
685 &self,
686 uri: Arc<str>,
687 session_id: Arc<str>,
688 last_event_id: Option<String>,
689 auth_header: Option<String>,
690 custom_headers: HashMap<HeaderName, HeaderValue>,
691 ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>> {
692 self.0
693 .get_stream(uri, session_id, last_event_id, auth_header, custom_headers)
694 .await
695 }
696}
697
698#[derive(Clone, Debug)]
700pub enum McpTransportBinding {
701 Stdio(StdioTransportConfig),
703 StreamableHttp(StreamableHttpTransportConfig),
705}
706
707#[derive(Clone, Debug)]
709pub struct McpServerConfig {
710 pub id: McpServerId,
712 pub transport: McpTransportBinding,
714 pub metadata: MetadataMap,
716}
717
718impl McpServerConfig {
719 pub fn new(id: impl Into<String>, transport: McpTransportBinding) -> Self {
721 Self {
722 id: McpServerId::new(id),
723 transport,
724 metadata: MetadataMap::new(),
725 }
726 }
727
728 pub fn stdio(id: impl Into<String>, command: impl Into<String>) -> Self {
730 Self::new(
731 id,
732 McpTransportBinding::Stdio(StdioTransportConfig::new(command)),
733 )
734 }
735
736 pub fn streamable_http(id: impl Into<String>, url: impl Into<String>) -> Self {
738 Self::new(
739 id,
740 McpTransportBinding::StreamableHttp(StreamableHttpTransportConfig::new(url)),
741 )
742 }
743
744 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
746 self.metadata = metadata;
747 self
748 }
749}
750
751type CustomNamespace = Arc<dyn Fn(&McpServerId, &str) -> String + Send + Sync>;
752
753#[derive(Clone, Default)]
761pub enum McpToolNamespace {
762 #[default]
764 Default,
765 None,
767 Custom(CustomNamespace),
769}
770
771impl fmt::Debug for McpToolNamespace {
772 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773 match self {
774 Self::Default => f.write_str("McpToolNamespace::Default"),
775 Self::None => f.write_str("McpToolNamespace::None"),
776 Self::Custom(_) => f.write_str("McpToolNamespace::Custom(<fn>)"),
777 }
778 }
779}
780
781impl McpToolNamespace {
782 pub fn custom(f: impl Fn(&McpServerId, &str) -> String + Send + Sync + 'static) -> Self {
784 Self::Custom(Arc::new(f))
785 }
786
787 pub fn apply(&self, server_id: &McpServerId, tool_name: &str) -> String {
789 match self {
790 Self::Default => format!("mcp_{server_id}_{tool_name}"),
791 Self::None => tool_name.to_string(),
792 Self::Custom(f) => f(server_id, tool_name),
793 }
794 }
795
796 pub fn unapply(&self, server_id: &McpServerId, agentkit_name: &str) -> Option<String> {
800 match self {
801 Self::Default => agentkit_name
802 .strip_prefix(&format!("mcp_{server_id}_"))
803 .map(str::to_string),
804 Self::None => Some(agentkit_name.to_string()),
805 Self::Custom(_) => None,
806 }
807 }
808}
809
810#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
819pub struct McpDiscoverySnapshot {
820 pub server_id: McpServerId,
822 pub tools: Vec<McpTool>,
824 pub resources: Vec<McpResource>,
826 pub prompts: Vec<McpPrompt>,
828 pub metadata: MetadataMap,
830}
831
832#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
834pub enum McpCatalogEvent {
835 ServerConnected { server_id: McpServerId },
837 ServerDisconnected { server_id: McpServerId },
839 ToolsChanged {
841 server_id: McpServerId,
842 added: Vec<String>,
843 removed: Vec<String>,
844 changed: Vec<String>,
845 },
846 ResourcesChanged {
848 server_id: McpServerId,
849 added: Vec<String>,
850 removed: Vec<String>,
851 changed: Vec<String>,
852 },
853 PromptsChanged {
855 server_id: McpServerId,
856 added: Vec<String>,
857 removed: Vec<String>,
858 changed: Vec<String>,
859 },
860 AuthChanged { server_id: McpServerId },
862 RefreshFailed {
864 server_id: McpServerId,
865 message: String,
866 },
867}
868
869#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
871pub struct McpServerCapabilities {
872 #[serde(default, skip_serializing_if = "Option::is_none")]
874 pub tools: Option<ToolsCapability>,
875 #[serde(default, skip_serializing_if = "Option::is_none")]
877 pub resources: Option<ResourcesCapability>,
878 #[serde(default, skip_serializing_if = "Option::is_none")]
880 pub prompts: Option<PromptsCapability>,
881 #[serde(default, skip_serializing_if = "Option::is_none")]
883 pub logging: Option<LoggingCapability>,
884}
885
886impl McpServerCapabilities {
887 pub fn all() -> Self {
890 Self {
891 tools: Some(ToolsCapability::default()),
892 resources: Some(ResourcesCapability::default()),
893 prompts: Some(PromptsCapability::default()),
894 logging: Some(LoggingCapability::default()),
895 }
896 }
897}
898
899#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
901#[serde(rename_all = "camelCase")]
902pub struct ToolsCapability {
903 #[serde(default, skip_serializing_if = "Option::is_none")]
905 pub list_changed: Option<bool>,
906}
907
908#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
910#[serde(rename_all = "camelCase")]
911pub struct ResourcesCapability {
912 #[serde(default, skip_serializing_if = "Option::is_none")]
914 pub subscribe: Option<bool>,
915 #[serde(default, skip_serializing_if = "Option::is_none")]
917 pub list_changed: Option<bool>,
918}
919
920#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
922#[serde(rename_all = "camelCase")]
923pub struct PromptsCapability {
924 #[serde(default, skip_serializing_if = "Option::is_none")]
926 pub list_changed: Option<bool>,
927}
928
929#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
931pub struct LoggingCapability {}
932
933#[allow(clippy::enum_variant_names)]
942#[derive(Clone, Debug)]
943pub enum McpServerNotification {
944 ToolsChanged,
946 ResourcesChanged,
948 PromptsChanged,
950}
951
952#[derive(Clone, Debug)]
960pub enum McpServerEvent {
961 Progress(McpProgressNotificationParam),
964 Logging(McpLoggingMessageNotificationParam),
967 ResourceUpdated(McpResourceUpdatedNotificationParam),
970 ToolListChanged,
972 ResourceListChanged,
974 PromptListChanged,
976 Cancelled(McpCancelledNotificationParam),
979}
980
981#[async_trait]
986pub trait McpSamplingResponder: Send + Sync + 'static {
987 async fn create_message(
990 &self,
991 params: McpCreateMessageRequestParams,
992 ) -> Result<McpCreateMessageResult, McpError>;
993}
994
995#[async_trait]
1000pub trait McpElicitationResponder: Send + Sync + 'static {
1001 async fn create_elicitation(
1003 &self,
1004 params: McpCreateElicitationRequestParams,
1005 ) -> Result<McpCreateElicitationResult, McpError>;
1006}
1007
1008#[async_trait]
1013pub trait McpRootsProvider: Send + Sync + 'static {
1014 async fn list_roots(&self) -> Result<Vec<McpRoot>, McpError>;
1016}
1017
1018const DEFAULT_EVENTS_CAPACITY: usize = 128;
1020
1021pub struct McpClientChannels {
1033 pub notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1035 pub events: broadcast::Sender<McpServerEvent>,
1037}
1038
1039#[derive(Clone)]
1049pub struct McpClientHandler {
1050 info: rmcp_model::ClientInfo,
1051 notifications: mpsc::UnboundedSender<McpServerNotification>,
1052 events: broadcast::Sender<McpServerEvent>,
1053 sampling: Option<Arc<dyn McpSamplingResponder>>,
1054 elicitation: Option<Arc<dyn McpElicitationResponder>>,
1055 roots: Option<Arc<dyn McpRootsProvider>>,
1056}
1057
1058impl ClientHandler for McpClientHandler {
1059 fn create_message(
1060 &self,
1061 params: rmcp_model::CreateMessageRequestParams,
1062 _context: rmcp::service::RequestContext<RoleClient>,
1063 ) -> impl Future<Output = Result<rmcp_model::CreateMessageResult, rmcp_model::ErrorData>>
1064 + rmcp::service::MaybeSendFuture
1065 + '_ {
1066 let responder = self.sampling.clone();
1067 async move {
1068 match responder {
1069 Some(responder) => responder.create_message(params).await.map_err(Into::into),
1070 None => Err(rmcp_model::ErrorData::method_not_found::<
1071 rmcp_model::CreateMessageRequestMethod,
1072 >()),
1073 }
1074 }
1075 }
1076
1077 fn list_roots(
1078 &self,
1079 _context: rmcp::service::RequestContext<RoleClient>,
1080 ) -> impl Future<Output = Result<rmcp_model::ListRootsResult, rmcp_model::ErrorData>>
1081 + rmcp::service::MaybeSendFuture
1082 + '_ {
1083 let provider = self.roots.clone();
1084 async move {
1085 match provider {
1086 Some(provider) => provider
1087 .list_roots()
1088 .await
1089 .map(McpListRootsResult::new)
1090 .map_err(Into::into),
1091 None => Ok(McpListRootsResult::default()),
1092 }
1093 }
1094 }
1095
1096 fn create_elicitation(
1097 &self,
1098 params: rmcp_model::CreateElicitationRequestParams,
1099 _context: rmcp::service::RequestContext<RoleClient>,
1100 ) -> impl Future<Output = Result<rmcp_model::CreateElicitationResult, rmcp_model::ErrorData>>
1101 + rmcp::service::MaybeSendFuture
1102 + '_ {
1103 let responder = self.elicitation.clone();
1104 async move {
1105 match responder {
1106 Some(responder) => responder
1107 .create_elicitation(params)
1108 .await
1109 .map_err(Into::into),
1110 None => Ok(McpCreateElicitationResult::new(
1111 McpElicitationAction::Decline,
1112 )),
1113 }
1114 }
1115 }
1116
1117 fn on_progress(
1118 &self,
1119 params: rmcp_model::ProgressNotificationParam,
1120 _context: rmcp::service::NotificationContext<RoleClient>,
1121 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1122 let _ = self.events.send(McpServerEvent::Progress(params));
1123 std::future::ready(())
1124 }
1125
1126 fn on_logging_message(
1127 &self,
1128 params: rmcp_model::LoggingMessageNotificationParam,
1129 _context: rmcp::service::NotificationContext<RoleClient>,
1130 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1131 let _ = self.events.send(McpServerEvent::Logging(params));
1132 std::future::ready(())
1133 }
1134
1135 fn on_resource_updated(
1136 &self,
1137 params: rmcp_model::ResourceUpdatedNotificationParam,
1138 _context: rmcp::service::NotificationContext<RoleClient>,
1139 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1140 let _ = self.events.send(McpServerEvent::ResourceUpdated(params));
1141 std::future::ready(())
1142 }
1143
1144 fn on_cancelled(
1145 &self,
1146 params: rmcp_model::CancelledNotificationParam,
1147 _context: rmcp::service::NotificationContext<RoleClient>,
1148 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1149 let _ = self.events.send(McpServerEvent::Cancelled(params));
1150 std::future::ready(())
1151 }
1152
1153 fn on_tool_list_changed(
1154 &self,
1155 _context: rmcp::service::NotificationContext<RoleClient>,
1156 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1157 let _ = self.notifications.send(McpServerNotification::ToolsChanged);
1158 let _ = self.events.send(McpServerEvent::ToolListChanged);
1159 std::future::ready(())
1160 }
1161
1162 fn on_resource_list_changed(
1163 &self,
1164 _context: rmcp::service::NotificationContext<RoleClient>,
1165 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1166 let _ = self
1167 .notifications
1168 .send(McpServerNotification::ResourcesChanged);
1169 let _ = self.events.send(McpServerEvent::ResourceListChanged);
1170 std::future::ready(())
1171 }
1172
1173 fn on_prompt_list_changed(
1174 &self,
1175 _context: rmcp::service::NotificationContext<RoleClient>,
1176 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1177 let _ = self
1178 .notifications
1179 .send(McpServerNotification::PromptsChanged);
1180 let _ = self.events.send(McpServerEvent::PromptListChanged);
1181 std::future::ready(())
1182 }
1183
1184 fn get_info(&self) -> rmcp_model::ClientInfo {
1185 self.info.clone()
1186 }
1187}
1188
1189impl From<McpError> for rmcp_model::ErrorData {
1190 fn from(error: McpError) -> Self {
1191 rmcp_model::ErrorData::internal_error(error.to_string(), None)
1192 }
1193}
1194
1195type RmcpClientService = RunningService<RoleClient, McpClientHandler>;
1196
1197#[derive(Clone, Default)]
1206pub struct McpHandlerConfig {
1207 pub sampling: Option<Arc<dyn McpSamplingResponder>>,
1209 pub elicitation: Option<Arc<dyn McpElicitationResponder>>,
1211 pub roots: Option<Arc<dyn McpRootsProvider>>,
1213 pub auth: Option<Arc<dyn McpAuthResponder>>,
1218 pub error_responder: Option<Arc<dyn McpErrorResponder>>,
1225 pub events_capacity: Option<usize>,
1228}
1229
1230impl McpHandlerConfig {
1231 pub fn new() -> Self {
1233 Self::default()
1234 }
1235
1236 pub fn with_sampling_responder(mut self, responder: Arc<dyn McpSamplingResponder>) -> Self {
1238 self.sampling = Some(responder);
1239 self
1240 }
1241
1242 pub fn with_elicitation_responder(
1244 mut self,
1245 responder: Arc<dyn McpElicitationResponder>,
1246 ) -> Self {
1247 self.elicitation = Some(responder);
1248 self
1249 }
1250
1251 pub fn with_roots_provider(mut self, provider: Arc<dyn McpRootsProvider>) -> Self {
1253 self.roots = Some(provider);
1254 self
1255 }
1256
1257 pub fn with_auth_responder(mut self, responder: Arc<dyn McpAuthResponder>) -> Self {
1259 self.auth = Some(responder);
1260 self
1261 }
1262
1263 pub fn with_error_responder(mut self, responder: Arc<dyn McpErrorResponder>) -> Self {
1265 self.error_responder = Some(responder);
1266 self
1267 }
1268
1269 pub fn with_events_capacity(mut self, capacity: usize) -> Self {
1271 self.events_capacity = Some(capacity);
1272 self
1273 }
1274
1275 pub fn build(&self) -> (McpClientHandler, McpClientChannels) {
1279 self.build_inner(None)
1280 }
1281
1282 pub fn build_with(
1287 &self,
1288 events: broadcast::Sender<McpServerEvent>,
1289 ) -> (McpClientHandler, McpClientChannels) {
1290 self.build_inner(Some(events))
1291 }
1292
1293 fn build_inner(
1294 &self,
1295 events: Option<broadcast::Sender<McpServerEvent>>,
1296 ) -> (McpClientHandler, McpClientChannels) {
1297 let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
1298 let events_tx = events.unwrap_or_else(|| {
1299 let capacity = self.events_capacity.unwrap_or(DEFAULT_EVENTS_CAPACITY);
1300 let (tx, _) = broadcast::channel(capacity);
1301 tx
1302 });
1303
1304 let mut capabilities = rmcp_model::ClientCapabilities::default();
1305 if self.sampling.is_some() {
1306 capabilities.sampling = Some(McpSamplingCapability::default());
1307 }
1308 if self.elicitation.is_some() {
1309 capabilities.elicitation = Some(McpElicitationCapability {
1310 form: Some(McpFormElicitationCapability::default()),
1311 url: None,
1312 });
1313 }
1314 if self.roots.is_some() {
1315 capabilities.roots = Some(McpRootsCapabilities::default());
1316 }
1317
1318 let handler = McpClientHandler {
1319 info: rmcp_model::ClientInfo::new(
1320 capabilities,
1321 rmcp_model::Implementation::new("agentkit-mcp", env!("CARGO_PKG_VERSION"))
1322 .with_title("agentkit MCP client"),
1323 )
1324 .with_protocol_version(rmcp_model::ProtocolVersion::LATEST),
1325 notifications: notifications_tx,
1326 events: events_tx.clone(),
1327 sampling: self.sampling.clone(),
1328 elicitation: self.elicitation.clone(),
1329 roots: self.roots.clone(),
1330 };
1331
1332 (
1333 handler,
1334 McpClientChannels {
1335 notifications: notifications_rx,
1336 events: events_tx,
1337 },
1338 )
1339 }
1340}
1341
1342pub struct McpConnection {
1345 server_id: McpServerId,
1346 config: Option<McpServerConfig>,
1347 inner: Mutex<RmcpClientService>,
1348 peer: RwLock<Peer<RoleClient>>,
1349 auth: Mutex<Option<MetadataMap>>,
1350 notifications: Mutex<mpsc::UnboundedReceiver<McpServerNotification>>,
1351 events: broadcast::Sender<McpServerEvent>,
1352 handler_config: McpHandlerConfig,
1353 capabilities: McpServerCapabilities,
1354}
1355
1356#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1358pub enum McpOperationResult {
1359 Connected(McpDiscoverySnapshot),
1361 Tool(CallToolResult),
1363 Resource(ReadResourceResult),
1365 Prompt(GetPromptResult),
1367}
1368
1369impl McpConnection {
1370 pub async fn connect(config: &McpServerConfig) -> Result<Self, McpError> {
1375 Self::connect_with_auth(config, None, McpHandlerConfig::default()).await
1376 }
1377
1378 pub async fn connect_with_handler(
1380 config: &McpServerConfig,
1381 handler_config: McpHandlerConfig,
1382 ) -> Result<Self, McpError> {
1383 Self::connect_with_auth(config, None, handler_config).await
1384 }
1385
1386 async fn connect_with_auth(
1387 config: &McpServerConfig,
1388 auth: Option<&MetadataMap>,
1389 handler_config: McpHandlerConfig,
1390 ) -> Result<Self, McpError> {
1391 let (handler, channels) = handler_config.build();
1392 let McpClientChannels {
1393 notifications: notification_rx,
1394 events: events_tx,
1395 } = channels;
1396 let (service, capabilities) = match &config.transport {
1397 McpTransportBinding::Stdio(binding) => {
1398 connect_rmcp_stdio(config, binding, handler).await?
1399 }
1400 McpTransportBinding::StreamableHttp(binding) => {
1401 connect_rmcp_streamable_http(config, binding, auth, handler).await?
1402 }
1403 };
1404
1405 let peer = service.peer().clone();
1406 Ok(Self {
1407 server_id: config.id.clone(),
1408 config: Some(config.clone()),
1409 inner: Mutex::new(service),
1410 peer: RwLock::new(peer),
1411 auth: Mutex::new(auth.cloned()),
1412 notifications: Mutex::new(notification_rx),
1413 events: events_tx,
1414 handler_config,
1415 capabilities,
1416 })
1417 }
1418
1419 pub fn from_running_service(
1435 server_id: impl Into<McpServerId>,
1436 service: RmcpClientService,
1437 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1438 ) -> Self {
1439 let (events_tx, _) = broadcast::channel(DEFAULT_EVENTS_CAPACITY);
1440 Self::from_running_service_with_events(server_id, service, notifications, events_tx)
1441 }
1442
1443 pub fn from_running_service_with_events(
1450 server_id: impl Into<McpServerId>,
1451 service: RmcpClientService,
1452 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1453 events: broadcast::Sender<McpServerEvent>,
1454 ) -> Self {
1455 Self::from_running_service_with_events_and_handler_config(
1456 server_id,
1457 service,
1458 notifications,
1459 events,
1460 McpHandlerConfig::default(),
1461 )
1462 }
1463
1464 pub fn from_running_service_with_events_and_handler_config(
1471 server_id: impl Into<McpServerId>,
1472 service: RmcpClientService,
1473 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1474 events: broadcast::Sender<McpServerEvent>,
1475 handler_config: McpHandlerConfig,
1476 ) -> Self {
1477 let capabilities = service
1478 .peer_info()
1479 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1480 .unwrap_or_default();
1481 let peer = service.peer().clone();
1482 Self {
1483 server_id: server_id.into(),
1484 config: None,
1485 inner: Mutex::new(service),
1486 peer: RwLock::new(peer),
1487 auth: Mutex::new(None),
1488 notifications: Mutex::new(notifications),
1489 events,
1490 handler_config,
1491 capabilities,
1492 }
1493 }
1494
1495 async fn reconnect_inner(&self, auth: Option<&MetadataMap>) -> Result<(), McpError> {
1496 let Some(config) = self.config.clone() else {
1497 return Ok(());
1498 };
1499 let (handler, channels) = self.handler_config.build_with(self.events.clone());
1500 let McpClientChannels {
1501 notifications: notification_rx,
1502 ..
1503 } = channels;
1504 let (service, _capabilities) = match &config.transport {
1505 McpTransportBinding::Stdio(binding) => {
1506 connect_rmcp_stdio(&config, binding, handler).await?
1507 }
1508 McpTransportBinding::StreamableHttp(binding) => {
1509 connect_rmcp_streamable_http(&config, binding, auth, handler).await?
1510 }
1511 };
1512 let new_peer = service.peer().clone();
1513 *self.notifications.lock().await = notification_rx;
1514 *self.inner.lock().await = service;
1515 *self.peer.write().expect("MCP peer lock poisoned") = new_peer;
1516 Ok(())
1517 }
1518
1519 fn peer(&self) -> Peer<RoleClient> {
1520 self.peer.read().expect("MCP peer lock poisoned").clone()
1521 }
1522
1523 pub fn server_id(&self) -> &McpServerId {
1525 &self.server_id
1526 }
1527
1528 pub fn capabilities(&self) -> &McpServerCapabilities {
1530 &self.capabilities
1531 }
1532
1533 pub fn handler_config(&self) -> &McpHandlerConfig {
1537 &self.handler_config
1538 }
1539
1540 pub fn subscribe_events(&self) -> broadcast::Receiver<McpServerEvent> {
1549 self.events.subscribe()
1550 }
1551
1552 pub async fn subscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1557 let uri = uri.into();
1558 self.peer()
1559 .subscribe(rmcp_model::SubscribeRequestParams::new(uri.clone()))
1560 .await
1561 .map_err(|error| {
1562 rmcp_operation_error(
1563 &self.server_id,
1564 McpMethod::ResourcesSubscribe { uri },
1565 error,
1566 )
1567 })
1568 }
1569
1570 pub async fn unsubscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1572 let uri = uri.into();
1573 self.peer()
1574 .unsubscribe(rmcp_model::UnsubscribeRequestParams::new(uri.clone()))
1575 .await
1576 .map_err(|error| {
1577 rmcp_operation_error(
1578 &self.server_id,
1579 McpMethod::ResourcesUnsubscribe { uri },
1580 error,
1581 )
1582 })
1583 }
1584
1585 pub async fn set_logging_level(&self, level: McpLoggingLevel) -> Result<(), McpError> {
1588 self.peer()
1589 .set_level(rmcp_model::SetLevelRequestParams::new(level))
1590 .await
1591 .map_err(|error| {
1592 rmcp_operation_error(
1593 &self.server_id,
1594 McpMethod::LoggingSetLevel {
1595 level: format!("{level:?}"),
1596 },
1597 error,
1598 )
1599 })
1600 }
1601
1602 pub async fn notify_cancelled(
1605 &self,
1606 params: McpCancelledNotificationParam,
1607 ) -> Result<(), McpError> {
1608 self.peer()
1609 .notify_cancelled(params)
1610 .await
1611 .map_err(rmcp_service_error)
1612 }
1613
1614 pub async fn notify_roots_list_changed(&self) -> Result<(), McpError> {
1617 self.peer()
1618 .notify_roots_list_changed()
1619 .await
1620 .map_err(rmcp_service_error)
1621 }
1622
1623 pub async fn close(&self) -> Result<(), McpError> {
1628 let mut inner = self.inner.lock().await;
1629 inner
1630 .close()
1631 .await
1632 .map(|_| ())
1633 .map_err(|error| McpError::Transport(format!("rmcp service close failed: {error}")))
1634 }
1635
1636 pub async fn resolve_auth(&self, resolution: AuthResolution) -> Result<(), McpError> {
1639 let mut auth_slot = self.auth.lock().await;
1640 match resolution {
1641 AuthResolution::Provided { credentials, .. } => {
1642 *auth_slot = Some(credentials);
1643 }
1644 AuthResolution::Cancelled { .. } => {
1645 *auth_slot = None;
1646 }
1647 }
1648 let snapshot = auth_slot.clone();
1649 drop(auth_slot);
1650 if self.config.is_some() {
1654 self.reconnect_inner(snapshot.as_ref()).await?;
1655 }
1656 Ok(())
1657 }
1658
1659 pub async fn discover(&self) -> Result<McpDiscoverySnapshot, McpError> {
1661 let tools = async {
1662 match self.capabilities.tools {
1663 Some(_) => self.list_tools().await,
1664 None => Ok(Vec::new()),
1665 }
1666 };
1667 let resources = async {
1668 match self.capabilities.resources {
1669 Some(_) => self.list_resources().await,
1670 None => Ok(Vec::new()),
1671 }
1672 };
1673 let prompts = async {
1674 match self.capabilities.prompts {
1675 Some(_) => self.list_prompts().await,
1676 None => Ok(Vec::new()),
1677 }
1678 };
1679 let (tools, resources, prompts) = tokio::try_join!(tools, resources, prompts)?;
1680 Ok(McpDiscoverySnapshot {
1681 server_id: self.server_id.clone(),
1682 tools,
1683 resources,
1684 prompts,
1685 metadata: MetadataMap::new(),
1686 })
1687 }
1688
1689 async fn drain_notifications(&self) -> Vec<McpServerNotification> {
1690 let mut notifications = self.notifications.lock().await;
1691 let mut drained = Vec::new();
1692 while let Ok(notification) = notifications.try_recv() {
1693 drained.push(notification);
1694 }
1695 drained
1696 }
1697
1698 pub async fn list_tools(&self) -> Result<Vec<McpTool>, McpError> {
1700 self.peer()
1701 .list_all_tools()
1702 .await
1703 .map_err(rmcp_service_error)
1704 }
1705
1706 pub async fn list_resources(&self) -> Result<Vec<McpResource>, McpError> {
1708 self.peer()
1709 .list_all_resources()
1710 .await
1711 .map_err(rmcp_service_error)
1712 }
1713
1714 pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpError> {
1716 self.peer()
1717 .list_all_prompts()
1718 .await
1719 .map_err(rmcp_service_error)
1720 }
1721
1722 pub async fn call_tool(
1729 &self,
1730 name: &str,
1731 arguments: Value,
1732 ) -> Result<CallToolResult, McpError> {
1733 let arguments_for_auth = arguments.clone();
1734 let mut params = rmcp_model::CallToolRequestParams::new(name.to_string());
1735 if !arguments.is_null() {
1736 params =
1737 params.with_arguments(value_to_json_object(arguments, "tools/call arguments")?);
1738 }
1739 let name_owned = name.to_string();
1740
1741 let span = tracing::info_span!(
1745 "mcp.call_tool",
1746 "otel.name" = %format!("mcp.call_tool {name}"),
1747 "mcp.server.id" = %self.server_id,
1748 "mcp.tool.name" = %name,
1749 "error.type" = tracing::field::Empty,
1750 );
1751 use tracing::Instrument;
1752 let result = self.peer().call_tool(params).instrument(span.clone()).await;
1753 match result {
1754 Ok(result) => {
1755 if result.is_error == Some(true) {
1756 span.record("error.type", "tool_error");
1757 }
1758 Ok(result)
1759 }
1760 Err(error) => {
1761 span.record("error.type", "mcp_error");
1762 Err(rmcp_operation_error(
1763 &self.server_id,
1764 McpMethod::ToolsCall {
1765 name: name_owned,
1766 arguments: arguments_for_auth,
1767 },
1768 error,
1769 ))
1770 }
1771 }
1772 }
1773
1774 pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult, McpError> {
1781 let uri_owned = uri.to_string();
1782 self.peer()
1783 .read_resource(rmcp_model::ReadResourceRequestParams::new(uri))
1784 .await
1785 .map_err(|error| {
1786 rmcp_operation_error(
1787 &self.server_id,
1788 McpMethod::ResourcesRead { uri: uri_owned },
1789 error,
1790 )
1791 })
1792 }
1793
1794 pub async fn get_prompt(
1802 &self,
1803 name: &str,
1804 arguments: Value,
1805 ) -> Result<GetPromptResult, McpError> {
1806 let arguments_for_auth = arguments.clone();
1807 let name_owned = name.to_string();
1808 let mut params = rmcp_model::GetPromptRequestParams::new(name);
1809 if !arguments.is_null() {
1810 params =
1811 params.with_arguments(value_to_json_object(arguments, "prompts/get arguments")?);
1812 }
1813 self.peer().get_prompt(params).await.map_err(|error| {
1814 rmcp_operation_error(
1815 &self.server_id,
1816 McpMethod::PromptsGet {
1817 name: name_owned,
1818 arguments: arguments_for_auth,
1819 },
1820 error,
1821 )
1822 })
1823 }
1824}
1825
1826async fn connect_rmcp_stdio(
1827 config: &McpServerConfig,
1828 binding: &StdioTransportConfig,
1829 handler: McpClientHandler,
1830) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1831 let transport = TokioChildProcess::new(
1832 tokio::process::Command::new(&binding.command).configure(|command| {
1833 command.args(&binding.args);
1834 if let Some(cwd) = &binding.cwd {
1835 command.current_dir(cwd);
1836 }
1837 for (key, value) in &binding.env {
1838 command.env(key, value);
1839 }
1840 }),
1841 )
1842 .map_err(McpError::Io)?;
1843
1844 let service = handler
1845 .serve(transport)
1846 .await
1847 .map_err(|error| rmcp_initialize_error(config, error))?;
1848 let capabilities = service
1849 .peer_info()
1850 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1851 .unwrap_or_default();
1852
1853 Ok((service, capabilities))
1854}
1855
1856async fn connect_rmcp_streamable_http(
1857 config: &McpServerConfig,
1858 binding: &StreamableHttpTransportConfig,
1859 auth: Option<&MetadataMap>,
1860 handler: McpClientHandler,
1861) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1862 let auth_header = auth
1863 .and_then(bearer_token_from_metadata)
1864 .or_else(|| binding.bearer_token.clone());
1865 let mut rmcp_config = RmcpStreamableHttpClientTransportConfig::with_uri(binding.url.clone());
1866 if let Some(auth_header) = auth_header {
1867 rmcp_config = rmcp_config.auth_header(auth_header);
1868 }
1869 rmcp_config = rmcp_config.custom_headers(binding.headers.iter().cloned().collect());
1870
1871 let result = match binding.http_client.as_ref() {
1872 Some(client) => {
1873 let transport = StreamableHttpClientTransport::with_client(
1874 DynHttpClient(client.clone()),
1875 rmcp_config,
1876 );
1877 handler.serve(transport).await
1878 }
1879 None => {
1880 let transport = StreamableHttpClientTransport::from_config(rmcp_config);
1881 handler.serve(transport).await
1882 }
1883 };
1884 let service = result.map_err(|error| rmcp_initialize_error(config, error))?;
1885 let capabilities = service
1886 .peer_info()
1887 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1888 .unwrap_or_default();
1889
1890 Ok((service, capabilities))
1891}
1892
1893pub struct McpResourceHandle {
1895 connection: Arc<McpConnection>,
1896 descriptor: ResourceDescriptor,
1897}
1898
1899#[async_trait]
1900impl ResourceProvider for McpResourceHandle {
1901 async fn list_resources(&self) -> Result<Vec<ResourceDescriptor>, CapabilityError> {
1902 Ok(vec![self.descriptor.clone()])
1903 }
1904
1905 async fn read_resource(
1906 &self,
1907 id: &ResourceId,
1908 _ctx: &mut CapabilityContext<'_>,
1909 ) -> Result<ResourceContents, CapabilityError> {
1910 let result = self
1911 .connection
1912 .read_resource(&id.0)
1913 .await
1914 .map_err(|error| match error {
1915 McpError::AuthRequired(request) => {
1916 CapabilityError::Unavailable(format!("auth required: {:?}", request))
1917 }
1918 other => CapabilityError::ExecutionFailed(other.to_string()),
1919 })?;
1920 read_resource_result_to_capabilities(result)
1921 .map_err(|error| CapabilityError::ExecutionFailed(error.to_string()))
1922 }
1923}
1924
1925pub struct McpPromptHandle {
1927 connection: Arc<McpConnection>,
1928 descriptor: PromptDescriptor,
1929}
1930
1931#[async_trait]
1932impl PromptProvider for McpPromptHandle {
1933 async fn list_prompts(&self) -> Result<Vec<PromptDescriptor>, CapabilityError> {
1934 Ok(vec![self.descriptor.clone()])
1935 }
1936
1937 async fn get_prompt(
1938 &self,
1939 id: &PromptId,
1940 args: Value,
1941 _ctx: &mut CapabilityContext<'_>,
1942 ) -> Result<PromptContents, CapabilityError> {
1943 let result =
1944 self.connection
1945 .get_prompt(&id.0, args)
1946 .await
1947 .map_err(|error| match error {
1948 McpError::AuthRequired(request) => {
1949 CapabilityError::Unavailable(format!("auth required: {:?}", request))
1950 }
1951 other => CapabilityError::ExecutionFailed(other.to_string()),
1952 })?;
1953 Ok(get_prompt_result_to_capabilities(result))
1954 }
1955}
1956
1957pub struct McpCapabilityProvider {
1965 invocables: Vec<Arc<dyn Invocable>>,
1966 resources: Vec<Arc<dyn ResourceProvider>>,
1967 prompts: Vec<Arc<dyn PromptProvider>>,
1968}
1969
1970impl McpCapabilityProvider {
1971 pub fn from_snapshot(connection: Arc<McpConnection>, snapshot: &McpDiscoverySnapshot) -> Self {
1974 Self::from_snapshot_with_namespace(connection, snapshot, &McpToolNamespace::Default)
1975 }
1976
1977 pub fn from_snapshot_with_namespace(
1979 connection: Arc<McpConnection>,
1980 snapshot: &McpDiscoverySnapshot,
1981 namespace: &McpToolNamespace,
1982 ) -> Self {
1983 let server_id = connection.server_id().clone();
1984 let registry =
1985 snapshot
1986 .tools
1987 .iter()
1988 .cloned()
1989 .fold(ToolRegistry::new(), |registry, tool| {
1990 registry.with(McpToolAdapter::with_namespace(
1991 &server_id,
1992 connection.clone(),
1993 tool,
1994 namespace,
1995 ))
1996 });
1997 let permissions: Arc<dyn PermissionChecker> = Arc::new(AllowAllPermissions);
1998 let resources_arc: Arc<dyn agentkit_tools_core::ToolResources> = Arc::new(());
1999 let invocables =
2000 ToolCapabilityProvider::from_registry(®istry, permissions, resources_arc)
2001 .invocables();
2002
2003 let resources = snapshot
2004 .resources
2005 .iter()
2006 .cloned()
2007 .map(|resource| {
2008 Arc::new(McpResourceHandle {
2009 connection: connection.clone(),
2010 descriptor: resource_descriptor_from_rmcp(resource),
2011 }) as Arc<dyn ResourceProvider>
2012 })
2013 .collect();
2014
2015 let prompts = snapshot
2016 .prompts
2017 .iter()
2018 .cloned()
2019 .map(|prompt| {
2020 Arc::new(McpPromptHandle {
2021 connection: connection.clone(),
2022 descriptor: prompt_descriptor_from_rmcp(prompt),
2023 }) as Arc<dyn PromptProvider>
2024 })
2025 .collect();
2026
2027 Self {
2028 invocables,
2029 resources,
2030 prompts,
2031 }
2032 }
2033
2034 pub fn merge<I>(providers: I) -> Self
2036 where
2037 I: IntoIterator<Item = Self>,
2038 {
2039 let mut invocables = Vec::new();
2040 let mut resources = Vec::new();
2041 let mut prompts = Vec::new();
2042
2043 for provider in providers {
2044 invocables.extend(provider.invocables);
2045 resources.extend(provider.resources);
2046 prompts.extend(provider.prompts);
2047 }
2048
2049 Self {
2050 invocables,
2051 resources,
2052 prompts,
2053 }
2054 }
2055
2056 pub async fn connect(
2058 config: &McpServerConfig,
2059 ) -> Result<(Arc<McpConnection>, Self, McpDiscoverySnapshot), McpError> {
2060 let connection = Arc::new(McpConnection::connect(config).await?);
2061 let snapshot = connection.discover().await?;
2062 let provider = Self::from_snapshot(connection.clone(), &snapshot);
2063
2064 Ok((connection, provider, snapshot))
2065 }
2066}
2067
2068impl CapabilityProvider for McpCapabilityProvider {
2069 fn invocables(&self) -> Vec<Arc<dyn Invocable>> {
2070 self.invocables.clone()
2071 }
2072
2073 fn resources(&self) -> Vec<Arc<dyn ResourceProvider>> {
2074 self.resources.clone()
2075 }
2076
2077 fn prompts(&self) -> Vec<Arc<dyn PromptProvider>> {
2078 self.prompts.clone()
2079 }
2080}
2081
2082#[derive(Clone)]
2084pub struct McpServerHandle {
2085 config: McpServerConfig,
2086 connection: Arc<McpConnection>,
2087 snapshot: McpDiscoverySnapshot,
2088 namespace: McpToolNamespace,
2089}
2090
2091impl McpServerHandle {
2092 pub fn config(&self) -> &McpServerConfig {
2094 &self.config
2095 }
2096
2097 pub fn server_id(&self) -> &McpServerId {
2099 self.connection.server_id()
2100 }
2101
2102 pub fn connection(&self) -> Arc<McpConnection> {
2104 self.connection.clone()
2105 }
2106
2107 pub fn snapshot(&self) -> &McpDiscoverySnapshot {
2109 &self.snapshot
2110 }
2111
2112 pub fn namespace(&self) -> &McpToolNamespace {
2114 &self.namespace
2115 }
2116
2117 pub fn tool_registry(&self) -> ToolRegistry {
2119 self.snapshot
2120 .tools
2121 .iter()
2122 .cloned()
2123 .fold(ToolRegistry::new(), |registry, tool| {
2124 registry.with(McpToolAdapter::with_namespace(
2125 self.server_id(),
2126 self.connection.clone(),
2127 tool,
2128 &self.namespace,
2129 ))
2130 })
2131 }
2132
2133 pub fn capability_provider(&self) -> McpCapabilityProvider {
2135 McpCapabilityProvider::from_snapshot_with_namespace(
2136 self.connection.clone(),
2137 &self.snapshot,
2138 &self.namespace,
2139 )
2140 }
2141}
2142
2143#[derive(Debug)]
2146pub struct McpServerConnectionError {
2147 pub server_id: McpServerId,
2149 pub error: McpError,
2151}
2152
2153#[must_use = "inspect `failed` before ignoring the settled MCP connection result"]
2158pub struct McpConnectAllSettled {
2159 pub connected: Vec<McpServerHandle>,
2161 pub failed: Vec<McpServerConnectionError>,
2163}
2164
2165impl McpConnectAllSettled {
2166 pub fn all_connected(&self) -> bool {
2168 self.failed.is_empty()
2169 }
2170
2171 pub fn has_failures(&self) -> bool {
2173 !self.failed.is_empty()
2174 }
2175
2176 pub fn connected(&self) -> &[McpServerHandle] {
2178 &self.connected
2179 }
2180
2181 pub fn failed(&self) -> &[McpServerConnectionError] {
2183 &self.failed
2184 }
2185
2186 pub fn into_parts(self) -> (Vec<McpServerHandle>, Vec<McpServerConnectionError>) {
2188 (self.connected, self.failed)
2189 }
2190}
2191
2192impl fmt::Debug for McpConnectAllSettled {
2193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2194 let connected = self
2195 .connected
2196 .iter()
2197 .map(|handle| handle.server_id())
2198 .collect::<Vec<_>>();
2199 f.debug_struct("McpConnectAllSettled")
2200 .field("connected", &connected)
2201 .field("failed", &self.failed)
2202 .finish()
2203 }
2204}
2205
2206#[derive(Clone, Debug, Default, PartialEq, Eq)]
2208pub struct McpServerOptions {
2209 pub connect_timeout: Option<Duration>,
2214}
2215
2216impl McpServerOptions {
2217 pub fn new() -> Self {
2219 Self::default()
2220 }
2221
2222 pub fn with_timeout(mut self, timeout: Duration) -> Self {
2224 self.connect_timeout = Some(timeout);
2225 self
2226 }
2227}
2228
2229pub struct McpServerManager {
2231 configs: BTreeMap<McpServerId, McpServerConfig>,
2232 options: BTreeMap<McpServerId, McpServerOptions>,
2233 connections: BTreeMap<McpServerId, McpServerHandle>,
2234 auth: BTreeMap<McpServerId, MetadataMap>,
2235 catalog_tx: broadcast::Sender<McpCatalogEvent>,
2236 namespace: McpToolNamespace,
2237 handler_config: McpHandlerConfig,
2238 catalog_writer: CatalogWriter,
2239 server_tools: BTreeMap<McpServerId, BTreeSet<ToolName>>,
2244}
2245
2246impl Default for McpServerManager {
2247 fn default() -> Self {
2248 let (catalog_tx, _) = broadcast::channel(128);
2249 let (catalog_writer, _) = dynamic_catalog("mcp");
2250 Self {
2251 configs: BTreeMap::new(),
2252 options: BTreeMap::new(),
2253 connections: BTreeMap::new(),
2254 auth: BTreeMap::new(),
2255 catalog_tx,
2256 namespace: McpToolNamespace::Default,
2257 handler_config: McpHandlerConfig::default(),
2258 catalog_writer,
2259 server_tools: BTreeMap::new(),
2260 }
2261 }
2262}
2263
2264impl McpServerManager {
2265 pub fn new() -> Self {
2267 Self::default()
2268 }
2269
2270 pub fn with_namespace(mut self, namespace: McpToolNamespace) -> Self {
2272 self.namespace = namespace;
2273 self
2274 }
2275
2276 pub fn set_namespace(&mut self, namespace: McpToolNamespace) -> &mut Self {
2278 self.namespace = namespace;
2279 self
2280 }
2281
2282 pub fn namespace(&self) -> &McpToolNamespace {
2284 &self.namespace
2285 }
2286
2287 pub fn with_handler_config(mut self, handler_config: McpHandlerConfig) -> Self {
2290 self.handler_config = handler_config;
2291 self
2292 }
2293
2294 pub fn set_handler_config(&mut self, handler_config: McpHandlerConfig) -> &mut Self {
2296 self.handler_config = handler_config;
2297 self
2298 }
2299
2300 pub fn handler_config(&self) -> &McpHandlerConfig {
2302 &self.handler_config
2303 }
2304
2305 pub fn with_server(mut self, config: McpServerConfig) -> Self {
2307 self.register_server(config);
2308 self
2309 }
2310
2311 pub fn with_server_options(
2314 mut self,
2315 config: McpServerConfig,
2316 options: McpServerOptions,
2317 ) -> Self {
2318 self.register_server_with_options(config, options);
2319 self
2320 }
2321
2322 pub fn register_server(&mut self, config: McpServerConfig) -> &mut Self {
2324 let id = config.id.clone();
2325 self.configs.insert(id.clone(), config);
2326 self.options.entry(id).or_default();
2327 self
2328 }
2329
2330 pub fn register_server_with_options(
2333 &mut self,
2334 config: McpServerConfig,
2335 options: McpServerOptions,
2336 ) -> &mut Self {
2337 let id = config.id.clone();
2338 self.configs.insert(id.clone(), config);
2339 self.options.insert(id, options);
2340 self
2341 }
2342
2343 pub fn connected_server(&self, server_id: &McpServerId) -> Option<&McpServerHandle> {
2345 self.connections.get(server_id)
2346 }
2347
2348 pub fn connected_servers(&self) -> Vec<&McpServerHandle> {
2350 self.connections.values().collect()
2351 }
2352
2353 pub fn subscribe_catalog_events(&self) -> broadcast::Receiver<McpCatalogEvent> {
2355 self.catalog_tx.subscribe()
2356 }
2357
2358 fn emit_catalog_event(&self, event: McpCatalogEvent) {
2359 let _ = self.catalog_tx.send(event);
2360 }
2361
2362 async fn discover_with_options(
2363 connection: &McpConnection,
2364 options: &McpServerOptions,
2365 ) -> Result<McpDiscoverySnapshot, McpError> {
2366 match options.connect_timeout {
2367 Some(timeout) => tokio::time::timeout(timeout, connection.discover())
2368 .await
2369 .map_err(|_| McpError::Timeout {
2370 operation: "discover",
2371 duration: timeout,
2372 })?,
2373 None => connection.discover().await,
2374 }
2375 }
2376
2377 async fn connect_and_discover(
2378 config: &McpServerConfig,
2379 auth: Option<&MetadataMap>,
2380 handler_config: McpHandlerConfig,
2381 options: &McpServerOptions,
2382 ) -> Result<(Arc<McpConnection>, McpDiscoverySnapshot), McpError> {
2383 let connect = async {
2384 let connection =
2385 Arc::new(McpConnection::connect_with_auth(config, auth, handler_config).await?);
2386 let snapshot = connection.discover().await?;
2387 Ok((connection, snapshot))
2388 };
2389 match options.connect_timeout {
2390 Some(timeout) => {
2391 tokio::time::timeout(timeout, connect)
2392 .await
2393 .map_err(|_| McpError::Timeout {
2394 operation: "connect",
2395 duration: timeout,
2396 })?
2397 }
2398 None => connect.await,
2399 }
2400 }
2401
2402 pub async fn connect_server(
2404 &mut self,
2405 server_id: &McpServerId,
2406 ) -> Result<McpServerHandle, McpError> {
2407 let config = self
2408 .configs
2409 .get(server_id)
2410 .cloned()
2411 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2412 let options = self.options.get(server_id).cloned().unwrap_or_default();
2413 let (connection, snapshot) = Self::connect_and_discover(
2414 &config,
2415 self.auth.get(server_id),
2416 self.handler_config.clone(),
2417 &options,
2418 )
2419 .await?;
2420 let handle = McpServerHandle {
2421 config,
2422 connection,
2423 snapshot,
2424 namespace: self.namespace.clone(),
2425 };
2426 self.connections.insert(server_id.clone(), handle.clone());
2427 self.register_server_tools(server_id, &handle.snapshot);
2428 self.emit_catalog_event(McpCatalogEvent::ServerConnected {
2429 server_id: server_id.clone(),
2430 });
2431 Ok(handle)
2432 }
2433
2434 pub async fn connect_all(&mut self) -> Result<Vec<McpServerHandle>, McpError> {
2436 let plans: Vec<(
2437 McpServerId,
2438 McpServerConfig,
2439 McpServerOptions,
2440 Option<MetadataMap>,
2441 )> = self
2442 .configs
2443 .iter()
2444 .map(|(id, cfg)| {
2445 (
2446 id.clone(),
2447 cfg.clone(),
2448 self.options.get(id).cloned().unwrap_or_default(),
2449 self.auth.get(id).cloned(),
2450 )
2451 })
2452 .collect();
2453 let handler_config = self.handler_config.clone();
2454 let namespace = self.namespace.clone();
2455
2456 let futures = plans.into_iter().map(|(server_id, config, options, auth)| {
2457 let handler_config = handler_config.clone();
2458 let namespace = namespace.clone();
2459 async move {
2460 let (connection, snapshot) =
2461 Self::connect_and_discover(&config, auth.as_ref(), handler_config, &options)
2462 .await?;
2463 Ok::<(McpServerId, McpServerHandle), McpError>((
2464 server_id,
2465 McpServerHandle {
2466 config,
2467 connection,
2468 snapshot,
2469 namespace,
2470 },
2471 ))
2472 }
2473 });
2474
2475 let results = try_join_all(futures).await?;
2476 let mut handles = Vec::with_capacity(results.len());
2477 let mut connected: Vec<(McpServerId, McpDiscoverySnapshot)> =
2478 Vec::with_capacity(results.len());
2479 for (server_id, handle) in results {
2480 connected.push((server_id.clone(), handle.snapshot.clone()));
2481 self.connections.insert(server_id, handle.clone());
2482 handles.push(handle);
2483 }
2484 for (server_id, snapshot) in &connected {
2485 self.register_server_tools(server_id, snapshot);
2486 }
2487 for (server_id, _) in connected {
2488 self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2489 }
2490 Ok(handles)
2491 }
2492
2493 pub async fn connect_all_settled(&mut self) -> McpConnectAllSettled {
2501 let plans: Vec<(
2502 McpServerId,
2503 McpServerConfig,
2504 McpServerOptions,
2505 Option<MetadataMap>,
2506 )> = self
2507 .configs
2508 .iter()
2509 .map(|(id, cfg)| {
2510 (
2511 id.clone(),
2512 cfg.clone(),
2513 self.options.get(id).cloned().unwrap_or_default(),
2514 self.auth.get(id).cloned(),
2515 )
2516 })
2517 .collect();
2518 let handler_config = self.handler_config.clone();
2519 let namespace = self.namespace.clone();
2520
2521 let futures = plans.into_iter().map(|(server_id, config, options, auth)| {
2522 let handler_config = handler_config.clone();
2523 let namespace = namespace.clone();
2524 async move {
2525 let result = async {
2526 let (connection, snapshot) = Self::connect_and_discover(
2527 &config,
2528 auth.as_ref(),
2529 handler_config,
2530 &options,
2531 )
2532 .await?;
2533 Ok::<McpServerHandle, McpError>(McpServerHandle {
2534 config,
2535 connection,
2536 snapshot,
2537 namespace,
2538 })
2539 }
2540 .await;
2541 (server_id, result)
2542 }
2543 });
2544
2545 let results = join_all(futures).await;
2546 let mut connected = Vec::new();
2547 let mut failures = Vec::new();
2548 let mut connected_snapshots = Vec::new();
2549
2550 for (server_id, result) in results {
2551 match result {
2552 Ok(handle) => {
2553 connected_snapshots.push((server_id.clone(), handle.snapshot.clone()));
2554 self.connections.insert(server_id, handle.clone());
2555 connected.push(handle);
2556 }
2557 Err(error) => {
2558 failures.push(McpServerConnectionError { server_id, error });
2559 }
2560 }
2561 }
2562
2563 for (server_id, snapshot) in &connected_snapshots {
2564 self.register_server_tools(server_id, snapshot);
2565 }
2566 for (server_id, _) in connected_snapshots {
2567 self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2568 }
2569
2570 McpConnectAllSettled {
2571 connected,
2572 failed: failures,
2573 }
2574 }
2575
2576 pub async fn refresh_server(
2578 &mut self,
2579 server_id: &McpServerId,
2580 ) -> Result<McpDiscoverySnapshot, McpError> {
2581 let handle = self
2582 .connections
2583 .get_mut(server_id)
2584 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2585 let options = self.options.get(server_id).cloned().unwrap_or_default();
2586 let previous = handle.snapshot.clone();
2587 let snapshot = match Self::discover_with_options(&handle.connection, &options).await {
2588 Ok(snapshot) => snapshot,
2589 Err(error) => {
2590 self.emit_catalog_event(McpCatalogEvent::RefreshFailed {
2591 server_id: server_id.clone(),
2592 message: error.to_string(),
2593 });
2594 return Err(error);
2595 }
2596 };
2597 handle.snapshot = snapshot.clone();
2598 let events = diff_discovery_snapshots(server_id, &previous, &snapshot);
2599 if !events.is_empty() {
2600 self.apply_catalog_events(server_id, &snapshot, &events);
2601 for event in events {
2602 self.emit_catalog_event(event);
2603 }
2604 }
2605 Ok(snapshot)
2606 }
2607
2608 pub async fn refresh_changed_catalogs(&mut self) -> Result<Vec<McpCatalogEvent>, McpError> {
2610 let server_ids = self.connections.keys().cloned().collect::<Vec<_>>();
2611 let mut emitted = Vec::new();
2612
2613 for server_id in server_ids {
2614 let Some(connection) = self
2615 .connections
2616 .get(&server_id)
2617 .map(McpServerHandle::connection)
2618 else {
2619 continue;
2620 };
2621 let notifications = connection.drain_notifications().await;
2622 if notifications.is_empty() {
2623 continue;
2624 }
2625
2626 let handle = self
2627 .connections
2628 .get_mut(&server_id)
2629 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2630 let options = self.options.get(&server_id).cloned().unwrap_or_default();
2631 let previous = handle.snapshot.clone();
2632 let snapshot = match Self::discover_with_options(&handle.connection, &options).await {
2633 Ok(snapshot) => snapshot,
2634 Err(error) => {
2635 let event = McpCatalogEvent::RefreshFailed {
2636 server_id: server_id.clone(),
2637 message: error.to_string(),
2638 };
2639 self.emit_catalog_event(event.clone());
2640 emitted.push(event);
2641 return Err(error);
2642 }
2643 };
2644 handle.snapshot = snapshot.clone();
2645 let events = diff_discovery_snapshots(&server_id, &previous, &snapshot);
2646 if !events.is_empty() {
2647 self.apply_catalog_events(&server_id, &snapshot, &events);
2648 for event in events {
2649 self.emit_catalog_event(event.clone());
2650 emitted.push(event);
2651 }
2652 }
2653 }
2654
2655 Ok(emitted)
2656 }
2657
2658 pub async fn disconnect_server(&mut self, server_id: &McpServerId) -> Result<(), McpError> {
2660 let Some(handle) = self.connections.remove(server_id) else {
2661 return Err(McpError::UnknownServer(server_id.to_string()));
2662 };
2663 handle.connection.close().await?;
2664 self.unregister_server_tools(server_id);
2665 self.emit_catalog_event(McpCatalogEvent::ServerDisconnected {
2666 server_id: server_id.clone(),
2667 });
2668 Ok(())
2669 }
2670
2671 pub async fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), McpError> {
2673 let server_id = resolution
2674 .request()
2675 .server_id()
2676 .ok_or_else(|| McpError::AuthResolution("auth resolution missing server id".into()))?;
2677 let server_id = McpServerId::new(server_id);
2678 match &resolution {
2679 AuthResolution::Provided { credentials, .. } => {
2680 self.auth.insert(server_id.clone(), credentials.clone());
2681 }
2682 AuthResolution::Cancelled { .. } => {
2683 self.auth.remove(&server_id);
2684 }
2685 }
2686
2687 if let Some(handle) = self.connections.get(&server_id) {
2688 handle.connection.resolve_auth(resolution).await?;
2689 } else if !self.configs.contains_key(&server_id) {
2690 return Err(McpError::UnknownServer(server_id.to_string()));
2691 }
2692 self.emit_catalog_event(McpCatalogEvent::AuthChanged { server_id });
2693 Ok(())
2694 }
2695
2696 pub fn tool_registry(&self) -> ToolRegistry {
2701 self.connections
2702 .values()
2703 .fold(ToolRegistry::new(), |mut registry, handle| {
2704 for tool in handle.snapshot.tools.iter().cloned() {
2705 registry.register(McpToolAdapter::with_namespace(
2706 handle.server_id(),
2707 handle.connection.clone(),
2708 tool,
2709 &self.namespace,
2710 ));
2711 }
2712 registry
2713 })
2714 }
2715
2716 pub fn source(&self) -> CatalogReader {
2728 self.catalog_writer.reader()
2729 }
2730
2731 fn apply_catalog_events(
2736 &mut self,
2737 server_id: &McpServerId,
2738 snapshot: &McpDiscoverySnapshot,
2739 events: &[McpCatalogEvent],
2740 ) {
2741 for event in events {
2742 if let McpCatalogEvent::ToolsChanged {
2743 added,
2744 removed,
2745 changed,
2746 ..
2747 } = event
2748 {
2749 self.apply_server_tool_diff(server_id, snapshot, added, removed, changed);
2750 }
2751 }
2752 }
2753
2754 fn register_server_tools(&mut self, server_id: &McpServerId, snapshot: &McpDiscoverySnapshot) {
2758 let connection = match self.connections.get(server_id) {
2759 Some(handle) => handle.connection.clone(),
2760 None => return,
2761 };
2762 let previous = self.server_tools.remove(server_id).unwrap_or_default();
2763 let mut names = BTreeSet::new();
2764 for tool in &snapshot.tools {
2765 let adapter = McpToolAdapter::with_namespace(
2766 server_id,
2767 connection.clone(),
2768 tool.clone(),
2769 &self.namespace,
2770 );
2771 names.insert(adapter.spec().name.clone());
2772 self.catalog_writer.upsert(Arc::new(adapter));
2773 }
2774 for stale in previous.difference(&names) {
2775 self.catalog_writer.remove(stale);
2776 }
2777 self.server_tools.insert(server_id.clone(), names);
2778 }
2779
2780 fn unregister_server_tools(&mut self, server_id: &McpServerId) {
2783 let Some(names) = self.server_tools.remove(server_id) else {
2784 return;
2785 };
2786 for name in names {
2787 self.catalog_writer.remove(&name);
2788 }
2789 }
2790
2791 fn apply_server_tool_diff(
2795 &mut self,
2796 server_id: &McpServerId,
2797 snapshot: &McpDiscoverySnapshot,
2798 added: &[String],
2799 removed: &[String],
2800 changed: &[String],
2801 ) {
2802 let connection = match self.connections.get(server_id) {
2803 Some(handle) => handle.connection.clone(),
2804 None => return,
2805 };
2806 let names = self.server_tools.entry(server_id.clone()).or_default();
2807
2808 for raw_name in removed {
2809 let agentkit_name = ToolName::new(self.namespace.apply(server_id, raw_name));
2810 if names.remove(&agentkit_name) {
2811 self.catalog_writer.remove(&agentkit_name);
2812 }
2813 }
2814
2815 let upsert_one = |raw_name: &str| -> Option<(ToolName, McpToolAdapter)> {
2816 let tool = snapshot
2817 .tools
2818 .iter()
2819 .find(|tool| tool.name.as_ref() == raw_name)?
2820 .clone();
2821 let adapter = McpToolAdapter::with_namespace(
2822 server_id,
2823 connection.clone(),
2824 tool,
2825 &self.namespace,
2826 );
2827 Some((adapter.spec().name.clone(), adapter))
2828 };
2829
2830 for raw_name in added.iter().chain(changed.iter()) {
2831 if let Some((agentkit_name, adapter)) = upsert_one(raw_name) {
2832 names.insert(agentkit_name);
2833 self.catalog_writer.upsert(Arc::new(adapter));
2834 }
2835 }
2836 }
2837
2838 pub fn capability_provider(&self) -> McpCapabilityProvider {
2840 McpCapabilityProvider::merge(
2841 self.connections
2842 .values()
2843 .map(McpServerHandle::capability_provider),
2844 )
2845 }
2846}
2847
2848fn diff_discovery_snapshots(
2849 server_id: &McpServerId,
2850 previous: &McpDiscoverySnapshot,
2851 current: &McpDiscoverySnapshot,
2852) -> Vec<McpCatalogEvent> {
2853 let mut events = Vec::new();
2854 let (added, removed, changed) = diff_named_items(
2855 previous.tools.iter().map(|item| (item.name.as_ref(), item)),
2856 current.tools.iter().map(|item| (item.name.as_ref(), item)),
2857 );
2858 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2859 events.push(McpCatalogEvent::ToolsChanged {
2860 server_id: server_id.clone(),
2861 added,
2862 removed,
2863 changed,
2864 });
2865 }
2866
2867 let (added, removed, changed) = diff_named_items(
2868 previous
2869 .resources
2870 .iter()
2871 .map(|item| (item.uri.as_str(), item)),
2872 current
2873 .resources
2874 .iter()
2875 .map(|item| (item.uri.as_str(), item)),
2876 );
2877 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2878 events.push(McpCatalogEvent::ResourcesChanged {
2879 server_id: server_id.clone(),
2880 added,
2881 removed,
2882 changed,
2883 });
2884 }
2885
2886 let (added, removed, changed) = diff_named_items(
2887 previous
2888 .prompts
2889 .iter()
2890 .map(|item| (item.name.as_str(), item)),
2891 current
2892 .prompts
2893 .iter()
2894 .map(|item| (item.name.as_str(), item)),
2895 );
2896 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2897 events.push(McpCatalogEvent::PromptsChanged {
2898 server_id: server_id.clone(),
2899 added,
2900 removed,
2901 changed,
2902 });
2903 }
2904
2905 events
2906}
2907
2908fn diff_named_items<'a, T>(
2912 previous: impl IntoIterator<Item = (&'a str, &'a T)>,
2913 current: impl IntoIterator<Item = (&'a str, &'a T)>,
2914) -> (Vec<String>, Vec<String>, Vec<String>)
2915where
2916 T: PartialEq + 'a,
2917{
2918 let mut prev: Vec<(&str, &T)> = previous.into_iter().collect();
2919 let mut curr: Vec<(&str, &T)> = current.into_iter().collect();
2920 prev.sort_unstable_by_key(|(name, _)| *name);
2921 curr.sort_unstable_by_key(|(name, _)| *name);
2922
2923 let mut added = Vec::new();
2924 let mut removed = Vec::new();
2925 let mut changed = Vec::new();
2926 let (mut i, mut j) = (0, 0);
2927 while i < prev.len() && j < curr.len() {
2928 match prev[i].0.cmp(curr[j].0) {
2929 std::cmp::Ordering::Less => {
2930 removed.push(prev[i].0.to_string());
2931 i += 1;
2932 }
2933 std::cmp::Ordering::Greater => {
2934 added.push(curr[j].0.to_string());
2935 j += 1;
2936 }
2937 std::cmp::Ordering::Equal => {
2938 if prev[i].1 != curr[j].1 {
2939 changed.push(curr[j].0.to_string());
2940 }
2941 i += 1;
2942 j += 1;
2943 }
2944 }
2945 }
2946 while i < prev.len() {
2947 removed.push(prev[i].0.to_string());
2948 i += 1;
2949 }
2950 while j < curr.len() {
2951 added.push(curr[j].0.to_string());
2952 j += 1;
2953 }
2954
2955 (added, removed, changed)
2956}
2957
2958pub struct McpToolAdapter {
2960 tool_name: String,
2961 connection: Arc<McpConnection>,
2962 spec: ToolSpec,
2963}
2964
2965impl McpToolAdapter {
2966 pub fn new(server_id: &McpServerId, connection: Arc<McpConnection>, tool: McpTool) -> Self {
2969 Self::with_namespace(server_id, connection, tool, &McpToolNamespace::Default)
2970 }
2971
2972 pub fn with_namespace(
2974 server_id: &McpServerId,
2975 connection: Arc<McpConnection>,
2976 tool: McpTool,
2977 namespace: &McpToolNamespace,
2978 ) -> Self {
2979 let spec = tool_spec_from_tool(server_id, &tool, namespace);
2980 Self {
2981 tool_name: tool.name.into_owned(),
2982 connection,
2983 spec,
2984 }
2985 }
2986
2987 async fn handle_invocation_error(
2988 &self,
2989 err: McpInvocationError,
2990 input: &Value,
2991 ) -> Result<CallToolResult, ToolError> {
2992 let Some(responder) = self.connection.handler_config().error_responder.clone() else {
2993 return Err(ToolError::ExecutionFailed(err.to_string()));
2994 };
2995 let method = McpMethod::ToolsCall {
2996 name: self.tool_name.clone(),
2997 arguments: input.clone(),
2998 };
2999 let ctx = McpErrorContext {
3000 server_id: self.connection.server_id(),
3001 method: &method,
3002 input: Some(input),
3003 };
3004 match responder.handle(&err, ctx).await {
3005 ErrorResponderOutcome::SynthesizeResult(result) => Ok(result),
3006 ErrorResponderOutcome::PassThrough => Err(ToolError::ExecutionFailed(err.to_string())),
3007 }
3008 }
3009}
3010
3011#[async_trait]
3012impl Tool for McpToolAdapter {
3013 fn spec(&self) -> &ToolSpec {
3014 &self.spec
3015 }
3016
3017 async fn invoke(
3018 &self,
3019 request: ToolRequest,
3020 _ctx: &mut ToolContext<'_>,
3021 ) -> Result<ToolResult, ToolError> {
3022 let input = request.input;
3023 let result = match self
3024 .connection
3025 .call_tool(&self.tool_name, input.clone())
3026 .await
3027 {
3028 Ok(result) => result,
3029 Err(McpError::AuthRequired(auth_request)) => {
3030 let responder = self
3031 .connection
3032 .handler_config()
3033 .auth
3034 .clone()
3035 .ok_or_else(|| {
3036 ToolError::ExecutionFailed(
3037 "MCP server requires auth but no McpAuthResponder is registered".into(),
3038 )
3039 })?;
3040 let resolution = responder.resolve(*auth_request).await.map_err(|error| {
3041 ToolError::ExecutionFailed(format!("auth responder failed: {error}"))
3042 })?;
3043 match &resolution {
3044 AuthResolution::Provided { .. } => {
3045 self.connection
3046 .resolve_auth(resolution.clone())
3047 .await
3048 .map_err(|error| {
3049 ToolError::ExecutionFailed(format!(
3050 "applying auth resolution failed: {error}"
3051 ))
3052 })?;
3053 }
3054 AuthResolution::Cancelled { .. } => {
3055 return Err(ToolError::ExecutionFailed(
3056 "user cancelled MCP auth flow".into(),
3057 ));
3058 }
3059 }
3060 match self
3061 .connection
3062 .call_tool(&self.tool_name, input.clone())
3063 .await
3064 {
3065 Ok(result) => result,
3066 Err(McpError::AuthRequired(req)) => {
3067 return Err(ToolError::ExecutionFailed(format!(
3068 "MCP auth challenge unresolved after retry: {}",
3069 req.id
3070 )));
3071 }
3072 Err(McpError::Invocation(err)) => {
3073 self.handle_invocation_error(err, &input).await?
3074 }
3075 Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
3076 }
3077 }
3078 Err(McpError::Invocation(err)) => self.handle_invocation_error(err, &input).await?,
3079 Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
3080 };
3081
3082 let is_error = result.is_error.unwrap_or(false);
3083 Ok(ToolResult {
3084 result: ToolResultPart {
3085 call_id: request.call_id,
3086 output: call_tool_result_to_tool_output(result),
3087 is_error,
3088 metadata: MetadataMap::new(),
3089 },
3090 duration: None,
3091 metadata: MetadataMap::new(),
3092 })
3093 }
3094}
3095
3096fn rmcp_server_capabilities_to_agentkit(
3097 capabilities: &rmcp_model::ServerCapabilities,
3098) -> McpServerCapabilities {
3099 McpServerCapabilities {
3100 tools: capabilities.tools.as_ref().map(|tools| ToolsCapability {
3101 list_changed: tools.list_changed,
3102 }),
3103 resources: capabilities
3104 .resources
3105 .as_ref()
3106 .map(|resources| ResourcesCapability {
3107 subscribe: resources.subscribe,
3108 list_changed: resources.list_changed,
3109 }),
3110 prompts: capabilities
3111 .prompts
3112 .as_ref()
3113 .map(|prompts| PromptsCapability {
3114 list_changed: prompts.list_changed,
3115 }),
3116 logging: capabilities.logging.as_ref().map(|_| LoggingCapability {}),
3117 }
3118}
3119
3120fn tool_spec_from_tool(
3121 server_id: &McpServerId,
3122 tool: &McpTool,
3123 namespace: &McpToolNamespace,
3124) -> ToolSpec {
3125 ToolSpec {
3126 name: ToolName::new(namespace.apply(server_id, &tool.name)),
3127 description: tool
3128 .description
3129 .as_ref()
3130 .map(|d| d.to_string())
3131 .unwrap_or_else(|| tool.name.to_string()),
3132 input_schema: Value::Object((*tool.input_schema).clone()),
3133 output_schema: tool
3134 .output_schema
3135 .as_ref()
3136 .map(|schema| Value::Object((**schema).clone())),
3137 annotations: tool_annotations_from_rmcp(tool.annotations.as_ref()),
3138 metadata: MetadataMap::new(),
3139 }
3140}
3141
3142fn tool_annotations_from_rmcp(annotations: Option<&McpToolAnnotations>) -> ToolAnnotations {
3143 let Some(annotations) = annotations else {
3144 return ToolAnnotations::default();
3145 };
3146 ToolAnnotations {
3153 read_only_hint: annotations.read_only_hint.unwrap_or(false),
3154 destructive_hint: annotations.destructive_hint.unwrap_or(false),
3155 idempotent_hint: annotations.idempotent_hint.unwrap_or(false),
3156 needs_approval_hint: false,
3157 supports_streaming_hint: false,
3158 }
3159}
3160
3161fn resource_descriptor_from_rmcp(resource: McpResource) -> ResourceDescriptor {
3162 let raw = resource.raw;
3163 ResourceDescriptor {
3164 id: ResourceId::new(raw.uri),
3165 name: raw.name,
3166 description: raw.description,
3167 mime_type: raw.mime_type,
3168 metadata: MetadataMap::new(),
3169 }
3170}
3171
3172fn prompt_descriptor_from_rmcp(prompt: McpPrompt) -> PromptDescriptor {
3173 let arguments = prompt.arguments.unwrap_or_default();
3174 let mut required = Vec::new();
3175 let properties = arguments
3176 .into_iter()
3177 .map(|argument| {
3178 let mut schema = serde_json::Map::new();
3179 schema.insert("type".into(), Value::String("string".into()));
3180 if let Some(description) = argument.description {
3181 schema.insert("description".into(), Value::String(description));
3182 }
3183 if argument.required.unwrap_or(false) {
3184 required.push(Value::String(argument.name.clone()));
3185 }
3186 (argument.name, Value::Object(schema))
3187 })
3188 .collect::<serde_json::Map<String, Value>>();
3189 let mut input_schema = serde_json::Map::new();
3190 input_schema.insert("type".into(), Value::String("object".into()));
3191 input_schema.insert("properties".into(), Value::Object(properties));
3192 if !required.is_empty() {
3193 input_schema.insert("required".into(), Value::Array(required));
3194 }
3195
3196 PromptDescriptor {
3197 id: PromptId::new(prompt.name.clone()),
3198 name: prompt.name,
3199 description: prompt.description,
3200 input_schema: Value::Object(input_schema),
3201 metadata: MetadataMap::new(),
3202 }
3203}
3204
3205fn read_resource_result_to_capabilities(
3206 result: ReadResourceResult,
3207) -> Result<ResourceContents, McpError> {
3208 let content = result
3209 .contents
3210 .into_iter()
3211 .next()
3212 .ok_or_else(|| McpError::Protocol("resources/read returned no contents".into()))?;
3213 Ok(resource_contents_to_capabilities(content))
3214}
3215
3216fn resource_contents_to_capabilities(content: McpResourceContents) -> ResourceContents {
3217 let mut metadata = MetadataMap::new();
3218 let data = match content {
3219 McpResourceContents::TextResourceContents {
3220 text, mime_type, ..
3221 } => {
3222 if let Some(mime) = mime_type {
3223 metadata.insert("mime_type".into(), Value::String(mime));
3224 }
3225 DataRef::InlineText(text)
3226 }
3227 McpResourceContents::BlobResourceContents {
3228 blob,
3229 mime_type,
3230 uri,
3231 ..
3232 } => {
3233 if let Some(mime) = mime_type {
3234 metadata.insert("mime_type".into(), Value::String(mime));
3235 }
3236 metadata.insert("uri".into(), Value::String(uri));
3237 DataRef::InlineText(blob)
3239 }
3240 };
3241 ResourceContents { data, metadata }
3242}
3243
3244fn get_prompt_result_to_capabilities(result: GetPromptResult) -> PromptContents {
3245 let items = result
3246 .messages
3247 .into_iter()
3248 .map(prompt_message_to_item)
3249 .collect();
3250 let mut metadata = MetadataMap::new();
3251 if let Some(description) = result.description {
3252 metadata.insert("description".into(), Value::String(description));
3253 }
3254 PromptContents { items, metadata }
3255}
3256
3257fn prompt_message_to_item(message: PromptMessage) -> Item {
3258 let kind = match message.role {
3259 PromptMessageRole::Assistant => ItemKind::Assistant,
3260 PromptMessageRole::User => ItemKind::User,
3261 };
3262 Item {
3263 id: None,
3264 kind,
3265 parts: vec![prompt_message_content_to_part(message.content)],
3266 metadata: MetadataMap::new(),
3267 usage: None,
3268 finish_reason: None,
3269 created_at: None,
3270 }
3271}
3272
3273fn prompt_message_content_to_part(content: PromptMessageContent) -> Part {
3274 match content {
3275 PromptMessageContent::Text { text } => Part::Text(TextPart::new(text)),
3276 PromptMessageContent::Image { image } => Part::Media(MediaPart::new(
3277 Modality::Image,
3278 image.mime_type.clone(),
3279 DataRef::InlineText(image.data.clone()),
3280 )),
3281 PromptMessageContent::Resource { resource } => {
3282 let agentkit_resource = resource_contents_to_capabilities(resource.resource.clone());
3283 agentkit_part_from_resource(agentkit_resource)
3284 }
3285 PromptMessageContent::ResourceLink { link } => Part::Text(TextPart::new(link.uri.clone())),
3286 }
3287}
3288
3289fn agentkit_part_from_resource(resource: ResourceContents) -> Part {
3290 let mime = resource
3291 .metadata
3292 .get("mime_type")
3293 .and_then(Value::as_str)
3294 .unwrap_or("text/plain")
3295 .to_string();
3296 Part::Media(MediaPart::new(Modality::Binary, mime, resource.data))
3297}
3298
3299fn call_tool_result_to_tool_output(result: CallToolResult) -> ToolOutput {
3300 if let Some(structured) = result.structured_content {
3301 return ToolOutput::Structured(structured);
3302 }
3303 let parts = call_tool_content_to_parts(result.content);
3304 if parts.iter().all(|part| matches!(part, Part::Text(_))) {
3305 let text = parts
3306 .iter()
3307 .filter_map(|part| match part {
3308 Part::Text(text) => Some(text.text.clone()),
3309 _ => None,
3310 })
3311 .collect::<Vec<_>>()
3312 .join("\n");
3313 ToolOutput::Text(text)
3314 } else {
3315 ToolOutput::Parts(parts)
3316 }
3317}
3318
3319fn call_tool_content_to_parts(contents: Vec<Content>) -> Vec<Part> {
3320 contents.into_iter().map(content_to_part).collect()
3321}
3322
3323fn content_to_part(content: Content) -> Part {
3324 match content.raw {
3325 RawContent::Text(text) => Part::Text(TextPart::new(text.text)),
3326 RawContent::Image(image) => Part::Media(MediaPart::new(
3327 Modality::Image,
3328 image.mime_type,
3329 DataRef::InlineText(image.data),
3330 )),
3331 RawContent::Audio(audio) => Part::Media(MediaPart::new(
3332 Modality::Audio,
3333 audio.mime_type,
3334 DataRef::InlineText(audio.data),
3335 )),
3336 RawContent::Resource(embedded) => {
3337 agentkit_part_from_resource(resource_contents_to_capabilities(embedded.resource))
3338 }
3339 RawContent::ResourceLink(link) => Part::Text(TextPart::new(link.uri)),
3340 }
3341}
3342
3343fn value_to_json_object(value: Value, context: &str) -> Result<rmcp_model::JsonObject, McpError> {
3344 match value {
3345 Value::Object(object) => Ok(object),
3346 Value::Null => Ok(serde_json::Map::new()),
3347 other => Err(McpError::Protocol(format!(
3348 "{context} must be a JSON object, got {other}"
3349 ))),
3350 }
3351}
3352
3353fn bearer_token_from_metadata(metadata: &MetadataMap) -> Option<String> {
3354 ["bearer_token", "access_token", "token", "api_key"]
3355 .into_iter()
3356 .find_map(|key| metadata.get(key).and_then(Value::as_str).map(str::to_owned))
3357}
3358
3359fn rmcp_initialize_error(config: &McpServerConfig, error: ClientInitializeError) -> McpError {
3360 if let Some(signal) = match &error {
3361 ClientInitializeError::TransportError { error: dyn_err, .. } => {
3362 transport_auth_signal(dyn_err)
3363 }
3364 _ => None,
3365 } {
3366 return McpError::AuthRequired(Box::new(auth_request_from_signal(
3367 &config.id,
3368 McpMethod::Initialize,
3369 signal,
3370 &error.to_string(),
3371 )));
3372 }
3373 McpError::Transport(error.to_string())
3374}
3375
3376fn rmcp_service_error(error: ServiceError) -> McpError {
3377 service_error_to_mcp_error(error)
3378}
3379
3380fn rmcp_operation_error(
3381 server_id: &McpServerId,
3382 method: McpMethod,
3383 error: ServiceError,
3384) -> McpError {
3385 if let Some(signal) = service_auth_signal(&error) {
3386 return McpError::AuthRequired(Box::new(auth_request_from_signal(
3387 server_id,
3388 method,
3389 signal,
3390 &error.to_string(),
3391 )));
3392 }
3393 service_error_to_mcp_error(error)
3394}
3395
3396fn service_error_to_mcp_error(error: ServiceError) -> McpError {
3397 match error {
3398 ServiceError::McpError(data) => {
3399 McpError::Invocation(McpInvocationError::from_error_data(data))
3400 }
3401 other => McpError::Transport(other.to_string()),
3402 }
3403}
3404
3405#[derive(Debug)]
3406enum AuthSignal {
3407 Required {
3408 www_authenticate: Option<String>,
3409 },
3410 InsufficientScope {
3411 www_authenticate: Option<String>,
3412 required_scope: Option<String>,
3413 },
3414}
3415
3416fn service_auth_signal(error: &ServiceError) -> Option<AuthSignal> {
3417 match error {
3418 ServiceError::TransportSend(dyn_err) => transport_auth_signal(dyn_err),
3419 _ => None,
3420 }
3421}
3422
3423fn transport_auth_signal(error: &DynamicTransportError) -> Option<AuthSignal> {
3424 let inner = error
3425 .error
3426 .downcast_ref::<StreamableHttpError<reqwest::Error>>()?;
3427 match inner {
3428 StreamableHttpError::AuthRequired(AuthRequiredError {
3429 www_authenticate_header,
3430 ..
3431 }) => Some(AuthSignal::Required {
3432 www_authenticate: Some(www_authenticate_header.clone()),
3433 }),
3434 StreamableHttpError::InsufficientScope(InsufficientScopeError {
3435 www_authenticate_header,
3436 required_scope,
3437 ..
3438 }) => Some(AuthSignal::InsufficientScope {
3439 www_authenticate: Some(www_authenticate_header.clone()),
3440 required_scope: required_scope.clone(),
3441 }),
3442 _ => None,
3443 }
3444}
3445
3446fn auth_request_from_signal(
3447 server_id: &McpServerId,
3448 method: McpMethod,
3449 signal: AuthSignal,
3450 message: &str,
3451) -> AuthRequest {
3452 let method_name = method.method_name();
3453 let mut challenge = MetadataMap::new();
3454 challenge.insert("server_id".into(), Value::String(server_id.to_string()));
3455 challenge.insert("method".into(), Value::String(method_name.into()));
3456 challenge.insert("message".into(), Value::String(message.into()));
3457 challenge.insert("flow_kind".into(), Value::String("http_bearer".into()));
3458 match signal {
3459 AuthSignal::Required { www_authenticate } => {
3460 if let Some(header) = www_authenticate {
3461 challenge.insert("www_authenticate".into(), Value::String(header));
3462 }
3463 }
3464 AuthSignal::InsufficientScope {
3465 www_authenticate,
3466 required_scope,
3467 } => {
3468 challenge.insert("insufficient_scope".into(), Value::Bool(true));
3469 if let Some(header) = www_authenticate {
3470 challenge.insert("www_authenticate".into(), Value::String(header));
3471 }
3472 if let Some(scope) = required_scope {
3473 challenge.insert("required_scope".into(), Value::String(scope));
3474 }
3475 }
3476 }
3477 AuthRequest {
3478 id: format!("mcp:{}:{}", server_id, method_name),
3479 provider: format!("mcp.{}", server_id),
3480 operation: method.into_auth_operation(server_id),
3481 challenge,
3482 }
3483}
3484
3485#[derive(Debug, Clone)]
3492pub enum McpMethod {
3493 Initialize,
3495 ToolsCall {
3497 name: String,
3499 arguments: Value,
3501 },
3502 ResourcesRead {
3504 uri: String,
3506 },
3507 ResourcesSubscribe {
3509 uri: String,
3511 },
3512 ResourcesUnsubscribe {
3514 uri: String,
3516 },
3517 PromptsGet {
3519 name: String,
3521 arguments: Value,
3523 },
3524 LoggingSetLevel {
3526 level: String,
3528 },
3529}
3530
3531impl McpMethod {
3532 pub fn method_name(&self) -> &'static str {
3534 match self {
3535 Self::Initialize => "initialize",
3536 Self::ToolsCall { .. } => "tools/call",
3537 Self::ResourcesRead { .. } => "resources/read",
3538 Self::ResourcesSubscribe { .. } => "resources/subscribe",
3539 Self::ResourcesUnsubscribe { .. } => "resources/unsubscribe",
3540 Self::PromptsGet { .. } => "prompts/get",
3541 Self::LoggingSetLevel { .. } => "logging/setLevel",
3542 }
3543 }
3544
3545 fn into_auth_operation(self, server_id: &McpServerId) -> AuthOperation {
3546 let server = server_id.to_string();
3547 match self {
3548 Self::Initialize => AuthOperation::McpConnect {
3549 server_id: server,
3550 metadata: MetadataMap::new(),
3551 },
3552 Self::ToolsCall { name, arguments } => AuthOperation::McpToolCall {
3553 server_id: server,
3554 tool_name: name,
3555 input: arguments,
3556 metadata: MetadataMap::new(),
3557 },
3558 Self::ResourcesRead { uri } => AuthOperation::McpResourceRead {
3559 server_id: server,
3560 resource_id: uri,
3561 metadata: MetadataMap::new(),
3562 },
3563 Self::PromptsGet { name, arguments } => AuthOperation::McpPromptGet {
3564 server_id: server,
3565 prompt_id: name,
3566 args: arguments,
3567 metadata: MetadataMap::new(),
3568 },
3569 other @ (Self::ResourcesSubscribe { .. }
3570 | Self::ResourcesUnsubscribe { .. }
3571 | Self::LoggingSetLevel { .. }) => {
3572 let method = other.method_name().to_string();
3573 AuthOperation::McpOther {
3574 server_id: server,
3575 method,
3576 params: other.into_params_json(),
3577 metadata: MetadataMap::new(),
3578 }
3579 }
3580 }
3581 }
3582
3583 fn into_params_json(self) -> Value {
3584 match self {
3585 Self::Initialize => json!({}),
3586 Self::ToolsCall { name, arguments } => json!({ "name": name, "arguments": arguments }),
3587 Self::ResourcesRead { uri } => json!({ "uri": uri }),
3588 Self::ResourcesSubscribe { uri } => json!({ "uri": uri }),
3589 Self::ResourcesUnsubscribe { uri } => json!({ "uri": uri }),
3590 Self::PromptsGet { name, arguments } => {
3591 json!({ "name": name, "arguments": arguments })
3592 }
3593 Self::LoggingSetLevel { level } => json!({ "level": level }),
3594 }
3595 }
3596}
3597
3598#[derive(Debug, Error)]
3600pub enum McpError {
3601 #[error("io error: {0}")]
3603 Io(#[from] std::io::Error),
3604 #[error("serialization error: {0}")]
3606 Serialize(#[from] serde_json::Error),
3607 #[error("transport error: {0}")]
3609 Transport(String),
3610 #[error("{operation} timed out after {duration:?}")]
3612 Timeout {
3613 operation: &'static str,
3615 duration: Duration,
3617 },
3618 #[error("protocol error: {0}")]
3620 Protocol(String),
3621 #[error("MCP auth required: {0:?}")]
3623 AuthRequired(Box<AuthRequest>),
3624 #[error("auth resolution error: {0}")]
3626 AuthResolution(String),
3627 #[error("invocation error: {0}")]
3629 Invocation(McpInvocationError),
3630 #[error("unknown MCP server: {0}")]
3632 UnknownServer(String),
3633}
3634
3635impl From<&str> for McpServerId {
3636 fn from(value: &str) -> Self {
3637 Self::new(value)
3638 }
3639}
3640
3641impl From<String> for McpServerId {
3642 fn from(value: String) -> Self {
3643 Self::new(value)
3644 }
3645}