osproxy-engine 1.0.2

Pipeline orchestration: auth -> resolve -> rewrite -> sink -> reverse.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
//! Bulk (`_bulk`) demux: the hard path (`docs/04` §3).
//!
//! A single NDJSON body may carry documents for **different partitions →
//! different targets**. We resolve each operation's partition (caching the
//! placement per partition for the request), demux the operations by target,
//! dispatch each target's sub-batch, then **re-interleave** the per-item results
//! in the body's original order, so the client sees a normal OpenSearch bulk
//! response with positional per-item status. A per-item failure (e.g. an
//! unresolved partition) is positioned in place; the bulk as a whole still
//! returns 200 with `errors: true`. The per-item preparation lives in
//! [`crate::bulkprep`]; this module owns the orchestration and the response.
//!
//! Memory is bounded (NFR-P7): a target's sub-batch is flushed as soon as it
//! reaches [`FLUSH_THRESHOLD`] ops or [`BYTE_FLUSH_THRESHOLD`] buffered bytes, so
//! the transformed working set stays bounded rather than growing to the whole body.
//
// JUSTIFY(file-length): one cohesive bulk module. The sync (buffered), async
// fan-out, and streamed (ADR-014 stage 4) demuxes all share the same
// demux/flush/gate/re-interleave machinery and per-item response shaping.
// Splitting a variant into its own file would scatter that shared machinery or
// force it to be pub(crate) across files for no real separation.

use std::collections::HashMap;

use bytes::{Buf as _, BytesMut};
use futures_util::stream::StreamExt as _;
use http_body_util::BodyExt as _;
use osproxy_core::Target;
use osproxy_rewrite::{parse_bulk, parse_bulk_action, BulkAction, BulkItem, RewriteError};
use osproxy_sink::{ByteBody, DocOp, OpResult, Sink, SinkError, WriteAck, WriteBatch, WriteOp};
use osproxy_spi::RequestCtx;
use osproxy_tenancy::Router;

use crate::asyncwrite::{
    op_id_for, unavailable_response, unsupported_async, unsupported_response, QueuedWrite,
    WriteQueue,
};
use crate::bulkline::{BulkBody, Line};
use crate::bulkprep::{prepare, Prepared};
use crate::error::RequestError;
use crate::pipeline::PipelineResponse;

/// Op-count cap on a single target's sub-batch: once a target has buffered this
/// many ops it is flushed mid-stream, which bounds the transformed working set
/// held in memory (NFR-P7).
const FLUSH_THRESHOLD: usize = 256;

/// Byte cap (4 MiB) on a single target's buffered op bodies. It bounds the
/// working set by size as well as by count, so a handful of very large documents
/// flush early instead of waiting for [`FLUSH_THRESHOLD`] of them to accumulate.
const BYTE_FLUSH_THRESHOLD: usize = 4 * 1024 * 1024;

/// The most per-target sub-batches dispatched at once in the final flush, so a
/// wide fan-out cannot open an unbounded number of upstream requests (NFR-P).
const MAX_DISPATCH_CONCURRENCY: usize = 8;

/// One target's buffered `(ordinal, prepared-op)` entries awaiting dispatch. The
/// ordinal is the op's position in the request, used to place its response line.
type Entries = Vec<(usize, Prepared)>;

