1use crate::McpBridge;
10use parking_lot::RwLock as SyncRwLock;
11use std::{
12 collections::{BTreeMap, BTreeSet, hash_map::DefaultHasher},
13 hash::{Hash, Hasher},
14 sync::Arc,
15};
16use tokio::sync::{RwLock, broadcast};
17use wcore::McpServerConfig;
18
/// Identity of one MCP server connection, as computed by [`fingerprint`].
///
/// Used as the key of the handler's peer table and, rendered through
/// [`peer_id`], as the server id handed to the bridge.
pub type Fingerprint = u64;
23
24pub fn fingerprint(cfg: &McpServerConfig) -> Fingerprint {
29 let mut h = DefaultHasher::new();
30 cfg.command.hash(&mut h);
31 cfg.args.hash(&mut h);
32 for (k, v) in &cfg.env {
34 k.hash(&mut h);
35 v.hash(&mut h);
36 }
37 cfg.url.hash(&mut h);
38 h.finish()
39}
40
/// Lifecycle stage of a shared MCP server connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerStatus {
    /// A connection attempt is in flight; the status a freshly created peer
    /// starts with.
    Connecting,
    /// The connection attempt succeeded and the server's tool list is known.
    Connected,
    /// The connection attempt returned an error or timed out.
    Failed,
    /// NOTE(review): nothing in this file assigns this status; presumably it
    /// is set or consumed by code elsewhere — confirm.
    Disconnected,
}
49
/// Snapshot of one server connection: its status plus the data that status
/// carries.
#[derive(Debug, Clone)]
pub struct McpServerState {
    /// Current lifecycle stage.
    pub status: ServerStatus,
    /// Tool names reported by the server; empty unless the state was built
    /// for a successful connection.
    pub tools: Vec<String>,
    /// Error text of the failed connection attempt; `None` for states built
    /// as connecting or connected.
    pub last_error: Option<String>,
}
57
58impl McpServerState {
59 fn connecting() -> Self {
60 Self {
61 status: ServerStatus::Connecting,
62 tools: Vec::new(),
63 last_error: None,
64 }
65 }
66
67 fn connected(tools: Vec<String>) -> Self {
68 Self {
69 status: ServerStatus::Connected,
70 tools,
71 last_error: None,
72 }
73 }
74
75 fn failed(error: String) -> Self {
76 Self {
77 status: ServerStatus::Failed,
78 tools: Vec::new(),
79 last_error: Some(error),
80 }
81 }
82}
83
/// Book-keeping for one distinct server connection, shared by every owner
/// whose configuration hashes to the same fingerprint.
#[derive(Debug)]
struct PeerEntry {
    /// Connection state reported for every owner of this peer.
    state: McpServerState,
    /// `(agent, server name)` bindings currently using this peer; the peer is
    /// dropped once an unregister leaves this set empty.
    refs: BTreeSet<(String, String)>,
}
91
/// Lifecycle notification published on the handler's broadcast channel.
///
/// In every variant `agent` and `name` identify the `(agent, server name)`
/// binding the event is about.
#[derive(Debug, Clone)]
pub enum McpEvent {
    /// A registration for this binding was accepted.
    Connecting {
        agent: String,
        name: String,
    },
    /// The server behind this binding is connected and exposes `tools`.
    Connected {
        agent: String,
        name: String,
        tools: Vec<String>,
    },
    /// The connection attempt behind this binding failed; `error` carries the
    /// recorded error text (empty when none was recorded).
    Failed {
        agent: String,
        name: String,
        error: String,
    },
    /// The binding was removed through `unregister_for_agent`.
    Disconnected {
        agent: String,
        name: String,
    },
}
114
/// Capacity handed to `broadcast::channel` when a handler is created, i.e.
/// how many [`McpEvent`]s the channel retains for receivers that have not
/// read them yet.
const EVENT_CHANNEL_CAPACITY: usize = 256;
116
/// Tracks MCP server connections shared between agents and publishes their
/// lifecycle as [`McpEvent`]s.
pub struct McpHandler {
    /// Bridge through which servers are connected and removed; callers get a
    /// cloned `Arc` of it. Guarded by tokio's async `RwLock`.
    bridge: RwLock<Arc<McpBridge>>,
    /// One entry per distinct server configuration, keyed by its fingerprint.
    peers: SyncRwLock<BTreeMap<Fingerprint, PeerEntry>>,
    /// Maps each `(agent, server name)` binding to the fingerprint of the
    /// peer it currently uses.
    by_owner: SyncRwLock<BTreeMap<(String, String), Fingerprint>>,
    /// Sending half of the event channel; receivers are created on demand by
    /// `subscribe`.
    events_tx: broadcast::Sender<McpEvent>,
}
126
127impl McpHandler {
128 const MCP_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
130
131 pub fn empty() -> Self {
132 let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
133 Self {
134 bridge: RwLock::new(Arc::new(McpBridge::new())),
135 peers: SyncRwLock::new(BTreeMap::new()),
136 by_owner: SyncRwLock::new(BTreeMap::new()),
137 events_tx,
138 }
139 }
140
141 pub fn subscribe(&self) -> broadcast::Receiver<McpEvent> {
142 self.events_tx.subscribe()
143 }
144
145 pub fn states(&self) -> BTreeMap<(String, String), McpServerState> {
147 let by_owner = self.by_owner.read();
148 let peers = self.peers.read();
149 by_owner
150 .iter()
151 .filter_map(|(key, fp)| peers.get(fp).map(|p| (key.clone(), p.state.clone())))
152 .collect()
153 }
154
155 pub fn allowed(&self, agent: &str, mcp_names: &[String]) -> Vec<(String, String)> {
162 let by_owner = self.by_owner.read();
163 let peers = self.peers.read();
164 let mut out = Vec::new();
165 for name in mcp_names {
166 let key = (agent.to_owned(), name.clone());
167 if let Some(fp) = by_owner.get(&key)
168 && let Some(peer) = peers.get(fp)
169 {
170 let id = peer_id(*fp);
171 for tool_name in &peer.state.tools {
172 out.push((id.clone(), tool_name.clone()));
173 }
174 }
175 }
176 out
177 }
178
179 pub async fn bridge(&self) -> Arc<McpBridge> {
181 Arc::clone(&*self.bridge.read().await)
182 }
183
184 pub fn try_bridge(&self) -> Option<Arc<McpBridge>> {
186 self.bridge.try_read().ok().map(|g| Arc::clone(&*g))
187 }
188
189 pub async fn register_for_agent(&self, agent: &str, cfg: &McpServerConfig) {
194 let fp = fingerprint(cfg);
195 let key = (agent.to_owned(), cfg.name.clone());
196
197 let needs_spawn = {
199 let mut peers = self.peers.write();
200 let mut by_owner = self.by_owner.write();
201 if let Some(old_fp) = by_owner.insert(key.clone(), fp)
204 && old_fp != fp
205 && let Some(entry) = peers.get_mut(&old_fp)
206 {
207 entry.refs.remove(&key);
208 }
209 match peers.get_mut(&fp) {
210 Some(entry) => {
211 entry.refs.insert(key.clone());
212 let event = match &entry.state.status {
215 ServerStatus::Connected => Some(McpEvent::Connected {
216 agent: agent.to_owned(),
217 name: cfg.name.clone(),
218 tools: entry.state.tools.clone(),
219 }),
220 ServerStatus::Failed => Some(McpEvent::Failed {
221 agent: agent.to_owned(),
222 name: cfg.name.clone(),
223 error: entry.state.last_error.clone().unwrap_or_default(),
224 }),
225 ServerStatus::Connecting | ServerStatus::Disconnected => None,
226 };
227 if let Some(e) = event {
228 let _ = self.events_tx.send(e);
229 }
230 false
231 }
232 None => {
233 let mut refs = BTreeSet::new();
234 refs.insert(key.clone());
235 peers.insert(
236 fp,
237 PeerEntry {
238 state: McpServerState::connecting(),
239 refs,
240 },
241 );
242 true
243 }
244 }
245 };
246
247 let _ = self.events_tx.send(McpEvent::Connecting {
248 agent: agent.to_owned(),
249 name: cfg.name.clone(),
250 });
251
252 if !needs_spawn {
253 return;
254 }
255
256 let bridge = self.bridge().await;
258 let state = connect_one(&bridge, cfg, fp).await;
259 {
260 let mut peers = self.peers.write();
261 if let Some(entry) = peers.get_mut(&fp) {
262 entry.state = state.clone();
263 }
264 }
265 let event = match &state.status {
266 ServerStatus::Connected => McpEvent::Connected {
267 agent: agent.to_owned(),
268 name: cfg.name.clone(),
269 tools: state.tools.clone(),
270 },
271 ServerStatus::Failed => McpEvent::Failed {
272 agent: agent.to_owned(),
273 name: cfg.name.clone(),
274 error: state.last_error.clone().unwrap_or_default(),
275 },
276 ServerStatus::Connecting | ServerStatus::Disconnected => return,
277 };
278 let _ = self.events_tx.send(event);
279 }
280
281 pub async fn unregister_for_agent(&self, agent: &str, name: &str) {
284 let key = (agent.to_owned(), name.to_owned());
285 let drop_fp: Option<Fingerprint> = {
286 let mut by_owner = self.by_owner.write();
287 let Some(fp) = by_owner.remove(&key) else {
288 return;
289 };
290 let mut peers = self.peers.write();
291 if let Some(entry) = peers.get_mut(&fp) {
292 entry.refs.remove(&key);
293 if entry.refs.is_empty() {
294 peers.remove(&fp);
295 Some(fp)
296 } else {
297 None
298 }
299 } else {
300 None
301 }
302 };
303
304 let _ = self.events_tx.send(McpEvent::Disconnected {
305 agent: agent.to_owned(),
306 name: name.to_owned(),
307 });
308
309 if let Some(fp) = drop_fp {
310 let bridge = self.bridge().await;
311 bridge.remove_server(&peer_id(fp)).await;
312 }
313 }
314}
315
316pub(crate) fn peer_id(fp: Fingerprint) -> String {
319 format!("{:016x}", fp)
320}
321
322async fn connect_one(bridge: &McpBridge, cfg: &McpServerConfig, fp: Fingerprint) -> McpServerState {
324 let id = peer_id(fp);
325 let fut = async {
326 if let Some(url) = &cfg.url {
327 tracing::info!(
328 server = %cfg.name,
329 fingerprint = %id,
330 %url,
331 "connecting MCP server via HTTP"
332 );
333 bridge
334 .connect_http_named(id.clone(), url, cfg.auth.clone())
335 .await
336 } else {
337 let mut cmd = tokio::process::Command::new(&cfg.command);
338 cmd.args(&cfg.args);
339 for (k, v) in &cfg.env {
340 cmd.env(k, v);
341 }
342 tracing::info!(
343 server = %cfg.name,
344 fingerprint = %id,
345 command = %cfg.command,
346 "connecting MCP server via stdio"
347 );
348 bridge.connect_stdio_named(id.clone(), cmd).await
349 }
350 };
351
352 match tokio::time::timeout(McpHandler::MCP_CONNECT_TIMEOUT, fut).await {
353 Ok(Ok(tools)) => {
354 tracing::info!(
355 "connected MCP server '{}' ({}) — {} tool(s)",
356 cfg.name,
357 id,
358 tools.len()
359 );
360 McpServerState::connected(tools)
361 }
362 Ok(Err(e)) => {
363 let msg = e.to_string();
364 tracing::warn!("failed to connect MCP server '{}' ({id}): {msg}", cfg.name);
365 McpServerState::failed(msg)
366 }
367 Err(_) => {
368 let msg = format!(
369 "timed out after {}s",
370 McpHandler::MCP_CONNECT_TIMEOUT.as_secs()
371 );
372 tracing::warn!("MCP server '{}' ({id}) {msg}, skipping", cfg.name);
373 McpServerState::failed(msg)
374 }
375 }
376}