Skip to main content

osproxy_engine/
passthrough.rs

1//! Tenant-agnostic passthrough: forward a request verbatim to one cluster.
2//!
3//! When a [`PassthroughPolicy`] is set and [matches](PassthroughPolicy::matches)
4//! a request, the pipeline skips tenancy entirely (no partition resolve, no body
5//! rewrite, no isolation) and forwards the raw request to the configured cluster,
6//! returning the upstream response unchanged.
7//!
8//! The match is **per request, by logical index**, so one proxy serves both
9//! modes at once: list the indices that are not (yet) onboarded into tenancy and
//! those flow through verbatim, while everything else is tenant-isolated. This is
//! the composable migration shape (legacy indices pass through, tenanted indices
//! do not), not a global "isolation off" switch. It is **fail-closed**: an index
13//! that does not match keeps full tenancy. Matching is on the operator-configured
14//! index list only, never a client-supplied header, so a client cannot opt itself
15//! out of isolation. An empty match list means *every* request passes through (the
16//! whole-instance transparent/capture proxy).
17//!
18//! It reuses the same verbatim-forward primitive the admin and cursor paths use
19//! (a [`CursorOp`]): method, path, body, and query go upstream as-is, and the
20//! response comes back untouched. The forward still flows through the pipeline's
21//! trace, metrics, and pooling, so observability and connection reuse are intact.
22
23use osproxy_core::ClusterId;
24use osproxy_observe::{DispatchInfo, RequestTrace};
25use osproxy_sink::{ByteBody, CursorOp, ForwardOp, Reader, Sink, StreamingForward};
26use osproxy_tenancy::Router;
27
28use crate::error::RequestError;
29use crate::pipeline::{Pipeline, PipelineResponse};
30use osproxy_spi::RequestCtx;
31
/// Where a passthrough proxy forwards a matching request: one cluster and its
/// base URL, plus the logical-index prefixes that select which requests pass
/// through verbatim (an empty prefix list selects all of them).
///
/// Build with [`PassthroughPolicy::new`] and narrow with
/// [`PassthroughPolicy::with_index_prefixes`]. The prefix list is a private
/// field, so outside this module it can only be set through that builder.
#[derive(Clone, Debug)]
pub struct PassthroughPolicy {
    /// The cluster a matching request is forwarded to. It is also the cluster
    /// recorded as the dispatch target in the request trace.
    pub cluster: ClusterId,
    /// The cluster's base URL, handed to the upstream op's `with_endpoint` (the
    /// sink pools it like any endpoint). [`PassthroughPolicy::new`] always sets
    /// `Some`. NOTE(review): what a `None` endpoint means is decided by the
    /// sink; confirm before constructing a policy without one.
    pub endpoint: Option<String>,
    /// Logical-index prefixes that route verbatim. Empty means *every* request
    /// passes through (whole-instance transparent proxy). Non-empty means only
    /// requests whose logical index starts with a prefix pass through, and the
    /// rest stay tenant-isolated (fail-closed). Operator-configured, never
    /// client-supplied.
    index_prefixes: Vec<String>,
}
47
48impl PassthroughPolicy {
49    /// A policy forwarding *every* request to `cluster` at `endpoint` (the
50    /// whole-instance transparent proxy). Add [`with_index_prefixes`] to pass
51    /// through only selected indices and tenant-isolate the rest.
52    ///
53    /// [`with_index_prefixes`]: PassthroughPolicy::with_index_prefixes
54    #[must_use]
55    pub fn new(cluster: ClusterId, endpoint: impl Into<String>) -> Self {
56        Self {
57            cluster,
58            endpoint: Some(endpoint.into()),
59            index_prefixes: Vec::new(),
60        }
61    }
62
63    /// Restricts passthrough to requests whose logical index starts with one of
64    /// `prefixes`; all other requests keep full tenancy. An empty list (the
65    /// default) passes everything through.
66    #[must_use]
67    pub fn with_index_prefixes(mut self, prefixes: Vec<String>) -> Self {
68        self.index_prefixes = prefixes;
69        self
70    }
71
72    /// Whether `ctx` should be forwarded verbatim. Matches when no prefixes are
73    /// configured (whole-instance passthrough) or the request's logical index
74    /// starts with a configured prefix; otherwise the request stays tenanted.
75    #[must_use]
76    pub fn matches(&self, ctx: &RequestCtx<'_>) -> bool {
77        self.matches_index(ctx.logical_index())
78    }
79
80    /// Whether a request for `logical_index` should be forwarded verbatim. The
81    /// body-free half of [`matches`](Self::matches), so the transport can decide
82    /// to **stream** a passthrough request before buffering its body (ADR-014
83    /// stage 2).
84    #[must_use]
85    pub fn matches_index(&self, logical_index: &str) -> bool {
86        self.index_prefixes.is_empty()
87            || self
88                .index_prefixes
89                .iter()
90                .any(|p| logical_index.starts_with(p.as_str()))
91    }
92
93    /// The cluster + base URL a matching request forwards to.
94    fn target(&self) -> (ClusterId, Option<String>) {
95        (self.cluster.clone(), self.endpoint.clone())
96    }
97}
98
99impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
100    /// Forwards `ctx` verbatim to the passthrough cluster and returns the raw
101    /// upstream response. Reuses the cursor verbatim-forward op; the sink guards
102    /// the path against traversal at the same choke point as admin/cursor.
103    pub(crate) async fn forward(
104        &self,
105        ctx: &RequestCtx<'_>,
106        policy: &PassthroughPolicy,
107        trace: &mut RequestTrace,
108    ) -> Result<PipelineResponse, RequestError> {
109        let op = CursorOp::new(
110            policy.cluster.clone(),
111            ctx.method(),
112            ctx.path().to_owned(),
113            ctx.body().to_vec(),
114        )
115        .with_endpoint(policy.endpoint.clone())
116        .with_query(ctx.query().map(str::to_owned))
117        .with_protocol(ctx.protocol())
118        .with_trace(self.upstream_trace(ctx))
119        .with_forward_headers(ctx.forward_headers().to_vec());
120        let outcome = self.sink.cursor(op).await?;
121        trace.record_dispatch(DispatchInfo {
122            cluster: policy.cluster.clone(),
123            upstream_status: outcome.status,
124            pool_reuse: outcome.pool_reuse,
125        });
126        Ok(PipelineResponse {
127            status: outcome.status,
128            body: outcome.body,
129            // Verbatim passthrough: carry the upstream content type so a non-JSON
130            // body is not relabeled `application/json`.
131            content_type: outcome.content_type,
132        })
133    }
134
135    /// Forwards `ctx` verbatim with its body supplied as a **stream**, piped
136    /// straight to the upstream, and returns the upstream response as a live
137    /// stream too, so neither direction lands in memory (ADR-014). The fully
138    /// streaming twin of [`forward`](Self::forward).
139    pub(crate) async fn forward_stream(
140        &self,
141        ctx: &RequestCtx<'_>,
142        policy: &PassthroughPolicy,
143        body: ByteBody,
144        trace: &mut RequestTrace,
145    ) -> Result<StreamingForward, RequestError> {
146        let (cluster, endpoint) = policy.target();
147        let op = ForwardOp::new(cluster.clone(), ctx.method(), ctx.path().to_owned())
148            .with_endpoint(endpoint)
149            .with_query(ctx.query().map(str::to_owned))
150            .with_protocol(ctx.protocol())
151            .with_trace(self.upstream_trace(ctx))
152            .with_forward_headers(ctx.forward_headers().to_vec());
153        let outcome = self.sink.forward_stream(op, body).await?;
154        trace.record_dispatch(DispatchInfo {
155            cluster,
156            upstream_status: outcome.status,
157            pool_reuse: outcome.pool_reuse,
158        });
159        Ok(outcome)
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use osproxy_core::{EndpointKind, PrincipalId, RequestId};
    use osproxy_spi::{HeaderView, HttpMethod, Principal, Protocol};

    /// Builds a minimal, body-less POST ingest context targeting `logical_index`.
    fn ctx_for<'a>(
        principal: &'a Principal,
        rid: &'a RequestId,
        headers: &'a [(String, String)],
        logical_index: &'a str,
    ) -> RequestCtx<'a> {
        RequestCtx::new(
            principal,
            rid,
            HttpMethod::Post,
            EndpointKind::IngestDoc,
            Protocol::Http1,
            logical_index,
            HeaderView::new(headers),
            b"",
        )
    }

    /// Runs the ctx-based [`PassthroughPolicy::matches`] entry point for
    /// `logical_index`. The name is distinct from the `matches_index` method
    /// so the two entry points are not confused.
    fn matches_via_ctx(policy: &PassthroughPolicy, logical_index: &str) -> bool {
        let principal = Principal::new(PrincipalId::from("svc"));
        let rid = RequestId::from("r");
        let headers: Vec<(String, String)> = Vec::new();
        policy.matches(&ctx_for(&principal, &rid, &headers, logical_index))
    }

    /// The migration-shaped policy used by several tests: `legacy-` and `raw_`
    /// indices pass through, everything else stays tenanted.
    fn prefix_policy() -> PassthroughPolicy {
        PassthroughPolicy::new(ClusterId::from("c"), "http://c:9200")
            .with_index_prefixes(vec!["legacy-".to_owned(), "raw_".to_owned()])
    }

    #[test]
    fn a_prefix_free_policy_passes_every_request_through() {
        let policy = PassthroughPolicy::new(ClusterId::from("c"), "http://c:9200");
        assert!(matches_via_ctx(&policy, "anything"));
        assert!(matches_via_ctx(&policy, "orders"));
    }

    #[test]
    fn a_prefix_policy_passes_only_matching_indices_and_isolates_the_rest() {
        // The migration shape: legacy indices pass through, everything else stays
        // tenanted (fail-closed, a non-match keeps tenancy).
        let policy = prefix_policy();
        assert!(matches_via_ctx(&policy, "legacy-orders"), "prefix match");
        assert!(matches_via_ctx(&policy, "raw_events"), "second prefix match");
        assert!(!matches_via_ctx(&policy, "orders"), "tenanted index isolated");
        assert!(
            !matches_via_ctx(&policy, "not-legacy-orders"),
            "prefix must anchor at the start, not match mid-string"
        );
    }

    #[test]
    fn the_body_free_check_agrees_with_the_ctx_check() {
        // The streaming transport decides with `matches_index` before it has a
        // full ctx; both entry points must give the same answer.
        let policy = prefix_policy();
        for index in ["legacy-orders", "raw_events", "orders", "not-legacy-orders"] {
            assert_eq!(
                policy.matches_index(index),
                matches_via_ctx(&policy, index),
                "entry points disagree for index {:?}",
                index
            );
        }
    }

    #[test]
    fn an_index_free_request_stays_tenanted_under_a_prefix_policy() {
        // Fail-closed: with prefixes configured, a request naming no index
        // must not fall through to verbatim passthrough.
        assert!(!prefix_policy().matches_index(""));
        let everything = PassthroughPolicy::new(ClusterId::from("c"), "http://c:9200");
        assert!(everything.matches_index(""), "prefix-free policy passes all");
    }
}