/// Handles a buffered `_bulk` request end to end: parse the body, prepare each
/// op, demux the prepared ops by target (flushing a target whenever it hits a
/// threshold), flush whatever is left, and render the positional response.
///
/// An op that fails preparation gets its error line written at its own position
/// straight away; every other position is filled in when its target is flushed.
///
/// # Errors
///
/// Returns the parse error when the whole body is unparseable, or the error from
/// [`render`] when the response cannot be serialized. Per-operation failures are
/// reported positionally in the response, not as a request error.
pub(crate) async fn ingest_bulk<R: Router, S: Sink>(
    router: &R,
    sink: &S,
    ctx: &RequestCtx<'_>,
    retry: crate::RetryPolicy,
    up_trace: Option<osproxy_core::TraceContext>,
) -> Result<PipelineResponse, RequestError> {
    let parsed = parse_bulk(ctx.body())?;

    // One response slot per op, all empty to begin with.
    let mut lines: Vec<Option<Line>> = (0..parsed.len()).map(|_| None).collect();
    // Per-target demux buffers and their running byte totals.
    let mut pending: HashMap<Target, Entries> = HashMap::new();
    let mut pending_bytes: HashMap<Target, usize> = HashMap::new();
    let mut placements = crate::bulkprep::ResolutionCache::new();

    for (position, op) in parsed.into_iter().enumerate() {
        let outcome = prepare(router, ctx, &mut placements, op, retry, up_trace.as_ref()).await;
        let ready = match outcome {
            Ok(ready) => ready,
            Err(failure) => {
                // A preparation failure is positioned in place and never dispatched.
                lines[position] = Some(failure.into_line());
                continue;
            }
        };
        buffer_and_flush(
            router,
            sink,
            &mut pending,
            &mut pending_bytes,
            &mut lines,
            position,
            ready,
        )
        .await;
    }

    flush_remaining(router, sink, pending, &mut lines).await;
    render(&lines)
}

/// Renders the positional response lines as the bulk response body
/// `{"took":0,"errors":_,"items":[…]}`, serialized directly to bytes.
///
/// `errors` is true when any filled line is an error line. The response status is
/// always `200` and no content type is set here. Shared by the buffered,
/// streamed, and async fan-out paths.
///
/// # Errors
///
/// Returns [`RequestError::Internal`] if serialization fails.
fn render(lines: &[Option<Line>]) -> Result<PipelineResponse, RequestError> {
    let any_failed = lines
        .iter()
        .any(|slot| matches!(slot, Some(line) if line.is_error()));
    let payload = BulkBody {
        took: 0,
        errors: any_failed,
        items: lines,
    };
    match serde_json::to_vec(&payload) {
        Ok(body) => Ok(PipelineResponse {
            status: 200,
            body,
            content_type: None,
        }),
        Err(_) => Err(RequestError::Internal {
            reason: "serializing bulk response",
        }),
    }
}

/// Size cap (64 MiB) on a single unterminated bulk line (one action or one
/// source) in the streaming reader. Once the buffered partial line grows past
/// this, the reader rejects the request instead of pulling more frames, so one
/// giant line cannot exhaust memory even though the batch as a whole is streamed.
const MAX_LINE_BYTES: usize = 64 * 1024 * 1024;

/// The streaming variant of [`ingest_bulk`] (ADR-014 stage 4): ops are framed
/// incrementally from the inbound body and demuxed/dispatched as they are read,
/// so only the per-target flush buffers and the response lines are held, never
/// the whole batch. The re-interleaved response and per-item semantics match the
/// buffered path; only the source (a stream instead of a buffered body) differs.
///
/// # Errors
///
/// Returns [`RequestError`] if a line is unparseable or the body stream fails.
/// Because ops are dispatched as they are read, a mid-stream parse error can
/// surface after earlier ops were already flushed (the buffered path parses
/// everything before dispatching anything).
pub(crate) async fn ingest_bulk_streamed<R: Router, S: Sink>(
    router: &R,
    sink: &S,
    ctx: &RequestCtx<'_>,
    body: ByteBody,
    retry: crate::RetryPolicy,
    up_trace: Option<osproxy_core::TraceContext>,
) -> Result<PipelineResponse, RequestError> {
    let mut source = NdjsonReader::new(body);
    let mut lines: Vec<Option<Line>> = Vec::new();
    let mut pending: HashMap<Target, Entries> = HashMap::new();
    let mut pending_bytes: HashMap<Target, usize> = HashMap::new();
    let mut placements = crate::bulkprep::ResolutionCache::new();

    loop {
        let Some(op) = source.next_op().await? else {
            break;
        };
        // The op's position is the number of slots handed out so far; reserve
        // its slot before doing anything that might fill it.
        let position = lines.len();
        lines.push(None);

        match prepare(router, ctx, &mut placements, op, retry, up_trace.as_ref()).await {
            Err(failure) => lines[position] = Some(failure.into_line()),
            // Buffering may flush the op's target right here once it reaches the
            // count or byte threshold, keeping the working set bounded (NFR-P7)
            // while the stream is still being read.
            Ok(ready) => {
                buffer_and_flush(
                    router,
                    sink,
                    &mut pending,
                    &mut pending_bytes,
                    &mut lines,
                    position,
                    ready,
                )
                .await;
            }
        }
    }

    flush_remaining(router, sink, pending, &mut lines).await;
    render(&lines)
}

