// `unwrap` is allowed in this file: the helpers and the property below unwrap pipeline, runtime
// and JSON results so that an unexpected failure panics and fails the test.
#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use osproxy_core::{
ClusterId, EndpointKind, FieldName, IndexName, PartitionId, PrincipalId, RequestId,
};
use osproxy_engine::{Pipeline, PipelineResponse};
use osproxy_sink::MemorySink;
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, IdTemplate, InjectedField, InjectedValue, JsonPath,
PartitionKeySpec, Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec,
SpiError, TenancySpi,
};
use osproxy_tenancy::{PlacementTable, TenancyRouter};
use proptest::prelude::*;
use serde_json::{json, Map, Value};
/// Partition id used throughout this file: it is registered in the placement table, written into
/// every generated document's `tenant_id` field, and sent as the `x-tenant` header on reads.
const PARTITION: &str = "acme";
/// Index name passed with both write and read requests; the read response's `_index` is asserted
/// to equal it.
const LOGICAL_INDEX: &str = "orders";
/// `TenancySpi` implementation used by this test, backed by a shared [`PlacementTable`].
struct SharedTenancy {
    /// Table consulted by `placement_for` to look up a partition's placement.
    table: Arc<PlacementTable>,
}
impl TenancySpi for SharedTenancy {
fn resolve_partition(
&self,
ctx: &osproxy_spi::RequestCtx<'_>,
body: BodyDoc<'_>,
) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
osproxy_tenancy::resolve_partition_spec(
&PartitionKeySpec::AnyOf(vec![
PartitionKeySpec::BodyField(JsonPath::new("tenant_id")),
PartitionKeySpec::Header("x-tenant".to_owned()),
]),
ctx,
body,
)
}
fn doc_id_rule(&self) -> Option<DocIdRule> {
Some(DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true))
}
fn injected_fields(&self) -> Vec<InjectedField> {
vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)]
}
fn sensitive_fields(&self) -> SensitivitySpec {
SensitivitySpec::none()
}
async fn placement_for(&self, p: &PartitionId) -> Result<PlacementAt, SpiError> {
self.table.get(p).ok_or_else(|| SpiError::PlacementMissing {
partition: p.clone(),
})
}
}
/// Builds a fresh pipeline for one test case.
///
/// The placement table gets a single entry mapping `PARTITION` to a shared index (`shared` on
/// cluster `eu-1`) that injects a `_tenant` field carrying the partition id. The pipeline routes
/// through a `TenancyRouter` over `SharedTenancy` and stores into a new `MemorySink`.
fn pipeline() -> Pipeline<TenancyRouter<SharedTenancy>, MemorySink> {
    let table = Arc::new(PlacementTable::new());

    let partition = PartitionId::from(PARTITION);
    let placement = Placement::SharedIndex {
        cluster: ClusterId::from("eu-1"),
        index: IndexName::from("shared"),
        inject: vec![InjectedField::new(
            FieldName::from("_tenant"),
            InjectedValue::PartitionId,
        )],
    };
    table.set(partition, placement);

    let router = TenancyRouter::new(SharedTenancy { table });
    let sink = MemorySink::new();
    Pipeline::new(router, sink)
}
/// Sends `body` through the pipeline as an HTTP/1 `POST` ingest request against `LOGICAL_INDEX`.
///
/// The request carries no headers, so any partition information must come from the body.
///
/// # Panics
///
/// Panics if the pipeline's `handle` call returns an error.
async fn write(
    p: &Pipeline<TenancyRouter<SharedTenancy>, MemorySink>,
    body: &[u8],
) -> PipelineResponse {
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("w");
    let no_headers = vec![];

    let request = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestDoc,
        Protocol::Http1,
        LOGICAL_INDEX,
        HeaderView::new(&no_headers),
        body,
    );

    let outcome = p.handle(&request).await;
    outcome.unwrap()
}
/// Fetches the document with id `logical_id` via an HTTP/1 `GET` by-id request against
/// `LOGICAL_INDEX`.
///
/// The body is empty; the partition is supplied through the `x-tenant` header, set to
/// `PARTITION`.
///
/// # Panics
///
/// Panics if the pipeline's `handle` call returns an error.
async fn read(
    p: &Pipeline<TenancyRouter<SharedTenancy>, MemorySink>,
    logical_id: &str,
) -> PipelineResponse {
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let tenant_headers = vec![(String::from("x-tenant"), String::from(PARTITION))];

    let request = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Get,
        EndpointKind::GetById,
        Protocol::Http1,
        LOGICAL_INDEX,
        HeaderView::new(&tenant_headers),
        b"",
    )
    .with_doc_id(Some(logical_id));

    let outcome = p.handle(&request).await;
    outcome.unwrap()
}
fn client_doc() -> impl Strategy<Value = (i64, Value)> {
let leaf = prop_oneof![
any::<bool>().prop_map(Value::from),
any::<i64>().prop_map(Value::from),
"[a-z ]{0,12}".prop_map(Value::from),
];
let extras = prop::collection::vec(("[a-z]{1,6}", leaf), 0..5);
(any::<i64>(), extras).prop_map(|(id, entries)| {
let mut obj = Map::new();
obj.insert("tenant_id".to_owned(), json!(PARTITION));
obj.insert("id".to_owned(), json!(id));
for (k, v) in entries {
if k != "id" && k != "tenant_id" {
obj.insert(k, v);
}
}
(id, Value::Object(obj))
})
}
proptest! {
    #![proptest_config(ProptestConfig::with_cases(64))]

    // Round-trip property: a document written through the pipeline can be read back by its
    // client-visible id, and the response shows the logical id, the logical index, no
    // `_routing` key, and a `_source` equal to the submitted document (no `_tenant` field).
    #[test]
    fn write_then_read_recovers_the_logical_document((id, doc) in client_doc()) {
        let runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
        runtime.block_on(async {
            // Each case gets its own pipeline, so nothing is shared between cases.
            let proxy = pipeline();

            // Write the generated document.
            let payload = serde_json::to_vec(&doc).unwrap();
            let write_resp = write(&proxy, &payload).await;
            prop_assert_eq!(write_resp.status, 201);

            // Read it back using the id exactly as the client knows it.
            let logical_id = id.to_string();
            let read_resp = read(&proxy, &logical_id).await;
            prop_assert_eq!(read_resp.status, 200);

            // Envelope checks: logical id and index, no routing key.
            let got: Value = serde_json::from_slice(&read_resp.body).unwrap();
            prop_assert_eq!(&got["_id"], &json!(id.to_string()));
            prop_assert_eq!(&got["_index"], &json!(LOGICAL_INDEX));
            prop_assert!(got.get("_routing").is_none());

            // Source checks: no injected tenant field, otherwise identical to the input.
            prop_assert!(got["_source"].get("_tenant").is_none());
            prop_assert_eq!(&got["_source"], &doc);
            Ok(())
        })?;
    }
}