Skip to main content

agent_client_protocol/mcp_server/
server.rs

1//! MCP server builder for creating MCP servers.
2
3use std::{marker::PhantomData, sync::Arc};
4
5use agent_client_protocol_schema::NewSessionRequest;
6use futures::{StreamExt, channel::mpsc};
7use uuid::Uuid;
8
9use crate::{
10    Agent, Client, ConnectTo, ConnectionTo, Dispatch, DynConnectTo, HandleDispatchFrom, Handled,
11    Role,
12    jsonrpc::{
13        DynamicHandlerRegistration,
14        run::{NullRun, RunWithConnectionTo},
15    },
16    mcp_server::{
17        McpConnectionTo, McpServerConnect, active_session::McpActiveSession,
18        builder::McpServerBuilder,
19    },
20    role::{self, HasPeer},
21    util::MatchDispatchFrom,
22};
23
24/// An MCP server that can be attached to ACP connections.
25///
26/// `McpServer` wraps an [`McpServerConnect`](`super::McpServerConnect`) implementation and can be used either:
27/// - As a message handler via [`Builder::with_handler`](`crate::Builder::with_handler`), automatically
28///   attaching to new sessions
29/// - Manually for more control
30///
31/// # Creating an MCP Server
32///
33/// Use [`McpServer::builder`] to create a server with tools:
34///
35/// ```rust,ignore
36/// let server = McpServer::builder("my-server".to_string())
37///     .instructions("A helpful assistant")
38///     .tool(MyTool)
39///     .build();
40/// ```
41///
42/// Or implement [`McpServerConnect`](`super::McpServerConnect`) for custom server behavior:
43///
44/// ```rust,ignore
45/// let server = McpServer::new(MyCustomServerConnect);
46/// ```
47pub struct McpServer<Counterpart: Role, Run = NullRun> {
48    /// The host role that is serving up this MCP server
49    phantom: PhantomData<Counterpart>,
50
51    /// The ACP URL we assigned for this mcp server; always unique
52    acp_url: String,
53
54    /// The "connect" instance
55    connect: Arc<dyn McpServerConnect<Counterpart>>,
56
57    /// The "responder" is a task that should be run alongside the message handler.
58    /// Some futures direct messages back through channels to this future which actually
59    /// handles responding to the client.
60    ///
61    /// This is how we bridge the gap between the rmcp implementation,
62    /// which requires `'static`, and our APIs, which do not.
63    responder: Run,
64}
65
66impl<Counterpart: Role + std::fmt::Debug, Run: std::fmt::Debug> std::fmt::Debug
67    for McpServer<Counterpart, Run>
68{
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("McpServer")
71            .field("phantom", &self.phantom)
72            .field("acp_url", &self.acp_url)
73            .field("responder", &self.responder)
74            .finish_non_exhaustive()
75    }
76}
77
78impl<Host: Role> McpServer<Host, NullRun> {
79    /// Create an empty server with no content.
80    pub fn builder(name: impl ToString) -> McpServerBuilder<Host, NullRun> {
81        McpServerBuilder::new(name.to_string())
82    }
83}
84
85impl<Counterpart: Role, Run> McpServer<Counterpart, Run>
86where
87    Run: RunWithConnectionTo<Counterpart>,
88{
89    /// Create an MCP server from something that implements the [`McpServerConnect`](`super::McpServerConnect`) trait.
90    ///
91    /// # See also
92    ///
93    /// See [`Self::builder`] to construct MCP servers from Rust code.
94    pub fn new(c: impl McpServerConnect<Counterpart>, responder: Run) -> Self {
95        McpServer {
96            phantom: PhantomData,
97            acp_url: format!("acp:{}", Uuid::new_v4()),
98            connect: Arc::new(c),
99            responder,
100        }
101    }
102
103    /// Split this MCP server into the message handler and a future that must be run while the handler is active.
104    pub(crate) fn into_handler_and_responder(self) -> (McpNewSessionHandler<Counterpart>, Run)
105    where
106        Counterpart: HasPeer<Agent>,
107    {
108        let Self {
109            phantom: _,
110            acp_url,
111            connect,
112            responder,
113        } = self;
114        (McpNewSessionHandler::new(acp_url, connect), responder)
115    }
116}
117
118/// Message handler created from a [`McpServer`].
119pub(crate) struct McpNewSessionHandler<Counterpart: Role>
120where
121    Counterpart: HasPeer<Agent>,
122{
123    acp_url: String,
124    connect: Arc<dyn McpServerConnect<Counterpart>>,
125    active_session: McpActiveSession<Counterpart>,
126}
127
128impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
129where
130    Counterpart: HasPeer<Agent>,
131{
132    pub fn new(acp_url: String, connect: Arc<dyn McpServerConnect<Counterpart>>) -> Self {
133        Self {
134            active_session: McpActiveSession::new(acp_url.clone(), connect.clone()),
135            acp_url,
136            connect,
137        }
138    }
139
140    /// Modify the new session request to include this MCP server.
141    fn modify_new_session_request(&self, request: &mut NewSessionRequest) {
142        request.mcp_servers.push(crate::schema::McpServer::Http(
143            crate::schema::McpServerHttp::new(self.connect.name(), self.acp_url.clone()),
144        ));
145    }
146}
147
148impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
149where
150    Counterpart: HasPeer<Agent>,
151{
152    /// Attach this server to the new session, spawning off a dynamic handler that will
153    /// manage requests coming from this session.
154    ///
155    /// # Return value
156    ///
157    /// Returns a [`DynamicHandlerRegistration`] for the handler that intercepts messages
158    /// related to this MCP server. Once the value is dropped, the MCP server messages
159    /// will no longer be received, so you need to keep this value alive as long as the session
160    /// is in use. You can also invoke [`DynamicHandlerRegistration::run_indefinitely`]
161    /// if you want to keep the handler running indefinitely.
162    pub fn into_dynamic_handler(
163        self,
164        request: &mut NewSessionRequest,
165        cx: &ConnectionTo<Counterpart>,
166    ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error>
167    where
168        Counterpart: HasPeer<Agent>,
169    {
170        self.modify_new_session_request(request);
171        cx.add_dynamic_handler(self.active_session)
172    }
173}
174
175impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for McpNewSessionHandler<Counterpart>
176where
177    Counterpart: HasPeer<Client> + HasPeer<Agent>,
178{
179    async fn handle_dispatch_from(
180        &mut self,
181        message: Dispatch,
182        cx: ConnectionTo<Counterpart>,
183    ) -> Result<Handled<Dispatch>, crate::Error> {
184        MatchDispatchFrom::new(message, &cx)
185            .if_request_from(Client, async |mut request: NewSessionRequest, responder| {
186                self.modify_new_session_request(&mut request);
187                Ok(Handled::No {
188                    message: (request, responder),
189                    retry: false,
190                })
191            })
192            .await
193            .otherwise_delegate(&mut self.active_session)
194            .await
195    }
196
197    fn describe_chain(&self) -> impl std::fmt::Debug {
198        format!("McpServer({})", self.connect.name())
199    }
200}
201
202impl<Run> ConnectTo<role::mcp::Client> for McpServer<role::mcp::Client, Run>
203where
204    Run: RunWithConnectionTo<role::mcp::Client> + 'static,
205{
206    async fn connect_to(
207        self,
208        client: impl ConnectTo<role::mcp::Server>,
209    ) -> Result<(), crate::Error> {
210        let Self {
211            acp_url,
212            connect,
213            responder,
214            phantom: _,
215        } = self;
216
217        let (tx, mut rx) = mpsc::unbounded();
218
219        role::mcp::Server
220            .builder()
221            .with_responder(responder)
222            .on_receive_dispatch(
223                async |message_from_client: Dispatch, _cx| {
224                    tx.unbounded_send(message_from_client)
225                        .map_err(|_| crate::util::internal_error("nobody listening to mcp server"))
226                },
227                crate::on_receive_dispatch!(),
228            )
229            .with_spawned(async move |connection_to_client| {
230                let spawned_server: DynConnectTo<role::mcp::Client> =
231                    connect.connect(McpConnectionTo {
232                        acp_url,
233                        connection: connection_to_client.clone(),
234                    });
235
236                role::mcp::Client
237                    .builder()
238                    .on_receive_dispatch(
239                        async |message_from_server: Dispatch, _| {
240                            // when we receive a message from the server, fwd to the client
241                            connection_to_client.send_proxied_message(message_from_server)
242                        },
243                        crate::on_receive_dispatch!(),
244                    )
245                    .connect_with(spawned_server, async |connection_to_server| {
246                        while let Some(message_from_client) = rx.next().await {
247                            connection_to_server.send_proxied_message(message_from_client)?;
248                        }
249                        Ok(())
250                    })
251                    .await
252            })
253            .connect_to(client)
254            .await
255    }
256}