#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::sync::{Arc, Mutex};
use osproxy_core::Epoch;
use osproxy_core::{ClusterId, EndpointKind, ErrorCode, IndexName, PartitionId, RequestId};
use osproxy_engine::{AdminPolicy, Pipeline, RequestError};
use osproxy_sink::{
CountOutcome, CursorOp, CursorOutcome, MemorySink, ReadOp, ReadOutcome, Reader, SearchOp,
SearchOutcome, Sink, SinkError, WriteAck, WriteBatch,
};
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, InjectedField, JsonPath, PartitionKeySpec,
Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec, SpiError, TenancySpi,
};
use osproxy_tenancy::TenancyRouter;
/// Concrete pipeline type used throughout these tests: a `TenancyRouter` wrapping
/// `StubTenancy`, paired with a `RecordingSink`.
type StubPipeline = Pipeline<TenancyRouter<StubTenancy>, RecordingSink>;
/// Test sink that wraps a `MemorySink` and keeps a shared slot holding the
/// most recent `CursorOp` handed to its `cursor` method.
struct RecordingSink {
    /// Shared slot; the test keeps a second handle to inspect what was recorded.
    seen: Arc<Mutex<Option<CursorOp>>>,
    /// Backing sink used for the operations this type simply forwards.
    inner: MemorySink,
}

impl RecordingSink {
    /// Creates a sink with an empty slot.
    ///
    /// Returns the sink together with a second handle to the same slot, so the
    /// caller can still read it after the sink has been moved elsewhere.
    fn new() -> (Self, Arc<Mutex<Option<CursorOp>>>) {
        let slot: Arc<Mutex<Option<CursorOp>>> = Arc::new(Mutex::new(None));
        let sink = Self {
            seen: Arc::clone(&slot),
            inner: MemorySink::new(),
        };
        (sink, slot)
    }
}
// Write path: nothing is recorded here; the batch goes straight to the wrapped sink.
impl Sink for RecordingSink {
/// Forwards `batch` unchanged to the inner `MemorySink` and returns its result.
async fn write(&self, batch: WriteBatch) -> Result<WriteAck, SinkError> {
self.inner.write(batch).await
}
}
impl Reader for RecordingSink {
    /// Forwards the read to the inner `MemorySink`.
    async fn get(&self, op: ReadOp) -> Result<ReadOutcome, SinkError> {
        self.inner.get(op).await
    }

    /// Forwards the search to the inner `MemorySink`.
    async fn search(&self, op: SearchOp) -> Result<SearchOutcome, SinkError> {
        self.inner.search(op).await
    }

    /// Forwards the count to the inner `MemorySink`.
    async fn count(&self, op: SearchOp) -> Result<CountOutcome, SinkError> {
        self.inner.count(op).await
    }

