use super::*;
use crate::asyncwrite::{QueueError, QueuedWrite, WriteQueue};
use osproxy_core::{EndpointKind, PrincipalId, RequestId};
use osproxy_spi::Principal;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
/// In-memory [`WriteQueue`] test double.
///
/// Accepted writes are kept in `writes` so tests can inspect them afterwards;
/// setting `fail` makes every enqueue attempt return an error instead.
#[derive(Default)]
struct RecordingQueue {
    /// Writes accepted so far, in the order they were enqueued.
    writes: Mutex<Vec<QueuedWrite>>,
    /// When `true`, `enqueue` errors out and records nothing.
    fail: bool,
}
/// Test implementation of [`WriteQueue`]: always enabled, and either records
/// the write or fails depending on the `fail` flag.
impl WriteQueue for RecordingQueue {
    /// Always reports the queue as enabled.
    fn enabled(&self) -> bool {
        true
    }

    /// Appends `write` to `writes`, or returns a [`QueueError`] with the reason
    /// `"broker unavailable"` when `fail` is set (nothing is recorded then).
    fn enqueue<'a>(
        &'a self,
        write: QueuedWrite,
    ) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>> {
        let outcome = async move {
            if self.fail {
                Err(QueueError {
                    reason: "broker unavailable",
                })
            } else {
                // A poisoned lock is recovered rather than propagated: take the
                // inner guard and keep recording.
                let mut recorded = self
                    .writes
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                recorded.push(write);
                Ok(())
            }
        };
        Box::pin(outcome)
    }
}
/// Builds an owned `(name, value)` header pair from two borrowed strings.
fn header(name: &str, value: &str) -> (String, String) {
    (String::from(name), String::from(value))
}
/// With a write queue attached and `x-write-mode: async`, a single-document
/// ingest is answered with 202 and an "accepted"/"queued" body carrying the
/// request id as `op_id`; the write lands in the queue and the pipeline's sink
/// records nothing.
#[tokio::test]
async fn async_ingest_enqueues_and_returns_202_without_touching_the_sink() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let payload = br#"{"tenant_id":"acme","id":7}"#;
    let request = ctx(
        &caller,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        payload,
    );

    let response = proxy.handle(&request).await.unwrap();
    assert_eq!(response.status, 202);

    // The acknowledgement body must mention all three fragments, in this order
    // of checking.
    let text = String::from_utf8(response.body).unwrap();
    for fragment in [
        r#""status":"accepted""#,
        r#""result":"queued""#,
        r#""op_id":"r""#,
    ] {
        assert!(text.contains(fragment), "{text}");
    }

    // Exactly one write was queued, keyed by tenant and request id.
    let recorded = recorder.writes.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    let only = &recorded[0];
    assert_eq!(only.partition_key, "acme");
    assert_eq!(only.op_id, "r");

    assert!(
        proxy.sink().recorded().is_empty(),
        "sync sink must stay untouched"
    );
}
/// An `x-write-mode: async` ingest against a pipeline that has no write queue
/// configured is answered with 422.
#[tokio::test]
async fn async_request_with_no_queue_is_refused_422() {
    let proxy = pipeline();
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let payload = br#"{"tenant_id":"acme","id":7}"#;
    let request = ctx(
        &caller,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        payload,
    );

    assert_eq!(proxy.handle(&request).await.unwrap().status, 422);
}
/// Exercises the `x-op-id` header on async ingests.
///
/// The first request supplies `client-key-1`, which must be recorded verbatim
/// as the queued write's `op_id`. The second supplies `bad key`, which the
/// pipeline is expected to treat as invalid: the recorded `op_id` falls back
/// to the request id (`r2`).
#[tokio::test]
async fn async_client_supplied_op_id_is_honored_and_invalid_falls_back() {
    let queue = Arc::new(RecordingQueue::default());
    let p = pipeline().with_write_queue(Arc::clone(&queue) as Arc<dyn WriteQueue>);
    let principal = Principal::new(PrincipalId::from("svc"));

    // Request 1: a client-supplied op id that should be honored.
    let rid = RequestId::from("r1");
    let headers = vec![
        header("x-write-mode", "async"),
        header("x-op-id", "client-key-1"),
    ];
    let c = ctx(
        &principal,
        &rid,
        &headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":1}"#,
    );
    p.handle(&c).await.unwrap();

    // Request 2: an op id expected to be rejected in favor of the request id.
    let rid2 = RequestId::from("r2");
    let headers2 = vec![
        header("x-write-mode", "async"),
        header("x-op-id", "bad key"),
    ];
    let c2 = ctx(
        &principal,
        &rid2,
        &headers2,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":2}"#,
    );
    p.handle(&c2).await.unwrap();

    let writes = queue.writes.lock().unwrap();
    // Check the count before indexing so a dropped write fails as a readable
    // assertion rather than an out-of-bounds panic.
    assert_eq!(writes.len(), 2, "both requests must enqueue");
    assert_eq!(writes[0].op_id, "client-key-1");
    assert_eq!(writes[1].op_id, "r2");
}
#[tokio::test]
async fn async_enqueue_failure_is_reported_503() {
let queue = Arc::new(RecordingQueue {
fail: true,
..Default::default()
});
let p = pipeline().with_write_queue(queue as Arc<dyn WriteQueue>);
let principal = Principal::new(PrincipalId::from("svc"));
let rid = RequestId::from("r");
let headers = vec![header("x-write-mode", "async")];
let c = ctx(
&principal,
&rid,
&headers,
EndpointKind::IngestDoc,
br#"{"tenant_id":"acme","id":7}"#,
);
let resp = p.handle(&c).await.unwrap();
assert_eq!(resp.status, 503);
let body = String::from_utf8(resp.body).unwrap();
assert!(body.contains(r#""op_id":"r""#), "{body}");
}
/// Without an `x-write-mode` header the ingest is answered with 201 and
/// nothing is enqueued, even though a write queue is attached.
#[tokio::test]
async fn sync_remains_the_default_without_a_header() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let no_headers = Vec::new();
    let payload = br#"{"tenant_id":"acme","id":7}"#;
    let request = ctx(
        &caller,
        &request_id,
        &no_headers,
        EndpointKind::IngestDoc,
        payload,
    );

    let response = proxy.handle(&request).await.unwrap();
    assert_eq!(response.status, 201);

    let recorded = recorder.writes.lock().unwrap();
    assert!(recorded.is_empty(), "sync must not enqueue");
}
/// With the baseline write mode set to `WriteMode::Async`, a header-less
/// ingest is queued (202), while an explicit `x-write-mode: sync` header
/// still yields 201 and leaves the queue length unchanged.
#[tokio::test]
async fn baseline_async_makes_fan_out_the_default() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline()
        .with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>)
        .with_baseline_write_mode(crate::asyncwrite::WriteMode::Async);
    let caller = Principal::new(PrincipalId::from("svc"));

    // Phase 1: no header, so the async baseline applies.
    let first_id = RequestId::from("r");
    let no_headers = Vec::new();
    let first_payload = br#"{"tenant_id":"acme","id":7}"#;
    let first_request = ctx(
        &caller,
        &first_id,
        &no_headers,
        EndpointKind::IngestDoc,
        first_payload,
    );
    let first_response = proxy.handle(&first_request).await.unwrap();
    assert_eq!(first_response.status, 202);
    assert_eq!(recorder.writes.lock().unwrap().len(), 1);

    // Phase 2: the caller overrides the baseline with an explicit sync header.
    let second_id = RequestId::from("r2");
    let sync_headers = vec![header("x-write-mode", "sync")];
    let second_payload = br#"{"tenant_id":"acme","id":8}"#;
    let second_request = ctx(
        &caller,
        &second_id,
        &sync_headers,
        EndpointKind::IngestDoc,
        second_payload,
    );
    let second_response = proxy.handle(&second_request).await.unwrap();
    assert_eq!(second_response.status, 201);
    assert_eq!(
        recorder.writes.lock().unwrap().len(),
        1,
        "sync override must not enqueue"
    );
}
/// An async ingest whose query string carries `if_seq_no`/`if_primary_term`
/// is answered with 400, the body mentions "optimistic concurrency", and
/// nothing is enqueued.
#[tokio::test]
async fn async_rejects_optimistic_concurrency_with_400() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let payload = br#"{"tenant_id":"acme","id":7}"#;
    let request = ctx(
        &caller,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        payload,
    )
    .with_query(Some("if_seq_no=3&if_primary_term=1"));

    let response = proxy.handle(&request).await.unwrap();
    assert_eq!(response.status, 400);

    let text = String::from_utf8(response.body).unwrap();
    assert!(text.contains("optimistic concurrency"), "{text}");

    let recorded = recorder.writes.lock().unwrap();
    assert!(recorded.is_empty(), "rejected op must not enqueue");
}
/// An async request addressed to the `/orders/_update/7` path is answered
/// with 400 and nothing is enqueued.
#[tokio::test]
async fn async_rejects_scripted_update_path_with_400() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let payload = br#"{"tenant_id":"acme","id":7,"doc":{"x":1}}"#;
    let request = ctx(
        &caller,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        payload,
    )
    .with_path("/orders/_update/7");

    let response = proxy.handle(&request).await.unwrap();
    assert_eq!(response.status, 400);

    let recorded = recorder.writes.lock().unwrap();
    assert!(recorded.is_empty());
}
/// Newline-delimited bulk payload shared by the async bulk tests.
///
/// It holds three actions, one source line per payload line:
/// an `index` action (`_id` "1") followed by its document, an `update` action
/// (`_id` "acme:9") followed by its partial document, and a `delete` action
/// (`_id` "acme:2"). Each trailing `\` only continues the literal; it adds no
/// bytes to the payload.
const ASYNC_BULK: &[u8] = b"\
    {\"index\":{\"_id\":\"1\"}}\n\
    {\"tenant_id\":\"acme\",\"id\":1}\n\
    {\"update\":{\"_id\":\"acme:9\"}}\n\
    {\"doc\":{\"x\":1}}\n\
    {\"delete\":{\"_id\":\"acme:2\"}}\n";
