car-connectors 0.25.0

Remote MCP connectors for the Common Agent Runtime — connect to remote MCP servers over HTTP, register their tools, and route calls through CAR's governance layer (validator, policy, eventlog).
//! HTTP-streamable MCP client transport.
//!
//! Implements [`car_engine::McpSession`] over the MCP "streamable HTTP"
//! transport: each JSON-RPC request is a POST to the server's single
//! endpoint; the response is either `application/json` (one envelope)
//! or `text/event-stream` (an SSE stream that carries the response
//! among any notifications). The server may assign an `Mcp-Session-Id`
//! on `initialize`, which we echo on every subsequent request.
//!
//! Phase 1 supports unauthenticated servers and servers that take a
//! static auth header (resolved from the keychain by the connector
//! manager — never stored in plaintext config). OAuth 2.1 is Phase 2.

use async_trait::async_trait;
use car_engine::{McpSession, McpToolInfo};
use serde_json::{json, Value};

use crate::error::ConnectorError;

/// MCP protocol version this client advertises in `initialize`.
pub const CLIENT_PROTOCOL_VERSION: &str = "2025-06-18";

/// Header carrying the server-assigned session id: read from responses
/// and echoed on every later request once captured.
const SESSION_HEADER: &str = "Mcp-Session-Id";
/// Header carrying the protocol version negotiated in `initialize`; sent
/// on requests only after a version has been negotiated.
const PROTOCOL_HEADER: &str = "MCP-Protocol-Version";

/// A remote MCP server reached over the HTTP-streamable transport.
///
/// Holds the per-connection state the transport needs: the endpoint,
/// static auth headers, the session id / protocol version learned from
/// the server, and the JSON-RPC request-id counter.
// NOTE(review): no `Debug` derive here; if one is ever added, make sure
// `auth_headers` is redacted — it holds secrets resolved from the keychain.
pub struct McpHttpSession {
    /// Connector name, returned by `McpSession::name`.
    name: String,
    /// URL of the server's single MCP endpoint; every JSON-RPC message
    /// is POSTed here.
    endpoint: String,
    /// HTTP client used for every request.
    http: reqwest::Client,
    /// Static auth headers, already resolved from the keychain. Empty
    /// for unauthenticated servers.
    auth_headers: Vec<(String, String)>,
    /// Server-assigned session id, captured from the `Mcp-Session-Id`
    /// response header on the first response that carries it.
    session_id: Option<String>,
    /// Protocol version the server negotiated in its `initialize`
    /// result; echoed on later requests per the spec.
    negotiated_version: Option<String>,
    /// Next JSON-RPC request id to hand out; starts at 1.
    next_id: u64,
}

impl McpHttpSession {
    /// Construct a session. Does not perform any I/O — call
    /// [`initialize`](Self::initialize) before listing or calling tools.
    ///
    /// * `name` — connector name, reported by `McpSession::name`.
    /// * `endpoint` — URL of the server's MCP endpoint; every message is
    ///   POSTed there.
    /// * `auth_headers` — static headers (already resolved from the
    ///   keychain) attached to every request; empty for unauthenticated
    ///   servers.
    ///
    /// # Errors
    /// [`ConnectorError::Http`] if the `reqwest` client cannot be built.
    pub fn new(
        name: impl Into<String>,
        endpoint: impl Into<String>,
        auth_headers: Vec<(String, String)>,
    ) -> Result<Self, ConnectorError> {
        let http = reqwest::Client::builder()
            .build()
            .map_err(|e| ConnectorError::Http(e.to_string()))?;
        Ok(Self {
            name: name.into(),
            endpoint: endpoint.into(),
            http,
            auth_headers,
            session_id: None,
            negotiated_version: None,
            next_id: 1,
        })
    }

