Struct Pipeline 

pub struct Pipeline<R, S> { /* private fields */ }

Orchestrates requests through a tenancy router and a sink.

Generic over the Router implementation and the Sink, so the hot path is monomorphized (no dyn dispatch), a deployment can supply its own router, and tests can swap in an in-memory sink.
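
The generic design can be illustrated with a minimal sketch. The `Router` and `Sink` trait methods, `PrefixRouter`, `MemorySink`, and `MiniPipeline` below are hypothetical stand-ins, since the real trait definitions are not shown on this page; the sketch also takes `&mut self` in `handle` for simplicity, which the real `Pipeline` does not.

```rust
// Hypothetical stand-ins: the real Router and Sink traits are not shown on
// this page, so these method names and signatures are assumptions.
trait Router {
    fn route(&self, logical_index: &str) -> String;
}

trait Sink {
    fn write(&mut self, physical_index: &str, body: &str);
}

/// Prefixes every logical index with a fixed tenant id.
struct PrefixRouter {
    tenant: String,
}

impl Router for PrefixRouter {
    fn route(&self, logical_index: &str) -> String {
        format!("{}-{}", self.tenant, logical_index)
    }
}

/// In-memory sink that records writes so a test can inspect them.
#[derive(Default)]
struct MemorySink {
    writes: Vec<(String, String)>,
}

impl Sink for MemorySink {
    fn write(&mut self, physical_index: &str, body: &str) {
        self.writes.push((physical_index.to_string(), body.to_string()));
    }
}

/// Generic over both parts: each (R, S) pair is compiled separately, so the
/// calls to `route` and `write` are static rather than `dyn` dispatch.
struct MiniPipeline<R, S> {
    router: R,
    sink: S,
}

impl<R: Router, S: Sink> MiniPipeline<R, S> {
    fn new(router: R, sink: S) -> Self {
        Self { router, sink }
    }

    /// Routes the logical index, then hands the body to the sink.
    fn handle(&mut self, logical_index: &str, body: &str) {
        let physical = self.router.route(logical_index);
        self.sink.write(&physical, body);
    }

    /// Exposes the sink, e.g. to inspect what an in-memory sink recorded.
    fn sink(&self) -> &S {
        &self.sink
    }
}
```

A test would build `MiniPipeline::new(PrefixRouter { .. }, MemorySink::default())`, call `handle`, and then read `sink().writes`, which is the same role the `sink` accessor plays on the real type.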

Implementations

impl<R: Router, S: Sink + Reader> Pipeline<R, S>

pub fn new(router: R, sink: S) -> Self

Builds a pipeline from a router and a sink (default backend-retry policy, no span export).

pub fn with_delete_by_query_expansion(self, on: bool) -> Self

Enables the _delete_by_query async expansion (builder style). Without it, _delete_by_query (DBQ) requests are rejected even in async mode (docs/04 §9).

pub fn with_baseline_write_mode(self, mode: WriteMode) -> Self

Sets the baseline write mode applied when a request does not carry an X-Write-Mode header (builder style). The default is crate::WriteMode::Sync; set crate::WriteMode::Async to make durable fan-out the deployment default (docs/04 §9).

pub fn with_write_queue(self, queue: Arc<dyn WriteQueue>) -> Self

Sets the durable queue async writes are enqueued onto (builder style). Without it, async requests are refused with 422 rather than dropped.
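
The refuse-rather-than-drop behavior can be sketched as follows. The `WriteQueue` method, `VecQueue`, `accept_async`, and the 202 status returned on a successful enqueue are all assumptions made for illustration; only the 422 refusal comes from the text above.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical trait shape: the real WriteQueue methods are not shown here.
trait WriteQueue: Send + Sync {
    fn enqueue(&self, op: String);
}

/// In-memory queue used only for this sketch.
struct VecQueue(Mutex<Vec<String>>);

impl WriteQueue for VecQueue {
    fn enqueue(&self, op: String) {
        self.0.lock().unwrap().push(op);
    }
}

/// Accepts an async write only when a durable queue is configured.
/// Ok(202) is an assumed "accepted" status; Err(422) mirrors the documented
/// refusal when no queue is set, so the write is never silently dropped.
fn accept_async(queue: Option<&Arc<dyn WriteQueue>>, op: &str) -> Result<u16, u16> {
    match queue {
        Some(q) => {
            q.enqueue(op.to_string());
            Ok(202)
        }
        None => Err(422),
    }
}
```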

pub fn with_passthrough(self, policy: PassthroughPolicy) -> Self

Enables tenant-agnostic passthrough: every request is forwarded verbatim to policy’s cluster with no tenancy rewrite. Use this for a transparent or capture/migration proxy. Without it, the pipeline routes by tenancy (the default).

pub fn with_admin_passthrough(self, policy: AdminPolicy) -> Self

Enables opt-in admin pass-through (docs/03 §6): allow-listed _cat/_cluster/_nodes requests are forwarded verbatim to policy’s cluster. Without this, every admin request is rejected (the default).

pub fn with_cursor_signer(self, signer: Arc<dyn CursorSigner>) -> Self

Enables opt-in scroll/PIT cursor affinity (docs/03 §6), using signer to sign the cluster↔cursor envelope. Without this, cursor requests fail closed (CursorUnresolvable) rather than route to an unknown cluster.

pub fn with_retry_policy(self, retry: RetryPolicy) -> Self

Sets the placement-backend retry policy (builder style).

pub fn with_exporter(self, exporter: Arc<dyn SpanExporter>) -> Self

Sets the OTLP span exporter (builder style). Default is no export.

pub fn with_clock(self, clock: Arc<dyn Clock>) -> Self

Swaps the clock used to stamp span timestamps (tests inject a ManualClock).

pub fn with_service_name(self, service_name: impl Into<String>) -> Self

Sets the service.name reported on exported spans (builder style).

pub fn with_baseline_level(self, baseline: DiagLevel) -> Self

Sets the baseline diagnostics level applied to every request before directives (builder style). Default DiagLevel::Shape; set to DiagLevel::Off to export only what a directive selects.

pub fn with_baseline_capture(self, on: bool) -> Self

Sets whether traffic capture is on for every request before directives (builder style). Default false (capture on demand via a published directive); set true for an always-capture deployment.

pub fn with_directives(self, directives: Arc<DirectiveSet>) -> Self

Sets a fixed set of active diagnostics directives (builder style). For a fleet-wide, restart-free source, use Pipeline::with_directive_store instead.

pub fn with_directive_store(self, store: Arc<dyn DirectiveStore>) -> Self

Sets the fleet-wide directive store (builder style). The pipeline polls it fresh per request, so a controller publishing a new set flips verbosity across the fleet without a restart (docs/05 §3).
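
A rough sketch of why per-request polling gives restart-free changes: the handler holds a shared store and reads it on every request, so a later publish is visible to the next request. The `current` method, `SharedStore`, `publish`, and `Handler` are hypothetical names, and a single `DiagLevel` stands in for a full directive set.

```rust
use std::sync::{Arc, RwLock};

// DiagLevel::Off and DiagLevel::Shape are named on this page; any other
// variants of the real enum are omitted from this sketch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DiagLevel {
    Off,
    Shape,
}

// Hypothetical trait shape: the real DirectiveStore API is not shown here.
trait DirectiveStore: Send + Sync {
    fn current(&self) -> DiagLevel;
}

/// Store that a controller can update while handlers keep reading it.
struct SharedStore(RwLock<DiagLevel>);

impl SharedStore {
    fn publish(&self, level: DiagLevel) {
        *self.0.write().unwrap() = level;
    }
}

impl DirectiveStore for SharedStore {
    fn current(&self) -> DiagLevel {
        *self.0.read().unwrap()
    }
}

/// Stand-in for the pipeline: it never caches the level between requests.
struct Handler {
    store: Arc<dyn DirectiveStore>,
}

impl Handler {
    /// Reads the store fresh, so a newly published level applies immediately.
    fn level_for_request(&self) -> DiagLevel {
        self.store.current()
    }
}
```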

pub fn with_directive_verifier(self, verifier: Arc<dyn DirectiveVerifier>) -> Self

Sets the verifier for the signed X-Debug-Directive header (builder style). The default verifier rejects all such headers; a real verifier enables the surgical, single-request directive channel.

pub fn with_break_glass(self, break_glass: Arc<BreakGlassBuffer>) -> Self

Shares the break-glass tape (builder style), so a debug endpoint can read the captured sequence and tests can inspect it.

pub fn with_diagnostic_sink(self, sink: Arc<dyn DiagnosticSink>) -> Self

Sets the fleet-coherent diagnostic sink (builder style): a directive-selected capture is pushed here (keyed by trace_id) in addition to the local break-glass ring, so an aggregator can serve it fleet-wide (docs/05 §5).

pub fn explain(&self, request_id: &RequestId) -> Option<Value>

Returns the assembled /debug/explain document for a past request, or None if it is no longer retained.

pub fn break_glass(&self) -> &Arc<BreakGlassBuffer>

The break-glass tape: the explanations captured while a ring_buffer directive was in effect (docs/05 §5).

pub fn sink(&self) -> &S

The underlying sink (e.g. to inspect what an in-memory sink recorded).

pub async fn handle(&self, ctx: &RequestCtx<'_>) -> Result<PipelineResponse, RequestError>

Handles an authenticated request, dispatching on its endpoint class.

Records a shape-only causal trace for every request (success or failure) into the explain store, so /debug/explain/{id} can reconstruct it (docs/05).

Errors

Returns RequestError if the endpoint is unsupported in M1, routing fails, the body transform fails, or the sink rejects the write.

pub async fn handle_with_capture(&self, ctx: &RequestCtx<'_>) -> (Result<PipelineResponse, RequestError>, bool)

Like Self::handle, but also returns whether this request should be teed to the fleet traffic-capture sink. The flag is the live per-request capture decision, which the ingress applies to both success and error responses.
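
One way an ingress might consume the returned pair is sketched below. `Response`, `Failure`, `CaptureRecord`, and `finish` are hypothetical stand-ins (the real PipelineResponse and RequestError are not shown on this page); the point is only that the capture flag is honored for both the `Ok` and the `Err` branch.

```rust
// Hypothetical stand-ins for PipelineResponse and RequestError.
#[derive(Debug, PartialEq)]
struct Response {
    status: u16,
}

#[derive(Debug, PartialEq)]
struct Failure {
    status: u16,
}

/// What the ingress would tee to the capture sink (assumed shape).
#[derive(Debug, PartialEq)]
struct CaptureRecord {
    status: u16,
    ok: bool,
}

/// Consumes the `(result, capture)` pair and returns the status to send.
/// When the flag is set, the exchange is recorded whether it succeeded or not.
fn finish(
    outcome: (Result<Response, Failure>, bool),
    captured: &mut Vec<CaptureRecord>,
) -> u16 {
    let (result, capture) = outcome;
    let (status, ok) = match &result {
        Ok(resp) => (resp.status, true),
        Err(err) => (err.status, false),
    };
    if capture {
        captured.push(CaptureRecord { status, ok });
    }
    status
}
```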

pub fn is_sync_write(&self, headers: &[(String, String)]) -> bool

Whether the effective write mode for a request with these headers is sync. The effective mode is the X-Write-Mode header if present and valid, else the deployment baseline. Lets the transport decide from the head alone whether to stream-demux a _bulk (sync only; async fan-out keeps the buffered path) (ADR-014 stage 4).
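
The header-then-baseline resolution can be sketched as a free function. The accepted header values ("sync" and "async"), the case-insensitive matching, and the helper names are assumptions; the real method reads the baseline from the pipeline rather than taking it as a parameter.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WriteMode {
    Sync,
    Async,
}

/// Parses a header value. The accepted spellings are an assumption.
fn parse_write_mode(value: &str) -> Option<WriteMode> {
    match value.trim().to_ascii_lowercase().as_str() {
        "sync" => Some(WriteMode::Sync),
        "async" => Some(WriteMode::Async),
        _ => None,
    }
}

/// Uses the X-Write-Mode header if present and valid, else the baseline.
fn effective_write_mode(headers: &[(String, String)], baseline: WriteMode) -> WriteMode {
    headers
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case("x-write-mode"))
        .and_then(|(_, value)| parse_write_mode(value))
        .unwrap_or(baseline)
}

/// Body-free check, so a transport can decide before reading the request body.
fn is_sync_write(headers: &[(String, String)], baseline: WriteMode) -> bool {
    effective_write_mode(headers, baseline) == WriteMode::Sync
}
```

In this sketch an unrecognized header value falls back to the baseline, matching the "present and valid" wording above.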

pub fn is_passthrough(&self, logical_index: &str) -> bool

Whether a request for logical_index is a tenant-agnostic passthrough that can be streamed verbatim (ADR-014 stage 2). Body-free so the transport can decide before buffering. false when no passthrough policy is set or the index is not matched (the request then takes the buffered tenancy path).

pub async fn forward_streamed(&self, ctx: &RequestCtx<'_>, body: ByteBody) -> (Result<StreamingForward, RequestError>, bool)

Handles a verbatim passthrough request whose body is supplied as a stream (ADR-014 stage 2), forwarding it to the passthrough cluster without buffering. Mirrors handle_with_capture’s trace lifecycle (classify → dispatch → egress, recorded into the explain store), minus the buffered-body diagnostics. Traffic capture is never available here because the body is not retained, so the returned flag is always false.

pub async fn search_streamed(&self, ctx: &RequestCtx<'_>) -> (Result<StreamSearch, RequestError>, bool)

Handles a _search whose response is streamed back through the hit transform (ADR-014, final stage): the upstream body is never buffered, each hit is shaped incrementally, and every sibling (notably aggregations) is forwarded verbatim. Same trace lifecycle as forward_streamed; because the body length is unknown until it flows, egress records the status with zero bytes. The request query body is small and already buffered in ctx; only the response streams. Returns the result plus false: capture is never available on a streamed path (and the caller only streams when capture is off).

pub async fn handle_bulk_streamed(&self, ctx: &RequestCtx<'_>, body: ByteBody) -> (Result<PipelineResponse, RequestError>, bool)

Handles a _bulk request whose body is supplied as a stream (ADR-014 stage 4), framing and demuxing the NDJSON incrementally so the whole batch is never buffered. Same trace lifecycle as forward_streamed (classify → egress, into the explain store); per-op outcomes live positionally in the response body, as in the buffered bulk path. Sync write mode only: the streaming decision is made by the caller, and async fan-out keeps the buffered path.
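
Incremental NDJSON framing can be illustrated with a small synchronous sketch. `NdjsonFramer` is a hypothetical helper, not the crate's implementation: it only shows how complete lines can be emitted as chunks arrive while just the trailing partial line is held in memory.

```rust
/// Splits a chunked byte stream into complete NDJSON lines.
#[derive(Default)]
struct NdjsonFramer {
    /// Bytes of the current, not yet terminated line.
    pending: Vec<u8>,
}

impl NdjsonFramer {
    /// Feeds one chunk and returns every line completed by it.
    /// Blank lines are skipped; a trailing partial line stays buffered.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Remove the line and its newline from the front of the buffer.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&line[..pos]).into_owned();
            if !text.trim().is_empty() {
                lines.push(text);
            }
        }
        lines
    }
}
```

Feeding `{"index":{}}\n{"a"` followed by `:1}\n` yields the action line from the first chunk and the document line from the second, so memory use is bounded by the longest single line rather than the whole batch.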

Trait Implementations

impl<R, S> Debug for Pipeline<R, S>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

impl<R, S> !RefUnwindSafe for Pipeline<R, S>

impl<R, S> !UnwindSafe for Pipeline<R, S>

impl<R, S> Freeze for Pipeline<R, S>
where R: Freeze, S: Freeze,

impl<R, S> Send for Pipeline<R, S>
where R: Send, S: Send,

impl<R, S> Sync for Pipeline<R, S>
where R: Sync, S: Sync,

impl<R, S> Unpin for Pipeline<R, S>
where R: Unpin, S: Unpin,

impl<R, S> UnsafeUnpin for Pipeline<R, S>
where R: UnsafeUnpin, S: UnsafeUnpin,

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.