agent_client_protocol/mcp_server/
server.rs1use std::{marker::PhantomData, sync::Arc};
4
5use futures::{StreamExt, channel::mpsc};
6use uuid::Uuid;
7
8use crate::{
9 Agent, Client, ConnectTo, ConnectionTo, Dispatch, DynConnectTo, HandleDispatchFrom, Handled,
10 Role,
11 jsonrpc::{
12 DynamicHandlerRegistration,
13 run::{NullRun, RunWithConnectionTo},
14 },
15 mcp_server::{McpConnectionTo, McpServerConnect, active_session::McpActiveSession},
16 role::{self, HasPeer},
17 schema::v1::{McpServer as SchemaMcpServer, McpServerHttp, NewSessionRequest},
18 util::MatchDispatchFrom,
19};
20
21pub struct McpServer<Counterpart: Role, Run = NullRun> {
39 phantom: PhantomData<Counterpart>,
41
42 acp_id: String,
44
45 connect: Arc<dyn McpServerConnect<Counterpart>>,
47
48 responder: Run,
55}
56
57impl<Counterpart: Role + std::fmt::Debug, Run: std::fmt::Debug> std::fmt::Debug
58 for McpServer<Counterpart, Run>
59{
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("McpServer")
62 .field("phantom", &self.phantom)
63 .field("acp_id", &self.acp_id)
64 .field("responder", &self.responder)
65 .finish_non_exhaustive()
66 }
67}
68
69impl<Counterpart: Role, Run> McpServer<Counterpart, Run>
70where
71 Run: RunWithConnectionTo<Counterpart>,
72{
73 pub fn new(c: impl McpServerConnect<Counterpart>, responder: Run) -> Self {
80 McpServer {
81 phantom: PhantomData,
82 acp_id: format!("acp:{}", Uuid::new_v4()),
83 connect: Arc::new(c),
84 responder,
85 }
86 }
87
88 pub(crate) fn into_handler_and_responder(self) -> (McpNewSessionHandler<Counterpart>, Run)
90 where
91 Counterpart: HasPeer<Agent>,
92 {
93 let Self {
94 phantom: _,
95 acp_id,
96 connect,
97 responder,
98 } = self;
99 (McpNewSessionHandler::new(acp_id, connect), responder)
100 }
101}
102
103pub(crate) struct McpNewSessionHandler<Counterpart: Role>
105where
106 Counterpart: HasPeer<Agent>,
107{
108 acp_id: String,
109 connect: Arc<dyn McpServerConnect<Counterpart>>,
110 active_session: McpActiveSession<Counterpart>,
111}
112
113impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
114where
115 Counterpart: HasPeer<Agent>,
116{
117 pub fn new(acp_id: String, connect: Arc<dyn McpServerConnect<Counterpart>>) -> Self {
118 Self {
119 active_session: McpActiveSession::new(acp_id.clone(), connect.clone()),
120 acp_id,
121 connect,
122 }
123 }
124
125 fn modify_new_session_request(&self, request: &mut NewSessionRequest) {
127 request
128 .mcp_servers
129 .push(SchemaMcpServer::Http(McpServerHttp::new(
130 self.connect.name(),
131 self.acp_id.clone(),
132 )));
133 }
134}
135
136impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
137where
138 Counterpart: HasPeer<Agent>,
139{
140 pub fn into_dynamic_handler(
151 self,
152 request: &mut NewSessionRequest,
153 cx: &ConnectionTo<Counterpart>,
154 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error>
155 where
156 Counterpart: HasPeer<Agent>,
157 {
158 self.modify_new_session_request(request);
159 cx.add_dynamic_handler(self.active_session)
160 }
161}
162
163impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for McpNewSessionHandler<Counterpart>
164where
165 Counterpart: HasPeer<Client> + HasPeer<Agent>,
166{
167 async fn handle_dispatch_from(
168 &mut self,
169 message: Dispatch,
170 cx: ConnectionTo<Counterpart>,
171 ) -> Result<Handled<Dispatch>, crate::Error> {
172 MatchDispatchFrom::new(message, &cx)
173 .if_request_from(Client, async |mut request: NewSessionRequest, responder| {
174 self.modify_new_session_request(&mut request);
175 Ok(Handled::No {
176 message: (request, responder),
177 retry: false,
178 })
179 })
180 .await
181 .otherwise_delegate(&mut self.active_session)
182 .await
183 }
184
185 fn describe_chain(&self) -> impl std::fmt::Debug {
186 format!("McpServer({})", self.connect.name())
187 }
188}
189
190impl<Run> ConnectTo<role::mcp::Client> for McpServer<role::mcp::Client, Run>
191where
192 Run: RunWithConnectionTo<role::mcp::Client> + 'static,
193{
194 async fn connect_to(
195 self,
196 client: impl ConnectTo<role::mcp::Server>,
197 ) -> Result<(), crate::Error> {
198 let Self {
199 acp_id,
200 connect,
201 responder,
202 phantom: _,
203 } = self;
204
205 let (tx, mut rx) = mpsc::unbounded();
206
207 role::mcp::Server
208 .builder()
209 .with_responder(responder)
210 .on_receive_dispatch(
211 async |message_from_client: Dispatch, _cx| {
212 tx.unbounded_send(message_from_client)
213 .map_err(|_| crate::util::internal_error("nobody listening to mcp server"))
214 },
215 crate::on_receive_dispatch!(),
216 )
217 .with_spawned(async move |connection_to_client| {
218 let spawned_server: DynConnectTo<role::mcp::Client> =
219 connect.connect(McpConnectionTo {
220 acp_id,
221 connection: connection_to_client.clone(),
222 });
223
224 role::mcp::Client
225 .builder()
226 .on_receive_dispatch(
227 async |message_from_server: Dispatch, _| {
228 connection_to_client.send_proxied_message(message_from_server)
230 },
231 crate::on_receive_dispatch!(),
232 )
233 .connect_with(spawned_server, async |connection_to_server| {
234 while let Some(message_from_client) = rx.next().await {
235 connection_to_server.send_proxied_message(message_from_client)?;
236 }
237 Ok(())
238 })
239 .await
240 })
241 .connect_to(client)
242 .await
243 }
244}