#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use osproxy_core::{
ClusterId, EndpointKind, Epoch, ErrorCode, FieldName, IndexName, PartitionId, PrincipalId,
RequestId,
};
use osproxy_engine::{Pipeline, PipelineResponse, RequestError};
use osproxy_sink::MemorySink;
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, IdTemplate, InjectedField, InjectedValue, JsonPath,
PartitionKeySpec, Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec,
SpiError, TenancySpi,
};
use osproxy_tenancy::{PlacementTable, TenancyRouter, WriteAdmission};
use serde_json::json;
/// Test-only tenancy policy whose placement and write-admission answers are
/// read from a shared [`PlacementTable`], so the test can drive a migration
/// on the table while the pipeline is using it.
struct MigratingTenancy {
/// Placement table consulted by `placement_for` and `admit_write`; held in
/// an `Arc` so the test body can keep its own handle to the same table.
table: Arc<PlacementTable>,
}
/// `TenancySpi` wiring for the migration test: static partition/doc-id/injection
/// rules, with placement lookups and write admission delegated to the shared table.
impl TenancySpi for MigratingTenancy {
    /// Resolves the partition via `resolve_partition_spec`, keyed on the
    /// body field at JSON path `tenant_id`.
    fn resolve_partition(
        &self,
        ctx: &osproxy_spi::RequestCtx<'_>,
        body: BodyDoc<'_>,
    ) -> Result<PartitionId, SpiError> {
        let key_spec = PartitionKeySpec::BodyField(JsonPath::new("tenant_id"));
        osproxy_tenancy::resolve_partition_spec(&key_spec, ctx, body)
    }

    /// Document ids follow the `{partition}:{body.id}` template, with routing enabled.
    fn doc_id_rule(&self) -> Option<DocIdRule> {
        let template = IdTemplate::new("{partition}:{body.id}");
        let rule = DocIdRule::new(template).with_routing(true);
        Some(rule)
    }

    /// A single injected field: `_tenant`, carrying the partition id.
    fn injected_fields(&self) -> Vec<InjectedField> {
        let tenant_tag = InjectedField::new(FieldName::from("_tenant"), InjectedValue::PartitionId);
        vec![tenant_tag]
    }

    /// No fields are declared sensitive in this test policy.
    fn sensitive_fields(&self) -> SensitivitySpec {
        SensitivitySpec::none()
    }

    /// Looks the partition up in the shared table.
    ///
    /// # Errors
    /// Returns `SpiError::PlacementMissing` (carrying a clone of `partition`)
    /// when the table has no entry for it.
    async fn placement_for(&self, partition: &PartitionId) -> Result<PlacementAt, SpiError> {
        match self.table.get(partition) {
            Some(placement) => Ok(placement),
            None => Err(SpiError::PlacementMissing {
                partition: partition.clone(),
            }),
        }
    }

    /// Returns `true` only when the table's verdict for this partition and
    /// epoch is `WriteAdmission::Admit`.
    async fn admit_write(&self, partition: &PartitionId, epoch: Epoch) -> bool {
        let verdict = self.table.admit_write(partition, epoch);
        verdict == WriteAdmission::Admit
    }
}
/// Builds a `Placement::SharedIndex` on the given cluster, targeting the
/// `orders-shared` index and injecting a `_tenant` field holding the partition id.
fn shared_on(cluster: &str) -> Placement {
    let cluster = ClusterId::from(cluster);
    let index = IndexName::from("orders-shared");
    let inject = vec![InjectedField::new(
        FieldName::from("_tenant"),
        InjectedValue::PartitionId,
    )];
    Placement::SharedIndex {
        cluster,
        index,
        inject,
    }
}
/// Sends one fixed ingest request through `pipeline` and returns its outcome.
///
/// The request is a `PUT` to the logical index `orders-logical` over HTTP/1
/// from principal `svc`, with no headers and the JSON body
/// `{"tenant_id": "acme", "id": 1, "msg": "hi"}`. `rid` becomes the request id,
/// which lets callers refer to this specific request afterwards.
async fn ingest(
    pipeline: &Pipeline<TenancyRouter<MigratingTenancy>, MemorySink>,
    rid: &str,
) -> Result<PipelineResponse, RequestError> {
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from(rid);
    let no_headers: Vec<(String, String)> = Vec::new();

    // Serializing a literal JSON value; a failure here would be a bug in the test itself.
    let document = json!({ "tenant_id": "acme", "id": 1, "msg": "hi" });
    let payload = serde_json::to_vec(&document).unwrap();

    let ctx = RequestCtx::new(
        &caller,
        &request_id,
        HttpMethod::Put,
        EndpointKind::IngestDoc,
        Protocol::Http1,
        "orders-logical",
        HeaderView::new(&no_headers),
        &payload,
    );
    pipeline.handle(&ctx).await
}
/// Drives partition `acme` through a migration from `eu-1` to `us-1` and
/// checks what happens to ingest at each step:
///
/// 1. before migration: the write succeeds and is recorded;
/// 2. after `begin_migration`: the write still succeeds and the explain
///    document reports `migration == "draining"`;
/// 3. after `enter_cutover`: the write is rejected with a retryable
///    `StaleEpoch` error and nothing new is recorded;
/// 4. after `complete_migration`: the write succeeds and targets `us-1`.
#[tokio::test]
async fn ingest_is_gated_through_the_migration_lifecycle() {
    let table = Arc::new(PlacementTable::new());
    let p = PartitionId::from("acme");
    table.set(p.clone(), shared_on("eu-1"));
    let pipeline = Pipeline::new(
        TenancyRouter::new(MigratingTenancy {
            table: Arc::clone(&table),
        }),
        MemorySink::new(),
    );

    // Phase 1 — no migration in progress. Require a 2xx status specifically:
    // a bare `>= 200` check would also accept 4xx/5xx responses.
    let active = ingest(&pipeline, "active").await.unwrap();
    assert!(
        (200..300).contains(&active.status),
        "ingest before migration must return a 2xx status"
    );
    assert_eq!(pipeline.sink().recorded().len(), 1);

    // Phase 2 — migration started: writes are still accepted, and the
    // explain document for the request is tagged as draining.
    table.begin_migration(&p, shared_on("us-1")).unwrap();
    assert!(ingest(&pipeline, "draining").await.is_ok());
    assert_eq!(pipeline.sink().recorded().len(), 2);
    let doc = pipeline.explain(&RequestId::from("draining")).unwrap();
    assert_eq!(doc["spans"]["spi.resolve"]["migration"], "draining");

    // Phase 3 — cutover: the write must fail with a retryable stale-epoch
    // error and must not reach the sink.
    table.enter_cutover(&p).unwrap();
    let err = ingest(&pipeline, "cutover").await.unwrap_err();
    assert_eq!(err.code(), ErrorCode::StaleEpoch);
    assert!(err.retryable(), "stale-epoch must be retryable");
    assert_eq!(
        pipeline.sink().recorded().len(),
        2,
        "no write during cutover"
    );

    // Phase 4 — migration completed: writes flow again and the newest
    // recorded write targets the destination cluster.
    table.complete_migration(&p).unwrap();
    assert!(ingest(&pipeline, "flipped").await.is_ok());
    let recorded = pipeline.sink().recorded();
    assert_eq!(recorded.len(), 3);
    assert_eq!(
        recorded[2].ops()[0].target.cluster,
        ClusterId::from("us-1"),
        "post-migration write lands on the new cluster"
    );
}