osproxy_server/tenancy.rs
1//! The reference tenancy implementation the binary serves.
2//!
3//! A minimal but complete [`TenancySpi`]: the partition is the `tenant_id`
4//! body field on ingest (or the `x-tenant` header on by-id reads, which carry
5//! no body), every document gets a `_tenant` field and a `{partition}:{body.id}`
6//! id with routing, and every partition lives on one shared index. It exists to
7//! make the binary runnable and to demonstrate the SPI; real consumers provide
8//! their own.
9
10use osproxy_core::{ClusterId, Epoch, FieldName, IndexName, PartitionId};
11use osproxy_spi::{
12 BodyDoc, DocIdRule, IdTemplate, InjectedField, InjectedValue, JsonPath, PartitionKeySpec,
13 Placement, PlacementAt, SpiError, TenancySpi,
14};
15
16/// The injected tenancy field name.
17const TENANT_FIELD: &str = "_tenant";
18
19/// The header carrying the partition on by-id reads (which have no body).
20const TENANT_HEADER: &str = "x-tenant";
21
22/// A single-shared-index tenancy: all partitions share one physical index,
23/// isolated by an injected `_tenant` field.
24#[derive(Debug)]
25pub struct ReferenceTenancy {
26 cluster: ClusterId,
27 index: IndexName,
28 endpoint: String,
29}
30
31impl ReferenceTenancy {
32 /// Builds the reference tenancy over one cluster and shared index, served at
33 /// `endpoint` (the cluster's base URL, reported as part of the placement
34 /// result so the sink can pool it).
35 #[must_use]
36 pub fn new(cluster: ClusterId, index: IndexName, endpoint: impl Into<String>) -> Self {
37 Self {
38 cluster,
39 index,
40 endpoint: endpoint.into(),
41 }
42 }
43}
44
45impl TenancySpi for ReferenceTenancy {
46 fn resolve_partition(
47 &self,
48 ctx: &osproxy_spi::RequestCtx<'_>,
49 body: BodyDoc<'_>,
50 ) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
51 // Ingest carries the partition in the body; by-id reads have no body, so
52 // they carry it in a header set by the caller (or an auth gateway).
53 let spec = PartitionKeySpec::AnyOf(vec![
54 PartitionKeySpec::BodyField(JsonPath::new("tenant_id")),
55 PartitionKeySpec::Header(TENANT_HEADER.to_owned()),
56 ]);
57 osproxy_tenancy::resolve_partition_spec(&spec, ctx, body)
58 }
59
60 fn doc_id_rule(&self) -> Option<DocIdRule> {
61 Some(DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true))
62 }
63
64 fn injected_fields(&self) -> Vec<InjectedField> {
65 vec![InjectedField::new(
66 FieldName::from(TENANT_FIELD),
67 InjectedValue::PartitionId,
68 )]
69 }
70
71 // `sensitive_fields` is left at the deny-by-default `all_sensitive`: this
72 // tenancy carries real tenant payloads, so every value is redacted unless a
73 // future revision allow-lists a known-safe field.
74
75 fn cluster_endpoint(&self, cluster: &ClusterId) -> Option<String> {
76 // The cursor-affinity path routes by cluster id with no placement; resolve
77 // its endpoint here (this reference tenancy has exactly one cluster).
78 (cluster == &self.cluster).then(|| self.endpoint.clone())
79 }
80
81 async fn placement_for(&self, _partition: &PartitionId) -> Result<PlacementAt, SpiError> {
82 // Every partition resolves to the same shared index. A constant epoch:
83 // this reference tenancy has no migration (the epoch story is exercised
84 // by the PlacementTable-backed implementations).
85 Ok(PlacementAt::new(
86 Placement::SharedIndex {
87 cluster: self.cluster.clone(),
88 index: self.index.clone(),
89 inject: self.injected_fields(),
90 },
91 Epoch::new(1),
92 )
93 .with_endpoint(self.endpoint.clone()))
94 }
95}