1use crate::event::{ProxyEvent, RequestEvent};
12
13use super::pipeline::middlewares::shared;
14use super::pipeline::values::{Context, Envelope, Response, StageTiming};
15
16pub fn build_request_event(proxy_name: &str, cx: &Context, resp: &Response) -> RequestEvent {
20 let status = derive_status(resp);
21 let response_size = derive_response_size(cx, resp);
22 let (error_code, error_msg) = derive_error(resp);
23 let (mcp_method_str, tool) = derive_method_and_tool(cx);
24 let session_id = derive_session_id(cx, resp);
25 let (client_name, client_version) = derive_client(cx);
26 let note = derive_note(cx, resp);
27
28 RequestEvent {
29 id: uuid::Uuid::new_v4().to_string(),
30 ts: chrono::Utc::now().timestamp_millis(),
31 proxy: proxy_name.to_string(),
32 session_id,
33 method: cx.intake.http_method.to_string(),
34 path: cx.intake.path.clone(),
35 mcp_method: mcp_method_str,
36 tool,
37 status,
38 latency_us: cx.intake.start.elapsed().as_micros() as u64,
39 upstream_us: cx.working.upstream_us,
40 request_size: Some(cx.intake.request_size as u64),
41 response_size,
42 error_code,
43 error_msg: error_msg.map(|m| m.chars().take(512).collect()),
44 client_name,
45 client_version,
46 note,
47 stage_timings: derive_stage_timings(&cx.working.timings),
48 }
49}
50
51fn derive_stage_timings(timings: &[StageTiming]) -> Option<Vec<StageTiming>> {
52 if timings.is_empty() {
53 None
54 } else {
55 Some(timings.to_vec())
56 }
57}
58
59pub fn emit(cx: &Context, resp: &Response) {
62 let state = &cx.intake.proxy;
63 state
64 .event_bus
65 .emit(ProxyEvent::Request(Box::new(build_request_event(
66 &state.name,
67 cx,
68 resp,
69 ))));
70}
71
72fn derive_status(resp: &Response) -> u16 {
73 match resp {
74 Response::McpBuffered { status, .. }
75 | Response::McpStreamed { status, .. }
76 | Response::OauthJson { status, .. }
77 | Response::Raw { status, .. } => status.as_u16(),
78 Response::Upstream502 { .. } => 502,
79 }
80}
81
/// Returns the response size stored in the context's working state, if any.
///
/// The response itself is not inspected; `_resp` is accepted but unused.
fn derive_response_size(cx: &Context, _resp: &Response) -> Option<u64> {
    cx.working.response_size
}
88
89fn derive_error(resp: &Response) -> (Option<String>, Option<String>) {
90 match resp {
91 Response::Upstream502 { reason } => (None, Some(reason.clone())),
92 Response::McpBuffered { message, .. } => {
93 if let Some(err) = &message.envelope.error {
94 (Some(err.code.to_string()), Some(err.message.clone()))
95 } else {
96 (None, None)
97 }
98 }
99 _ => (None, None),
100 }
101}
102
103fn derive_method_and_tool(cx: &Context) -> (Option<String>, Option<String>) {
104 let http_get_is_sse = cx.intake.http_method == axum::http::Method::GET;
108 let method_str = cx
109 .working
110 .request_method
111 .as_ref()
112 .and_then(crate::protocol::mcp::ClientMethod::as_str)
113 .map(str::to_owned);
114 let mcp_method_str = match (method_str, http_get_is_sse) {
115 (Some(m), _) => Some(m),
116 (None, true) => Some("SSE".to_owned()),
117 (None, false) => None,
118 };
119 (mcp_method_str, cx.working.request_tool.clone())
120}
121
122fn derive_session_id(cx: &Context, resp: &Response) -> Option<String> {
123 if let Some(s) = cx.working.session.as_ref() {
128 return Some(s.id.clone());
129 }
130 let headers = match resp {
131 Response::McpBuffered { headers, .. }
132 | Response::McpStreamed { headers, .. }
133 | Response::Raw { headers, .. } => Some(headers),
134 _ => None,
135 };
136 headers
137 .and_then(|h| h.get("mcp-session-id"))
138 .and_then(|v| v.to_str().ok())
139 .map(str::to_owned)
140}
141
142fn derive_client(cx: &Context) -> (Option<String>, Option<String>) {
143 if let Some(c) = cx.working.client.as_ref() {
147 return (Some(c.name.clone()), c.version.clone());
148 }
149 if let Some(s) = cx.working.session.as_ref()
150 && let Some(c) = s.client_info.as_ref()
151 {
152 return (Some(c.name.clone()), c.version.clone());
153 }
154 (None, None)
155}
156
157fn derive_note(cx: &Context, resp: &Response) -> String {
158 let mut tags: Vec<&str> = cx.working.tags.as_slice().to_vec();
163 if matches!(resp, Response::Upstream502 { .. }) && !tags.contains(&"upstream error") {
164 tags.push("upstream error");
165 }
166 if matches!(
171 resp,
172 Response::McpStreamed {
173 envelope: Envelope::Sse,
174 ..
175 }
176 ) && !tags.contains(&"sse")
177 {
178 tags.push("sse");
179 }
180 tags.join("+")
181}
182
/// Maps a client name to a static platform label.
///
/// Thin public wrapper that forwards to `shared::normalize_platform` and
/// returns its result unchanged.
pub fn normalize_platform(client_name: &str) -> &'static str {
    shared::normalize_platform(client_name)
}
190
#[cfg(test)]
// Test names follow a `subject__scenario` pattern (double underscore) and one
// of them embeds the upper-case literal `SSE`, hence the lint exemption.
#[allow(non_snake_case)]
mod tests {
    //! Unit tests for `build_request_event` and the `derive_*` helpers behind it.
    //!
    //! NOTE(review): no test awaits anything; presumably `#[tokio::test]` is
    //! used because the fixtures need a Tokio runtime — confirm.

