Skip to main content

tandem_runtime/mcp_parts/
part01.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::{LocalImplicitTenant, SecretRef, TenantContext, ToolResult};
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
/// Protocol version string sent as `protocolVersion` in the `initialize` request.
const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
/// Client name reported as `clientInfo.name` in the `initialize` request.
const MCP_CLIENT_NAME: &str = "tandem";
/// Client version reported as `clientInfo.version`; taken from the crate version at compile time.
const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Cooldown, in milliseconds, that `call_tool` passes to `pending_auth_short_circuit`.
const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
// NOTE(review): this placeholder is an empty string and is not used in the part of the file
// reviewed here — confirm the empty value is intentional and was not lost in transit.
const MCP_SECRET_PLACEHOLDER: &str = "";
19
/// One cached tool definition stored on an [`McpServer`].
///
/// `McpRegistry::refresh` rebuilds these entries from the tools it discovers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpToolCacheEntry {
    /// Tool name as reported by the server.
    pub tool_name: String,
    /// Tool description as reported by the server (may be empty).
    pub description: String,
    /// Input schema of the tool; falls back to `Value::default()` when absent on deserialize.
    #[serde(default)]
    pub input_schema: Value,
    /// Millisecond timestamp (from `now_ms`) at which the entry was fetched.
    pub fetched_at_ms: u64,
    /// Result of `schema_hash` over `input_schema` at fetch time.
    pub schema_hash: String,
}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32    pub name: String,
33    pub transport: String,
34    #[serde(default = "default_enabled")]
35    pub enabled: bool,
36    pub connected: bool,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub pid: Option<u32>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub last_error: Option<String>,
41    #[serde(default, skip_serializing_if = "Option::is_none")]
42    pub last_auth_challenge: Option<McpAuthChallenge>,
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub mcp_session_id: Option<String>,
45    #[serde(default)]
46    pub headers: HashMap<String, String>,
47    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48    pub secret_headers: HashMap<String, McpSecretRef>,
49    #[serde(default)]
50    pub tool_cache: Vec<McpToolCacheEntry>,
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub tools_fetched_at_ms: Option<u64>,
53    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
54    pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
55    #[serde(default, skip)]
56    pub secret_header_values: HashMap<String, String>,
57}
58
/// Reference to a secret header value that is stored outside the state file.
///
/// Serialized as an internally tagged object: the `type` field holds the
/// snake_case variant name (`store`, `env`, `bearer_env`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum McpSecretRef {
    /// Secret kept in a secret store under `secret_id`.
    Store {
        /// Identifier of the stored secret.
        secret_id: String,
        /// Tenant the secret belongs to; uses `TenantContext::default()` when absent.
        #[serde(default)]
        tenant_context: TenantContext,
    },
    /// Secret named by `env`.
    // NOTE(review): presumably `env` is an environment variable name whose value is used
    // verbatim — the resolver is not visible here; confirm.
    Env {
        /// Name used to look the secret up.
        env: String,
    },
    /// Secret named by `env`, bearer-token flavoured.
    // NOTE(review): presumably the resolved value is sent as a `Bearer` token — confirm
    // against `resolve_secret_ref_value`.
    BearerEnv {
        /// Name used to look the secret up.
        env: String,
    },
}
74
/// Authorization challenge extracted from a tool-call response.
///
/// `call_tool` stores the latest one on the server entry and echoes its fields
/// in the `mcpAuth` metadata of the returned `ToolResult`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpAuthChallenge {
    /// Identifier of this challenge.
    pub challenge_id: String,
    /// Tool whose call produced the challenge.
    pub tool_name: String,
    /// URL shown to the user after "Authorize here:" in the tool output.
    pub authorization_url: String,
    /// Human-readable message shown ahead of the URL in the tool output.
    pub message: String,
    /// Millisecond timestamp at which the challenge was requested.
    pub requested_at_ms: u64,
    /// Challenge status string, passed through to callers unchanged.
    pub status: String,
}
84
/// Per-tool record of an authorization challenge that has not been cleared yet.
///
/// Stored in `McpServer::pending_auth_by_tool`; `call_tool` removes the record
/// once a call for that tool returns a result without a challenge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingMcpAuth {
    /// Identifier of the challenge.
    pub challenge_id: String,
    /// URL the user must visit to authorize.
    pub authorization_url: String,
    /// Human-readable message accompanying the challenge.
    pub message: String,
    /// Challenge status string.
    pub status: String,
    /// Millisecond timestamp of the first sighting of the challenge.
    pub first_seen_ms: u64,
    /// Millisecond timestamp of the latest call attempt; `call_tool` updates it
    /// just before sending a request for the tool.
    pub last_probe_ms: u64,
}
94
/// A tool exposed by an MCP server, as returned by the registry's listing methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpRemoteTool {
    /// Name of the server that provides the tool.
    pub server_name: String,
    /// Tool name as reported by the server.
    pub tool_name: String,
    /// Name used to sort listings; produced outside this struct.
    // NOTE(review): presumably combines server and tool name — confirm in `server_tool_rows`.
    pub namespaced_name: String,
    /// Tool description (may be empty).
    pub description: String,
    /// Input schema of the tool; falls back to `Value::default()` when absent on deserialize.
    #[serde(default)]
    pub input_schema: Value,
    /// Millisecond timestamp at which the tool definition was fetched.
    pub fetched_at_ms: u64,
    /// Hash of `input_schema`; left empty by `discover_remote_tools`.
    pub schema_hash: String,
}
106
/// Registry of MCP servers with their connection state and persisted configuration.
///
/// Cloning is cheap: every field is behind an `Arc`, so clones share the same state.
#[derive(Clone)]
pub struct McpRegistry {
    /// Registered servers, keyed by server name.
    servers: Arc<RwLock<HashMap<String, McpServer>>>,
    /// Child processes spawned for stdio transports, keyed by server name.
    processes: Arc<Mutex<HashMap<String, Child>>>,
    /// Path of the JSON file the server map is persisted to.
    state_file: Arc<PathBuf>,
}
113
114impl McpRegistry {
115    pub fn new() -> Self {
116        Self::new_with_state_file(resolve_state_file())
117    }
118
119    pub fn new_with_state_file(state_file: PathBuf) -> Self {
120        let (loaded_state, migrated) = load_state(&state_file);
121        let loaded = loaded_state
122            .into_iter()
123            .map(|(k, mut v)| {
124                v.connected = false;
125                v.pid = None;
126                if v.name.trim().is_empty() {
127                    v.name = k.clone();
128                }
129                if v.headers.is_empty() {
130                    v.headers = HashMap::new();
131                }
132                if v.secret_headers.is_empty() {
133                    v.secret_headers = HashMap::new();
134                }
135                let tenant_context = local_tenant_context();
136                v.secret_header_values =
137                    resolve_secret_header_values(&v.secret_headers, &tenant_context);
138                (k, v)
139            })
140            .collect::<HashMap<_, _>>();
141        if migrated {
142            persist_state_blocking(&state_file, &loaded);
143        }
144        Self {
145            servers: Arc::new(RwLock::new(loaded)),
146            processes: Arc::new(Mutex::new(HashMap::new())),
147            state_file: Arc::new(state_file),
148        }
149    }
150
151    pub async fn list(&self) -> HashMap<String, McpServer> {
152        self.servers.read().await.clone()
153    }
154
155    pub async fn list_public(&self) -> HashMap<String, McpServer> {
156        self.servers
157            .read()
158            .await
159            .iter()
160            .map(|(name, server)| (name.clone(), redacted_server_view(server)))
161            .collect()
162    }
163
164    pub async fn add(&self, name: String, transport: String) {
165        self.add_or_update(name, transport, HashMap::new(), true)
166            .await;
167    }
168
169    pub async fn add_or_update(
170        &self,
171        name: String,
172        transport: String,
173        headers: HashMap<String, String>,
174        enabled: bool,
175    ) {
176        self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
177            .await;
178    }
179
180    pub async fn add_or_update_with_secret_refs(
181        &self,
182        name: String,
183        transport: String,
184        headers: HashMap<String, String>,
185        secret_headers: HashMap<String, McpSecretRef>,
186        enabled: bool,
187    ) {
188        let normalized_name = name.trim().to_string();
189        let tenant_context = local_tenant_context();
190        let (persisted_headers, persisted_secret_headers, secret_header_values) =
191            split_headers_for_storage(&normalized_name, headers, secret_headers, &tenant_context);
192        let mut servers = self.servers.write().await;
193        let existing = servers.get(&normalized_name).cloned();
194        let preserve_cache = existing.as_ref().is_some_and(|row| {
195            row.transport == transport
196                && effective_headers(row)
197                    == combine_headers(&persisted_headers, &secret_header_values)
198        });
199        let existing_tool_cache = if preserve_cache {
200            existing
201                .as_ref()
202                .map(|row| row.tool_cache.clone())
203                .unwrap_or_default()
204        } else {
205            Vec::new()
206        };
207        let existing_fetched_at = if preserve_cache {
208            existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
209        } else {
210            None
211        };
212        let server = McpServer {
213            name: normalized_name.clone(),
214            transport,
215            enabled,
216            connected: false,
217            pid: None,
218            last_error: None,
219            last_auth_challenge: None,
220            mcp_session_id: None,
221            headers: persisted_headers,
222            secret_headers: persisted_secret_headers,
223            tool_cache: existing_tool_cache,
224            tools_fetched_at_ms: existing_fetched_at,
225            pending_auth_by_tool: HashMap::new(),
226            secret_header_values,
227        };
228        servers.insert(normalized_name, server);
229        drop(servers);
230        self.persist_state().await;
231    }
232
233    pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
234        let mut servers = self.servers.write().await;
235        let Some(server) = servers.get_mut(name) else {
236            return false;
237        };
238        server.enabled = enabled;
239        if !enabled {
240            server.connected = false;
241            server.pid = None;
242            server.last_auth_challenge = None;
243            server.mcp_session_id = None;
244            server.pending_auth_by_tool.clear();
245        }
246        drop(servers);
247        if !enabled {
248            if let Some(mut child) = self.processes.lock().await.remove(name) {
249                let _ = child.kill().await;
250                let _ = child.wait().await;
251            }
252        }
253        self.persist_state().await;
254        true
255    }
256
257    pub async fn remove(&self, name: &str) -> bool {
258        let removed_server = {
259            let mut servers = self.servers.write().await;
260            servers.remove(name)
261        };
262        let Some(server) = removed_server else {
263            return false;
264        };
265        let current_tenant = local_tenant_context();
266        delete_secret_header_refs(&server.secret_headers, &current_tenant);
267
268        if let Some(mut child) = self.processes.lock().await.remove(name) {
269            let _ = child.kill().await;
270            let _ = child.wait().await;
271        }
272        self.persist_state().await;
273        true
274    }
275
276    pub async fn connect(&self, name: &str) -> bool {
277        let server = {
278            let servers = self.servers.read().await;
279            let Some(server) = servers.get(name) else {
280                return false;
281            };
282            server.clone()
283        };
284
285        if !server.enabled {
286            let mut servers = self.servers.write().await;
287            if let Some(entry) = servers.get_mut(name) {
288                entry.connected = false;
289                entry.pid = None;
290                entry.last_error = Some("MCP server is disabled".to_string());
291                entry.last_auth_challenge = None;
292                entry.mcp_session_id = None;
293                entry.pending_auth_by_tool.clear();
294            }
295            drop(servers);
296            self.persist_state().await;
297            return false;
298        }
299
300        if let Some(command_text) = parse_stdio_transport(&server.transport) {
301            return self.connect_stdio(name, command_text).await;
302        }
303
304        if parse_remote_endpoint(&server.transport).is_some() {
305            return self.refresh(name).await.is_ok();
306        }
307
308        let mut servers = self.servers.write().await;
309        if let Some(entry) = servers.get_mut(name) {
310            entry.connected = true;
311            entry.pid = None;
312            entry.last_error = None;
313            entry.last_auth_challenge = None;
314            entry.mcp_session_id = None;
315            entry.pending_auth_by_tool.clear();
316        }
317        drop(servers);
318        self.persist_state().await;
319        true
320    }
321
322    pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
323        let server = {
324            let servers = self.servers.read().await;
325            let Some(server) = servers.get(name) else {
326                return Err("MCP server not found".to_string());
327            };
328            server.clone()
329        };
330
331        if !server.enabled {
332            return Err("MCP server is disabled".to_string());
333        }
334
335        let endpoint = parse_remote_endpoint(&server.transport)
336            .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
337
338        let request_headers = effective_headers(&server);
339        let (tools, session_id) = match self
340            .discover_remote_tools(&endpoint, &request_headers)
341            .await
342        {
343            Ok(result) => result,
344            Err(err) => {
345                let mut servers = self.servers.write().await;
346                if let Some(entry) = servers.get_mut(name) {
347                    entry.connected = false;
348                    entry.pid = None;
349                    entry.last_error = Some(err.clone());
350                    entry.last_auth_challenge = None;
351                    entry.mcp_session_id = None;
352                    entry.pending_auth_by_tool.clear();
353                    entry.tool_cache.clear();
354                    entry.tools_fetched_at_ms = None;
355                }
356                drop(servers);
357                self.persist_state().await;
358                return Err(err);
359            }
360        };
361
362        let now = now_ms();
363        let cache = tools
364            .iter()
365            .map(|tool| McpToolCacheEntry {
366                tool_name: tool.tool_name.clone(),
367                description: tool.description.clone(),
368                input_schema: tool.input_schema.clone(),
369                fetched_at_ms: now,
370                schema_hash: schema_hash(&tool.input_schema),
371            })
372            .collect::<Vec<_>>();
373
374        let mut servers = self.servers.write().await;
375        if let Some(entry) = servers.get_mut(name) {
376            entry.connected = true;
377            entry.pid = None;
378            entry.last_error = None;
379            entry.last_auth_challenge = None;
380            entry.mcp_session_id = session_id;
381            entry.tool_cache = cache;
382            entry.tools_fetched_at_ms = Some(now);
383            entry.pending_auth_by_tool.clear();
384        }
385        drop(servers);
386        self.persist_state().await;
387        Ok(self.server_tools(name).await)
388    }
389
390    pub async fn disconnect(&self, name: &str) -> bool {
391        if let Some(mut child) = self.processes.lock().await.remove(name) {
392            let _ = child.kill().await;
393            let _ = child.wait().await;
394        }
395        let mut servers = self.servers.write().await;
396        if let Some(server) = servers.get_mut(name) {
397            server.connected = false;
398            server.pid = None;
399            server.last_auth_challenge = None;
400            server.mcp_session_id = None;
401            server.pending_auth_by_tool.clear();
402            drop(servers);
403            self.persist_state().await;
404            return true;
405        }
406        false
407    }
408
409    pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
410        let mut out = self
411            .servers
412            .read()
413            .await
414            .values()
415            .filter(|server| server.enabled && server.connected)
416            .flat_map(server_tool_rows)
417            .collect::<Vec<_>>();
418        out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
419        out
420    }
421
422    pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
423        let Some(server) = self.servers.read().await.get(name).cloned() else {
424            return Vec::new();
425        };
426        let mut rows = server_tool_rows(&server);
427        rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
428        rows
429    }
430
    /// Invokes `tool_name` on the remote server `server_name` via a JSON-RPC
    /// `tools/call` request.
    ///
    /// # Errors
    ///
    /// Returns an error string when the server is unknown, disabled, not
    /// connected, does not use a remote endpoint, when the request itself
    /// fails, or when the response carries a JSON-RPC error that is not an
    /// authorization challenge.
    ///
    /// # Authorization challenges
    ///
    /// A challenge is reported as `Ok`: the output holds the challenge message
    /// plus an "Authorize here:" line and the metadata's `mcpAuth` object
    /// describes the challenge. The challenge is also recorded on the server
    /// entry and in `pending_auth_by_tool`.
    pub async fn call_tool(
        &self,
        server_name: &str,
        tool_name: &str,
        args: Value,
    ) -> Result<ToolResult, String> {
        // Work on a snapshot so the read lock is not held across the request.
        let server = {
            let servers = self.servers.read().await;
            let Some(server) = servers.get(server_name) else {
                return Err(format!("MCP server '{server_name}' not found"));
            };
            server.clone()
        };

        if !server.enabled {
            return Err(format!("MCP server '{server_name}' is disabled"));
        }
        if !server.connected {
            return Err(format!("MCP server '{server_name}' is not connected"));
        }

        let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
            "MCP tools/call currently supports HTTP/S transports only".to_string()
        })?;
        let canonical_tool = canonical_tool_key(tool_name);
        let now = now_ms();
        // If a pending challenge short-circuits this call, answer without
        // contacting the server at all.
        if let Some(blocked) = pending_auth_short_circuit(
            &server,
            &canonical_tool,
            tool_name,
            now,
            MCP_AUTH_REPROBE_COOLDOWN_MS,
        ) {
            return Ok(ToolResult {
                output: blocked.output,
                metadata: json!({
                    "server": server_name,
                    "tool": tool_name,
                    "result": Value::Null,
                    "mcpAuth": blocked.mcp_auth
                }),
            });
        }
        let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);

        // Record this attempt on any pending challenge for the tool. This is
        // not persisted here; later persist calls pick it up.
        {
            let mut servers = self.servers.write().await;
            if let Some(row) = servers.get_mut(server_name) {
                if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
                    pending.last_probe_ms = now;
                }
            }
        }

        let request = json!({
            "jsonrpc": "2.0",
            "id": format!("call-{}-{}", server_name, now_ms()),
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": normalized_args
            }
        });
        let (response, session_id) = post_json_rpc_with_session(
            &endpoint,
            &effective_headers(&server),
            request,
            server.mcp_session_id.as_deref(),
        )
        .await?;
        // Only overwrite the stored session id when the response supplied one.
        if session_id.is_some() {
            let mut servers = self.servers.write().await;
            if let Some(row) = servers.get_mut(server_name) {
                row.mcp_session_id = session_id;
            }
            drop(servers);
            self.persist_state().await;
        }

        if let Some(err) = response.get("error") {
            // A JSON-RPC error may itself be an authorization challenge.
            if let Some(challenge) = extract_auth_challenge(err, tool_name) {
                let output = format!(
                    "{}\n\nAuthorize here: {}",
                    challenge.message, challenge.authorization_url
                );
                {
                    let mut servers = self.servers.write().await;
                    if let Some(row) = servers.get_mut(server_name) {
                        row.last_auth_challenge = Some(challenge.clone());
                        row.last_error = None;
                        row.pending_auth_by_tool.insert(
                            canonical_tool.clone(),
                            pending_auth_from_challenge(&challenge),
                        );
                    }
                }
                self.persist_state().await;
                return Ok(ToolResult {
                    output,
                    metadata: json!({
                        "server": server_name,
                        "tool": tool_name,
                        "result": Value::Null,
                        "mcpAuth": {
                            "required": true,
                            "challengeId": challenge.challenge_id,
                            "tool": challenge.tool_name,
                            "authorizationUrl": challenge.authorization_url,
                            "message": challenge.message,
                            "status": challenge.status
                        }
                    }),
                });
            }
            // Any other error is surfaced with the server's message when present.
            let message = err
                .get("message")
                .and_then(|v| v.as_str())
                .unwrap_or("MCP tools/call failed");
            return Err(message.to_string());
        }

        let result = response.get("result").cloned().unwrap_or(Value::Null);
        // A successful result can carry a challenge as well.
        let auth_challenge = extract_auth_challenge(&result, tool_name);
        let output = if let Some(challenge) = auth_challenge.as_ref() {
            format!(
                "{}\n\nAuthorize here: {}",
                challenge.message, challenge.authorization_url
            )
        } else {
            // Prefer rendered `content`, then `output`, then the raw result JSON.
            result
                .get("content")
                .map(render_mcp_content)
                .or_else(|| result.get("output").map(|v| v.to_string()))
                .unwrap_or_else(|| result.to_string())
        };

        // Store the new challenge, or clear the pending one for this tool when
        // the call went through without a challenge.
        {
            let mut servers = self.servers.write().await;
            if let Some(row) = servers.get_mut(server_name) {
                row.last_auth_challenge = auth_challenge.clone();
                if let Some(challenge) = auth_challenge.as_ref() {
                    row.pending_auth_by_tool.insert(
                        canonical_tool.clone(),
                        pending_auth_from_challenge(challenge),
                    );
                } else {
                    row.pending_auth_by_tool.remove(&canonical_tool);
                }
            }
        }
        self.persist_state().await;

        // `None` serializes as JSON null in the metadata below.
        let auth_metadata = auth_challenge.as_ref().map(|challenge| {
            json!({
                "required": true,
                "challengeId": challenge.challenge_id,
                "tool": challenge.tool_name,
                "authorizationUrl": challenge.authorization_url,
                "message": challenge.message,
                "status": challenge.status
            })
        });

        Ok(ToolResult {
            output,
            metadata: json!({
                "server": server_name,
                "tool": tool_name,
                "result": result,
                "mcpAuth": auth_metadata
            }),
        })
    }
