Skip to main content

assay_core/mcp/
parser.rs

1use crate::mcp::types::*;
2use anyhow::{bail, Context, Result};
3use serde::Deserialize;
4use std::collections::HashSet;
5
6/// Parse MCP transcript file contents into normalized McpEvents.
7pub fn parse_mcp_transcript(text: &str, format: McpInputFormat) -> Result<Vec<McpEvent>> {
8    let events = match format {
9        McpInputFormat::JsonRpc => parse_jsonrpc_jsonl(text),
10        McpInputFormat::Inspector => parse_inspector_best_effort(text),
11        McpInputFormat::StreamableHttp => parse_streamable_http_transcript(text),
12        McpInputFormat::HttpSse => parse_http_sse_transcript(text),
13    }?;
14    validate_mcp_events(&events)?;
15    Ok(events)
16}
17
18fn parse_jsonrpc_jsonl(text: &str) -> Result<Vec<McpEvent>> {
19    let mut out = Vec::new();
20
21    for (lineno, line) in text.lines().enumerate() {
22        let line = line.trim();
23        if line.is_empty() {
24            continue;
25        }
26
27        let v: serde_json::Value = serde_json::from_str(line)
28            .with_context(|| format!("invalid JSON on line {}", lineno + 1))?;
29
30        let event = parse_jsonrpc_message(
31            v,
32            (lineno + 1) as u64,
33            None,
34            McpAuthorizationDiscovery::default(),
35        )?;
36        out.push(event);
37    }
38
39    Ok(out)
40}
41
42fn parse_inspector_best_effort(text: &str) -> Result<Vec<McpEvent>> {
43    let v: serde_json::Value = serde_json::from_str(text).context("invalid inspector JSON")?;
44
45    // Handle Inspector export variations:
46    // 1. Array of events
47    // 2. Object with "events" array
48    let arr = v
49        .get("events")
50        .cloned()
51        .or_else(|| v.as_array().cloned().map(serde_json::Value::Array))
52        .and_then(|x| x.as_array().cloned())
53        .unwrap_or_default();
54
55    let mut out = Vec::new();
56    for (idx, item) in arr.into_iter().enumerate() {
57        // Use array index as source_line for sorting stability
58        let event = parse_jsonrpc_message(
59            item,
60            (idx + 1) as u64,
61            None,
62            McpAuthorizationDiscovery::default(),
63        )?;
64        out.push(event);
65    }
66
67    Ok(out)
68}
69
70fn parse_streamable_http_transcript(text: &str) -> Result<Vec<McpEvent>> {
71    parse_transport_transcript(text, "streamable-http", "streamable-http transcript", false)
72}
73
74fn parse_http_sse_transcript(text: &str) -> Result<Vec<McpEvent>> {
75    parse_transport_transcript(text, "http-sse", "http-sse transcript", true)
76}
77
78fn parse_transport_transcript(
79    text: &str,
80    expected_transport: &str,
81    source_label: &str,
82    allow_endpoint_event: bool,
83) -> Result<Vec<McpEvent>> {
84    let transcript: TransportTranscript =
85        serde_json::from_str(text).with_context(|| format!("invalid {}", source_label))?;
86
87    let actual_transport = transcript.transport.as_deref().unwrap_or("missing");
88    if actual_transport != expected_transport {
89        bail!(
90            "{} transport must be {:?}, found {:?}",
91            source_label,
92            expected_transport,
93            actual_transport
94        );
95    }
96
97    let mut out = Vec::new();
98    for (idx, entry) in transcript.entries.into_iter().enumerate() {
99        let source_line = (idx + 1) as u64;
100        let present = usize::from(entry.request.is_some())
101            + usize::from(entry.response.is_some())
102            + usize::from(entry.sse.is_some());
103
104        if present != 1 {
105            bail!(
106                "{} entry {} must contain exactly one of request, response, or sse",
107                source_label,
108                source_line
109            );
110        }
111
112        if let Some(request) = entry.request {
113            out.push(parse_jsonrpc_message(
114                request,
115                source_line,
116                entry.timestamp_ms,
117                McpAuthorizationDiscovery::default(),
118            )?);
119            continue;
120        }
121
122        let auth_discovery = parse_transport_auth_discovery(&entry);
123
124        if let Some(response) = entry.response {
125            out.push(parse_jsonrpc_message(
126                response,
127                source_line,
128                entry.timestamp_ms,
129                auth_discovery,
130            )?);
131            continue;
132        }
133
134        if let Some(sse) = entry.sse {
135            if let Some(jsonrpc) = extract_jsonrpc_from_sse(&sse, allow_endpoint_event) {
136                out.push(parse_jsonrpc_message(
137                    jsonrpc,
138                    source_line,
139                    entry.timestamp_ms,
140                    McpAuthorizationDiscovery::default(),
141                )?);
142            }
143        }
144    }
145
146    Ok(out)
147}
148
149fn parse_jsonrpc_message(
150    v: serde_json::Value,
151    source_line: u64,
152    timestamp_ms_override: Option<u64>,
153    auth_discovery: McpAuthorizationDiscovery,
154) -> Result<McpEvent> {
155    if !v.is_object() {
156        bail!(
157            "MCP event at source line {} must be a JSON object",
158            source_line
159        );
160    }
161
162    let ts_ms = timestamp_ms_override.or_else(|| extract_ts_ms(&v));
163
164    // JSON-RPC ID extraction
165    let id_str = normalize_jsonrpc_id(v.get("id"), source_line)?;
166
167    // Check for JSON-RPC Request (has method)
168    let method = v
169        .get("method")
170        .and_then(|m| m.as_str())
171        .map(|s| s.to_string());
172
173    let payload = if let Some(method) = method {
174        match method.as_str() {
175            "tools/list" => McpPayload::ToolsListRequest { raw: v.clone() },
176            "tools/call" => {
177                let params = v.get("params").cloned().unwrap_or(serde_json::Value::Null);
178                let name = params
179                    .get("name")
180                    .and_then(|x| x.as_str())
181                    .unwrap_or("unknown_tool")
182                    .to_string();
183                let arguments = params
184                    .get("arguments")
185                    .cloned()
186                    .unwrap_or(serde_json::Value::Null);
187                McpPayload::ToolCallRequest {
188                    name,
189                    arguments,
190                    raw: v.clone(),
191                }
192            }
193            // Add other standard MCP methods mapping here if needed
194            _ => McpPayload::Other { raw: v.clone() },
195        }
196    } else {
197        // Response (result or error)
198        if v.get("result").is_some() {
199            if looks_like_tools_list_result(&v) {
200                let tools = parse_tools_list_result(&v)?;
201                McpPayload::ToolsListResponse {
202                    tools,
203                    raw: v.clone(),
204                }
205            } else {
206                McpPayload::ToolCallResponse {
207                    result: v.get("result").cloned().unwrap_or(serde_json::Value::Null),
208                    is_error: false,
209                    raw: v.clone(),
210                }
211            }
212        } else if v.get("error").is_some() {
213            McpPayload::ToolCallResponse {
214                result: v.get("error").cloned().unwrap_or(serde_json::Value::Null),
215                is_error: true,
216                raw: v.clone(),
217            }
218        } else {
219            // Maybe it's not JSON-RPC, or it's a notification/special event
220            // Check for known "Session" markers if any (ad-hoc)
221            McpPayload::Other { raw: v.clone() }
222        }
223    };
224
225    Ok(McpEvent {
226        source_line,
227        timestamp_ms: ts_ms,
228        jsonrpc_id: id_str,
229        auth_discovery,
230        payload,
231    })
232}
233
234fn parse_transport_auth_discovery(entry: &TransportTranscriptEntry) -> McpAuthorizationDiscovery {
235    let Some(status) = extract_http_status(entry) else {
236        return McpAuthorizationDiscovery::default();
237    };
238
239    if status != 401 {
240        return McpAuthorizationDiscovery::default();
241    }
242
243    let header_value = entry
244        .transport_context
245        .as_ref()
246        .and_then(|value| find_header_case_insensitive(value, "www-authenticate"))
247        .or_else(|| {
248            entry
249                .headers
250                .as_ref()
251                .and_then(|value| find_header_case_insensitive(value, "www-authenticate"))
252        });
253
254    let Some(www_authenticate) = header_value else {
255        return McpAuthorizationDiscovery::default();
256    };
257
258    let resource_metadata_visible = auth_param_visible(&www_authenticate, "resource_metadata");
259    let scope_challenge_visible = auth_param_visible(&www_authenticate, "scope");
260
261    if !resource_metadata_visible && !scope_challenge_visible {
262        return McpAuthorizationDiscovery::default();
263    }
264
265    McpAuthorizationDiscovery {
266        visible: true,
267        source_kind: McpAuthorizationDiscoverySourceKind::WwwAuthenticate,
268        resource_metadata_visible,
269        authorization_servers_visible: false,
270        scope_challenge_visible,
271    }
272}
273
274fn extract_http_status(entry: &TransportTranscriptEntry) -> Option<u16> {
275    entry
276        .transport_context
277        .as_ref()
278        .and_then(extract_http_status_from_value)
279        .or_else(|| {
280            entry
281                .headers
282                .as_ref()
283                .and_then(extract_http_status_from_value)
284        })
285}
286
287fn extract_http_status_from_value(value: &serde_json::Value) -> Option<u16> {
288    match value {
289        serde_json::Value::Object(map) => {
290            for key in ["status", "status_code", "http_status"] {
291                if let Some(status) = map.get(key).and_then(json_value_to_u16) {
292                    return Some(status);
293                }
294            }
295
296            map.get("response").and_then(extract_http_status_from_value)
297        }
298        _ => None,
299    }
300}
301
302fn json_value_to_u16(value: &serde_json::Value) -> Option<u16> {
303    match value {
304        serde_json::Value::Number(n) => n.as_u64().and_then(|n| u16::try_from(n).ok()),
305        serde_json::Value::String(s) => s.parse::<u16>().ok(),
306        _ => None,
307    }
308}
309
310fn find_header_case_insensitive(value: &serde_json::Value, header_name: &str) -> Option<String> {
311    match value {
312        serde_json::Value::Object(map) => {
313            if let Some(headers) = map.get("headers") {
314                if let Some(found) = find_header_case_insensitive(headers, header_name) {
315                    return Some(found);
316                }
317            }
318
319            if let Some(response) = map.get("response") {
320                if let Some(found) = find_header_case_insensitive(response, header_name) {
321                    return Some(found);
322                }
323            }
324
325            map.iter().find_map(|(key, value)| {
326                if key.eq_ignore_ascii_case(header_name) {
327                    value.as_str().map(ToString::to_string)
328                } else {
329                    None
330                }
331            })
332        }
333        _ => None,
334    }
335}
336
/// Report whether a `WWW-Authenticate` header value contains the auth parameter
/// `param_name`, comparing the name ASCII case-insensitively.
///
/// A parameter counts when all of the following hold:
/// - its name starts the value or follows a space, tab, or comma;
/// - after optional spaces/tabs (the `BWS` allowed by RFC 9110 §11.2), it is
///   followed by `=`.
///
/// Text inside quoted strings (honouring `\` escapes) is skipped. So, for example,
/// `error_description="bad scope=x"` does not count as a `scope` parameter.
fn auth_param_visible(header_value: &str, param_name: &str) -> bool {
    let bytes = header_value.as_bytes();
    let name = param_name.as_bytes();
    let mut in_quotes = false;
    let mut escaped = false;

    for (idx, &byte) in bytes.iter().enumerate() {
        if in_quotes {
            // Inside a quoted-string: honour `\` escapes and wait for the closing quote.
            if escaped {
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                in_quotes = false;
            }
            continue;
        }
        if byte == b'"' {
            in_quotes = true;
            continue;
        }

        // A parameter name must start the value or follow a separator.
        let at_boundary = idx == 0 || matches!(bytes[idx - 1], b' ' | b',' | b'\t');
        if !at_boundary {
            continue;
        }
        let Some(candidate) = bytes.get(idx..idx + name.len()) else {
            continue;
        };
        if !candidate.eq_ignore_ascii_case(name) {
            continue;
        }

        // auth-param = token BWS "=" BWS ( token / quoted-string )
        let rest = &bytes[idx + name.len()..];
        if rest.iter().copied().find(|&c| c != b' ' && c != b'\t') == Some(b'=') {
            return true;
        }
    }

    false
}
345
346fn normalize_jsonrpc_id(
347    raw_id: Option<&serde_json::Value>,
348    source_line: u64,
349) -> Result<Option<String>> {
350    match raw_id {
351        None | Some(serde_json::Value::Null) => Ok(None),
352        Some(serde_json::Value::String(id)) => Ok(Some(id.clone())),
353        Some(serde_json::Value::Number(id)) => Ok(Some(id.to_string())),
354        Some(serde_json::Value::Bool(_)) => {
355            bail!(
356                "JSON-RPC id on source line {} must not be a boolean",
357                source_line
358            )
359        }
360        Some(serde_json::Value::Array(_)) => {
361            bail!(
362                "JSON-RPC id on source line {} must not be an array",
363                source_line
364            )
365        }
366        Some(serde_json::Value::Object(_)) => {
367            bail!(
368                "JSON-RPC id on source line {} must not be an object",
369                source_line
370            )
371        }
372    }
373}
374
375fn validate_mcp_events(events: &[McpEvent]) -> Result<()> {
376    let mut seen_tool_call_request_ids = HashSet::new();
377
378    for event in events {
379        if matches!(&event.payload, McpPayload::ToolCallRequest { .. }) {
380            if let Some(id) = &event.jsonrpc_id {
381                if !seen_tool_call_request_ids.insert(id.clone()) {
382                    bail!(
383                        "duplicate tools/call request id {:?} at source line {}",
384                        id,
385                        event.source_line
386                    );
387                }
388            }
389        }
390    }
391
392    Ok(())
393}
394
395fn extract_jsonrpc_from_sse(
396    sse: &TransportSseEnvelope,
397    allow_endpoint_event: bool,
398) -> Option<serde_json::Value> {
399    let event_name = sse.event.as_deref().unwrap_or("message");
400    if event_name == "endpoint" && allow_endpoint_event {
401        return None;
402    }
403
404    if event_name != "message" {
405        return None;
406    }
407
408    extract_jsonrpc_like_value(&sse.data)
409}
410
411fn extract_jsonrpc_like_value(value: &serde_json::Value) -> Option<serde_json::Value> {
412    match value {
413        serde_json::Value::Object(map)
414            if map.contains_key("method")
415                || map.contains_key("result")
416                || map.contains_key("error")
417                || map.contains_key("jsonrpc") =>
418        {
419            Some(value.clone())
420        }
421        serde_json::Value::String(text) => serde_json::from_str::<serde_json::Value>(text)
422            .ok()
423            .and_then(|parsed| extract_jsonrpc_like_value(&parsed)),
424        _ => None,
425    }
426}
427
428fn extract_ts_ms(v: &serde_json::Value) -> Option<u64> {
429    // Try standard keys.
430    if let Some(t) = v.get("timestamp_ms").and_then(|t| t.as_u64()) {
431        return Some(t);
432    }
433    if let Some(t) = v.get("timestamp").and_then(|t| t.as_u64()) {
434        return Some(t); // Assume ms if big integer, otherwise might be seconds?
435                        // For P0, assume ms or handled by caller if not.
436    }
437    None
438}
439
440fn looks_like_tools_list_result(v: &serde_json::Value) -> bool {
441    v.get("result")
442        .and_then(|r| r.get("tools"))
443        .and_then(|t| t.as_array())
444        .is_some()
445}
446
447fn parse_tools_list_result(v: &serde_json::Value) -> Result<Vec<McpToolDef>> {
448    let tools = v
449        .get("result")
450        .and_then(|r| r.get("tools"))
451        .and_then(|t| t.as_array())
452        .cloned()
453        .unwrap_or_default();
454
455    let mut out = Vec::new();
456    for tool in tools {
457        let name = tool
458            .get("name")
459            .and_then(|x| x.as_str())
460            .unwrap_or("unknown")
461            .to_string();
462        let description = tool
463            .get("description")
464            .and_then(|x| x.as_str())
465            .map(|s| s.to_string());
466        // Handle inputSchema (camelCase) or input_schema (snake_case)
467        let input_schema = tool
468            .get("inputSchema")
469            .cloned()
470            .or_else(|| tool.get("input_schema").cloned());
471        out.push(McpToolDef {
472            name,
473            description,
474            input_schema,
475            tool_identity: None,
476        });
477    }
478    Ok(out)
479}
480
481#[derive(Debug, Deserialize)]
482struct TransportTranscript {
483    transport: Option<String>,
484    #[allow(dead_code)]
485    #[serde(default)]
486    transport_context: Option<serde_json::Value>,
487    #[allow(dead_code)]
488    #[serde(default)]
489    headers: Option<serde_json::Value>,
490    #[serde(default)]
491    entries: Vec<TransportTranscriptEntry>,
492}
493
494#[derive(Debug, Deserialize)]
495struct TransportTranscriptEntry {
496    #[serde(default)]
497    timestamp_ms: Option<u64>,
498    #[allow(dead_code)]
499    #[serde(default)]
500    transport_context: Option<serde_json::Value>,
501    #[allow(dead_code)]
502    #[serde(default)]
503    headers: Option<serde_json::Value>,
504    #[serde(default)]
505    request: Option<serde_json::Value>,
506    #[serde(default)]
507    response: Option<serde_json::Value>,
508    #[serde(default)]
509    sse: Option<TransportSseEnvelope>,
510}
511
512#[derive(Debug, Deserialize)]
513struct TransportSseEnvelope {
514    #[serde(default)]
515    event: Option<String>,
516    #[allow(dead_code)]
517    #[serde(default)]
518    id: Option<String>,
519    data: serde_json::Value,
520}