// `unwrap` is used in this file when building the tokio runtime and when taking the
// handled bulk response; a failure there panics the calling test, so the
// `clippy::unwrap_used` lint is allowed file-wide.
#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use dhat::{HeapStats, Profiler};
use osproxy_core::{
ClusterId, EndpointKind, FieldName, IndexName, PartitionId, PrincipalId, RequestId,
};
use osproxy_engine::Pipeline;
use osproxy_sink::MemorySink;
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, IdTemplate, InjectedField, InjectedValue, JsonPath,
PartitionKeySpec, Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec,
SpiError, TenancySpi,
};
use osproxy_tenancy::{PlacementTable, TenancyRouter};
// Install dhat's allocator as the global allocator for this binary so that the
// dhat profiler started in the test can account for heap allocations.
#[global_allocator]
static ALLOC: dhat::Alloc = dhat::Alloc;
/// Runs `f` and returns how much dhat's `total_blocks` counter grew while it ran.
///
/// The counter is sampled immediately before and immediately after the call, so the
/// result covers exactly the allocations made during `f`.
fn allocs(f: impl FnOnce()) -> u64 {
    let baseline = HeapStats::get().total_blocks;
    f();
    let after = HeapStats::get().total_blocks;
    after - baseline
}
/// Tenancy implementation used by this test, backed by a shared placement table.
struct SharedTenancy {
    // Placement lookup table, shared through an `Arc`; read by `placement_for`.
    table: Arc<PlacementTable>,
}
/// Tenancy policy exercised by the bulk allocation test: partition taken from the
/// body's `tenant_id`, ids templated from partition and body id, one injected field.
impl TenancySpi for SharedTenancy {
    /// Resolves the partition via a body-field key spec on the JSON path `tenant_id`.
    fn resolve_partition(
        &self,
        ctx: &RequestCtx<'_>,
        body: BodyDoc<'_>,
    ) -> Result<PartitionId, SpiError> {
        let key_spec = PartitionKeySpec::BodyField(JsonPath::new("tenant_id"));
        osproxy_tenancy::resolve_partition_spec(&key_spec, ctx, body)
    }

    /// Always supplies a rule built from the `{partition}:{body.id}` template,
    /// with `with_routing(true)` applied.
    fn doc_id_rule(&self) -> Option<DocIdRule> {
        let template = IdTemplate::new("{partition}:{body.id}");
        let rule = DocIdRule::new(template).with_routing(true);
        Some(rule)
    }

    /// Returns a single injected field, `_tenant`, valued as `InjectedValue::PartitionId`.
    fn injected_fields(&self) -> Vec<InjectedField> {
        let tenant_marker =
            InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId);
        vec![tenant_marker]
    }

    /// Returns `SensitivitySpec::none()`.
    fn sensitive_fields(&self) -> SensitivitySpec {
        SensitivitySpec::none()
    }

    /// Looks `p` up in the shared table; a miss becomes `SpiError::PlacementMissing`
    /// carrying a clone of the partition id.
    async fn placement_for(&self, p: &PartitionId) -> Result<PlacementAt, SpiError> {
        match self.table.get(p) {
            Some(found) => Ok(found),
            None => Err(SpiError::PlacementMissing {
                partition: p.clone(),
            }),
        }
    }
}
/// Builds the pipeline under test: a `TenancyRouter` over `SharedTenancy` writing to a
/// `MemorySink`, with one placement registered for partition `acme`.
fn pipeline() -> Pipeline<TenancyRouter<SharedTenancy>, MemorySink> {
    let placements = Arc::new(PlacementTable::new());

    // `acme` goes to the `shared` index on cluster `eu-1`, with `_tenant` injected.
    let tenant_marker = InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId);
    let acme_placement = Placement::SharedIndex {
        cluster: ClusterId::from("eu-1"),
        index: IndexName::from("shared"),
        inject: vec![tenant_marker],
    };
    placements.set(PartitionId::from("acme"), acme_placement);

    let router = TenancyRouter::new(SharedTenancy { table: placements });
    let sink = MemorySink::new();
    Pipeline::new(router, sink)
}
/// Builds a newline-delimited bulk payload containing `n` documents.
///
/// Each document contributes two lines: an `{"index":{}}` action line followed by a
/// source line with `tenant_id` `acme`, `id` `k<i>` (`i` counting from 0) and
/// `msg` `hello`. `n == 0` yields an empty buffer.
fn bulk_body(n: usize) -> Vec<u8> {
    const ACTION_LINE: &[u8] = b"{\"index\":{}}\n";

    (0..n).fold(Vec::new(), |mut payload, seq| {
        let source_line =
            format!("{{\"tenant_id\":\"acme\",\"id\":\"k{seq}\",\"msg\":\"hello\"}}\n");
        payload.extend_from_slice(ACTION_LINE);
        payload.extend_from_slice(source_line.as_bytes());
        payload
    })
}
/// Sends one bulk request carrying `n` documents through a freshly built pipeline and
/// returns the value reported by `allocs` for the handling step.
///
/// Pipeline construction, payload generation and request-context setup all happen
/// before the measured closure, so only `handle` (driven by `rt.block_on`) and the
/// status assertion fall inside the measurement. Panics if handling fails or the
/// response status is not 200.
fn bulk_handle_allocs(rt: &tokio::runtime::Runtime, n: usize) -> u64 {
    // Setup: everything here runs outside the measured closure.
    let engine = pipeline();
    let payload = bulk_body(n);
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("bulk");
    let no_headers = Vec::new();
    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Post,
        EndpointKind::IngestBulk,
        Protocol::Http1,
        "orders",
        HeaderView::new(&no_headers),
        &payload,
    );

    // Measured region: the future is created and driven to completion inside the closure.
    let measured = || {
        let response = rt.block_on(engine.handle(&ctx)).unwrap();
        assert_eq!(response.status, 200, "bulk handled");
    };
    allocs(measured)
}
/// Guards the per-document allocation cost of the bulk ingest path.
///
/// Measures one bulk request with 1 document and one with 41, divides the difference
/// by the 40 extra documents (integer division), and requires the result to be at
/// most 56.
#[test]
fn bulk_demux_marginal_allocation_budget() {
    // Skip when `LLVM_PROFILE_FILE` is set — presumably because coverage
    // instrumentation perturbs allocation counts (TODO confirm).
    let profile_env_set = std::env::var_os("LLVM_PROFILE_FILE").is_some();
    if profile_env_set {
        return;
    }

    // Bound to a named local so the profiler stays alive until the end of the test.
    let _dhat_guard = Profiler::builder().testing().build();

    let mut rt_builder = tokio::runtime::Builder::new_current_thread();
    rt_builder.enable_all();
    let runtime = rt_builder.build().unwrap();

    // NOTE(review): the n=1 run is also the first run, so any first-use initialization
    // would be counted in `a1` only and would bias `marginal` downward — consider a
    // warm-up run if the budget ever looks too loose.
    let a1 = bulk_handle_allocs(&runtime, 1);
    let a41 = bulk_handle_allocs(&runtime, 41);

    // Saturating so that a41 < a1 yields 0 instead of an underflow panic.
    let extra = a41.saturating_sub(a1);
    let marginal = extra / 40;
    eprintln!("BULK_ALLOC a1={a1} a41={a41} marginal_per_doc={marginal}");

    assert!(
        marginal <= 56,
        "bulk per-document allocation budget: {marginal} > 56 \
         (a per-item placement clone or a Value-tree response line would exceed it)"
    );
}