osproxy_engine/passthrough.rs
1//! Tenant-agnostic passthrough: forward a request verbatim to one cluster.
2//!
3//! When a [`PassthroughPolicy`] is set and [matches](PassthroughPolicy::matches)
4//! a request, the pipeline skips tenancy entirely (no partition resolve, no body
5//! rewrite, no isolation) and forwards the raw request to the configured cluster,
6//! returning the upstream response unchanged.
7//!
8//! The match is **per request, by logical index**, so one proxy serves both
9//! modes at once: list the indices that are not (yet) onboarded into tenancy and
//! those flow through verbatim, while everything else is tenant-isolated. This is
//! the composable migration shape (legacy indices pass through, tenanted indices
//! do not) rather than a global "isolation off" switch. It is **fail-closed**: an index
13//! that does not match keeps full tenancy. Matching is on the operator-configured
14//! index list only, never a client-supplied header, so a client cannot opt itself
15//! out of isolation. An empty match list means *every* request passes through (the
16//! whole-instance transparent/capture proxy).
17//!
//! The buffered path reuses the same verbatim-forward primitive the admin and
//! cursor paths use (a [`CursorOp`]); the streaming path uses a [`ForwardOp`].
//! Either way, method, path, body, and query go upstream as-is, and the
20//! response comes back untouched. The forward still flows through the pipeline's
21//! trace, metrics, and pooling, so observability and connection reuse are intact.
22
23use osproxy_core::ClusterId;
24use osproxy_observe::{DispatchInfo, RequestTrace};
25use osproxy_sink::{ByteBody, CursorOp, ForwardOp, Reader, Sink, StreamingForward};
26use osproxy_tenancy::Router;
27
28use crate::error::RequestError;
29use crate::pipeline::{Pipeline, PipelineResponse};
30use osproxy_spi::RequestCtx;
31
32/// Where a passthrough proxy forwards a matching request: one cluster and its
33/// base URL, plus the logical-index prefixes that select which requests pass
34/// through verbatim (empty ⇒ all of them).
35#[derive(Clone, Debug)]
36pub struct PassthroughPolicy {
37 /// The cluster a matching request is forwarded to.
38 pub cluster: ClusterId,
39 /// The cluster's base URL (the sink pools it like any endpoint).
40 pub endpoint: Option<String>,
41 /// Logical-index prefixes that route verbatim. Empty means *every* request
42 /// passes through (whole-instance transparent proxy); non-empty means only
43 /// requests whose logical index matches a prefix pass through, the rest stay
44 /// tenant-isolated (fail-closed). Operator-configured, never client-supplied.
45 index_prefixes: Vec<String>,
46}
47
48impl PassthroughPolicy {
49 /// A policy forwarding *every* request to `cluster` at `endpoint` (the
50 /// whole-instance transparent proxy). Add [`with_index_prefixes`] to pass
51 /// through only selected indices and tenant-isolate the rest.
52 ///
53 /// [`with_index_prefixes`]: PassthroughPolicy::with_index_prefixes
54 #[must_use]
55 pub fn new(cluster: ClusterId, endpoint: impl Into<String>) -> Self {
56 Self {
57 cluster,
58 endpoint: Some(endpoint.into()),
59 index_prefixes: Vec::new(),
60 }
61 }
62
63 /// Restricts passthrough to requests whose logical index starts with one of
64 /// `prefixes`; all other requests keep full tenancy. An empty list (the
65 /// default) passes everything through.
66 #[must_use]
67 pub fn with_index_prefixes(mut self, prefixes: Vec<String>) -> Self {
68 self.index_prefixes = prefixes;
69 self
70 }
71
72 /// Whether `ctx` should be forwarded verbatim. Matches when no prefixes are
73 /// configured (whole-instance passthrough) or the request's logical index
74 /// starts with a configured prefix; otherwise the request stays tenanted.
75 #[must_use]
76 pub fn matches(&self, ctx: &RequestCtx<'_>) -> bool {
77 self.matches_index(ctx.logical_index())
78 }
79
80 /// Whether a request for `logical_index` should be forwarded verbatim. The
81 /// body-free half of [`matches`](Self::matches), so the transport can decide
82 /// to **stream** a passthrough request before buffering its body (ADR-014
83 /// stage 2).
84 #[must_use]
85 pub fn matches_index(&self, logical_index: &str) -> bool {
86 self.index_prefixes.is_empty()
87 || self
88 .index_prefixes
89 .iter()
90 .any(|p| logical_index.starts_with(p.as_str()))
91 }
92
93 /// The cluster + base URL a matching request forwards to.
94 fn target(&self) -> (ClusterId, Option<String>) {
95 (self.cluster.clone(), self.endpoint.clone())
96 }
97}
98
99impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
100 /// Forwards `ctx` verbatim to the passthrough cluster and returns the raw
101 /// upstream response. Reuses the cursor verbatim-forward op; the sink guards
102 /// the path against traversal at the same choke point as admin/cursor.
103 pub(crate) async fn forward(
104 &self,
105 ctx: &RequestCtx<'_>,
106 policy: &PassthroughPolicy,
107 trace: &mut RequestTrace,
108 ) -> Result<PipelineResponse, RequestError> {
109 let op = CursorOp::new(
110 policy.cluster.clone(),
111 ctx.method(),
112 ctx.path().to_owned(),
113 ctx.body().to_vec(),
114 )
115 .with_endpoint(policy.endpoint.clone())
116 .with_query(ctx.query().map(str::to_owned))
117 .with_protocol(ctx.protocol())
118 .with_trace(self.upstream_trace(ctx))
119 .with_forward_headers(ctx.forward_headers().to_vec());
120 let outcome = self.sink.cursor(op).await?;
121 trace.record_dispatch(DispatchInfo {
122 cluster: policy.cluster.clone(),
123 upstream_status: outcome.status,
124 pool_reuse: outcome.pool_reuse,
125 });
126 Ok(PipelineResponse {
127 status: outcome.status,
128 body: outcome.body,
129 // Verbatim passthrough: carry the upstream content type so a non-JSON
130 // body is not relabeled `application/json`.
131 content_type: outcome.content_type,
132 })
133 }
134
135 /// Forwards `ctx` verbatim with its body supplied as a **stream**, piped
136 /// straight to the upstream, and returns the upstream response as a live
137 /// stream too, so neither direction lands in memory (ADR-014). The fully
138 /// streaming twin of [`forward`](Self::forward).
139 pub(crate) async fn forward_stream(
140 &self,
141 ctx: &RequestCtx<'_>,
142 policy: &PassthroughPolicy,
143 body: ByteBody,
144 trace: &mut RequestTrace,
145 ) -> Result<StreamingForward, RequestError> {
146 let (cluster, endpoint) = policy.target();
147 let op = ForwardOp::new(cluster.clone(), ctx.method(), ctx.path().to_owned())
148 .with_endpoint(endpoint)
149 .with_query(ctx.query().map(str::to_owned))
150 .with_protocol(ctx.protocol())
151 .with_trace(self.upstream_trace(ctx))
152 .with_forward_headers(ctx.forward_headers().to_vec());
153 let outcome = self.sink.forward_stream(op, body).await?;
154 trace.record_dispatch(DispatchInfo {
155 cluster,
156 upstream_status: outcome.status,
157 pool_reuse: outcome.pool_reuse,
158 });
159 Ok(outcome)
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166 use osproxy_core::{EndpointKind, PrincipalId, RequestId};
167 use osproxy_spi::{HeaderView, HttpMethod, Principal, Protocol};
168
169 fn ctx_for<'a>(
170 principal: &'a Principal,
171 rid: &'a RequestId,
172 headers: &'a [(String, String)],
173 logical_index: &'a str,
174 ) -> RequestCtx<'a> {
175 RequestCtx::new(
176 principal,
177 rid,
178 HttpMethod::Post,
179 EndpointKind::IngestDoc,
180 Protocol::Http1,
181 logical_index,
182 HeaderView::new(headers),
183 b"",
184 )
185 }
186
187 fn matches_index(policy: &PassthroughPolicy, logical_index: &str) -> bool {
188 let principal = Principal::new(PrincipalId::from("svc"));
189 let rid = RequestId::from("r");
190 let headers = vec![];
191 policy.matches(&ctx_for(&principal, &rid, &headers, logical_index))
192 }
193
194 #[test]
195 fn a_prefix_free_policy_passes_every_request_through() {
196 let policy = PassthroughPolicy::new(ClusterId::from("c"), "http://c:9200");
197 assert!(matches_index(&policy, "anything"));
198 assert!(matches_index(&policy, "orders"));
199 }
200
201 #[test]
202 fn a_prefix_policy_passes_only_matching_indices_and_isolates_the_rest() {
203 // The migration shape: legacy indices pass through, everything else stays
204 // tenanted (fail-closed, a non-match keeps tenancy).
205 let policy = PassthroughPolicy::new(ClusterId::from("c"), "http://c:9200")
206 .with_index_prefixes(vec!["legacy-".to_owned(), "raw_".to_owned()]);
207 assert!(matches_index(&policy, "legacy-orders"), "prefix match");
208 assert!(matches_index(&policy, "raw_events"), "second prefix match");
209 assert!(!matches_index(&policy, "orders"), "tenanted index isolated");
210 assert!(
211 !matches_index(&policy, "not-legacy-orders"),
212 "prefix must anchor at the start, not match mid-string"
213 );
214 }
215}