pub struct Pipeline<R, S> { /* private fields */ }
Implementations
impl<R: Router, S: Sink + Reader> Pipeline<R, S>
pub fn new(router: R, sink: S) -> Self
Builds a pipeline from a router and a sink, with the default backend-retry policy and no span export.
pub fn with_delete_by_query_expansion(self, on: bool) -> Self
Enables the async expansion of _delete_by_query (DBQ) requests (builder style).
Without it, DBQ is rejected even in async mode (docs/04 §9).
pub fn with_baseline_write_mode(self, mode: WriteMode) -> Self
Sets the baseline write mode applied when a request does not carry an
X-Write-Mode header (builder style). Default crate::WriteMode::Sync; set
crate::WriteMode::Async to make durable fan-out the deployment default
(docs/04 §9).
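The builder-style setters take `self` by value and return `Self`, so they chain. The following is a minimal sketch of that pattern, not the crate's implementation: `PipelineSketch`, its two fields, and its getters are stand-ins invented for illustration (the real struct has private fields and takes a router and a sink). Only the setter names, the `WriteMode::Sync` default, and the off-by-default DBQ expansion come from the descriptions above.

```rust
/// Stand-in for `crate::WriteMode`, with only the two variants named in the docs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    Sync,
    Async,
}

/// Hypothetical miniature of the pipeline's configuration, not the real type.
#[derive(Debug)]
pub struct PipelineSketch {
    baseline_write_mode: WriteMode,
    dbq_expansion: bool,
}

impl PipelineSketch {
    /// Defaults mirror the documented ones: sync writes, DBQ expansion off.
    pub fn new() -> Self {
        Self {
            baseline_write_mode: WriteMode::Sync,
            dbq_expansion: false,
        }
    }

    /// Consumes the builder and returns it with the baseline mode replaced.
    pub fn with_baseline_write_mode(mut self, mode: WriteMode) -> Self {
        self.baseline_write_mode = mode;
        self
    }

    /// Consumes the builder and returns it with DBQ expansion toggled.
    pub fn with_delete_by_query_expansion(mut self, on: bool) -> Self {
        self.dbq_expansion = on;
        self
    }

    /// Illustrative getter (assumed, not part of the documented API).
    pub fn baseline_write_mode(&self) -> WriteMode {
        self.baseline_write_mode
    }

    /// Illustrative getter (assumed, not part of the documented API).
    pub fn dbq_expansion(&self) -> bool {
        self.dbq_expansion
    }
}
```

Because each setter moves the value, a half-configured pipeline cannot be used by accident after a later setter has consumed it.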
pub fn with_write_queue(self, queue: Arc<dyn WriteQueue>) -> Self
Sets the durable queue that async writes are enqueued onto (builder style).
Without it, async requests are refused with 422 rather than dropped.
pub fn with_passthrough(self, policy: PassthroughPolicy) -> Self
Enables tenant-agnostic passthrough: every request is forwarded verbatim to
policy’s cluster with no tenancy rewrite. Use this for a transparent or
capture/migration proxy. Without it, the pipeline routes by tenancy (the
default).
pub fn with_admin_passthrough(self, policy: AdminPolicy) -> Self
Enables opt-in admin pass-through (docs/03 §6): allow-listed
_cat/_cluster/_nodes requests are forwarded verbatim to policy’s
cluster. Without this, every admin request is rejected (the default).
pub fn with_cursor_signer(self, signer: Arc<dyn CursorSigner>) -> Self
Enables opt-in scroll/PIT cursor affinity (docs/03 §6); signer signs the
cluster↔cursor envelope. Without this, cursor requests fail closed
(CursorUnresolvable) rather than being routed to an unknown cluster.
pub fn with_retry_policy(self, retry: RetryPolicy) -> Self
Sets the placement-backend retry policy (builder style).
pub fn with_exporter(self, exporter: Arc<dyn SpanExporter>) -> Self
Sets the OTLP span exporter (builder style). Default is no export.
pub fn with_clock(self, clock: Arc<dyn Clock>) -> Self
Swaps the clock used to stamp span timestamps (tests inject a ManualClock).
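Injecting the clock makes span timestamps deterministic in tests. The sketch below shows the general idea under stated assumptions: the trait method `now_unix_nanos`, the `ManualClock::new`/`advance` methods, and the `stamp_span` helper are all hypothetical, since the real `Clock` and `ManualClock` signatures are not shown on this page.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical shape of the clock trait; the real `Clock` may differ.
pub trait Clock: Send + Sync {
    /// Nanoseconds since the Unix epoch (assumed unit).
    fn now_unix_nanos(&self) -> u64;
}

/// A test clock that only moves when told to.
pub struct ManualClock {
    nanos: AtomicU64,
}

impl ManualClock {
    pub fn new(start: u64) -> Self {
        Self { nanos: AtomicU64::new(start) }
    }

    /// Moves the clock forward by `by` nanoseconds.
    pub fn advance(&self, by: u64) {
        self.nanos.fetch_add(by, Ordering::SeqCst);
    }
}

impl Clock for ManualClock {
    fn now_unix_nanos(&self) -> u64 {
        self.nanos.load(Ordering::SeqCst)
    }
}

/// Stamps a (start, end) pair around `work`, reading whichever clock was injected.
pub fn stamp_span(clock: &Arc<dyn Clock>, work: impl FnOnce()) -> (u64, u64) {
    let start = clock.now_unix_nanos();
    work();
    (start, clock.now_unix_nanos())
}
```

A test can keep its own `Arc<ManualClock>` handle, pass a clone as `Arc<dyn Clock>`, and advance time between the two reads to assert exact span durations.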
pub fn with_service_name(self, service_name: impl Into<String>) -> Self
Sets the service.name reported on exported spans (builder style).
pub fn with_baseline_level(self, baseline: DiagLevel) -> Self
Sets the baseline diagnostics level applied to every request before
directives (builder style). Default DiagLevel::Shape; set to
DiagLevel::Off to export only what a directive selects.
pub fn with_baseline_capture(self, on: bool) -> Self
Sets whether traffic capture is on for every request before directives
(builder style). Default false (capture on demand via a published
directive); set true for an always-capture deployment.
pub fn with_directives(self, directives: Arc<DirectiveSet>) -> Self
Sets a fixed set of active diagnostics directives (builder style). For a
fleet-wide, restart-free source use Pipeline::with_directive_store.
pub fn with_directive_store(self, store: Arc<dyn DirectiveStore>) -> Self
Sets the fleet-wide directive store (builder style). The pipeline polls it
fresh per request, so a controller publishing a new set flips verbosity
across the fleet without a restart (docs/05 §3).
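Reading the store on every request is what makes a newly published set take effect without a restart. A minimal sketch of that idea, assuming a hypothetical `current()` method on the store trait and an invented `verbose` field on the directive set (the real `DirectiveStore` and `DirectiveSet` shapes are not shown on this page):

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for the real `DirectiveSet`; the `verbose` field is invented.
#[derive(Debug, Default, PartialEq)]
pub struct DirectiveSet {
    pub verbose: bool,
}

/// Hypothetical trait shape; the real `DirectiveStore` may expose a different method.
pub trait DirectiveStore: Send + Sync {
    /// Returns the currently active set.
    fn current(&self) -> Arc<DirectiveSet>;
}

/// In-memory store that a controller can publish into.
#[derive(Default)]
pub struct InMemoryStore {
    active: RwLock<Arc<DirectiveSet>>,
}

impl InMemoryStore {
    /// Atomically replaces the active set; in-flight readers keep their old `Arc`.
    pub fn publish(&self, set: DirectiveSet) {
        *self.active.write().unwrap() = Arc::new(set);
    }
}

impl DirectiveStore for InMemoryStore {
    fn current(&self) -> Arc<DirectiveSet> {
        let guard = self.active.read().unwrap();
        Arc::clone(&*guard)
    }
}

/// Reads the store once per request instead of caching the set at startup.
pub fn is_verbose_for_request(store: &dyn DirectiveStore) -> bool {
    store.current().verbose
}
```

Handing out an `Arc` keeps the read lock short: a request clones the pointer and releases the lock before evaluating any directive.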
pub fn with_directive_verifier(
    self,
    verifier: Arc<dyn DirectiveVerifier>,
) -> Self
Sets the verifier for the signed X-Debug-Directive header (builder
style). Default rejects all headers; a real verifier enables the surgical,
single-request directive channel.
pub fn with_break_glass(self, break_glass: Arc<BreakGlassBuffer>) -> Self
Shares the break-glass tape (builder style), so a debug endpoint can read the captured sequence and tests can inspect it.
pub fn with_diagnostic_sink(self, sink: Arc<dyn DiagnosticSink>) -> Self
Sets the fleet-coherent diagnostic sink (builder style): a directive-selected
capture is pushed here (keyed by trace_id) in addition to the local
break-glass ring, so an aggregator can serve it fleet-wide (docs/05 §5).
pub fn explain(&self, request_id: &RequestId) -> Option<Value>
The assembled /debug/explain document for a past request, if retained.
pub fn break_glass(&self) -> &Arc<BreakGlassBuffer>
The break-glass tape: the explanations captured while a ring_buffer
directive was in effect (docs/05 §5).
pub fn sink(&self) -> &S
The underlying sink (e.g. to inspect what an in-memory sink recorded).
pub async fn handle(
    &self,
    ctx: &RequestCtx<'_>,
) -> Result<PipelineResponse, RequestError>
Handles an authenticated request, dispatching on its endpoint class.
Records a shape-only causal trace for every request (success or failure)
into the explain store, so /debug/explain/{id} can reconstruct it
(docs/05).
Errors
Returns RequestError if the endpoint is unsupported in M1, routing
fails, the body transform fails, or the sink rejects the write.
pub async fn handle_with_capture(
    &self,
    ctx: &RequestCtx<'_>,
) -> (Result<PipelineResponse, RequestError>, bool)
Like Self::handle, but also returns whether this request should be teed
to the fleet traffic-capture sink. This flag is the live per-request capture
decision, which the ingress applies to both success and error responses.
pub fn is_sync_write(&self, headers: &[(String, String)]) -> bool
Whether the effective write mode for a request with these headers is sync.
The effective mode is the X-Write-Mode header if present and valid, else the
deployment baseline. Lets the transport decide from the head alone whether to
stream-demux a _bulk (sync only; async fan-out keeps the buffered path)
(ADR-014 stage 4).
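The resolution rule (header if present and valid, else baseline) can be sketched as a free function. This is an illustration under assumptions, not the crate's code: the accepted header values `sync`/`async`, the case-insensitive header-name match, and the explicit `baseline` parameter are all hypothetical (the real method reads the baseline from the pipeline itself).

```rust
/// Stand-in for `crate::WriteMode`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteMode {
    Sync,
    Async,
}

/// Parses a header value; the accepted spellings are an assumption.
fn parse_write_mode(value: &str) -> Option<WriteMode> {
    match value.trim().to_ascii_lowercase().as_str() {
        "sync" => Some(WriteMode::Sync),
        "async" => Some(WriteMode::Async),
        _ => None,
    }
}

/// The header wins when present and valid; otherwise the baseline applies.
pub fn effective_write_mode(headers: &[(String, String)], baseline: WriteMode) -> WriteMode {
    headers
        .iter()
        // Header names are compared case-insensitively (assumed behavior).
        .find(|(name, _)| name.eq_ignore_ascii_case("x-write-mode"))
        // An invalid value yields `None`, which falls through to the baseline.
        .and_then(|(_, value)| parse_write_mode(value))
        .unwrap_or(baseline)
}

/// Mirrors the documented question: is the effective mode sync?
pub fn is_sync_write(headers: &[(String, String)], baseline: WriteMode) -> bool {
    effective_write_mode(headers, baseline) == WriteMode::Sync
}
```

Only the headers are inspected, which is why the transport can call this before reading any of the body.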
pub fn is_passthrough(&self, logical_index: &str) -> bool
Whether a request for logical_index is a tenant-agnostic passthrough that
can be streamed verbatim (ADR-014 stage 2). Body-free so the transport
can decide before buffering. false when no passthrough policy is set or
the index is not matched (the request then takes the buffered tenancy path).
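Taken together, is_sync_write and is_passthrough let a transport pick a handling path from the request head alone. The sketch below is one hypothetical way to combine them: the `Head` struct, the `Path` enum, and in particular the precedence of passthrough over bulk are assumptions made for illustration, and the comments name the documented `Pipeline` method each path would call.

```rust
/// Which pipeline entry point the transport would call (illustrative names).
#[derive(Debug, PartialEq, Eq)]
pub enum Path {
    /// Stream the body verbatim: `forward_streamed`.
    StreamPassthrough,
    /// Frame and demux the NDJSON incrementally: `handle_bulk_streamed`.
    StreamBulk,
    /// Buffer the body and take the tenancy path: `handle_with_capture`.
    Buffered,
}

/// Facts available before the body is read (hypothetical grouping).
pub struct Head {
    /// Result of `is_passthrough(logical_index)`.
    pub passthrough: bool,
    /// Whether the endpoint is `_bulk`.
    pub is_bulk: bool,
    /// Result of `is_sync_write(headers)`.
    pub sync_write: bool,
}

/// Chooses a path without touching the body.
pub fn choose_path(head: &Head) -> Path {
    if head.passthrough {
        // Assumed precedence: a passthrough match wins over everything else.
        Path::StreamPassthrough
    } else if head.is_bulk && head.sync_write {
        // Streaming bulk is sync only; async fan-out stays buffered.
        Path::StreamBulk
    } else {
        Path::Buffered
    }
}
```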
pub async fn forward_streamed(
    &self,
    ctx: &RequestCtx<'_>,
    body: ByteBody,
) -> (Result<StreamingForward, RequestError>, bool)
Handles a verbatim passthrough request whose body is supplied as a
stream (ADR-014 stage 2), forwarding it to the passthrough cluster without
buffering. Mirrors handle_with_capture’s trace lifecycle (classify →
dispatch → egress, recorded into the explain store), minus the buffered-body
diagnostics. Traffic capture is never available here because the body is not
retained, so the returned flag is always false.
pub async fn search_streamed(
    &self,
    ctx: &RequestCtx<'_>,
) -> (Result<StreamSearch, RequestError>, bool)
Handles a _search whose response is streamed back through the hit
transform (ADR-014, final stage): the upstream body is never buffered, each
hit is shaped incrementally, and every sibling (notably aggregations) is
forwarded verbatim. Same trace lifecycle as forward_streamed; because the
body length is unknown until it flows, egress records the status with zero
bytes. The request query body is small and already buffered in ctx; only the
response streams. Returns the result plus false: capture is never available
on a streamed path (and the caller only streams when capture is off).
pub async fn handle_bulk_streamed(
    &self,
    ctx: &RequestCtx<'_>,
    body: ByteBody,
) -> (Result<PipelineResponse, RequestError>, bool)
Handles a _bulk request whose body is supplied as a stream (ADR-014
stage 4), framing and demuxing the NDJSON incrementally so the whole batch is
never buffered. Same trace lifecycle as forward_streamed
(classify → egress, into the explain store); per-op outcomes live
positionally in the response body, as in the buffered bulk path. Sync write
mode only: the streaming decision is made by the caller, and async fan-out
keeps the buffered path.
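Incremental NDJSON framing means holding back only the trailing partial line while complete lines are handed on as they arrive. The following is a generic sketch of that technique, not the crate's framer: `NdjsonFramer` and its methods are hypothetical names, and skipping blank lines and stripping a trailing `\r` are assumptions about how a framer might treat its input.

```rust
/// Accumulates byte chunks and yields complete newline-terminated lines.
#[derive(Default)]
pub struct NdjsonFramer {
    /// Bytes of the current, not yet terminated line.
    pending: Vec<u8>,
}

impl NdjsonFramer {
    /// Feeds one chunk and returns every line it completes, without the `\n`.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<Vec<u8>> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Remove the line, including its newline, from the front of the buffer.
            let mut line: Vec<u8> = self.pending.drain(..=pos).collect();
            line.pop(); // drop the '\n'
            if line.last() == Some(&b'\r') {
                line.pop(); // tolerate CRLF line endings (assumed)
            }
            if !line.is_empty() {
                lines.push(line); // blank lines are skipped (assumed)
            }
        }
        lines
    }

    /// Number of bytes held back because no newline has arrived yet.
    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }
}
```

In a bulk body each action line and its optional source line would come out of `push` in order, so a demuxer can pair them without ever holding the whole batch in memory.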