Skip to main content

tandem_runtime/
mcp.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::ToolResult;
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18const MCP_SECRET_PLACEHOLDER: &str = "";
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct McpToolCacheEntry {
22    pub tool_name: String,
23    pub description: String,
24    #[serde(default)]
25    pub input_schema: Value,
26    pub fetched_at_ms: u64,
27    pub schema_hash: String,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32    pub name: String,
33    pub transport: String,
34    #[serde(default = "default_enabled")]
35    pub enabled: bool,
36    pub connected: bool,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub pid: Option<u32>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub last_error: Option<String>,
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub last_auth_challenge: Option<McpAuthChallenge>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub mcp_session_id: Option<String>,
45    #[serde(default)]
46    pub headers: HashMap<String, String>,
47    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48    pub secret_headers: HashMap<String, McpSecretRef>,
49    #[serde(default)]
50    pub tool_cache: Vec<McpToolCacheEntry>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub tools_fetched_at_ms: Option<u64>,
53    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
54    pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
55    #[serde(default, skip)]
56    pub secret_header_values: HashMap<String, String>,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum McpSecretRef {
62    Store { secret_id: String },
63    Env { env: String },
64    BearerEnv { env: String },
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct McpAuthChallenge {
69    pub challenge_id: String,
70    pub tool_name: String,
71    pub authorization_url: String,
72    pub message: String,
73    pub requested_at_ms: u64,
74    pub status: String,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct PendingMcpAuth {
79    pub challenge_id: String,
80    pub authorization_url: String,
81    pub message: String,
82    pub status: String,
83    pub first_seen_ms: u64,
84    pub last_probe_ms: u64,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct McpRemoteTool {
89    pub server_name: String,
90    pub tool_name: String,
91    pub namespaced_name: String,
92    pub description: String,
93    #[serde(default)]
94    pub input_schema: Value,
95    pub fetched_at_ms: u64,
96    pub schema_hash: String,
97}
98
99#[derive(Clone)]
100pub struct McpRegistry {
101    servers: Arc<RwLock<HashMap<String, McpServer>>>,
102    processes: Arc<Mutex<HashMap<String, Child>>>,
103    state_file: Arc<PathBuf>,
104}
105
106impl McpRegistry {
107    pub fn new() -> Self {
108        Self::new_with_state_file(resolve_state_file())
109    }
110
111    pub fn new_with_state_file(state_file: PathBuf) -> Self {
112        let (loaded_state, migrated) = load_state(&state_file);
113        let loaded = loaded_state
114            .into_iter()
115            .map(|(k, mut v)| {
116                v.connected = false;
117                v.pid = None;
118                if v.name.trim().is_empty() {
119                    v.name = k.clone();
120                }
121                if v.headers.is_empty() {
122                    v.headers = HashMap::new();
123                }
124                if v.secret_headers.is_empty() {
125                    v.secret_headers = HashMap::new();
126                }
127                v.secret_header_values = resolve_secret_header_values(&v.secret_headers);
128                (k, v)
129            })
130            .collect::<HashMap<_, _>>();
131        if migrated {
132            persist_state_blocking(&state_file, &loaded);
133        }
134        Self {
135            servers: Arc::new(RwLock::new(loaded)),
136            processes: Arc::new(Mutex::new(HashMap::new())),
137            state_file: Arc::new(state_file),
138        }
139    }
140
141    pub async fn list(&self) -> HashMap<String, McpServer> {
142        self.servers.read().await.clone()
143    }
144
145    pub async fn list_public(&self) -> HashMap<String, McpServer> {
146        self.servers
147            .read()
148            .await
149            .iter()
150            .map(|(name, server)| (name.clone(), redacted_server_view(server)))
151            .collect()
152    }
153
154    pub async fn add(&self, name: String, transport: String) {
155        self.add_or_update(name, transport, HashMap::new(), true)
156            .await;
157    }
158
159    pub async fn add_or_update(
160        &self,
161        name: String,
162        transport: String,
163        headers: HashMap<String, String>,
164        enabled: bool,
165    ) {
166        self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
167            .await;
168    }
169
170    pub async fn add_or_update_with_secret_refs(
171        &self,
172        name: String,
173        transport: String,
174        headers: HashMap<String, String>,
175        secret_headers: HashMap<String, McpSecretRef>,
176        enabled: bool,
177    ) {
178        let normalized_name = name.trim().to_string();
179        let (persisted_headers, persisted_secret_headers, secret_header_values) =
180            split_headers_for_storage(&normalized_name, headers, secret_headers);
181        let mut servers = self.servers.write().await;
182        let existing = servers.get(&normalized_name).cloned();
183        let preserve_cache = existing.as_ref().is_some_and(|row| {
184            row.transport == transport
185                && effective_headers(row)
186                    == combine_headers(&persisted_headers, &secret_header_values)
187        });
188        let existing_tool_cache = if preserve_cache {
189            existing
190                .as_ref()
191                .map(|row| row.tool_cache.clone())
192                .unwrap_or_default()
193        } else {
194            Vec::new()
195        };
196        let existing_fetched_at = if preserve_cache {
197            existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
198        } else {
199            None
200        };
201        let server = McpServer {
202            name: normalized_name.clone(),
203            transport,
204            enabled,
205            connected: false,
206            pid: None,
207            last_error: None,
208            last_auth_challenge: None,
209            mcp_session_id: None,
210            headers: persisted_headers,
211            secret_headers: persisted_secret_headers,
212            tool_cache: existing_tool_cache,
213            tools_fetched_at_ms: existing_fetched_at,
214            pending_auth_by_tool: HashMap::new(),
215            secret_header_values,
216        };
217        servers.insert(normalized_name, server);
218        drop(servers);
219        self.persist_state().await;
220    }
221
222    pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
223        let mut servers = self.servers.write().await;
224        let Some(server) = servers.get_mut(name) else {
225            return false;
226        };
227        server.enabled = enabled;
228        if !enabled {
229            server.connected = false;
230            server.pid = None;
231            server.last_auth_challenge = None;
232            server.mcp_session_id = None;
233            server.pending_auth_by_tool.clear();
234        }
235        drop(servers);
236        if !enabled {
237            if let Some(mut child) = self.processes.lock().await.remove(name) {
238                let _ = child.kill().await;
239                let _ = child.wait().await;
240            }
241        }
242        self.persist_state().await;
243        true
244    }
245
246    pub async fn remove(&self, name: &str) -> bool {
247        let removed_server = {
248            let mut servers = self.servers.write().await;
249            servers.remove(name)
250        };
251        let Some(server) = removed_server else {
252            return false;
253        };
254        delete_secret_header_refs(&server.secret_headers);
255
256        if let Some(mut child) = self.processes.lock().await.remove(name) {
257            let _ = child.kill().await;
258            let _ = child.wait().await;
259        }
260        self.persist_state().await;
261        true
262    }
263
264    pub async fn connect(&self, name: &str) -> bool {
265        let server = {
266            let servers = self.servers.read().await;
267            let Some(server) = servers.get(name) else {
268                return false;
269            };
270            server.clone()
271        };
272
273        if !server.enabled {
274            let mut servers = self.servers.write().await;
275            if let Some(entry) = servers.get_mut(name) {
276                entry.connected = false;
277                entry.pid = None;
278                entry.last_error = Some("MCP server is disabled".to_string());
279                entry.last_auth_challenge = None;
280                entry.mcp_session_id = None;
281                entry.pending_auth_by_tool.clear();
282            }
283            drop(servers);
284            self.persist_state().await;
285            return false;
286        }
287
288        if let Some(command_text) = parse_stdio_transport(&server.transport) {
289            return self.connect_stdio(name, command_text).await;
290        }
291
292        if parse_remote_endpoint(&server.transport).is_some() {
293            return self.refresh(name).await.is_ok();
294        }
295
296        let mut servers = self.servers.write().await;
297        if let Some(entry) = servers.get_mut(name) {
298            entry.connected = true;
299            entry.pid = None;
300            entry.last_error = None;
301            entry.last_auth_challenge = None;
302            entry.mcp_session_id = None;
303            entry.pending_auth_by_tool.clear();
304        }
305        drop(servers);
306        self.persist_state().await;
307        true
308    }
309
310    pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
311        let server = {
312            let servers = self.servers.read().await;
313            let Some(server) = servers.get(name) else {
314                return Err("MCP server not found".to_string());
315            };
316            server.clone()
317        };
318
319        if !server.enabled {
320            return Err("MCP server is disabled".to_string());
321        }
322
323        let endpoint = parse_remote_endpoint(&server.transport)
324            .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
325
326        let request_headers = effective_headers(&server);
327        let (tools, session_id) = match self
328            .discover_remote_tools(&endpoint, &request_headers)
329            .await
330        {
331            Ok(result) => result,
332            Err(err) => {
333                let mut servers = self.servers.write().await;
334                if let Some(entry) = servers.get_mut(name) {
335                    entry.connected = false;
336                    entry.pid = None;
337                    entry.last_error = Some(err.clone());
338                    entry.last_auth_challenge = None;
339                    entry.mcp_session_id = None;
340                    entry.pending_auth_by_tool.clear();
341                    entry.tool_cache.clear();
342                    entry.tools_fetched_at_ms = None;
343                }
344                drop(servers);
345                self.persist_state().await;
346                return Err(err);
347            }
348        };
349
350        let now = now_ms();
351        let cache = tools
352            .iter()
353            .map(|tool| McpToolCacheEntry {
354                tool_name: tool.tool_name.clone(),
355                description: tool.description.clone(),
356                input_schema: tool.input_schema.clone(),
357                fetched_at_ms: now,
358                schema_hash: schema_hash(&tool.input_schema),
359            })
360            .collect::<Vec<_>>();
361
362        let mut servers = self.servers.write().await;
363        if let Some(entry) = servers.get_mut(name) {
364            entry.connected = true;
365            entry.pid = None;
366            entry.last_error = None;
367            entry.last_auth_challenge = None;
368            entry.mcp_session_id = session_id;
369            entry.tool_cache = cache;
370            entry.tools_fetched_at_ms = Some(now);
371            entry.pending_auth_by_tool.clear();
372        }
373        drop(servers);
374        self.persist_state().await;
375        Ok(self.server_tools(name).await)
376    }
377
378    pub async fn disconnect(&self, name: &str) -> bool {
379        if let Some(mut child) = self.processes.lock().await.remove(name) {
380            let _ = child.kill().await;
381            let _ = child.wait().await;
382        }
383        let mut servers = self.servers.write().await;
384        if let Some(server) = servers.get_mut(name) {
385            server.connected = false;
386            server.pid = None;
387            server.last_auth_challenge = None;
388            server.mcp_session_id = None;
389            server.pending_auth_by_tool.clear();
390            drop(servers);
391            self.persist_state().await;
392            return true;
393        }
394        false
395    }
396
397    pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
398        let mut out = self
399            .servers
400            .read()
401            .await
402            .values()
403            .filter(|server| server.enabled && server.connected)
404            .flat_map(server_tool_rows)
405            .collect::<Vec<_>>();
406        out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
407        out
408    }
409
410    pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
411        let Some(server) = self.servers.read().await.get(name).cloned() else {
412            return Vec::new();
413        };
414        let mut rows = server_tool_rows(&server);
415        rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
416        rows
417    }
418
419    pub async fn call_tool(
420        &self,
421        server_name: &str,
422        tool_name: &str,
423        args: Value,
424    ) -> Result<ToolResult, String> {
425        let server = {
426            let servers = self.servers.read().await;
427            let Some(server) = servers.get(server_name) else {
428                return Err(format!("MCP server '{server_name}' not found"));
429            };
430            server.clone()
431        };
432
433        if !server.enabled {
434            return Err(format!("MCP server '{server_name}' is disabled"));
435        }
436        if !server.connected {
437            return Err(format!("MCP server '{server_name}' is not connected"));
438        }
439
440        let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
441            "MCP tools/call currently supports HTTP/S transports only".to_string()
442        })?;
443        let canonical_tool = canonical_tool_key(tool_name);
444        let now = now_ms();
445        if let Some(blocked) = pending_auth_short_circuit(
446            &server,
447            &canonical_tool,
448            tool_name,
449            now,
450            MCP_AUTH_REPROBE_COOLDOWN_MS,
451        ) {
452            return Ok(ToolResult {
453                output: blocked.output,
454                metadata: json!({
455                    "server": server_name,
456                    "tool": tool_name,
457                    "result": Value::Null,
458                    "mcpAuth": blocked.mcp_auth
459                }),
460            });
461        }
462        let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
463
464        {
465            let mut servers = self.servers.write().await;
466            if let Some(row) = servers.get_mut(server_name) {
467                if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
468                    pending.last_probe_ms = now;
469                }
470            }
471        }
472
473        let request = json!({
474            "jsonrpc": "2.0",
475            "id": format!("call-{}-{}", server_name, now_ms()),
476            "method": "tools/call",
477            "params": {
478                "name": tool_name,
479                "arguments": normalized_args
480            }
481        });
482        let (response, session_id) = post_json_rpc_with_session(
483            &endpoint,
484            &effective_headers(&server),
485            request,
486            server.mcp_session_id.as_deref(),
487        )
488        .await?;
489        if session_id.is_some() {
490            let mut servers = self.servers.write().await;
491            if let Some(row) = servers.get_mut(server_name) {
492                row.mcp_session_id = session_id;
493            }
494            drop(servers);
495            self.persist_state().await;
496        }
497
498        if let Some(err) = response.get("error") {
499            if let Some(challenge) = extract_auth_challenge(err, tool_name) {
500                let output = format!(
501                    "{}\n\nAuthorize here: {}",
502                    challenge.message, challenge.authorization_url
503                );
504                {
505                    let mut servers = self.servers.write().await;
506                    if let Some(row) = servers.get_mut(server_name) {
507                        row.last_auth_challenge = Some(challenge.clone());
508                        row.last_error = None;
509                        row.pending_auth_by_tool.insert(
510                            canonical_tool.clone(),
511                            pending_auth_from_challenge(&challenge),
512                        );
513                    }
514                }
515                self.persist_state().await;
516                return Ok(ToolResult {
517                    output,
518                    metadata: json!({
519                        "server": server_name,
520                        "tool": tool_name,
521                        "result": Value::Null,
522                        "mcpAuth": {
523                            "required": true,
524                            "challengeId": challenge.challenge_id,
525                            "tool": challenge.tool_name,
526                            "authorizationUrl": challenge.authorization_url,
527                            "message": challenge.message,
528                            "status": challenge.status
529                        }
530                    }),
531                });
532            }
533            let message = err
534                .get("message")
535                .and_then(|v| v.as_str())
536                .unwrap_or("MCP tools/call failed");
537            return Err(message.to_string());
538        }
539
540        let result = response.get("result").cloned().unwrap_or(Value::Null);
541        let auth_challenge = extract_auth_challenge(&result, tool_name);
542        let output = if let Some(challenge) = auth_challenge.as_ref() {
543            format!(
544                "{}\n\nAuthorize here: {}",
545                challenge.message, challenge.authorization_url
546            )
547        } else {
548            result
549                .get("content")
550                .map(render_mcp_content)
551                .or_else(|| result.get("output").map(|v| v.to_string()))
552                .unwrap_or_else(|| result.to_string())
553        };
554
555        {
556            let mut servers = self.servers.write().await;
557            if let Some(row) = servers.get_mut(server_name) {
558                row.last_auth_challenge = auth_challenge.clone();
559                if let Some(challenge) = auth_challenge.as_ref() {
560                    row.pending_auth_by_tool.insert(
561                        canonical_tool.clone(),
562                        pending_auth_from_challenge(challenge),
563                    );
564                } else {
565                    row.pending_auth_by_tool.remove(&canonical_tool);
566                }
567            }
568        }
569        self.persist_state().await;
570
571        let auth_metadata = auth_challenge.as_ref().map(|challenge| {
572            json!({
573                "required": true,
574                "challengeId": challenge.challenge_id,
575                "tool": challenge.tool_name,
576                "authorizationUrl": challenge.authorization_url,
577                "message": challenge.message,
578                "status": challenge.status
579            })
580        });
581
582        Ok(ToolResult {
583            output,
584            metadata: json!({
585                "server": server_name,
586                "tool": tool_name,
587                "result": result,
588                "mcpAuth": auth_metadata
589            }),
590        })
591    }
592
593    async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
594        match spawn_stdio_process(command_text).await {
595            Ok(child) => {
596                let pid = child.id();
597                self.processes.lock().await.insert(name.to_string(), child);
598                let mut servers = self.servers.write().await;
599                if let Some(server) = servers.get_mut(name) {
600                    server.connected = true;
601                    server.pid = pid;
602                    server.last_error = None;
603                    server.last_auth_challenge = None;
604                    server.pending_auth_by_tool.clear();
605                }
606                drop(servers);
607                self.persist_state().await;
608                true
609            }
610            Err(err) => {
611                let mut servers = self.servers.write().await;
612                if let Some(server) = servers.get_mut(name) {
613                    server.connected = false;
614                    server.pid = None;
615                    server.last_error = Some(err);
616                    server.last_auth_challenge = None;
617                    server.pending_auth_by_tool.clear();
618                }
619                drop(servers);
620                self.persist_state().await;
621                false
622            }
623        }
624    }
625
626    async fn discover_remote_tools(
627        &self,
628        endpoint: &str,
629        headers: &HashMap<String, String>,
630    ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
631        let initialize = json!({
632            "jsonrpc": "2.0",
633            "id": "initialize-1",
634            "method": "initialize",
635            "params": {
636                "protocolVersion": MCP_PROTOCOL_VERSION,
637                "capabilities": {},
638                "clientInfo": {
639                    "name": MCP_CLIENT_NAME,
640                    "version": MCP_CLIENT_VERSION,
641                }
642            }
643        });
644        let (init_response, mut session_id) =
645            post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
646        if let Some(err) = init_response.get("error") {
647            let message = err
648                .get("message")
649                .and_then(|v| v.as_str())
650                .unwrap_or("MCP initialize failed");
651            return Err(message.to_string());
652        }
653
654        let tools_list = json!({
655            "jsonrpc": "2.0",
656            "id": "tools-list-1",
657            "method": "tools/list",
658            "params": {}
659        });
660        let (tools_response, next_session_id) =
661            post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
662                .await?;
663        if next_session_id.is_some() {
664            session_id = next_session_id;
665        }
666        if let Some(err) = tools_response.get("error") {
667            let message = err
668                .get("message")
669                .and_then(|v| v.as_str())
670                .unwrap_or("MCP tools/list failed");
671            return Err(message.to_string());
672        }
673
674        let tools = tools_response
675            .get("result")
676            .and_then(|v| v.get("tools"))
677            .and_then(|v| v.as_array())
678            .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
679
680        let now = now_ms();
681        let mut out = Vec::new();
682        for row in tools {
683            let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
684                continue;
685            };
686            let description = row
687                .get("description")
688                .and_then(|v| v.as_str())
689                .unwrap_or("")
690                .to_string();
691            let mut input_schema = row
692                .get("inputSchema")
693                .or_else(|| row.get("input_schema"))
694                .cloned()
695                .unwrap_or_else(|| json!({"type":"object"}));
696            normalize_tool_input_schema(&mut input_schema);
697            out.push(McpRemoteTool {
698                server_name: String::new(),
699                tool_name: tool_name.to_string(),
700                namespaced_name: String::new(),
701                description,
702                input_schema,
703                fetched_at_ms: now,
704                schema_hash: String::new(),
705            });
706        }
707
708        Ok((out, session_id))
709    }
710
711    async fn persist_state(&self) {
712        let snapshot = self.servers.read().await.clone();
713        persist_state_blocking(self.state_file.as_path(), &snapshot);
714    }
715}
716
717impl Default for McpRegistry {
718    fn default() -> Self {
719        Self::new()
720    }
721}
722
723fn default_enabled() -> bool {
724    true
725}
726
727fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
728    if let Some(parent) = path.parent() {
729        let _ = std::fs::create_dir_all(parent);
730    }
731    if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
732        let _ = std::fs::write(path, payload);
733    }
734}
735
736fn resolve_state_file() -> PathBuf {
737    if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
738        return PathBuf::from(path);
739    }
740    if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
741        let trimmed = state_dir.trim();
742        if !trimmed.is_empty() {
743            return PathBuf::from(trimmed).join("mcp_servers.json");
744        }
745    }
746    if let Some(data_dir) = dirs::data_dir() {
747        return data_dir
748            .join("tandem")
749            .join("data")
750            .join("mcp_servers.json");
751    }
752    dirs::home_dir()
753        .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
754        .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
755}
756
757fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
758    let Ok(raw) = std::fs::read_to_string(path) else {
759        return (HashMap::new(), false);
760    };
761    let mut migrated = false;
762    let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
763    for (name, server) in parsed.iter_mut() {
764        let (headers, secret_headers, secret_header_values, server_migrated) =
765            migrate_server_headers(name, server);
766        migrated = migrated || server_migrated;
767        server.headers = headers;
768        server.secret_headers = secret_headers;
769        server.secret_header_values = secret_header_values;
770    }
771    (parsed, migrated)
772}
773
774fn migrate_server_headers(
775    server_name: &str,
776    server: &McpServer,
777) -> (
778    HashMap<String, String>,
779    HashMap<String, McpSecretRef>,
780    HashMap<String, String>,
781    bool,
782) {
783    let original_effective = effective_headers(server);
784    let mut persisted_secret_headers = server.secret_headers.clone();
785    let mut secret_header_values = resolve_secret_header_values(&persisted_secret_headers);
786    let mut persisted_headers = server.headers.clone();
787    let mut migrated = false;
788
789    let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
790    for header_name in header_keys {
791        let Some(value) = persisted_headers.get(&header_name).cloned() else {
792            continue;
793        };
794        if persisted_secret_headers.contains_key(&header_name) {
795            continue;
796        }
797        if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
798            persisted_headers.remove(&header_name);
799            let resolved = resolve_secret_ref_value(&secret_ref).unwrap_or_default();
800            persisted_secret_headers.insert(header_name.clone(), secret_ref);
801            if !resolved.is_empty() {
802                secret_header_values.insert(header_name.clone(), resolved);
803            }
804            migrated = true;
805            continue;
806        }
807        if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
808            let secret_id = mcp_header_secret_id(server_name, &header_name);
809            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
810                persisted_headers.remove(&header_name);
811                persisted_secret_headers.insert(
812                    header_name.clone(),
813                    McpSecretRef::Store {
814                        secret_id: secret_id.clone(),
815                    },
816                );
817                secret_header_values.insert(header_name.clone(), value);
818                migrated = true;
819            }
820        }
821    }
822
823    if !migrated {
824        let effective = combine_headers(&persisted_headers, &secret_header_values);
825        migrated = effective != original_effective;
826    }
827
828    (
829        persisted_headers,
830        persisted_secret_headers,
831        secret_header_values,
832        migrated,
833    )
834}
835
836fn split_headers_for_storage(
837    server_name: &str,
838    headers: HashMap<String, String>,
839    explicit_secret_headers: HashMap<String, McpSecretRef>,
840) -> (
841    HashMap<String, String>,
842    HashMap<String, McpSecretRef>,
843    HashMap<String, String>,
844) {
845    let mut persisted_headers = HashMap::new();
846    let mut persisted_secret_headers = HashMap::new();
847    let mut secret_header_values = HashMap::new();
848
849    for (header_name, raw_value) in headers {
850        let value = raw_value.trim().to_string();
851        if value.is_empty() {
852            continue;
853        }
854        if let Some(secret_ref) = parse_secret_header_reference(&value) {
855            if let Some(resolved) = resolve_secret_ref_value(&secret_ref) {
856                secret_header_values.insert(header_name.clone(), resolved);
857            }
858            persisted_secret_headers.insert(header_name, secret_ref);
859            continue;
860        }
861        if header_name_is_sensitive(&header_name) {
862            let secret_id = mcp_header_secret_id(server_name, &header_name);
863            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
864                persisted_secret_headers.insert(
865                    header_name.clone(),
866                    McpSecretRef::Store {
867                        secret_id: secret_id.clone(),
868                    },
869                );
870                secret_header_values.insert(header_name, value);
871                continue;
872            }
873        }
874        persisted_headers.insert(header_name, value);
875    }
876
877    for (header_name, secret_ref) in explicit_secret_headers {
878        if let Some(resolved) = resolve_secret_ref_value(&secret_ref) {
879            secret_header_values.insert(header_name.clone(), resolved);
880        }
881        persisted_headers.remove(&header_name);
882        persisted_secret_headers.insert(header_name, secret_ref);
883    }
884
885    (
886        persisted_headers,
887        persisted_secret_headers,
888        secret_header_values,
889    )
890}
891
892fn combine_headers(
893    headers: &HashMap<String, String>,
894    secret_header_values: &HashMap<String, String>,
895) -> HashMap<String, String> {
896    let mut combined = headers.clone();
897    for (key, value) in secret_header_values {
898        if !value.trim().is_empty() {
899            combined.insert(key.clone(), value.clone());
900        }
901    }
902    combined
903}
904
905fn effective_headers(server: &McpServer) -> HashMap<String, String> {
906    combine_headers(&server.headers, &server.secret_header_values)
907}
908
909fn redacted_server_view(server: &McpServer) -> McpServer {
910    let mut clone = server.clone();
911    for (header_name, secret_ref) in &clone.secret_headers {
912        clone.headers.insert(
913            header_name.clone(),
914            redacted_secret_header_value(secret_ref),
915        );
916    }
917    clone.secret_header_values.clear();
918    clone
919}
920
921fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
922    match secret_ref {
923        McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
924        McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
925    }
926}
927
928fn resolve_secret_header_values(
929    secret_headers: &HashMap<String, McpSecretRef>,
930) -> HashMap<String, String> {
931    let mut out = HashMap::new();
932    for (header_name, secret_ref) in secret_headers {
933        if let Some(value) = resolve_secret_ref_value(secret_ref) {
934            if !value.trim().is_empty() {
935                out.insert(header_name.clone(), value);
936            }
937        }
938    }
939    out
940}
941
942fn delete_secret_header_refs(secret_headers: &HashMap<String, McpSecretRef>) {
943    for secret_ref in secret_headers.values() {
944        if let McpSecretRef::Store { secret_id } = secret_ref {
945            let _ = tandem_core::delete_provider_auth(secret_id);
946        }
947    }
948}
949
950fn resolve_secret_ref_value(secret_ref: &McpSecretRef) -> Option<String> {
951    match secret_ref {
952        McpSecretRef::Store { secret_id } => tandem_core::load_provider_auth()
953            .get(&secret_id.trim().to_ascii_lowercase())
954            .cloned()
955            .filter(|value| !value.trim().is_empty()),
956        McpSecretRef::Env { env } => std::env::var(env)
957            .ok()
958            .map(|value| value.trim().to_string())
959            .filter(|value| !value.is_empty()),
960        McpSecretRef::BearerEnv { env } => std::env::var(env)
961            .ok()
962            .map(|value| value.trim().to_string())
963            .filter(|value| !value.is_empty())
964            .map(|value| format!("Bearer {value}")),
965    }
966}
967
968fn parse_secret_header_reference(raw: &str) -> Option<McpSecretRef> {
969    let trimmed = raw.trim();
970    if let Some(env) = trimmed
971        .strip_prefix("${env:")
972        .and_then(|rest| rest.strip_suffix('}'))
973        .map(str::trim)
974        .filter(|value| !value.is_empty())
975    {
976        return Some(McpSecretRef::Env {
977            env: env.to_string(),
978        });
979    }
980    if let Some(env) = trimmed
981        .strip_prefix("${bearer_env:")
982        .and_then(|rest| rest.strip_suffix('}'))
983        .map(str::trim)
984        .filter(|value| !value.is_empty())
985    {
986        return Some(McpSecretRef::BearerEnv {
987            env: env.to_string(),
988        });
989    }
990    if let Some(env) = trimmed
991        .strip_prefix("Bearer ${env:")
992        .and_then(|rest| rest.strip_suffix("}"))
993        .map(str::trim)
994        .filter(|value| !value.is_empty())
995    {
996        return Some(McpSecretRef::BearerEnv {
997            env: env.to_string(),
998        });
999    }
1000    None
1001}
1002
1003fn header_name_is_sensitive(header_name: &str) -> bool {
1004    let normalized = header_name.trim().to_ascii_lowercase();
1005    normalized == "authorization"
1006        || normalized == "proxy-authorization"
1007        || normalized == "x-api-key"
1008        || normalized.contains("token")
1009        || normalized.contains("secret")
1010        || normalized.ends_with("api-key")
1011        || normalized.ends_with("api_key")
1012}
1013
1014fn mcp_header_secret_id(server_name: &str, header_name: &str) -> String {
1015    format!(
1016        "mcp_header::{}::{}",
1017        sanitize_namespace_segment(server_name),
1018        sanitize_namespace_segment(header_name)
1019    )
1020}
1021
1022fn parse_stdio_transport(transport: &str) -> Option<&str> {
1023    transport.strip_prefix("stdio:").map(str::trim)
1024}
1025
1026fn parse_remote_endpoint(transport: &str) -> Option<String> {
1027    let trimmed = transport.trim();
1028    if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
1029        return Some(trimmed.to_string());
1030    }
1031    for prefix in ["http:", "https:"] {
1032        if let Some(rest) = trimmed.strip_prefix(prefix) {
1033            let endpoint = rest.trim();
1034            if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
1035                return Some(endpoint.to_string());
1036            }
1037        }
1038    }
1039    None
1040}
1041
1042fn server_tool_rows(server: &McpServer) -> Vec<McpRemoteTool> {
1043    let server_slug = sanitize_namespace_segment(&server.name);
1044    server
1045        .tool_cache
1046        .iter()
1047        .map(|tool| {
1048            let tool_slug = sanitize_namespace_segment(&tool.tool_name);
1049            McpRemoteTool {
1050                server_name: server.name.clone(),
1051                tool_name: tool.tool_name.clone(),
1052                namespaced_name: format!("mcp.{server_slug}.{tool_slug}"),
1053                description: tool.description.clone(),
1054                input_schema: tool.input_schema.clone(),
1055                fetched_at_ms: tool.fetched_at_ms,
1056                schema_hash: tool.schema_hash.clone(),
1057            }
1058        })
1059        .collect()
1060}
1061
1062fn sanitize_namespace_segment(raw: &str) -> String {
1063    let mut out = String::new();
1064    let mut previous_underscore = false;
1065    for ch in raw.trim().chars() {
1066        if ch.is_ascii_alphanumeric() {
1067            out.push(ch.to_ascii_lowercase());
1068            previous_underscore = false;
1069        } else if !previous_underscore {
1070            out.push('_');
1071            previous_underscore = true;
1072        }
1073    }
1074    let cleaned = out.trim_matches('_');
1075    if cleaned.is_empty() {
1076        "tool".to_string()
1077    } else {
1078        cleaned.to_string()
1079    }
1080}
1081
1082fn schema_hash(schema: &Value) -> String {
1083    let payload = serde_json::to_vec(schema).unwrap_or_default();
1084    let mut hasher = Sha256::new();
1085    hasher.update(payload);
1086    format!("{:x}", hasher.finalize())
1087}
1088
1089fn extract_auth_challenge(result: &Value, tool_name: &str) -> Option<McpAuthChallenge> {
1090    let authorization_url = find_string_with_priority(
1091        result,
1092        &[
1093            &["structuredContent", "authorization_url"],
1094            &["structuredContent", "authorizationUrl"],
1095            &["authorization_url"],
1096            &["authorizationUrl"],
1097            &["auth_url"],
1098        ],
1099        &["authorization_url", "authorizationUrl", "auth_url"],
1100    )?;
1101    let raw_message = find_string_with_priority(
1102        result,
1103        &[
1104            &["structuredContent", "message"],
1105            &["message"],
1106            &["structuredContent", "text"],
1107            &["text"],
1108            &["llm_instructions"],
1109        ],
1110        &["message", "text", "llm_instructions"],
1111    )
1112    .unwrap_or_else(|| "This tool requires authorization before it can run.".to_string());
1113    let message = sanitize_auth_message(&raw_message);
1114    let challenge_id = stable_id_seed(&format!("{tool_name}:{authorization_url}"));
1115    Some(McpAuthChallenge {
1116        challenge_id,
1117        tool_name: tool_name.to_string(),
1118        authorization_url,
1119        message,
1120        requested_at_ms: now_ms(),
1121        status: "pending".to_string(),
1122    })
1123}
1124
1125fn find_string_by_any_key(value: &Value, keys: &[&str]) -> Option<String> {
1126    match value {
1127        Value::Object(map) => {
1128            for key in keys {
1129                if let Some(s) = map.get(*key).and_then(|v| v.as_str()) {
1130                    let trimmed = s.trim();
1131                    if !trimmed.is_empty() {
1132                        return Some(trimmed.to_string());
1133                    }
1134                }
1135            }
1136            for child in map.values() {
1137                if let Some(found) = find_string_by_any_key(child, keys) {
1138                    return Some(found);
1139                }
1140            }
1141            None
1142        }
1143        Value::Array(items) => items
1144            .iter()
1145            .find_map(|item| find_string_by_any_key(item, keys)),
1146        _ => None,
1147    }
1148}
1149
1150fn find_string_with_priority(
1151    value: &Value,
1152    paths: &[&[&str]],
1153    fallback_keys: &[&str],
1154) -> Option<String> {
1155    for path in paths {
1156        if let Some(found) = find_string_at_path(value, path) {
1157            return Some(found);
1158        }
1159    }
1160    find_string_by_any_key(value, fallback_keys)
1161}
1162
1163fn find_string_at_path(value: &Value, path: &[&str]) -> Option<String> {
1164    let mut current = value;
1165    for segment in path {
1166        current = current.get(*segment)?;
1167    }
1168    let s = current.as_str()?.trim();
1169    if s.is_empty() {
1170        None
1171    } else {
1172        Some(s.to_string())
1173    }
1174}
1175
1176fn sanitize_auth_message(raw: &str) -> String {
1177    let trimmed = raw.trim();
1178    if trimmed.is_empty() {
1179        return "This tool requires authorization before it can run.".to_string();
1180    }
1181    if let Some((head, _)) = trimmed.split_once("Authorize here:") {
1182        let head = head.trim();
1183        if !head.is_empty() {
1184            return truncate_text(head, 280);
1185        }
1186    }
1187    let no_newlines = trimmed.replace(['\r', '\n'], " ");
1188    truncate_text(no_newlines.trim(), 280)
1189}
1190
1191fn truncate_text(input: &str, max_chars: usize) -> String {
1192    if input.chars().count() <= max_chars {
1193        return input.to_string();
1194    }
1195    let truncated = input.chars().take(max_chars).collect::<String>();
1196    format!("{truncated}...")
1197}
1198
1199fn stable_id_seed(seed: &str) -> String {
1200    let mut hasher = Sha256::new();
1201    hasher.update(seed.as_bytes());
1202    let encoded = format!("{:x}", hasher.finalize());
1203    encoded.chars().take(16).collect()
1204}
1205
1206fn canonical_tool_key(tool_name: &str) -> String {
1207    tool_name.trim().to_ascii_lowercase()
1208}
1209
1210fn pending_auth_from_challenge(challenge: &McpAuthChallenge) -> PendingMcpAuth {
1211    PendingMcpAuth {
1212        challenge_id: challenge.challenge_id.clone(),
1213        authorization_url: challenge.authorization_url.clone(),
1214        message: challenge.message.clone(),
1215        status: challenge.status.clone(),
1216        first_seen_ms: challenge.requested_at_ms,
1217        last_probe_ms: challenge.requested_at_ms,
1218    }
1219}
1220
1221struct PendingAuthShortCircuit {
1222    output: String,
1223    mcp_auth: Value,
1224}
1225
1226fn pending_auth_short_circuit(
1227    server: &McpServer,
1228    tool_key: &str,
1229    tool_name: &str,
1230    now_ms_value: u64,
1231    cooldown_ms: u64,
1232) -> Option<PendingAuthShortCircuit> {
1233    let pending = server.pending_auth_by_tool.get(tool_key)?;
1234    let elapsed = now_ms_value.saturating_sub(pending.last_probe_ms);
1235    if elapsed >= cooldown_ms {
1236        return None;
1237    }
1238    let retry_after_ms = cooldown_ms.saturating_sub(elapsed);
1239    let output = format!(
1240        "Authorization pending for `{}`.\n{}\n\nAuthorize here: {}\nRetry after {}s.",
1241        tool_name,
1242        pending.message,
1243        pending.authorization_url,
1244        retry_after_ms.div_ceil(1000)
1245    );
1246    Some(PendingAuthShortCircuit {
1247        output,
1248        mcp_auth: json!({
1249            "required": true,
1250            "pending": true,
1251            "blocked": true,
1252            "retryAfterMs": retry_after_ms,
1253            "challengeId": pending.challenge_id,
1254            "tool": tool_name,
1255            "authorizationUrl": pending.authorization_url,
1256            "message": pending.message,
1257            "status": pending.status
1258        }),
1259    })
1260}
1261
1262fn normalize_tool_input_schema(schema: &mut Value) {
1263    normalize_schema_node(schema);
1264}
1265
1266fn normalize_schema_node(node: &mut Value) {
1267    let Some(obj) = node.as_object_mut() else {
1268        return;
1269    };
1270
1271    // Some MCP servers publish enums on non-string/object/array fields, which
1272    // OpenAI-compatible providers may reject (e.g. Gemini via OpenRouter).
1273    // Keep enum only when values are all strings and schema type is string-like.
1274    if let Some(enum_values) = obj.get("enum").and_then(|v| v.as_array()) {
1275        let all_strings = enum_values.iter().all(|v| v.is_string());
1276        let string_like_type = schema_type_allows_string_enum(obj.get("type"));
1277        if !all_strings || !string_like_type {
1278            obj.remove("enum");
1279        }
1280    }
1281
1282    if let Some(properties) = obj.get_mut("properties").and_then(|v| v.as_object_mut()) {
1283        for value in properties.values_mut() {
1284            normalize_schema_node(value);
1285        }
1286    }
1287
1288    if let Some(items) = obj.get_mut("items") {
1289        normalize_schema_node(items);
1290    }
1291
1292    for key in ["anyOf", "oneOf", "allOf"] {
1293        if let Some(array) = obj.get_mut(key).and_then(|v| v.as_array_mut()) {
1294            for child in array.iter_mut() {
1295                normalize_schema_node(child);
1296            }
1297        }
1298    }
1299
1300    if let Some(additional) = obj.get_mut("additionalProperties") {
1301        normalize_schema_node(additional);
1302    }
1303}
1304
1305fn schema_type_allows_string_enum(schema_type: Option<&Value>) -> bool {
1306    let Some(schema_type) = schema_type else {
1307        // No explicit type: keep enum to avoid over-normalizing loosely-typed schemas.
1308        return true;
1309    };
1310
1311    if let Some(kind) = schema_type.as_str() {
1312        return kind == "string";
1313    }
1314
1315    if let Some(kinds) = schema_type.as_array() {
1316        let mut saw_string = false;
1317        for kind in kinds {
1318            let Some(kind) = kind.as_str() else {
1319                return false;
1320            };
1321            if kind == "string" {
1322                saw_string = true;
1323                continue;
1324            }
1325            if kind != "null" {
1326                return false;
1327            }
1328        }
1329        return saw_string;
1330    }
1331
1332    false
1333}
1334
1335fn now_ms() -> u64 {
1336    SystemTime::now()
1337        .duration_since(UNIX_EPOCH)
1338        .map(|d| d.as_millis() as u64)
1339        .unwrap_or(0)
1340}
1341
1342fn build_headers(headers: &HashMap<String, String>) -> Result<HeaderMap, String> {
1343    let mut map = HeaderMap::new();
1344    map.insert(
1345        ACCEPT,
1346        HeaderValue::from_static("application/json, text/event-stream"),
1347    );
1348    map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
1349    for (key, value) in headers {
1350        let name = HeaderName::from_bytes(key.trim().as_bytes())
1351            .map_err(|e| format!("Invalid header name '{key}': {e}"))?;
1352        let header = HeaderValue::from_str(value.trim())
1353            .map_err(|e| format!("Invalid header value for '{key}': {e}"))?;
1354        map.insert(name, header);
1355    }
1356    Ok(map)
1357}
1358
1359async fn post_json_rpc_with_session(
1360    endpoint: &str,
1361    headers: &HashMap<String, String>,
1362    request: Value,
1363    session_id: Option<&str>,
1364) -> Result<(Value, Option<String>), String> {
1365    let client = reqwest::Client::builder()
1366        .timeout(std::time::Duration::from_secs(12))
1367        .build()
1368        .map_err(|e| format!("Failed to build HTTP client: {e}"))?;
1369    let mut req = client.post(endpoint).headers(build_headers(headers)?);
1370    if let Some(id) = session_id {
1371        let trimmed = id.trim();
1372        if !trimmed.is_empty() {
1373            req = req.header("Mcp-Session-Id", trimmed);
1374        }
1375    }
1376    let response = req
1377        .json(&request)
1378        .send()
1379        .await
1380        .map_err(|e| format!("MCP request failed: {e}"))?;
1381    let content_type = response
1382        .headers()
1383        .get(CONTENT_TYPE)
1384        .and_then(|v| v.to_str().ok())
1385        .unwrap_or("")
1386        .to_ascii_lowercase();
1387    let response_session_id = response
1388        .headers()
1389        .get("mcp-session-id")
1390        .and_then(|v| v.to_str().ok())
1391        .map(|v| v.trim().to_string())
1392        .filter(|v| !v.is_empty());
1393    let status = response.status();
1394    let payload = response
1395        .text()
1396        .await
1397        .map_err(|e| format!("Failed to read MCP response: {e}"))?;
1398    if !status.is_success() {
1399        return Err(format!(
1400            "MCP endpoint returned HTTP {}: {}",
1401            status.as_u16(),
1402            payload.chars().take(400).collect::<String>()
1403        ));
1404    }
1405
1406    let value = if content_type.starts_with("text/event-stream") {
1407        parse_sse_first_event_json(&payload).map_err(|e| {
1408            format!(
1409                "Invalid MCP SSE JSON response: {} (snippet: {})",
1410                e,
1411                payload.chars().take(400).collect::<String>()
1412            )
1413        })?
1414    } else if let Ok(value) = serde_json::from_str::<Value>(&payload) {
1415        value
1416    } else if let Ok(value) = parse_sse_first_event_json(&payload) {
1417        // Some MCP servers return SSE payloads without setting text/event-stream.
1418        value
1419    } else {
1420        return Err(format!(
1421            "Invalid MCP JSON response: {}",
1422            payload.chars().take(400).collect::<String>()
1423        ));
1424    };
1425
1426    Ok((value, response_session_id))
1427}
1428
1429fn parse_sse_first_event_json(payload: &str) -> Result<Value, String> {
1430    let mut data_lines: Vec<&str> = Vec::new();
1431    for raw in payload.lines() {
1432        let line = raw.trim_end_matches('\r');
1433        if let Some(rest) = line.strip_prefix("data:") {
1434            data_lines.push(rest.trim_start());
1435        }
1436        if line.is_empty() {
1437            if !data_lines.is_empty() {
1438                break;
1439            }
1440            continue;
1441        }
1442    }
1443    if data_lines.is_empty() {
1444        return Err("no SSE data event found".to_string());
1445    }
1446    let joined = data_lines.join("\n");
1447    serde_json::from_str::<Value>(&joined).map_err(|e| e.to_string())
1448}
1449
1450fn render_mcp_content(value: &Value) -> String {
1451    let Some(items) = value.as_array() else {
1452        return value.to_string();
1453    };
1454    let mut chunks = Vec::new();
1455    for item in items {
1456        if let Some(text) = item.get("text").and_then(|v| v.as_str()) {
1457            chunks.push(text.to_string());
1458            continue;
1459        }
1460        chunks.push(item.to_string());
1461    }
1462    if chunks.is_empty() {
1463        value.to_string()
1464    } else {
1465        chunks.join("\n")
1466    }
1467}
1468
1469fn normalize_mcp_tool_args(server: &McpServer, tool_name: &str, raw_args: Value) -> Value {
1470    let Some(schema) = server
1471        .tool_cache
1472        .iter()
1473        .find(|row| row.tool_name.eq_ignore_ascii_case(tool_name))
1474        .map(|row| &row.input_schema)
1475    else {
1476        return raw_args;
1477    };
1478
1479    let mut args_obj = match raw_args {
1480        Value::Object(obj) => obj,
1481        other => return other,
1482    };
1483
1484    let properties = schema
1485        .get("properties")
1486        .and_then(|v| v.as_object())
1487        .cloned()
1488        .unwrap_or_default();
1489    if properties.is_empty() {
1490        return Value::Object(args_obj);
1491    }
1492
1493    // Build a normalized-key lookup so taskTitle -> task_title and list-id -> list_id resolve.
1494    let mut normalized_existing: HashMap<String, String> = HashMap::new();
1495    for key in args_obj.keys() {
1496        normalized_existing.insert(normalize_arg_key(key), key.clone());
1497    }
1498
1499    // Copy values from normalized aliases to canonical schema property names.
1500    let canonical_keys = properties.keys().cloned().collect::<Vec<_>>();
1501    for canonical in &canonical_keys {
1502        if args_obj.contains_key(canonical) {
1503            continue;
1504        }
1505        if let Some(existing_key) = normalized_existing.get(&normalize_arg_key(canonical)) {
1506            if let Some(value) = args_obj.get(existing_key).cloned() {
1507                args_obj.insert(canonical.clone(), value);
1508            }
1509        }
1510    }
1511
1512    // Fill required fields using conservative aliases when models choose common alternatives.
1513    let required = schema
1514        .get("required")
1515        .and_then(|v| v.as_array())
1516        .map(|arr| {
1517            arr.iter()
1518                .filter_map(|v| v.as_str().map(str::to_string))
1519                .collect::<Vec<_>>()
1520        })
1521        .unwrap_or_default();
1522
1523    for required_key in required {
1524        if args_obj.contains_key(&required_key) {
1525            continue;
1526        }
1527        if let Some(alias_value) = find_required_alias_value(&required_key, &args_obj) {
1528            args_obj.insert(required_key, alias_value);
1529        }
1530    }
1531
1532    Value::Object(args_obj)
1533}
1534
1535fn find_required_alias_value(
1536    required_key: &str,
1537    args_obj: &serde_json::Map<String, Value>,
1538) -> Option<Value> {
1539    let mut alias_candidates = vec![
1540        required_key.to_string(),
1541        required_key.to_ascii_lowercase(),
1542        required_key.replace('_', ""),
1543    ];
1544
1545    // Common fallback for fields like task_title where models often send `name`.
1546    if required_key.contains("title") {
1547        alias_candidates.extend([
1548            "name".to_string(),
1549            "title".to_string(),
1550            "task_name".to_string(),
1551            "taskname".to_string(),
1552        ]);
1553    }
1554
1555    // Common fallback for *_id fields where models emit `<base>` or `<base>Id`.
1556    if let Some(base) = required_key.strip_suffix("_id") {
1557        alias_candidates.extend([base.to_string(), format!("{base}id"), format!("{base}_id")]);
1558    }
1559
1560    let mut by_normalized: HashMap<String, &Value> = HashMap::new();
1561    for (key, value) in args_obj {
1562        by_normalized.insert(normalize_arg_key(key), value);
1563    }
1564
1565    alias_candidates
1566        .into_iter()
1567        .find_map(|candidate| by_normalized.get(&normalize_arg_key(&candidate)).cloned())
1568        .cloned()
1569}
1570
1571fn normalize_arg_key(key: &str) -> String {
1572    key.chars()
1573        .filter(|ch| ch.is_ascii_alphanumeric())
1574        .map(|ch| ch.to_ascii_lowercase())
1575        .collect()
1576}
1577
1578async fn spawn_stdio_process(command_text: &str) -> Result<Child, String> {
1579    if command_text.is_empty() {
1580        return Err("Missing stdio command".to_string());
1581    }
1582    #[cfg(windows)]
1583    let mut command = {
1584        let mut cmd = Command::new("powershell");
1585        cmd.args(["-NoProfile", "-Command", command_text]);
1586        cmd
1587    };
1588    #[cfg(not(windows))]
1589    let mut command = {
1590        let mut cmd = Command::new("sh");
1591        cmd.args(["-lc", command_text]);
1592        cmd
1593    };
1594    command
1595        .stdin(std::process::Stdio::null())
1596        .stdout(std::process::Stdio::null())
1597        .stderr(std::process::Stdio::null());
1598    command.spawn().map_err(|e| e.to_string())
1599}
1600
1601#[cfg(test)]
1602mod tests {
1603    use super::*;
1604    use uuid::Uuid;
1605
1606    #[tokio::test]
1607    async fn add_connect_disconnect_non_stdio_server() {
1608        let file = std::env::temp_dir().join(format!("mcp-test-{}.json", Uuid::new_v4()));
1609        let registry = McpRegistry::new_with_state_file(file);
1610        registry
1611            .add("example".to_string(), "sse:https://example.com".to_string())
1612            .await;
1613        assert!(registry.connect("example").await);
1614        let listed = registry.list().await;
1615        assert!(listed.get("example").map(|s| s.connected).unwrap_or(false));
1616        assert!(registry.disconnect("example").await);
1617    }
1618
1619    #[test]
1620    fn parse_remote_endpoint_supports_http_prefixes() {
1621        assert_eq!(
1622            parse_remote_endpoint("https://mcp.example.com/mcp"),
1623            Some("https://mcp.example.com/mcp".to_string())
1624        );
1625        assert_eq!(
1626            parse_remote_endpoint("http:https://mcp.example.com/mcp"),
1627            Some("https://mcp.example.com/mcp".to_string())
1628        );
1629    }
1630
1631    #[test]
1632    fn normalize_schema_removes_non_string_enums_recursively() {
1633        let mut schema = json!({
1634            "type": "object",
1635            "properties": {
1636                "good": { "type": "string", "enum": ["a", "b"] },
1637                "good_nullable": { "type": ["string", "null"], "enum": ["asc", "desc"] },
1638                "bad_object": { "type": "object", "enum": ["asc", "desc"] },
1639                "bad_array": { "type": "array", "enum": ["asc", "desc"] },
1640                "bad_number": { "type": "number", "enum": [1, 2] },
1641                "bad_mixed": { "enum": ["ok", 1] },
1642                "nested": {
1643                    "type": "object",
1644                    "properties": {
1645                        "child": { "enum": [true, false] }
1646                    }
1647                }
1648            }
1649        });
1650
1651        normalize_tool_input_schema(&mut schema);
1652
1653        assert!(
1654            schema["properties"]["good"]["enum"].is_array(),
1655            "string enums should be preserved"
1656        );
1657        assert!(
1658            schema["properties"]["good_nullable"]["enum"].is_array(),
1659            "string|null enums should be preserved"
1660        );
1661        assert!(
1662            schema["properties"]["bad_object"]["enum"].is_null(),
1663            "object enums should be dropped"
1664        );
1665        assert!(
1666            schema["properties"]["bad_array"]["enum"].is_null(),
1667            "array enums should be dropped"
1668        );
1669        assert!(
1670            schema["properties"]["bad_number"]["enum"].is_null(),
1671            "non-string enums should be dropped"
1672        );
1673        assert!(
1674            schema["properties"]["bad_mixed"]["enum"].is_null(),
1675            "mixed enums should be dropped"
1676        );
1677        assert!(
1678            schema["properties"]["nested"]["properties"]["child"]["enum"].is_null(),
1679            "recursive non-string enums should be dropped"
1680        );
1681    }
1682
1683    #[test]
1684    fn extract_auth_challenge_from_result_payload() {
1685        let payload = json!({
1686            "content": [
1687                {
1688                    "type": "text",
1689                    "llm_instructions": "Authorize Gmail access first.",
1690                    "authorization_url": "https://example.com/oauth/start"
1691                }
1692            ]
1693        });
1694        let challenge = extract_auth_challenge(&payload, "gmail_whoami")
1695            .expect("auth challenge should be detected");
1696        assert_eq!(challenge.tool_name, "gmail_whoami");
1697        assert_eq!(
1698            challenge.authorization_url,
1699            "https://example.com/oauth/start"
1700        );
1701        assert_eq!(challenge.status, "pending");
1702    }
1703
1704    #[test]
1705    fn extract_auth_challenge_returns_none_without_url() {
1706        let payload = json!({
1707            "content": [
1708                {"type":"text","text":"No authorization needed"}
1709            ]
1710        });
1711        assert!(extract_auth_challenge(&payload, "gmail_whoami").is_none());
1712    }
1713
1714    #[test]
1715    fn extract_auth_challenge_prefers_structured_content_message() {
1716        let payload = json!({
1717            "content": [
1718                {
1719                    "type": "text",
1720                    "text": "{\"authorization_url\":\"https://example.com/oauth\",\"message\":\"json blob\"}"
1721                }
1722            ],
1723            "structuredContent": {
1724                "authorization_url": "https://example.com/oauth",
1725                "message": "Authorize Reddit access first."
1726            }
1727        });
1728        let challenge = extract_auth_challenge(&payload, "reddit_getmyusername")
1729            .expect("auth challenge should be detected");
1730        assert_eq!(challenge.message, "Authorize Reddit access first.");
1731    }
1732
1733    #[test]
1734    fn sanitize_auth_message_compacts_llm_instructions() {
1735        let raw = "Please show the following link to the end user formatted as markdown: https://example.com/auth\nInform the end user that this tool requires authorization.";
1736        let message = sanitize_auth_message(raw);
1737        assert!(!message.contains('\n'));
1738        assert!(message.len() <= 283);
1739    }
1740
1741    #[test]
1742    fn normalize_mcp_tool_args_maps_clickup_aliases() {
1743        let server = McpServer {
1744            name: "arcade".to_string(),
1745            transport: "https://example.com/mcp".to_string(),
1746            enabled: true,
1747            connected: true,
1748            pid: None,
1749            last_error: None,
1750            last_auth_challenge: None,
1751            mcp_session_id: None,
1752            headers: HashMap::new(),
1753            secret_headers: HashMap::new(),
1754            tool_cache: vec![McpToolCacheEntry {
1755                tool_name: "Clickup_CreateTask".to_string(),
1756                description: "Create task".to_string(),
1757                input_schema: json!({
1758                    "type":"object",
1759                    "properties":{
1760                        "list_id":{"type":"string"},
1761                        "task_title":{"type":"string"}
1762                    },
1763                    "required":["list_id","task_title"]
1764                }),
1765                fetched_at_ms: 0,
1766                schema_hash: "x".to_string(),
1767            }],
1768            tools_fetched_at_ms: None,
1769            pending_auth_by_tool: HashMap::new(),
1770            secret_header_values: HashMap::new(),
1771        };
1772
1773        let normalized = normalize_mcp_tool_args(
1774            &server,
1775            "Clickup_CreateTask",
1776            json!({
1777                "listId": "123",
1778                "name": "Prep fish"
1779            }),
1780        );
1781        assert_eq!(
1782            normalized.get("list_id").and_then(|v| v.as_str()),
1783            Some("123")
1784        );
1785        assert_eq!(
1786            normalized.get("task_title").and_then(|v| v.as_str()),
1787            Some("Prep fish")
1788        );
1789    }
1790
1791    #[test]
1792    fn normalize_arg_key_ignores_case_and_separators() {
1793        assert_eq!(normalize_arg_key("task_title"), "tasktitle");
1794        assert_eq!(normalize_arg_key("taskTitle"), "tasktitle");
1795        assert_eq!(normalize_arg_key("task-title"), "tasktitle");
1796    }
1797
1798    #[test]
1799    fn pending_auth_blocks_retries_within_cooldown() {
1800        let mut server = McpServer {
1801            name: "arcade".to_string(),
1802            transport: "https://example.com/mcp".to_string(),
1803            enabled: true,
1804            connected: true,
1805            pid: None,
1806            last_error: None,
1807            last_auth_challenge: None,
1808            mcp_session_id: None,
1809            headers: HashMap::new(),
1810            secret_headers: HashMap::new(),
1811            tool_cache: Vec::new(),
1812            tools_fetched_at_ms: None,
1813            pending_auth_by_tool: HashMap::new(),
1814            secret_header_values: HashMap::new(),
1815        };
1816        server.pending_auth_by_tool.insert(
1817            "clickup_whoami".to_string(),
1818            PendingMcpAuth {
1819                challenge_id: "abc".to_string(),
1820                authorization_url: "https://example.com/auth".to_string(),
1821                message: "Authorize ClickUp access.".to_string(),
1822                status: "pending".to_string(),
1823                first_seen_ms: 1_000,
1824                last_probe_ms: 2_000,
1825            },
1826        );
1827        let blocked =
1828            pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 10_000, 15_000)
1829                .expect("should block");
1830        assert!(blocked.output.contains("Authorization pending"));
1831        assert!(blocked
1832            .mcp_auth
1833            .get("pending")
1834            .and_then(|v| v.as_bool())
1835            .unwrap_or(false));
1836    }
1837
1838    #[test]
1839    fn pending_auth_allows_probe_after_cooldown() {
1840        let mut server = McpServer {
1841            name: "arcade".to_string(),
1842            transport: "https://example.com/mcp".to_string(),
1843            enabled: true,
1844            connected: true,
1845            pid: None,
1846            last_error: None,
1847            last_auth_challenge: None,
1848            mcp_session_id: None,
1849            headers: HashMap::new(),
1850            secret_headers: HashMap::new(),
1851            tool_cache: Vec::new(),
1852            tools_fetched_at_ms: None,
1853            pending_auth_by_tool: HashMap::new(),
1854            secret_header_values: HashMap::new(),
1855        };
1856        server.pending_auth_by_tool.insert(
1857            "clickup_whoami".to_string(),
1858            PendingMcpAuth {
1859                challenge_id: "abc".to_string(),
1860                authorization_url: "https://example.com/auth".to_string(),
1861                message: "Authorize ClickUp access.".to_string(),
1862                status: "pending".to_string(),
1863                first_seen_ms: 1_000,
1864                last_probe_ms: 2_000,
1865            },
1866        );
1867        assert!(
1868            pending_auth_short_circuit(&server, "clickup_whoami", "Clickup_WhoAmI", 17_001, 15_000)
1869                .is_none(),
1870            "cooldown elapsed should allow re-probe"
1871        );
1872    }
1873
1874    #[test]
1875    fn pending_auth_is_tool_scoped() {
1876        let mut server = McpServer {
1877            name: "arcade".to_string(),
1878            transport: "https://example.com/mcp".to_string(),
1879            enabled: true,
1880            connected: true,
1881            pid: None,
1882            last_error: None,
1883            last_auth_challenge: None,
1884            mcp_session_id: None,
1885            headers: HashMap::new(),
1886            secret_headers: HashMap::new(),
1887            tool_cache: Vec::new(),
1888            tools_fetched_at_ms: None,
1889            pending_auth_by_tool: HashMap::new(),
1890            secret_header_values: HashMap::new(),
1891        };
1892        server.pending_auth_by_tool.insert(
1893            "gmail_sendemail".to_string(),
1894            PendingMcpAuth {
1895                challenge_id: "abc".to_string(),
1896                authorization_url: "https://example.com/auth".to_string(),
1897                message: "Authorize Gmail access.".to_string(),
1898                status: "pending".to_string(),
1899                first_seen_ms: 1_000,
1900                last_probe_ms: 2_000,
1901            },
1902        );
1903        assert!(pending_auth_short_circuit(
1904            &server,
1905            "gmail_sendemail",
1906            "Gmail_SendEmail",
1907            2_100,
1908            15_000
1909        )
1910        .is_some());
1911        assert!(pending_auth_short_circuit(
1912            &server,
1913            "clickup_whoami",
1914            "Clickup_WhoAmI",
1915            2_100,
1916            15_000
1917        )
1918        .is_none());
1919    }
1920}