use rmcp::{
service::{RoleClient, RunningService},
transport::streamable_http_client::StreamableHttpClientTransportConfig,
ServiceExt,
};
use crate::bridge::obs::sanitize_url;
use crate::error::{BlockError, BlockResult};
use crate::mcp_client::handler::AgentBlockClientHandler;
pub(super) async fn connect_http_transport(
name: &str,
url: &str,
opts: &serde_json::Value,
handler: AgentBlockClientHandler,
rpc_timeout: std::time::Duration,
) -> BlockResult<RunningService<RoleClient, AgentBlockClientHandler>> {
let mut config = StreamableHttpClientTransportConfig::with_uri(url);
if let Some(auth) = opts
.get("auth_header")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
{
config = config.auth_header(auth);
}
let transport = rmcp::transport::StreamableHttpClientTransport::from_config(config);
let safe_url = sanitize_url(url);
tokio::time::timeout(rpc_timeout, handler.serve(transport))
.await
.map_err(|_| {
tracing::warn!(server = %name, url = %safe_url, timeout = ?rpc_timeout, "mcp http initialize timed out");
BlockError::Timeout(format!(
"http connect '{name}' to {safe_url} timed out after {rpc_timeout:?}"
))
})?
.map_err(|e| {
tracing::warn!(server = %name, url = %safe_url, error = %e, "mcp http initialize failed");
BlockError::Mcp(format!("http connect '{name}' to {safe_url}: {e}"))
})
}