    /// Hand out the next JSON-RPC request id (1, 2, 3, …).
    fn next_id(&mut self) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        id
    }

    /// Perform the MCP `initialize` handshake: capture the negotiated
    /// protocol version and `Mcp-Session-Id`, then send the
    /// `notifications/initialized` notification. Returns the server's
    /// `initialize` result (serverInfo + capabilities).
    ///
    /// Session state left over from an earlier handshake is cleared
    /// first, so calling this again starts a fresh session. Per the
    /// streamable-HTTP spec, a new `InitializeRequest` must be sent
    /// without an `Mcp-Session-Id`. Without the reset, re-initializing
    /// after the server expired a session (HTTP 404) would resend the
    /// dead id together with the stale protocol-version header.
    ///
    /// # Errors
    /// The transport, protocol or JSON-RPC error from either the
    /// `initialize` request or the `initialized` notification.
    pub async fn initialize(&mut self) -> Result<Value, ConnectorError> {
        // Start from a clean slate: no session id, no negotiated version.
        self.session_id = None;
        self.negotiated_version = None;

        let id = self.next_id();
        let result = self
            .request(
                "initialize",
                json!({
                    "protocolVersion": CLIENT_PROTOCOL_VERSION,
                    "capabilities": {},
                    "clientInfo": {
                        "name": "car-connectors",
                        "version": env!("CARGO_PKG_VERSION"),
                    },
                }),
                Some(id),
            )
            .await?;
        if let Some(v) = result.get("protocolVersion").and_then(|v| v.as_str()) {
            self.negotiated_version = Some(v.to_string());
        }
        // Notification: no id, no response body to parse.
        self.request("notifications/initialized", json!({}), None)
            .await?;
        Ok(result)
    }

    /// Send one JSON-RPC message. With `id`, awaits and returns the
    /// `result` value; without an `id` (a notification) returns
    /// `Value::Null` once the POST is accepted.
    ///
    /// Every request carries the static auth headers. Once they are
    /// known, it also carries the session id and the negotiated protocol
    /// version. An `Mcp-Session-Id` response header replaces the stored
    /// session id, whatever the status code.
    ///
    /// The body is buffered in full before parsing, so an SSE response
    /// only completes once the server closes the stream.
    ///
    /// # Errors
    /// - [`ConnectorError::Http`] on transport failure or a non-2xx status
    ///   (the message includes the response body).
    /// - [`ConnectorError::Protocol`] for an empty or unparseable body.
    /// - [`ConnectorError::Rpc`] when the server answers with a JSON-RPC
    ///   error.
    async fn request(
        &mut self,
        method: &str,
        params: Value,
        id: Option<u64>,
    ) -> Result<Value, ConnectorError> {
        let body = match id {
            Some(id) => json!({"jsonrpc": "2.0", "id": id, "method": method, "params": params}),
            None => json!({"jsonrpc": "2.0", "method": method, "params": params}),
        };

        let mut req = self
            .http
            .post(&self.endpoint)
            .header("content-type", "application/json")
            // The server may answer with plain JSON or an SSE stream.
            .header("accept", "application/json, text/event-stream")
            .json(&body);
        for (k, v) in &self.auth_headers {
            req = req.header(k.as_str(), v.as_str());
        }
        if let Some(sid) = &self.session_id {
            req = req.header(SESSION_HEADER, sid);
        }
        if let Some(ver) = &self.negotiated_version {
            req = req.header(PROTOCOL_HEADER, ver);
        }

        let resp = req
            .send()
            .await
            .map_err(|e| ConnectorError::Http(e.to_string()))?;

        // Capture/refresh the server-assigned session id.
        if let Some(sid) = resp
            .headers()
            .get(SESSION_HEADER)
            .and_then(|v| v.to_str().ok())
        {
            self.session_id = Some(sid.to_string());
        }

        let status = resp.status();
        let ctype = resp
            .headers()
            .get("content-type")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("")
            .to_string();
        let text = resp
            .text()
            .await
            .map_err(|e| ConnectorError::Http(e.to_string()))?;

        if !status.is_success() {
            return Err(ConnectorError::Http(format!("HTTP {status}: {text}")));
        }

        // Notification accepted (typically 202, empty body).
        let Some(want_id) = id else {
            return Ok(Value::Null);
        };

        let envelope = if ctype.contains("text/event-stream") {
            extract_sse_response(&text, want_id)?
        } else if text.trim().is_empty() {
            return Err(ConnectorError::Protocol("empty response body".into()));
        } else {
            serde_json::from_str::<Value>(&text)
                .map_err(|e| ConnectorError::Protocol(format!("parse json response: {e}")))?
        };
        parse_envelope(envelope)
    }
}

