use super::*;
/// Spawns `command` as a child process and connects to it as a stdio MCP client.
///
/// The child is started with `args`, with the entries of `env` applied to its
/// environment, piped stdin/stdout, and inherited stderr. It is configured with
/// `kill_on_drop(true)`. Once the pipes are captured, a [`VmMcpClientHandle`]
/// named after `command` is built and `initialize_client` is run on it.
///
/// # Errors
///
/// * `VmError::Thrown` when the process cannot be spawned.
/// * `VmError::Runtime` when the child's stdin or stdout handle is missing.
/// * Any error returned by `initialize_client`.
pub(crate) async fn mcp_connect_stdio_impl(
    command: &str,
    args: &[String],
    env: &BTreeMap<String, String>,
    protocol_mode: McpProtocolMode,
    protocol_version: String,
) -> Result<VmMcpClientHandle, VmError> {
    let mut process = tokio::process::Command::new(command);
    process
        .args(args)
        .envs(env)
        .stdin(std::process::Stdio::piped())
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::inherit())
        .kill_on_drop(true);

    let mut child = match process.spawn() {
        Ok(spawned) => spawned,
        Err(e) => {
            let message = format!("mcp_connect: failed to spawn '{command}': {e}");
            return Err(VmError::Thrown(VmValue::String(std::sync::Arc::from(
                message,
            ))));
        }
    };

    // Both pipes were requested above; a missing handle is reported as a runtime error.
    let stdin = match child.stdin.take() {
        Some(pipe) => pipe,
        None => return Err(VmError::Runtime("mcp_connect: failed to open stdin".into())),
    };
    let stdout = match child.stdout.take() {
        Some(pipe) => pipe,
        None => return Err(VmError::Runtime("mcp_connect: failed to open stdout".into())),
    };

    let transport = McpClientInner::Stdio(StdioMcpClientInner {
        child,
        stdin,
        reader: BufReader::new(stdout),
        next_id: 1,
        protocol_mode,
        protocol_version,
    });
    let handle = VmMcpClientHandle {
        name: command.to_string(),
        inner: Arc::new(Mutex::new(Some(transport))),
        last_roots: Arc::new(Mutex::new(Vec::new())),
        initialize_result: Arc::new(Mutex::new(None)),
        cache_hints: Arc::new(Mutex::new(BTreeMap::new())),
    };

    initialize_client(&handle).await?;
    Ok(handle)
}
pub(crate) async fn mcp_connect_http_impl(
spec: &McpServerSpec,
) -> Result<VmMcpClientHandle, VmError> {
let client = reqwest::Client::builder()
.build()
.map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
let protocol_mode = resolve_protocol_mode(
spec.protocol_mode.as_deref(),
spec.protocol_version.as_deref(),
)?;
let protocol_version = spec
.protocol_version
.clone()
.unwrap_or_else(|| default_protocol_version(protocol_mode).to_string());
let auth_token = resolve_http_auth_token(spec).await;
let handle = VmMcpClientHandle {
name: spec.name.clone(),
inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
client,
url: spec.url.clone(),
auth_token,
protocol_mode,
protocol_version,
session_id: None,
next_id: 1,
proxy_server_name: spec.proxy_server_name.clone(),
get_stream_task: None,
tool_headers: BTreeMap::new(),
})))),
last_roots: Arc::new(Mutex::new(Vec::new())),
initialize_result: Arc::new(Mutex::new(None)),
cache_hints: Arc::new(Mutex::new(BTreeMap::new())),
};
initialize_client(&handle).await?;
Ok(handle)
}
/// Resolves the auth token to use for the HTTP server in `spec`.
///
/// Delegates to [`resolve_http_auth_token_with`], supplying
/// `crate::mcp_oauth::resolve_bearer` as the resolver that is invoked with the
/// server URL.
pub(crate) async fn resolve_http_auth_token(spec: &McpServerSpec) -> Option<String> {
    let oauth_lookup =
        |server_url: String| async move { crate::mcp_oauth::resolve_bearer(&server_url).await };
    resolve_http_auth_token_with(spec, oauth_lookup).await
}
/// Resolves an auth token for `spec`, using `resolver` as the fallback lookup.
///
/// Resolution order:
/// 1. A non-empty `spec.auth_token` is returned as-is.
/// 2. If `spec.url` is empty, `None` is returned without calling `resolver`.
/// 3. Otherwise `resolver` is called with a copy of `spec.url`; an `Err` from
///    the resolver is discarded and yields `None`.
pub(crate) async fn resolve_http_auth_token_with<R, Fut>(
    spec: &McpServerSpec,
    resolver: R,
) -> Option<String>
where
    R: FnOnce(String) -> Fut,
    Fut: Future<Output = Result<Option<String>, String>>,
{
    match spec.auth_token.as_deref() {
        Some(configured) if !configured.is_empty() => return Some(configured.to_string()),
        _ => {}
    }

    if spec.url.is_empty() {
        return None;
    }

    // Resolver failures are intentionally collapsed into "no token".
    let resolved = resolver(spec.url.clone()).await;
    resolved.ok().flatten()
}
/// Runs the protocol handshake for `handle`.
///
/// When the handle is not in `McpProtocolMode::Modern`, this is just the legacy
/// handshake. In modern mode a `server/discover` request is sent first: if the
/// response is classified by `is_method_not_found_response`, the handle is
/// switched to the legacy protocol and the legacy handshake runs instead;
/// otherwise the parsed result is stored in `handle.initialize_result`.
///
/// # Errors
///
/// Propagates errors from the handle's calls, from `parse_jsonrpc_result`, and
/// from `initialize_legacy_client`.
pub(crate) async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
    if handle.protocol_mode().await? != McpProtocolMode::Modern {
        return initialize_legacy_client(handle).await;
    }

    let response = handle
        .call_raw("server/discover", serde_json::json!({}))
        .await?;

    if is_method_not_found_response(&response) {
        handle.switch_to_legacy_protocol().await?;
        return initialize_legacy_client(handle).await;
    }

    let discovered = parse_jsonrpc_result(response)?;
    {
        let mut slot = handle.initialize_result.lock().await;
        *slot = Some(discovered);
    }
    Ok(())
}
/// Performs the legacy handshake on `handle`.
///
/// Sends an `initialize` request built from the handle's protocol version via
/// `legacy_initialize_params`, stores the response in
/// `handle.initialize_result`, then sends the `notifications/initialized`
/// notification with an empty params object.
///
/// # Errors
///
/// Propagates any error from reading the protocol version, the `initialize`
/// call, or the notification.
pub(crate) async fn initialize_legacy_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
    let version = handle.protocol_version().await?;
    let params = legacy_initialize_params(&version);
    let response = handle.call("initialize", params).await?;

    {
        // Release the lock before sending the follow-up notification.
        let mut slot = handle.initialize_result.lock().await;
        *slot = Some(response);
    }

    handle
        .notify("notifications/initialized", serde_json::json!({}))
        .await
}
/// Connects to the MCP server described by `spec` and returns its client handle.
///
/// Dispatches on `spec.transport`: stdio servers are spawned through
/// `mcp_connect_stdio_impl` using the protocol mode and version resolved from
/// `spec`; HTTP servers go through `mcp_connect_http_impl`. In both cases the
/// returned handle's `name` is set to `spec.name`.
///
/// # Errors
///
/// Propagates errors from `resolve_protocol_mode` and from the
/// transport-specific connect function.
pub async fn connect_mcp_server_from_spec(
    spec: &McpServerSpec,
) -> Result<VmMcpClientHandle, VmError> {
    let connected = match spec.transport {
        McpTransport::Http => mcp_connect_http_impl(spec).await,
        McpTransport::Stdio => {
            let mode = resolve_protocol_mode(
                spec.protocol_mode.as_deref(),
                spec.protocol_version.as_deref(),
            )?;
            let version = match spec.protocol_version.clone() {
                Some(explicit) => explicit,
                None => default_protocol_version(mode).to_string(),
            };
            mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env, mode, version).await
        }
    };

    let mut handle = connected?;
    // The stdio path names the handle after the command; use the spec's name instead.
    handle.name = spec.name.clone();
    Ok(handle)
}
/// Deserializes `value` into an [`McpServerSpec`] and connects to that server.
///
/// # Errors
///
/// Returns `VmError::Runtime` when `value` does not deserialize into a server
/// spec; otherwise propagates the result of [`connect_mcp_server_from_spec`].
pub async fn connect_mcp_server_from_json(
    value: &serde_json::Value,
) -> Result<VmMcpClientHandle, VmError> {
    match serde_json::from_value::<McpServerSpec>(value.clone()) {
        Ok(spec) => connect_mcp_server_from_spec(&spec).await,
        Err(e) => Err(VmError::Runtime(format!(
            "Invalid MCP server config: {e}"
        ))),
    }
}