Skip to main content

mcpr_core/proxy/
emit.rs

1//! `RequestEvent` construction and emission.
2//!
3//! The sole construction site for [`RequestEvent`]. `handle_request`
4//! calls [`emit`] once, immediately after the response chain has
5//! finished, before `IntoResponse` converts the value to an axum
6//! response.
7//!
8//! `mcpr-cloud/backend/` consumes `RequestEvent` and session-lifecycle
9//! events — the inline tests below cover the field shapes both rely on.
10
11use crate::event::{ProxyEvent, RequestEvent};
12
13use super::pipeline::middlewares::shared;
14use super::pipeline::values::{Context, Envelope, Response, StageTiming};
15
16/// Build a `RequestEvent` from the accumulated context and the final
17/// response. Separated from [`emit`] so it can be unit-tested without a
18/// live event bus.
19pub fn build_request_event(proxy_name: &str, cx: &Context, resp: &Response) -> RequestEvent {
20    let status = derive_status(resp);
21    let response_size = derive_response_size(cx, resp);
22    let (error_code, error_msg) = derive_error(resp);
23    let (mcp_method_str, tool) = derive_method_and_tool(cx);
24    let session_id = derive_session_id(cx, resp);
25    let (client_name, client_version) = derive_client(cx);
26    let note = derive_note(cx, resp);
27
28    RequestEvent {
29        id: uuid::Uuid::new_v4().to_string(),
30        ts: chrono::Utc::now().timestamp_millis(),
31        proxy: proxy_name.to_string(),
32        session_id,
33        method: cx.intake.http_method.to_string(),
34        path: cx.intake.path.clone(),
35        mcp_method: mcp_method_str,
36        tool,
37        resource_uri: cx.working.request_resource_uri.clone(),
38        prompt_name: cx.working.request_prompt_name.clone(),
39        status,
40        latency_us: cx.intake.start.elapsed().as_micros() as u64,
41        upstream_us: cx.working.upstream_us,
42        request_size: Some(cx.intake.request_size as u64),
43        response_size,
44        error_code,
45        error_msg: error_msg.map(|m| m.chars().take(512).collect()),
46        client_name,
47        client_version,
48        note,
49        stage_timings: derive_stage_timings(&cx.working.timings),
50    }
51}
52
53fn derive_stage_timings(timings: &[StageTiming]) -> Option<Vec<StageTiming>> {
54    if timings.is_empty() {
55        None
56    } else {
57        Some(timings.to_vec())
58    }
59}
60
61/// Emit a `Request` event to the proxy's event bus. Called once per
62/// request, after the response chain.
63pub fn emit(cx: &Context, resp: &Response) {
64    let state = &cx.intake.proxy;
65    state
66        .event_bus
67        .emit(ProxyEvent::Request(Box::new(build_request_event(
68            &state.name,
69            cx,
70            resp,
71        ))));
72}
73
74fn derive_status(resp: &Response) -> u16 {
75    match resp {
76        Response::McpBuffered { status, .. }
77        | Response::McpStreamed { status, .. }
78        | Response::OauthJson { status, .. }
79        | Response::Raw { status, .. } => status.as_u16(),
80        Response::Upstream502 { .. } => 502,
81    }
82}
83
84fn derive_response_size(cx: &Context, _resp: &Response) -> Option<u64> {
85    // `EnvelopeSealMiddleware` stashes the serialized buffered-body size
86    // onto `cx.working.response_size`. Streaming paths and 502s leave it
87    // unset — matching the legacy `response_size: None` on those paths.
88    cx.working.response_size
89}
90
91fn derive_error(resp: &Response) -> (Option<String>, Option<String>) {
92    match resp {
93        Response::Upstream502 { reason } => (None, Some(reason.clone())),
94        Response::McpBuffered { message, .. } => {
95            if let Some(err) = &message.envelope.error {
96                (Some(err.code.to_string()), Some(err.message.clone()))
97            } else {
98                (None, None)
99            }
100        }
101        _ => (None, None),
102    }
103}
104
105fn derive_method_and_tool(cx: &Context) -> (Option<String>, Option<String>) {
106    // Legacy emit.rs tagged SSE GETs with `mcp_method = Some("SSE")` —
107    // preserve that for cloud/backend continuity. Every other MCP method
108    // comes from `cx.working.request_method` (set by SessionTouch).
109    let http_get_is_sse = cx.intake.http_method == axum::http::Method::GET;
110    let method_str = cx
111        .working
112        .request_method
113        .as_ref()
114        .and_then(crate::protocol::mcp::ClientMethod::as_str)
115        .map(str::to_owned);
116    let mcp_method_str = match (method_str, http_get_is_sse) {
117        (Some(m), _) => Some(m),
118        (None, true) => Some("SSE".to_owned()),
119        (None, false) => None,
120    };
121    (mcp_method_str, cx.working.request_tool.clone())
122}
123
124fn derive_session_id(cx: &Context, resp: &Response) -> Option<String> {
125    // Prefer the session we touched/created on the request side. Fall
126    // back to reading the header off the response for `initialize`,
127    // where `SessionRecord` creates the session but does not mutate
128    // `cx.working.session`.
129    if let Some(s) = cx.working.session.as_ref() {
130        return Some(s.id.clone());
131    }
132    let headers = match resp {
133        Response::McpBuffered { headers, .. }
134        | Response::McpStreamed { headers, .. }
135        | Response::Raw { headers, .. } => Some(headers),
136        _ => None,
137    };
138    headers
139        .and_then(|h| h.get("mcp-session-id"))
140        .and_then(|v| v.to_str().ok())
141        .map(str::to_owned)
142}
143
144fn derive_client(cx: &Context) -> (Option<String>, Option<String>) {
145    // Request-side `ClientInfoInjectMiddleware` stashes `clientInfo` on
146    // the initialize path. Subsequent requests read it from the session
147    // record that `SessionTouchMiddleware` loaded into `cx.working.session`.
148    if let Some(c) = cx.working.client.as_ref() {
149        return (Some(c.name.clone()), c.version.clone());
150    }
151    if let Some(s) = cx.working.session.as_ref()
152        && let Some(c) = s.client_info.as_ref()
153    {
154        return (Some(c.name.clone()), c.version.clone());
155    }
156    (None, None)
157}
158
159fn derive_note(cx: &Context, resp: &Response) -> String {
160    // Middlewares push tags as they run. The Emitter appends shape-derived
161    // tags that middlewares can't know about from their vantage point —
162    // specifically `upstream error`, which only makes sense once we see
163    // the final `Response::Upstream502` variant.
164    let mut tags: Vec<&str> = cx.working.tags.as_slice().to_vec();
165    if matches!(resp, Response::Upstream502 { .. }) && !tags.contains(&"upstream error") {
166        tags.push("upstream error");
167    }
168    // For SSE legacy GETs the transport returns `Response::McpStreamed`
169    // with `Envelope::Sse`. EnvelopeSeal does not touch streamed
170    // responses, so no middleware tags it — surface from shape here to
171    // match legacy `note = "sse"`.
172    if matches!(
173        resp,
174        Response::McpStreamed {
175            envelope: Envelope::Sse,
176            ..
177        }
178    ) && !tags.contains(&"sse")
179    {
180        tags.push("sse");
181    }
182    tags.join("+")
183}
184
185/// Normalize a client name to a platform identifier used in `SessionStart`.
186/// Thin re-export of `pipeline::middlewares::shared::normalize_platform`
187/// so external callers of this module don't have to reach into the
188/// middlewares submodule.
189pub fn normalize_platform(client_name: &str) -> &'static str {
190    shared::normalize_platform(client_name)
191}
192
193#[cfg(test)]
194#[allow(non_snake_case)]
195mod tests {
196    use super::*;
197
198    use axum::body::Body;
199    use axum::http::{HeaderMap, HeaderValue, Method, StatusCode};
200
201    use crate::protocol::jsonrpc::JsonRpcEnvelope;
202    use crate::protocol::mcp::{
203        ClientMethod, LifecycleMethod, McpMessage, MessageKind, ServerKind, ToolsMethod,
204    };
205    use crate::proxy::pipeline::middlewares::test_support::{test_context, test_proxy_state};
206    use crate::proxy::pipeline::values::{Envelope, Response};
207
208    fn buffered_ok(body: &str) -> Response {
209        let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
210        let message = McpMessage {
211            envelope,
212            kind: MessageKind::Server(ServerKind::Result),
213        };
214        Response::McpBuffered {
215            envelope: Envelope::Json,
216            message,
217            status: StatusCode::OK,
218            headers: HeaderMap::new(),
219        }
220    }
221
222    fn buffered_with_session(body: &str, session_id: &str) -> Response {
223        let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
224        let message = McpMessage {
225            envelope,
226            kind: MessageKind::Server(ServerKind::Result),
227        };
228        let mut headers = HeaderMap::new();
229        headers.insert("mcp-session-id", HeaderValue::from_str(session_id).unwrap());
230        Response::McpBuffered {
231            envelope: Envelope::Json,
232            message,
233            status: StatusCode::OK,
234            headers,
235        }
236    }
237
238    #[tokio::test]
239    async fn build__tools_list_200_ok_sets_method_and_status() {
240        let proxy = test_proxy_state();
241        let mut cx = test_context(proxy);
242        cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
243        cx.working.tags.push("rewritten");
244
245        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#);
246        let ev = build_request_event("test-proxy", &cx, &resp);
247
248        assert_eq!(ev.status, 200);
249        assert_eq!(ev.mcp_method.as_deref(), Some("tools/list"));
250        assert_eq!(ev.proxy, "test-proxy");
251        assert_eq!(ev.method, "POST");
252        assert_eq!(ev.path, "/mcp");
253        assert_eq!(ev.note, "rewritten");
254        assert!(ev.error_code.is_none());
255    }
256
257    #[tokio::test]
258    async fn build__rpc_error_in_buffered_result_surfaces_code_and_message() {
259        let proxy = test_proxy_state();
260        let mut cx = test_context(proxy);
261        cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
262        let resp =
263            buffered_ok(r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"bad"}}"#);
264        let ev = build_request_event("p", &cx, &resp);
265        assert_eq!(ev.error_code.as_deref(), Some("-32600"));
266        assert_eq!(ev.error_msg.as_deref(), Some("bad"));
267    }
268
269    #[tokio::test]
270    async fn build__upstream_502_tags_note_as_upstream_error() {
271        let proxy = test_proxy_state();
272        let cx = test_context(proxy);
273        let resp = Response::Upstream502 {
274            reason: "connection refused".into(),
275        };
276        let ev = build_request_event("p", &cx, &resp);
277        assert_eq!(ev.status, 502);
278        assert_eq!(ev.note, "upstream error");
279        assert_eq!(ev.error_msg.as_deref(), Some("connection refused"));
280    }
281
282    #[tokio::test]
283    async fn build__sse_streamed_response_tags_note_as_sse() {
284        let proxy = test_proxy_state();
285        let cx = test_context(proxy);
286        let resp = Response::McpStreamed {
287            envelope: Envelope::Sse,
288            body: Body::empty(),
289            status: StatusCode::OK,
290            headers: HeaderMap::new(),
291        };
292        let ev = build_request_event("p", &cx, &resp);
293        assert_eq!(ev.note, "sse");
294    }
295
296    #[tokio::test]
297    async fn build__sse_get_without_stashed_method_reports_mcp_method_as_SSE_literal() {
298        let proxy = test_proxy_state();
299        let mut cx = test_context(proxy);
300        cx.intake.http_method = Method::GET;
301        let resp = Response::McpStreamed {
302            envelope: Envelope::Sse,
303            body: Body::empty(),
304            status: StatusCode::OK,
305            headers: HeaderMap::new(),
306        };
307        let ev = build_request_event("p", &cx, &resp);
308        assert_eq!(ev.mcp_method.as_deref(), Some("SSE"));
309    }
310
311    #[tokio::test]
312    async fn build__client_info_preferred_over_session_when_set() {
313        use crate::protocol::session::ClientInfo;
314        let proxy = test_proxy_state();
315        let mut cx = test_context(proxy);
316        cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
317        cx.working.client = Some(ClientInfo {
318            name: "claude-desktop".into(),
319            version: Some("1.2.0".into()),
320        });
321        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
322        let ev = build_request_event("p", &cx, &resp);
323        assert_eq!(ev.client_name.as_deref(), Some("claude-desktop"));
324        assert_eq!(ev.client_version.as_deref(), Some("1.2.0"));
325    }
326
327    #[tokio::test]
328    async fn build__session_id_falls_back_to_response_header_when_working_session_empty() {
329        let proxy = test_proxy_state();
330        let mut cx = test_context(proxy);
331        cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
332        let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-abc");
333        let ev = build_request_event("p", &cx, &resp);
334        assert_eq!(ev.session_id.as_deref(), Some("sess-abc"));
335    }
336
337    #[tokio::test]
338    async fn build__session_id_uses_working_session_when_present() {
339        use crate::protocol::session::SessionInfo;
340        let proxy = test_proxy_state();
341        let mut cx = test_context(proxy);
342        cx.working.session = Some(SessionInfo {
343            id: "sess-working".into(),
344            state: crate::protocol::session::SessionState::Active,
345            client_info: None,
346            request_count: 0,
347            created_at: chrono::Utc::now(),
348            last_active: chrono::Utc::now(),
349        });
350        let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-header");
351        let ev = build_request_event("p", &cx, &resp);
352        assert_eq!(ev.session_id.as_deref(), Some("sess-working"));
353    }
354
355    #[tokio::test]
356    async fn build__stage_timings_propagated() {
357        let proxy = test_proxy_state();
358        let mut cx = test_context(proxy);
359        cx.working.timings.push(StageTiming {
360            name: "intake_parse",
361            elapsed_us: 42,
362        });
363        cx.working.timings.push(StageTiming {
364            name: "csp_rewrite",
365            elapsed_us: 100,
366        });
367        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
368        let ev = build_request_event("p", &cx, &resp);
369        let timings = ev.stage_timings.expect("stage_timings populated");
370        assert_eq!(timings.len(), 2);
371        assert_eq!(timings[0].name, "intake_parse");
372        assert_eq!(timings[0].elapsed_us, 42);
373        assert_eq!(timings[1].name, "csp_rewrite");
374    }
375
376    #[tokio::test]
377    async fn build__tags_joined_with_plus() {
378        let proxy = test_proxy_state();
379        let mut cx = test_context(proxy);
380        cx.working.tags.push("rewritten");
381        cx.working.tags.push("sse");
382        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
383        let ev = build_request_event("p", &cx, &resp);
384        assert_eq!(ev.note, "rewritten+sse");
385    }
386}