use tokio::time::Duration;
use super::*;
impl McpServerManager {
    /// Drives the reconnection loop for a disconnected MCP server.
    ///
    /// Only one reconnection loop may run per server at a time: the
    /// `runtime.reconnecting` flag is claimed via compare-exchange and is
    /// cleared again on every exit path. Backoff starts at
    /// `initial_backoff_ms` and doubles after each failed attempt, capped at
    /// `max_backoff_ms`. A `max_attempts` of 0 means retry indefinitely.
    ///
    /// Returns `Ok(())` when the server reconnects, when shutdown is
    /// observed, or when another reconnection loop already holds the flag;
    /// returns `Err(McpError::Connection)` once `max_attempts` is exhausted.
    pub(super) async fn attempt_reconnection(&self, runtime: Arc<ServerRuntime>) -> Result<()> {
        // Shared between the recorded `last_error` and the emitted event so
        // the two can never drift apart.
        const MAX_ATTEMPTS_MSG: &str = "Max reconnection attempts reached";

        let server_id = runtime.config.id.clone();

        // Claim the reconnection slot; bail out quietly if another task is
        // already reconnecting this server.
        if runtime
            .reconnecting
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            info!(
                "Reconnection already in progress for MCP server '{}'",
                server_id
            );
            return Ok(());
        }

        let reconnect_config = &runtime.config.reconnect;
        let mut current_backoff = reconnect_config.initial_backoff_ms;
        let mut attempt = 0u32;

        info!(
            "Starting reconnection attempts for MCP server '{}' (max_attempts: {})",
            server_id,
            if reconnect_config.max_attempts == 0 {
                "unlimited".to_string()
            } else {
                reconnect_config.max_attempts.to_string()
            }
        );

