Skip to main content

agent_client_protocol_rmcp/
lib.rs

1//! # agent-client-protocol-rmcp
2//!
3//! This crate provides integration between [rmcp](https://docs.rs/rmcp) MCP servers
4//! and the Agent Client Protocol MCP server framework.
5//!
6//! ## Usage
7//!
8//! Create an MCP server from an rmcp service using the extension trait:
9//!
10//! ```ignore
11//! use agent_client_protocol::mcp_server::McpServer;
12//! use agent_client_protocol_rmcp::McpServerExt;
13//!
14//! let server = McpServer::from_rmcp("my-server", MyRmcpService::new);
15//!
16//! // Use as a handler
17//! Proxy.builder()
18//!     .with_handler(server)
19//!     .serve(client)
20//!     .await?;
21//! ```
22
23use agent_client_protocol::mcp_server::{McpConnectionTo, McpServer, McpServerConnect};
24use agent_client_protocol::role::{self, HasPeer};
25use agent_client_protocol::{Agent, ByteStreams, ConnectTo, DynConnectTo, NullRun, Role};
26use futures_concurrency::future::TryJoin as _;
27use rmcp::ServiceExt;
28use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
29
30pub trait McpServerExt<Counterpart: Role>
31where
32    Counterpart: HasPeer<Agent>,
33{
34    /// Create an MCP server from something that implements the [`McpServerConnect`] trait.
35    ///
36    /// # See also
37    ///
38    /// See [`McpServer::builder`] to construct MCP servers from Rust code.
39    fn from_rmcp<S>(
40        name: impl ToString,
41        new_fn: impl Fn() -> S + Send + Sync + 'static,
42    ) -> McpServer<Counterpart, NullRun>
43    where
44        S: rmcp::Service<rmcp::RoleServer>,
45    {
46        struct RmcpServer<F> {
47            name: String,
48            new_fn: F,
49        }
50
51        impl<Counterpart, F, S> McpServerConnect<Counterpart> for RmcpServer<F>
52        where
53            Counterpart: Role,
54            F: Fn() -> S + Send + Sync + 'static,
55            S: rmcp::Service<rmcp::RoleServer>,
56        {
57            fn name(&self) -> String {
58                self.name.clone()
59            }
60
61            fn connect(
62                &self,
63                _cx: McpConnectionTo<Counterpart>,
64            ) -> DynConnectTo<role::mcp::Client> {
65                let service = (self.new_fn)();
66                DynConnectTo::new(RmcpServerComponent { service })
67            }
68        }
69
70        McpServer::new(
71            RmcpServer {
72                name: name.to_string(),
73                new_fn,
74            },
75            NullRun,
76        )
77    }
78}
79
80impl<Counterpart: Role> McpServerExt<Counterpart> for McpServer<Counterpart> where
81    Counterpart: HasPeer<Agent>
82{
83}
84
/// Adapter that wraps an rmcp service so it can be connected as a component.
struct RmcpServerComponent<Svc> {
    /// The wrapped rmcp service; consumed when the component is connected.
    service: Svc,
}
89
90impl<S> ConnectTo<role::mcp::Client> for RmcpServerComponent<S>
91where
92    S: rmcp::Service<rmcp::RoleServer>,
93{
94    async fn connect_to(
95        self,
96        client: impl ConnectTo<role::mcp::Server>,
97    ) -> Result<(), agent_client_protocol::Error> {
98        // Create tokio byte streams that rmcp expects
99        let (mcp_server_stream, mcp_client_stream) = tokio::io::duplex(8192);
100        let (mcp_server_read, mcp_server_write) = tokio::io::split(mcp_server_stream);
101        let (mcp_client_read, mcp_client_write) = tokio::io::split(mcp_client_stream);
102
103        let bytes_to_acp = async {
104            // Create ByteStreams component for the client side
105            let byte_streams =
106                ByteStreams::new(mcp_client_write.compat_write(), mcp_client_read.compat());
107
108            // Spawn task to connect byte_streams to the provided client
109            drop(ConnectTo::<role::mcp::Client>::connect_to(byte_streams, client).await);
110
111            Ok(())
112        };
113
114        let bytes_to_rmcp = async {
115            // Run the rmcp server with the server side of the duplex stream
116            let running_server = self
117                .service
118                .serve((mcp_server_read, mcp_server_write))
119                .await
120                .map_err(agent_client_protocol::Error::into_internal_error)?;
121
122            // Wait for the server to finish
123            running_server
124                .waiting()
125                .await
126                .map(|_quit_reason| ())
127                .map_err(agent_client_protocol::Error::into_internal_error)
128        };
129
130        (bytes_to_acp, bytes_to_rmcp).try_join().await?;
131        Ok(())
132    }
133}