Skip to main content

mcpr_core/proxy/
emit.rs

1//! `RequestEvent` construction and emission.
2//!
3//! The sole construction site for [`RequestEvent`]. `handle_request`
4//! calls [`emit`] once, immediately after the response chain has
5//! finished, before `IntoResponse` converts the value to an axum
6//! response.
7//!
8//! `mcpr-cloud/backend/` consumes `RequestEvent` and session-lifecycle
9//! events — the inline tests below cover the field shapes both rely on.
10
11use crate::event::{ProxyEvent, RequestEvent};
12
13use super::pipeline::middlewares::shared;
14use super::pipeline::values::{Context, Envelope, Response, StageTiming};
15
16/// Build a `RequestEvent` from the accumulated context and the final
17/// response. Separated from [`emit`] so it can be unit-tested without a
18/// live event bus.
19pub fn build_request_event(proxy_name: &str, cx: &Context, resp: &Response) -> RequestEvent {
20    let status = derive_status(resp);
21    let response_size = derive_response_size(cx, resp);
22    let (error_code, error_msg) = derive_error(resp);
23    let (mcp_method_str, tool) = derive_method_and_tool(cx);
24    let session_id = derive_session_id(cx, resp);
25    let (client_name, client_version) = derive_client(cx);
26    let note = derive_note(cx, resp);
27
28    RequestEvent {
29        id: uuid::Uuid::new_v4().to_string(),
30        ts: chrono::Utc::now().timestamp_millis(),
31        proxy: proxy_name.to_string(),
32        session_id,
33        method: cx.intake.http_method.to_string(),
34        path: cx.intake.path.clone(),
35        mcp_method: mcp_method_str,
36        tool,
37        status,
38        latency_us: cx.intake.start.elapsed().as_micros() as u64,
39        upstream_us: cx.working.upstream_us,
40        request_size: Some(cx.intake.request_size as u64),
41        response_size,
42        error_code,
43        error_msg: error_msg.map(|m| m.chars().take(512).collect()),
44        client_name,
45        client_version,
46        note,
47        stage_timings: derive_stage_timings(&cx.working.timings),
48    }
49}
50
51fn derive_stage_timings(timings: &[StageTiming]) -> Option<Vec<StageTiming>> {
52    if timings.is_empty() {
53        None
54    } else {
55        Some(timings.to_vec())
56    }
57}
58
59/// Emit a `Request` event to the proxy's event bus. Called once per
60/// request, after the response chain.
61pub fn emit(cx: &Context, resp: &Response) {
62    let state = &cx.intake.proxy;
63    state
64        .event_bus
65        .emit(ProxyEvent::Request(Box::new(build_request_event(
66            &state.name,
67            cx,
68            resp,
69        ))));
70}
71
72fn derive_status(resp: &Response) -> u16 {
73    match resp {
74        Response::McpBuffered { status, .. }
75        | Response::McpStreamed { status, .. }
76        | Response::OauthJson { status, .. }
77        | Response::Raw { status, .. } => status.as_u16(),
78        Response::Upstream502 { .. } => 502,
79    }
80}
81
82fn derive_response_size(cx: &Context, _resp: &Response) -> Option<u64> {
83    // `EnvelopeSealMiddleware` stashes the serialized buffered-body size
84    // onto `cx.working.response_size`. Streaming paths and 502s leave it
85    // unset — matching the legacy `response_size: None` on those paths.
86    cx.working.response_size
87}
88
89fn derive_error(resp: &Response) -> (Option<String>, Option<String>) {
90    match resp {
91        Response::Upstream502 { reason } => (None, Some(reason.clone())),
92        Response::McpBuffered { message, .. } => {
93            if let Some(err) = &message.envelope.error {
94                (Some(err.code.to_string()), Some(err.message.clone()))
95            } else {
96                (None, None)
97            }
98        }
99        _ => (None, None),
100    }
101}
102
103fn derive_method_and_tool(cx: &Context) -> (Option<String>, Option<String>) {
104    // Legacy emit.rs tagged SSE GETs with `mcp_method = Some("SSE")` —
105    // preserve that for cloud/backend continuity. Every other MCP method
106    // comes from `cx.working.request_method` (set by SessionTouch).
107    let http_get_is_sse = cx.intake.http_method == axum::http::Method::GET;
108    let method_str = cx
109        .working
110        .request_method
111        .as_ref()
112        .and_then(crate::protocol::mcp::ClientMethod::as_str)
113        .map(str::to_owned);
114    let mcp_method_str = match (method_str, http_get_is_sse) {
115        (Some(m), _) => Some(m),
116        (None, true) => Some("SSE".to_owned()),
117        (None, false) => None,
118    };
119    (mcp_method_str, cx.working.request_tool.clone())
120}
121
122fn derive_session_id(cx: &Context, resp: &Response) -> Option<String> {
123    // Prefer the session we touched/created on the request side. Fall
124    // back to reading the header off the response for `initialize`,
125    // where `SessionRecord` creates the session but does not mutate
126    // `cx.working.session`.
127    if let Some(s) = cx.working.session.as_ref() {
128        return Some(s.id.clone());
129    }
130    let headers = match resp {
131        Response::McpBuffered { headers, .. }
132        | Response::McpStreamed { headers, .. }
133        | Response::Raw { headers, .. } => Some(headers),
134        _ => None,
135    };
136    headers
137        .and_then(|h| h.get("mcp-session-id"))
138        .and_then(|v| v.to_str().ok())
139        .map(str::to_owned)
140}
141
142fn derive_client(cx: &Context) -> (Option<String>, Option<String>) {
143    // Request-side `ClientInfoInjectMiddleware` stashes `clientInfo` on
144    // the initialize path. Subsequent requests read it from the session
145    // record that `SessionTouchMiddleware` loaded into `cx.working.session`.
146    if let Some(c) = cx.working.client.as_ref() {
147        return (Some(c.name.clone()), c.version.clone());
148    }
149    if let Some(s) = cx.working.session.as_ref()
150        && let Some(c) = s.client_info.as_ref()
151    {
152        return (Some(c.name.clone()), c.version.clone());
153    }
154    (None, None)
155}
156
157fn derive_note(cx: &Context, resp: &Response) -> String {
158    // Middlewares push tags as they run. The Emitter appends shape-derived
159    // tags that middlewares can't know about from their vantage point —
160    // specifically `upstream error`, which only makes sense once we see
161    // the final `Response::Upstream502` variant.
162    let mut tags: Vec<&str> = cx.working.tags.as_slice().to_vec();
163    if matches!(resp, Response::Upstream502 { .. }) && !tags.contains(&"upstream error") {
164        tags.push("upstream error");
165    }
166    // For SSE legacy GETs the transport returns `Response::McpStreamed`
167    // with `Envelope::Sse`. EnvelopeSeal does not touch streamed
168    // responses, so no middleware tags it — surface from shape here to
169    // match legacy `note = "sse"`.
170    if matches!(
171        resp,
172        Response::McpStreamed {
173            envelope: Envelope::Sse,
174            ..
175        }
176    ) && !tags.contains(&"sse")
177    {
178        tags.push("sse");
179    }
180    tags.join("+")
181}
182
183/// Normalize a client name to a platform identifier used in `SessionStart`.
184/// Thin re-export of `pipeline::middlewares::shared::normalize_platform`
185/// so external callers of this module don't have to reach into the
186/// middlewares submodule.
187pub fn normalize_platform(client_name: &str) -> &'static str {
188    shared::normalize_platform(client_name)
189}
190
191#[cfg(test)]
192#[allow(non_snake_case)]
193mod tests {
194    use super::*;
195
196    use axum::body::Body;
197    use axum::http::{HeaderMap, HeaderValue, Method, StatusCode};
198
199    use crate::protocol::jsonrpc::JsonRpcEnvelope;
200    use crate::protocol::mcp::{
201        ClientMethod, LifecycleMethod, McpMessage, MessageKind, ServerKind, ToolsMethod,
202    };
203    use crate::proxy::pipeline::middlewares::test_support::{test_context, test_proxy_state};
204    use crate::proxy::pipeline::values::{Envelope, Response};
205
206    fn buffered_ok(body: &str) -> Response {
207        let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
208        let message = McpMessage {
209            envelope,
210            kind: MessageKind::Server(ServerKind::Result),
211        };
212        Response::McpBuffered {
213            envelope: Envelope::Json,
214            message,
215            status: StatusCode::OK,
216            headers: HeaderMap::new(),
217        }
218    }
219
220    fn buffered_with_session(body: &str, session_id: &str) -> Response {
221        let envelope = JsonRpcEnvelope::parse(body.as_bytes()).unwrap();
222        let message = McpMessage {
223            envelope,
224            kind: MessageKind::Server(ServerKind::Result),
225        };
226        let mut headers = HeaderMap::new();
227        headers.insert("mcp-session-id", HeaderValue::from_str(session_id).unwrap());
228        Response::McpBuffered {
229            envelope: Envelope::Json,
230            message,
231            status: StatusCode::OK,
232            headers,
233        }
234    }
235
236    #[tokio::test]
237    async fn build__tools_list_200_ok_sets_method_and_status() {
238        let proxy = test_proxy_state();
239        let mut cx = test_context(proxy);
240        cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
241        cx.working.tags.push("rewritten");
242
243        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}"#);
244        let ev = build_request_event("test-proxy", &cx, &resp);
245
246        assert_eq!(ev.status, 200);
247        assert_eq!(ev.mcp_method.as_deref(), Some("tools/list"));
248        assert_eq!(ev.proxy, "test-proxy");
249        assert_eq!(ev.method, "POST");
250        assert_eq!(ev.path, "/mcp");
251        assert_eq!(ev.note, "rewritten");
252        assert!(ev.error_code.is_none());
253    }
254
255    #[tokio::test]
256    async fn build__rpc_error_in_buffered_result_surfaces_code_and_message() {
257        let proxy = test_proxy_state();
258        let mut cx = test_context(proxy);
259        cx.working.request_method = Some(ClientMethod::Tools(ToolsMethod::List));
260        let resp =
261            buffered_ok(r#"{"jsonrpc":"2.0","id":1,"error":{"code":-32600,"message":"bad"}}"#);
262        let ev = build_request_event("p", &cx, &resp);
263        assert_eq!(ev.error_code.as_deref(), Some("-32600"));
264        assert_eq!(ev.error_msg.as_deref(), Some("bad"));
265    }
266
267    #[tokio::test]
268    async fn build__upstream_502_tags_note_as_upstream_error() {
269        let proxy = test_proxy_state();
270        let cx = test_context(proxy);
271        let resp = Response::Upstream502 {
272            reason: "connection refused".into(),
273        };
274        let ev = build_request_event("p", &cx, &resp);
275        assert_eq!(ev.status, 502);
276        assert_eq!(ev.note, "upstream error");
277        assert_eq!(ev.error_msg.as_deref(), Some("connection refused"));
278    }
279
280    #[tokio::test]
281    async fn build__sse_streamed_response_tags_note_as_sse() {
282        let proxy = test_proxy_state();
283        let cx = test_context(proxy);
284        let resp = Response::McpStreamed {
285            envelope: Envelope::Sse,
286            body: Body::empty(),
287            status: StatusCode::OK,
288            headers: HeaderMap::new(),
289        };
290        let ev = build_request_event("p", &cx, &resp);
291        assert_eq!(ev.note, "sse");
292    }
293
294    #[tokio::test]
295    async fn build__sse_get_without_stashed_method_reports_mcp_method_as_SSE_literal() {
296        let proxy = test_proxy_state();
297        let mut cx = test_context(proxy);
298        cx.intake.http_method = Method::GET;
299        let resp = Response::McpStreamed {
300            envelope: Envelope::Sse,
301            body: Body::empty(),
302            status: StatusCode::OK,
303            headers: HeaderMap::new(),
304        };
305        let ev = build_request_event("p", &cx, &resp);
306        assert_eq!(ev.mcp_method.as_deref(), Some("SSE"));
307    }
308
309    #[tokio::test]
310    async fn build__client_info_preferred_over_session_when_set() {
311        use crate::protocol::session::ClientInfo;
312        let proxy = test_proxy_state();
313        let mut cx = test_context(proxy);
314        cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
315        cx.working.client = Some(ClientInfo {
316            name: "claude-desktop".into(),
317            version: Some("1.2.0".into()),
318        });
319        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
320        let ev = build_request_event("p", &cx, &resp);
321        assert_eq!(ev.client_name.as_deref(), Some("claude-desktop"));
322        assert_eq!(ev.client_version.as_deref(), Some("1.2.0"));
323    }
324
325    #[tokio::test]
326    async fn build__session_id_falls_back_to_response_header_when_working_session_empty() {
327        let proxy = test_proxy_state();
328        let mut cx = test_context(proxy);
329        cx.working.request_method = Some(ClientMethod::Lifecycle(LifecycleMethod::Initialize));
330        let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-abc");
331        let ev = build_request_event("p", &cx, &resp);
332        assert_eq!(ev.session_id.as_deref(), Some("sess-abc"));
333    }
334
335    #[tokio::test]
336    async fn build__session_id_uses_working_session_when_present() {
337        use crate::protocol::session::SessionInfo;
338        let proxy = test_proxy_state();
339        let mut cx = test_context(proxy);
340        cx.working.session = Some(SessionInfo {
341            id: "sess-working".into(),
342            state: crate::protocol::session::SessionState::Active,
343            client_info: None,
344            request_count: 0,
345            created_at: chrono::Utc::now(),
346            last_active: chrono::Utc::now(),
347        });
348        let resp = buffered_with_session(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#, "sess-header");
349        let ev = build_request_event("p", &cx, &resp);
350        assert_eq!(ev.session_id.as_deref(), Some("sess-working"));
351    }
352
353    #[tokio::test]
354    async fn build__stage_timings_propagated() {
355        let proxy = test_proxy_state();
356        let mut cx = test_context(proxy);
357        cx.working.timings.push(StageTiming {
358            name: "intake_parse",
359            elapsed_us: 42,
360        });
361        cx.working.timings.push(StageTiming {
362            name: "csp_rewrite",
363            elapsed_us: 100,
364        });
365        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
366        let ev = build_request_event("p", &cx, &resp);
367        let timings = ev.stage_timings.expect("stage_timings populated");
368        assert_eq!(timings.len(), 2);
369        assert_eq!(timings[0].name, "intake_parse");
370        assert_eq!(timings[0].elapsed_us, 42);
371        assert_eq!(timings[1].name, "csp_rewrite");
372    }
373
374    #[tokio::test]
375    async fn build__tags_joined_with_plus() {
376        let proxy = test_proxy_state();
377        let mut cx = test_context(proxy);
378        cx.working.tags.push("rewritten");
379        cx.working.tags.push("sse");
380        let resp = buffered_ok(r#"{"jsonrpc":"2.0","id":1,"result":{}}"#);
381        let ev = build_request_event("p", &cx, &resp);
382        assert_eq!(ev.note, "rewritten+sse");
383    }
384}