1use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::fmt;
24use std::sync::{Arc, RwLock};
25
26use agentkit_capabilities::{
27 CapabilityContext, CapabilityError, CapabilityProvider, Invocable, PromptContents,
28 PromptDescriptor, PromptId, PromptProvider, ResourceContents, ResourceDescriptor, ResourceId,
29 ResourceProvider,
30};
31use agentkit_core::{
32 DataRef, Item, ItemKind, MediaPart, MetadataMap, Modality, Part, TextPart, ToolOutput,
33 ToolResultPart,
34};
35use agentkit_tools_core::{
36 AllowAllPermissions, CatalogReader, CatalogWriter, PermissionChecker, Tool, ToolAnnotations,
37 ToolCapabilityProvider, ToolContext, ToolError, ToolName, ToolRegistry, ToolRequest,
38 ToolResult, ToolSpec, dynamic_catalog,
39};
40use async_trait::async_trait;
41use futures_util::future::try_join_all;
42use futures_util::stream::BoxStream;
43use http::{HeaderName, HeaderValue};
44use rmcp::ServiceExt;
45use rmcp::handler::client::ClientHandler;
46use rmcp::model as rmcp_model;
47use rmcp::service::{ClientInitializeError, Peer, RoleClient, RunningService, ServiceError};
48use rmcp::transport::streamable_http_client::{
49 AuthRequiredError, InsufficientScopeError, StreamableHttpClient as RmcpStreamableHttpClient,
50 StreamableHttpClientTransportConfig as RmcpStreamableHttpClientTransportConfig,
51 StreamableHttpError, StreamableHttpPostResponse,
52};
53use rmcp::transport::{
54 ConfigureCommandExt, DynamicTransportError, StreamableHttpClientTransport, TokioChildProcess,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::{Value, json};
58use sse_stream::{Error as SseError, Sse};
59use thiserror::Error;
60use tokio::sync::{Mutex, broadcast, mpsc};
61
62pub use rmcp::model::{
67 Annotations as McpAnnotations, AudioContent, CallToolResult,
68 CancelledNotificationParam as McpCancelledNotificationParam,
69 ClientCapabilities as McpClientCapabilities, Content,
70 CreateElicitationRequestParams as McpCreateElicitationRequestParams,
71 CreateElicitationResult as McpCreateElicitationResult,
72 CreateMessageRequestParams as McpCreateMessageRequestParams,
73 CreateMessageResult as McpCreateMessageResult, ElicitationAction as McpElicitationAction,
74 ElicitationCapability as McpElicitationCapability, EmbeddedResource,
75 FormElicitationCapability as McpFormElicitationCapability, GetPromptResult, ImageContent,
76 Implementation as McpImplementation, ListRootsResult as McpListRootsResult,
77 LoggingLevel as McpLoggingLevel,
78 LoggingMessageNotificationParam as McpLoggingMessageNotificationParam,
79 ProgressNotificationParam as McpProgressNotificationParam, Prompt as McpPrompt, PromptArgument,
80 PromptMessage, PromptMessageContent, PromptMessageRole, RawAudioContent, RawContent,
81 RawEmbeddedResource, RawImageContent, RawResource as McpRawResource, RawTextContent,
82 ReadResourceResult, Resource as McpResource, ResourceContents as McpResourceContents,
83 ResourceUpdatedNotificationParam as McpResourceUpdatedNotificationParam, Root as McpRoot,
84 RootsCapabilities as McpRootsCapabilities, SamplingCapability as McpSamplingCapability,
85 SamplingMessage as McpSamplingMessage, SetLevelRequestParams as McpSetLevelRequestParams,
86 TextContent, Tool as McpTool, ToolAnnotations as McpToolAnnotations,
87 UrlElicitationCapability as McpUrlElicitationCapability,
88};
89
90pub use rmcp::model::ClientJsonRpcMessage;
93
94pub use rmcp::transport::streamable_http_client::{
97 StreamableHttpError as McpStreamableHttpError,
98 StreamableHttpPostResponse as McpStreamableHttpPostResponse,
99};
100
101pub use sse_stream::{Error as McpSseError, Sse as McpSse};
103
104pub type McpToolDescriptor = McpTool;
106pub type McpResourceDescriptor = McpResource;
108pub type McpPromptDescriptor = McpPrompt;
110
111#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
120pub struct AuthRequest {
121 pub id: String,
123 pub provider: String,
125 pub operation: AuthOperation,
127 pub challenge: MetadataMap,
129}
130
131impl AuthRequest {
132 pub fn server_id(&self) -> Option<&str> {
134 self.operation.server_id()
135 }
136}
137
138#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
140pub enum AuthOperation {
141 McpConnect {
143 server_id: String,
144 metadata: MetadataMap,
145 },
146 McpToolCall {
148 server_id: String,
149 tool_name: String,
150 input: Value,
151 metadata: MetadataMap,
152 },
153 McpResourceRead {
155 server_id: String,
156 resource_id: String,
157 metadata: MetadataMap,
158 },
159 McpPromptGet {
161 server_id: String,
162 prompt_id: String,
163 args: Value,
164 metadata: MetadataMap,
165 },
166 McpOther {
171 server_id: String,
172 method: String,
173 params: Value,
174 metadata: MetadataMap,
175 },
176}
177
178impl AuthOperation {
179 pub fn server_id(&self) -> Option<&str> {
181 match self {
182 Self::McpConnect { server_id, .. }
183 | Self::McpToolCall { server_id, .. }
184 | Self::McpResourceRead { server_id, .. }
185 | Self::McpPromptGet { server_id, .. }
186 | Self::McpOther { server_id, .. } => Some(server_id.as_str()),
187 }
188 }
189}
190
191#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum AuthResolution {
194 Provided {
196 request: AuthRequest,
197 credentials: MetadataMap,
198 },
199 Cancelled { request: AuthRequest },
201}
202
203impl AuthResolution {
204 pub fn provided(request: AuthRequest, credentials: MetadataMap) -> Self {
206 Self::Provided {
207 request,
208 credentials,
209 }
210 }
211
212 pub fn cancelled(request: AuthRequest) -> Self {
214 Self::Cancelled { request }
215 }
216
217 pub fn request(&self) -> &AuthRequest {
219 match self {
220 Self::Provided { request, .. } | Self::Cancelled { request } => request,
221 }
222 }
223}
224
225#[async_trait]
238pub trait McpAuthResponder: Send + Sync + 'static {
239 async fn resolve(&self, request: AuthRequest) -> Result<AuthResolution, McpError>;
240}
241
242#[derive(Debug, Clone, thiserror::Error)]
257pub enum McpInvocationError {
258 #[error("url elicitation required: {message}")]
260 UrlElicitation {
261 message: String,
263 data: Option<UrlElicitationData>,
266 raw_data: Option<serde_json::Value>,
268 },
269 #[error("invalid request: {message}")]
271 InvalidRequest {
272 message: String,
273 data: Option<serde_json::Value>,
274 },
275 #[error("method not found: {message}")]
277 MethodNotFound {
278 message: String,
279 data: Option<serde_json::Value>,
280 },
281 #[error("invalid params: {message}")]
283 InvalidParams {
284 message: String,
285 data: Option<serde_json::Value>,
286 },
287 #[error("internal error: {message}")]
289 InternalError {
290 message: String,
291 data: Option<serde_json::Value>,
292 },
293 #[error("parse error: {message}")]
295 ParseError {
296 message: String,
297 data: Option<serde_json::Value>,
298 },
299 #[error("resource not found: {message}")]
301 ResourceNotFound {
302 message: String,
303 data: Option<serde_json::Value>,
304 },
305 #[error("mcp error code {code}: {message}")]
308 Other {
309 code: i32,
310 message: String,
311 data: Option<serde_json::Value>,
312 },
313}
314
315#[derive(Debug, Clone, serde::Deserialize)]
323#[serde(rename_all = "camelCase")]
324pub struct UrlElicitationData {
325 pub url: String,
327 pub elicitation_id: String,
329 #[serde(default)]
331 pub message: Option<String>,
332}
333
334impl McpInvocationError {
335 pub fn from_error_data(err: rmcp::model::ErrorData) -> Self {
339 let rmcp::model::ErrorData {
340 code,
341 message,
342 data,
343 } = err;
344 let message = message.into_owned();
345 match code {
346 rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED => {
347 let typed = data.as_ref().and_then(|value| {
348 serde_json::from_value::<UrlElicitationData>(value.clone()).ok()
349 });
350 Self::UrlElicitation {
351 message,
352 data: typed,
353 raw_data: data,
354 }
355 }
356 rmcp::model::ErrorCode::INVALID_REQUEST => Self::InvalidRequest { message, data },
357 rmcp::model::ErrorCode::METHOD_NOT_FOUND => Self::MethodNotFound { message, data },
358 rmcp::model::ErrorCode::INVALID_PARAMS => Self::InvalidParams { message, data },
359 rmcp::model::ErrorCode::INTERNAL_ERROR => Self::InternalError { message, data },
360 rmcp::model::ErrorCode::PARSE_ERROR => Self::ParseError { message, data },
361 rmcp::model::ErrorCode::RESOURCE_NOT_FOUND => Self::ResourceNotFound { message, data },
362 other => Self::Other {
363 code: other.0,
364 message,
365 data,
366 },
367 }
368 }
369
370 pub fn code(&self) -> i32 {
372 match self {
373 Self::UrlElicitation { .. } => rmcp::model::ErrorCode::URL_ELICITATION_REQUIRED.0,
374 Self::InvalidRequest { .. } => rmcp::model::ErrorCode::INVALID_REQUEST.0,
375 Self::MethodNotFound { .. } => rmcp::model::ErrorCode::METHOD_NOT_FOUND.0,
376 Self::InvalidParams { .. } => rmcp::model::ErrorCode::INVALID_PARAMS.0,
377 Self::InternalError { .. } => rmcp::model::ErrorCode::INTERNAL_ERROR.0,
378 Self::ParseError { .. } => rmcp::model::ErrorCode::PARSE_ERROR.0,
379 Self::ResourceNotFound { .. } => rmcp::model::ErrorCode::RESOURCE_NOT_FOUND.0,
380 Self::Other { code, .. } => *code,
381 }
382 }
383}
384
385#[async_trait]
400pub trait McpErrorResponder: Send + Sync + 'static {
401 async fn handle(
404 &self,
405 error: &McpInvocationError,
406 ctx: McpErrorContext<'_>,
407 ) -> ErrorResponderOutcome;
408}
409
410pub struct McpErrorContext<'a> {
413 pub server_id: &'a McpServerId,
415 pub method: &'a McpMethod,
417 pub input: Option<&'a serde_json::Value>,
419}
420
421pub enum ErrorResponderOutcome {
423 SynthesizeResult(CallToolResult),
429 PassThrough,
432}
433
434#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
438pub struct McpServerId(pub String);
439
440impl McpServerId {
441 pub fn new(value: impl Into<String>) -> Self {
443 Self(value.into())
444 }
445}
446
447impl fmt::Display for McpServerId {
448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449 self.0.fmt(f)
450 }
451}
452
453#[derive(Clone, Debug, PartialEq, Eq)]
458pub struct StdioTransportConfig {
459 pub command: String,
461 pub args: Vec<String>,
463 pub env: Vec<(String, String)>,
465 pub cwd: Option<std::path::PathBuf>,
467}
468
469impl StdioTransportConfig {
470 pub fn new(command: impl Into<String>) -> Self {
472 Self {
473 command: command.into(),
474 args: Vec::new(),
475 env: Vec::new(),
476 cwd: None,
477 }
478 }
479
480 pub fn with_arg(mut self, arg: impl Into<String>) -> Self {
482 self.args.push(arg.into());
483 self
484 }
485
486 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
488 self.env.push((key.into(), value.into()));
489 self
490 }
491
492 pub fn with_cwd(mut self, cwd: impl Into<std::path::PathBuf>) -> Self {
494 self.cwd = Some(cwd.into());
495 self
496 }
497}
498
499#[derive(Clone, Default)]
501pub struct StreamableHttpTransportConfig {
502 pub url: String,
504 pub bearer_token: Option<String>,
509 pub headers: Vec<(HeaderName, HeaderValue)>,
513 pub http_client: Option<Arc<dyn McpHttpClient>>,
520}
521
522impl fmt::Debug for StreamableHttpTransportConfig {
523 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
524 f.debug_struct("StreamableHttpTransportConfig")
525 .field("url", &self.url)
526 .field(
527 "bearer_token",
528 &self.bearer_token.as_deref().map(|_| "<redacted>"),
529 )
530 .field("headers", &self.headers)
531 .field(
532 "http_client",
533 &self.http_client.as_ref().map(|_| "<custom>"),
534 )
535 .finish()
536 }
537}
538
539impl StreamableHttpTransportConfig {
540 pub fn new(url: impl Into<String>) -> Self {
542 Self {
543 url: url.into(),
544 bearer_token: None,
545 headers: Vec::new(),
546 http_client: None,
547 }
548 }
549
550 pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
555 self.bearer_token = Some(token.into());
556 self
557 }
558
559 pub fn with_http_client(mut self, client: Arc<dyn McpHttpClient>) -> Self {
568 self.http_client = Some(client);
569 self
570 }
571
572 pub fn with_header<N, V>(mut self, name: N, value: V) -> Result<Self, McpError>
577 where
578 N: TryInto<HeaderName>,
579 N::Error: fmt::Display,
580 V: TryInto<HeaderValue>,
581 V::Error: fmt::Display,
582 {
583 let name = name
584 .try_into()
585 .map_err(|error| McpError::Transport(format!("invalid HTTP header name: {error}")))?;
586 let value = value
587 .try_into()
588 .map_err(|error| McpError::Transport(format!("invalid HTTP header value: {error}")))?;
589 self.headers.push((name, value));
590 Ok(self)
591 }
592}
593
594pub type McpSseStream = BoxStream<'static, Result<Sse, SseError>>;
596
597#[async_trait]
616pub trait McpHttpClient: Send + Sync + 'static {
617 async fn post_message(
621 &self,
622 uri: Arc<str>,
623 message: ClientJsonRpcMessage,
624 session_id: Option<Arc<str>>,
625 auth_header: Option<String>,
626 custom_headers: HashMap<HeaderName, HeaderValue>,
627 ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>>;
628
629 async fn delete_session(
631 &self,
632 uri: Arc<str>,
633 session_id: Arc<str>,
634 auth_header: Option<String>,
635 custom_headers: HashMap<HeaderName, HeaderValue>,
636 ) -> Result<(), StreamableHttpError<reqwest::Error>>;
637
638 async fn get_stream(
641 &self,
642 uri: Arc<str>,
643 session_id: Arc<str>,
644 last_event_id: Option<String>,
645 auth_header: Option<String>,
646 custom_headers: HashMap<HeaderName, HeaderValue>,
647 ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>>;
648}
649
650#[derive(Clone)]
653struct DynHttpClient(Arc<dyn McpHttpClient>);
654
655impl RmcpStreamableHttpClient for DynHttpClient {
656 type Error = reqwest::Error;
657
658 async fn post_message(
659 &self,
660 uri: Arc<str>,
661 message: ClientJsonRpcMessage,
662 session_id: Option<Arc<str>>,
663 auth_header: Option<String>,
664 custom_headers: HashMap<HeaderName, HeaderValue>,
665 ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>> {
666 self.0
667 .post_message(uri, message, session_id, auth_header, custom_headers)
668 .await
669 }
670
671 async fn delete_session(
672 &self,
673 uri: Arc<str>,
674 session_id: Arc<str>,
675 auth_header: Option<String>,
676 custom_headers: HashMap<HeaderName, HeaderValue>,
677 ) -> Result<(), StreamableHttpError<reqwest::Error>> {
678 self.0
679 .delete_session(uri, session_id, auth_header, custom_headers)
680 .await
681 }
682
683 async fn get_stream(
684 &self,
685 uri: Arc<str>,
686 session_id: Arc<str>,
687 last_event_id: Option<String>,
688 auth_header: Option<String>,
689 custom_headers: HashMap<HeaderName, HeaderValue>,
690 ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>> {
691 self.0
692 .get_stream(uri, session_id, last_event_id, auth_header, custom_headers)
693 .await
694 }
695}
696
697#[derive(Clone, Debug)]
699pub enum McpTransportBinding {
700 Stdio(StdioTransportConfig),
702 StreamableHttp(StreamableHttpTransportConfig),
704}
705
706#[derive(Clone, Debug)]
708pub struct McpServerConfig {
709 pub id: McpServerId,
711 pub transport: McpTransportBinding,
713 pub metadata: MetadataMap,
715}
716
717impl McpServerConfig {
718 pub fn new(id: impl Into<String>, transport: McpTransportBinding) -> Self {
720 Self {
721 id: McpServerId::new(id),
722 transport,
723 metadata: MetadataMap::new(),
724 }
725 }
726
727 pub fn stdio(id: impl Into<String>, command: impl Into<String>) -> Self {
729 Self::new(
730 id,
731 McpTransportBinding::Stdio(StdioTransportConfig::new(command)),
732 )
733 }
734
735 pub fn streamable_http(id: impl Into<String>, url: impl Into<String>) -> Self {
737 Self::new(
738 id,
739 McpTransportBinding::StreamableHttp(StreamableHttpTransportConfig::new(url)),
740 )
741 }
742
743 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
745 self.metadata = metadata;
746 self
747 }
748}
749
750type CustomNamespace = Arc<dyn Fn(&McpServerId, &str) -> String + Send + Sync>;
751
752#[derive(Clone, Default)]
760pub enum McpToolNamespace {
761 #[default]
763 Default,
764 None,
766 Custom(CustomNamespace),
768}
769
770impl fmt::Debug for McpToolNamespace {
771 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
772 match self {
773 Self::Default => f.write_str("McpToolNamespace::Default"),
774 Self::None => f.write_str("McpToolNamespace::None"),
775 Self::Custom(_) => f.write_str("McpToolNamespace::Custom(<fn>)"),
776 }
777 }
778}
779
780impl McpToolNamespace {
781 pub fn custom(f: impl Fn(&McpServerId, &str) -> String + Send + Sync + 'static) -> Self {
783 Self::Custom(Arc::new(f))
784 }
785
786 pub fn apply(&self, server_id: &McpServerId, tool_name: &str) -> String {
788 match self {
789 Self::Default => format!("mcp_{server_id}_{tool_name}"),
790 Self::None => tool_name.to_string(),
791 Self::Custom(f) => f(server_id, tool_name),
792 }
793 }
794
795 pub fn unapply(&self, server_id: &McpServerId, agentkit_name: &str) -> Option<String> {
799 match self {
800 Self::Default => agentkit_name
801 .strip_prefix(&format!("mcp_{server_id}_"))
802 .map(str::to_string),
803 Self::None => Some(agentkit_name.to_string()),
804 Self::Custom(_) => None,
805 }
806 }
807}
808
809#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
818pub struct McpDiscoverySnapshot {
819 pub server_id: McpServerId,
821 pub tools: Vec<McpTool>,
823 pub resources: Vec<McpResource>,
825 pub prompts: Vec<McpPrompt>,
827 pub metadata: MetadataMap,
829}
830
831#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
833pub enum McpCatalogEvent {
834 ServerConnected { server_id: McpServerId },
836 ServerDisconnected { server_id: McpServerId },
838 ToolsChanged {
840 server_id: McpServerId,
841 added: Vec<String>,
842 removed: Vec<String>,
843 changed: Vec<String>,
844 },
845 ResourcesChanged {
847 server_id: McpServerId,
848 added: Vec<String>,
849 removed: Vec<String>,
850 changed: Vec<String>,
851 },
852 PromptsChanged {
854 server_id: McpServerId,
855 added: Vec<String>,
856 removed: Vec<String>,
857 changed: Vec<String>,
858 },
859 AuthChanged { server_id: McpServerId },
861 RefreshFailed {
863 server_id: McpServerId,
864 message: String,
865 },
866}
867
868#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
870pub struct McpServerCapabilities {
871 #[serde(default, skip_serializing_if = "Option::is_none")]
873 pub tools: Option<ToolsCapability>,
874 #[serde(default, skip_serializing_if = "Option::is_none")]
876 pub resources: Option<ResourcesCapability>,
877 #[serde(default, skip_serializing_if = "Option::is_none")]
879 pub prompts: Option<PromptsCapability>,
880 #[serde(default, skip_serializing_if = "Option::is_none")]
882 pub logging: Option<LoggingCapability>,
883}
884
885impl McpServerCapabilities {
886 pub fn all() -> Self {
889 Self {
890 tools: Some(ToolsCapability::default()),
891 resources: Some(ResourcesCapability::default()),
892 prompts: Some(PromptsCapability::default()),
893 logging: Some(LoggingCapability::default()),
894 }
895 }
896}
897
898#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
900#[serde(rename_all = "camelCase")]
901pub struct ToolsCapability {
902 #[serde(default, skip_serializing_if = "Option::is_none")]
904 pub list_changed: Option<bool>,
905}
906
907#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
909#[serde(rename_all = "camelCase")]
910pub struct ResourcesCapability {
911 #[serde(default, skip_serializing_if = "Option::is_none")]
913 pub subscribe: Option<bool>,
914 #[serde(default, skip_serializing_if = "Option::is_none")]
916 pub list_changed: Option<bool>,
917}
918
919#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
921#[serde(rename_all = "camelCase")]
922pub struct PromptsCapability {
923 #[serde(default, skip_serializing_if = "Option::is_none")]
925 pub list_changed: Option<bool>,
926}
927
928#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
930pub struct LoggingCapability {}
931
932#[allow(clippy::enum_variant_names)]
941#[derive(Clone, Debug)]
942pub enum McpServerNotification {
943 ToolsChanged,
945 ResourcesChanged,
947 PromptsChanged,
949}
950
951#[derive(Clone, Debug)]
959pub enum McpServerEvent {
960 Progress(McpProgressNotificationParam),
963 Logging(McpLoggingMessageNotificationParam),
966 ResourceUpdated(McpResourceUpdatedNotificationParam),
969 ToolListChanged,
971 ResourceListChanged,
973 PromptListChanged,
975 Cancelled(McpCancelledNotificationParam),
978}
979
980#[async_trait]
985pub trait McpSamplingResponder: Send + Sync + 'static {
986 async fn create_message(
989 &self,
990 params: McpCreateMessageRequestParams,
991 ) -> Result<McpCreateMessageResult, McpError>;
992}
993
994#[async_trait]
999pub trait McpElicitationResponder: Send + Sync + 'static {
1000 async fn create_elicitation(
1002 &self,
1003 params: McpCreateElicitationRequestParams,
1004 ) -> Result<McpCreateElicitationResult, McpError>;
1005}
1006
1007#[async_trait]
1012pub trait McpRootsProvider: Send + Sync + 'static {
1013 async fn list_roots(&self) -> Result<Vec<McpRoot>, McpError>;
1015}
1016
1017const DEFAULT_EVENTS_CAPACITY: usize = 128;
1019
1020pub struct McpClientChannels {
1032 pub notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1034 pub events: broadcast::Sender<McpServerEvent>,
1036}
1037
1038#[derive(Clone)]
1048pub struct McpClientHandler {
1049 info: rmcp_model::ClientInfo,
1050 notifications: mpsc::UnboundedSender<McpServerNotification>,
1051 events: broadcast::Sender<McpServerEvent>,
1052 sampling: Option<Arc<dyn McpSamplingResponder>>,
1053 elicitation: Option<Arc<dyn McpElicitationResponder>>,
1054 roots: Option<Arc<dyn McpRootsProvider>>,
1055}
1056
1057impl ClientHandler for McpClientHandler {
1058 fn create_message(
1059 &self,
1060 params: rmcp_model::CreateMessageRequestParams,
1061 _context: rmcp::service::RequestContext<RoleClient>,
1062 ) -> impl Future<Output = Result<rmcp_model::CreateMessageResult, rmcp_model::ErrorData>>
1063 + rmcp::service::MaybeSendFuture
1064 + '_ {
1065 let responder = self.sampling.clone();
1066 async move {
1067 match responder {
1068 Some(responder) => responder.create_message(params).await.map_err(Into::into),
1069 None => Err(rmcp_model::ErrorData::method_not_found::<
1070 rmcp_model::CreateMessageRequestMethod,
1071 >()),
1072 }
1073 }
1074 }
1075
1076 fn list_roots(
1077 &self,
1078 _context: rmcp::service::RequestContext<RoleClient>,
1079 ) -> impl Future<Output = Result<rmcp_model::ListRootsResult, rmcp_model::ErrorData>>
1080 + rmcp::service::MaybeSendFuture
1081 + '_ {
1082 let provider = self.roots.clone();
1083 async move {
1084 match provider {
1085 Some(provider) => provider
1086 .list_roots()
1087 .await
1088 .map(McpListRootsResult::new)
1089 .map_err(Into::into),
1090 None => Ok(McpListRootsResult::default()),
1091 }
1092 }
1093 }
1094
1095 fn create_elicitation(
1096 &self,
1097 params: rmcp_model::CreateElicitationRequestParams,
1098 _context: rmcp::service::RequestContext<RoleClient>,
1099 ) -> impl Future<Output = Result<rmcp_model::CreateElicitationResult, rmcp_model::ErrorData>>
1100 + rmcp::service::MaybeSendFuture
1101 + '_ {
1102 let responder = self.elicitation.clone();
1103 async move {
1104 match responder {
1105 Some(responder) => responder
1106 .create_elicitation(params)
1107 .await
1108 .map_err(Into::into),
1109 None => Ok(McpCreateElicitationResult::new(
1110 McpElicitationAction::Decline,
1111 )),
1112 }
1113 }
1114 }
1115
1116 fn on_progress(
1117 &self,
1118 params: rmcp_model::ProgressNotificationParam,
1119 _context: rmcp::service::NotificationContext<RoleClient>,
1120 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1121 let _ = self.events.send(McpServerEvent::Progress(params));
1122 std::future::ready(())
1123 }
1124
1125 fn on_logging_message(
1126 &self,
1127 params: rmcp_model::LoggingMessageNotificationParam,
1128 _context: rmcp::service::NotificationContext<RoleClient>,
1129 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1130 let _ = self.events.send(McpServerEvent::Logging(params));
1131 std::future::ready(())
1132 }
1133
1134 fn on_resource_updated(
1135 &self,
1136 params: rmcp_model::ResourceUpdatedNotificationParam,
1137 _context: rmcp::service::NotificationContext<RoleClient>,
1138 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1139 let _ = self.events.send(McpServerEvent::ResourceUpdated(params));
1140 std::future::ready(())
1141 }
1142
1143 fn on_cancelled(
1144 &self,
1145 params: rmcp_model::CancelledNotificationParam,
1146 _context: rmcp::service::NotificationContext<RoleClient>,
1147 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1148 let _ = self.events.send(McpServerEvent::Cancelled(params));
1149 std::future::ready(())
1150 }
1151
1152 fn on_tool_list_changed(
1153 &self,
1154 _context: rmcp::service::NotificationContext<RoleClient>,
1155 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1156 let _ = self.notifications.send(McpServerNotification::ToolsChanged);
1157 let _ = self.events.send(McpServerEvent::ToolListChanged);
1158 std::future::ready(())
1159 }
1160
1161 fn on_resource_list_changed(
1162 &self,
1163 _context: rmcp::service::NotificationContext<RoleClient>,
1164 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1165 let _ = self
1166 .notifications
1167 .send(McpServerNotification::ResourcesChanged);
1168 let _ = self.events.send(McpServerEvent::ResourceListChanged);
1169 std::future::ready(())
1170 }
1171
1172 fn on_prompt_list_changed(
1173 &self,
1174 _context: rmcp::service::NotificationContext<RoleClient>,
1175 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
1176 let _ = self
1177 .notifications
1178 .send(McpServerNotification::PromptsChanged);
1179 let _ = self.events.send(McpServerEvent::PromptListChanged);
1180 std::future::ready(())
1181 }
1182
1183 fn get_info(&self) -> rmcp_model::ClientInfo {
1184 self.info.clone()
1185 }
1186}
1187
1188impl From<McpError> for rmcp_model::ErrorData {
1189 fn from(error: McpError) -> Self {
1190 rmcp_model::ErrorData::internal_error(error.to_string(), None)
1191 }
1192}
1193
1194type RmcpClientService = RunningService<RoleClient, McpClientHandler>;
1195
1196#[derive(Clone, Default)]
1205pub struct McpHandlerConfig {
1206 pub sampling: Option<Arc<dyn McpSamplingResponder>>,
1208 pub elicitation: Option<Arc<dyn McpElicitationResponder>>,
1210 pub roots: Option<Arc<dyn McpRootsProvider>>,
1212 pub auth: Option<Arc<dyn McpAuthResponder>>,
1217 pub error_responder: Option<Arc<dyn McpErrorResponder>>,
1224 pub events_capacity: Option<usize>,
1227}
1228
1229impl McpHandlerConfig {
1230 pub fn new() -> Self {
1232 Self::default()
1233 }
1234
1235 pub fn with_sampling_responder(mut self, responder: Arc<dyn McpSamplingResponder>) -> Self {
1237 self.sampling = Some(responder);
1238 self
1239 }
1240
1241 pub fn with_elicitation_responder(
1243 mut self,
1244 responder: Arc<dyn McpElicitationResponder>,
1245 ) -> Self {
1246 self.elicitation = Some(responder);
1247 self
1248 }
1249
1250 pub fn with_roots_provider(mut self, provider: Arc<dyn McpRootsProvider>) -> Self {
1252 self.roots = Some(provider);
1253 self
1254 }
1255
1256 pub fn with_auth_responder(mut self, responder: Arc<dyn McpAuthResponder>) -> Self {
1258 self.auth = Some(responder);
1259 self
1260 }
1261
1262 pub fn with_error_responder(mut self, responder: Arc<dyn McpErrorResponder>) -> Self {
1264 self.error_responder = Some(responder);
1265 self
1266 }
1267
1268 pub fn with_events_capacity(mut self, capacity: usize) -> Self {
1270 self.events_capacity = Some(capacity);
1271 self
1272 }
1273
1274 pub fn build(&self) -> (McpClientHandler, McpClientChannels) {
1278 self.build_inner(None)
1279 }
1280
1281 pub fn build_with(
1286 &self,
1287 events: broadcast::Sender<McpServerEvent>,
1288 ) -> (McpClientHandler, McpClientChannels) {
1289 self.build_inner(Some(events))
1290 }
1291
1292 fn build_inner(
1293 &self,
1294 events: Option<broadcast::Sender<McpServerEvent>>,
1295 ) -> (McpClientHandler, McpClientChannels) {
1296 let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
1297 let events_tx = events.unwrap_or_else(|| {
1298 let capacity = self.events_capacity.unwrap_or(DEFAULT_EVENTS_CAPACITY);
1299 let (tx, _) = broadcast::channel(capacity);
1300 tx
1301 });
1302
1303 let mut capabilities = rmcp_model::ClientCapabilities::default();
1304 if self.sampling.is_some() {
1305 capabilities.sampling = Some(McpSamplingCapability::default());
1306 }
1307 if self.elicitation.is_some() {
1308 capabilities.elicitation = Some(McpElicitationCapability {
1309 form: Some(McpFormElicitationCapability::default()),
1310 url: None,
1311 });
1312 }
1313 if self.roots.is_some() {
1314 capabilities.roots = Some(McpRootsCapabilities::default());
1315 }
1316
1317 let handler = McpClientHandler {
1318 info: rmcp_model::ClientInfo::new(
1319 capabilities,
1320 rmcp_model::Implementation::new("agentkit-mcp", env!("CARGO_PKG_VERSION"))
1321 .with_title("agentkit MCP client"),
1322 )
1323 .with_protocol_version(rmcp_model::ProtocolVersion::LATEST),
1324 notifications: notifications_tx,
1325 events: events_tx.clone(),
1326 sampling: self.sampling.clone(),
1327 elicitation: self.elicitation.clone(),
1328 roots: self.roots.clone(),
1329 };
1330
1331 (
1332 handler,
1333 McpClientChannels {
1334 notifications: notifications_rx,
1335 events: events_tx,
1336 },
1337 )
1338 }
1339}
1340
1341pub struct McpConnection {
1344 server_id: McpServerId,
1345 config: Option<McpServerConfig>,
1346 inner: Mutex<RmcpClientService>,
1347 peer: RwLock<Peer<RoleClient>>,
1348 auth: Mutex<Option<MetadataMap>>,
1349 notifications: Mutex<mpsc::UnboundedReceiver<McpServerNotification>>,
1350 events: broadcast::Sender<McpServerEvent>,
1351 handler_config: McpHandlerConfig,
1352 capabilities: McpServerCapabilities,
1353}
1354
1355#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1357pub enum McpOperationResult {
1358 Connected(McpDiscoverySnapshot),
1360 Tool(CallToolResult),
1362 Resource(ReadResourceResult),
1364 Prompt(GetPromptResult),
1366}
1367
1368impl McpConnection {
1369 pub async fn connect(config: &McpServerConfig) -> Result<Self, McpError> {
1374 Self::connect_with_auth(config, None, McpHandlerConfig::default()).await
1375 }
1376
1377 pub async fn connect_with_handler(
1379 config: &McpServerConfig,
1380 handler_config: McpHandlerConfig,
1381 ) -> Result<Self, McpError> {
1382 Self::connect_with_auth(config, None, handler_config).await
1383 }
1384
1385 async fn connect_with_auth(
1386 config: &McpServerConfig,
1387 auth: Option<&MetadataMap>,
1388 handler_config: McpHandlerConfig,
1389 ) -> Result<Self, McpError> {
1390 let (handler, channels) = handler_config.build();
1391 let McpClientChannels {
1392 notifications: notification_rx,
1393 events: events_tx,
1394 } = channels;
1395 let (service, capabilities) = match &config.transport {
1396 McpTransportBinding::Stdio(binding) => {
1397 connect_rmcp_stdio(config, binding, handler).await?
1398 }
1399 McpTransportBinding::StreamableHttp(binding) => {
1400 connect_rmcp_streamable_http(config, binding, auth, handler).await?
1401 }
1402 };
1403
1404 let peer = service.peer().clone();
1405 Ok(Self {
1406 server_id: config.id.clone(),
1407 config: Some(config.clone()),
1408 inner: Mutex::new(service),
1409 peer: RwLock::new(peer),
1410 auth: Mutex::new(auth.cloned()),
1411 notifications: Mutex::new(notification_rx),
1412 events: events_tx,
1413 handler_config,
1414 capabilities,
1415 })
1416 }
1417
1418 pub fn from_running_service(
1434 server_id: impl Into<McpServerId>,
1435 service: RmcpClientService,
1436 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1437 ) -> Self {
1438 let (events_tx, _) = broadcast::channel(DEFAULT_EVENTS_CAPACITY);
1439 Self::from_running_service_with_events(server_id, service, notifications, events_tx)
1440 }
1441
1442 pub fn from_running_service_with_events(
1449 server_id: impl Into<McpServerId>,
1450 service: RmcpClientService,
1451 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1452 events: broadcast::Sender<McpServerEvent>,
1453 ) -> Self {
1454 Self::from_running_service_with_events_and_handler_config(
1455 server_id,
1456 service,
1457 notifications,
1458 events,
1459 McpHandlerConfig::default(),
1460 )
1461 }
1462
1463 pub fn from_running_service_with_events_and_handler_config(
1470 server_id: impl Into<McpServerId>,
1471 service: RmcpClientService,
1472 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1473 events: broadcast::Sender<McpServerEvent>,
1474 handler_config: McpHandlerConfig,
1475 ) -> Self {
1476 let capabilities = service
1477 .peer_info()
1478 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1479 .unwrap_or_default();
1480 let peer = service.peer().clone();
1481 Self {
1482 server_id: server_id.into(),
1483 config: None,
1484 inner: Mutex::new(service),
1485 peer: RwLock::new(peer),
1486 auth: Mutex::new(None),
1487 notifications: Mutex::new(notifications),
1488 events,
1489 handler_config,
1490 capabilities,
1491 }
1492 }
1493
1494 async fn reconnect_inner(&self, auth: Option<&MetadataMap>) -> Result<(), McpError> {
1495 let Some(config) = self.config.clone() else {
1496 return Ok(());
1497 };
1498 let (handler, channels) = self.handler_config.build_with(self.events.clone());
1499 let McpClientChannels {
1500 notifications: notification_rx,
1501 ..
1502 } = channels;
1503 let (service, _capabilities) = match &config.transport {
1504 McpTransportBinding::Stdio(binding) => {
1505 connect_rmcp_stdio(&config, binding, handler).await?
1506 }
1507 McpTransportBinding::StreamableHttp(binding) => {
1508 connect_rmcp_streamable_http(&config, binding, auth, handler).await?
1509 }
1510 };
1511 let new_peer = service.peer().clone();
1512 *self.notifications.lock().await = notification_rx;
1513 *self.inner.lock().await = service;
1514 *self.peer.write().expect("MCP peer lock poisoned") = new_peer;
1515 Ok(())
1516 }
1517
1518 fn peer(&self) -> Peer<RoleClient> {
1519 self.peer.read().expect("MCP peer lock poisoned").clone()
1520 }
1521
1522 pub fn server_id(&self) -> &McpServerId {
1524 &self.server_id
1525 }
1526
1527 pub fn capabilities(&self) -> &McpServerCapabilities {
1529 &self.capabilities
1530 }
1531
1532 pub fn handler_config(&self) -> &McpHandlerConfig {
1536 &self.handler_config
1537 }
1538
1539 pub fn subscribe_events(&self) -> broadcast::Receiver<McpServerEvent> {
1548 self.events.subscribe()
1549 }
1550
1551 pub async fn subscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1556 let uri = uri.into();
1557 self.peer()
1558 .subscribe(rmcp_model::SubscribeRequestParams::new(uri.clone()))
1559 .await
1560 .map_err(|error| {
1561 rmcp_operation_error(
1562 &self.server_id,
1563 McpMethod::ResourcesSubscribe { uri },
1564 error,
1565 )
1566 })
1567 }
1568
1569 pub async fn unsubscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1571 let uri = uri.into();
1572 self.peer()
1573 .unsubscribe(rmcp_model::UnsubscribeRequestParams::new(uri.clone()))
1574 .await
1575 .map_err(|error| {
1576 rmcp_operation_error(
1577 &self.server_id,
1578 McpMethod::ResourcesUnsubscribe { uri },
1579 error,
1580 )
1581 })
1582 }
1583
1584 pub async fn set_logging_level(&self, level: McpLoggingLevel) -> Result<(), McpError> {
1587 self.peer()
1588 .set_level(rmcp_model::SetLevelRequestParams::new(level))
1589 .await
1590 .map_err(|error| {
1591 rmcp_operation_error(
1592 &self.server_id,
1593 McpMethod::LoggingSetLevel {
1594 level: format!("{level:?}"),
1595 },
1596 error,
1597 )
1598 })
1599 }
1600
1601 pub async fn notify_cancelled(
1604 &self,
1605 params: McpCancelledNotificationParam,
1606 ) -> Result<(), McpError> {
1607 self.peer()
1608 .notify_cancelled(params)
1609 .await
1610 .map_err(rmcp_service_error)
1611 }
1612
1613 pub async fn notify_roots_list_changed(&self) -> Result<(), McpError> {
1616 self.peer()
1617 .notify_roots_list_changed()
1618 .await
1619 .map_err(rmcp_service_error)
1620 }
1621
1622 pub async fn close(&self) -> Result<(), McpError> {
1627 let mut inner = self.inner.lock().await;
1628 inner
1629 .close()
1630 .await
1631 .map(|_| ())
1632 .map_err(|error| McpError::Transport(format!("rmcp service close failed: {error}")))
1633 }
1634
1635 pub async fn resolve_auth(&self, resolution: AuthResolution) -> Result<(), McpError> {
1638 let mut auth_slot = self.auth.lock().await;
1639 match resolution {
1640 AuthResolution::Provided { credentials, .. } => {
1641 *auth_slot = Some(credentials);
1642 }
1643 AuthResolution::Cancelled { .. } => {
1644 *auth_slot = None;
1645 }
1646 }
1647 let snapshot = auth_slot.clone();
1648 drop(auth_slot);
1649 if self.config.is_some() {
1653 self.reconnect_inner(snapshot.as_ref()).await?;
1654 }
1655 Ok(())
1656 }
1657
1658 pub async fn discover(&self) -> Result<McpDiscoverySnapshot, McpError> {
1660 let tools = async {
1661 match self.capabilities.tools {
1662 Some(_) => self.list_tools().await,
1663 None => Ok(Vec::new()),
1664 }
1665 };
1666 let resources = async {
1667 match self.capabilities.resources {
1668 Some(_) => self.list_resources().await,
1669 None => Ok(Vec::new()),
1670 }
1671 };
1672 let prompts = async {
1673 match self.capabilities.prompts {
1674 Some(_) => self.list_prompts().await,
1675 None => Ok(Vec::new()),
1676 }
1677 };
1678 let (tools, resources, prompts) = tokio::try_join!(tools, resources, prompts)?;
1679 Ok(McpDiscoverySnapshot {
1680 server_id: self.server_id.clone(),
1681 tools,
1682 resources,
1683 prompts,
1684 metadata: MetadataMap::new(),
1685 })
1686 }
1687
1688 async fn drain_notifications(&self) -> Vec<McpServerNotification> {
1689 let mut notifications = self.notifications.lock().await;
1690 let mut drained = Vec::new();
1691 while let Ok(notification) = notifications.try_recv() {
1692 drained.push(notification);
1693 }
1694 drained
1695 }
1696
1697 pub async fn list_tools(&self) -> Result<Vec<McpTool>, McpError> {
1699 self.peer()
1700 .list_all_tools()
1701 .await
1702 .map_err(rmcp_service_error)
1703 }
1704
1705 pub async fn list_resources(&self) -> Result<Vec<McpResource>, McpError> {
1707 self.peer()
1708 .list_all_resources()
1709 .await
1710 .map_err(rmcp_service_error)
1711 }
1712
1713 pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpError> {
1715 self.peer()
1716 .list_all_prompts()
1717 .await
1718 .map_err(rmcp_service_error)
1719 }
1720
1721 pub async fn call_tool(
1728 &self,
1729 name: &str,
1730 arguments: Value,
1731 ) -> Result<CallToolResult, McpError> {
1732 let arguments_for_auth = arguments.clone();
1733 let mut params = rmcp_model::CallToolRequestParams::new(name.to_string());
1734 if !arguments.is_null() {
1735 params =
1736 params.with_arguments(value_to_json_object(arguments, "tools/call arguments")?);
1737 }
1738 let name_owned = name.to_string();
1739 self.peer().call_tool(params).await.map_err(|error| {
1740 rmcp_operation_error(
1741 &self.server_id,
1742 McpMethod::ToolsCall {
1743 name: name_owned,
1744 arguments: arguments_for_auth,
1745 },
1746 error,
1747 )
1748 })
1749 }
1750
1751 pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult, McpError> {
1758 let uri_owned = uri.to_string();
1759 self.peer()
1760 .read_resource(rmcp_model::ReadResourceRequestParams::new(uri))
1761 .await
1762 .map_err(|error| {
1763 rmcp_operation_error(
1764 &self.server_id,
1765 McpMethod::ResourcesRead { uri: uri_owned },
1766 error,
1767 )
1768 })
1769 }
1770
1771 pub async fn get_prompt(
1779 &self,
1780 name: &str,
1781 arguments: Value,
1782 ) -> Result<GetPromptResult, McpError> {
1783 let arguments_for_auth = arguments.clone();
1784 let name_owned = name.to_string();
1785 let mut params = rmcp_model::GetPromptRequestParams::new(name);
1786 if !arguments.is_null() {
1787 params =
1788 params.with_arguments(value_to_json_object(arguments, "prompts/get arguments")?);
1789 }
1790 self.peer().get_prompt(params).await.map_err(|error| {
1791 rmcp_operation_error(
1792 &self.server_id,
1793 McpMethod::PromptsGet {
1794 name: name_owned,
1795 arguments: arguments_for_auth,
1796 },
1797 error,
1798 )
1799 })
1800 }
1801}
1802
1803async fn connect_rmcp_stdio(
1804 config: &McpServerConfig,
1805 binding: &StdioTransportConfig,
1806 handler: McpClientHandler,
1807) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1808 let transport = TokioChildProcess::new(
1809 tokio::process::Command::new(&binding.command).configure(|command| {
1810 command.args(&binding.args);
1811 if let Some(cwd) = &binding.cwd {
1812 command.current_dir(cwd);
1813 }
1814 for (key, value) in &binding.env {
1815 command.env(key, value);
1816 }
1817 }),
1818 )
1819 .map_err(McpError::Io)?;
1820
1821 let service = handler
1822 .serve(transport)
1823 .await
1824 .map_err(|error| rmcp_initialize_error(config, error))?;
1825 let capabilities = service
1826 .peer_info()
1827 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1828 .unwrap_or_default();
1829
1830 Ok((service, capabilities))
1831}
1832
1833async fn connect_rmcp_streamable_http(
1834 config: &McpServerConfig,
1835 binding: &StreamableHttpTransportConfig,
1836 auth: Option<&MetadataMap>,
1837 handler: McpClientHandler,
1838) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1839 let auth_header = auth
1840 .and_then(bearer_token_from_metadata)
1841 .or_else(|| binding.bearer_token.clone());
1842 let mut rmcp_config = RmcpStreamableHttpClientTransportConfig::with_uri(binding.url.clone());
1843 if let Some(auth_header) = auth_header {
1844 rmcp_config = rmcp_config.auth_header(auth_header);
1845 }
1846 rmcp_config = rmcp_config.custom_headers(binding.headers.iter().cloned().collect());
1847
1848 let result = match binding.http_client.as_ref() {
1849 Some(client) => {
1850 let transport = StreamableHttpClientTransport::with_client(
1851 DynHttpClient(client.clone()),
1852 rmcp_config,
1853 );
1854 handler.serve(transport).await
1855 }
1856 None => {
1857 let transport = StreamableHttpClientTransport::from_config(rmcp_config);
1858 handler.serve(transport).await
1859 }
1860 };
1861 let service = result.map_err(|error| rmcp_initialize_error(config, error))?;
1862 let capabilities = service
1863 .peer_info()
1864 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1865 .unwrap_or_default();
1866
1867 Ok((service, capabilities))
1868}
1869
1870pub struct McpResourceHandle {
1872 connection: Arc<McpConnection>,
1873 descriptor: ResourceDescriptor,
1874}
1875
1876#[async_trait]
1877impl ResourceProvider for McpResourceHandle {
1878 async fn list_resources(&self) -> Result<Vec<ResourceDescriptor>, CapabilityError> {
1879 Ok(vec![self.descriptor.clone()])
1880 }
1881
1882 async fn read_resource(
1883 &self,
1884 id: &ResourceId,
1885 _ctx: &mut CapabilityContext<'_>,
1886 ) -> Result<ResourceContents, CapabilityError> {
1887 let result = self
1888 .connection
1889 .read_resource(&id.0)
1890 .await
1891 .map_err(|error| match error {
1892 McpError::AuthRequired(request) => {
1893 CapabilityError::Unavailable(format!("auth required: {:?}", request))
1894 }
1895 other => CapabilityError::ExecutionFailed(other.to_string()),
1896 })?;
1897 read_resource_result_to_capabilities(result)
1898 .map_err(|error| CapabilityError::ExecutionFailed(error.to_string()))
1899 }
1900}
1901
1902pub struct McpPromptHandle {
1904 connection: Arc<McpConnection>,
1905 descriptor: PromptDescriptor,
1906}
1907
1908#[async_trait]
1909impl PromptProvider for McpPromptHandle {
1910 async fn list_prompts(&self) -> Result<Vec<PromptDescriptor>, CapabilityError> {
1911 Ok(vec![self.descriptor.clone()])
1912 }
1913
1914 async fn get_prompt(
1915 &self,
1916 id: &PromptId,
1917 args: Value,
1918 _ctx: &mut CapabilityContext<'_>,
1919 ) -> Result<PromptContents, CapabilityError> {
1920 let result =
1921 self.connection
1922 .get_prompt(&id.0, args)
1923 .await
1924 .map_err(|error| match error {
1925 McpError::AuthRequired(request) => {
1926 CapabilityError::Unavailable(format!("auth required: {:?}", request))
1927 }
1928 other => CapabilityError::ExecutionFailed(other.to_string()),
1929 })?;
1930 Ok(get_prompt_result_to_capabilities(result))
1931 }
1932}
1933
1934pub struct McpCapabilityProvider {
1942 invocables: Vec<Arc<dyn Invocable>>,
1943 resources: Vec<Arc<dyn ResourceProvider>>,
1944 prompts: Vec<Arc<dyn PromptProvider>>,
1945}
1946
1947impl McpCapabilityProvider {
1948 pub fn from_snapshot(connection: Arc<McpConnection>, snapshot: &McpDiscoverySnapshot) -> Self {
1951 Self::from_snapshot_with_namespace(connection, snapshot, &McpToolNamespace::Default)
1952 }
1953
1954 pub fn from_snapshot_with_namespace(
1956 connection: Arc<McpConnection>,
1957 snapshot: &McpDiscoverySnapshot,
1958 namespace: &McpToolNamespace,
1959 ) -> Self {
1960 let server_id = connection.server_id().clone();
1961 let registry =
1962 snapshot
1963 .tools
1964 .iter()
1965 .cloned()
1966 .fold(ToolRegistry::new(), |registry, tool| {
1967 registry.with(McpToolAdapter::with_namespace(
1968 &server_id,
1969 connection.clone(),
1970 tool,
1971 namespace,
1972 ))
1973 });
1974 let permissions: Arc<dyn PermissionChecker> = Arc::new(AllowAllPermissions);
1975 let resources_arc: Arc<dyn agentkit_tools_core::ToolResources> = Arc::new(());
1976 let invocables =
1977 ToolCapabilityProvider::from_registry(®istry, permissions, resources_arc)
1978 .invocables();
1979
1980 let resources = snapshot
1981 .resources
1982 .iter()
1983 .cloned()
1984 .map(|resource| {
1985 Arc::new(McpResourceHandle {
1986 connection: connection.clone(),
1987 descriptor: resource_descriptor_from_rmcp(resource),
1988 }) as Arc<dyn ResourceProvider>
1989 })
1990 .collect();
1991
1992 let prompts = snapshot
1993 .prompts
1994 .iter()
1995 .cloned()
1996 .map(|prompt| {
1997 Arc::new(McpPromptHandle {
1998 connection: connection.clone(),
1999 descriptor: prompt_descriptor_from_rmcp(prompt),
2000 }) as Arc<dyn PromptProvider>
2001 })
2002 .collect();
2003
2004 Self {
2005 invocables,
2006 resources,
2007 prompts,
2008 }
2009 }
2010
2011 pub fn merge<I>(providers: I) -> Self
2013 where
2014 I: IntoIterator<Item = Self>,
2015 {
2016 let mut invocables = Vec::new();
2017 let mut resources = Vec::new();
2018 let mut prompts = Vec::new();
2019
2020 for provider in providers {
2021 invocables.extend(provider.invocables);
2022 resources.extend(provider.resources);
2023 prompts.extend(provider.prompts);
2024 }
2025
2026 Self {
2027 invocables,
2028 resources,
2029 prompts,
2030 }
2031 }
2032
2033 pub async fn connect(
2035 config: &McpServerConfig,
2036 ) -> Result<(Arc<McpConnection>, Self, McpDiscoverySnapshot), McpError> {
2037 let connection = Arc::new(McpConnection::connect(config).await?);
2038 let snapshot = connection.discover().await?;
2039 let provider = Self::from_snapshot(connection.clone(), &snapshot);
2040
2041 Ok((connection, provider, snapshot))
2042 }
2043}
2044
2045impl CapabilityProvider for McpCapabilityProvider {
2046 fn invocables(&self) -> Vec<Arc<dyn Invocable>> {
2047 self.invocables.clone()
2048 }
2049
2050 fn resources(&self) -> Vec<Arc<dyn ResourceProvider>> {
2051 self.resources.clone()
2052 }
2053
2054 fn prompts(&self) -> Vec<Arc<dyn PromptProvider>> {
2055 self.prompts.clone()
2056 }
2057}
2058
2059#[derive(Clone)]
2061pub struct McpServerHandle {
2062 config: McpServerConfig,
2063 connection: Arc<McpConnection>,
2064 snapshot: McpDiscoverySnapshot,
2065 namespace: McpToolNamespace,
2066}
2067
2068impl McpServerHandle {
2069 pub fn config(&self) -> &McpServerConfig {
2071 &self.config
2072 }
2073
2074 pub fn server_id(&self) -> &McpServerId {
2076 self.connection.server_id()
2077 }
2078
2079 pub fn connection(&self) -> Arc<McpConnection> {
2081 self.connection.clone()
2082 }
2083
2084 pub fn snapshot(&self) -> &McpDiscoverySnapshot {
2086 &self.snapshot
2087 }
2088
2089 pub fn namespace(&self) -> &McpToolNamespace {
2091 &self.namespace
2092 }
2093
2094 pub fn tool_registry(&self) -> ToolRegistry {
2096 self.snapshot
2097 .tools
2098 .iter()
2099 .cloned()
2100 .fold(ToolRegistry::new(), |registry, tool| {
2101 registry.with(McpToolAdapter::with_namespace(
2102 self.server_id(),
2103 self.connection.clone(),
2104 tool,
2105 &self.namespace,
2106 ))
2107 })
2108 }
2109
2110 pub fn capability_provider(&self) -> McpCapabilityProvider {
2112 McpCapabilityProvider::from_snapshot_with_namespace(
2113 self.connection.clone(),
2114 &self.snapshot,
2115 &self.namespace,
2116 )
2117 }
2118}
2119
2120pub struct McpServerManager {
2122 configs: BTreeMap<McpServerId, McpServerConfig>,
2123 connections: BTreeMap<McpServerId, McpServerHandle>,
2124 auth: BTreeMap<McpServerId, MetadataMap>,
2125 catalog_tx: broadcast::Sender<McpCatalogEvent>,
2126 namespace: McpToolNamespace,
2127 handler_config: McpHandlerConfig,
2128 catalog_writer: CatalogWriter,
2129 server_tools: BTreeMap<McpServerId, BTreeSet<ToolName>>,
2134}
2135
2136impl Default for McpServerManager {
2137 fn default() -> Self {
2138 let (catalog_tx, _) = broadcast::channel(128);
2139 let (catalog_writer, _) = dynamic_catalog("mcp");
2140 Self {
2141 configs: BTreeMap::new(),
2142 connections: BTreeMap::new(),
2143 auth: BTreeMap::new(),
2144 catalog_tx,
2145 namespace: McpToolNamespace::Default,
2146 handler_config: McpHandlerConfig::default(),
2147 catalog_writer,
2148 server_tools: BTreeMap::new(),
2149 }
2150 }
2151}
2152
2153impl McpServerManager {
2154 pub fn new() -> Self {
2156 Self::default()
2157 }
2158
2159 pub fn with_namespace(mut self, namespace: McpToolNamespace) -> Self {
2161 self.namespace = namespace;
2162 self
2163 }
2164
2165 pub fn set_namespace(&mut self, namespace: McpToolNamespace) -> &mut Self {
2167 self.namespace = namespace;
2168 self
2169 }
2170
2171 pub fn namespace(&self) -> &McpToolNamespace {
2173 &self.namespace
2174 }
2175
2176 pub fn with_handler_config(mut self, handler_config: McpHandlerConfig) -> Self {
2179 self.handler_config = handler_config;
2180 self
2181 }
2182
2183 pub fn set_handler_config(&mut self, handler_config: McpHandlerConfig) -> &mut Self {
2185 self.handler_config = handler_config;
2186 self
2187 }
2188
2189 pub fn handler_config(&self) -> &McpHandlerConfig {
2191 &self.handler_config
2192 }
2193
2194 pub fn with_server(mut self, config: McpServerConfig) -> Self {
2196 self.register_server(config);
2197 self
2198 }
2199
2200 pub fn register_server(&mut self, config: McpServerConfig) -> &mut Self {
2202 self.configs.insert(config.id.clone(), config);
2203 self
2204 }
2205
2206 pub fn connected_server(&self, server_id: &McpServerId) -> Option<&McpServerHandle> {
2208 self.connections.get(server_id)
2209 }
2210
2211 pub fn connected_servers(&self) -> Vec<&McpServerHandle> {
2213 self.connections.values().collect()
2214 }
2215
2216 pub fn subscribe_catalog_events(&self) -> broadcast::Receiver<McpCatalogEvent> {
2218 self.catalog_tx.subscribe()
2219 }
2220
2221 fn emit_catalog_event(&self, event: McpCatalogEvent) {
2222 let _ = self.catalog_tx.send(event);
2223 }
2224
2225 pub async fn connect_server(
2227 &mut self,
2228 server_id: &McpServerId,
2229 ) -> Result<McpServerHandle, McpError> {
2230 let config = self
2231 .configs
2232 .get(server_id)
2233 .cloned()
2234 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2235 let connection = Arc::new(
2236 McpConnection::connect_with_auth(
2237 &config,
2238 self.auth.get(server_id),
2239 self.handler_config.clone(),
2240 )
2241 .await?,
2242 );
2243 let snapshot = connection.discover().await?;
2244 let handle = McpServerHandle {
2245 config,
2246 connection,
2247 snapshot,
2248 namespace: self.namespace.clone(),
2249 };
2250 self.connections.insert(server_id.clone(), handle.clone());
2251 self.register_server_tools(server_id, &handle.snapshot);
2252 self.emit_catalog_event(McpCatalogEvent::ServerConnected {
2253 server_id: server_id.clone(),
2254 });
2255 Ok(handle)
2256 }
2257
2258 pub async fn connect_all(&mut self) -> Result<Vec<McpServerHandle>, McpError> {
2260 let plans: Vec<(McpServerId, McpServerConfig, Option<MetadataMap>)> = self
2261 .configs
2262 .iter()
2263 .map(|(id, cfg)| (id.clone(), cfg.clone(), self.auth.get(id).cloned()))
2264 .collect();
2265 let handler_config = self.handler_config.clone();
2266 let namespace = self.namespace.clone();
2267
2268 let futures = plans.into_iter().map(|(server_id, config, auth)| {
2269 let handler_config = handler_config.clone();
2270 let namespace = namespace.clone();
2271 async move {
2272 let connection = Arc::new(
2273 McpConnection::connect_with_auth(&config, auth.as_ref(), handler_config)
2274 .await?,
2275 );
2276 let snapshot = connection.discover().await?;
2277 Ok::<(McpServerId, McpServerHandle), McpError>((
2278 server_id,
2279 McpServerHandle {
2280 config,
2281 connection,
2282 snapshot,
2283 namespace,
2284 },
2285 ))
2286 }
2287 });
2288
2289 let results = try_join_all(futures).await?;
2290 let mut handles = Vec::with_capacity(results.len());
2291 let mut connected: Vec<(McpServerId, McpDiscoverySnapshot)> =
2292 Vec::with_capacity(results.len());
2293 for (server_id, handle) in results {
2294 connected.push((server_id.clone(), handle.snapshot.clone()));
2295 self.connections.insert(server_id, handle.clone());
2296 handles.push(handle);
2297 }
2298 for (server_id, snapshot) in &connected {
2299 self.register_server_tools(server_id, snapshot);
2300 }
2301 for (server_id, _) in connected {
2302 self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2303 }
2304 Ok(handles)
2305 }
2306
2307 pub async fn refresh_server(
2309 &mut self,
2310 server_id: &McpServerId,
2311 ) -> Result<McpDiscoverySnapshot, McpError> {
2312 let handle = self
2313 .connections
2314 .get_mut(server_id)
2315 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2316 let previous = handle.snapshot.clone();
2317 let snapshot = match handle.connection.discover().await {
2318 Ok(snapshot) => snapshot,
2319 Err(error) => {
2320 self.emit_catalog_event(McpCatalogEvent::RefreshFailed {
2321 server_id: server_id.clone(),
2322 message: error.to_string(),
2323 });
2324 return Err(error);
2325 }
2326 };
2327 handle.snapshot = snapshot.clone();
2328 let events = diff_discovery_snapshots(server_id, &previous, &snapshot);
2329 if !events.is_empty() {
2330 self.apply_catalog_events(server_id, &snapshot, &events);
2331 for event in events {
2332 self.emit_catalog_event(event);
2333 }
2334 }
2335 Ok(snapshot)
2336 }
2337
2338 pub async fn refresh_changed_catalogs(&mut self) -> Result<Vec<McpCatalogEvent>, McpError> {
2340 let server_ids = self.connections.keys().cloned().collect::<Vec<_>>();
2341 let mut emitted = Vec::new();
2342
2343 for server_id in server_ids {
2344 let Some(connection) = self
2345 .connections
2346 .get(&server_id)
2347 .map(McpServerHandle::connection)
2348 else {
2349 continue;
2350 };
2351 let notifications = connection.drain_notifications().await;
2352 if notifications.is_empty() {
2353 continue;
2354 }
2355
2356 let handle = self
2357 .connections
2358 .get_mut(&server_id)
2359 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2360 let previous = handle.snapshot.clone();
2361 let snapshot = match handle.connection.discover().await {
2362 Ok(snapshot) => snapshot,
2363 Err(error) => {
2364 let event = McpCatalogEvent::RefreshFailed {
2365 server_id: server_id.clone(),
2366 message: error.to_string(),
2367 };
2368 self.emit_catalog_event(event.clone());
2369 emitted.push(event);
2370 return Err(error);
2371 }
2372 };
2373 handle.snapshot = snapshot.clone();
2374 let events = diff_discovery_snapshots(&server_id, &previous, &snapshot);
2375 if !events.is_empty() {
2376 self.apply_catalog_events(&server_id, &snapshot, &events);
2377 for event in events {
2378 self.emit_catalog_event(event.clone());
2379 emitted.push(event);
2380 }
2381 }
2382 }
2383
2384 Ok(emitted)
2385 }
2386
2387 pub async fn disconnect_server(&mut self, server_id: &McpServerId) -> Result<(), McpError> {
2389 let Some(handle) = self.connections.remove(server_id) else {
2390 return Err(McpError::UnknownServer(server_id.to_string()));
2391 };
2392 handle.connection.close().await?;
2393 self.unregister_server_tools(server_id);
2394 self.emit_catalog_event(McpCatalogEvent::ServerDisconnected {
2395 server_id: server_id.clone(),
2396 });
2397 Ok(())
2398 }
2399
2400 pub async fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), McpError> {
2402 let server_id = resolution
2403 .request()
2404 .server_id()
2405 .ok_or_else(|| McpError::AuthResolution("auth resolution missing server id".into()))?;
2406 let server_id = McpServerId::new(server_id);
2407 match &resolution {
2408 AuthResolution::Provided { credentials, .. } => {
2409 self.auth.insert(server_id.clone(), credentials.clone());
2410 }
2411 AuthResolution::Cancelled { .. } => {
2412 self.auth.remove(&server_id);
2413 }
2414 }
2415
2416 if let Some(handle) = self.connections.get(&server_id) {
2417 handle.connection.resolve_auth(resolution).await?;
2418 } else if !self.configs.contains_key(&server_id) {
2419 return Err(McpError::UnknownServer(server_id.to_string()));
2420 }
2421 self.emit_catalog_event(McpCatalogEvent::AuthChanged { server_id });
2422 Ok(())
2423 }
2424
2425 pub fn tool_registry(&self) -> ToolRegistry {
2430 self.connections
2431 .values()
2432 .fold(ToolRegistry::new(), |mut registry, handle| {
2433 for tool in handle.snapshot.tools.iter().cloned() {
2434 registry.register(McpToolAdapter::with_namespace(
2435 handle.server_id(),
2436 handle.connection.clone(),
2437 tool,
2438 &self.namespace,
2439 ));
2440 }
2441 registry
2442 })
2443 }
2444
2445 pub fn source(&self) -> CatalogReader {
2457 self.catalog_writer.reader()
2458 }
2459
2460 fn apply_catalog_events(
2465 &mut self,
2466 server_id: &McpServerId,
2467 snapshot: &McpDiscoverySnapshot,
2468 events: &[McpCatalogEvent],
2469 ) {
2470 for event in events {
2471 if let McpCatalogEvent::ToolsChanged {
2472 added,
2473 removed,
2474 changed,
2475 ..
2476 } = event
2477 {
2478 self.apply_server_tool_diff(server_id, snapshot, added, removed, changed);
2479 }
2480 }
2481 }
2482
2483 fn register_server_tools(&mut self, server_id: &McpServerId, snapshot: &McpDiscoverySnapshot) {
2487 let connection = match self.connections.get(server_id) {
2488 Some(handle) => handle.connection.clone(),
2489 None => return,
2490 };
2491 let previous = self.server_tools.remove(server_id).unwrap_or_default();
2492 let mut names = BTreeSet::new();
2493 for tool in &snapshot.tools {
2494 let adapter = McpToolAdapter::with_namespace(
2495 server_id,
2496 connection.clone(),
2497 tool.clone(),
2498 &self.namespace,
2499 );
2500 names.insert(adapter.spec().name.clone());
2501 self.catalog_writer.upsert(Arc::new(adapter));
2502 }
2503 for stale in previous.difference(&names) {
2504 self.catalog_writer.remove(stale);
2505 }
2506 self.server_tools.insert(server_id.clone(), names);
2507 }
2508
2509 fn unregister_server_tools(&mut self, server_id: &McpServerId) {
2512 let Some(names) = self.server_tools.remove(server_id) else {
2513 return;
2514 };
2515 for name in names {
2516 self.catalog_writer.remove(&name);
2517 }
2518 }
2519
2520 fn apply_server_tool_diff(
2524 &mut self,
2525 server_id: &McpServerId,
2526 snapshot: &McpDiscoverySnapshot,
2527 added: &[String],
2528 removed: &[String],
2529 changed: &[String],
2530 ) {
2531 let connection = match self.connections.get(server_id) {
2532 Some(handle) => handle.connection.clone(),
2533 None => return,
2534 };
2535 let names = self.server_tools.entry(server_id.clone()).or_default();
2536
2537 for raw_name in removed {
2538 let agentkit_name = ToolName::new(self.namespace.apply(server_id, raw_name));
2539 if names.remove(&agentkit_name) {
2540 self.catalog_writer.remove(&agentkit_name);
2541 }
2542 }
2543
2544 let upsert_one = |raw_name: &str| -> Option<(ToolName, McpToolAdapter)> {
2545 let tool = snapshot
2546 .tools
2547 .iter()
2548 .find(|tool| tool.name.as_ref() == raw_name)?
2549 .clone();
2550 let adapter = McpToolAdapter::with_namespace(
2551 server_id,
2552 connection.clone(),
2553 tool,
2554 &self.namespace,
2555 );
2556 Some((adapter.spec().name.clone(), adapter))
2557 };
2558
2559 for raw_name in added.iter().chain(changed.iter()) {
2560 if let Some((agentkit_name, adapter)) = upsert_one(raw_name) {
2561 names.insert(agentkit_name);
2562 self.catalog_writer.upsert(Arc::new(adapter));
2563 }
2564 }
2565 }
2566
2567 pub fn capability_provider(&self) -> McpCapabilityProvider {
2569 McpCapabilityProvider::merge(
2570 self.connections
2571 .values()
2572 .map(McpServerHandle::capability_provider),
2573 )
2574 }
2575}
2576
2577fn diff_discovery_snapshots(
2578 server_id: &McpServerId,
2579 previous: &McpDiscoverySnapshot,
2580 current: &McpDiscoverySnapshot,
2581) -> Vec<McpCatalogEvent> {
2582 let mut events = Vec::new();
2583 let (added, removed, changed) = diff_named_items(
2584 previous.tools.iter().map(|item| (item.name.as_ref(), item)),
2585 current.tools.iter().map(|item| (item.name.as_ref(), item)),
2586 );
2587 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2588 events.push(McpCatalogEvent::ToolsChanged {
2589 server_id: server_id.clone(),
2590 added,
2591 removed,
2592 changed,
2593 });
2594 }
2595
2596 let (added, removed, changed) = diff_named_items(
2597 previous
2598 .resources
2599 .iter()
2600 .map(|item| (item.uri.as_str(), item)),
2601 current
2602 .resources
2603 .iter()
2604 .map(|item| (item.uri.as_str(), item)),
2605 );
2606 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2607 events.push(McpCatalogEvent::ResourcesChanged {
2608 server_id: server_id.clone(),
2609 added,
2610 removed,
2611 changed,
2612 });
2613 }
2614
2615 let (added, removed, changed) = diff_named_items(
2616 previous
2617 .prompts
2618 .iter()
2619 .map(|item| (item.name.as_str(), item)),
2620 current
2621 .prompts
2622 .iter()
2623 .map(|item| (item.name.as_str(), item)),
2624 );
2625 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2626 events.push(McpCatalogEvent::PromptsChanged {
2627 server_id: server_id.clone(),
2628 added,
2629 removed,
2630 changed,
2631 });
2632 }
2633
2634 events
2635}
2636
2637fn diff_named_items<'a, T>(
2641 previous: impl IntoIterator<Item = (&'a str, &'a T)>,
2642 current: impl IntoIterator<Item = (&'a str, &'a T)>,
2643) -> (Vec<String>, Vec<String>, Vec<String>)
2644where
2645 T: PartialEq + 'a,
2646{
2647 let mut prev: Vec<(&str, &T)> = previous.into_iter().collect();
2648 let mut curr: Vec<(&str, &T)> = current.into_iter().collect();
2649 prev.sort_unstable_by_key(|(name, _)| *name);
2650 curr.sort_unstable_by_key(|(name, _)| *name);
2651
2652 let mut added = Vec::new();
2653 let mut removed = Vec::new();
2654 let mut changed = Vec::new();
2655 let (mut i, mut j) = (0, 0);
2656 while i < prev.len() && j < curr.len() {
2657 match prev[i].0.cmp(curr[j].0) {
2658 std::cmp::Ordering::Less => {
2659 removed.push(prev[i].0.to_string());
2660 i += 1;
2661 }
2662 std::cmp::Ordering::Greater => {
2663 added.push(curr[j].0.to_string());
2664 j += 1;
2665 }
2666 std::cmp::Ordering::Equal => {
2667 if prev[i].1 != curr[j].1 {
2668 changed.push(curr[j].0.to_string());
2669 }
2670 i += 1;
2671 j += 1;
2672 }
2673 }
2674 }
2675 while i < prev.len() {
2676 removed.push(prev[i].0.to_string());
2677 i += 1;
2678 }
2679 while j < curr.len() {
2680 added.push(curr[j].0.to_string());
2681 j += 1;
2682 }
2683
2684 (added, removed, changed)
2685}
2686
2687pub struct McpToolAdapter {
2689 tool_name: String,
2690 connection: Arc<McpConnection>,
2691 spec: ToolSpec,
2692}
2693
2694impl McpToolAdapter {
2695 pub fn new(server_id: &McpServerId, connection: Arc<McpConnection>, tool: McpTool) -> Self {
2698 Self::with_namespace(server_id, connection, tool, &McpToolNamespace::Default)
2699 }
2700
2701 pub fn with_namespace(
2703 server_id: &McpServerId,
2704 connection: Arc<McpConnection>,
2705 tool: McpTool,
2706 namespace: &McpToolNamespace,
2707 ) -> Self {
2708 let spec = tool_spec_from_tool(server_id, &tool, namespace);
2709 Self {
2710 tool_name: tool.name.into_owned(),
2711 connection,
2712 spec,
2713 }
2714 }
2715
2716 async fn handle_invocation_error(
2717 &self,
2718 err: McpInvocationError,
2719 input: &Value,
2720 ) -> Result<CallToolResult, ToolError> {
2721 let Some(responder) = self.connection.handler_config().error_responder.clone() else {
2722 return Err(ToolError::ExecutionFailed(err.to_string()));
2723 };
2724 let method = McpMethod::ToolsCall {
2725 name: self.tool_name.clone(),
2726 arguments: input.clone(),
2727 };
2728 let ctx = McpErrorContext {
2729 server_id: self.connection.server_id(),
2730 method: &method,
2731 input: Some(input),
2732 };
2733 match responder.handle(&err, ctx).await {
2734 ErrorResponderOutcome::SynthesizeResult(result) => Ok(result),
2735 ErrorResponderOutcome::PassThrough => Err(ToolError::ExecutionFailed(err.to_string())),
2736 }
2737 }
2738}
2739
2740#[async_trait]
2741impl Tool for McpToolAdapter {
2742 fn spec(&self) -> &ToolSpec {
2743 &self.spec
2744 }
2745
2746 async fn invoke(
2747 &self,
2748 request: ToolRequest,
2749 _ctx: &mut ToolContext<'_>,
2750 ) -> Result<ToolResult, ToolError> {
2751 let input = request.input;
2752 let result = match self
2753 .connection
2754 .call_tool(&self.tool_name, input.clone())
2755 .await
2756 {
2757 Ok(result) => result,
2758 Err(McpError::AuthRequired(auth_request)) => {
2759 let responder = self
2760 .connection
2761 .handler_config()
2762 .auth
2763 .clone()
2764 .ok_or_else(|| {
2765 ToolError::ExecutionFailed(
2766 "MCP server requires auth but no McpAuthResponder is registered".into(),
2767 )
2768 })?;
2769 let resolution = responder.resolve(*auth_request).await.map_err(|error| {
2770 ToolError::ExecutionFailed(format!("auth responder failed: {error}"))
2771 })?;
2772 match &resolution {
2773 AuthResolution::Provided { .. } => {
2774 self.connection
2775 .resolve_auth(resolution.clone())
2776 .await
2777 .map_err(|error| {
2778 ToolError::ExecutionFailed(format!(
2779 "applying auth resolution failed: {error}"
2780 ))
2781 })?;
2782 }
2783 AuthResolution::Cancelled { .. } => {
2784 return Err(ToolError::ExecutionFailed(
2785 "user cancelled MCP auth flow".into(),
2786 ));
2787 }
2788 }
2789 match self
2790 .connection
2791 .call_tool(&self.tool_name, input.clone())
2792 .await
2793 {
2794 Ok(result) => result,
2795 Err(McpError::AuthRequired(req)) => {
2796 return Err(ToolError::ExecutionFailed(format!(
2797 "MCP auth challenge unresolved after retry: {}",
2798 req.id
2799 )));
2800 }
2801 Err(McpError::Invocation(err)) => {
2802 self.handle_invocation_error(err, &input).await?
2803 }
2804 Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
2805 }
2806 }
2807 Err(McpError::Invocation(err)) => self.handle_invocation_error(err, &input).await?,
2808 Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
2809 };
2810
2811 let is_error = result.is_error.unwrap_or(false);
2812 Ok(ToolResult {
2813 result: ToolResultPart {
2814 call_id: request.call_id,
2815 output: call_tool_result_to_tool_output(result),
2816 is_error,
2817 metadata: MetadataMap::new(),
2818 },
2819 duration: None,
2820 metadata: MetadataMap::new(),
2821 })
2822 }
2823}
2824
2825fn rmcp_server_capabilities_to_agentkit(
2826 capabilities: &rmcp_model::ServerCapabilities,
2827) -> McpServerCapabilities {
2828 McpServerCapabilities {
2829 tools: capabilities.tools.as_ref().map(|tools| ToolsCapability {
2830 list_changed: tools.list_changed,
2831 }),
2832 resources: capabilities
2833 .resources
2834 .as_ref()
2835 .map(|resources| ResourcesCapability {
2836 subscribe: resources.subscribe,
2837 list_changed: resources.list_changed,
2838 }),
2839 prompts: capabilities
2840 .prompts
2841 .as_ref()
2842 .map(|prompts| PromptsCapability {
2843 list_changed: prompts.list_changed,
2844 }),
2845 logging: capabilities.logging.as_ref().map(|_| LoggingCapability {}),
2846 }
2847}
2848
2849fn tool_spec_from_tool(
2850 server_id: &McpServerId,
2851 tool: &McpTool,
2852 namespace: &McpToolNamespace,
2853) -> ToolSpec {
2854 ToolSpec {
2855 name: ToolName::new(namespace.apply(server_id, &tool.name)),
2856 description: tool
2857 .description
2858 .as_ref()
2859 .map(|d| d.to_string())
2860 .unwrap_or_else(|| tool.name.to_string()),
2861 input_schema: Value::Object((*tool.input_schema).clone()),
2862 annotations: tool_annotations_from_rmcp(tool.annotations.as_ref()),
2863 metadata: MetadataMap::new(),
2864 }
2865}
2866
2867fn tool_annotations_from_rmcp(annotations: Option<&McpToolAnnotations>) -> ToolAnnotations {
2868 let Some(annotations) = annotations else {
2869 return ToolAnnotations::default();
2870 };
2871 ToolAnnotations {
2878 read_only_hint: annotations.read_only_hint.unwrap_or(false),
2879 destructive_hint: annotations.destructive_hint.unwrap_or(false),
2880 idempotent_hint: annotations.idempotent_hint.unwrap_or(false),
2881 needs_approval_hint: false,
2882 supports_streaming_hint: false,
2883 }
2884}
2885
2886fn resource_descriptor_from_rmcp(resource: McpResource) -> ResourceDescriptor {
2887 let raw = resource.raw;
2888 ResourceDescriptor {
2889 id: ResourceId::new(raw.uri),
2890 name: raw.name,
2891 description: raw.description,
2892 mime_type: raw.mime_type,
2893 metadata: MetadataMap::new(),
2894 }
2895}
2896
2897fn prompt_descriptor_from_rmcp(prompt: McpPrompt) -> PromptDescriptor {
2898 let arguments = prompt.arguments.unwrap_or_default();
2899 let mut required = Vec::new();
2900 let properties = arguments
2901 .into_iter()
2902 .map(|argument| {
2903 let mut schema = serde_json::Map::new();
2904 schema.insert("type".into(), Value::String("string".into()));
2905 if let Some(description) = argument.description {
2906 schema.insert("description".into(), Value::String(description));
2907 }
2908 if argument.required.unwrap_or(false) {
2909 required.push(Value::String(argument.name.clone()));
2910 }
2911 (argument.name, Value::Object(schema))
2912 })
2913 .collect::<serde_json::Map<String, Value>>();
2914 let mut input_schema = serde_json::Map::new();
2915 input_schema.insert("type".into(), Value::String("object".into()));
2916 input_schema.insert("properties".into(), Value::Object(properties));
2917 if !required.is_empty() {
2918 input_schema.insert("required".into(), Value::Array(required));
2919 }
2920
2921 PromptDescriptor {
2922 id: PromptId::new(prompt.name.clone()),
2923 name: prompt.name,
2924 description: prompt.description,
2925 input_schema: Value::Object(input_schema),
2926 metadata: MetadataMap::new(),
2927 }
2928}
2929
2930fn read_resource_result_to_capabilities(
2931 result: ReadResourceResult,
2932) -> Result<ResourceContents, McpError> {
2933 let content = result
2934 .contents
2935 .into_iter()
2936 .next()
2937 .ok_or_else(|| McpError::Protocol("resources/read returned no contents".into()))?;
2938 Ok(resource_contents_to_capabilities(content))
2939}
2940
2941fn resource_contents_to_capabilities(content: McpResourceContents) -> ResourceContents {
2942 let mut metadata = MetadataMap::new();
2943 let data = match content {
2944 McpResourceContents::TextResourceContents {
2945 text, mime_type, ..
2946 } => {
2947 if let Some(mime) = mime_type {
2948 metadata.insert("mime_type".into(), Value::String(mime));
2949 }
2950 DataRef::InlineText(text)
2951 }
2952 McpResourceContents::BlobResourceContents {
2953 blob,
2954 mime_type,
2955 uri,
2956 ..
2957 } => {
2958 if let Some(mime) = mime_type {
2959 metadata.insert("mime_type".into(), Value::String(mime));
2960 }
2961 metadata.insert("uri".into(), Value::String(uri));
2962 DataRef::InlineText(blob)
2964 }
2965 };
2966 ResourceContents { data, metadata }
2967}
2968
2969fn get_prompt_result_to_capabilities(result: GetPromptResult) -> PromptContents {
2970 let items = result
2971 .messages
2972 .into_iter()
2973 .map(prompt_message_to_item)
2974 .collect();
2975 let mut metadata = MetadataMap::new();
2976 if let Some(description) = result.description {
2977 metadata.insert("description".into(), Value::String(description));
2978 }
2979 PromptContents { items, metadata }
2980}
2981
2982fn prompt_message_to_item(message: PromptMessage) -> Item {
2983 let kind = match message.role {
2984 PromptMessageRole::Assistant => ItemKind::Assistant,
2985 PromptMessageRole::User => ItemKind::User,
2986 };
2987 Item {
2988 id: None,
2989 kind,
2990 parts: vec![prompt_message_content_to_part(message.content)],
2991 metadata: MetadataMap::new(),
2992 usage: None,
2993 finish_reason: None,
2994 created_at: None,
2995 }
2996}
2997
2998fn prompt_message_content_to_part(content: PromptMessageContent) -> Part {
2999 match content {
3000 PromptMessageContent::Text { text } => Part::Text(TextPart::new(text)),
3001 PromptMessageContent::Image { image } => Part::Media(MediaPart::new(
3002 Modality::Image,
3003 image.mime_type.clone(),
3004 DataRef::InlineText(image.data.clone()),
3005 )),
3006 PromptMessageContent::Resource { resource } => {
3007 let agentkit_resource = resource_contents_to_capabilities(resource.resource.clone());
3008 agentkit_part_from_resource(agentkit_resource)
3009 }
3010 PromptMessageContent::ResourceLink { link } => Part::Text(TextPart::new(link.uri.clone())),
3011 }
3012}
3013
3014fn agentkit_part_from_resource(resource: ResourceContents) -> Part {
3015 let mime = resource
3016 .metadata
3017 .get("mime_type")
3018 .and_then(Value::as_str)
3019 .unwrap_or("text/plain")
3020 .to_string();
3021 Part::Media(MediaPart::new(Modality::Binary, mime, resource.data))
3022}
3023
3024fn call_tool_result_to_tool_output(result: CallToolResult) -> ToolOutput {
3025 if let Some(structured) = result.structured_content {
3026 return ToolOutput::Structured(structured);
3027 }
3028 let parts = call_tool_content_to_parts(result.content);
3029 if parts.iter().all(|part| matches!(part, Part::Text(_))) {
3030 let text = parts
3031 .iter()
3032 .filter_map(|part| match part {
3033 Part::Text(text) => Some(text.text.clone()),
3034 _ => None,
3035 })
3036 .collect::<Vec<_>>()
3037 .join("\n");
3038 ToolOutput::Text(text)
3039 } else {
3040 ToolOutput::Parts(parts)
3041 }
3042}
3043
3044fn call_tool_content_to_parts(contents: Vec<Content>) -> Vec<Part> {
3045 contents.into_iter().map(content_to_part).collect()
3046}
3047
3048fn content_to_part(content: Content) -> Part {
3049 match content.raw {
3050 RawContent::Text(text) => Part::Text(TextPart::new(text.text)),
3051 RawContent::Image(image) => Part::Media(MediaPart::new(
3052 Modality::Image,
3053 image.mime_type,
3054 DataRef::InlineText(image.data),
3055 )),
3056 RawContent::Audio(audio) => Part::Media(MediaPart::new(
3057 Modality::Audio,
3058 audio.mime_type,
3059 DataRef::InlineText(audio.data),
3060 )),
3061 RawContent::Resource(embedded) => {
3062 agentkit_part_from_resource(resource_contents_to_capabilities(embedded.resource))
3063 }
3064 RawContent::ResourceLink(link) => Part::Text(TextPart::new(link.uri)),
3065 }
3066}
3067
3068fn value_to_json_object(value: Value, context: &str) -> Result<rmcp_model::JsonObject, McpError> {
3069 match value {
3070 Value::Object(object) => Ok(object),
3071 Value::Null => Ok(serde_json::Map::new()),
3072 other => Err(McpError::Protocol(format!(
3073 "{context} must be a JSON object, got {other}"
3074 ))),
3075 }
3076}
3077
3078fn bearer_token_from_metadata(metadata: &MetadataMap) -> Option<String> {
3079 ["bearer_token", "access_token", "token", "api_key"]
3080 .into_iter()
3081 .find_map(|key| metadata.get(key).and_then(Value::as_str).map(str::to_owned))
3082}
3083
3084fn rmcp_initialize_error(config: &McpServerConfig, error: ClientInitializeError) -> McpError {
3085 if let Some(signal) = match &error {
3086 ClientInitializeError::TransportError { error: dyn_err, .. } => {
3087 transport_auth_signal(dyn_err)
3088 }
3089 _ => None,
3090 } {
3091 return McpError::AuthRequired(Box::new(auth_request_from_signal(
3092 &config.id,
3093 McpMethod::Initialize,
3094 signal,
3095 &error.to_string(),
3096 )));
3097 }
3098 McpError::Transport(error.to_string())
3099}
3100
3101fn rmcp_service_error(error: ServiceError) -> McpError {
3102 service_error_to_mcp_error(error)
3103}
3104
3105fn rmcp_operation_error(
3106 server_id: &McpServerId,
3107 method: McpMethod,
3108 error: ServiceError,
3109) -> McpError {
3110 if let Some(signal) = service_auth_signal(&error) {
3111 return McpError::AuthRequired(Box::new(auth_request_from_signal(
3112 server_id,
3113 method,
3114 signal,
3115 &error.to_string(),
3116 )));
3117 }
3118 service_error_to_mcp_error(error)
3119}
3120
3121fn service_error_to_mcp_error(error: ServiceError) -> McpError {
3122 match error {
3123 ServiceError::McpError(data) => {
3124 McpError::Invocation(McpInvocationError::from_error_data(data))
3125 }
3126 other => McpError::Transport(other.to_string()),
3127 }
3128}
3129
3130#[derive(Debug)]
3131enum AuthSignal {
3132 Required {
3133 www_authenticate: Option<String>,
3134 },
3135 InsufficientScope {
3136 www_authenticate: Option<String>,
3137 required_scope: Option<String>,
3138 },
3139}
3140
3141fn service_auth_signal(error: &ServiceError) -> Option<AuthSignal> {
3142 match error {
3143 ServiceError::TransportSend(dyn_err) => transport_auth_signal(dyn_err),
3144 _ => None,
3145 }
3146}
3147
3148fn transport_auth_signal(error: &DynamicTransportError) -> Option<AuthSignal> {
3149 let inner = error
3150 .error
3151 .downcast_ref::<StreamableHttpError<reqwest::Error>>()?;
3152 match inner {
3153 StreamableHttpError::AuthRequired(AuthRequiredError {
3154 www_authenticate_header,
3155 ..
3156 }) => Some(AuthSignal::Required {
3157 www_authenticate: Some(www_authenticate_header.clone()),
3158 }),
3159 StreamableHttpError::InsufficientScope(InsufficientScopeError {
3160 www_authenticate_header,
3161 required_scope,
3162 ..
3163 }) => Some(AuthSignal::InsufficientScope {
3164 www_authenticate: Some(www_authenticate_header.clone()),
3165 required_scope: required_scope.clone(),
3166 }),
3167 _ => None,
3168 }
3169}
3170
3171fn auth_request_from_signal(
3172 server_id: &McpServerId,
3173 method: McpMethod,
3174 signal: AuthSignal,
3175 message: &str,
3176) -> AuthRequest {
3177 let method_name = method.method_name();
3178 let mut challenge = MetadataMap::new();
3179 challenge.insert("server_id".into(), Value::String(server_id.to_string()));
3180 challenge.insert("method".into(), Value::String(method_name.into()));
3181 challenge.insert("message".into(), Value::String(message.into()));
3182 challenge.insert("flow_kind".into(), Value::String("http_bearer".into()));
3183 match signal {
3184 AuthSignal::Required { www_authenticate } => {
3185 if let Some(header) = www_authenticate {
3186 challenge.insert("www_authenticate".into(), Value::String(header));
3187 }
3188 }
3189 AuthSignal::InsufficientScope {
3190 www_authenticate,
3191 required_scope,
3192 } => {
3193 challenge.insert("insufficient_scope".into(), Value::Bool(true));
3194 if let Some(header) = www_authenticate {
3195 challenge.insert("www_authenticate".into(), Value::String(header));
3196 }
3197 if let Some(scope) = required_scope {
3198 challenge.insert("required_scope".into(), Value::String(scope));
3199 }
3200 }
3201 }
3202 AuthRequest {
3203 id: format!("mcp:{}:{}", server_id, method_name),
3204 provider: format!("mcp.{}", server_id),
3205 operation: method.into_auth_operation(server_id),
3206 challenge,
3207 }
3208}
3209
3210#[derive(Debug, Clone)]
3217pub enum McpMethod {
3218 Initialize,
3220 ToolsCall {
3222 name: String,
3224 arguments: Value,
3226 },
3227 ResourcesRead {
3229 uri: String,
3231 },
3232 ResourcesSubscribe {
3234 uri: String,
3236 },
3237 ResourcesUnsubscribe {
3239 uri: String,
3241 },
3242 PromptsGet {
3244 name: String,
3246 arguments: Value,
3248 },
3249 LoggingSetLevel {
3251 level: String,
3253 },
3254}
3255
3256impl McpMethod {
3257 pub fn method_name(&self) -> &'static str {
3259 match self {
3260 Self::Initialize => "initialize",
3261 Self::ToolsCall { .. } => "tools/call",
3262 Self::ResourcesRead { .. } => "resources/read",
3263 Self::ResourcesSubscribe { .. } => "resources/subscribe",
3264 Self::ResourcesUnsubscribe { .. } => "resources/unsubscribe",
3265 Self::PromptsGet { .. } => "prompts/get",
3266 Self::LoggingSetLevel { .. } => "logging/setLevel",
3267 }
3268 }
3269
3270 fn into_auth_operation(self, server_id: &McpServerId) -> AuthOperation {
3271 let server = server_id.to_string();
3272 match self {
3273 Self::Initialize => AuthOperation::McpConnect {
3274 server_id: server,
3275 metadata: MetadataMap::new(),
3276 },
3277 Self::ToolsCall { name, arguments } => AuthOperation::McpToolCall {
3278 server_id: server,
3279 tool_name: name,
3280 input: arguments,
3281 metadata: MetadataMap::new(),
3282 },
3283 Self::ResourcesRead { uri } => AuthOperation::McpResourceRead {
3284 server_id: server,
3285 resource_id: uri,
3286 metadata: MetadataMap::new(),
3287 },
3288 Self::PromptsGet { name, arguments } => AuthOperation::McpPromptGet {
3289 server_id: server,
3290 prompt_id: name,
3291 args: arguments,
3292 metadata: MetadataMap::new(),
3293 },
3294 other @ (Self::ResourcesSubscribe { .. }
3295 | Self::ResourcesUnsubscribe { .. }
3296 | Self::LoggingSetLevel { .. }) => {
3297 let method = other.method_name().to_string();
3298 AuthOperation::McpOther {
3299 server_id: server,
3300 method,
3301 params: other.into_params_json(),
3302 metadata: MetadataMap::new(),
3303 }
3304 }
3305 }
3306 }
3307
3308 fn into_params_json(self) -> Value {
3309 match self {
3310 Self::Initialize => json!({}),
3311 Self::ToolsCall { name, arguments } => json!({ "name": name, "arguments": arguments }),
3312 Self::ResourcesRead { uri } => json!({ "uri": uri }),
3313 Self::ResourcesSubscribe { uri } => json!({ "uri": uri }),
3314 Self::ResourcesUnsubscribe { uri } => json!({ "uri": uri }),
3315 Self::PromptsGet { name, arguments } => {
3316 json!({ "name": name, "arguments": arguments })
3317 }
3318 Self::LoggingSetLevel { level } => json!({ "level": level }),
3319 }
3320 }
3321}
3322
3323#[derive(Debug, Error)]
3325pub enum McpError {
3326 #[error("io error: {0}")]
3328 Io(#[from] std::io::Error),
3329 #[error("serialization error: {0}")]
3331 Serialize(#[from] serde_json::Error),
3332 #[error("transport error: {0}")]
3334 Transport(String),
3335 #[error("protocol error: {0}")]
3337 Protocol(String),
3338 #[error("MCP auth required: {0:?}")]
3340 AuthRequired(Box<AuthRequest>),
3341 #[error("auth resolution error: {0}")]
3343 AuthResolution(String),
3344 #[error("invocation error: {0}")]
3346 Invocation(McpInvocationError),
3347 #[error("unknown MCP server: {0}")]
3349 UnknownServer(String),
3350}
3351
3352impl From<&str> for McpServerId {
3353 fn from(value: &str) -> Self {
3354 Self::new(value)
3355 }
3356}
3357
3358impl From<String> for McpServerId {
3359 fn from(value: String) -> Self {
3360 Self::new(value)
3361 }
3362}