osproxy-engine 1.0.0

Pipeline orchestration: auth -> resolve -> rewrite -> sink -> reverse.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//! Async fan-out write-mode tests (`docs/04` §9). Split from `pipeline_tests.rs`;
//! shares that module's `pipeline()`/`ctx()` harness via `use super::*`.
//
// JUSTIFY(file-length): one cohesive suite for the async write mode, single-doc,
// bulk, and delete-by-query all exercise the same `RecordingQueue` + `header`
// scaffolding defined here; splitting by sub-path would duplicate that harness
// across files (and the shared `pipeline()`/`ctx()` is reachable only as a
// sibling module). Kept together as the one place the mode's behavior is proven.

use super::*;
use crate::asyncwrite::{QueueError, QueuedWrite, WriteQueue};
use osproxy_core::{EndpointKind, PrincipalId, RequestId};
use osproxy_spi::Principal;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

/// Test double for [`WriteQueue`]: remembers every write handed to `enqueue`,
/// in call order, and can be switched into a refusing mode so the
/// enqueue-failure path can be driven.
#[derive(Default)]
struct RecordingQueue {
    /// When `true`, `enqueue` returns an error and records nothing.
    fail: bool,
    /// Writes accepted by `enqueue`, oldest first.
    writes: Mutex<Vec<QueuedWrite>>,
}

impl WriteQueue for RecordingQueue {
    /// Always reports the queue as available.
    fn enabled(&self) -> bool {
        true
    }

    /// Appends `write` to `writes`, or, when `fail` is set, refuses with a
    /// "broker unavailable" error without recording anything.
    fn enqueue<'a>(
        &'a self,
        write: QueuedWrite,
    ) -> Pin<Box<dyn Future<Output = Result<(), QueueError>> + Send + 'a>> {
        Box::pin(async move {
            if self.fail {
                Err(QueueError {
                    reason: "broker unavailable",
                })
            } else {
                // A poisoned lock still holds valid data; keep recording so one
                // panicking test cannot cascade into unrelated failures.
                self.writes
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
                    .push(write);
                Ok(())
            }
        })
    }
}

/// Builds one `(name, value)` request-header pair as owned strings.
fn header(name: &str, value: &str) -> (String, String) {
    (String::from(name), String::from(value))
}

/// Async single-doc ingest: the write is enqueued exactly once (partitioned by
/// tenant), the client gets a 202 "accepted"/"queued" ack carrying the op id,
/// and the sync sink records nothing.
#[tokio::test]
async fn async_ingest_enqueues_and_returns_202_without_touching_the_sink() {
    let recorder = Arc::new(RecordingQueue::default());
    let pipe = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let who = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let request = ctx(
        &who,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7}"#,
    );

    let response = pipe.handle(&request).await.unwrap();
    assert_eq!(response.status, 202);

    let body = String::from_utf8(response.body).unwrap();
    // No X-Op-Id was sent, so the op id falls back to the request id "r".
    for fragment in [
        r#""status":"accepted""#,
        r#""result":"queued""#,
        r#""op_id":"r""#,
    ] {
        assert!(body.contains(fragment), "{body}");
    }

    // Nothing went upstream; exactly one durable enqueue happened instead.
    assert!(
        pipe.sink().recorded().is_empty(),
        "sync sink must stay untouched"
    );
    let queued = recorder.writes.lock().unwrap();
    assert_eq!(queued.len(), 1);
    assert_eq!(queued[0].partition_key, "acme");
    assert_eq!(queued[0].op_id, "r");
}

/// An async write against a pipeline with no queue configured is refused with
/// 422 rather than silently dropped.
#[tokio::test]
async fn async_request_with_no_queue_is_refused_422() {
    let request_id = RequestId::from("r");
    let who = Principal::new(PrincipalId::from("svc"));
    let async_headers = vec![header("x-write-mode", "async")];
    let request = ctx(
        &who,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7}"#,
    );

    // `pipeline()` ships with NoQueue: async must be refused, never dropped.
    let unqueued = pipeline();
    let status = unqueued.handle(&request).await.unwrap().status;
    assert_eq!(status, 422);
}

