use super::*;
use base64::Engine;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tandem_runtime::McpAuthChallenge;
use tandem_types::RequestPrincipal;
use uuid::Uuid;
const BUILTIN_GITHUB_MCP_SERVER_NAME: &str = "github";
const BUILTIN_GITHUB_MCP_TRANSPORT_URL: &str = "https://api.githubcopilot.com/mcp/";
const BUILTIN_TANDEM_DOCS_MCP_SERVER_NAME: &str = "tandem-mcp";
const BUILTIN_TANDEM_DOCS_MCP_TRANSPORT_URL: &str = "https://tandem.ac/mcp";
const MCP_OAUTH_SESSION_TTL_MS: u64 = 10 * 60 * 1000;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct McpOAuthSessionRecord {
pub session_id: String,
pub server_name: String,
pub status: String,
pub created_at_ms: u64,
pub expires_at_ms: u64,
pub redirect_uri: String,
pub state: String,
pub code_verifier: String,
pub authorization_url: String,
pub token_endpoint: String,
pub client_id: String,
pub client_secret: Option<String>,
pub error: Option<String>,
}
#[derive(Debug, Deserialize, Default)]
pub(super) struct McpOAuthCallbackInput {
pub code: Option<String>,
pub state: Option<String>,
pub error: Option<String>,
pub error_description: Option<String>,
}
#[derive(Debug, Deserialize)]
struct McpProtectedResourceMetadata {
authorization_servers: Option<Vec<String>>,
}
#[derive(Debug, Deserialize)]
struct McpAuthorizationServerMetadata {
authorization_endpoint: String,
token_endpoint: String,
registration_endpoint: Option<String>,
}
#[derive(Debug, Deserialize)]
struct McpDynamicClientRegistrationResponse {
client_id: String,
client_secret: Option<String>,
}
#[derive(Debug, Deserialize)]
struct McpTokenExchangeResponse {
access_token: Option<String>,
refresh_token: Option<String>,
expires_in: Option<u64>,
}
#[derive(Debug)]
struct McpOAuthBootstrap {
authorization_endpoint: String,
token_endpoint: String,
registration_endpoint: String,
resource_metadata_url: String,
}
pub(super) async fn bootstrap_mcp_servers_when_ready(state: AppState) {
if state.wait_until_ready_or_failed(120, 250).await {
bootstrap_mcp_servers(&state).await;
} else {
tracing::warn!("mcp bootstrap: skipped because runtime startup failed or timed out");
}
}
pub(super) async fn bootstrap_mcp_servers(state: &AppState) {
let _ = ensure_builtin_github_mcp_server(state).await;
let _ = ensure_builtin_tandem_docs_mcp_server(state).await;
let _ = ensure_hosted_kb_mcp_server(state).await;
let mut enabled_servers = state
.mcp
.list()
.await
.into_iter()
.filter_map(|(name, server)| if server.enabled { Some(name) } else { None })
.collect::<Vec<_>>();
enabled_servers.sort();
for name in enabled_servers {
let connected = connect_with_backoff(state, &name).await;
if !connected {
tracing::warn!(
"mcp bootstrap: gave up connecting server '{}' after retries",
name
);
continue;
}
let count = sync_mcp_tools_for_server(state, &name).await;
state.event_bus.publish(EngineEvent::new(
"mcp.server.connected",
json!({
"name": name,
"status": "connected",
"source": "startup_bootstrap"
}),
));
state.event_bus.publish(EngineEvent::new(
"mcp.tools.updated",
json!({
"name": name,
"count": count,
"source": "startup_bootstrap"
}),
));
tracing::info!(
"mcp bootstrap: connected '{}' with {} tools registered",
name,
count
);
}
}
async fn connect_with_backoff(state: &AppState, name: &str) -> bool {
const DELAYS_SECS: &[u64] = &[0, 2, 4, 6, 8, 10];
for (attempt, delay) in DELAYS_SECS.iter().enumerate() {
if *delay > 0 {
tokio::time::sleep(std::time::Duration::from_secs(*delay)).await;
}
if state.mcp.connect(name).await {
if attempt > 0 {
tracing::info!(
"mcp bootstrap: connected '{}' after {} retr{}",
name,
attempt,
if attempt == 1 { "y" } else { "ies" }
);
}
return true;
}
tracing::debug!(
"mcp bootstrap: connect to '{}' attempt {} failed; will retry",
name,
attempt + 1
);
}
false
}
fn builtin_tandem_docs_mcp_transport_url() -> String {
std::env::var("TANDEM_DOCS_MCP_TRANSPORT_URL")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.unwrap_or_else(|| BUILTIN_TANDEM_DOCS_MCP_TRANSPORT_URL.to_string())
}
async fn ensure_hosted_kb_mcp_server(state: &AppState) -> bool {
let cfg = state.config.get_effective_value().await;
let Some(hosted) = cfg.get("hosted").and_then(Value::as_object) else {
return false;
};
let kb_image = hosted
.get("kb_image")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
let Some(kb_image) = kb_image else {
return false;
};
let kb_admin_url = hosted
.get("kb_admin_url")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.unwrap_or("http://tandem-kb-mcp:39736");
let transport = format!("{}/mcp", kb_admin_url.trim_end_matches('/'));
tracing::info!(
"mcp bootstrap: ensuring hosted KB MCP server from {}",
kb_image
);
let connected = ensure_remote_mcp_server(state, "kb", &transport, HashMap::new()).await;
let _ = state
.mcp
.set_grounding_metadata("kb", Some("knowledgebase".to_string()), Some(true))
.await;
connected
}
fn github_mcp_headers_from_auth() -> Option<HashMap<String, String>> {
let token = std::env::var("GITHUB_PERSONAL_ACCESS_TOKEN")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.or_else(|| {
std::env::var("GITHUB_TOKEN")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
})
.or_else(|| {
tandem_core::load_provider_auth()
.get("github")
.cloned()
.filter(|value| !value.trim().is_empty())
})
.or_else(|| {
tandem_core::load_provider_auth()
.get("copilot")
.cloned()
.filter(|value| !value.trim().is_empty())
})?;
let mut headers = HashMap::new();
headers.insert("Authorization".to_string(), format!("Bearer {token}"));
Some(headers)
}
pub(super) async fn ensure_remote_mcp_server(
state: &AppState,
name: &str,
transport_url: &str,
headers: HashMap<String, String>,
) -> bool {
let existing = state.mcp.list().await.get(name).cloned();
if let Some(server) = existing {
if !server.enabled {
return false;
}
if server.transport.trim() == transport_url.trim() && !headers.is_empty() {
let mut effective_headers = server.headers.clone();
for (key, value) in server.secret_header_values {
effective_headers.insert(key, value);
}
if effective_headers != headers {
state
.mcp
.add_or_update(
name.to_string(),
transport_url.to_string(),
headers,
server.enabled,
)
.await;
}
}
let connected = state.mcp.connect(name).await;
if connected {
let _ = sync_mcp_tools_for_server(state, name).await;
}
return connected;
}
state
.mcp
.add_or_update(name.to_string(), transport_url.to_string(), headers, true)
.await;
let connected = state.mcp.connect(name).await;
if connected {
let _ = sync_mcp_tools_for_server(state, name).await;
}
connected
}
pub(super) async fn ensure_builtin_github_mcp_server(state: &AppState) -> bool {
let Some(headers) = github_mcp_headers_from_auth() else {
let existing = state
.mcp
.list()
.await
.get(BUILTIN_GITHUB_MCP_SERVER_NAME)
.cloned();
if let Some(server) = existing {
if !server.enabled {
return false;
}
let connected = state.mcp.connect(BUILTIN_GITHUB_MCP_SERVER_NAME).await;
if connected {
let _ = sync_mcp_tools_for_server(state, BUILTIN_GITHUB_MCP_SERVER_NAME).await;
}
return connected;
}
tracing::info!(
"mcp bootstrap: GitHub PAT not available, skipping builtin GitHub MCP server"
);
return false;
};
ensure_remote_mcp_server(
state,
BUILTIN_GITHUB_MCP_SERVER_NAME,
BUILTIN_GITHUB_MCP_TRANSPORT_URL,
headers,
)
.await
}
pub(super) async fn ensure_builtin_tandem_docs_mcp_server(state: &AppState) -> bool {
let transport = builtin_tandem_docs_mcp_transport_url();
ensure_remote_mcp_server(
state,
BUILTIN_TANDEM_DOCS_MCP_SERVER_NAME,
&transport,
HashMap::new(),
)
.await
}
#[derive(Debug, Deserialize, Default)]
pub(super) struct McpAddInput {
pub name: Option<String>,
pub transport: Option<String>,
pub auth_kind: Option<String>,
pub headers: Option<HashMap<String, String>>,
pub secret_headers: Option<HashMap<String, tandem_runtime::McpSecretRef>>,
pub enabled: Option<bool>,
pub allowed_tools: Option<Vec<String>>,
pub purpose: Option<String>,
pub grounding_required: Option<bool>,
}
#[derive(Debug, Deserialize, Default)]
pub(super) struct McpPatchInput {
pub enabled: Option<bool>,
pub allowed_tools: Option<Vec<String>>,
pub clear_allowed_tools: Option<bool>,
pub purpose: Option<String>,
pub grounding_required: Option<bool>,
}
#[derive(Clone)]
pub(super) struct McpBridgeTool {
pub schema: ToolSchema,
pub mcp: tandem_runtime::McpRegistry,
pub server_name: String,
pub tool_name: String,
}
#[async_trait]
impl Tool for McpBridgeTool {
fn schema(&self) -> ToolSchema {
self.schema.clone()
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
self.mcp
.call_tool(&self.server_name, &self.tool_name, args)
.await
.map_err(anyhow::Error::msg)
}
}
pub(super) async fn list_mcp(State(state): State<AppState>) -> Json<Value> {
Json(json!(state.mcp.list_public().await))
}
pub(super) async fn add_mcp(
State(state): State<AppState>,
Json(input): Json<McpAddInput>,
) -> Json<Value> {
let name = input.name.unwrap_or_else(|| "default".to_string());
let transport = input.transport.unwrap_or_else(|| "stdio".to_string());
let auth_kind = normalize_mcp_auth_kind(input.auth_kind.as_deref().unwrap_or_default());
let audit_transport = transport.clone();
state
.mcp
.add_or_update_with_secret_refs(
name.clone(),
transport,
input.headers.unwrap_or_default(),
input.secret_headers.unwrap_or_default(),
input.enabled.unwrap_or(true),
)
.await;
if let Some(allowed_tools) = input.allowed_tools.clone() {
let _ = state
.mcp
.set_allowed_tools(&name, Some(allowed_tools))
.await;
}
if input.purpose.is_some() || input.grounding_required.is_some() {
let _ = state
.mcp
.set_grounding_metadata(&name, input.purpose.clone(), input.grounding_required)
.await;
}
if !auth_kind.is_empty() {
let _ = state.mcp.set_auth_kind(&name, auth_kind.clone()).await;
}
state.event_bus.publish(EngineEvent::new(
"mcp.server.updated",
json!({
"name": name,
}),
));
let _ = crate::audit::append_protected_audit_event(
&state,
"mcp.server.updated",
&tandem_types::TenantContext::local_implicit(),
None,
json!({
"name": name,
"transport": audit_transport,
"enabled": input.enabled.unwrap_or(true),
"auth_kind": auth_kind,
"allowed_tools": input.allowed_tools,
"purpose": input.purpose,
"grounding_required": input.grounding_required,
}),
)
.await;
Json(json!({"ok": true}))
}
fn normalize_mcp_auth_kind(raw: &str) -> String {
match raw.trim().to_ascii_lowercase().as_str() {
"oauth" | "auto" | "bearer" | "x-api-key" | "custom" | "none" => {
raw.trim().to_ascii_lowercase()
}
_ => String::new(),
}
}
fn mcp_tool_names_for_server(tool_names: &[String], server_name: &str) -> Vec<String> {
let prefix = format!("mcp.{}.", mcp_namespace_segment(server_name));
let mut tools = tool_names
.iter()
.filter(|tool_name| tool_name.starts_with(&prefix))
.cloned()
.collect::<Vec<_>>();
tools.sort();
tools.dedup();
tools
}
#[derive(Default)]
struct McpToolScopeFilter {
wildcard_server_segments: std::collections::HashSet<String>,
exact_tool_names: std::collections::HashSet<String>,
}
fn parse_mcp_tool_scope_filter(tool_names: &[String]) -> McpToolScopeFilter {
let mut filter = McpToolScopeFilter::default();
for raw in tool_names {
let tool_name = raw.trim();
if tool_name.is_empty() {
continue;
}
if let Some(rest) = tool_name.strip_prefix("mcp.") {
if let Some((server_segment, tool_segment)) = rest.split_once('.') {
if tool_segment == "*" {
filter
.wildcard_server_segments
.insert(server_segment.to_string());
} else {
filter
.exact_tool_names
.insert(format!("mcp.{server_segment}.{tool_segment}"));
}
}
}
}
filter
}
fn filter_mcp_snapshot_by_tool_scope(snapshot: Value, filter: &McpToolScopeFilter) -> Value {
let mut snapshot = snapshot;
if filter.wildcard_server_segments.is_empty() && filter.exact_tool_names.is_empty() {
return snapshot;
}
if let Some(root) = snapshot.as_object_mut() {
if let Some(Value::Array(rows)) = root.get_mut("servers") {
rows.retain(|row| {
let server_name = row.get("name").and_then(Value::as_str).unwrap_or("");
let server_segment = mcp_namespace_segment(server_name);
if filter.wildcard_server_segments.contains(&server_segment) {
return true;
}
let exact_tools = row
.get("remote_tools")
.and_then(Value::as_array)
.map(|tools| {
tools
.iter()
.filter_map(Value::as_str)
.map(str::to_string)
.collect::<std::collections::HashSet<_>>()
})
.unwrap_or_default();
exact_tools
.iter()
.any(|tool_name| filter.exact_tool_names.contains(tool_name))
});
}
if let Some(Value::Array(rows)) = root.get_mut("connected_server_names") {
rows.retain(|row| {
row.as_str().is_some_and(|server| {
let segment = mcp_namespace_segment(server);
filter.wildcard_server_segments.contains(&segment)
|| filter
.exact_tool_names
.iter()
.any(|tool| tool.starts_with(&format!("mcp.{segment}.")))
})
});
}
if let Some(Value::Array(rows)) = root.get_mut("enabled_server_names") {
rows.retain(|row| {
row.as_str().is_some_and(|server| {
let segment = mcp_namespace_segment(server);
filter.wildcard_server_segments.contains(&segment)
|| filter
.exact_tool_names
.iter()
.any(|tool| tool.starts_with(&format!("mcp.{segment}.")))
})
});
}
if let Some(Value::Array(rows)) = root.get_mut("remote_tools") {
rows.retain(|row| {
row.get("server_name")
.and_then(Value::as_str)
.is_some_and(|server| {
let segment = mcp_namespace_segment(server);
if filter.wildcard_server_segments.contains(&segment) {
return true;
}
row.get("namespaced_name")
.and_then(Value::as_str)
.is_some_and(|tool_name| filter.exact_tool_names.contains(tool_name))
})
});
}
if let Some(Value::Array(rows)) = root.get_mut("registered_tools") {
rows.retain(|row| {
row.as_str().is_some_and(|tool_name| {
tool_name == "mcp_list"
|| filter.exact_tool_names.contains(tool_name)
|| filter
.wildcard_server_segments
.iter()
.any(|segment| tool_name.starts_with(&format!("mcp.{segment}.")))
})
});
}
}
snapshot
}
pub(crate) async fn mcp_inventory_snapshot(state: &AppState) -> Value {
let mut server_rows = state.mcp.list().await.into_values().collect::<Vec<_>>();
server_rows.sort_by(|a, b| a.name.cmp(&b.name));
let remote_tools = state.mcp.list_tools().await;
let registered_tool_names = state
.tools
.list()
.await
.into_iter()
.map(|schema| schema.name)
.collect::<Vec<_>>();
let mut connected_server_names = Vec::new();
let mut enabled_server_names = Vec::new();
let mut all_remote_tool_names = Vec::new();
let mut all_registered_tool_names = Vec::new();
let mut servers = Vec::new();
for server in server_rows {
let mut remote_tool_names = remote_tools
.iter()
.filter(|tool| tool.server_name == server.name)
.map(|tool| tool.namespaced_name.trim().to_string())
.filter(|tool_name| !tool_name.is_empty())
.collect::<Vec<_>>();
remote_tool_names.sort();
remote_tool_names.dedup();
let registered_names = mcp_tool_names_for_server(®istered_tool_names, &server.name);
if server.enabled {
enabled_server_names.push(server.name.clone());
}
if server.connected {
connected_server_names.push(server.name.clone());
}
all_remote_tool_names.extend(remote_tool_names.clone());
all_registered_tool_names.extend(registered_names.clone());
let mut pending_auth_tools = server
.pending_auth_by_tool
.keys()
.cloned()
.collect::<Vec<_>>();
pending_auth_tools.sort();
pending_auth_tools.dedup();
servers.push(json!({
"name": server.name,
"transport": server.transport,
"enabled": server.enabled,
"connected": server.connected,
"last_error": server.last_error,
"last_auth_challenge": server.last_auth_challenge,
"pending_auth_tools": pending_auth_tools,
"remote_tool_count": remote_tool_names.len(),
"registered_tool_count": registered_names.len(),
"allowed_tool_count": server.allowed_tools.as_ref().map(|tools| tools.len()).unwrap_or(remote_tool_names.len()),
"allowed_tools": server.allowed_tools.clone(),
"discovered_tool_count": server.tool_cache.len(),
"remote_tools": remote_tool_names,
"registered_tools": registered_names,
}));
}
connected_server_names.sort();
connected_server_names.dedup();
enabled_server_names.sort();
enabled_server_names.dedup();
all_remote_tool_names.sort();
all_remote_tool_names.dedup();
all_registered_tool_names.sort();
all_registered_tool_names.dedup();
json!({
"inventory_version": 1,
"connected_server_names": connected_server_names,
"enabled_server_names": enabled_server_names,
"remote_tools": all_remote_tool_names,
"registered_tools": all_registered_tool_names,
"servers": servers,
})
}
async fn current_mcp_auth_challenge(state: &AppState, name: &str) -> Option<McpAuthChallenge> {
state
.mcp
.list()
.await
.get(name)
.and_then(|server| server.last_auth_challenge.clone())
}
fn effective_mcp_headers(server: &tandem_runtime::McpServer) -> HashMap<String, String> {
let mut headers = server.headers.clone();
for (key, value) in &server.secret_header_values {
headers.insert(key.clone(), value.clone());
}
headers
}
fn mcp_uses_oauth(server: &tandem_runtime::McpServer) -> bool {
server.auth_kind.trim().eq_ignore_ascii_case("oauth")
}
fn mcp_public_base_url(state: &AppState, cfg: &Value) -> String {
cfg.get("hosted")
.and_then(Value::as_object)
.and_then(|hosted| hosted.get("public_url"))
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.unwrap_or_else(|| state.server_base_url())
}
fn mcp_public_base_url_from_headers(headers: &HeaderMap) -> Option<String> {
if let Some(origin) = headers
.get("origin")
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
{
if let Ok(parsed) = reqwest::Url::parse(origin) {
if let Some(host) = parsed.host_str() {
let mut out = format!("{}://{}", parsed.scheme(), host);
if let Some(port) = parsed.port() {
out.push(':');
out.push_str(&port.to_string());
}
return Some(out);
}
}
}
if let Some(referer) = headers
.get("referer")
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
{
if let Ok(parsed) = reqwest::Url::parse(referer) {
if let Some(host) = parsed.host_str() {
let mut out = format!("{}://{}", parsed.scheme(), host);
if let Some(port) = parsed.port() {
out.push(':');
out.push_str(&port.to_string());
}
return Some(out);
}
}
}
let host = headers
.get("x-forwarded-host")
.and_then(|value| value.to_str().ok())
.map(|value| value.split(',').next().unwrap_or(value).trim())
.filter(|value| !value.is_empty())?;
let proto = headers
.get("x-forwarded-proto")
.and_then(|value| value.to_str().ok())
.map(|value| value.split(',').next().unwrap_or(value).trim())
.filter(|value| !value.is_empty())
.unwrap_or("http");
Some(format!("{proto}://{host}"))
}
fn mcp_oauth_redirect_uri_for_base(base_url: &str, server_name: &str) -> String {
let base = base_url.trim().trim_end_matches('/');
format!(
"{base}/api/engine/mcp/{}/auth/callback",
urlencoding::encode(server_name)
)
}
fn generate_mcp_oauth_state() -> String {
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(format!(
"{}:{}",
Uuid::new_v4(),
Uuid::new_v4()
))
}
fn generate_mcp_pkce_pair() -> (String, String) {
let verifier = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(format!(
"{}:{}",
Uuid::new_v4(),
Uuid::new_v4()
));
let digest = sha2::Sha256::digest(verifier.as_bytes());
let challenge = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(digest);
(verifier, challenge)
}
fn mcp_oauth_provider_id(server_name: &str) -> String {
format!("mcp-oauth::{}", mcp_namespace_segment(server_name))
}
fn www_authenticate_param(header: &str, key: &str) -> Option<String> {
for part in header.split(',') {
let trimmed = part.trim();
let trimmed = trimmed.strip_prefix("Bearer ").unwrap_or(trimmed);
let (candidate_key, candidate_value) = trimmed.split_once('=')?;
if candidate_key.trim().eq_ignore_ascii_case(key) {
let value = candidate_value.trim().trim_matches('"').trim();
if !value.is_empty() {
return Some(value.to_string());
}
}
}
None
}
fn default_mcp_resource_metadata_url(endpoint: &str) -> Option<String> {
let parsed = reqwest::Url::parse(endpoint).ok()?;
let host = parsed.host_str()?;
let mut out = format!("{}://{}", parsed.scheme(), host);
if let Some(port) = parsed.port() {
out.push(':');
out.push_str(&port.to_string());
}
out.push_str("/.well-known/oauth-protected-resource");
out.push_str(parsed.path());
Some(out)
}
fn authorization_server_metadata_url(base: &str) -> String {
let trimmed = base.trim().trim_end_matches('/');
if trimmed.ends_with("/.well-known/oauth-authorization-server") {
trimmed.to_string()
} else {
format!("{trimmed}/.well-known/oauth-authorization-server")
}
}
fn build_mcp_authorization_url(
authorization_endpoint: &str,
client_id: &str,
redirect_uri: &str,
code_challenge: &str,
state: &str,
) -> String {
let pairs = [
("response_type", "code".to_string()),
("client_id", client_id.to_string()),
("redirect_uri", redirect_uri.to_string()),
("code_challenge", code_challenge.to_string()),
("code_challenge_method", "S256".to_string()),
("state", state.to_string()),
];
let query = pairs
.iter()
.map(|(key, value)| format!("{key}={}", urlencoding::encode(value)))
.collect::<Vec<_>>()
.join("&");
format!("{}?{}", authorization_endpoint.trim(), query)
}
fn mcp_oauth_callback_html(ok: bool, title: &str, detail: &str) -> axum::response::Html<String> {
let status = if ok { "Connected" } else { "OAuth failed" };
let body = format!(
"<!doctype html><html><head><meta charset=\"utf-8\"><title>{title}</title></head><body style=\"font-family: sans-serif; padding: 32px;\"><h1>{status}</h1><p>{detail}</p><script>setTimeout(function(){{try{{window.close();}}catch(e){{}}}}, 500);</script></body></html>"
);
axum::response::Html(body)
}
async fn discover_mcp_oauth_bootstrap(
endpoint: &str,
headers: &HashMap<String, String>,
) -> Result<McpOAuthBootstrap, String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.user_agent(format!("tandem/{}", env!("CARGO_PKG_VERSION")))
.build()
.map_err(|error| format!("failed to build MCP OAuth client: {error}"))?;
let mut request = client
.post(endpoint)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(
reqwest::header::ACCEPT,
"application/json, application/json+rpc, text/event-stream",
)
.json(&json!({
"jsonrpc": "2.0",
"id": "initialize-oauth-discovery",
"method": "initialize",
"params": {
"protocolVersion": "2025-11-25",
"capabilities": {},
"clientInfo": {
"name": "tandem",
"version": env!("CARGO_PKG_VERSION"),
}
}
}));
for (key, value) in headers {
request = request.header(key, value);
}
let response = request
.send()
.await
.map_err(|error| format!("mcp oauth discovery request failed: {error}"))?;
let status = response.status();
let www_authenticate = response
.headers()
.get(reqwest::header::WWW_AUTHENTICATE)
.and_then(|value| value.to_str().ok())
.map(str::to_string);
let body = response
.text()
.await
.map_err(|error| format!("failed to read MCP OAuth discovery response: {error}"))?;
if status.is_success() {
return Err("mcp server did not request oauth authorization".to_string());
}
if status != reqwest::StatusCode::UNAUTHORIZED {
return Err(format!(
"mcp server returned HTTP {} during oauth discovery: {}",
status.as_u16(),
body.chars().take(240).collect::<String>()
));
}
let resource_metadata_url = www_authenticate
.as_deref()
.and_then(|header| www_authenticate_param(header, "resource_metadata"))
.or_else(|| default_mcp_resource_metadata_url(endpoint))
.ok_or_else(|| {
"mcp oauth discovery did not include resource metadata in WWW-Authenticate".to_string()
})?;
let protected_resource = client
.get(&resource_metadata_url)
.header(reqwest::header::ACCEPT, "application/json")
.send()
.await
.map_err(|error| format!("failed to fetch MCP protected resource metadata: {error}"))?;
let protected_status = protected_resource.status();
let protected_body = protected_resource
.text()
.await
.map_err(|error| format!("failed to read MCP protected resource metadata: {error}"))?;
if !protected_status.is_success() {
return Err(format!(
"protected resource metadata request failed with HTTP {}: {}",
protected_status.as_u16(),
protected_body.chars().take(240).collect::<String>()
));
}
let protected_metadata: McpProtectedResourceMetadata = serde_json::from_str(&protected_body)
.map_err(|error| format!("invalid protected resource metadata: {error}"))?;
let authorization_server = protected_metadata
.authorization_servers
.unwrap_or_default()
.into_iter()
.find(|value| !value.trim().is_empty())
.ok_or_else(|| {
"protected resource metadata did not include an authorization server".to_string()
})?;
let metadata_url = authorization_server_metadata_url(&authorization_server);
let auth_server_response = client
.get(&metadata_url)
.header(reqwest::header::ACCEPT, "application/json")
.send()
.await
.map_err(|error| format!("failed to fetch authorization server metadata: {error}"))?;
let auth_server_status = auth_server_response.status();
let auth_server_body = auth_server_response
.text()
.await
.map_err(|error| format!("failed to read authorization server metadata: {error}"))?;
if !auth_server_status.is_success() {
return Err(format!(
"authorization server metadata request failed with HTTP {}: {}",
auth_server_status.as_u16(),
auth_server_body.chars().take(240).collect::<String>()
));
}
let auth_metadata: McpAuthorizationServerMetadata = serde_json::from_str(&auth_server_body)
.map_err(|error| format!("invalid authorization server metadata: {error}"))?;
let registration_endpoint = auth_metadata
.registration_endpoint
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| {
"authorization server does not support dynamic client registration".to_string()
})?
.to_string();
Ok(McpOAuthBootstrap {
authorization_endpoint: auth_metadata.authorization_endpoint,
token_endpoint: auth_metadata.token_endpoint,
registration_endpoint,
resource_metadata_url,
})
}
async fn start_mcp_oauth_session(
state: &AppState,
name: &str,
public_base_url_hint: Option<&str>,
) -> Result<McpAuthChallenge, String> {
let server = state
.mcp
.list()
.await
.get(name)
.cloned()
.ok_or_else(|| format!("MCP server '{name}' not found"))?;
if !mcp_uses_oauth(&server) {
return Err(format!("MCP server '{name}' is not configured for OAuth"));
}
let endpoint = server.transport.trim().to_string();
if !(endpoint.starts_with("http://") || endpoint.starts_with("https://")) {
return Err("MCP OAuth is only supported for HTTP/S transports".to_string());
}
if let Some(existing) = current_mcp_auth_challenge(state, name).await {
return Ok(existing);
}
let bootstrap =
discover_mcp_oauth_bootstrap(&endpoint, &effective_mcp_headers(&server)).await?;
let effective_cfg = state.config.get_effective_value().await;
let public_base_url = public_base_url_hint
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.unwrap_or_else(|| mcp_public_base_url(state, &effective_cfg));
let redirect_uri = mcp_oauth_redirect_uri_for_base(&public_base_url, name);
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.user_agent(format!("tandem/{}", env!("CARGO_PKG_VERSION")))
.build()
.map_err(|error| format!("failed to build MCP OAuth registration client: {error}"))?;
let registration_response = client
.post(&bootstrap.registration_endpoint)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(reqwest::header::ACCEPT, "application/json")
.json(&json!({
"client_name": "Tandem MCP Client",
"client_uri": "https://tandem.ac",
"redirect_uris": [redirect_uri],
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"token_endpoint_auth_method": "none",
}))
.send()
.await
.map_err(|error| format!("failed to register MCP OAuth client: {error}"))?;
let registration_status = registration_response.status();
let registration_body = registration_response
.text()
.await
.map_err(|error| format!("failed to read MCP OAuth registration response: {error}"))?;
if !registration_status.is_success() {
return Err(format!(
"dynamic client registration failed with HTTP {}: {}",
registration_status.as_u16(),
registration_body.chars().take(240).collect::<String>()
));
}
let registration: McpDynamicClientRegistrationResponse =
serde_json::from_str(®istration_body)
.map_err(|error| format!("invalid dynamic client registration response: {error}"))?;
let (code_verifier, code_challenge) = generate_mcp_pkce_pair();
let state_token = generate_mcp_oauth_state();
let authorization_url = build_mcp_authorization_url(
&bootstrap.authorization_endpoint,
®istration.client_id,
&redirect_uri,
&code_challenge,
&state_token,
);
let challenge = McpAuthChallenge {
challenge_id: format!("mcp-oauth-{}", Uuid::new_v4()),
tool_name: name.to_string(),
authorization_url: authorization_url.clone(),
message: format!(
"Authorization required. Open the link to connect this MCP server. Discovered OAuth metadata from {}.",
bootstrap.resource_metadata_url
),
requested_at_ms: crate::now_ms(),
status: "pending".to_string(),
};
let session_id = Uuid::new_v4().to_string();
let created_at_ms = crate::now_ms();
state.mcp_oauth_sessions.write().await.insert(
session_id.clone(),
McpOAuthSessionRecord {
session_id,
server_name: name.to_string(),
status: "pending".to_string(),
created_at_ms,
expires_at_ms: created_at_ms.saturating_add(MCP_OAUTH_SESSION_TTL_MS),
redirect_uri,
state: state_token,
code_verifier,
authorization_url: authorization_url.clone(),
token_endpoint: bootstrap.token_endpoint,
client_id: registration.client_id,
client_secret: registration.client_secret,
error: None,
},
);
let _ = state
.mcp
.record_server_auth_challenge(name, challenge.clone(), None)
.await;
Ok(challenge)
}
async fn find_pending_mcp_oauth_session(
state: &AppState,
server_name: &str,
) -> Option<McpOAuthSessionRecord> {
state
.mcp_oauth_sessions
.read()
.await
.values()
.find(|session| {
session.server_name == server_name
&& session.status.trim().eq_ignore_ascii_case("pending")
&& session.expires_at_ms > crate::now_ms()
})
.cloned()
}
async fn exchange_mcp_oauth_code(
session: &McpOAuthSessionRecord,
code: &str,
) -> Result<McpTokenExchangeResponse, String> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(15))
.user_agent(format!("tandem/{}", env!("CARGO_PKG_VERSION")))
.build()
.map_err(|error| format!("failed to build MCP OAuth token client: {error}"))?;
let mut params = vec![
("grant_type", "authorization_code".to_string()),
("code", code.to_string()),
("client_id", session.client_id.clone()),
("redirect_uri", session.redirect_uri.clone()),
("code_verifier", session.code_verifier.clone()),
];
if let Some(client_secret) = session
.client_secret
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
params.push(("client_secret", client_secret.to_string()));
}
let response = client
.post(&session.token_endpoint)
.header(reqwest::header::ACCEPT, "application/json")
.form(¶ms)
.send()
.await
.map_err(|error| format!("mcp oauth token exchange failed: {error}"))?;
let status = response.status();
let body = response
.text()
.await
.map_err(|error| format!("failed to read MCP OAuth token response: {error}"))?;
if !status.is_success() {
return Err(format!(
"mcp oauth token exchange failed with HTTP {}: {}",
status.as_u16(),
body.chars().take(240).collect::<String>()
));
}
serde_json::from_str(&body)
.map_err(|error| format!("invalid mcp oauth token response: {error}"))
}
async fn finish_mcp_oauth_callback(
state: AppState,
name: String,
input: McpOAuthCallbackInput,
) -> Result<(), String> {
let state_token = input
.state
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| "missing oauth state".to_string())?;
let session_id = {
let sessions = state.mcp_oauth_sessions.read().await;
sessions.iter().find_map(|(session_id, session)| {
(session.server_name == name && session.state == state_token)
.then(|| session_id.clone())
})
}
.ok_or_else(|| "mcp oauth session not found or expired".to_string())?;
if let Some(error) = input
.error
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
let detail = input
.error_description
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.unwrap_or_else(|| error.to_string());
if let Some(session) = state.mcp_oauth_sessions.write().await.get_mut(&session_id) {
session.status = "error".to_string();
session.error = Some(detail.clone());
}
return Err(detail);
}
let code = input
.code
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.ok_or_else(|| "missing authorization code".to_string())?;
let session = state
.mcp_oauth_sessions
.read()
.await
.get(&session_id)
.cloned()
.ok_or_else(|| "mcp oauth session not found".to_string())?;
if session.expires_at_ms <= crate::now_ms() {
return Err("mcp oauth session expired before callback completed".to_string());
}
let exchanged = exchange_mcp_oauth_code(&session, code).await?;
let access_token = exchanged
.access_token
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.ok_or_else(|| "mcp oauth token exchange returned no access token".to_string())?;
let refresh_token = exchanged
.refresh_token
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.ok_or_else(|| "mcp oauth token exchange returned no refresh token".to_string())?;
let expires_at_ms =
crate::now_ms().saturating_add(exchanged.expires_in.unwrap_or(3600).saturating_mul(1000));
state
.mcp
.set_bearer_token(&name, &access_token)
.await
.map_err(|error| format!("failed to store mcp oauth token: {error}"))?;
state
.mcp
.set_oauth_refresh_config(
&name,
mcp_oauth_provider_id(&name),
session.token_endpoint.clone(),
session.client_id.clone(),
session.client_secret.clone(),
)
.await
.map_err(|error| format!("failed to store mcp oauth refresh metadata: {error}"))?;
let _ = tandem_core::set_provider_oauth_credential(
&mcp_oauth_provider_id(&name),
tandem_core::OAuthProviderCredential {
provider_id: mcp_oauth_provider_id(&name),
access_token: access_token.clone(),
refresh_token,
expires_at_ms,
account_id: None,
email: None,
display_name: None,
managed_by: "tandem".to_string(),
api_key: None,
},
);
match state.mcp.refresh(&name).await {
Ok(_) => {
let count = sync_mcp_tools_for_server(&state, &name).await;
let _ = state.mcp.clear_server_auth_challenge(&name).await;
state.event_bus.publish(EngineEvent::new(
"mcp.server.connected",
json!({
"name": name,
"status": "connected",
"source": "oauth_callback"
}),
));
state.event_bus.publish(EngineEvent::new(
"mcp.tools.updated",
json!({
"name": name,
"count": count,
"source": "oauth_callback"
}),
));
}
Err(error) => {
if let Some(session_mut) = state.mcp_oauth_sessions.write().await.get_mut(&session_id) {
session_mut.status = "error".to_string();
session_mut.error = Some(error.clone());
}
return Err(error);
}
}
if let Some(session_mut) = state.mcp_oauth_sessions.write().await.get_mut(&session_id) {
session_mut.status = "connected".to_string();
session_mut.error = None;
}
Ok(())
}
fn filter_mcp_inventory_snapshot_to_servers(snapshot: Value, allowed_servers: &[String]) -> Value {
let mut snapshot = snapshot;
let allowed_servers = allowed_servers
.iter()
.map(|server| server.trim().to_string())
.filter(|server| !server.is_empty())
.collect::<std::collections::HashSet<_>>();
if allowed_servers.is_empty() {
return snapshot;
}
let allowed_tool_prefixes = allowed_servers
.iter()
.map(|server| format!("mcp.{}.", mcp_namespace_segment(server)))
.collect::<Vec<_>>();
let keep_server = |name: &str| allowed_servers.contains(name);
if let Some(root) = snapshot.as_object_mut() {
if let Some(Value::Array(rows)) = root.get_mut("servers") {
rows.retain(|row| {
row.get("name")
.and_then(Value::as_str)
.is_some_and(keep_server)
});
}
if let Some(Value::Array(rows)) = root.get_mut("connected_server_names") {
rows.retain(|row| row.as_str().is_some_and(keep_server));
}
if let Some(Value::Array(rows)) = root.get_mut("enabled_server_names") {
rows.retain(|row| row.as_str().is_some_and(keep_server));
}
if let Some(Value::Array(rows)) = root.get_mut("remote_tools") {
rows.retain(|row| {
row.get("server_name")
.and_then(Value::as_str)
.is_some_and(keep_server)
});
}
if let Some(Value::Array(rows)) = root.get_mut("registered_tools") {
rows.retain(|row| {
row.as_str().is_some_and(|tool_name| {
tool_name == "mcp_list"
|| allowed_tool_prefixes
.iter()
.any(|prefix| tool_name.starts_with(prefix))
})
});
}
}
snapshot
}
fn filter_mcp_snapshot_by_exact_and_wildcard_tools(
snapshot: Value,
allowed_tools: &[String],
) -> Value {
let filter = parse_mcp_tool_scope_filter(allowed_tools);
filter_mcp_snapshot_by_tool_scope(snapshot, &filter)
}
fn filter_mcp_snapshot_by_namespace_segments(
snapshot: Value,
allowed_segments: &[String],
) -> Value {
let mut snapshot = snapshot;
let segments_set: std::collections::HashSet<&str> =
allowed_segments.iter().map(|s| s.as_str()).collect();
let keep_server = |name: &str| segments_set.contains(mcp_namespace_segment(name).as_str());
let allowed_tool_prefixes: Vec<String> = allowed_segments
.iter()
.map(|seg| format!("mcp.{}.", seg))
.collect();
if let Some(root) = snapshot.as_object_mut() {
if let Some(Value::Array(rows)) = root.get_mut("servers") {
rows.retain(|row| {
row.get("name")
.and_then(Value::as_str)
.is_some_and(keep_server)
});
}
if let Some(Value::Array(rows)) = root.get_mut("connected_server_names") {
rows.retain(|row| row.as_str().is_some_and(keep_server));
}
if let Some(Value::Array(rows)) = root.get_mut("enabled_server_names") {
rows.retain(|row| row.as_str().is_some_and(keep_server));
}
if let Some(Value::Array(rows)) = root.get_mut("remote_tools") {
rows.retain(|row| {
row.get("server_name")
.and_then(Value::as_str)
.is_some_and(keep_server)
});
}
if let Some(Value::Array(rows)) = root.get_mut("registered_tools") {
rows.retain(|row| {
row.as_str().is_some_and(|tool_name| {
tool_name == "mcp_list"
|| allowed_tool_prefixes
.iter()
.any(|prefix| tool_name.starts_with(prefix))
})
});
}
}
snapshot
}
fn session_mcp_tool_filter(session_tools: &[String]) -> McpToolScopeFilter {
parse_mcp_tool_scope_filter(session_tools)
}
async fn scoped_mcp_servers_for_session(state: &AppState, session_id: &str) -> Vec<String> {
state
.automation_v2_session_mcp_servers
.read()
.await
.get(session_id)
.cloned()
.unwrap_or_default()
}
#[derive(Clone)]
pub(crate) struct McpListTool {
state: AppState,
}
impl McpListTool {
pub fn new(state: AppState) -> Self {
Self { state }
}
}
#[async_trait]
impl Tool for McpListTool {
fn schema(&self) -> ToolSchema {
ToolSchema::new(
"mcp_list",
"List the currently configured and connected MCP servers and tools",
json!({
"type": "object",
"properties": {},
"additionalProperties": false,
}),
)
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let mut snapshot = mcp_inventory_snapshot(&self.state).await;
let session_id = args.get("__session_id").and_then(Value::as_str);
let allowed_servers = if let Some(sid) = session_id {
scoped_mcp_servers_for_session(&self.state, sid).await
} else {
Vec::new()
};
let mut session_tool_filter = McpToolScopeFilter::default();
if let Some(sid) = session_id {
if let Some(rt) = self.state.runtime.get() {
let session_tools = rt.engine_loop.get_session_allowed_tools(sid).await;
session_tool_filter = session_mcp_tool_filter(&session_tools);
}
}
if !allowed_servers.is_empty() {
snapshot = filter_mcp_inventory_snapshot_to_servers(snapshot, &allowed_servers);
}
if !session_tool_filter.wildcard_server_segments.is_empty()
|| !session_tool_filter.exact_tool_names.is_empty()
{
snapshot = filter_mcp_snapshot_by_tool_scope(snapshot, &session_tool_filter);
}
let output =
serde_json::to_string_pretty(&snapshot).unwrap_or_else(|_| snapshot.to_string());
Ok(ToolResult {
output,
metadata: snapshot,
})
}
}
pub(crate) fn mcp_namespace_segment(raw: &str) -> String {
let mut out = String::new();
let mut previous_underscore = false;
for ch in raw.trim().chars() {
if ch.is_ascii_alphanumeric() {
out.push(ch.to_ascii_lowercase());
previous_underscore = false;
} else if !previous_underscore {
out.push('_');
previous_underscore = true;
}
}
let cleaned = out.trim_matches('_');
if cleaned.is_empty() {
"server".to_string()
} else {
cleaned.to_string()
}
}
pub(crate) async fn sync_mcp_tools_for_server(state: &AppState, name: &str) -> usize {
let prefix = format!("mcp.{}.", mcp_namespace_segment(name));
state.tools.unregister_by_prefix(&prefix).await;
let tools = state.mcp.server_tools(name).await;
for tool in &tools {
let schema = ToolSchema::new(
tool.namespaced_name.clone(),
if tool.description.trim().is_empty() {
format!("MCP tool {} from {}", tool.tool_name, tool.server_name)
} else {
tool.description.clone()
},
tool.input_schema.clone(),
);
state
.tools
.register_tool(
schema.name.clone(),
Arc::new(McpBridgeTool {
schema,
mcp: state.mcp.clone(),
server_name: tool.server_name.clone(),
tool_name: tool.tool_name.clone(),
}),
)
.await;
}
tools.len()
}
pub(super) async fn connect_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
headers: HeaderMap,
) -> Json<Value> {
let ok = state.mcp.connect(&name).await;
let public_base_url = mcp_public_base_url_from_headers(&headers);
let auth_challenge = if ok {
None
} else {
let current = current_mcp_auth_challenge(&state, &name).await;
if current.is_some() {
current
} else {
let server = state.mcp.list().await.get(&name).cloned();
if server.as_ref().is_some_and(mcp_uses_oauth) {
start_mcp_oauth_session(&state, &name, public_base_url.as_deref())
.await
.ok()
} else {
None
}
}
};
if ok {
let count = sync_mcp_tools_for_server(&state, &name).await;
state.event_bus.publish(EngineEvent::new(
"mcp.server.connected",
json!({
"name": name,
"status": "connected",
}),
));
state.event_bus.publish(EngineEvent::new(
"mcp.tools.updated",
json!({
"name": name,
"count": count,
}),
));
} else {
let prefix = format!("mcp.{}.", mcp_namespace_segment(&name));
let removed = state.tools.unregister_by_prefix(&prefix).await;
state.event_bus.publish(EngineEvent::new(
"mcp.server.disconnected",
json!({
"name": name,
"removedToolCount": removed,
"reason": "connect_failed"
}),
));
}
Json(json!({
"ok": ok,
"pendingAuth": auth_challenge.is_some(),
"lastAuthChallenge": auth_challenge,
"authorizationUrl": auth_challenge.as_ref().map(|challenge| challenge.authorization_url.clone()),
}))
}
pub(super) async fn disconnect_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Json<Value> {
let ok = state.mcp.disconnect(&name).await;
if ok {
let prefix = format!("mcp.{}.", mcp_namespace_segment(&name));
let removed = state.tools.unregister_by_prefix(&prefix).await;
state.event_bus.publish(EngineEvent::new(
"mcp.server.disconnected",
json!({
"name": name,
"removedToolCount": removed,
}),
));
}
Json(json!({"ok": ok}))
}
pub(super) async fn delete_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Json<Value> {
let prefix = format!("mcp.{}.", mcp_namespace_segment(&name));
let removed_tool_count = state.tools.unregister_by_prefix(&prefix).await;
let ok = state.mcp.remove(&name).await;
if ok {
state.event_bus.publish(EngineEvent::new(
"mcp.server.deleted",
json!({
"name": name,
"removedToolCount": removed_tool_count,
}),
));
let _ = crate::audit::append_protected_audit_event(
&state,
"mcp.server.deleted",
&tandem_types::TenantContext::local_implicit(),
None,
json!({
"name": name,
"removedToolCount": removed_tool_count,
}),
)
.await;
}
Json(json!({ "ok": ok, "removedToolCount": removed_tool_count }))
}
pub(super) async fn patch_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
Json(input): Json<McpPatchInput>,
) -> Json<Value> {
let mut changed = false;
let mut should_resync = false;
if input.clear_allowed_tools.unwrap_or(false) || input.allowed_tools.is_some() {
let next_allowed_tools = if input.clear_allowed_tools.unwrap_or(false) {
None
} else {
input.allowed_tools.clone()
};
changed |= state.mcp.set_allowed_tools(&name, next_allowed_tools).await;
should_resync = true;
}
if input.purpose.is_some() || input.grounding_required.is_some() {
changed |= state
.mcp
.set_grounding_metadata(&name, input.purpose.clone(), input.grounding_required)
.await;
}
if let Some(enabled) = input.enabled {
let enabled_changed = state.mcp.set_enabled(&name, enabled).await;
changed |= enabled_changed;
if enabled_changed {
if enabled {
let _ = state.mcp.connect(&name).await;
} else {
let prefix = format!("mcp.{}.", mcp_namespace_segment(&name));
let _ = state.tools.unregister_by_prefix(&prefix).await;
}
state.event_bus.publish(EngineEvent::new(
"mcp.server.updated",
json!({
"name": name,
"enabled": enabled,
}),
));
let _ = crate::audit::append_protected_audit_event(
&state,
"mcp.server.updated",
&tandem_types::TenantContext::local_implicit(),
None,
json!({
"name": name,
"enabled": enabled,
}),
)
.await;
if enabled {
should_resync = true;
}
}
}
if should_resync {
let server = state.mcp.list().await.get(&name).cloned();
if server
.as_ref()
.is_some_and(|server| server.enabled && server.connected)
{
let count = sync_mcp_tools_for_server(&state, &name).await;
state.event_bus.publish(EngineEvent::new(
"mcp.tools.updated",
json!({
"name": name,
"count": count,
}),
));
}
}
Json(json!({"ok": changed}))
}
pub(super) async fn refresh_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
headers: HeaderMap,
) -> Json<Value> {
let result = state.mcp.refresh(&name).await;
let public_base_url = mcp_public_base_url_from_headers(&headers);
match result {
Ok(tools) => {
let count = sync_mcp_tools_for_server(&state, &name).await;
state.event_bus.publish(EngineEvent::new(
"mcp.tools.updated",
json!({
"name": name,
"count": count,
}),
));
Json(json!({
"ok": true,
"count": tools.len(),
}))
}
Err(error) => {
let mut auth_challenge = current_mcp_auth_challenge(&state, &name).await;
if auth_challenge.is_none() {
let server = state.mcp.list().await.get(&name).cloned();
if server.as_ref().is_some_and(mcp_uses_oauth) {
auth_challenge =
start_mcp_oauth_session(&state, &name, public_base_url.as_deref())
.await
.ok();
}
}
let prefix = format!("mcp.{}.", mcp_namespace_segment(&name));
let removed = state.tools.unregister_by_prefix(&prefix).await;
state.event_bus.publish(EngineEvent::new(
"mcp.server.disconnected",
json!({
"name": name,
"removedToolCount": removed,
"reason": "refresh_failed"
}),
));
Json(json!({
"ok": false,
"error": error,
"pendingAuth": auth_challenge.is_some(),
"lastAuthChallenge": auth_challenge,
"authorizationUrl": auth_challenge.as_ref().map(|challenge| challenge.authorization_url.clone()),
"removedToolCount": removed
}))
}
}
}
pub(super) async fn auth_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Json<Value> {
if let Some(auth_challenge) = current_mcp_auth_challenge(&state, &name).await {
return Json(json!({
"ok": true,
"pending": true,
"lastAuthChallenge": auth_challenge,
"authorizationUrl": auth_challenge.authorization_url,
}));
}
Json(json!({
"ok": false,
"pending": false,
"name": name,
"message": "No MCP auth challenge recorded yet.",
}))
}
pub(super) async fn callback_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
headers: HeaderMap,
) -> Json<Value> {
authenticate_mcp(State(state), Path(name), headers).await
}
pub(super) async fn callback_mcp_get(
State(state): State<AppState>,
Path(name): Path<String>,
Query(input): Query<McpOAuthCallbackInput>,
) -> impl IntoResponse {
match finish_mcp_oauth_callback(state, name, input).await {
Ok(()) => mcp_oauth_callback_html(
true,
"Tandem MCP Connected",
"The MCP OAuth sign-in completed successfully. You can close this window.",
)
.into_response(),
Err(error) => {
mcp_oauth_callback_html(false, "Tandem MCP OAuth Failed", &error).into_response()
}
}
}
pub(super) async fn authenticate_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
headers: HeaderMap,
) -> Json<Value> {
let public_base_url = mcp_public_base_url_from_headers(&headers);
if let Some(session) = find_pending_mcp_oauth_session(&state, &name).await {
let last_auth_challenge = current_mcp_auth_challenge(&state, &name).await;
return Json(json!({
"ok": true,
"authenticated": false,
"connected": false,
"pendingAuth": true,
"lastAuthChallenge": last_auth_challenge,
"authorizationUrl": last_auth_challenge.as_ref().map(|challenge| challenge.authorization_url.clone()).unwrap_or(session.authorization_url),
}));
}
let refresh = state.mcp.refresh(&name).await;
let current = state.mcp.list().await.get(&name).cloned();
let last_auth_challenge = current
.as_ref()
.and_then(|server| server.last_auth_challenge.clone());
match refresh {
Ok(tools) => {
let count = sync_mcp_tools_for_server(&state, &name).await;
let _ = state.mcp.clear_server_auth_challenge(&name).await;
Json(json!({
"ok": true,
"authenticated": true,
"connected": true,
"pendingAuth": false,
"lastAuthChallenge": Value::Null,
"authorizationUrl": Value::Null,
"count": count.max(tools.len()),
}))
}
Err(error) => {
let mut auth_challenge = last_auth_challenge;
if auth_challenge.is_none() {
let server = state.mcp.list().await.get(&name).cloned();
if server.as_ref().is_some_and(mcp_uses_oauth) {
auth_challenge =
start_mcp_oauth_session(&state, &name, public_base_url.as_deref())
.await
.ok();
}
}
Json(json!({
"ok": false,
"authenticated": false,
"connected": current.as_ref().map(|server| server.connected).unwrap_or(false),
"pendingAuth": auth_challenge.is_some(),
"lastAuthChallenge": auth_challenge,
"authorizationUrl": auth_challenge.as_ref().map(|challenge| challenge.authorization_url.clone()),
"error": error,
}))
}
}
}
pub(super) async fn delete_auth_mcp(
State(state): State<AppState>,
Path(name): Path<String>,
) -> Json<Value> {
disconnect_mcp(State(state), Path(name)).await
}
pub(super) async fn mcp_tools(State(state): State<AppState>) -> Json<Value> {
Json(json!(state.mcp.list_tools().await))
}
pub(super) async fn mcp_resources(State(state): State<AppState>) -> Json<Value> {
let resources = state
.mcp
.list()
.await
.into_values()
.filter(|server| server.connected)
.map(|server| {
json!({
"server": server.name,
"resources": [
{"uri": format!("mcp://{}/tools", server.name), "name":"tools"},
{"uri": format!("mcp://{}/prompts", server.name), "name":"prompts"}
]
})
})
.collect::<Vec<_>>();
Json(json!(resources))
}