osproxy_transport/request.rs
1//! The owned request/response shapes the ingress hands to a handler.
2//!
3//! The transport parses bytes off the wire into an [`IngressRequest`] (owned, so
4//! it outlives the borrowed hyper request) and writes an [`IngressResponse`]
5//! back. It carries no routing or tenancy meaning, just the parsed HTTP facts
6//! plus the endpoint classification.
7
8use bytes::Bytes;
9use http_body_util::combinators::UnsyncBoxBody;
10use http_body_util::{BodyExt, Full};
11use osproxy_core::EndpointKind;
12use osproxy_spi::{HttpMethod, Protocol};
13
14/// The transport's HTTP response body: boxed so a response may be buffered bytes
15/// or a **live stream** piped from the upstream without buffering (ADR-014).
16/// Unsync, the server only needs `Send`. Structurally identical to
17/// `osproxy-sink`'s `ByteBody`, so a streamed upstream response flows through
18/// as-is, no copy.
19pub type ResponseBody = UnsyncBoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
20
21/// Wraps fully-buffered bytes as a [`ResponseBody`] (the buffered response path).
22#[must_use]
23pub fn buffered_response(body: Vec<u8>) -> ResponseBody {
24 Full::new(Bytes::from(body))
25 .map_err(|never| match never {})
26 .boxed_unsync()
27}
28
29/// A streaming response a handler returns for a verbatim forward (ADR-014): a
30/// status, extra headers, and a body piped to the client without buffering.
31pub struct StreamingResponse {
32 /// The HTTP status code.
33 pub status: u16,
34 /// Extra response headers (beyond the content type the transport sets).
35 pub headers: Vec<(String, String)>,
36 /// The response body, a live stream, or buffered bytes for an error.
37 pub body: ResponseBody,
38}
39
40impl std::fmt::Debug for StreamingResponse {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 // The body is a stream, not `Debug`; show the rest of the shape.
43 f.debug_struct("StreamingResponse")
44 .field("status", &self.status)
45 .field("headers", &self.headers)
46 .finish_non_exhaustive()
47 }
48}
49
50impl StreamingResponse {
51 /// A response whose body is a live stream.
52 #[must_use]
53 pub fn stream(status: u16, body: ResponseBody) -> Self {
54 Self {
55 status,
56 headers: Vec::new(),
57 body,
58 }
59 }
60
61 /// A response with a buffered body (e.g. an error), boxed into the streaming
62 /// body type so both kinds share one response type.
63 #[must_use]
64 pub fn buffered(status: u16, body: Vec<u8>) -> Self {
65 Self {
66 status,
67 headers: Vec::new(),
68 body: buffered_response(body),
69 }
70 }
71
72 /// Adds a response header (builder style).
73 #[must_use]
74 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
75 self.headers.push((name.into(), value.into()));
76 self
77 }
78}
79
80/// A parsed, owned client request ready for the pipeline.
81#[derive(Clone, PartialEq, Eq, Debug)]
82pub struct IngressRequest {
83 /// The HTTP method.
84 pub method: HttpMethod,
85 /// The wire protocol the request arrived on (HTTP/1.1, HTTP/2, or gRPC). The
86 /// `auto` ingress builder negotiates h1/h2 per connection; the engine records
87 /// it for tracing and may select the upstream protocol from it (`docs/04` ยง7).
88 pub protocol: Protocol,
89 /// The raw request path (used to route proxy admin endpoints such as
90 /// `/debug/explain/{id}` that are not OpenSearch paths).
91 pub path: String,
92 /// The endpoint classification derived from method + path.
93 pub endpoint: EndpointKind,
94 /// The logical index from the path (pre-rewrite), empty if the path has none.
95 pub logical_index: String,
96 /// The document id from the path, if the endpoint carries one (`_doc/{id}`).
97 pub doc_id: Option<String>,
98 /// The request headers, in arrival order.
99 pub headers: Vec<(String, String)>,
100 /// The request body.
101 pub body: Vec<u8>,
102 /// The raw URL query string (without the `?`), if any. The engine forwards
103 /// only an allow-list of cursor params (`scroll`/`keep_alive`) upstream,
104 /// query-affecting params are never forwarded, so the body partition filter
105 /// cannot be bypassed (NFR-S4).
106 pub query: Option<String>,
107 /// The verified client-certificate identity, if the connection was mutually
108 /// authenticated (mTLS). A stable id derived from the cert, never the raw
109 /// certificate material.
110 pub client_cert_subject: Option<String>,
111 /// Whether the request arrived over a TLS-terminated connection. The handler
112 /// refuses to mutate a request body over cleartext, since the proxy must
113 /// terminate TLS to rewrite the stream (NFR-S1).
114 pub secure: bool,
115}
116
/// What a handler returns for the transport to write back to the client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IngressResponse {
    /// HTTP status code to send.
    pub status: u16,
    /// Additional response headers, on top of the JSON content type the
    /// transport sets.
    pub headers: Vec<(String, String)>,
    /// Response body (JSON).
    pub body: Vec<u8>,
}

impl IngressResponse {
    /// Builds a JSON response from a status and a body, with no extra headers.
    ///
    /// `body` is stored as given; it is neither parsed nor validated here.
    #[must_use]
    pub fn json(status: u16, body: Vec<u8>) -> Self {
        let headers = Vec::new();
        Self {
            status,
            headers,
            body,
        }
    }

    /// Appends one response header and returns the updated response (builder
    /// style). Existing headers are kept, in insertion order.
    #[must_use]
    pub fn with_header(self, name: impl Into<String>, value: impl Into<String>) -> Self {
        let Self {
            status,
            mut headers,
            body,
        } = self;
        headers.push((name.into(), value.into()));
        Self {
            status,
            headers,
            body,
        }
    }
}