        loop {
            // A shutdown request cancels the loop without treating it as an
            // error; the reconnecting flag is released before returning.
            if runtime.shutdown.load(Ordering::SeqCst) {
                info!(
                    "Reconnection cancelled due to shutdown for MCP server '{}'",
                    server_id
                );
                runtime.reconnecting.store(false, Ordering::SeqCst);
                return Ok(());
            }

            // max_attempts == 0 means unlimited retries, so this branch is
            // only reachable for a finite limit.
            if reconnect_config.max_attempts > 0 && attempt >= reconnect_config.max_attempts {
                error!(
                    "Max reconnection attempts ({}) reached for MCP server '{}'",
                    reconnect_config.max_attempts, server_id
                );
                // Scope the write guard so it is dropped BEFORE awaiting the
                // event-channel send below; holding it across that await
                // would block any concurrent reader of `runtime.info`.
                {
                    let mut info = runtime.info.write().await;
                    info.status = ServerStatus::Error;
                    info.last_error = Some(MAX_ATTEMPTS_MSG.to_string());
                    info.disconnected_at = Some(Utc::now());
                }
                if let Some(ref tx) = self.event_tx {
                    // Best-effort notification; a closed channel is ignored.
                    let _ = tx
                        .send(McpEvent::ServerStatusChanged {
                            server_id: server_id.clone(),
                            status: ServerStatus::Error,
                            error: Some(MAX_ATTEMPTS_MSG.to_string()),
                        })
                        .await;
                }
                runtime.reconnecting.store(false, Ordering::SeqCst);
                return Err(McpError::Connection(format!(
                    "Max reconnection attempts reached for server '{}'",
                    server_id
                )));
            }

            attempt += 1;
            info!(
                "Reconnection attempt {} for MCP server '{}' (backoff: {}ms)",
                attempt, server_id, current_backoff
            );

            // Back off before every attempt, including the first.
            tokio::time::sleep(Duration::from_millis(current_backoff)).await;

            match self.reconnect_server(runtime.clone()).await {
                Ok(_) => {
                    info!(
                        "Successfully reconnected MCP server '{}' after {} attempt(s)",
                        server_id, attempt
                    );
                    // As above: release the write guard before the awaited
                    // event send.
                    {
                        let mut info = runtime.info.write().await;
                        info.status = ServerStatus::Ready;
                        info.last_error = None;
                        info.restart_count += 1;
                        info.disconnected_at = None;
                    }
                    if let Some(ref tx) = self.event_tx {
                        let _ = tx
                            .send(McpEvent::ServerStatusChanged {
                                server_id: server_id.clone(),
                                status: ServerStatus::Ready,
                                error: None,
                            })
                            .await;
                    }
                    runtime.reconnecting.store(false, Ordering::SeqCst);
                    return Ok(());
                }
                Err(e) => {
                    warn!(
                        "Reconnection attempt {} failed for MCP server '{}': {}",
                        attempt, server_id, e
                    );
                    {
                        let mut info = runtime.info.write().await;
                        info.last_error = Some(e.to_string());
                    }
                    // Exponential backoff capped at max_backoff_ms. The outer
                    // guard preserves the original behavior of never lowering
                    // a backoff that is already at or above the cap;
                    // saturating_mul avoids a debug-build overflow panic for
                    // very large configured backoffs.
                    if reconnect_config.max_backoff_ms > current_backoff {
                        current_backoff = current_backoff
                            .saturating_mul(2)
                            .min(reconnect_config.max_backoff_ms);
                    }
                }
            }
        }
    }

    /// Tears down the stale client connection (if any), performs a fresh
    /// connect/initialize/list-tools handshake, swaps the new client and
    /// tool list into the runtime, and re-registers the tool aliases with
    /// the shared index. Emits a `ToolsChanged` event when an event channel
    /// is configured.
    async fn reconnect_server(&self, runtime: Arc<ServerRuntime>) -> Result<()> {
        let server_id = runtime.config.id.clone();
        info!("Attempting to reconnect MCP server '{}'", server_id);

        // Best-effort disconnect of the old client; errors are ignored
        // because the client is being replaced regardless.
        {
            let mut client = runtime.client.write().await;
            if client.is_connected().await {
                let _ = client.disconnect().await;
            }
        }

        let (client, tools) = self
            .bootstrap_server_client(&server_id, &runtime.config, "reconnect")
            .await?;

        // Swap in the new client and tool list under short-lived write locks.
        {
            let mut client_lock = runtime.client.write().await;
            *client_lock = client;
        }
        {
            let mut tools_lock = runtime.tools.write().await;
            *tools_lock = tools.clone();
        }

        // Rebuild the index entries: drop stale aliases, then register the
        // fresh set filtered by the allow/deny lists.
        self.index.remove_server_tools(&server_id);
        let aliases = self.index.register_server_tools(
            &server_id,
            &tools,
            &runtime.config.allowed_tools,
            &runtime.config.denied_tools,
        );
        info!(
            "Re-registered {} MCP tools for server '{}'",
            aliases.len(),
            server_id
        );

        if let Some(ref tx) = self.event_tx {
            let tool_names: Vec<String> = aliases.into_iter().map(|a| a.alias).collect();
            let _ = tx
                .send(McpEvent::ToolsChanged {
                    server_id,
                    tools: tool_names,
                })
                .await;
        }
        Ok(())
    }

    /// Builds a transport for `config`, connects it, initializes the MCP
    /// session, and fetches the server's advertised tool list.
    ///
    /// `phase` is a short label (e.g. "reconnect") used only to
    /// contextualize log messages. Returns the connected client together
    /// with its tools; any connect/initialize/list failure is logged and
    /// propagated to the caller.
    pub(super) async fn bootstrap_server_client(
        &self,
        server_id: &str,
        config: &McpServerConfig,
        phase: &'static str,
    ) -> Result<(McpProtocolClient, Vec<McpTool>)> {
        let transport = self.build_transport(&config.transport).await?;
        let mut client = McpProtocolClient::new(transport);

        client.connect().await.map_err(|e| {
            error!(
                "Failed to connect MCP server '{}' during {}: {}",
                server_id, phase, e
            );
            e
        })?;

        let init_result = client
            .initialize(config.request_timeout_ms)
            .await
            .map_err(|e| {
                error!(
                    "Failed to initialize MCP server '{}' during {}: {}",
                    server_id, phase, e
                );
                e
            })?;
        info!(
            "MCP server '{}' initialized during {}: {} v{}",
            server_id, phase, init_result.server_info.name, init_result.server_info.version
        );

        let tools = client.list_tools(config.request_timeout_ms).await?;
        info!(
            "MCP server '{}' has {} tools during {}",
            server_id,
            tools.len(),
            phase
        );
        Ok((client, tools))
    }

    /// Constructs the transport described by `config`.
    ///
    /// For the HTTP-based transports (SSE and StreamableHTTP), a shared HTTP
    /// client is built from the manager's configuration when one is
    /// available, so proxy/TLS settings are honored; otherwise the transport
    /// falls back to its own default client. Stdio transports need no HTTP
    /// client.
    async fn build_transport(&self, config: &TransportConfig) -> Result<Box<dyn McpTransport>> {
        match config {
            TransportConfig::Stdio(stdio_config) => {
                Ok(Box::new(StdioTransport::new(stdio_config.clone())))
            }
            TransportConfig::Sse(sse_config) => {
                if let Some(cfg_handle) = self.config.as_ref() {
                    let cfg = cfg_handle.read().await.clone();
                    let client = bamboo_infrastructure::llm::http_client::build_http_client(&cfg)
                        .map_err(|e| {
                            McpError::InvalidConfig(format!(
                                "Failed to build HTTP client for MCP SSE transport: {e}"
                            ))
                        })?;
                    Ok(Box::new(SseTransport::new_with_client(
                        sse_config.clone(),
                        client,
                    )))
                } else {
                    Ok(Box::new(SseTransport::new(sse_config.clone())))
                }
            }
            TransportConfig::StreamableHttp(sh_config) => {
                if let Some(cfg_handle) = self.config.as_ref() {
                    let cfg = cfg_handle.read().await.clone();
                    let client = bamboo_infrastructure::llm::http_client::build_http_client(&cfg)
                        .map_err(|e| {
                            McpError::InvalidConfig(format!(
                                "Failed to build HTTP client for MCP StreamableHTTP transport: {e}"
                            ))
                        })?;
                    Ok(Box::new(StreamableHttpTransport::new_with_client(
                        sh_config.clone(),
                        client,
                    )))
                } else {
                    Ok(Box::new(StreamableHttpTransport::new(sh_config.clone())))
                }
            }
        }
    }
}