osproxy_engine/admin.rs
//! Administrative pass-through (`docs/03` §6): `_cat`/`_cluster`/`_nodes`.
2//!
3//! These endpoints carry no tenancy semantics, so the proxy cannot filter or
4//! strip them; per `docs/decisions/006` and `docs/specs/opensearch-endpoints.md`
5//! the only safe choices are **reject** (the default) or **pass through to an
6//! operator-allow-listed cluster**, with the operator accepting that admin
7//! output is cluster-wide (not tenant-scoped). The policy is opt-in: without one,
8//! every admin request is rejected exactly like an unsupported endpoint.
9
10use osproxy_core::ClusterId;
11use osproxy_observe::{DispatchInfo, RequestTrace};
12use osproxy_sink::{CursorOp, Reader, Sink};
13use osproxy_spi::{RequestCtx, SpiError};
14use osproxy_tenancy::Router;
15
16use crate::error::RequestError;
17use crate::pipeline::{Pipeline, PipelineResponse};
18
19/// The operator's admin pass-through policy: the cluster that answers admin
20/// requests and the path prefixes permitted through. A request whose path does
21/// not match an allowed prefix is rejected, so enabling pass-through for
22/// `_cat/health` does not silently open `_cluster/settings`.
23#[derive(Clone, Debug)]
24pub struct AdminPolicy {
25 cluster: ClusterId,
26 allowed_prefixes: Vec<String>,
27 endpoint: Option<String>,
28}
29
30impl AdminPolicy {
31 /// A policy forwarding any path matching one of `allowed_prefixes` to
32 /// `cluster`. Prefixes are matched against the raw request path (e.g.
33 /// `/_cat/`, `/_cluster/health`); an empty list allows nothing.
34 #[must_use]
35 pub fn new(cluster: ClusterId, allowed_prefixes: Vec<String>) -> Self {
36 Self {
37 cluster,
38 allowed_prefixes,
39 endpoint: None,
40 }
41 }
42
43 /// Sets the admin cluster's base URL (builder style). The admin cluster is
44 /// operator infrastructure, not a tenancy placement, so its endpoint is given
45 /// here; without it the sink falls back to the tenancy's `cluster_endpoint`.
46 #[must_use]
47 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
48 self.endpoint = Some(endpoint.into());
49 self
50 }
51
52 /// Whether `path` is allow-listed for pass-through. A path containing a `..`
53 /// segment is never allowed: the prefix is an authorization boundary, and
54 /// upstream `..` resolution would otherwise let `/_cat/../_cluster/settings`
55 /// slip past a `/_cat/`-only allow-list.
56 #[must_use]
57 fn allows(&self, path: &str) -> bool {
58 if path.split('/').any(|seg| seg == "..") {
59 return false;
60 }
61 self.allowed_prefixes.iter().any(|p| path.starts_with(p))
62 }
63}
64
65impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
66 /// Forwards an allow-listed admin request verbatim to the policy's cluster,
67 /// or rejects it (the default when no policy is configured, and for any path
68 /// not on the allow-list). Admin output is not tenancy-filtered, so the full
69 /// path and query are forwarded, there is no body partition filter to bypass.
70 pub(crate) async fn admin(
71 &self,
72 ctx: &RequestCtx<'_>,
73 trace: &mut RequestTrace,
74 ) -> Result<PipelineResponse, RequestError> {
75 let Some(policy) = self.admin_policy.as_ref().filter(|p| p.allows(ctx.path())) else {
76 // No policy, or the path is not allow-listed: reject like an
77 // unsupported endpoint (fail-closed, `docs/decisions/006`).
78 return Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
79 endpoint: ctx.endpoint(),
80 }));
81 };
82 // The admin cluster's endpoint: the operator-supplied one, else the
83 // tenancy's lookup for that cluster id.
84 let endpoint = policy
85 .endpoint
86 .clone()
87 .or_else(|| self.router.cluster_endpoint(&policy.cluster));
88 let op = CursorOp::new(
89 policy.cluster.clone(),
90 ctx.method(),
91 ctx.path().to_owned(),
92 ctx.body().to_vec(),
93 )
94 .with_endpoint(endpoint)
95 .with_query(ctx.query().map(str::to_owned))
96 .with_trace(self.upstream_trace(ctx))
97 .with_forward_headers(ctx.forward_headers().to_vec());
98 let outcome = self.sink.cursor(op).await?;
99 trace.record_dispatch(DispatchInfo {
100 cluster: policy.cluster.clone(),
101 upstream_status: outcome.status,
102 pool_reuse: outcome.pool_reuse,
103 });
104 Ok(PipelineResponse {
105 status: outcome.status,
106 body: outcome.body,
107 // Admin output (`_cat` etc.) is often `text/plain`; forward the
108 // upstream content type rather than mislabeling it `application/json`.
109 content_type: outcome.content_type,
110 })
111 }
112}