agent_client_protocol_rmcp/
lib.rs1use agent_client_protocol::mcp_server::{McpConnectionTo, McpServer, McpServerConnect};
33use agent_client_protocol::role;
34use agent_client_protocol::{ByteStreams, ConnectTo, DynConnectTo, NullRun, Role};
35use futures_concurrency::future::TryJoin as _;
36use rmcp::ServiceExt;
37use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
38
39mod builder;
40
41pub use agent_client_protocol::mcp_server::{EnabledTools, McpTool};
42pub use agent_client_protocol::{tool_fn, tool_fn_mut};
43pub use builder::McpServerBuilder;
44
45pub trait McpServerExt<Counterpart: Role> {
47 fn builder(name: impl ToString) -> McpServerBuilder<Counterpart, NullRun> {
49 McpServerBuilder::new(name.to_string())
50 }
51
52 fn from_rmcp<S>(
58 name: impl ToString,
59 new_fn: impl Fn() -> S + Send + Sync + 'static,
60 ) -> McpServer<Counterpart, NullRun>
61 where
62 S: rmcp::Service<rmcp::RoleServer>,
63 {
64 struct RmcpServer<F> {
65 name: String,
66 new_fn: F,
67 }
68
69 impl<Counterpart, F, S> McpServerConnect<Counterpart> for RmcpServer<F>
70 where
71 Counterpart: Role,
72 F: Fn() -> S + Send + Sync + 'static,
73 S: rmcp::Service<rmcp::RoleServer>,
74 {
75 fn name(&self) -> String {
76 self.name.clone()
77 }
78
79 fn connect(
80 &self,
81 _cx: McpConnectionTo<Counterpart>,
82 ) -> DynConnectTo<role::mcp::Client> {
83 let service = (self.new_fn)();
84 DynConnectTo::new(RmcpServerComponent { service })
85 }
86 }
87
88 McpServer::new(
89 RmcpServer {
90 name: name.to_string(),
91 new_fn,
92 },
93 NullRun,
94 )
95 }
96}
97
98impl<Counterpart: Role> McpServerExt<Counterpart> for McpServer<Counterpart> {}
99
100struct RmcpServerComponent<S> {
102 service: S,
103}
104
105impl<S> ConnectTo<role::mcp::Client> for RmcpServerComponent<S>
106where
107 S: rmcp::Service<rmcp::RoleServer>,
108{
109 async fn connect_to(
110 self,
111 client: impl ConnectTo<role::mcp::Server>,
112 ) -> Result<(), agent_client_protocol::Error> {
113 let (mcp_server_stream, mcp_client_stream) = tokio::io::duplex(8192);
115 let (mcp_server_read, mcp_server_write) = tokio::io::split(mcp_server_stream);
116 let (mcp_client_read, mcp_client_write) = tokio::io::split(mcp_client_stream);
117
118 let bytes_to_acp = async {
119 let byte_streams =
121 ByteStreams::new(mcp_client_write.compat_write(), mcp_client_read.compat());
122
123 drop(ConnectTo::<role::mcp::Client>::connect_to(byte_streams, client).await);
125
126 Ok(())
127 };
128
129 let bytes_to_rmcp = async {
130 let running_server = self
132 .service
133 .serve((mcp_server_read, mcp_server_write))
134 .await
135 .map_err(agent_client_protocol::Error::into_internal_error)?;
136
137 running_server
139 .waiting()
140 .await
141 .map(|_quit_reason| ())
142 .map_err(agent_client_protocol::Error::into_internal_error)
143 };
144
145 (bytes_to_acp, bytes_to_rmcp).try_join().await?;
146 Ok(())
147 }
148}