1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::{LocalImplicitTenant, SecretRef, TenantContext, ToolResult};
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18const MCP_SECRET_PLACEHOLDER: &str = "";
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct McpToolCacheEntry {
22 pub tool_name: String,
23 pub description: String,
24 #[serde(default)]
25 pub input_schema: Value,
26 pub fetched_at_ms: u64,
27 pub schema_hash: String,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32 pub name: String,
33 pub transport: String,
34 #[serde(default = "default_enabled")]
35 pub enabled: bool,
36 pub connected: bool,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 pub pid: Option<u32>,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub last_error: Option<String>,
41 #[serde(default, skip_serializing_if = "Option::is_none")]
42 pub last_auth_challenge: Option<McpAuthChallenge>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub mcp_session_id: Option<String>,
45 #[serde(default)]
46 pub headers: HashMap<String, String>,
47 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
48 pub secret_headers: HashMap<String, McpSecretRef>,
49 #[serde(default)]
50 pub tool_cache: Vec<McpToolCacheEntry>,
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub tools_fetched_at_ms: Option<u64>,
53 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
54 pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
55 #[serde(default, skip)]
56 pub secret_header_values: HashMap<String, String>,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum McpSecretRef {
62 Store {
63 secret_id: String,
64 #[serde(default)]
65 tenant_context: TenantContext,
66 },
67 Env {
68 env: String,
69 },
70 BearerEnv {
71 env: String,
72 },
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct McpAuthChallenge {
77 pub challenge_id: String,
78 pub tool_name: String,
79 pub authorization_url: String,
80 pub message: String,
81 pub requested_at_ms: u64,
82 pub status: String,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct PendingMcpAuth {
87 pub challenge_id: String,
88 pub authorization_url: String,
89 pub message: String,
90 pub status: String,
91 pub first_seen_ms: u64,
92 pub last_probe_ms: u64,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct McpRemoteTool {
97 pub server_name: String,
98 pub tool_name: String,
99 pub namespaced_name: String,
100 pub description: String,
101 #[serde(default)]
102 pub input_schema: Value,
103 pub fetched_at_ms: u64,
104 pub schema_hash: String,
105}
106
107#[derive(Clone)]
108pub struct McpRegistry {
109 servers: Arc<RwLock<HashMap<String, McpServer>>>,
110 processes: Arc<Mutex<HashMap<String, Child>>>,
111 state_file: Arc<PathBuf>,
112}
113
114impl McpRegistry {
115 pub fn new() -> Self {
116 Self::new_with_state_file(resolve_state_file())
117 }
118
119 pub fn new_with_state_file(state_file: PathBuf) -> Self {
120 let (loaded_state, migrated) = load_state(&state_file);
121 let loaded = loaded_state
122 .into_iter()
123 .map(|(k, mut v)| {
124 v.connected = false;
125 v.pid = None;
126 if v.name.trim().is_empty() {
127 v.name = k.clone();
128 }
129 if v.headers.is_empty() {
130 v.headers = HashMap::new();
131 }
132 if v.secret_headers.is_empty() {
133 v.secret_headers = HashMap::new();
134 }
135 let tenant_context = local_tenant_context();
136 v.secret_header_values =
137 resolve_secret_header_values(&v.secret_headers, &tenant_context);
138 (k, v)
139 })
140 .collect::<HashMap<_, _>>();
141 if migrated {
142 persist_state_blocking(&state_file, &loaded);
143 }
144 Self {
145 servers: Arc::new(RwLock::new(loaded)),
146 processes: Arc::new(Mutex::new(HashMap::new())),
147 state_file: Arc::new(state_file),
148 }
149 }
150
151 pub async fn list(&self) -> HashMap<String, McpServer> {
152 self.servers.read().await.clone()
153 }
154
155 pub async fn list_public(&self) -> HashMap<String, McpServer> {
156 self.servers
157 .read()
158 .await
159 .iter()
160 .map(|(name, server)| (name.clone(), redacted_server_view(server)))
161 .collect()
162 }
163
164 pub async fn add(&self, name: String, transport: String) {
165 self.add_or_update(name, transport, HashMap::new(), true)
166 .await;
167 }
168
169 pub async fn add_or_update(
170 &self,
171 name: String,
172 transport: String,
173 headers: HashMap<String, String>,
174 enabled: bool,
175 ) {
176 self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
177 .await;
178 }
179
180 pub async fn add_or_update_with_secret_refs(
181 &self,
182 name: String,
183 transport: String,
184 headers: HashMap<String, String>,
185 secret_headers: HashMap<String, McpSecretRef>,
186 enabled: bool,
187 ) {
188 let normalized_name = name.trim().to_string();
189 let tenant_context = local_tenant_context();
190 let (persisted_headers, persisted_secret_headers, secret_header_values) =
191 split_headers_for_storage(&normalized_name, headers, secret_headers, &tenant_context);
192 let mut servers = self.servers.write().await;
193 let existing = servers.get(&normalized_name).cloned();
194 let preserve_cache = existing.as_ref().is_some_and(|row| {
195 row.transport == transport
196 && effective_headers(row)
197 == combine_headers(&persisted_headers, &secret_header_values)
198 });
199 let existing_tool_cache = if preserve_cache {
200 existing
201 .as_ref()
202 .map(|row| row.tool_cache.clone())
203 .unwrap_or_default()
204 } else {
205 Vec::new()
206 };
207 let existing_fetched_at = if preserve_cache {
208 existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
209 } else {
210 None
211 };
212 let server = McpServer {
213 name: normalized_name.clone(),
214 transport,
215 enabled,
216 connected: false,
217 pid: None,
218 last_error: None,
219 last_auth_challenge: None,
220 mcp_session_id: None,
221 headers: persisted_headers,
222 secret_headers: persisted_secret_headers,
223 tool_cache: existing_tool_cache,
224 tools_fetched_at_ms: existing_fetched_at,
225 pending_auth_by_tool: HashMap::new(),
226 secret_header_values,
227 };
228 servers.insert(normalized_name, server);
229 drop(servers);
230 self.persist_state().await;
231 }
232
233 pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
234 let mut servers = self.servers.write().await;
235 let Some(server) = servers.get_mut(name) else {
236 return false;
237 };
238 server.enabled = enabled;
239 if !enabled {
240 server.connected = false;
241 server.pid = None;
242 server.last_auth_challenge = None;
243 server.mcp_session_id = None;
244 server.pending_auth_by_tool.clear();
245 }
246 drop(servers);
247 if !enabled {
248 if let Some(mut child) = self.processes.lock().await.remove(name) {
249 let _ = child.kill().await;
250 let _ = child.wait().await;
251 }
252 }
253 self.persist_state().await;
254 true
255 }
256
257 pub async fn remove(&self, name: &str) -> bool {
258 let removed_server = {
259 let mut servers = self.servers.write().await;
260 servers.remove(name)
261 };
262 let Some(server) = removed_server else {
263 return false;
264 };
265 let current_tenant = local_tenant_context();
266 delete_secret_header_refs(&server.secret_headers, ¤t_tenant);
267
268 if let Some(mut child) = self.processes.lock().await.remove(name) {
269 let _ = child.kill().await;
270 let _ = child.wait().await;
271 }
272 self.persist_state().await;
273 true
274 }
275
276 pub async fn connect(&self, name: &str) -> bool {
277 let server = {
278 let servers = self.servers.read().await;
279 let Some(server) = servers.get(name) else {
280 return false;
281 };
282 server.clone()
283 };
284
285 if !server.enabled {
286 let mut servers = self.servers.write().await;
287 if let Some(entry) = servers.get_mut(name) {
288 entry.connected = false;
289 entry.pid = None;
290 entry.last_error = Some("MCP server is disabled".to_string());
291 entry.last_auth_challenge = None;
292 entry.mcp_session_id = None;
293 entry.pending_auth_by_tool.clear();
294 }
295 drop(servers);
296 self.persist_state().await;
297 return false;
298 }
299
300 if let Some(command_text) = parse_stdio_transport(&server.transport) {
301 return self.connect_stdio(name, command_text).await;
302 }
303
304 if parse_remote_endpoint(&server.transport).is_some() {
305 return self.refresh(name).await.is_ok();
306 }
307
308 let mut servers = self.servers.write().await;
309 if let Some(entry) = servers.get_mut(name) {
310 entry.connected = true;
311 entry.pid = None;
312 entry.last_error = None;
313 entry.last_auth_challenge = None;
314 entry.mcp_session_id = None;
315 entry.pending_auth_by_tool.clear();
316 }
317 drop(servers);
318 self.persist_state().await;
319 true
320 }
321
322 pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
323 let server = {
324 let servers = self.servers.read().await;
325 let Some(server) = servers.get(name) else {
326 return Err("MCP server not found".to_string());
327 };
328 server.clone()
329 };
330
331 if !server.enabled {
332 return Err("MCP server is disabled".to_string());
333 }
334
335 let endpoint = parse_remote_endpoint(&server.transport)
336 .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
337
338 let request_headers = effective_headers(&server);
339 let (tools, session_id) = match self
340 .discover_remote_tools(&endpoint, &request_headers)
341 .await
342 {
343 Ok(result) => result,
344 Err(err) => {
345 let mut servers = self.servers.write().await;
346 if let Some(entry) = servers.get_mut(name) {
347 entry.connected = false;
348 entry.pid = None;
349 entry.last_error = Some(err.clone());
350 entry.last_auth_challenge = None;
351 entry.mcp_session_id = None;
352 entry.pending_auth_by_tool.clear();
353 entry.tool_cache.clear();
354 entry.tools_fetched_at_ms = None;
355 }
356 drop(servers);
357 self.persist_state().await;
358 return Err(err);
359 }
360 };
361
362 let now = now_ms();
363 let cache = tools
364 .iter()
365 .map(|tool| McpToolCacheEntry {
366 tool_name: tool.tool_name.clone(),
367 description: tool.description.clone(),
368 input_schema: tool.input_schema.clone(),
369 fetched_at_ms: now,
370 schema_hash: schema_hash(&tool.input_schema),
371 })
372 .collect::<Vec<_>>();
373
374 let mut servers = self.servers.write().await;
375 if let Some(entry) = servers.get_mut(name) {
376 entry.connected = true;
377 entry.pid = None;
378 entry.last_error = None;
379 entry.last_auth_challenge = None;
380 entry.mcp_session_id = session_id;
381 entry.tool_cache = cache;
382 entry.tools_fetched_at_ms = Some(now);
383 entry.pending_auth_by_tool.clear();
384 }
385 drop(servers);
386 self.persist_state().await;
387 Ok(self.server_tools(name).await)
388 }
389
390 pub async fn disconnect(&self, name: &str) -> bool {
391 if let Some(mut child) = self.processes.lock().await.remove(name) {
392 let _ = child.kill().await;
393 let _ = child.wait().await;
394 }
395 let mut servers = self.servers.write().await;
396 if let Some(server) = servers.get_mut(name) {
397 server.connected = false;
398 server.pid = None;
399 server.last_auth_challenge = None;
400 server.mcp_session_id = None;
401 server.pending_auth_by_tool.clear();
402 drop(servers);
403 self.persist_state().await;
404 return true;
405 }
406 false
407 }
408
409 pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
410 let mut out = self
411 .servers
412 .read()
413 .await
414 .values()
415 .filter(|server| server.enabled && server.connected)
416 .flat_map(server_tool_rows)
417 .collect::<Vec<_>>();
418 out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
419 out
420 }
421
422 pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
423 let Some(server) = self.servers.read().await.get(name).cloned() else {
424 return Vec::new();
425 };
426 let mut rows = server_tool_rows(&server);
427 rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
428 rows
429 }
430
431 pub async fn call_tool(
432 &self,
433 server_name: &str,
434 tool_name: &str,
435 args: Value,
436 ) -> Result<ToolResult, String> {
437 let server = {
438 let servers = self.servers.read().await;
439 let Some(server) = servers.get(server_name) else {
440 return Err(format!("MCP server '{server_name}' not found"));
441 };
442 server.clone()
443 };
444
445 if !server.enabled {
446 return Err(format!("MCP server '{server_name}' is disabled"));
447 }
448 if !server.connected {
449 return Err(format!("MCP server '{server_name}' is not connected"));
450 }
451
452 let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
453 "MCP tools/call currently supports HTTP/S transports only".to_string()
454 })?;
455 let canonical_tool = canonical_tool_key(tool_name);
456 let now = now_ms();
457 if let Some(blocked) = pending_auth_short_circuit(
458 &server,
459 &canonical_tool,
460 tool_name,
461 now,
462 MCP_AUTH_REPROBE_COOLDOWN_MS,
463 ) {
464 return Ok(ToolResult {
465 output: blocked.output,
466 metadata: json!({
467 "server": server_name,
468 "tool": tool_name,
469 "result": Value::Null,
470 "mcpAuth": blocked.mcp_auth
471 }),
472 });
473 }
474 let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
475
476 {
477 let mut servers = self.servers.write().await;
478 if let Some(row) = servers.get_mut(server_name) {
479 if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
480 pending.last_probe_ms = now;
481 }
482 }
483 }
484
485 let request = json!({
486 "jsonrpc": "2.0",
487 "id": format!("call-{}-{}", server_name, now_ms()),
488 "method": "tools/call",
489 "params": {
490 "name": tool_name,
491 "arguments": normalized_args
492 }
493 });
494 let (response, session_id) = post_json_rpc_with_session(
495 &endpoint,
496 &effective_headers(&server),
497 request,
498 server.mcp_session_id.as_deref(),
499 )
500 .await?;
501 if session_id.is_some() {
502 let mut servers = self.servers.write().await;
503 if let Some(row) = servers.get_mut(server_name) {
504 row.mcp_session_id = session_id;
505 }
506 drop(servers);
507 self.persist_state().await;
508 }
509
510 if let Some(err) = response.get("error") {
511 if let Some(challenge) = extract_auth_challenge(err, tool_name) {
512 let output = format!(
513 "{}\n\nAuthorize here: {}",
514 challenge.message, challenge.authorization_url
515 );
516 {
517 let mut servers = self.servers.write().await;
518 if let Some(row) = servers.get_mut(server_name) {
519 row.last_auth_challenge = Some(challenge.clone());
520 row.last_error = None;
521 row.pending_auth_by_tool.insert(
522 canonical_tool.clone(),
523 pending_auth_from_challenge(&challenge),
524 );
525 }
526 }
527 self.persist_state().await;
528 return Ok(ToolResult {
529 output,
530 metadata: json!({
531 "server": server_name,
532 "tool": tool_name,
533 "result": Value::Null,
534 "mcpAuth": {
535 "required": true,
536 "challengeId": challenge.challenge_id,
537 "tool": challenge.tool_name,
538 "authorizationUrl": challenge.authorization_url,
539 "message": challenge.message,
540 "status": challenge.status
541 }
542 }),
543 });
544 }
545 let message = err
546 .get("message")
547 .and_then(|v| v.as_str())
548 .unwrap_or("MCP tools/call failed");
549 return Err(message.to_string());
550 }
551
552 let result = response.get("result").cloned().unwrap_or(Value::Null);
553 let auth_challenge = extract_auth_challenge(&result, tool_name);
554 let output = if let Some(challenge) = auth_challenge.as_ref() {
555 format!(
556 "{}\n\nAuthorize here: {}",
557 challenge.message, challenge.authorization_url
558 )
559 } else {
560 result
561 .get("content")
562 .map(render_mcp_content)
563 .or_else(|| result.get("output").map(|v| v.to_string()))
564 .unwrap_or_else(|| result.to_string())
565 };
566
567 {
568 let mut servers = self.servers.write().await;
569 if let Some(row) = servers.get_mut(server_name) {
570 row.last_auth_challenge = auth_challenge.clone();
571 if let Some(challenge) = auth_challenge.as_ref() {
572 row.pending_auth_by_tool.insert(
573 canonical_tool.clone(),
574 pending_auth_from_challenge(challenge),
575 );
576 } else {
577 row.pending_auth_by_tool.remove(&canonical_tool);
578 }
579 }
580 }
581 self.persist_state().await;
582
583 let auth_metadata = auth_challenge.as_ref().map(|challenge| {
584 json!({
585 "required": true,
586 "challengeId": challenge.challenge_id,
587 "tool": challenge.tool_name,
588 "authorizationUrl": challenge.authorization_url,
589 "message": challenge.message,
590 "status": challenge.status
591 })
592 });
593
594 Ok(ToolResult {
595 output,
596 metadata: json!({
597 "server": server_name,
598 "tool": tool_name,
599 "result": result,
600 "mcpAuth": auth_metadata
601 }),
602 })
603 }
604
605 async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
606 match spawn_stdio_process(command_text).await {
607 Ok(child) => {
608 let pid = child.id();
609 self.processes.lock().await.insert(name.to_string(), child);
610 let mut servers = self.servers.write().await;
611 if let Some(server) = servers.get_mut(name) {
612 server.connected = true;
613 server.pid = pid;
614 server.last_error = None;
615 server.last_auth_challenge = None;
616 server.pending_auth_by_tool.clear();
617 }
618 drop(servers);
619 self.persist_state().await;
620 true
621 }
622 Err(err) => {
623 let mut servers = self.servers.write().await;
624 if let Some(server) = servers.get_mut(name) {
625 server.connected = false;
626 server.pid = None;
627 server.last_error = Some(err);
628 server.last_auth_challenge = None;
629 server.pending_auth_by_tool.clear();
630 }
631 drop(servers);
632 self.persist_state().await;
633 false
634 }
635 }
636 }
637
638 async fn discover_remote_tools(
639 &self,
640 endpoint: &str,
641 headers: &HashMap<String, String>,
642 ) -> Result<(Vec<McpRemoteTool>, Option<String>), String> {
643 let initialize = json!({
644 "jsonrpc": "2.0",
645 "id": "initialize-1",
646 "method": "initialize",
647 "params": {
648 "protocolVersion": MCP_PROTOCOL_VERSION,
649 "capabilities": {},
650 "clientInfo": {
651 "name": MCP_CLIENT_NAME,
652 "version": MCP_CLIENT_VERSION,
653 }
654 }
655 });
656 let (init_response, mut session_id) =
657 post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
658 if let Some(err) = init_response.get("error") {
659 let message = err
660 .get("message")
661 .and_then(|v| v.as_str())
662 .unwrap_or("MCP initialize failed");
663 return Err(message.to_string());
664 }
665
666 let tools_list = json!({
667 "jsonrpc": "2.0",
668 "id": "tools-list-1",
669 "method": "tools/list",
670 "params": {}
671 });
672 let (tools_response, next_session_id) =
673 post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
674 .await?;
675 if next_session_id.is_some() {
676 session_id = next_session_id;
677 }
678 if let Some(err) = tools_response.get("error") {
679 let message = err
680 .get("message")
681 .and_then(|v| v.as_str())
682 .unwrap_or("MCP tools/list failed");
683 return Err(message.to_string());
684 }
685
686 let tools = tools_response
687 .get("result")
688 .and_then(|v| v.get("tools"))
689 .and_then(|v| v.as_array())
690 .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
691
692 let now = now_ms();
693 let mut out = Vec::new();
694 for row in tools {
695 let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
696 continue;
697 };
698 let description = row
699 .get("description")
700 .and_then(|v| v.as_str())
701 .unwrap_or("")
702 .to_string();
703 let mut input_schema = row
704 .get("inputSchema")
705 .or_else(|| row.get("input_schema"))
706 .cloned()
707 .unwrap_or_else(|| json!({"type":"object"}));
708 normalize_tool_input_schema(&mut input_schema);
709 out.push(McpRemoteTool {
710 server_name: String::new(),
711 tool_name: tool_name.to_string(),
712 namespaced_name: String::new(),
713 description,
714 input_schema,
715 fetched_at_ms: now,
716 schema_hash: String::new(),
717 });
718 }
719
720 Ok((out, session_id))
721 }
722
723 async fn persist_state(&self) {
724 let snapshot = self.servers.read().await.clone();
725 persist_state_blocking(self.state_file.as_path(), &snapshot);
726 }
727}
728
729impl Default for McpRegistry {
730 fn default() -> Self {
731 Self::new()
732 }
733}
734
735fn default_enabled() -> bool {
736 true
737}
738
739fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
740 if let Some(parent) = path.parent() {
741 let _ = std::fs::create_dir_all(parent);
742 }
743 if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
744 let _ = std::fs::write(path, payload);
745 }
746}
747
748fn resolve_state_file() -> PathBuf {
749 if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
750 return PathBuf::from(path);
751 }
752 if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
753 let trimmed = state_dir.trim();
754 if !trimmed.is_empty() {
755 return PathBuf::from(trimmed).join("mcp_servers.json");
756 }
757 }
758 if let Some(data_dir) = dirs::data_dir() {
759 return data_dir
760 .join("tandem")
761 .join("data")
762 .join("mcp_servers.json");
763 }
764 dirs::home_dir()
765 .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
766 .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
767}
768
769fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
770 let Ok(raw) = std::fs::read_to_string(path) else {
771 return (HashMap::new(), false);
772 };
773 let mut migrated = false;
774 let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
775 for (name, server) in parsed.iter_mut() {
776 let tenant_context = local_tenant_context();
777 let (headers, secret_headers, secret_header_values, server_migrated) =
778 migrate_server_headers(name, server, &tenant_context);
779 migrated = migrated || server_migrated;
780 server.headers = headers;
781 server.secret_headers = secret_headers;
782 server.secret_header_values = secret_header_values;
783 }
784 (parsed, migrated)
785}
786
787fn migrate_server_headers(
788 server_name: &str,
789 server: &McpServer,
790 current_tenant: &TenantContext,
791) -> (
792 HashMap<String, String>,
793 HashMap<String, McpSecretRef>,
794 HashMap<String, String>,
795 bool,
796) {
797 let original_effective = effective_headers(server);
798 let mut persisted_secret_headers = server.secret_headers.clone();
799 let mut secret_header_values =
800 resolve_secret_header_values(&persisted_secret_headers, current_tenant);
801 let mut persisted_headers = server.headers.clone();
802 let mut migrated = false;
803
804 let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
805 for header_name in header_keys {
806 let Some(value) = persisted_headers.get(&header_name).cloned() else {
807 continue;
808 };
809 if persisted_secret_headers.contains_key(&header_name) {
810 continue;
811 }
812 if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
813 persisted_headers.remove(&header_name);
814 let resolved =
815 resolve_secret_ref_value(&secret_ref, current_tenant).unwrap_or_default();
816 persisted_secret_headers.insert(header_name.clone(), secret_ref);
817 if !resolved.is_empty() {
818 secret_header_values.insert(header_name.clone(), resolved);
819 }
820 migrated = true;
821 continue;
822 }
823 if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
824 let secret_id = mcp_header_secret_id(server_name, &header_name);
825 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
826 persisted_headers.remove(&header_name);
827 persisted_secret_headers.insert(
828 header_name.clone(),
829 McpSecretRef::Store {
830 secret_id: secret_id.clone(),
831 tenant_context: current_tenant.clone(),
832 },
833 );
834 secret_header_values.insert(header_name.clone(), value);
835 migrated = true;
836 }
837 }
838 }
839
840 if !migrated {
841 let effective = combine_headers(&persisted_headers, &secret_header_values);
842 migrated = effective != original_effective;
843 }
844
845 (
846 persisted_headers,
847 persisted_secret_headers,
848 secret_header_values,
849 migrated,
850 )
851}
852
853fn split_headers_for_storage(
854 server_name: &str,
855 headers: HashMap<String, String>,
856 explicit_secret_headers: HashMap<String, McpSecretRef>,
857 current_tenant: &TenantContext,
858) -> (
859 HashMap<String, String>,
860 HashMap<String, McpSecretRef>,
861 HashMap<String, String>,
862) {
863 let mut persisted_headers = HashMap::new();
864 let mut persisted_secret_headers = HashMap::new();
865 let mut secret_header_values = HashMap::new();
866
867 for (header_name, raw_value) in headers {
868 let value = raw_value.trim().to_string();
869 if value.is_empty() {
870 continue;
871 }
872 if let Some(secret_ref) = parse_secret_header_reference(&value) {
873 if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
874 secret_header_values.insert(header_name.clone(), resolved);
875 }
876 persisted_secret_headers.insert(header_name, secret_ref);
877 continue;
878 }
879 if header_name_is_sensitive(&header_name) {
880 let secret_id = mcp_header_secret_id(server_name, &header_name);
881 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
882 persisted_secret_headers.insert(
883 header_name.clone(),
884 McpSecretRef::Store {
885 secret_id: secret_id.clone(),
886 tenant_context: current_tenant.clone(),
887 },
888 );
889 secret_header_values.insert(header_name, value);
890 continue;
891 }
892 }
893 persisted_headers.insert(header_name, value);
894 }
895
896 for (header_name, secret_ref) in explicit_secret_headers {
897 if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
898 secret_header_values.insert(header_name.clone(), resolved);
899 }
900 persisted_headers.remove(&header_name);
901 persisted_secret_headers.insert(header_name, secret_ref);
902 }
903
904 (
905 persisted_headers,
906 persisted_secret_headers,
907 secret_header_values,
908 )
909}
910
911fn combine_headers(
912 headers: &HashMap<String, String>,
913 secret_header_values: &HashMap<String, String>,
914) -> HashMap<String, String> {
915 let mut combined = headers.clone();
916 for (key, value) in secret_header_values {
917 if !value.trim().is_empty() {
918 combined.insert(key.clone(), value.clone());
919 }
920 }
921 combined
922}
923
924fn effective_headers(server: &McpServer) -> HashMap<String, String> {
925 combine_headers(&server.headers, &server.secret_header_values)
926}
927
928fn redacted_server_view(server: &McpServer) -> McpServer {
929 let mut clone = server.clone();
930 for (header_name, secret_ref) in &clone.secret_headers {
931 clone.headers.insert(
932 header_name.clone(),
933 redacted_secret_header_value(secret_ref),
934 );
935 }
936 clone.secret_header_values.clear();
937 clone
938}
939
940fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
941 match secret_ref {
942 McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
943 McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
944 }
945}
946
947fn resolve_secret_header_values(
948 secret_headers: &HashMap<String, McpSecretRef>,
949 current_tenant: &TenantContext,
950) -> HashMap<String, String> {
951 let mut out = HashMap::new();
952 for (header_name, secret_ref) in secret_headers {
953 if let Some(value) = resolve_secret_ref_value(secret_ref, current_tenant) {
954 if !value.trim().is_empty() {
955 out.insert(header_name.clone(), value);
956 }
957 }
958 }
959 out
960}
961
962fn delete_secret_header_refs(
963 secret_headers: &HashMap<String, McpSecretRef>,
964 current_tenant: &TenantContext,
965) {
966 for secret_ref in secret_headers.values() {
967 if let McpSecretRef::Store {
968 secret_id,
969 tenant_context,
970 } = secret_ref
971 {
972 if tenant_context != current_tenant {
973 continue;
974 }
975 let _ = tandem_core::delete_provider_auth(secret_id);
976 }
977 }
978}
979