agent_client_protocol/mcp_server/
server.rs1use std::{marker::PhantomData, sync::Arc};
4
5use agent_client_protocol_schema::NewSessionRequest;
6use futures::{StreamExt, channel::mpsc};
7use uuid::Uuid;
8
9use crate::{
10 Agent, Client, ConnectTo, ConnectionTo, Dispatch, DynConnectTo, HandleDispatchFrom, Handled,
11 Role,
12 jsonrpc::{
13 DynamicHandlerRegistration,
14 run::{NullRun, RunWithConnectionTo},
15 },
16 mcp_server::{
17 McpConnectionTo, McpServerConnect, active_session::McpActiveSession,
18 builder::McpServerBuilder,
19 },
20 role::{self, HasPeer},
21 util::MatchDispatchFrom,
22};
23
24pub struct McpServer<Counterpart: Role, Run = NullRun> {
48 phantom: PhantomData<Counterpart>,
50
51 acp_url: String,
53
54 connect: Arc<dyn McpServerConnect<Counterpart>>,
56
57 responder: Run,
64}
65
66impl<Counterpart: Role + std::fmt::Debug, Run: std::fmt::Debug> std::fmt::Debug
67 for McpServer<Counterpart, Run>
68{
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("McpServer")
71 .field("phantom", &self.phantom)
72 .field("acp_url", &self.acp_url)
73 .field("responder", &self.responder)
74 .finish_non_exhaustive()
75 }
76}
77
78impl<Host: Role> McpServer<Host, NullRun> {
79 pub fn builder(name: impl ToString) -> McpServerBuilder<Host, NullRun> {
81 McpServerBuilder::new(name.to_string())
82 }
83}
84
85impl<Counterpart: Role, Run> McpServer<Counterpart, Run>
86where
87 Run: RunWithConnectionTo<Counterpart>,
88{
89 pub fn new(c: impl McpServerConnect<Counterpart>, responder: Run) -> Self {
95 McpServer {
96 phantom: PhantomData,
97 acp_url: format!("acp:{}", Uuid::new_v4()),
98 connect: Arc::new(c),
99 responder,
100 }
101 }
102
103 pub(crate) fn into_handler_and_responder(self) -> (McpNewSessionHandler<Counterpart>, Run)
105 where
106 Counterpart: HasPeer<Agent>,
107 {
108 let Self {
109 phantom: _,
110 acp_url,
111 connect,
112 responder,
113 } = self;
114 (McpNewSessionHandler::new(acp_url, connect), responder)
115 }
116}
117
118pub(crate) struct McpNewSessionHandler<Counterpart: Role>
120where
121 Counterpart: HasPeer<Agent>,
122{
123 acp_url: String,
124 connect: Arc<dyn McpServerConnect<Counterpart>>,
125 active_session: McpActiveSession<Counterpart>,
126}
127
128impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
129where
130 Counterpart: HasPeer<Agent>,
131{
132 pub fn new(acp_url: String, connect: Arc<dyn McpServerConnect<Counterpart>>) -> Self {
133 Self {
134 active_session: McpActiveSession::new(acp_url.clone(), connect.clone()),
135 acp_url,
136 connect,
137 }
138 }
139
140 fn modify_new_session_request(&self, request: &mut NewSessionRequest) {
142 request.mcp_servers.push(crate::schema::McpServer::Http(
143 crate::schema::McpServerHttp::new(self.connect.name(), self.acp_url.clone()),
144 ));
145 }
146}
147
148impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
149where
150 Counterpart: HasPeer<Agent>,
151{
152 pub fn into_dynamic_handler(
163 self,
164 request: &mut NewSessionRequest,
165 cx: &ConnectionTo<Counterpart>,
166 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error>
167 where
168 Counterpart: HasPeer<Agent>,
169 {
170 self.modify_new_session_request(request);
171 cx.add_dynamic_handler(self.active_session)
172 }
173}
174
175impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for McpNewSessionHandler<Counterpart>
176where
177 Counterpart: HasPeer<Client> + HasPeer<Agent>,
178{
179 async fn handle_dispatch_from(
180 &mut self,
181 message: Dispatch,
182 cx: ConnectionTo<Counterpart>,
183 ) -> Result<Handled<Dispatch>, crate::Error> {
184 MatchDispatchFrom::new(message, &cx)
185 .if_request_from(Client, async |mut request: NewSessionRequest, responder| {
186 self.modify_new_session_request(&mut request);
187 Ok(Handled::No {
188 message: (request, responder),
189 retry: false,
190 })
191 })
192 .await
193 .otherwise_delegate(&mut self.active_session)
194 .await
195 }
196
197 fn describe_chain(&self) -> impl std::fmt::Debug {
198 format!("McpServer({})", self.connect.name())
199 }
200}
201
202impl<Run> ConnectTo<role::mcp::Client> for McpServer<role::mcp::Client, Run>
203where
204 Run: RunWithConnectionTo<role::mcp::Client> + 'static,
205{
206 async fn connect_to(
207 self,
208 client: impl ConnectTo<role::mcp::Server>,
209 ) -> Result<(), crate::Error> {
210 let Self {
211 acp_url,
212 connect,
213 responder,
214 phantom: _,
215 } = self;
216
217 let (tx, mut rx) = mpsc::unbounded();
218
219 role::mcp::Server
220 .builder()
221 .with_responder(responder)
222 .on_receive_dispatch(
223 async |message_from_client: Dispatch, _cx| {
224 tx.unbounded_send(message_from_client)
225 .map_err(|_| crate::util::internal_error("nobody listening to mcp server"))
226 },
227 crate::on_receive_dispatch!(),
228 )
229 .with_spawned(async move |connection_to_client| {
230 let spawned_server: DynConnectTo<role::mcp::Client> =
231 connect.connect(McpConnectionTo {
232 acp_url,
233 connection: connection_to_client.clone(),
234 });
235
236 role::mcp::Client
237 .builder()
238 .on_receive_dispatch(
239 async |message_from_server: Dispatch, _| {
240 connection_to_client.send_proxied_message(message_from_server)
242 },
243 crate::on_receive_dispatch!(),
244 )
245 .connect_with(spawned_server, async |connection_to_server| {
246 while let Some(message_from_client) = rx.next().await {
247 connection_to_server.send_proxied_message(message_from_client)?;
248 }
249 Ok(())
250 })
251 .await
252 })
253 .connect_to(client)
254 .await
255 }
256}