osproxy_engine/pipeline.rs
1//! The request pipeline: orchestrates a classified request through routing,
2//! transform, and delivery, returning a response for the transport to write.
3//!
4//! M1 implements the single-document ingest path (`docs/04` §1): resolve the
5//! routing decision, build the epoch-stamped write batch, dispatch it to the
6//! sink, and shape the acknowledgement into an OpenSearch-style response. M2
7//! adds the get-by-id read path (`docs/04` §5): resolve, map the logical id to
8//! the physical id, fetch, and shape the stored document back into the client's
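//! logical view. Later milestones attach bulk ingest, multi-get, deletes, search,
//! multi-search, count, scroll/PIT cursor, and admin pass-through paths here,
//! plus streamed variants of passthrough, search, and bulk (ADR-014).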
10//
11// JUSTIFY(file-length): this is the central request orchestrator, the lifecycle
12// (classify → route → transform → dispatch → trace → diagnostics decision) is one
13// cohesive flow, and the per-request directive evaluation it owns is the seam
14// every observability/capture feature attaches to. Tests already live in
15// `pipeline_tests.rs`; splitting the flow itself would scatter the lifecycle.
16
17use std::sync::Arc;
18
19use osproxy_core::{Clock, CursorSigner, EndpointKind, RequestId, SystemClock};
20use osproxy_observe::{
21 explain_json, resource_spans, BreakGlassBuffer, ClassifyInfo, DiagLevel, DiagnosticSink,
22 DirectiveSet, DirectiveStore, DirectiveVerifier, EgressInfo, ExplainStore, NoVerifier,
23 NoopDiagnosticSink, NoopExporter, RequestAttrs, RequestTrace, SpanExporter,
24};
25use osproxy_sink::{ByteBody, Reader, Sink, StreamingForward};
26use osproxy_spi::{RequestCtx, SpiError};
27use osproxy_tenancy::Router;
28use serde_json::Value;
29
30use crate::error::RequestError;
31use crate::observe::{error_context, logical_index};
32use crate::search_stream::StreamSearch;
33
34/// How many recent request explanations `/debug/explain` retains per instance.
35const EXPLAIN_CAPACITY: usize = 1024;
36
37/// How many explanations the break-glass tape holds once a `ring_buffer`
38/// directive turns it on. Bounded so an "on" directive cannot grow memory.
39const BREAK_GLASS_CAPACITY: usize = 256;
40
/// What the pipeline hands back for a request it handled.
///
/// An HTTP status plus a response body modeled on the relevant parts of an
/// OpenSearch response, so the transport can pass it to the client as-is.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineResponse {
    /// HTTP status code sent back to the client.
    pub status: u16,
    /// The response body (JSON unless `content_type` says otherwise).
    pub body: Vec<u8>,
    /// Explicit response content type; `None` means `application/json`.
    ///
    /// JSON is the default because every response the proxy *shapes* is JSON.
    /// Only the verbatim admin/cursor passthrough sets this, because the upstream
    /// may reply with a non-JSON type there (e.g. `_cat` answers `text/plain`),
    /// and forcing `application/json` would mislabel that body (`docs/03` §6).
    pub content_type: Option<String>,
}

impl PipelineResponse {
    /// Builds a JSON response (no explicit content type) — the shape every
    /// tenancy-aware endpoint returns.
    #[must_use]
    pub fn json(status: u16, body: Vec<u8>) -> Self {
        let content_type = None;
        Self {
            status,
            body,
            content_type,
        }
    }

    /// Replaces the content type with the upstream's verbatim value (used by the
    /// admin/cursor passthrough) so a non-JSON body keeps its real label instead
    /// of being tagged `application/json`.
    #[must_use]
    pub fn with_content_type(self, content_type: Option<String>) -> Self {
        Self {
            content_type,
            ..self
        }
    }
}
77}
78
79/// Orchestrates requests through a tenancy router and a sink.
80///
81/// Generic over the [`Router`] implementation and the [`Sink`], so the hot path
82/// is monomorphized (no dyn dispatch), a deployment can supply its own router,
83/// and tests can swap in an in-memory sink.
84pub struct Pipeline<R, S> {
85 pub(crate) router: R,
86 pub(crate) sink: S,
87 pub(crate) retry: crate::RetryPolicy,
88 explain: Arc<ExplainStore>,
89 exporter: Arc<dyn SpanExporter>,
90 clock: Arc<dyn Clock>,
91 service_name: String,
92 /// The verbosity applied when no directive raises it. Default [`DiagLevel::Shape`]
93 /// so a configured exporter exports every request; lower it to `Off` to make
94 /// export purely directive-driven (targeted sampling).
95 baseline: DiagLevel,
96 /// Whether traffic capture is on for every request before any directive.
97 /// Default `false`: capture is off until a published `capture` directive
98 /// selects requests (capture on demand). Set `true` for an always-capture
99 /// deployment (e.g. a dedicated capture/migration proxy).
100 baseline_capture: bool,
101 /// The fleet-wide directive source, polled fresh per request so an operator
102 /// can flip verbosity without a restart. Defaults to an empty static set.
103 directive_store: Arc<dyn DirectiveStore>,
104 verifier: Arc<dyn DirectiveVerifier>,
105 /// The break-glass tape, captured into only when a `ring_buffer` directive
106 /// applies to a request. Empty (near-zero cost) until then.
107 break_glass: Arc<BreakGlassBuffer>,
108 /// The fleet-coherent diagnostic sink: a directive-selected capture is pushed
109 /// here (keyed by `trace_id`) as well as into the local break-glass ring, so an
110 /// aggregator can serve it fleet-wide. Default [`NoopDiagnosticSink`] (off): the
111 /// capture stays in the local ring only.
112 diagnostic_sink: Arc<dyn DiagnosticSink>,
113 /// Signs/verifies scroll & PIT affinity envelopes (`docs/03` §6). `None` =
114 /// affinity **off** (the opt-in default): cursor requests fail closed with a
115 /// `CursorUnresolvable` error rather than route blindly.
116 pub(crate) cursor_signer: Option<Arc<dyn CursorSigner>>,
117 /// The admin pass-through policy (`docs/03` §6): which cluster answers
118 /// allow-listed `_cat`/`_cluster`/`_nodes` requests, and which path prefixes
119 /// are permitted. `None` = reject all admin requests (the default).
120 pub(crate) admin_policy: Option<crate::admin::AdminPolicy>,
121 /// Tenant-agnostic passthrough (`None` = pure tenancy mode, the default).
122 /// When set, requests the policy matches (by logical index) are forwarded
123 /// verbatim with no rewrite; unmatched requests stay tenant-isolated. A
124 /// prefix-free policy passes everything through (transparent/capture proxy).
125 pub(crate) passthrough: Option<crate::passthrough::PassthroughPolicy>,
126 /// The write mode applied when a request does not select one with the
127 /// `X-Write-Mode` header. Default [`crate::WriteMode::Sync`], async fan-out is
128 /// opt-in (`docs/04` §9).
129 pub(crate) baseline_write_mode: crate::asyncwrite::WriteMode,
130 /// The durable queue async writes are enqueued onto. Default
131 /// [`crate::asyncwrite::NoQueue`]: async requests are refused (`422`) until a
132 /// real queue is wired in.
133 pub(crate) write_queue: Arc<dyn crate::asyncwrite::WriteQueue>,
134 /// Whether `_delete_by_query` may be expanded into per-match deletes in async
135 /// mode (`docs/04` §9). Default `false`: DBQ is rejected until opted in, since
136 /// it reads the match set and enqueues a delete each.
137 pub(crate) delete_by_query_expansion: bool,
138}
139
140/// The diagnostics decision for one request: how much to record/export, whether
141/// to capture it into the break-glass tape, and whether to tee it to the fleet
142/// traffic-capture sink.
143#[derive(Clone, Copy)]
144struct Diagnostics {
145 level: DiagLevel,
146 capture: bool,
147 traffic_capture: bool,
148}
149
150impl<R, S> std::fmt::Debug for Pipeline<R, S> {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 // The injected exporter/clock are not `Debug`; show the rest of the shape.
153 f.debug_struct("Pipeline")
154 .field("retry", &self.retry)
155 .field("service_name", &self.service_name)
156 .field("exporting", &self.exporter.enabled())
157 .finish_non_exhaustive()
158 }
159}
160
161impl<R: Router, S: Sink + Reader> Pipeline<R, S> {
162 /// Builds a pipeline from a router and a sink (default backend-retry policy,
163 /// no span export).
164 pub fn new(router: R, sink: S) -> Self {
165 Self {
166 router,
167 sink,
168 retry: crate::RetryPolicy::default(),
169 explain: Arc::new(ExplainStore::new(EXPLAIN_CAPACITY)),
170 exporter: Arc::new(NoopExporter),
171 clock: Arc::new(SystemClock),
172 service_name: "osproxy".to_owned(),
173 baseline: DiagLevel::Shape,
174 baseline_capture: false,
175 // An empty static set as the default store: `Arc<DirectiveSet>` is
176 // itself a `DirectiveStore`, so this is `Arc<dyn DirectiveStore>` over
177 // a constant snapshot. Swap it for a fleet store via builder.
178 directive_store: Arc::new(Arc::new(DirectiveSet::new())),
179 verifier: Arc::new(NoVerifier),
180 break_glass: Arc::new(BreakGlassBuffer::new(BREAK_GLASS_CAPACITY)),
181 diagnostic_sink: Arc::new(NoopDiagnosticSink),
182 cursor_signer: None,
183 admin_policy: None,
184 passthrough: None,
185 baseline_write_mode: crate::asyncwrite::WriteMode::Sync,
186 write_queue: Arc::new(crate::asyncwrite::NoQueue),
187 delete_by_query_expansion: false,
188 }
189 }
190
191 /// Enables the `_delete_by_query` async expansion (builder style). Without it,
192 /// DBQ is rejected even in async mode (`docs/04` §9).
193 #[must_use]
194 pub fn with_delete_by_query_expansion(mut self, on: bool) -> Self {
195 self.delete_by_query_expansion = on;
196 self
197 }
198
199 /// Sets the baseline write mode applied when a request does not carry an
200 /// `X-Write-Mode` header (builder style). Default [`crate::WriteMode::Sync`]; set
201 /// [`crate::WriteMode::Async`] to make durable fan-out the deployment default
202 /// (`docs/04` §9).
203 #[must_use]
204 pub fn with_baseline_write_mode(mut self, mode: crate::asyncwrite::WriteMode) -> Self {
205 self.baseline_write_mode = mode;
206 self
207 }
208
209 /// Sets the durable queue async writes are enqueued onto (builder style).
210 /// Without it, async requests are refused with `422` rather than dropped.
211 #[must_use]
212 pub fn with_write_queue(mut self, queue: Arc<dyn crate::asyncwrite::WriteQueue>) -> Self {
213 self.write_queue = queue;
214 self
215 }
216
217 /// The write mode for `ctx`: the validated `X-Write-Mode` header if present,
218 /// else the deployment baseline. An unparseable header falls back to the
219 /// baseline rather than erroring, an unknown mode is not a hard failure.
220 pub(crate) fn write_mode(&self, ctx: &RequestCtx<'_>) -> crate::asyncwrite::WriteMode {
221 self.resolve_write_mode(ctx.headers().get("x-write-mode"))
222 }
223
224 /// The write mode from a raw `X-Write-Mode` header value (or its absence),
225 /// the precedence shared by [`write_mode`](Self::write_mode) and
226 /// [`is_sync_write`](Self::is_sync_write): a valid header wins, else the baseline.
227 fn resolve_write_mode(&self, header: Option<&str>) -> crate::asyncwrite::WriteMode {
228 header
229 .and_then(crate::asyncwrite::WriteMode::parse)
230 .unwrap_or(self.baseline_write_mode)
231 }
232
233 /// Enables tenant-agnostic passthrough: every request is forwarded verbatim to
234 /// `policy`'s cluster with no tenancy rewrite. Use this for a transparent or
235 /// capture/migration proxy. Without it, the pipeline routes by tenancy (the
236 /// default).
237 #[must_use]
238 pub fn with_passthrough(mut self, policy: crate::passthrough::PassthroughPolicy) -> Self {
239 self.passthrough = Some(policy);
240 self
241 }
242
243 /// Enables opt-in admin pass-through (`docs/03` §6): allow-listed
244 /// `_cat`/`_cluster`/`_nodes` requests are forwarded verbatim to `policy`'s
245 /// cluster. Without this, every admin request is rejected (the default).
246 #[must_use]
247 pub fn with_admin_passthrough(mut self, policy: crate::admin::AdminPolicy) -> Self {
248 self.admin_policy = Some(policy);
249 self
250 }
251
252 /// Enables opt-in scroll/PIT cursor affinity (`docs/03` §6) with `signer`
253 /// signing the cluster↔cursor envelope. Without this, cursor requests fail
254 /// closed (`CursorUnresolvable`) rather than route to an unknown cluster.
255 #[must_use]
256 pub fn with_cursor_signer(mut self, signer: Arc<dyn CursorSigner>) -> Self {
257 self.cursor_signer = Some(signer);
258 self
259 }
260
261 /// Sets the placement-backend retry policy (builder style).
262 #[must_use]
263 pub fn with_retry_policy(mut self, retry: crate::RetryPolicy) -> Self {
264 self.retry = retry;
265 self
266 }
267
268 /// Sets the OTLP span exporter (builder style). Default is no export.
269 #[must_use]
270 pub fn with_exporter(mut self, exporter: Arc<dyn SpanExporter>) -> Self {
271 self.exporter = exporter;
272 self
273 }
274
275 /// Swaps the clock used to stamp span timestamps (tests inject a `ManualClock`).
276 #[must_use]
277 pub fn with_clock(mut self, clock: Arc<dyn Clock>) -> Self {
278 self.clock = clock;
279 self
280 }
281
282 /// Sets the `service.name` reported on exported spans (builder style).
283 #[must_use]
284 pub fn with_service_name(mut self, service_name: impl Into<String>) -> Self {
285 self.service_name = service_name.into();
286 self
287 }
288
289 /// Sets the baseline diagnostics level applied to every request before
290 /// directives (builder style). Default [`DiagLevel::Shape`]; set to
291 /// [`DiagLevel::Off`] to export only what a directive selects.
292 #[must_use]
293 pub fn with_baseline_level(mut self, baseline: DiagLevel) -> Self {
294 self.baseline = baseline;
295 self
296 }
297
298 /// Sets whether traffic capture is on for every request before directives
299 /// (builder style). Default `false` (capture on demand via a published
300 /// directive); set `true` for an always-capture deployment.
301 #[must_use]
302 pub fn with_baseline_capture(mut self, on: bool) -> Self {
303 self.baseline_capture = on;
304 self
305 }
306
307 /// Sets a fixed set of active diagnostics directives (builder style). For a
308 /// fleet-wide, restart-free source use [`Pipeline::with_directive_store`].
309 #[must_use]
310 pub fn with_directives(mut self, directives: Arc<DirectiveSet>) -> Self {
311 // `Arc<DirectiveSet>` is itself a `DirectiveStore` (a constant snapshot).
312 self.directive_store = Arc::new(directives);
313 self
314 }
315
316 /// Sets the fleet-wide directive store (builder style). The pipeline polls it
317 /// fresh per request, so a controller publishing a new set flips verbosity
318 /// across the fleet without a restart (`docs/05` §3).
319 #[must_use]
320 pub fn with_directive_store(mut self, store: Arc<dyn DirectiveStore>) -> Self {
321 self.directive_store = store;
322 self
323 }
324
325 /// Sets the verifier for the signed `X-Debug-Directive` header (builder
326 /// style). Default rejects all headers; a real verifier enables the surgical,
327 /// single-request directive channel.
328 #[must_use]
329 pub fn with_directive_verifier(mut self, verifier: Arc<dyn DirectiveVerifier>) -> Self {
330 self.verifier = verifier;
331 self
332 }
333
334 /// Shares the break-glass tape (builder style), so a debug endpoint can read
335 /// the captured sequence and tests can inspect it.
336 #[must_use]
337 pub fn with_break_glass(mut self, break_glass: Arc<BreakGlassBuffer>) -> Self {
338 self.break_glass = break_glass;
339 self
340 }
341
342 /// Sets the fleet-coherent diagnostic sink (builder style): a directive-selected
343 /// capture is pushed here (keyed by `trace_id`) in addition to the local
344 /// break-glass ring, so an aggregator can serve it fleet-wide (`docs/05` §5).
345 #[must_use]
346 pub fn with_diagnostic_sink(mut self, sink: Arc<dyn DiagnosticSink>) -> Self {
347 self.diagnostic_sink = sink;
348 self
349 }
350
351 /// The assembled `/debug/explain` document for a past request, if retained.
352 #[must_use]
353 pub fn explain(&self, request_id: &RequestId) -> Option<Value> {
354 self.explain.get(request_id)
355 }
356
357 /// The break-glass tape, the explanations captured while a `ring_buffer`
358 /// directive was in effect (`docs/05` §5).
359 #[must_use]
360 pub fn break_glass(&self) -> &Arc<BreakGlassBuffer> {
361 &self.break_glass
362 }
363
364 /// The underlying sink (e.g. to inspect what an in-memory sink recorded).
365 #[must_use]
366 pub fn sink(&self) -> &S {
367 &self.sink
368 }
369
370 /// The trace context to inject onto an upstream request, or `None` when the
371 /// proxy is not adding a span of its own (span export off). With export off the
372 /// proxy stays transparent to tracing: the client's own trace headers (W3C, B3,
373 /// anything) ride through verbatim in the forwarded header set, and the proxy
374 /// inserts no `traceparent` of its own (`docs/05`). With export on it injects
375 /// its hop's `traceparent`, overriding the client's so the upstream span nests
376 /// under the proxy's.
377 pub(crate) fn upstream_trace(
378 &self,
379 ctx: &RequestCtx<'_>,
380 ) -> Option<osproxy_core::TraceContext> {
381 self.exporter
382 .enabled()
383 .then(|| crate::endpoints::wire_trace(ctx))
384 }
385
386 /// Handles an authenticated request, dispatching on its endpoint class.
387 ///
388 /// Records a shape-only causal trace for every request (success or failure)
389 /// into the explain store, so `/debug/explain/{id}` can reconstruct it
390 /// (`docs/05`).
391 ///
392 /// # Errors
393 ///
394 /// Returns [`RequestError`] if the endpoint is unsupported in M1, routing
395 /// fails, the body transform fails, or the sink rejects the write.
396 pub async fn handle(&self, ctx: &RequestCtx<'_>) -> Result<PipelineResponse, RequestError> {
397 self.handle_with_capture(ctx).await.0
398 }
399
400 /// Like [`Self::handle`], but also returns whether this request should be teed
401 /// to the fleet traffic-capture sink, the live per-request capture decision,
402 /// applied by the ingress to both success and error responses.
403 pub async fn handle_with_capture(
404 &self,
405 ctx: &RequestCtx<'_>,
406 ) -> (Result<PipelineResponse, RequestError>, bool) {
407 // Only pay for span timing/encoding when an exporter is actually active,
408 // "Off" stays near-zero cost (`docs/05`).
409 let exporting = self.exporter.enabled();
410 let start_nanos = if exporting {
411 self.clock.unix_nanos()
412 } else {
413 0
414 };
415
416 let mut trace = RequestTrace::new();
417 // The same W3C context propagated to downstream calls is recorded here, so
418 // `/debug/explain` and the exported OTLP span share the request's ids.
419 trace.record_context(crate::endpoints::wire_trace(ctx));
420 trace.record_classify(ClassifyInfo {
421 endpoint: ctx.endpoint(),
422 logical_index: logical_index(ctx.logical_index()),
423 });
424
425 let result = self.dispatch(ctx, &mut trace).await;
426 match &result {
427 Ok(resp) => trace.record_egress(EgressInfo {
428 status: resp.status,
429 response_bytes: resp.body.len(),
430 }),
431 Err(err) => trace.record_error(error_context(err)),
432 }
433 self.explain.record(ctx.request_id().clone(), &trace);
434
435 let diag = self.diagnostics(ctx, &trace);
436
437 // Break-glass: capture the explanation when a `ring_buffer`/`capture`
438 // directive selected this request (`docs/05` §5). Off by default, so this
439 // stays empty until an operator flips it on. The doc is built once and both
440 // retained in the local ring and pushed to the fleet diagnostic sink
441 // (keyed by `trace_id`) so it is reachable on any instance.
442 if diag.capture {
443 let doc = explain_json(ctx.request_id(), &trace);
444 if self.diagnostic_sink.enabled() {
445 self.diagnostic_sink.emit(doc.clone());
446 }
447 self.break_glass.capture(doc);
448 }
449
450 // Export the span when an exporter is active AND the diagnostics level for
451 // this request reaches at least `Shape`, so directives can restrict export
452 // to a targeted, sampled subset (`docs/05` §3). Background, best-effort.
453 if exporting && diag.level >= DiagLevel::Shape {
454 let end_nanos = self.clock.unix_nanos();
455 if let Some(payload) = resource_spans(
456 &self.service_name,
457 ctx.request_id(),
458 &trace,
459 start_nanos,
460 end_nanos,
461 ) {
462 self.exporter.export(payload);
463 }
464 }
465 (result, diag.traffic_capture)
466 }
467
468 /// The diagnostics decision for a finished request: the baseline level raised
469 /// by any directive that targets it (by tenant/index/principal/endpoint),
470 /// plus whether any applying directive wants break-glass capture. Evaluated at
471 /// the current time so expiry/sampling apply; the directive store is polled
472 /// fresh and the signed header verified exactly once.
473 fn diagnostics(&self, ctx: &RequestCtx<'_>, trace: &RequestTrace) -> Diagnostics {
474 let attrs = RequestAttrs {
475 tenant: trace.resolved_partition(),
476 index: ctx.logical_index(),
477 principal: ctx.principal_id(),
478 endpoint: ctx.endpoint(),
479 };
480 let now = self.clock.now();
481 let request = ctx.request_id();
482 // Poll the fleet directive store fresh (a cheap Arc clone of the current
483 // snapshot) so a published flip takes effect without a restart.
484 let snapshot = self.directive_store.load();
485 let mut level = self.baseline.max(snapshot.evaluate(&attrs, now, request));
486 let mut capture = snapshot.wants_ring_buffer(&attrs, now, request);
487 // Traffic capture is on when the deployment baseline says always-on or any
488 // published `capture` directive selects this request (capture on demand).
489 let mut traffic_capture =
490 self.baseline_capture || snapshot.wants_capture(&attrs, now, request);
491 // Fold in a verified single-request directive from the signed
492 // `X-Debug-Directive` header, if present and valid (`docs/05` §3).
493 if let Some(directive) = ctx
494 .headers()
495 .get("x-debug-directive")
496 .and_then(|h| self.verifier.verify(h))
497 {
498 if let Some(from_header) = directive.level_if_applies(&attrs, now, request) {
499 level = level.max(from_header);
500 capture |= directive.ring_buffer;
501 traffic_capture |= directive.capture;
502 }
503 }
504 Diagnostics {
505 level,
506 capture,
507 traffic_capture,
508 }
509 }
510
511 /// Whether the effective write mode for a request with these headers is sync,
512 /// the `X-Write-Mode` header if present and valid, else the deployment
513 /// baseline. Lets the transport decide to stream-demux a `_bulk` (sync only;
514 /// async fan-out keeps the buffered path) from the head alone (ADR-014 stage 4).
515 #[must_use]
516 pub fn is_sync_write(&self, headers: &[(String, String)]) -> bool {
517 let header = headers
518 .iter()
519 .find(|(k, _)| k.eq_ignore_ascii_case("x-write-mode"))
520 .map(|(_, v)| v.as_str());
521 self.resolve_write_mode(header) == crate::asyncwrite::WriteMode::Sync
522 }
523
524 /// Whether a request for `logical_index` is a tenant-agnostic passthrough that
525 /// can be **streamed** verbatim (ADR-014 stage 2). Body-free so the transport
526 /// can decide before buffering. `false` when no passthrough policy is set or
527 /// the index is not matched (the request then takes the buffered tenancy path).
528 #[must_use]
529 pub fn is_passthrough(&self, logical_index: &str) -> bool {
530 self.passthrough
531 .as_ref()
532 .is_some_and(|p| p.matches_index(logical_index))
533 }
534
535 /// Handles a verbatim passthrough request whose body is supplied as a
536 /// **stream** (ADR-014 stage 2): forward it to the passthrough cluster without
537 /// buffering. Mirrors [`handle_with_capture`](Self::handle_with_capture)'s
538 /// trace lifecycle (classify → dispatch → egress, recorded into the explain
539 /// store), minus the buffered-body diagnostics: traffic capture is never
540 /// available here because the body is not retained, so the returned flag is
541 /// always `false`.
542 pub async fn forward_streamed(
543 &self,
544 ctx: &RequestCtx<'_>,
545 body: ByteBody,
546 ) -> (Result<StreamingForward, RequestError>, bool) {
547 let mut trace = Self::begin_streamed_trace(ctx);
548 let result = match self.passthrough.as_ref() {
549 Some(policy) => self.forward_stream(ctx, policy, body, &mut trace).await,
550 // Only reachable if a caller streams a request `is_passthrough` rejects.
551 None => Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
552 endpoint: ctx.endpoint(),
553 })),
554 };
555 // The response body is a live stream of unknown length, so egress records
556 // the status with zero bytes (the size is not known until it has flowed).
557 match &result {
558 Ok(f) => trace.record_egress(EgressInfo {
559 status: f.status,
560 response_bytes: 0,
561 }),
562 Err(err) => trace.record_error(error_context(err)),
563 }
564 self.explain.record(ctx.request_id().clone(), &trace);
565 (result, false)
566 }
567
568 /// Handles a `_search` whose response is streamed back through the hit
569 /// transform (ADR-014, final stage): the upstream body is never buffered, each
570 /// hit is shaped incrementally and every sibling (notably `aggregations`) is
571 /// forwarded verbatim. Same trace lifecycle as
572 /// [`forward_streamed`](Self::forward_streamed): the body length is unknown
573 /// until it flows, so egress records the status with zero bytes. The request
574 /// query body is small and already buffered in `ctx`; only the response
575 /// streams. Returns the result plus `false`, capture is never available on a
576 /// streamed path (and the caller only streams when capture is off).
577 pub async fn search_streamed(
578 &self,
579 ctx: &RequestCtx<'_>,
580 ) -> (Result<StreamSearch, RequestError>, bool) {
581 let mut trace = Self::begin_streamed_trace(ctx);
582 let result = self.run_search_stream(ctx, &mut trace).await;
583 match &result {
584 Ok(s) => trace.record_egress(EgressInfo {
585 status: s.status,
586 response_bytes: 0,
587 }),
588 Err(err) => trace.record_error(error_context(err)),
589 }
590 self.explain.record(ctx.request_id().clone(), &trace);
591 (result, false)
592 }
593
594 /// Handles a `_bulk` request whose body is supplied as a **stream** (ADR-014
595 /// stage 4): frame and demux the NDJSON incrementally so the whole batch is
596 /// never buffered. Same trace lifecycle as [`forward_streamed`](Self::forward_streamed)
597 /// (classify → egress, into the explain store); per-op outcomes live
598 /// positionally in the response body, as in the buffered bulk path. Sync write
599 /// mode only, the streaming decision is made by the caller; async fan-out
600 /// keeps the buffered path.
601 pub async fn handle_bulk_streamed(
602 &self,
603 ctx: &RequestCtx<'_>,
604 body: ByteBody,
605 ) -> (Result<PipelineResponse, RequestError>, bool) {
606 // Bulk records its outcome positionally in the response, not per-stage, so
607 // the trace passes straight from open to close with no mid-stage spans.
608 let trace = Self::begin_streamed_trace(ctx);
609 let up_trace = self.upstream_trace(ctx);
610 let result = crate::bulk::ingest_bulk_streamed(
611 &self.router,
612 &self.sink,
613 ctx,
614 body,
615 self.retry,
616 up_trace,
617 )
618 .await;
619 self.finish_streamed_trace(ctx, trace, result)
620 }
621
622 /// Opens the shape-only trace for a streamed request: context + classify, the
623 /// stages known before dispatch. Shared by the streamed forward and bulk paths.
624 fn begin_streamed_trace(ctx: &RequestCtx<'_>) -> RequestTrace {
625 let mut trace = RequestTrace::new();
626 trace.record_context(crate::endpoints::wire_trace(ctx));
627 trace.record_classify(ClassifyInfo {
628 endpoint: ctx.endpoint(),
629 logical_index: logical_index(ctx.logical_index()),
630 });
631 trace
632 }
633
634 /// Closes a streamed request's trace (egress or error) and records it into the
635 /// explain store. Returns the result plus `false`, traffic capture is never
636 /// available on a streamed path (the body is not retained to tee).
637 fn finish_streamed_trace(
638 &self,
639 ctx: &RequestCtx<'_>,
640 mut trace: RequestTrace,
641 result: Result<PipelineResponse, RequestError>,
642 ) -> (Result<PipelineResponse, RequestError>, bool) {
643 match &result {
644 Ok(resp) => trace.record_egress(EgressInfo {
645 status: resp.status,
646 response_bytes: resp.body.len(),
647 }),
648 Err(err) => trace.record_error(error_context(err)),
649 }
650 self.explain.record(ctx.request_id().clone(), &trace);
651 (result, false)
652 }
653
654 /// Dispatches on endpoint class, recording the per-stage spans into `trace`.
655 async fn dispatch(
656 &self,
657 ctx: &RequestCtx<'_>,
658 trace: &mut RequestTrace,
659 ) -> Result<PipelineResponse, RequestError> {
660 // Tenant-agnostic passthrough short-circuits tenancy dispatch for the
661 // requests it matches (by logical index); unmatched requests fall through
662 // to tenancy below (fail-closed).
663 if let Some(policy) = self.passthrough.as_ref().filter(|p| p.matches(ctx)) {
664 return self.forward(ctx, policy, trace).await;
665 }
666 match ctx.endpoint() {
667 EndpointKind::IngestDoc => self.ingest_doc(ctx, trace).await,
668 EndpointKind::IngestBulk => self.ingest_bulk(ctx, trace).await,
669 EndpointKind::GetById => self.get_by_id(ctx, trace).await,
670 EndpointKind::MultiGet => self.multi_get(ctx, trace).await,
671 EndpointKind::DeleteById => self.delete_by_id(ctx, trace).await,
672 EndpointKind::DeleteByQuery => self.delete_by_query(ctx, trace).await,
673 EndpointKind::Search => self.search(ctx, trace).await,
674 EndpointKind::MultiSearch => self.multi_search(ctx, trace).await,
675 EndpointKind::Count => self.count(ctx, trace).await,
676 EndpointKind::Cursor => self.cursor(ctx, trace).await,
677 EndpointKind::Admin => self.admin(ctx, trace).await,
678 other => Err(RequestError::Spi(SpiError::UnsupportedEndpoint {
679 endpoint: other,
680 })),
681 }
682 }
683}
684
685#[cfg(test)]
686#[path = "pipeline_tests.rs"]
687mod tests;