Skip to main content

agent_client_protocol_rmcp/
lib.rs

1//! # agent-client-protocol-rmcp
2//!
3//! This crate provides integration between [rmcp](https://docs.rs/rmcp) MCP servers
4//! and the Agent Client Protocol MCP server framework.
5//!
6//! ## Usage
7//!
8//! Build an MCP server with tools using the extension trait:
9//!
10//! ```ignore
11//! use agent_client_protocol::mcp_server::McpServer;
12//! use agent_client_protocol_rmcp::McpServerExt;
13//!
14//! let server = McpServer::builder("my-tools").build();
15//! ```
16//!
17//! Or create an MCP server from an rmcp service:
18//!
19//! ```ignore
20//! use agent_client_protocol::mcp_server::McpServer;
21//! use agent_client_protocol_rmcp::McpServerExt;
22//!
23//! let server = McpServer::from_rmcp("my-server", MyRmcpService::new);
24//!
25//! // Use as a handler
26//! Proxy.builder()
27//!     .with_handler(server)
28//!     .serve(client)
29//!     .await?;
30//! ```
31
32use agent_client_protocol::mcp_server::{McpConnectionTo, McpServer, McpServerConnect};
33use agent_client_protocol::role;
34use agent_client_protocol::{ByteStreams, ConnectTo, DynConnectTo, NullRun, Role};
35use futures_concurrency::future::TryJoin as _;
36use rmcp::ServiceExt;
37use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
38
39mod builder;
40
41pub use agent_client_protocol::mcp_server::{EnabledTools, McpTool};
42pub use agent_client_protocol::{tool_fn, tool_fn_mut};
43pub use builder::McpServerBuilder;
44
45/// Extension constructors for ACP MCP servers backed by `rmcp`.
46pub trait McpServerExt<Counterpart: Role> {
47    /// Create an MCP server builder for defining tools in Rust code.
48    fn builder(name: impl ToString) -> McpServerBuilder<Counterpart, NullRun> {
49        McpServerBuilder::new(name.to_string())
50    }
51
52    /// Create an MCP server from something that implements the [`McpServerConnect`] trait.
53    ///
54    /// # See also
55    ///
56    /// See [`Self::builder`] to construct MCP servers from Rust code.
57    fn from_rmcp<S>(
58        name: impl ToString,
59        new_fn: impl Fn() -> S + Send + Sync + 'static,
60    ) -> McpServer<Counterpart, NullRun>
61    where
62        S: rmcp::Service<rmcp::RoleServer>,
63    {
64        struct RmcpServer<F> {
65            name: String,
66            new_fn: F,
67        }
68
69        impl<Counterpart, F, S> McpServerConnect<Counterpart> for RmcpServer<F>
70        where
71            Counterpart: Role,
72            F: Fn() -> S + Send + Sync + 'static,
73            S: rmcp::Service<rmcp::RoleServer>,
74        {
75            fn name(&self) -> String {
76                self.name.clone()
77            }
78
79            fn connect(
80                &self,
81                _cx: McpConnectionTo<Counterpart>,
82            ) -> DynConnectTo<role::mcp::Client> {
83                let service = (self.new_fn)();
84                DynConnectTo::new(RmcpServerComponent { service })
85            }
86        }
87
88        McpServer::new(
89            RmcpServer {
90                name: name.to_string(),
91                new_fn,
92            },
93            NullRun,
94        )
95    }
96}
97
98impl<Counterpart: Role> McpServerExt<Counterpart> for McpServer<Counterpart> {}
99
100/// Component wrapper for rmcp services.
101struct RmcpServerComponent<S> {
102    service: S,
103}
104
105impl<S> ConnectTo<role::mcp::Client> for RmcpServerComponent<S>
106where
107    S: rmcp::Service<rmcp::RoleServer>,
108{
109    async fn connect_to(
110        self,
111        client: impl ConnectTo<role::mcp::Server>,
112    ) -> Result<(), agent_client_protocol::Error> {
113        // Create tokio byte streams that rmcp expects
114        let (mcp_server_stream, mcp_client_stream) = tokio::io::duplex(8192);
115        let (mcp_server_read, mcp_server_write) = tokio::io::split(mcp_server_stream);
116        let (mcp_client_read, mcp_client_write) = tokio::io::split(mcp_client_stream);
117
118        let bytes_to_acp = async {
119            // Create ByteStreams component for the client side
120            let byte_streams =
121                ByteStreams::new(mcp_client_write.compat_write(), mcp_client_read.compat());
122
123            // Spawn task to connect byte_streams to the provided client
124            drop(ConnectTo::<role::mcp::Client>::connect_to(byte_streams, client).await);
125
126            Ok(())
127        };
128
129        let bytes_to_rmcp = async {
130            // Run the rmcp server with the server side of the duplex stream
131            let running_server = self
132                .service
133                .serve((mcp_server_read, mcp_server_write))
134                .await
135                .map_err(agent_client_protocol::Error::into_internal_error)?;
136
137            // Wait for the server to finish
138            running_server
139                .waiting()
140                .await
141                .map(|_quit_reason| ())
142                .map_err(agent_client_protocol::Error::into_internal_error)
143        };
144
145        (bytes_to_acp, bytes_to_rmcp).try_join().await?;
146        Ok(())
147    }
148}