/// A well-formed `x-op-id` is carried through as the queued write's op id; a
/// malformed one (here containing a space) is ignored in favor of the proxy
/// request id. In both cases the write itself is still accepted and enqueued.
#[tokio::test]
async fn async_client_supplied_op_id_is_honored_and_invalid_falls_back() {
    let queue = Arc::new(RecordingQueue::default());
    let p = pipeline().with_write_queue(Arc::clone(&queue) as Arc<dyn WriteQueue>);
    let principal = Principal::new(PrincipalId::from("svc"));

    // A valid client op id rides through to the queued write.
    let rid = RequestId::from("r1");
    let headers = vec![
        header("x-write-mode", "async"),
        header("x-op-id", "client-key-1"),
    ];
    let c = ctx(
        &principal,
        &rid,
        &headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":1}"#,
    );
    let resp = p.handle(&c).await.unwrap();
    assert_eq!(resp.status, 202, "valid op id must be accepted");

    // A malformed op id is ignored in favor of the proxy request id; the write
    // must fall back, not be rejected.
    let rid2 = RequestId::from("r2");
    let headers2 = vec![
        header("x-write-mode", "async"),
        header("x-op-id", "bad key"),
    ];
    let c2 = ctx(
        &principal,
        &rid2,
        &headers2,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":2}"#,
    );
    let resp2 = p.handle(&c2).await.unwrap();
    assert_eq!(
        resp2.status, 202,
        "malformed op id must fall back, not be rejected"
    );

    // Check the count first so a missing write fails with a clear message
    // instead of an index-out-of-bounds panic.
    let writes = queue.writes.lock().unwrap();
    assert_eq!(writes.len(), 2, "both writes must be enqueued");
    assert_eq!(writes[0].op_id, "client-key-1");
    assert_eq!(writes[1].op_id, "r2");
}

/// When the queue refuses the enqueue, the request is answered with 503 and
/// the error body still carries the op id (the request id here).
#[tokio::test]
async fn async_enqueue_failure_is_reported_503() {
    let refusing: Arc<dyn WriteQueue> = Arc::new(RecordingQueue {
        fail: true,
        ..Default::default()
    });
    let pipe = pipeline().with_write_queue(refusing);
    let who = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let request = ctx(
        &who,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7}"#,
    );

    let response = pipe.handle(&request).await.unwrap();
    assert_eq!(response.status, 503);
    let body = String::from_utf8(response.body).unwrap();
    assert!(body.contains(r#""op_id":"r""#), "{body}");
}

/// Even with a queue attached, a write with no `x-write-mode` header stays
/// synchronous: 201 and nothing enqueued.
#[tokio::test]
async fn sync_remains_the_default_without_a_header() {
    let recorder = Arc::new(RecordingQueue::default());
    let pipe = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let who = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let no_headers = vec![];
    let request = ctx(
        &who,
        &request_id,
        &no_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7}"#,
    );

    let status = pipe.handle(&request).await.unwrap().status;
    assert_eq!(status, 201);

    let nothing_queued = recorder.writes.lock().unwrap().is_empty();
    assert!(nothing_queued, "sync must not enqueue");
}

/// With a deployment baseline of `WriteMode::Async`, a header-less write is
/// queued (202). An explicit per-request `x-write-mode: sync` still overrides
/// the baseline (201, queue unchanged).
#[tokio::test]
async fn baseline_async_makes_fan_out_the_default() {
    let recorder = Arc::new(RecordingQueue::default());
    let pipe = pipeline()
        .with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>)
        .with_baseline_write_mode(crate::asyncwrite::WriteMode::Async);
    let who = Principal::new(PrincipalId::from("svc"));
    let queued_count = || recorder.writes.lock().unwrap().len();

    // No header: the deployment baseline selects async.
    let first_rid = RequestId::from("r");
    let no_headers = vec![];
    let first = ctx(
        &who,
        &first_rid,
        &no_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7}"#,
    );
    assert_eq!(pipe.handle(&first).await.unwrap().status, 202);
    assert_eq!(queued_count(), 1);

    // An explicit per-request sync header overrides the baseline.
    let second_rid = RequestId::from("r2");
    let sync_headers = vec![header("x-write-mode", "sync")];
    let second = ctx(
        &who,
        &second_rid,
        &sync_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":8}"#,
    );
    assert_eq!(pipe.handle(&second).await.unwrap().status, 201);
    assert_eq!(queued_count(), 1, "sync override must not enqueue");
}