#[async_trait]
impl McpSession for McpHttpSession {
    /// List every tool the server exposes, following `tools/list`
    /// pagination.
    ///
    /// The first page is requested with empty params. While a page's
    /// result carries a non-empty `nextCursor`, the next page is
    /// requested with `{"cursor": <nextCursor>}`. A page without a
    /// `tools` array contributes no tools.
    ///
    /// If the server hands back a cursor it already returned, pagination
    /// stops and the tools collected so far are kept. This stops a
    /// misbehaving server from looping the client forever.
    ///
    /// # Errors
    /// The stringified transport/RPC error, or `invalid tool definition:
    /// …` if an entry does not deserialize into [`McpToolInfo`].
    async fn list_tools(&mut self) -> Result<Vec<McpToolInfo>, String> {
        let mut tools: Vec<McpToolInfo> = Vec::new();
        let mut seen_cursors = std::collections::HashSet::new();
        let mut cursor: Option<String> = None;
        loop {
            let params = match &cursor {
                Some(c) => json!({ "cursor": c }),
                None => json!({}),
            };
            let id = self.next_id();
            let result = self
                .request("tools/list", params, Some(id))
                .await
                .map_err(|e| e.to_string())?;

            let page = result
                .get("tools")
                .and_then(|v| v.as_array())
                .cloned()
                .unwrap_or_default();
            for raw in page {
                let tool = serde_json::from_value(raw)
                    .map_err(|e| format!("invalid tool definition: {e}"))?;
                tools.push(tool);
            }

            let next = result
                .get("nextCursor")
                .and_then(|v| v.as_str())
                .filter(|c| !c.is_empty());
            match next {
                // `insert` returns false for a cursor we've already followed.
                Some(next) if seen_cursors.insert(next.to_string()) => {
                    cursor = Some(next.to_string());
                }
                _ => break,
            }
        }
        Ok(tools)
    }

    /// Invoke `tools/call` for `name` with `arguments` and flatten the
    /// result via [`extract_text_content`] (text blocks joined with
    /// `\n`, otherwise the raw result).
    ///
    /// # Errors
    /// The stringified transport/RPC error.
    async fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, String> {
        let id = self.next_id();
        let result = self
            .request(
                "tools/call",
                json!({"name": name, "arguments": arguments}),
                Some(id),
            )
            .await
            .map_err(|e| e.to_string())?;
        Ok(extract_text_content(result))
    }

    /// Connector name supplied at construction.
    fn name(&self) -> &str {
        &self.name
    }
}

/// Collapse an MCP `tools/call` result into the shape the stdio path
/// produces.
///
/// If `result.content` is an array containing at least one block with
/// `"type": "text"` and a string `text`, those texts are concatenated
/// with `\n` and returned as a JSON string. Non-text blocks are skipped.
/// In every other case the result is returned untouched.
///
/// This mirrors `car_engine::McpServer::call_tool`, so a tool looks the
/// same to callers whether it is reached over stdio or HTTP.
fn extract_text_content(result: Value) -> Value {
    // Build the joined text as an owned value so `result` is free to be
    // returned unchanged when there is nothing to join.
    let joined: Option<String> = result
        .get("content")
        .and_then(Value::as_array)
        .map(|blocks| {
            blocks
                .iter()
                .filter(|block| block.get("type").and_then(Value::as_str) == Some("text"))
                .filter_map(|block| block.get("text").and_then(Value::as_str))
                .collect::<Vec<&str>>()
        })
        .filter(|pieces| !pieces.is_empty())
        .map(|pieces| pieces.join("\n"));

    match joined {
        Some(text) => Value::String(text),
        None => result,
    }
}

/// Extract `result` from a JSON-RPC envelope, mapping an `error` object
/// to [`ConnectorError::Rpc`].
fn parse_envelope(v: Value) -> Result<Value, ConnectorError> {
    if let Some(err) = v.get("error") {
        let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
        let message = err
            .get("message")
            .and_then(|m| m.as_str())
            .unwrap_or("unknown error")
            .to_string();
        return Err(ConnectorError::Rpc { code, message });
    }
    v.get("result")
        .cloned()
        .ok_or_else(|| ConnectorError::Protocol("response missing `result`".into()))
}

/// Dispatch one buffered SSE event.
///
/// If `data` holds a payload, it is parsed as JSON and appended to
/// `envelopes`, and `data` is then emptied for the next event. Payloads
/// that are not valid JSON are dropped without error. An empty buffer
/// is a no-op.
fn flush_event(data: &mut String, envelopes: &mut Vec<Value>) {
    if data.is_empty() {
        return;
    }
    // `Option` is iterable: a failed parse contributes nothing.
    let parsed = serde_json::from_str::<Value>(data.as_str()).ok();
    envelopes.extend(parsed);
    data.clear();
}

