Skip to main content

tandem_runtime/mcp_parts/
part01.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::{LocalImplicitTenant, SecretRef, TenantContext, ToolResult};
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18const MCP_SECRET_PLACEHOLDER: &str = "";
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct McpToolCacheEntry {
22    pub tool_name: String,
23    pub description: String,
24    #[serde(default)]
25    pub input_schema: Value,
26    pub fetched_at_ms: u64,
27    pub schema_hash: String,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32    pub name: String,
33    pub transport: String,
34    #[serde(default, skip_serializing_if = "String::is_empty")]
35    pub auth_kind: String,
36    #[serde(default = "default_enabled")]
37    pub enabled: bool,
38    pub connected: bool,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub pid: Option<u32>,
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub last_error: Option<String>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub last_auth_challenge: Option<McpAuthChallenge>,
45    #[serde(default, skip_serializing_if = "Option::is_none")]
46    pub mcp_session_id: Option<String>,
47    #[serde(default)]
48    pub headers: HashMap<String, String>,
49    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
50    pub secret_headers: HashMap<String, McpSecretRef>,
51    #[serde(default)]
52    pub tool_cache: Vec<McpToolCacheEntry>,
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub tools_fetched_at_ms: Option<u64>,
55    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
56    pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
57    #[serde(default, skip)]
58    pub secret_header_values: HashMap<String, String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(tag = "type", rename_all = "snake_case")]
63pub enum McpSecretRef {
64    Store {
65        secret_id: String,
66        #[serde(default)]
67        tenant_context: TenantContext,
68    },
69    Env {
70        env: String,
71    },
72    BearerEnv {
73        env: String,
74    },
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct McpAuthChallenge {
79    pub challenge_id: String,
80    pub tool_name: String,
81    pub authorization_url: String,
82    pub message: String,
83    pub requested_at_ms: u64,
84    pub status: String,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct PendingMcpAuth {
89    pub challenge_id: String,
90    pub authorization_url: String,
91    pub message: String,
92    pub status: String,
93    pub first_seen_ms: u64,
94    pub last_probe_ms: u64,
95}
96
97#[derive(Debug, Clone)]
98enum DiscoverRemoteToolsError {
99    Message(String),
100    AuthChallenge(McpAuthChallenge),
101}
102
103impl From<String> for DiscoverRemoteToolsError {
104    fn from(value: String) -> Self {
105        Self::Message(value)
106    }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct McpRemoteTool {
111    pub server_name: String,
112    pub tool_name: String,
113    pub namespaced_name: String,
114    pub description: String,
115    #[serde(default)]
116    pub input_schema: Value,
117    pub fetched_at_ms: u64,
118    pub schema_hash: String,
119}
120
121#[derive(Clone)]
122pub struct McpRegistry {
123    servers: Arc<RwLock<HashMap<String, McpServer>>>,
124    processes: Arc<Mutex<HashMap<String, Child>>>,
125    state_file: Arc<PathBuf>,
126}
127
128impl McpRegistry {
129    pub fn new() -> Self {
130        Self::new_with_state_file(resolve_state_file())
131    }
132
133    pub fn new_with_state_file(state_file: PathBuf) -> Self {
134        let (loaded_state, migrated) = load_state(&state_file);
135        let loaded = loaded_state
136            .into_iter()
137            .map(|(k, mut v)| {
138                v.connected = false;
139                v.pid = None;
140                if v.name.trim().is_empty() {
141                    v.name = k.clone();
142                }
143                if v.headers.is_empty() {
144                    v.headers = HashMap::new();
145                }
146                if v.secret_headers.is_empty() {
147                    v.secret_headers = HashMap::new();
148                }
149                let tenant_context = local_tenant_context();
150                v.secret_header_values =
151                    resolve_secret_header_values(&v.secret_headers, &tenant_context);
152                (k, v)
153            })
154            .collect::<HashMap<_, _>>();
155        if migrated {
156            persist_state_blocking(&state_file, &loaded);
157        }
158        Self {
159            servers: Arc::new(RwLock::new(loaded)),
160            processes: Arc::new(Mutex::new(HashMap::new())),
161            state_file: Arc::new(state_file),
162        }
163    }
164
165    pub async fn list(&self) -> HashMap<String, McpServer> {
166        self.servers.read().await.clone()
167    }
168
169    pub async fn list_public(&self) -> HashMap<String, McpServer> {
170        self.servers
171            .read()
172            .await
173            .iter()
174            .map(|(name, server)| (name.clone(), redacted_server_view(server)))
175            .collect()
176    }
177
178    pub async fn add(&self, name: String, transport: String) {
179        self.add_or_update(name, transport, HashMap::new(), true)
180            .await;
181    }
182
183    pub async fn add_or_update(
184        &self,
185        name: String,
186        transport: String,
187        headers: HashMap<String, String>,
188        enabled: bool,
189    ) {
190        self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
191            .await;
192    }
193
194    pub async fn add_or_update_with_secret_refs(
195        &self,
196        name: String,
197        transport: String,
198        headers: HashMap<String, String>,
199        secret_headers: HashMap<String, McpSecretRef>,
200        enabled: bool,
201    ) {
202        let normalized_name = name.trim().to_string();
203        let tenant_context = local_tenant_context();
204        let (persisted_headers, persisted_secret_headers, secret_header_values) =
205            split_headers_for_storage(&normalized_name, headers, secret_headers, &tenant_context);
206        let mut servers = self.servers.write().await;
207        let existing = servers.get(&normalized_name).cloned();
208        let preserve_cache = existing.as_ref().is_some_and(|row| {
209            row.transport == transport
210                && effective_headers(row)
211                    == combine_headers(&persisted_headers, &secret_header_values)
212        });
213        let existing_tool_cache = if preserve_cache {
214            existing
215                .as_ref()
216                .map(|row| row.tool_cache.clone())
217                .unwrap_or_default()
218        } else {
219            Vec::new()
220        };
221        let existing_fetched_at = if preserve_cache {
222            existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
223        } else {
224            None
225        };
226        let server = McpServer {
227            name: normalized_name.clone(),
228            transport,
229            auth_kind: existing
230                .as_ref()
231                .map(|row| row.auth_kind.clone())
232                .unwrap_or_default(),
233            enabled,
234            connected: false,
235            pid: None,
236            last_error: None,
237            last_auth_challenge: None,
238            mcp_session_id: None,
239            headers: persisted_headers,
240            secret_headers: persisted_secret_headers,
241            tool_cache: existing_tool_cache,
242            tools_fetched_at_ms: existing_fetched_at,
243            pending_auth_by_tool: HashMap::new(),
244            secret_header_values,
245        };
246        servers.insert(normalized_name, server);
247        drop(servers);
248        self.persist_state().await;
249    }
250
251    pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
252        let mut servers = self.servers.write().await;
253        let Some(server) = servers.get_mut(name) else {
254            return false;
255        };
256        server.enabled = enabled;
257        if !enabled {
258            server.connected = false;
259            server.pid = None;
260            server.last_auth_challenge = None;
261            server.mcp_session_id = None;
262            server.pending_auth_by_tool.clear();
263        }
264        drop(servers);
265        if !enabled {
266            if let Some(mut child) = self.processes.lock().await.remove(name) {
267                let _ = child.kill().await;
268                let _ = child.wait().await;
269            }
270        }
271        self.persist_state().await;
272        true
273    }
274
275    pub async fn remove(&self, name: &str) -> bool {
276        let removed_server = {
277            let mut servers = self.servers.write().await;
278            servers.remove(name)
279        };
280        let Some(server) = removed_server else {
281            return false;
282        };
283        let current_tenant = local_tenant_context();
284        delete_secret_header_refs(&server.secret_headers, &current_tenant);
285
286        if let Some(mut child) = self.processes.lock().await.remove(name) {
287            let _ = child.kill().await;
288            let _ = child.wait().await;
289        }
290        self.persist_state().await;
291        true
292    }
293
294    pub async fn connect(&self, name: &str) -> bool {
295        let server = {
296            let servers = self.servers.read().await;
297            let Some(server) = servers.get(name) else {
298                return false;
299            };
300            server.clone()
301        };
302
303        if !server.enabled {
304            let mut servers = self.servers.write().await;
305            if let Some(entry) = servers.get_mut(name) {
306                entry.connected = false;
307                entry.pid = None;
308                entry.last_error = Some("MCP server is disabled".to_string());
309                entry.last_auth_challenge = None;
310                entry.mcp_session_id = None;
311                entry.pending_auth_by_tool.clear();
312            }
313            drop(servers);
314            self.persist_state().await;
315            return false;
316        }
317
318        if let Some(command_text) = parse_stdio_transport(&server.transport) {
319            return self.connect_stdio(name, command_text).await;
320        }
321
322        if parse_remote_endpoint(&server.transport).is_some() {
323            return self.refresh(name).await.is_ok();
324        }
325
326        let mut servers = self.servers.write().await;
327        if let Some(entry) = servers.get_mut(name) {
328            entry.connected = true;
329            entry.pid = None;
330            entry.last_error = None;
331            entry.last_auth_challenge = None;
332            entry.mcp_session_id = None;
333            entry.pending_auth_by_tool.clear();
334        }
335        drop(servers);
336        self.persist_state().await;
337        true
338    }
339
340    pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
341        let server = {
342            let servers = self.servers.read().await;
343            let Some(server) = servers.get(name) else {
344                return Err("MCP server not found".to_string());
345            };
346            server.clone()
347        };
348
349        if !server.enabled {
350            return Err("MCP server is disabled".to_string());
351        }
352
353        let endpoint = parse_remote_endpoint(&server.transport)
354            .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
355
356        let request_headers = effective_headers(&server);
357        let (tools, session_id) = match self
358            .discover_remote_tools(name, &endpoint, &request_headers)
359            .await
360        {
361            Ok(result) => result,
362            Err(DiscoverRemoteToolsError::AuthChallenge(challenge)) => {
363                let mut servers = self.servers.write().await;
364                if let Some(entry) = servers.get_mut(name) {
365                    entry.connected = false;
366                    entry.pid = None;
367                    entry.last_error = Some(challenge.message.clone());
368                    entry.last_auth_challenge = Some(challenge.clone());
369                    entry.mcp_session_id = None;
370                    entry.pending_auth_by_tool.clear();
371                    entry.tool_cache.clear();
372                    entry.tools_fetched_at_ms = None;
373                }
374                drop(servers);
375                self.persist_state().await;
376                return Err(format!(
377                    "MCP server '{name}' requires authorization: {}",
378                    challenge.message
379                ));
380            }
381            Err(DiscoverRemoteToolsError::Message(err)) => {
382                let mut servers = self.servers.write().await;
383                if let Some(entry) = servers.get_mut(name) {
384                    entry.connected = false;
385                    entry.pid = None;
386                    entry.last_error = Some(err.clone());
387                    entry.last_auth_challenge = None;
388                    entry.mcp_session_id = None;
389                    entry.pending_auth_by_tool.clear();
390                    entry.tool_cache.clear();
391                    entry.tools_fetched_at_ms = None;
392                }
393                drop(servers);
394                self.persist_state().await;
395                return Err(err);
396            }
397        };
398
399        let now = now_ms();
400        let cache = tools
401            .iter()
402            .map(|tool| McpToolCacheEntry {
403                tool_name: tool.tool_name.clone(),
404                description: tool.description.clone(),
405                input_schema: tool.input_schema.clone(),
406                fetched_at_ms: now,
407                schema_hash: schema_hash(&tool.input_schema),
408            })
409            .collect::<Vec<_>>();
410
411        let mut servers = self.servers.write().await;
412        if let Some(entry) = servers.get_mut(name) {
413            entry.connected = true;
414            entry.pid = None;
415            entry.last_error = None;
416            entry.last_auth_challenge = None;
417            entry.mcp_session_id = session_id;
418            entry.tool_cache = cache;
419            entry.tools_fetched_at_ms = Some(now);
420            entry.pending_auth_by_tool.clear();
421        }
422        drop(servers);
423        self.persist_state().await;
424        Ok(self.server_tools(name).await)
425    }
426
427    pub async fn disconnect(&self, name: &str) -> bool {
428        if let Some(mut child) = self.processes.lock().await.remove(name) {
429            let _ = child.kill().await;
430            let _ = child.wait().await;
431        }
432        let mut servers = self.servers.write().await;
433        if let Some(server) = servers.get_mut(name) {
434            server.connected = false;
435            server.pid = None;
436            server.last_auth_challenge = None;
437            server.mcp_session_id = None;
438            server.pending_auth_by_tool.clear();
439            drop(servers);
440            self.persist_state().await;
441            return true;
442        }
443        false
444    }
445
446    pub async fn complete_auth(&self, name: &str) -> bool {
447        let mut servers = self.servers.write().await;
448        let Some(server) = servers.get_mut(name) else {
449            return false;
450        };
451        server.last_error = None;
452        server.last_auth_challenge = None;
453        server.pending_auth_by_tool.clear();
454        drop(servers);
455        self.persist_state().await;
456        true
457    }
458
459    pub async fn set_auth_kind(&self, name: &str, auth_kind: String) -> bool {
460        let normalized = normalize_auth_kind(&auth_kind);
461        let mut servers = self.servers.write().await;
462        let Some(server) = servers.get_mut(name) else {
463            return false;
464        };
465        server.auth_kind = normalized;
466        drop(servers);
467        self.persist_state().await;
468        true
469    }
470
471    pub async fn record_server_auth_challenge(
472        &self,
473        name: &str,
474        challenge: McpAuthChallenge,
475        last_error: Option<String>,
476    ) -> bool {
477        let mut servers = self.servers.write().await;
478        let Some(server) = servers.get_mut(name) else {
479            return false;
480        };
481        let tool_key = canonical_tool_key(&challenge.tool_name);
482        server.connected = false;
483        server.pid = None;
484        server.last_error = last_error.or_else(|| Some(challenge.message.clone()));
485        server.last_auth_challenge = Some(challenge.clone());
486        server.mcp_session_id = None;
487        server.pending_auth_by_tool.clear();
488        server
489            .pending_auth_by_tool
490            .insert(tool_key, pending_auth_from_challenge(&challenge));
491        drop(servers);
492        self.persist_state().await;
493        true
494    }
495
496    pub async fn clear_server_auth_challenge(&self, name: &str) -> bool {
497        let mut servers = self.servers.write().await;
498        let Some(server) = servers.get_mut(name) else {
499            return false;
500        };
501        server.last_auth_challenge = None;
502        server.pending_auth_by_tool.clear();
503        drop(servers);
504        self.persist_state().await;
505        true
506    }
507
508    pub async fn set_bearer_token(&self, name: &str, token: &str) -> Result<bool, String> {
509        let trimmed = token.trim();
510        if trimmed.is_empty() {
511            return Err("oauth access token cannot be empty".to_string());
512        }
513        let current_tenant = local_tenant_context();
514        let mut servers = self.servers.write().await;
515        let Some(server) = servers.get_mut(name) else {
516            return Ok(false);
517        };
518        let header_name = "Authorization".to_string();
519        let secret_id = mcp_header_secret_id(name, &header_name);
520        tandem_core::set_provider_auth(&secret_id, &format!("Bearer {trimmed}"))
521            .map_err(|error| error.to_string())?;
522        server.secret_headers.insert(
523            header_name.clone(),
524            McpSecretRef::Store {
525                secret_id: secret_id.clone(),
526                tenant_context: current_tenant,
527            },
528        );
529        server
530            .secret_header_values
531            .insert(header_name.clone(), format!("Bearer {trimmed}"));
532        server.headers.remove(&header_name);
533        drop(servers);
534        self.persist_state().await;
535        Ok(true)
536    }
537
538    pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
539        let mut out = self
540            .servers
541            .read()
542            .await
543            .values()
544            .filter(|server| server.enabled && server.connected)
545            .flat_map(server_tool_rows)
546            .collect::<Vec<_>>();
547        out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
548        out
549    }
550
551    pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
552        let Some(server) = self.servers.read().await.get(name).cloned() else {
553            return Vec::new();
554        };
555        let mut rows = server_tool_rows(&server);
556        rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
557        rows
558    }
559
560    pub async fn call_tool(
561        &self,
562        server_name: &str,
563        tool_name: &str,
564        args: Value,
565    ) -> Result<ToolResult, String> {
566        let server = {
567            let servers = self.servers.read().await;
568            let Some(server) = servers.get(server_name) else {
569                return Err(format!("MCP server '{server_name}' not found"));
570            };
571            server.clone()
572        };
573
574        if !server.enabled {
575            return Err(format!("MCP server '{server_name}' is disabled"));
576        }
577        if !server.connected {
578            return Err(format!("MCP server '{server_name}' is not connected"));
579        }
580
581        let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
582            "MCP tools/call currently supports HTTP/S transports only".to_string()
583        })?;
584        let canonical_tool = canonical_tool_key(tool_name);
585        let now = now_ms();
586        if let Some(blocked) = pending_auth_short_circuit(
587            &server,
588            &canonical_tool,
589            tool_name,
590            now,
591            MCP_AUTH_REPROBE_COOLDOWN_MS,
592        ) {
593            return Ok(ToolResult {
594                output: blocked.output,
595                metadata: json!({
596                    "server": server_name,
597                    "tool": tool_name,
598                    "result": Value::Null,
599                    "mcpAuth": blocked.mcp_auth
600                }),
601            });
602        }
603        let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
604
605        {
606            let mut servers = self.servers.write().await;
607            if let Some(row) = servers.get_mut(server_name) {
608                if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
609                    pending.last_probe_ms = now;
610                }
611            }
612        }
613
614        let request = json!({
615            "jsonrpc": "2.0",
616            "id": format!("call-{}-{}", server_name, now_ms()),
617            "method": "tools/call",
618            "params": {
619                "name": tool_name,
620                "arguments": normalized_args
621            }
622        });
623        let (response, session_id) = post_json_rpc_with_session(
624            &endpoint,
625            &effective_headers(&server),
626            request,
627            server.mcp_session_id.as_deref(),
628        )
629        .await?;
630        if session_id.is_some() {
631            let mut servers = self.servers.write().await;
632            if let Some(row) = servers.get_mut(server_name) {
633                row.mcp_session_id = session_id;
634            }
635            drop(servers);
636            self.persist_state().await;
637        }
638
639        if let Some(err) = response.get("error") {
640            if let Some(challenge) = extract_auth_challenge(err, tool_name) {
641                let output = format!(
642                    "{}\n\nAuthorize here: {}",
643                    challenge.message, challenge.authorization_url
644                );
645                {
646                    let mut servers = self.servers.write().await;
647                    if let Some(row) = servers.get_mut(server_name) {
648                        row.last_auth_challenge = Some(challenge.clone());
649                        row.last_error = None;
650                        row.pending_auth_by_tool.insert(
651                            canonical_tool.clone(),
652                            pending_auth_from_challenge(&challenge),
653                        );
654                    }
655                }
656                self.persist_state().await;
657                return Ok(ToolResult {
658                    output,
659                    metadata: json!({
660                        "server": server_name,
661                        "tool": tool_name,
662                        "result": Value::Null,
663                        "mcpAuth": {
664                            "required": true,
665                            "challengeId": challenge.challenge_id,
666                            "tool": challenge.tool_name,
667                            "authorizationUrl": challenge.authorization_url,
668                            "message": challenge.message,
669                            "status": challenge.status
670                        }
671                    }),
672                });
673            }
674            let message = err
675                .get("message")
676                .and_then(|v| v.as_str())
677                .unwrap_or("MCP tools/call failed");
678            return Err(message.to_string());
679        }
680
681        let result = response.get("result").cloned().unwrap_or(Value::Null);
682        let auth_challenge = extract_auth_challenge(&result, tool_name);
683        let output = if let Some(challenge) = auth_challenge.as_ref() {
684            format!(
685                "{}\n\nAuthorize here: {}",
686                challenge.message, challenge.authorization_url
687            )
688        } else {
689            result
690                .get("content")
691                .map(render_mcp_content)
692                .or_else(|| result.get("output").map(|v| v.to_string()))
693                .unwrap_or_else(|| result.to_string())
694        };
695
696        {
697            let mut servers = self.servers.write().await;
698            if let Some(row) = servers.get_mut(server_name) {
699                row.last_auth_challenge = auth_challenge.clone();
700                if let Some(challenge) = auth_challenge.as_ref() {
701                    row.pending_auth_by_tool.insert(
702                        canonical_tool.clone(),
703                        pending_auth_from_challenge(challenge),
704                    );
705                } else {
706                    row.pending_auth_by_tool.remove(&canonical_tool);
707                }
708            }
709        }
710        self.persist_state().await;
711
712        let auth_metadata = auth_challenge.as_ref().map(|challenge| {
713            json!({
714                "required": true,
715                "challengeId": challenge.challenge_id,
716                "tool": challenge.tool_name,
717                "authorizationUrl": challenge.authorization_url,
718                "message": challenge.message,
719                "status": challenge.status
720            })
721        });
722
723        Ok(ToolResult {
724            output,
725            metadata: json!({
726                "server": server_name,
727                "tool": tool_name,
728                "result": result,
729                "mcpAuth": auth_metadata
730            }),
731        })
732    }
733
734    async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
735        match spawn_stdio_process(command_text).await {
736            Ok(child) => {
737                let pid = child.id();
738                self.processes.lock().await.insert(name.to_string(), child);
739                let mut servers = self.servers.write().await;
740                if let Some(server) = servers.get_mut(name) {
741                    server.connected = true;
742                    server.pid = pid;
743                    server.last_error = None;
744                    server.last_auth_challenge = None;
745                    server.pending_auth_by_tool.clear();
746                }
747                drop(servers);
748                self.persist_state().await;
749                true
750            }
751            Err(err) => {
752                let mut servers = self.servers.write().await;
753                if let Some(server) = servers.get_mut(name) {
754                    server.connected = false;
755                    server.pid = None;
756                    server.last_error = Some(err);
757                    server.last_auth_challenge = None;
758                    server.pending_auth_by_tool.clear();
759                }
760                drop(servers);
761                self.persist_state().await;
762                false
763            }
764        }
765    }
766
767    async fn discover_remote_tools(
768        &self,
769        server_name: &str,
770        endpoint: &str,
771        headers: &HashMap<String, String>,
772    ) -> Result<(Vec<McpRemoteTool>, Option<String>), DiscoverRemoteToolsError> {
773        let initialize = json!({
774            "jsonrpc": "2.0",
775            "id": "initialize-1",
776            "method": "initialize",
777            "params": {
778                "protocolVersion": MCP_PROTOCOL_VERSION,
779                "capabilities": {},
780                "clientInfo": {
781                    "name": MCP_CLIENT_NAME,
782                    "version": MCP_CLIENT_VERSION,
783                }
784            }
785        });
786        let (init_response, mut session_id) =
787            post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
788        if let Some(err) = init_response.get("error") {
789            if let Some(challenge) = extract_auth_challenge(err, server_name) {
790                return Err(DiscoverRemoteToolsError::AuthChallenge(challenge));
791            }
792            let message = err
793                .get("message")
794                .and_then(|v| v.as_str())
795                .unwrap_or("MCP initialize failed");
796            return Err(DiscoverRemoteToolsError::Message(message.to_string()));
797        }
798
799        let tools_list = json!({
800            "jsonrpc": "2.0",
801            "id": "tools-list-1",
802            "method": "tools/list",
803            "params": {}
804        });
805        let (tools_response, next_session_id) =
806            post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
807                .await?;
808        if next_session_id.is_some() {
809            session_id = next_session_id;
810        }
811        if let Some(err) = tools_response.get("error") {
812            if let Some(challenge) = extract_auth_challenge(err, server_name) {
813                return Err(DiscoverRemoteToolsError::AuthChallenge(challenge));
814            }
815            let message = err
816                .get("message")
817                .and_then(|v| v.as_str())
818                .unwrap_or("MCP tools/list failed");
819            return Err(DiscoverRemoteToolsError::Message(message.to_string()));
820        }
821
822        let tools = tools_response
823            .get("result")
824            .and_then(|v| v.get("tools"))
825            .and_then(|v| v.as_array())
826            .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
827
828        let now = now_ms();
829        let mut out = Vec::new();
830        for row in tools {
831            let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
832                continue;
833            };
834            let description = row
835                .get("description")
836                .and_then(|v| v.as_str())
837                .unwrap_or("")
838                .to_string();
839            let mut input_schema = row
840                .get("inputSchema")
841                .or_else(|| row.get("input_schema"))
842                .cloned()
843                .unwrap_or_else(|| json!({"type":"object"}));
844            normalize_tool_input_schema(&mut input_schema);
845            out.push(McpRemoteTool {
846                server_name: String::new(),
847                tool_name: tool_name.to_string(),
848                namespaced_name: String::new(),
849                description,
850                input_schema,
851                fetched_at_ms: now,
852                schema_hash: String::new(),
853            });
854        }
855
856        Ok((out, session_id))
857    }
858
859    async fn persist_state(&self) {
860        let snapshot = self.servers.read().await.clone();
861        persist_state_blocking(self.state_file.as_path(), &snapshot);
862    }
863}
864
865impl Default for McpRegistry {
866    fn default() -> Self {
867        Self::new()
868    }
869}
870
/// Serde default for `McpServer::enabled`.
///
/// Server entries that omit the field when deserialized are treated as enabled.
fn default_enabled() -> bool {
    true
}
874
875fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
876    if let Some(parent) = path.parent() {
877        let _ = std::fs::create_dir_all(parent);
878    }
879    if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
880        let _ = std::fs::write(path, payload);
881    }
882}
883
884fn resolve_state_file() -> PathBuf {
885    if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
886        return PathBuf::from(path);
887    }
888    if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
889        let trimmed = state_dir.trim();
890        if !trimmed.is_empty() {
891            return PathBuf::from(trimmed).join("mcp_servers.json");
892        }
893    }
894    if let Some(data_dir) = dirs::data_dir() {
895        return data_dir
896            .join("tandem")
897            .join("data")
898            .join("mcp_servers.json");
899    }
900    dirs::home_dir()
901        .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
902        .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
903}
904
905fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
906    let Ok(raw) = std::fs::read_to_string(path) else {
907        return (HashMap::new(), false);
908    };
909    let mut migrated = false;
910    let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
911    for (name, server) in parsed.iter_mut() {
912        let tenant_context = local_tenant_context();
913        let (headers, secret_headers, secret_header_values, server_migrated) =
914            migrate_server_headers(name, server, &tenant_context);
915        migrated = migrated || server_migrated;
916        server.headers = headers;
917        server.secret_headers = secret_headers;
918        server.secret_header_values = secret_header_values;
919    }
920    (parsed, migrated)
921}
922
923fn migrate_server_headers(
924    server_name: &str,
925    server: &McpServer,
926    current_tenant: &TenantContext,
927) -> (
928    HashMap<String, String>,
929    HashMap<String, McpSecretRef>,
930    HashMap<String, String>,
931    bool,
932) {
933    let original_effective = effective_headers(server);
934    let mut persisted_secret_headers = server.secret_headers.clone();
935    let mut secret_header_values =
936        resolve_secret_header_values(&persisted_secret_headers, current_tenant);
937    let mut persisted_headers = server.headers.clone();
938    let mut migrated = false;
939
940    let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
941    for header_name in header_keys {
942        let Some(value) = persisted_headers.get(&header_name).cloned() else {
943            continue;
944        };
945        if persisted_secret_headers.contains_key(&header_name) {
946            continue;
947        }
948        if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
949            persisted_headers.remove(&header_name);
950            let resolved =
951                resolve_secret_ref_value(&secret_ref, current_tenant).unwrap_or_default();
952            persisted_secret_headers.insert(header_name.clone(), secret_ref);
953            if !resolved.is_empty() {
954                secret_header_values.insert(header_name.clone(), resolved);
955            }
956            migrated = true;
957            continue;
958        }
959        if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
960            let secret_id = mcp_header_secret_id(server_name, &header_name);
961            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
962                persisted_headers.remove(&header_name);
963                persisted_secret_headers.insert(
964                    header_name.clone(),
965                    McpSecretRef::Store {
966                        secret_id: secret_id.clone(),
967                        tenant_context: current_tenant.clone(),
968                    },
969                );
970                secret_header_values.insert(header_name.clone(), value);
971                migrated = true;
972            }
973        }
974    }
975
976    if !migrated {
977        let effective = combine_headers(&persisted_headers, &secret_header_values);
978        migrated = effective != original_effective;
979    }
980
981    (
982        persisted_headers,
983        persisted_secret_headers,
984        secret_header_values,
985        migrated,
986    )
987}
988
989fn split_headers_for_storage(
990    server_name: &str,
991    headers: HashMap<String, String>,
992    explicit_secret_headers: HashMap<String, McpSecretRef>,
993    current_tenant: &TenantContext,
994) -> (
995    HashMap<String, String>,
996    HashMap<String, McpSecretRef>,
997    HashMap<String, String>,
998) {
999    let mut persisted_headers = HashMap::new();
1000    let mut persisted_secret_headers = HashMap::new();
1001    let mut secret_header_values = HashMap::new();
1002
1003    for (header_name, raw_value) in headers {
1004        let value = raw_value.trim().to_string();
1005        if value.is_empty() {
1006            continue;
1007        }
1008        if let Some(secret_ref) = parse_secret_header_reference(&value) {
1009            if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
1010                secret_header_values.insert(header_name.clone(), resolved);
1011            }
1012            persisted_secret_headers.insert(header_name, secret_ref);
1013            continue;
1014        }
1015        if header_name_is_sensitive(&header_name) {
1016            let secret_id = mcp_header_secret_id(server_name, &header_name);
1017            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
1018                persisted_secret_headers.insert(
1019                    header_name.clone(),
1020                    McpSecretRef::Store {
1021                        secret_id: secret_id.clone(),
1022                        tenant_context: current_tenant.clone(),
1023                    },
1024                );
1025                secret_header_values.insert(header_name, value);
1026                continue;
1027            }
1028        }
1029        persisted_headers.insert(header_name, value);
1030    }
1031
1032    for (header_name, secret_ref) in explicit_secret_headers {
1033        if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
1034            secret_header_values.insert(header_name.clone(), resolved);
1035        }
1036        persisted_headers.remove(&header_name);
1037        persisted_secret_headers.insert(header_name, secret_ref);
1038    }
1039
1040    (
1041        persisted_headers,
1042        persisted_secret_headers,
1043        secret_header_values,
1044    )
1045}
1046
/// Merges plain headers with resolved secret values.
///
/// Secret values override plain headers of the same name. Secret values that are
/// empty or whitespace-only are skipped, so they never blank out a plain header.
fn combine_headers(
    headers: &HashMap<String, String>,
    secret_header_values: &HashMap<String, String>,
) -> HashMap<String, String> {
    let overrides = secret_header_values
        .iter()
        .filter(|(_, secret)| !secret.trim().is_empty())
        .map(|(name, secret)| (name.clone(), secret.clone()));
    let mut merged = headers.clone();
    merged.extend(overrides);
    merged
}
1059
1060fn effective_headers(server: &McpServer) -> HashMap<String, String> {
1061    combine_headers(&server.headers, &server.secret_header_values)
1062}
1063
1064fn redacted_server_view(server: &McpServer) -> McpServer {
1065    let mut clone = server.clone();
1066    for (header_name, secret_ref) in &clone.secret_headers {
1067        clone.headers.insert(
1068            header_name.clone(),
1069            redacted_secret_header_value(secret_ref),
1070        );
1071    }
1072    clone.secret_header_values.clear();
1073    clone
1074}
1075
/// Canonicalises an auth-kind string.
///
/// The input is trimmed and lowercased. The result is returned if it is one of
/// `oauth`, `auto`, `bearer`, `x-api-key`, `custom` or `none`; any other input
/// (including blank) yields an empty string. The normalised form is computed once
/// and reused, rather than re-trimming and re-lowercasing for the return value.
fn normalize_auth_kind(raw: &str) -> String {
    let normalized = raw.trim().to_ascii_lowercase();
    if matches!(
        normalized.as_str(),
        "oauth" | "auto" | "bearer" | "x-api-key" | "custom" | "none"
    ) {
        normalized
    } else {
        String::new()
    }
}
1084
1085fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
1086    match secret_ref {
1087        McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
1088        McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
1089    }
1090}
1091
1092fn resolve_secret_header_values(
1093    secret_headers: &HashMap<String, McpSecretRef>,
1094    current_tenant: &TenantContext,
1095) -> HashMap<String, String> {
1096    let mut out = HashMap::new();
1097    for (header_name, secret_ref) in secret_headers {
1098        if let Some(value) = resolve_secret_ref_value(secret_ref, current_tenant) {
1099            if !value.trim().is_empty() {
1100                out.insert(header_name.clone(), value);
1101            }
1102        }
1103    }
1104    out
1105}
1106
1107fn delete_secret_header_refs(
1108    secret_headers: &HashMap<String, McpSecretRef>,
1109    current_tenant: &TenantContext,
1110) {
1111    for secret_ref in secret_headers.values() {
1112        if let McpSecretRef::Store {
1113            secret_id,
1114            tenant_context,
1115        } = secret_ref
1116        {
1117            if tenant_context != current_tenant {
1118                continue;
1119            }
1120            let _ = tandem_core::delete_provider_auth(secret_id);
1121        }
1122    }
1123}