1use crate::mcp::types::*;
2use anyhow::{bail, Context, Result};
3use serde::Deserialize;
4use std::collections::HashSet;
5
6pub fn parse_mcp_transcript(text: &str, format: McpInputFormat) -> Result<Vec<McpEvent>> {
8 let events = match format {
9 McpInputFormat::JsonRpc => parse_jsonrpc_jsonl(text),
10 McpInputFormat::Inspector => parse_inspector_best_effort(text),
11 McpInputFormat::StreamableHttp => parse_streamable_http_transcript(text),
12 McpInputFormat::HttpSse => parse_http_sse_transcript(text),
13 }?;
14 validate_mcp_events(&events)?;
15 Ok(events)
16}
17
18fn parse_jsonrpc_jsonl(text: &str) -> Result<Vec<McpEvent>> {
19 let mut out = Vec::new();
20
21 for (lineno, line) in text.lines().enumerate() {
22 let line = line.trim();
23 if line.is_empty() {
24 continue;
25 }
26
27 let v: serde_json::Value = serde_json::from_str(line)
28 .with_context(|| format!("invalid JSON on line {}", lineno + 1))?;
29
30 let event = parse_jsonrpc_message(v, (lineno + 1) as u64, None)?;
31 out.push(event);
32 }
33
34 Ok(out)
35}
36
37fn parse_inspector_best_effort(text: &str) -> Result<Vec<McpEvent>> {
38 let v: serde_json::Value = serde_json::from_str(text).context("invalid inspector JSON")?;
39
40 let arr = v
44 .get("events")
45 .cloned()
46 .or_else(|| v.as_array().cloned().map(serde_json::Value::Array))
47 .and_then(|x| x.as_array().cloned())
48 .unwrap_or_default();
49
50 let mut out = Vec::new();
51 for (idx, item) in arr.into_iter().enumerate() {
52 let event = parse_jsonrpc_message(item, (idx + 1) as u64, None)?;
54 out.push(event);
55 }
56
57 Ok(out)
58}
59
60fn parse_streamable_http_transcript(text: &str) -> Result<Vec<McpEvent>> {
61 parse_transport_transcript(text, "streamable-http", "streamable-http transcript", false)
62}
63
64fn parse_http_sse_transcript(text: &str) -> Result<Vec<McpEvent>> {
65 parse_transport_transcript(text, "http-sse", "http-sse transcript", true)
66}
67
68fn parse_transport_transcript(
69 text: &str,
70 expected_transport: &str,
71 source_label: &str,
72 allow_endpoint_event: bool,
73) -> Result<Vec<McpEvent>> {
74 let transcript: TransportTranscript =
75 serde_json::from_str(text).with_context(|| format!("invalid {}", source_label))?;
76
77 let actual_transport = transcript.transport.as_deref().unwrap_or("missing");
78 if actual_transport != expected_transport {
79 bail!(
80 "{} transport must be {:?}, found {:?}",
81 source_label,
82 expected_transport,
83 actual_transport
84 );
85 }
86
87 let mut out = Vec::new();
88 for (idx, entry) in transcript.entries.into_iter().enumerate() {
89 let source_line = (idx + 1) as u64;
90 let present = usize::from(entry.request.is_some())
91 + usize::from(entry.response.is_some())
92 + usize::from(entry.sse.is_some());
93
94 if present != 1 {
95 bail!(
96 "{} entry {} must contain exactly one of request, response, or sse",
97 source_label,
98 source_line
99 );
100 }
101
102 if let Some(request) = entry.request {
103 out.push(parse_jsonrpc_message(
104 request,
105 source_line,
106 entry.timestamp_ms,
107 )?);
108 continue;
109 }
110
111 if let Some(response) = entry.response {
112 out.push(parse_jsonrpc_message(
113 response,
114 source_line,
115 entry.timestamp_ms,
116 )?);
117 continue;
118 }
119
120 if let Some(sse) = entry.sse {
121 if let Some(jsonrpc) = extract_jsonrpc_from_sse(&sse, allow_endpoint_event) {
122 out.push(parse_jsonrpc_message(
123 jsonrpc,
124 source_line,
125 entry.timestamp_ms,
126 )?);
127 }
128 }
129 }
130
131 Ok(out)
132}
133
134fn parse_jsonrpc_message(
135 v: serde_json::Value,
136 source_line: u64,
137 timestamp_ms_override: Option<u64>,
138) -> Result<McpEvent> {
139 if !v.is_object() {
140 bail!(
141 "MCP event at source line {} must be a JSON object",
142 source_line
143 );
144 }
145
146 let ts_ms = timestamp_ms_override.or_else(|| extract_ts_ms(&v));
147
148 let id_str = normalize_jsonrpc_id(v.get("id"), source_line)?;
150
151 let method = v
153 .get("method")
154 .and_then(|m| m.as_str())
155 .map(|s| s.to_string());
156
157 let payload = if let Some(method) = method {
158 match method.as_str() {
159 "tools/list" => McpPayload::ToolsListRequest { raw: v.clone() },
160 "tools/call" => {
161 let params = v.get("params").cloned().unwrap_or(serde_json::Value::Null);
162 let name = params
163 .get("name")
164 .and_then(|x| x.as_str())
165 .unwrap_or("unknown_tool")
166 .to_string();
167 let arguments = params
168 .get("arguments")
169 .cloned()
170 .unwrap_or(serde_json::Value::Null);
171 McpPayload::ToolCallRequest {
172 name,
173 arguments,
174 raw: v.clone(),
175 }
176 }
177 _ => McpPayload::Other { raw: v.clone() },
179 }
180 } else {
181 if v.get("result").is_some() {
183 if looks_like_tools_list_result(&v) {
184 let tools = parse_tools_list_result(&v)?;
185 McpPayload::ToolsListResponse {
186 tools,
187 raw: v.clone(),
188 }
189 } else {
190 McpPayload::ToolCallResponse {
191 result: v.get("result").cloned().unwrap_or(serde_json::Value::Null),
192 is_error: false,
193 raw: v.clone(),
194 }
195 }
196 } else if v.get("error").is_some() {
197 McpPayload::ToolCallResponse {
198 result: v.get("error").cloned().unwrap_or(serde_json::Value::Null),
199 is_error: true,
200 raw: v.clone(),
201 }
202 } else {
203 McpPayload::Other { raw: v.clone() }
206 }
207 };
208
209 Ok(McpEvent {
210 source_line,
211 timestamp_ms: ts_ms,
212 jsonrpc_id: id_str,
213 payload,
214 })
215}
216
217fn normalize_jsonrpc_id(
218 raw_id: Option<&serde_json::Value>,
219 source_line: u64,
220) -> Result<Option<String>> {
221 match raw_id {
222 None | Some(serde_json::Value::Null) => Ok(None),
223 Some(serde_json::Value::String(id)) => Ok(Some(id.clone())),
224 Some(serde_json::Value::Number(id)) => Ok(Some(id.to_string())),
225 Some(serde_json::Value::Bool(_)) => {
226 bail!(
227 "JSON-RPC id on source line {} must not be a boolean",
228 source_line
229 )
230 }
231 Some(serde_json::Value::Array(_)) => {
232 bail!(
233 "JSON-RPC id on source line {} must not be an array",
234 source_line
235 )
236 }
237 Some(serde_json::Value::Object(_)) => {
238 bail!(
239 "JSON-RPC id on source line {} must not be an object",
240 source_line
241 )
242 }
243 }
244}
245
246fn validate_mcp_events(events: &[McpEvent]) -> Result<()> {
247 let mut seen_tool_call_request_ids = HashSet::new();
248
249 for event in events {
250 if matches!(&event.payload, McpPayload::ToolCallRequest { .. }) {
251 if let Some(id) = &event.jsonrpc_id {
252 if !seen_tool_call_request_ids.insert(id.clone()) {
253 bail!(
254 "duplicate tools/call request id {:?} at source line {}",
255 id,
256 event.source_line
257 );
258 }
259 }
260 }
261 }
262
263 Ok(())
264}
265
266fn extract_jsonrpc_from_sse(
267 sse: &TransportSseEnvelope,
268 allow_endpoint_event: bool,
269) -> Option<serde_json::Value> {
270 let event_name = sse.event.as_deref().unwrap_or("message");
271 if event_name == "endpoint" && allow_endpoint_event {
272 return None;
273 }
274
275 if event_name != "message" {
276 return None;
277 }
278
279 extract_jsonrpc_like_value(&sse.data)
280}
281
282fn extract_jsonrpc_like_value(value: &serde_json::Value) -> Option<serde_json::Value> {
283 match value {
284 serde_json::Value::Object(map)
285 if map.contains_key("method")
286 || map.contains_key("result")
287 || map.contains_key("error")
288 || map.contains_key("jsonrpc") =>
289 {
290 Some(value.clone())
291 }
292 serde_json::Value::String(text) => serde_json::from_str::<serde_json::Value>(text)
293 .ok()
294 .and_then(|parsed| extract_jsonrpc_like_value(&parsed)),
295 _ => None,
296 }
297}
298
299fn extract_ts_ms(v: &serde_json::Value) -> Option<u64> {
300 if let Some(t) = v.get("timestamp_ms").and_then(|t| t.as_u64()) {
302 return Some(t);
303 }
304 if let Some(t) = v.get("timestamp").and_then(|t| t.as_u64()) {
305 return Some(t); }
308 None
309}
310
311fn looks_like_tools_list_result(v: &serde_json::Value) -> bool {
312 v.get("result")
313 .and_then(|r| r.get("tools"))
314 .and_then(|t| t.as_array())
315 .is_some()
316}
317
318fn parse_tools_list_result(v: &serde_json::Value) -> Result<Vec<McpToolDef>> {
319 let tools = v
320 .get("result")
321 .and_then(|r| r.get("tools"))
322 .and_then(|t| t.as_array())
323 .cloned()
324 .unwrap_or_default();
325
326 let mut out = Vec::new();
327 for tool in tools {
328 let name = tool
329 .get("name")
330 .and_then(|x| x.as_str())
331 .unwrap_or("unknown")
332 .to_string();
333 let description = tool
334 .get("description")
335 .and_then(|x| x.as_str())
336 .map(|s| s.to_string());
337 let input_schema = tool
339 .get("inputSchema")
340 .cloned()
341 .or_else(|| tool.get("input_schema").cloned());
342 out.push(McpToolDef {
343 name,
344 description,
345 input_schema,
346 tool_identity: None,
347 });
348 }
349 Ok(out)
350}
351
352#[derive(Debug, Deserialize)]
353struct TransportTranscript {
354 transport: Option<String>,
355 #[allow(dead_code)]
356 #[serde(default)]
357 transport_context: Option<serde_json::Value>,
358 #[allow(dead_code)]
359 #[serde(default)]
360 headers: Option<serde_json::Value>,
361 #[serde(default)]
362 entries: Vec<TransportTranscriptEntry>,
363}
364
365#[derive(Debug, Deserialize)]
366struct TransportTranscriptEntry {
367 #[serde(default)]
368 timestamp_ms: Option<u64>,
369 #[allow(dead_code)]
370 #[serde(default)]
371 transport_context: Option<serde_json::Value>,
372 #[allow(dead_code)]
373 #[serde(default)]
374 headers: Option<serde_json::Value>,
375 #[serde(default)]
376 request: Option<serde_json::Value>,
377 #[serde(default)]
378 response: Option<serde_json::Value>,
379 #[serde(default)]
380 sse: Option<TransportSseEnvelope>,
381}
382
383#[derive(Debug, Deserialize)]
384struct TransportSseEnvelope {
385 #[serde(default)]
386 event: Option<String>,
387 #[allow(dead_code)]
388 #[serde(default)]
389 id: Option<String>,
390 data: serde_json::Value,
391}