use async_trait::async_trait;
use car_engine::{McpSession, McpToolInfo};
use serde_json::{json, Value};
use crate::error::ConnectorError;
/// MCP protocol revision this client offers as `protocolVersion` in the `initialize` request.
pub const CLIENT_PROTOCOL_VERSION: &str = "2025-06-18";
/// Header carrying the session id: stored from responses and sent back on later requests.
const SESSION_HEADER: &str = "Mcp-Session-Id";
/// Request header carrying the protocol version taken from the server's `initialize` result.
const PROTOCOL_HEADER: &str = "MCP-Protocol-Version";
/// Client-side state for talking JSON-RPC to a single MCP server over HTTP POST.
///
/// Created with `McpHttpSession::new`; `initialize` performs the handshake and
/// fills in the negotiated protocol version (and the session id, when the
/// server issues one).
pub struct McpHttpSession {
/// Label returned by `McpSession::name`.
name: String,
/// URL every JSON-RPC message is POSTed to.
endpoint: String,
/// HTTP client used for all requests of this session.
http: reqwest::Client,
/// Header name/value pairs added to every outgoing request.
auth_headers: Vec<(String, String)>,
/// Most recent `Mcp-Session-Id` response header value; `None` until a response carries one.
session_id: Option<String>,
/// `protocolVersion` string from the `initialize` result; `None` until it has been received.
negotiated_version: Option<String>,
/// Id assigned to the next JSON-RPC request; starts at 1 and grows by one per request.
next_id: u64,
}
impl McpHttpSession {
    /// Builds a session for the MCP server reachable at `endpoint`.
    ///
    /// Nothing is sent here; call `initialize` to perform the handshake.
    /// `auth_headers` are attached verbatim to every request the session makes.
    ///
    /// # Errors
    ///
    /// Returns `ConnectorError::Http` if the `reqwest` client cannot be built.
    pub fn new(
        name: impl Into<String>,
        endpoint: impl Into<String>,
        auth_headers: Vec<(String, String)>,
    ) -> Result<Self, ConnectorError> {
        let http = reqwest::Client::builder()
            .build()
            .map_err(|e| ConnectorError::Http(e.to_string()))?;
        Ok(Self {
            name: name.into(),
            endpoint: endpoint.into(),
            http,
            auth_headers,
            session_id: None,
            negotiated_version: None,
            next_id: 1,
        })
    }

    /// Returns the id for the next JSON-RPC request and advances the counter.
    fn next_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Performs the MCP handshake and returns the server's `initialize` result.
    ///
    /// Sends `initialize` (offering `CLIENT_PROTOCOL_VERSION`, empty
    /// capabilities and this crate's name/version), records the
    /// `protocolVersion` string from the result when present, then sends the
    /// `notifications/initialized` notification.
    ///
    /// Session id and protocol version left over from an earlier handshake
    /// are discarded first, so calling this again (for example after the
    /// server dropped the session) starts a clean session instead of sending
    /// stale headers on the `initialize` request.
    ///
    /// # Errors
    ///
    /// Propagates any error from the two underlying requests.
    pub async fn initialize(&mut self) -> Result<Value, ConnectorError> {
        // A new InitializeRequest must not carry a previous session's id or
        // negotiated version; on the very first call both are already `None`.
        self.session_id = None;
        self.negotiated_version = None;

        let id = self.next_id();
        let result = self
            .request(
                "initialize",
                json!({
                    "protocolVersion": CLIENT_PROTOCOL_VERSION,
                    "capabilities": {},
                    "clientInfo": {
                        "name": "car-connectors",
                        "version": env!("CARGO_PKG_VERSION"),
                    },
                }),
                Some(id),
            )
            .await?;
        if let Some(v) = result.get("protocolVersion").and_then(|v| v.as_str()) {
            self.negotiated_version = Some(v.to_string());
        }
        self.request("notifications/initialized", json!({}), None)
            .await?;
        Ok(result)
    }