    use super::*;

    use axum::body::Body;
    use axum::http::{HeaderMap, HeaderValue, Method, StatusCode};

    use crate::protocol::jsonrpc::JsonRpcEnvelope;
    use crate::protocol::mcp::{
        ClientMethod, LifecycleMethod, McpMessage, MessageKind, ServerKind, ToolsMethod,
    };
    use crate::proxy::pipeline::middlewares::test_support::{test_context, test_proxy_state};
    use crate::proxy::pipeline::values::{Envelope, Response};

    /// Parses `body` as a JSON-RPC envelope and wraps it in a 200 OK
    /// `Response::McpBuffered` with a JSON envelope and no headers.
    ///
    /// Panics if `body` does not parse.
    fn buffered_ok(body: &str) -> Response {
        let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
        let message = McpMessage {
            envelope,
            kind: MessageKind::Server(ServerKind::Result),
        };
        Response::McpBuffered {
            envelope: Envelope::Json,
            message,
            status: StatusCode::OK,
            headers: HeaderMap::new(),
        }
    }

    /// Same as `buffered_ok`, but the response also carries an
    /// `mcp-session-id` header set to `session_id`.
    ///
    /// Panics if `body` does not parse or `session_id` is not a valid header value.
    fn buffered_with_session(body: &str, session_id: &str) -> Response {
        let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
        let message = McpMessage {
            envelope,
            kind: MessageKind::Server(ServerKind::Result),
        };
        let mut headers = HeaderMap::new();
        headers.insert("mcp-session-id", HeaderValue::from_str(session_id).unwrap());
        Response::McpBuffered {
            envelope: Envelope::Json,
            message,
            status: StatusCode::OK,
            headers,
        }
    }

    /// A successful `tools/list` exchange: status, MCP method, proxy name and
    /// note are taken from the inputs; method and path come from the fixture.
    #[tokio::test]
    async fn build__tools_list_200_ok_sets_method_and_status() {
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
        cx.working.tags.push("rewritten");

        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#);
        let ev = build_request_event("test-proxy", &cx, &resp);

        assert_eq!(ev.status, 200);
        assert_eq!(ev.mcp_method.as_deref(), Some("tools/list"));
        assert_eq!(ev.proxy, "test-proxy");
        assert_eq!(ev.method, "POST");
        assert_eq!(ev.path, "/mcp");
        assert_eq!(ev.note, "rewritten");
        assert!(ev.error_code.is_none());
    }