/// An incremental NDJSON reader over a streamed body: pulls frames on demand and
/// yields one bulk op at a time. Blank lines are skipped, matching [`parse_bulk`].
///
/// The buffer is a [`BytesMut`]: a completed line is detached with `split_to`,
/// and the newline search resumes from `scan` instead of rescanning bytes that
/// were already searched, so a line that arrives across many frames is not
/// searched from its start each time.
struct NdjsonReader {
    /// The inbound body that frames are pulled from.
    body: ByteBody,
    /// Bytes received but not yet returned as a line.
    buf: BytesMut,
    /// How far into `buf` the newline search has already looked (no rescanning a
    /// prefix after a frame is appended). Reset to 0 whenever a line is split off.
    scan: usize,
    /// Set once the body has yielded its last frame.
    done: bool,
}

impl NdjsonReader {
    /// Wraps a streamed body in a reader with an empty buffer.
    fn new(body: ByteBody) -> Self {
        Self {
            body,
            buf: BytesMut::new(),
            scan: 0,
            done: false,
        }
    }

    /// Reads the next op: an action line, plus a source line for verbs that carry
    /// one. `Ok(None)` at end of stream.
    ///
    /// # Errors
    ///
    /// Returns a [`RequestError`] if the action line or the assembled item fails
    /// to parse, if a verb that needs a source hits end of stream first, or if
    /// reading the next line fails.
    async fn next_op(&mut self) -> Result<Option<BulkItem>, RequestError> {
        let Some(action_line) = self.next_line().await? else {
            return Ok(None);
        };
        // Parse the action line exactly once: `ParsedAction` carries the verb and
        // metadata, so finalizing the op below re-uses it rather than re-parsing
        // the action line's JSON (ADR-014 stage 4).
        let parsed = parse_bulk_action(&action_line).map_err(RequestError::from)?;
        let source = if parsed.has_source() {
            Some(
                self.next_line()
                    .await?
                    .ok_or_else(|| RequestError::from(RewriteError::MalformedBulkAction))?,
            )
        } else {
            None
        };
        parsed
            .into_item(source.as_deref())
            .map(Some)
            .map_err(RequestError::from)
    }

    /// Returns the next non-blank line (newline stripped, along with a `\r` that
    /// precedes it), or `None` at EOF. A final line with no terminating newline
    /// is returned as-is.
    ///
    /// # Errors
    ///
    /// Returns [`RequestError::PayloadTooLarge`] when an unterminated line has
    /// grown past [`MAX_LINE_BYTES`], and [`RequestError::Internal`] when the body
    /// stream yields an error.
    async fn next_line(&mut self) -> Result<Option<BytesMut>, RequestError> {
        loop {
            if let Some(rel) = self.buf[self.scan..].iter().position(|&b| b == b'\n') {
                let nl = self.scan + rel;
                let mut line = self.buf.split_to(nl); // bytes before '\n'
                self.buf.advance(1); // drop the '\n'
                self.scan = 0;
                if line.last() == Some(&b'\r') {
                    line.truncate(line.len() - 1);
                }
                if line.iter().all(u8::is_ascii_whitespace) {
                    continue;
                }
                return Ok(Some(line));
            }
            self.scan = self.buf.len(); // searched all of buf; resume here after a frame
            if self.done {
                if self.buf.iter().all(u8::is_ascii_whitespace) {
                    return Ok(None);
                }
                // Trailing line with no final newline: take the whole remainder.
                // The buffer is left empty, so the search cursor must go back to
                // the start; otherwise the next call would slice `buf[scan..]`
                // past the end of the empty buffer and panic.
                self.scan = 0;
                return Ok(Some(std::mem::take(&mut self.buf)));
            }
            if self.buf.len() > MAX_LINE_BYTES {
                // A client-caused over-cap line is a `413`, not an internal fault.
                return Err(RequestError::PayloadTooLarge {
                    reason: "bulk line exceeds the per-op size cap",
                });
            }
            match self.body.frame().await {
                Some(Ok(frame)) => {
                    // A frame that carries no data is skipped.
                    if let Ok(data) = frame.into_data() {
                        self.buf.extend_from_slice(&data);
                    }
                }
                Some(Err(_)) => {
                    return Err(RequestError::Internal {
                        reason: "reading bulk body stream",
                    })
                }
                None => self.done = true,
            }
        }
    }
}

