use std::collections::{BTreeMap, HashMap};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use car_engine::{McpSession, McpToolExecutor, McpToolInfo, ToolEntry};
use serde::Serialize;
use tokio::sync::{Mutex, RwLock};
use crate::error::ConnectorError;
use crate::manifest::{self, ConnectorConfig, ConnectorsFile, OAuthConfig, StdioConfig};
use crate::oauth::{self, StoredTokens};
use crate::schema;
use crate::secrets;
use crate::transport::McpHttpSession;
/// Skew, in seconds, passed to `StoredTokens::is_expired` by
/// `ConnectorManager::valid_access_token` when deciding whether the stored
/// access token must be refreshed before use.
const TOKEN_REFRESH_SKEW_SECS: u64 = 60;
/// An OAuth authorization that `ConnectorManager::authenticate` has started
/// and `ConnectorManager::complete_authentication` has not yet consumed.
///
/// Instances are stored in `ConnectorManager::pending_auth`, keyed by the
/// `state` value returned from `authenticate`.
struct PendingAuth {
/// Display name the connector will be created with.
name: String,
/// Server URL the connector will be created with.
url: String,
/// Redirect URI used for the authorization request; passed again to the code exchange.
redirect_uri: String,
/// PKCE verifier whose challenge was placed in the authorization URL.
verifier: String,
/// Client id returned by client registration.
client_id: String,
/// Client secret returned by client registration, if any.
client_secret: Option<String>,
/// Token endpoint used for the code exchange and stored in the connector's `OAuthConfig`.
token_endpoint: String,
/// Optional resource value returned by `oauth::discover`.
resource: Option<String>,
/// Scopes copied from the discovered metadata's `scopes_supported`.
scopes: Vec<String>,
}
/// In-memory record for one connector, replaced wholesale by `connect_one`
/// every time the connector is dialed.
struct ConnectorState {
/// Configuration for the connector; this is the part written out by `persist`.
config: ConnectorConfig,
/// Tools returned by the most recent `list_tools` call (empty if it failed).
discovered: Vec<McpToolInfo>,
/// Whether the most recent dial registered a session with the executor.
/// This stays `true` when the session was registered but `tools/list` failed.
connected: bool,
/// Error text from the most recent dial, if any step failed.
last_error: Option<String>,
}
/// Owns the set of configured connectors: dials them, registers their
/// sessions and tool routes on the shared executor, and persists their
/// configuration to a manifest file.
pub struct ConnectorManager {
/// Executor on which sessions and tool routes are registered and removed.
executor: Arc<McpToolExecutor>,
/// Manifest path read by `load_and_connect` and written by `persist`.
path: PathBuf,
/// Connector states keyed by slug.
inner: RwLock<BTreeMap<String, ConnectorState>>,
/// HTTP client used for the OAuth discovery, registration, code-exchange and refresh calls.
http: reqwest::Client,
/// OAuth flows started by `authenticate`, keyed by their `state` value.
pending_auth: RwLock<HashMap<String, PendingAuth>>,
}
/// Serializable summary of one connector, produced by `state_status`.
#[derive(Debug, Clone, Serialize)]
pub struct ConnectorStatus {
/// Unique key of the connector.
pub slug: String,
/// Display name from the connector's configuration.
pub name: String,
/// Server URL from the connector's configuration (empty for stdio connectors added via `add_stdio`).
pub url: String,
/// Whether the most recent dial registered a session.
pub connected: bool,
/// Number of tools discovered by the most recent dial.
pub tool_count: usize,
/// Number of entries in the connector's `enabled_tools` list.
pub enabled_count: usize,
/// Error text from the most recent dial, if any.
pub last_error: Option<String>,
}
/// Serializable description of one discovered tool, produced by
/// `ConnectorManager::tools`.
#[derive(Debug, Clone, Serialize)]
pub struct ToolView {
/// Tool name as reported by the server.
pub name: String,
/// Name produced by `schema::canonical_tool_name` for this connector slug and tool.
pub canonical: String,
/// Tool description, or an empty string when the server supplied none.
pub description: String,
/// Whether the tool name is present in the connector's `enabled_tools`.
pub enabled: bool,
}
impl ConnectorManager {
/// Creates a manager whose manifest lives at the path reported by
/// `manifest::connectors_path()`.
///
/// # Errors
/// Propagates the failure of `manifest::connectors_path()`.
pub fn new(executor: Arc<McpToolExecutor>) -> Result<Self, ConnectorError> {
    let path = manifest::connectors_path()?;
    Ok(Self::with_path(executor, path))
}
/// Creates a manager that reads and writes its manifest at `path`.
///
/// The manager starts with no connectors and no pending OAuth flows; call
/// `load_and_connect` to populate it from disk.
pub fn with_path(executor: Arc<McpToolExecutor>, path: PathBuf) -> Self {
    let inner = RwLock::new(BTreeMap::new());
    let http = reqwest::Client::new();
    let pending_auth = RwLock::new(HashMap::new());
    Self {
        executor,
        path,
        inner,
        http,
        pending_auth,
    }
}
/// Loads the manifest at `self.path`, dials every connector in it, then dials
/// each connector from `manifest::load_team_connectors()` whose slug is not
/// already present.
///
/// Returns the tool entries of every enabled tool that was routed.
///
/// # Errors
/// Fails only if the manifest cannot be loaded; dial failures are recorded in
/// each connector's state instead of being returned.
pub async fn load_and_connect(&self) -> Result<Vec<ToolEntry>, ConnectorError> {
    let manifest_file = manifest::load_from(&self.path)?;
    let mut collected = Vec::new();

    for connector in manifest_file.connectors {
        let key = connector.slug.clone();
        self.connect_one(connector).await;
        let routed = self.route_and_collect_enabled(&key).await;
        collected.extend(routed);
    }

    for connector in manifest::load_team_connectors() {
        let key = connector.slug.clone();
        // A connector already registered under this slug takes precedence.
        let already_known = self.inner.read().await.contains_key(&key);
        if !already_known {
            self.connect_one(connector).await;
            let routed = self.route_and_collect_enabled(&key).await;
            collected.extend(routed);
        }
    }

    Ok(collected)
}
/// Registers a new HTTP connector, stores its secret header values, dials it
/// and persists the manifest.
///
/// * `name` – display name; also the basis for the generated slug.
/// * `url` – server URL.
/// * `secret_headers` – `(header name, secret value)` pairs. Values go to the
///   secret store; only the header names are kept in the configuration.
///
/// # Errors
/// Returns the secret-store error if a header secret cannot be stored, the
/// persistence error if the manifest cannot be written, or
/// `ConnectorError::NotFound` if the connector is missing after being added.
pub async fn add(
    &self,
    name: &str,
    url: &str,
    secret_headers: Vec<(String, String)>,
) -> Result<ConnectorStatus, ConnectorError> {
    let slug = self.unique_slug(&schema::slugify(name)).await;
    let mut header_names: Vec<String> = Vec::with_capacity(secret_headers.len());
    for (header, value) in &secret_headers {
        if let Err(err) = secrets::put_header_secret(&slug, header, value) {
            // No connector will reference this slug, so remove the secrets
            // already written rather than leaving them orphaned. Best effort:
            // the original error is the one worth reporting.
            for stored in &header_names {
                let _ = secrets::delete_header_secret(&slug, stored);
            }
            return Err(err.into());
        }
        header_names.push(header.clone());
    }
    let config = ConnectorConfig {
        slug: slug.clone(),
        name: name.to_string(),
        url: url.to_string(),
        stdio: None,
        secret_headers: header_names,
        oauth: None,
        enabled_tools: Vec::new(),
    };
    // Dial failures are recorded in the connector's state, not returned.
    self.connect_one(config).await;
    self.persist().await?;
    self.status(&slug)
        .await
        .ok_or_else(|| ConnectorError::NotFound(slug))
}
/// Registers a new connector backed by a stdio MCP server, dials it and
/// persists the manifest.
///
/// * `name` – display name; also the basis for the generated slug.
/// * `command`, `args`, `env` – stored in the connector's `StdioConfig` and
///   used by `dial_stdio` to start the server.
///
/// # Errors
/// Returns the persistence error if the manifest cannot be written, or
/// `ConnectorError::NotFound` if the connector is missing after being added.
pub async fn add_stdio(
    &self,
    name: &str,
    command: &str,
    args: Vec<String>,
    env: std::collections::BTreeMap<String, String>,
) -> Result<ConnectorStatus, ConnectorError> {
    let base = schema::slugify(name);
    let slug = self.unique_slug(&base).await;

    let launch = StdioConfig {
        command: command.to_string(),
        args,
        env,
    };
    let config = ConnectorConfig {
        slug: slug.clone(),
        name: name.to_string(),
        url: String::new(),
        stdio: Some(launch),
        secret_headers: Vec::new(),
        oauth: None,
        enabled_tools: Vec::new(),
    };

    self.connect_one(config).await;
    self.persist().await?;

    let status = self.status(&slug).await;
    match status {
        Some(found) => Ok(found),
        None => Err(ConnectorError::NotFound(slug)),
    }
}
/// Starts an OAuth authorization flow for a connector that does not exist yet.
///
/// Runs discovery against `url`, registers a client for `redirect_uri`,
/// generates a PKCE verifier/challenge and a `state` value, and builds the
/// authorization URL. Everything needed to finish the flow is stored in
/// `pending_auth` under `state`.
///
/// Returns `(authorization_url, state)`. The caller sends the user to the URL
/// and later passes `state` plus the returned code to
/// `complete_authentication`.
///
/// # Errors
/// Propagates failures from discovery, client registration, or building the
/// authorization URL. Nothing is stored in `pending_auth` in that case.
pub async fn authenticate(
    &self,
    name: &str,
    url: &str,
    redirect_uri: &str,
) -> Result<(String, String), ConnectorError> {
    let (resource, asm) = oauth::discover(&self.http, url).await?;
    let reg = oauth::register_client(&self.http, &asm, redirect_uri, name).await?;
    let verifier = car_auth::pkce_verifier();
    let state = car_auth::new_state();
    let challenge = car_auth::pkce_challenge(&verifier);
    let authorize_url = oauth::authorization_url(
        &asm,
        &reg.client_id,
        redirect_uri,
        &state,
        &challenge,
        resource.as_deref(),
        &asm.scopes_supported,
    )?;
    let pending = PendingAuth {
        name: name.to_string(),
        url: url.to_string(),
        redirect_uri: redirect_uri.to_string(),
        verifier,
        client_id: reg.client_id,
        client_secret: reg.client_secret,
        token_endpoint: asm.token_endpoint,
        resource,
        scopes: asm.scopes_supported,
    };
    // NOTE(review): in the code visible here, entries are only removed by
    // `complete_authentication`; abandoned flows appear to stay in the map.
    self.pending_auth
        .write()
        .await
        .insert(state.clone(), pending);
    Ok((authorize_url, state))
}
/// Finishes an OAuth flow started by `authenticate` and creates the connector.
///
/// Removes the pending entry for `state`, exchanges `code` for tokens, stores
/// the tokens (and client secret, if any) under a freshly generated slug,
/// then dials the connector and persists the manifest.
///
/// The pending entry is removed before the exchange, so each `state` can be
/// used at most once, whether or not the exchange succeeds.
///
/// # Errors
/// * `ConnectorError::Protocol` if `state` has no pending entry.
/// * The error from the code exchange, the secret store, or persistence.
/// * `ConnectorError::NotFound` if the connector is missing after being added.
pub async fn complete_authentication(
    &self,
    state: &str,
    code: &str,
) -> Result<ConnectorStatus, ConnectorError> {
    let pending = self
        .pending_auth
        .write()
        .await
        .remove(state)
        .ok_or_else(|| ConnectorError::Protocol("unknown or expired auth state".into()))?;
    let token = oauth::exchange_code(
        &self.http,
        &pending.token_endpoint,
        &pending.client_id,
        pending.client_secret.as_deref(),
        &pending.redirect_uri,
        code,
        &pending.verifier,
        pending.resource.as_deref(),
    )
    .await?;
    let slug = self.unique_slug(&schema::slugify(&pending.name)).await;
    let stored = StoredTokens::from_response(token, now_unix(), None);
    secrets::put_tokens(&slug, &stored)?;
    let has_client_secret = pending.client_secret.is_some();
    if let Some(secret) = &pending.client_secret {
        if let Err(err) = secrets::put_client_secret(&slug, secret) {
            // No connector will be created for this slug, so remove the
            // tokens just written rather than leaving them orphaned. Best
            // effort: the original error is the one worth reporting.
            let _ = secrets::delete_tokens(&slug);
            return Err(err.into());
        }
    }
    let config = ConnectorConfig {
        slug: slug.clone(),
        name: pending.name,
        url: pending.url,
        stdio: None,
        secret_headers: Vec::new(),
        oauth: Some(OAuthConfig {
            token_endpoint: pending.token_endpoint,
            client_id: pending.client_id,
            has_client_secret,
            resource: pending.resource,
            scopes: pending.scopes,
        }),
        enabled_tools: Vec::new(),
    };
    // Dial failures are recorded in the connector's state, not returned.
    self.connect_one(config).await;
    self.persist().await?;
    self.status(&slug)
        .await
        .ok_or_else(|| ConnectorError::NotFound(slug))
}
/// Removes a connector: drops its state, removes its server from the
/// executor, deletes its stored secrets (best effort) and persists the
/// manifest.
///
/// Returns the canonical names of the tools that were enabled on the
/// connector.
///
/// # Errors
/// `ConnectorError::NotFound` if no connector has this slug, or the
/// persistence error if the manifest cannot be written.
pub async fn remove(&self, slug: &str) -> Result<Vec<String>, ConnectorError> {
    // The write guard is a temporary and is released at the end of this statement.
    let removed = self.inner.write().await.remove(slug);
    let state = match removed {
        Some(state) => state,
        None => return Err(ConnectorError::NotFound(slug.to_string())),
    };

    self.executor.remove_server(slug).await;

    let config = &state.config;
    // Secret deletion failures are deliberately ignored.
    for header in config.secret_headers.iter() {
        let _ = secrets::delete_header_secret(slug, header);
    }
    if config.oauth.is_some() {
        let _ = secrets::delete_tokens(slug);
        let _ = secrets::delete_client_secret(slug);
    }

    let mut canonicals = Vec::with_capacity(config.enabled_tools.len());
    for tool in &config.enabled_tools {
        canonicals.push(schema::canonical_tool_name(slug, tool));
    }

    self.persist().await?;
    Ok(canonicals)
}
pub async fn list(&self) -> Vec<ConnectorStatus> {
let inner = self.inner.read().await;
inner.values().map(state_status).collect()
}
/// Lists the tools discovered on a connector, marking which are enabled.
///
/// # Errors
/// `ConnectorError::NotFound` if no connector has this slug.
pub async fn tools(&self, slug: &str) -> Result<Vec<ToolView>, ConnectorError> {
    let guard = self.inner.read().await;
    let Some(state) = guard.get(slug) else {
        return Err(ConnectorError::NotFound(slug.to_string()));
    };

    let enabled = &state.config.enabled_tools;
    let mut views = Vec::with_capacity(state.discovered.len());
    for tool in &state.discovered {
        views.push(ToolView {
            name: tool.name.clone(),
            canonical: schema::canonical_tool_name(slug, &tool.name),
            description: tool.description.clone().unwrap_or_default(),
            enabled: enabled.contains(&tool.name),
        });
    }
    Ok(views)
}
/// Adds `tools` to the connector's enabled list (skipping names already
/// present), persists the manifest, and routes the enabled tools.
///
/// Returns the tool entries for every enabled tool that is currently
/// discovered on the connector. Names are not checked against the discovered
/// tools before being added.
///
/// # Errors
/// `ConnectorError::NotFound` if no connector has this slug, or the
/// persistence error if the manifest cannot be written.
pub async fn enable_tools(
    &self,
    slug: &str,
    tools: &[String],
) -> Result<Vec<ToolEntry>, ConnectorError> {
    {
        let mut guard = self.inner.write().await;
        let Some(state) = guard.get_mut(slug) else {
            return Err(ConnectorError::NotFound(slug.to_string()));
        };
        let enabled = &mut state.config.enabled_tools;
        for tool in tools {
            if enabled.iter().any(|existing| existing == tool) {
                continue;
            }
            enabled.push(tool.clone());
        }
        // The write guard is released here, before `persist` takes a read lock.
    }
    self.persist().await?;
    let routed = self.route_and_collect_enabled(slug).await;
    Ok(routed)
}
/// Removes `tools` from the connector's enabled list, removes their routes
/// from the executor, and persists the manifest.
///
/// Returns the canonical names of the tools that were actually enabled before
/// the call, in the order they appear in `tools`.
///
/// # Errors
/// `ConnectorError::NotFound` if no connector has this slug, or the
/// persistence error if the manifest cannot be written.
pub async fn disable_tools(
    &self,
    slug: &str,
    tools: &[String],
) -> Result<Vec<String>, ConnectorError> {
    let canonicals: Vec<String> = {
        let mut guard = self.inner.write().await;
        let state = match guard.get_mut(slug) {
            Some(state) => state,
            None => return Err(ConnectorError::NotFound(slug.to_string())),
        };
        let enabled = &mut state.config.enabled_tools;

        // Record which requested names were enabled before dropping them.
        let mut names = Vec::new();
        for tool in tools {
            if enabled.contains(tool) {
                names.push(schema::canonical_tool_name(slug, tool));
            }
        }
        enabled.retain(|kept| !tools.contains(kept));
        names
    };

    for canonical in canonicals.iter() {
        self.executor.remove_route(canonical).await;
    }
    self.persist().await?;
    Ok(canonicals)
}
/// Re-dials a connector with its current configuration, re-routes its enabled
/// tools, and returns the resulting tool list.
///
/// # Errors
/// `ConnectorError::NotFound` if no connector has this slug. Dial failures are
/// recorded in the connector's state rather than returned.
pub async fn refresh(&self, slug: &str) -> Result<Vec<ToolView>, ConnectorError> {
    let snapshot = {
        let guard = self.inner.read().await;
        match guard.get(slug) {
            Some(state) => state.config.clone(),
            None => return Err(ConnectorError::NotFound(slug.to_string())),
        }
    };
    self.connect_one(snapshot).await;
    // Only the routing side effect matters here; the entries are discarded.
    drop(self.route_and_collect_enabled(slug).await);
    self.tools(slug).await
}
/// Returns a tool entry for every discovered tool that is enabled, across all
/// connectors. Does not touch executor routes.
pub async fn enabled_tool_entries(&self) -> Vec<ToolEntry> {
    let guard = self.inner.read().await;
    let mut entries = Vec::new();
    for state in guard.values() {
        let enabled = &state.config.enabled_tools;
        entries.extend(
            state
                .discovered
                .iter()
                .filter(|tool| enabled.contains(&tool.name))
                .map(|tool| schema::tool_entry(&state.config.slug, tool)),
        );
    }
    entries
}
/// Dials the connector described by `config` and stores the outcome under its
/// slug, replacing any previous state for that slug.
///
/// Never fails: a dial error is recorded in the stored state's `last_error`.
async fn connect_one(&self, config: ConnectorConfig) {
    // Dial before taking the write lock so the lock is not held during network I/O.
    let (discovered, connected, last_error) = self.dial(&config).await;
    let key = config.slug.clone();
    let state = ConnectorState {
        config,
        discovered,
        connected,
        last_error,
    };
    self.inner.write().await.insert(key, state);
}
/// Connects to the server described by `config` and lists its tools.
///
/// Stdio connectors are delegated to `dial_stdio`. Otherwise an HTTP session
/// is created with the resolved auth headers, initialized, and registered on
/// the executor.
///
/// Returns `(discovered tools, connected, last error)`. `connected` is `true`
/// once the session has been registered, even if `tools/list` failed.
async fn dial(&self, config: &ConnectorConfig) -> (Vec<McpToolInfo>, bool, Option<String>) {
    if let Some(stdio) = config.stdio.as_ref() {
        return self.dial_stdio(config, stdio).await;
    }

    let headers = match self.resolve_auth_headers(config).await {
        Err(reason) => return (Vec::new(), false, Some(reason)),
        Ok(resolved) => resolved,
    };
    let mut session = match McpHttpSession::new(&config.slug, &config.url, headers) {
        Err(e) => return (Vec::new(), false, Some(e.to_string())),
        Ok(created) => created,
    };
    if let Err(e) = session.initialize().await {
        return (Vec::new(), false, Some(format!("initialize: {e}")));
    }

    // List tools while the session is still owned, then hand it to the executor.
    let listing = session.list_tools().await;
    let shared: Arc<Mutex<dyn McpSession>> = Arc::new(Mutex::new(session));
    self.executor.add_session(config.slug.clone(), shared).await;

    let (tools, error) = match listing {
        Ok(found) => (found, None),
        Err(e) => (Vec::new(), Some(format!("tools/list: {e}"))),
    };
    (tools, true, error)
}
/// Builds the request headers for a connector: one per configured secret
/// header, plus an `Authorization: Bearer …` header when OAuth is configured.
///
/// # Errors
/// Returns a message naming the missing secret, or an `oauth: …` message when
/// no valid access token can be obtained.
async fn resolve_auth_headers(
    &self,
    config: &ConnectorConfig,
) -> Result<Vec<(String, String)>, String> {
    let mut headers = Vec::new();

    for h in config.secret_headers.iter() {
        match secrets::get_header_secret(&config.slug, h) {
            Ok(value) => headers.push((h.clone(), value)),
            Err(e) => return Err(format!("missing secret '{h}': {e}")),
        }
    }

    if let Some(oauth_cfg) = config.oauth.as_ref() {
        match self.valid_access_token(&config.slug, oauth_cfg).await {
            Ok(token) => {
                headers.push(("Authorization".to_string(), format!("Bearer {token}")));
            }
            Err(e) => return Err(format!("oauth: {e}")),
        }
    }

    Ok(headers)
}
/// Returns an access token for `slug`, refreshing it first when the stored
/// one is expired (allowing for `TOKEN_REFRESH_SKEW_SECS`).
///
/// After a refresh the new tokens are written back to the secret store; the
/// previous refresh token is passed to `StoredTokens::from_response`.
///
/// # Errors
/// * The secret-store error if tokens cannot be read or written.
/// * `ConnectorError::Protocol` if the token is expired and there is no
///   refresh token.
/// * The error from `oauth::refresh`.
async fn valid_access_token(
    &self,
    slug: &str,
    cfg: &OAuthConfig,
) -> Result<String, ConnectorError> {
    let tokens = secrets::get_tokens(slug)?;
    let expired = tokens.is_expired(now_unix(), TOKEN_REFRESH_SKEW_SECS);
    if !expired {
        return Ok(tokens.access_token);
    }

    let refresh_token = match tokens.refresh_token.clone() {
        Some(value) => value,
        None => {
            return Err(ConnectorError::Protocol(
                "access token expired and no refresh token; re-authenticate".into(),
            ));
        }
    };

    // A secret that cannot be read is treated as absent.
    let client_secret = match cfg.has_client_secret {
        true => secrets::get_client_secret(slug).ok(),
        false => None,
    };

    let response = oauth::refresh(
        &self.http,
        &cfg.token_endpoint,
        &cfg.client_id,
        client_secret.as_deref(),
        &refresh_token,
        cfg.resource.as_deref(),
    )
    .await?;

    // Timestamp taken after the network call, as the new tokens' issue time.
    let issued_at = now_unix();
    let refreshed = StoredTokens::from_response(response, issued_at, Some(refresh_token));
    secrets::put_tokens(slug, &refreshed)?;
    Ok(refreshed.access_token)
}
/// Starts the stdio MCP server described by `stdio`, lists its tools and
/// registers it on the executor under the connector's slug.
///
/// Returns `(discovered tools, connected, last error)`. `connected` is `true`
/// once the server has been started and registered, even if `tools/list`
/// failed.
async fn dial_stdio(
    &self,
    config: &ConnectorConfig,
    stdio: &StdioConfig,
) -> (Vec<McpToolInfo>, bool, Option<String>) {
    let env = stdio
        .env
        .iter()
        .map(|(key, value)| (key.clone(), value.clone()))
        .collect();
    let launch = car_engine::McpServerConfig {
        name: config.slug.clone(),
        command: stdio.command.clone(),
        args: stdio.args.clone(),
        env,
        cwd: None,
    };

    let mut server = match car_engine::McpServer::start(launch).await {
        Err(reason) => return (Vec::new(), false, Some(reason)),
        Ok(started) => started,
    };

    // List tools while the server is still owned, then hand it to the executor.
    let listing = server.list_tools().await;
    let shared: Arc<Mutex<dyn McpSession>> = Arc::new(Mutex::new(server));
    self.executor.add_session(config.slug.clone(), shared).await;

    let (tools, error) = match listing {
        Ok(found) => (found, None),
        Err(e) => (Vec::new(), Some(format!("tools/list: {e}"))),
    };
    (tools, true, error)
}
/// Registers an executor route for every discovered tool of `slug` that is
/// enabled, and returns the matching tool entries.
///
/// Returns an empty list when the slug is unknown. Enabled names that were
/// not discovered are skipped.
async fn route_and_collect_enabled(&self, slug: &str) -> Vec<ToolEntry> {
    let guard = self.inner.read().await;
    let state = match guard.get(slug) {
        Some(state) => state,
        None => return Vec::new(),
    };

    let enabled = &state.config.enabled_tools;
    let mut entries = Vec::new();
    for tool in state.discovered.iter() {
        if !enabled.contains(&tool.name) {
            continue;
        }
        let canonical = schema::canonical_tool_name(slug, &tool.name);
        self.executor.set_route(canonical, slug.to_string()).await;
        entries.push(schema::tool_entry(slug, tool));
    }
    entries
}
async fn status(&self, slug: &str) -> Option<ConnectorStatus> {
let inner = self.inner.read().await;
inner.get(slug).map(state_status)
}
/// Returns a slug not currently used by any connector: `base` itself when
/// free, otherwise the first free `base_2`, `base_3`, ….
///
/// The check uses a read lock that is released on return, so the slug is not
/// reserved for the caller.
async fn unique_slug(&self, base: &str) -> String {
    let taken = self.inner.read().await;
    let mut candidate = base.to_string();
    let mut suffix = 2;
    while taken.contains_key(&candidate) {
        candidate = format!("{base}_{suffix}");
        suffix += 1;
    }
    candidate
}
/// Writes the configuration of every connector currently in `inner` to the
/// manifest at `self.path`.
///
/// NOTE(review): this includes connectors that `load_and_connect` inserted
/// from `manifest::load_team_connectors()` — confirm that copying those into
/// this manifest is intended.
///
/// # Errors
/// Returns the error from `manifest::save_to`.
async fn persist(&self) -> Result<(), ConnectorError> {
    let guard = self.inner.read().await;
    let connectors = guard.values().map(|state| state.config.clone()).collect();
    let snapshot = ConnectorsFile { connectors };
    manifest::save_to(&self.path, &snapshot)
}
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_unix() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
fn state_status(state: &ConnectorState) -> ConnectorStatus {
ConnectorStatus {
slug: state.config.slug.clone(),
name: state.config.name.clone(),
url: state.config.url.clone(),
connected: state.connected,
tool_count: state.discovered.len(),
enabled_count: state.config.enabled_tools.len(),
last_error: state.last_error.clone(),
}
}