Skip to main content

assay_core/mcp/
parser.rs

1use crate::mcp::types::*;
2use anyhow::{bail, Context, Result};
3use serde::Deserialize;
4use std::collections::HashSet;
5
6/// Parse MCP transcript file contents into normalized McpEvents.
7pub fn parse_mcp_transcript(text: &str, format: McpInputFormat) -> Result<Vec<McpEvent>> {
8    let events = match format {
9        McpInputFormat::JsonRpc => parse_jsonrpc_jsonl(text),
10        McpInputFormat::Inspector => parse_inspector_best_effort(text),
11        McpInputFormat::StreamableHttp => parse_streamable_http_transcript(text),
12        McpInputFormat::HttpSse => parse_http_sse_transcript(text),
13    }?;
14    validate_mcp_events(&events)?;
15    Ok(events)
16}
17
18fn parse_jsonrpc_jsonl(text: &str) -> Result<Vec<McpEvent>> {
19    let mut out = Vec::new();
20
21    for (lineno, line) in text.lines().enumerate() {
22        let line = line.trim();
23        if line.is_empty() {
24            continue;
25        }
26
27        let v: serde_json::Value = serde_json::from_str(line)
28            .with_context(|| format!("invalid JSON on line {}", lineno + 1))?;
29
30        let event = parse_jsonrpc_message(v, (lineno + 1) as u64, None)?;
31        out.push(event);
32    }
33
34    Ok(out)
35}
36
37fn parse_inspector_best_effort(text: &str) -> Result<Vec<McpEvent>> {
38    let v: serde_json::Value = serde_json::from_str(text).context("invalid inspector JSON")?;
39
40    // Handle Inspector export variations:
41    // 1. Array of events
42    // 2. Object with "events" array
43    let arr = v
44        .get("events")
45        .cloned()
46        .or_else(|| v.as_array().cloned().map(serde_json::Value::Array))
47        .and_then(|x| x.as_array().cloned())
48        .unwrap_or_default();
49
50    let mut out = Vec::new();
51    for (idx, item) in arr.into_iter().enumerate() {
52        // Use array index as source_line for sorting stability
53        let event = parse_jsonrpc_message(item, (idx + 1) as u64, None)?;
54        out.push(event);
55    }
56
57    Ok(out)
58}
59
60fn parse_streamable_http_transcript(text: &str) -> Result<Vec<McpEvent>> {
61    parse_transport_transcript(text, "streamable-http", "streamable-http transcript", false)
62}
63
64fn parse_http_sse_transcript(text: &str) -> Result<Vec<McpEvent>> {
65    parse_transport_transcript(text, "http-sse", "http-sse transcript", true)
66}
67
68fn parse_transport_transcript(
69    text: &str,
70    expected_transport: &str,
71    source_label: &str,
72    allow_endpoint_event: bool,
73) -> Result<Vec<McpEvent>> {
74    let transcript: TransportTranscript =
75        serde_json::from_str(text).with_context(|| format!("invalid {}", source_label))?;
76
77    let actual_transport = transcript.transport.as_deref().unwrap_or("missing");
78    if actual_transport != expected_transport {
79        bail!(
80            "{} transport must be {:?}, found {:?}",
81            source_label,
82            expected_transport,
83            actual_transport
84        );
85    }
86
87    let mut out = Vec::new();
88    for (idx, entry) in transcript.entries.into_iter().enumerate() {
89        let source_line = (idx + 1) as u64;
90        let present = usize::from(entry.request.is_some())
91            + usize::from(entry.response.is_some())
92            + usize::from(entry.sse.is_some());
93
94        if present != 1 {
95            bail!(
96                "{} entry {} must contain exactly one of request, response, or sse",
97                source_label,
98                source_line
99            );
100        }
101
102        if let Some(request) = entry.request {
103            out.push(parse_jsonrpc_message(
104                request,
105                source_line,
106                entry.timestamp_ms,
107            )?);
108            continue;
109        }
110
111        if let Some(response) = entry.response {
112            out.push(parse_jsonrpc_message(
113                response,
114                source_line,
115                entry.timestamp_ms,
116            )?);
117            continue;
118        }
119
120        if let Some(sse) = entry.sse {
121            if let Some(jsonrpc) = extract_jsonrpc_from_sse(&sse, allow_endpoint_event) {
122                out.push(parse_jsonrpc_message(
123                    jsonrpc,
124                    source_line,
125                    entry.timestamp_ms,
126                )?);
127            }
128        }
129    }
130
131    Ok(out)
132}
133
134fn parse_jsonrpc_message(
135    v: serde_json::Value,
136    source_line: u64,
137    timestamp_ms_override: Option<u64>,
138) -> Result<McpEvent> {
139    if !v.is_object() {
140        bail!(
141            "MCP event at source line {} must be a JSON object",
142            source_line
143        );
144    }
145
146    let ts_ms = timestamp_ms_override.or_else(|| extract_ts_ms(&v));
147
148    // JSON-RPC ID extraction
149    let id_str = normalize_jsonrpc_id(v.get("id"), source_line)?;
150
151    // Check for JSON-RPC Request (has method)
152    let method = v
153        .get("method")
154        .and_then(|m| m.as_str())
155        .map(|s| s.to_string());
156
157    let payload = if let Some(method) = method {
158        match method.as_str() {
159            "tools/list" => McpPayload::ToolsListRequest { raw: v.clone() },
160            "tools/call" => {
161                let params = v.get("params").cloned().unwrap_or(serde_json::Value::Null);
162                let name = params
163                    .get("name")
164                    .and_then(|x| x.as_str())
165                    .unwrap_or("unknown_tool")
166                    .to_string();
167                let arguments = params
168                    .get("arguments")
169                    .cloned()
170                    .unwrap_or(serde_json::Value::Null);
171                McpPayload::ToolCallRequest {
172                    name,
173                    arguments,
174                    raw: v.clone(),
175                }
176            }
177            // Add other standard MCP methods mapping here if needed
178            _ => McpPayload::Other { raw: v.clone() },
179        }
180    } else {
181        // Response (result or error)
182        if v.get("result").is_some() {
183            if looks_like_tools_list_result(&v) {
184                let tools = parse_tools_list_result(&v)?;
185                McpPayload::ToolsListResponse {
186                    tools,
187                    raw: v.clone(),
188                }
189            } else {
190                McpPayload::ToolCallResponse {
191                    result: v.get("result").cloned().unwrap_or(serde_json::Value::Null),
192                    is_error: false,
193                    raw: v.clone(),
194                }
195            }
196        } else if v.get("error").is_some() {
197            McpPayload::ToolCallResponse {
198                result: v.get("error").cloned().unwrap_or(serde_json::Value::Null),
199                is_error: true,
200                raw: v.clone(),
201            }
202        } else {
203            // Maybe it's not JSON-RPC, or it's a notification/special event
204            // Check for known "Session" markers if any (ad-hoc)
205            McpPayload::Other { raw: v.clone() }
206        }
207    };
208
209    Ok(McpEvent {
210        source_line,
211        timestamp_ms: ts_ms,
212        jsonrpc_id: id_str,
213        payload,
214    })
215}
216
217fn normalize_jsonrpc_id(
218    raw_id: Option<&serde_json::Value>,
219    source_line: u64,
220) -> Result<Option<String>> {
221    match raw_id {
222        None | Some(serde_json::Value::Null) => Ok(None),
223        Some(serde_json::Value::String(id)) => Ok(Some(id.clone())),
224        Some(serde_json::Value::Number(id)) => Ok(Some(id.to_string())),
225        Some(serde_json::Value::Bool(_)) => {
226            bail!(
227                "JSON-RPC id on source line {} must not be a boolean",
228                source_line
229            )
230        }
231        Some(serde_json::Value::Array(_)) => {
232            bail!(
233                "JSON-RPC id on source line {} must not be an array",
234                source_line
235            )
236        }
237        Some(serde_json::Value::Object(_)) => {
238            bail!(
239                "JSON-RPC id on source line {} must not be an object",
240                source_line
241            )
242        }
243    }
244}
245
246fn validate_mcp_events(events: &[McpEvent]) -> Result<()> {
247    let mut seen_tool_call_request_ids = HashSet::new();
248
249    for event in events {
250        if matches!(&event.payload, McpPayload::ToolCallRequest { .. }) {
251            if let Some(id) = &event.jsonrpc_id {
252                if !seen_tool_call_request_ids.insert(id.clone()) {
253                    bail!(
254                        "duplicate tools/call request id {:?} at source line {}",
255                        id,
256                        event.source_line
257                    );
258                }
259            }
260        }
261    }
262
263    Ok(())
264}
265
266fn extract_jsonrpc_from_sse(
267    sse: &TransportSseEnvelope,
268    allow_endpoint_event: bool,
269) -> Option<serde_json::Value> {
270    let event_name = sse.event.as_deref().unwrap_or("message");
271    if event_name == "endpoint" && allow_endpoint_event {
272        return None;
273    }
274
275    if event_name != "message" {
276        return None;
277    }
278
279    extract_jsonrpc_like_value(&sse.data)
280}
281
282fn extract_jsonrpc_like_value(value: &serde_json::Value) -> Option<serde_json::Value> {
283    match value {
284        serde_json::Value::Object(map)
285            if map.contains_key("method")
286                || map.contains_key("result")
287                || map.contains_key("error")
288                || map.contains_key("jsonrpc") =>
289        {
290            Some(value.clone())
291        }
292        serde_json::Value::String(text) => serde_json::from_str::<serde_json::Value>(text)
293            .ok()
294            .and_then(|parsed| extract_jsonrpc_like_value(&parsed)),
295        _ => None,
296    }
297}
298
299fn extract_ts_ms(v: &serde_json::Value) -> Option<u64> {
300    // Try standard keys.
301    if let Some(t) = v.get("timestamp_ms").and_then(|t| t.as_u64()) {
302        return Some(t);
303    }
304    if let Some(t) = v.get("timestamp").and_then(|t| t.as_u64()) {
305        return Some(t); // Assume ms if big integer, otherwise might be seconds?
306                        // For P0, assume ms or handled by caller if not.
307    }
308    None
309}
310
311fn looks_like_tools_list_result(v: &serde_json::Value) -> bool {
312    v.get("result")
313        .and_then(|r| r.get("tools"))
314        .and_then(|t| t.as_array())
315        .is_some()
316}
317
318fn parse_tools_list_result(v: &serde_json::Value) -> Result<Vec<McpToolDef>> {
319    let tools = v
320        .get("result")
321        .and_then(|r| r.get("tools"))
322        .and_then(|t| t.as_array())
323        .cloned()
324        .unwrap_or_default();
325
326    let mut out = Vec::new();
327    for tool in tools {
328        let name = tool
329            .get("name")
330            .and_then(|x| x.as_str())
331            .unwrap_or("unknown")
332            .to_string();
333        let description = tool
334            .get("description")
335            .and_then(|x| x.as_str())
336            .map(|s| s.to_string());
337        // Handle inputSchema (camelCase) or input_schema (snake_case)
338        let input_schema = tool
339            .get("inputSchema")
340            .cloned()
341            .or_else(|| tool.get("input_schema").cloned());
342        out.push(McpToolDef {
343            name,
344            description,
345            input_schema,
346            tool_identity: None,
347        });
348    }
349    Ok(out)
350}
351
352#[derive(Debug, Deserialize)]
353struct TransportTranscript {
354    transport: Option<String>,
355    #[allow(dead_code)]
356    #[serde(default)]
357    transport_context: Option<serde_json::Value>,
358    #[allow(dead_code)]
359    #[serde(default)]
360    headers: Option<serde_json::Value>,
361    #[serde(default)]
362    entries: Vec<TransportTranscriptEntry>,
363}
364
365#[derive(Debug, Deserialize)]
366struct TransportTranscriptEntry {
367    #[serde(default)]
368    timestamp_ms: Option<u64>,
369    #[allow(dead_code)]
370    #[serde(default)]
371    transport_context: Option<serde_json::Value>,
372    #[allow(dead_code)]
373    #[serde(default)]
374    headers: Option<serde_json::Value>,
375    #[serde(default)]
376    request: Option<serde_json::Value>,
377    #[serde(default)]
378    response: Option<serde_json::Value>,
379    #[serde(default)]
380    sse: Option<TransportSseEnvelope>,
381}
382
383#[derive(Debug, Deserialize)]
384struct TransportSseEnvelope {
385    #[serde(default)]
386    event: Option<String>,
387    #[allow(dead_code)]
388    #[serde(default)]
389    id: Option<String>,
390    data: serde_json::Value,
391}