use tokio::time::{interval, Duration};
use super::fingerprint::desired_proxy_fingerprint;
use super::*;
impl McpServerManager {
pub async fn start_server(&self, config: McpServerConfig) -> Result<()> {
let server_id = config.id.clone();
if self.runtimes.contains_key(&server_id) {
return Err(McpError::AlreadyRunning(server_id));
}
info!("Starting MCP server '{}'", server_id);
let runtime_proxy_fingerprint = desired_proxy_fingerprint(self.config.as_ref()).await;
let (client, tools) = self
.bootstrap_server_client(&server_id, &config, "start")
.await?;
let runtime = Arc::new(ServerRuntime {
config: config.clone(),
client: RwLock::new(client),
info: RwLock::new(RuntimeInfo {
status: ServerStatus::Ready,
last_error: None,
connected_at: Some(Utc::now()),
disconnected_at: None,
tool_count: tools.len(),
restart_count: 0,
last_ping_at: Some(Utc::now()),
}),
tools: RwLock::new(tools.clone()),
shutdown: AtomicBool::new(false),
reconnecting: AtomicBool::new(false),
qos: McpServerQos::new(McpQosConfig::default()),
proxy_fingerprint: runtime_proxy_fingerprint,
});
let aliases = self.index.register_server_tools(
&server_id,
&tools,
&config.allowed_tools,
&config.denied_tools,
);
info!(
"Registered {} MCP tools for server '{}'",
aliases.len(),
server_id
);
self.runtimes.insert(server_id.clone(), runtime.clone());
if let Some(ref tx) = self.event_tx {
let _ = tx
.send(McpEvent::ServerStatusChanged {
server_id: server_id.clone(),
status: ServerStatus::Ready,
error: None,
})
.await;
let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
let _ = tx
.send(McpEvent::ToolsChanged {
server_id,
tools: tool_names,
})
.await;
}
self.start_health_check(runtime, config.healthcheck_interval_ms);
Ok(())
}
/// Stops the running server `server_id` and removes it from this manager.
///
/// The runtime is taken out of `runtimes` first, its `shutdown` flag is raised, and the
/// client is disconnected (a disconnect failure is logged, not returned). The runtime info
/// is then marked `Stopped`, the server's tools are removed from the index, and a
/// `ServerStatusChanged(Stopped)` event is sent when an event channel is configured.
///
/// # Errors
///
/// Returns `McpError::NotRunning` when no runtime is registered under `server_id`.
pub async fn stop_server(&self, server_id: &str) -> Result<()> {
    let runtime = match self.runtimes.remove(server_id) {
        Some((_, removed)) => removed,
        None => return Err(McpError::NotRunning(server_id.to_string())),
    };
    info!("Stopping MCP server '{}'", server_id);

    // Raise the flag before touching the client so the health-check loop exits on its
    // next iteration.
    runtime.shutdown.store(true, Ordering::SeqCst);

    let mut connection = runtime.client.write().await;
    if let Err(disconnect_err) = connection.disconnect().await {
        warn!(
            "Error disconnecting MCP server '{}': {}",
            server_id, disconnect_err
        );
    }

    let mut state = runtime.info.write().await;
    state.disconnected_at = Some(Utc::now());
    state.status = ServerStatus::Stopped;

    self.index.remove_server_tools(server_id);

    if let Some(tx) = self.event_tx.as_ref() {
        let stopped = McpEvent::ServerStatusChanged {
            server_id: server_id.to_string(),
            status: ServerStatus::Stopped,
            error: None,
        };
        // Best-effort notification; a closed channel is not an error for the caller.
        let _ = tx.send(stopped).await;
    }

    info!("MCP server '{}' stopped", server_id);
    Ok(())
}
pub async fn call_tool(
&self,
server_id: &str,
tool_name: &str,
args: serde_json::Value,
) -> Result<crate::mcp::types::McpCallResult> {
let runtime = self
.runtimes
.get(server_id)
.ok_or_else(|| McpError::ServerNotFound(server_id.to_string()))?;
runtime.qos.check_circuit(server_id, tool_name).await?;
let _permit = runtime.qos.acquire_permit().await?;
let client = runtime.client.read().await;
let timeout = runtime.config.request_timeout_ms;
let result = client.call_tool(tool_name, args, timeout).await;
drop(client);
let result = match result {
Ok(result) => {
runtime.qos.record_success().await;
result
}
Err(error) => {
runtime
.qos
.record_failure(server_id, tool_name, &error)
.await;
return Err(error);
}
};
if let Some(ref tx) = self.event_tx {
let _ = tx
.send(McpEvent::ToolExecuted {
server_id: server_id.to_string(),
tool_name: tool_name.to_string(),
success: !result.is_error,
})
.await;
}
Ok(result)
}
/// Looks up the cached definition of `tool_name` on server `server_id`.
///
/// Returns `None` when the server is not registered, when it has no tool with that name,
/// or when `try_read` on the server's tool list fails — so a `None` is not proof that the
/// tool does not exist.
pub fn get_tool_info(&self, server_id: &str, tool_name: &str) -> Option<McpTool> {
    let runtime = self.runtimes.get(server_id)?;
    // Non-blocking on purpose: this is a synchronous function and cannot await the lock.
    let cached = runtime.tools.try_read().ok()?;
    cached.iter().find(|tool| tool.name == tool_name).cloned()
}
pub async fn refresh_tools(&self, server_id: &str) -> Result<()> {
let runtime = self
.runtimes
.get(server_id)
.ok_or_else(|| McpError::ServerNotFound(server_id.to_string()))?;
info!("Refreshing tools for MCP server '{}'", server_id);
let client = runtime.client.read().await;
let new_tools = client.list_tools(runtime.config.request_timeout_ms).await?;
drop(client);
let mut tools = runtime.tools.write().await;
*tools = new_tools.clone();
drop(tools);
let mut info = runtime.info.write().await;
info.tool_count = new_tools.len();
self.index.remove_server_tools(server_id);
let aliases = self.index.register_server_tools(
server_id,
&new_tools,
&runtime.config.allowed_tools,
&runtime.config.denied_tools,
);
info!(
"Refreshed {} tools for MCP server '{}'",
aliases.len(),
server_id
);
if let Some(ref tx) = self.event_tx {
let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
let _ = tx
.send(McpEvent::ToolsChanged {
server_id: server_id.to_string(),
tools: tool_names,
})
.await;
}
Ok(())
}
fn start_health_check(&self, runtime: Arc<ServerRuntime>, interval_ms: u64) {
let server_id = runtime.config.id.clone();
let manager = Arc::new(self.clone());
tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(interval_ms));
loop {
interval.tick().await;
if runtime.shutdown.load(Ordering::SeqCst) {
break;
}
if runtime.reconnecting.load(Ordering::SeqCst) {
continue;
}
let client = runtime.client.read().await;
match client.ping(runtime.config.request_timeout_ms).await {
Ok(_) => {
let mut info = runtime.info.write().await;
info.last_ping_at = Some(Utc::now());
if info.status == ServerStatus::Degraded {
info.status = ServerStatus::Ready;
if let Some(ref tx) = manager.event_tx {
let _ = tx
.send(McpEvent::ServerStatusChanged {
server_id: server_id.clone(),
status: ServerStatus::Ready,
error: None,
})
.await;
}
}
}
Err(e) => {
warn!("Health check failed for MCP server '{}': {}", server_id, e);
drop(client);
{
let mut info = runtime.info.write().await;
info.status = ServerStatus::Degraded;
info.last_error = Some(e.to_string());
}
if let Some(ref tx) = manager.event_tx {
let _ = tx
.send(McpEvent::ServerStatusChanged {
server_id: server_id.clone(),
status: ServerStatus::Degraded,
error: Some(e.to_string()),
})
.await;
}
if runtime.config.reconnect.enabled {
if let Err(reconnect_err) =
manager.attempt_reconnection(runtime.clone()).await
{
error!(
"Reconnection failed for MCP server '{}': {}",
server_id, reconnect_err
);
}
}
}
}
}
});
}
}