Skip to main content

agent_client_protocol/mcp_server/
server.rs

1//! MCP server attachment and routing for ACP sessions.
2
3use std::{marker::PhantomData, sync::Arc};
4
5use agent_client_protocol_schema::NewSessionRequest;
6use futures::{StreamExt, channel::mpsc};
7use uuid::Uuid;
8
9use crate::{
10    Agent, Client, ConnectTo, ConnectionTo, Dispatch, DynConnectTo, HandleDispatchFrom, Handled,
11    Role,
12    jsonrpc::{
13        DynamicHandlerRegistration,
14        run::{NullRun, RunWithConnectionTo},
15    },
16    mcp_server::{McpConnectionTo, McpServerConnect, active_session::McpActiveSession},
17    role::{self, HasPeer},
18    util::MatchDispatchFrom,
19};
20
21/// An MCP server that can be attached to ACP connections.
22///
23/// `McpServer` wraps an [`McpServerConnect`](`super::McpServerConnect`) implementation and can be used either:
24/// - As a message handler via [`Builder::with_handler`](`crate::Builder::with_handler`), automatically
25///   attaching to new sessions
26/// - Manually for more control
27///
28/// # Creating an MCP Server
29///
30/// The `agent-client-protocol-rmcp` crate provides builder APIs for MCP tools
31/// backed by the `rmcp` crate.
32///
33/// Or implement [`McpServerConnect`](`super::McpServerConnect`) for custom server behavior:
34///
35/// ```rust,ignore
36/// let server = McpServer::new(MyCustomServerConnect);
37/// ```
38pub struct McpServer<Counterpart: Role, Run = NullRun> {
39    /// The host role that is serving up this MCP server
40    phantom: PhantomData<Counterpart>,
41
42    /// The ACP identifier we assigned for this mcp server; always unique
43    acp_id: String,
44
45    /// The "connect" instance
46    connect: Arc<dyn McpServerConnect<Counterpart>>,
47
48    /// The "responder" is a task that should be run alongside the message handler.
49    /// Some futures direct messages back through channels to this future which actually
50    /// handles responding to the client.
51    ///
52    /// Some connector implementations use this to run support tasks alongside
53    /// the message handler.
54    responder: Run,
55}
56
57impl<Counterpart: Role + std::fmt::Debug, Run: std::fmt::Debug> std::fmt::Debug
58    for McpServer<Counterpart, Run>
59{
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("McpServer")
62            .field("phantom", &self.phantom)
63            .field("acp_id", &self.acp_id)
64            .field("responder", &self.responder)
65            .finish_non_exhaustive()
66    }
67}
68
69impl<Counterpart: Role, Run> McpServer<Counterpart, Run>
70where
71    Run: RunWithConnectionTo<Counterpart>,
72{
73    /// Create an MCP server from something that implements the [`McpServerConnect`](`super::McpServerConnect`) trait.
74    ///
75    /// # See also
76    ///
77    /// See `agent-client-protocol-rmcp` to construct MCP servers from Rust code
78    /// with `rmcp`.
79    pub fn new(c: impl McpServerConnect<Counterpart>, responder: Run) -> Self {
80        McpServer {
81            phantom: PhantomData,
82            acp_id: format!("acp:{}", Uuid::new_v4()),
83            connect: Arc::new(c),
84            responder,
85        }
86    }
87
88    /// Split this MCP server into the message handler and a future that must be run while the handler is active.
89    pub(crate) fn into_handler_and_responder(self) -> (McpNewSessionHandler<Counterpart>, Run)
90    where
91        Counterpart: HasPeer<Agent>,
92    {
93        let Self {
94            phantom: _,
95            acp_id,
96            connect,
97            responder,
98        } = self;
99        (McpNewSessionHandler::new(acp_id, connect), responder)
100    }
101}
102
103/// Message handler created from a [`McpServer`].
104pub(crate) struct McpNewSessionHandler<Counterpart: Role>
105where
106    Counterpart: HasPeer<Agent>,
107{
108    acp_id: String,
109    connect: Arc<dyn McpServerConnect<Counterpart>>,
110    active_session: McpActiveSession<Counterpart>,
111}
112
113impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
114where
115    Counterpart: HasPeer<Agent>,
116{
117    pub fn new(acp_id: String, connect: Arc<dyn McpServerConnect<Counterpart>>) -> Self {
118        Self {
119            active_session: McpActiveSession::new(acp_id.clone(), connect.clone()),
120            acp_id,
121            connect,
122        }
123    }
124
125    /// Modify the new session request to include this MCP server.
126    fn modify_new_session_request(&self, request: &mut NewSessionRequest) {
127        request.mcp_servers.push(crate::schema::McpServer::Http(
128            crate::schema::McpServerHttp::new(self.connect.name(), self.acp_id.clone()),
129        ));
130    }
131}
132
133impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
134where
135    Counterpart: HasPeer<Agent>,
136{
137    /// Attach this server to the new session, spawning off a dynamic handler that will
138    /// manage requests coming from this session.
139    ///
140    /// # Return value
141    ///
142    /// Returns a [`DynamicHandlerRegistration`] for the handler that intercepts messages
143    /// related to this MCP server. Once the value is dropped, the MCP server messages
144    /// will no longer be received, so you need to keep this value alive as long as the session
145    /// is in use. You can also invoke [`DynamicHandlerRegistration::run_indefinitely`]
146    /// if you want to keep the handler running indefinitely.
147    pub fn into_dynamic_handler(
148        self,
149        request: &mut NewSessionRequest,
150        cx: &ConnectionTo<Counterpart>,
151    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error>
152    where
153        Counterpart: HasPeer<Agent>,
154    {
155        self.modify_new_session_request(request);
156        cx.add_dynamic_handler(self.active_session)
157    }
158}
159
160impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for McpNewSessionHandler<Counterpart>
161where
162    Counterpart: HasPeer<Client> + HasPeer<Agent>,
163{
164    async fn handle_dispatch_from(
165        &mut self,
166        message: Dispatch,
167        cx: ConnectionTo<Counterpart>,
168    ) -> Result<Handled<Dispatch>, crate::Error> {
169        MatchDispatchFrom::new(message, &cx)
170            .if_request_from(Client, async |mut request: NewSessionRequest, responder| {
171                self.modify_new_session_request(&mut request);
172                Ok(Handled::No {
173                    message: (request, responder),
174                    retry: false,
175                })
176            })
177            .await
178            .otherwise_delegate(&mut self.active_session)
179            .await
180    }
181
182    fn describe_chain(&self) -> impl std::fmt::Debug {
183        format!("McpServer({})", self.connect.name())
184    }
185}
186
187impl<Run> ConnectTo<role::mcp::Client> for McpServer<role::mcp::Client, Run>
188where
189    Run: RunWithConnectionTo<role::mcp::Client> + 'static,
190{
191    async fn connect_to(
192        self,
193        client: impl ConnectTo<role::mcp::Server>,
194    ) -> Result<(), crate::Error> {
195        let Self {
196            acp_id,
197            connect,
198            responder,
199            phantom: _,
200        } = self;
201
202        let (tx, mut rx) = mpsc::unbounded();
203
204        role::mcp::Server
205            .builder()
206            .with_responder(responder)
207            .on_receive_dispatch(
208                async |message_from_client: Dispatch, _cx| {
209                    tx.unbounded_send(message_from_client)
210                        .map_err(|_| crate::util::internal_error("nobody listening to mcp server"))
211                },
212                crate::on_receive_dispatch!(),
213            )
214            .with_spawned(async move |connection_to_client| {
215                let spawned_server: DynConnectTo<role::mcp::Client> =
216                    connect.connect(McpConnectionTo {
217                        acp_id,
218                        connection: connection_to_client.clone(),
219                    });
220
221                role::mcp::Client
222                    .builder()
223                    .on_receive_dispatch(
224                        async |message_from_server: Dispatch, _| {
225                            // when we receive a message from the server, fwd to the client
226                            connection_to_client.send_proxied_message(message_from_server)
227                        },
228                        crate::on_receive_dispatch!(),
229                    )
230                    .connect_with(spawned_server, async |connection_to_server| {
231                        while let Some(message_from_client) = rx.next().await {
232                            connection_to_server.send_proxied_message(message_from_client)?;
233                        }
234                        Ok(())
235                    })
236                    .await
237            })
238            .connect_to(client)
239            .await
240    }
241}