/// An async write carrying an `if_seq_no`/`if_primary_term` precondition is
/// rejected with 400 (the body mentions "optimistic concurrency"), and nothing
/// is enqueued.
#[tokio::test]
async fn async_rejects_optimistic_concurrency_with_400() {
    let recorder = Arc::new(RecordingQueue::default());
    let pipe = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let who = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];
    let cas_query = Some("if_seq_no=3&if_primary_term=1");
    let request = ctx(
        &who,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7}"#,
    )
    .with_query(cas_query);

    let response = pipe.handle(&request).await.unwrap();
    assert_eq!(response.status, 400);
    let body = String::from_utf8(response.body).unwrap();
    assert!(body.contains("optimistic concurrency"), "{body}");

    let nothing_queued = recorder.writes.lock().unwrap().is_empty();
    assert!(nothing_queued, "rejected op must not enqueue");
}

/// An async write addressed to the `_update` endpoint is rejected with 400,
/// and nothing is enqueued.
#[tokio::test]
async fn async_rejects_scripted_update_path_with_400() {
    let recorder = Arc::new(RecordingQueue::default());
    let pipe = pipeline().with_write_queue(Arc::clone(&recorder) as Arc<dyn WriteQueue>);
    let who = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let async_headers = vec![header("x-write-mode", "async")];

    // `/{index}/_update/{id}` is the canonical OpenSearch update path.
    let update_request = ctx(
        &who,
        &request_id,
        &async_headers,
        EndpointKind::IngestDoc,
        br#"{"tenant_id":"acme","id":7,"doc":{"x":1}}"#,
    )
    .with_path("/orders/_update/7");

    let status = pipe.handle(&update_request).await.unwrap().status;
    assert_eq!(status, 400);
    assert!(recorder.writes.lock().unwrap().is_empty());
}

// --- async bulk (docs/04 §9) ----------------------------------------------

/// NDJSON bulk body with three actions for tenant `acme`: an `index` of doc 1,
/// an `update` of `acme:9`, and a `delete` of `acme:2`. Every line is
/// newline-terminated.
const ASYNC_BULK: &[u8] = concat!(
    r#"{"index":{"_id":"1"}}"#,
    "\n",
    r#"{"tenant_id":"acme","id":1}"#,
    "\n",
    r#"{"update":{"_id":"acme:9"}}"#,
    "\n",
    r#"{"doc":{"x":1}}"#,
    "\n",
    r#"{"delete":{"_id":"acme:2"}}"#,
    "\n",
)
.as_bytes();