604
605    async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
606        match spawn_stdio_process(command_text).await {
607            Ok(child) => {
608                let pid = child.id();
609                self.processes.lock().await.insert(name.to_string(), child);
610                let mut servers = self.servers.write().await;
611                if let Some(server) = servers.get_mut(name) {
612                    server.connected = true;
613                    server.pid = pid;
614                    server.last_error = None;
615                    server.last_auth_challenge = None;
616                    server.pending_auth_by_tool.clear();
617                }
618                drop(servers);
619                self.persist_state().await;
620                true
621            }
622            Err(err) => {
623                let mut servers = self.servers.write().await;
624                if let Some(server) = servers.get_mut(name) {
625                    server.connected = false;
626                    server.pid = None;
627                    server.last_error = Some(err);
628                    server.last_auth_challenge = None;
629                    server.pending_auth_by_tool.clear();
630                }
631                drop(servers);
632                self.persist_state().await;
633                false
634            }
635        }
636    }
637
638    async fn discover_remote_tools(
639        &self,
640        endpoint: &str,
641        headers: &HashMap<String, String>,
642    ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
643        let initialize = json!({
644            "jsonrpc": "2.0",
645            "id": "initialize-1",
646            "method": "initialize",
647            "params": {
648                "protocolVersion": MCP_PROTOCOL_VERSION,
649                "capabilities": {},
650                "clientInfo": {
651                    "name": MCP_CLIENT_NAME,
652                    "version": MCP_CLIENT_VERSION,
653                }
654            }
655        });
656        let (init_response, mut session_id) =
657            post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
658        if let Some(err) = init_response.get("error") {
659            let message = err
660                .get("message")
661                .and_then(|v| v.as_str())
662                .unwrap_or("MCP initialize failed");
663            return Err(message.to_string());
664        }
665
666        let tools_list = json!({
667            "jsonrpc": "2.0",
668            "id": "tools-list-1",
669            "method": "tools/list",
670            "params": {}
671        });
672        let (tools_response, next_session_id) =
673            post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
674                .await?;
675        if next_session_id.is_some() {
676            session_id = next_session_id;
677        }
678        if let Some(err) = tools_response.get("error") {
679            let message = err
680                .get("message")
681                .and_then(|v| v.as_str())
682                .unwrap_or("MCP tools/list failed");
683            return Err(message.to_string());
684        }
685
686        let tools = tools_response
687            .get("result")
688            .and_then(|v| v.get("tools"))
689            .and_then(|v| v.as_array())
690            .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
691
692        let now = now_ms();
693        let mut out = Vec::new();
694        for row in tools {
695            let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
696                continue;
697            };
698            let description = row
699                .get("description")
700                .and_then(|v| v.as_str())
701                .unwrap_or("")
702                .to_string();
703            let mut input_schema = row
704                .get("inputSchema")
705                .or_else(|| row.get("input_schema"))
706                .cloned()
707                .unwrap_or_else(|| json!({"type":"object"}));
708            normalize_tool_input_schema(&mut input_schema);
709            out.push(McpRemoteTool {
710                server_name: String::new(),
711                tool_name: tool_name.to_string(),
712                namespaced_name: String::new(),
713                description,
714                input_schema,
715                fetched_at_ms: now,
716                schema_hash: String::new(),
717            });
718        }
719
720        Ok((out, session_id))
721    }
722
723    async fn persist_state(&self) {
724        let snapshot = self.servers.read().await.clone();
725        persist_state_blocking(self.state_file.as_path(), &snapshot);
726    }
727}
728
729impl Default for McpRegistry {
730    fn default() -> Self {
731        Self::new()
732    }
733}
734
/// Serde default for `McpServer::enabled`: servers are enabled unless the
/// stored entry says otherwise.
fn default_enabled() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
738
739fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
740    if let Some(parent) = path.parent() {
741        let _ = std::fs::create_dir_all(parent);
742    }
743    if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
744        let _ = std::fs::write(path, payload);
745    }
746}
747
748fn resolve_state_file() -> PathBuf {
749    if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
750        return PathBuf::from(path);
751    }
752    if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
753        let trimmed = state_dir.trim();
754        if !trimmed.is_empty() {
755            return PathBuf::from(trimmed).join("mcp_servers.json");
756        }
757    }
758    if let Some(data_dir) = dirs::data_dir() {
759        return data_dir
760            .join("tandem")
761            .join("data")
762            .join("mcp_servers.json");
763    }
764    dirs::home_dir()
765        .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
766        .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
767}
768
769fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
770    let Ok(raw) = std::fs::read_to_string(path) else {
771        return (HashMap::new(), false);
772    };
773    let mut migrated = false;
774    let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
775    for (name, server) in parsed.iter_mut() {
776        let tenant_context = local_tenant_context();
777        let (headers, secret_headers, secret_header_values, server_migrated) =
778            migrate_server_headers(name, server, &tenant_context);
779        migrated = migrated || server_migrated;
780        server.headers = headers;
781        server.secret_headers = secret_headers;
782        server.secret_header_values = secret_header_values;
783    }
784    (parsed, migrated)
785}
786
787fn migrate_server_headers(
788    server_name: &str,
789    server: &McpServer,
790    current_tenant: &TenantContext,
791) -> (
792    HashMap<String, String>,
793    HashMap<String, McpSecretRef>,
794    HashMap<String, String>,
795    bool,
796) {
797    let original_effective = effective_headers(server);
798    let mut persisted_secret_headers = server.secret_headers.clone();
799    let mut secret_header_values =
800        resolve_secret_header_values(&persisted_secret_headers, current_tenant);
801    let mut persisted_headers = server.headers.clone();
802    let mut migrated = false;
803
804    let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
805    for header_name in header_keys {
806        let Some(value) = persisted_headers.get(&header_name).cloned() else {
807            continue;
808        };
809        if persisted_secret_headers.contains_key(&header_name) {
810            continue;
811        }
812        if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
813            persisted_headers.remove(&header_name);
814            let resolved =
815                resolve_secret_ref_value(&secret_ref, current_tenant).unwrap_or_default();
816            persisted_secret_headers.insert(header_name.clone(), secret_ref);
817            if !resolved.is_empty() {
818                secret_header_values.insert(header_name.clone(), resolved);
819            }
820            migrated = true;
821            continue;
822        }
823        if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
824            let secret_id = mcp_header_secret_id(server_name, &header_name);
825            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
826                persisted_headers.remove(&header_name);
827                persisted_secret_headers.insert(
828                    header_name.clone(),
829                    McpSecretRef::Store {
830                        secret_id: secret_id.clone(),
831                        tenant_context: current_tenant.clone(),
832                    },
833                );
834                secret_header_values.insert(header_name.clone(), value);
835                migrated = true;
836            }
837        }
838    }
839
840    if !migrated {
841        let effective = combine_headers(&persisted_headers, &secret_header_values);
842        migrated = effective != original_effective;
843    }
844
845    (
846        persisted_headers,
847        persisted_secret_headers,
848        secret_header_values,
849        migrated,
850    )
851}
852
853fn split_headers_for_storage(
854    server_name: &str,
855    headers: HashMap<String, String>,
856    explicit_secret_headers: HashMap<String, McpSecretRef>,
857    current_tenant: &TenantContext,
858) -> (
859    HashMap<String, String>,
860    HashMap<String, McpSecretRef>,
861    HashMap<String, String>,
862) {
863    let mut persisted_headers = HashMap::new();
864    let mut persisted_secret_headers = HashMap::new();
865    let mut secret_header_values = HashMap::new();
866
867    for (header_name, raw_value) in headers {
868        let value = raw_value.trim().to_string();
869        if value.is_empty() {
870            continue;
871        }
872        if let Some(secret_ref) = parse_secret_header_reference(&value) {
873            if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
874                secret_header_values.insert(header_name.clone(), resolved);
875            }
876            persisted_secret_headers.insert(header_name, secret_ref);
877            continue;
878        }
879        if header_name_is_sensitive(&header_name) {
880            let secret_id = mcp_header_secret_id(server_name, &header_name);
881            if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
882                persisted_secret_headers.insert(
883                    header_name.clone(),
884                    McpSecretRef::Store {
885                        secret_id: secret_id.clone(),
886                        tenant_context: current_tenant.clone(),
887                    },
888                );
889                secret_header_values.insert(header_name, value);
890                continue;
891            }
892        }
893        persisted_headers.insert(header_name, value);
894    }
895
896    for (header_name, secret_ref) in explicit_secret_headers {
897        if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
898            secret_header_values.insert(header_name.clone(), resolved);
899        }
900        persisted_headers.remove(&header_name);
901        persisted_secret_headers.insert(header_name, secret_ref);
902    }
903
904    (
905        persisted_headers,
906        persisted_secret_headers,
907        secret_header_values,
908    )
909}
910
/// Merges plain headers with resolved secret values into one header map.
///
/// Starts from a copy of `headers` and overlays every entry of
/// `secret_header_values` whose value is non-empty after trimming; an overlaid
/// entry replaces a plain header with the same name. Blank secret values are
/// ignored, so they never mask a plain header.
fn combine_headers(
    headers: &HashMap<String, String>,
    secret_header_values: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = headers.clone();
    merged.extend(
        secret_header_values
            .iter()
            .filter(|(_, secret)| !secret.trim().is_empty())
            .map(|(name, secret)| (name.clone(), secret.clone())),
    );
    merged
}
923
924fn effective_headers(server: &McpServer) -> HashMap<String, String> {
925    combine_headers(&server.headers, &server.secret_header_values)
926}
927
928fn redacted_server_view(server: &McpServer) -> McpServer {
929    let mut clone = server.clone();
930    for (header_name, secret_ref) in &clone.secret_headers {
931        clone.headers.insert(
932            header_name.clone(),
933            redacted_secret_header_value(secret_ref),
934        );
935    }
936    clone.secret_header_values.clear();
937    clone
938}
939
940fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
941    match secret_ref {
942        McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
943        McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
944    }
945}
946
947fn resolve_secret_header_values(
948    secret_headers: &HashMap<String, McpSecretRef>,
949    current_tenant: &TenantContext,
950) -> HashMap<String, String> {
951    let mut out = HashMap::new();
952    for (header_name, secret_ref) in secret_headers {
953        if let Some(value) = resolve_secret_ref_value(secret_ref, current_tenant) {
954            if !value.trim().is_empty() {
955                out.insert(header_name.clone(), value);
956            }
957        }
958    }
959    out
960}
961
962fn delete_secret_header_refs(
963    secret_headers: &HashMap<String, McpSecretRef>,
964    current_tenant: &TenantContext,
965) {
966    for secret_ref in secret_headers.values() {
967        if let McpSecretRef::Store {
968            secret_id,
969            tenant_context,
970        } = secret_ref
971        {
972            if tenant_context != current_tenant {
973                continue;
974            }
975            let _ = tandem_core::delete_provider_auth(secret_id);
976        }
977    }
978}
979