#![allow(clippy::unwrap_used, clippy::expect_used)]
use osproxy_core::{
ClusterId, EndpointKind, Epoch, ErrorCode, FieldName, IndexName, PartitionId, RequestId,
};
use osproxy_engine::{Pipeline, RequestError};
use osproxy_sink::{
CountOutcome, MemorySink, ReadOp, ReadOutcome, Reader, SearchOp, SearchOutcome, Sink,
SinkError, WriteAck, WriteBatch,
};
use osproxy_spi::{
BodyDoc, DocIdRule, HeaderView, HttpMethod, IdTemplate, InjectedField, InjectedValue, JsonPath,
PartitionKeySpec, Placement, PlacementAt, Principal, Protocol, RequestCtx, SensitivitySpec,
SpiError, TenancySpi,
};
use osproxy_tenancy::TenancyRouter;
#[derive(Clone, Copy)]
enum Placed {
Ok,
Missing,
BackendDown,
}
#[derive(Clone, Copy, Debug)]
enum Fault {
Reset,
Timeout,
Upstream5xx,
Upstream4xx,
StaleEpoch,
}
impl Fault {
fn sink_error(self) -> SinkError {
match self {
Fault::Reset => SinkError::Transport {
kind: "connection reset",
},
Fault::Timeout => SinkError::Transport {
kind: "upstream timeout",
},
Fault::Upstream5xx => SinkError::Upstream {
status: 503,
retryable: true,
},
Fault::Upstream4xx => SinkError::Upstream {
status: 400,
retryable: false,
},
Fault::StaleEpoch => SinkError::StaleEpoch {
stamped: Epoch::new(1),
current: Epoch::new(2),
},
}
}
fn retryable(self) -> bool {
!matches!(self, Fault::Upstream4xx)
}
fn expected_code(self) -> ErrorCode {
match self {
Fault::StaleEpoch => ErrorCode::StaleEpoch,
_ => ErrorCode::UpstreamFailed,
}
}
const ALL: [Fault; 5] = [
Fault::Reset,
Fault::Timeout,
Fault::Upstream5xx,
Fault::Upstream4xx,
Fault::StaleEpoch,
];
}
struct FaultTenancy {
placed: Placed,
}
impl TenancySpi for FaultTenancy {
fn resolve_partition(
&self,
ctx: &osproxy_spi::RequestCtx<'_>,
body: BodyDoc<'_>,
) -> Result<osproxy_core::PartitionId, osproxy_spi::SpiError> {
osproxy_tenancy::resolve_partition_spec(
&PartitionKeySpec::BodyField(JsonPath::new("tenant_id")),
ctx,
body,
)
}
fn doc_id_rule(&self) -> Option<DocIdRule> {
Some(DocIdRule::new(IdTemplate::new("{partition}:{body.id}")).with_routing(true))
}
fn injected_fields(&self) -> Vec<InjectedField> {
vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)]
}
fn sensitive_fields(&self) -> SensitivitySpec {
SensitivitySpec::none()
}
async fn placement_for(&self, partition: &PartitionId) -> Result<PlacementAt, SpiError> {
match self.placed {
Placed::Ok => Ok(PlacementAt::new(
Placement::SharedIndex {
cluster: ClusterId::from("eu-1"),
index: IndexName::from("shared"),
inject: vec![InjectedField::new(
FieldName::from("_tenant"),
InjectedValue::PartitionId,
)],
},
Epoch::new(1),
)),
Placed::Missing => Err(SpiError::PlacementMissing {
partition: partition.clone(),
}),
Placed::BackendDown => Err(SpiError::PlacementBackend { retryable: true }),
}
}
}
struct FaultSink {
fault: Option<Fault>,
inner: MemorySink,
}
impl FaultSink {
fn new(fault: Option<Fault>) -> Self {
Self {
fault,
inner: MemorySink::new(),
}
}
}
impl Sink for FaultSink {
async fn write(&self, batch: WriteBatch) -> Result<WriteAck, SinkError> {
match self.fault {
Some(f) => Err(f.sink_error()),
None => self.inner.write(batch).await,
}
}
}
impl Reader for FaultSink {
async fn get(&self, op: ReadOp) -> Result<ReadOutcome, SinkError> {
match self.fault {
Some(f) => Err(f.sink_error()),
None => self.inner.get(op).await,
}
}
async fn search(&self, op: SearchOp) -> Result<SearchOutcome, SinkError> {
match self.fault {
Some(f) => Err(f.sink_error()),
None => self.inner.search(op).await,
}
}
async fn count(&self, op: SearchOp) -> Result<CountOutcome, SinkError> {
match self.fault {
Some(f) => Err(f.sink_error()),
None => self.inner.count(op).await,
}
}
}
const GOOD_BODY: &[u8] = br#"{"tenant_id":"acme","id":1}"#;
async fn ingest(
placed: Placed,
fault: Option<Fault>,
body: &[u8],
) -> (
Result<(), RequestError>,
RequestId,
Pipeline<TenancyRouter<FaultTenancy>, FaultSink>,
) {
let pipeline = Pipeline::new(
TenancyRouter::new(FaultTenancy { placed }),
FaultSink::new(fault),
);
let principal = Principal::new(osproxy_core::PrincipalId::from("svc"));
let rid = RequestId::from("r");
let headers: Vec<(String, String)> = vec![];
let ctx = RequestCtx::new(
&principal,
&rid,
HttpMethod::Put,
EndpointKind::IngestDoc,
Protocol::Http1,
"orders",
HeaderView::new(&headers),
body,
);
let result = pipeline.handle(&ctx).await.map(|_| ());
(result, rid, pipeline)
}
fn assert_explain_complete(
pipeline: &Pipeline<TenancyRouter<FaultTenancy>, FaultSink>,
rid: &RequestId,
) {
let explain = pipeline.explain(rid).expect("explain retained");
assert_eq!(explain["outcome"], "error");
let err = &explain["error"];
assert!(
err["code"].as_str().is_some_and(|c| !c.is_empty()),
"code: {explain}"
);
assert!(err["retryable"].as_bool().is_some(), "retryable: {explain}");
assert!(
err["remediation"].as_str().is_some_and(|r| !r.is_empty()),
"remediation: {explain}"
);
}
#[tokio::test]
async fn every_upstream_fault_is_typed_classified_and_never_panics() {
for fault in Fault::ALL {
let (result, rid, pipeline) = ingest(Placed::Ok, Some(fault), GOOD_BODY).await;
let err = result.expect_err(&format!("{fault:?} should fail the request"));
assert_eq!(err.code(), fault.expected_code(), "{fault:?} wrong code");
assert_eq!(
err.retryable(),
fault.retryable(),
"{fault:?} misclassified retryable"
);
assert_explain_complete(&pipeline, &rid);
}
}
#[tokio::test]
async fn routing_faults_are_typed_and_classified() {
let (missing, rid_m, p_m) = ingest(Placed::Missing, None, GOOD_BODY).await;
let err = missing.expect_err("missing placement fails");
assert_eq!(err.code(), ErrorCode::PlacementMissing);
assert!(!err.retryable());
assert_explain_complete(&p_m, &rid_m);
let (down, rid_d, p_d) = ingest(Placed::BackendDown, None, GOOD_BODY).await;
let err = down.expect_err("backend down fails");
assert_eq!(err.code(), ErrorCode::PlacementBackendUnavailable);
assert!(err.retryable());
assert_explain_complete(&p_d, &rid_d);
}
#[tokio::test]
async fn malformed_bodies_never_panic_and_stay_typed() {
let cases: [&[u8]; 4] = [
b"", b"not json at all", br#"{"id":1}"#, br#"{"tenant_id":"acme""#, ];
for body in cases {
let (result, rid, pipeline) = ingest(Placed::Ok, None, body).await;
let err = result.expect_err(&format!("malformed body should fail: {body:?}"));
assert!(
!err.retryable(),
"a malformed body is not retryable: {body:?}"
);
assert_explain_complete(&pipeline, &rid);
}
}
#[tokio::test]
async fn the_whole_catalogue_resolves_to_a_typed_outcome() {
for placed in [Placed::Ok, Placed::Missing, Placed::BackendDown] {
for fault in Fault::ALL.map(Some).into_iter().chain([None]) {
let (result, rid, pipeline) = ingest(placed, fault, GOOD_BODY).await;
if result.is_err() {
assert_explain_complete(&pipeline, &rid);
}
}
}
}