/// Async bulk with three actions. Each honorable item is enqueued on its own,
/// with an op id of the form `{request_id}:{item_index}`. The `update` item is
/// rejected in place with 400, which sets the top-level `errors` flag.
#[tokio::test]
async fn async_bulk_enqueues_each_item_with_a_per_item_op_id() {
    let queue = Arc::new(RecordingQueue::default());
    let p = pipeline().with_write_queue(Arc::clone(&queue) as Arc<dyn WriteQueue>);
    let principal = Principal::new(PrincipalId::from("svc"));
    let rid = RequestId::from("r");
    let headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let c = ctx(
        &principal,
        &rid,
        &headers,
        EndpointKind::IngestBulk,
        ASYNC_BULK,
    );
    let resp = p.handle(&c).await.unwrap();
    assert_eq!(resp.status, 200);

    let body: serde_json::Value = serde_json::from_slice(&resp.body).unwrap();
    let items = body["items"].as_array().unwrap();
    assert_eq!(items.len(), 3);
    // index → queued with op id "r:0"
    assert_eq!(items[0]["index"]["status"], 202);
    assert_eq!(items[0]["index"]["result"], "queued");
    assert_eq!(items[0]["index"]["op_id"], "r:0");
    // update → rejected 400, not enqueued
    assert_eq!(items[1]["update"]["status"], 400);
    // delete → queued with op id "r:2" (op ids follow the item index, so the
    // rejected update still consumes index 1)
    assert_eq!(items[2]["delete"]["status"], 202);
    assert_eq!(items[2]["delete"]["op_id"], "r:2");
    assert_eq!(body["errors"], true); // the rejected update sets the flag

    // Only the two honorable ops were enqueued, keyed by their partition.
    let writes = queue.writes.lock().unwrap();
    assert_eq!(writes.len(), 2);
    assert!(writes.iter().all(|w| w.partition_key == "acme"));
    assert_eq!(writes[0].op_id, "r:0");
    assert_eq!(writes[1].op_id, "r:2");
}

/// An async bulk request against a pipeline with no queue configured is
/// refused as a whole with 422.
#[tokio::test]
async fn async_bulk_with_no_queue_is_refused_422() {
    let who = Principal::new(PrincipalId::from("svc"));
    let request_id = RequestId::from("r");
    let bulk_headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let request = ctx(
        &who,
        &request_id,
        &bulk_headers,
        EndpointKind::IngestBulk,
        ASYNC_BULK,
    );

    // `pipeline()` has the default NoQueue, so there is nowhere to fan out to.
    let unqueued = pipeline();
    assert_eq!(unqueued.handle(&request).await.unwrap().status, 422);
}

/// In async bulk, an item carrying an `if_seq_no`/`if_primary_term` (CAS)
/// precondition is rejected per item with 400. It is never queued with the
/// precondition dropped. The plain item beside it is still queued.
#[tokio::test]
async fn async_bulk_rejects_a_per_item_optimistic_concurrency_precondition() {
    let queue = Arc::new(RecordingQueue::default());
    let p = pipeline().with_write_queue(Arc::clone(&queue) as Arc<dyn WriteQueue>);
    let principal = Principal::new(PrincipalId::from("svc"));
    let rid = RequestId::from("r");
    let headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    // First item carries if_seq_no (CAS) → rejected; second is plain → enqueued.
    let body = b"{\"index\":{\"_id\":\"1\",\"if_seq_no\":3,\"if_primary_term\":1}}\n{\"tenant_id\":\"acme\",\"id\":1}\n{\"index\":{}}\n{\"tenant_id\":\"acme\",\"id\":2}\n";
    let c = ctx(&principal, &rid, &headers, EndpointKind::IngestBulk, body);
    let resp = p.handle(&c).await.unwrap();
    // Per-item rejections are reported inside a 200 bulk response, as in the
    // mixed-outcome bulk test above.
    assert_eq!(resp.status, 200);
    let doc: serde_json::Value = serde_json::from_slice(&resp.body).unwrap();
    let items = doc["items"].as_array().unwrap();
    assert_eq!(items.len(), 2, "one response item per bulk action");
    assert_eq!(
        items[0]["index"]["status"], 400,
        "CAS precondition rejected"
    );
    assert_eq!(items[1]["index"]["status"], 202, "plain item queued");
    assert_eq!(doc["errors"], true);
    // Only the honorable item was enqueued; the precondition was never dropped.
    assert_eq!(queue.writes.lock().unwrap().len(), 1);
}

// --- async _delete_by_query expansion (docs/04 §9) ------------------------

