mcpr_core/proxy/pipeline/values.rs
1//! Top-level value types for the pipeline.
2//!
3//! See `PIPELINE.md` §Types. These are the sum types
4//! passed between pipeline stages: `Request` in, `Response` out, with
5//! `Context` threaded by reference.
6
7use std::fmt;
8use std::sync::Arc;
9use std::time::Instant;
10
11use axum::{
12 body::{Body, Bytes},
13 http::{HeaderMap, HeaderValue, Method, StatusCode, header::CONTENT_TYPE},
14 response::IntoResponse,
15};
16
17use crate::protocol::session::{ClientInfo, SessionInfo};
18use crate::proxy::ProxyState;
19use crate::proxy::forwarding::build_response;
20use crate::proxy::sse::wrap_as_sse;
21
22use crate::protocol::jsonrpc::JsonRpcEnvelope;
23use crate::protocol::mcp::{ClientKind, ClientMethod, McpMessage};
24
25use super::stubs::{OAuthKind, SessionId, TagSet, UrlMap};
26
27// ── Request side ─────────────────────────────────────────────
28
29/// Top-level sum type produced by intake. Owns its body.
30#[derive(Debug)]
31pub enum Request {
32 /// JSON-RPC 2.0 over streamable HTTP or legacy SSE.
33 Mcp(McpRequest),
34 /// OAuth / discovery / token / callback — content-matched.
35 OAuth(OAuthRequest),
36 /// Everything else — forwarded unchanged, no inspection.
37 Raw(RawRequest),
38}
39
40/// An MCP HTTP request from the client. Body carries exactly one
41/// JSON-RPC message — no batching per MCP 2025-11-25.
42#[derive(Debug)]
43pub struct McpRequest {
44 pub transport: McpTransport,
45 pub envelope: JsonRpcEnvelope,
46 pub kind: ClientKind,
47 pub headers: HeaderMap,
48 pub session_hint: Option<SessionId>,
49}
50
51/// Which MCP transport the request is using. Streamable HTTP is the
52/// primary path; legacy HTTP+SSE is supported but demoted.
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum McpTransport {
55 /// POST body carries a client→server message; response is a single
56 /// JSON body or a chunked stream of messages.
57 StreamableHttpPost,
58 /// GET that opens a server-push stream — used for server→client
59 /// messages outside the request/response pattern.
60 StreamableHttpGet,
61 /// Legacy HTTP+SSE: GET with `Accept: text/event-stream`.
62 SseLegacyGet,
63}
64
65#[derive(Debug)]
66pub struct OAuthRequest {
67 pub kind: OAuthKind,
68 pub body: Bytes,
69 pub headers: HeaderMap,
70}
71
72#[derive(Debug)]
73pub struct RawRequest {
74 pub method: Method,
75 pub path: String,
76 pub body: Body,
77 pub headers: HeaderMap,
78}
79
80// ── Response side ────────────────────────────────────────────
81
82/// Sum type produced by the transport, or by a short-circuiting
83/// middleware. `impl IntoResponse for Response` (below) converts this
84/// into an axum response at the edge.
85#[derive(Debug)]
86pub enum Response {
87 /// Buffered MCP response: one parsed `McpMessage`, mutated in place
88 /// by response middlewares, serialized once by `EnvelopeSeal`.
89 McpBuffered {
90 envelope: Envelope,
91 message: McpMessage,
92 status: StatusCode,
93 headers: HeaderMap,
94 },
95 /// Streamed MCP response: bytes forwarded as-is. Content-touching
96 /// middlewares do not fire on this variant.
97 McpStreamed {
98 envelope: Envelope,
99 body: Body,
100 status: StatusCode,
101 headers: HeaderMap,
102 },
103 /// OAuth discovery / token JSON — a parsed document that
104 /// `UrlMapMiddleware` rewrites before `IntoResponse`.
105 OauthJson {
106 doc: serde_json::Value,
107 status: StatusCode,
108 headers: HeaderMap,
109 },
110 /// Forwarded raw body — no inspection.
111 Raw {
112 body: Body,
113 status: StatusCode,
114 headers: HeaderMap,
115 },
116 /// Upstream failure. Travels through the response chain like any
117 /// other response — `HealthTrack` records it, `emit` tags the event
118 /// as `upstream error`, and `IntoResponse` renders a 502.
119 Upstream502 { reason: String },
120}
121
122/// Framing of a buffered MCP response. Data, not control flow — the
123/// final `EnvelopeSeal` stage applies the wrap once.
124#[derive(Debug, Clone, Copy, PartialEq, Eq)]
125pub enum Envelope {
126 Json,
127 Sse,
128}
129
130impl IntoResponse for Response {
131 fn into_response(self) -> axum::response::Response {
132 match self {
133 Response::Raw {
134 body,
135 status,
136 headers,
137 } => build_response(status.as_u16(), &headers, body),
138 Response::McpStreamed {
139 body,
140 status,
141 headers,
142 ..
143 } => build_response(status.as_u16(), &headers, body),
144 Response::McpBuffered {
145 envelope: env,
146 message,
147 status,
148 mut headers,
149 } => {
150 let json_bytes = message.envelope.to_bytes();
151 let (bytes, ct) = match env {
152 Envelope::Json => (json_bytes, "application/json"),
153 Envelope::Sse => (wrap_as_sse(&json_bytes), "text/event-stream"),
154 };
155 headers.insert(CONTENT_TYPE, HeaderValue::from_static(ct));
156 build_response(status.as_u16(), &headers, Body::from(bytes))
157 }
158 Response::OauthJson {
159 doc,
160 status,
161 mut headers,
162 } => {
163 let bytes = serde_json::to_vec(&doc).unwrap_or_default();
164 headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
165 build_response(status.as_u16(), &headers, Body::from(bytes))
166 }
167 Response::Upstream502 { reason } => {
168 (StatusCode::BAD_GATEWAY, format!("Upstream error: {reason}")).into_response()
169 }
170 }
171 }
172}
173
174// ── Route ────────────────────────────────────────────────────
175
176/// Output of the router. Declarative; no I/O.
177#[derive(Debug, Clone)]
178pub enum Route {
179 McpStreamableHttp {
180 upstream: String,
181 method: ClientMethod,
182 buffer_policy: BufferPolicy,
183 },
184 McpSseLegacy {
185 upstream: String,
186 },
187 Oauth {
188 upstream: String,
189 rewrite: UrlMap,
190 },
191 Raw {
192 upstream: String,
193 },
194}
195
196/// Whether the transport should collect the upstream body or forward
197/// bytes as they arrive. Owned by the routing table, not by the
198/// protocol enum.
199#[derive(Debug, Clone, Copy, PartialEq, Eq)]
200pub enum BufferPolicy {
201 Streamed,
202 Buffered { max: usize },
203}
204
205// ── Context ──────────────────────────────────────────────────
206
207/// Per-request state carried by reference through both chains. Split so
208/// the type system distinguishes immutable-after-intake fields from
209/// mutable working state.
210#[derive(Debug)]
211pub struct Context {
212 pub intake: Intake,
213 pub working: Working,
214}
215
216/// Set once at intake, read many times. Changing anything here after
217/// intake is a type error.
218pub struct Intake {
219 pub start: Instant,
220 pub proxy: Arc<ProxyState>,
221 pub http_method: Method,
222 pub path: String,
223 pub request_size: usize,
224}
225
226// `ProxyState` does not implement `Debug`. Print its name and skip its
227// internals so `Intake` can still be logged/asserted on in tests.
228impl fmt::Debug for Intake {
229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230 f.debug_struct("Intake")
231 .field("start", &self.start)
232 .field("proxy", &"Arc<ProxyState>")
233 .field("http_method", &self.http_method)
234 .field("path", &self.path)
235 .field("request_size", &self.request_size)
236 .finish()
237 }
238}
239
240/// Mutated by middlewares as they learn about the request. Final
241/// contents feed the `Emit` stage.
242#[derive(Debug, Default)]
243pub struct Working {
244 pub session: Option<SessionInfo>,
245 pub client: Option<ClientInfo>,
246 /// Originating client method, stashed on the request side so
247 /// response-side middlewares know what produced the response.
248 pub request_method: Option<ClientMethod>,
249 /// Tool name for `tools/call`, stashed on the request side so the
250 /// emitter can populate `RequestEvent.tool` without re-parsing.
251 pub request_tool: Option<String>,
252 /// Resource URI for `resources/{read,subscribe,unsubscribe}`. Feeds
253 /// `RequestEvent.resource_uri`.
254 pub request_resource_uri: Option<String>,
255 /// Prompt name for `prompts/get`. Feeds `RequestEvent.prompt_name`.
256 pub request_prompt_name: Option<String>,
257 /// Serialized response body size in bytes. `EnvelopeSealMiddleware`
258 /// fills this on the buffered path; streaming paths leave it `None`.
259 /// Feeds `RequestEvent.response_size`.
260 pub response_size: Option<u64>,
261 /// Wall-clock time spent in `forward_request` (network RTT +
262 /// upstream work). Populated by `ProxyTransport`. Feeds
263 /// `RequestEvent.upstream_us`.
264 pub upstream_us: Option<u64>,
265 pub tags: TagSet,
266 /// Per-stage wall-clock timings, pushed in order as each stage
267 /// completes. Populated only when `MCPR_STAGE_TIMING` is set —
268 /// otherwise stays empty. Feeds `RequestEvent.stage_timings`.
269 pub timings: Vec<StageTiming>,
270}
271
272/// One named wall-clock measurement for a single pipeline stage.
273///
274/// Each middleware and named non-middleware site (intake parse,
275/// transport upstream/buffer/unwrap/parse) pushes one of these onto
276/// `Working.timings` when stage timing is enabled. The driver sums
277/// nothing — duplicates are fine; aggregators (e.g. the bench
278/// diagnostic) group by `name`.
279#[derive(Debug, Clone, serde::Serialize)]
280pub struct StageTiming {
281 pub name: &'static str,
282 pub elapsed_us: u64,
283}