/// Sends `ASYNC_BULK` in async mode for tenant `acme`.
///
/// Expected outcome: overall 200 with `errors: true`; the index and delete
/// items are queued (202) with op ids `r:0` and `r:2`, the update item is
/// rejected (400), and only the two accepted items reach the queue.
#[tokio::test]
async fn async_bulk_enqueues_each_item_with_a_per_item_op_id() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let bulk_headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let request = ctx(
        &caller,
        &request_id,
        &bulk_headers,
        EndpointKind::IngestBulk,
        ASYNC_BULK,
    );

    let response = proxy.handle(&request).await.unwrap();
    assert_eq!(response.status, 200);

    let parsed: serde_json::Value = serde_json::from_slice(&response.body).unwrap();
    let items = parsed["items"].as_array().unwrap();
    assert_eq!(items.len(), 3);

    // Item 0: index action, queued under the first per-item op id.
    let indexed = &items[0]["index"];
    assert_eq!(indexed["status"], 202);
    assert_eq!(indexed["result"], "queued");
    assert_eq!(indexed["op_id"], "r:0");

    // Item 1: update action, rejected.
    assert_eq!(items[1]["update"]["status"], 400);

    // Item 2: delete action, queued; its op id keeps the item's position.
    let deleted = &items[2]["delete"];
    assert_eq!(deleted["status"], 202);
    assert_eq!(deleted["op_id"], "r:2");

    assert_eq!(parsed["errors"], true);

    let recorded = recorder.writes.lock().unwrap();
    assert_eq!(recorded.len(), 2);
    assert!(recorded.iter().all(|w| w.partition_key == "acme"));
    assert_eq!(recorded[0].op_id, "r:0");
    assert_eq!(recorded[1].op_id, "r:2");
}
/// An async bulk request against a pipeline without a write queue is answered
/// with 422.
#[tokio::test]
async fn async_bulk_with_no_queue_is_refused_422() {
    let proxy = pipeline();
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let bulk_headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let request = ctx(
        &caller,
        &request_id,
        &bulk_headers,
        EndpointKind::IngestBulk,
        ASYNC_BULK,
    );

    assert_eq!(proxy.handle(&request).await.unwrap().status, 422);
}
/// Async bulk with two `index` actions, the first carrying
/// `if_seq_no`/`if_primary_term` in its action metadata.
///
/// The item with the precondition must be rejected (400) while the plain item
/// is queued (202); the response flags `errors: true` and exactly one write
/// reaches the queue.
#[tokio::test]
async fn async_bulk_rejects_a_per_item_optimistic_concurrency_precondition() {
    let queue = Arc::new(RecordingQueue::default());
    let p = pipeline().with_write_queue(Arc::clone(&queue) as Arc<dyn WriteQueue>);
    let principal = Principal::new(PrincipalId::from("svc"));
    let rid = RequestId::from("r");
    let headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let body = b"{\"index\":{\"_id\":\"1\",\"if_seq_no\":3,\"if_primary_term\":1}}\n{\"tenant_id\":\"acme\",\"id\":1}\n{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":2}\n";
    let c = ctx(&principal, &rid, &headers, EndpointKind::IngestBulk, body);
    let resp = p.handle(&c).await.unwrap();
    let doc: serde_json::Value = serde_json::from_slice(&resp.body).unwrap();
    let items = doc["items"].as_array().unwrap();
    // Check the item count before indexing so a missing item fails as a
    // readable assertion rather than an out-of-bounds panic.
    assert_eq!(items.len(), 2, "one response item per bulk action");
    assert_eq!(
        items[0]["index"]["status"], 400,
        "CAS precondition rejected"
    );
    assert_eq!(items[1]["index"]["status"], 202, "plain item queued");
    assert_eq!(doc["errors"], true);
    assert_eq!(queue.writes.lock().unwrap().len(), 1);
}
/// With delete-by-query expansion enabled, an async `match_all`
/// delete-by-query over two seeded documents reports `total: 2`,
/// `deleted: 2`, `version_conflicts: 0`, and enqueues one delete per match
/// (ids `acme:1` and `acme:2`, op ids `r:0` and `r:1`).
#[tokio::test]
async fn async_delete_by_query_expands_to_one_enqueued_delete_per_match() {
    let recorder = Arc::new(RecordingQueue::default());
    let proxy = pipeline()
        .with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>)
        .with_delete_by_query_expansion(true);
    let caller = Principal::new(PrincipalId::from("svc"));

    // Seed two documents with header-less ingests; these must bypass the queue.
    for id in [1, 2] {
        let seed_id = RequestId::from("seed");
        let seed_body = format!(r#"{{"tenant_id":"acme","id":{id}}}"#);
        let seed_request = ctx(
            &caller,
            &seed_id,
            &[],
            EndpointKind::IngestDoc,
            seed_body.as_bytes(),
        );
        proxy.handle(&seed_request).await.unwrap();
    }
    assert!(
        recorder.writes.lock().unwrap().is_empty(),
        "sync seeds must not enqueue"
    );

    let request_id = RequestId::from("r");
    let dbq_headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let match_all = br#"{"query":{"match_all":{}}}"#;
    let request = ctx(
        &caller,
        &request_id,
        &dbq_headers,
        EndpointKind::DeleteByQuery,
        match_all,
    );
    let response = proxy.handle(&request).await.unwrap();
    assert_eq!(response.status, 200);

    let summary: serde_json::Value = serde_json::from_slice(&response.body).unwrap();
    assert_eq!(summary["total"], 2);
    assert_eq!(summary["deleted"], 2);
    assert_eq!(summary["version_conflicts"], 0);

    let recorded = recorder.writes.lock().unwrap();
    assert_eq!(recorded.len(), 2);

    // Collect the id of each queued write's first op when it is a delete; the
    // ids are sorted before comparing, so match order does not matter.
    let mut deleted_ids: Vec<String> = Vec::new();
    for write in recorded.iter() {
        if let osproxy_sink::DocOp::Delete { id, .. } = &write.batch.ops()[0].doc {
            deleted_ids.push(id.clone());
        }
    }
    deleted_ids.sort();
    assert_eq!(deleted_ids, ["acme:1", "acme:2"]);

    assert_eq!(recorded[0].op_id, "r:0");
    assert_eq!(recorded[1].op_id, "r:1");
}
/// Delete-by-query is refused in each of three configurations, and no delete
/// is enqueued in any of them:
///
/// 1. queue + expansion enabled, but no `x-write-mode: async` header → 400;
/// 2. queue + async header, but expansion not enabled → 400;
/// 3. expansion + async header, but no queue attached → 422.
#[tokio::test]
async fn delete_by_query_is_rejected_unless_async_and_expansion_enabled() {
    let recorder = Arc::new(RecordingQueue::default());
    let caller = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let match_all = br#"{"query":{"match_all":{}}}"#;
    let tenant_only = vec![header("x-tenant", "acme")];
    let async_tenant = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let shared_queue = || Arc::clone(&recorder) as Arc<dyn WriteQueue>;

    // Scenario 1: everything configured, but the request is not async.
    let expanding = pipeline()
        .with_write_queue(shared_queue())
        .with_delete_by_query_expansion(true);
    let not_async = ctx(
        &caller,
        &request_id,
        &tenant_only,
        EndpointKind::DeleteByQuery,
        match_all,
    );
    assert_eq!(expanding.handle(&not_async).await.unwrap().status, 400);

    // Scenario 2: async request, but expansion was never enabled.
    let not_expanding = pipeline().with_write_queue(shared_queue());
    let async_request = ctx(
        &caller,
        &request_id,
        &async_tenant,
        EndpointKind::DeleteByQuery,
        match_all,
    );
    assert_eq!(
        not_expanding.handle(&async_request).await.unwrap().status,
        400
    );

    // Scenario 3: async request with expansion, but no queue to write to.
    let queueless = pipeline().with_delete_by_query_expansion(true);
    let queueless_request = ctx(
        &caller,
        &request_id,
        &async_tenant,
        EndpointKind::DeleteByQuery,
        match_all,
    );
    assert_eq!(
        queueless.handle(&queueless_request).await.unwrap().status,
        422
    );

    assert!(
        recorder.writes.lock().unwrap().is_empty(),
        "no deletes enqueued on rejection"
    );
}