use super::*;
pub(crate) async fn stdio_call_raw(
inner: &mut StdioMcpClientInner,
server_name: &str,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, VmError> {
for _ in 0..2 {
let id = inner.next_id;
inner.next_id += 1;
let request = serde_json::json!({
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": request_params_for_protocol(
inner.protocol_mode,
&inner.protocol_version,
params.clone(),
),
});
write_stdio_json(&mut inner.stdin, &request).await?;
let msg = read_stdio_response(inner, server_name, method, id).await?;
if maybe_retry_unsupported_protocol(inner.protocol_mode, &mut inner.protocol_version, &msg)
{
continue;
}
return Ok(msg);
}
Err(VmError::Runtime(
"MCP request failed after protocol-version retry".into(),
))
}
pub(crate) async fn write_stdio_json(
stdin: &mut ChildStdin,
message: &serde_json::Value,
) -> Result<(), VmError> {
let line = serde_json::to_string(message)
.map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
stdin
.write_all(line.as_bytes())
.await
.map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
stdin
.write_all(b"\n")
.await
.map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
stdin
.flush()
.await
.map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))
}
pub(crate) async fn read_stdio_response(
inner: &mut StdioMcpClientInner,
server_name: &str,
method: &str,
id: u64,
) -> Result<serde_json::Value, VmError> {
let mut line_buf = String::new();
loop {
line_buf.clear();
let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
.await
.map_err(|_| {
VmError::Runtime(format!(
"MCP: server did not respond to '{method}' within {}s",
MCP_TIMEOUT.as_secs()
))
})?
.map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
if bytes_read == 0 {
return Err(VmError::Runtime("MCP: server closed connection".into()));
}
let trimmed = line_buf.trim();
if trimmed.is_empty() {
continue;
}
let msg: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(_) => continue,
};
if msg.get("id").is_none() {
let _ = handle_inbound_client_request(server_name, &msg).await;
continue;
}
if msg["id"].as_u64() == Some(id)
&& (msg.get("result").is_some() || msg.get("error").is_some())
{
return Ok(msg);
}
let response = match handle_inbound_client_request(server_name, &msg).await {
Some(response) => response,
None => continue,
};
write_stdio_json(&mut inner.stdin, &response).await?;
}
}
pub(crate) async fn handle_inbound_client_request(
server_name: &str,
msg: &serde_json::Value,
) -> Option<serde_json::Value> {
let method = msg.get("method").and_then(|value| value.as_str())?;
if method == "notifications/progress" {
relay_progress_notification(server_name, msg);
return None;
}
if method == "notifications/message" {
relay_log_notification(server_name, msg);
return None;
}
if method == "notifications/resources/updated"
|| method == "notifications/resources/list_changed"
|| method == "notifications/tools/list_changed"
|| method == "notifications/prompts/list_changed"
{
relay_resource_notification(server_name, method, msg);
return None;
}
if method == crate::mcp_elicit::ELICITATION_METHOD {
return Some(crate::mcp_elicit::dispatch_inbound_elicitation(server_name, msg).await);
}
if method == crate::mcp_sampling::SAMPLING_METHOD {
return Some(crate::mcp_sampling::dispatch_inbound_sampling(server_name, msg).await);
}
if method == crate::mcp_protocol::METHOD_ROOTS_LIST {
let id = msg.get("id")?.clone();
return Some(harn_roots_list_response(id));
}
client_request_rejection(msg)
}
pub(crate) async fn stdio_notify(
inner: &mut StdioMcpClientInner,
method: &str,
params: serde_json::Value,
) -> Result<(), VmError> {
let notification = serde_json::json!({
"jsonrpc": "2.0",
"method": method,
"params": request_params_for_protocol(
inner.protocol_mode,
&inner.protocol_version,
params,
),
});
let line = serde_json::to_string(¬ification)
.map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
inner
.stdin
.write_all(line.as_bytes())
.await
.map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
inner
.stdin
.write_all(b"\n")
.await
.map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
inner
.stdin
.flush()
.await
.map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
Ok(())
}
pub(crate) async fn http_call_raw(
inner: &mut HttpMcpClientInner,
server_name: &str,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, VmError> {
let id = inner.next_id;
inner.next_id += 1;
send_http_request(inner, server_name, method, params, Some(id)).await
}
pub(crate) async fn http_notify(
inner: &mut HttpMcpClientInner,
server_name: &str,
method: &str,
params: serde_json::Value,
) -> Result<(), VmError> {
let _ = send_http_request(inner, server_name, method, params, None).await?;
Ok(())
}
pub(crate) async fn send_http_request(
inner: &mut HttpMcpClientInner,
server_name: &str,
method: &str,
params: serde_json::Value,
id: Option<u64>,
) -> Result<serde_json::Value, VmError> {
for attempt in 0..2 {
let response = send_http_request_once(inner, method, params.clone(), id).await?;
let status = response.status().as_u16();
let headers = response.headers().clone();
if let Some(protocol_version) = headers
.get(RC_HEADER_PROTOCOL_VERSION)
.and_then(|v| v.to_str().ok())
{
inner.protocol_version = protocol_version.to_string();
}
if inner.protocol_mode == McpProtocolMode::Legacy {
if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
inner.session_id = Some(session_id.to_string());
}
}
if inner.protocol_mode == McpProtocolMode::Legacy
&& status == 404
&& inner.session_id.is_some()
&& method != "initialize"
&& attempt == 0
{
inner.session_id = None;
inner.abort_get_stream();
reinitialize_http_client(inner).await?;
continue;
}
if status == 401 {
emit_mcp_auth_required_event(server_name, &inner.url, &headers);
return Err(VmError::Thrown(VmValue::String(std::sync::Arc::from(
"MCP authorization required",
))));
}
let body = response
.text()
.await
.map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
if body.trim().is_empty() {
if should_fallback_to_legacy_http_discovery(inner.protocol_mode, method, status) {
return Ok(http_discovery_fallback_response(id));
}
if status >= 400 {
return Err(VmError::Runtime(format!(
"MCP HTTP request returned {status} with an empty response body"
)));
}
if status < 400 {
ensure_http_get_stream(inner, server_name);
}
return Ok(serde_json::Value::Null);
}
let msg = match parse_http_response_body(inner, server_name, &body, status, id).await {
Ok(msg) => msg,
Err(_)
if should_fallback_to_legacy_http_discovery(
inner.protocol_mode,
method,
status,
) =>
{
return Ok(http_discovery_fallback_response(id));
}
Err(err) => return Err(err),
};
if maybe_retry_unsupported_protocol(inner.protocol_mode, &mut inner.protocol_version, &msg)
&& attempt == 0
{
continue;
}
ensure_http_get_stream(inner, server_name);
if status >= 400 && id.is_none() {
return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
}
return Ok(msg);
}
Err(VmError::Runtime("MCP HTTP request failed".into()))
}
pub(crate) async fn send_http_request_once(
inner: &mut HttpMcpClientInner,
method: &str,
params: serde_json::Value,
id: Option<u64>,
) -> Result<reqwest::Response, VmError> {
let request_params =
request_params_for_protocol(inner.protocol_mode, &inner.protocol_version, params);
let mut payload = serde_json::json!({
"jsonrpc": "2.0",
"method": method,
"params": request_params,
});
if let Some(id) = id {
payload["id"] = serde_json::json!(id);
}
let payload = wrap_http_payload(payload, inner.proxy_server_name.as_deref());
let request = inner
.client
.post(&inner.url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.json(&payload);
let request = apply_http_headers(
request,
&inner.auth_token,
inner.protocol_mode,
&inner.protocol_version,
legacy_session_id(inner),
Some(method),
payload.get("params"),
&inner.tool_headers,
);
request
.timeout(MCP_TIMEOUT)
.send()
.await
.map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
}
pub(crate) fn ensure_http_get_stream(inner: &mut HttpMcpClientInner, server_name: &str) {
if inner.protocol_mode == McpProtocolMode::Modern {
return;
}
if server_name.is_empty() {
return;
}
if inner
.get_stream_task
.as_ref()
.is_some_and(|task| !task.is_finished())
{
return;
}
let config = HttpStreamConfig {
client: inner.client.clone(),
url: inner.url.clone(),
auth_token: inner.auth_token.clone(),
protocol_mode: inner.protocol_mode,
protocol_version: inner.protocol_version.clone(),
session_id: inner.session_id.clone(),
proxy_server_name: inner.proxy_server_name.clone(),
server_name: server_name.to_string(),
};
inner.get_stream_task = Some(tokio::task::spawn_local(run_http_get_stream(config)));
}
pub(crate) async fn run_http_get_stream(config: HttpStreamConfig) {
let request = apply_http_headers(
config
.client
.get(&config.url)
.header("Accept", "text/event-stream"),
&config.auth_token,
config.protocol_mode,
&config.protocol_version,
config.session_id.as_deref(),
None,
None,
&BTreeMap::new(),
);
let Ok(mut stream) = EventSource::new(request) else {
return;
};
while let Some(event) = stream.next().await {
match event {
Ok(SseEvent::Open) => {}
Ok(SseEvent::Message(message)) => {
if message.data.trim().is_empty() {
continue;
}
let Ok(msg) = serde_json::from_str::<serde_json::Value>(&message.data) else {
tracing::debug!("MCP HTTP GET stream received non-JSON event");
continue;
};
if let Some(response) =
handle_inbound_client_request(&config.server_name, &msg).await
{
let _ = post_http_jsonrpc_payload(&config, response).await;
}
}
Err(error) => {
tracing::debug!("MCP HTTP GET stream ended with error: {error}");
break;
}
}
}
stream.close();
}
pub(crate) async fn post_http_jsonrpc_payload(
config: &HttpStreamConfig,
payload: serde_json::Value,
) -> Result<(), VmError> {
let payload = wrap_http_payload(payload, config.proxy_server_name.as_deref());
let request = config
.client
.post(&config.url)
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.json(&payload)
.timeout(MCP_TIMEOUT);
let request = apply_http_headers(
request,
&config.auth_token,
config.protocol_mode,
&config.protocol_version,
config.session_id.as_deref(),
None,
None,
&BTreeMap::new(),
);
let response = request
.send()
.await
.map_err(|e| VmError::Runtime(format!("MCP HTTP response POST error: {e}")))?;
if response.status().is_success() {
Ok(())
} else {
Err(VmError::Runtime(format!(
"MCP HTTP response POST returned {}",
response.status()
)))
}
}
pub(crate) fn apply_http_headers(
mut request: reqwest::RequestBuilder,
auth_token: &Option<String>,
protocol_mode: McpProtocolMode,
protocol_version: &str,
session_id: Option<&str>,
method: Option<&str>,
params: Option<&serde_json::Value>,
tool_headers: &BTreeMap<String, Vec<McpToolHeader>>,
) -> reqwest::RequestBuilder {
request = request.header(RC_HEADER_PROTOCOL_VERSION, protocol_version);
if let Some(token) = auth_token {
request = request.header("Authorization", format!("Bearer {token}"));
}
if protocol_mode == McpProtocolMode::Legacy {
if let Some(session_id) = session_id {
request = request.header(MCP_SESSION_HEADER_LEGACY, session_id);
}
}
if protocol_mode == McpProtocolMode::Modern {
if let Some(method) = method {
request = request.header(RC_HEADER_METHOD, method);
if let Some(params) = params {
if let Some(name) = rc_name_header_value(method, params) {
request = request.header(RC_HEADER_NAME, name);
}
}
if method == "tools/call" {
request = apply_mcp_tool_parameter_headers(request, params, tool_headers);
}
}
}
request
}
pub(crate) fn legacy_session_id(inner: &HttpMcpClientInner) -> Option<&str> {
(inner.protocol_mode == McpProtocolMode::Legacy)
.then_some(inner.session_id.as_deref())
.flatten()
}
pub(crate) fn apply_mcp_tool_parameter_headers(
mut request: reqwest::RequestBuilder,
params: Option<&serde_json::Value>,
tool_headers: &BTreeMap<String, Vec<McpToolHeader>>,
) -> reqwest::RequestBuilder {
let Some(params) = params else {
return request;
};
let Some(tool_name) = params.get("name").and_then(|value| value.as_str()) else {
return request;
};
let Some(headers) = tool_headers.get(tool_name) else {
return request;
};
let Some(arguments) = params.get("arguments").and_then(|value| value.as_object()) else {
return request;
};
for header in headers {
let Some(value) = arguments.get(&header.parameter) else {
continue;
};
if value.is_null() {
continue;
}
let Some(encoded) = encode_mcp_header_value(value) else {
continue;
};
request = request.header(header.header_name.as_str(), encoded);
}
request
}
pub(crate) fn wrap_http_payload(
payload: serde_json::Value,
proxy_server_name: Option<&str>,
) -> serde_json::Value {
let Some(proxy_server_name) = proxy_server_name else {
return payload;
};
let mut wrapped = serde_json::Map::new();
wrapped.insert(
"serverName".to_string(),
serde_json::Value::String(proxy_server_name.to_string()),
);
if let Some(object) = payload.as_object() {
for (key, value) in object {
wrapped.insert(key.clone(), value.clone());
}
}
serde_json::Value::Object(wrapped)
}
pub(crate) async fn reinitialize_http_client(
inner: &mut HttpMcpClientInner,
) -> Result<(), VmError> {
let initialize = send_http_request_once(
inner,
"initialize",
legacy_initialize_params(&inner.protocol_version),
Some(0),
)
.await?;
if let Some(protocol_version) = initialize
.headers()
.get(RC_HEADER_PROTOCOL_VERSION)
.and_then(|v| v.to_str().ok())
{
inner.protocol_version = protocol_version.to_string();
}
if inner.protocol_mode == McpProtocolMode::Legacy {
if let Some(session_id) = initialize
.headers()
.get(MCP_SESSION_HEADER_LEGACY)
.and_then(|v| v.to_str().ok())
{
inner.session_id = Some(session_id.to_string());
}
}
let status = initialize.status().as_u16();
let body = initialize
.text()
.await
.map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
let msg = parse_http_response_body(inner, "", &body, status, Some(0)).await?;
if status >= 400 {
return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
}
let _ = parse_jsonrpc_result(msg)?;
let response = send_http_request_once(
inner,
"notifications/initialized",
serde_json::json!({}),
None,
)
.await?;
let status = response.status().as_u16();
if let Some(protocol_version) = response
.headers()
.get(RC_HEADER_PROTOCOL_VERSION)
.and_then(|v| v.to_str().ok())
{
inner.protocol_version = protocol_version.to_string();
}
if inner.protocol_mode == McpProtocolMode::Legacy {
if let Some(session_id) = response
.headers()
.get(MCP_SESSION_HEADER_LEGACY)
.and_then(|v| v.to_str().ok())
{
inner.session_id = Some(session_id.to_string());
}
}
let body = response
.text()
.await
.map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
if body.trim().is_empty() || status < 400 {
return Ok(());
}
let msg = parse_http_response_body(inner, "", &body, status, None).await?;
Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
}
pub(crate) async fn parse_http_response_body(
inner: &HttpMcpClientInner,
server_name: &str,
body: &str,
status: u16,
request_id: Option<u64>,
) -> Result<serde_json::Value, VmError> {
if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
return parse_sse_jsonrpc_body(inner, server_name, body, request_id).await;
}
serde_json::from_str(body).map_err(|e| {
VmError::Runtime(format!(
"MCP HTTP response parse error (status {status}): {e}"
))
})
}
pub(crate) async fn parse_sse_jsonrpc_body(
inner: &HttpMcpClientInner,
server_name: &str,
body: &str,
request_id: Option<u64>,
) -> Result<serde_json::Value, VmError> {
let mut current_data = Vec::new();
let mut messages = Vec::new();
for line in body.lines() {
if line.is_empty() {
if !current_data.is_empty() {
messages.push(current_data.join("\n"));
current_data.clear();
}
continue;
}
if let Some(data) = line.strip_prefix("data:") {
current_data.push(data.trim_start().to_string());
}
}
if !current_data.is_empty() {
messages.push(current_data.join("\n"));
}
let config = HttpStreamConfig {
client: inner.client.clone(),
url: inner.url.clone(),
auth_token: inner.auth_token.clone(),
protocol_mode: inner.protocol_mode,
protocol_version: inner.protocol_version.clone(),
session_id: inner.session_id.clone(),
proxy_server_name: inner.proxy_server_name.clone(),
server_name: server_name.to_string(),
};
let mut fallback = None;
for message in messages {
if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
if request_id.is_some()
&& value["id"].as_u64() == request_id
&& (value.get("result").is_some() || value.get("error").is_some())
{
return Ok(value);
}
if let Some(response) = handle_inbound_client_request(server_name, &value).await {
let _ = post_http_jsonrpc_payload(&config, response).await;
continue;
}
if value.get("result").is_some() || value.get("error").is_some() {
fallback = Some(value);
}
}
}
fallback.ok_or_else(|| {
VmError::Runtime(
"MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
)
})
}