use osproxy_core::ClusterId;
use osproxy_observe::{DispatchInfo, RequestTrace};
use osproxy_sink::{CursorOp, Reader, Sink};
use osproxy_spi::{RequestCtx, SpiError};
use osproxy_tenancy::Router;
use crate::error::RequestError;
use crate::pipeline::{Pipeline, PipelineResponse};
#[derive(Clone, Debug)]
pub struct AdminPolicy {
cluster: ClusterId,
allowed_prefixes: Vec<String>,
endpoint: Option<String>,
}
impl AdminPolicy {
#[must_use]
pub fn new(cluster: ClusterId, allowed_prefixes: Vec<String>) -> Self {
Self {
cluster,
allowed_prefixes,
endpoint: None,
}
}
#[must_use]
pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
#[must_use]
fn allows(&self, path: &str) -> bool {
if path.split('/').any(|seg| seg == "..") {
return false;
}
self.allowed_prefixes.iter().any(|p| path.starts_with(p))
}
}
impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
pub(crate) async fn admin(
&self,
ctx: &RequestCtx<'_>,
trace: &mut RequestTrace,
) -> Result<PipelineResponse, RequestError> {
let Some(policy) = self.admin_policy.as_ref().filter(|p| p.allows(ctx.path())) else {
return Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
endpoint: ctx.endpoint(),
}));
};
let endpoint = policy
.endpoint
.clone()
.or_else(|| self.router.cluster_endpoint(&policy.cluster));
let op = CursorOp::new(
policy.cluster.clone(),
ctx.method(),
ctx.path().to_owned(),
ctx.body().to_vec(),
)
.with_endpoint(endpoint)
.with_query(ctx.query().map(str::to_owned))
.with_trace(self.upstream_trace(ctx))
.with_forward_headers(ctx.forward_headers().to_vec());
let outcome = self.sink.cursor(op).await?;
trace.record_dispatch(DispatchInfo {
cluster: policy.cluster.clone(),
upstream_status: outcome.status,
pool_reuse: outcome.pool_reuse,
});
Ok(PipelineResponse {
status: outcome.status,
body: outcome.body,
content_type: outcome.content_type,
})
}
}