#![allow(clippy::unwrap_used)]
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use osproxy_core::{
ClusterId, EndpointKind, ErrorCode, FieldName, IndexName, PartitionId, PrincipalId, RequestId,
};
use osproxy_engine::{Pipeline, PipelineResponse, RequestError, RetryPolicy};
use osproxy_sink::MemorySink;
use osproxy_spi::{
BodyDoc, HeaderView, HttpMethod, InjectedField, InjectedValue, JsonPath, PartitionKeySpec,
Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec, SpiError, TenancySpi,
};
use osproxy_tenancy::{PlacementTable, TenancyRouter};
use serde_json::json;
struct FlakyTenancy {
table: Arc<PlacementTable>,
fail_first: u32,
calls: AtomicU32,
}
impl TenancySpi for FlakyTenancy {
fn resolve_partition(
&self,
ctx: &osproxy_spi::RequestCtx<'_>,
body: BodyDoc<'_>,
) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
osproxy_tenancy::resolve_partition_spec(
&PartitionKeySpec::AnyOf(vec![
PartitionKeySpec::BodyField(JsonPath::new("tenant_id")),
PartitionKeySpec::Header("x-tenant".to_owned()),
]),
ctx,
body,
)
}
fn doc_id_rule(&self) -> Option<osproxy_spi::DocIdRule> {
Some(osproxy_spi::DocIdRule::new(osproxy_spi::IdTemplate::new(
"{partition}:{body.id}",
)))
}
fn injected_fields(&self) -> Vec<InjectedField> {
vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)]
}
fn sensitive_fields(&self) -> SensitivitySpec {
SensitivitySpec::none()
}
async fn placement_for(&self, partition: &PartitionId) -> Result<PlacementAt, SpiError> {
if self.calls.fetch_add(1, Ordering::SeqCst) < self.fail_first {
return Err(SpiError::PlacementBackend { retryable: true });
}
self.table
.get(partition)
.ok_or_else(|| SpiError::PlacementMissing {
partition: partition.clone(),
})
}
}
fn pipeline(fail_first: u32) -> Pipeline<TenancyRouter<FlakyTenancy>, MemorySink> {
let table = Arc::new(PlacementTable::new());
table.set(
PartitionId::from("acme"),
Placement::SharedIndex {
cluster: ClusterId::from("eu-1"),
index: IndexName::from("orders-shared"),
inject: vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)],
},
);
Pipeline::new(
TenancyRouter::new(FlakyTenancy {
table,
fail_first,
calls: AtomicU32::new(0),
}),
MemorySink::new(),
)
.with_retry_policy(RetryPolicy {
max_attempts: 3,
base_backoff: Duration::ZERO,
max_backoff: Duration::ZERO,
})
}
async fn ingest(
p: &Pipeline<TenancyRouter<FlakyTenancy>, MemorySink>,
) -> Result<PipelineResponse, RequestError> {
let principal = Principal::new(PrincipalId::from("svc"));
let rid = RequestId::from("r");
let headers: Vec<(String, String)> = vec![];
let body = serde_json::to_vec(&json!({ "tenant_id": "acme", "id": 7, "msg": "hi" })).unwrap();
let ctx = RequestCtx::new(
&principal,
&rid,
HttpMethod::Post,
EndpointKind::IngestDoc,
Protocol::Http1,
"orders-logical",
HeaderView::new(&headers),
&body,
);
p.handle(&ctx).await
}
#[tokio::test]
async fn a_transient_backend_blip_is_retried_and_the_write_succeeds() {
let p = pipeline(2);
let resp = ingest(&p).await.unwrap();
assert!(resp.status >= 200 && resp.status < 300);
assert_eq!(
p.sink().recorded().len(),
1,
"the write committed after retry"
);
}
#[tokio::test]
async fn a_persistently_unavailable_backend_surfaces_a_retryable_error() {
let p = pipeline(u32::MAX);
let err = ingest(&p).await.unwrap_err();
assert_eq!(err.code(), ErrorCode::PlacementBackendUnavailable);
assert!(err.retryable());
assert!(p.sink().recorded().is_empty(), "nothing committed");
}
#[tokio::test]
async fn the_mget_per_item_resolve_retries_a_transient_blip() {
let p = pipeline(2);
let principal = Principal::new(PrincipalId::from("svc"));
let rid = RequestId::from("r");
let headers = vec![("x-tenant".to_owned(), "acme".to_owned())];
let body = serde_json::to_vec(&json!({ "docs": [{ "_id": "7" }] })).unwrap();
let ctx = RequestCtx::new(
&principal,
&rid,
HttpMethod::Post,
EndpointKind::MultiGet,
Protocol::Http1,
"orders-logical",
HeaderView::new(&headers),
&body,
);
let resp = p.handle(&ctx).await.unwrap();
let doc: serde_json::Value = serde_json::from_slice(&resp.body).unwrap();
let entry = &doc["docs"][0];
assert_ne!(
entry["error"]["type"], "placement_missing",
"the per-item resolve retried the blip rather than failing the item: {entry}"
);
}