#![allow(clippy::unwrap_used)]
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::body::Incoming;
use hyper::service::service_fn;
use hyper::{Method, Request, Response};
use hyper_util::client::legacy::Client;
use hyper_util::rt::{TokioExecutor, TokioIo};
use osproxy_core::{ClusterId, IndexName};
use osproxy_engine::{PassthroughPolicy, Pipeline};
use osproxy_server::auth::ReferenceAuthenticator;
use osproxy_server::capture::{MemoryCapture, RedactingCapture};
use osproxy_server::handler::AppHandler;
use osproxy_server::tenancy::ReferenceTenancy;
use osproxy_sink::OpenSearchSink;
use osproxy_tenancy::TenancyRouter;
use osproxy_transport::IngressHandler;
use tokio::net::TcpListener;
/// Snapshot of a request as observed by the stub upstream.
///
/// All fields are empty strings in the `Default` value, which lets a test tell "nothing has
/// reached the upstream yet" apart from a recorded request.
#[derive(Debug, Default, Clone)]
struct Captured {
    /// HTTP method of the observed request, rendered as text.
    method: String,
    /// Request URI of the observed request, rendered as text.
    uri: String,
    /// Request body, decoded lossily as UTF-8.
    body: String,
}
/// Starts a stub HTTP/1 upstream on an ephemeral loopback port.
///
/// Returns the stub's base URL (`http://<addr>`) together with a shared [`Captured`] slot. The
/// slot is overwritten with the method, URI and (lossily decoded) body of every request the
/// stub serves, and each request is answered with status `201` and a fixed JSON body.
///
/// The background task accepts exactly one connection, so all upstream traffic of a test has
/// to arrive over that single connection.
async fn start_upstream() -> (String, Arc<Mutex<Captured>>) {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = listener.local_addr().unwrap();
    let base_url = format!("http://{addr}");

    let captured = Arc::new(Mutex::new(Captured::default()));
    let shared = Arc::clone(&captured);

    tokio::spawn(async move {
        // Only the first inbound connection is ever served.
        let (stream, _peer) = listener.accept().await.unwrap();

        let record_and_reply = service_fn(move |req: Request<Incoming>| {
            let slot = Arc::clone(&shared);
            async move {
                let (head, incoming) = req.into_parts();
                let payload = incoming.collect().await.unwrap().to_bytes();
                let seen = Captured {
                    method: head.method.to_string(),
                    uri: head.uri.to_string(),
                    body: String::from_utf8_lossy(&payload).into_owned(),
                };
                // The guard is a temporary: the lock is released as soon as the store is done.
                *slot.lock().unwrap() = seen;

                let reply = Response::builder()
                    .status(201)
                    .body(Full::new(Bytes::from(
                        r#"{"_id":"acme:7","result":"created"}"#,
                    )))
                    .unwrap();
                Ok::<_, std::convert::Infallible>(reply)
            }
        });

        let connection = hyper::server::conn::http1::Builder::new()
            .serve_connection(TokioIo::new(stream), record_and_reply);
        connection.await.unwrap();
    });

    (base_url, captured)
}
/// End-to-end ingest through the proxy over cleartext HTTP.
///
/// Posts a document carrying `tenant_id` to `/orders/_doc` and checks that:
/// * the stub upstream received a `PUT` to the shared index with a tenant-prefixed id,
///   `routing=acme`, and a `_tenant` field in the body;
/// * `/debug/explain/<request id>` reports the partition and outcome without echoing the
///   document's `"hi"` value;
/// * the metrics snapshot matches (see `assert_metrics_snapshot`).
#[tokio::test]
async fn put_doc_is_tenanted_and_forwarded_upstream() {
    let (upstream, captured) = start_upstream().await;

    // Proxy under test: reference tenancy on the "default" cluster, dev authenticator.
    let tenancy = ReferenceTenancy::new(
        ClusterId::from("default"),
        IndexName::from("osproxy-shared"),
        upstream,
    );
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new());
    let handler = Arc::new(
        // The test client speaks plain `http://`, so the TLS requirement is switched off.
        AppHandler::new(pipeline, ReferenceAuthenticator::dev())
            .with_require_tls_for_mutation(false),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();

    // --- ingest -----------------------------------------------------------------------------
    let document = Full::new(Bytes::from_static(
        br#"{"tenant_id":"acme","id":7,"msg":"hi"}"#,
    ));
    let ingest = Request::builder()
        .method(Method::POST)
        .uri(format!("http://{proxy_addr}/orders/_doc"))
        .header("content-type", "application/json")
        .body(document)
        .unwrap();
    let resp = client.request(ingest).await.unwrap();
    assert_eq!(resp.status(), 201);

    let got = captured.lock().unwrap().clone();
    assert_eq!(got.method, "PUT");
    assert_eq!(got.uri, "/osproxy-shared/_doc/acme:7?routing=acme");
    assert!(got.body.contains(r#""_tenant":"acme""#), "{}", got.body);

    // --- explain ----------------------------------------------------------------------------
    let id_header = resp.headers().get("x-request-id").unwrap();
    let request_id = id_header.to_str().unwrap().to_owned();

    let explain_req = Request::builder()
        .method(Method::GET)
        .uri(format!("http://{proxy_addr}/debug/explain/{request_id}"))
        .body(Full::new(Bytes::new()))
        .unwrap();
    let explain = client.request(explain_req).await.unwrap();
    assert_eq!(explain.status(), 200);

    let doc = explain.into_body().collect().await.unwrap().to_bytes();
    let text = std::str::from_utf8(&doc).unwrap();
    assert!(text.contains(r#""partition_id":"acme""#), "{text}");
    assert!(text.contains(r#""outcome":"ok""#), "{text}");
    assert!(!text.contains("\"hi\""), "value leaked: {text}");

    // --- metrics ----------------------------------------------------------------------------
    assert_metrics_snapshot(&client, proxy_addr).await;
}
/// Fetches `/metrics` from the proxy at `proxy_addr` and checks the JSON snapshot.
///
/// Expects exactly one request counted in total and as ok, zero errors, a first pool entry for
/// cluster `"default"` with one dispatch, and no occurrence of the tenant name `acme` anywhere
/// in the response text.
async fn assert_metrics_snapshot(
    client: &Client<hyper_util::client::legacy::connect::HttpConnector, Full<Bytes>>,
    proxy_addr: std::net::SocketAddr,
) {
    let scrape = Request::builder()
        .method(Method::GET)
        .uri(format!("http://{proxy_addr}/metrics"))
        .body(Full::new(Bytes::new()))
        .unwrap();
    let metrics = client.request(scrape).await.unwrap();
    assert_eq!(metrics.status(), 200);

    let raw = metrics.into_body().collect().await.unwrap().to_bytes();
    let mtext = std::str::from_utf8(&raw).unwrap();
    let snap: serde_json::Value = serde_json::from_str(mtext).unwrap();

    // Request counters.
    assert_eq!(snap["requests_total"], 1, "one data-plane request: {mtext}");
    assert_eq!(snap["requests_ok"], 1);
    assert_eq!(snap["requests_error"], 0);

    // Per-cluster pool counters.
    let first_pool = &snap["pools"][0];
    assert_eq!(first_pool["cluster"], "default");
    assert_eq!(first_pool["dispatched"], 1);

    assert!(!mtext.contains("acme"), "metrics leaked tenant: {mtext}");
}
/// A document without `tenant_id` is answered with `400` and a body that mentions
/// `partition_unresolved`.
#[tokio::test]
async fn unresolved_partition_returns_client_error() {
    let (upstream, _captured) = start_upstream().await;

    let tenancy = ReferenceTenancy::new(
        ClusterId::from("default"),
        IndexName::from("osproxy-shared"),
        upstream,
    );
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new());
    let handler = Arc::new(
        AppHandler::new(pipeline, ReferenceAuthenticator::dev())
            .with_require_tls_for_mutation(false),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();

    // The body carries an id but no tenant.
    let tenantless = Request::builder()
        .method(Method::POST)
        .uri(format!("http://{proxy_addr}/orders/_doc"))
        .body(Full::new(Bytes::from_static(br#"{"id":7}"#)))
        .unwrap();
    let resp = client.request(tenantless).await.unwrap();
    assert_eq!(resp.status(), 400);

    let body = resp.into_body().collect().await.unwrap().to_bytes();
    let text = std::str::from_utf8(&body).unwrap();
    assert!(text.contains("partition_unresolved"), "{text}");
}
/// With a token-backed authenticator, a request without an `authorization` header gets `401`
/// while the same request carrying `Bearer s3cr3t` gets `201`.
#[tokio::test]
async fn token_auth_rejects_missing_and_accepts_valid() {
    let (upstream, _captured) = start_upstream().await;

    let tenancy = ReferenceTenancy::new(
        ClusterId::from("default"),
        IndexName::from("osproxy-shared"),
        upstream,
    );
    // Single known token, mapped to the name "svc-ingest".
    let tokens = HashMap::from([("s3cr3t".to_owned(), "svc-ingest".to_owned())]);
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new());
    let handler = Arc::new(
        AppHandler::new(pipeline, ReferenceAuthenticator::new(tokens))
            .with_require_tls_for_mutation(false),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();

    // Builds the same ingest request, optionally with an `authorization` header.
    let post_doc = |credential: Option<&str>| {
        let mut builder = Request::builder()
            .method(Method::POST)
            .uri(format!("http://{proxy_addr}/orders/_doc"));
        if let Some(value) = credential {
            builder = builder.header("authorization", value);
        }
        builder
            .body(Full::new(Bytes::from_static(
                br#"{"tenant_id":"acme","id":7}"#,
            )))
            .unwrap()
    };

    let unauth = client.request(post_doc(None)).await.unwrap();
    assert_eq!(unauth.status(), 401);

    let ok = client
        .request(post_doc(Some("Bearer s3cr3t")))
        .await
        .unwrap();
    assert_eq!(ok.status(), 201);
}
/// Drives the handler directly (no HTTP listener) with the TLS requirement left at its default.
///
/// An ingest flagged `secure: false` is answered `403` with a `tls_required` body and leaves the
/// stub upstream untouched; the same ingest flagged `secure: true` is answered `201` and shows
/// up at the upstream as a `PUT`.
#[tokio::test]
async fn a_mutating_request_over_cleartext_is_refused_when_tls_is_required() {
    let (upstream, captured) = start_upstream().await;

    let tenancy = ReferenceTenancy::new(
        ClusterId::from("default"),
        IndexName::from("osproxy-shared"),
        upstream,
    );
    let handler = AppHandler::new(
        Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new()),
        ReferenceAuthenticator::dev(),
    );

    // Same ingest request each time; only the `secure` flag varies.
    let ingest_with = |secure: bool| {
        osproxy_transport::IngressRequest {
            method: osproxy_spi::HttpMethod::Post,
            protocol: osproxy_spi::Protocol::Http1,
            path: "/orders/_doc".to_owned(),
            endpoint: osproxy_core::EndpointKind::IngestDoc,
            logical_index: "orders".to_owned(),
            doc_id: None,
            headers: vec![],
            body: br#"{"tenant_id":"acme","id":7}"#.to_vec(),
            query: None,
            client_cert_subject: None,
            secure,
        }
    };

    // Cleartext: refused before anything is dispatched.
    let refused = handler.handle(ingest_with(false)).await;
    assert_eq!(refused.status, 403);
    let names_tls_required = String::from_utf8_lossy(&refused.body).contains("tls_required");
    assert!(
        names_tls_required,
        "value-free tls_required body: {refused:?}"
    );
    let method_after_refusal = captured.lock().unwrap().method.clone();
    assert_eq!(
        method_after_refusal, "",
        "a refused request never reaches the upstream"
    );

    // Secure: forwarded.
    let ok = handler.handle(ingest_with(true)).await;
    assert_eq!(ok.status, 201);
    let method_after_accept = captured.lock().unwrap().method.clone();
    assert_eq!(method_after_accept, "PUT");
}
/// Test authorizer that rejects every `IngestDoc` action and allows everything else.
struct DenyIngest;

impl osproxy_spi::Authorizer for DenyIngest {
    /// Returns `Err(AuthError::Unauthorized)` when the action targets the `IngestDoc` endpoint,
    /// `Ok(())` otherwise. The principal is not consulted.
    async fn authorize(
        &self,
        _principal: &osproxy_spi::Principal,
        action: &osproxy_spi::Action,
    ) -> Result<(), osproxy_spi::AuthError> {
        if action.endpoint != osproxy_core::EndpointKind::IngestDoc {
            Ok(())
        } else {
            Err(osproxy_spi::AuthError::Unauthorized)
        }
    }
}
/// With `DenyIngest` installed as the authorizer, an ingest handed straight to the handler is
/// answered `403` and the stub upstream records nothing.
#[tokio::test]
async fn a_supplied_authorizer_can_decline_an_authenticated_request() {
    let (upstream, captured) = start_upstream().await;

    let tenancy = ReferenceTenancy::new(
        ClusterId::from("default"),
        IndexName::from("osproxy-shared"),
        upstream,
    );
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new());
    let handler = AppHandler::new(pipeline, ReferenceAuthenticator::dev())
        .with_require_tls_for_mutation(false)
        .with_authorizer(DenyIngest);

    let request = osproxy_transport::IngressRequest {
        method: osproxy_spi::HttpMethod::Post,
        protocol: osproxy_spi::Protocol::Http1,
        path: "/orders/_doc".to_owned(),
        endpoint: osproxy_core::EndpointKind::IngestDoc,
        logical_index: "orders".to_owned(),
        doc_id: None,
        headers: vec![],
        body: br#"{"tenant_id":"acme","id":7}"#.to_vec(),
        query: None,
        client_cert_subject: None,
        secure: true,
    };

    let denied = handler.handle(request).await;
    assert_eq!(denied.status, 403, "authorizer declined the action");

    // An empty method means the upstream slot still holds its default value.
    let upstream_method = captured.lock().unwrap().method.clone();
    assert_eq!(
        upstream_method, "",
        "a request the authorizer declined never reaches the upstream"
    );
}
/// With a passthrough policy configured and no capture wired, a `POST` reaches the stub
/// upstream with the same method, path and body it was sent with, and without a `_tenant`
/// field added.
#[tokio::test]
async fn passthrough_streams_the_body_verbatim_when_capture_is_off() {
    // The exact document sent by the client and expected at the upstream.
    const DOCUMENT: &str = r#"{"id":9,"msg":"streamed"}"#;

    let (upstream, captured) = start_upstream().await;

    let cluster = ClusterId::from("source");
    let tenancy = ReferenceTenancy::new(cluster.clone(), IndexName::from("ignored"), &upstream);
    let passthrough = PassthroughPolicy::new(cluster, upstream);
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new())
        .with_passthrough(passthrough);
    let handler = Arc::new(
        AppHandler::new(pipeline, ReferenceAuthenticator::dev())
            .with_require_tls_for_mutation(false),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
    let raw_post = Request::builder()
        .method(Method::POST)
        .uri(format!("http://{proxy_addr}/legacy/_doc/raw-9"))
        .body(Full::new(Bytes::from_static(DOCUMENT.as_bytes())))
        .unwrap();
    let resp = client.request(raw_post).await.unwrap();
    assert_eq!(resp.status(), 201);

    let got = captured.lock().unwrap().clone();
    assert_eq!(got.method, "POST");
    assert_eq!(got.uri, "/legacy/_doc/raw-9");
    assert_eq!(
        got.body, DOCUMENT,
        "the streamed body reached the upstream verbatim"
    );
    assert!(!got.body.contains("_tenant"), "no tenancy injection");
}
/// Passthrough with baseline capture enabled and a redacting capture sink wired in.
///
/// The upstream must see the request unchanged, and the in-memory capture must hold exactly
/// one record with the request path, body and the `201` response status, and with no
/// `authorization` header among its headers.
#[tokio::test]
async fn passthrough_forwards_verbatim_and_capture_records_the_exchange() {
    let (upstream, captured) = start_upstream().await;

    let cluster = ClusterId::from("source");
    let tenancy = ReferenceTenancy::new(cluster.clone(), IndexName::from("ignored"), &upstream);
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new())
        .with_passthrough(PassthroughPolicy::new(cluster, upstream))
        .with_baseline_capture(true);

    // The test keeps one handle to the in-memory sink; a clone goes behind the redactor.
    let capture = MemoryCapture::new();
    let redacting = RedactingCapture::new(capture.clone());
    let handler = Arc::new(
        AppHandler::new(pipeline, ReferenceAuthenticator::dev())
            .with_require_tls_for_mutation(false)
            .with_capture(Box::new(redacting)),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
    let with_credential = Request::builder()
        .method(Method::POST)
        .uri(format!("http://{proxy_addr}/orders/_doc/raw-1"))
        .header("authorization", "Bearer s3cret")
        .body(Full::new(Bytes::from_static(br#"{"id":7,"msg":"hi"}"#)))
        .unwrap();
    let resp = client.request(with_credential).await.unwrap();
    assert_eq!(resp.status(), 201);

    // What the upstream saw.
    let got = captured.lock().unwrap().clone();
    assert_eq!(got.method, "POST");
    assert_eq!(got.uri, "/orders/_doc/raw-1");
    assert_eq!(got.body, r#"{"id":7,"msg":"hi"}"#);
    assert!(
        !got.body.contains("_tenant"),
        "no tenancy injection: {}",
        got.body
    );

    // What the capture sink recorded.
    let records = capture.records();
    assert_eq!(records.len(), 1, "one captured exchange");
    let exchange = &records[0];
    assert_eq!(exchange.path, "/orders/_doc/raw-1");
    assert_eq!(exchange.body, br#"{"id":7,"msg":"hi"}"#);
    assert_eq!(exchange.response_status, 201);
    assert!(
        exchange
            .headers
            .iter()
            .all(|(k, _)| !k.eq_ignore_ascii_case("authorization")),
        "the captured stream must not carry the credential: {:?}",
        exchange.headers
    );
}
/// A capture sink is wired into the handler but baseline capture is never enabled on the
/// pipeline: the request still succeeds with `201` and the sink ends up with no records.
#[tokio::test]
async fn a_wired_capture_sink_stays_off_until_enabled() {
    let (upstream, _captured) = start_upstream().await;

    let cluster = ClusterId::from("source");
    let tenancy = ReferenceTenancy::new(cluster.clone(), IndexName::from("ignored"), &upstream);
    // Note: no `with_baseline_capture(true)` on this pipeline.
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new())
        .with_passthrough(PassthroughPolicy::new(cluster, upstream));

    let capture = MemoryCapture::new();
    let handler = Arc::new(
        AppHandler::new(pipeline, ReferenceAuthenticator::dev())
            .with_require_tls_for_mutation(false)
            .with_capture(Box::new(capture.clone())),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
    let raw_post = Request::builder()
        .method(Method::POST)
        .uri(format!("http://{proxy_addr}/orders/_doc/raw-1"))
        .body(Full::new(Bytes::from_static(br#"{"id":7}"#)))
        .unwrap();
    let status = client.request(raw_post).await.unwrap().status();
    assert_eq!(status, 201);

    assert!(
        capture.records().is_empty(),
        "a wired sink captures nothing until capture is enabled"
    );
}
/// Request log that appends every emitted record to a shared, mutex-guarded vector so a test
/// can inspect the records afterwards. Clones share the same vector.
#[derive(Default, Clone)]
struct RecordingLog(Arc<Mutex<Vec<serde_json::Value>>>);

impl osproxy_server::log::RequestLog for RecordingLog {
    /// Stores a copy of `record` at the end of the shared vector.
    fn emit(&self, record: &serde_json::Value) {
        let mut seen = self.0.lock().unwrap();
        seen.push(record.clone());
    }
}
/// One successful ingest through the proxy must produce exactly one structured log record
/// whose `outcome` is `"ok"`, whose `request_id` is a string, and whose `trace_id` is a string
/// of length 32.
#[tokio::test]
async fn a_handled_request_emits_a_structured_log_carrying_the_trace_id() {
    let (upstream, _captured) = start_upstream().await;

    let tenancy = ReferenceTenancy::new(
        ClusterId::from("default"),
        IndexName::from("osproxy-shared"),
        upstream,
    );
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), OpenSearchSink::new());

    // The test keeps `log`; the handler gets a clone that shares the same record vector.
    let log = RecordingLog::default();
    let handler = Arc::new(
        AppHandler::new(pipeline, ReferenceAuthenticator::dev())
            .with_request_log(Box::new(log.clone()))
            .with_require_tls_for_mutation(false),
    );

    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });

    let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
    let ingest = Request::builder()
        .method(Method::POST)
        .uri(format!("http://{proxy_addr}/orders/_doc"))
        .header("content-type", "application/json")
        .body(Full::new(Bytes::from_static(
            br#"{"tenant_id":"acme","id":7}"#,
        )))
        .unwrap();
    let resp = client.request(ingest).await.unwrap();
    assert_eq!(resp.status(), 201);

    // No `.await` follows, so holding the std mutex guard to the end of the test is fine.
    let records = log.0.lock().unwrap();
    assert_eq!(records.len(), 1, "one structured log line per request");

    let rec = &records[0];
    assert_eq!(rec["outcome"], "ok");
    assert!(rec["request_id"].is_string(), "record: {rec}");

    let trace_len_is_32 = matches!(rec["trace_id"].as_str(), Some(t) if t.len() == 32);
    assert!(
        trace_len_is_32,
        "structured log carries the 32-hex trace id: {rec}"
    );
}