    /// Stores `op` in the shared slot (replacing any earlier value) and answers
    /// with a fixed outcome; the inner sink is not consulted.
    async fn cursor(&self, op: CursorOp) -> Result<CursorOutcome, SinkError> {
        {
            // Keep the guard confined to this scope so the lock is released
            // before the response is built.
            let mut slot = self.seen.lock().unwrap();
            *slot = Some(op);
        }
        let canned_body = br#"[{"index":"a"}]"#.to_vec();
        Ok(CursorOutcome::new(200, canned_body))
    }
}
struct StubTenancy;
impl TenancySpi for StubTenancy {
fn resolve_partition(
&self,
ctx: &osproxy_spi::RequestCtx<'_>,
body: BodyDoc<'_>,
) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
osproxy_tenancy::resolve_partition_spec(
&PartitionKeySpec::BodyField(JsonPath::new("tenant_id")),
ctx,
body,
)
}
fn doc_id_rule(&self) -> Option<DocIdRule> {
None
}
fn injected_fields(&self) -> Vec<InjectedField> {
vec![]
}
fn sensitive_fields(&self) -> SensitivitySpec {
SensitivitySpec::none()
}
async fn placement_for(&self, _partition: &PartitionId) -> Result<PlacementAt, SpiError> {
Ok(PlacementAt::new(
Placement::SharedIndex {
cluster: ClusterId::from("eu-1"),
index: IndexName::from("shared"),
inject: vec![],
},
Epoch::new(1),
))
}
}
/// Builds the pipeline under test over a fresh `RecordingSink`.
///
/// When `policy` is `Some`, admin passthrough is enabled with it; when `None`,
/// the pipeline is returned exactly as constructed. The second tuple element
/// is the handle to the sink's recorded-`CursorOp` slot.
fn pipeline(policy: Option<AdminPolicy>) -> (StubPipeline, Arc<Mutex<Option<CursorOp>>>) {
    let (sink, seen) = RecordingSink::new();
    let base = Pipeline::new(TenancyRouter::new(StubTenancy), sink);
    let configured = match policy {
        Some(admin) => base.with_admin_passthrough(admin),
        None => base,
    };
    (configured, seen)
}
/// Sends one admin `GET` over HTTP/1 through `p` with the given `path` and
/// `query`, using principal `svc`, request id `r`, and no headers.
///
/// Returns the response's `(status, body)` on success, or the pipeline's
/// `RequestError` unchanged.
async fn run(
    p: &StubPipeline,
    path: &str,
    query: Option<&str>,
) -> Result<(u16, Vec<u8>), RequestError> {
    let caller = Principal::new(osproxy_core::PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let no_headers: Vec<(String, String)> = Vec::new();

    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Get,
        EndpointKind::Admin,
        Protocol::Http1,
        "",
        HeaderView::new(&no_headers),
        b"",
    )
    .with_path(path)
    .with_query(query);

    let response = p.handle(&ctx).await?;
    Ok((response.status, response.body))
}
/// A path under the allow-listed `/_cat/` prefix is dispatched to the sink's
/// cursor with the policy's cluster, and the path and query are unchanged.
#[tokio::test]
async fn an_allow_listed_admin_request_forwards_verbatim_to_the_admin_cluster() {
    let admin_cluster = ClusterId::from("admin-1");
    let policy = AdminPolicy::new(admin_cluster, vec![String::from("/_cat/")]);
    let (p, seen) = pipeline(Some(policy));

    let outcome = run(&p, "/_cat/indices", Some("v&format=json")).await;
    let (status, body) = outcome.expect("allow-listed admin request passes through");
    assert_eq!(status, 200);
    assert_eq!(body, br#"[{"index":"a"}]"#);

    // Copy the recorded op out so the lock is not held during the assertions.
    let recorded = seen.lock().unwrap().clone();
    let op = recorded.expect("forwarded to the sink");
    assert_eq!(
        op.cluster,
        ClusterId::from("admin-1"),
        "routed to admin cluster"
    );
    assert_eq!(op.path, "/_cat/indices", "full path forwarded verbatim");
    assert_eq!(op.query.as_deref(), Some("v&format=json"));
}
/// A path outside the allow-listed prefix fails with `UnsupportedEndpoint`
/// and the sink's cursor is never invoked.
#[tokio::test]
async fn an_admin_request_off_the_allow_list_is_rejected() {
    let admin_cluster = ClusterId::from("admin-1");
    let policy = AdminPolicy::new(admin_cluster, vec![String::from("/_cat/")]);
    let (p, seen) = pipeline(Some(policy));

    let outcome = run(&p, "/_cluster/settings", None).await;
    let err = outcome.expect_err("a non-allow-listed admin path is rejected");
    assert_eq!(err.code(), ErrorCode::UnsupportedEndpoint);

    let dispatched = seen.lock().unwrap().is_some();
    assert!(!dispatched, "no dispatch on rejection");
}
/// A path that begins with the allow-listed prefix but contains `..` segments
/// fails with `UnsupportedEndpoint` and nothing reaches the sink's cursor.
#[tokio::test]
async fn a_traversal_path_cannot_escape_the_allow_listed_prefix() {
    let admin_cluster = ClusterId::from("admin-1");
    let policy = AdminPolicy::new(admin_cluster, vec![String::from("/_cat/")]);
    let (p, seen) = pipeline(Some(policy));

    let outcome = run(&p, "/_cat/../_cluster/settings", None).await;
    let err = outcome.expect_err("a traversal path must be rejected");
    assert_eq!(err.code(), ErrorCode::UnsupportedEndpoint);

    let dispatched = seen.lock().unwrap().is_some();
    assert!(!dispatched, "no dispatch on a traversal path");
}
/// With no admin policy installed, even a `/_cat/` path fails with
/// `UnsupportedEndpoint` and nothing reaches the sink's cursor.
#[tokio::test]
async fn admin_is_rejected_when_no_policy_is_configured() {
    let (p, seen) = pipeline(None);

    let outcome = run(&p, "/_cat/indices", None).await;
    let err = outcome.expect_err("admin is rejected by default");
    assert_eq!(err.code(), ErrorCode::UnsupportedEndpoint);

    let dispatched = seen.lock().unwrap().is_some();
    assert!(!dispatched, "no dispatch when disabled");
}