1use crate::event::{ProxyEvent, RequestEvent};
12
13use super::pipeline::middlewares::shared;
14use super::pipeline::values::{Context, Envelope, Response, StageTiming};
15
16pub fn build_request_event(proxy_name: &str, cx: &Context, resp: &Response) -> RequestEvent {
20 let status = derive_status(resp);
21 let response_size = derive_response_size(cx, resp);
22 let (error_code, error_msg) = derive_error(resp);
23 let (mcp_method_str, tool) = derive_method_and_tool(cx);
24 let session_id = derive_session_id(cx, resp);
25 let (client_name, client_version) = derive_client(cx);
26 let note = derive_note(cx, resp);
27
28 RequestEvent {
29 id: uuid::Uuid::new_v4().to_string(),
30 ts: chrono::Utc::now().timestamp_millis(),
31 proxy: proxy_name.to_string(),
32 session_id,
33 method: cx.intake.http_method.to_string(),
34 path: cx.intake.path.clone(),
35 mcp_method: mcp_method_str,
36 tool,
37 resource_uri: cx.working.request_resource_uri.clone(),
38 prompt_name: cx.working.request_prompt_name.clone(),
39 status,
40 latency_us: cx.intake.start.elapsed().as_micros() as u64,
41 upstream_us: cx.working.upstream_us,
42 request_size: Some(cx.intake.request_size as u64),
43 response_size,
44 error_code,
45 error_msg: error_msg.map(|m| m.chars().take(512).collect()),
46 client_name,
47 client_version,
48 note,
49 stage_timings: derive_stage_timings(&cx.working.timings),
50 }
51}
52
53fn derive_stage_timings(timings: &[StageTiming]) -> Option<Vec<StageTiming>> {
54 if timings.is_empty() {
55 None
56 } else {
57 Some(timings.to_vec())
58 }
59}
60
61pub fn emit(cx: &Context, resp: &Response) {
64 let state = &cx.intake.proxy;
65 state
66 .event_bus
67 .emit(ProxyEvent::Request(Box::new(build_request_event(
68 &state.name,
69 cx,
70 resp,
71 ))));
72}
73
74fn derive_status(resp: &Response) -> u16 {
75 match resp {
76 Response::McpBuffered { status, .. }
77 | Response::McpStreamed { status, .. }
78 | Response::OauthJson { status, .. }
79 | Response::Raw { status, .. } => status.as_u16(),
80 Response::Upstream502 { .. } => 502,
81 }
82}
83
84fn derive_response_size(cx: &Context, _resp: &Response) -> Option<u64> {
85 cx.working.response_size
89}
90
91fn derive_error(resp: &Response) -> (Option<String>, Option<String>) {
92 match resp {
93 Response::Upstream502 { reason } => (None, Some(reason.clone())),
94 Response::McpBuffered { message, .. } => {
95 if let Some(err) = &message.envelope.error {
96 (Some(err.code.to_string()), Some(err.message.clone()))
97 } else {
98 (None, None)
99 }
100 }
101 _ => (None, None),
102 }
103}
104
105fn derive_method_and_tool(cx: &Context) -> (Option<String>, Option<String>) {
106 let http_get_is_sse = cx.intake.http_method == axum::http::Method::GET;
110 let method_str = cx
111 .working
112 .request_method
113 .as_ref()
114 .and_then(crate::protocol::mcp::ClientMethod::as_str)
115 .map(str::to_owned);
116 let mcp_method_str = match (method_str, http_get_is_sse) {
117 (Some(m), _) => Some(m),
118 (None, true) => Some("SSE".to_owned()),
119 (None, false) => None,
120 };
121 (mcp_method_str, cx.working.request_tool.clone())
122}
123
124fn derive_session_id(cx: &Context, resp: &Response) -> Option<String> {
125 if let Some(s) = cx.working.session.as_ref() {
130 return Some(s.id.clone());
131 }
132 let headers = match resp {
133 Response::McpBuffered { headers, .. }
134 | Response::McpStreamed { headers, .. }
135 | Response::Raw { headers, .. } => Some(headers),
136 _ => None,
137 };
138 headers
139 .and_then(|h| h.get("mcp-session-id"))
140 .and_then(|v| v.to_str().ok())
141 .map(str::to_owned)
142}
143
144fn derive_client(cx: &Context) -> (Option<String>, Option<String>) {
145 if let Some(c) = cx.working.client.as_ref() {
149 return (Some(c.name.clone()), c.version.clone());
150 }
151 if let Some(s) = cx.working.session.as_ref()
152 && let Some(c) = s.client_info.as_ref()
153 {
154 return (Some(c.name.clone()), c.version.clone());
155 }
156 (None, None)
157}
158
159fn derive_note(cx: &Context, resp: &Response) -> String {
160 let mut tags: Vec<&str> = cx.working.tags.as_slice().to_vec();
165 if matches!(resp, Response::Upstream502 { .. }) && !tags.contains(&"upstream error") {
166 tags.push("upstream error");
167 }
168 if matches!(
173 resp,
174 Response::McpStreamed {
175 envelope: Envelope::Sse,
176 ..
177 }
178 ) && !tags.contains(&"sse")
179 {
180 tags.push("sse");
181 }
182 tags.join("+")
183}
184
185pub fn normalize_platform(client_name: &str) -> &'static str {
190 shared::normalize_platform(client_name)
191}
192
193#[cfg(test)]
194#[allow(non_snake_case)]
195mod tests {
196 use super::*;
197
198 use axum::body::Body;
199 use axum::http::{HeaderMap, HeaderValue, Method, StatusCode};
200
201 use crate::protocol::jsonrpc::JsonRpcEnvelope;
202 use crate::protocol::mcp::{
203 ClientMethod, LifecycleMethod, McpMessage, MessageKind, ServerKind, ToolsMethod,
204 };
205 use crate::proxy::pipeline::middlewares::test_support::{test_context, test_proxy_state};
206 use crate::proxy::pipeline::values::{Envelope, Response};
207
208 fn buffered_ok(body: &str) -> Response {
209 let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
210 let message = McpMessage {
211 envelope,
212 kind: MessageKind::Server(ServerKind::Result),
213 };
214 Response::McpBuffered {
215 envelope: Envelope::Json,
216 message,
217 status: StatusCode::OK,
218 headers: HeaderMap::new(),
219 }
220 }
221
222 fn buffered_with_session(body: &str, session_id: &str) -> Response {
223 let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
224 let message = McpMessage {
225 envelope,
226 kind: MessageKind::Server(ServerKind::Result),
227 };
228 let mut headers = HeaderMap::new();
229 headers.insert("mcp-session-id", HeaderValue::from_str(session_id).unwrap());
230 Response::McpBuffered {
231 envelope: Envelope::Json,
232 message,
233 status: StatusCode::OK,
234 headers,
235 }
236 }
237
238 #[tokio::test]
239 async fn build__tools_list_200_ok_sets_method_and_status() {
240 let proxy = test_proxy_state();
241 let mut cx = test_context(proxy);
242 cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
243 cx.working.tags.push("rewritten");
244
245 let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#);
246 let ev = build_request_event("test-proxy", &cx, &resp);
247
248 assert_eq!(ev.status, 200);
249 assert_eq!(ev.mcp_method.as_deref(), Some("tools/list"));
250 assert_eq!(ev.proxy, "test-proxy");
251 assert_eq!(ev.method, "POST");
252 assert_eq!(ev.path, "/mcp");
253 assert_eq!(ev.note, "rewritten");
254 assert!(ev.error_code.is_none());
255 }
256
257 #[tokio::test]
258 async fn build__rpc_error_in_buffered_result_surfaces_code_and_message() {
259 let proxy = test_proxy_state();
260 let mut cx = test_context(proxy);
261 cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
262 let resp =
263 buffered_ok(r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"bad"}}"#);
264 let ev = build_request_event("p", &cx, &resp);
265 assert_eq!(ev.error_code.as_deref(), Some("-32600"));
266 assert_eq!(ev.error_msg.as_deref(), Some("bad"));
267 }
268
269 #[tokio::test]
270 async fn build__upstream_502_tags_note_as_upstream_error() {
271 let proxy = test_proxy_state();
272 let cx = test_context(proxy);
273 let resp = Response::Upstream502 {
274 reason: "connection refused".into(),
275 };
276 let ev = build_request_event("p", &cx, &resp);
277 assert_eq!(ev.status, 502);
278 assert_eq!(ev.note, "upstream error");
279 assert_eq!(ev.error_msg.as_deref(), Some("connection refused"));
280 }
281
282 #[tokio::test]
283 async fn build__sse_streamed_response_tags_note_as_sse() {
284 let proxy = test_proxy_state();
285 let cx = test_context(proxy);
286 let resp = Response::McpStreamed {
287 envelope: Envelope::Sse,
288 body: Body::empty(),
289 status: StatusCode::OK,
290 headers: HeaderMap::new(),
291 };
292 let ev = build_request_event("p", &cx, &resp);
293 assert_eq!(ev.note, "sse");
294 }
295
296 #[tokio::test]
297 async fn build__sse_get_without_stashed_method_reports_mcp_method_as_SSE_literal() {
298 let proxy = test_proxy_state();
299 let mut cx = test_context(proxy);
300 cx.intake.http_method = Method::GET;
301 let resp = Response::McpStreamed {
302 envelope: Envelope::Sse,
303 body: Body::empty(),
304 status: StatusCode::OK,
305 headers: HeaderMap::new(),
306 };
307 let ev = build_request_event("p", &cx, &resp);
308 assert_eq!(ev.mcp_method.as_deref(), Some("SSE"));
309 }
310
311 #[tokio::test]
312 async fn build__client_info_preferred_over_session_when_set() {
313 use crate::protocol::session::ClientInfo;
314 let proxy = test_proxy_state();
315 let mut cx = test_context(proxy);
316 cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
317 cx.working.client = Some(ClientInfo {
318 name: "claude-desktop".into(),
319 version: Some("1.2.0".into()),
320 });
321 let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
322 let ev = build_request_event("p", &cx, &resp);
323 assert_eq!(ev.client_name.as_deref(), Some("claude-desktop"));
324 assert_eq!(ev.client_version.as_deref(), Some("1.2.0"));
325 }
326
327 #[tokio::test]
328 async fn build__session_id_falls_back_to_response_header_when_working_session_empty() {
329 let proxy = test_proxy_state();
330 let mut cx = test_context(proxy);
331 cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
332 let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-abc");
333 let ev = build_request_event("p", &cx, &resp);
334 assert_eq!(ev.session_id.as_deref(), Some("sess-abc"));
335 }
336
337 #[tokio::test]
338 async fn build__session_id_uses_working_session_when_present() {
339 use crate::protocol::session::SessionInfo;
340 let proxy = test_proxy_state();
341 let mut cx = test_context(proxy);
342 cx.working.session = Some(SessionInfo {
343 id: "sess-working".into(),
344 state: crate::protocol::session::SessionState::Active,
345 client_info: None,
346 request_count: 0,
347 created_at: chrono::Utc::now(),
348 last_active: chrono::Utc::now(),
349 });
350 let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-header");
351 let ev = build_request_event("p", &cx, &resp);
352 assert_eq!(ev.session_id.as_deref(), Some("sess-working"));
353 }
354
355 #[tokio::test]
356 async fn build__stage_timings_propagated() {
357 let proxy = test_proxy_state();
358 let mut cx = test_context(proxy);
359 cx.working.timings.push(StageTiming {
360 name: "intake_parse",
361 elapsed_us: 42,
362 });
363 cx.working.timings.push(StageTiming {
364 name: "csp_rewrite",
365 elapsed_us: 100,
366 });
367 let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
368 let ev = build_request_event("p", &cx, &resp);
369 let timings = ev.stage_timings.expect("stage_timings populated");
370 assert_eq!(timings.len(), 2);
371 assert_eq!(timings[0].name, "intake_parse");
372 assert_eq!(timings[0].elapsed_us, 42);
373 assert_eq!(timings[1].name, "csp_rewrite");
374 }
375
376 #[tokio::test]
377 async fn build__tags_joined_with_plus() {
378 let proxy = test_proxy_state();
379 let mut cx = test_context(proxy);
380 cx.working.tags.push("rewritten");
381 cx.working.tags.push("sse");
382 let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
383 let ev = build_request_event("p", &cx, &resp);
384 assert_eq!(ev.note, "rewritten+sse");
385 }
386}