Trait TransportReceiver 

pub trait TransportReceiver: TransportBase {
    type Token: CommitToken;

    // Required methods
    fn recv(
        &self,
        max: usize,
    ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
    fn commit(
        &self,
        tokens: &[Self::Token],
    ) -> impl Future<Output = TransportResult<()>> + Send;

    // Provided method
    fn recv_limited(
        &self,
        limits: RecvLimits,
    ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send { ... }
}
Available on crate feature transport only.

Receive-side transport – generic over commit token type.

Extends TransportBase with receive and commit capability. Input stages (receiver, fetcher) use concrete implementations directly for type-safe token handling.

Required Associated Types§

type Token: CommitToken

The token type for this transport.

Required Methods§

fn recv( &self, max: usize, ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send

Receive up to max records as one WorkBatch.

Returns immediately with whatever records are available, which may be fewer than max; returns an empty batch if none are available. The source acks for the whole block are carried in WorkBatch::commit_tokens. They are decoupled from records.len(), so a downstream fan-out that changes the record count cannot disturb them.

Filter behaviour: if the transport has inbound filters configured, recv() drops records that match an action: drop filter and returns records that match an action: dlq filter in WorkBatch.dlq_entries, alongside the passing records in WorkBatch.records. Route the DLQ entries through your own DLQ handle so that they are not silently lost.

§Example
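let batch = transport.recv(100).await?;
// Route filter-rejected records to the dead-letter queue first.
for entry in batch.dlq_entries {
    dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
}
// Then process the records that passed the inbound filters.
for record in batch.records { /* process */ }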
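
The decoupling of commit_tokens from records.len() can be illustrated with a minimal sketch. The WorkBatch below is a hypothetical stand-in, not the crate's type: the field names follow the description above, but the concrete field types and the fan_out helper are assumptions made for illustration only.

```rust
/// Hypothetical stand-in for the crate's `WorkBatch`. Field names follow the
/// documentation; the concrete types are assumptions.
#[derive(Debug, Clone, PartialEq)]
struct WorkBatch<T> {
    records: Vec<String>,
    dlq_entries: Vec<String>,
    commit_tokens: Vec<T>,
}

/// Hypothetical fan-out stage: splits every record on ',' into several
/// downstream records. Only `records` is rebuilt; the commit tokens (and the
/// DLQ entries) are moved through untouched.
fn fan_out<T>(batch: WorkBatch<T>) -> WorkBatch<T> {
    let records = batch
        .records
        .iter()
        .flat_map(|r| r.split(',').map(str::to_owned))
        .collect();
    WorkBatch { records, ..batch }
}
```

Because the tokens travel separately from the records, the number of records after the fan-out has no bearing on what is later passed to commit.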

fn commit( &self, tokens: &[Self::Token], ) -> impl Future<Output = TransportResult<()>> + Send

Commit/acknowledge processed messages.

  • Kafka: commits consumer offsets
  • gRPC: no-op (no persistence)
  • Redis: XACK
  • File: advances read position
  • Memory: advances internal sequence
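
A receive, process, commit loop might look like the sketch below. Everything in it is a simplified stand-in written for this example: the Receiver trait only mirrors the shape of recv and commit, MemoryTransport is hypothetical (its token is assumed to be a sequence number), TransportResult is reduced to a plain Result, and block_on is a toy executor that only works because these futures never return Pending.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Simplified, hypothetical stand-ins for the crate's types.
type TransportResult<T> = Result<T, String>;

struct WorkBatch<T> {
    records: Vec<String>,
    commit_tokens: Vec<T>,
}

trait Receiver {
    type Token;
    fn recv(&self, max: usize)
        -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
    fn commit(&self, tokens: &[Self::Token])
        -> impl Future<Output = TransportResult<()>> + Send;
}

/// Hypothetical in-memory transport: the token is the record's sequence number.
struct MemoryTransport {
    queue: Mutex<VecDeque<(u64, String)>>,
    committed: Mutex<Option<u64>>,
}

impl Receiver for MemoryTransport {
    type Token = u64;

    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<u64>> {
        let mut queue = self.queue.lock().unwrap();
        let n = max.min(queue.len()); // may be fewer than `max`, or zero
        let (commit_tokens, records): (Vec<u64>, Vec<String>) = queue.drain(..n).unzip();
        Ok(WorkBatch { records, commit_tokens })
    }

    async fn commit(&self, tokens: &[u64]) -> TransportResult<()> {
        // "Advances internal sequence": remember the highest committed token.
        if let Some(highest) = tokens.iter().copied().max() {
            *self.committed.lock().unwrap() = Some(highest);
        }
        Ok(())
    }
}

/// Receive, process, then commit, until the transport returns an empty batch.
async fn drain_all<R: Receiver>(rx: &R, max: usize) -> TransportResult<Vec<String>> {
    let mut processed = Vec::new();
    loop {
        let batch = rx.recv(max).await?;
        if batch.records.is_empty() && batch.commit_tokens.is_empty() {
            return Ok(processed);
        }
        processed.extend(batch.records); // "processing" is just collecting here
        // Commit only after the batch's records have been handled.
        rx.commit(&batch.commit_tokens).await?;
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor, sufficient because the futures above never return Pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_demo() -> (Vec<String>, Option<u64>) {
    let queue = (0u64..).zip(["a", "b", "c"].map(String::from)).collect();
    let rx = MemoryTransport { queue: Mutex::new(queue), committed: Mutex::new(None) };
    let processed = block_on(drain_all(&rx, 2)).expect("in-memory transport never fails");
    let committed = *rx.committed.lock().unwrap();
    (processed, committed)
}
```

Committing after processing, rather than right after recv, is the choice made in this sketch so that an unprocessed batch is never acknowledged; whether a real transport redelivers uncommitted records depends on the backend.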

Provided Methods§

fn recv_limited( &self, limits: RecvLimits, ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send

Byte-aware receive: bounds a single poll by both a record cap and a payload-byte cap (see RecvLimits).

This is the receive limit used by the governed driver, so that the self-regulation byte budget constrains receive memory itself and not just the post-recv sub-block lease. A single poll retains at most limits.max_bytes of payload (plus one oversized record under the floor), so the in-flight inbound footprint is bounded before the sub-block split rather than after it.

Default impl: falls back to recv(limits.max_records), which is record-bounded only and ignores the byte cap. This keeps every existing transport working without changes; only transports that buffer a whole poll’s bytes in one allocation (Kafka’s recv-arena) override it to honour max_bytes. Channel/stream transports (Memory, gRPC, …) already retain only one record’s bytes at a time, so the record-bounded fallback is correct for them.

Filter behaviour and the commit_tokens contract are identical to recv.
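
One way to read the two caps and the oversized-record floor is sketched below. This is not the crate's implementation: RecvLimits here is a stand-in whose field names come from the text above (the usize types are assumed), take_limited is a hypothetical helper, and the floor is interpreted as "always admit the first record of a poll, even if it alone exceeds max_bytes".

```rust
use std::collections::VecDeque;

/// Stand-in for the crate's `RecvLimits`. Field names come from the docs
/// above; the `usize` types are an assumption.
#[derive(Debug, Clone, Copy)]
struct RecvLimits {
    max_records: usize,
    max_bytes: usize,
}

/// Hypothetical helper: pop records from the front of `queue` until either
/// cap would be exceeded.
fn take_limited(queue: &mut VecDeque<Vec<u8>>, limits: RecvLimits) -> Vec<Vec<u8>> {
    let mut out = Vec::new();
    let mut bytes = 0usize;
    while out.len() < limits.max_records {
        let Some(next) = queue.front() else { break };
        let fits = bytes + next.len() <= limits.max_bytes;
        // Floor (assumed reading): the first record is always admitted so an
        // oversized record cannot stall the poll; later records must fit.
        if !fits && !out.is_empty() {
            break;
        }
        bytes += next.len();
        out.push(queue.pop_front().expect("front() returned Some"));
    }
    out
}
```

Under this reading a poll holds at most max_bytes of payload, except when its first record is itself larger than the cap, in which case it holds exactly that one record.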

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".
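
In practice this means a receiver is held through a generic parameter rather than a trait object, because methods that return impl Future cannot be called through dyn. The sketch below uses a cut-down, hypothetical Receiver trait with the same shape of return type; Counter, Stage, and block_on are illustration-only names, and block_on is a toy executor that relies on the future being immediately ready.

```rust
use std::future::{ready, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Cut-down, hypothetical trait with the same return-type shape as `recv`.
trait Receiver {
    fn recv(&self, max: usize) -> impl Future<Output = Vec<u32>> + Send;
}

struct Counter;

impl Receiver for Counter {
    fn recv(&self, max: usize) -> impl Future<Output = Vec<u32>> + Send {
        ready((0..max as u32).collect())
    }
}

/// Static dispatch: the stage is generic over the concrete receiver type.
struct Stage<R: Receiver> {
    rx: R,
}

impl<R: Receiver> Stage<R> {
    async fn pull(&self, max: usize) -> usize {
        self.rx.recv(max).await.len()
    }
}

// A trait object does not compile:
// let boxed: Box<dyn Receiver> = Box::new(Counter); // error[E0038]

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor, sufficient because the future here is immediately ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn pull_count() -> usize {
    let stage = Stage { rx: Counter };
    block_on(stage.pull(3))
}
```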

Implementors§