/// The async fan-out counterpart of [`ingest_bulk`] (`docs/04` §9): each item is
/// resolved/transformed exactly as the sync path, then **durably enqueued** for
/// downstream fan-out instead of dispatched, and reported positionally as
/// `202 queued` with a per-item `op_id` (`{batch_id}:{ordinal}`).
///
/// Whole-request refusals (no queue, or a query-level unsupported op) return the
/// generic envelope, never a partially-applied bulk. A per-item `update` is
/// rejected in place (`400`): a scripted/partial update is not honorable async.
///
/// # Errors
///
/// Returns [`RequestError::Rewrite`] only if the whole body is unparseable.
pub(crate) async fn ingest_bulk_async<R: Router>(
    router: &R,
    queue: &dyn WriteQueue,
    ctx: &RequestCtx<'_>,
    retry: crate::RetryPolicy,
    up_trace: Option<osproxy_core::TraceContext>,
) -> Result<PipelineResponse, RequestError> {
    let index = ctx.logical_index();
    // A query-level unsupported op (optimistic concurrency) refuses the whole
    // bulk; a missing queue refuses it too, never accepted-and-dropped.
    if let Some(reason) = unsupported_async(ctx) {
        return Ok(unsupported_response(reason, index));
    }
    if !queue.enabled() {
        return Ok(unavailable_response(index));
    }

    let items = parse_bulk(ctx.body())?;
    let batch_id = op_id_for(ctx, ctx.request_id());
    let mut lines: Vec<Option<Line>> = Vec::new();
    lines.resize_with(items.len(), || None);
    let mut cache = crate::bulkprep::ResolutionCache::new();

    for (ordinal, item) in items.into_iter().enumerate() {
        // A scripted/partial `_update` has no single current document to merge
        // against under fan-out, and an optimistic-concurrency precondition
        // (`if_seq_no`/`version`/…) is evaluated against the live version that
        // does not exist at enqueue time, reject either in place rather than
        // silently dropping the precondition.
        if matches!(item.action, BulkAction::Update) || item.concurrency_control {
            let logical_index = item.index.clone().unwrap_or_else(|| index.to_owned());
            lines[ordinal] = Some(Line::error(
                item.action.keyword(),
                logical_index,
                item.id.clone(),
                400,
                "unsupported_async",
            ));
            continue;
        }
        match prepare(router, ctx, &mut cache, item, retry, up_trace.as_ref()).await {
            Ok(p) => {
                let op_id = format!("{batch_id}:{ordinal}");
                let write = QueuedWrite {
                    op_id: op_id.clone(),
                    partition_key: p.partition.as_str().to_owned(),
                    batch: WriteBatch::single(p.op.clone()),
                };
                lines[ordinal] = Some(match queue.enqueue(write).await {
                    Ok(()) => queued_line(&p, op_id),
                    Err(_) => enqueue_failed_line(&p),
                });
            }
            Err(fail) => lines[ordinal] = Some(fail.into_line()),
        }
    }

    render(&lines)
}

/// Builds the `202` queued line for an enqueued async op, echoing the op's
/// logical index/id and carrying the per-item `op_id`.
fn queued_line(p: &Prepared, op_id: String) -> Line {
    let index = p.logical_index.clone();
    let id = Some(p.logical_id.clone());
    Line::queued(p.action, index, id, 202, op_id)
}

/// Builds the `503 enqueue_failed` error line for an op the queue refused.
fn enqueue_failed_line(p: &Prepared) -> Line {
    const STATUS: u16 = 503;
    error_line(p, STATUS, "enqueue_failed")
}

