Skip to main content

osproxy_transport/
request.rs

1//! The owned request/response shapes the ingress hands to a handler.
2//!
3//! The transport parses bytes off the wire into an [`IngressRequest`] (owned, so
4//! it outlives the borrowed hyper request) and writes an [`IngressResponse`]
5//! back. It carries no routing or tenancy meaning, just the parsed HTTP facts
6//! plus the endpoint classification.
7
8use bytes::Bytes;
9use http_body_util::combinators::UnsyncBoxBody;
10use http_body_util::{BodyExt, Full};
11use osproxy_core::EndpointKind;
12use osproxy_spi::{HttpMethod, Protocol};
13
14/// The transport's HTTP response body: boxed so a response may be buffered bytes
15/// or a **live stream** piped from the upstream without buffering (ADR-014).
16/// Unsync, the server only needs `Send`. Structurally identical to
17/// `osproxy-sink`'s `ByteBody`, so a streamed upstream response flows through
18/// as-is, no copy.
19pub type ResponseBody = UnsyncBoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
20
21/// Wraps fully-buffered bytes as a [`ResponseBody`] (the buffered response path).
22#[must_use]
23pub fn buffered_response(body: Vec<u8>) -> ResponseBody {
24    Full::new(Bytes::from(body))
25        .map_err(|never| match never {})
26        .boxed_unsync()
27}
28
29/// A streaming response a handler returns for a verbatim forward (ADR-014): a
30/// status, extra headers, and a body piped to the client without buffering.
31pub struct StreamingResponse {
32    /// The HTTP status code.
33    pub status: u16,
34    /// Extra response headers (beyond the content type the transport sets).
35    pub headers: Vec<(String, String)>,
36    /// The response body, a live stream, or buffered bytes for an error.
37    pub body: ResponseBody,
38}
39
40impl std::fmt::Debug for StreamingResponse {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        // The body is a stream, not `Debug`; show the rest of the shape.
43        f.debug_struct("StreamingResponse")
44            .field("status", &self.status)
45            .field("headers", &self.headers)
46            .finish_non_exhaustive()
47    }
48}
49
50impl StreamingResponse {
51    /// A response whose body is a live stream.
52    #[must_use]
53    pub fn stream(status: u16, body: ResponseBody) -> Self {
54        Self {
55            status,
56            headers: Vec::new(),
57            body,
58        }
59    }
60
61    /// A response with a buffered body (e.g. an error), boxed into the streaming
62    /// body type so both kinds share one response type.
63    #[must_use]
64    pub fn buffered(status: u16, body: Vec<u8>) -> Self {
65        Self {
66            status,
67            headers: Vec::new(),
68            body: buffered_response(body),
69        }
70    }
71
72    /// Adds a response header (builder style).
73    #[must_use]
74    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
75        self.headers.push((name.into(), value.into()));
76        self
77    }
78}
79
80/// A parsed, owned client request ready for the pipeline.
81#[derive(Clone, PartialEq, Eq, Debug)]
82pub struct IngressRequest {
83    /// The HTTP method.
84    pub method: HttpMethod,
85    /// The wire protocol the request arrived on (HTTP/1.1, HTTP/2, or gRPC). The
86    /// `auto` ingress builder negotiates h1/h2 per connection; the engine records
87    /// it for tracing and may select the upstream protocol from it (`docs/04` ยง7).
88    pub protocol: Protocol,
89    /// The raw request path (used to route proxy admin endpoints such as
90    /// `/debug/explain/{id}` that are not OpenSearch paths).
91    pub path: String,
92    /// The endpoint classification derived from method + path.
93    pub endpoint: EndpointKind,
94    /// The logical index from the path (pre-rewrite), empty if the path has none.
95    pub logical_index: String,
96    /// The document id from the path, if the endpoint carries one (`_doc/{id}`).
97    pub doc_id: Option<String>,
98    /// The request headers, in arrival order.
99    pub headers: Vec<(String, String)>,
100    /// The request body.
101    pub body: Vec<u8>,
102    /// The raw URL query string (without the `?`), if any. The engine forwards
103    /// only an allow-list of cursor params (`scroll`/`keep_alive`) upstream,
104    /// query-affecting params are never forwarded, so the body partition filter
105    /// cannot be bypassed (NFR-S4).
106    pub query: Option<String>,
107    /// The verified client-certificate identity, if the connection was mutually
108    /// authenticated (mTLS). A stable id derived from the cert, never the raw
109    /// certificate material.
110    pub client_cert_subject: Option<String>,
111    /// Whether the request arrived over a TLS-terminated connection. The handler
112    /// refuses to mutate a request body over cleartext, since the proxy must
113    /// terminate TLS to rewrite the stream (NFR-S1).
114    pub secure: bool,
115}
116
117/// The response a handler returns for the transport to write back.
118#[derive(Clone, PartialEq, Eq, Debug)]
119pub struct IngressResponse {
120    /// The HTTP status code.
121    pub status: u16,
122    /// Extra response headers (beyond the JSON content type the transport sets).
123    pub headers: Vec<(String, String)>,
124    /// The response body (JSON).
125    pub body: Vec<u8>,
126}
127
128impl IngressResponse {
129    /// A JSON response with the given status and body.
130    #[must_use]
131    pub fn json(status: u16, body: Vec<u8>) -> Self {
132        Self {
133            status,
134            headers: Vec::new(),
135            body,
136        }
137    }
138
139    /// Adds a response header (builder style).
140    #[must_use]
141    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
142        self.headers.push((name.into(), value.into()));
143        self
144    }
145}