/// Parse a `text/event-stream` body and return the JSON-RPC envelope
/// whose `id` matches `want_id`. Streamable-HTTP servers may interleave
/// notifications or server-initiated requests before the response, so
/// we scan every `data:` payload and pick the matching id (falling back
/// to the last well-formed envelope if no id matches — e.g. a server
/// that replies with a string id).
fn extract_sse_response(body: &str, want_id: u64) -> Result<Value, ConnectorError> {
    let mut data = String::new();
    let mut envelopes: Vec<Value> = Vec::new();
    for line in body.lines() {
        if line.is_empty() {
            flush_event(&mut data, &mut envelopes);
        } else if let Some(rest) = line.strip_prefix("data:") {
            // SSE spec: multiple data lines in one event join with '\n'.
            if !data.is_empty() {
                data.push('\n');
            }
            data.push_str(rest.strip_prefix(' ').unwrap_or(rest));
        }
        // Other SSE fields (event:, id:, retry:, comment lines) are ignored.
    }
    flush_event(&mut data, &mut envelopes);

    if let Some(matching) = envelopes
        .iter()
        .find(|e| e.get("id").and_then(|v| v.as_u64()) == Some(want_id))
    {
        return Ok(matching.clone());
    }
    envelopes
        .into_iter()
        .next_back()
        .ok_or_else(|| ConnectorError::Protocol("no JSON-RPC envelope in SSE stream".into()))
}

#[cfg(test)]
mod tests {
    use super::*;

    // --- parse_envelope -------------------------------------------------

    #[test]
    fn parse_envelope_extracts_result() {
        let v = json!({"jsonrpc": "2.0", "id": 1, "result": {"ok": true}});
        assert_eq!(parse_envelope(v).unwrap(), json!({"ok": true}));
    }

    #[test]
    fn parse_envelope_surfaces_rpc_error() {
        let v = json!({"jsonrpc": "2.0", "id": 1, "error": {"code": -32601, "message": "nope"}});
        match parse_envelope(v) {
            Err(ConnectorError::Rpc { code, message }) => {
                assert_eq!(code, -32601);
                assert_eq!(message, "nope");
            }
            other => panic!("expected Rpc error, got {other:?}"),
        }
    }

    #[test]
    fn parse_envelope_requires_result() {
        let v = json!({"jsonrpc": "2.0", "id": 1});
        assert!(matches!(parse_envelope(v), Err(ConnectorError::Protocol(_))));
    }

    // --- extract_text_content (must match the stdio path) ----------------

    #[test]
    fn text_content_joins_text_blocks_and_skips_others() {
        let result = json!({"content": [
            {"type": "text", "text": "first"},
            {"type": "image", "data": "AAAA", "mimeType": "image/png"},
            {"type": "text", "text": "second"}
        ]});
        assert_eq!(extract_text_content(result), json!("first\nsecond"));
    }

    #[test]
    fn text_content_passes_through_non_text_results() {
        let images_only = json!({"content": [{"type": "image", "data": "AAAA"}]});
        assert_eq!(extract_text_content(images_only.clone()), images_only);
        let structured = json!({"value": 3});
        assert_eq!(extract_text_content(structured.clone()), structured);
    }

    // --- extract_sse_response --------------------------------------------

    #[test]
    fn sse_picks_envelope_with_matching_id_past_notifications() {
        let body = "event: message\n\
            data: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/progress\",\"params\":{}}\n\
            \n\
            event: message\n\
            data: {\"jsonrpc\":\"2.0\",\"id\":42,\"result\":{\"tools\":[]}}\n\
            \n";
        let env = extract_sse_response(body, 42).unwrap();
        assert_eq!(env["result"]["tools"], json!([]));
    }

    #[test]
    fn sse_joins_multiline_data() {
        let body = "data: {\"jsonrpc\":\"2.0\",\"id\":1,\n\
            data: \"result\":{\"v\":7}}\n\
            \n";
        let env = extract_sse_response(body, 1).unwrap();
        assert_eq!(env["result"]["v"], json!(7));
    }

    #[test]
    fn sse_skips_malformed_payloads() {
        let body = "data: not json\n\
            \n\
            data: {\"jsonrpc\":\"2.0\",\"id\":3,\"result\":{\"ok\":true}}\n\
            \n";
        let env = extract_sse_response(body, 3).unwrap();
        assert_eq!(env["result"]["ok"], json!(true));
    }

    #[test]
    fn sse_handles_crlf_line_endings() {
        let body = "event: message\r\ndata: {\"jsonrpc\":\"2.0\",\"id\":5,\"result\":{}}\r\n\r\n";
        let env = extract_sse_response(body, 5).unwrap();
        assert_eq!(env["id"], json!(5));
    }

    #[test]
    fn sse_accepts_response_echoing_id_as_string() {
        let body = "data: {\"jsonrpc\":\"2.0\",\"id\":\"7\",\"result\":{\"x\":1}}\n\n";
        let env = extract_sse_response(body, 7).unwrap();
        assert_eq!(env["result"]["x"], json!(1));
    }

    #[test]
    fn sse_empty_stream_errors() {
        assert!(extract_sse_response("\n\n", 1).is_err());
    }
}