Skip to main content

tandem_runtime/
mcp.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::{LocalImplicitTenant, SecretRef, TenantContext, ToolResult};
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
/// Protocol version sent as `protocolVersion` in the MCP `initialize` request.
const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
/// Client name reported as `clientInfo.name` in the `initialize` request.
const MCP_CLIENT_NAME: &str = "tandem";
/// Client version reported as `clientInfo.version`; taken from the crate version at compile time.
const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Cooldown in milliseconds handed to `pending_auth_short_circuit` by `call_tool`.
const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
// NOTE(review): not referenced in the visible part of this file; presumably the value shown
// in place of secret header values in redacted views — confirm against its users.
const MCP_SECRET_PLACEHOLDER: &str = "";
19
/// Cached description of one tool advertised by an MCP server; stored in
/// `McpServer::tool_cache` and persisted with the registry state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpToolCacheEntry {
    /// Tool name taken from the `name` field of a `tools/list` row.
    pub tool_name: String,
    /// Tool description; empty when the server did not supply one.
    pub description: String,
    /// Input schema of the tool after `normalize_tool_input_schema`; falls back to
    /// `Value::default()` when missing from persisted state.
    #[serde(default)]
    pub input_schema: Value,
    /// `now_ms()` value of the refresh that produced this entry.
    pub fetched_at_ms: u64,
    /// Result of `schema_hash` over `input_schema`.
    pub schema_hash: String,
}
29
/// One registered MCP server: its configuration, runtime connection state and cached tools.
///
/// The whole struct is serialized into the registry state file, except for
/// `secret_header_values`, which is skipped by serde and only lives in memory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpServer {
    /// Server name; on registration this is the trimmed name, which is also the registry key.
    pub name: String,
    /// Transport description, interpreted by `parse_stdio_transport` and `parse_remote_endpoint`.
    pub transport: String,
    /// Whether the server may be connected or called; defaults to `true` when absent on load.
    #[serde(default = "default_enabled")]
    pub enabled: bool,
    /// Runtime connection flag; reset to `false` whenever state is loaded from disk.
    pub connected: bool,
    /// Process id of the spawned stdio child, when one is running.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pid: Option<u32>,
    /// Message of the most recent connect/refresh failure, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_error: Option<String>,
    /// Most recent authorization challenge observed by `call_tool`, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub last_auth_challenge: Option<McpAuthChallenge>,
    /// Session id returned by `post_json_rpc_with_session`; passed back on later tool calls.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mcp_session_id: Option<String>,
    /// Headers persisted in plain form.
    #[serde(default)]
    pub headers: HashMap<String, String>,
    /// Header name → reference describing where the header's secret value comes from.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub secret_headers: HashMap<String, McpSecretRef>,
    /// Tools recorded by the last successful `refresh`.
    #[serde(default)]
    pub tool_cache: Vec<McpToolCacheEntry>,
    /// `now_ms()` value of the last successful `refresh`, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tools_fetched_at_ms: Option<u64>,
    /// Pending authorization state keyed by `canonical_tool_key(tool_name)`.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
    /// Resolved values for `secret_headers`; never serialized or deserialized.
    #[serde(default, skip)]
    pub secret_header_values: HashMap<String, String>,
}
58
/// Reference to the source of a secret header value.
///
/// Serialized as an internally tagged object whose `type` field holds the snake_case
/// variant name (`store`, `env`, `bearer_env`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum McpSecretRef {
    /// Secret kept in the secret store under `secret_id`, recorded together with the
    /// tenant context it was stored for.
    Store {
        secret_id: String,
        #[serde(default)]
        tenant_context: TenantContext,
    },
    /// Secret named by the environment variable `env`.
    // NOTE(review): resolution happens in `resolve_secret_ref_value`, outside this view.
    Env {
        env: String,
    },
    /// Environment variable `env`; presumably resolved into a `Bearer` credential — confirm
    /// against `resolve_secret_ref_value`.
    BearerEnv {
        env: String,
    },
}
74
/// Authorization challenge extracted from a `tools/call` response by `extract_auth_challenge`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpAuthChallenge {
    /// Identifier of the challenge; surfaced as `mcpAuth.challengeId` in tool metadata.
    pub challenge_id: String,
    /// Tool the challenge applies to; surfaced as `mcpAuth.tool`.
    pub tool_name: String,
    /// URL the user is asked to open ("Authorize here: …" in the tool output).
    pub authorization_url: String,
    /// Message shown ahead of the authorization URL in the tool output.
    pub message: String,
    /// Timestamp of the challenge in milliseconds.
    // NOTE(review): set where the challenge is extracted, outside this view.
    pub requested_at_ms: u64,
    /// Challenge status string; surfaced as `mcpAuth.status`. Possible values are defined
    /// outside this view.
    pub status: String,
}
84
/// Per-tool record of an outstanding authorization challenge, stored in
/// `McpServer::pending_auth_by_tool` and built by `pending_auth_from_challenge`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingMcpAuth {
    /// Identifier of the challenge this record was created from.
    pub challenge_id: String,
    /// Authorization URL of the challenge.
    pub authorization_url: String,
    /// Message of the challenge.
    pub message: String,
    /// Status string of the challenge.
    pub status: String,
    /// Millisecond timestamp of when the pending state was first recorded.
    // NOTE(review): assigned in `pending_auth_from_challenge`, outside this view.
    pub first_seen_ms: u64,
    /// Millisecond timestamp of the latest `call_tool` attempt that went on to contact
    /// the server while this record existed.
    pub last_probe_ms: u64,
}
94
/// A tool exposed by an MCP server, as returned from the registry's listing methods.
///
/// `discover_remote_tools` produces values with `server_name`, `namespaced_name` and
/// `schema_hash` left empty; listing methods obtain fully populated rows from
/// `server_tool_rows`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpRemoteTool {
    /// Name of the server that owns the tool.
    pub server_name: String,
    /// Tool name as reported by the server.
    pub tool_name: String,
    /// Name used to order tool listings.
    // NOTE(review): presumably combines server and tool name — built in `server_tool_rows`.
    pub namespaced_name: String,
    /// Tool description; may be empty.
    pub description: String,
    /// Input schema of the tool; `Value::default()` when absent during deserialization.
    #[serde(default)]
    pub input_schema: Value,
    /// Millisecond timestamp of when the tool was fetched.
    pub fetched_at_ms: u64,
    /// Hash of the input schema.
    pub schema_hash: String,
}
106
/// Registry of MCP servers with their runtime state and spawned stdio processes.
///
/// Cloning is cheap: every field is an `Arc`, so clones share the same underlying state.
#[derive(Clone)]
pub struct McpRegistry {
    /// Registered servers keyed by name.
    servers: Arc<RwLock<HashMap<String, McpServer>>>,
    /// Child processes of connected stdio servers, keyed by server name.
    processes: Arc<Mutex<HashMap<String, Child>>>,
    /// Path of the JSON file the server map is persisted to.
    state_file: Arc<PathBuf>,
}
113
114impl McpRegistry {
115    pub fn new() -> Self {
116        Self::new_with_state_file(resolve_state_file())
117    }
118
119    pub fn new_with_state_file(state_file: PathBuf) -> Self {
120        let (loaded_state, migrated) = load_state(&state_file);
121        let loaded = loaded_state
122            .into_iter()
123            .map(|(k, mut v)| {
124                v.connected = false;
125                v.pid = None;
126                if v.name.trim().is_empty() {
127                    v.name = k.clone();
128                }
129                if v.headers.is_empty() {
130                    v.headers = HashMap::new();
131                }
132                if v.secret_headers.is_empty() {
133                    v.secret_headers = HashMap::new();
134                }
135                let tenant_context = local_tenant_context();
136                v.secret_header_values =
137                    resolve_secret_header_values(&v.secret_headers, &tenant_context);
138                (k, v)
139            })
140            .collect::<HashMap<_, _>>();
141        if migrated {
142            persist_state_blocking(&state_file, &loaded);
143        }
144        Self {
145            servers: Arc::new(RwLock::new(loaded)),
146            processes: Arc::new(Mutex::new(HashMap::new())),
147            state_file: Arc::new(state_file),
148        }
149    }
150
151    pub async fn list(&self) -> HashMap<String, McpServer> {
152        self.servers.read().await.clone()
153    }
154
155    pub async fn list_public(&self) -> HashMap<String, McpServer> {
156        self.servers
157            .read()
158            .await
159            .iter()
160            .map(|(name, server)| (name.clone(), redacted_server_view(server)))
161            .collect()
162    }
163
164    pub async fn add(&self, name: String, transport: String) {
165        self.add_or_update(name, transport, HashMap::new(), true)
166            .await;
167    }
168
169    pub async fn add_or_update(
170        &self,
171        name: String,
172        transport: String,
173        headers: HashMap<String, String>,
174        enabled: bool,
175    ) {
176        self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
177            .await;
178    }
179
180    pub async fn add_or_update_with_secret_refs(
181        &self,
182        name: String,
183        transport: String,
184        headers: HashMap<String, String>,
185        secret_headers: HashMap<String, McpSecretRef>,
186        enabled: bool,
187    ) {
188        let normalized_name = name.trim().to_string();
189        let tenant_context = local_tenant_context();
190        let (persisted_headers, persisted_secret_headers, secret_header_values) =
191            split_headers_for_storage(&normalized_name, headers, secret_headers, &tenant_context);
192        let mut servers = self.servers.write().await;
193        let existing = servers.get(&normalized_name).cloned();
194        let preserve_cache = existing.as_ref().is_some_and(|row| {
195            row.transport == transport
196                && effective_headers(row)
197                    == combine_headers(&persisted_headers, &secret_header_values)
198        });
199        let existing_tool_cache = if preserve_cache {
200            existing
201                .as_ref()
202                .map(|row| row.tool_cache.clone())
203                .unwrap_or_default()
204        } else {
205            Vec::new()
206        };
207        let existing_fetched_at = if preserve_cache {
208            existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
209        } else {
210            None
211        };
212        let server = McpServer {
213            name: normalized_name.clone(),
214            transport,
215            enabled,
216            connected: false,
217            pid: None,
218            last_error: None,
219            last_auth_challenge: None,
220            mcp_session_id: None,
221            headers: persisted_headers,
222            secret_headers: persisted_secret_headers,
223            tool_cache: existing_tool_cache,
224            tools_fetched_at_ms: existing_fetched_at,
225            pending_auth_by_tool: HashMap::new(),
226            secret_header_values,
227        };
228        servers.insert(normalized_name, server);
229        drop(servers);
230        self.persist_state().await;
231    }
232
233    pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
234        let mut servers = self.servers.write().await;
235        let Some(server) = servers.get_mut(name) else {
236            return false;
237        };
238        server.enabled = enabled;
239        if !enabled {
240            server.connected = false;
241            server.pid = None;
242            server.last_auth_challenge = None;
243            server.mcp_session_id = None;
244            server.pending_auth_by_tool.clear();
245        }
246        drop(servers);
247        if !enabled {
248            if let Some(mut child) = self.processes.lock().await.remove(name) {
249                let _ = child.kill().await;
250                let _ = child.wait().await;
251            }
252        }
253        self.persist_state().await;
254        true
255    }
256
257    pub async fn remove(&self, name: &str) -> bool {
258        let removed_server = {
259            let mut servers = self.servers.write().await;
260            servers.remove(name)
261        };
262        let Some(server) = removed_server else {
263            return false;
264        };
265        let current_tenant = local_tenant_context();
266        delete_secret_header_refs(&server.secret_headers, &current_tenant);
267
268        if let Some(mut child) = self.processes.lock().await.remove(name) {
269            let _ = child.kill().await;
270            let _ = child.wait().await;
271        }
272        self.persist_state().await;
273        true
274    }
275
276    pub async fn connect(&self, name: &str) -> bool {
277        let server = {
278            let servers = self.servers.read().await;
279            let Some(server) = servers.get(name) else {
280                return false;
281            };
282            server.clone()
283        };
284
285        if !server.enabled {
286            let mut servers = self.servers.write().await;
287            if let Some(entry) = servers.get_mut(name) {
288                entry.connected = false;
289                entry.pid = None;
290                entry.last_error = Some("MCP server is disabled".to_string());
291                entry.last_auth_challenge = None;
292                entry.mcp_session_id = None;
293                entry.pending_auth_by_tool.clear();
294            }
295            drop(servers);
296            self.persist_state().await;
297            return false;
298        }
299
300        if let Some(command_text) = parse_stdio_transport(&server.transport) {
301            return self.connect_stdio(name, command_text).await;
302        }
303
304        if parse_remote_endpoint(&server.transport).is_some() {
305            return self.refresh(name).await.is_ok();
306        }
307
308        let mut servers = self.servers.write().await;
309        if let Some(entry) = servers.get_mut(name) {
310            entry.connected = true;
311            entry.pid = None;
312            entry.last_error = None;
313            entry.last_auth_challenge = None;
314            entry.mcp_session_id = None;
315            entry.pending_auth_by_tool.clear();
316        }
317        drop(servers);
318        self.persist_state().await;
319        true
320    }
321
322    pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
323        let server = {
324            let servers = self.servers.read().await;
325            let Some(server) = servers.get(name) else {
326                return Err("MCP server not found".to_string());
327            };
328            server.clone()
329        };
330
331        if !server.enabled {
332            return Err("MCP server is disabled".to_string());
333        }
334
335        let endpoint = parse_remote_endpoint(&server.transport)
336            .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
337
338        let request_headers = effective_headers(&server);
339        let (tools, session_id) = match self
340            .discover_remote_tools(&endpoint, &request_headers)
341            .await
342        {
343            Ok(result) => result,
344            Err(err) => {
345                let mut servers = self.servers.write().await;
346                if let Some(entry) = servers.get_mut(name) {
347                    entry.connected = false;
348                    entry.pid = None;
349                    entry.last_error = Some(err.clone());
350                    entry.last_auth_challenge = None;
351                    entry.mcp_session_id = None;
352                    entry.pending_auth_by_tool.clear();
353                    entry.tool_cache.clear();
354                    entry.tools_fetched_at_ms = None;
355                }
356                drop(servers);
357                self.persist_state().await;
358                return Err(err);
359            }
360        };
361
362        let now = now_ms();
363        let cache = tools
364            .iter()
365            .map(|tool| McpToolCacheEntry {
366                tool_name: tool.tool_name.clone(),
367                description: tool.description.clone(),
368                input_schema: tool.input_schema.clone(),
369                fetched_at_ms: now,
370                schema_hash: schema_hash(&tool.input_schema),
371            })
372            .collect::<Vec<_>>();
373
374        let mut servers = self.servers.write().await;
375        if let Some(entry) = servers.get_mut(name) {
376            entry.connected = true;
377            entry.pid = None;
378            entry.last_error = None;
379            entry.last_auth_challenge = None;
380            entry.mcp_session_id = session_id;
381            entry.tool_cache = cache;
382            entry.tools_fetched_at_ms = Some(now);
383            entry.pending_auth_by_tool.clear();
384        }
385        drop(servers);
386        self.persist_state().await;
387        Ok(self.server_tools(name).await)
388    }
389
390    pub async fn disconnect(&self, name: &str) -> bool {
391        if let Some(mut child) = self.processes.lock().await.remove(name) {
392            let _ = child.kill().await;
393            let _ = child.wait().await;
394        }
395        let mut servers = self.servers.write().await;
396        if let Some(server) = servers.get_mut(name) {
397            server.connected = false;
398            server.pid = None;
399            server.last_auth_challenge = None;
400            server.mcp_session_id = None;
401            server.pending_auth_by_tool.clear();
402            drop(servers);
403            self.persist_state().await;
404            return true;
405        }
406        false
407    }
408
409    pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
410        let mut out = self
411            .servers
412            .read()
413            .await
414            .values()
415            .filter(|server| server.enabled && server.connected)
416            .flat_map(server_tool_rows)
417            .collect::<Vec<_>>();
418        out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
419        out
420    }
421
422    pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
423        let Some(server) = self.servers.read().await.get(name).cloned() else {
424            return Vec::new();
425        };
426        let mut rows = server_tool_rows(&server);
427        rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
428        rows
429    }
430
431    pub async fn call_tool(
432        &self,
433        server_name: &str,
434        tool_name: &str,
435        args: Value,
436    ) -> Result<ToolResult, String> {
437        let server = {
438            let servers = self.servers.read().await;
439            let Some(server) = servers.get(server_name) else {
440                return Err(format!("MCP server '{server_name}' not found"));
441            };
442            server.clone()
443        };
444
445        if !server.enabled {
446            return Err(format!("MCP server '{server_name}' is disabled"));
447        }
448        if !server.connected {
449            return Err(format!("MCP server '{server_name}' is not connected"));
450        }
451
452        let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
453            "MCP tools/call currently supports HTTP/S transports only".to_string()
454        })?;
455        let canonical_tool = canonical_tool_key(tool_name);
456        let now = now_ms();
457        if let Some(blocked) = pending_auth_short_circuit(
458            &server,
459            &canonical_tool,
460            tool_name,
461            now,
462            MCP_AUTH_REPROBE_COOLDOWN_MS,
463        ) {
464            return Ok(ToolResult {
465                output: blocked.output,
466                metadata: json!({
467                    "server": server_name,
468                    "tool": tool_name,
469                    "result": Value::Null,
470                    "mcpAuth": blocked.mcp_auth
471                }),
472            });
473        }
474        let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
475
476        {
477            let mut servers = self.servers.write().await;
478            if let Some(row) = servers.get_mut(server_name) {
479                if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
480                    pending.last_probe_ms = now;
481                }
482            }
483        }
484
485        let request = json!({
486            "jsonrpc": "2.0",
487            "id": format!("call-{}-{}", server_name, now_ms()),
488            "method": "tools/call",
489            "params": {
490                "name": tool_name,
491                "arguments": normalized_args
492            }
493        });
494        let (response, session_id) = post_json_rpc_with_session(
495            &endpoint,
496            &effective_headers(&server),
497            request,
498            server.mcp_session_id.as_deref(),
499        )
500        .await?;
501        if session_id.is_some() {
502            let mut servers = self.servers.write().await;
503            if let Some(row) = servers.get_mut(server_name) {
504                row.mcp_session_id = session_id;
505            }
506            drop(servers);
507            self.persist_state().await;
508        }
509
510        if let Some(err) = response.get("error") {
511            if let Some(challenge) = extract_auth_challenge(err, tool_name) {
512                let output = format!(
513                    "{}\n\nAuthorize here: {}",
514                    challenge.message, challenge.authorization_url
515                );
516                {
517                    let mut servers = self.servers.write().await;
518                    if let Some(row) = servers.get_mut(server_name) {
519                        row.last_auth_challenge = Some(challenge.clone());
520                        row.last_error = None;
521                        row.pending_auth_by_tool.insert(
522                            canonical_tool.clone(),
523                            pending_auth_from_challenge(&challenge),
524                        );
525                    }
526                }
527                self.persist_state().await;
528                return Ok(ToolResult {
529                    output,
530                    metadata: json!({
531                        "server": server_name,
532                        "tool": tool_name,
533                        "result": Value::Null,
534                        "mcpAuth": {
535                            "required": true,
536                            "challengeId": challenge.challenge_id,
537                            "tool": challenge.tool_name,
538                            "authorizationUrl": challenge.authorization_url,
539                            "message": challenge.message,
540                            "status": challenge.status
541                        }
542                    }),
543                });
544            }
545            let message = err
546                .get("message")
547                .and_then(|v| v.as_str())
548                .unwrap_or("MCP tools/call failed");
549            return Err(message.to_string());
550        }
551
552        let result = response.get("result").cloned().unwrap_or(Value::Null);
553        let auth_challenge = extract_auth_challenge(&result, tool_name);
554        let output = if let Some(challenge) = auth_challenge.as_ref() {
555            format!(
556                "{}\n\nAuthorize here: {}",
557                challenge.message, challenge.authorization_url
558            )
559        } else {
560            result
561                .get("content")
562                .map(render_mcp_content)
563                .or_else(|| result.get("output").map(|v| v.to_string()))
564                .unwrap_or_else(|| result.to_string())
565        };
566
567        {
568            let mut servers = self.servers.write().await;
569            if let Some(row) = servers.get_mut(server_name) {
570                row.last_auth_challenge = auth_challenge.clone();
571                if let Some(challenge) = auth_challenge.as_ref() {
572                    row.pending_auth_by_tool.insert(
573                        canonical_tool.clone(),
574                        pending_auth_from_challenge(challenge),
575                    );
576                } else {
577                    row.pending_auth_by_tool.remove(&canonical_tool);
578                }
579            }
580        }
581        self.persist_state().await;
582
583        let auth_metadata = auth_challenge.as_ref().map(|challenge| {
584            json!({
585                "required": true,
586                "challengeId": challenge.challenge_id,
587                "tool": challenge.tool_name,
588                "authorizationUrl": challenge.authorization_url,
589                "message": challenge.message,
590                "status": challenge.status
591            })
592        });
593
594        Ok(ToolResult {
595            output,
596            metadata: json!({
597                "server": server_name,
598                "tool": tool_name,
599                "result": result,
600                "mcpAuth": auth_metadata
601            }),
602        })
603    }
604
605    async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
606        match spawn_stdio_process(command_text).await {
607            Ok(child) => {
608                let pid = child.id();
609                self.processes.lock().await.insert(name.to_string(), child);
610                let mut servers = self.servers.write().await;
611                if let Some(server) = servers.get_mut(name) {
612                    server.connected = true;
613                    server.pid = pid;
614                    server.last_error = None;
615                    server.last_auth_challenge = None;
616                    server.pending_auth_by_tool.clear();
617                }
618                drop(servers);
619                self.persist_state().await;
620                true
621            }
622            Err(err) => {
623                let mut servers = self.servers.write().await;
624                if let Some(server) = servers.get_mut(name) {
625                    server.connected = false;
626                    server.pid = None;
627                    server.last_error = Some(err);
628                    server.last_auth_challenge = None;
629                    server.pending_auth_by_tool.clear();
630                }
631                drop(servers);
632                self.persist_state().await;
633                false
634            }
635        }
636    }
637
638    async fn discover_remote_tools(
639        &self,
640        endpoint: &str,
641        headers: &HashMap<String, String>,
642    ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
643        let initialize = json!({
644            "jsonrpc": "2.0",
645            "id": "initialize-1",
646            "method": "initialize",
647            "params": {
648                "protocolVersion": MCP_PROTOCOL_VERSION,
649                "capabilities": {},
650                "clientInfo": {
651                    "name": MCP_CLIENT_NAME,
652                    "version": MCP_CLIENT_VERSION,
653                }
654            }
655        });
656        let (init_response, mut session_id) =
657            post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
658        if let Some(err) = init_response.get("error") {
659            let message = err
660                .get("message")
661                .and_then(|v| v.as_str())
662                .unwrap_or("MCP initialize failed");
663            return Err(message.to_string());
664        }
665
666        let tools_list = json!({
667            "jsonrpc": "2.0",
668            "id": "tools-list-1",
669            "method": "tools/list",
670            "params": {}
671        });
672        let (tools_response, next_session_id) =
673            post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
674                .await?;
675        if next_session_id.is_some() {
676            session_id = next_session_id;
677        }
678        if let Some(err) = tools_response.get("error") {
679            let message = err
680                .get("message")
681                .and_then(|v| v.as_str())
682                .unwrap_or("MCP tools/list failed");
683            return Err(message.to_string());
684        }
685
686        let tools = tools_response
687            .get("result")
688            .and_then(|v| v.get("tools"))
689            .and_then(|v| v.as_array())
690            .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
691
692        let now = now_ms();
693        let mut out = Vec::new();
694        for row in tools {
695            let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
696                continue;
697            };
698            let description = row
699                .get("description")
700                .and_then(|v| v.as_str())
701                .unwrap_or("")
702                .to_string();
703            let mut input_schema = row
704                .get("inputSchema")
705                .or_else(|| row.get("input_schema"))
706                .cloned()
707                .unwrap_or_else(|| json!({"type":"object"}));
708            normalize_tool_input_schema(&mut input_schema);
709            out.push(McpRemoteTool {
710                server_name: String::new(),
711                tool_name: tool_name.to_string(),
712                namespaced_name: String::new(),
713                description,
714                input_schema,
715                fetched_at_ms: now,
716                schema_hash: String::new(),
717            });
718        }
719
720        Ok((out, session_id))
721    }
722
723    async fn persist_state(&self) {
724        let snapshot = self.servers.read().await.clone();
725        persist_state_blocking(self.state_file.as_path(), &snapshot);
726    }
727}
728
729impl Default for McpRegistry {
730    fn default() -> Self {
731        Self::new()
732    }
733}
734
/// Serde default for `McpServer::enabled`: servers persisted without the field are
/// treated as enabled.
fn default_enabled() -> bool {
    true
}
738
739fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
740    if let Some(parent) = path.parent() {
741        let _ = std::fs::create_dir_all(parent);
742    }
743    if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
744        let _ = std::fs::write(path, payload);
745    }
746}
747
748fn resolve_state_file() -> PathBuf {
749    if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
750        return PathBuf::from(path);
751    }
752    if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
753        let trimmed = state_dir.trim();
754        if !trimmed.is_empty() {
755            return PathBuf::from(trimmed).join("mcp_servers.json");
756        }
757    }
758    if let Some(data_dir) = dirs::data_dir() {
759        return data_dir
760            .join("tandem")
761            .join("data")
762            .join("mcp_servers.json");
763    }
764    dirs::home_dir()
765        .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
766        .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
767}
768
769fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
770    let Ok(raw) = std::fs::read_to_string(path) else {
771        return (HashMap::new(), false);
772    };
773    let mut migrated = false;
774    let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
775    for (name, server) in parsed.iter_mut() {
776        let tenant_context = local_tenant_context();
777        let (headers, secret_headers, secret_header_values, server_migrated) =
778            migrate_server_headers(name, server, &tenant_context);
779        migrated = migrated || server_migrated;
780        server.headers = headers;
781        server.secret_headers = secret_headers;
782        server.secret_header_values = secret_header_values;
783    }
784    (parsed, migrated)
785}
786
787fn migrate_server_headers(
788    server_name: &str,
789    server: &McpServer,
790    current_tenant: &TenantContext,
791) -> (
792    HashMap<String, String>,
793    HashMap<String, McpSecretRef>,
794    HashMap<String, String>,
795    bool,
796) {
797    let original_effective = effective_headers(server);
798    let mut persisted_secret_headers = server.secret_headers.clone();
799    let mut secret_header_values =
800        resolve_secret_header_values(&persisted_secret_headers, current_tenant);
801    let mut persisted_headers = server.headers.clone();
802    let mut migrated = false;
803
804    let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
805    for header_name in header_keys {
806        let Some(value) = persisted_headers.get(&header_name).cloned() else {
807            continue;
808        };
809        if persisted_secret_headers.contains_key(&header_name) {
810            continue;
811        }
812        if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
813            persisted_headers.remove(&header_name);
814            let resolved =
815                resolve_secret_ref_value(&secret_ref, current_tenant).unwrap_or_default();
816            persisted_secret_headers.insert(header_name.clone(), secret_ref);
817            if !resolved.is_empty() {
818                secret_header_values.insert(header_name.clone(), resolved);
819            }
820            migrated = true;
821            continue;
822        }
823        if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
824            let secret_id = mcp_header_secret_id(server_name, &header_name);
825            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
826                persisted_headers.remove(&header_name);
827                persisted_secret_headers.insert(
828                    header_name.clone(),
829                    McpSecretRef::Store {
830                        secret_id: secret_id.clone(),
831                        tenant_context: current_tenant.clone(),
832                    },
833                );
834                secret_header_values.insert(header_name.clone(), value);
835                migrated = true;
836            }
837        }
838    }
839
840    if !migrated {
841        let effective = combine_headers(&persisted_headers, &secret_header_values);
842        migrated = effective != original_effective;
843    }
844
845    (
846        persisted_headers,
847        persisted_secret_headers,
848        secret_header_values,
849        migrated,
850    )
851}
852
853fn split_headers_for_storage(
854    server_name: &str,
855    headers: HashMap<String, String>,
856    explicit_secret_headers: HashMap<String, McpSecretRef>,
857    current_tenant: &TenantContext,
858) -> (
859    HashMap<String, String>,
860    HashMap<String, McpSecretRef>,
861    HashMap<String, String>,
862) {
863    let mut persisted_headers = HashMap::new();
864    let mut persisted_secret_headers = HashMap::new();
865    let mut secret_header_values = HashMap::new();
866
867    for (header_name, raw_value) in headers {
868        let value = raw_value.trim().to_string();
869        if value.is_empty() {
870            continue;
871        }
872        if let Some(secret_ref) = parse_secret_header_reference(&value) {
873            if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
874                secret_header_values.insert(header_name.clone(), resolved);
875            }
876            persisted_secret_headers.insert(header_name, secret_ref);
877            continue;
878        }
879        if header_name_is_sensitive(&header_name) {
880            let secret_id = mcp_header_secret_id(server_name, &header_name);
881            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
882                persisted_secret_headers.insert(
883                    header_name.clone(),
884                    McpSecretRef::Store {
885                        secret_id: secret_id.clone(),
886                        tenant_context: current_tenant.clone(),
887                    },
888                );
889                secret_header_values.insert(header_name, value);
890                continue;
891            }
892        }
893        persisted_headers.insert(header_name, value);
894    }
895
896    for (header_name, secret_ref) in explicit_secret_headers {
897        if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
898            secret_header_values.insert(header_name.clone(), resolved);
899        }
900        persisted_headers.remove(&header_name);
901        persisted_secret_headers.insert(header_name, secret_ref);
902    }
903
904    (
905        persisted_headers,
906        persisted_secret_headers,
907        secret_header_values,
908    )
909}
910
/// Overlays resolved secret values on top of the plain headers.
///
/// Secret values that are blank after trimming are ignored, so they never
/// replace or add a header.
fn combine_headers(
    headers: &HashMap<String, String>,
    secret_header_values: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = headers.clone();
    merged.extend(
        secret_header_values
            .iter()
            .filter(|(_, secret)| !secret.trim().is_empty())
            .map(|(name, secret)| (name.clone(), secret.clone())),
    );
    merged
}
923
924fn effective_headers(server: &McpServer) -> HashMap<String, String> {
925    combine_headers(&server.headers, &server.secret_header_values)
926}
927
928fn redacted_server_view(server: &McpServer) -> McpServer {
929    let mut clone = server.clone();
930    for (header_name, secret_ref) in &clone.secret_headers {
931        clone.headers.insert(
932            header_name.clone(),
933            redacted_secret_header_value(secret_ref),
934        );
935    }
936    clone.secret_header_values.clear();
937    clone
938}
939
940fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
941    match secret_ref {
942        McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
943        McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
944    }
945}
946
947fn resolve_secret_header_values(
948    secret_headers: &HashMap<String, McpSecretRef>,
949    current_tenant: &TenantContext,
950) -> HashMap<String, String> {
951    let mut out = HashMap::new();
952    for (header_name, secret_ref) in secret_headers {
953        if let Some(value) = resolve_secret_ref_value(secret_ref, current_tenant) {
954            if !value.trim().is_empty() {
955                out.insert(header_name.clone(), value);
956            }
957        }
958    }
959    out
960}
961
962fn delete_secret_header_refs(
963    secret_headers: &HashMap<String, McpSecretRef>,
964    current_tenant: &TenantContext,
965) {
966    for secret_ref in secret_headers.values() {
967        if let McpSecretRef::Store {
968            secret_id,
969            tenant_context,
970        } = secret_ref
971        {
972            if tenant_context != current_tenant {
973                continue;
974            }
975            let _ = tandem_core::delete_provider_auth(secret_id);
976        }
977    }
978}
979
980fn resolve_secret_ref_value(
981    secret_ref: &McpSecretRef,
982    current_tenant: &TenantContext,
983) -> Option<String> {
984    match secret_ref {
985        McpSecretRef::Store {
986            secret_id,
987            tenant_context,
988        } => {
989            let secret_ref = SecretRef {
990                org_id: tenant_context.org_id.clone(),
991                workspace_id: tenant_context.workspace_id.clone(),
992                provider: "mcp_header".to_string(),
993                secret_id: secret_id.trim().to_string(),
994                name: secret_id.trim().to_string(),
995            };
996            if secret_ref.validate_for_tenant(current_tenant).is_err() {
997                return None;
998            }
999            tandem_core::load_provider_auth()
1000                .get(&secret_id.trim().to_ascii_lowercase())
1001                .cloned()
1002                .filter(|value| !value.trim().is_empty())
1003        }
1004        McpSecretRef::Env { env } => std::env::var(env)
1005            .ok()
1006            .map(|value| value.trim().to_string())
1007            .filter(|value| !value.is_empty()),
1008        McpSecretRef::BearerEnv { env } => std::env::var(env)
1009            .ok()
1010            .map(|value| value.trim().to_string())
1011            .filter(|value| !value.is_empty())
1012            .map(|value| format!("Bearer {value}")),
1013    }
1014}
1015
1016fn local_tenant_context() -> TenantContext {
1017    LocalImplicitTenant.into()
1018}
1019
1020fn parse_secret_header_reference(raw: &str) -> Option<McpSecretRef> {
1021    let trimmed = raw.trim();
1022    if let Some(env) = trimmed
1023        .strip_prefix("${env:")
1024        .and_then(|rest| rest.strip_suffix('}'))
1025        .map(str::trim)
1026        .filter(|value| !value.is_empty())
1027    {
1028        return Some(McpSecretRef::Env {
1029            env: env.to_string(),
1030        });
1031    }
1032    if let Some(env) = trimmed
1033        .strip_prefix("${bearer_env:")
1034        .and_then(|rest| rest.strip_suffix('}'))
1035        .map(str::trim)
1036        .filter(|value| !value.is_empty())
1037    {
1038        return Some(McpSecretRef::BearerEnv {
1039            env: env.to_string(),
1040        });
1041    }
1042    if let Some(env) = trimmed
1043        .strip_prefix("Bearer ${env:")
1044        .and_then(|rest| rest.strip_suffix("}"))
1045        .map(str::trim)
1046        .filter(|value| !value.is_empty())
1047    {
1048        return Some(McpSecretRef::BearerEnv {
1049            env: env.to_string(),
1050        });
1051    }
1052    None
1053}
1054
/// Reports whether a header name looks like it carries a credential.
///
/// Matching is case-insensitive and ignores surrounding whitespace: a few
/// well-known names match exactly, any name containing `token` or `secret`
/// matches, and so does any name ending in `api-key` / `api_key`.
fn header_name_is_sensitive(header_name: &str) -> bool {
    const EXACT_NAMES: [&str; 3] = ["authorization", "proxy-authorization", "x-api-key"];
    const FRAGMENTS: [&str; 2] = ["token", "secret"];
    const SUFFIXES: [&str; 2] = ["api-key", "api_key"];

    let name = header_name.trim().to_ascii_lowercase();
    EXACT_NAMES.iter().any(|exact| name == *exact)
        || FRAGMENTS.iter().any(|fragment| name.contains(*fragment))
        || SUFFIXES.iter().any(|suffix| name.ends_with(*suffix))
}
1065
1066fn mcp_header_secret_id(server_name: &str, header_name: &str) -> String {
1067    format!(
1068        "mcp_header::{}::{}",
1069        sanitize_namespace_segment(server_name),
1070        sanitize_namespace_segment(header_name)
1071    )
1072}
1073
/// Extracts the command from a `stdio:<command>` transport string.
///
/// Leading whitespace before the `stdio:` prefix is tolerated, matching how
/// `parse_remote_endpoint` trims its input, and the returned command is
/// trimmed. Returns `None` when the transport is not a stdio transport; the
/// command may be empty (`"stdio:"` yields `Some("")`).
fn parse_stdio_transport(transport: &str) -> Option<&str> {
    transport
        .trim_start()
        .strip_prefix("stdio:")
        .map(str::trim)
}
1077
/// Extracts an HTTP(S) endpoint URL from a transport string.
///
/// Accepts either a bare `http://` / `https://` URL or a URL prefixed with a
/// `http:` / `https:` transport tag (for example `http:https://host/mcp`).
/// Returns `None` for every other transport.
fn parse_remote_endpoint(transport: &str) -> Option<String> {
    fn is_http_url(candidate: &str) -> bool {
        candidate.starts_with("http://") || candidate.starts_with("https://")
    }

    let transport = transport.trim();
    if is_http_url(transport) {
        return Some(transport.to_owned());
    }
    // Otherwise peel off a transport tag and require a real URL behind it.
    ["http:", "https:"]
        .iter()
        .filter_map(|tag| transport.strip_prefix(*tag))
        .map(str::trim)
        .find(|endpoint| is_http_url(endpoint))
        .map(str::to_owned)
}
1093
1094fn server_tool_rows(server: &McpServer) -> Vec<McpRemoteTool> {
1095    let server_slug = sanitize_namespace_segment(&server.name);
1096    server
1097        .tool_cache
1098        .iter()
1099        .map(|tool| {
1100            let tool_slug = sanitize_namespace_segment(&tool.tool_name);
1101            McpRemoteTool {
1102                server_name: server.name.clone(),
1103                tool_name: tool.tool_name.clone(),
1104                namespaced_name: format!("mcp.{server_slug}.{tool_slug}"),
1105                description: tool.description.clone(),
1106                input_schema: tool.input_schema.clone(),
1107                fetched_at_ms: tool.fetched_at_ms,
1108                schema_hash: tool.schema_hash.clone(),
1109            }
1110        })
1111        .collect()
1112}
1113
/// Turns arbitrary text into a lower-case `[a-z0-9_]` slug.
///
/// Runs of characters that are not ASCII alphanumerics collapse into a single
/// `_`, leading and trailing separators are dropped, and an input with no
/// usable characters yields `"tool"`.
fn sanitize_namespace_segment(raw: &str) -> String {
    let slug = raw
        .split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|piece| !piece.is_empty())
        .map(str::to_ascii_lowercase)
        .collect::<Vec<_>>()
        .join("_");
    if slug.is_empty() {
        "tool".to_string()
    } else {
        slug
    }
}
1133
1134fn schema_hash(schema: &Value) -> String {
1135    let payload = serde_json::to_vec(schema).unwrap_or_default();
1136    let mut hasher = Sha256::new();
1137    hasher.update(payload);
1138    format!("{:x}", hasher.finalize())
1139}
1140
1141fn extract_auth_challenge(result: &Value, tool_name: &str) -> Option<McpAuthChallenge> {
1142    let authorization_url = find_string_with_priority(
1143        result,
1144        &[
1145            &["structuredContent", "authorization_url"],
1146            &["structuredContent", "authorizationUrl"],
1147            &["authorization_url"],
1148            &["authorizationUrl"],
1149            &["auth_url"],
1150        ],
1151        &["authorization_url", "authorizationUrl", "auth_url"],
1152    )?;
1153    let raw_message = find_string_with_priority(
1154        result,
1155        &[
1156            &["structuredContent", "message"],
1157            &["message"],
1158            &["structuredContent", "text"],
1159            &["text"],
1160            &["llm_instructions"],
1161        ],
1162        &["message", "text", "llm_instructions"],
1163    )
1164    .unwrap_or_else(|| "This tool requires authorization before it can run.".to_string());
1165    let message = sanitize_auth_message(&raw_message);
1166    let challenge_id = stable_id_seed(&format!("{tool_name}:{authorization_url}"));
1167    Some(McpAuthChallenge {
1168        challenge_id,
1169        tool_name: tool_name.to_string(),
1170        authorization_url,
1171        message,
1172        requested_at_ms: now_ms(),
1173        status: "pending".to_string(),
1174    })
1175}
1176
1177fn find_string_by_any_key(value: &Value, keys: &[&str]) -> Option<String> {
1178    match value {
1179        Value::Object(map) => {
1180            for key in keys {
1181                if let Some(s) = map.get(*key).and_then(|v| v.as_str()) {
1182                    let trimmed = s.trim();
1183                    if !trimmed.is_empty() {
1184                        return Some(trimmed.to_string());
1185                    }
1186                }
1187            }
1188            for child in map.values() {
1189                if let Some(found) = find_string_by_any_key(child, keys) {
1190                    return Some(found);
1191                }
1192            }
1193            None
1194        }
1195        Value::Array(items) => items
1196            .iter()
1197            .find_map(|item| find_string_by_any_key(item, keys)),
1198        _ => None,
1199    }
1200}
1201
1202fn find_string_with_priority(
1203    value: &Value,
1204    paths: &[&[&str]],
1205    fallback_keys: &[&str],
1206) -> Option<String> {
1207    for path in paths {
1208        if let Some(found) = find_string_at_path(value, path) {
1209            return Some(found);
1210        }
1211    }
1212    find_string_by_any_key(value, fallback_keys)
1213}
1214
1215fn find_string_at_path(value: &Value, path: &[&str]) -> Option<String> {
1216    let mut current = value;
1217    for segment in path {
1218        current = current.get(*segment)?;
1219    }
1220    let s = current.as_str()?.trim();
1221    if s.is_empty() {
1222        None
1223    } else {
1224        Some(s.to_string())
1225    }
1226}
1227
1228fn sanitize_auth_message(raw: &str) -> String {
1229    let trimmed = raw.trim();
1230    if trimmed.is_empty() {
1231        return "This tool requires authorization before it can run.".to_string();
1232    }
1233    if let Some((head, _)) = trimmed.split_once("Authorize here:") {
1234        let head = head.trim();
1235        if !head.is_empty() {
1236            return truncate_text(head, 280);
1237        }
1238    }
1239    let no_newlines = trimmed.replace(['\r', '\n'], " ");
1240    truncate_text(no_newlines.trim(), 280)
1241}
1242
/// Limits `input` to `max_chars` characters, appending `...` when it was cut.
///
/// Counting is per `char`, so multi-byte characters are never split.
fn truncate_text(input: &str, max_chars: usize) -> String {
    // The byte offset of the first character past the limit, if there is one.
    match input.char_indices().nth(max_chars) {
        Some((cut_at, _)) => format!("{}...", &input[..cut_at]),
        None => input.to_string(),
    }
}
1250
1251fn stable_id_seed(seed: &str) -> String {
1252    let mut hasher = Sha256::new();
1253    hasher.update(seed.as_bytes());
1254    let encoded = format!("{:x}", hasher.finalize());
1255    encoded.chars().take(16).collect()
1256}
1257
/// Normalizes a tool name into a lookup key: trimmed and ASCII lower-cased.
fn canonical_tool_key(tool_name: &str) -> String {
    tool_name
        .trim()
        .chars()
        .map(|ch| ch.to_ascii_lowercase())
        .collect()
}
1261
1262fn pending_auth_from_challenge(challenge: &McpAuthChallenge) -> PendingMcpAuth {
1263    PendingMcpAuth {
1264        challenge_id: challenge.challenge_id.clone(),
1265        authorization_url: challenge.authorization_url.clone(),
1266        message: challenge.message.clone(),
1267        status: challenge.status.clone(),
1268        first_seen_ms: challenge.requested_at_ms,
1269        last_probe_ms: challenge.requested_at_ms,
1270    }
1271}
1272
/// Result produced by `pending_auth_short_circuit` when a tool call is blocked
/// because an authorization challenge is still within its re-probe cooldown.
struct PendingAuthShortCircuit {
    /// Human-readable text describing the pending authorization and retry delay.
    output: String,
    /// JSON object describing the pending challenge (URL, message, retry delay, ...).
    mcp_auth: Value,
}
1277
1278fn pending_auth_short_circuit(
1279    server: &McpServer,
1280    tool_key: &str,
1281    tool_name: &str,
1282    now_ms_value: u64,
1283    cooldown_ms: u64,
1284) -> Option<PendingAuthShortCircuit> {
1285    let pending = server.pending_auth_by_tool.get(tool_key)?;
1286    let elapsed = now_ms_value.saturating_sub(pending.last_probe_ms);
1287    if elapsed >= cooldown_ms {
1288        return None;
1289    }
1290    let retry_after_ms = cooldown_ms.saturating_sub(elapsed);
1291    let output = format!(
1292        "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1293        tool_name,
1294        pending.message,
1295        pending.authorization_url,
1296        retry_after_ms.div_ceil(1000)
1297    );
1298    Some(PendingAuthShortCircuit {
1299        output,
1300        mcp_auth: json!({
1301            "required": true,
1302            "pending": true,
1303            "blocked": true,
1304            "retryAfterMs": retry_after_ms,
1305            "challengeId": pending.challenge_id,
1306            "tool": tool_name,
1307            "authorizationUrl": pending.authorization_url,
1308            "message": pending.message,
1309            "status": pending.status
1310        }),
1311    })
1312}
1313
1314fn normalize_tool_input_schema(schema: &mut Value) {
1315    normalize_schema_node(schema);
1316}
1317
1318fn normalize_schema_node(node: &mut Value) {
1319    let Some(obj) = node.as_object_mut() else {
1320        return;
1321    };
1322
1323    // Some MCP servers publish enums on non-string/object/array fields, which
1324    // OpenAI-compatible providers may reject (e.g. Gemini via OpenRouter).
1325    // Keep enum only when values are all strings and schema type is string-like.
1326    if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
1327        let all_strings = enum_values.iter().all(|v| v.is_string());
1328        let string_like_type = schema_type_allows_string_enum(obj.get("type"));
1329        if !all_strings || !string_like_type {
1330            obj.remove("enum");
1331        }
1332    }
1333
1334    if let Some(properties) = obj.get_mut("properties").and_then(|v| v.as_object_mut()) {
1335        for value in properties.values_mut() {
1336            normalize_schema_node(value);
1337        }
1338    }
1339
1340    if let Some(items) = obj.get_mut("items") {
1341        normalize_schema_node(items);
1342    }
1343
1344    for key in ["anyOf", "oneOf", "allOf"] {
1345        if let Some(array) = obj.get_mut(key).and_then(|v| v.as_array_mut()) {
1346            for child in array.iter_mut() {
1347                normalize_schema_node(child);
1348            }
1349        }
1350    }
1351
1352    if let Some(additional) = obj.get_mut("additionalProperties") {
1353        normalize_schema_node(additional);
1354    }
1355}
1356
1357fn schema_type_allows_string_enum(schema_type: Option<&Value>) -> bool {
1358    let Some(schema_type) = schema_type else {
1359        // No explicit type: keep enum to avoid over-normalizing loosely-typed schemas.
1360        return true;
1361    };
1362
1363    if let Some(kind) = schema_type.as_str() {
1364        return kind == "string";
1365    }
1366
1367    if let Some(kinds) = schema_type.as_array() {
1368        let mut saw_string = false;
1369        for kind in kinds {
1370            let Some(kind) = kind.as_str() else {
1371                return false;
1372            };
1373            if kind == "string" {
1374                saw_string = true;
1375                continue;
1376            }
1377            if kind != "null" {
1378                return false;
1379            }
1380        }
1381        return saw_string;
1382    }
1383
1384    false
1385}
1386
/// Returns the current wall-clock time as milliseconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1393
1394fn build_headers(headers: &HashMap<String, String>) -> Result<HeaderMap, String> {
1395    let mut map = HeaderMap::new();
1396    map.insert(
1397        ACCEPT,
1398        HeaderValue::from_static("application/json, text/event-stream"),
1399    );
1400    map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
1401    for (key, value) in headers {
1402        let name = HeaderName::from_bytes(key.trim().as_bytes())
1403            .map_err(|e| format!("Invalid header name '{key}': {e}"))?;
1404        let header = HeaderValue::from_str(value.trim())
1405            .map_err(|e| format!("Invalid header value for '{key}': {e}"))?;
1406        map.insert(name, header);
1407    }
1408    Ok(map)
1409}
1410
1411async fn post_json_rpc_with_session(
1412    endpoint: &str,
1413    headers: &HashMap<String, String>,
1414    request: Value,
1415    session_id: Option<&str>,
1416) -> Result<(Value, Option<String>), String> {
1417    let client = reqwest::Client::builder()
1418        .timeout(std::time::Duration::from_secs(12))
1419        .build()
1420        .map_err(|e| format!("Failed to build HTTP client: {e}"))?;
1421    let mut req = client.post(endpoint).headers(build_headers(headers)?);
1422    if let Some(id) = session_id {
1423        let trimmed = id.trim();
1424        if !trimmed.is_empty() {
1425            req = req.header("Mcp-Session-Id", trimmed);
1426        }
1427    }
1428    let response = req
1429        .json(&request)
1430        .send()
1431        .await
1432        .map_err(|e| format!("MCP request failed: {e}"))?;
1433    let content_type = response
1434        .headers()
1435        .get(CONTENT_TYPE)
1436        .and_then(|v| v.to_str().ok())
1437        .unwrap_or("")
1438        .to_ascii_lowercase();
1439    let response_session_id = response
1440        .headers()
1441        .get("mcp-session-id")
1442        .and_then(|v| v.to_str().ok())
1443        .map(|v| v.trim().to_string())
1444        .filter(|v| !v.is_empty());
1445    let status = response.status();
1446    let payload = response
1447        .text()
1448        .await
1449        .map_err(|e| format!("Failed to read MCP response: {e}"))?;
1450    if !status.is_success() {
1451        return Err(format!(
1452            "MCP endpoint returned HTTP {}: {}",
1453            status.as_u16(),
1454            payload.chars().take(400).collect::<String>()
1455        ));
1456    }
1457
1458    let value = if content_type.starts_with("text/event-stream") {
1459        parse_sse_first_event_json(&payload).map_err(|e| {
1460            format!(
1461                "Invalid MCP SSE JSON response: {} (snippet: {})",
1462                e,
1463                payload.chars().take(400).collect::<String>()
1464            )
1465        })?
1466    } else if let Ok(value) = serde_json::from_str::<Value>(&payload) {
1467        value
1468    } else if let Ok(value) = parse_sse_first_event_json(&payload) {
1469        // Some MCP servers return SSE payloads without setting text/event-stream.
1470        value
1471    } else {
1472        return Err(format!(
1473            "Invalid MCP JSON response: {}",
1474            payload.chars().take(400).collect::<String>()
1475        ));
1476    };
1477
1478    Ok((value, response_session_id))
1479}
1480
1481fn parse_sse_first_event_json(payload: &str) -> Result<Value, String> {
1482    let mut data_lines: Vec<&str> = Vec::new();
1483    for raw in payload.lines() {
1484        let line = raw.trim_end_matches('\r');
1485        if let Some(rest) = line.strip_prefix("data:") {
1486            data_lines.push(rest.trim_start());
1487        }
1488        if line.is_empty() {
1489            if !data_lines.is_empty() {
1490                break;
1491            }
1492            continue;
1493        }
1494    }
1495    if data_lines.is_empty() {
1496        return Err("no SSE data event found".to_string());
1497    }
1498    let joined = data_lines.join("\n");
1499    serde_json::from_str::<Value>(&joined).map_err(|e| e.to_string())
1500}
1501
1502fn render_mcp_content(value: &Value) -> String {
1503    let Some(items) = value.as_array() else {
1504        return value.to_string();
1505    };
1506    let mut chunks = Vec::new();
1507    for item in items {
1508        if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
1509            chunks.push(text.to_string());
1510            continue;
1511        }
1512        chunks.push(item.to_string());
1513    }
1514    if chunks.is_empty() {
1515        value.to_string()
1516    } else {
1517        chunks.join("\n")
1518    }
1519}
1520
1521fn normalize_mcp_tool_args(server: &McpServer, tool_name: &str, raw_args: Value) -> Value {
1522    let Some(schema) = server
1523        .tool_cache
1524        .iter()
1525        .find(|row| row.tool_name.eq_ignore_ascii_case(tool_name))
1526        .map(|row| &row.input_schema)
1527    else {
1528        return raw_args;
1529    };
1530
1531    let mut args_obj = match raw_args {
1532        Value::Object(obj) => obj,
1533        other => return other,
1534    };
1535
1536    let properties = schema
1537        .get("properties")
1538        .and_then(|v| v.as_object())
1539        .cloned()
1540        .unwrap_or_default();
1541    if properties.is_empty() {
1542        return Value::Object(args_obj);
1543    }
1544
1545    // Build a normalized-key lookup so taskTitle -> task_title and list-id -> list_id resolve.
1546    let mut normalized_existing: HashMap<String, String> = HashMap::new();
1547    for key in args_obj.keys() {
1548        normalized_existing.insert(normalize_arg_key(key), key.clone());
1549    }
1550
1551    // Copy values from normalized aliases to canonical schema property names.
1552    let canonical_keys = properties.keys().cloned().collect::<Vec<_>>();
1553    for canonical in &canonical_keys {
1554        if args_obj.contains_key(canonical) {
1555            continue;
1556        }
1557        if let Some(existing_key) = normalized_existing.get(&normalize_arg_key(canonical)) {
1558            if let Some(value) = args_obj.get(existing_key).cloned() {
1559                args_obj.insert(canonical.clone(), value);
1560            }
1561        }
1562    }
1563
1564    // Fill required fields using conservative aliases when models choose common alternatives.
1565    let required = schema
1566        .get("required")
1567        .and_then(|v| v.as_array())
1568        .map(|arr| {
1569            arr.iter()
1570                .filter_map(|v| v.as_str().map(str::to_string))
1571                .collect::<Vec<_>>()
1572        })
1573        .unwrap_or_default();
1574
1575    for required_key in required {
1576        if args_obj.contains_key(&required_key) {
1577            continue;
1578        }
1579        if let Some(alias_value) = find_required_alias_value(&required_key, &args_obj) {
1580            args_obj.insert(required_key, alias_value);
1581        }
1582    }
1583
1584    Value::Object(args_obj)
1585}
1586
1587fn find_required_alias_value(
1588    required_key: &str,
1589    args_obj: &serde_json::Map<String, Value>,
1590) -> Option<Value> {
1591    let mut alias_candidates = vec![
1592        required_key.to_string(),
1593        required_key.to_ascii_lowercase(),
1594        required_key.replace('_', ""),
1595    ];
1596
1597    // Common fallback for fields like task_title where models often send `name`.
1598    if required_key.contains("title") {
1599        alias_candidates.extend([
1600            "name".to_string(),
1601            "title".to_string(),
1602            "task_name".to_string(),
1603            "taskname".to_string(),
1604        ]);
1605    }
1606
1607    // Common fallback for *_id fields where models emit `<base>` or `<base>Id`.
1608    if let Some(base) = required_key.strip_suffix("_id") {
1609        alias_candidates.extend([base.to_string(), format!("{base}id"), format!("{base}_id")]);
1610    }
1611
1612    let mut by_normalized: HashMap<String, &Value> = HashMap::new();
1613    for (key, value) in args_obj {
1614        by_normalized.insert(normalize_arg_key(key), value);
1615    }
1616
1617    alias_candidates
1618        .into_iter()
1619        .find_map(|candidate| by_normalized.get(&normalize_arg_key(&candidate)).cloned())
1620        .cloned()
1621}
1622
/// Reduces an argument name to its lower-case ASCII letters and digits, so
/// `taskTitle`, `task_title` and `task-title` all compare equal.
fn normalize_arg_key(key: &str) -> String {
    let mut normalized = String::with_capacity(key.len());
    for ch in key.chars() {
        if ch.is_ascii_alphanumeric() {
            normalized.push(ch.to_ascii_lowercase());
        }
    }
    normalized
}
1629
1630async fn spawn_stdio_process(command_text: &str) -> Result<Child, String> {
1631    if command_text.is_empty() {
1632        return Err("Missing stdio command".to_string());
1633    }
1634    #[cfg(windows)]
1635    let mut command = {
1636        let mut cmd = Command::new("powershell");
1637        cmd.args(["-NoProfile", "-Command", command_text]);
1638        cmd
1639    };
1640    #[cfg(not(windows))]
1641    let mut command = {
1642        let mut cmd = Command::new("sh");
1643        cmd.args(["-lc", command_text]);
1644        cmd
1645    };
1646    command
1647        .stdin(std::process::Stdio::null())
1648        .stdout(std::process::Stdio::null())
1649        .stderr(std::process::Stdio::null());
1650    command.spawn().map_err(|e| e.to_string())
1651}
1652
1653#[cfg(test)]
1654mod tests {
1655    use super::*;
1656    use uuid::Uuid;
1657
1658    #[tokio::test]
1659    async fn add_connect_disconnect_non_stdio_server() {
1660        let file = std::env::temp_dir().join(format!("mcp-test-{}.json", Uuid::new_v4()));
1661        let registry = McpRegistry::new_with_state_file(file);
1662        registry
1663            .add("example".to_string(), "sse:https://example.com".to_string())
1664            .await;
1665        assert!(registry.connect("example").await);
1666        let listed = registry.list().await;
1667        assert!(listed.get("example").map(|s| s.connected).unwrap_or(false));
1668        assert!(registry.disconnect("example").await);
1669    }
1670
1671    #[test]
1672    fn parse_remote_endpoint_supports_http_prefixes() {
1673        assert_eq!(
1674            parse_remote_endpoint("https://mcp.example.com/mcp"),
1675            Some("https://mcp.example.com/mcp".to_string())
1676        );
1677        assert_eq!(
1678            parse_remote_endpoint("http:https://mcp.example.com/mcp"),
1679            Some("https://mcp.example.com/mcp".to_string())
1680        );
1681    }
1682
1683    #[test]
1684    fn normalize_schema_removes_non_string_enums_recursively() {
1685        let mut schema = json!({
1686            "type": "object",
1687            "properties": {
1688                "good": { "type": "string", "enum": ["a", "b"] },
1689                "good_nullable": { "type": ["string", "null"], "enum": ["asc", "desc"] },
1690                "bad_object": { "type": "object", "enum": ["asc", "desc"] },
1691                "bad_array": { "type": "array", "enum": ["asc", "desc"] },
1692                "bad_number": { "type": "number", "enum": [1, 2] },
1693                "bad_mixed": { "enum": ["ok", 1] },
1694                "nested": {
1695                    "type": "object",
1696                    "properties": {
1697                        "child": { "enum": [true, false] }
1698                    }
1699                }
1700            }
1701        });
1702
1703        normalize_tool_input_schema(&mut schema);
1704
1705        assert!(
1706            schema["properties"]["good"]["enum"].is_array(),
1707            "string enums should be preserved"
1708        );
1709        assert!(
1710            schema["properties"]["good_nullable"]["enum"].is_array(),
1711            "string|null enums should be preserved"
1712        );
1713        assert!(
1714            schema["properties"]["bad_object"]["enum"].is_null(),
1715            "object enums should be dropped"
1716        );
1717        assert!(
1718            schema["properties"]["bad_array"]["enum"].is_null(),
1719            "array enums should be dropped"
1720        );
1721        assert!(
1722            schema["properties"]["bad_number"]["enum"].is_null(),
1723            "non-string enums should be dropped"
1724        );
1725        assert!(
1726            schema["properties"]["bad_mixed"]["enum"].is_null(),
1727            "mixed enums should be dropped"
1728        );
1729        assert!(
1730            schema["properties"]["nested"]["properties"]["child"]["enum"].is_null(),
1731            "recursive non-string enums should be dropped"
1732        );
1733    }
1734
1735    #[test]
1736    fn extract_auth_challenge_from_result_payload() {
1737        let payload = json!({
1738            "content": [
1739                {
1740                    "type": "text",
1741                    "llm_instructions": "Authorize Gmail access first.",
1742                    "authorization_url": "https://example.com/oauth/start"
1743                }
1744            ]
1745        });
1746        let challenge = extract_auth_challenge(&payload, "gmail_whoami")
1747            .expect("auth challenge should be detected");
1748        assert_eq!(challenge.tool_name, "gmail_whoami");
1749        assert_eq!(
1750            challenge.authorization_url,
1751            "https://example.com/oauth/start"
1752        );
1753        assert_eq!(challenge.status, "pending");
1754    }
1755
1756    #[test]
1757    fn extract_auth_challenge_returns_none_without_url() {
1758        let payload = json!({
1759            "content": [
1760                {"type":"text","text":"No authorization needed"}
1761            ]
1762        });
1763        assert!(extract_auth_challenge(&payload, "gmail_whoami").is_none());
1764    }
1765
1766    #[test]
1767    fn extract_auth_challenge_prefers_structured_content_message() {
1768        let payload = json!({
1769            "content": [
1770                {
1771                    "type": "text",
1772                    "text": "{\"authorization_url\":\"https://example.com/oauth\",\"message\":\"json blob\"}"
1773                }
1774            ],
1775            "structuredContent": {
1776                "authorization_url": "https://example.com/oauth",
1777                "message": "Authorize Reddit access first."
1778            }
1779        });
1780        let challenge = extract_auth_challenge(&payload, "reddit_getmyusername")
1781            .expect("auth challenge should be detected");
1782        assert_eq!(challenge.message, "Authorize Reddit access first.");
1783    }
1784
1785    #[test]
1786    fn sanitize_auth_message_compacts_llm_instructions() {
1787        let raw = "Please show the following link to the end user formatted as markdown: https://example.com/auth\nInform the end user that this tool requires authorization.";
1788        let message = sanitize_auth_message(raw);
1789        assert!(!message.contains('\n'));
1790        assert!(message.len() <= 283);
1791    }
1792
1793    #[test]
1794    fn normalize_mcp_tool_args_maps_clickup_aliases() {
1795        let server = McpServer {
1796            name: "arcade".to_string(),
1797            transport: "https://example.com/mcp".to_string(),
1798            enabled: true,
1799            connected: true,
1800            pid: None,
1801            last_error: None,
1802            last_auth_challenge: None,
1803            mcp_session_id: None,
1804            headers: HashMap::new(),
1805            secret_headers: HashMap::new(),
1806            tool_cache: vec![McpToolCacheEntry {
1807                tool_name: "Clickup_CreateTask".to_string(),
1808                description: "Create task".to_string(),
1809                input_schema: json!({
1810                    "type":"object",
1811                    "properties":{
1812                        "list_id":{"type":"string"},
1813                        "task_title":{"type":"string"}
1814                    },
1815                    "required":["list_id","task_title"]
1816                }),
1817                fetched_at_ms: 0,
1818                schema_hash: "x".to_string(),
1819            }],
1820            tools_fetched_at_ms: None,
1821            pending_auth_by_tool: HashMap::new(),
1822            secret_header_values: HashMap::new(),
1823        };
1824
1825        let normalized = normalize_mcp_tool_args(
1826            &server,
1827            "Clickup_CreateTask",
1828            json!({
1829                "listId": "123",
1830                "name": "Prep fish"
1831            }),
1832        );
1833        assert_eq!(
1834            normalized.get("list_id").and_then(|v| v.as_str()),
1835            Some("123")
1836        );
1837        assert_eq!(
1838            normalized.get("task_title").and_then(|v| v.as_str()),
1839            Some("Prep fish")
1840        );
1841    }
1842
1843    #[test]
1844    fn normalize_arg_key_ignores_case_and_separators() {
1845        assert_eq!(normalize_arg_key("task_title"), "tasktitle");
1846        assert_eq!(normalize_arg_key("taskTitle"), "tasktitle");
1847        assert_eq!(normalize_arg_key("task-title"), "tasktitle");
1848    }
1849
1850    #[test]
1851    fn pending_auth_blocks_retries_within_cooldown() {
1852        let mut server = McpServer {
1853            name: "arcade".to_string(),
1854            transport: "https://example.com/mcp".to_string(),
1855            enabled: true,
1856            connected: true,
1857            pid: None,
1858            last_error: None,
1859            last_auth_challenge: None,
1860            mcp_session_id: None,
1861            headers: HashMap::new(),
1862            secret_headers: HashMap::new(),
1863            tool_cache: Vec::new(),
1864            tools_fetched_at_ms: None,
1865            pending_auth_by_tool: HashMap::new(),
1866            secret_header_values: HashMap::new(),
1867        };
1868        server.pending_auth_by_tool.insert(
1869            "clickup_whoami".to_string(),
1870            PendingMcpAuth {
1871                challenge_id: "abc".to_string(),
1872                authorization_url: "https://example.com/auth".to_string(),
1873                message: "Authorize ClickUp access.".to_string(),
1874                status: "pending".to_string(),
1875                first_seen_ms: 1_000,
1876                last_probe_ms: 2_000,
1877            },
1878        );
1879        let blocked =
1880            pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 10_000, 15_000)
1881                .expect("should block");
1882        assert!(blocked.output.contains("Authorization pending"));
1883        assert!(blocked
1884            .mcp_auth
1885            .get("pending")
1886            .and_then(|v| v.as_bool())
1887            .unwrap_or(false));
1888    }
1889
1890    #[test]
1891    fn pending_auth_allows_probe_after_cooldown() {
1892        let mut server = McpServer {
1893            name: "arcade".to_string(),
1894            transport: "https://example.com/mcp".to_string(),
1895            enabled: true,
1896            connected: true,
1897            pid: None,
1898            last_error: None,
1899            last_auth_challenge: None,
1900            mcp_session_id: None,
1901            headers: HashMap::new(),
1902            secret_headers: HashMap::new(),
1903            tool_cache: Vec::new(),
1904            tools_fetched_at_ms: None,
1905            pending_auth_by_tool: HashMap::new(),
1906            secret_header_values: HashMap::new(),
1907        };
1908        server.pending_auth_by_tool.insert(
1909            "clickup_whoami".to_string(),
1910            PendingMcpAuth {
1911                challenge_id: "abc".to_string(),
1912                authorization_url: "https://example.com/auth".to_string(),
1913                message: "Authorize ClickUp access.".to_string(),
1914                status: "pending".to_string(),
1915                first_seen_ms: 1_000,
1916                last_probe_ms: 2_000,
1917            },
1918        );
1919        assert!(
1920            pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 17_001, 15_000)
1921                .is_none(),
1922            "cooldown elapsed should allow re-probe"
1923        );
1924    }
1925
1926    #[test]
1927    fn pending_auth_is_tool_scoped() {
1928        let mut server = McpServer {
1929            name: "arcade".to_string(),
1930            transport: "https://example.com/mcp".to_string(),
1931            enabled: true,
1932            connected: true,
1933            pid: None,
1934            last_error: None,
1935            last_auth_challenge: None,
1936            mcp_session_id: None,
1937            headers: HashMap::new(),
1938            secret_headers: HashMap::new(),
1939            tool_cache: Vec::new(),
1940            tools_fetched_at_ms: None,
1941            pending_auth_by_tool: HashMap::new(),
1942            secret_header_values: HashMap::new(),
1943        };
1944        server.pending_auth_by_tool.insert(
1945            "gmail_sendemail".to_string(),
1946            PendingMcpAuth {
1947                challenge_id: "abc".to_string(),
1948                authorization_url: "https://example.com/auth".to_string(),
1949                message: "Authorize Gmail access.".to_string(),
1950                status: "pending".to_string(),
1951                first_seen_ms: 1_000,
1952                last_probe_ms: 2_000,
1953            },
1954        );
1955        assert!(pending_auth_short_circuit(
1956            &server,
1957            "gmail_sendemail",
1958            "Gmail_SendEmail",
1959            2_100,
1960            15_000
1961        )
1962        .is_some());
1963        assert!(pending_auth_short_circuit(
1964            &server,
1965            "clickup_whoami",
1966            "Clickup_WhoAmI",
1967            2_100,
1968            15_000
1969        )
1970        .is_none());
1971    }
1972
1973    #[test]
1974    fn store_secret_ref_requires_matching_tenant_context() {
1975        let secret_id = "mcp_header::tenant::authorization".to_string();
1976        tandem_core::set_provider_auth(&secret_id, "tenant-secret").expect("store secret");
1977
1978        let current_tenant = TenantContext::explicit("tenant", "workspace", None);
1979        let matching_ref = McpSecretRef::Store {
1980            secret_id: secret_id.clone(),
1981            tenant_context: current_tenant.clone(),
1982        };
1983        assert_eq!(
1984            resolve_secret_ref_value(&matching_ref, &current_tenant).as_deref(),
1985            Some("tenant-secret")
1986        );
1987
1988        let mismatched_tenant = TenantContext::explicit("tenant", "other-workspace", None);
1989        assert!(
1990            resolve_secret_ref_value(&matching_ref, &mismatched_tenant).is_none(),
1991            "tenant mismatch should block secret lookup"
1992        );
1993
1994        let _ = tandem_core::delete_provider_auth(&secret_id);
1995    }
1996}