1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::{LocalImplicitTenant, SecretRef, TenantContext, ToolResult};
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18const MCP_SECRET_PLACEHOLDER: &str = "";
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct McpToolCacheEntry {
22 pub tool_name: String,
23 pub description: String,
24 #[serde(default)]
25 pub input_schema: Value,
26 pub fetched_at_ms: u64,
27 pub schema_hash: String,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32 pub name: String,
33 pub transport: String,
34 #[serde(default = "default_enabled")]
35 pub enabled: bool,
36 pub connected: bool,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub pid: Option<u32>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub last_error: Option<String>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub last_auth_challenge: Option<McpAuthChallenge>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub mcp_session_id: Option<String>,
45 #[serde(default)]
46 pub headers: HashMap<String, String>,
47 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48 pub secret_headers: HashMap<String, McpSecretRef>,
49 #[serde(default)]
50 pub tool_cache: Vec<McpToolCacheEntry>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub tools_fetched_at_ms: Option<u64>,
53 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
54 pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
55 #[serde(default, skip)]
56 pub secret_header_values: HashMap<String, String>,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum McpSecretRef {
62 Store {
63 secret_id: String,
64 #[serde(default)]
65 tenant_context: TenantContext,
66 },
67 Env {
68 env: String,
69 },
70 BearerEnv {
71 env: String,
72 },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct McpAuthChallenge {
77 pub challenge_id: String,
78 pub tool_name: String,
79 pub authorization_url: String,
80 pub message: String,
81 pub requested_at_ms: u64,
82 pub status: String,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct PendingMcpAuth {
87 pub challenge_id: String,
88 pub authorization_url: String,
89 pub message: String,
90 pub status: String,
91 pub first_seen_ms: u64,
92 pub last_probe_ms: u64,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct McpRemoteTool {
97 pub server_name: String,
98 pub tool_name: String,
99 pub namespaced_name: String,
100 pub description: String,
101 #[serde(default)]
102 pub input_schema: Value,
103 pub fetched_at_ms: u64,
104 pub schema_hash: String,
105}
106
107#[derive(Clone)]
108pub struct McpRegistry {
109 servers: Arc<RwLock<HashMap<String, McpServer>>>,
110 processes: Arc<Mutex<HashMap<String, Child>>>,
111 state_file: Arc<PathBuf>,
112}
113
114impl McpRegistry {
115 pub fn new() -> Self {
116 Self::new_with_state_file(resolve_state_file())
117 }
118
119 pub fn new_with_state_file(state_file: PathBuf) -> Self {
120 let (loaded_state, migrated) = load_state(&state_file);
121 let loaded = loaded_state
122 .into_iter()
123 .map(|(k, mut v)| {
124 v.connected = false;
125 v.pid = None;
126 if v.name.trim().is_empty() {
127 v.name = k.clone();
128 }
129 if v.headers.is_empty() {
130 v.headers = HashMap::new();
131 }
132 if v.secret_headers.is_empty() {
133 v.secret_headers = HashMap::new();
134 }
135 let tenant_context = local_tenant_context();
136 v.secret_header_values =
137 resolve_secret_header_values(&v.secret_headers, &tenant_context);
138 (k, v)
139 })
140 .collect::<HashMap<_, _>>();
141 if migrated {
142 persist_state_blocking(&state_file, &loaded);
143 }
144 Self {
145 servers: Arc::new(RwLock::new(loaded)),
146 processes: Arc::new(Mutex::new(HashMap::new())),
147 state_file: Arc::new(state_file),
148 }
149 }
150
151 pub async fn list(&self) -> HashMap<String, McpServer> {
152 self.servers.read().await.clone()
153 }
154
155 pub async fn list_public(&self) -> HashMap<String, McpServer> {
156 self.servers
157 .read()
158 .await
159 .iter()
160 .map(|(name, server)| (name.clone(), redacted_server_view(server)))
161 .collect()
162 }
163
164 pub async fn add(&self, name: String, transport: String) {
165 self.add_or_update(name, transport, HashMap::new(), true)
166 .await;
167 }
168
169 pub async fn add_or_update(
170 &self,
171 name: String,
172 transport: String,
173 headers: HashMap<String, String>,
174 enabled: bool,
175 ) {
176 self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
177 .await;
178 }
179
180 pub async fn add_or_update_with_secret_refs(
181 &self,
182 name: String,
183 transport: String,
184 headers: HashMap<String, String>,
185 secret_headers: HashMap<String, McpSecretRef>,
186 enabled: bool,
187 ) {
188 let normalized_name = name.trim().to_string();
189 let tenant_context = local_tenant_context();
190 let (persisted_headers, persisted_secret_headers, secret_header_values) =
191 split_headers_for_storage(&normalized_name, headers, secret_headers, &tenant_context);
192 let mut servers = self.servers.write().await;
193 let existing = servers.get(&normalized_name).cloned();
194 let preserve_cache = existing.as_ref().is_some_and(|row| {
195 row.transport == transport
196 && effective_headers(row)
197 == combine_headers(&persisted_headers, &secret_header_values)
198 });
199 let existing_tool_cache = if preserve_cache {
200 existing
201 .as_ref()
202 .map(|row| row.tool_cache.clone())
203 .unwrap_or_default()
204 } else {
205 Vec::new()
206 };
207 let existing_fetched_at = if preserve_cache {
208 existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
209 } else {
210 None
211 };
212 let server = McpServer {
213 name: normalized_name.clone(),
214 transport,
215 enabled,
216 connected: false,
217 pid: None,
218 last_error: None,
219 last_auth_challenge: None,
220 mcp_session_id: None,
221 headers: persisted_headers,
222 secret_headers: persisted_secret_headers,
223 tool_cache: existing_tool_cache,
224 tools_fetched_at_ms: existing_fetched_at,
225 pending_auth_by_tool: HashMap::new(),
226 secret_header_values,
227 };
228 servers.insert(normalized_name, server);
229 drop(servers);
230 self.persist_state().await;
231 }
232
233 pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
234 let mut servers = self.servers.write().await;
235 let Some(server) = servers.get_mut(name) else {
236 return false;
237 };
238 server.enabled = enabled;
239 if !enabled {
240 server.connected = false;
241 server.pid = None;
242 server.last_auth_challenge = None;
243 server.mcp_session_id = None;
244 server.pending_auth_by_tool.clear();
245 }
246 drop(servers);
247 if !enabled {
248 if let Some(mut child) = self.processes.lock().await.remove(name) {
249 let _ = child.kill().await;
250 let _ = child.wait().await;
251 }
252 }
253 self.persist_state().await;
254 true
255 }
256
257 pub async fn remove(&self, name: &str) -> bool {
258 let removed_server = {
259 let mut servers = self.servers.write().await;
260 servers.remove(name)
261 };
262 let Some(server) = removed_server else {
263 return false;
264 };
265 let current_tenant = local_tenant_context();
266 delete_secret_header_refs(&server.secret_headers, ¤t_tenant);
267
268 if let Some(mut child) = self.processes.lock().await.remove(name) {
269 let _ = child.kill().await;
270 let _ = child.wait().await;
271 }
272 self.persist_state().await;
273 true
274 }
275
276 pub async fn connect(&self, name: &str) -> bool {
277 let server = {
278 let servers = self.servers.read().await;
279 let Some(server) = servers.get(name) else {
280 return false;
281 };
282 server.clone()
283 };
284
285 if !server.enabled {
286 let mut servers = self.servers.write().await;
287 if let Some(entry) = servers.get_mut(name) {
288 entry.connected = false;
289 entry.pid = None;
290 entry.last_error = Some("MCP server is disabled".to_string());
291 entry.last_auth_challenge = None;
292 entry.mcp_session_id = None;
293 entry.pending_auth_by_tool.clear();
294 }
295 drop(servers);
296 self.persist_state().await;
297 return false;
298 }
299
300 if let Some(command_text) = parse_stdio_transport(&server.transport) {
301 return self.connect_stdio(name, command_text).await;
302 }
303
304 if parse_remote_endpoint(&server.transport).is_some() {
305 return self.refresh(name).await.is_ok();
306 }
307
308 let mut servers = self.servers.write().await;
309 if let Some(entry) = servers.get_mut(name) {
310 entry.connected = true;
311 entry.pid = None;
312 entry.last_error = None;
313 entry.last_auth_challenge = None;
314 entry.mcp_session_id = None;
315 entry.pending_auth_by_tool.clear();
316 }
317 drop(servers);
318 self.persist_state().await;
319 true
320 }
321
322 pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
323 let server = {
324 let servers = self.servers.read().await;
325 let Some(server) = servers.get(name) else {
326 return Err("MCP server not found".to_string());
327 };
328 server.clone()
329 };
330
331 if !server.enabled {
332 return Err("MCP server is disabled".to_string());
333 }
334
335 let endpoint = parse_remote_endpoint(&server.transport)
336 .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
337
338 let request_headers = effective_headers(&server);
339 let (tools, session_id) = match self
340 .discover_remote_tools(&endpoint, &request_headers)
341 .await
342 {
343 Ok(result) => result,
344 Err(err) => {
345 let mut servers = self.servers.write().await;
346 if let Some(entry) = servers.get_mut(name) {
347 entry.connected = false;
348 entry.pid = None;
349 entry.last_error = Some(err.clone());
350 entry.last_auth_challenge = None;
351 entry.mcp_session_id = None;
352 entry.pending_auth_by_tool.clear();
353 entry.tool_cache.clear();
354 entry.tools_fetched_at_ms = None;
355 }
356 drop(servers);
357 self.persist_state().await;
358 return Err(err);
359 }
360 };
361
362 let now = now_ms();
363 let cache = tools
364 .iter()
365 .map(|tool| McpToolCacheEntry {
366 tool_name: tool.tool_name.clone(),
367 description: tool.description.clone(),
368 input_schema: tool.input_schema.clone(),
369 fetched_at_ms: now,
370 schema_hash: schema_hash(&tool.input_schema),
371 })
372 .collect::<Vec<_>>();
373
374 let mut servers = self.servers.write().await;
375 if let Some(entry) = servers.get_mut(name) {
376 entry.connected = true;
377 entry.pid = None;
378 entry.last_error = None;
379 entry.last_auth_challenge = None;
380 entry.mcp_session_id = session_id;
381 entry.tool_cache = cache;
382 entry.tools_fetched_at_ms = Some(now);
383 entry.pending_auth_by_tool.clear();
384 }
385 drop(servers);
386 self.persist_state().await;
387 Ok(self.server_tools(name).await)
388 }
389
390 pub async fn disconnect(&self, name: &str) -> bool {
391 if let Some(mut child) = self.processes.lock().await.remove(name) {
392 let _ = child.kill().await;
393 let _ = child.wait().await;
394 }
395 let mut servers = self.servers.write().await;
396 if let Some(server) = servers.get_mut(name) {
397 server.connected = false;
398 server.pid = None;
399 server.last_auth_challenge = None;
400 server.mcp_session_id = None;
401 server.pending_auth_by_tool.clear();
402 drop(servers);
403 self.persist_state().await;
404 return true;
405 }
406 false
407 }
408
409 pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
410 let mut out = self
411 .servers
412 .read()
413 .await
414 .values()
415 .filter(|server| server.enabled && server.connected)
416 .flat_map(server_tool_rows)
417 .collect::<Vec<_>>();
418 out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
419 out
420 }
421
422 pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
423 let Some(server) = self.servers.read().await.get(name).cloned() else {
424 return Vec::new();
425 };
426 let mut rows = server_tool_rows(&server);
427 rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
428 rows
429 }
430
431 pub async fn call_tool(
432 &self,
433 server_name: &str,
434 tool_name: &str,
435 args: Value,
436 ) -> Result<ToolResult, String> {
437 let server = {
438 let servers = self.servers.read().await;
439 let Some(server) = servers.get(server_name) else {
440 return Err(format!("MCP server '{server_name}' not found"));
441 };
442 server.clone()
443 };
444
445 if !server.enabled {
446 return Err(format!("MCP server '{server_name}' is disabled"));
447 }
448 if !server.connected {
449 return Err(format!("MCP server '{server_name}' is not connected"));
450 }
451
452 let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
453 "MCP tools/call currently supports HTTP/S transports only".to_string()
454 })?;
455 let canonical_tool = canonical_tool_key(tool_name);
456 let now = now_ms();
457 if let Some(blocked) = pending_auth_short_circuit(
458 &server,
459 &canonical_tool,
460 tool_name,
461 now,
462 MCP_AUTH_REPROBE_COOLDOWN_MS,
463 ) {
464 return Ok(ToolResult {
465 output: blocked.output,
466 metadata: json!({
467 "server": server_name,
468 "tool": tool_name,
469 "result": Value::Null,
470 "mcpAuth": blocked.mcp_auth
471 }),
472 });
473 }
474 let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
475
476 {
477 let mut servers = self.servers.write().await;
478 if let Some(row) = servers.get_mut(server_name) {
479 if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
480 pending.last_probe_ms = now;
481 }
482 }
483 }
484
485 let request = json!({
486 "jsonrpc": "2.0",
487 "id": format!("call-{}-{}", server_name, now_ms()),
488 "method": "tools/call",
489 "params": {
490 "name": tool_name,
491 "arguments": normalized_args
492 }
493 });
494 let (response, session_id) = post_json_rpc_with_session(
495 &endpoint,
496 &effective_headers(&server),
497 request,
498 server.mcp_session_id.as_deref(),
499 )
500 .await?;
501 if session_id.is_some() {
502 let mut servers = self.servers.write().await;
503 if let Some(row) = servers.get_mut(server_name) {
504 row.mcp_session_id = session_id;
505 }
506 drop(servers);
507 self.persist_state().await;
508 }
509
510 if let Some(err) = response.get("error") {
511 if let Some(challenge) = extract_auth_challenge(err, tool_name) {
512 let output = format!(
513 "{}\n\nAuthorize here: {}",
514 challenge.message, challenge.authorization_url
515 );
516 {
517 let mut servers = self.servers.write().await;
518 if let Some(row) = servers.get_mut(server_name) {
519 row.last_auth_challenge = Some(challenge.clone());
520 row.last_error = None;
521 row.pending_auth_by_tool.insert(
522 canonical_tool.clone(),
523 pending_auth_from_challenge(&challenge),
524 );
525 }
526 }
527 self.persist_state().await;
528 return Ok(ToolResult {
529 output,
530 metadata: json!({
531 "server": server_name,
532 "tool": tool_name,
533 "result": Value::Null,
534 "mcpAuth": {
535 "required": true,
536 "challengeId": challenge.challenge_id,
537 "tool": challenge.tool_name,
538 "authorizationUrl": challenge.authorization_url,
539 "message": challenge.message,
540 "status": challenge.status
541 }
542 }),
543 });
544 }
545 let message = err
546 .get("message")
547 .and_then(|v| v.as_str())
548 .unwrap_or("MCP tools/call failed");
549 return Err(message.to_string());
550 }
551
552 let result = response.get("result").cloned().unwrap_or(Value::Null);
553 let auth_challenge = extract_auth_challenge(&result, tool_name);
554 let output = if let Some(challenge) = auth_challenge.as_ref() {
555 format!(
556 "{}\n\nAuthorize here: {}",
557 challenge.message, challenge.authorization_url
558 )
559 } else {
560 result
561 .get("content")
562 .map(render_mcp_content)
563 .or_else(|| result.get("output").map(|v| v.to_string()))
564 .unwrap_or_else(|| result.to_string())
565 };
566
567 {
568 let mut servers = self.servers.write().await;
569 if let Some(row) = servers.get_mut(server_name) {
570 row.last_auth_challenge = auth_challenge.clone();
571 if let Some(challenge) = auth_challenge.as_ref() {
572 row.pending_auth_by_tool.insert(
573 canonical_tool.clone(),
574 pending_auth_from_challenge(challenge),
575 );
576 } else {
577 row.pending_auth_by_tool.remove(&canonical_tool);
578 }
579 }
580 }
581 self.persist_state().await;
582
583 let auth_metadata = auth_challenge.as_ref().map(|challenge| {
584 json!({
585 "required": true,
586 "challengeId": challenge.challenge_id,
587 "tool": challenge.tool_name,
588 "authorizationUrl": challenge.authorization_url,
589 "message": challenge.message,
590 "status": challenge.status
591 })
592 });
593
594 Ok(ToolResult {
595 output,
596 metadata: json!({
597 "server": server_name,
598 "tool": tool_name,
599 "result": result,
600 "mcpAuth": auth_metadata
601 }),
602 })
603 }
604
605 async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
606 match spawn_stdio_process(command_text).await {
607 Ok(child) => {
608 let pid = child.id();
609 self.processes.lock().await.insert(name.to_string(), child);
610 let mut servers = self.servers.write().await;
611 if let Some(server) = servers.get_mut(name) {
612 server.connected = true;
613 server.pid = pid;
614 server.last_error = None;
615 server.last_auth_challenge = None;
616 server.pending_auth_by_tool.clear();
617 }
618 drop(servers);
619 self.persist_state().await;
620 true
621 }
622 Err(err) => {
623 let mut servers = self.servers.write().await;
624 if let Some(server) = servers.get_mut(name) {
625 server.connected = false;
626 server.pid = None;
627 server.last_error = Some(err);
628 server.last_auth_challenge = None;
629 server.pending_auth_by_tool.clear();
630 }
631 drop(servers);
632 self.persist_state().await;
633 false
634 }
635 }
636 }
637
638 async fn discover_remote_tools(
639 &self,
640 endpoint: &str,
641 headers: &HashMap<String, String>,
642 ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
643 let initialize = json!({
644 "jsonrpc": "2.0",
645 "id": "initialize-1",
646 "method": "initialize",
647 "params": {
648 "protocolVersion": MCP_PROTOCOL_VERSION,
649 "capabilities": {},
650 "clientInfo": {
651 "name": MCP_CLIENT_NAME,
652 "version": MCP_CLIENT_VERSION,
653 }
654 }
655 });
656 let (init_response, mut session_id) =
657 post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
658 if let Some(err) = init_response.get("error") {
659 let message = err
660 .get("message")
661 .and_then(|v| v.as_str())
662 .unwrap_or("MCP initialize failed");
663 return Err(message.to_string());
664 }
665
666 let tools_list = json!({
667 "jsonrpc": "2.0",
668 "id": "tools-list-1",
669 "method": "tools/list",
670 "params": {}
671 });
672 let (tools_response, next_session_id) =
673 post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
674 .await?;
675 if next_session_id.is_some() {
676 session_id = next_session_id;
677 }
678 if let Some(err) = tools_response.get("error") {
679 let message = err
680 .get("message")
681 .and_then(|v| v.as_str())
682 .unwrap_or("MCP tools/list failed");
683 return Err(message.to_string());
684 }
685
686 let tools = tools_response
687 .get("result")
688 .and_then(|v| v.get("tools"))
689 .and_then(|v| v.as_array())
690 .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
691
692 let now = now_ms();
693 let mut out = Vec::new();
694 for row in tools {
695 let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
696 continue;
697 };
698 let description = row
699 .get("description")
700 .and_then(|v| v.as_str())
701 .unwrap_or("")
702 .to_string();
703 let mut input_schema = row
704 .get("inputSchema")
705 .or_else(|| row.get("input_schema"))
706 .cloned()
707 .unwrap_or_else(|| json!({"type":"object"}));
708 normalize_tool_input_schema(&mut input_schema);
709 out.push(McpRemoteTool {
710 server_name: String::new(),
711 tool_name: tool_name.to_string(),
712 namespaced_name: String::new(),
713 description,
714 input_schema,
715 fetched_at_ms: now,
716 schema_hash: String::new(),
717 });
718 }
719
720 Ok((out, session_id))
721 }
722
723 async fn persist_state(&self) {
724 let snapshot = self.servers.read().await.clone();
725 persist_state_blocking(self.state_file.as_path(), &snapshot);
726 }
727}
728
729impl Default for McpRegistry {
730 fn default() -> Self {
731 Self::new()
732 }
733}
734
735fn default_enabled() -> bool {
736 true
737}
738
739fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
740 if let Some(parent) = path.parent() {
741 let _ = std::fs::create_dir_all(parent);
742 }
743 if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
744 let _ = std::fs::write(path, payload);
745 }
746}
747
748fn resolve_state_file() -> PathBuf {
749 if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
750 return PathBuf::from(path);
751 }
752 if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
753 let trimmed = state_dir.trim();
754 if !trimmed.is_empty() {
755 return PathBuf::from(trimmed).join("mcp_servers.json");
756 }
757 }
758 if let Some(data_dir) = dirs::data_dir() {
759 return data_dir
760 .join("tandem")
761 .join("data")
762 .join("mcp_servers.json");
763 }
764 dirs::home_dir()
765 .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
766 .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
767}
768
769fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
770 let Ok(raw) = std::fs::read_to_string(path) else {
771 return (HashMap::new(), false);
772 };
773 let mut migrated = false;
774 let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
775 for (name, server) in parsed.iter_mut() {
776 let tenant_context = local_tenant_context();
777 let (headers, secret_headers, secret_header_values, server_migrated) =
778 migrate_server_headers(name, server, &tenant_context);
779 migrated = migrated || server_migrated;
780 server.headers = headers;
781 server.secret_headers = secret_headers;
782 server.secret_header_values = secret_header_values;
783 }
784 (parsed, migrated)
785}
786
787fn migrate_server_headers(
788 server_name: &str,
789 server: &McpServer,
790 current_tenant: &TenantContext,
791) -> (
792 HashMap<String, String>,
793 HashMap<String, McpSecretRef>,
794 HashMap<String, String>,
795 bool,
796) {
797 let original_effective = effective_headers(server);
798 let mut persisted_secret_headers = server.secret_headers.clone();
799 let mut secret_header_values =
800 resolve_secret_header_values(&persisted_secret_headers, current_tenant);
801 let mut persisted_headers = server.headers.clone();
802 let mut migrated = false;
803
804 let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
805 for header_name in header_keys {
806 let Some(value) = persisted_headers.get(&header_name).cloned() else {
807 continue;
808 };
809 if persisted_secret_headers.contains_key(&header_name) {
810 continue;
811 }
812 if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
813 persisted_headers.remove(&header_name);
814 let resolved =
815 resolve_secret_ref_value(&secret_ref, current_tenant).unwrap_or_default();
816 persisted_secret_headers.insert(header_name.clone(), secret_ref);
817 if !resolved.is_empty() {
818 secret_header_values.insert(header_name.clone(), resolved);
819 }
820 migrated = true;
821 continue;
822 }
823 if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
824 let secret_id = mcp_header_secret_id(server_name, &header_name);
825 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
826 persisted_headers.remove(&header_name);
827 persisted_secret_headers.insert(
828 header_name.clone(),
829 McpSecretRef::Store {
830 secret_id: secret_id.clone(),
831 tenant_context: current_tenant.clone(),
832 },
833 );
834 secret_header_values.insert(header_name.clone(), value);
835 migrated = true;
836 }
837 }
838 }
839
840 if !migrated {
841 let effective = combine_headers(&persisted_headers, &secret_header_values);
842 migrated = effective != original_effective;
843 }
844
845 (
846 persisted_headers,
847 persisted_secret_headers,
848 secret_header_values,
849 migrated,
850 )
851}
852
853fn split_headers_for_storage(
854 server_name: &str,
855 headers: HashMap<String, String>,
856 explicit_secret_headers: HashMap<String, McpSecretRef>,
857 current_tenant: &TenantContext,
858) -> (
859 HashMap<String, String>,
860 HashMap<String, McpSecretRef>,
861 HashMap<String, String>,
862) {
863 let mut persisted_headers = HashMap::new();
864 let mut persisted_secret_headers = HashMap::new();
865 let mut secret_header_values = HashMap::new();
866
867 for (header_name, raw_value) in headers {
868 let value = raw_value.trim().to_string();
869 if value.is_empty() {
870 continue;
871 }
872 if let Some(secret_ref) = parse_secret_header_reference(&value) {
873 if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
874 secret_header_values.insert(header_name.clone(), resolved);
875 }
876 persisted_secret_headers.insert(header_name, secret_ref);
877 continue;
878 }
879 if header_name_is_sensitive(&header_name) {
880 let secret_id = mcp_header_secret_id(server_name, &header_name);
881 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
882 persisted_secret_headers.insert(
883 header_name.clone(),
884 McpSecretRef::Store {
885 secret_id: secret_id.clone(),
886 tenant_context: current_tenant.clone(),
887 },
888 );
889 secret_header_values.insert(header_name, value);
890 continue;
891 }
892 }
893 persisted_headers.insert(header_name, value);
894 }
895
896 for (header_name, secret_ref) in explicit_secret_headers {
897 if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
898 secret_header_values.insert(header_name.clone(), resolved);
899 }
900 persisted_headers.remove(&header_name);
901 persisted_secret_headers.insert(header_name, secret_ref);
902 }
903
904 (
905 persisted_headers,
906 persisted_secret_headers,
907 secret_header_values,
908 )
909}
910
911fn combine_headers(
912 headers: &HashMap<String, String>,
913 secret_header_values: &HashMap<String, String>,
914) -> HashMap<String, String> {
915 let mut combined = headers.clone();
916 for (key, value) in secret_header_values {
917 if !value.trim().is_empty() {
918 combined.insert(key.clone(), value.clone());
919 }
920 }
921 combined
922}
923
924fn effective_headers(server: &McpServer) -> HashMap<String, String> {
925 combine_headers(&server.headers, &server.secret_header_values)
926}
927
928fn redacted_server_view(server: &McpServer) -> McpServer {
929 let mut clone = server.clone();
930 for (header_name, secret_ref) in &clone.secret_headers {
931 clone.headers.insert(
932 header_name.clone(),
933 redacted_secret_header_value(secret_ref),
934 );
935 }
936 clone.secret_header_values.clear();
937 clone
938}
939
940fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
941 match secret_ref {
942 McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
943 McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
944 }
945}
946
947fn resolve_secret_header_values(
948 secret_headers: &HashMap<String, McpSecretRef>,
949 current_tenant: &TenantContext,
950) -> HashMap<String, String> {
951 let mut out = HashMap::new();
952 for (header_name, secret_ref) in secret_headers {
953 if let Some(value) = resolve_secret_ref_value(secret_ref, current_tenant) {
954 if !value.trim().is_empty() {
955 out.insert(header_name.clone(), value);
956 }
957 }
958 }
959 out
960}
961
962fn delete_secret_header_refs(
963 secret_headers: &HashMap<String, McpSecretRef>,
964 current_tenant: &TenantContext,
965) {
966 for secret_ref in secret_headers.values() {
967 if let McpSecretRef::Store {
968 secret_id,
969 tenant_context,
970 } = secret_ref
971 {
972 if tenant_context != current_tenant {
973 continue;
974 }
975 let _ = tandem_core::delete_provider_auth(secret_id);
976 }
977 }
978}
979
980fn resolve_secret_ref_value(
981 secret_ref: &McpSecretRef,
982 current_tenant: &TenantContext,
983) -> Option<String> {
984 match secret_ref {
985 McpSecretRef::Store {
986 secret_id,
987 tenant_context,
988 } => {
989 let secret_ref = SecretRef {
990 org_id: tenant_context.org_id.clone(),
991 workspace_id: tenant_context.workspace_id.clone(),
992 provider: "mcp_header".to_string(),
993 secret_id: secret_id.trim().to_string(),
994 name: secret_id.trim().to_string(),
995 };
996 if secret_ref.validate_for_tenant(current_tenant).is_err() {
997 return None;
998 }
999 tandem_core::load_provider_auth()
1000 .get(&secret_id.trim().to_ascii_lowercase())
1001 .cloned()
1002 .filter(|value| !value.trim().is_empty())
1003 }
1004 McpSecretRef::Env { env } => std::env::var(env)
1005 .ok()
1006 .map(|value| value.trim().to_string())
1007 .filter(|value| !value.is_empty()),
1008 McpSecretRef::BearerEnv { env } => std::env::var(env)
1009 .ok()
1010 .map(|value| value.trim().to_string())
1011 .filter(|value| !value.is_empty())
1012 .map(|value| format!("Bearer {value}")),
1013 }
1014}
1015
1016fn local_tenant_context() -> TenantContext {
1017 LocalImplicitTenant.into()
1018}
1019
1020fn parse_secret_header_reference(raw: &str) -> Option<McpSecretRef> {
1021 let trimmed = raw.trim();
1022 if let Some(env) = trimmed
1023 .strip_prefix("${env:")
1024 .and_then(|rest| rest.strip_suffix('}'))
1025 .map(str::trim)
1026 .filter(|value| !value.is_empty())
1027 {
1028 return Some(McpSecretRef::Env {
1029 env: env.to_string(),
1030 });
1031 }
1032 if let Some(env) = trimmed
1033 .strip_prefix("${bearer_env:")
1034 .and_then(|rest| rest.strip_suffix('}'))
1035 .map(str::trim)
1036 .filter(|value| !value.is_empty())
1037 {
1038 return Some(McpSecretRef::BearerEnv {
1039 env: env.to_string(),
1040 });
1041 }
1042 if let Some(env) = trimmed
1043 .strip_prefix("Bearer ${env:")
1044 .and_then(|rest| rest.strip_suffix("}"))
1045 .map(str::trim)
1046 .filter(|value| !value.is_empty())
1047 {
1048 return Some(McpSecretRef::BearerEnv {
1049 env: env.to_string(),
1050 });
1051 }
1052 None
1053}
1054
1055fn header_name_is_sensitive(header_name: &str) -> bool {
1056 let normalized = header_name.trim().to_ascii_lowercase();
1057 normalized == "authorization"
1058 || normalized == "proxy-authorization"
1059 || normalized == "x-api-key"
1060 || normalized.contains("token")
1061 || normalized.contains("secret")
1062 || normalized.ends_with("api-key")
1063 || normalized.ends_with("api_key")
1064}
1065
1066fn mcp_header_secret_id(server_name: &str, header_name: &str) -> String {
1067 format!(
1068 "mcp_header::{}::{}",
1069 sanitize_namespace_segment(server_name),
1070 sanitize_namespace_segment(header_name)
1071 )
1072}
1073
1074fn parse_stdio_transport(transport: &str) -> Option<&str> {
1075 transport.strip_prefix("stdio:").map(str::trim)
1076}
1077
1078fn parse_remote_endpoint(transport: &str) -> Option<String> {
1079 let trimmed = transport.trim();
1080 if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
1081 return Some(trimmed.to_string());
1082 }
1083 for prefix in ["http:", "https:"] {
1084 if let Some(rest) = trimmed.strip_prefix(prefix) {
1085 let endpoint = rest.trim();
1086 if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
1087 return Some(endpoint.to_string());
1088 }
1089 }
1090 }
1091 None
1092}
1093
1094fn server_tool_rows(server: &McpServer) -> Vec<McpRemoteTool> {
1095 let server_slug = sanitize_namespace_segment(&server.name);
1096 server
1097 .tool_cache
1098 .iter()
1099 .map(|tool| {
1100 let tool_slug = sanitize_namespace_segment(&tool.tool_name);
1101 McpRemoteTool {
1102 server_name: server.name.clone(),
1103 tool_name: tool.tool_name.clone(),
1104 namespaced_name: format!("mcp.{server_slug}.{tool_slug}"),
1105 description: tool.description.clone(),
1106 input_schema: tool.input_schema.clone(),
1107 fetched_at_ms: tool.fetched_at_ms,
1108 schema_hash: tool.schema_hash.clone(),
1109 }
1110 })
1111 .collect()
1112}
1113
1114fn sanitize_namespace_segment(raw: &str) -> String {
1115 let mut out = String::new();
1116 let mut previous_underscore = false;
1117 for ch in raw.trim().chars() {
1118 if ch.is_ascii_alphanumeric() {
1119 out.push(ch.to_ascii_lowercase());
1120 previous_underscore = false;
1121 } else if !previous_underscore {
1122 out.push('_');
1123 previous_underscore = true;
1124 }
1125 }
1126 let cleaned = out.trim_matches('_');
1127 if cleaned.is_empty() {
1128 "tool".to_string()
1129 } else {
1130 cleaned.to_string()
1131 }
1132}
1133
1134fn schema_hash(schema: &Value) -> String {
1135 let payload = serde_json::to_vec(schema).unwrap_or_default();
1136 let mut hasher = Sha256::new();
1137 hasher.update(payload);
1138 format!("{:x}", hasher.finalize())
1139}
1140
1141fn extract_auth_challenge(result: &Value, tool_name: &str) -> Option<McpAuthChallenge> {
1142 let authorization_url = find_string_with_priority(
1143 result,
1144 &[
1145 &["structuredContent", "authorization_url"],
1146 &["structuredContent", "authorizationUrl"],
1147 &["authorization_url"],
1148 &["authorizationUrl"],
1149 &["auth_url"],
1150 ],
1151 &["authorization_url", "authorizationUrl", "auth_url"],
1152 )?;
1153 let raw_message = find_string_with_priority(
1154 result,
1155 &[
1156 &["structuredContent", "message"],
1157 &["message"],
1158 &["structuredContent", "text"],
1159 &["text"],
1160 &["llm_instructions"],
1161 ],
1162 &["message", "text", "llm_instructions"],
1163 )
1164 .unwrap_or_else(|| "This tool requires authorization before it can run.".to_string());
1165 let message = sanitize_auth_message(&raw_message);
1166 let challenge_id = stable_id_seed(&format!("{tool_name}:{authorization_url}"));
1167 Some(McpAuthChallenge {
1168 challenge_id,
1169 tool_name: tool_name.to_string(),
1170 authorization_url,
1171 message,
1172 requested_at_ms: now_ms(),
1173 status: "pending".to_string(),
1174 })
1175}
1176
1177fn find_string_by_any_key(value: &Value, keys: &[&str]) -> Option<String> {
1178 match value {
1179 Value::Object(map) => {
1180 for key in keys {
1181 if let Some(s) = map.get(*key).and_then(|v| v.as_str()) {
1182 let trimmed = s.trim();
1183 if !trimmed.is_empty() {
1184 return Some(trimmed.to_string());
1185 }
1186 }
1187 }
1188 for child in map.values() {
1189 if let Some(found) = find_string_by_any_key(child, keys) {
1190 return Some(found);
1191 }
1192 }
1193 None
1194 }
1195 Value::Array(items) => items
1196 .iter()
1197 .find_map(|item| find_string_by_any_key(item, keys)),
1198 _ => None,
1199 }
1200}
1201
1202fn find_string_with_priority(
1203 value: &Value,
1204 paths: &[&[&str]],
1205 fallback_keys: &[&str],
1206) -> Option<String> {
1207 for path in paths {
1208 if let Some(found) = find_string_at_path(value, path) {
1209 return Some(found);
1210 }
1211 }
1212 find_string_by_any_key(value, fallback_keys)
1213}
1214
1215fn find_string_at_path(value: &Value, path: &[&str]) -> Option<String> {
1216 let mut current = value;
1217 for segment in path {
1218 current = current.get(*segment)?;
1219 }
1220 let s = current.as_str()?.trim();
1221 if s.is_empty() {
1222 None
1223 } else {
1224 Some(s.to_string())
1225 }
1226}
1227
1228fn sanitize_auth_message(raw: &str) -> String {
1229 let trimmed = raw.trim();
1230 if trimmed.is_empty() {
1231 return "This tool requires authorization before it can run.".to_string();
1232 }
1233 if let Some((head, _)) = trimmed.split_once("Authorize here:") {
1234 let head = head.trim();
1235 if !head.is_empty() {
1236 return truncate_text(head, 280);
1237 }
1238 }
1239 let no_newlines = trimmed.replace(['\r', '\n'], " ");
1240 truncate_text(no_newlines.trim(), 280)
1241}
1242
1243fn truncate_text(input: &str, max_chars: usize) -> String {
1244 if input.chars().count() <= max_chars {
1245 return input.to_string();
1246 }
1247 let truncated = input.chars().take(max_chars).collect::<String>();
1248 format!("{truncated}...")
1249}
1250
1251fn stable_id_seed(seed: &str) -> String {
1252 let mut hasher = Sha256::new();
1253 hasher.update(seed.as_bytes());
1254 let encoded = format!("{:x}", hasher.finalize());
1255 encoded.chars().take(16).collect()
1256}
1257
1258fn canonical_tool_key(tool_name: &str) -> String {
1259 tool_name.trim().to_ascii_lowercase()
1260}
1261
1262fn pending_auth_from_challenge(challenge: &McpAuthChallenge) -> PendingMcpAuth {
1263 PendingMcpAuth {
1264 challenge_id: challenge.challenge_id.clone(),
1265 authorization_url: challenge.authorization_url.clone(),
1266 message: challenge.message.clone(),
1267 status: challenge.status.clone(),
1268 first_seen_ms: challenge.requested_at_ms,
1269 last_probe_ms: challenge.requested_at_ms,
1270 }
1271}
1272
1273struct PendingAuthShortCircuit {
1274 output: String,
1275 mcp_auth: Value,
1276}
1277
1278fn pending_auth_short_circuit(
1279 server: &McpServer,
1280 tool_key: &str,
1281 tool_name: &str,
1282 now_ms_value: u64,
1283 cooldown_ms: u64,
1284) -> Option<PendingAuthShortCircuit> {
1285 let pending = server.pending_auth_by_tool.get(tool_key)?;
1286 let elapsed = now_ms_value.saturating_sub(pending.last_probe_ms);
1287 if elapsed >= cooldown_ms {
1288 return None;
1289 }
1290 let retry_after_ms = cooldown_ms.saturating_sub(elapsed);
1291 let output = format!(
1292 "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1293 tool_name,
1294 pending.message,
1295 pending.authorization_url,
1296 retry_after_ms.div_ceil(1000)
1297 );
1298 Some(PendingAuthShortCircuit {
1299 output,
1300 mcp_auth: json!({
1301 "required": true,
1302 "pending": true,
1303 "blocked": true,
1304 "retryAfterMs": retry_after_ms,
1305 "challengeId": pending.challenge_id,
1306 "tool": tool_name,
1307 "authorizationUrl": pending.authorization_url,
1308 "message": pending.message,
1309 "status": pending.status
1310 }),
1311 })
1312}
1313
1314fn normalize_tool_input_schema(schema: &mut Value) {
1315 normalize_schema_node(schema);
1316}
1317
1318fn normalize_schema_node(node: &mut Value) {
1319 let Some(obj) = node.as_object_mut() else {
1320 return;
1321 };
1322
1323 if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
1327 let all_strings = enum_values.iter().all(|v| v.is_string());
1328 let string_like_type = schema_type_allows_string_enum(obj.get("type"));
1329 if !all_strings || !string_like_type {
1330 obj.remove("enum");
1331 }
1332 }
1333
1334 if let Some(properties) = obj.get_mut("properties").and_then(|v| v.as_object_mut()) {
1335 for value in properties.values_mut() {
1336 normalize_schema_node(value);
1337 }
1338 }
1339
1340 if let Some(items) = obj.get_mut("items") {
1341 normalize_schema_node(items);
1342 }
1343
1344 for key in ["anyOf", "oneOf", "allOf"] {
1345 if let Some(array) = obj.get_mut(key).and_then(|v| v.as_array_mut()) {
1346 for child in array.iter_mut() {
1347 normalize_schema_node(child);
1348 }
1349 }
1350 }
1351
1352 if let Some(additional) = obj.get_mut("additionalProperties") {
1353 normalize_schema_node(additional);
1354 }
1355}
1356
1357fn schema_type_allows_string_enum(schema_type: Option<&Value>) -> bool {
1358 let Some(schema_type) = schema_type else {
1359 return true;
1361 };
1362
1363 if let Some(kind) = schema_type.as_str() {
1364 return kind == "string";
1365 }
1366
1367 if let Some(kinds) = schema_type.as_array() {
1368 let mut saw_string = false;
1369 for kind in kinds {
1370 let Some(kind) = kind.as_str() else {
1371 return false;
1372 };
1373 if kind == "string" {
1374 saw_string = true;
1375 continue;
1376 }
1377 if kind != "null" {
1378 return false;
1379 }
1380 }
1381 return saw_string;
1382 }
1383
1384 false
1385}
1386
1387fn now_ms() -> u64 {
1388 SystemTime::now()
1389 .duration_since(UNIX_EPOCH)
1390 .map(|d| d.as_millis() as u64)
1391 .unwrap_or(0)
1392}
1393
1394fn build_headers(headers: &HashMap<String, String>) -> Result<HeaderMap, String> {
1395 let mut map = HeaderMap::new();
1396 map.insert(
1397 ACCEPT,
1398 HeaderValue::from_static("application/json, text/event-stream"),
1399 );
1400 map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
1401 for (key, value) in headers {
1402 let name = HeaderName::from_bytes(key.trim().as_bytes())
1403 .map_err(|e| format!("Invalid header name '{key}': {e}"))?;
1404 let header = HeaderValue::from_str(value.trim())
1405 .map_err(|e| format!("Invalid header value for '{key}': {e}"))?;
1406 map.insert(name, header);
1407 }
1408 Ok(map)
1409}
1410
1411async fn post_json_rpc_with_session(
1412 endpoint: &str,
1413 headers: &HashMap<String, String>,
1414 request: Value,
1415 session_id: Option<&str>,
1416) -> Result<(Value, Option<String>), String> {
1417 let client = reqwest::Client::builder()
1418 .timeout(std::time::Duration::from_secs(12))
1419 .build()
1420 .map_err(|e| format!("Failed to build HTTP client: {e}"))?;
1421 let mut req = client.post(endpoint).headers(build_headers(headers)?);
1422 if let Some(id) = session_id {
1423 let trimmed = id.trim();
1424 if !trimmed.is_empty() {
1425 req = req.header("Mcp-Session-Id", trimmed);
1426 }
1427 }
1428 let response = req
1429 .json(&request)
1430 .send()
1431 .await
1432 .map_err(|e| format!("MCP request failed: {e}"))?;
1433 let content_type = response
1434 .headers()
1435 .get(CONTENT_TYPE)
1436 .and_then(|v| v.to_str().ok())
1437 .unwrap_or("")
1438 .to_ascii_lowercase();
1439 let response_session_id = response
1440 .headers()
1441 .get("mcp-session-id")
1442 .and_then(|v| v.to_str().ok())
1443 .map(|v| v.trim().to_string())
1444 .filter(|v| !v.is_empty());
1445 let status = response.status();
1446 let payload = response
1447 .text()
1448 .await
1449 .map_err(|e| format!("Failed to read MCP response: {e}"))?;
1450 if !status.is_success() {
1451 return Err(format!(
1452 "MCP endpoint returned HTTP {}: {}",
1453 status.as_u16(),
1454 payload.chars().take(400).collect::<String>()
1455 ));
1456 }
1457
1458 let value = if content_type.starts_with("text/event-stream") {
1459 parse_sse_first_event_json(&payload).map_err(|e| {
1460 format!(
1461 "Invalid MCP SSE JSON response: {} (snippet: {})",
1462 e,
1463 payload.chars().take(400).collect::<String>()
1464 )
1465 })?
1466 } else if let Ok(value) = serde_json::from_str::<Value>(&payload) {
1467 value
1468 } else if let Ok(value) = parse_sse_first_event_json(&payload) {
1469 value
1471 } else {
1472 return Err(format!(
1473 "Invalid MCP JSON response: {}",
1474 payload.chars().take(400).collect::<String>()
1475 ));
1476 };
1477
1478 Ok((value, response_session_id))
1479}
1480
1481fn parse_sse_first_event_json(payload: &str) -> Result<Value, String> {
1482 let mut data_lines: Vec<&str> = Vec::new();
1483 for raw in payload.lines() {
1484 let line = raw.trim_end_matches('\r');
1485 if let Some(rest) = line.strip_prefix("data:") {
1486 data_lines.push(rest.trim_start());
1487 }
1488 if line.is_empty() {
1489 if !data_lines.is_empty() {
1490 break;
1491 }
1492 continue;
1493 }
1494 }
1495 if data_lines.is_empty() {
1496 return Err("no SSE data event found".to_string());
1497 }
1498 let joined = data_lines.join("\n");
1499 serde_json::from_str::<Value>(&joined).map_err(|e| e.to_string())
1500}
1501
1502fn render_mcp_content(value: &Value) -> String {
1503 let Some(items) = value.as_array() else {
1504 return value.to_string();
1505 };
1506 let mut chunks = Vec::new();
1507 for item in items {
1508 if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
1509 chunks.push(text.to_string());
1510 continue;
1511 }
1512 chunks.push(item.to_string());
1513 }
1514 if chunks.is_empty() {
1515 value.to_string()
1516 } else {
1517 chunks.join("\n")
1518 }
1519}
1520
1521fn normalize_mcp_tool_args(server: &McpServer, tool_name: &str, raw_args: Value) -> Value {
1522 let Some(schema) = server
1523 .tool_cache
1524 .iter()
1525 .find(|row| row.tool_name.eq_ignore_ascii_case(tool_name))
1526 .map(|row| &row.input_schema)
1527 else {
1528 return raw_args;
1529 };
1530
1531 let mut args_obj = match raw_args {
1532 Value::Object(obj) => obj,
1533 other => return other,
1534 };
1535
1536 let properties = schema
1537 .get("properties")
1538 .and_then(|v| v.as_object())
1539 .cloned()
1540 .unwrap_or_default();
1541 if properties.is_empty() {
1542 return Value::Object(args_obj);
1543 }
1544
1545 let mut normalized_existing: HashMap<String, String> = HashMap::new();
1547 for key in args_obj.keys() {
1548 normalized_existing.insert(normalize_arg_key(key), key.clone());
1549 }
1550
1551 let canonical_keys = properties.keys().cloned().collect::<Vec<_>>();
1553 for canonical in &canonical_keys {
1554 if args_obj.contains_key(canonical) {
1555 continue;
1556 }
1557 if let Some(existing_key) = normalized_existing.get(&normalize_arg_key(canonical)) {
1558 if let Some(value) = args_obj.get(existing_key).cloned() {
1559 args_obj.insert(canonical.clone(), value);
1560 }
1561 }
1562 }
1563
1564 let required = schema
1566 .get("required")
1567 .and_then(|v| v.as_array())
1568 .map(|arr| {
1569 arr.iter()
1570 .filter_map(|v| v.as_str().map(str::to_string))
1571 .collect::<Vec<_>>()
1572 })
1573 .unwrap_or_default();
1574
1575 for required_key in required {
1576 if args_obj.contains_key(&required_key) {
1577 continue;
1578 }
1579 if let Some(alias_value) = find_required_alias_value(&required_key, &args_obj) {
1580 args_obj.insert(required_key, alias_value);
1581 }
1582 }
1583
1584 Value::Object(args_obj)
1585}
1586
1587fn find_required_alias_value(
1588 required_key: &str,
1589 args_obj: &serde_json::Map<String, Value>,
1590) -> Option<Value> {
1591 let mut alias_candidates = vec![
1592 required_key.to_string(),
1593 required_key.to_ascii_lowercase(),
1594 required_key.replace('_', ""),
1595 ];
1596
1597 if required_key.contains("title") {
1599 alias_candidates.extend([
1600 "name".to_string(),
1601 "title".to_string(),
1602 "task_name".to_string(),
1603 "taskname".to_string(),
1604 ]);
1605 }
1606
1607 if let Some(base) = required_key.strip_suffix("_id") {
1609 alias_candidates.extend([base.to_string(), format!("{base}id"), format!("{base}_id")]);
1610 }
1611
1612 let mut by_normalized: HashMap<String, &Value> = HashMap::new();
1613 for (key, value) in args_obj {
1614 by_normalized.insert(normalize_arg_key(key), value);
1615 }
1616
1617 alias_candidates
1618 .into_iter()
1619 .find_map(|candidate| by_normalized.get(&normalize_arg_key(&candidate)).cloned())
1620 .cloned()
1621}
1622
1623fn normalize_arg_key(key: &str) -> String {
1624 key.chars()
1625 .filter(|ch| ch.is_ascii_alphanumeric())
1626 .map(|ch| ch.to_ascii_lowercase())
1627 .collect()
1628}
1629
1630async fn spawn_stdio_process(command_text: &str) -> Result<Child, String> {
1631 if command_text.is_empty() {
1632 return Err("Missing stdio command".to_string());
1633 }
1634 #[cfg(windows)]
1635 let mut command = {
1636 let mut cmd = Command::new("powershell");
1637 cmd.args(["-NoProfile", "-Command", command_text]);
1638 cmd
1639 };
1640 #[cfg(not(windows))]
1641 let mut command = {
1642 let mut cmd = Command::new("sh");
1643 cmd.args(["-lc", command_text]);
1644 cmd
1645 };
1646 command
1647 .stdin(std::process::Stdio::null())
1648 .stdout(std::process::Stdio::null())
1649 .stderr(std::process::Stdio::null());
1650 command.spawn().map_err(|e| e.to_string())
1651}
1652
1653#[cfg(test)]
1654mod tests {
1655 use super::*;
1656 use uuid::Uuid;
1657
1658 #[tokio::test]
1659 async fn add_connect_disconnect_non_stdio_server() {
1660 let file = std::env::temp_dir().join(format!("mcp-test-{}.json", Uuid::new_v4()));
1661 let registry = McpRegistry::new_with_state_file(file);
1662 registry
1663 .add("example".to_string(), "sse:https://example.com".to_string())
1664 .await;
1665 assert!(registry.connect("example").await);
1666 let listed = registry.list().await;
1667 assert!(listed.get("example").map(|s| s.connected).unwrap_or(false));
1668 assert!(registry.disconnect("example").await);
1669 }
1670
1671 #[test]
1672 fn parse_remote_endpoint_supports_http_prefixes() {
1673 assert_eq!(
1674 parse_remote_endpoint("https://mcp.example.com/mcp"),
1675 Some("https://mcp.example.com/mcp".to_string())
1676 );
1677 assert_eq!(
1678 parse_remote_endpoint("http:https://mcp.example.com/mcp"),
1679 Some("https://mcp.example.com/mcp".to_string())
1680 );
1681 }
1682
1683 #[test]
1684 fn normalize_schema_removes_non_string_enums_recursively() {
1685 let mut schema = json!({
1686 "type": "object",
1687 "properties": {
1688 "good": { "type": "string", "enum": ["a", "b"] },
1689 "good_nullable": { "type": ["string", "null"], "enum": ["asc", "desc"] },
1690 "bad_object": { "type": "object", "enum": ["asc", "desc"] },
1691 "bad_array": { "type": "array", "enum": ["asc", "desc"] },
1692 "bad_number": { "type": "number", "enum": [1, 2] },
1693 "bad_mixed": { "enum": ["ok", 1] },
1694 "nested": {
1695 "type": "object",
1696 "properties": {
1697 "child": { "enum": [true, false] }
1698 }
1699 }
1700 }
1701 });
1702
1703 normalize_tool_input_schema(&mut schema);
1704
1705 assert!(
1706 schema["properties"]["good"]["enum"].is_array(),
1707 "string enums should be preserved"
1708 );
1709 assert!(
1710 schema["properties"]["good_nullable"]["enum"].is_array(),
1711 "string|null enums should be preserved"
1712 );
1713 assert!(
1714 schema["properties"]["bad_object"]["enum"].is_null(),
1715 "object enums should be dropped"
1716 );
1717 assert!(
1718 schema["properties"]["bad_array"]["enum"].is_null(),
1719 "array enums should be dropped"
1720 );
1721 assert!(
1722 schema["properties"]["bad_number"]["enum"].is_null(),
1723 "non-string enums should be dropped"
1724 );
1725 assert!(
1726 schema["properties"]["bad_mixed"]["enum"].is_null(),
1727 "mixed enums should be dropped"
1728 );
1729 assert!(
1730 schema["properties"]["nested"]["properties"]["child"]["enum"].is_null(),
1731 "recursive non-string enums should be dropped"
1732 );
1733 }
1734
1735 #[test]
1736 fn extract_auth_challenge_from_result_payload() {
1737 let payload = json!({
1738 "content": [
1739 {
1740 "type": "text",
1741 "llm_instructions": "Authorize Gmail access first.",
1742 "authorization_url": "https://example.com/oauth/start"
1743 }
1744 ]
1745 });
1746 let challenge = extract_auth_challenge(&payload, "gmail_whoami")
1747 .expect("auth challenge should be detected");
1748 assert_eq!(challenge.tool_name, "gmail_whoami");
1749 assert_eq!(
1750 challenge.authorization_url,
1751 "https://example.com/oauth/start"
1752 );
1753 assert_eq!(challenge.status, "pending");
1754 }
1755
1756 #[test]
1757 fn extract_auth_challenge_returns_none_without_url() {
1758 let payload = json!({
1759 "content": [
1760 {"type":"text","text":"No authorization needed"}
1761 ]
1762 });
1763 assert!(extract_auth_challenge(&payload, "gmail_whoami").is_none());
1764 }
1765
1766 #[test]
1767 fn extract_auth_challenge_prefers_structured_content_message() {
1768 let payload = json!({
1769 "content": [
1770 {
1771 "type": "text",
1772 "text": "{\"authorization_url\":\"https://example.com/oauth\",\"message\":\"json blob\"}"
1773 }
1774 ],
1775 "structuredContent": {
1776 "authorization_url": "https://example.com/oauth",
1777 "message": "Authorize Reddit access first."
1778 }
1779 });
1780 let challenge = extract_auth_challenge(&payload, "reddit_getmyusername")
1781 .expect("auth challenge should be detected");
1782 assert_eq!(challenge.message, "Authorize Reddit access first.");
1783 }
1784
1785 #[test]
1786 fn sanitize_auth_message_compacts_llm_instructions() {
1787 let raw = "Please show the following link to the end user formatted as markdown: https://example.com/auth\nInform the end user that this tool requires authorization.";
1788 let message = sanitize_auth_message(raw);
1789 assert!(!message.contains('\n'));
1790 assert!(message.len() <= 283);
1791 }
1792
1793 #[test]
1794 fn normalize_mcp_tool_args_maps_clickup_aliases() {
1795 let server = McpServer {
1796 name: "arcade".to_string(),
1797 transport: "https://example.com/mcp".to_string(),
1798 enabled: true,
1799 connected: true,
1800 pid: None,
1801 last_error: None,
1802 last_auth_challenge: None,
1803 mcp_session_id: None,
1804 headers: HashMap::new(),
1805 secret_headers: HashMap::new(),
1806 tool_cache: vec![McpToolCacheEntry {
1807 tool_name: "Clickup_CreateTask".to_string(),
1808 description: "Create task".to_string(),
1809 input_schema: json!({
1810 "type":"object",
1811 "properties":{
1812 "list_id":{"type":"string"},
1813 "task_title":{"type":"string"}
1814 },
1815 "required":["list_id","task_title"]
1816 }),
1817 fetched_at_ms: 0,
1818 schema_hash: "x".to_string(),
1819 }],
1820 tools_fetched_at_ms: None,
1821 pending_auth_by_tool: HashMap::new(),
1822 secret_header_values: HashMap::new(),
1823 };
1824
1825 let normalized = normalize_mcp_tool_args(
1826 &server,
1827 "Clickup_CreateTask",
1828 json!({
1829 "listId": "123",
1830 "name": "Prep fish"
1831 }),
1832 );
1833 assert_eq!(
1834 normalized.get("list_id").and_then(|v| v.as_str()),
1835 Some("123")
1836 );
1837 assert_eq!(
1838 normalized.get("task_title").and_then(|v| v.as_str()),
1839 Some("Prep fish")
1840 );
1841 }
1842
1843 #[test]
1844 fn normalize_arg_key_ignores_case_and_separators() {
1845 assert_eq!(normalize_arg_key("task_title"), "tasktitle");
1846 assert_eq!(normalize_arg_key("taskTitle"), "tasktitle");
1847 assert_eq!(normalize_arg_key("task-title"), "tasktitle");
1848 }
1849
1850 #[test]
1851 fn pending_auth_blocks_retries_within_cooldown() {
1852 let mut server = McpServer {
1853 name: "arcade".to_string(),
1854 transport: "https://example.com/mcp".to_string(),
1855 enabled: true,
1856 connected: true,
1857 pid: None,
1858 last_error: None,
1859 last_auth_challenge: None,
1860 mcp_session_id: None,
1861 headers: HashMap::new(),
1862 secret_headers: HashMap::new(),
1863 tool_cache: Vec::new(),
1864 tools_fetched_at_ms: None,
1865 pending_auth_by_tool: HashMap::new(),
1866 secret_header_values: HashMap::new(),
1867 };
1868 server.pending_auth_by_tool.insert(
1869 "clickup_whoami".to_string(),
1870 PendingMcpAuth {
1871 challenge_id: "abc".to_string(),
1872 authorization_url: "https://example.com/auth".to_string(),
1873 message: "Authorize ClickUp access.".to_string(),
1874 status: "pending".to_string(),
1875 first_seen_ms: 1_000,
1876 last_probe_ms: 2_000,
1877 },
1878 );
1879 let blocked =
1880 pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 10_000, 15_000)
1881 .expect("should block");
1882 assert!(blocked.output.contains("Authorization pending"));
1883 assert!(blocked
1884 .mcp_auth
1885 .get("pending")
1886 .and_then(|v| v.as_bool())
1887 .unwrap_or(false));
1888 }
1889
1890 #[test]
1891 fn pending_auth_allows_probe_after_cooldown() {
1892 let mut server = McpServer {
1893 name: "arcade".to_string(),
1894 transport: "https://example.com/mcp".to_string(),
1895 enabled: true,
1896 connected: true,
1897 pid: None,
1898 last_error: None,
1899 last_auth_challenge: None,
1900 mcp_session_id: None,
1901 headers: HashMap::new(),
1902 secret_headers: HashMap::new(),
1903 tool_cache: Vec::new(),
1904 tools_fetched_at_ms: None,
1905 pending_auth_by_tool: HashMap::new(),
1906 secret_header_values: HashMap::new(),
1907 };
1908 server.pending_auth_by_tool.insert(
1909 "clickup_whoami".to_string(),
1910 PendingMcpAuth {
1911 challenge_id: "abc".to_string(),
1912 authorization_url: "https://example.com/auth".to_string(),
1913 message: "Authorize ClickUp access.".to_string(),
1914 status: "pending".to_string(),
1915 first_seen_ms: 1_000,
1916 last_probe_ms: 2_000,
1917 },
1918 );
1919 assert!(
1920 pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 17_001, 15_000)
1921 .is_none(),
1922 "cooldown elapsed should allow re-probe"
1923 );
1924 }
1925
1926 #[test]
1927 fn pending_auth_is_tool_scoped() {
1928 let mut server = McpServer {
1929 name: "arcade".to_string(),
1930 transport: "https://example.com/mcp".to_string(),
1931 enabled: true,
1932 connected: true,
1933 pid: None,
1934 last_error: None,
1935 last_auth_challenge: None,
1936 mcp_session_id: None,
1937 headers: HashMap::new(),
1938 secret_headers: HashMap::new(),
1939 tool_cache: Vec::new(),
1940 tools_fetched_at_ms: None,
1941 pending_auth_by_tool: HashMap::new(),
1942 secret_header_values: HashMap::new(),
1943 };
1944 server.pending_auth_by_tool.insert(
1945 "gmail_sendemail".to_string(),
1946 PendingMcpAuth {
1947 challenge_id: "abc".to_string(),
1948 authorization_url: "https://example.com/auth".to_string(),
1949 message: "Authorize Gmail access.".to_string(),
1950 status: "pending".to_string(),
1951 first_seen_ms: 1_000,
1952 last_probe_ms: 2_000,
1953 },
1954 );
1955 assert!(pending_auth_short_circuit(
1956 &server,
1957 "gmail_sendemail",
1958 "Gmail_SendEmail",
1959 2_100,
1960 15_000
1961 )
1962 .is_some());
1963 assert!(pending_auth_short_circuit(
1964 &server,
1965 "clickup_whoami",
1966 "Clickup_WhoAmI",
1967 2_100,
1968 15_000
1969 )
1970 .is_none());
1971 }
1972
1973 #[test]
1974 fn store_secret_ref_requires_matching_tenant_context() {
1975 let secret_id = "mcp_header::tenant::authorization".to_string();
1976 tandem_core::set_provider_auth(&secret_id, "tenant-secret").expect("store secret");
1977
1978 let current_tenant = TenantContext::explicit("tenant", "workspace", None);
1979 let matching_ref = McpSecretRef::Store {
1980 secret_id: secret_id.clone(),
1981 tenant_context: current_tenant.clone(),
1982 };
1983 assert_eq!(
1984 resolve_secret_ref_value(&matching_ref, ¤t_tenant).as_deref(),
1985 Some("tenant-secret")
1986 );
1987
1988 let mismatched_tenant = TenantContext::explicit("tenant", "other-workspace", None);
1989 assert!(
1990 resolve_secret_ref_value(&matching_ref, &mismatched_tenant).is_none(),
1991 "tenant mismatch should block secret lookup"
1992 );
1993
1994 let _ = tandem_core::delete_provider_auth(&secret_id);
1995 }
1996}