use super::*;
use osproxy_core::{ClusterId, Epoch, IndexName, PartitionId, Target};
use osproxy_spi::{IdTemplate, InjectedField, InjectedValue, Protocol, RouteDecision};
use serde_json::json;
fn resolved(transform: BodyTransform) -> Resolved {
Resolved {
partition: PartitionId::from("acme"),
decision: RouteDecision {
target: Target::new(ClusterId::from("eu-1"), IndexName::from("shared")),
upstream_protocol: Protocol::Http1,
header_ops: Vec::new(),
body_transform: transform,
epoch: Epoch::new(4),
},
migration: osproxy_spi::MigrationPhase::Settled,
}
}
fn shared_transform() -> BodyTransform {
BodyTransform::Both {
inject: vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)],
id: DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true),
}
}
#[test]
fn read_op_maps_logical_id_and_sets_routing() {
let (op, shape) = build_read_op(&resolved(shared_transform()), "7").unwrap();
assert_eq!(op.id, "acme:7");
assert_eq!(op.routing.as_deref(), Some("acme"));
assert_eq!(op.target.index.as_str(), "shared");
assert_eq!(shape.inject_names, vec![FieldName::from("_tenant")]);
}
#[test]
fn read_op_without_id_rule_uses_client_id() {
let (op, _) = build_read_op(&resolved(BodyTransform::None), "raw-id").unwrap();
assert_eq!(op.id, "raw-id");
assert!(op.routing.is_none());
}
#[test]
fn found_response_is_the_logical_document() {
let upstream = br#"{
"_index": "shared",
"_id": "acme:7",
"_routing": "acme",
"found": true,
"_source": { "_tenant": "acme", "msg": "hi" }
}"#;
let body = shape_found(upstream, "orders", "7", &[FieldName::from("_tenant")]).unwrap();
let doc: Value = serde_json::from_slice(&body).unwrap();
assert_eq!(doc["_index"], "orders");
assert_eq!(doc["_id"], "7");
assert!(doc.get("_routing").is_none());
assert!(doc["_source"].get("_tenant").is_none());
assert_eq!(doc["_source"]["msg"], "hi");
}
#[test]
fn not_found_body_is_logical() {
let doc: Value = serde_json::from_slice(¬_found_body("orders", "7")).unwrap();
assert_eq!(doc["_index"], "orders");
assert_eq!(doc["_id"], "7");
assert_eq!(doc["found"], false);
}
#[test]
fn delete_op_maps_logical_id_and_sets_routing() {
let op = build_delete_op(&resolved(shared_transform()), "7").unwrap();
assert_eq!(op.epoch, Epoch::new(4));
let DocOp::Delete { id, routing } = &op.doc else {
unreachable!("delete-by-id produces a Delete op")
};
assert_eq!(id, "acme:7");
assert_eq!(routing.as_deref(), Some("acme"));
}
#[test]
fn delete_response_reports_logical_terms() {
let ok: Value = serde_json::from_slice(&shape_delete("orders", "7", 200)).unwrap();
assert_eq!(ok["_index"], "orders");
assert_eq!(ok["_id"], "7");
assert_eq!(ok["result"], "deleted");
let miss: Value = serde_json::from_slice(&shape_delete("orders", "7", 404)).unwrap();
assert_eq!(miss["result"], "not_found");
}
#[test]
fn search_op_wraps_client_query_in_the_partition_filter() {
let (op, _) = build_search_op(
&resolved(shared_transform()),
br#"{"query":{"match_all":{}}}"#,
)
.unwrap();
let q: Value = serde_json::from_slice(&op.body).unwrap();
assert_eq!(q["query"]["bool"]["filter"][0]["term"]["_tenant"], "acme");
assert_eq!(q["query"]["bool"]["must"][0]["match_all"], json!({}));
}
#[test]
fn a_decorative_injected_field_is_stripped_but_never_filtered() {
let transform = BodyTransform::Both {
inject: vec![
InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId),
InjectedField::new(
FieldName::from("_region"),
InjectedValue::Constant(json!("eu")),
),
],
id: DocIdRule::new(IdTemplate::new("{partition}:{body.id}")),
};
let (op, shape) =
build_search_op(&resolved(transform), br#"{"query":{"match_all":{}}}"#).unwrap();
let q: Value = serde_json::from_slice(&op.body).unwrap();
let filter = q["query"]["bool"]["filter"].as_array().unwrap();
assert_eq!(filter.len(), 1, "exactly one isolation term: {q}");
assert_eq!(filter[0]["term"]["_tenant"], "acme");
assert!(
filter[0]["term"].get("_region").is_none(),
"the decorative field must not be filtered: {q}"
);
assert_eq!(
shape.inject_names,
vec![FieldName::from("_tenant"), FieldName::from("_region")]
);
}
#[test]
fn hits_are_stripped_to_the_logical_view() {
let upstream = br#"{
"hits": { "total": { "value": 1 }, "hits": [
{ "_index": "shared", "_id": "acme:7", "_routing": "acme",
"_source": { "_tenant": "acme", "msg": "hi" } }
] }
}"#;
let shape = read_shape(&shared_transform());
let body = shape_hits(upstream, "orders", "acme", &shape).unwrap();
let doc: Value = serde_json::from_slice(&body).unwrap();
let hit = &doc["hits"]["hits"][0];
assert_eq!(hit["_index"], "orders");
assert_eq!(hit["_id"], "7");
assert!(hit.get("_routing").is_none());
assert!(hit["_source"].get("_tenant").is_none());
assert_eq!(hit["_source"]["msg"], "hi");
}
#[test]
fn top_level_siblings_including_aggregations_pass_through_untouched() {
let upstream = br#"{
"took": 5,
"_shards": { "total": 3, "successful": 3 },
"hits": { "total": { "value": 1 }, "hits": [
{ "_index": "shared", "_id": "acme:7", "_routing": "acme",
"_source": { "_tenant": "acme", "msg": "hi" } }
] },
"aggregations": { "by_day": { "buckets": [ { "key": 1, "doc_count": 9 } ] } }
}"#;
let shape = read_shape(&shared_transform());
let body = shape_hits(upstream, "orders", "acme", &shape).unwrap();
let doc: Value = serde_json::from_slice(&body).unwrap();
assert_eq!(doc["hits"]["hits"][0]["_index"], "orders");
assert!(doc["hits"]["hits"][0]["_source"].get("_tenant").is_none());
assert_eq!(doc["took"], 5);
assert_eq!(doc["_shards"]["successful"], 3);
assert_eq!(doc["aggregations"]["by_day"]["buckets"][0]["doc_count"], 9);
}
#[test]
fn a_non_object_response_passes_through_unchanged() {
let shape = read_shape(&shared_transform());
assert_eq!(
shape_hits(b"[1,2,3]", "orders", "acme", &shape).unwrap(),
b"[1,2,3]"
);
assert!(shape_hits(b"not json", "orders", "acme", &shape).is_err());
}