use osproxy_core::{ClusterId, Epoch, FieldName, IndexName, PartitionId};
use osproxy_spi::{
BodyDoc, DocIdRule, IdTemplate, InjectedField, InjectedValue, JsonPath, PartitionKeySpec,
Placement, PlacementAt, SpiError, TenancySpi,
};
/// Name of the document field injected with the partition id when the
/// tenancy runs in shared-index mode.
const TENANT_FIELD: &str = "_tenant";
/// Request header listed as a partition-key source alongside the `tenant_id`
/// body field.
const TENANT_HEADER: &str = "x-tenant";
/// Strategy `ReferenceTenancy` uses to place a partition's data.
///
/// The default is [`PlacementMode::SharedIndex`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PlacementMode {
    /// All partitions share the configured index. In this mode the tenancy
    /// supplies a document-id rule and injects a tenant field.
    #[default]
    SharedIndex,
    /// Each partition gets its own index, named `<index>-<partition>`, on the
    /// configured cluster. No id rule or injected fields are supplied.
    DedicatedIndex,
    /// The partition is placed on the configured cluster as a whole, with no
    /// index name, id rule or injected fields supplied.
    DedicatedCluster,
}
/// Tenancy implementation bound to one fixed cluster, base index and
/// endpoint, with a selectable [`PlacementMode`].
#[derive(Debug)]
pub struct ReferenceTenancy {
    /// The single cluster every placement refers to.
    cluster: ClusterId,
    /// Index used directly in shared-index mode and as the name prefix in
    /// dedicated-index mode.
    index: IndexName,
    /// Endpoint reported for `cluster` and attached to every placement.
    endpoint: String,
    /// Placement strategy currently in effect.
    mode: PlacementMode,
}
impl ReferenceTenancy {
    /// Creates a tenancy for the given cluster, index and endpoint.
    ///
    /// The placement mode starts as the default, [`PlacementMode::SharedIndex`];
    /// use [`ReferenceTenancy::with_placement_mode`] to change it.
    #[must_use]
    pub fn new(cluster: ClusterId, index: IndexName, endpoint: impl Into<String>) -> Self {
        let endpoint = endpoint.into();
        Self {
            cluster,
            index,
            endpoint,
            // `SharedIndex` is the variant marked `#[default]`.
            mode: PlacementMode::default(),
        }
    }

    /// Returns the tenancy with its placement mode replaced by `mode`; all
    /// other settings are carried over unchanged.
    #[must_use]
    pub fn with_placement_mode(self, mode: PlacementMode) -> Self {
        Self { mode, ..self }
    }
}
impl TenancySpi for ReferenceTenancy {
    /// Resolves the partition for a request by delegating to
    /// `osproxy_tenancy::resolve_partition_spec`.
    ///
    /// The spec lists two key sources: the `tenant_id` body field and the
    /// `x-tenant` header.
    ///
    /// # Errors
    ///
    /// Returns whatever error `resolve_partition_spec` reports.
    // NOTE(review): whether `AnyOf` tries the sources in listed order is
    // decided by `osproxy_tenancy` — confirm there.
    fn resolve_partition(
        &self,
        ctx: &osproxy_spi::RequestCtx<'_>,
        body: BodyDoc<'_>,
    ) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
        let from_body = PartitionKeySpec::BodyField(JsonPath::new("tenant_id"));
        let from_header = PartitionKeySpec::Header(TENANT_HEADER.to_owned());
        let key_sources = PartitionKeySpec::AnyOf(vec![from_body, from_header]);
        osproxy_tenancy::resolve_partition_spec(&key_sources, ctx, body)
    }

    /// Returns the document-id rule for the current mode.
    ///
    /// Shared-index mode yields a rule built from the template
    /// `{partition}:{body.id}` with routing enabled; the dedicated modes
    /// yield `None`.
    fn doc_id_rule(&self) -> Option<DocIdRule> {
        match self.mode {
            PlacementMode::SharedIndex => {
                let template = IdTemplate::new("{partition}:{body.id}");
                let rule = DocIdRule::new(template).with_routing(true);
                Some(rule)
            }
            PlacementMode::DedicatedIndex | PlacementMode::DedicatedCluster => None,
        }
    }

    /// Returns the fields to inject for the current mode.
    ///
    /// Shared-index mode injects one field, named by `TENANT_FIELD`, whose
    /// value is the partition id; the dedicated modes inject nothing.
    fn injected_fields(&self) -> Vec<InjectedField> {
        match self.mode {
            PlacementMode::SharedIndex => {
                let tenant_field =
                    InjectedField::new(FieldName::from(TENANT_FIELD), InjectedValue::PartitionId);
                vec![tenant_field]
            }
            PlacementMode::DedicatedIndex | PlacementMode::DedicatedCluster => Vec::new(),
        }
    }

    /// Returns the configured endpoint when `cluster` is this tenancy's
    /// cluster, and `None` for any other cluster.
    fn cluster_endpoint(&self, cluster: &ClusterId) -> Option<String> {
        if cluster == &self.cluster {
            Some(self.endpoint.clone())
        } else {
            None
        }
    }

    /// Builds the placement for `partition` according to the current mode.
    ///
    /// Every placement refers to the configured cluster, uses a fixed epoch
    /// of 1 and carries the configured endpoint. Dedicated-index mode names
    /// the index `<index>-<partition>`.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok`.
    async fn placement_for(&self, partition: &PartitionId) -> Result<PlacementAt, SpiError> {
        // Every variant needs its own copy of the cluster id, so clone once up front.
        let cluster = self.cluster.clone();
        let placement = match self.mode {
            PlacementMode::SharedIndex => Placement::SharedIndex {
                cluster,
                index: self.index.clone(),
                inject: self.injected_fields(),
            },
            PlacementMode::DedicatedIndex => {
                let base = self.index.as_str();
                let suffix = partition.as_str();
                Placement::DedicatedIndex {
                    cluster,
                    index: IndexName::from(format!("{base}-{suffix}")),
                }
            }
            PlacementMode::DedicatedCluster => Placement::DedicatedCluster { cluster },
        };
        let epoch = Epoch::new(1);
        let endpoint = self.endpoint.clone();
        Ok(PlacementAt::new(placement, epoch).with_endpoint(endpoint))
    }
}