Skip to main content

agent_client_protocol/mcp_server/
server.rs

1//! MCP server attachment and routing for ACP sessions.
2
3use std::{marker::PhantomData, sync::Arc};
4
5use futures::{StreamExt, channel::mpsc};
6use uuid::Uuid;
7
8use crate::{
9    Agent, Client, ConnectTo, ConnectionTo, Dispatch, DynConnectTo, HandleDispatchFrom, Handled,
10    Role,
11    jsonrpc::{
12        DynamicHandlerRegistration,
13        run::{NullRun, RunWithConnectionTo},
14    },
15    mcp_server::{McpConnectionTo, McpServerConnect, active_session::McpActiveSession},
16    role::{self, HasPeer},
17    schema::v1::{McpServer as SchemaMcpServer, McpServerHttp, NewSessionRequest},
18    util::MatchDispatchFrom,
19};
20
21/// An MCP server that can be attached to ACP connections.
22///
23/// `McpServer` wraps an [`McpServerConnect`](`super::McpServerConnect`) implementation and can be used either:
24/// - As a message handler via [`Builder::with_handler`](`crate::Builder::with_handler`), automatically
25///   attaching to new sessions
26/// - Manually for more control
27///
28/// # Creating an MCP Server
29///
30/// The `agent-client-protocol-rmcp` crate provides builder APIs for MCP tools
31/// backed by the `rmcp` crate.
32///
33/// Or implement [`McpServerConnect`](`super::McpServerConnect`) for custom server behavior:
34///
35/// ```rust,ignore
36/// let server = McpServer::new(MyCustomServerConnect);
37/// ```
38pub struct McpServer<Counterpart: Role, Run = NullRun> {
39    /// The host role that is serving up this MCP server
40    phantom: PhantomData<Counterpart>,
41
42    /// The ACP identifier we assigned for this mcp server; always unique
43    acp_id: String,
44
45    /// The "connect" instance
46    connect: Arc<dyn McpServerConnect<Counterpart>>,
47
48    /// The "responder" is a task that should be run alongside the message handler.
49    /// Some futures direct messages back through channels to this future which actually
50    /// handles responding to the client.
51    ///
52    /// Some connector implementations use this to run support tasks alongside
53    /// the message handler.
54    responder: Run,
55}
56
57impl<Counterpart: Role + std::fmt::Debug, Run: std::fmt::Debug> std::fmt::Debug
58    for McpServer<Counterpart, Run>
59{
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("McpServer")
62            .field("phantom", &self.phantom)
63            .field("acp_id", &self.acp_id)
64            .field("responder", &self.responder)
65            .finish_non_exhaustive()
66    }
67}
68
69impl<Counterpart: Role, Run> McpServer<Counterpart, Run>
70where
71    Run: RunWithConnectionTo<Counterpart>,
72{
73    /// Create an MCP server from something that implements the [`McpServerConnect`](`super::McpServerConnect`) trait.
74    ///
75    /// # See also
76    ///
77    /// See `agent-client-protocol-rmcp` to construct MCP servers from Rust code
78    /// with `rmcp`.
79    pub fn new(c: impl McpServerConnect<Counterpart>, responder: Run) -> Self {
80        McpServer {
81            phantom: PhantomData,
82            acp_id: format!("acp:{}", Uuid::new_v4()),
83            connect: Arc::new(c),
84            responder,
85        }
86    }
87
88    /// Split this MCP server into the message handler and a future that must be run while the handler is active.
89    pub(crate) fn into_handler_and_responder(self) -> (McpNewSessionHandler<Counterpart>, Run)
90    where
91        Counterpart: HasPeer<Agent>,
92    {
93        let Self {
94            phantom: _,
95            acp_id,
96            connect,
97            responder,
98        } = self;
99        (McpNewSessionHandler::new(acp_id, connect), responder)
100    }
101}
102
103/// Message handler created from a [`McpServer`].
104pub(crate) struct McpNewSessionHandler<Counterpart: Role>
105where
106    Counterpart: HasPeer<Agent>,
107{
108    acp_id: String,
109    connect: Arc<dyn McpServerConnect<Counterpart>>,
110    active_session: McpActiveSession<Counterpart>,
111}
112
113impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
114where
115    Counterpart: HasPeer<Agent>,
116{
117    pub fn new(acp_id: String, connect: Arc<dyn McpServerConnect<Counterpart>>) -> Self {
118        Self {
119            active_session: McpActiveSession::new(acp_id.clone(), connect.clone()),
120            acp_id,
121            connect,
122        }
123    }
124
125    /// Modify the new session request to include this MCP server.
126    fn modify_new_session_request(&self, request: &mut NewSessionRequest) {
127        request
128            .mcp_servers
129            .push(SchemaMcpServer::Http(McpServerHttp::new(
130                self.connect.name(),
131                self.acp_id.clone(),
132            )));
133    }
134}
135
136impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
137where
138    Counterpart: HasPeer<Agent>,
139{
140    /// Attach this server to the new session, spawning off a dynamic handler that will
141    /// manage requests coming from this session.
142    ///
143    /// # Return value
144    ///
145    /// Returns a [`DynamicHandlerRegistration`] for the handler that intercepts messages
146    /// related to this MCP server. Once the value is dropped, the MCP server messages
147    /// will no longer be received, so you need to keep this value alive as long as the session
148    /// is in use. You can also invoke [`DynamicHandlerRegistration::run_indefinitely`]
149    /// if you want to keep the handler running indefinitely.
150    pub fn into_dynamic_handler(
151        self,
152        request: &mut NewSessionRequest,
153        cx: &ConnectionTo<Counterpart>,
154    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error>
155    where
156        Counterpart: HasPeer<Agent>,
157    {
158        self.modify_new_session_request(request);
159        cx.add_dynamic_handler(self.active_session)
160    }
161}
162
163impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for McpNewSessionHandler<Counterpart>
164where
165    Counterpart: HasPeer<Client> + HasPeer<Agent>,
166{
167    async fn handle_dispatch_from(
168        &mut self,
169        message: Dispatch,
170        cx: ConnectionTo<Counterpart>,
171    ) -> Result<Handled<Dispatch>, crate::Error> {
172        MatchDispatchFrom::new(message, &cx)
173            .if_request_from(Client, async |mut request: NewSessionRequest, responder| {
174                self.modify_new_session_request(&mut request);
175                Ok(Handled::No {
176                    message: (request, responder),
177                    retry: false,
178                })
179            })
180            .await
181            .otherwise_delegate(&mut self.active_session)
182            .await
183    }
184
185    fn describe_chain(&self) -> impl std::fmt::Debug {
186        format!("McpServer({})", self.connect.name())
187    }
188}
189
190impl<Run> ConnectTo<role::mcp::Client> for McpServer<role::mcp::Client, Run>
191where
192    Run: RunWithConnectionTo<role::mcp::Client> + 'static,
193{
194    async fn connect_to(
195        self,
196        client: impl ConnectTo<role::mcp::Server>,
197    ) -> Result<(), crate::Error> {
198        let Self {
199            acp_id,
200            connect,
201            responder,
202            phantom: _,
203        } = self;
204
205        let (tx, mut rx) = mpsc::unbounded();
206
207        role::mcp::Server
208            .builder()
209            .with_responder(responder)
210            .on_receive_dispatch(
211                async |message_from_client: Dispatch, _cx| {
212                    tx.unbounded_send(message_from_client)
213                        .map_err(|_| crate::util::internal_error("nobody listening to mcp server"))
214                },
215                crate::on_receive_dispatch!(),
216            )
217            .with_spawned(async move |connection_to_client| {
218                let spawned_server: DynConnectTo<role::mcp::Client> =
219                    connect.connect(McpConnectionTo {
220                        acp_id,
221                        connection: connection_to_client.clone(),
222                    });
223
224                role::mcp::Client
225                    .builder()
226                    .on_receive_dispatch(
227                        async |message_from_server: Dispatch, _| {
228                            // when we receive a message from the server, fwd to the client
229                            connection_to_client.send_proxied_message(message_from_server)
230                        },
231                        crate::on_receive_dispatch!(),
232                    )
233                    .connect_with(spawned_server, async |connection_to_server| {
234                        while let Some(message_from_client) = rx.next().await {
235                            connection_to_server.send_proxied_message(message_from_client)?;
236                        }
237                        Ok(())
238                    })
239                    .await
240            })
241            .connect_to(client)
242            .await
243    }
244}