    /// A JSON-RPC error inside a buffered response surfaces as the event's
    /// error code (stringified) and error message.
    #[tokio::test]
    async fn build__rpc_error_in_buffered_result_surfaces_code_and_message() {
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
        let resp =
            buffered_ok(r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"bad"}}"#);
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.error_code.as_deref(), Some("-32600"));
        assert_eq!(ev.error_msg.as_deref(), Some("bad"));
    }

    /// `Upstream502` yields status 502, the "upstream error" note and the
    /// failure reason as the error message.
    #[tokio::test]
    async fn build__upstream_502_tags_note_as_upstream_error() {
        let proxy = test_proxy_state();
        let cx = test_context(proxy);
        let resp = Response::Upstream502 {
            reason: "connection refused".into(),
        };
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.status, 502);
        assert_eq!(ev.note, "upstream error");
        assert_eq!(ev.error_msg.as_deref(), Some("connection refused"));
    }

    /// A streamed response with an SSE envelope adds the "sse" note even
    /// though no tag was set on the context.
    #[tokio::test]
    async fn build__sse_streamed_response_tags_note_as_sse() {
        let proxy = test_proxy_state();
        let cx = test_context(proxy);
        let resp = Response::McpStreamed {
            envelope: Envelope::Sse,
            body: Body::empty(),
            status: StatusCode::OK,
            headers: HeaderMap::new(),
        };
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.note, "sse");
    }

    /// An HTTP GET with no request method stored on the context is reported
    /// with the literal MCP method "SSE".
    #[tokio::test]
    async fn build__sse_get_without_stashed_method_reports_mcp_method_as_SSE_literal() {
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.intake.http_method = Method::GET;
        let resp = Response::McpStreamed {
            envelope: Envelope::Sse,
            body: Body::empty(),
            status: StatusCode::OK,
            headers: HeaderMap::new(),
        };
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.mcp_method.as_deref(), Some("SSE"));
    }

    /// Client info set directly on the working state is reported as the
    /// event's client name and version.
    #[tokio::test]
    async fn build__client_info_preferred_over_session_when_set() {
        use crate::protocol::session::ClientInfo;
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
        cx.working.client = Some(ClientInfo {
            name: "claude-desktop".into(),
            version: Some("1.2.0".into()),
        });
        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.client_name.as_deref(), Some("claude-desktop"));
        assert_eq!(ev.client_version.as_deref(), Some("1.2.0"));
    }

    /// With no session set on the working state, the session id is read from
    /// the response's `mcp-session-id` header.
    #[tokio::test]
    async fn build__session_id_falls_back_to_response_header_when_working_session_empty() {
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
        let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-abc");
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.session_id.as_deref(), Some("sess-abc"));
    }

    /// A session on the working state takes precedence over a differing
    /// `mcp-session-id` response header.
    #[tokio::test]
    async fn build__session_id_uses_working_session_when_present() {
        use crate::protocol::session::SessionInfo;
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.session = Some(SessionInfo {
            id: "sess-working".into(),
            state: crate::protocol::session::SessionState::Active,
            client_info: None,
            request_count: 0,
            created_at: chrono::Utc::now(),
            last_active: chrono::Utc::now(),
        });
        let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-header");
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.session_id.as_deref(), Some("sess-working"));
    }

    /// Stage timings pushed onto the working state appear on the event in
    /// insertion order.
    #[tokio::test]
    async fn build__stage_timings_propagated() {
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.timings.push(StageTiming {
            name: "intake_parse",
            elapsed_us: 42,
        });
        cx.working.timings.push(StageTiming {
            name: "csp_rewrite",
            elapsed_us: 100,
        });
        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
        let ev = build_request_event("p", &cx, &resp);
        let timings = ev.stage_timings.expect("stage_timings populated");
        assert_eq!(timings.len(), 2);
        assert_eq!(timings[0].name, "intake_parse");
        assert_eq!(timings[0].elapsed_us, 42);
        assert_eq!(timings[1].name, "csp_rewrite");
    }

    /// Multiple tags are joined with `+` in insertion order.
    #[tokio::test]
    async fn build__tags_joined_with_plus() {
        let proxy = test_proxy_state();
        let mut cx = test_context(proxy);
        cx.working.tags.push("rewritten");
        cx.working.tags.push("sse");
        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
        let ev = build_request_event("p", &cx, &resp);
        assert_eq!(ev.note, "rewritten+sse");
    }
}