1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use reqwest::header::{HeaderMap, HeaderName, HeaderValue, ACCEPT, CONTENT_TYPE};
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use sha2::{Digest, Sha256};
10use tandem_types::{LocalImplicitTenant, SecretRef, TenantContext, ToolResult};
11use tokio::process::{Child, Command};
12use tokio::sync::{Mutex, RwLock};
13
14const MCP_PROTOCOL_VERSION: &str = "2025-11-25";
15const MCP_CLIENT_NAME: &str = "tandem";
16const MCP_CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
17const MCP_AUTH_REPROBE_COOLDOWN_MS: u64 = 15_000;
18const MCP_SECRET_PLACEHOLDER: &str = "";
19
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct McpToolCacheEntry {
22 pub tool_name: String,
23 pub description: String,
24 #[serde(default)]
25 pub input_schema: Value,
26 pub fetched_at_ms: u64,
27 pub schema_hash: String,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct McpServer {
32 pub name: String,
33 pub transport: String,
34 #[serde(default, skip_serializing_if = "String::is_empty")]
35 pub auth_kind: String,
36 #[serde(default = "default_enabled")]
37 pub enabled: bool,
38 pub connected: bool,
39 #[serde(skip_serializing_if = "Option::is_none")]
40 pub pid: Option<u32>,
41 #[serde(skip_serializing_if = "Option::is_none")]
42 pub last_error: Option<String>,
43 #[serde(default, skip_serializing_if = "Option::is_none")]
44 pub last_auth_challenge: Option<McpAuthChallenge>,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub mcp_session_id: Option<String>,
47 #[serde(default)]
48 pub headers: HashMap<String, String>,
49 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
50 pub secret_headers: HashMap<String, McpSecretRef>,
51 #[serde(default)]
52 pub tool_cache: Vec<McpToolCacheEntry>,
53 #[serde(default, skip_serializing_if = "Option::is_none")]
54 pub tools_fetched_at_ms: Option<u64>,
55 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
56 pub pending_auth_by_tool: HashMap<String, PendingMcpAuth>,
57 #[serde(default, skip)]
58 pub secret_header_values: HashMap<String, String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(tag = "type", rename_all = "snake_case")]
63pub enum McpSecretRef {
64 Store {
65 secret_id: String,
66 #[serde(default)]
67 tenant_context: TenantContext,
68 },
69 Env {
70 env: String,
71 },
72 BearerEnv {
73 env: String,
74 },
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct McpAuthChallenge {
79 pub challenge_id: String,
80 pub tool_name: String,
81 pub authorization_url: String,
82 pub message: String,
83 pub requested_at_ms: u64,
84 pub status: String,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct PendingMcpAuth {
89 pub challenge_id: String,
90 pub authorization_url: String,
91 pub message: String,
92 pub status: String,
93 pub first_seen_ms: u64,
94 pub last_probe_ms: u64,
95}
96
97#[derive(Debug, Clone)]
98enum DiscoverRemoteToolsError {
99 Message(String),
100 AuthChallenge(McpAuthChallenge),
101}
102
103impl From<String> for DiscoverRemoteToolsError {
104 fn from(value: String) -> Self {
105 Self::Message(value)
106 }
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110pub struct McpRemoteTool {
111 pub server_name: String,
112 pub tool_name: String,
113 pub namespaced_name: String,
114 pub description: String,
115 #[serde(default)]
116 pub input_schema: Value,
117 pub fetched_at_ms: u64,
118 pub schema_hash: String,
119}
120
121#[derive(Clone)]
122pub struct McpRegistry {
123 servers: Arc<RwLock<HashMap<String, McpServer>>>,
124 processes: Arc<Mutex<HashMap<String, Child>>>,
125 state_file: Arc<PathBuf>,
126}
127
128impl McpRegistry {
129 pub fn new() -> Self {
130 Self::new_with_state_file(resolve_state_file())
131 }
132
133 pub fn new_with_state_file(state_file: PathBuf) -> Self {
134 let (loaded_state, migrated) = load_state(&state_file);
135 let loaded = loaded_state
136 .into_iter()
137 .map(|(k, mut v)| {
138 v.connected = false;
139 v.pid = None;
140 if v.name.trim().is_empty() {
141 v.name = k.clone();
142 }
143 if v.headers.is_empty() {
144 v.headers = HashMap::new();
145 }
146 if v.secret_headers.is_empty() {
147 v.secret_headers = HashMap::new();
148 }
149 let tenant_context = local_tenant_context();
150 v.secret_header_values =
151 resolve_secret_header_values(&v.secret_headers, &tenant_context);
152 (k, v)
153 })
154 .collect::<HashMap<_, _>>();
155 if migrated {
156 persist_state_blocking(&state_file, &loaded);
157 }
158 Self {
159 servers: Arc::new(RwLock::new(loaded)),
160 processes: Arc::new(Mutex::new(HashMap::new())),
161 state_file: Arc::new(state_file),
162 }
163 }
164
165 pub async fn list(&self) -> HashMap<String, McpServer> {
166 self.servers.read().await.clone()
167 }
168
169 pub async fn list_public(&self) -> HashMap<String, McpServer> {
170 self.servers
171 .read()
172 .await
173 .iter()
174 .map(|(name, server)| (name.clone(), redacted_server_view(server)))
175 .collect()
176 }
177
178 pub async fn add(&self, name: String, transport: String) {
179 self.add_or_update(name, transport, HashMap::new(), true)
180 .await;
181 }
182
183 pub async fn add_or_update(
184 &self,
185 name: String,
186 transport: String,
187 headers: HashMap<String, String>,
188 enabled: bool,
189 ) {
190 self.add_or_update_with_secret_refs(name, transport, headers, HashMap::new(), enabled)
191 .await;
192 }
193
194 pub async fn add_or_update_with_secret_refs(
195 &self,
196 name: String,
197 transport: String,
198 headers: HashMap<String, String>,
199 secret_headers: HashMap<String, McpSecretRef>,
200 enabled: bool,
201 ) {
202 let normalized_name = name.trim().to_string();
203 let tenant_context = local_tenant_context();
204 let (persisted_headers, persisted_secret_headers, secret_header_values) =
205 split_headers_for_storage(&normalized_name, headers, secret_headers, &tenant_context);
206 let mut servers = self.servers.write().await;
207 let existing = servers.get(&normalized_name).cloned();
208 let preserve_cache = existing.as_ref().is_some_and(|row| {
209 row.transport == transport
210 && effective_headers(row)
211 == combine_headers(&persisted_headers, &secret_header_values)
212 });
213 let existing_tool_cache = if preserve_cache {
214 existing
215 .as_ref()
216 .map(|row| row.tool_cache.clone())
217 .unwrap_or_default()
218 } else {
219 Vec::new()
220 };
221 let existing_fetched_at = if preserve_cache {
222 existing.as_ref().and_then(|row| row.tools_fetched_at_ms)
223 } else {
224 None
225 };
226 let server = McpServer {
227 name: normalized_name.clone(),
228 transport,
229 auth_kind: existing
230 .as_ref()
231 .map(|row| row.auth_kind.clone())
232 .unwrap_or_default(),
233 enabled,
234 connected: false,
235 pid: None,
236 last_error: None,
237 last_auth_challenge: None,
238 mcp_session_id: None,
239 headers: persisted_headers,
240 secret_headers: persisted_secret_headers,
241 tool_cache: existing_tool_cache,
242 tools_fetched_at_ms: existing_fetched_at,
243 pending_auth_by_tool: HashMap::new(),
244 secret_header_values,
245 };
246 servers.insert(normalized_name, server);
247 drop(servers);
248 self.persist_state().await;
249 }
250
251 pub async fn set_enabled(&self, name: &str, enabled: bool) -> bool {
252 let mut servers = self.servers.write().await;
253 let Some(server) = servers.get_mut(name) else {
254 return false;
255 };
256 server.enabled = enabled;
257 if !enabled {
258 server.connected = false;
259 server.pid = None;
260 server.last_auth_challenge = None;
261 server.mcp_session_id = None;
262 server.pending_auth_by_tool.clear();
263 }
264 drop(servers);
265 if !enabled {
266 if let Some(mut child) = self.processes.lock().await.remove(name) {
267 let _ = child.kill().await;
268 let _ = child.wait().await;
269 }
270 }
271 self.persist_state().await;
272 true
273 }
274
275 pub async fn remove(&self, name: &str) -> bool {
276 let removed_server = {
277 let mut servers = self.servers.write().await;
278 servers.remove(name)
279 };
280 let Some(server) = removed_server else {
281 return false;
282 };
283 let current_tenant = local_tenant_context();
284 delete_secret_header_refs(&server.secret_headers, ¤t_tenant);
285
286 if let Some(mut child) = self.processes.lock().await.remove(name) {
287 let _ = child.kill().await;
288 let _ = child.wait().await;
289 }
290 self.persist_state().await;
291 true
292 }
293
294 pub async fn connect(&self, name: &str) -> bool {
295 let server = {
296 let servers = self.servers.read().await;
297 let Some(server) = servers.get(name) else {
298 return false;
299 };
300 server.clone()
301 };
302
303 if !server.enabled {
304 let mut servers = self.servers.write().await;
305 if let Some(entry) = servers.get_mut(name) {
306 entry.connected = false;
307 entry.pid = None;
308 entry.last_error = Some("MCP server is disabled".to_string());
309 entry.last_auth_challenge = None;
310 entry.mcp_session_id = None;
311 entry.pending_auth_by_tool.clear();
312 }
313 drop(servers);
314 self.persist_state().await;
315 return false;
316 }
317
318 if let Some(command_text) = parse_stdio_transport(&server.transport) {
319 return self.connect_stdio(name, command_text).await;
320 }
321
322 if parse_remote_endpoint(&server.transport).is_some() {
323 return self.refresh(name).await.is_ok();
324 }
325
326 let mut servers = self.servers.write().await;
327 if let Some(entry) = servers.get_mut(name) {
328 entry.connected = true;
329 entry.pid = None;
330 entry.last_error = None;
331 entry.last_auth_challenge = None;
332 entry.mcp_session_id = None;
333 entry.pending_auth_by_tool.clear();
334 }
335 drop(servers);
336 self.persist_state().await;
337 true
338 }
339
340 pub async fn refresh(&self, name: &str) -> Result<Vec<McpRemoteTool>, String> {
341 let server = {
342 let servers = self.servers.read().await;
343 let Some(server) = servers.get(name) else {
344 return Err("MCP server not found".to_string());
345 };
346 server.clone()
347 };
348
349 if !server.enabled {
350 return Err("MCP server is disabled".to_string());
351 }
352
353 let endpoint = parse_remote_endpoint(&server.transport)
354 .ok_or_else(|| "MCP refresh currently supports HTTP/S transports only".to_string())?;
355
356 let request_headers = effective_headers(&server);
357 let (tools, session_id) = match self
358 .discover_remote_tools(name, &endpoint, &request_headers)
359 .await
360 {
361 Ok(result) => result,
362 Err(DiscoverRemoteToolsError::AuthChallenge(challenge)) => {
363 let mut servers = self.servers.write().await;
364 if let Some(entry) = servers.get_mut(name) {
365 entry.connected = false;
366 entry.pid = None;
367 entry.last_error = Some(challenge.message.clone());
368 entry.last_auth_challenge = Some(challenge.clone());
369 entry.mcp_session_id = None;
370 entry.pending_auth_by_tool.clear();
371 entry.tool_cache.clear();
372 entry.tools_fetched_at_ms = None;
373 }
374 drop(servers);
375 self.persist_state().await;
376 return Err(format!(
377 "MCP server '{name}' requires authorization: {}",
378 challenge.message
379 ));
380 }
381 Err(DiscoverRemoteToolsError::Message(err)) => {
382 let mut servers = self.servers.write().await;
383 if let Some(entry) = servers.get_mut(name) {
384 entry.connected = false;
385 entry.pid = None;
386 entry.last_error = Some(err.clone());
387 entry.last_auth_challenge = None;
388 entry.mcp_session_id = None;
389 entry.pending_auth_by_tool.clear();
390 entry.tool_cache.clear();
391 entry.tools_fetched_at_ms = None;
392 }
393 drop(servers);
394 self.persist_state().await;
395 return Err(err);
396 }
397 };
398
399 let now = now_ms();
400 let cache = tools
401 .iter()
402 .map(|tool| McpToolCacheEntry {
403 tool_name: tool.tool_name.clone(),
404 description: tool.description.clone(),
405 input_schema: tool.input_schema.clone(),
406 fetched_at_ms: now,
407 schema_hash: schema_hash(&tool.input_schema),
408 })
409 .collect::<Vec<_>>();
410
411 let mut servers = self.servers.write().await;
412 if let Some(entry) = servers.get_mut(name) {
413 entry.connected = true;
414 entry.pid = None;
415 entry.last_error = None;
416 entry.last_auth_challenge = None;
417 entry.mcp_session_id = session_id;
418 entry.tool_cache = cache;
419 entry.tools_fetched_at_ms = Some(now);
420 entry.pending_auth_by_tool.clear();
421 }
422 drop(servers);
423 self.persist_state().await;
424 Ok(self.server_tools(name).await)
425 }
426
427 pub async fn disconnect(&self, name: &str) -> bool {
428 if let Some(mut child) = self.processes.lock().await.remove(name) {
429 let _ = child.kill().await;
430 let _ = child.wait().await;
431 }
432 let mut servers = self.servers.write().await;
433 if let Some(server) = servers.get_mut(name) {
434 server.connected = false;
435 server.pid = None;
436 server.last_auth_challenge = None;
437 server.mcp_session_id = None;
438 server.pending_auth_by_tool.clear();
439 drop(servers);
440 self.persist_state().await;
441 return true;
442 }
443 false
444 }
445
446 pub async fn complete_auth(&self, name: &str) -> bool {
447 let mut servers = self.servers.write().await;
448 let Some(server) = servers.get_mut(name) else {
449 return false;
450 };
451 server.last_error = None;
452 server.last_auth_challenge = None;
453 server.pending_auth_by_tool.clear();
454 drop(servers);
455 self.persist_state().await;
456 true
457 }
458
459 pub async fn set_auth_kind(&self, name: &str, auth_kind: String) -> bool {
460 let normalized = normalize_auth_kind(&auth_kind);
461 let mut servers = self.servers.write().await;
462 let Some(server) = servers.get_mut(name) else {
463 return false;
464 };
465 server.auth_kind = normalized;
466 drop(servers);
467 self.persist_state().await;
468 true
469 }
470
471 pub async fn record_server_auth_challenge(
472 &self,
473 name: &str,
474 challenge: McpAuthChallenge,
475 last_error: Option<String>,
476 ) -> bool {
477 let mut servers = self.servers.write().await;
478 let Some(server) = servers.get_mut(name) else {
479 return false;
480 };
481 let tool_key = canonical_tool_key(&challenge.tool_name);
482 server.connected = false;
483 server.pid = None;
484 server.last_error = last_error.or_else(|| Some(challenge.message.clone()));
485 server.last_auth_challenge = Some(challenge.clone());
486 server.mcp_session_id = None;
487 server.pending_auth_by_tool.clear();
488 server
489 .pending_auth_by_tool
490 .insert(tool_key, pending_auth_from_challenge(&challenge));
491 drop(servers);
492 self.persist_state().await;
493 true
494 }
495
496 pub async fn clear_server_auth_challenge(&self, name: &str) -> bool {
497 let mut servers = self.servers.write().await;
498 let Some(server) = servers.get_mut(name) else {
499 return false;
500 };
501 server.last_auth_challenge = None;
502 server.pending_auth_by_tool.clear();
503 drop(servers);
504 self.persist_state().await;
505 true
506 }
507
508 pub async fn set_bearer_token(&self, name: &str, token: &str) -> Result<bool, String> {
509 let trimmed = token.trim();
510 if trimmed.is_empty() {
511 return Err("oauth access token cannot be empty".to_string());
512 }
513 let current_tenant = local_tenant_context();
514 let mut servers = self.servers.write().await;
515 let Some(server) = servers.get_mut(name) else {
516 return Ok(false);
517 };
518 let header_name = "Authorization".to_string();
519 let secret_id = mcp_header_secret_id(name, &header_name);
520 tandem_core::set_provider_auth(&secret_id, &format!("Bearer {trimmed}"))
521 .map_err(|error| error.to_string())?;
522 server.secret_headers.insert(
523 header_name.clone(),
524 McpSecretRef::Store {
525 secret_id: secret_id.clone(),
526 tenant_context: current_tenant,
527 },
528 );
529 server
530 .secret_header_values
531 .insert(header_name.clone(), format!("Bearer {trimmed}"));
532 server.headers.remove(&header_name);
533 drop(servers);
534 self.persist_state().await;
535 Ok(true)
536 }
537
538 pub async fn list_tools(&self) -> Vec<McpRemoteTool> {
539 let mut out = self
540 .servers
541 .read()
542 .await
543 .values()
544 .filter(|server| server.enabled && server.connected)
545 .flat_map(server_tool_rows)
546 .collect::<Vec<_>>();
547 out.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
548 out
549 }
550
551 pub async fn server_tools(&self, name: &str) -> Vec<McpRemoteTool> {
552 let Some(server) = self.servers.read().await.get(name).cloned() else {
553 return Vec::new();
554 };
555 let mut rows = server_tool_rows(&server);
556 rows.sort_by(|a, b| a.namespaced_name.cmp(&b.namespaced_name));
557 rows
558 }
559
560 pub async fn call_tool(
561 &self,
562 server_name: &str,
563 tool_name: &str,
564 args: Value,
565 ) -> Result<ToolResult, String> {
566 let server = {
567 let servers = self.servers.read().await;
568 let Some(server) = servers.get(server_name) else {
569 return Err(format!("MCP server '{server_name}' not found"));
570 };
571 server.clone()
572 };
573
574 if !server.enabled {
575 return Err(format!("MCP server '{server_name}' is disabled"));
576 }
577 if !server.connected {
578 return Err(format!("MCP server '{server_name}' is not connected"));
579 }
580
581 let endpoint = parse_remote_endpoint(&server.transport).ok_or_else(|| {
582 "MCP tools/call currently supports HTTP/S transports only".to_string()
583 })?;
584 let canonical_tool = canonical_tool_key(tool_name);
585 let now = now_ms();
586 if let Some(blocked) = pending_auth_short_circuit(
587 &server,
588 &canonical_tool,
589 tool_name,
590 now,
591 MCP_AUTH_REPROBE_COOLDOWN_MS,
592 ) {
593 return Ok(ToolResult {
594 output: blocked.output,
595 metadata: json!({
596 "server": server_name,
597 "tool": tool_name,
598 "result": Value::Null,
599 "mcpAuth": blocked.mcp_auth
600 }),
601 });
602 }
603 let normalized_args = normalize_mcp_tool_args(&server, tool_name, args);
604
605 {
606 let mut servers = self.servers.write().await;
607 if let Some(row) = servers.get_mut(server_name) {
608 if let Some(pending) = row.pending_auth_by_tool.get_mut(&canonical_tool) {
609 pending.last_probe_ms = now;
610 }
611 }
612 }
613
614 let request = json!({
615 "jsonrpc": "2.0",
616 "id": format!("call-{}-{}", server_name, now_ms()),
617 "method": "tools/call",
618 "params": {
619 "name": tool_name,
620 "arguments": normalized_args
621 }
622 });
623 let (response, session_id) = post_json_rpc_with_session(
624 &endpoint,
625 &effective_headers(&server),
626 request,
627 server.mcp_session_id.as_deref(),
628 )
629 .await?;
630 if session_id.is_some() {
631 let mut servers = self.servers.write().await;
632 if let Some(row) = servers.get_mut(server_name) {
633 row.mcp_session_id = session_id;
634 }
635 drop(servers);
636 self.persist_state().await;
637 }
638
639 if let Some(err) = response.get("error") {
640 if let Some(challenge) = extract_auth_challenge(err, tool_name) {
641 let output = format!(
642 "{}\n\nAuthorize here: {}",
643 challenge.message, challenge.authorization_url
644 );
645 {
646 let mut servers = self.servers.write().await;
647 if let Some(row) = servers.get_mut(server_name) {
648 row.last_auth_challenge = Some(challenge.clone());
649 row.last_error = None;
650 row.pending_auth_by_tool.insert(
651 canonical_tool.clone(),
652 pending_auth_from_challenge(&challenge),
653 );
654 }
655 }
656 self.persist_state().await;
657 return Ok(ToolResult {
658 output,
659 metadata: json!({
660 "server": server_name,
661 "tool": tool_name,
662 "result": Value::Null,
663 "mcpAuth": {
664 "required": true,
665 "challengeId": challenge.challenge_id,
666 "tool": challenge.tool_name,
667 "authorizationUrl": challenge.authorization_url,
668 "message": challenge.message,
669 "status": challenge.status
670 }
671 }),
672 });
673 }
674 let message = err
675 .get("message")
676 .and_then(|v| v.as_str())
677 .unwrap_or("MCP tools/call failed");
678 return Err(message.to_string());
679 }
680
681 let result = response.get("result").cloned().unwrap_or(Value::Null);
682 let auth_challenge = extract_auth_challenge(&result, tool_name);
683 let output = if let Some(challenge) = auth_challenge.as_ref() {
684 format!(
685 "{}\n\nAuthorize here: {}",
686 challenge.message, challenge.authorization_url
687 )
688 } else {
689 result
690 .get("content")
691 .map(render_mcp_content)
692 .or_else(|| result.get("output").map(|v| v.to_string()))
693 .unwrap_or_else(|| result.to_string())
694 };
695
696 {
697 let mut servers = self.servers.write().await;
698 if let Some(row) = servers.get_mut(server_name) {
699 row.last_auth_challenge = auth_challenge.clone();
700 if let Some(challenge) = auth_challenge.as_ref() {
701 row.pending_auth_by_tool.insert(
702 canonical_tool.clone(),
703 pending_auth_from_challenge(challenge),
704 );
705 } else {
706 row.pending_auth_by_tool.remove(&canonical_tool);
707 }
708 }
709 }
710 self.persist_state().await;
711
712 let auth_metadata = auth_challenge.as_ref().map(|challenge| {
713 json!({
714 "required": true,
715 "challengeId": challenge.challenge_id,
716 "tool": challenge.tool_name,
717 "authorizationUrl": challenge.authorization_url,
718 "message": challenge.message,
719 "status": challenge.status
720 })
721 });
722
723 Ok(ToolResult {
724 output,
725 metadata: json!({
726 "server": server_name,
727 "tool": tool_name,
728 "result": result,
729 "mcpAuth": auth_metadata
730 }),
731 })
732 }
733
734 async fn connect_stdio(&self, name: &str, command_text: &str) -> bool {
735 match spawn_stdio_process(command_text).await {
736 Ok(child) => {
737 let pid = child.id();
738 self.processes.lock().await.insert(name.to_string(), child);
739 let mut servers = self.servers.write().await;
740 if let Some(server) = servers.get_mut(name) {
741 server.connected = true;
742 server.pid = pid;
743 server.last_error = None;
744 server.last_auth_challenge = None;
745 server.pending_auth_by_tool.clear();
746 }
747 drop(servers);
748 self.persist_state().await;
749 true
750 }
751 Err(err) => {
752 let mut servers = self.servers.write().await;
753 if let Some(server) = servers.get_mut(name) {
754 server.connected = false;
755 server.pid = None;
756 server.last_error = Some(err);
757 server.last_auth_challenge = None;
758 server.pending_auth_by_tool.clear();
759 }
760 drop(servers);
761 self.persist_state().await;
762 false
763 }
764 }
765 }
766
767 async fn discover_remote_tools(
768 &self,
769 server_name: &str,
770 endpoint: &str,
771 headers: &HashMap<String, String>,
772 ) -> Result<(Vec<McpRemoteTool>, Option<String>), DiscoverRemoteToolsError> {
773 let initialize = json!({
774 "jsonrpc": "2.0",
775 "id": "initialize-1",
776 "method": "initialize",
777 "params": {
778 "protocolVersion": MCP_PROTOCOL_VERSION,
779 "capabilities": {},
780 "clientInfo": {
781 "name": MCP_CLIENT_NAME,
782 "version": MCP_CLIENT_VERSION,
783 }
784 }
785 });
786 let (init_response, mut session_id) =
787 post_json_rpc_with_session(endpoint, headers, initialize, None).await?;
788 if let Some(err) = init_response.get("error") {
789 if let Some(challenge) = extract_auth_challenge(err, server_name) {
790 return Err(DiscoverRemoteToolsError::AuthChallenge(challenge));
791 }
792 let message = err
793 .get("message")
794 .and_then(|v| v.as_str())
795 .unwrap_or("MCP initialize failed");
796 return Err(DiscoverRemoteToolsError::Message(message.to_string()));
797 }
798
799 let tools_list = json!({
800 "jsonrpc": "2.0",
801 "id": "tools-list-1",
802 "method": "tools/list",
803 "params": {}
804 });
805 let (tools_response, next_session_id) =
806 post_json_rpc_with_session(endpoint, headers, tools_list, session_id.as_deref())
807 .await?;
808 if next_session_id.is_some() {
809 session_id = next_session_id;
810 }
811 if let Some(err) = tools_response.get("error") {
812 if let Some(challenge) = extract_auth_challenge(err, server_name) {
813 return Err(DiscoverRemoteToolsError::AuthChallenge(challenge));
814 }
815 let message = err
816 .get("message")
817 .and_then(|v| v.as_str())
818 .unwrap_or("MCP tools/list failed");
819 return Err(DiscoverRemoteToolsError::Message(message.to_string()));
820 }
821
822 let tools = tools_response
823 .get("result")
824 .and_then(|v| v.get("tools"))
825 .and_then(|v| v.as_array())
826 .ok_or_else(|| "MCP tools/list result missing tools array".to_string())?;
827
828 let now = now_ms();
829 let mut out = Vec::new();
830 for row in tools {
831 let Some(tool_name) = row.get("name").and_then(|v| v.as_str()) else {
832 continue;
833 };
834 let description = row
835 .get("description")
836 .and_then(|v| v.as_str())
837 .unwrap_or("")
838 .to_string();
839 let mut input_schema = row
840 .get("inputSchema")
841 .or_else(|| row.get("input_schema"))
842 .cloned()
843 .unwrap_or_else(|| json!({"type":"object"}));
844 normalize_tool_input_schema(&mut input_schema);
845 out.push(McpRemoteTool {
846 server_name: String::new(),
847 tool_name: tool_name.to_string(),
848 namespaced_name: String::new(),
849 description,
850 input_schema,
851 fetched_at_ms: now,
852 schema_hash: String::new(),
853 });
854 }
855
856 Ok((out, session_id))
857 }
858
859 async fn persist_state(&self) {
860 let snapshot = self.servers.read().await.clone();
861 persist_state_blocking(self.state_file.as_path(), &snapshot);
862 }
863}
864
865impl Default for McpRegistry {
866 fn default() -> Self {
867 Self::new()
868 }
869}
870
871fn default_enabled() -> bool {
872 true
873}
874
875fn persist_state_blocking(path: &Path, snapshot: &HashMap<String, McpServer>) {
876 if let Some(parent) = path.parent() {
877 let _ = std::fs::create_dir_all(parent);
878 }
879 if let Ok(payload) = serde_json::to_string_pretty(snapshot) {
880 let _ = std::fs::write(path, payload);
881 }
882}
883
884fn resolve_state_file() -> PathBuf {
885 if let Ok(path) = std::env::var("TANDEM_MCP_REGISTRY") {
886 return PathBuf::from(path);
887 }
888 if let Ok(state_dir) = std::env::var("TANDEM_STATE_DIR") {
889 let trimmed = state_dir.trim();
890 if !trimmed.is_empty() {
891 return PathBuf::from(trimmed).join("mcp_servers.json");
892 }
893 }
894 if let Some(data_dir) = dirs::data_dir() {
895 return data_dir
896 .join("tandem")
897 .join("data")
898 .join("mcp_servers.json");
899 }
900 dirs::home_dir()
901 .map(|home| home.join(".tandem").join("data").join("mcp_servers.json"))
902 .unwrap_or_else(|| PathBuf::from("mcp_servers.json"))
903}
904
905fn load_state(path: &Path) -> (HashMap<String, McpServer>, bool) {
906 let Ok(raw) = std::fs::read_to_string(path) else {
907 return (HashMap::new(), false);
908 };
909 let mut migrated = false;
910 let mut parsed = serde_json::from_str::<HashMap<String, McpServer>>(&raw).unwrap_or_default();
911 for (name, server) in parsed.iter_mut() {
912 let tenant_context = local_tenant_context();
913 let (headers, secret_headers, secret_header_values, server_migrated) =
914 migrate_server_headers(name, server, &tenant_context);
915 migrated = migrated || server_migrated;
916 server.headers = headers;
917 server.secret_headers = secret_headers;
918 server.secret_header_values = secret_header_values;
919 }
920 (parsed, migrated)
921}
922
923fn migrate_server_headers(
924 server_name: &str,
925 server: &McpServer,
926 current_tenant: &TenantContext,
927) -> (
928 HashMap<String, String>,
929 HashMap<String, McpSecretRef>,
930 HashMap<String, String>,
931 bool,
932) {
933 let original_effective = effective_headers(server);
934 let mut persisted_secret_headers = server.secret_headers.clone();
935 let mut secret_header_values =
936 resolve_secret_header_values(&persisted_secret_headers, current_tenant);
937 let mut persisted_headers = server.headers.clone();
938 let mut migrated = false;
939
940 let header_keys = persisted_headers.keys().cloned().collect::<Vec<_>>();
941 for header_name in header_keys {
942 let Some(value) = persisted_headers.get(&header_name).cloned() else {
943 continue;
944 };
945 if persisted_secret_headers.contains_key(&header_name) {
946 continue;
947 }
948 if let Some(secret_ref) = parse_secret_header_reference(value.trim()) {
949 persisted_headers.remove(&header_name);
950 let resolved =
951 resolve_secret_ref_value(&secret_ref, current_tenant).unwrap_or_default();
952 persisted_secret_headers.insert(header_name.clone(), secret_ref);
953 if !resolved.is_empty() {
954 secret_header_values.insert(header_name.clone(), resolved);
955 }
956 migrated = true;
957 continue;
958 }
959 if header_name_is_sensitive(&header_name) && !value.trim().is_empty() {
960 let secret_id = mcp_header_secret_id(server_name, &header_name);
961 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
962 persisted_headers.remove(&header_name);
963 persisted_secret_headers.insert(
964 header_name.clone(),
965 McpSecretRef::Store {
966 secret_id: secret_id.clone(),
967 tenant_context: current_tenant.clone(),
968 },
969 );
970 secret_header_values.insert(header_name.clone(), value);
971 migrated = true;
972 }
973 }
974 }
975
976 if !migrated {
977 let effective = combine_headers(&persisted_headers, &secret_header_values);
978 migrated = effective != original_effective;
979 }
980
981 (
982 persisted_headers,
983 persisted_secret_headers,
984 secret_header_values,
985 migrated,
986 )
987}
988
989fn split_headers_for_storage(
990 server_name: &str,
991 headers: HashMap<String, String>,
992 explicit_secret_headers: HashMap<String, McpSecretRef>,
993 current_tenant: &TenantContext,
994) -> (
995 HashMap<String, String>,
996 HashMap<String, McpSecretRef>,
997 HashMap<String, String>,
998) {
999 let mut persisted_headers = HashMap::new();
1000 let mut persisted_secret_headers = HashMap::new();
1001 let mut secret_header_values = HashMap::new();
1002
1003 for (header_name, raw_value) in headers {
1004 let value = raw_value.trim().to_string();
1005 if value.is_empty() {
1006 continue;
1007 }
1008 if let Some(secret_ref) = parse_secret_header_reference(&value) {
1009 if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
1010 secret_header_values.insert(header_name.clone(), resolved);
1011 }
1012 persisted_secret_headers.insert(header_name, secret_ref);
1013 continue;
1014 }
1015 if header_name_is_sensitive(&header_name) {
1016 let secret_id = mcp_header_secret_id(server_name, &header_name);
1017 if tandem_core::set_provider_auth(&secret_id, &value).is_ok() {
1018 persisted_secret_headers.insert(
1019 header_name.clone(),
1020 McpSecretRef::Store {
1021 secret_id: secret_id.clone(),
1022 tenant_context: current_tenant.clone(),
1023 },
1024 );
1025 secret_header_values.insert(header_name, value);
1026 continue;
1027 }
1028 }
1029 persisted_headers.insert(header_name, value);
1030 }
1031
1032 for (header_name, secret_ref) in explicit_secret_headers {
1033 if let Some(resolved) = resolve_secret_ref_value(&secret_ref, current_tenant) {
1034 secret_header_values.insert(header_name.clone(), resolved);
1035 }
1036 persisted_headers.remove(&header_name);
1037 persisted_secret_headers.insert(header_name, secret_ref);
1038 }
1039
1040 (
1041 persisted_headers,
1042 persisted_secret_headers,
1043 secret_header_values,
1044 )
1045}
1046
1047fn combine_headers(
1048 headers: &HashMap<String, String>,
1049 secret_header_values: &HashMap<String, String>,
1050) -> HashMap<String, String> {
1051 let mut combined = headers.clone();
1052 for (key, value) in secret_header_values {
1053 if !value.trim().is_empty() {
1054 combined.insert(key.clone(), value.clone());
1055 }
1056 }
1057 combined
1058}
1059
1060fn effective_headers(server: &McpServer) -> HashMap<String, String> {
1061 combine_headers(&server.headers, &server.secret_header_values)
1062}
1063
1064fn redacted_server_view(server: &McpServer) -> McpServer {
1065 let mut clone = server.clone();
1066 for (header_name, secret_ref) in &clone.secret_headers {
1067 clone.headers.insert(
1068 header_name.clone(),
1069 redacted_secret_header_value(secret_ref),
1070 );
1071 }
1072 clone.secret_header_values.clear();
1073 clone
1074}
1075
1076fn normalize_auth_kind(raw: &str) -> String {
1077 match raw.trim().to_ascii_lowercase().as_str() {
1078 "oauth" | "auto" | "bearer" | "x-api-key" | "custom" | "none" => {
1079 raw.trim().to_ascii_lowercase()
1080 }
1081 _ => String::new(),
1082 }
1083}
1084
1085fn redacted_secret_header_value(secret_ref: &McpSecretRef) -> String {
1086 match secret_ref {
1087 McpSecretRef::BearerEnv { .. } => "Bearer ".to_string(),
1088 McpSecretRef::Env { .. } | McpSecretRef::Store { .. } => MCP_SECRET_PLACEHOLDER.to_string(),
1089 }
1090}
1091
1092fn resolve_secret_header_values(
1093 secret_headers: &HashMap<String, McpSecretRef>,
1094 current_tenant: &TenantContext,
1095) -> HashMap<String, String> {
1096 let mut out = HashMap::new();
1097 for (header_name, secret_ref) in secret_headers {
1098 if let Some(value) = resolve_secret_ref_value(secret_ref, current_tenant) {
1099 if !value.trim().is_empty() {
1100 out.insert(header_name.clone(), value);
1101 }
1102 }
1103 }
1104 out
1105}
1106
1107fn delete_secret_header_refs(
1108 secret_headers: &HashMap<String, McpSecretRef>,
1109 current_tenant: &TenantContext,
1110) {
1111 for secret_ref in secret_headers.values() {
1112 if let McpSecretRef::Store {
1113 secret_id,
1114 tenant_context,
1115 } = secret_ref
1116 {
1117 if tenant_context != current_tenant {
1118 continue;
1119 }
1120 let _ = tandem_core::delete_provider_auth(secret_id);
1121 }
1122 }
1123}