Skip to main content

osproxy_engine/
admin.rs

1//! Administrative pass-through (`docs/03` §6), `_cat`/`_cluster`/`_nodes`.
2//!
3//! These endpoints carry no tenancy semantics, so the proxy cannot filter or
4//! strip them; per `docs/decisions/006` and `docs/specs/opensearch-endpoints.md`
5//! the only safe choices are **reject** (the default) or **pass through to an
6//! operator-allow-listed cluster**, with the operator accepting that admin
7//! output is cluster-wide (not tenant-scoped). The policy is opt-in: without one,
8//! every admin request is rejected exactly like an unsupported endpoint.
9
10use osproxy_core::ClusterId;
11use osproxy_observe::{DispatchInfo, RequestTrace};
12use osproxy_sink::{CursorOp, Reader, Sink};
13use osproxy_spi::{RequestCtx, SpiError};
14use osproxy_tenancy::Router;
15
16use crate::error::RequestError;
17use crate::pipeline::{Pipeline, PipelineResponse};
18
19/// The operator's admin pass-through policy: the cluster that answers admin
20/// requests and the path prefixes permitted through. A request whose path does
21/// not match an allowed prefix is rejected, so enabling pass-through for
22/// `_cat/health` does not silently open `_cluster/settings`.
23#[derive(Clone, Debug)]
24pub struct AdminPolicy {
25    cluster: ClusterId,
26    allowed_prefixes: Vec<String>,
27    endpoint: Option<String>,
28}
29
30impl AdminPolicy {
31    /// A policy forwarding any path matching one of `allowed_prefixes` to
32    /// `cluster`. Prefixes are matched against the raw request path (e.g.
33    /// `/_cat/`, `/_cluster/health`); an empty list allows nothing.
34    #[must_use]
35    pub fn new(cluster: ClusterId, allowed_prefixes: Vec<String>) -> Self {
36        Self {
37            cluster,
38            allowed_prefixes,
39            endpoint: None,
40        }
41    }
42
43    /// Sets the admin cluster's base URL (builder style). The admin cluster is
44    /// operator infrastructure, not a tenancy placement, so its endpoint is given
45    /// here; without it the sink falls back to the tenancy's `cluster_endpoint`.
46    #[must_use]
47    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
48        self.endpoint = Some(endpoint.into());
49        self
50    }
51
52    /// Whether `path` is allow-listed for pass-through. A path containing a `..`
53    /// segment is never allowed: the prefix is an authorization boundary, and
54    /// upstream `..` resolution would otherwise let `/_cat/../_cluster/settings`
55    /// slip past a `/_cat/`-only allow-list.
56    #[must_use]
57    fn allows(&self, path: &str) -> bool {
58        if path.split('/').any(|seg| seg == "..") {
59            return false;
60        }
61        self.allowed_prefixes.iter().any(|p| path.starts_with(p))
62    }
63}
64
65impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
66    /// Forwards an allow-listed admin request verbatim to the policy's cluster,
67    /// or rejects it (the default when no policy is configured, and for any path
68    /// not on the allow-list). Admin output is not tenancy-filtered, so the full
69    /// path and query are forwarded, there is no body partition filter to bypass.
70    pub(crate) async fn admin(
71        &self,
72        ctx: &RequestCtx<'_>,
73        trace: &mut RequestTrace,
74    ) -> Result<PipelineResponse, RequestError> {
75        let Some(policy) = self.admin_policy.as_ref().filter(|p| p.allows(ctx.path())) else {
76            // No policy, or the path is not allow-listed: reject like an
77            // unsupported endpoint (fail-closed, `docs/decisions/006`).
78            return Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
79                endpoint: ctx.endpoint(),
80            }));
81        };
82        // The admin cluster's endpoint: the operator-supplied one, else the
83        // tenancy's lookup for that cluster id.
84        let endpoint = policy
85            .endpoint
86            .clone()
87            .or_else(|| self.router.cluster_endpoint(&policy.cluster));
88        let op = CursorOp::new(
89            policy.cluster.clone(),
90            ctx.method(),
91            ctx.path().to_owned(),
92            ctx.body().to_vec(),
93        )
94        .with_endpoint(endpoint)
95        .with_query(ctx.query().map(str::to_owned))
96        .with_trace(self.upstream_trace(ctx))
97        .with_forward_headers(ctx.forward_headers().to_vec());
98        let outcome = self.sink.cursor(op).await?;
99        trace.record_dispatch(DispatchInfo {
100            cluster: policy.cluster.clone(),
101            upstream_status: outcome.status,
102            pool_reuse: outcome.pool_reuse,
103        });
104        Ok(PipelineResponse {
105            status: outcome.status,
106            body: outcome.body,
107            // Admin output (`_cat` etc.) is often `text/plain`; forward the
108            // upstream content type rather than mislabeling it `application/json`.
109            content_type: outcome.content_type,
110        })
111    }
112}