    /// Sends one JSON-RPC 2.0 message and, for requests, returns its `result`.
    ///
    /// With `id == None` the message is sent as a notification (no `id`
    /// member); once the HTTP status is known to be a success the response
    /// body is ignored and `Value::Null` is returned.
    ///
    /// Every call attaches the configured auth headers, plus the session id
    /// and negotiated protocol version when they are known. A session id
    /// header on the response replaces the stored one, even when the response
    /// is an error.
    ///
    /// # Errors
    ///
    /// * `ConnectorError::Http` — the request could not be sent, the body
    ///   could not be read, or the status was not a success (the message then
    ///   contains the status and the response body).
    /// * `ConnectorError::Protocol` — the body was empty, was not valid JSON,
    ///   or contained no usable envelope.
    /// * `ConnectorError::Rpc` — the server answered with a JSON-RPC error.
    async fn request(
        &mut self,
        method: &str,
        params: Value,
        id: Option<u64>,
    ) -> Result<Value, ConnectorError> {
        let body = match id {
            Some(id) => json!({"jsonrpc": "2.0", "id": id, "method": method, "params": params}),
            None => json!({"jsonrpc": "2.0", "method": method, "params": params}),
        };
        let mut req = self
            .http
            .post(&self.endpoint)
            .header("content-type", "application/json")
            .header("accept", "application/json, text/event-stream")
            .json(&body);
        for (k, v) in &self.auth_headers {
            req = req.header(k.as_str(), v.as_str());
        }
        if let Some(sid) = &self.session_id {
            req = req.header(SESSION_HEADER, sid);
        }
        if let Some(ver) = &self.negotiated_version {
            req = req.header(PROTOCOL_HEADER, ver);
        }
        let resp = req
            .send()
            .await
            .map_err(|e| ConnectorError::Http(e.to_string()))?;

        // Capture header-derived state before `text()` consumes the response.
        if let Some(sid) = resp
            .headers()
            .get(SESSION_HEADER)
            .and_then(|v| v.to_str().ok())
        {
            self.session_id = Some(sid.to_string());
        }
        let status = resp.status();
        // Media types are case-insensitive, so normalise before matching below.
        let ctype = resp
            .headers()
            .get("content-type")
            .and_then(|v| v.to_str().ok())
            .map(str::to_ascii_lowercase)
            .unwrap_or_default();
        let text = resp
            .text()
            .await
            .map_err(|e| ConnectorError::Http(e.to_string()))?;
        if !status.is_success() {
            return Err(ConnectorError::Http(format!("HTTP {status}: {text}")));
        }

        // Notifications have no response envelope to decode.
        let Some(want_id) = id else {
            return Ok(Value::Null);
        };
        let envelope = if ctype.contains("text/event-stream") {
            extract_sse_response(&text, want_id)?
        } else if text.trim().is_empty() {
            return Err(ConnectorError::Protocol("empty response body".into()));
        } else {
            serde_json::from_str::<Value>(&text)
                .map_err(|e| ConnectorError::Protocol(format!("parse json response: {e}")))?
        };
        parse_envelope(envelope)
    }
}
#[async_trait]
impl McpSession for McpHttpSession {
async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
let id = self.next_id();
let result = self
.request("tools/list", json!({}), Some(id))
.await
.map_err(|e| e.to_string())?;
let tools = result
.get("tools")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
tools
.into_iter()
.map(|t| serde_json::from_value(t).map_err(|e| format!("invalid tool definition: {e}")))
.collect()
}
async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
let id = self.next_id();
let result = self
.request(
"tools/call",
json!({"name": name, "arguments": arguments}),
Some(id),
)
.await
.map_err(|e| e.to_string())?;
Ok(extract_text_content(result))
}
fn name(&self) -> &str {
&self.name
}
}
/// Collapses a tool-call result into plain text when it carries any.
///
/// Looks at the `content` array of `result`, keeps the `text` string of every
/// block whose `type` is `"text"`, and returns them joined with `'\n'` as a
/// `Value::String`. Blocks of other types are left out of that string. When
/// `content` is absent, is not an array, or yields no text at all, `result`
/// is handed back unchanged.
fn extract_text_content(result: Value) -> Value {
    let joined = result
        .get("content")
        .and_then(Value::as_array)
        .map(|blocks| {
            blocks
                .iter()
                .filter(|block| block.get("type").and_then(Value::as_str) == Some("text"))
                .filter_map(|block| block.get("text").and_then(Value::as_str))
                .collect::<Vec<&str>>()
        })
        .filter(|parts| !parts.is_empty())
        .map(|parts| parts.join("\n"));

    match joined {
        Some(text) => Value::String(text),
        None => result,
    }
}
fn parse_envelope(v: Value) -> Result<Value, ConnectorError> {
if let Some(err) = v.get("error") {
let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
let message = err
.get("message")
.and_then(|m| m.as_str())
.unwrap_or("unknown error")
.to_string();
return Err(ConnectorError::Rpc { code, message });
}
v.get("result")
.cloned()
.ok_or_else(|| ConnectorError::Protocol("response missing `result`".into()))
}
/// Ends the SSE event currently accumulated in `data`.
///
/// Does nothing when `data` is empty. Otherwise the buffer is parsed as JSON;
/// a successfully parsed value is appended to `envelopes`, an unparseable one
/// is dropped. Either way the buffer is cleared for the next event.
fn flush_event(data: &mut String, envelopes: &mut Vec<Value>) {
    if data.is_empty() {
        return;
    }
    // `Option` iterates over zero or one items, so a parse failure adds nothing.
    envelopes.extend(serde_json::from_str::<Value>(data.as_str()).ok());
    data.clear();
}
fn extract_sse_response(body: &str, want_id: u64) -> Result<Value, ConnectorError> {
let mut data = String::new();
let mut envelopes: Vec<Value> = Vec::new();
for line in body.lines() {
if line.is_empty() {
flush_event(&mut data, &mut envelopes);
} else if let Some(rest) = line.strip_prefix("data:") {
if !data.is_empty() {
data.push('\n');
}
data.push_str(rest.strip_prefix(' ').unwrap_or(rest));
}
}
flush_event(&mut data, &mut envelopes);
if let Some(matching) = envelopes
.iter()
.find(|e| e.get("id").and_then(|v| v.as_u64()) == Some(want_id))
{
return Ok(matching.clone());
}
envelopes
.into_iter()
.next_back()
.ok_or_else(|| ConnectorError::Protocol("no JSON-RPC envelope in SSE stream".into()))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A success envelope yields exactly its `result` member.
    #[test]
    fn parse_envelope_extracts_result() {
        let envelope = json!({"jsonrpc": "2.0", "id": 1, "result": {"ok": true}});
        let result = parse_envelope(envelope).unwrap();
        assert_eq!(result, json!({"ok": true}));
    }

    /// An error envelope is reported as `ConnectorError::Rpc` with code and message intact.
    #[test]
    fn parse_envelope_surfaces_rpc_error() {
        let envelope =
            json!({"jsonrpc": "2.0", "id": 1, "error": {"code": -32601, "message": "nope"}});
        match parse_envelope(envelope) {
            Err(ConnectorError::Rpc { code, message }) => {
                assert_eq!((code, message.as_str()), (-32601, "nope"));
            }
            other => panic!("expected Rpc error, got {other:?}"),
        }
    }

    /// A notification ahead of the response must not be mistaken for it.
    #[test]
    fn sse_picks_envelope_with_matching_id_past_notifications() {
        let body = concat!(
            "event: message\n",
            "data: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/progress\",\"params\":{}}\n",
            "\n",
            "event: message\n",
            "data: {\"jsonrpc\":\"2.0\",\"id\":42,\"result\":{\"tools\":[]}}\n",
            "\n",
        );
        let envelope = extract_sse_response(body, 42).unwrap();
        assert_eq!(envelope.pointer("/result/tools"), Some(&json!([])));
    }

    /// Consecutive `data:` lines of one event are joined before parsing.
    #[test]
    fn sse_joins_multiline_data() {
        let body = concat!(
            "data: {\"jsonrpc\":\"2.0\",\"id\":1,\n",
            "data: \"result\":{\"v\":7}}\n",
            "\n",
        );
        let envelope = extract_sse_response(body, 1).unwrap();
        assert_eq!(envelope.pointer("/result/v"), Some(&json!(7)));
    }

    /// A stream without any data events is an error, not an empty success.
    #[test]
    fn sse_empty_stream_errors() {
        let outcome = extract_sse_response("\n\n", 1);
        assert!(outcome.is_err());
    }
}