/// Stores two docs synchronously, then runs an async `_delete_by_query` with
/// expansion enabled. Each match becomes its own enqueued delete, with op ids
/// `r:0` and `r:1`, rather than a single query dispatched to the sink. The
/// response reports both matches as deleted, with no version conflicts.
#[tokio::test]
async fn async_delete_by_query_expands_to_one_enqueued_delete_per_match() {
    let queue = Arc::new(RecordingQueue::default());
    let p = pipeline()
        .with_write_queue(Arc::clone(&queue) as Arc<dyn WriteQueue>)
        .with_delete_by_query_expansion(true);
    let principal = Principal::new(PrincipalId::from("svc"));

    // Seed two docs synchronously (physical ids acme:1, acme:2).
    for id in [1, 2] {
        let rid = RequestId::from("seed");
        let body = format!(r#"{{"tenant_id":"acme","id":{id}}}"#);
        let c = ctx(
            &principal,
            &rid,
            &[],
            EndpointKind::IngestDoc,
            body.as_bytes(),
        );
        p.handle(&c).await.unwrap();
    }
    assert!(
        queue.writes.lock().unwrap().is_empty(),
        "sync seeds must not enqueue"
    );

    // Async DBQ.
    let rid = RequestId::from("r");
    let headers = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let c = ctx(
        &principal,
        &rid,
        &headers,
        EndpointKind::DeleteByQuery,
        br#"{"query":{"match_all":{}}}"#,
    );
    let resp = p.handle(&c).await.unwrap();
    assert_eq!(resp.status, 200);
    let body: serde_json::Value = serde_json::from_slice(&resp.body).unwrap();
    assert_eq!(body["total"], 2);
    assert_eq!(body["deleted"], 2);
    assert_eq!(body["version_conflicts"], 0);

    // Two concrete deletes were enqueued, keyed by partition, with per-match
    // op ids. Any other op kind is a bug in the expansion, so fail loudly on it
    // instead of filtering it out and leaving an opaque id-list mismatch.
    let writes = queue.writes.lock().unwrap();
    assert_eq!(writes.len(), 2);
    let mut ids: Vec<String> = writes
        .iter()
        .map(|w| match &w.batch.ops()[0].doc {
            osproxy_sink::DocOp::Delete { id, .. } => id.clone(),
            _ => panic!("async DBQ expansion must only enqueue deletes"),
        })
        .collect();
    ids.sort();
    assert_eq!(ids, vec!["acme:1".to_owned(), "acme:2".to_owned()]);
    assert_eq!(writes[0].op_id, "r:0");
    assert_eq!(writes[1].op_id, "r:1");
}

/// `_delete_by_query` is served only when the request is async, expansion is
/// enabled, and a queue is attached. It is rejected in every other case:
/// - sync, or async without expansion: 400;
/// - async with expansion but no queue: 422.
///
/// No rejected request enqueues anything.
#[tokio::test]
async fn delete_by_query_is_rejected_unless_async_and_expansion_enabled() {
    let queue = Arc::new(RecordingQueue::default());
    let principal = Principal::new(PrincipalId::from("svc"));
    let rid = RequestId::from("r");
    let dbq = br#"{"query":{"match_all":{}}}"#;
    let tenant = vec![header("x-tenant", "acme")];
    let h = vec![header("x-write-mode", "async"), header("x-tenant", "acme")];
    let q = || Arc::clone(&queue) as Arc<dyn WriteQueue>;

    // Sync (no async header) though expansion is on → 400.
    let p = pipeline()
        .with_write_queue(q())
        .with_delete_by_query_expansion(true);
    let c = ctx(&principal, &rid, &tenant, EndpointKind::DeleteByQuery, dbq);
    assert_eq!(p.handle(&c).await.unwrap().status, 400);

    // Async but expansion off → 400.
    let p = pipeline().with_write_queue(q());
    let c = ctx(&principal, &rid, &h, EndpointKind::DeleteByQuery, dbq);
    assert_eq!(p.handle(&c).await.unwrap().status, 400);

    // Async + expansion on but no queue → 422.
    let p = pipeline().with_delete_by_query_expansion(true);
    let c = ctx(&principal, &rid, &h, EndpointKind::DeleteByQuery, dbq);
    assert_eq!(p.handle(&c).await.unwrap().status, 422);

    // Only the first two pipelines share `queue`; neither may have enqueued.
    assert!(
        queue.writes.lock().unwrap().is_empty(),
        "no deletes enqueued on rejection"
    );
}