/// Appends a prepared op to its target's demux buffer, adds its body size to
/// that target's byte total, and flushes the target if either the op count has
/// reached [`FLUSH_THRESHOLD`] or the byte total has reached
/// [`BYTE_FLUSH_THRESHOLD`] (NFR-P7). A flush removes both the buffer and the
/// byte total for that target. Shared by the buffered and streamed bulk paths.
async fn buffer_and_flush<R: Router, S: Sink>(
    router: &R,
    sink: &S,
    buffers: &mut HashMap<Target, Entries>,
    sizes: &mut HashMap<Target, usize>,
    lines: &mut [Option<Line>],
    ordinal: usize,
    prepared: Prepared,
) {
    let key = prepared.op.target.clone();
    let added_bytes = op_body_len(&prepared.op);

    let buffered_ops = {
        let pending = buffers.entry(key.clone()).or_default();
        pending.push((ordinal, prepared));
        pending.len()
    };
    let buffered_bytes = {
        let total = sizes.entry(key.clone()).or_default();
        *total += added_bytes;
        *total
    };

    // Below both thresholds: keep buffering.
    if buffered_ops < FLUSH_THRESHOLD && buffered_bytes < BYTE_FLUSH_THRESHOLD {
        return;
    }

    let entries = buffers.remove(&key).unwrap_or_default();
    sizes.remove(&key);
    flush(router, sink, entries, lines).await;
}

/// Returns the number of bytes in an op's document body, which is what the flush
/// byte budget counts. A delete has no body and counts as 0.
fn op_body_len(op: &WriteOp) -> usize {
    match &op.doc {
        DocOp::Delete { .. } => 0,
        DocOp::Index { body, .. } => body.len(),
        DocOp::Create { body, .. } => body.len(),
        DocOp::Update { body, .. } => body.len(),
    }
}

/// Flushes one target's sub-batch in place: re-check the migration write gate
/// per item, dispatch the admitted ops, and apply each result to `lines` by its
/// original ordinal. Awaited inline, so the sub-batch is dropped before the
/// caller resumes reading ops (the mid-stream backpressure that bounds memory).
///
/// Rejected items get a stale-epoch line and are never dispatched. If the gate
/// admits nothing, no write is issued at all, so an empty batch is never sent to
/// the sink.
async fn flush<R: Router, S: Sink>(
    router: &R,
    sink: &S,
    entries: Entries,
    lines: &mut [Option<Line>],
) {
    let (admitted, rejected) = gate(router, entries).await;
    for (ordinal, p) in &rejected {
        lines[*ordinal] = Some(stale_epoch_line(p));
    }
    // Nothing admitted means nothing to dispatch: skip the upstream call rather
    // than send an empty batch.
    if admitted.is_empty() {
        return;
    }
    let outcome = sink.write(build_batch(&admitted)).await;
    apply_results(&admitted, outcome, lines);
}

/// Flushes every remaining target's sub-batch **concurrently**, at most
/// [`MAX_DISPATCH_CONCURRENCY`] at a time. Each task gates its entries (no
/// `lines` access, so the tasks stay independent) and dispatches the admitted
/// ops; results are applied by ordinal afterward, so completion order cannot
/// disturb re-interleave.
///
/// A target whose entries are all rejected by the gate issues no write, so an
/// empty batch is never sent to the sink.
async fn flush_remaining<R: Router, S: Sink>(
    router: &R,
    sink: &S,
    buffers: HashMap<Target, Entries>,
    lines: &mut [Option<Line>],
) {
    // `(admitted, rejected, ack)`; the ack is `None` when nothing was admitted and
    // therefore nothing was dispatched.
    type Flushed = (Entries, Entries, Option<Result<WriteAck, SinkError>>);
    let pending = buffers.into_values().filter(|v| !v.is_empty());
    let results: Vec<Flushed> = futures_util::stream::iter(pending)
        .map(|entries| async move {
            let (admitted, rejected) = gate(router, entries).await;
            let ack = if admitted.is_empty() {
                None
            } else {
                Some(sink.write(build_batch(&admitted)).await)
            };
            (admitted, rejected, ack)
        })
        .buffer_unordered(MAX_DISPATCH_CONCURRENCY)
        .collect()
        .await;

    for (admitted, rejected, ack) in results {
        for (ordinal, p) in &rejected {
            lines[*ordinal] = Some(stale_epoch_line(p));
        }
        if let Some(ack) = ack {
            apply_results(&admitted, ack, lines);
        }
    }
}

