#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use hyper::{Method, Request, Response};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use osproxy_core::{ClusterId, IndexName};
use osproxy_engine::Pipeline;
use osproxy_server::auth::ReferenceAuthenticator;
use osproxy_server::cursor::HmacCursorSigner;
use osproxy_server::handler::AppHandler;
use osproxy_server::tenancy::ReferenceTenancy;
use osproxy_sink::OpenSearchSink;
use osproxy_tenancy::TenancyRouter;
use testcontainers::core::{ContainerPort, WaitFor};
use testcontainers::runners::AsyncRunner;
use testcontainers::{GenericImage, ImageExt};
use tokio::net::TcpListener;
/// Physical index name handed to `ReferenceTenancy::new` by the proxy helpers and used
/// whenever a test addresses OpenSearch directly (bypassing the proxy).
const INDEX: &str = "osproxy-shared";
/// HTTP client type shared by every helper in this file: hyper-util's legacy client over
/// the plain `HttpConnector`, sending `Full<Bytes>` request bodies.
type HttpClient = Client<hyper_util::client::legacy::connect::HttpConnector, Full<Bytes>>;
/// Starts an `opensearchproject/opensearch:2.11.1` container configured as a single node
/// with the security plugin and demo config disabled, and returns the container handle
/// together with its `http://host:port` base URL.
///
/// Callers bind the handle (as `_container`) for the whole test — presumably because
/// dropping it tears the container down; confirm against the testcontainers docs.
///
/// # Panics
/// Panics if the container cannot be started or its host/port cannot be resolved.
async fn start_opensearch() -> (testcontainers::ContainerAsync<GenericImage>, String) {
    // OpenSearch's HTTP port inside the container.
    const HTTP_PORT: u16 = 9200;

    // Image-level settings have to be applied before the first `with_env_var`, which
    // turns the image into a container request.
    let image = GenericImage::new("opensearchproject/opensearch", "2.11.1")
        .with_exposed_port(ContainerPort::Tcp(HTTP_PORT))
        .with_wait_for(WaitFor::message_on_stdout("] started"));

    let request = image
        .with_env_var("discovery.type", "single-node")
        .with_env_var("DISABLE_SECURITY_PLUGIN", "true")
        .with_env_var("DISABLE_INSTALL_DEMO_CONFIG", "true")
        .with_env_var("bootstrap.memory_lock", "false")
        .with_env_var("OPENSEARCH_JAVA_OPTS", "-Xms512m -Xmx512m");

    let container = request.start().await.unwrap();
    let host = container.get_host().await.unwrap();
    let port = container.get_host_port_ipv4(HTTP_PORT).await.unwrap();
    let base = format!("http://{host}:{port}");
    (container, base)
}
/// Polls `{base}/_cluster/health` until it answers with HTTP 200.
///
/// Makes up to 60 attempts, sleeping two seconds after every unsuccessful one. Returns
/// `true` as soon as a 200 is seen and `false` once the attempts are exhausted. Transport
/// errors and non-200 statuses are both treated as "not ready yet".
async fn wait_ready(client: &HttpClient, base: &str) -> bool {
    let health_url = format!("{base}/_cluster/health");
    let mut attempts_left = 60_u32;
    while attempts_left > 0 {
        let healthy = matches!(get(client, &health_url).await, Ok((200, _)));
        if healthy {
            return true;
        }
        tokio::time::sleep(Duration::from_secs(2)).await;
        attempts_left -= 1;
    }
    false
}
/// Spawns a proxy on an ephemeral loopback port that forwards to `upstream`, and returns
/// the proxy's `http://addr` base URL.
///
/// The handler is built from a `Pipeline` over `ReferenceTenancy` (cluster `"default"`,
/// index [`INDEX`]) and an `OpenSearchSink`, with the dev authenticator and the
/// TLS-for-mutation requirement switched off. The server runs in a detached tokio task;
/// whatever `serve` returns is ignored.
///
/// # Panics
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn spawn_proxy(upstream: String) -> String {
    let cluster = ClusterId::from("default");
    let sink = OpenSearchSink::new();
    let tenancy = ReferenceTenancy::new(cluster, IndexName::from(INDEX), upstream);
    let pipeline = Pipeline::new(TenancyRouter::new(tenancy), sink);
    let app = AppHandler::new(pipeline, ReferenceAuthenticator::dev())
        .with_require_tls_for_mutation(false);
    let handler = Arc::new(app);

    // Port 0 lets the OS choose a free port.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let local = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });
    format!("http://{local}")
}
/// Like the plain proxy helper, but the pipeline is additionally given an
/// `HmacCursorSigner` (key `b"scroll-test-key"`) via `with_cursor_signer`.
///
/// Used by the scroll and point-in-time tests, which expect cursor ids returned by the
/// proxy to be wrapped rather than raw upstream ids. Returns the proxy's `http://addr`
/// base URL; the server runs in a detached tokio task.
///
/// # Panics
/// Panics if the listener cannot be bound or its local address cannot be read.
async fn spawn_proxy_with_affinity(upstream: String) -> String {
    let cluster = ClusterId::from("default");
    let sink = OpenSearchSink::new();
    let tenancy = ReferenceTenancy::new(cluster, IndexName::from(INDEX), upstream);
    let router = TenancyRouter::new(tenancy);
    let signer = Arc::new(HmacCursorSigner::new(b"scroll-test-key"));
    let pipeline = Pipeline::new(router, sink).with_cursor_signer(signer);
    let app = AppHandler::new(pipeline, ReferenceAuthenticator::dev())
        .with_require_tls_for_mutation(false);
    let handler = Arc::new(app);

    // Port 0 lets the OS choose a free port.
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let local = listener.local_addr().unwrap();
    tokio::spawn(async move {
        let _ = osproxy_transport::serve(listener, handler).await;
    });
    format!("http://{local}")
}
/// Issues a body-less `GET` to `url` without a tenant header.
///
/// Returns `(status, body)` on success, or the stringified error from `send`.
async fn get(client: &HttpClient, url: &str) -> Result<(u16, String), String> {
    let no_body = Bytes::new();
    send(client, Method::GET, url, no_body).await
}
/// Issues a body-less `GET` to `url` carrying `tenant` in the `x-tenant` header.
///
/// Returns `(status, body)` on success, or the stringified error from
/// `request_with_tenant`.
async fn get_with_tenant(
    client: &HttpClient,
    url: &str,
    tenant: &str,
) -> Result<(u16, String), String> {
    let no_body = Bytes::new();
    request_with_tenant(client, Method::GET, url, tenant, no_body).await
}
/// Sends a JSON request with the given `method` and `body` to `url`, identifying the
/// caller through the `x-tenant` header.
///
/// # Returns
/// `Ok((status, body))` where `body` is the full response payload decoded lossily as
/// UTF-8.
///
/// # Errors
/// Returns the stringified error if the request cannot be built, the client call fails,
/// or the response body cannot be collected.
async fn request_with_tenant(
    client: &HttpClient,
    method: Method,
    url: &str,
    tenant: &str,
    body: Bytes,
) -> Result<(u16, String), String> {
    let builder = Request::builder()
        .method(method)
        .uri(url)
        .header("content-type", "application/json")
        .header("x-tenant", tenant);
    let request = match builder.body(Full::new(body)) {
        Ok(request) => request,
        Err(err) => return Err(err.to_string()),
    };
    let response: Response<_> = match client.request(request).await {
        Ok(response) => response,
        Err(err) => return Err(err.to_string()),
    };
    // Read the status before `into_body` consumes the response.
    let code = response.status().as_u16();
    let collected = match response.into_body().collect().await {
        Ok(collected) => collected,
        Err(err) => return Err(err.to_string()),
    };
    let payload = collected.to_bytes();
    Ok((code, String::from_utf8_lossy(&payload).into_owned()))
}
/// Sends a JSON request with the given `method` and `body` to `url` without any tenant
/// header.
///
/// # Returns
/// `Ok((status, body))` where `body` is the full response payload decoded lossily as
/// UTF-8.
///
/// # Errors
/// Returns the stringified error if the request cannot be built, the client call fails,
/// or the response body cannot be collected.
async fn send(
    client: &HttpClient,
    method: Method,
    url: &str,
    body: Bytes,
) -> Result<(u16, String), String> {
    let stringify = |err: &dyn std::fmt::Display| err.to_string();

    let request = Request::builder()
        .method(method)
        .uri(url)
        .header("content-type", "application/json")
        .body(Full::new(body))
        .map_err(|err| stringify(&err))?;
    let response: Response<_> = client
        .request(request)
        .await
        .map_err(|err| stringify(&err))?;
    // Read the status before `into_body` consumes the response.
    let code = response.status().as_u16();
    let payload = response
        .into_body()
        .collect()
        .await
        .map_err(|err| stringify(&err))?
        .to_bytes();
    let text = String::from_utf8_lossy(&payload).into_owned();
    Ok((code, text))
}
/// End-to-end ingest check against a real OpenSearch container.
///
/// Posts one document for tenant `acme` through the proxy and asserts that the client
/// sees the logical id `7` (never `acme:7`), while the document stored in [`INDEX`] has
/// physical id `acme:7`, routing `acme`, and a `_tenant` field in `_source`. It then
/// runs the logical-read and delete checks against the same proxy.
#[tokio::test]
#[ignore = "requires Docker; run with --ignored"]
async fn ingest_round_trips_to_real_opensearch() {
    let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
    let (_container, os_base) = start_opensearch().await;
    let ready = wait_ready(&client, &os_base).await;
    assert!(ready, "opensearch did not become ready");
    let proxy = spawn_proxy(os_base.clone()).await;

    // Ingest through the proxy; the response must expose only the logical id.
    let ingest_url = format!("{proxy}/orders/_doc");
    let payload = Bytes::from_static(br#"{"tenant_id":"acme","id":7,"msg":"hello"}"#);
    let (status, body) = send(&client, Method::POST, &ingest_url, payload)
        .await
        .unwrap();
    assert_eq!(status, 201, "proxy ingest failed: {body}");
    assert!(body.contains(r#""_id":"7""#), "{body}");
    assert!(
        !body.contains("acme:7"),
        "physical id leaked to client: {body}"
    );

    // Read straight from OpenSearch to inspect the physical document.
    let physical_url = format!("{os_base}/{INDEX}/_doc/acme:7?routing=acme");
    let (status, doc) = get(&client, &physical_url).await.unwrap();
    assert_eq!(status, 200, "doc not found in opensearch: {doc}");
    let stored: serde_json::Value = serde_json::from_str(&doc).unwrap();
    assert_eq!(stored["_index"], INDEX);
    assert_eq!(stored["_id"], "acme:7");
    let source = &stored["_source"];
    assert_eq!(source["_tenant"], "acme");
    assert_eq!(source["msg"], "hello");
    assert_eq!(stored["_routing"], "acme");

    assert_logical_read(&client, &proxy).await;
    assert_delete_removes(&client, &proxy).await;
}
/// Asserts that reads through the proxy present the logical view of a document.
///
/// Fetching `orders/_doc/7` as tenant `acme` must return 200 with `_index` `orders`,
/// `_id` `7`, no `_routing`, no `_source._tenant`, and `msg` `hello`. Fetching the
/// non-existent id `999` must return 404 with `_id` `999` and `found: false`.
///
/// # Panics
/// Panics on any transport error, unparsable body, or failed assertion.
async fn assert_logical_read(client: &HttpClient, proxy: &str) {
    let hit_url = format!("{proxy}/orders/_doc/7");
    let (status, logical) = get_with_tenant(client, &hit_url, "acme").await.unwrap();
    assert_eq!(status, 200, "proxy read failed: {logical}");
    let seen: serde_json::Value = serde_json::from_str(&logical).unwrap();
    assert_eq!(seen["_index"], "orders");
    assert_eq!(seen["_id"], "7");
    assert!(seen.get("_routing").is_none(), "{logical}");
    let source = &seen["_source"];
    assert!(source.get("_tenant").is_none(), "{logical}");
    assert_eq!(source["msg"], "hello");

    let miss_url = format!("{proxy}/orders/_doc/999");
    let (status, miss) = get_with_tenant(client, &miss_url, "acme").await.unwrap();
    assert_eq!(status, 404, "{miss}");
    let not_found: serde_json::Value = serde_json::from_str(&miss).unwrap();
    assert_eq!(not_found["_id"], "999");
    assert_eq!(not_found["found"], false);
}
/// Asserts that deleting `orders/_doc/7` as tenant `acme` through the proxy succeeds and
/// that the document is gone afterwards.
///
/// The delete must return 200 with `_id` `7` and `result` `deleted`; a follow-up read of
/// the same document must return 404.
///
/// # Panics
/// Panics on any transport error, unparsable body, or failed assertion.
async fn assert_delete_removes(client: &HttpClient, proxy: &str) {
    // The same URL is used for the delete and the follow-up read.
    let doc_url = format!("{proxy}/orders/_doc/7");

    let (status, deleted) =
        request_with_tenant(client, Method::DELETE, &doc_url, "acme", Bytes::new())
            .await
            .unwrap();
    assert_eq!(status, 200, "{deleted}");
    let outcome: serde_json::Value = serde_json::from_str(&deleted).unwrap();
    assert_eq!(outcome["_id"], "7");
    assert_eq!(outcome["result"], "deleted");

    let (status, gone) = get_with_tenant(client, &doc_url, "acme").await.unwrap();
    assert_eq!(status, 404, "doc should be gone after delete: {gone}");
}
/// Verifies that a search through the proxy only returns the calling tenant's documents.
///
/// Ingests one document each for `acme` and `globex` (both with logical id `1`),
/// refreshes the physical index, then searches as `acme` with `match_all`. Exactly one
/// hit is expected, presented logically (`_index` `orders`, `_id` `1`, no `_tenant`),
/// and the `globex` document must not appear anywhere in the response. Finishes with the
/// partition-scoped `_count` check.
#[tokio::test]
#[ignore = "requires Docker; run with --ignored"]
async fn search_is_isolated_to_the_callers_partition() {
    let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
    let (_container, os_base) = start_opensearch().await;
    assert!(
        wait_ready(&client, &os_base).await,
        "opensearch did not become ready"
    );
    let proxy = spawn_proxy(os_base.clone()).await;

    // Same logical id for both tenants: only partitioning keeps them apart.
    for (tenant, id, msg) in [("acme", 1, "acme-doc"), ("globex", 1, "globex-doc")] {
        let body = format!(r#"{{"tenant_id":"{tenant}","id":{id},"msg":"{msg}"}}"#);
        let (status, b) = send(
            &client,
            Method::POST,
            &format!("{proxy}/orders/_doc"),
            Bytes::from(body),
        )
        .await
        .unwrap();
        assert_eq!(status, 201, "{b}");
    }

    // Refresh directly on OpenSearch so the docs are searchable. Check the status: a
    // failed refresh would otherwise show up below as a confusing hit-count mismatch.
    let (status, refreshed) = send(
        &client,
        Method::POST,
        &format!("{os_base}/{INDEX}/_refresh"),
        Bytes::new(),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "refresh failed: {refreshed}");

    let (status, hits) = request_with_tenant(
        &client,
        Method::POST,
        &format!("{proxy}/orders/_search"),
        "acme",
        Bytes::from_static(br#"{"query":{"match_all":{}}}"#),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "{hits}");
    let parsed: serde_json::Value = serde_json::from_str(&hits).unwrap();
    let hits_arr = parsed["hits"]["hits"].as_array().unwrap();
    assert_eq!(hits_arr.len(), 1, "expected only acme's doc: {hits}");
    let hit = &hits_arr[0];
    assert_eq!(hit["_index"], "orders");
    assert_eq!(hit["_id"], "1");
    assert!(hit["_source"].get("_tenant").is_none(), "{hits}");
    assert_eq!(hit["_source"]["msg"], "acme-doc");
    assert!(!hits.contains("globex-doc"), "isolation breach: {hits}");

    assert_count_is_partition_scoped(&client, &proxy).await;
}
/// Asserts that `orders/_count` with `match_all`, issued as tenant `acme` through the
/// proxy, returns 200 and a `count` of exactly 1.
///
/// # Panics
/// Panics on any transport error, unparsable body, or failed assertion.
async fn assert_count_is_partition_scoped(client: &HttpClient, proxy: &str) {
    let count_url = format!("{proxy}/orders/_count");
    let query = Bytes::from_static(br#"{"query":{"match_all":{}}}"#);
    let (status, counted) = request_with_tenant(client, Method::POST, &count_url, "acme", query)
        .await
        .unwrap();
    assert_eq!(status, 200, "{counted}");
    // Shadow the raw text with the parsed JSON; the failure message prints the parsed form.
    let counted: serde_json::Value = serde_json::from_str(&counted).unwrap();
    assert_eq!(
        counted["count"], 1,
        "count must be partition-scoped: {counted}"
    );
}
/// Verifies that a mixed-tenant `_bulk` request through the proxy is accepted and that
/// the resulting documents stay partition-scoped.
///
/// Sends three `index` actions (two for `acme`, one for `globex`) and expects a 200 with
/// `errors: false` and three items of status 201. After a refresh, `_count` as `acme`
/// must report 2. Finishes with the bulk create/update checks.
#[tokio::test]
#[ignore = "requires Docker; run with --ignored"]
async fn bulk_demux_round_trips_to_real_opensearch() {
    let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
    let (_container, os_base) = start_opensearch().await;
    assert!(
        wait_ready(&client, &os_base).await,
        "opensearch did not become ready"
    );
    let proxy = spawn_proxy(os_base.clone()).await;

    // No tenant header here: each document names its tenant via `tenant_id`.
    let ndjson = concat!(
        "{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":1,\"msg\":\"a1\"}\n",
        "{\"index\":{}}\n{\"tenant_id\":\"globex\",\"id\":2,\"msg\":\"g2\"}\n",
        "{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":3,\"msg\":\"a3\"}\n",
    );
    let (status, body) = send(
        &client,
        Method::POST,
        &format!("{proxy}/orders/_bulk"),
        Bytes::from_static(ndjson.as_bytes()),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "{body}");
    let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
    assert_eq!(parsed["errors"], false, "{body}");
    let items = parsed["items"].as_array().unwrap();
    assert_eq!(items.len(), 3);
    for item in items {
        assert_eq!(item["index"]["status"], 201, "{body}");
    }

    // Refresh directly on OpenSearch so the docs are countable. Check the status: a
    // failed refresh would otherwise show up below as a misleading count mismatch.
    let (status, refreshed) = send(
        &client,
        Method::POST,
        &format!("{os_base}/{INDEX}/_refresh"),
        Bytes::new(),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "refresh failed: {refreshed}");

    let (status, counted) = request_with_tenant(
        &client,
        Method::POST,
        &format!("{proxy}/orders/_count"),
        "acme",
        Bytes::from_static(br#"{"query":{"match_all":{}}}"#),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "{counted}");
    let counted: serde_json::Value = serde_json::from_str(&counted).unwrap();
    assert_eq!(counted["count"], 2, "acme bulk docs, isolated: {counted}");

    assert_bulk_create_and_update(&client, &os_base, &proxy).await;
}
/// Exercises `create` and `update` bulk actions through the proxy as tenant `acme`.
///
/// Sends a bulk with a `create` for id `5`, an `update` with `upsert` for the
/// not-yet-existing id `6`, and a partial `update` for id `1` (expected to exist already
/// from the caller's earlier bulk). After a refresh it reads the physical documents
/// `acme:5`, `acme:6` and `acme:1` directly from OpenSearch and checks their `_tenant`
/// and `msg`. Finally it repeats the `create` for id `5` and expects the bulk to report
/// `errors: true` with item status 409.
///
/// # Panics
/// Panics on any transport error, unparsable body, or failed assertion.
async fn assert_bulk_create_and_update(client: &HttpClient, os_base: &str, proxy: &str) {
    let ndjson = concat!(
        "{\"create\":{\"_id\":\"5\"}}\n{\"msg\":\"c5\"}\n",
        "{\"update\":{\"_id\":\"6\"}}\n{\"doc\":{\"msg\":\"u6\"},\"upsert\":{\"msg\":\"up6\"}}\n",
        "{\"update\":{\"_id\":\"1\"}}\n{\"doc\":{\"msg\":\"a1-patched\"}}\n",
    );
    let (status, body) = request_with_tenant(
        client,
        Method::POST,
        &format!("{proxy}/orders/_bulk"),
        "acme",
        // `concat!` yields a `&'static str`, so no copy is needed.
        Bytes::from_static(ndjson.as_bytes()),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "{body}");
    let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
    assert_eq!(parsed["errors"], false, "create+update bulk: {body}");

    // Check the refresh status so a failed refresh is reported here, not as a later
    // mismatch on the documents read below.
    let (status, refreshed) = send(
        client,
        Method::POST,
        &format!("{os_base}/{INDEX}/_refresh"),
        Bytes::new(),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "refresh failed: {refreshed}");

    // Id 6 did not exist before, so the upsert body ("up6") is what gets stored.
    for (id, msg) in [("5", "c5"), ("6", "up6"), ("1", "a1-patched")] {
        let (status, doc) = get(
            client,
            &format!("{os_base}/{INDEX}/_doc/acme:{id}?routing=acme"),
        )
        .await
        .unwrap();
        assert_eq!(status, 200, "acme:{id} missing: {doc}");
        let parsed: serde_json::Value = serde_json::from_str(&doc).unwrap();
        assert_eq!(parsed["_source"]["_tenant"], "acme", "{doc}");
        assert_eq!(parsed["_source"]["msg"], msg, "{doc}");
    }

    // A second `create` for an existing id must surface as a per-item conflict.
    let (status, body) = request_with_tenant(
        client,
        Method::POST,
        &format!("{proxy}/orders/_bulk"),
        "acme",
        Bytes::from_static(b"{\"create\":{\"_id\":\"5\"}}\n{\"msg\":\"dup\"}\n"),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "{body}");
    let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
    assert_eq!(
        parsed["errors"], true,
        "create conflict should error: {body}"
    );
    assert_eq!(parsed["items"][0]["create"]["status"], 409, "{body}");
}
/// Verifies the proxy's diagnostics for one successful and one failing request.
///
/// Success path: ingests a document, reads the `x-request-id` response header, and
/// fetches `/debug/explain/{request_id}`. The explanation must report outcome `ok`,
/// partition `acme` and upstream status 201, and must not contain the document's `msg`
/// value.
///
/// Failure path: ingests a document without `tenant_id` and expects a 400 whose body
/// mentions `partition_unresolved`.
#[tokio::test]
#[ignore = "requires Docker; run with --ignored"]
async fn blind_diagnosis_for_success_and_failure() {
    let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
    let (_container, os_base) = start_opensearch().await;
    assert!(
        wait_ready(&client, &os_base).await,
        "opensearch did not become ready"
    );
    let proxy = spawn_proxy(os_base).await;

    // Built by hand rather than via `send` because the response headers are needed.
    // The document carries a `msg` value so that the leak assertion below is able to
    // fail; without it the check could never trip.
    let req = Request::builder()
        .method(Method::POST)
        .uri(format!("{proxy}/orders/_doc"))
        .header("content-type", "application/json")
        .body(Full::new(Bytes::from_static(
            br#"{"tenant_id":"acme","id":1,"msg":"hello"}"#,
        )))
        .unwrap();
    let resp = client.request(req).await.unwrap();
    assert_eq!(resp.status(), 201, "proxy ingest failed");
    let request_id = resp
        .headers()
        .get("x-request-id")
        .unwrap()
        .to_str()
        .unwrap()
        .to_owned();

    let (status, explain) = get(&client, &format!("{proxy}/debug/explain/{request_id}"))
        .await
        .unwrap();
    assert_eq!(status, 200, "explain lookup failed: {explain}");
    assert!(explain.contains(r#""outcome":"ok""#), "{explain}");
    assert!(explain.contains(r#""partition_id":"acme""#), "{explain}");
    assert!(explain.contains(r#""upstream_status":201"#), "{explain}");
    assert!(!explain.contains("\"hello\""), "value leaked: {explain}");

    // No `tenant_id` and no tenant header: the partition cannot be resolved.
    let (status, body) = send(
        &client,
        Method::POST,
        &format!("{proxy}/orders/_doc"),
        Bytes::from_static(br#"{"id":2}"#),
    )
    .await
    .unwrap();
    assert_eq!(status, 400, "unresolved partition should be rejected: {body}");
    assert!(body.contains("partition_unresolved"), "{body}");
}
/// Seeds three documents (ids 1 to 3, `msg` `m{id}`) for tenant `acme` through the proxy
/// and then refreshes [`INDEX`] directly on OpenSearch.
///
/// # Panics
/// Panics if any ingest does not return 201, if the refresh does not return 200, or on
/// any transport error.
async fn seed_acme_docs(client: &HttpClient, proxy: &str, os_base: &str) {
    for id in 1..=3 {
        let (status, body) = send(
            client,
            Method::POST,
            &format!("{proxy}/orders/_doc"),
            Bytes::from(format!(r#"{{"tenant_id":"acme","id":{id},"msg":"m{id}"}}"#)),
        )
        .await
        .unwrap();
        assert_eq!(status, 201, "ingest {id}: {body}");
    }

    // Check the refresh status so a seeding failure is reported here, not as a
    // confusing hit-count mismatch in the calling test.
    let (status, refreshed) = send(
        client,
        Method::POST,
        &format!("{os_base}/{INDEX}/_refresh"),
        Bytes::new(),
    )
    .await
    .unwrap();
    assert_eq!(status, 200, "refresh failed: {refreshed}");
}
/// Verifies that a scroll can be opened and continued through the proxy.
///
/// Opens a scroll as tenant `acme` with page size 1 over three seeded documents, and
/// checks that the returned `_scroll_id` contains a `.` (the test's marker for a wrapped
/// envelope rather than the raw upstream id) and that the first page has one hit. It
/// then continues the scroll via `/_search/scroll` — without a tenant header — and
/// checks that the next `_scroll_id` is wrapped too and that the second page has one
/// hit.
#[tokio::test]
#[ignore = "requires Docker; run with --ignored"]
async fn scroll_create_and_continue_round_trip_through_the_proxy() {
    let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
    let (_container, os_base) = start_opensearch().await;
    let ready = wait_ready(&client, &os_base).await;
    assert!(ready, "opensearch not ready");
    let proxy = spawn_proxy_with_affinity(os_base.clone()).await;
    seed_acme_docs(&client, &proxy, &os_base).await;

    // Open the scroll.
    let open_url = format!("{proxy}/orders/_search?scroll=1m");
    let open_query = Bytes::from_static(br#"{"size":1,"query":{"match_all":{}}}"#);
    let (status, body) = request_with_tenant(&client, Method::POST, &open_url, "acme", open_query)
        .await
        .unwrap();
    assert_eq!(status, 200, "scroll open: {body}");
    let first_page: serde_json::Value = serde_json::from_str(&body).unwrap();
    let scroll_id = first_page["_scroll_id"]
        .as_str()
        .expect("scroll create returns a _scroll_id");
    assert!(
        scroll_id.contains('.'),
        "the scroll id must be a wrapped envelope, not the raw upstream id: {scroll_id}"
    );
    let first_hits = first_page["hits"]["hits"].as_array().unwrap();
    assert_eq!(first_hits.len(), 1, "first page has one hit: {body}");

    // Continue the scroll with the wrapped id.
    let continue_url = format!("{proxy}/_search/scroll");
    let continue_body = Bytes::from(format!(r#"{{"scroll":"1m","scroll_id":"{scroll_id}"}}"#));
    let (status, body) = send(&client, Method::POST, &continue_url, continue_body)
        .await
        .unwrap();
    assert_eq!(status, 200, "scroll continue: {body}");
    let second_page: serde_json::Value = serde_json::from_str(&body).unwrap();
    let rewrapped = second_page["_scroll_id"]
        .as_str()
        .is_some_and(|s| s.contains('.'));
    assert!(
        rewrapped,
        "the continue response re-wraps the next page id: {body}"
    );
    let second_hits = second_page["hits"]["hits"].as_array().unwrap();
    assert_eq!(second_hits.len(), 1, "second page has one hit: {body}");
}
/// Verifies the point-in-time (PIT) lifecycle through the proxy: create, search, close.
///
/// Creates a PIT on `orders` as tenant `acme` and checks that the returned `pit_id`
/// contains a `.` (the test's marker for a wrapped envelope rather than the raw upstream
/// id). Searching with that PIT must see exactly the three seeded `acme` documents and
/// return a wrapped `pit_id` again. Closing the PIT — without a tenant header — must
/// report `successful: true` for the first entry of `pits`.
#[tokio::test]
#[ignore = "requires Docker; run with --ignored"]
async fn pit_create_search_and_close_round_trip_through_the_proxy() {
    let client: HttpClient = Client::builder(TokioExecutor::new()).build_http();
    let (_container, os_base) = start_opensearch().await;
    let ready = wait_ready(&client, &os_base).await;
    assert!(ready, "opensearch not ready");
    let proxy = spawn_proxy_with_affinity(os_base.clone()).await;
    seed_acme_docs(&client, &proxy, &os_base).await;

    // Create the PIT.
    let create_url = format!("{proxy}/orders/_search/point_in_time?keep_alive=5m");
    let (status, body) =
        request_with_tenant(&client, Method::POST, &create_url, "acme", Bytes::new())
            .await
            .unwrap();
    assert_eq!(status, 200, "pit create: {body}");
    let created: serde_json::Value = serde_json::from_str(&body).unwrap();
    let pit_id = created["pit_id"].as_str().expect("create returns a pit_id");
    assert!(
        pit_id.contains('.'),
        "the pit id must be a wrapped envelope, not the raw upstream id: {pit_id}"
    );

    // Search inside the PIT; the index comes from the PIT, not the URL.
    let search_url = format!("{proxy}/_search");
    let search_body = Bytes::from(format!(
        r#"{{"query":{{"match_all":{{}}}},"pit":{{"id":"{pit_id}","keep_alive":"5m"}}}}"#
    ));
    let (status, body) =
        request_with_tenant(&client, Method::POST, &search_url, "acme", search_body)
            .await
            .unwrap();
    assert_eq!(status, 200, "pit search: {body}");
    let searched: serde_json::Value = serde_json::from_str(&body).unwrap();
    assert_eq!(
        searched["hits"]["total"]["value"].as_u64(),
        Some(3),
        "pit search sees acme's three docs, partition-scoped: {body}"
    );
    let rewrapped = searched["pit_id"]
        .as_str()
        .is_some_and(|s| s.contains('.'));
    assert!(
        rewrapped,
        "the search response re-wraps the refreshed pit_id: {body}"
    );

    // Close the PIT using the id returned at creation.
    let close_url = format!("{proxy}/_search/point_in_time");
    let close_body = Bytes::from(format!(r#"{{"pit_id":["{pit_id}"]}}"#));
    let (status, body) = send(&client, Method::DELETE, &close_url, close_body)
        .await
        .unwrap();
    assert_eq!(status, 200, "pit close: {body}");
    let closed: serde_json::Value = serde_json::from_str(&body).unwrap();
    assert_eq!(
        closed["pits"][0]["successful"].as_bool(),
        Some(true),
        "the pit was closed on its pinned cluster: {body}"
    );
}