1use crate::mcp::types::*;
2use anyhow::{bail, Context, Result};
3use serde::Deserialize;
4use std::collections::HashSet;
5
6pub fn parse_mcp_transcript(text: &str, format: McpInputFormat) -> Result<Vec<McpEvent>> {
8 let events = match format {
9 McpInputFormat::JsonRpc => parse_jsonrpc_jsonl(text),
10 McpInputFormat::Inspector => parse_inspector_best_effort(text),
11 McpInputFormat::StreamableHttp => parse_streamable_http_transcript(text),
12 McpInputFormat::HttpSse => parse_http_sse_transcript(text),
13 }?;
14 validate_mcp_events(&events)?;
15 Ok(events)
16}
17
18fn parse_jsonrpc_jsonl(text: &str) -> Result<Vec<McpEvent>> {
19 let mut out = Vec::new();
20
21 for (lineno, line) in text.lines().enumerate() {
22 let line = line.trim();
23 if line.is_empty() {
24 continue;
25 }
26
27 let v: serde_json::Value = serde_json::from_str(line)
28 .with_context(|| format!("invalid JSON on line {}", lineno + 1))?;
29
30 let event = parse_jsonrpc_message(
31 v,
32 (lineno + 1) as u64,
33 None,
34 McpAuthorizationDiscovery::default(),
35 )?;
36 out.push(event);
37 }
38
39 Ok(out)
40}
41
42fn parse_inspector_best_effort(text: &str) -> Result<Vec<McpEvent>> {
43 let v: serde_json::Value = serde_json::from_str(text).context("invalid inspector JSON")?;
44
45 let arr = v
49 .get("events")
50 .cloned()
51 .or_else(|| v.as_array().cloned().map(serde_json::Value::Array))
52 .and_then(|x| x.as_array().cloned())
53 .unwrap_or_default();
54
55 let mut out = Vec::new();
56 for (idx, item) in arr.into_iter().enumerate() {
57 let event = parse_jsonrpc_message(
59 item,
60 (idx + 1) as u64,
61 None,
62 McpAuthorizationDiscovery::default(),
63 )?;
64 out.push(event);
65 }
66
67 Ok(out)
68}
69
70fn parse_streamable_http_transcript(text: &str) -> Result<Vec<McpEvent>> {
71 parse_transport_transcript(text, "streamable-http", "streamable-http transcript", false)
72}
73
74fn parse_http_sse_transcript(text: &str) -> Result<Vec<McpEvent>> {
75 parse_transport_transcript(text, "http-sse", "http-sse transcript", true)
76}
77
78fn parse_transport_transcript(
79 text: &str,
80 expected_transport: &str,
81 source_label: &str,
82 allow_endpoint_event: bool,
83) -> Result<Vec<McpEvent>> {
84 let transcript: TransportTranscript =
85 serde_json::from_str(text).with_context(|| format!("invalid {}", source_label))?;
86
87 let actual_transport = transcript.transport.as_deref().unwrap_or("missing");
88 if actual_transport != expected_transport {
89 bail!(
90 "{} transport must be {:?}, found {:?}",
91 source_label,
92 expected_transport,
93 actual_transport
94 );
95 }
96
97 let mut out = Vec::new();
98 for (idx, entry) in transcript.entries.into_iter().enumerate() {
99 let source_line = (idx + 1) as u64;
100 let present = usize::from(entry.request.is_some())
101 + usize::from(entry.response.is_some())
102 + usize::from(entry.sse.is_some());
103
104 if present != 1 {
105 bail!(
106 "{} entry {} must contain exactly one of request, response, or sse",
107 source_label,
108 source_line
109 );
110 }
111
112 if let Some(request) = entry.request {
113 out.push(parse_jsonrpc_message(
114 request,
115 source_line,
116 entry.timestamp_ms,
117 McpAuthorizationDiscovery::default(),
118 )?);
119 continue;
120 }
121
122 let auth_discovery = parse_transport_auth_discovery(&entry);
123
124 if let Some(response) = entry.response {
125 out.push(parse_jsonrpc_message(
126 response,
127 source_line,
128 entry.timestamp_ms,
129 auth_discovery,
130 )?);
131 continue;
132 }
133
134 if let Some(sse) = entry.sse {
135 if let Some(jsonrpc) = extract_jsonrpc_from_sse(&sse, allow_endpoint_event) {
136 out.push(parse_jsonrpc_message(
137 jsonrpc,
138 source_line,
139 entry.timestamp_ms,
140 McpAuthorizationDiscovery::default(),
141 )?);
142 }
143 }
144 }
145
146 Ok(out)
147}
148
149fn parse_jsonrpc_message(
150 v: serde_json::Value,
151 source_line: u64,
152 timestamp_ms_override: Option<u64>,
153 auth_discovery: McpAuthorizationDiscovery,
154) -> Result<McpEvent> {
155 if !v.is_object() {
156 bail!(
157 "MCP event at source line {} must be a JSON object",
158 source_line
159 );
160 }
161
162 let ts_ms = timestamp_ms_override.or_else(|| extract_ts_ms(&v));
163
164 let id_str = normalize_jsonrpc_id(v.get("id"), source_line)?;
166
167 let method = v
169 .get("method")
170 .and_then(|m| m.as_str())
171 .map(|s| s.to_string());
172
173 let payload = if let Some(method) = method {
174 match method.as_str() {
175 "tools/list" => McpPayload::ToolsListRequest { raw: v.clone() },
176 "tools/call" => {
177 let params = v.get("params").cloned().unwrap_or(serde_json::Value::Null);
178 let name = params
179 .get("name")
180 .and_then(|x| x.as_str())
181 .unwrap_or("unknown_tool")
182 .to_string();
183 let arguments = params
184 .get("arguments")
185 .cloned()
186 .unwrap_or(serde_json::Value::Null);
187 McpPayload::ToolCallRequest {
188 name,
189 arguments,
190 raw: v.clone(),
191 }
192 }
193 _ => McpPayload::Other { raw: v.clone() },
195 }
196 } else {
197 if v.get("result").is_some() {
199 if looks_like_tools_list_result(&v) {
200 let tools = parse_tools_list_result(&v)?;
201 McpPayload::ToolsListResponse {
202 tools,
203 raw: v.clone(),
204 }
205 } else {
206 McpPayload::ToolCallResponse {
207 result: v.get("result").cloned().unwrap_or(serde_json::Value::Null),
208 is_error: false,
209 raw: v.clone(),
210 }
211 }
212 } else if v.get("error").is_some() {
213 McpPayload::ToolCallResponse {
214 result: v.get("error").cloned().unwrap_or(serde_json::Value::Null),
215 is_error: true,
216 raw: v.clone(),
217 }
218 } else {
219 McpPayload::Other { raw: v.clone() }
222 }
223 };
224
225 Ok(McpEvent {
226 source_line,
227 timestamp_ms: ts_ms,
228 jsonrpc_id: id_str,
229 auth_discovery,
230 payload,
231 })
232}
233
234fn parse_transport_auth_discovery(entry: &TransportTranscriptEntry) -> McpAuthorizationDiscovery {
235 let Some(status) = extract_http_status(entry) else {
236 return McpAuthorizationDiscovery::default();
237 };
238
239 if status != 401 {
240 return McpAuthorizationDiscovery::default();
241 }
242
243 let header_value = entry
244 .transport_context
245 .as_ref()
246 .and_then(|value| find_header_case_insensitive(value, "www-authenticate"))
247 .or_else(|| {
248 entry
249 .headers
250 .as_ref()
251 .and_then(|value| find_header_case_insensitive(value, "www-authenticate"))
252 });
253
254 let Some(www_authenticate) = header_value else {
255 return McpAuthorizationDiscovery::default();
256 };
257
258 let resource_metadata_visible = auth_param_visible(&www_authenticate, "resource_metadata");
259 let scope_challenge_visible = auth_param_visible(&www_authenticate, "scope");
260
261 if !resource_metadata_visible && !scope_challenge_visible {
262 return McpAuthorizationDiscovery::default();
263 }
264
265 McpAuthorizationDiscovery {
266 visible: true,
267 source_kind: McpAuthorizationDiscoverySourceKind::WwwAuthenticate,
268 resource_metadata_visible,
269 authorization_servers_visible: false,
270 scope_challenge_visible,
271 }
272}
273
274fn extract_http_status(entry: &TransportTranscriptEntry) -> Option<u16> {
275 entry
276 .transport_context
277 .as_ref()
278 .and_then(extract_http_status_from_value)
279 .or_else(|| {
280 entry
281 .headers
282 .as_ref()
283 .and_then(extract_http_status_from_value)
284 })
285}
286
287fn extract_http_status_from_value(value: &serde_json::Value) -> Option<u16> {
288 match value {
289 serde_json::Value::Object(map) => {
290 for key in ["status", "status_code", "http_status"] {
291 if let Some(status) = map.get(key).and_then(json_value_to_u16) {
292 return Some(status);
293 }
294 }
295
296 map.get("response").and_then(extract_http_status_from_value)
297 }
298 _ => None,
299 }
300}
301
302fn json_value_to_u16(value: &serde_json::Value) -> Option<u16> {
303 match value {
304 serde_json::Value::Number(n) => n.as_u64().and_then(|n| u16::try_from(n).ok()),
305 serde_json::Value::String(s) => s.parse::<u16>().ok(),
306 _ => None,
307 }
308}
309
310fn find_header_case_insensitive(value: &serde_json::Value, header_name: &str) -> Option<String> {
311 match value {
312 serde_json::Value::Object(map) => {
313 if let Some(headers) = map.get("headers") {
314 if let Some(found) = find_header_case_insensitive(headers, header_name) {
315 return Some(found);
316 }
317 }
318
319 if let Some(response) = map.get("response") {
320 if let Some(found) = find_header_case_insensitive(response, header_name) {
321 return Some(found);
322 }
323 }
324
325 map.iter().find_map(|(key, value)| {
326 if key.eq_ignore_ascii_case(header_name) {
327 value.as_str().map(ToString::to_string)
328 } else {
329 None
330 }
331 })
332 }
333 _ => None,
334 }
335}
336
/// Reports whether an authentication challenge header carries the parameter
/// `param_name`.
///
/// The header is scanned byte by byte. A parameter is recognised when its
/// name (compared ASCII case-insensitively) starts at the beginning of the
/// header or directly after a space, tab, or comma, and is followed by
/// optional spaces/tabs and then `=`.
///
/// Text inside double-quoted values is ignored, including backslash-escaped
/// characters, so a value such as `error_description="missing scope=admin"`
/// does not count as a `scope` parameter.
fn auth_param_visible(header_value: &str, param_name: &str) -> bool {
    let header = header_value.as_bytes();
    let name = param_name.as_bytes();

    let mut in_quoted_string = false;
    let mut idx = 0;
    while idx < header.len() {
        let byte = header[idx];

        if in_quoted_string {
            match byte {
                // Skip the escaped byte so an escaped quote does not end the string.
                b'\\' => idx += 1,
                b'"' => in_quoted_string = false,
                _ => {}
            }
            idx += 1;
            continue;
        }

        if byte == b'"' {
            in_quoted_string = true;
            idx += 1;
            continue;
        }

        let at_boundary = idx == 0 || matches!(header[idx - 1], b' ' | b',' | b'\t');
        if at_boundary {
            // `get` returns `None` when fewer than `name.len()` bytes remain.
            if let Some(after_name) = header[idx..].get(name.len()..) {
                if header[idx..idx + name.len()].eq_ignore_ascii_case(name) {
                    // Optional whitespace is permitted between the name and `=`.
                    let next = after_name
                        .iter()
                        .copied()
                        .find(|&b| b != b' ' && b != b'\t');
                    if next == Some(b'=') {
                        return true;
                    }
                }
            }
        }

        idx += 1;
    }

    false
}
345
346fn normalize_jsonrpc_id(
347 raw_id: Option<&serde_json::Value>,
348 source_line: u64,
349) -> Result<Option<String>> {
350 match raw_id {
351 None | Some(serde_json::Value::Null) => Ok(None),
352 Some(serde_json::Value::String(id)) => Ok(Some(id.clone())),
353 Some(serde_json::Value::Number(id)) => Ok(Some(id.to_string())),
354 Some(serde_json::Value::Bool(_)) => {
355 bail!(
356 "JSON-RPC id on source line {} must not be a boolean",
357 source_line
358 )
359 }
360 Some(serde_json::Value::Array(_)) => {
361 bail!(
362 "JSON-RPC id on source line {} must not be an array",
363 source_line
364 )
365 }
366 Some(serde_json::Value::Object(_)) => {
367 bail!(
368 "JSON-RPC id on source line {} must not be an object",
369 source_line
370 )
371 }
372 }
373}
374
375fn validate_mcp_events(events: &[McpEvent]) -> Result<()> {
376 let mut seen_tool_call_request_ids = HashSet::new();
377
378 for event in events {
379 if matches!(&event.payload, McpPayload::ToolCallRequest { .. }) {
380 if let Some(id) = &event.jsonrpc_id {
381 if !seen_tool_call_request_ids.insert(id.clone()) {
382 bail!(
383 "duplicate tools/call request id {:?} at source line {}",
384 id,
385 event.source_line
386 );
387 }
388 }
389 }
390 }
391
392 Ok(())
393}
394
395fn extract_jsonrpc_from_sse(
396 sse: &TransportSseEnvelope,
397 allow_endpoint_event: bool,
398) -> Option<serde_json::Value> {
399 let event_name = sse.event.as_deref().unwrap_or("message");
400 if event_name == "endpoint" && allow_endpoint_event {
401 return None;
402 }
403
404 if event_name != "message" {
405 return None;
406 }
407
408 extract_jsonrpc_like_value(&sse.data)
409}
410
411fn extract_jsonrpc_like_value(value: &serde_json::Value) -> Option<serde_json::Value> {
412 match value {
413 serde_json::Value::Object(map)
414 if map.contains_key("method")
415 || map.contains_key("result")
416 || map.contains_key("error")
417 || map.contains_key("jsonrpc") =>
418 {
419 Some(value.clone())
420 }
421 serde_json::Value::String(text) => serde_json::from_str::<serde_json::Value>(text)
422 .ok()
423 .and_then(|parsed| extract_jsonrpc_like_value(&parsed)),
424 _ => None,
425 }
426}
427
428fn extract_ts_ms(v: &serde_json::Value) -> Option<u64> {
429 if let Some(t) = v.get("timestamp_ms").and_then(|t| t.as_u64()) {
431 return Some(t);
432 }
433 if let Some(t) = v.get("timestamp").and_then(|t| t.as_u64()) {
434 return Some(t); }
437 None
438}
439
440fn looks_like_tools_list_result(v: &serde_json::Value) -> bool {
441 v.get("result")
442 .and_then(|r| r.get("tools"))
443 .and_then(|t| t.as_array())
444 .is_some()
445}
446
447fn parse_tools_list_result(v: &serde_json::Value) -> Result<Vec<McpToolDef>> {
448 let tools = v
449 .get("result")
450 .and_then(|r| r.get("tools"))
451 .and_then(|t| t.as_array())
452 .cloned()
453 .unwrap_or_default();
454
455 let mut out = Vec::new();
456 for tool in tools {
457 let name = tool
458 .get("name")
459 .and_then(|x| x.as_str())
460 .unwrap_or("unknown")
461 .to_string();
462 let description = tool
463 .get("description")
464 .and_then(|x| x.as_str())
465 .map(|s| s.to_string());
466 let input_schema = tool
468 .get("inputSchema")
469 .cloned()
470 .or_else(|| tool.get("input_schema").cloned());
471 out.push(McpToolDef {
472 name,
473 description,
474 input_schema,
475 tool_identity: None,
476 });
477 }
478 Ok(out)
479}
480
481#[derive(Debug, Deserialize)]
482struct TransportTranscript {
483 transport: Option<String>,
484 #[allow(dead_code)]
485 #[serde(default)]
486 transport_context: Option<serde_json::Value>,
487 #[allow(dead_code)]
488 #[serde(default)]
489 headers: Option<serde_json::Value>,
490 #[serde(default)]
491 entries: Vec<TransportTranscriptEntry>,
492}
493
494#[derive(Debug, Deserialize)]
495struct TransportTranscriptEntry {
496 #[serde(default)]
497 timestamp_ms: Option<u64>,
498 #[allow(dead_code)]
499 #[serde(default)]
500 transport_context: Option<serde_json::Value>,
501 #[allow(dead_code)]
502 #[serde(default)]
503 headers: Option<serde_json::Value>,
504 #[serde(default)]
505 request: Option<serde_json::Value>,
506 #[serde(default)]
507 response: Option<serde_json::Value>,
508 #[serde(default)]
509 sse: Option<TransportSseEnvelope>,
510}
511
512#[derive(Debug, Deserialize)]
513struct TransportSseEnvelope {
514 #[serde(default)]
515 event: Option<String>,
516 #[allow(dead_code)]
517 #[serde(default)]
518 id: Option<String>,
519 data: serde_json::Value,
520}