/// Partitions a target's entries by the migration write gate (`docs/06` §2),
/// which is re-checked here at dispatch time. Returns `(admitted, rejected)`,
/// each in the entries' original relative order. An entry is rejected when the
/// router does not admit a write for its partition at the epoch it was prepared
/// with; rejected entries are returned to the caller, not dispatched.
async fn gate<R: Router>(router: &R, entries: Entries) -> (Entries, Entries) {
    let (mut admitted, mut rejected) = (Entries::new(), Entries::new());
    for entry in entries {
        let allowed = router
            .admit_write(&entry.1.partition, entry.1.op.epoch)
            .await;
        let bucket = if allowed {
            &mut admitted
        } else {
            &mut rejected
        };
        bucket.push(entry);
    }
    (admitted, rejected)
}

/// Builds the `409 stale_epoch` error line for an item the migration write gate
/// rejected, positioned so the client can retry just that item.
fn stale_epoch_line(p: &Prepared) -> Line {
    const STATUS: u16 = 409;
    error_line(p, STATUS, "stale_epoch")
}

/// An error line built from a prepared op, echoing its logical index/id (never the
/// physical view). The shared shape behind the gate/upstream/enqueue failures.
fn error_line(p: &Prepared, status: u16, error: &'static str) -> Line {
    Line::error(
        p.action,
        p.logical_index.clone(),
        Some(p.logical_id.clone()),
        status,
        error,
    )
}

/// Assembles a [`WriteBatch`] holding a clone of every entry's op, in entry order.
fn build_batch(entries: &[(usize, Prepared)]) -> WriteBatch {
    let mut batch = WriteBatch::new();
    for (_, prepared) in entries {
        batch = batch.with(prepared.op.clone());
    }
    batch
}

/// Applies a sub-batch's outcome to the response lines by ordinal.
///
/// On success, entries and ack results are paired positionally. If the ack
/// carries fewer results than there were entries, the unanswered tail is reported
/// as an upstream failure rather than left as an empty slot in the response. On a
/// sink error, every entry is reported as an upstream failure.
fn apply_results(
    entries: &[(usize, Prepared)],
    result: Result<WriteAck, SinkError>,
    lines: &mut [Option<Line>],
) {
    match result {
        Ok(ack) => {
            let mut answered = 0;
            for (i, ((ordinal, p), op_result)) in
                entries.iter().zip(ack.results()).enumerate()
            {
                lines[*ordinal] = Some(success_line(p, op_result));
                answered = i + 1;
            }
            // A short ack gives no outcome for the remaining entries; position an
            // explicit failure so no item is left without a line.
            for (ordinal, p) in entries.iter().skip(answered) {
                lines[*ordinal] = Some(upstream_failure_line(p));
            }
        }
        Err(_) => {
            for (ordinal, p) in entries {
                lines[*ordinal] = Some(upstream_failure_line(p));
            }
        }
    }
}

/// Builds the response line for a dispatched op from its upstream result. A
/// status below 400 becomes a positional result line (logical index/id) whose
/// outcome is "created" or "updated"; a status of 400 or above (e.g. a `create`
/// id conflict) becomes a positioned error line, so the bulk reports
/// `errors:true`.
///
/// NOTE(review): a successful op that did not create a document is always
/// labeled "updated", which includes deletes. Confirm that is intended.
fn success_line(p: &Prepared, result: &OpResult) -> Line {
    let status = result.status;
    if status < 400 {
        let outcome = if result.created { "created" } else { "updated" };
        Line::result(
            p.action,
            p.logical_index.clone(),
            Some(p.logical_id.clone()),
            status,
            outcome,
        )
    } else {
        error_line(p, status, error_type_for(status))
    }
}

/// Maps an upstream item status to a fixed error-type label: `409` is
/// "conflict", `404` is "not_found", and every other status is "rejected".
fn error_type_for(status: u16) -> &'static str {
    if status == 409 {
        "conflict"
    } else if status == 404 {
        "not_found"
    } else {
        "rejected"
    }
}

/// Builds the `502 upstream_failed` error line for an op whose sub-batch
/// dispatch failed.
fn upstream_failure_line(p: &Prepared) -> Line {
    const STATUS: u16 = 502;
    error_line(p, STATUS, "upstream_failed")
}