1use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::fmt;
24use std::sync::{Arc, RwLock};
25
26use agentkit_capabilities::{
27 CapabilityContext, CapabilityError, CapabilityProvider, Invocable, PromptContents,
28 PromptDescriptor, PromptId, PromptProvider, ResourceContents, ResourceDescriptor, ResourceId,
29 ResourceProvider,
30};
31use agentkit_core::{
32 DataRef, Item, ItemKind, MediaPart, MetadataMap, Modality, Part, TextPart, ToolOutput,
33 ToolResultPart,
34};
35use agentkit_tools_core::{
36 AllowAllPermissions, CatalogReader, CatalogWriter, PermissionChecker, Tool, ToolAnnotations,
37 ToolCapabilityProvider, ToolContext, ToolError, ToolName, ToolRegistry, ToolRequest,
38 ToolResult, ToolSpec, dynamic_catalog,
39};
40use async_trait::async_trait;
41use futures_util::future::try_join_all;
42use futures_util::stream::BoxStream;
43use http::{HeaderName, HeaderValue};
44use rmcp::ServiceExt;
45use rmcp::handler::client::ClientHandler;
46use rmcp::model as rmcp_model;
47use rmcp::service::{ClientInitializeError, Peer, RoleClient, RunningService, ServiceError};
48use rmcp::transport::streamable_http_client::{
49 AuthRequiredError, InsufficientScopeError, StreamableHttpClient as RmcpStreamableHttpClient,
50 StreamableHttpClientTransportConfig as RmcpStreamableHttpClientTransportConfig,
51 StreamableHttpError, StreamableHttpPostResponse,
52};
53use rmcp::transport::{
54 ConfigureCommandExt, DynamicTransportError, StreamableHttpClientTransport, TokioChildProcess,
55};
56use serde::{Deserialize, Serialize};
57use serde_json::{Value, json};
58use sse_stream::{Error as SseError, Sse};
59use thiserror::Error;
60use tokio::sync::{Mutex, broadcast, mpsc};
61
62pub use rmcp::model::{
67 Annotations as McpAnnotations, AudioContent, CallToolResult,
68 CancelledNotificationParam as McpCancelledNotificationParam,
69 ClientCapabilities as McpClientCapabilities, Content,
70 CreateElicitationRequestParams as McpCreateElicitationRequestParams,
71 CreateElicitationResult as McpCreateElicitationResult,
72 CreateMessageRequestParams as McpCreateMessageRequestParams,
73 CreateMessageResult as McpCreateMessageResult, ElicitationAction as McpElicitationAction,
74 ElicitationCapability as McpElicitationCapability, EmbeddedResource,
75 FormElicitationCapability as McpFormElicitationCapability, GetPromptResult, ImageContent,
76 Implementation as McpImplementation, ListRootsResult as McpListRootsResult,
77 LoggingLevel as McpLoggingLevel,
78 LoggingMessageNotificationParam as McpLoggingMessageNotificationParam,
79 ProgressNotificationParam as McpProgressNotificationParam, Prompt as McpPrompt, PromptArgument,
80 PromptMessage, PromptMessageContent, PromptMessageRole, RawAudioContent, RawContent,
81 RawEmbeddedResource, RawImageContent, RawResource as McpRawResource, RawTextContent,
82 ReadResourceResult, Resource as McpResource, ResourceContents as McpResourceContents,
83 ResourceUpdatedNotificationParam as McpResourceUpdatedNotificationParam, Root as McpRoot,
84 RootsCapabilities as McpRootsCapabilities, SamplingCapability as McpSamplingCapability,
85 SamplingMessage as McpSamplingMessage, SetLevelRequestParams as McpSetLevelRequestParams,
86 TextContent, Tool as McpTool, ToolAnnotations as McpToolAnnotations,
87 UrlElicitationCapability as McpUrlElicitationCapability,
88};
89
90pub use rmcp::model::ClientJsonRpcMessage;
93
94pub use rmcp::transport::streamable_http_client::{
97 StreamableHttpError as McpStreamableHttpError,
98 StreamableHttpPostResponse as McpStreamableHttpPostResponse,
99};
100
101pub use sse_stream::{Error as McpSseError, Sse as McpSse};
103
104pub type McpToolDescriptor = McpTool;
106pub type McpResourceDescriptor = McpResource;
108pub type McpPromptDescriptor = McpPrompt;
110
111#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
120pub struct AuthRequest {
121 pub id: String,
123 pub provider: String,
125 pub operation: AuthOperation,
127 pub challenge: MetadataMap,
129}
130
131impl AuthRequest {
132 pub fn server_id(&self) -> Option<&str> {
134 self.operation.server_id()
135 }
136}
137
138#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
140pub enum AuthOperation {
141 McpConnect {
143 server_id: String,
144 metadata: MetadataMap,
145 },
146 McpToolCall {
148 server_id: String,
149 tool_name: String,
150 input: Value,
151 metadata: MetadataMap,
152 },
153 McpResourceRead {
155 server_id: String,
156 resource_id: String,
157 metadata: MetadataMap,
158 },
159 McpPromptGet {
161 server_id: String,
162 prompt_id: String,
163 args: Value,
164 metadata: MetadataMap,
165 },
166 McpOther {
171 server_id: String,
172 method: String,
173 params: Value,
174 metadata: MetadataMap,
175 },
176}
177
178impl AuthOperation {
179 pub fn server_id(&self) -> Option<&str> {
181 match self {
182 Self::McpConnect { server_id, .. }
183 | Self::McpToolCall { server_id, .. }
184 | Self::McpResourceRead { server_id, .. }
185 | Self::McpPromptGet { server_id, .. }
186 | Self::McpOther { server_id, .. } => Some(server_id.as_str()),
187 }
188 }
189}
190
191#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
193pub enum AuthResolution {
194 Provided {
196 request: AuthRequest,
197 credentials: MetadataMap,
198 },
199 Cancelled { request: AuthRequest },
201}
202
203impl AuthResolution {
204 pub fn provided(request: AuthRequest, credentials: MetadataMap) -> Self {
206 Self::Provided {
207 request,
208 credentials,
209 }
210 }
211
212 pub fn cancelled(request: AuthRequest) -> Self {
214 Self::Cancelled { request }
215 }
216
217 pub fn request(&self) -> &AuthRequest {
219 match self {
220 Self::Provided { request, .. } | Self::Cancelled { request } => request,
221 }
222 }
223}
224
225#[async_trait]
238pub trait McpAuthResponder: Send + Sync + 'static {
239 async fn resolve(&self, request: AuthRequest) -> Result<AuthResolution, McpError>;
240}
241
242#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
246pub struct McpServerId(pub String);
247
248impl McpServerId {
249 pub fn new(value: impl Into<String>) -> Self {
251 Self(value.into())
252 }
253}
254
255impl fmt::Display for McpServerId {
256 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
257 self.0.fmt(f)
258 }
259}
260
261#[derive(Clone, Debug, PartialEq, Eq)]
266pub struct StdioTransportConfig {
267 pub command: String,
269 pub args: Vec<String>,
271 pub env: Vec<(String, String)>,
273 pub cwd: Option<std::path::PathBuf>,
275}
276
277impl StdioTransportConfig {
278 pub fn new(command: impl Into<String>) -> Self {
280 Self {
281 command: command.into(),
282 args: Vec::new(),
283 env: Vec::new(),
284 cwd: None,
285 }
286 }
287
288 pub fn with_arg(mut self, arg: impl Into<String>) -> Self {
290 self.args.push(arg.into());
291 self
292 }
293
294 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
296 self.env.push((key.into(), value.into()));
297 self
298 }
299
300 pub fn with_cwd(mut self, cwd: impl Into<std::path::PathBuf>) -> Self {
302 self.cwd = Some(cwd.into());
303 self
304 }
305}
306
307#[derive(Clone, Default)]
309pub struct StreamableHttpTransportConfig {
310 pub url: String,
312 pub bearer_token: Option<String>,
317 pub headers: Vec<(HeaderName, HeaderValue)>,
321 pub http_client: Option<Arc<dyn McpHttpClient>>,
328}
329
330impl fmt::Debug for StreamableHttpTransportConfig {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 f.debug_struct("StreamableHttpTransportConfig")
333 .field("url", &self.url)
334 .field(
335 "bearer_token",
336 &self.bearer_token.as_deref().map(|_| "<redacted>"),
337 )
338 .field("headers", &self.headers)
339 .field(
340 "http_client",
341 &self.http_client.as_ref().map(|_| "<custom>"),
342 )
343 .finish()
344 }
345}
346
347impl StreamableHttpTransportConfig {
348 pub fn new(url: impl Into<String>) -> Self {
350 Self {
351 url: url.into(),
352 bearer_token: None,
353 headers: Vec::new(),
354 http_client: None,
355 }
356 }
357
358 pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
363 self.bearer_token = Some(token.into());
364 self
365 }
366
367 pub fn with_http_client(mut self, client: Arc<dyn McpHttpClient>) -> Self {
376 self.http_client = Some(client);
377 self
378 }
379
380 pub fn with_header<N, V>(mut self, name: N, value: V) -> Result<Self, McpError>
385 where
386 N: TryInto<HeaderName>,
387 N::Error: fmt::Display,
388 V: TryInto<HeaderValue>,
389 V::Error: fmt::Display,
390 {
391 let name = name
392 .try_into()
393 .map_err(|error| McpError::Transport(format!("invalid HTTP header name: {error}")))?;
394 let value = value
395 .try_into()
396 .map_err(|error| McpError::Transport(format!("invalid HTTP header value: {error}")))?;
397 self.headers.push((name, value));
398 Ok(self)
399 }
400}
401
402pub type McpSseStream = BoxStream<'static, Result<Sse, SseError>>;
404
405#[async_trait]
424pub trait McpHttpClient: Send + Sync + 'static {
425 async fn post_message(
429 &self,
430 uri: Arc<str>,
431 message: ClientJsonRpcMessage,
432 session_id: Option<Arc<str>>,
433 auth_header: Option<String>,
434 custom_headers: HashMap<HeaderName, HeaderValue>,
435 ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>>;
436
437 async fn delete_session(
439 &self,
440 uri: Arc<str>,
441 session_id: Arc<str>,
442 auth_header: Option<String>,
443 custom_headers: HashMap<HeaderName, HeaderValue>,
444 ) -> Result<(), StreamableHttpError<reqwest::Error>>;
445
446 async fn get_stream(
449 &self,
450 uri: Arc<str>,
451 session_id: Arc<str>,
452 last_event_id: Option<String>,
453 auth_header: Option<String>,
454 custom_headers: HashMap<HeaderName, HeaderValue>,
455 ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>>;
456}
457
458#[derive(Clone)]
461struct DynHttpClient(Arc<dyn McpHttpClient>);
462
463impl RmcpStreamableHttpClient for DynHttpClient {
464 type Error = reqwest::Error;
465
466 async fn post_message(
467 &self,
468 uri: Arc<str>,
469 message: ClientJsonRpcMessage,
470 session_id: Option<Arc<str>>,
471 auth_header: Option<String>,
472 custom_headers: HashMap<HeaderName, HeaderValue>,
473 ) -> Result<StreamableHttpPostResponse, StreamableHttpError<reqwest::Error>> {
474 self.0
475 .post_message(uri, message, session_id, auth_header, custom_headers)
476 .await
477 }
478
479 async fn delete_session(
480 &self,
481 uri: Arc<str>,
482 session_id: Arc<str>,
483 auth_header: Option<String>,
484 custom_headers: HashMap<HeaderName, HeaderValue>,
485 ) -> Result<(), StreamableHttpError<reqwest::Error>> {
486 self.0
487 .delete_session(uri, session_id, auth_header, custom_headers)
488 .await
489 }
490
491 async fn get_stream(
492 &self,
493 uri: Arc<str>,
494 session_id: Arc<str>,
495 last_event_id: Option<String>,
496 auth_header: Option<String>,
497 custom_headers: HashMap<HeaderName, HeaderValue>,
498 ) -> Result<McpSseStream, StreamableHttpError<reqwest::Error>> {
499 self.0
500 .get_stream(uri, session_id, last_event_id, auth_header, custom_headers)
501 .await
502 }
503}
504
505#[derive(Clone, Debug)]
507pub enum McpTransportBinding {
508 Stdio(StdioTransportConfig),
510 StreamableHttp(StreamableHttpTransportConfig),
512}
513
514#[derive(Clone, Debug)]
516pub struct McpServerConfig {
517 pub id: McpServerId,
519 pub transport: McpTransportBinding,
521 pub metadata: MetadataMap,
523}
524
525impl McpServerConfig {
526 pub fn new(id: impl Into<String>, transport: McpTransportBinding) -> Self {
528 Self {
529 id: McpServerId::new(id),
530 transport,
531 metadata: MetadataMap::new(),
532 }
533 }
534
535 pub fn stdio(id: impl Into<String>, command: impl Into<String>) -> Self {
537 Self::new(
538 id,
539 McpTransportBinding::Stdio(StdioTransportConfig::new(command)),
540 )
541 }
542
543 pub fn streamable_http(id: impl Into<String>, url: impl Into<String>) -> Self {
545 Self::new(
546 id,
547 McpTransportBinding::StreamableHttp(StreamableHttpTransportConfig::new(url)),
548 )
549 }
550
551 pub fn with_metadata(mut self, metadata: MetadataMap) -> Self {
553 self.metadata = metadata;
554 self
555 }
556}
557
558type CustomNamespace = Arc<dyn Fn(&McpServerId, &str) -> String + Send + Sync>;
559
560#[derive(Clone, Default)]
568pub enum McpToolNamespace {
569 #[default]
571 Default,
572 None,
574 Custom(CustomNamespace),
576}
577
578impl fmt::Debug for McpToolNamespace {
579 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580 match self {
581 Self::Default => f.write_str("McpToolNamespace::Default"),
582 Self::None => f.write_str("McpToolNamespace::None"),
583 Self::Custom(_) => f.write_str("McpToolNamespace::Custom(<fn>)"),
584 }
585 }
586}
587
588impl McpToolNamespace {
589 pub fn custom(f: impl Fn(&McpServerId, &str) -> String + Send + Sync + 'static) -> Self {
591 Self::Custom(Arc::new(f))
592 }
593
594 pub fn apply(&self, server_id: &McpServerId, tool_name: &str) -> String {
596 match self {
597 Self::Default => format!("mcp_{server_id}_{tool_name}"),
598 Self::None => tool_name.to_string(),
599 Self::Custom(f) => f(server_id, tool_name),
600 }
601 }
602
603 pub fn unapply(&self, server_id: &McpServerId, agentkit_name: &str) -> Option<String> {
607 match self {
608 Self::Default => agentkit_name
609 .strip_prefix(&format!("mcp_{server_id}_"))
610 .map(str::to_string),
611 Self::None => Some(agentkit_name.to_string()),
612 Self::Custom(_) => None,
613 }
614 }
615}
616
617#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
626pub struct McpDiscoverySnapshot {
627 pub server_id: McpServerId,
629 pub tools: Vec<McpTool>,
631 pub resources: Vec<McpResource>,
633 pub prompts: Vec<McpPrompt>,
635 pub metadata: MetadataMap,
637}
638
639#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
641pub enum McpCatalogEvent {
642 ServerConnected { server_id: McpServerId },
644 ServerDisconnected { server_id: McpServerId },
646 ToolsChanged {
648 server_id: McpServerId,
649 added: Vec<String>,
650 removed: Vec<String>,
651 changed: Vec<String>,
652 },
653 ResourcesChanged {
655 server_id: McpServerId,
656 added: Vec<String>,
657 removed: Vec<String>,
658 changed: Vec<String>,
659 },
660 PromptsChanged {
662 server_id: McpServerId,
663 added: Vec<String>,
664 removed: Vec<String>,
665 changed: Vec<String>,
666 },
667 AuthChanged { server_id: McpServerId },
669 RefreshFailed {
671 server_id: McpServerId,
672 message: String,
673 },
674}
675
676#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
678pub struct McpServerCapabilities {
679 #[serde(default, skip_serializing_if = "Option::is_none")]
681 pub tools: Option<ToolsCapability>,
682 #[serde(default, skip_serializing_if = "Option::is_none")]
684 pub resources: Option<ResourcesCapability>,
685 #[serde(default, skip_serializing_if = "Option::is_none")]
687 pub prompts: Option<PromptsCapability>,
688 #[serde(default, skip_serializing_if = "Option::is_none")]
690 pub logging: Option<LoggingCapability>,
691}
692
693impl McpServerCapabilities {
694 pub fn all() -> Self {
697 Self {
698 tools: Some(ToolsCapability::default()),
699 resources: Some(ResourcesCapability::default()),
700 prompts: Some(PromptsCapability::default()),
701 logging: Some(LoggingCapability::default()),
702 }
703 }
704}
705
706#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
708#[serde(rename_all = "camelCase")]
709pub struct ToolsCapability {
710 #[serde(default, skip_serializing_if = "Option::is_none")]
712 pub list_changed: Option<bool>,
713}
714
715#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
717#[serde(rename_all = "camelCase")]
718pub struct ResourcesCapability {
719 #[serde(default, skip_serializing_if = "Option::is_none")]
721 pub subscribe: Option<bool>,
722 #[serde(default, skip_serializing_if = "Option::is_none")]
724 pub list_changed: Option<bool>,
725}
726
727#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
729#[serde(rename_all = "camelCase")]
730pub struct PromptsCapability {
731 #[serde(default, skip_serializing_if = "Option::is_none")]
733 pub list_changed: Option<bool>,
734}
735
736#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
738pub struct LoggingCapability {}
739
740#[allow(clippy::enum_variant_names)]
749#[derive(Clone, Debug)]
750pub enum McpServerNotification {
751 ToolsChanged,
753 ResourcesChanged,
755 PromptsChanged,
757}
758
759#[derive(Clone, Debug)]
767pub enum McpServerEvent {
768 Progress(McpProgressNotificationParam),
771 Logging(McpLoggingMessageNotificationParam),
774 ResourceUpdated(McpResourceUpdatedNotificationParam),
777 ToolListChanged,
779 ResourceListChanged,
781 PromptListChanged,
783 Cancelled(McpCancelledNotificationParam),
786}
787
788#[async_trait]
793pub trait McpSamplingResponder: Send + Sync + 'static {
794 async fn create_message(
797 &self,
798 params: McpCreateMessageRequestParams,
799 ) -> Result<McpCreateMessageResult, McpError>;
800}
801
802#[async_trait]
807pub trait McpElicitationResponder: Send + Sync + 'static {
808 async fn create_elicitation(
810 &self,
811 params: McpCreateElicitationRequestParams,
812 ) -> Result<McpCreateElicitationResult, McpError>;
813}
814
815#[async_trait]
820pub trait McpRootsProvider: Send + Sync + 'static {
821 async fn list_roots(&self) -> Result<Vec<McpRoot>, McpError>;
823}
824
825const DEFAULT_EVENTS_CAPACITY: usize = 128;
827
828pub struct McpClientChannels {
837 pub notifications: mpsc::UnboundedReceiver<McpServerNotification>,
839 pub events: broadcast::Sender<McpServerEvent>,
841}
842
843#[derive(Clone)]
851pub struct McpClientHandler {
852 info: rmcp_model::ClientInfo,
853 notifications: mpsc::UnboundedSender<McpServerNotification>,
854 events: broadcast::Sender<McpServerEvent>,
855 sampling: Option<Arc<dyn McpSamplingResponder>>,
856 elicitation: Option<Arc<dyn McpElicitationResponder>>,
857 roots: Option<Arc<dyn McpRootsProvider>>,
858}
859
860impl ClientHandler for McpClientHandler {
861 fn create_message(
862 &self,
863 params: rmcp_model::CreateMessageRequestParams,
864 _context: rmcp::service::RequestContext<RoleClient>,
865 ) -> impl Future<Output = Result<rmcp_model::CreateMessageResult, rmcp_model::ErrorData>>
866 + rmcp::service::MaybeSendFuture
867 + '_ {
868 let responder = self.sampling.clone();
869 async move {
870 match responder {
871 Some(responder) => responder.create_message(params).await.map_err(Into::into),
872 None => Err(rmcp_model::ErrorData::method_not_found::<
873 rmcp_model::CreateMessageRequestMethod,
874 >()),
875 }
876 }
877 }
878
879 fn list_roots(
880 &self,
881 _context: rmcp::service::RequestContext<RoleClient>,
882 ) -> impl Future<Output = Result<rmcp_model::ListRootsResult, rmcp_model::ErrorData>>
883 + rmcp::service::MaybeSendFuture
884 + '_ {
885 let provider = self.roots.clone();
886 async move {
887 match provider {
888 Some(provider) => provider
889 .list_roots()
890 .await
891 .map(McpListRootsResult::new)
892 .map_err(Into::into),
893 None => Ok(McpListRootsResult::default()),
894 }
895 }
896 }
897
898 fn create_elicitation(
899 &self,
900 params: rmcp_model::CreateElicitationRequestParams,
901 _context: rmcp::service::RequestContext<RoleClient>,
902 ) -> impl Future<Output = Result<rmcp_model::CreateElicitationResult, rmcp_model::ErrorData>>
903 + rmcp::service::MaybeSendFuture
904 + '_ {
905 let responder = self.elicitation.clone();
906 async move {
907 match responder {
908 Some(responder) => responder
909 .create_elicitation(params)
910 .await
911 .map_err(Into::into),
912 None => Ok(McpCreateElicitationResult::new(
913 McpElicitationAction::Decline,
914 )),
915 }
916 }
917 }
918
919 fn on_progress(
920 &self,
921 params: rmcp_model::ProgressNotificationParam,
922 _context: rmcp::service::NotificationContext<RoleClient>,
923 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
924 let _ = self.events.send(McpServerEvent::Progress(params));
925 std::future::ready(())
926 }
927
928 fn on_logging_message(
929 &self,
930 params: rmcp_model::LoggingMessageNotificationParam,
931 _context: rmcp::service::NotificationContext<RoleClient>,
932 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
933 let _ = self.events.send(McpServerEvent::Logging(params));
934 std::future::ready(())
935 }
936
937 fn on_resource_updated(
938 &self,
939 params: rmcp_model::ResourceUpdatedNotificationParam,
940 _context: rmcp::service::NotificationContext<RoleClient>,
941 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
942 let _ = self.events.send(McpServerEvent::ResourceUpdated(params));
943 std::future::ready(())
944 }
945
946 fn on_cancelled(
947 &self,
948 params: rmcp_model::CancelledNotificationParam,
949 _context: rmcp::service::NotificationContext<RoleClient>,
950 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
951 let _ = self.events.send(McpServerEvent::Cancelled(params));
952 std::future::ready(())
953 }
954
955 fn on_tool_list_changed(
956 &self,
957 _context: rmcp::service::NotificationContext<RoleClient>,
958 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
959 let _ = self.notifications.send(McpServerNotification::ToolsChanged);
960 let _ = self.events.send(McpServerEvent::ToolListChanged);
961 std::future::ready(())
962 }
963
964 fn on_resource_list_changed(
965 &self,
966 _context: rmcp::service::NotificationContext<RoleClient>,
967 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
968 let _ = self
969 .notifications
970 .send(McpServerNotification::ResourcesChanged);
971 let _ = self.events.send(McpServerEvent::ResourceListChanged);
972 std::future::ready(())
973 }
974
975 fn on_prompt_list_changed(
976 &self,
977 _context: rmcp::service::NotificationContext<RoleClient>,
978 ) -> impl Future<Output = ()> + rmcp::service::MaybeSendFuture + '_ {
979 let _ = self
980 .notifications
981 .send(McpServerNotification::PromptsChanged);
982 let _ = self.events.send(McpServerEvent::PromptListChanged);
983 std::future::ready(())
984 }
985
986 fn get_info(&self) -> rmcp_model::ClientInfo {
987 self.info.clone()
988 }
989}
990
991impl From<McpError> for rmcp_model::ErrorData {
992 fn from(error: McpError) -> Self {
993 rmcp_model::ErrorData::internal_error(error.to_string(), None)
994 }
995}
996
997type RmcpClientService = RunningService<RoleClient, McpClientHandler>;
998
999#[derive(Clone, Default)]
1008pub struct McpHandlerConfig {
1009 pub sampling: Option<Arc<dyn McpSamplingResponder>>,
1011 pub elicitation: Option<Arc<dyn McpElicitationResponder>>,
1013 pub roots: Option<Arc<dyn McpRootsProvider>>,
1015 pub auth: Option<Arc<dyn McpAuthResponder>>,
1020 pub events_capacity: Option<usize>,
1023}
1024
1025impl McpHandlerConfig {
1026 pub fn new() -> Self {
1028 Self::default()
1029 }
1030
1031 pub fn with_sampling_responder(mut self, responder: Arc<dyn McpSamplingResponder>) -> Self {
1033 self.sampling = Some(responder);
1034 self
1035 }
1036
1037 pub fn with_elicitation_responder(
1039 mut self,
1040 responder: Arc<dyn McpElicitationResponder>,
1041 ) -> Self {
1042 self.elicitation = Some(responder);
1043 self
1044 }
1045
1046 pub fn with_roots_provider(mut self, provider: Arc<dyn McpRootsProvider>) -> Self {
1048 self.roots = Some(provider);
1049 self
1050 }
1051
1052 pub fn with_auth_responder(mut self, responder: Arc<dyn McpAuthResponder>) -> Self {
1054 self.auth = Some(responder);
1055 self
1056 }
1057
1058 pub fn with_events_capacity(mut self, capacity: usize) -> Self {
1060 self.events_capacity = Some(capacity);
1061 self
1062 }
1063
1064 pub fn build(&self) -> (McpClientHandler, McpClientChannels) {
1068 self.build_inner(None)
1069 }
1070
1071 pub fn build_with(
1076 &self,
1077 events: broadcast::Sender<McpServerEvent>,
1078 ) -> (McpClientHandler, McpClientChannels) {
1079 self.build_inner(Some(events))
1080 }
1081
1082 fn build_inner(
1083 &self,
1084 events: Option<broadcast::Sender<McpServerEvent>>,
1085 ) -> (McpClientHandler, McpClientChannels) {
1086 let (notifications_tx, notifications_rx) = mpsc::unbounded_channel();
1087 let events_tx = events.unwrap_or_else(|| {
1088 let capacity = self.events_capacity.unwrap_or(DEFAULT_EVENTS_CAPACITY);
1089 let (tx, _) = broadcast::channel(capacity);
1090 tx
1091 });
1092
1093 let mut capabilities = rmcp_model::ClientCapabilities::default();
1094 if self.sampling.is_some() {
1095 capabilities.sampling = Some(McpSamplingCapability::default());
1096 }
1097 if self.elicitation.is_some() {
1098 capabilities.elicitation = Some(McpElicitationCapability {
1099 form: Some(McpFormElicitationCapability::default()),
1100 url: None,
1101 });
1102 }
1103 if self.roots.is_some() {
1104 capabilities.roots = Some(McpRootsCapabilities::default());
1105 }
1106
1107 let handler = McpClientHandler {
1108 info: rmcp_model::ClientInfo::new(
1109 capabilities,
1110 rmcp_model::Implementation::new("agentkit-mcp", env!("CARGO_PKG_VERSION"))
1111 .with_title("agentkit MCP client"),
1112 )
1113 .with_protocol_version(rmcp_model::ProtocolVersion::LATEST),
1114 notifications: notifications_tx,
1115 events: events_tx.clone(),
1116 sampling: self.sampling.clone(),
1117 elicitation: self.elicitation.clone(),
1118 roots: self.roots.clone(),
1119 };
1120
1121 (
1122 handler,
1123 McpClientChannels {
1124 notifications: notifications_rx,
1125 events: events_tx,
1126 },
1127 )
1128 }
1129}
1130
1131pub struct McpConnection {
1134 server_id: McpServerId,
1135 config: Option<McpServerConfig>,
1136 inner: Mutex<RmcpClientService>,
1137 peer: RwLock<Peer<RoleClient>>,
1138 auth: Mutex<Option<MetadataMap>>,
1139 notifications: Mutex<mpsc::UnboundedReceiver<McpServerNotification>>,
1140 events: broadcast::Sender<McpServerEvent>,
1141 handler_config: McpHandlerConfig,
1142 capabilities: McpServerCapabilities,
1143}
1144
1145#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1147pub enum McpOperationResult {
1148 Connected(McpDiscoverySnapshot),
1150 Tool(CallToolResult),
1152 Resource(ReadResourceResult),
1154 Prompt(GetPromptResult),
1156}
1157
1158impl McpConnection {
1159 pub async fn connect(config: &McpServerConfig) -> Result<Self, McpError> {
1164 Self::connect_with_auth(config, None, McpHandlerConfig::default()).await
1165 }
1166
1167 pub async fn connect_with_handler(
1169 config: &McpServerConfig,
1170 handler_config: McpHandlerConfig,
1171 ) -> Result<Self, McpError> {
1172 Self::connect_with_auth(config, None, handler_config).await
1173 }
1174
1175 async fn connect_with_auth(
1176 config: &McpServerConfig,
1177 auth: Option<&MetadataMap>,
1178 handler_config: McpHandlerConfig,
1179 ) -> Result<Self, McpError> {
1180 let (handler, channels) = handler_config.build();
1181 let McpClientChannels {
1182 notifications: notification_rx,
1183 events: events_tx,
1184 } = channels;
1185 let (service, capabilities) = match &config.transport {
1186 McpTransportBinding::Stdio(binding) => {
1187 connect_rmcp_stdio(config, binding, handler).await?
1188 }
1189 McpTransportBinding::StreamableHttp(binding) => {
1190 connect_rmcp_streamable_http(config, binding, auth, handler).await?
1191 }
1192 };
1193
1194 let peer = service.peer().clone();
1195 Ok(Self {
1196 server_id: config.id.clone(),
1197 config: Some(config.clone()),
1198 inner: Mutex::new(service),
1199 peer: RwLock::new(peer),
1200 auth: Mutex::new(auth.cloned()),
1201 notifications: Mutex::new(notification_rx),
1202 events: events_tx,
1203 handler_config,
1204 capabilities,
1205 })
1206 }
1207
1208 pub fn from_running_service(
1224 server_id: impl Into<McpServerId>,
1225 service: RmcpClientService,
1226 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1227 ) -> Self {
1228 let (events_tx, _) = broadcast::channel(DEFAULT_EVENTS_CAPACITY);
1229 Self::from_running_service_with_events(server_id, service, notifications, events_tx)
1230 }
1231
1232 pub fn from_running_service_with_events(
1239 server_id: impl Into<McpServerId>,
1240 service: RmcpClientService,
1241 notifications: mpsc::UnboundedReceiver<McpServerNotification>,
1242 events: broadcast::Sender<McpServerEvent>,
1243 ) -> Self {
1244 let capabilities = service
1245 .peer_info()
1246 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1247 .unwrap_or_default();
1248 let peer = service.peer().clone();
1249 Self {
1250 server_id: server_id.into(),
1251 config: None,
1252 inner: Mutex::new(service),
1253 peer: RwLock::new(peer),
1254 auth: Mutex::new(None),
1255 notifications: Mutex::new(notifications),
1256 events,
1257 handler_config: McpHandlerConfig::default(),
1258 capabilities,
1259 }
1260 }
1261
1262 async fn reconnect_inner(&self, auth: Option<&MetadataMap>) -> Result<(), McpError> {
1263 let Some(config) = self.config.clone() else {
1264 return Ok(());
1265 };
1266 let (handler, channels) = self.handler_config.build_with(self.events.clone());
1267 let McpClientChannels {
1268 notifications: notification_rx,
1269 ..
1270 } = channels;
1271 let (service, _capabilities) = match &config.transport {
1272 McpTransportBinding::Stdio(binding) => {
1273 connect_rmcp_stdio(&config, binding, handler).await?
1274 }
1275 McpTransportBinding::StreamableHttp(binding) => {
1276 connect_rmcp_streamable_http(&config, binding, auth, handler).await?
1277 }
1278 };
1279 let new_peer = service.peer().clone();
1280 *self.notifications.lock().await = notification_rx;
1281 *self.inner.lock().await = service;
1282 *self.peer.write().expect("MCP peer lock poisoned") = new_peer;
1283 Ok(())
1284 }
1285
1286 fn peer(&self) -> Peer<RoleClient> {
1287 self.peer.read().expect("MCP peer lock poisoned").clone()
1288 }
1289
1290 pub fn server_id(&self) -> &McpServerId {
1292 &self.server_id
1293 }
1294
1295 pub fn capabilities(&self) -> &McpServerCapabilities {
1297 &self.capabilities
1298 }
1299
1300 pub fn handler_config(&self) -> &McpHandlerConfig {
1304 &self.handler_config
1305 }
1306
1307 pub fn subscribe_events(&self) -> broadcast::Receiver<McpServerEvent> {
1316 self.events.subscribe()
1317 }
1318
1319 pub async fn subscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1324 let uri = uri.into();
1325 self.peer()
1326 .subscribe(rmcp_model::SubscribeRequestParams::new(uri.clone()))
1327 .await
1328 .map_err(|error| {
1329 rmcp_operation_error(
1330 &self.server_id,
1331 McpMethod::ResourcesSubscribe { uri },
1332 error,
1333 )
1334 })
1335 }
1336
1337 pub async fn unsubscribe_resource(&self, uri: impl Into<String>) -> Result<(), McpError> {
1339 let uri = uri.into();
1340 self.peer()
1341 .unsubscribe(rmcp_model::UnsubscribeRequestParams::new(uri.clone()))
1342 .await
1343 .map_err(|error| {
1344 rmcp_operation_error(
1345 &self.server_id,
1346 McpMethod::ResourcesUnsubscribe { uri },
1347 error,
1348 )
1349 })
1350 }
1351
1352 pub async fn set_logging_level(&self, level: McpLoggingLevel) -> Result<(), McpError> {
1355 self.peer()
1356 .set_level(rmcp_model::SetLevelRequestParams::new(level))
1357 .await
1358 .map_err(|error| {
1359 rmcp_operation_error(
1360 &self.server_id,
1361 McpMethod::LoggingSetLevel {
1362 level: format!("{level:?}"),
1363 },
1364 error,
1365 )
1366 })
1367 }
1368
1369 pub async fn notify_cancelled(
1372 &self,
1373 params: McpCancelledNotificationParam,
1374 ) -> Result<(), McpError> {
1375 self.peer()
1376 .notify_cancelled(params)
1377 .await
1378 .map_err(rmcp_service_error)
1379 }
1380
1381 pub async fn notify_roots_list_changed(&self) -> Result<(), McpError> {
1384 self.peer()
1385 .notify_roots_list_changed()
1386 .await
1387 .map_err(rmcp_service_error)
1388 }
1389
1390 pub async fn close(&self) -> Result<(), McpError> {
1395 let mut inner = self.inner.lock().await;
1396 inner
1397 .close()
1398 .await
1399 .map(|_| ())
1400 .map_err(|error| McpError::Transport(format!("rmcp service close failed: {error}")))
1401 }
1402
1403 pub async fn resolve_auth(&self, resolution: AuthResolution) -> Result<(), McpError> {
1406 let mut auth_slot = self.auth.lock().await;
1407 match resolution {
1408 AuthResolution::Provided { credentials, .. } => {
1409 *auth_slot = Some(credentials);
1410 }
1411 AuthResolution::Cancelled { .. } => {
1412 *auth_slot = None;
1413 }
1414 }
1415 let snapshot = auth_slot.clone();
1416 drop(auth_slot);
1417 if self.config.is_some() {
1421 self.reconnect_inner(snapshot.as_ref()).await?;
1422 }
1423 Ok(())
1424 }
1425
1426 pub async fn discover(&self) -> Result<McpDiscoverySnapshot, McpError> {
1428 let tools = async {
1429 match self.capabilities.tools {
1430 Some(_) => self.list_tools().await,
1431 None => Ok(Vec::new()),
1432 }
1433 };
1434 let resources = async {
1435 match self.capabilities.resources {
1436 Some(_) => self.list_resources().await,
1437 None => Ok(Vec::new()),
1438 }
1439 };
1440 let prompts = async {
1441 match self.capabilities.prompts {
1442 Some(_) => self.list_prompts().await,
1443 None => Ok(Vec::new()),
1444 }
1445 };
1446 let (tools, resources, prompts) = tokio::try_join!(tools, resources, prompts)?;
1447 Ok(McpDiscoverySnapshot {
1448 server_id: self.server_id.clone(),
1449 tools,
1450 resources,
1451 prompts,
1452 metadata: MetadataMap::new(),
1453 })
1454 }
1455
1456 async fn drain_notifications(&self) -> Vec<McpServerNotification> {
1457 let mut notifications = self.notifications.lock().await;
1458 let mut drained = Vec::new();
1459 while let Ok(notification) = notifications.try_recv() {
1460 drained.push(notification);
1461 }
1462 drained
1463 }
1464
1465 pub async fn list_tools(&self) -> Result<Vec<McpTool>, McpError> {
1467 self.peer()
1468 .list_all_tools()
1469 .await
1470 .map_err(rmcp_service_error)
1471 }
1472
1473 pub async fn list_resources(&self) -> Result<Vec<McpResource>, McpError> {
1475 self.peer()
1476 .list_all_resources()
1477 .await
1478 .map_err(rmcp_service_error)
1479 }
1480
1481 pub async fn list_prompts(&self) -> Result<Vec<McpPrompt>, McpError> {
1483 self.peer()
1484 .list_all_prompts()
1485 .await
1486 .map_err(rmcp_service_error)
1487 }
1488
1489 pub async fn call_tool(
1496 &self,
1497 name: &str,
1498 arguments: Value,
1499 ) -> Result<CallToolResult, McpError> {
1500 let arguments_for_auth = arguments.clone();
1501 let mut params = rmcp_model::CallToolRequestParams::new(name.to_string());
1502 if !arguments.is_null() {
1503 params =
1504 params.with_arguments(value_to_json_object(arguments, "tools/call arguments")?);
1505 }
1506 let name_owned = name.to_string();
1507 self.peer().call_tool(params).await.map_err(|error| {
1508 rmcp_operation_error(
1509 &self.server_id,
1510 McpMethod::ToolsCall {
1511 name: name_owned,
1512 arguments: arguments_for_auth,
1513 },
1514 error,
1515 )
1516 })
1517 }
1518
1519 pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult, McpError> {
1526 let uri_owned = uri.to_string();
1527 self.peer()
1528 .read_resource(rmcp_model::ReadResourceRequestParams::new(uri))
1529 .await
1530 .map_err(|error| {
1531 rmcp_operation_error(
1532 &self.server_id,
1533 McpMethod::ResourcesRead { uri: uri_owned },
1534 error,
1535 )
1536 })
1537 }
1538
1539 pub async fn get_prompt(
1547 &self,
1548 name: &str,
1549 arguments: Value,
1550 ) -> Result<GetPromptResult, McpError> {
1551 let arguments_for_auth = arguments.clone();
1552 let name_owned = name.to_string();
1553 let mut params = rmcp_model::GetPromptRequestParams::new(name);
1554 if !arguments.is_null() {
1555 params =
1556 params.with_arguments(value_to_json_object(arguments, "prompts/get arguments")?);
1557 }
1558 self.peer().get_prompt(params).await.map_err(|error| {
1559 rmcp_operation_error(
1560 &self.server_id,
1561 McpMethod::PromptsGet {
1562 name: name_owned,
1563 arguments: arguments_for_auth,
1564 },
1565 error,
1566 )
1567 })
1568 }
1569}
1570
1571async fn connect_rmcp_stdio(
1572 config: &McpServerConfig,
1573 binding: &StdioTransportConfig,
1574 handler: McpClientHandler,
1575) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1576 let transport = TokioChildProcess::new(
1577 tokio::process::Command::new(&binding.command).configure(|command| {
1578 command.args(&binding.args);
1579 if let Some(cwd) = &binding.cwd {
1580 command.current_dir(cwd);
1581 }
1582 for (key, value) in &binding.env {
1583 command.env(key, value);
1584 }
1585 }),
1586 )
1587 .map_err(McpError::Io)?;
1588
1589 let service = handler
1590 .serve(transport)
1591 .await
1592 .map_err(|error| rmcp_initialize_error(config, error))?;
1593 let capabilities = service
1594 .peer_info()
1595 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1596 .unwrap_or_default();
1597
1598 Ok((service, capabilities))
1599}
1600
1601async fn connect_rmcp_streamable_http(
1602 config: &McpServerConfig,
1603 binding: &StreamableHttpTransportConfig,
1604 auth: Option<&MetadataMap>,
1605 handler: McpClientHandler,
1606) -> Result<(RmcpClientService, McpServerCapabilities), McpError> {
1607 let auth_header = auth
1608 .and_then(bearer_token_from_metadata)
1609 .or_else(|| binding.bearer_token.clone());
1610 let mut rmcp_config = RmcpStreamableHttpClientTransportConfig::with_uri(binding.url.clone());
1611 if let Some(auth_header) = auth_header {
1612 rmcp_config = rmcp_config.auth_header(auth_header);
1613 }
1614 rmcp_config = rmcp_config.custom_headers(binding.headers.iter().cloned().collect());
1615
1616 let result = match binding.http_client.as_ref() {
1617 Some(client) => {
1618 let transport = StreamableHttpClientTransport::with_client(
1619 DynHttpClient(client.clone()),
1620 rmcp_config,
1621 );
1622 handler.serve(transport).await
1623 }
1624 None => {
1625 let transport = StreamableHttpClientTransport::from_config(rmcp_config);
1626 handler.serve(transport).await
1627 }
1628 };
1629 let service = result.map_err(|error| rmcp_initialize_error(config, error))?;
1630 let capabilities = service
1631 .peer_info()
1632 .map(|info| rmcp_server_capabilities_to_agentkit(&info.capabilities))
1633 .unwrap_or_default();
1634
1635 Ok((service, capabilities))
1636}
1637
1638pub struct McpResourceHandle {
1640 connection: Arc<McpConnection>,
1641 descriptor: ResourceDescriptor,
1642}
1643
1644#[async_trait]
1645impl ResourceProvider for McpResourceHandle {
1646 async fn list_resources(&self) -> Result<Vec<ResourceDescriptor>, CapabilityError> {
1647 Ok(vec![self.descriptor.clone()])
1648 }
1649
1650 async fn read_resource(
1651 &self,
1652 id: &ResourceId,
1653 _ctx: &mut CapabilityContext<'_>,
1654 ) -> Result<ResourceContents, CapabilityError> {
1655 let result = self
1656 .connection
1657 .read_resource(&id.0)
1658 .await
1659 .map_err(|error| match error {
1660 McpError::AuthRequired(request) => {
1661 CapabilityError::Unavailable(format!("auth required: {:?}", request))
1662 }
1663 other => CapabilityError::ExecutionFailed(other.to_string()),
1664 })?;
1665 read_resource_result_to_capabilities(result)
1666 .map_err(|error| CapabilityError::ExecutionFailed(error.to_string()))
1667 }
1668}
1669
1670pub struct McpPromptHandle {
1672 connection: Arc<McpConnection>,
1673 descriptor: PromptDescriptor,
1674}
1675
1676#[async_trait]
1677impl PromptProvider for McpPromptHandle {
1678 async fn list_prompts(&self) -> Result<Vec<PromptDescriptor>, CapabilityError> {
1679 Ok(vec![self.descriptor.clone()])
1680 }
1681
1682 async fn get_prompt(
1683 &self,
1684 id: &PromptId,
1685 args: Value,
1686 _ctx: &mut CapabilityContext<'_>,
1687 ) -> Result<PromptContents, CapabilityError> {
1688 let result =
1689 self.connection
1690 .get_prompt(&id.0, args)
1691 .await
1692 .map_err(|error| match error {
1693 McpError::AuthRequired(request) => {
1694 CapabilityError::Unavailable(format!("auth required: {:?}", request))
1695 }
1696 other => CapabilityError::ExecutionFailed(other.to_string()),
1697 })?;
1698 Ok(get_prompt_result_to_capabilities(result))
1699 }
1700}
1701
1702pub struct McpCapabilityProvider {
1710 invocables: Vec<Arc<dyn Invocable>>,
1711 resources: Vec<Arc<dyn ResourceProvider>>,
1712 prompts: Vec<Arc<dyn PromptProvider>>,
1713}
1714
1715impl McpCapabilityProvider {
1716 pub fn from_snapshot(connection: Arc<McpConnection>, snapshot: &McpDiscoverySnapshot) -> Self {
1719 Self::from_snapshot_with_namespace(connection, snapshot, &McpToolNamespace::Default)
1720 }
1721
1722 pub fn from_snapshot_with_namespace(
1724 connection: Arc<McpConnection>,
1725 snapshot: &McpDiscoverySnapshot,
1726 namespace: &McpToolNamespace,
1727 ) -> Self {
1728 let server_id = connection.server_id().clone();
1729 let registry =
1730 snapshot
1731 .tools
1732 .iter()
1733 .cloned()
1734 .fold(ToolRegistry::new(), |registry, tool| {
1735 registry.with(McpToolAdapter::with_namespace(
1736 &server_id,
1737 connection.clone(),
1738 tool,
1739 namespace,
1740 ))
1741 });
1742 let permissions: Arc<dyn PermissionChecker> = Arc::new(AllowAllPermissions);
1743 let resources_arc: Arc<dyn agentkit_tools_core::ToolResources> = Arc::new(());
1744 let invocables =
1745 ToolCapabilityProvider::from_registry(®istry, permissions, resources_arc)
1746 .invocables();
1747
1748 let resources = snapshot
1749 .resources
1750 .iter()
1751 .cloned()
1752 .map(|resource| {
1753 Arc::new(McpResourceHandle {
1754 connection: connection.clone(),
1755 descriptor: resource_descriptor_from_rmcp(resource),
1756 }) as Arc<dyn ResourceProvider>
1757 })
1758 .collect();
1759
1760 let prompts = snapshot
1761 .prompts
1762 .iter()
1763 .cloned()
1764 .map(|prompt| {
1765 Arc::new(McpPromptHandle {
1766 connection: connection.clone(),
1767 descriptor: prompt_descriptor_from_rmcp(prompt),
1768 }) as Arc<dyn PromptProvider>
1769 })
1770 .collect();
1771
1772 Self {
1773 invocables,
1774 resources,
1775 prompts,
1776 }
1777 }
1778
1779 pub fn merge<I>(providers: I) -> Self
1781 where
1782 I: IntoIterator<Item = Self>,
1783 {
1784 let mut invocables = Vec::new();
1785 let mut resources = Vec::new();
1786 let mut prompts = Vec::new();
1787
1788 for provider in providers {
1789 invocables.extend(provider.invocables);
1790 resources.extend(provider.resources);
1791 prompts.extend(provider.prompts);
1792 }
1793
1794 Self {
1795 invocables,
1796 resources,
1797 prompts,
1798 }
1799 }
1800
1801 pub async fn connect(
1803 config: &McpServerConfig,
1804 ) -> Result<(Arc<McpConnection>, Self, McpDiscoverySnapshot), McpError> {
1805 let connection = Arc::new(McpConnection::connect(config).await?);
1806 let snapshot = connection.discover().await?;
1807 let provider = Self::from_snapshot(connection.clone(), &snapshot);
1808
1809 Ok((connection, provider, snapshot))
1810 }
1811}
1812
1813impl CapabilityProvider for McpCapabilityProvider {
1814 fn invocables(&self) -> Vec<Arc<dyn Invocable>> {
1815 self.invocables.clone()
1816 }
1817
1818 fn resources(&self) -> Vec<Arc<dyn ResourceProvider>> {
1819 self.resources.clone()
1820 }
1821
1822 fn prompts(&self) -> Vec<Arc<dyn PromptProvider>> {
1823 self.prompts.clone()
1824 }
1825}
1826
1827#[derive(Clone)]
1829pub struct McpServerHandle {
1830 config: McpServerConfig,
1831 connection: Arc<McpConnection>,
1832 snapshot: McpDiscoverySnapshot,
1833 namespace: McpToolNamespace,
1834}
1835
1836impl McpServerHandle {
1837 pub fn config(&self) -> &McpServerConfig {
1839 &self.config
1840 }
1841
1842 pub fn server_id(&self) -> &McpServerId {
1844 self.connection.server_id()
1845 }
1846
1847 pub fn connection(&self) -> Arc<McpConnection> {
1849 self.connection.clone()
1850 }
1851
1852 pub fn snapshot(&self) -> &McpDiscoverySnapshot {
1854 &self.snapshot
1855 }
1856
1857 pub fn namespace(&self) -> &McpToolNamespace {
1859 &self.namespace
1860 }
1861
1862 pub fn tool_registry(&self) -> ToolRegistry {
1864 self.snapshot
1865 .tools
1866 .iter()
1867 .cloned()
1868 .fold(ToolRegistry::new(), |registry, tool| {
1869 registry.with(McpToolAdapter::with_namespace(
1870 self.server_id(),
1871 self.connection.clone(),
1872 tool,
1873 &self.namespace,
1874 ))
1875 })
1876 }
1877
1878 pub fn capability_provider(&self) -> McpCapabilityProvider {
1880 McpCapabilityProvider::from_snapshot_with_namespace(
1881 self.connection.clone(),
1882 &self.snapshot,
1883 &self.namespace,
1884 )
1885 }
1886}
1887
1888pub struct McpServerManager {
1890 configs: BTreeMap<McpServerId, McpServerConfig>,
1891 connections: BTreeMap<McpServerId, McpServerHandle>,
1892 auth: BTreeMap<McpServerId, MetadataMap>,
1893 catalog_tx: broadcast::Sender<McpCatalogEvent>,
1894 namespace: McpToolNamespace,
1895 handler_config: McpHandlerConfig,
1896 catalog_writer: CatalogWriter,
1897 server_tools: BTreeMap<McpServerId, BTreeSet<ToolName>>,
1902}
1903
1904impl Default for McpServerManager {
1905 fn default() -> Self {
1906 let (catalog_tx, _) = broadcast::channel(128);
1907 let (catalog_writer, _) = dynamic_catalog("mcp");
1908 Self {
1909 configs: BTreeMap::new(),
1910 connections: BTreeMap::new(),
1911 auth: BTreeMap::new(),
1912 catalog_tx,
1913 namespace: McpToolNamespace::Default,
1914 handler_config: McpHandlerConfig::default(),
1915 catalog_writer,
1916 server_tools: BTreeMap::new(),
1917 }
1918 }
1919}
1920
1921impl McpServerManager {
1922 pub fn new() -> Self {
1924 Self::default()
1925 }
1926
1927 pub fn with_namespace(mut self, namespace: McpToolNamespace) -> Self {
1929 self.namespace = namespace;
1930 self
1931 }
1932
1933 pub fn set_namespace(&mut self, namespace: McpToolNamespace) -> &mut Self {
1935 self.namespace = namespace;
1936 self
1937 }
1938
1939 pub fn namespace(&self) -> &McpToolNamespace {
1941 &self.namespace
1942 }
1943
1944 pub fn with_handler_config(mut self, handler_config: McpHandlerConfig) -> Self {
1947 self.handler_config = handler_config;
1948 self
1949 }
1950
1951 pub fn set_handler_config(&mut self, handler_config: McpHandlerConfig) -> &mut Self {
1953 self.handler_config = handler_config;
1954 self
1955 }
1956
1957 pub fn handler_config(&self) -> &McpHandlerConfig {
1959 &self.handler_config
1960 }
1961
1962 pub fn with_server(mut self, config: McpServerConfig) -> Self {
1964 self.register_server(config);
1965 self
1966 }
1967
1968 pub fn register_server(&mut self, config: McpServerConfig) -> &mut Self {
1970 self.configs.insert(config.id.clone(), config);
1971 self
1972 }
1973
1974 pub fn connected_server(&self, server_id: &McpServerId) -> Option<&McpServerHandle> {
1976 self.connections.get(server_id)
1977 }
1978
1979 pub fn connected_servers(&self) -> Vec<&McpServerHandle> {
1981 self.connections.values().collect()
1982 }
1983
1984 pub fn subscribe_catalog_events(&self) -> broadcast::Receiver<McpCatalogEvent> {
1986 self.catalog_tx.subscribe()
1987 }
1988
1989 fn emit_catalog_event(&self, event: McpCatalogEvent) {
1990 let _ = self.catalog_tx.send(event);
1991 }
1992
1993 pub async fn connect_server(
1995 &mut self,
1996 server_id: &McpServerId,
1997 ) -> Result<McpServerHandle, McpError> {
1998 let config = self
1999 .configs
2000 .get(server_id)
2001 .cloned()
2002 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2003 let connection = Arc::new(
2004 McpConnection::connect_with_auth(
2005 &config,
2006 self.auth.get(server_id),
2007 self.handler_config.clone(),
2008 )
2009 .await?,
2010 );
2011 let snapshot = connection.discover().await?;
2012 let handle = McpServerHandle {
2013 config,
2014 connection,
2015 snapshot,
2016 namespace: self.namespace.clone(),
2017 };
2018 self.connections.insert(server_id.clone(), handle.clone());
2019 self.register_server_tools(server_id, &handle.snapshot);
2020 self.emit_catalog_event(McpCatalogEvent::ServerConnected {
2021 server_id: server_id.clone(),
2022 });
2023 Ok(handle)
2024 }
2025
2026 pub async fn connect_all(&mut self) -> Result<Vec<McpServerHandle>, McpError> {
2028 let plans: Vec<(McpServerId, McpServerConfig, Option<MetadataMap>)> = self
2029 .configs
2030 .iter()
2031 .map(|(id, cfg)| (id.clone(), cfg.clone(), self.auth.get(id).cloned()))
2032 .collect();
2033 let handler_config = self.handler_config.clone();
2034 let namespace = self.namespace.clone();
2035
2036 let futures = plans.into_iter().map(|(server_id, config, auth)| {
2037 let handler_config = handler_config.clone();
2038 let namespace = namespace.clone();
2039 async move {
2040 let connection = Arc::new(
2041 McpConnection::connect_with_auth(&config, auth.as_ref(), handler_config)
2042 .await?,
2043 );
2044 let snapshot = connection.discover().await?;
2045 Ok::<(McpServerId, McpServerHandle), McpError>((
2046 server_id,
2047 McpServerHandle {
2048 config,
2049 connection,
2050 snapshot,
2051 namespace,
2052 },
2053 ))
2054 }
2055 });
2056
2057 let results = try_join_all(futures).await?;
2058 let mut handles = Vec::with_capacity(results.len());
2059 let mut connected: Vec<(McpServerId, McpDiscoverySnapshot)> =
2060 Vec::with_capacity(results.len());
2061 for (server_id, handle) in results {
2062 connected.push((server_id.clone(), handle.snapshot.clone()));
2063 self.connections.insert(server_id, handle.clone());
2064 handles.push(handle);
2065 }
2066 for (server_id, snapshot) in &connected {
2067 self.register_server_tools(server_id, snapshot);
2068 }
2069 for (server_id, _) in connected {
2070 self.emit_catalog_event(McpCatalogEvent::ServerConnected { server_id });
2071 }
2072 Ok(handles)
2073 }
2074
2075 pub async fn refresh_server(
2077 &mut self,
2078 server_id: &McpServerId,
2079 ) -> Result<McpDiscoverySnapshot, McpError> {
2080 let handle = self
2081 .connections
2082 .get_mut(server_id)
2083 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2084 let previous = handle.snapshot.clone();
2085 let snapshot = match handle.connection.discover().await {
2086 Ok(snapshot) => snapshot,
2087 Err(error) => {
2088 self.emit_catalog_event(McpCatalogEvent::RefreshFailed {
2089 server_id: server_id.clone(),
2090 message: error.to_string(),
2091 });
2092 return Err(error);
2093 }
2094 };
2095 handle.snapshot = snapshot.clone();
2096 let events = diff_discovery_snapshots(server_id, &previous, &snapshot);
2097 if !events.is_empty() {
2098 self.apply_catalog_events(server_id, &snapshot, &events);
2099 for event in events {
2100 self.emit_catalog_event(event);
2101 }
2102 }
2103 Ok(snapshot)
2104 }
2105
2106 pub async fn refresh_changed_catalogs(&mut self) -> Result<Vec<McpCatalogEvent>, McpError> {
2108 let server_ids = self.connections.keys().cloned().collect::<Vec<_>>();
2109 let mut emitted = Vec::new();
2110
2111 for server_id in server_ids {
2112 let Some(connection) = self
2113 .connections
2114 .get(&server_id)
2115 .map(McpServerHandle::connection)
2116 else {
2117 continue;
2118 };
2119 let notifications = connection.drain_notifications().await;
2120 if notifications.is_empty() {
2121 continue;
2122 }
2123
2124 let handle = self
2125 .connections
2126 .get_mut(&server_id)
2127 .ok_or_else(|| McpError::UnknownServer(server_id.to_string()))?;
2128 let previous = handle.snapshot.clone();
2129 let snapshot = match handle.connection.discover().await {
2130 Ok(snapshot) => snapshot,
2131 Err(error) => {
2132 let event = McpCatalogEvent::RefreshFailed {
2133 server_id: server_id.clone(),
2134 message: error.to_string(),
2135 };
2136 self.emit_catalog_event(event.clone());
2137 emitted.push(event);
2138 return Err(error);
2139 }
2140 };
2141 handle.snapshot = snapshot.clone();
2142 let events = diff_discovery_snapshots(&server_id, &previous, &snapshot);
2143 if !events.is_empty() {
2144 self.apply_catalog_events(&server_id, &snapshot, &events);
2145 for event in events {
2146 self.emit_catalog_event(event.clone());
2147 emitted.push(event);
2148 }
2149 }
2150 }
2151
2152 Ok(emitted)
2153 }
2154
2155 pub async fn disconnect_server(&mut self, server_id: &McpServerId) -> Result<(), McpError> {
2157 let Some(handle) = self.connections.remove(server_id) else {
2158 return Err(McpError::UnknownServer(server_id.to_string()));
2159 };
2160 handle.connection.close().await?;
2161 self.unregister_server_tools(server_id);
2162 self.emit_catalog_event(McpCatalogEvent::ServerDisconnected {
2163 server_id: server_id.clone(),
2164 });
2165 Ok(())
2166 }
2167
2168 pub async fn resolve_auth(&mut self, resolution: AuthResolution) -> Result<(), McpError> {
2170 let server_id = resolution
2171 .request()
2172 .server_id()
2173 .ok_or_else(|| McpError::AuthResolution("auth resolution missing server id".into()))?;
2174 let server_id = McpServerId::new(server_id);
2175 match &resolution {
2176 AuthResolution::Provided { credentials, .. } => {
2177 self.auth.insert(server_id.clone(), credentials.clone());
2178 }
2179 AuthResolution::Cancelled { .. } => {
2180 self.auth.remove(&server_id);
2181 }
2182 }
2183
2184 if let Some(handle) = self.connections.get(&server_id) {
2185 handle.connection.resolve_auth(resolution).await?;
2186 } else if !self.configs.contains_key(&server_id) {
2187 return Err(McpError::UnknownServer(server_id.to_string()));
2188 }
2189 self.emit_catalog_event(McpCatalogEvent::AuthChanged { server_id });
2190 Ok(())
2191 }
2192
2193 pub fn tool_registry(&self) -> ToolRegistry {
2198 self.connections
2199 .values()
2200 .fold(ToolRegistry::new(), |mut registry, handle| {
2201 for tool in handle.snapshot.tools.iter().cloned() {
2202 registry.register(McpToolAdapter::with_namespace(
2203 handle.server_id(),
2204 handle.connection.clone(),
2205 tool,
2206 &self.namespace,
2207 ));
2208 }
2209 registry
2210 })
2211 }
2212
2213 pub fn source(&self) -> CatalogReader {
2225 self.catalog_writer.reader()
2226 }
2227
2228 fn apply_catalog_events(
2233 &mut self,
2234 server_id: &McpServerId,
2235 snapshot: &McpDiscoverySnapshot,
2236 events: &[McpCatalogEvent],
2237 ) {
2238 for event in events {
2239 if let McpCatalogEvent::ToolsChanged {
2240 added,
2241 removed,
2242 changed,
2243 ..
2244 } = event
2245 {
2246 self.apply_server_tool_diff(server_id, snapshot, added, removed, changed);
2247 }
2248 }
2249 }
2250
2251 fn register_server_tools(&mut self, server_id: &McpServerId, snapshot: &McpDiscoverySnapshot) {
2255 let connection = match self.connections.get(server_id) {
2256 Some(handle) => handle.connection.clone(),
2257 None => return,
2258 };
2259 let previous = self.server_tools.remove(server_id).unwrap_or_default();
2260 let mut names = BTreeSet::new();
2261 for tool in &snapshot.tools {
2262 let adapter = McpToolAdapter::with_namespace(
2263 server_id,
2264 connection.clone(),
2265 tool.clone(),
2266 &self.namespace,
2267 );
2268 names.insert(adapter.spec().name.clone());
2269 self.catalog_writer.upsert(Arc::new(adapter));
2270 }
2271 for stale in previous.difference(&names) {
2272 self.catalog_writer.remove(stale);
2273 }
2274 self.server_tools.insert(server_id.clone(), names);
2275 }
2276
2277 fn unregister_server_tools(&mut self, server_id: &McpServerId) {
2280 let Some(names) = self.server_tools.remove(server_id) else {
2281 return;
2282 };
2283 for name in names {
2284 self.catalog_writer.remove(&name);
2285 }
2286 }
2287
2288 fn apply_server_tool_diff(
2292 &mut self,
2293 server_id: &McpServerId,
2294 snapshot: &McpDiscoverySnapshot,
2295 added: &[String],
2296 removed: &[String],
2297 changed: &[String],
2298 ) {
2299 let connection = match self.connections.get(server_id) {
2300 Some(handle) => handle.connection.clone(),
2301 None => return,
2302 };
2303 let names = self.server_tools.entry(server_id.clone()).or_default();
2304
2305 for raw_name in removed {
2306 let agentkit_name = ToolName::new(self.namespace.apply(server_id, raw_name));
2307 if names.remove(&agentkit_name) {
2308 self.catalog_writer.remove(&agentkit_name);
2309 }
2310 }
2311
2312 let upsert_one = |raw_name: &str| -> Option<(ToolName, McpToolAdapter)> {
2313 let tool = snapshot
2314 .tools
2315 .iter()
2316 .find(|tool| tool.name.as_ref() == raw_name)?
2317 .clone();
2318 let adapter = McpToolAdapter::with_namespace(
2319 server_id,
2320 connection.clone(),
2321 tool,
2322 &self.namespace,
2323 );
2324 Some((adapter.spec().name.clone(), adapter))
2325 };
2326
2327 for raw_name in added.iter().chain(changed.iter()) {
2328 if let Some((agentkit_name, adapter)) = upsert_one(raw_name) {
2329 names.insert(agentkit_name);
2330 self.catalog_writer.upsert(Arc::new(adapter));
2331 }
2332 }
2333 }
2334
2335 pub fn capability_provider(&self) -> McpCapabilityProvider {
2337 McpCapabilityProvider::merge(
2338 self.connections
2339 .values()
2340 .map(McpServerHandle::capability_provider),
2341 )
2342 }
2343}
2344
2345fn diff_discovery_snapshots(
2346 server_id: &McpServerId,
2347 previous: &McpDiscoverySnapshot,
2348 current: &McpDiscoverySnapshot,
2349) -> Vec<McpCatalogEvent> {
2350 let mut events = Vec::new();
2351 let (added, removed, changed) = diff_named_items(
2352 previous.tools.iter().map(|item| (item.name.as_ref(), item)),
2353 current.tools.iter().map(|item| (item.name.as_ref(), item)),
2354 );
2355 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2356 events.push(McpCatalogEvent::ToolsChanged {
2357 server_id: server_id.clone(),
2358 added,
2359 removed,
2360 changed,
2361 });
2362 }
2363
2364 let (added, removed, changed) = diff_named_items(
2365 previous
2366 .resources
2367 .iter()
2368 .map(|item| (item.uri.as_str(), item)),
2369 current
2370 .resources
2371 .iter()
2372 .map(|item| (item.uri.as_str(), item)),
2373 );
2374 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2375 events.push(McpCatalogEvent::ResourcesChanged {
2376 server_id: server_id.clone(),
2377 added,
2378 removed,
2379 changed,
2380 });
2381 }
2382
2383 let (added, removed, changed) = diff_named_items(
2384 previous
2385 .prompts
2386 .iter()
2387 .map(|item| (item.name.as_str(), item)),
2388 current
2389 .prompts
2390 .iter()
2391 .map(|item| (item.name.as_str(), item)),
2392 );
2393 if !added.is_empty() || !removed.is_empty() || !changed.is_empty() {
2394 events.push(McpCatalogEvent::PromptsChanged {
2395 server_id: server_id.clone(),
2396 added,
2397 removed,
2398 changed,
2399 });
2400 }
2401
2402 events
2403}
2404
2405fn diff_named_items<'a, T>(
2409 previous: impl IntoIterator<Item = (&'a str, &'a T)>,
2410 current: impl IntoIterator<Item = (&'a str, &'a T)>,
2411) -> (Vec<String>, Vec<String>, Vec<String>)
2412where
2413 T: PartialEq + 'a,
2414{
2415 let mut prev: Vec<(&str, &T)> = previous.into_iter().collect();
2416 let mut curr: Vec<(&str, &T)> = current.into_iter().collect();
2417 prev.sort_unstable_by_key(|(name, _)| *name);
2418 curr.sort_unstable_by_key(|(name, _)| *name);
2419
2420 let mut added = Vec::new();
2421 let mut removed = Vec::new();
2422 let mut changed = Vec::new();
2423 let (mut i, mut j) = (0, 0);
2424 while i < prev.len() && j < curr.len() {
2425 match prev[i].0.cmp(curr[j].0) {
2426 std::cmp::Ordering::Less => {
2427 removed.push(prev[i].0.to_string());
2428 i += 1;
2429 }
2430 std::cmp::Ordering::Greater => {
2431 added.push(curr[j].0.to_string());
2432 j += 1;
2433 }
2434 std::cmp::Ordering::Equal => {
2435 if prev[i].1 != curr[j].1 {
2436 changed.push(curr[j].0.to_string());
2437 }
2438 i += 1;
2439 j += 1;
2440 }
2441 }
2442 }
2443 while i < prev.len() {
2444 removed.push(prev[i].0.to_string());
2445 i += 1;
2446 }
2447 while j < curr.len() {
2448 added.push(curr[j].0.to_string());
2449 j += 1;
2450 }
2451
2452 (added, removed, changed)
2453}
2454
2455pub struct McpToolAdapter {
2457 tool_name: String,
2458 connection: Arc<McpConnection>,
2459 spec: ToolSpec,
2460}
2461
2462impl McpToolAdapter {
2463 pub fn new(server_id: &McpServerId, connection: Arc<McpConnection>, tool: McpTool) -> Self {
2466 Self::with_namespace(server_id, connection, tool, &McpToolNamespace::Default)
2467 }
2468
2469 pub fn with_namespace(
2471 server_id: &McpServerId,
2472 connection: Arc<McpConnection>,
2473 tool: McpTool,
2474 namespace: &McpToolNamespace,
2475 ) -> Self {
2476 let spec = tool_spec_from_tool(server_id, &tool, namespace);
2477 Self {
2478 tool_name: tool.name.into_owned(),
2479 connection,
2480 spec,
2481 }
2482 }
2483}
2484
2485#[async_trait]
2486impl Tool for McpToolAdapter {
2487 fn spec(&self) -> &ToolSpec {
2488 &self.spec
2489 }
2490
2491 async fn invoke(
2492 &self,
2493 request: ToolRequest,
2494 _ctx: &mut ToolContext<'_>,
2495 ) -> Result<ToolResult, ToolError> {
2496 let input = request.input;
2497 let result = match self
2498 .connection
2499 .call_tool(&self.tool_name, input.clone())
2500 .await
2501 {
2502 Ok(result) => result,
2503 Err(McpError::AuthRequired(auth_request)) => {
2504 let responder = self
2505 .connection
2506 .handler_config()
2507 .auth
2508 .clone()
2509 .ok_or_else(|| {
2510 ToolError::ExecutionFailed(
2511 "MCP server requires auth but no McpAuthResponder is registered".into(),
2512 )
2513 })?;
2514 let resolution = responder.resolve(*auth_request).await.map_err(|error| {
2515 ToolError::ExecutionFailed(format!("auth responder failed: {error}"))
2516 })?;
2517 match &resolution {
2518 AuthResolution::Provided { .. } => {
2519 self.connection
2520 .resolve_auth(resolution.clone())
2521 .await
2522 .map_err(|error| {
2523 ToolError::ExecutionFailed(format!(
2524 "applying auth resolution failed: {error}"
2525 ))
2526 })?;
2527 }
2528 AuthResolution::Cancelled { .. } => {
2529 return Err(ToolError::ExecutionFailed(
2530 "user cancelled MCP auth flow".into(),
2531 ));
2532 }
2533 }
2534 self.connection
2535 .call_tool(&self.tool_name, input)
2536 .await
2537 .map_err(|err| match err {
2538 McpError::AuthRequired(req) => ToolError::ExecutionFailed(format!(
2539 "MCP auth challenge unresolved after retry: {}",
2540 req.id
2541 )),
2542 other => ToolError::ExecutionFailed(other.to_string()),
2543 })?
2544 }
2545 Err(other) => return Err(ToolError::ExecutionFailed(other.to_string())),
2546 };
2547
2548 let is_error = result.is_error.unwrap_or(false);
2549 Ok(ToolResult {
2550 result: ToolResultPart {
2551 call_id: request.call_id,
2552 output: call_tool_result_to_tool_output(result),
2553 is_error,
2554 metadata: MetadataMap::new(),
2555 },
2556 duration: None,
2557 metadata: MetadataMap::new(),
2558 })
2559 }
2560}
2561
2562fn rmcp_server_capabilities_to_agentkit(
2563 capabilities: &rmcp_model::ServerCapabilities,
2564) -> McpServerCapabilities {
2565 McpServerCapabilities {
2566 tools: capabilities.tools.as_ref().map(|tools| ToolsCapability {
2567 list_changed: tools.list_changed,
2568 }),
2569 resources: capabilities
2570 .resources
2571 .as_ref()
2572 .map(|resources| ResourcesCapability {
2573 subscribe: resources.subscribe,
2574 list_changed: resources.list_changed,
2575 }),
2576 prompts: capabilities
2577 .prompts
2578 .as_ref()
2579 .map(|prompts| PromptsCapability {
2580 list_changed: prompts.list_changed,
2581 }),
2582 logging: capabilities.logging.as_ref().map(|_| LoggingCapability {}),
2583 }
2584}
2585
2586fn tool_spec_from_tool(
2587 server_id: &McpServerId,
2588 tool: &McpTool,
2589 namespace: &McpToolNamespace,
2590) -> ToolSpec {
2591 ToolSpec {
2592 name: ToolName::new(namespace.apply(server_id, &tool.name)),
2593 description: tool
2594 .description
2595 .as_ref()
2596 .map(|d| d.to_string())
2597 .unwrap_or_else(|| tool.name.to_string()),
2598 input_schema: Value::Object((*tool.input_schema).clone()),
2599 annotations: tool_annotations_from_rmcp(tool.annotations.as_ref()),
2600 metadata: MetadataMap::new(),
2601 }
2602}
2603
2604fn tool_annotations_from_rmcp(annotations: Option<&McpToolAnnotations>) -> ToolAnnotations {
2605 let Some(annotations) = annotations else {
2606 return ToolAnnotations::default();
2607 };
2608 ToolAnnotations {
2615 read_only_hint: annotations.read_only_hint.unwrap_or(false),
2616 destructive_hint: annotations.destructive_hint.unwrap_or(false),
2617 idempotent_hint: annotations.idempotent_hint.unwrap_or(false),
2618 needs_approval_hint: false,
2619 supports_streaming_hint: false,
2620 }
2621}
2622
2623fn resource_descriptor_from_rmcp(resource: McpResource) -> ResourceDescriptor {
2624 let raw = resource.raw;
2625 ResourceDescriptor {
2626 id: ResourceId::new(raw.uri),
2627 name: raw.name,
2628 description: raw.description,
2629 mime_type: raw.mime_type,
2630 metadata: MetadataMap::new(),
2631 }
2632}
2633
2634fn prompt_descriptor_from_rmcp(prompt: McpPrompt) -> PromptDescriptor {
2635 let arguments = prompt.arguments.unwrap_or_default();
2636 let mut required = Vec::new();
2637 let properties = arguments
2638 .into_iter()
2639 .map(|argument| {
2640 let mut schema = serde_json::Map::new();
2641 schema.insert("type".into(), Value::String("string".into()));
2642 if let Some(description) = argument.description {
2643 schema.insert("description".into(), Value::String(description));
2644 }
2645 if argument.required.unwrap_or(false) {
2646 required.push(Value::String(argument.name.clone()));
2647 }
2648 (argument.name, Value::Object(schema))
2649 })
2650 .collect::<serde_json::Map<String, Value>>();
2651 let mut input_schema = serde_json::Map::new();
2652 input_schema.insert("type".into(), Value::String("object".into()));
2653 input_schema.insert("properties".into(), Value::Object(properties));
2654 if !required.is_empty() {
2655 input_schema.insert("required".into(), Value::Array(required));
2656 }
2657
2658 PromptDescriptor {
2659 id: PromptId::new(prompt.name.clone()),
2660 name: prompt.name,
2661 description: prompt.description,
2662 input_schema: Value::Object(input_schema),
2663 metadata: MetadataMap::new(),
2664 }
2665}
2666
2667fn read_resource_result_to_capabilities(
2668 result: ReadResourceResult,
2669) -> Result<ResourceContents, McpError> {
2670 let content = result
2671 .contents
2672 .into_iter()
2673 .next()
2674 .ok_or_else(|| McpError::Protocol("resources/read returned no contents".into()))?;
2675 Ok(resource_contents_to_capabilities(content))
2676}
2677
2678fn resource_contents_to_capabilities(content: McpResourceContents) -> ResourceContents {
2679 let mut metadata = MetadataMap::new();
2680 let data = match content {
2681 McpResourceContents::TextResourceContents {
2682 text, mime_type, ..
2683 } => {
2684 if let Some(mime) = mime_type {
2685 metadata.insert("mime_type".into(), Value::String(mime));
2686 }
2687 DataRef::InlineText(text)
2688 }
2689 McpResourceContents::BlobResourceContents {
2690 blob,
2691 mime_type,
2692 uri,
2693 ..
2694 } => {
2695 if let Some(mime) = mime_type {
2696 metadata.insert("mime_type".into(), Value::String(mime));
2697 }
2698 metadata.insert("uri".into(), Value::String(uri));
2699 DataRef::InlineText(blob)
2701 }
2702 };
2703 ResourceContents { data, metadata }
2704}
2705
2706fn get_prompt_result_to_capabilities(result: GetPromptResult) -> PromptContents {
2707 let items = result
2708 .messages
2709 .into_iter()
2710 .map(prompt_message_to_item)
2711 .collect();
2712 let mut metadata = MetadataMap::new();
2713 if let Some(description) = result.description {
2714 metadata.insert("description".into(), Value::String(description));
2715 }
2716 PromptContents { items, metadata }
2717}
2718
2719fn prompt_message_to_item(message: PromptMessage) -> Item {
2720 let kind = match message.role {
2721 PromptMessageRole::Assistant => ItemKind::Assistant,
2722 PromptMessageRole::User => ItemKind::User,
2723 };
2724 Item {
2725 id: None,
2726 kind,
2727 parts: vec![prompt_message_content_to_part(message.content)],
2728 metadata: MetadataMap::new(),
2729 usage: None,
2730 finish_reason: None,
2731 created_at: None,
2732 }
2733}
2734
2735fn prompt_message_content_to_part(content: PromptMessageContent) -> Part {
2736 match content {
2737 PromptMessageContent::Text { text } => Part::Text(TextPart::new(text)),
2738 PromptMessageContent::Image { image } => Part::Media(MediaPart::new(
2739 Modality::Image,
2740 image.mime_type.clone(),
2741 DataRef::InlineText(image.data.clone()),
2742 )),
2743 PromptMessageContent::Resource { resource } => {
2744 let agentkit_resource = resource_contents_to_capabilities(resource.resource.clone());
2745 agentkit_part_from_resource(agentkit_resource)
2746 }
2747 PromptMessageContent::ResourceLink { link } => Part::Text(TextPart::new(link.uri.clone())),
2748 }
2749}
2750
2751fn agentkit_part_from_resource(resource: ResourceContents) -> Part {
2752 let mime = resource
2753 .metadata
2754 .get("mime_type")
2755 .and_then(Value::as_str)
2756 .unwrap_or("text/plain")
2757 .to_string();
2758 Part::Media(MediaPart::new(Modality::Binary, mime, resource.data))
2759}
2760
2761fn call_tool_result_to_tool_output(result: CallToolResult) -> ToolOutput {
2762 if let Some(structured) = result.structured_content {
2763 return ToolOutput::Structured(structured);
2764 }
2765 let parts = call_tool_content_to_parts(result.content);
2766 if parts.iter().all(|part| matches!(part, Part::Text(_))) {
2767 let text = parts
2768 .iter()
2769 .filter_map(|part| match part {
2770 Part::Text(text) => Some(text.text.clone()),
2771 _ => None,
2772 })
2773 .collect::<Vec<_>>()
2774 .join("\n");
2775 ToolOutput::Text(text)
2776 } else {
2777 ToolOutput::Parts(parts)
2778 }
2779}
2780
2781fn call_tool_content_to_parts(contents: Vec<Content>) -> Vec<Part> {
2782 contents.into_iter().map(content_to_part).collect()
2783}
2784
2785fn content_to_part(content: Content) -> Part {
2786 match content.raw {
2787 RawContent::Text(text) => Part::Text(TextPart::new(text.text)),
2788 RawContent::Image(image) => Part::Media(MediaPart::new(
2789 Modality::Image,
2790 image.mime_type,
2791 DataRef::InlineText(image.data),
2792 )),
2793 RawContent::Audio(audio) => Part::Media(MediaPart::new(
2794 Modality::Audio,
2795 audio.mime_type,
2796 DataRef::InlineText(audio.data),
2797 )),
2798 RawContent::Resource(embedded) => {
2799 agentkit_part_from_resource(resource_contents_to_capabilities(embedded.resource))
2800 }
2801 RawContent::ResourceLink(link) => Part::Text(TextPart::new(link.uri)),
2802 }
2803}
2804
2805fn value_to_json_object(value: Value, context: &str) -> Result<rmcp_model::JsonObject, McpError> {
2806 match value {
2807 Value::Object(object) => Ok(object),
2808 Value::Null => Ok(serde_json::Map::new()),
2809 other => Err(McpError::Protocol(format!(
2810 "{context} must be a JSON object, got {other}"
2811 ))),
2812 }
2813}
2814
2815fn bearer_token_from_metadata(metadata: &MetadataMap) -> Option<String> {
2816 ["bearer_token", "access_token", "token", "api_key"]
2817 .into_iter()
2818 .find_map(|key| metadata.get(key).and_then(Value::as_str).map(str::to_owned))
2819}
2820
2821fn rmcp_initialize_error(config: &McpServerConfig, error: ClientInitializeError) -> McpError {
2822 if let Some(signal) = match &error {
2823 ClientInitializeError::TransportError { error: dyn_err, .. } => {
2824 transport_auth_signal(dyn_err)
2825 }
2826 _ => None,
2827 } {
2828 return McpError::AuthRequired(Box::new(auth_request_from_signal(
2829 &config.id,
2830 McpMethod::Initialize,
2831 signal,
2832 &error.to_string(),
2833 )));
2834 }
2835 McpError::Transport(error.to_string())
2836}
2837
2838fn rmcp_service_error(error: ServiceError) -> McpError {
2839 McpError::Invocation(error.to_string())
2840}
2841
2842fn rmcp_operation_error(
2843 server_id: &McpServerId,
2844 method: McpMethod,
2845 error: ServiceError,
2846) -> McpError {
2847 if let Some(signal) = service_auth_signal(&error) {
2848 return McpError::AuthRequired(Box::new(auth_request_from_signal(
2849 server_id,
2850 method,
2851 signal,
2852 &error.to_string(),
2853 )));
2854 }
2855 McpError::Invocation(error.to_string())
2856}
2857
2858#[derive(Debug)]
2859enum AuthSignal {
2860 Required {
2861 www_authenticate: Option<String>,
2862 },
2863 InsufficientScope {
2864 www_authenticate: Option<String>,
2865 required_scope: Option<String>,
2866 },
2867}
2868
2869fn service_auth_signal(error: &ServiceError) -> Option<AuthSignal> {
2870 match error {
2871 ServiceError::TransportSend(dyn_err) => transport_auth_signal(dyn_err),
2872 _ => None,
2873 }
2874}
2875
2876fn transport_auth_signal(error: &DynamicTransportError) -> Option<AuthSignal> {
2877 let inner = error
2878 .error
2879 .downcast_ref::<StreamableHttpError<reqwest::Error>>()?;
2880 match inner {
2881 StreamableHttpError::AuthRequired(AuthRequiredError {
2882 www_authenticate_header,
2883 ..
2884 }) => Some(AuthSignal::Required {
2885 www_authenticate: Some(www_authenticate_header.clone()),
2886 }),
2887 StreamableHttpError::InsufficientScope(InsufficientScopeError {
2888 www_authenticate_header,
2889 required_scope,
2890 ..
2891 }) => Some(AuthSignal::InsufficientScope {
2892 www_authenticate: Some(www_authenticate_header.clone()),
2893 required_scope: required_scope.clone(),
2894 }),
2895 _ => None,
2896 }
2897}
2898
2899fn auth_request_from_signal(
2900 server_id: &McpServerId,
2901 method: McpMethod,
2902 signal: AuthSignal,
2903 message: &str,
2904) -> AuthRequest {
2905 let method_name = method.method_name();
2906 let mut challenge = MetadataMap::new();
2907 challenge.insert("server_id".into(), Value::String(server_id.to_string()));
2908 challenge.insert("method".into(), Value::String(method_name.into()));
2909 challenge.insert("message".into(), Value::String(message.into()));
2910 challenge.insert("flow_kind".into(), Value::String("http_bearer".into()));
2911 match signal {
2912 AuthSignal::Required { www_authenticate } => {
2913 if let Some(header) = www_authenticate {
2914 challenge.insert("www_authenticate".into(), Value::String(header));
2915 }
2916 }
2917 AuthSignal::InsufficientScope {
2918 www_authenticate,
2919 required_scope,
2920 } => {
2921 challenge.insert("insufficient_scope".into(), Value::Bool(true));
2922 if let Some(header) = www_authenticate {
2923 challenge.insert("www_authenticate".into(), Value::String(header));
2924 }
2925 if let Some(scope) = required_scope {
2926 challenge.insert("required_scope".into(), Value::String(scope));
2927 }
2928 }
2929 }
2930 AuthRequest {
2931 id: format!("mcp:{}:{}", server_id, method_name),
2932 provider: format!("mcp.{}", server_id),
2933 operation: method.into_auth_operation(server_id),
2934 challenge,
2935 }
2936}
2937
2938#[derive(Debug, Clone)]
2943enum McpMethod {
2944 Initialize,
2945 ToolsCall { name: String, arguments: Value },
2946 ResourcesRead { uri: String },
2947 ResourcesSubscribe { uri: String },
2948 ResourcesUnsubscribe { uri: String },
2949 PromptsGet { name: String, arguments: Value },
2950 LoggingSetLevel { level: String },
2951}
2952
2953impl McpMethod {
2954 fn method_name(&self) -> &'static str {
2955 match self {
2956 Self::Initialize => "initialize",
2957 Self::ToolsCall { .. } => "tools/call",
2958 Self::ResourcesRead { .. } => "resources/read",
2959 Self::ResourcesSubscribe { .. } => "resources/subscribe",
2960 Self::ResourcesUnsubscribe { .. } => "resources/unsubscribe",
2961 Self::PromptsGet { .. } => "prompts/get",
2962 Self::LoggingSetLevel { .. } => "logging/setLevel",
2963 }
2964 }
2965
2966 fn into_auth_operation(self, server_id: &McpServerId) -> AuthOperation {
2967 let server = server_id.to_string();
2968 match self {
2969 Self::Initialize => AuthOperation::McpConnect {
2970 server_id: server,
2971 metadata: MetadataMap::new(),
2972 },
2973 Self::ToolsCall { name, arguments } => AuthOperation::McpToolCall {
2974 server_id: server,
2975 tool_name: name,
2976 input: arguments,
2977 metadata: MetadataMap::new(),
2978 },
2979 Self::ResourcesRead { uri } => AuthOperation::McpResourceRead {
2980 server_id: server,
2981 resource_id: uri,
2982 metadata: MetadataMap::new(),
2983 },
2984 Self::PromptsGet { name, arguments } => AuthOperation::McpPromptGet {
2985 server_id: server,
2986 prompt_id: name,
2987 args: arguments,
2988 metadata: MetadataMap::new(),
2989 },
2990 other @ (Self::ResourcesSubscribe { .. }
2991 | Self::ResourcesUnsubscribe { .. }
2992 | Self::LoggingSetLevel { .. }) => {
2993 let method = other.method_name().to_string();
2994 AuthOperation::McpOther {
2995 server_id: server,
2996 method,
2997 params: other.into_params_json(),
2998 metadata: MetadataMap::new(),
2999 }
3000 }
3001 }
3002 }
3003
3004 fn into_params_json(self) -> Value {
3005 match self {
3006 Self::Initialize => json!({}),
3007 Self::ToolsCall { name, arguments } => json!({ "name": name, "arguments": arguments }),
3008 Self::ResourcesRead { uri } => json!({ "uri": uri }),
3009 Self::ResourcesSubscribe { uri } => json!({ "uri": uri }),
3010 Self::ResourcesUnsubscribe { uri } => json!({ "uri": uri }),
3011 Self::PromptsGet { name, arguments } => {
3012 json!({ "name": name, "arguments": arguments })
3013 }
3014 Self::LoggingSetLevel { level } => json!({ "level": level }),
3015 }
3016 }
3017}
3018
3019#[derive(Debug, Error)]
3021pub enum McpError {
3022 #[error("io error: {0}")]
3024 Io(#[from] std::io::Error),
3025 #[error("serialization error: {0}")]
3027 Serialize(#[from] serde_json::Error),
3028 #[error("transport error: {0}")]
3030 Transport(String),
3031 #[error("protocol error: {0}")]
3033 Protocol(String),
3034 #[error("MCP auth required: {0:?}")]
3036 AuthRequired(Box<AuthRequest>),
3037 #[error("auth resolution error: {0}")]
3039 AuthResolution(String),
3040 #[error("invocation error: {0}")]
3042 Invocation(String),
3043 #[error("unknown MCP server: {0}")]
3045 UnknownServer(String),
3046}
3047
3048impl From<&str> for McpServerId {
3049 fn from(value: &str) -> Self {
3050 Self::new(value)
3051 }
3052}
3053
3054impl From<String> for McpServerId {
3055 fn from(value: String) -> Self {
3056 Self::new(value)
3057 }
3058}