agent_client_protocol/mcp_server/
server.rs1use std::{marker::PhantomData, sync::Arc};
4
5use agent_client_protocol_schema::NewSessionRequest;
6use futures::{StreamExt, channel::mpsc};
7use uuid::Uuid;
8
9use crate::{
10 Agent, Client, ConnectTo, ConnectionTo, Dispatch, DynConnectTo, HandleDispatchFrom, Handled,
11 Role,
12 jsonrpc::{
13 DynamicHandlerRegistration,
14 run::{NullRun, RunWithConnectionTo},
15 },
16 mcp_server::{McpConnectionTo, McpServerConnect, active_session::McpActiveSession},
17 role::{self, HasPeer},
18 util::MatchDispatchFrom,
19};
20
21pub struct McpServer<Counterpart: Role, Run = NullRun> {
39 phantom: PhantomData<Counterpart>,
41
42 acp_id: String,
44
45 connect: Arc<dyn McpServerConnect<Counterpart>>,
47
48 responder: Run,
55}
56
57impl<Counterpart: Role + std::fmt::Debug, Run: std::fmt::Debug> std::fmt::Debug
58 for McpServer<Counterpart, Run>
59{
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("McpServer")
62 .field("phantom", &self.phantom)
63 .field("acp_id", &self.acp_id)
64 .field("responder", &self.responder)
65 .finish_non_exhaustive()
66 }
67}
68
69impl<Counterpart: Role, Run> McpServer<Counterpart, Run>
70where
71 Run: RunWithConnectionTo<Counterpart>,
72{
73 pub fn new(c: impl McpServerConnect<Counterpart>, responder: Run) -> Self {
80 McpServer {
81 phantom: PhantomData,
82 acp_id: format!("acp:{}", Uuid::new_v4()),
83 connect: Arc::new(c),
84 responder,
85 }
86 }
87
88 pub(crate) fn into_handler_and_responder(self) -> (McpNewSessionHandler<Counterpart>, Run)
90 where
91 Counterpart: HasPeer<Agent>,
92 {
93 let Self {
94 phantom: _,
95 acp_id,
96 connect,
97 responder,
98 } = self;
99 (McpNewSessionHandler::new(acp_id, connect), responder)
100 }
101}
102
103pub(crate) struct McpNewSessionHandler<Counterpart: Role>
105where
106 Counterpart: HasPeer<Agent>,
107{
108 acp_id: String,
109 connect: Arc<dyn McpServerConnect<Counterpart>>,
110 active_session: McpActiveSession<Counterpart>,
111}
112
113impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
114where
115 Counterpart: HasPeer<Agent>,
116{
117 pub fn new(acp_id: String, connect: Arc<dyn McpServerConnect<Counterpart>>) -> Self {
118 Self {
119 active_session: McpActiveSession::new(acp_id.clone(), connect.clone()),
120 acp_id,
121 connect,
122 }
123 }
124
125 fn modify_new_session_request(&self, request: &mut NewSessionRequest) {
127 request.mcp_servers.push(crate::schema::McpServer::Http(
128 crate::schema::McpServerHttp::new(self.connect.name(), self.acp_id.clone()),
129 ));
130 }
131}
132
133impl<Counterpart: Role> McpNewSessionHandler<Counterpart>
134where
135 Counterpart: HasPeer<Agent>,
136{
137 pub fn into_dynamic_handler(
148 self,
149 request: &mut NewSessionRequest,
150 cx: &ConnectionTo<Counterpart>,
151 ) -> Result<DynamicHandlerRegistration<Counterpart>, crate::Error>
152 where
153 Counterpart: HasPeer<Agent>,
154 {
155 self.modify_new_session_request(request);
156 cx.add_dynamic_handler(self.active_session)
157 }
158}
159
160impl<Counterpart: Role> HandleDispatchFrom<Counterpart> for McpNewSessionHandler<Counterpart>
161where
162 Counterpart: HasPeer<Client> + HasPeer<Agent>,
163{
164 async fn handle_dispatch_from(
165 &mut self,
166 message: Dispatch,
167 cx: ConnectionTo<Counterpart>,
168 ) -> Result<Handled<Dispatch>, crate::Error> {
169 MatchDispatchFrom::new(message, &cx)
170 .if_request_from(Client, async |mut request: NewSessionRequest, responder| {
171 self.modify_new_session_request(&mut request);
172 Ok(Handled::No {
173 message: (request, responder),
174 retry: false,
175 })
176 })
177 .await
178 .otherwise_delegate(&mut self.active_session)
179 .await
180 }
181
182 fn describe_chain(&self) -> impl std::fmt::Debug {
183 format!("McpServer({})", self.connect.name())
184 }
185}
186
187impl<Run> ConnectTo<role::mcp::Client> for McpServer<role::mcp::Client, Run>
188where
189 Run: RunWithConnectionTo<role::mcp::Client> + 'static,
190{
191 async fn connect_to(
192 self,
193 client: impl ConnectTo<role::mcp::Server>,
194 ) -> Result<(), crate::Error> {
195 let Self {
196 acp_id,
197 connect,
198 responder,
199 phantom: _,
200 } = self;
201
202 let (tx, mut rx) = mpsc::unbounded();
203
204 role::mcp::Server
205 .builder()
206 .with_responder(responder)
207 .on_receive_dispatch(
208 async |message_from_client: Dispatch, _cx| {
209 tx.unbounded_send(message_from_client)
210 .map_err(|_| crate::util::internal_error("nobody listening to mcp server"))
211 },
212 crate::on_receive_dispatch!(),
213 )
214 .with_spawned(async move |connection_to_client| {
215 let spawned_server: DynConnectTo<role::mcp::Client> =
216 connect.connect(McpConnectionTo {
217 acp_id,
218 connection: connection_to_client.clone(),
219 });
220
221 role::mcp::Client
222 .builder()
223 .on_receive_dispatch(
224 async |message_from_server: Dispatch, _| {
225 connection_to_client.send_proxied_message(message_from_server)
227 },
228 crate::on_receive_dispatch!(),
229 )
230 .connect_with(spawned_server, async |connection_to_server| {
231 while let Some(message_from_client) = rx.next().await {
232 connection_to_server.send_proxied_message(message_from_client)?;
233 }
234 Ok(())
235 })
236 .await
237 })
238 .connect_to(client)
239 .await
240 }
241}