Skip to main content

osproxy_engine/
pipeline.rs

1//! The request pipeline: orchestrates a classified request through routing,
2//! transform, and delivery, returning a response for the transport to write.
3//!
4//! M1 implements the single-document ingest path (`docs/04` §1): resolve the
5//! routing decision, build the epoch-stamped write batch, dispatch it to the
6//! sink, and shape the acknowledgement into an OpenSearch-style response. M2
7//! adds the get-by-id read path (`docs/04` §5): resolve, map the logical id to
8//! the physical id, fetch, and shape the stored document back into the client's
//! logical view. Later milestones attach search, bulk, multi-get, delete, count, cursor, and admin handling to the same dispatch.
10//
// JUSTIFY(file-length): this is the central request orchestrator; the lifecycle
12// (classify → route → transform → dispatch → trace → diagnostics decision) is one
13// cohesive flow, and the per-request directive evaluation it owns is the seam
14// every observability/capture feature attaches to. Tests already live in
15// `pipeline_tests.rs`; splitting the flow itself would scatter the lifecycle.
16
17use std::sync::Arc;
18
19use osproxy_core::{Clock, CursorSigner, EndpointKind, RequestId, SystemClock};
20use osproxy_observe::{
21    explain_json, resource_spans, BreakGlassBuffer, ClassifyInfo, DiagLevel, DiagnosticSink,
22    DirectiveSet, DirectiveStore, DirectiveVerifier, EgressInfo, ExplainStore, NoVerifier,
23    NoopDiagnosticSink, NoopExporter, RequestAttrs, RequestTrace, SpanExporter,
24};
25use osproxy_sink::{ByteBody, Reader, Sink, StreamingForward};
26use osproxy_spi::{RequestCtx, SpiError};
27use osproxy_tenancy::Router;
28use serde_json::Value;
29
30use crate::error::RequestError;
31use crate::observe::{error_context, logical_index};
32use crate::search_stream::StreamSearch;
33
34/// How many recent request explanations `/debug/explain` retains per instance.
35const EXPLAIN_CAPACITY: usize = 1024;
36
37/// How many explanations the break-glass tape holds once a `ring_buffer`
38/// directive turns it on. Bounded so an "on" directive cannot grow memory.
39const BREAK_GLASS_CAPACITY: usize = 256;
40
41/// The response the pipeline produces for a handled request.
42///
43/// A status plus a JSON body, mirroring the relevant fields of an OpenSearch
44/// response so the transport can relay it to the client unchanged.
45#[derive(Clone, PartialEq, Eq, Debug)]
46pub struct PipelineResponse {
47    /// The HTTP status to return to the client.
48    pub status: u16,
49    /// The JSON response body.
50    pub body: Vec<u8>,
51    /// The response content type. `None` ⇒ `application/json`: every response the
52    /// proxy *shapes* is JSON, so that is the default. It is set only on the
53    /// verbatim admin/cursor passthrough, where the upstream may answer with a
54    /// non-JSON type (e.g. `_cat` returns `text/plain`); forcing `application/json`
55    /// there would mislabel the body (`docs/03` §6).
56    pub content_type: Option<String>,
57}
58
59impl PipelineResponse {
60    /// A JSON response, the shape every tenancy-aware endpoint returns.
61    #[must_use]
62    pub fn json(status: u16, body: Vec<u8>) -> Self {
63        Self {
64            status,
65            body,
66            content_type: None,
67        }
68    }
69
70    /// Carries the upstream content type verbatim (the admin/cursor passthrough),
71    /// so a non-JSON upstream body is not mislabeled `application/json`.
72    #[must_use]
73    pub fn with_content_type(mut self, content_type: Option<String>) -> Self {
74        self.content_type = content_type;
75        self
76    }
77}
78
79/// Orchestrates requests through a tenancy router and a sink.
80///
81/// Generic over the [`Router`] implementation and the [`Sink`], so the hot path
82/// is monomorphized (no dyn dispatch), a deployment can supply its own router,
83/// and tests can swap in an in-memory sink.
84pub struct Pipeline<R, S> {
85    pub(crate) router: R,
86    pub(crate) sink: S,
87    pub(crate) retry: crate::RetryPolicy,
88    explain: Arc<ExplainStore>,
89    exporter: Arc<dyn SpanExporter>,
90    clock: Arc<dyn Clock>,
91    service_name: String,
92    /// The verbosity applied when no directive raises it. Default [`DiagLevel::Shape`]
93    /// so a configured exporter exports every request; lower it to `Off` to make
94    /// export purely directive-driven (targeted sampling).
95    baseline: DiagLevel,
96    /// Whether traffic capture is on for every request before any directive.
97    /// Default `false`: capture is off until a published `capture` directive
98    /// selects requests (capture on demand). Set `true` for an always-capture
99    /// deployment (e.g. a dedicated capture/migration proxy).
100    baseline_capture: bool,
101    /// The fleet-wide directive source, polled fresh per request so an operator
102    /// can flip verbosity without a restart. Defaults to an empty static set.
103    directive_store: Arc<dyn DirectiveStore>,
104    verifier: Arc<dyn DirectiveVerifier>,
105    /// The break-glass tape, captured into only when a `ring_buffer` directive
106    /// applies to a request. Empty (near-zero cost) until then.
107    break_glass: Arc<BreakGlassBuffer>,
108    /// The fleet-coherent diagnostic sink: a directive-selected capture is pushed
109    /// here (keyed by `trace_id`) as well as into the local break-glass ring, so an
110    /// aggregator can serve it fleet-wide. Default [`NoopDiagnosticSink`] (off): the
111    /// capture stays in the local ring only.
112    diagnostic_sink: Arc<dyn DiagnosticSink>,
113    /// Signs/verifies scroll & PIT affinity envelopes (`docs/03` §6). `None` =
114    /// affinity **off** (the opt-in default): cursor requests fail closed with a
115    /// `CursorUnresolvable` error rather than route blindly.
116    pub(crate) cursor_signer: Option<Arc<dyn CursorSigner>>,
117    /// The admin pass-through policy (`docs/03` §6): which cluster answers
118    /// allow-listed `_cat`/`_cluster`/`_nodes` requests, and which path prefixes
119    /// are permitted. `None` = reject all admin requests (the default).
120    pub(crate) admin_policy: Option<crate::admin::AdminPolicy>,
121    /// Tenant-agnostic passthrough (`None` = pure tenancy mode, the default).
122    /// When set, requests the policy matches (by logical index) are forwarded
123    /// verbatim with no rewrite; unmatched requests stay tenant-isolated. A
124    /// prefix-free policy passes everything through (transparent/capture proxy).
125    pub(crate) passthrough: Option<crate::passthrough::PassthroughPolicy>,
126    /// The write mode applied when a request does not select one with the
127    /// `X-Write-Mode` header. Default [`crate::WriteMode::Sync`], async fan-out is
128    /// opt-in (`docs/04` §9).
129    pub(crate) baseline_write_mode: crate::asyncwrite::WriteMode,
130    /// The durable queue async writes are enqueued onto. Default
131    /// [`crate::asyncwrite::NoQueue`]: async requests are refused (`422`) until a
132    /// real queue is wired in.
133    pub(crate) write_queue: Arc<dyn crate::asyncwrite::WriteQueue>,
134    /// Whether `_delete_by_query` may be expanded into per-match deletes in async
135    /// mode (`docs/04` §9). Default `false`: DBQ is rejected until opted in, since
136    /// it reads the match set and enqueues a delete each.
137    pub(crate) delete_by_query_expansion: bool,
138}
139
140/// The diagnostics decision for one request: how much to record/export, whether
141/// to capture it into the break-glass tape, and whether to tee it to the fleet
142/// traffic-capture sink.
143#[derive(Clone, Copy)]
144struct Diagnostics {
145    level: DiagLevel,
146    capture: bool,
147    traffic_capture: bool,
148}
149
150impl<R, S> std::fmt::Debug for Pipeline<R, S> {
151    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152        // The injected exporter/clock are not `Debug`; show the rest of the shape.
153        f.debug_struct("Pipeline")
154            .field("retry", &self.retry)
155            .field("service_name", &self.service_name)
156            .field("exporting", &self.exporter.enabled())
157            .finish_non_exhaustive()
158    }
159}
160
161impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
162    /// Builds a pipeline from a router and a sink (default backend-retry policy,
163    /// no span export).
164    pub fn new(router: R, sink: S) -> Self {
165        Self {
166            router,
167            sink,
168            retry: crate::RetryPolicy::default(),
169            explain: Arc::new(ExplainStore::new(EXPLAIN_CAPACITY)),
170            exporter: Arc::new(NoopExporter),
171            clock: Arc::new(SystemClock),
172            service_name: "osproxy".to_owned(),
173            baseline: DiagLevel::Shape,
174            baseline_capture: false,
175            // An empty static set as the default store: `Arc<DirectiveSet>` is
176            // itself a `DirectiveStore`, so this is `Arc<dyn DirectiveStore>` over
177            // a constant snapshot. Swap it for a fleet store via builder.
178            directive_store: Arc::new(Arc::new(DirectiveSet::new())),
179            verifier: Arc::new(NoVerifier),
180            break_glass: Arc::new(BreakGlassBuffer::new(BREAK_GLASS_CAPACITY)),
181            diagnostic_sink: Arc::new(NoopDiagnosticSink),
182            cursor_signer: None,
183            admin_policy: None,
184            passthrough: None,
185            baseline_write_mode: crate::asyncwrite::WriteMode::Sync,
186            write_queue: Arc::new(crate::asyncwrite::NoQueue),
187            delete_by_query_expansion: false,
188        }
189    }
190
191    /// Enables the `_delete_by_query` async expansion (builder style). Without it,
192    /// DBQ is rejected even in async mode (`docs/04` §9).
193    #[must_use]
194    pub fn with_delete_by_query_expansion(mut self, on: bool) -> Self {
195        self.delete_by_query_expansion = on;
196        self
197    }
198
199    /// Sets the baseline write mode applied when a request does not carry an
200    /// `X-Write-Mode` header (builder style). Default [`crate::WriteMode::Sync`]; set
201    /// [`crate::WriteMode::Async`] to make durable fan-out the deployment default
202    /// (`docs/04` §9).
203    #[must_use]
204    pub fn with_baseline_write_mode(mut self, mode: crate::asyncwrite::WriteMode) -> Self {
205        self.baseline_write_mode = mode;
206        self
207    }
208
209    /// Sets the durable queue async writes are enqueued onto (builder style).
210    /// Without it, async requests are refused with `422` rather than dropped.
211    #[must_use]
212    pub fn with_write_queue(mut self, queue: Arc<dyn crate::asyncwrite::WriteQueue>) -> Self {
213        self.write_queue = queue;
214        self
215    }
216
217    /// The write mode for `ctx`: the validated `X-Write-Mode` header if present,
218    /// else the deployment baseline. An unparseable header falls back to the
219    /// baseline rather than erroring, an unknown mode is not a hard failure.
220    pub(crate) fn write_mode(&self, ctx: &RequestCtx<'_>) -> crate::asyncwrite::WriteMode {
221        self.resolve_write_mode(ctx.headers().get("x-write-mode"))
222    }
223
224    /// The write mode from a raw `X-Write-Mode` header value (or its absence),
225    /// the precedence shared by [`write_mode`](Self::write_mode) and
226    /// [`is_sync_write`](Self::is_sync_write): a valid header wins, else the baseline.
227    fn resolve_write_mode(&self, header: Option<&str>) -> crate::asyncwrite::WriteMode {
228        header
229            .and_then(crate::asyncwrite::WriteMode::parse)
230            .unwrap_or(self.baseline_write_mode)
231    }
232
233    /// Enables tenant-agnostic passthrough: every request is forwarded verbatim to
234    /// `policy`'s cluster with no tenancy rewrite. Use this for a transparent or
235    /// capture/migration proxy. Without it, the pipeline routes by tenancy (the
236    /// default).
237    #[must_use]
238    pub fn with_passthrough(mut self, policy: crate::passthrough::PassthroughPolicy) -> Self {
239        self.passthrough = Some(policy);
240        self
241    }
242
243    /// Enables opt-in admin pass-through (`docs/03` §6): allow-listed
244    /// `_cat`/`_cluster`/`_nodes` requests are forwarded verbatim to `policy`'s
245    /// cluster. Without this, every admin request is rejected (the default).
246    #[must_use]
247    pub fn with_admin_passthrough(mut self, policy: crate::admin::AdminPolicy) -> Self {
248        self.admin_policy = Some(policy);
249        self
250    }
251
252    /// Enables opt-in scroll/PIT cursor affinity (`docs/03` §6) with `signer`
253    /// signing the cluster↔cursor envelope. Without this, cursor requests fail
254    /// closed (`CursorUnresolvable`) rather than route to an unknown cluster.
255    #[must_use]
256    pub fn with_cursor_signer(mut self, signer: Arc<dyn CursorSigner>) -> Self {
257        self.cursor_signer = Some(signer);
258        self
259    }
260
261    /// Sets the placement-backend retry policy (builder style).
262    #[must_use]
263    pub fn with_retry_policy(mut self, retry: crate::RetryPolicy) -> Self {
264        self.retry = retry;
265        self
266    }
267
268    /// Sets the OTLP span exporter (builder style). Default is no export.
269    #[must_use]
270    pub fn with_exporter(mut self, exporter: Arc<dyn SpanExporter>) -> Self {
271        self.exporter = exporter;
272        self
273    }
274
275    /// Swaps the clock used to stamp span timestamps (tests inject a `ManualClock`).
276    #[must_use]
277    pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
278        self.clock = clock;
279        self
280    }
281
282    /// Sets the `service.name` reported on exported spans (builder style).
283    #[must_use]
284    pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
285        self.service_name = service_name.into();
286        self
287    }
288
289    /// Sets the baseline diagnostics level applied to every request before
290    /// directives (builder style). Default [`DiagLevel::Shape`]; set to
291    /// [`DiagLevel::Off`] to export only what a directive selects.
292    #[must_use]
293    pub fn with_baseline_level(mut self, baseline: DiagLevel) -> Self {
294        self.baseline = baseline;
295        self
296    }
297
298    /// Sets whether traffic capture is on for every request before directives
299    /// (builder style). Default `false` (capture on demand via a published
300    /// directive); set `true` for an always-capture deployment.
301    #[must_use]
302    pub fn with_baseline_capture(mut self, on: bool) -> Self {
303        self.baseline_capture = on;
304        self
305    }
306
307    /// Sets a fixed set of active diagnostics directives (builder style). For a
308    /// fleet-wide, restart-free source use [`Pipeline::with_directive_store`].
309    #[must_use]
310    pub fn with_directives(mut self, directives: Arc<DirectiveSet>) -> Self {
311        // `Arc<DirectiveSet>` is itself a `DirectiveStore` (a constant snapshot).
312        self.directive_store = Arc::new(directives);
313        self
314    }
315
316    /// Sets the fleet-wide directive store (builder style). The pipeline polls it
317    /// fresh per request, so a controller publishing a new set flips verbosity
318    /// across the fleet without a restart (`docs/05` §3).
319    #[must_use]
320    pub fn with_directive_store(mut self, store: Arc<dyn DirectiveStore>) -> Self {
321        self.directive_store = store;
322        self
323    }
324
325    /// Sets the verifier for the signed `X-Debug-Directive` header (builder
326    /// style). Default rejects all headers; a real verifier enables the surgical,
327    /// single-request directive channel.
328    #[must_use]
329    pub fn with_directive_verifier(mut self, verifier: Arc<dyn DirectiveVerifier>) -> Self {
330        self.verifier = verifier;
331        self
332    }
333
334    /// Shares the break-glass tape (builder style), so a debug endpoint can read
335    /// the captured sequence and tests can inspect it.
336    #[must_use]
337    pub fn with_break_glass(mut self, break_glass: Arc<BreakGlassBuffer>) -> Self {
338        self.break_glass = break_glass;
339        self
340    }
341
342    /// Sets the fleet-coherent diagnostic sink (builder style): a directive-selected
343    /// capture is pushed here (keyed by `trace_id`) in addition to the local
344    /// break-glass ring, so an aggregator can serve it fleet-wide (`docs/05` §5).
345    #[must_use]
346    pub fn with_diagnostic_sink(mut self, sink: Arc<dyn DiagnosticSink>) -> Self {
347        self.diagnostic_sink = sink;
348        self
349    }
350
351    /// The assembled `/debug/explain` document for a past request, if retained.
352    #[must_use]
353    pub fn explain(&self, request_id: &RequestId) -> Option<Value> {
354        self.explain.get(request_id)
355    }
356
357    /// The break-glass tape, the explanations captured while a `ring_buffer`
358    /// directive was in effect (`docs/05` §5).
359    #[must_use]
360    pub fn break_glass(&self) -> &Arc<BreakGlassBuffer> {
361        &self.break_glass
362    }
363
364    /// The underlying sink (e.g. to inspect what an in-memory sink recorded).
365    #[must_use]
366    pub fn sink(&self) -> &S {
367        &self.sink
368    }
369
370    /// The trace context to inject onto an upstream request, or `None` when the
371    /// proxy is not adding a span of its own (span export off). With export off the
372    /// proxy stays transparent to tracing: the client's own trace headers (W3C, B3,
373    /// anything) ride through verbatim in the forwarded header set, and the proxy
374    /// inserts no `traceparent` of its own (`docs/05`). With export on it injects
375    /// its hop's `traceparent`, overriding the client's so the upstream span nests
376    /// under the proxy's.
377    pub(crate) fn upstream_trace(
378        &self,
379        ctx: &RequestCtx<'_>,
380    ) -> Option<osproxy_core::TraceContext> {
381        self.exporter
382            .enabled()
383            .then(|| crate::endpoints::wire_trace(ctx))
384    }
385
386    /// Handles an authenticated request, dispatching on its endpoint class.
387    ///
388    /// Records a shape-only causal trace for every request (success or failure)
389    /// into the explain store, so `/debug/explain/{id}` can reconstruct it
390    /// (`docs/05`).
391    ///
392    /// # Errors
393    ///
394    /// Returns [`RequestError`] if the endpoint is unsupported in M1, routing
395    /// fails, the body transform fails, or the sink rejects the write.
396    pub async fn handle(&self, ctx: &RequestCtx<'_>) -> Result<PipelineResponse, RequestError> {
397        self.handle_with_capture(ctx).await.0
398    }
399
400    /// Like [`Self::handle`], but also returns whether this request should be teed
401    /// to the fleet traffic-capture sink, the live per-request capture decision,
402    /// applied by the ingress to both success and error responses.
403    pub async fn handle_with_capture(
404        &self,
405        ctx: &RequestCtx<'_>,
406    ) -> (Result<PipelineResponse, RequestError>, bool) {
407        // Only pay for span timing/encoding when an exporter is actually active,
408        // "Off" stays near-zero cost (`docs/05`).
409        let exporting = self.exporter.enabled();
410        let start_nanos = if exporting {
411            self.clock.unix_nanos()
412        } else {
413            0
414        };
415
416        let mut trace = RequestTrace::new();
417        // The same W3C context propagated to downstream calls is recorded here, so
418        // `/debug/explain` and the exported OTLP span share the request's ids.
419        trace.record_context(crate::endpoints::wire_trace(ctx));
420        trace.record_classify(ClassifyInfo {
421            endpoint: ctx.endpoint(),
422            logical_index: logical_index(ctx.logical_index()),
423        });
424
425        let result = self.dispatch(ctx, &mut trace).await;
426        match &result {
427            Ok(resp) => trace.record_egress(EgressInfo {
428                status: resp.status,
429                response_bytes: resp.body.len(),
430            }),
431            Err(err) => trace.record_error(error_context(err)),
432        }
433        self.explain.record(ctx.request_id().clone(), &trace);
434
435        let diag = self.diagnostics(ctx, &trace);
436
437        // Break-glass: capture the explanation when a `ring_buffer`/`capture`
438        // directive selected this request (`docs/05` §5). Off by default, so this
439        // stays empty until an operator flips it on. The doc is built once and both
440        // retained in the local ring and pushed to the fleet diagnostic sink
441        // (keyed by `trace_id`) so it is reachable on any instance.
442        if diag.capture {
443            let doc = explain_json(ctx.request_id(), &trace);
444            if self.diagnostic_sink.enabled() {
445                self.diagnostic_sink.emit(doc.clone());
446            }
447            self.break_glass.capture(doc);
448        }
449
450        // Export the span when an exporter is active AND the diagnostics level for
451        // this request reaches at least `Shape`, so directives can restrict export
452        // to a targeted, sampled subset (`docs/05` §3). Background, best-effort.
453        if exporting && diag.level >= DiagLevel::Shape {
454            let end_nanos = self.clock.unix_nanos();
455            if let Some(payload) = resource_spans(
456                &self.service_name,
457                ctx.request_id(),
458                &trace,
459                start_nanos,
460                end_nanos,
461            ) {
462                self.exporter.export(payload);
463            }
464        }
465        (result, diag.traffic_capture)
466    }
467
468    /// The diagnostics decision for a finished request: the baseline level raised
469    /// by any directive that targets it (by tenant/index/principal/endpoint),
470    /// plus whether any applying directive wants break-glass capture. Evaluated at
471    /// the current time so expiry/sampling apply; the directive store is polled
472    /// fresh and the signed header verified exactly once.
473    fn diagnostics(&self, ctx: &RequestCtx<'_>, trace: &RequestTrace) -> Diagnostics {
474        let attrs = RequestAttrs {
475            tenant: trace.resolved_partition(),
476            index: ctx.logical_index(),
477            principal: ctx.principal_id(),
478            endpoint: ctx.endpoint(),
479        };
480        let now = self.clock.now();
481        let request = ctx.request_id();
482        // Poll the fleet directive store fresh (a cheap Arc clone of the current
483        // snapshot) so a published flip takes effect without a restart.
484        let snapshot = self.directive_store.load();
485        let mut level = self.baseline.max(snapshot.evaluate(&attrs, now, request));
486        let mut capture = snapshot.wants_ring_buffer(&attrs, now, request);
487        // Traffic capture is on when the deployment baseline says always-on or any
488        // published `capture` directive selects this request (capture on demand).
489        let mut traffic_capture =
490            self.baseline_capture || snapshot.wants_capture(&attrs, now, request);
491        // Fold in a verified single-request directive from the signed
492        // `X-Debug-Directive` header, if present and valid (`docs/05` §3).
493        if let Some(directive) = ctx
494            .headers()
495            .get("x-debug-directive")
496            .and_then(|h| self.verifier.verify(h))
497        {
498            if let Some(from_header) = directive.level_if_applies(&attrs, now, request) {
499                level = level.max(from_header);
500                capture |= directive.ring_buffer;
501                traffic_capture |= directive.capture;
502            }
503        }
504        Diagnostics {
505            level,
506            capture,
507            traffic_capture,
508        }
509    }
510
511    /// Whether the effective write mode for a request with these headers is sync,
512    /// the `X-Write-Mode` header if present and valid, else the deployment
513    /// baseline. Lets the transport decide to stream-demux a `_bulk` (sync only;
514    /// async fan-out keeps the buffered path) from the head alone (ADR-014 stage 4).
515    #[must_use]
516    pub fn is_sync_write(&self, headers: &[(String, String)]) -> bool {
517        let header = headers
518            .iter()
519            .find(|(k, _)| k.eq_ignore_ascii_case("x-write-mode"))
520            .map(|(_, v)| v.as_str());
521        self.resolve_write_mode(header) == crate::asyncwrite::WriteMode::Sync
522    }
523
524    /// Whether a request for `logical_index` is a tenant-agnostic passthrough that
525    /// can be **streamed** verbatim (ADR-014 stage 2). Body-free so the transport
526    /// can decide before buffering. `false` when no passthrough policy is set or
527    /// the index is not matched (the request then takes the buffered tenancy path).
528    #[must_use]
529    pub fn is_passthrough(&self, logical_index: &str) -> bool {
530        self.passthrough
531            .as_ref()
532            .is_some_and(|p| p.matches_index(logical_index))
533    }
534
535    /// Handles a verbatim passthrough request whose body is supplied as a
536    /// **stream** (ADR-014 stage 2): forward it to the passthrough cluster without
537    /// buffering. Mirrors [`handle_with_capture`](Self::handle_with_capture)'s
538    /// trace lifecycle (classify → dispatch → egress, recorded into the explain
539    /// store), minus the buffered-body diagnostics: traffic capture is never
540    /// available here because the body is not retained, so the returned flag is
541    /// always `false`.
542    pub async fn forward_streamed(
543        &self,
544        ctx: &RequestCtx<'_>,
545        body: ByteBody,
546    ) -> (Result<StreamingForward, RequestError>, bool) {
547        let mut trace = Self::begin_streamed_trace(ctx);
548        let result = match self.passthrough.as_ref() {
549            Some(policy) => self.forward_stream(ctx, policy, body, &mut trace).await,
550            // Only reachable if a caller streams a request `is_passthrough` rejects.
551            None => Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
552                endpoint: ctx.endpoint(),
553            })),
554        };
555        // The response body is a live stream of unknown length, so egress records
556        // the status with zero bytes (the size is not known until it has flowed).
557        match &result {
558            Ok(f) => trace.record_egress(EgressInfo {
559                status: f.status,
560                response_bytes: 0,
561            }),
562            Err(err) => trace.record_error(error_context(err)),
563        }
564        self.explain.record(ctx.request_id().clone(), &trace);
565        (result, false)
566    }
567
568    /// Handles a `_search` whose response is streamed back through the hit
569    /// transform (ADR-014, final stage): the upstream body is never buffered, each
570    /// hit is shaped incrementally and every sibling (notably `aggregations`) is
571    /// forwarded verbatim. Same trace lifecycle as
572    /// [`forward_streamed`](Self::forward_streamed): the body length is unknown
573    /// until it flows, so egress records the status with zero bytes. The request
574    /// query body is small and already buffered in `ctx`; only the response
575    /// streams. Returns the result plus `false`, capture is never available on a
576    /// streamed path (and the caller only streams when capture is off).
577    pub async fn search_streamed(
578        &self,
579        ctx: &RequestCtx<'_>,
580    ) -> (Result<StreamSearch, RequestError>, bool) {
581        let mut trace = Self::begin_streamed_trace(ctx);
582        let result = self.run_search_stream(ctx, &mut trace).await;
583        match &result {
584            Ok(s) => trace.record_egress(EgressInfo {
585                status: s.status,
586                response_bytes: 0,
587            }),
588            Err(err) => trace.record_error(error_context(err)),
589        }
590        self.explain.record(ctx.request_id().clone(), &trace);
591        (result, false)
592    }
593
594    /// Handles a `_bulk` request whose body is supplied as a **stream** (ADR-014
595    /// stage 4): frame and demux the NDJSON incrementally so the whole batch is
596    /// never buffered. Same trace lifecycle as [`forward_streamed`](Self::forward_streamed)
597    /// (classify → egress, into the explain store); per-op outcomes live
598    /// positionally in the response body, as in the buffered bulk path. Sync write
599    /// mode only, the streaming decision is made by the caller; async fan-out
600    /// keeps the buffered path.
601    pub async fn handle_bulk_streamed(
602        &self,
603        ctx: &RequestCtx<'_>,
604        body: ByteBody,
605    ) -> (Result<PipelineResponse, RequestError>, bool) {
606        // Bulk records its outcome positionally in the response, not per-stage, so
607        // the trace passes straight from open to close with no mid-stage spans.
608        let trace = Self::begin_streamed_trace(ctx);
609        let up_trace = self.upstream_trace(ctx);
610        let result = crate::bulk::ingest_bulk_streamed(
611            &self.router,
612            &self.sink,
613            ctx,
614            body,
615            self.retry,
616            up_trace,
617        )
618        .await;
619        self.finish_streamed_trace(ctx, trace, result)
620    }
621
622    /// Opens the shape-only trace for a streamed request: context + classify, the
623    /// stages known before dispatch. Shared by the streamed forward and bulk paths.
624    fn begin_streamed_trace(ctx: &RequestCtx<'_>) -> RequestTrace {
625        let mut trace = RequestTrace::new();
626        trace.record_context(crate::endpoints::wire_trace(ctx));
627        trace.record_classify(ClassifyInfo {
628            endpoint: ctx.endpoint(),
629            logical_index: logical_index(ctx.logical_index()),
630        });
631        trace
632    }
633
634    /// Closes a streamed request's trace (egress or error) and records it into the
635    /// explain store. Returns the result plus `false`, traffic capture is never
636    /// available on a streamed path (the body is not retained to tee).
637    fn finish_streamed_trace(
638        &self,
639        ctx: &RequestCtx<'_>,
640        mut trace: RequestTrace,
641        result: Result<PipelineResponse, RequestError>,
642    ) -> (Result<PipelineResponse, RequestError>, bool) {
643        match &result {
644            Ok(resp) => trace.record_egress(EgressInfo {
645                status: resp.status,
646                response_bytes: resp.body.len(),
647            }),
648            Err(err) => trace.record_error(error_context(err)),
649        }
650        self.explain.record(ctx.request_id().clone(), &trace);
651        (result, false)
652    }
653
654    /// Dispatches on endpoint class, recording the per-stage spans into `trace`.
655    async fn dispatch(
656        &self,
657        ctx: &RequestCtx<'_>,
658        trace: &mut RequestTrace,
659    ) -> Result<PipelineResponse, RequestError> {
660        // Tenant-agnostic passthrough short-circuits tenancy dispatch for the
661        // requests it matches (by logical index); unmatched requests fall through
662        // to tenancy below (fail-closed).
663        if let Some(policy) = self.passthrough.as_ref().filter(|p| p.matches(ctx)) {
664            return self.forward(ctx, policy, trace).await;
665        }
666        match ctx.endpoint() {
667            EndpointKind::IngestDoc => self.ingest_doc(ctx, trace).await,
668            EndpointKind::IngestBulk => self.ingest_bulk(ctx, trace).await,
669            EndpointKind::GetById => self.get_by_id(ctx, trace).await,
670            EndpointKind::MultiGet => self.multi_get(ctx, trace).await,
671            EndpointKind::DeleteById => self.delete_by_id(ctx, trace).await,
672            EndpointKind::DeleteByQuery => self.delete_by_query(ctx, trace).await,
673            EndpointKind::Search => self.search(ctx, trace).await,
674            EndpointKind::MultiSearch => self.multi_search(ctx, trace).await,
675            EndpointKind::Count => self.count(ctx, trace).await,
676            EndpointKind::Cursor => self.cursor(ctx, trace).await,
677            EndpointKind::Admin => self.admin(ctx, trace).await,
678            other => Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
679                endpoint: other,
680            })),
681        }
682    }
683}
684
685#[cfg(test)]
686#[path = "pipeline_tests.rs"]
687mod tests;