Skip to main content

mcpr_core/proxy/pipeline/
values.rs

1//! Top-level value types for the pipeline.
2//!
3//! See `PIPELINE.md` §Types. These are the sum types
4//! passed between pipeline stages: `Request` in, `Response` out, with
5//! `Context` threaded by reference.
6
7use std::fmt;
8use std::sync::Arc;
9use std::time::Instant;
10
11use axum::{
12    body::{Body, Bytes},
13    http::{HeaderMap, HeaderValue, Method, StatusCode, header::CONTENT_TYPE},
14    response::IntoResponse,
15};
16
17use crate::protocol::session::{ClientInfo, SessionInfo};
18use crate::proxy::ProxyState;
19use crate::proxy::forwarding::build_response;
20use crate::proxy::sse::wrap_as_sse;
21
22use crate::protocol::jsonrpc::JsonRpcEnvelope;
23use crate::protocol::mcp::{ClientKind, ClientMethod, McpMessage};
24
25use super::stubs::{OAuthKind, SessionId, TagSet, UrlMap};
26
27// ── Request side ─────────────────────────────────────────────
28
29/// Top-level sum type produced by intake. Owns its body.
30#[derive(Debug)]
31pub enum Request {
32    /// JSON-RPC 2.0 over streamable HTTP or legacy SSE.
33    Mcp(McpRequest),
34    /// OAuth / discovery / token / callback — content-matched.
35    OAuth(OAuthRequest),
36    /// Everything else — forwarded unchanged, no inspection.
37    Raw(RawRequest),
38}
39
40/// An MCP HTTP request from the client. Body carries exactly one
41/// JSON-RPC message — no batching per MCP 2025-11-25.
42#[derive(Debug)]
43pub struct McpRequest {
44    pub transport: McpTransport,
45    pub envelope: JsonRpcEnvelope,
46    pub kind: ClientKind,
47    pub headers: HeaderMap,
48    pub session_hint: Option<SessionId>,
49}
50
51/// Which MCP transport the request is using. Streamable HTTP is the
52/// primary path; legacy HTTP+SSE is supported but demoted.
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum McpTransport {
55    /// POST body carries a client→server message; response is a single
56    /// JSON body or a chunked stream of messages.
57    StreamableHttpPost,
58    /// GET that opens a server-push stream — used for server→client
59    /// messages outside the request/response pattern.
60    StreamableHttpGet,
61    /// Legacy HTTP+SSE: GET with `Accept: text/event-stream`.
62    SseLegacyGet,
63}
64
65#[derive(Debug)]
66pub struct OAuthRequest {
67    pub kind: OAuthKind,
68    pub body: Bytes,
69    pub headers: HeaderMap,
70}
71
72#[derive(Debug)]
73pub struct RawRequest {
74    pub method: Method,
75    pub path: String,
76    pub body: Body,
77    pub headers: HeaderMap,
78}
79
80// ── Response side ────────────────────────────────────────────
81
82/// Sum type produced by the transport, or by a short-circuiting
83/// middleware. `impl IntoResponse for Response` (below) converts this
84/// into an axum response at the edge.
85#[derive(Debug)]
86pub enum Response {
87    /// Buffered MCP response: one parsed `McpMessage`, mutated in place
88    /// by response middlewares, serialized once by `EnvelopeSeal`.
89    McpBuffered {
90        envelope: Envelope,
91        message: McpMessage,
92        status: StatusCode,
93        headers: HeaderMap,
94    },
95    /// Streamed MCP response: bytes forwarded as-is. Content-touching
96    /// middlewares do not fire on this variant.
97    McpStreamed {
98        envelope: Envelope,
99        body: Body,
100        status: StatusCode,
101        headers: HeaderMap,
102    },
103    /// OAuth discovery / token JSON — a parsed document that
104    /// `UrlMapMiddleware` rewrites before `IntoResponse`.
105    OauthJson {
106        doc: serde_json::Value,
107        status: StatusCode,
108        headers: HeaderMap,
109    },
110    /// Forwarded raw body — no inspection.
111    Raw {
112        body: Body,
113        status: StatusCode,
114        headers: HeaderMap,
115    },
116    /// Upstream failure. Travels through the response chain like any
117    /// other response — `HealthTrack` records it, `emit` tags the event
118    /// as `upstream error`, and `IntoResponse` renders a 502.
119    Upstream502 { reason: String },
120}
121
122/// Framing of a buffered MCP response. Data, not control flow — the
123/// final `EnvelopeSeal` stage applies the wrap once.
124#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125pub enum Envelope {
126    Json,
127    Sse,
128}
129
130impl IntoResponse for Response {
131    fn into_response(self) -> axum::response::Response {
132        match self {
133            Response::Raw {
134                body,
135                status,
136                headers,
137            } => build_response(status.as_u16(), &headers, body),
138            Response::McpStreamed {
139                body,
140                status,
141                headers,
142                ..
143            } => build_response(status.as_u16(), &headers, body),
144            Response::McpBuffered {
145                envelope: env,
146                message,
147                status,
148                mut headers,
149            } => {
150                let json_bytes = message.envelope.to_bytes();
151                let (bytes, ct) = match env {
152                    Envelope::Json => (json_bytes, "application/json"),
153                    Envelope::Sse => (wrap_as_sse(&json_bytes), "text/event-stream"),
154                };
155                headers.insert(CONTENT_TYPE, HeaderValue::from_static(ct));
156                build_response(status.as_u16(), &headers, Body::from(bytes))
157            }
158            Response::OauthJson {
159                doc,
160                status,
161                mut headers,
162            } => {
163                let bytes = serde_json::to_vec(&doc).unwrap_or_default();
164                headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
165                build_response(status.as_u16(), &headers, Body::from(bytes))
166            }
167            Response::Upstream502 { reason } => {
168                (StatusCode::BAD_GATEWAY, format!("Upstream error: {reason}")).into_response()
169            }
170        }
171    }
172}
173
174// ── Route ────────────────────────────────────────────────────
175
176/// Output of the router. Declarative; no I/O.
177#[derive(Debug, Clone)]
178pub enum Route {
179    McpStreamableHttp {
180        upstream: String,
181        method: ClientMethod,
182        buffer_policy: BufferPolicy,
183    },
184    McpSseLegacy {
185        upstream: String,
186    },
187    Oauth {
188        upstream: String,
189        rewrite: UrlMap,
190    },
191    Raw {
192        upstream: String,
193    },
194}
195
196/// Whether the transport should collect the upstream body or forward
197/// bytes as they arrive. Owned by the routing table, not by the
198/// protocol enum.
199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
200pub enum BufferPolicy {
201    Streamed,
202    Buffered { max: usize },
203}
204
205// ── Context ──────────────────────────────────────────────────
206
207/// Per-request state carried by reference through both chains. Split so
208/// the type system distinguishes immutable-after-intake fields from
209/// mutable working state.
210#[derive(Debug)]
211pub struct Context {
212    pub intake: Intake,
213    pub working: Working,
214}
215
216/// Set once at intake, read many times. Changing anything here after
217/// intake is a type error.
218pub struct Intake {
219    pub start: Instant,
220    pub proxy: Arc<ProxyState>,
221    pub http_method: Method,
222    pub path: String,
223    pub request_size: usize,
224}
225
226// `ProxyState` does not implement `Debug`. Print its name and skip its
227// internals so `Intake` can still be logged/asserted on in tests.
228impl fmt::Debug for Intake {
229    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230        f.debug_struct("Intake")
231            .field("start", &self.start)
232            .field("proxy", &"Arc<ProxyState>")
233            .field("http_method", &self.http_method)
234            .field("path", &self.path)
235            .field("request_size", &self.request_size)
236            .finish()
237    }
238}
239
240/// Mutated by middlewares as they learn about the request. Final
241/// contents feed the `Emit` stage.
242#[derive(Debug, Default)]
243pub struct Working {
244    pub session: Option<SessionInfo>,
245    pub client: Option<ClientInfo>,
246    /// Originating client method, stashed on the request side so
247    /// response-side middlewares know what produced the response.
248    pub request_method: Option<ClientMethod>,
249    /// Tool name for `tools/call`, stashed on the request side so the
250    /// emitter can populate `RequestEvent.tool` without re-parsing.
251    pub request_tool: Option<String>,
252    /// Resource URI for `resources/{read,subscribe,unsubscribe}`. Feeds
253    /// `RequestEvent.resource_uri`.
254    pub request_resource_uri: Option<String>,
255    /// Prompt name for `prompts/get`. Feeds `RequestEvent.prompt_name`.
256    pub request_prompt_name: Option<String>,
257    /// Serialized response body size in bytes. `EnvelopeSealMiddleware`
258    /// fills this on the buffered path; streaming paths leave it `None`.
259    /// Feeds `RequestEvent.response_size`.
260    pub response_size: Option<u64>,
261    /// Wall-clock time spent in `forward_request` (network RTT +
262    /// upstream work). Populated by `ProxyTransport`. Feeds
263    /// `RequestEvent.upstream_us`.
264    pub upstream_us: Option<u64>,
265    pub tags: TagSet,
266    /// Per-stage wall-clock timings, pushed in order as each stage
267    /// completes. Populated only when `MCPR_STAGE_TIMING` is set —
268    /// otherwise stays empty. Feeds `RequestEvent.stage_timings`.
269    pub timings: Vec<StageTiming>,
270}
271
272/// One named wall-clock measurement for a single pipeline stage.
273///
274/// Each middleware and named non-middleware site (intake parse,
275/// transport upstream/buffer/unwrap/parse) pushes one of these onto
276/// `Working.timings` when stage timing is enabled. The driver sums
277/// nothing — duplicates are fine; aggregators (e.g. the bench
278/// diagnostic) group by `name`.
279#[derive(Debug, Clone, serde::Serialize)]
280pub struct StageTiming {
281    pub name: &'static str,
282    pub elapsed_us: u64,
283}