Skip to main content

crabtalk_mcp/
handler.rs

1//! Crabtalk MCP handler — agent-driven registration with fingerprint-keyed dedup.
2//!
3//! Agents declare their MCP servers inline (RFC 0193). The handler tracks
4//! which agents have declared which configs and dedups identical configs
5//! by structural fingerprint — two agents declaring the exact same
6//! `(command, args, env, url)` share one peer process. The peer survives
7//! until the last agent referencing it unregisters.
8
9use crate::McpBridge;
10use parking_lot::RwLock as SyncRwLock;
11use std::{
12    collections::{BTreeMap, BTreeSet, hash_map::DefaultHasher},
13    hash::{Hash, Hasher},
14    sync::Arc,
15};
16use tokio::sync::{RwLock, broadcast};
17use wcore::McpServerConfig;
18
19/// Stable identifier for a peer process — hash of the structural config.
20/// Two configs with the same fingerprint produce the same peer; different
21/// fingerprints get separate peers.
22pub type Fingerprint = u64;
23
24/// Compute the dedup fingerprint for a config. Hashes the fields that
25/// affect process identity: command, args, env, and url. `name` and
26/// `auto_restart` are not part of the fingerprint — they are
27/// presentation-level.
28pub fn fingerprint(cfg: &McpServerConfig) -> Fingerprint {
29    let mut h = DefaultHasher::new();
30    cfg.command.hash(&mut h);
31    cfg.args.hash(&mut h);
32    // BTreeMap iterates in sorted order — fingerprint is order-independent.
33    for (k, v) in &cfg.env {
34        k.hash(&mut h);
35        v.hash(&mut h);
36    }
37    cfg.url.hash(&mut h);
38    h.finish()
39}
40
/// Connection status of a single peer, as tracked by the handler.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerStatus {
    /// A connection attempt has been started and has not finished yet.
    Connecting,
    /// The connection attempt succeeded and the peer reported its tools.
    Connected,
    /// The connection attempt returned an error or timed out.
    Failed,
    /// Not assigned as a peer state by the handler's own code; presumably
    /// reserved for peers that have been torn down.
    Disconnected,
}
49
50/// Per-peer lifecycle state mirrored on the handler.
51#[derive(Debug, Clone)]
52pub struct McpServerState {
53    pub status: ServerStatus,
54    pub tools: Vec<String>,
55    pub last_error: Option<String>,
56}
57
58impl McpServerState {
59    fn connecting() -> Self {
60        Self {
61            status: ServerStatus::Connecting,
62            tools: Vec::new(),
63            last_error: None,
64        }
65    }
66
67    fn connected(tools: Vec<String>) -> Self {
68        Self {
69            status: ServerStatus::Connected,
70            tools,
71            last_error: None,
72        }
73    }
74
75    fn failed(error: String) -> Self {
76        Self {
77            status: ServerStatus::Failed,
78            tools: Vec::new(),
79            last_error: Some(error),
80        }
81    }
82}
83
/// A tracked peer: its lifecycle state plus the `(agent, mcp name)` pairs
/// that currently claim it.
#[derive(Debug)]
struct PeerEntry {
    /// Latest lifecycle state recorded for this peer.
    state: McpServerState,
    /// Owners, keyed by `(agent, mcp name)`. Intended to be non-empty: when
    /// the last owner releases its claim, the entry should be removed and
    /// the peer disconnected from the bridge.
    refs: BTreeSet<(String, String)>,
}
91
/// Lifecycle event broadcast to subscribers, addressed to the
/// user-facing `(agent, name)` pair rather than the shared peer.
#[derive(Clone, Debug)]
pub enum McpEvent {
    /// `(agent, name)` registered and is waiting on the peer's connection.
    Connecting {
        agent: String,
        name: String,
    },
    /// The peer backing `(agent, name)` is connected and exposes `tools`.
    Connected {
        agent: String,
        name: String,
        tools: Vec<String>,
    },
    /// The peer backing `(agent, name)` failed to connect. `error` is the
    /// recorded message, or empty when none was recorded.
    Failed {
        agent: String,
        name: String,
        error: String,
    },
    /// `(agent, name)` released its claim. The peer itself may stay up for
    /// other owners.
    Disconnected {
        agent: String,
        name: String,
    },
}
114
/// Buffer size of the lifecycle-event broadcast channel created in
/// `McpHandler::empty`. Under tokio `broadcast` semantics, a receiver that
/// falls more than this many events behind gets a lag error and misses the
/// oldest events.
const EVENT_CHANNEL_CAPACITY: usize = 256;
116
/// MCP bridge owner.
///
/// Tracks which `(agent, mcp name)` pairs claim which peer, dedups peers
/// by config fingerprint, and broadcasts lifecycle events.
///
/// NOTE(review): `states`, `allowed` and `unregister_for_agent` lock
/// `by_owner` before `peers`; any code holding both locks should use that
/// same order to avoid a lock-order deadlock.
pub struct McpHandler {
    /// Bridge every peer is connected through; read via `bridge()` /
    /// `try_bridge()`.
    bridge: RwLock<Arc<McpBridge>>,
    /// Per-fingerprint peer state.
    peers: SyncRwLock<BTreeMap<Fingerprint, PeerEntry>>,
    /// Reverse lookup — (agent, mcp name) → fingerprint of the owning peer.
    by_owner: SyncRwLock<BTreeMap<(String, String), Fingerprint>>,
    /// Sender side of the lifecycle-event channel; `subscribe()` hands out
    /// receivers.
    events_tx: broadcast::Sender<McpEvent>,
}
126
127impl McpHandler {
128    /// Timeout for connecting to a single MCP server.
129    const MCP_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
130
131    pub fn empty() -> Self {
132        let (events_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
133        Self {
134            bridge: RwLock::new(Arc::new(McpBridge::new())),
135            peers: SyncRwLock::new(BTreeMap::new()),
136            by_owner: SyncRwLock::new(BTreeMap::new()),
137            events_tx,
138        }
139    }
140
141    pub fn subscribe(&self) -> broadcast::Receiver<McpEvent> {
142        self.events_tx.subscribe()
143    }
144
145    /// Snapshot of every peer's state, keyed by user-facing (agent, name).
146    pub fn states(&self) -> BTreeMap<(String, String), McpServerState> {
147        let by_owner = self.by_owner.read();
148        let peers = self.peers.read();
149        by_owner
150            .iter()
151            .filter_map(|(key, fp)| peers.get(fp).map(|p| (key.clone(), p.state.clone())))
152            .collect()
153    }
154
155    /// `(peer_id, tool_name)` pairs for every tool exposed by the
156    /// agent's declared MCPs. Iteration order matches `mcp_names`, which
157    /// matches the agent's declaration order — first-declarer wins on
158    /// tool name collisions within an agent. Used by the dispatcher to
159    /// route calls to the right peer without exposing tools the agent
160    /// didn't ask for.
161    pub fn allowed(&self, agent: &str, mcp_names: &[String]) -> Vec<(String, String)> {
162        let by_owner = self.by_owner.read();
163        let peers = self.peers.read();
164        let mut out = Vec::new();
165        for name in mcp_names {
166            let key = (agent.to_owned(), name.clone());
167            if let Some(fp) = by_owner.get(&key)
168                && let Some(peer) = peers.get(fp)
169            {
170                let id = peer_id(*fp);
171                for tool_name in &peer.state.tools {
172                    out.push((id.clone(), tool_name.clone()));
173                }
174            }
175        }
176        out
177    }
178
179    /// Get a clone of the current bridge Arc. Tool calls go through this.
180    pub async fn bridge(&self) -> Arc<McpBridge> {
181        Arc::clone(&*self.bridge.read().await)
182    }
183
184    /// Try to get a clone of the current bridge Arc without blocking.
185    pub fn try_bridge(&self) -> Option<Arc<McpBridge>> {
186        self.bridge.try_read().ok().map(|g| Arc::clone(&*g))
187    }
188
189    /// Register `cfg` as belonging to `agent`. If another agent has
190    /// already registered an identical config, this is a refcount bump
191    /// — no spawn. Otherwise the peer is spawned in the background and
192    /// the result reflected via lifecycle events.
193    pub async fn register_for_agent(&self, agent: &str, cfg: &McpServerConfig) {
194        let fp = fingerprint(cfg);
195        let key = (agent.to_owned(), cfg.name.clone());
196
197        // Fast path — fingerprint already tracked.
198        let needs_spawn = {
199            let mut peers = self.peers.write();
200            let mut by_owner = self.by_owner.write();
201            // Drop any prior claim by this owner — same key may have
202            // pointed at a different fingerprint before update.
203            if let Some(old_fp) = by_owner.insert(key.clone(), fp)
204                && old_fp != fp
205                && let Some(entry) = peers.get_mut(&old_fp)
206            {
207                entry.refs.remove(&key);
208            }
209            match peers.get_mut(&fp) {
210                Some(entry) => {
211                    entry.refs.insert(key.clone());
212                    // Replay the terminal status to the new owner so
213                    // subscribers get a uniform view of register events.
214                    let event = match &entry.state.status {
215                        ServerStatus::Connected => Some(McpEvent::Connected {
216                            agent: agent.to_owned(),
217                            name: cfg.name.clone(),
218                            tools: entry.state.tools.clone(),
219                        }),
220                        ServerStatus::Failed => Some(McpEvent::Failed {
221                            agent: agent.to_owned(),
222                            name: cfg.name.clone(),
223                            error: entry.state.last_error.clone().unwrap_or_default(),
224                        }),
225                        ServerStatus::Connecting | ServerStatus::Disconnected => None,
226                    };
227                    if let Some(e) = event {
228                        let _ = self.events_tx.send(e);
229                    }
230                    false
231                }
232                None => {
233                    let mut refs = BTreeSet::new();
234                    refs.insert(key.clone());
235                    peers.insert(
236                        fp,
237                        PeerEntry {
238                            state: McpServerState::connecting(),
239                            refs,
240                        },
241                    );
242                    true
243                }
244            }
245        };
246
247        let _ = self.events_tx.send(McpEvent::Connecting {
248            agent: agent.to_owned(),
249            name: cfg.name.clone(),
250        });
251
252        if !needs_spawn {
253            return;
254        }
255
256        // Cold path — actually spawn the peer.
257        let bridge = self.bridge().await;
258        let state = connect_one(&bridge, cfg, fp).await;
259        {
260            let mut peers = self.peers.write();
261            if let Some(entry) = peers.get_mut(&fp) {
262                entry.state = state.clone();
263            }
264        }
265        let event = match &state.status {
266            ServerStatus::Connected => McpEvent::Connected {
267                agent: agent.to_owned(),
268                name: cfg.name.clone(),
269                tools: state.tools.clone(),
270            },
271            ServerStatus::Failed => McpEvent::Failed {
272                agent: agent.to_owned(),
273                name: cfg.name.clone(),
274                error: state.last_error.clone().unwrap_or_default(),
275            },
276            ServerStatus::Connecting | ServerStatus::Disconnected => return,
277        };
278        let _ = self.events_tx.send(event);
279    }
280
281    /// Drop the agent's claim on the named MCP. When the last claim is
282    /// released the peer is disconnected and forgotten.
283    pub async fn unregister_for_agent(&self, agent: &str, name: &str) {
284        let key = (agent.to_owned(), name.to_owned());
285        let drop_fp: Option<Fingerprint> = {
286            let mut by_owner = self.by_owner.write();
287            let Some(fp) = by_owner.remove(&key) else {
288                return;
289            };
290            let mut peers = self.peers.write();
291            if let Some(entry) = peers.get_mut(&fp) {
292                entry.refs.remove(&key);
293                if entry.refs.is_empty() {
294                    peers.remove(&fp);
295                    Some(fp)
296                } else {
297                    None
298                }
299            } else {
300                None
301            }
302        };
303
304        let _ = self.events_tx.send(McpEvent::Disconnected {
305            agent: agent.to_owned(),
306            name: name.to_owned(),
307        });
308
309        if let Some(fp) = drop_fp {
310            let bridge = self.bridge().await;
311            bridge.remove_server(&peer_id(fp)).await;
312        }
313    }
314}
315
316/// String form of a fingerprint, used as the bridge's peer key. Bridge
317/// remains name-keyed; we hand it the fingerprint hex.
318pub(crate) fn peer_id(fp: Fingerprint) -> String {
319    format!("{:016x}", fp)
320}
321
322/// Attempt to connect a single server, applying the global timeout.
323async fn connect_one(bridge: &McpBridge, cfg: &McpServerConfig, fp: Fingerprint) -> McpServerState {
324    let id = peer_id(fp);
325    let fut = async {
326        if let Some(url) = &cfg.url {
327            tracing::info!(
328                server = %cfg.name,
329                fingerprint = %id,
330                %url,
331                "connecting MCP server via HTTP"
332            );
333            bridge
334                .connect_http_named(id.clone(), url, cfg.auth.clone())
335                .await
336        } else {
337            let mut cmd = tokio::process::Command::new(&cfg.command);
338            cmd.args(&cfg.args);
339            for (k, v) in &cfg.env {
340                cmd.env(k, v);
341            }
342            tracing::info!(
343                server = %cfg.name,
344                fingerprint = %id,
345                command = %cfg.command,
346                "connecting MCP server via stdio"
347            );
348            bridge.connect_stdio_named(id.clone(), cmd).await
349        }
350    };
351
352    match tokio::time::timeout(McpHandler::MCP_CONNECT_TIMEOUT, fut).await {
353        Ok(Ok(tools)) => {
354            tracing::info!(
355                "connected MCP server '{}' ({}) — {} tool(s)",
356                cfg.name,
357                id,
358                tools.len()
359            );
360            McpServerState::connected(tools)
361        }
362        Ok(Err(e)) => {
363            let msg = e.to_string();
364            tracing::warn!("failed to connect MCP server '{}' ({id}): {msg}", cfg.name);
365            McpServerState::failed(msg)
366        }
367        Err(_) => {
368            let msg = format!(
369                "timed out after {}s",
370                McpHandler::MCP_CONNECT_TIMEOUT.as_secs()
371            );
372            tracing::warn!("MCP server '{}' ({id}) {msg}, skipping", cfg.name);
373            McpServerState::failed(msg)
374        }
375    }
376}