Skip to main content

crabtalk_runtime/mcp/
handler.rs

1//! Crabtalk MCP handler — initial load and read access.
2
3use crate::mcp::{McpBridge, config::McpServerConfig};
4use std::sync::{Arc, RwLock as StdRwLock};
5use tokio::sync::RwLock;
6
7/// MCP bridge owner.
8pub struct McpHandler {
9    bridge: RwLock<Arc<McpBridge>>,
10    /// Sync cache of server names → tool names, populated at load/reload.
11    server_cache: StdRwLock<Vec<(String, Vec<String>)>>,
12}
13
14impl McpHandler {
15    /// Build a bridge from the given MCP server configs and discovered port files.
16    async fn build_bridge(configs: &[McpServerConfig]) -> McpBridge {
17        let bridge = McpBridge::new();
18        let mut connected_names: Vec<String> = Vec::new();
19
20        // 1. Connect servers from config.
21        for server_config in configs {
22            let result = if let Some(url) = &server_config.url {
23                tracing::info!(
24                    server = %server_config.name,
25                    url = %url,
26                    "connecting MCP server via HTTP"
27                );
28                bridge
29                    .connect_http_named(server_config.name.clone(), url)
30                    .await
31            } else {
32                let mut cmd = tokio::process::Command::new(&server_config.command);
33                cmd.args(&server_config.args);
34                for (k, v) in &server_config.env {
35                    cmd.env(k, v);
36                }
37                tracing::info!(
38                    server = %server_config.name,
39                    command = %server_config.command,
40                    "connecting MCP server via stdio"
41                );
42                bridge
43                    .connect_stdio_named(server_config.name.clone(), cmd)
44                    .await
45            };
46
47            match result {
48                Ok(tools) => {
49                    connected_names.push(server_config.name.clone());
50                    tracing::info!(
51                        "connected MCP server '{}' — {} tool(s)",
52                        server_config.name,
53                        tools.len()
54                    );
55                }
56                Err(e) => {
57                    tracing::warn!("failed to connect MCP server '{}': {e}", server_config.name);
58                }
59            }
60        }
61
62        // 2. Auto-discover services from port files not already connected.
63        for (name, url) in scan_port_files() {
64            if connected_names.iter().any(|n| n == &name) {
65                continue;
66            }
67            tracing::info!(
68                server = %name,
69                url = %url,
70                "connecting MCP server via port file"
71            );
72            match bridge.connect_http_named(name.clone(), &url).await {
73                Ok(tools) => {
74                    tracing::info!("connected MCP server '{name}' — {} tool(s)", tools.len());
75                }
76                Err(e) => {
77                    tracing::warn!("failed to connect MCP server '{name}': {e}");
78                }
79            }
80        }
81
82        bridge
83    }
84
85    /// Load MCP servers from the given configs at startup.
86    pub async fn load(configs: &[McpServerConfig]) -> Self {
87        let bridge = Self::build_bridge(configs).await;
88        let servers = bridge.list_servers().await;
89        Self {
90            bridge: RwLock::new(Arc::new(bridge)),
91            server_cache: StdRwLock::new(servers),
92        }
93    }
94
95    /// List all connected servers with their tool names.
96    pub async fn list(&self) -> Vec<(String, Vec<String>)> {
97        self.bridge.read().await.list_servers().await
98    }
99
100    /// Sync access to the cached server→tools list (populated at load time).
101    pub fn cached_list(&self) -> Vec<(String, Vec<String>)> {
102        self.server_cache.read().unwrap().clone()
103    }
104
105    /// Get a clone of the current bridge Arc.
106    pub async fn bridge(&self) -> Arc<McpBridge> {
107        Arc::clone(&*self.bridge.read().await)
108    }
109
110    /// Try to get a clone of the current bridge Arc without blocking.
111    pub fn try_bridge(&self) -> Option<Arc<McpBridge>> {
112        self.bridge.try_read().ok().map(|g| Arc::clone(&*g))
113    }
114}
115
116/// Scan `~/.crabtalk/run/*.port` for service port files.
117fn scan_port_files() -> Vec<(String, String)> {
118    let run_dir = &*wcore::paths::RUN_DIR;
119    let entries = match std::fs::read_dir(run_dir) {
120        Ok(e) => e,
121        Err(_) => return Vec::new(),
122    };
123
124    let mut result = Vec::new();
125    for entry in entries.flatten() {
126        let path = entry.path();
127        let Some(ext) = path.extension() else {
128            continue;
129        };
130        if ext != "port" {
131            continue;
132        }
133        let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
134            continue;
135        };
136        // Skip the daemon's own port file.
137        if stem == "crabtalk" {
138            continue;
139        }
140        if let Ok(contents) = std::fs::read_to_string(&path)
141            && let Ok(port) = contents.trim().parse::<u16>()
142        {
143            result.push((stem.to_string(), format!("http://127.0.0.1:{port}/mcp")));
144        }
145    }
146    result
147}