crabtalk_runtime/mcp/
handler.rs1use crate::mcp::{McpBridge, config::McpServerConfig};
4use std::sync::{Arc, RwLock as StdRwLock};
5use tokio::sync::RwLock;
6
7pub struct McpHandler {
9 bridge: RwLock<Arc<McpBridge>>,
10 server_cache: StdRwLock<Vec<(String, Vec<String>)>>,
12}
13
14impl McpHandler {
15 async fn build_bridge(configs: &[McpServerConfig]) -> McpBridge {
17 let bridge = McpBridge::new();
18 let mut connected_names: Vec<String> = Vec::new();
19
20 for server_config in configs {
22 let result = if let Some(url) = &server_config.url {
23 tracing::info!(
24 server = %server_config.name,
25 url = %url,
26 "connecting MCP server via HTTP"
27 );
28 bridge
29 .connect_http_named(server_config.name.clone(), url)
30 .await
31 } else {
32 let mut cmd = tokio::process::Command::new(&server_config.command);
33 cmd.args(&server_config.args);
34 for (k, v) in &server_config.env {
35 cmd.env(k, v);
36 }
37 tracing::info!(
38 server = %server_config.name,
39 command = %server_config.command,
40 "connecting MCP server via stdio"
41 );
42 bridge
43 .connect_stdio_named(server_config.name.clone(), cmd)
44 .await
45 };
46
47 match result {
48 Ok(tools) => {
49 connected_names.push(server_config.name.clone());
50 tracing::info!(
51 "connected MCP server '{}' — {} tool(s)",
52 server_config.name,
53 tools.len()
54 );
55 }
56 Err(e) => {
57 tracing::warn!("failed to connect MCP server '{}': {e}", server_config.name);
58 }
59 }
60 }
61
62 for (name, url) in scan_port_files() {
64 if connected_names.iter().any(|n| n == &name) {
65 continue;
66 }
67 tracing::info!(
68 server = %name,
69 url = %url,
70 "connecting MCP server via port file"
71 );
72 match bridge.connect_http_named(name.clone(), &url).await {
73 Ok(tools) => {
74 tracing::info!("connected MCP server '{name}' — {} tool(s)", tools.len());
75 }
76 Err(e) => {
77 tracing::warn!("failed to connect MCP server '{name}': {e}");
78 }
79 }
80 }
81
82 bridge
83 }
84
85 pub async fn load(configs: &[McpServerConfig]) -> Self {
87 let bridge = Self::build_bridge(configs).await;
88 let servers = bridge.list_servers().await;
89 Self {
90 bridge: RwLock::new(Arc::new(bridge)),
91 server_cache: StdRwLock::new(servers),
92 }
93 }
94
95 pub async fn list(&self) -> Vec<(String, Vec<String>)> {
97 self.bridge.read().await.list_servers().await
98 }
99
100 pub fn cached_list(&self) -> Vec<(String, Vec<String>)> {
102 self.server_cache.read().unwrap().clone()
103 }
104
105 pub async fn bridge(&self) -> Arc<McpBridge> {
107 Arc::clone(&*self.bridge.read().await)
108 }
109
110 pub fn try_bridge(&self) -> Option<Arc<McpBridge>> {
112 self.bridge.try_read().ok().map(|g| Arc::clone(&*g))
113 }
114}
115
116fn scan_port_files() -> Vec<(String, String)> {
118 let run_dir = &*wcore::paths::RUN_DIR;
119 let entries = match std::fs::read_dir(run_dir) {
120 Ok(e) => e,
121 Err(_) => return Vec::new(),
122 };
123
124 let mut result = Vec::new();
125 for entry in entries.flatten() {
126 let path = entry.path();
127 let Some(ext) = path.extension() else {
128 continue;
129 };
130 if ext != "port" {
131 continue;
132 }
133 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
134 continue;
135 };
136 if stem == "crabtalk" {
138 continue;
139 }
140 if let Ok(contents) = std::fs::read_to_string(&path)
141 && let Ok(port) = contents.trim().parse::<u16>()
142 {
143 result.push((stem.to_string(), format!("http://127.0.0.1:{port}/mcp")));
144 }
145 }
146 result
147}