#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use osproxy_core::{
ClusterId, EndpointKind, FieldName, IndexName, PartitionId, PrincipalId, RequestId, Target,
};
use osproxy_engine::{Pipeline, PipelineResponse};
use osproxy_sink::{MemorySink, ReadOp, Reader, Sink, SinkError};
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, IdTemplate, InjectedField, InjectedValue, JsonPath,
PartitionKeySpec, Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec,
SpiError, TenancySpi,
};
use osproxy_tenancy::{PlacementTable, TenancyRouter};
use proptest::prelude::*;
use serde_json::Value;
/// Tenancy implementation used by these tests; placements are looked up in a
/// shared [`PlacementTable`].
struct SharedTenancy {
    /// Partition-to-placement table consulted by `placement_for`.
    table: Arc<PlacementTable>,
}

impl TenancySpi for SharedTenancy {
    /// Resolves the partition through `resolve_partition_spec`, offering the
    /// `tenant_id` body field and the `x-tenant` header as alternatives.
    fn resolve_partition(
        &self,
        ctx: &osproxy_spi::RequestCtx<'_>,
        body: BodyDoc<'_>,
    ) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
        let from_body = PartitionKeySpec::BodyField(JsonPath::new("tenant_id"));
        let from_header = PartitionKeySpec::Header("x-tenant".to_owned());
        let spec = PartitionKeySpec::AnyOf(vec![from_body, from_header]);
        osproxy_tenancy::resolve_partition_spec(&spec, ctx, body)
    }

    /// Document ids follow the `{partition}:{body.id}` template, with routing
    /// switched on.
    fn doc_id_rule(&self) -> Option<DocIdRule> {
        let template = IdTemplate::new("{partition}:{body.id}");
        let rule = DocIdRule::new(template).with_routing(true);
        Some(rule)
    }

    /// A single injected field, `_tenant`, carrying the partition id.
    fn injected_fields(&self) -> Vec<InjectedField> {
        let tenant_marker =
            InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId);
        vec![tenant_marker]
    }

    /// No fields are declared sensitive.
    fn sensitive_fields(&self) -> SensitivitySpec {
        SensitivitySpec::none()
    }

    /// Looks the partition up in the shared table.
    ///
    /// # Errors
    /// Returns `SpiError::PlacementMissing` when the table has no entry for
    /// the partition.
    async fn placement_for(&self, partition: &PartitionId) -> Result<PlacementAt, SpiError> {
        let missing = || SpiError::PlacementMissing {
            partition: partition.clone(),
        };
        self.table.get(partition).ok_or_else(missing)
    }
}
/// Builds a pipeline over a fresh `MemorySink` with two partitions registered:
/// `acme` -> `acme-idx` and `globex` -> `globex-idx`, both shared-index
/// placements on cluster `eu-1` that inject `_tenant`.
fn pipeline() -> Pipeline<TenancyRouter<SharedTenancy>, MemorySink> {
    const TENANTS: [(&str, &str); 2] = [("acme", "acme-idx"), ("globex", "globex-idx")];

    let table = Arc::new(PlacementTable::new());
    for (partition, index) in TENANTS {
        let key = PartitionId::from(partition);
        let tenant_marker =
            InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId);
        let placement = Placement::SharedIndex {
            cluster: ClusterId::from("eu-1"),
            index: IndexName::from(index),
            inject: vec![tenant_marker],
        };
        table.set(key, placement);
    }

    let router = TenancyRouter::new(SharedTenancy { table });
    Pipeline::new(router, MemorySink::new())
}
/// Sends `body` through `Pipeline::handle` as a header-less `POST` bulk-ingest
/// request against `orders` and returns the response.
///
/// # Panics
/// Panics if the pipeline returns an error.
async fn bulk(
    p: &Pipeline<TenancyRouter<SharedTenancy>, MemorySink>,
    body: &[u8],
) -> PipelineResponse {
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    let no_headers = vec![];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&no_headers),
        body,
    );
    let outcome = p.handle(&ctx).await;
    outcome.unwrap()
}
/// Sends `body` through `Pipeline::handle_bulk_streamed`: the request context
/// carries an empty body and the payload is supplied via
/// `osproxy_sink::buffered`.
///
/// # Panics
/// Panics if the first element of the streamed result is an error.
async fn bulk_streamed(
    p: &Pipeline<TenancyRouter<SharedTenancy>, MemorySink>,
    body: &[u8],
) -> PipelineResponse {
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    let no_headers = vec![];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&no_headers),
        b"",
    );
    let stream = osproxy_sink::buffered(body.to_vec().into());
    let streamed = p.handle_bulk_streamed(&ctx, stream).await;
    streamed.0.unwrap()
}
/// Builds a `Target` for `index` on cluster `eu-1`.
fn target(index: &str) -> Target {
    let cluster = ClusterId::from("eu-1");
    let index_name = IndexName::from(index);
    Target::new(cluster, index_name)
}
/// The streamed bulk path must report the same `errors` flag and `items` as
/// the buffered path for an identical three-item payload.
#[tokio::test]
async fn streamed_bulk_matches_the_buffered_path() {
    let ndjson = concat!(
        "{\"index\":{\"_id\":\"1\"}}\n{\"tenant_id\":\"acme\",\"id\":1}\n",
        "{\"index\":{\"_id\":\"2\"}}\n{\"tenant_id\":\"globex\",\"id\":2}\n",
        "{\"index\":{\"_id\":\"3\"}}\n{\"tenant_id\":\"acme\",\"id\":3}\n",
    );
    let payload = ndjson.as_bytes();

    // Each path gets its own fresh pipeline.
    let buffered_resp = bulk(&pipeline(), payload).await;
    let buffered: Value = serde_json::from_slice(&buffered_resp.body).unwrap();
    let streamed_resp = bulk_streamed(&pipeline(), payload).await;
    let streamed: Value = serde_json::from_slice(&streamed_resp.body).unwrap();

    assert_eq!(streamed["errors"], buffered["errors"]);
    assert_eq!(streamed["items"], buffered["items"]);
    let streamed_items = streamed["items"].as_array().unwrap();
    assert_eq!(streamed_items.len(), 3);
}
/// Five documents, each padded with 1 MiB of data and all bound for the same
/// partition, must reach the sink in at least two recorded writes.
#[tokio::test]
async fn a_target_flushes_mid_batch_at_the_byte_threshold() {
    use std::fmt::Write as _;

    const PADDING_BYTES: usize = 1024 * 1024;

    let p = pipeline();
    let big = "x".repeat(PADDING_BYTES);
    let mut payload = String::new();
    for i in 0..5 {
        // Writing into a `String` cannot fail, so the result is discarded.
        let _ = write!(
            payload,
            "{{\"index\":{{}}}}\n{{\"tenant_id\":\"acme\",\"id\":{i},\"data\":\"{big}\"}}\n"
        );
    }

    let resp = bulk(&p, payload.as_bytes()).await;
    assert_eq!(resp.status, 200);

    let sink = p.sink();
    assert!(
        sink.recorded().len() >= 2,
        "byte-flush should split the sub-batch mid-stream: {} writes",
        sink.recorded().len()
    );
}
/// On the streamed path, an item whose document has no `tenant_id` must carry
/// its error at its own position while the preceding item stays error-free.
#[tokio::test]
async fn streamed_bulk_positions_a_per_item_failure_in_place() {
    let ndjson = concat!(
        "{\"index\":{\"_id\":\"1\"}}\n{\"tenant_id\":\"acme\",\"id\":1}\n",
        "{\"index\":{\"_id\":\"2\"}}\n{\"no_tenant\":true}\n",
    );

    let response = bulk_streamed(&pipeline(), ndjson.as_bytes()).await;
    let reply: Value = serde_json::from_slice(&response.body).unwrap();
    let items = reply["items"].as_array().unwrap();

    assert_eq!(reply["errors"], true);
    let first_error = items[0]["index"].get("error");
    assert!(first_error.is_none(), "first ok");
    let second_error = items[1]["index"].get("error");
    assert!(second_error.is_some(), "second failed in place");
}
/// A bulk mixing two partitions and one document without a tenant must:
/// keep the response items in request order, fail only the tenant-less item
/// with a 400 `partition_unresolved`, and store each document only in its own
/// partition's index.
#[tokio::test]
async fn mixed_partition_bulk_demuxes_preserves_order_and_isolates() {
    let p = pipeline();
    let ndjson = concat!(
        "{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":1,\"msg\":\"a1\"}\n",
        "{\"index\":{}}\n{\"tenant_id\":\"globex\",\"id\":2,\"msg\":\"g2\"}\n",
        "{\"index\":{}}\n{\"id\":3,\"msg\":\"orphan\"}\n",
        "{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":4,\"msg\":\"a4\"}\n",
    );

    let resp = bulk(&p, ndjson.as_bytes()).await;
    assert_eq!(resp.status, 200);

    let doc: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(doc["errors"], true, "{doc}");
    let items = doc["items"].as_array().unwrap();
    assert_eq!(items.len(), 4);

    // Response positions mirror request positions.
    let first = &items[0]["index"];
    assert_eq!(first["_id"], "1");
    assert_eq!(first["status"], 201);
    let second = &items[1]["index"];
    assert_eq!(second["_id"], "2");
    assert_eq!(second["status"], 201);
    let orphan = &items[2]["index"];
    assert_eq!(orphan["status"], 400);
    assert_eq!(orphan["error"]["type"], "partition_unresolved");
    let fourth = &items[3]["index"];
    assert_eq!(fourth["_id"], "4");
    assert_eq!(fourth["status"], 201);

    // Each document is readable from its own index under its physical id.
    let sink = p.sink();
    let acme_read = ReadOp::new(target("acme-idx"), "acme:1", Some("acme".into()));
    let acme_hit = sink.get(acme_read).await.unwrap();
    assert!(acme_hit.found, "acme:1 should be in acme-idx");

    let globex_read = ReadOp::new(target("globex-idx"), "globex:2", Some("globex".into()));
    let globex_hit = sink.get(globex_read).await.unwrap();
    assert!(globex_hit.found, "globex:2 should be in globex-idx");

    // ...and is absent from the other partition's index.
    let cross_read = ReadOp::new(target("globex-idx"), "acme:1", None);
    let cross_hit = sink.get(cross_read).await.unwrap();
    assert!(!cross_hit.found, "acme:1 must not be in globex-idx");
}
/// An index followed by a delete, sent with an `x-tenant: acme` header, must
/// yield `errors: false` and two items in request order.
#[tokio::test]
async fn all_succeed_reports_no_errors() {
    let p = pipeline();
    let ndjson = concat!(
        "{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":1}\n",
        "{\"delete\":{\"_id\":\"1\"}}\n",
    );

    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    // The delete action has no document body, so the header supplies the tenant.
    let tenant_header = vec![("x-tenant".to_owned(), "acme".to_owned())];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&tenant_header),
        ndjson.as_bytes(),
    );

    let response = p.handle(&ctx).await.unwrap();
    let doc: Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(doc["errors"], false, "{doc}");

    let items = doc["items"].as_array().unwrap();
    assert_eq!(items.len(), 2);
    let indexed = &items[0]["index"];
    assert_eq!(indexed["result"], "created");
    let deleted = &items[1]["delete"];
    assert_eq!(deleted["_id"], "1");
}
proptest! {
    #![proptest_config(ProptestConfig::with_cases(48))]

    // Property: for any sequence of up to 11 index operations spread over the
    // two known tenants, the response has one item per operation, in request
    // order, each echoing the logical id with status 201.
    #[test]
    fn bulk_preserves_item_order(ops in prop::collection::vec(
        (prop_oneof![Just("acme"), Just("globex")], 0u32..1000),
        0..12,
    )) {
        // proptest bodies are synchronous, so the async pipeline is driven on
        // a dedicated current-thread runtime.
        let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
        runtime.block_on(async {
            use std::fmt::Write as _;

            let p = pipeline();
            let mut payload = String::new();
            for (tenant, id) in &ops {
                let _ = write!(
                    payload,
                    "{{\"index\":{{}}}}\n{{\"tenant_id\":\"{tenant}\",\"id\":{id}}}\n"
                );
            }

            let resp = bulk(&p, payload.as_bytes()).await;
            let reply: Value = serde_json::from_slice(&resp.body).unwrap();
            let items = reply["items"].as_array().unwrap();
            prop_assert_eq!(items.len(), ops.len());

            for (item, (_tenant, id)) in items.iter().zip(&ops) {
                let want_id = Value::from(id.to_string());
                let want_status = Value::from(201);
                prop_assert_eq!(&item["index"]["_id"], &want_id);
                prop_assert_eq!(&item["index"]["status"], &want_status);
            }
            Ok(())
        })?;
    }
}
/// Each failing item must carry its own typed error at its own position: an
/// update and a delete without `_id`, then an index for the tenant `ghost`,
/// which is expected to fail with a 404 `placement_missing`.
#[tokio::test]
async fn per_item_errors_are_positioned_and_typed() {
    let p = pipeline();
    let ndjson = concat!(
        "{\"update\":{}}\n{\"doc\":{}}\n",
        "{\"delete\":{}}\n",
        "{\"index\":{}}\n{\"tenant_id\":\"ghost\",\"id\":9}\n",
    );

    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    let tenant_header = vec![("x-tenant".to_owned(), "acme".to_owned())];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&tenant_header),
        ndjson.as_bytes(),
    );

    let response = p.handle(&ctx).await.unwrap();
    let reply: Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(reply["errors"], true);

    let items = reply["items"].as_array().unwrap();
    let update_error = &items[0]["update"]["error"];
    assert_eq!(update_error["type"], "update_without_id");
    let delete_error = &items[1]["delete"]["error"];
    assert_eq!(delete_error["type"], "delete_without_id");
    let ghost = &items[2]["index"];
    assert_eq!(ghost["error"]["type"], "placement_missing");
    assert_eq!(ghost["status"], 404);
}
/// An `_id` given on the action line is echoed back unchanged in the response
/// (`99`) while the document is stored under the partition-prefixed id
/// (`acme:99`).
#[tokio::test]
async fn action_line_id_is_mapped_logical_to_physical() {
    let p = pipeline();
    let ndjson = "{\"index\":{\"_id\":\"99\"}}\n{\"tenant_id\":\"acme\",\"id\":1}\n";

    let response = bulk(&p, ndjson.as_bytes()).await;
    let reply: Value = serde_json::from_slice(&response.body).unwrap();
    let echoed_id = &reply["items"][0]["index"]["_id"];
    assert_eq!(*echoed_id, "99");

    let read = ReadOp::new(target("acme-idx"), "acme:99", Some("acme".into()));
    let stored = p.sink().get(read).await.unwrap();
    assert!(stored.found, "acme:99 should be stored");
}
/// A bulk of `COUNT` index operations for one partition must come back with
/// `COUNT` items, in request order, all with status 201, and the first and
/// last documents must be readable from the sink afterwards.
#[tokio::test]
async fn large_bulk_flushes_mid_stream_without_dropping_or_reordering() {
    use std::fmt::Write as _;

    const COUNT: usize = 600;

    let p = pipeline();
    let mut body = String::new();
    for id in 0..COUNT {
        // Writing into a `String` cannot fail, so the result is discarded.
        let _ = write!(
            body,
            "{{\"index\":{{}}}}\n{{\"tenant_id\":\"acme\",\"id\":{id},\"msg\":\"m{id}\"}}\n"
        );
    }

    let resp = bulk(&p, body.as_bytes()).await;
    assert_eq!(resp.status, 200);

    let doc: Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(doc["errors"], false, "no errors expected");
    let items = doc["items"].as_array().unwrap();
    assert_eq!(items.len(), COUNT);
    for (id, item) in items.iter().enumerate() {
        assert_eq!(item["index"]["_id"], id.to_string(), "order preserved");
        assert_eq!(item["index"]["status"], 201);
    }

    // Spot-check both ends of the batch. The ids are derived from COUNT so the
    // check keeps probing the real last document if COUNT is ever changed.
    for id in [0, COUNT - 1] {
        let hit = p
            .sink()
            .get(ReadOp::new(
                target("acme-idx"),
                format!("acme:{id}"),
                Some("acme".into()),
            ))
            .await
            .unwrap();
        assert!(hit.found, "acme:{id} should be stored");
    }
}
/// A `create` action must be answered under the `create` key with status 201
/// and leave the document readable under its partition-prefixed id.
#[tokio::test]
async fn create_action_routes_through_the_create_op() {
    let p = pipeline();
    let ndjson = "{\"create\":{}}\n{\"tenant_id\":\"acme\",\"id\":5,\"msg\":\"c5\"}\n";

    let response = bulk(&p, ndjson.as_bytes()).await;
    let doc: Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(doc["errors"], false, "{doc}");

    let created = &doc["items"][0]["create"];
    assert_eq!(created["_id"], "5");
    assert_eq!(created["status"], 201);

    let read = ReadOp::new(target("acme-idx"), "acme:5", Some("acme".into()));
    let stored = p.sink().get(read).await.unwrap();
    assert!(stored.found, "acme:5 should be stored by the create op");
}
/// An update with an `upsert` document, sent with an `x-tenant: acme` header
/// for a document that was not written beforehand in this test, must store the
/// upsert document with the `_tenant` field injected.
#[tokio::test]
async fn update_upsert_injects_tenancy_and_round_trips() {
    let p = pipeline();
    let ndjson = concat!(
        "{\"update\":{\"_id\":\"5\"}}\n",
        "{\"doc\":{\"msg\":\"patched\"},\"upsert\":{\"msg\":\"made\"}}\n",
    );

    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    // The update payload carries no `tenant_id`, so the header supplies it.
    let tenant_header = vec![("x-tenant".to_owned(), "acme".to_owned())];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&tenant_header),
        ndjson.as_bytes(),
    );

    let response = p.handle(&ctx).await.unwrap();
    let doc: Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(doc["errors"], false, "{doc}");
    assert_eq!(doc["items"][0]["update"]["_id"], "5");

    let read = ReadOp::new(target("acme-idx"), "acme:5", Some("acme".into()));
    let hit = p.sink().get(read).await.unwrap();
    assert!(hit.found, "acme:5 should exist after the upsert");

    let stored: Value = serde_json::from_slice(&hit.body).unwrap();
    let source = &stored["_source"];
    assert_eq!(source["_tenant"], "acme");
    assert_eq!(source["msg"], "made");
}
/// When the sink rejects every write with a transport error, the request as a
/// whole still succeeds and the affected item reports a 502 `upstream_failed`.
#[tokio::test]
async fn upstream_failure_positions_502_for_that_target() {
    /// Sink whose writes always fail; its read methods must never be called.
    struct FailSink;

    impl Sink for FailSink {
        async fn write(
            &self,
            _batch: osproxy_sink::WriteBatch,
        ) -> Result<osproxy_sink::WriteAck, SinkError> {
            Err(SinkError::Transport { kind: "boom" })
        }
    }

    impl Reader for FailSink {
        async fn get(&self, _op: ReadOp) -> Result<osproxy_sink::ReadOutcome, SinkError> {
            unreachable!("reads not exercised here")
        }

        async fn search(
            &self,
            _op: osproxy_sink::SearchOp,
        ) -> Result<osproxy_sink::SearchOutcome, SinkError> {
            unreachable!("searches not exercised here")
        }

        async fn count(
            &self,
            _op: osproxy_sink::SearchOp,
        ) -> Result<osproxy_sink::CountOutcome, SinkError> {
            unreachable!("counts not exercised here")
        }
    }

    // Only `acme` is registered; the failure comes from the sink, not routing.
    let table = Arc::new(PlacementTable::new());
    let tenant_marker = InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId);
    let acme_placement = Placement::SharedIndex {
        cluster: ClusterId::from("eu-1"),
        index: IndexName::from("acme-idx"),
        inject: vec![tenant_marker],
    };
    table.set(PartitionId::from("acme"), acme_placement);

    let router = TenancyRouter::new(SharedTenancy { table });
    let p = Pipeline::new(router, FailSink);

    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    let no_headers = vec![];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&no_headers),
        b"{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":1}\n",
    );

    let response = p.handle(&ctx).await.unwrap();
    let reply: Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(reply["errors"], true);

    let failed = &reply["items"][0]["index"];
    assert_eq!(failed["status"], 502);
    assert_eq!(failed["error"]["type"], "upstream_failed");
}
/// A bulk body holding an `index` action line with no document line after it
/// must fail the whole request rather than produce a per-item error.
#[tokio::test]
async fn malformed_bulk_body_is_a_request_error() {
    let p = pipeline();
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("b");
    let no_headers = vec![];
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&no_headers),
        b"{\"index\":{}}\n",
    );

    let outcome = p.handle(&ctx).await;
    assert!(outcome.is_err());
}