agent_client_protocol_rmcp/
lib.rs1use agent_client_protocol::mcp_server::{McpConnectionTo, McpServer, McpServerConnect};
24use agent_client_protocol::role::{self, HasPeer};
25use agent_client_protocol::{Agent, ByteStreams, ConnectTo, DynConnectTo, NullRun, Role};
26use futures_concurrency::future::TryJoin as _;
27use rmcp::ServiceExt;
28use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
29
30pub trait McpServerExt<Counterpart: Role>
31where
32 Counterpart: HasPeer<Agent>,
33{
34 fn from_rmcp<S>(
40 name: impl ToString,
41 new_fn: impl Fn() -> S + Send + Sync + 'static,
42 ) -> McpServer<Counterpart, NullRun>
43 where
44 S: rmcp::Service<rmcp::RoleServer>,
45 {
46 struct RmcpServer<F> {
47 name: String,
48 new_fn: F,
49 }
50
51 impl<Counterpart, F, S> McpServerConnect<Counterpart> for RmcpServer<F>
52 where
53 Counterpart: Role,
54 F: Fn() -> S + Send + Sync + 'static,
55 S: rmcp::Service<rmcp::RoleServer>,
56 {
57 fn name(&self) -> String {
58 self.name.clone()
59 }
60
61 fn connect(
62 &self,
63 _cx: McpConnectionTo<Counterpart>,
64 ) -> DynConnectTo<role::mcp::Client> {
65 let service = (self.new_fn)();
66 DynConnectTo::new(RmcpServerComponent { service })
67 }
68 }
69
70 McpServer::new(
71 RmcpServer {
72 name: name.to_string(),
73 new_fn,
74 },
75 NullRun,
76 )
77 }
78}
79
80impl<Counterpart: Role> McpServerExt<Counterpart> for McpServer<Counterpart> where
81 Counterpart: HasPeer<Agent>
82{
83}
84
85struct RmcpServerComponent<S> {
87 service: S,
88}
89
90impl<S> ConnectTo<role::mcp::Client> for RmcpServerComponent<S>
91where
92 S: rmcp::Service<rmcp::RoleServer>,
93{
94 async fn connect_to(
95 self,
96 client: impl ConnectTo<role::mcp::Server>,
97 ) -> Result<(), agent_client_protocol::Error> {
98 let (mcp_server_stream, mcp_client_stream) = tokio::io::duplex(8192);
100 let (mcp_server_read, mcp_server_write) = tokio::io::split(mcp_server_stream);
101 let (mcp_client_read, mcp_client_write) = tokio::io::split(mcp_client_stream);
102
103 let bytes_to_acp = async {
104 let byte_streams =
106 ByteStreams::new(mcp_client_write.compat_write(), mcp_client_read.compat());
107
108 drop(ConnectTo::<role::mcp::Client>::connect_to(byte_streams, client).await);
110
111 Ok(())
112 };
113
114 let bytes_to_rmcp = async {
115 let running_server = self
117 .service
118 .serve((mcp_server_read, mcp_server_write))
119 .await
120 .map_err(agent_client_protocol::Error::into_internal_error)?;
121
122 running_server
124 .waiting()
125 .await
126 .map(|_quit_reason| ())
127 .map_err(agent_client_protocol::Error::into_internal_error)
128 };
129
130 (bytes_to_acp, bytes_to_rmcp).try_join().await?;
131 Ok(())
132 }
133}