1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::ToolResult;
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18const MCP_SECRET_PLACEHOLDER: &str = "";
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct McpToolCacheEntry {
22 pub tool_name: String,
23 pub description: String,
24 #[serde(default)]
25 pub input_schema: Value,
26 pub fetched_at_ms: u64,
27 pub schema_hash: String,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32 pub name: String,
33 pub transport: String,
34 #[serde(default = "default_enabled")]
35 pub enabled: bool,
36 pub connected: bool,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub pid: Option<u32>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub last_error: Option<String>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub last_auth_challenge: Option<McpAuthChallenge>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub mcp_session_id: Option<String>,
45 #[serde(default)]
46 pub headers: HashMap<String, String>,
47 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48 pub secret_headers: HashMap<String, McpSecretRef>,
49 #[serde(default)]
50 pub tool_cache: Vec<McpToolCacheEntry>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub tools_fetched_at_ms: Option<u64>,
53 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
54 pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
55 #[serde(default, skip)]
56 pub secret_header_values: HashMap<String, String>,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum McpSecretRef {
62 Store { secret_id: String },
63 Env { env: String },
64 BearerEnv { env: String },
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct McpAuthChallenge {
69 pub challenge_id: String,
70 pub tool_name: String,
71 pub authorization_url: String,
72 pub message: String,
73 pub requested_at_ms: u64,
74 pub status: String,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct PendingMcpAuth {
79 pub challenge_id: String,
80 pub authorization_url: String,
81 pub message: String,
82 pub status: String,
83 pub first_seen_ms: u64,
84 pub last_probe_ms: u64,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct McpRemoteTool {
89 pub server_name: String,
90 pub tool_name: String,
91 pub namespaced_name: String,
92 pub description: String,
93 #[serde(default)]
94 pub input_schema: Value,
95 pub fetched_at_ms: u64,
96 pub schema_hash: String,
97}
98
99#[derive(Clone)]
100pub struct McpRegistry {
101 servers: Arc<RwLock<HashMap<String, McpServer>>>,
102 processes: Arc<Mutex<HashMap<String, Child>>>,
103 state_file: Arc<PathBuf>,
104}
105
106impl McpRegistry {
107 pub fn new() -> Self {
108 Self::new_with_state_file(resolve_state_file())
109 }
110
111 pub fn new_with_state_file(state_file: PathBuf) -> Self {
112 let (loaded_state, migrated) = load_state(&state_file);
113 let loaded = loaded_state
114 .into_iter()
115 .map(|(k, mut v)| {
116 v.connected = false;
117 v.pid = None;
118 if v.name.trim().is_empty() {
119 v.name = k.clone();
120 }
121 if v.headers.is_empty() {
122 v.headers = HashMap::new();
123 }
124 if v.secret_headers.is_empty() {
125 v.secret_headers = HashMap::new();
126 }
127 v.secret_header_values = resolve_secret_header_values(&v.secret_headers);
128 (k, v)
129 })
130 .collect::<HashMap<_, _>>();
131 if migrated {
132 persist_state_blocking(&state_file, &loaded);
133 }
134 Self {
135 servers: Arc::new(RwLock::new(loaded)),
136 processes: Arc::new(Mutex::new(HashMap::new())),
137 state_file: Arc::new(state_file),
138 }
139 }
140
141 pub async fn list(&self) -> HashMap<String, McpServer> {
142 self.servers.read().await.clone()
143 }
144
145 pub async fn list_public(&self) -> HashMap<String, McpServer> {
146 self.servers
147 .read()
148 .await
149 .iter()
150 .map(|(name, server)| (name.clone(), redacted_server_view(server)))
151 .collect()
152 }
153
154 pub async fn add(&self, name: String, transport: String) {
155 self.add_or_update(name, transport, HashMap::new(), true)
156 .await;
157 }
158
159 pub async fn add_or_update(
160 &self,
161 name: String,
162 transport: String,
163 headers: HashMap<String, String>,
164 enabled: bool,
165 ) {
166 self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
167 .await;
168 }
169
170 pub async fn add_or_update_with_secret_refs(
171 &self,
172 name: String,
173 transport: String,
174 headers: HashMap<String, String>,
175 secret_headers: HashMap<String, McpSecretRef>,
176 enabled: bool,
177 ) {
178 let normalized_name = name.trim().to_string();
179 let (persisted_headers, persisted_secret_headers, secret_header_values) =
180 split_headers_for_storage(&normalized_name, headers, secret_headers);
181 let mut servers = self.servers.write().await;
182 let existing = servers.get(&normalized_name).cloned();
183 let preserve_cache = existing.as_ref().is_some_and(|row| {
184 row.transport == transport
185 && effective_headers(row)
186 == combine_headers(&persisted_headers, &secret_header_values)
187 });
188 let existing_tool_cache = if preserve_cache {
189 existing
190 .as_ref()
191 .map(|row| row.tool_cache.clone())
192 .unwrap_or_default()
193 } else {
194 Vec::new()
195 };
196 let existing_fetched_at = if preserve_cache {
197 existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
198 } else {
199 None
200 };
201 let server = McpServer {
202 name: normalized_name.clone(),
203 transport,
204 enabled,
205 connected: false,
206 pid: None,
207 last_error: None,
208 last_auth_challenge: None,
209 mcp_session_id: None,
210 headers: persisted_headers,
211 secret_headers: persisted_secret_headers,
212 tool_cache: existing_tool_cache,
213 tools_fetched_at_ms: existing_fetched_at,
214 pending_auth_by_tool: HashMap::new(),
215 secret_header_values,
216 };
217 servers.insert(normalized_name, server);
218 drop(servers);
219 self.persist_state().await;
220 }
221
222 pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
223 let mut servers = self.servers.write().await;
224 let Some(server) = servers.get_mut(name) else {
225 return false;
226 };
227 server.enabled = enabled;
228 if !enabled {
229 server.connected = false;
230 server.pid = None;
231 server.last_auth_challenge = None;
232 server.mcp_session_id = None;
233 server.pending_auth_by_tool.clear();
234 }
235 drop(servers);
236 if !enabled {
237 if let Some(mut child) = self.processes.lock().await.remove(name) {
238 let _ = child.kill().await;
239 let _ = child.wait().await;
240 }
241 }
242 self.persist_state().await;
243 true
244 }
245
246 pub async fn remove(&self, name: &str) -> bool {
247 let removed_server = {
248 let mut servers = self.servers.write().await;
249 servers.remove(name)
250 };
251 let Some(server) = removed_server else {
252 return false;
253 };
254 delete_secret_header_refs(&server.secret_headers);
255
256 if let Some(mut child) = self.processes.lock().await.remove(name) {
257 let _ = child.kill().await;
258 let _ = child.wait().await;
259 }
260 self.persist_state().await;
261 true
262 }
263
264 pub async fn connect(&self, name: &str) -> bool {
265 let server = {
266 let servers = self.servers.read().await;
267 let Some(server) = servers.get(name) else {
268 return false;
269 };
270 server.clone()
271 };
272
273 if !server.enabled {
274 let mut servers = self.servers.write().await;
275 if let Some(entry) = servers.get_mut(name) {
276 entry.connected = false;
277 entry.pid = None;
278 entry.last_error = Some("MCP server is disabled".to_string());
279 entry.last_auth_challenge = None;
280 entry.mcp_session_id = None;
281 entry.pending_auth_by_tool.clear();
282 }
283 drop(servers);
284 self.persist_state().await;
285 return false;
286 }
287
288 if let Some(command_text) = parse_stdio_transport(&server.transport) {
289 return self.connect_stdio(name, command_text).await;
290 }
291
292 if parse_remote_endpoint(&server.transport).is_some() {
293 return self.refresh(name).await.is_ok();
294 }
295
296 let mut servers = self.servers.write().await;
297 if let Some(entry) = servers.get_mut(name) {
298 entry.connected = true;
299 entry.pid = None;
300 entry.last_error = None;
301 entry.last_auth_challenge = None;
302 entry.mcp_session_id = None;
303 entry.pending_auth_by_tool.clear();
304 }
305 drop(servers);
306 self.persist_state().await;
307 true
308 }
309
310 pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
311 let server = {
312 let servers = self.servers.read().await;
313 let Some(server) = servers.get(name) else {
314 return Err("MCP server not found".to_string());
315 };
316 server.clone()
317 };
318
319 if !server.enabled {
320 return Err("MCP server is disabled".to_string());
321 }
322
323 let endpoint = parse_remote_endpoint(&server.transport)
324 .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
325
326 let request_headers = effective_headers(&server);
327 let (tools, session_id) = match self
328 .discover_remote_tools(&endpoint, &request_headers)
329 .await
330 {
331 Ok(result) => result,
332 Err(err) => {
333 let mut servers = self.servers.write().await;
334 if let Some(entry) = servers.get_mut(name) {
335 entry.connected = false;
336 entry.pid = None;
337 entry.last_error = Some(err.clone());
338 entry.last_auth_challenge = None;
339 entry.mcp_session_id = None;
340 entry.pending_auth_by_tool.clear();
341 entry.tool_cache.clear();
342 entry.tools_fetched_at_ms = None;
343 }
344 drop(servers);
345 self.persist_state().await;
346 return Err(err);
347 }
348 };
349
350 let now = now_ms();
351 let cache = tools
352 .iter()
353 .map(|tool| McpToolCacheEntry {
354 tool_name: tool.tool_name.clone(),
355 description: tool.description.clone(),
356 input_schema: tool.input_schema.clone(),
357 fetched_at_ms: now,
358 schema_hash: schema_hash(&tool.input_schema),
359 })
360 .collect::<Vec<_>>();
361
362 let mut servers = self.servers.write().await;
363 if let Some(entry) = servers.get_mut(name) {
364 entry.connected = true;
365 entry.pid = None;
366 entry.last_error = None;
367 entry.last_auth_challenge = None;
368 entry.mcp_session_id = session_id;
369 entry.tool_cache = cache;
370 entry.tools_fetched_at_ms = Some(now);
371 entry.pending_auth_by_tool.clear();
372 }
373 drop(servers);
374 self.persist_state().await;
375 Ok(self.server_tools(name).await)
376 }
377
378 pub async fn disconnect(&self, name: &str) -> bool {
379 if let Some(mut child) = self.processes.lock().await.remove(name) {
380 let _ = child.kill().await;
381 let _ = child.wait().await;
382 }
383 let mut servers = self.servers.write().await;
384 if let Some(server) = servers.get_mut(name) {
385 server.connected = false;
386 server.pid = None;
387 server.last_auth_challenge = None;
388 server.mcp_session_id = None;
389 server.pending_auth_by_tool.clear();
390 drop(servers);
391 self.persist_state().await;
392 return true;
393 }
394 false
395 }
396
397 pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
398 let mut out = self
399 .servers
400 .read()
401 .await
402 .values()
403 .filter(|server| server.enabled && server.connected)
404 .flat_map(server_tool_rows)
405 .collect::<Vec<_>>();
406 out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
407 out
408 }
409
410 pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
411 let Some(server) = self.servers.read().await.get(name).cloned() else {
412 return Vec::new();
413 };
414 let mut rows = server_tool_rows(&server);
415 rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
416 rows
417 }
418
419 pub async fn call_tool(
420 &self,
421 server_name: &str,
422 tool_name: &str,
423 args: Value,
424 ) -> Result<ToolResult, String> {
425 let server = {
426 let servers = self.servers.read().await;
427 let Some(server) = servers.get(server_name) else {
428 return Err(format!("MCP server '{server_name}' not found"));
429 };
430 server.clone()
431 };
432
433 if !server.enabled {
434 return Err(format!("MCP server '{server_name}' is disabled"));
435 }
436 if !server.connected {
437 return Err(format!("MCP server '{server_name}' is not connected"));
438 }
439
440 let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
441 "MCP tools/call currently supports HTTP/S transports only".to_string()
442 })?;
443 let canonical_tool = canonical_tool_key(tool_name);
444 let now = now_ms();
445 if let Some(blocked) = pending_auth_short_circuit(
446 &server,
447 &canonical_tool,
448 tool_name,
449 now,
450 MCP_AUTH_REPROBE_COOLDOWN_MS,
451 ) {
452 return Ok(ToolResult {
453 output: blocked.output,
454 metadata: json!({
455 "server": server_name,
456 "tool": tool_name,
457 "result": Value::Null,
458 "mcpAuth": blocked.mcp_auth
459 }),
460 });
461 }
462 let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
463
464 {
465 let mut servers = self.servers.write().await;
466 if let Some(row) = servers.get_mut(server_name) {
467 if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
468 pending.last_probe_ms = now;
469 }
470 }
471 }
472
473 let request = json!({
474 "jsonrpc": "2.0",
475 "id": format!("call-{}-{}", server_name, now_ms()),
476 "method": "tools/call",
477 "params": {
478 "name": tool_name,
479 "arguments": normalized_args
480 }
481 });
482 let (response, session_id) = post_json_rpc_with_session(
483 &endpoint,
484 &effective_headers(&server),
485 request,
486 server.mcp_session_id.as_deref(),
487 )
488 .await?;
489 if session_id.is_some() {
490 let mut servers = self.servers.write().await;
491 if let Some(row) = servers.get_mut(server_name) {
492 row.mcp_session_id = session_id;
493 }
494 drop(servers);
495 self.persist_state().await;
496 }
497
498 if let Some(err) = response.get("error") {
499 if let Some(challenge) = extract_auth_challenge(err, tool_name) {
500 let output = format!(
501 "{}\n\nAuthorize here: {}",
502 challenge.message, challenge.authorization_url
503 );
504 {
505 let mut servers = self.servers.write().await;
506 if let Some(row) = servers.get_mut(server_name) {
507 row.last_auth_challenge = Some(challenge.clone());
508 row.last_error = None;
509 row.pending_auth_by_tool.insert(
510 canonical_tool.clone(),
511 pending_auth_from_challenge(&challenge),
512 );
513 }
514 }
515 self.persist_state().await;
516 return Ok(ToolResult {
517 output,
518 metadata: json!({
519 "server": server_name,
520 "tool": tool_name,
521 "result": Value::Null,
522 "mcpAuth": {
523 "required": true,
524 "challengeId": challenge.challenge_id,
525 "tool": challenge.tool_name,
526 "authorizationUrl": challenge.authorization_url,
527 "message": challenge.message,
528 "status": challenge.status
529 }
530 }),
531 });
532 }
533 let message = err
534 .get("message")
535 .and_then(|v| v.as_str())
536 .unwrap_or("MCP tools/call failed");
537 return Err(message.to_string());
538 }
539
540 let result = response.get("result").cloned().unwrap_or(Value::Null);
541 let auth_challenge = extract_auth_challenge(&result, tool_name);
542 let output = if let Some(challenge) = auth_challenge.as_ref() {
543 format!(
544 "{}\n\nAuthorize here: {}",
545 challenge.message, challenge.authorization_url
546 )
547 } else {
548 result
549 .get("content")
550 .map(render_mcp_content)
551 .or_else(|| result.get("output").map(|v| v.to_string()))
552 .unwrap_or_else(|| result.to_string())
553 };
554
555 {
556 let mut servers = self.servers.write().await;
557 if let Some(row) = servers.get_mut(server_name) {
558 row.last_auth_challenge = auth_challenge.clone();
559 if let Some(challenge) = auth_challenge.as_ref() {
560 row.pending_auth_by_tool.insert(
561 canonical_tool.clone(),
562 pending_auth_from_challenge(challenge),
563 );
564 } else {
565 row.pending_auth_by_tool.remove(&canonical_tool);
566 }
567 }
568 }
569 self.persist_state().await;
570
571 let auth_metadata = auth_challenge.as_ref().map(|challenge| {
572 json!({
573 "required": true,
574 "challengeId": challenge.challenge_id,
575 "tool": challenge.tool_name,
576 "authorizationUrl": challenge.authorization_url,
577 "message": challenge.message,
578 "status": challenge.status
579 })
580 });
581
582 Ok(ToolResult {
583 output,
584 metadata: json!({
585 "server": server_name,
586 "tool": tool_name,
587 "result": result,
588 "mcpAuth": auth_metadata
589 }),
590 })
591 }
592
593 async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
594 match spawn_stdio_process(command_text).await {
595 Ok(child) => {
596 let pid = child.id();
597 self.processes.lock().await.insert(name.to_string(), child);
598 let mut servers = self.servers.write().await;
599 if let Some(server) = servers.get_mut(name) {
600 server.connected = true;
601 server.pid = pid;
602 server.last_error = None;
603 server.last_auth_challenge = None;
604 server.pending_auth_by_tool.clear();
605 }
606 drop(servers);
607 self.persist_state().await;
608 true
609 }
610 Err(err) => {
611 let mut servers = self.servers.write().await;
612 if let Some(server) = servers.get_mut(name) {
613 server.connected = false;
614 server.pid = None;
615 server.last_error = Some(err);
616 server.last_auth_challenge = None;
617 server.pending_auth_by_tool.clear();
618 }
619 drop(servers);
620 self.persist_state().await;
621 false
622 }
623 }
624 }
625
626 async fn discover_remote_tools(
627 &self,
628 endpoint: &str,
629 headers: &HashMap<String, String>,
630 ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
631 let initialize = json!({
632 "jsonrpc": "2.0",
633 "id": "initialize-1",
634 "method": "initialize",
635 "params": {
636 "protocolVersion": MCP_PROTOCOL_VERSION,
637 "capabilities": {},
638 "clientInfo": {
639 "name": MCP_CLIENT_NAME,
640 "version": MCP_CLIENT_VERSION,
641 }
642 }
643 });
644 let (init_response, mut session_id) =
645 post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
646 if let Some(err) = init_response.get("error") {
647 let message = err
648 .get("message")
649 .and_then(|v| v.as_str())
650 .unwrap_or("MCP initialize failed");
651 return Err(message.to_string());
652 }
653
654 let tools_list = json!({
655 "jsonrpc": "2.0",
656 "id": "tools-list-1",
657 "method": "tools/list",
658 "params": {}
659 });
660 let (tools_response, next_session_id) =
661 post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
662 .await?;
663 if next_session_id.is_some() {
664 session_id = next_session_id;
665 }
666 if let Some(err) = tools_response.get("error") {
667 let message = err
668 .get("message")
669 .and_then(|v| v.as_str())
670 .unwrap_or("MCP tools/list failed");
671 return Err(message.to_string());
672 }
673
674 let tools = tools_response
675 .get("result")
676 .and_then(|v| v.get("tools"))
677 .and_then(|v| v.as_array())
678 .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
679
680 let now = now_ms();
681 let mut out = Vec::new();
682 for row in tools {
683 let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
684 continue;
685 };
686 let description = row
687 .get("description")
688 .and_then(|v| v.as_str())
689 .unwrap_or("")
690 .to_string();
691 let mut input_schema = row
692 .get("inputSchema")
693 .or_else(|| row.get("input_schema"))
694 .cloned()
695 .unwrap_or_else(|| json!({"type":"object"}));
696 normalize_tool_input_schema(&mut input_schema);
697 out.push(McpRemoteTool {
698 server_name: String::new(),
699 tool_name: tool_name.to_string(),
700 namespaced_name: String::new(),
701 description,
702 input_schema,
703 fetched_at_ms: now,
704 schema_hash: String::new(),
705 });
706 }
707
708 Ok((out, session_id))
709 }
710
711 async fn persist_state(&self) {
712 let snapshot = self.servers.read().await.clone();
713 persist_state_blocking(self.state_file.as_path(), &snapshot);
714 }
715}
716
717impl Default for McpRegistry {
718 fn default() -> Self {
719 Self::new()
720 }
721}
722
723fn default_enabled() -> bool {
724 true
725}
726
727fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
728 if let Some(parent) = path.parent() {
729 let _ = std::fs::create_dir_all(parent);
730 }
731 if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
732 let _ = std::fs::write(path, payload);
733 }
734}
735
736fn resolve_state_file() -> PathBuf {
737 if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
738 return PathBuf::from(path);
739 }
740 if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
741 let trimmed = state_dir.trim();
742 if !trimmed.is_empty() {
743 return PathBuf::from(trimmed).join("mcp_servers.json");
744 }
745 }
746 if let Some(data_dir) = dirs::data_dir() {
747 return data_dir
748 .join("tandem")
749 .join("data")
750 .join("mcp_servers.json");
751 }
752 dirs::home_dir()
753 .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
754 .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
755}
756
757fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
758 let Ok(raw) = std::fs::read_to_string(path) else {
759 return (HashMap::new(), false);
760 };
761 let mut migrated = false;
762 let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
763 for (name, server) in parsed.iter_mut() {
764 let (headers, secret_headers, secret_header_values, server_migrated) =
765 migrate_server_headers(name, server);
766 migrated = migrated || server_migrated;
767 server.headers = headers;
768 server.secret_headers = secret_headers;
769 server.secret_header_values = secret_header_values;
770 }
771 (parsed, migrated)
772}
773
774fn migrate_server_headers(
775 server_name: &str,
776 server: &McpServer,
777) -> (
778 HashMap<String, String>,
779 HashMap<String, McpSecretRef>,
780 HashMap<String, String>,
781 bool,
782) {
783 let original_effective = effective_headers(server);
784 let mut persisted_secret_headers = server.secret_headers.clone();
785 let mut secret_header_values = resolve_secret_header_values(&persisted_secret_headers);
786 let mut persisted_headers = server.headers.clone();
787 let mut migrated = false;
788
789 let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
790 for header_name in header_keys {
791 let Some(value) = persisted_headers.get(&header_name).cloned() else {
792 continue;
793 };
794 if persisted_secret_headers.contains_key(&header_name) {
795 continue;
796 }
797 if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
798 persisted_headers.remove(&header_name);
799 let resolved = resolve_secret_ref_value(&secret_ref).unwrap_or_default();
800 persisted_secret_headers.insert(header_name.clone(), secret_ref);
801 if !resolved.is_empty() {
802 secret_header_values.insert(header_name.clone(), resolved);
803 }
804 migrated = true;
805 continue;
806 }
807 if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
808 let secret_id = mcp_header_secret_id(server_name, &header_name);
809 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
810 persisted_headers.remove(&header_name);
811 persisted_secret_headers.insert(
812 header_name.clone(),
813 McpSecretRef::Store {
814 secret_id: secret_id.clone(),
815 },
816 );
817 secret_header_values.insert(header_name.clone(), value);
818 migrated = true;
819 }
820 }
821 }
822
823 if !migrated {
824 let effective = combine_headers(&persisted_headers, &secret_header_values);
825 migrated = effective != original_effective;
826 }
827
828 (
829 persisted_headers,
830 persisted_secret_headers,
831 secret_header_values,
832 migrated,
833 )
834}
835
836fn split_headers_for_storage(
837 server_name: &str,
838 headers: HashMap<String, String>,
839 explicit_secret_headers: HashMap<String, McpSecretRef>,
840) -> (
841 HashMap<String, String>,
842 HashMap<String, McpSecretRef>,
843 HashMap<String, String>,
844) {
845 let mut persisted_headers = HashMap::new();
846 let mut persisted_secret_headers = HashMap::new();
847 let mut secret_header_values = HashMap::new();
848
849 for (header_name, raw_value) in headers {
850 let value = raw_value.trim().to_string();
851 if value.is_empty() {
852 continue;
853 }
854 if let Some(secret_ref) = parse_secret_header_reference(&value) {
855 if let Some(resolved) = resolve_secret_ref_value(&secret_ref) {
856 secret_header_values.insert(header_name.clone(), resolved);
857 }
858 persisted_secret_headers.insert(header_name, secret_ref);
859 continue;
860 }
861 if header_name_is_sensitive(&header_name) {
862 let secret_id = mcp_header_secret_id(server_name, &header_name);
863 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
864 persisted_secret_headers.insert(
865 header_name.clone(),
866 McpSecretRef::Store {
867 secret_id: secret_id.clone(),
868 },
869 );
870 secret_header_values.insert(header_name, value);
871 continue;
872 }
873 }
874 persisted_headers.insert(header_name, value);
875 }
876
877 for (header_name, secret_ref) in explicit_secret_headers {
878 if let Some(resolved) = resolve_secret_ref_value(&secret_ref) {
879 secret_header_values.insert(header_name.clone(), resolved);
880 }
881 persisted_headers.remove(&header_name);
882 persisted_secret_headers.insert(header_name, secret_ref);
883 }
884
885 (
886 persisted_headers,
887 persisted_secret_headers,
888 secret_header_values,
889 )
890}
891
892fn combine_headers(
893 headers: &HashMap<String, String>,
894 secret_header_values: &HashMap<String, String>,
895) -> HashMap<String, String> {
896 let mut combined = headers.clone();
897 for (key, value) in secret_header_values {
898 if !value.trim().is_empty() {
899 combined.insert(key.clone(), value.clone());
900 }
901 }
902 combined
903}
904
905fn effective_headers(server: &McpServer) -> HashMap<String, String> {
906 combine_headers(&server.headers, &server.secret_header_values)
907}
908
909fn redacted_server_view(server: &McpServer) -> McpServer {
910 let mut clone = server.clone();
911 for (header_name, secret_ref) in &clone.secret_headers {
912 clone.headers.insert(
913 header_name.clone(),
914 redacted_secret_header_value(secret_ref),
915 );
916 }
917 clone.secret_header_values.clear();
918 clone
919}
920
921fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
922 match secret_ref {
923 McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
924 McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
925 }
926}
927
928fn resolve_secret_header_values(
929 secret_headers: &HashMap<String, McpSecretRef>,
930) -> HashMap<String, String> {
931 let mut out = HashMap::new();
932 for (header_name, secret_ref) in secret_headers {
933 if let Some(value) = resolve_secret_ref_value(secret_ref) {
934 if !value.trim().is_empty() {
935 out.insert(header_name.clone(), value);
936 }
937 }
938 }
939 out
940}
941
942fn delete_secret_header_refs(secret_headers: &HashMap<String, McpSecretRef>) {
943 for secret_ref in secret_headers.values() {
944 if let McpSecretRef::Store { secret_id } = secret_ref {
945 let _ = tandem_core::delete_provider_auth(secret_id);
946 }
947 }
948}
949
950fn resolve_secret_ref_value(secret_ref: &McpSecretRef) -> Option<String> {
951 match secret_ref {
952 McpSecretRef::Store { secret_id } => tandem_core::load_provider_auth()
953 .get(&secret_id.trim().to_ascii_lowercase())
954 .cloned()
955 .filter(|value| !value.trim().is_empty()),
956 McpSecretRef::Env { env } => std::env::var(env)
957 .ok()
958 .map(|value| value.trim().to_string())
959 .filter(|value| !value.is_empty()),
960 McpSecretRef::BearerEnv { env } => std::env::var(env)
961 .ok()
962 .map(|value| value.trim().to_string())
963 .filter(|value| !value.is_empty())
964 .map(|value| format!("Bearer {value}")),
965 }
966}
967
968fn parse_secret_header_reference(raw: &str) -> Option<McpSecretRef> {
969 let trimmed = raw.trim();
970 if let Some(env) = trimmed
971 .strip_prefix("${env:")
972 .and_then(|rest| rest.strip_suffix('}'))
973 .map(str::trim)
974 .filter(|value| !value.is_empty())
975 {
976 return Some(McpSecretRef::Env {
977 env: env.to_string(),
978 });
979 }
980 if let Some(env) = trimmed
981 .strip_prefix("${bearer_env:")
982 .and_then(|rest| rest.strip_suffix('}'))
983 .map(str::trim)
984 .filter(|value| !value.is_empty())
985 {
986 return Some(McpSecretRef::BearerEnv {
987 env: env.to_string(),
988 });
989 }
990 if let Some(env) = trimmed
991 .strip_prefix("Bearer ${env:")
992 .and_then(|rest| rest.strip_suffix("}"))
993 .map(str::trim)
994 .filter(|value| !value.is_empty())
995 {
996 return Some(McpSecretRef::BearerEnv {
997 env: env.to_string(),
998 });
999 }
1000 None
1001}
1002
1003fn header_name_is_sensitive(header_name: &str) -> bool {
1004 let normalized = header_name.trim().to_ascii_lowercase();
1005 normalized == "authorization"
1006 || normalized == "proxy-authorization"
1007 || normalized == "x-api-key"
1008 || normalized.contains("token")
1009 || normalized.contains("secret")
1010 || normalized.ends_with("api-key")
1011 || normalized.ends_with("api_key")
1012}
1013
1014fn mcp_header_secret_id(server_name: &str, header_name: &str) -> String {
1015 format!(
1016 "mcp_header::{}::{}",
1017 sanitize_namespace_segment(server_name),
1018 sanitize_namespace_segment(header_name)
1019 )
1020}
1021
1022fn parse_stdio_transport(transport: &str) -> Option<&str> {
1023 transport.strip_prefix("stdio:").map(str::trim)
1024}
1025
1026fn parse_remote_endpoint(transport: &str) -> Option<String> {
1027 let trimmed = transport.trim();
1028 if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
1029 return Some(trimmed.to_string());
1030 }
1031 for prefix in ["http:", "https:"] {
1032 if let Some(rest) = trimmed.strip_prefix(prefix) {
1033 let endpoint = rest.trim();
1034 if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
1035 return Some(endpoint.to_string());
1036 }
1037 }
1038 }
1039 None
1040}
1041
1042fn server_tool_rows(server: &McpServer) -> Vec<McpRemoteTool> {
1043 let server_slug = sanitize_namespace_segment(&server.name);
1044 server
1045 .tool_cache
1046 .iter()
1047 .map(|tool| {
1048 let tool_slug = sanitize_namespace_segment(&tool.tool_name);
1049 McpRemoteTool {
1050 server_name: server.name.clone(),
1051 tool_name: tool.tool_name.clone(),
1052 namespaced_name: format!("mcp.{server_slug}.{tool_slug}"),
1053 description: tool.description.clone(),
1054 input_schema: tool.input_schema.clone(),
1055 fetched_at_ms: tool.fetched_at_ms,
1056 schema_hash: tool.schema_hash.clone(),
1057 }
1058 })
1059 .collect()
1060}
1061
1062fn sanitize_namespace_segment(raw: &str) -> String {
1063 let mut out = String::new();
1064 let mut previous_underscore = false;
1065 for ch in raw.trim().chars() {
1066 if ch.is_ascii_alphanumeric() {
1067 out.push(ch.to_ascii_lowercase());
1068 previous_underscore = false;
1069 } else if !previous_underscore {
1070 out.push('_');
1071 previous_underscore = true;
1072 }
1073 }
1074 let cleaned = out.trim_matches('_');
1075 if cleaned.is_empty() {
1076 "tool".to_string()
1077 } else {
1078 cleaned.to_string()
1079 }
1080}
1081
1082fn schema_hash(schema: &Value) -> String {
1083 let payload = serde_json::to_vec(schema).unwrap_or_default();
1084 let mut hasher = Sha256::new();
1085 hasher.update(payload);
1086 format!("{:x}", hasher.finalize())
1087}
1088
1089fn extract_auth_challenge(result: &Value, tool_name: &str) -> Option<McpAuthChallenge> {
1090 let authorization_url = find_string_with_priority(
1091 result,
1092 &[
1093 &["structuredContent", "authorization_url"],
1094 &["structuredContent", "authorizationUrl"],
1095 &["authorization_url"],
1096 &["authorizationUrl"],
1097 &["auth_url"],
1098 ],
1099 &["authorization_url", "authorizationUrl", "auth_url"],
1100 )?;
1101 let raw_message = find_string_with_priority(
1102 result,
1103 &[
1104 &["structuredContent", "message"],
1105 &["message"],
1106 &["structuredContent", "text"],
1107 &["text"],
1108 &["llm_instructions"],
1109 ],
1110 &["message", "text", "llm_instructions"],
1111 )
1112 .unwrap_or_else(|| "This tool requires authorization before it can run.".to_string());
1113 let message = sanitize_auth_message(&raw_message);
1114 let challenge_id = stable_id_seed(&format!("{tool_name}:{authorization_url}"));
1115 Some(McpAuthChallenge {
1116 challenge_id,
1117 tool_name: tool_name.to_string(),
1118 authorization_url,
1119 message,
1120 requested_at_ms: now_ms(),
1121 status: "pending".to_string(),
1122 })
1123}
1124
1125fn find_string_by_any_key(value: &Value, keys: &[&str]) -> Option<String> {
1126 match value {
1127 Value::Object(map) => {
1128 for key in keys {
1129 if let Some(s) = map.get(*key).and_then(|v| v.as_str()) {
1130 let trimmed = s.trim();
1131 if !trimmed.is_empty() {
1132 return Some(trimmed.to_string());
1133 }
1134 }
1135 }
1136 for child in map.values() {
1137 if let Some(found) = find_string_by_any_key(child, keys) {
1138 return Some(found);
1139 }
1140 }
1141 None
1142 }
1143 Value::Array(items) => items
1144 .iter()
1145 .find_map(|item| find_string_by_any_key(item, keys)),
1146 _ => None,
1147 }
1148}
1149
1150fn find_string_with_priority(
1151 value: &Value,
1152 paths: &[&[&str]],
1153 fallback_keys: &[&str],
1154) -> Option<String> {
1155 for path in paths {
1156 if let Some(found) = find_string_at_path(value, path) {
1157 return Some(found);
1158 }
1159 }
1160 find_string_by_any_key(value, fallback_keys)
1161}
1162
1163fn find_string_at_path(value: &Value, path: &[&str]) -> Option<String> {
1164 let mut current = value;
1165 for segment in path {
1166 current = current.get(*segment)?;
1167 }
1168 let s = current.as_str()?.trim();
1169 if s.is_empty() {
1170 None
1171 } else {
1172 Some(s.to_string())
1173 }
1174}
1175
1176fn sanitize_auth_message(raw: &str) -> String {
1177 let trimmed = raw.trim();
1178 if trimmed.is_empty() {
1179 return "This tool requires authorization before it can run.".to_string();
1180 }
1181 if let Some((head, _)) = trimmed.split_once("Authorize here:") {
1182 let head = head.trim();
1183 if !head.is_empty() {
1184 return truncate_text(head, 280);
1185 }
1186 }
1187 let no_newlines = trimmed.replace(['\r', '\n'], " ");
1188 truncate_text(no_newlines.trim(), 280)
1189}
1190
1191fn truncate_text(input: &str, max_chars: usize) -> String {
1192 if input.chars().count() <= max_chars {
1193 return input.to_string();
1194 }
1195 let truncated = input.chars().take(max_chars).collect::<String>();
1196 format!("{truncated}...")
1197}
1198
1199fn stable_id_seed(seed: &str) -> String {
1200 let mut hasher = Sha256::new();
1201 hasher.update(seed.as_bytes());
1202 let encoded = format!("{:x}", hasher.finalize());
1203 encoded.chars().take(16).collect()
1204}
1205
1206fn canonical_tool_key(tool_name: &str) -> String {
1207 tool_name.trim().to_ascii_lowercase()
1208}
1209
1210fn pending_auth_from_challenge(challenge: &McpAuthChallenge) -> PendingMcpAuth {
1211 PendingMcpAuth {
1212 challenge_id: challenge.challenge_id.clone(),
1213 authorization_url: challenge.authorization_url.clone(),
1214 message: challenge.message.clone(),
1215 status: challenge.status.clone(),
1216 first_seen_ms: challenge.requested_at_ms,
1217 last_probe_ms: challenge.requested_at_ms,
1218 }
1219}
1220
1221struct PendingAuthShortCircuit {
1222 output: String,
1223 mcp_auth: Value,
1224}
1225
1226fn pending_auth_short_circuit(
1227 server: &McpServer,
1228 tool_key: &str,
1229 tool_name: &str,
1230 now_ms_value: u64,
1231 cooldown_ms: u64,
1232) -> Option<PendingAuthShortCircuit> {
1233 let pending = server.pending_auth_by_tool.get(tool_key)?;
1234 let elapsed = now_ms_value.saturating_sub(pending.last_probe_ms);
1235 if elapsed >= cooldown_ms {
1236 return None;
1237 }
1238 let retry_after_ms = cooldown_ms.saturating_sub(elapsed);
1239 let output = format!(
1240 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1241 tool_name,
1242 pending.message,
1243 pending.authorization_url,
1244 retry_after_ms.div_ceil(1000)
1245 );
1246 Some(PendingAuthShortCircuit {
1247 output,
1248 mcp_auth: json!({
1249 "required": true,
1250 "pending": true,
1251 "blocked": true,
1252 "retryAfterMs": retry_after_ms,
1253 "challengeId": pending.challenge_id,
1254 "tool": tool_name,
1255 "authorizationUrl": pending.authorization_url,
1256 "message": pending.message,
1257 "status": pending.status
1258 }),
1259 })
1260}
1261
1262fn normalize_tool_input_schema(schema: &mut Value) {
1263 normalize_schema_node(schema);
1264}
1265
1266fn normalize_schema_node(node: &mut Value) {
1267 let Some(obj) = node.as_object_mut() else {
1268 return;
1269 };
1270
1271 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
1275 let all_strings = enum_values.iter().all(|v| v.is_string());
1276 let string_like_type = schema_type_allows_string_enum(obj.get("type"));
1277 if !all_strings || !string_like_type {
1278 obj.remove("enum");
1279 }
1280 }
1281
1282 if let Some(properties) = obj.get_mut("properties").and_then(|v| v.as_object_mut()) {
1283 for value in properties.values_mut() {
1284 normalize_schema_node(value);
1285 }
1286 }
1287
1288 if let Some(items) = obj.get_mut("items") {
1289 normalize_schema_node(items);
1290 }
1291
1292 for key in ["anyOf", "oneOf", "allOf"] {
1293 if let Some(array) = obj.get_mut(key).and_then(|v| v.as_array_mut()) {
1294 for child in array.iter_mut() {
1295 normalize_schema_node(child);
1296 }
1297 }
1298 }
1299
1300 if let Some(additional) = obj.get_mut("additionalProperties") {
1301 normalize_schema_node(additional);
1302 }
1303}
1304
1305fn schema_type_allows_string_enum(schema_type: Option<&Value>) -> bool {
1306 let Some(schema_type) = schema_type else {
1307 return true;
1309 };
1310
1311 if let Some(kind) = schema_type.as_str() {
1312 return kind == "string";
1313 }
1314
1315 if let Some(kinds) = schema_type.as_array() {
1316 let mut saw_string = false;
1317 for kind in kinds {
1318 let Some(kind) = kind.as_str() else {
1319 return false;
1320 };
1321 if kind == "string" {
1322 saw_string = true;
1323 continue;
1324 }
1325 if kind != "null" {
1326 return false;
1327 }
1328 }
1329 return saw_string;
1330 }
1331
1332 false
1333}
1334
1335fn now_ms() -> u64 {
1336 SystemTime::now()
1337 .duration_since(UNIX_EPOCH)
1338 .map(|d| d.as_millis() as u64)
1339 .unwrap_or(0)
1340}
1341
1342fn build_headers(headers: &HashMap<String, String>) -> Result<HeaderMap, String> {
1343 let mut map = HeaderMap::new();
1344 map.insert(
1345 ACCEPT,
1346 HeaderValue::from_static("application/json, text/event-stream"),
1347 );
1348 map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
1349 for (key, value) in headers {
1350 let name = HeaderName::from_bytes(key.trim().as_bytes())
1351 .map_err(|e| format!("Invalid header name '{key}': {e}"))?;
1352 let header = HeaderValue::from_str(value.trim())
1353 .map_err(|e| format!("Invalid header value for '{key}': {e}"))?;
1354 map.insert(name, header);
1355 }
1356 Ok(map)
1357}
1358
1359async fn post_json_rpc_with_session(
1360 endpoint: &str,
1361 headers: &HashMap<String, String>,
1362 request: Value,
1363 session_id: Option<&str>,
1364) -> Result<(Value, Option<String>), String> {
1365 let client = reqwest::Client::builder()
1366 .timeout(std::time::Duration::from_secs(12))
1367 .build()
1368 .map_err(|e| format!("Failed to build HTTP client: {e}"))?;
1369 let mut req = client.post(endpoint).headers(build_headers(headers)?);
1370 if let Some(id) = session_id {
1371 let trimmed = id.trim();
1372 if !trimmed.is_empty() {
1373 req = req.header("Mcp-Session-Id", trimmed);
1374 }
1375 }
1376 let response = req
1377 .json(&request)
1378 .send()
1379 .await
1380 .map_err(|e| format!("MCP request failed: {e}"))?;
1381 let content_type = response
1382 .headers()
1383 .get(CONTENT_TYPE)
1384 .and_then(|v| v.to_str().ok())
1385 .unwrap_or("")
1386 .to_ascii_lowercase();
1387 let response_session_id = response
1388 .headers()
1389 .get("mcp-session-id")
1390 .and_then(|v| v.to_str().ok())
1391 .map(|v| v.trim().to_string())
1392 .filter(|v| !v.is_empty());
1393 let status = response.status();
1394 let payload = response
1395 .text()
1396 .await
1397 .map_err(|e| format!("Failed to read MCP response: {e}"))?;
1398 if !status.is_success() {
1399 return Err(format!(
1400 "MCP endpoint returned HTTP {}: {}",
1401 status.as_u16(),
1402 payload.chars().take(400).collect::<String>()
1403 ));
1404 }
1405
1406 let value = if content_type.starts_with("text/event-stream") {
1407 parse_sse_first_event_json(&payload).map_err(|e| {
1408 format!(
1409 "Invalid MCP SSE JSON response: {} (snippet: {})",
1410 e,
1411 payload.chars().take(400).collect::<String>()
1412 )
1413 })?
1414 } else if let Ok(value) = serde_json::from_str::<Value>(&payload) {
1415 value
1416 } else if let Ok(value) = parse_sse_first_event_json(&payload) {
1417 value
1419 } else {
1420 return Err(format!(
1421 "Invalid MCP JSON response: {}",
1422 payload.chars().take(400).collect::<String>()
1423 ));
1424 };
1425
1426 Ok((value, response_session_id))
1427}
1428
1429fn parse_sse_first_event_json(payload: &str) -> Result<Value, String> {
1430 let mut data_lines: Vec<&str> = Vec::new();
1431 for raw in payload.lines() {
1432 let line = raw.trim_end_matches('\r');
1433 if let Some(rest) = line.strip_prefix("data:") {
1434 data_lines.push(rest.trim_start());
1435 }
1436 if line.is_empty() {
1437 if !data_lines.is_empty() {
1438 break;
1439 }
1440 continue;
1441 }
1442 }
1443 if data_lines.is_empty() {
1444 return Err("no SSE data event found".to_string());
1445 }
1446 let joined = data_lines.join("\n");
1447 serde_json::from_str::<Value>(&joined).map_err(|e| e.to_string())
1448}
1449
1450fn render_mcp_content(value: &Value) -> String {
1451 let Some(items) = value.as_array() else {
1452 return value.to_string();
1453 };
1454 let mut chunks = Vec::new();
1455 for item in items {
1456 if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
1457 chunks.push(text.to_string());
1458 continue;
1459 }
1460 chunks.push(item.to_string());
1461 }
1462 if chunks.is_empty() {
1463 value.to_string()
1464 } else {
1465 chunks.join("\n")
1466 }
1467}
1468
1469fn normalize_mcp_tool_args(server: &McpServer, tool_name: &str, raw_args: Value) -> Value {
1470 let Some(schema) = server
1471 .tool_cache
1472 .iter()
1473 .find(|row| row.tool_name.eq_ignore_ascii_case(tool_name))
1474 .map(|row| &row.input_schema)
1475 else {
1476 return raw_args;
1477 };
1478
1479 let mut args_obj = match raw_args {
1480 Value::Object(obj) => obj,
1481 other => return other,
1482 };
1483
1484 let properties = schema
1485 .get("properties")
1486 .and_then(|v| v.as_object())
1487 .cloned()
1488 .unwrap_or_default();
1489 if properties.is_empty() {
1490 return Value::Object(args_obj);
1491 }
1492
1493 let mut normalized_existing: HashMap<String, String> = HashMap::new();
1495 for key in args_obj.keys() {
1496 normalized_existing.insert(normalize_arg_key(key), key.clone());
1497 }
1498
1499 let canonical_keys = properties.keys().cloned().collect::<Vec<_>>();
1501 for canonical in &canonical_keys {
1502 if args_obj.contains_key(canonical) {
1503 continue;
1504 }
1505 if let Some(existing_key) = normalized_existing.get(&normalize_arg_key(canonical)) {
1506 if let Some(value) = args_obj.get(existing_key).cloned() {
1507 args_obj.insert(canonical.clone(), value);
1508 }
1509 }
1510 }
1511
1512 let required = schema
1514 .get("required")
1515 .and_then(|v| v.as_array())
1516 .map(|arr| {
1517 arr.iter()
1518 .filter_map(|v| v.as_str().map(str::to_string))
1519 .collect::<Vec<_>>()
1520 })
1521 .unwrap_or_default();
1522
1523 for required_key in required {
1524 if args_obj.contains_key(&required_key) {
1525 continue;
1526 }
1527 if let Some(alias_value) = find_required_alias_value(&required_key, &args_obj) {
1528 args_obj.insert(required_key, alias_value);
1529 }
1530 }
1531
1532 Value::Object(args_obj)
1533}
1534
1535fn find_required_alias_value(
1536 required_key: &str,
1537 args_obj: &serde_json::Map<String, Value>,
1538) -> Option<Value> {
1539 let mut alias_candidates = vec![
1540 required_key.to_string(),
1541 required_key.to_ascii_lowercase(),
1542 required_key.replace('_', ""),
1543 ];
1544
1545 if required_key.contains("title") {
1547 alias_candidates.extend([
1548 "name".to_string(),
1549 "title".to_string(),
1550 "task_name".to_string(),
1551 "taskname".to_string(),
1552 ]);
1553 }
1554
1555 if let Some(base) = required_key.strip_suffix("_id") {
1557 alias_candidates.extend([base.to_string(), format!("{base}id"), format!("{base}_id")]);
1558 }
1559
1560 let mut by_normalized: HashMap<String, &Value> = HashMap::new();
1561 for (key, value) in args_obj {
1562 by_normalized.insert(normalize_arg_key(key), value);
1563 }
1564
1565 alias_candidates
1566 .into_iter()
1567 .find_map(|candidate| by_normalized.get(&normalize_arg_key(&candidate)).cloned())
1568 .cloned()
1569}
1570
1571fn normalize_arg_key(key: &str) -> String {
1572 key.chars()
1573 .filter(|ch| ch.is_ascii_alphanumeric())
1574 .map(|ch| ch.to_ascii_lowercase())
1575 .collect()
1576}
1577
1578async fn spawn_stdio_process(command_text: &str) -> Result<Child, String> {
1579 if command_text.is_empty() {
1580 return Err("Missing stdio command".to_string());
1581 }
1582 #[cfg(windows)]
1583 let mut command = {
1584 let mut cmd = Command::new("powershell");
1585 cmd.args(["-NoProfile", "-Command", command_text]);
1586 cmd
1587 };
1588 #[cfg(not(windows))]
1589 let mut command = {
1590 let mut cmd = Command::new("sh");
1591 cmd.args(["-lc", command_text]);
1592 cmd
1593 };
1594 command
1595 .stdin(std::process::Stdio::null())
1596 .stdout(std::process::Stdio::null())
1597 .stderr(std::process::Stdio::null());
1598 command.spawn().map_err(|e| e.to_string())
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603 use super::*;
1604 use uuid::Uuid;
1605
1606 #[tokio::test]
1607 async fn add_connect_disconnect_non_stdio_server() {
1608 let file = std::env::temp_dir().join(format!("mcp-test-{}.json", Uuid::new_v4()));
1609 let registry = McpRegistry::new_with_state_file(file);
1610 registry
1611 .add("example".to_string(), "sse:https://example.com".to_string())
1612 .await;
1613 assert!(registry.connect("example").await);
1614 let listed = registry.list().await;
1615 assert!(listed.get("example").map(|s| s.connected).unwrap_or(false));
1616 assert!(registry.disconnect("example").await);
1617 }
1618
1619 #[test]
1620 fn parse_remote_endpoint_supports_http_prefixes() {
1621 assert_eq!(
1622 parse_remote_endpoint("https://mcp.example.com/mcp"),
1623 Some("https://mcp.example.com/mcp".to_string())
1624 );
1625 assert_eq!(
1626 parse_remote_endpoint("http:https://mcp.example.com/mcp"),
1627 Some("https://mcp.example.com/mcp".to_string())
1628 );
1629 }
1630
1631 #[test]
1632 fn normalize_schema_removes_non_string_enums_recursively() {
1633 let mut schema = json!({
1634 "type": "object",
1635 "properties": {
1636 "good": { "type": "string", "enum": ["a", "b"] },
1637 "good_nullable": { "type": ["string", "null"], "enum": ["asc", "desc"] },
1638 "bad_object": { "type": "object", "enum": ["asc", "desc"] },
1639 "bad_array": { "type": "array", "enum": ["asc", "desc"] },
1640 "bad_number": { "type": "number", "enum": [1, 2] },
1641 "bad_mixed": { "enum": ["ok", 1] },
1642 "nested": {
1643 "type": "object",
1644 "properties": {
1645 "child": { "enum": [true, false] }
1646 }
1647 }
1648 }
1649 });
1650
1651 normalize_tool_input_schema(&mut schema);
1652
1653 assert!(
1654 schema["properties"]["good"]["enum"].is_array(),
1655 "string enums should be preserved"
1656 );
1657 assert!(
1658 schema["properties"]["good_nullable"]["enum"].is_array(),
1659 "string|null enums should be preserved"
1660 );
1661 assert!(
1662 schema["properties"]["bad_object"]["enum"].is_null(),
1663 "object enums should be dropped"
1664 );
1665 assert!(
1666 schema["properties"]["bad_array"]["enum"].is_null(),
1667 "array enums should be dropped"
1668 );
1669 assert!(
1670 schema["properties"]["bad_number"]["enum"].is_null(),
1671 "non-string enums should be dropped"
1672 );
1673 assert!(
1674 schema["properties"]["bad_mixed"]["enum"].is_null(),
1675 "mixed enums should be dropped"
1676 );
1677 assert!(
1678 schema["properties"]["nested"]["properties"]["child"]["enum"].is_null(),
1679 "recursive non-string enums should be dropped"
1680 );
1681 }
1682
1683 #[test]
1684 fn extract_auth_challenge_from_result_payload() {
1685 let payload = json!({
1686 "content": [
1687 {
1688 "type": "text",
1689 "llm_instructions": "Authorize Gmail access first.",
1690 "authorization_url": "https://example.com/oauth/start"
1691 }
1692 ]
1693 });
1694 let challenge = extract_auth_challenge(&payload, "gmail_whoami")
1695 .expect("auth challenge should be detected");
1696 assert_eq!(challenge.tool_name, "gmail_whoami");
1697 assert_eq!(
1698 challenge.authorization_url,
1699 "https://example.com/oauth/start"
1700 );
1701 assert_eq!(challenge.status, "pending");
1702 }
1703
1704 #[test]
1705 fn extract_auth_challenge_returns_none_without_url() {
1706 let payload = json!({
1707 "content": [
1708 {"type":"text","text":"No authorization needed"}
1709 ]
1710 });
1711 assert!(extract_auth_challenge(&payload, "gmail_whoami").is_none());
1712 }
1713
1714 #[test]
1715 fn extract_auth_challenge_prefers_structured_content_message() {
1716 let payload = json!({
1717 "content": [
1718 {
1719 "type": "text",
1720 "text": "{\"authorization_url\":\"https://example.com/oauth\",\"message\":\"json blob\"}"
1721 }
1722 ],
1723 "structuredContent": {
1724 "authorization_url": "https://example.com/oauth",
1725 "message": "Authorize Reddit access first."
1726 }
1727 });
1728 let challenge = extract_auth_challenge(&payload, "reddit_getmyusername")
1729 .expect("auth challenge should be detected");
1730 assert_eq!(challenge.message, "Authorize Reddit access first.");
1731 }
1732
1733 #[test]
1734 fn sanitize_auth_message_compacts_llm_instructions() {
1735 let raw = "Please show the following link to the end user formatted as markdown: https://example.com/auth\nInform the end user that this tool requires authorization.";
1736 let message = sanitize_auth_message(raw);
1737 assert!(!message.contains('\n'));
1738 assert!(message.len() <= 283);
1739 }
1740
1741 #[test]
1742 fn normalize_mcp_tool_args_maps_clickup_aliases() {
1743 let server = McpServer {
1744 name: "arcade".to_string(),
1745 transport: "https://example.com/mcp".to_string(),
1746 enabled: true,
1747 connected: true,
1748 pid: None,
1749 last_error: None,
1750 last_auth_challenge: None,
1751 mcp_session_id: None,
1752 headers: HashMap::new(),
1753 secret_headers: HashMap::new(),
1754 tool_cache: vec![McpToolCacheEntry {
1755 tool_name: "Clickup_CreateTask".to_string(),
1756 description: "Create task".to_string(),
1757 input_schema: json!({
1758 "type":"object",
1759 "properties":{
1760 "list_id":{"type":"string"},
1761 "task_title":{"type":"string"}
1762 },
1763 "required":["list_id","task_title"]
1764 }),
1765 fetched_at_ms: 0,
1766 schema_hash: "x".to_string(),
1767 }],
1768 tools_fetched_at_ms: None,
1769 pending_auth_by_tool: HashMap::new(),
1770 secret_header_values: HashMap::new(),
1771 };
1772
1773 let normalized = normalize_mcp_tool_args(
1774 &server,
1775 "Clickup_CreateTask",
1776 json!({
1777 "listId": "123",
1778 "name": "Prep fish"
1779 }),
1780 );
1781 assert_eq!(
1782 normalized.get("list_id").and_then(|v| v.as_str()),
1783 Some("123")
1784 );
1785 assert_eq!(
1786 normalized.get("task_title").and_then(|v| v.as_str()),
1787 Some("Prep fish")
1788 );
1789 }
1790
1791 #[test]
1792 fn normalize_arg_key_ignores_case_and_separators() {
1793 assert_eq!(normalize_arg_key("task_title"), "tasktitle");
1794 assert_eq!(normalize_arg_key("taskTitle"), "tasktitle");
1795 assert_eq!(normalize_arg_key("task-title"), "tasktitle");
1796 }
1797
1798 #[test]
1799 fn pending_auth_blocks_retries_within_cooldown() {
1800 let mut server = McpServer {
1801 name: "arcade".to_string(),
1802 transport: "https://example.com/mcp".to_string(),
1803 enabled: true,
1804 connected: true,
1805 pid: None,
1806 last_error: None,
1807 last_auth_challenge: None,
1808 mcp_session_id: None,
1809 headers: HashMap::new(),
1810 secret_headers: HashMap::new(),
1811 tool_cache: Vec::new(),
1812 tools_fetched_at_ms: None,
1813 pending_auth_by_tool: HashMap::new(),
1814 secret_header_values: HashMap::new(),
1815 };
1816 server.pending_auth_by_tool.insert(
1817 "clickup_whoami".to_string(),
1818 PendingMcpAuth {
1819 challenge_id: "abc".to_string(),
1820 authorization_url: "https://example.com/auth".to_string(),
1821 message: "Authorize ClickUp access.".to_string(),
1822 status: "pending".to_string(),
1823 first_seen_ms: 1_000,
1824 last_probe_ms: 2_000,
1825 },
1826 );
1827 let blocked =
1828 pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 10_000, 15_000)
1829 .expect("should block");
1830 assert!(blocked.output.contains("Authorization pending"));
1831 assert!(blocked
1832 .mcp_auth
1833 .get("pending")
1834 .and_then(|v| v.as_bool())
1835 .unwrap_or(false));
1836 }
1837
1838 #[test]
1839 fn pending_auth_allows_probe_after_cooldown() {
1840 let mut server = McpServer {
1841 name: "arcade".to_string(),
1842 transport: "https://example.com/mcp".to_string(),
1843 enabled: true,
1844 connected: true,
1845 pid: None,
1846 last_error: None,
1847 last_auth_challenge: None,
1848 mcp_session_id: None,
1849 headers: HashMap::new(),
1850 secret_headers: HashMap::new(),
1851 tool_cache: Vec::new(),
1852 tools_fetched_at_ms: None,
1853 pending_auth_by_tool: HashMap::new(),
1854 secret_header_values: HashMap::new(),
1855 };
1856 server.pending_auth_by_tool.insert(
1857 "clickup_whoami".to_string(),
1858 PendingMcpAuth {
1859 challenge_id: "abc".to_string(),
1860 authorization_url: "https://example.com/auth".to_string(),
1861 message: "Authorize ClickUp access.".to_string(),
1862 status: "pending".to_string(),
1863 first_seen_ms: 1_000,
1864 last_probe_ms: 2_000,
1865 },
1866 );
1867 assert!(
1868 pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 17_001, 15_000)
1869 .is_none(),
1870 "cooldown elapsed should allow re-probe"
1871 );
1872 }
1873
1874 #[test]
1875 fn pending_auth_is_tool_scoped() {
1876 let mut server = McpServer {
1877 name: "arcade".to_string(),
1878 transport: "https://example.com/mcp".to_string(),
1879 enabled: true,
1880 connected: true,
1881 pid: None,
1882 last_error: None,
1883 last_auth_challenge: None,
1884 mcp_session_id: None,
1885 headers: HashMap::new(),
1886 secret_headers: HashMap::new(),
1887 tool_cache: Vec::new(),
1888 tools_fetched_at_ms: None,
1889 pending_auth_by_tool: HashMap::new(),
1890 secret_header_values: HashMap::new(),
1891 };
1892 server.pending_auth_by_tool.insert(
1893 "gmail_sendemail".to_string(),
1894 PendingMcpAuth {
1895 challenge_id: "abc".to_string(),
1896 authorization_url: "https://example.com/auth".to_string(),
1897 message: "Authorize Gmail access.".to_string(),
1898 status: "pending".to_string(),
1899 first_seen_ms: 1_000,
1900 last_probe_ms: 2_000,
1901 },
1902 );
1903 assert!(pending_auth_short_circuit(
1904 &server,
1905 "gmail_sendemail",
1906 "Gmail_SendEmail",
1907 2_100,
1908 15_000
1909 )
1910 .is_some());
1911 assert!(pending_auth_short_circuit(
1912 &server,
1913 "clickup_whoami",
1914 "Clickup_WhoAmI",
1915 2_100,
1916 15_000
1917 )
1918 .is_none());
1919 }
1920}