Skip to main content

TransportReceiver

Trait TransportReceiver 

Source
pub trait TransportReceiver: TransportBase {
    type Token: CommitToken;

    // Required methods
    fn recv(
        &self,
        max: usize,
    ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
    fn commit(
        &self,
        tokens: &[Self::Token],
    ) -> impl Future<Output = TransportResult<()>> + Send;

    // Provided method
    fn recv_limited(
        &self,
        limits: RecvLimits,
    ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send { ... }
}
Available on crate feature transport only.
Expand description

Receive-side transport – generic over commit token type.

Input stages (receiver, fetcher) use concrete implementations directly for type-safe token handling.

Required Associated Types§

Source

type Token: CommitToken

The token type for this transport.

Required Methods§

Source

fn recv( &self, max: usize, ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send

Receive up to max records as one WorkBatch.

Returns immediately with available records (may be fewer than max). Returns an empty batch if no records are available. The source acks for the whole block live on WorkBatch::commit_tokens – they are decoupled from records.len() so a downstream fan-out cannot disturb them.

Filter behaviour: if the transport has inbound filters configured, recv() removes records matching action: drop filters and carries records matching action: dlq filters in WorkBatch.dlq_entries alongside the passing WorkBatch.records. Route the DLQ entries via your own DLQ handle – they cannot be silently lost.

§Example
let batch = transport.recv(100).await?;
for entry in batch.dlq_entries {
    dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
}
for record in batch.records { /* process */ }
§Cancel-safety (REQUIRED of implementors)

The governed run loop polls recv() inside a tokio::select! and DROPS the future when shutdown or a ticker wins, so it must not leave records half-consumed at an .await – either gather records synchronously (no .await between taking a record off the wire and returning it) or buffer internally. The in-tree Kafka (synchronous poll) and memory (awaits only on an empty buffer) impls satisfy this; a custom impl that holds records across an .await will drop data on cancellation.

Source

fn commit( &self, tokens: &[Self::Token], ) -> impl Future<Output = TransportResult<()>> + Send

Commit/acknowledge processed messages.

  • Kafka: commits consumer offsets
  • gRPC: no-op (no persistence)
  • Redis: XACK
  • File: advances read position
  • Memory: advances internal sequence

Provided Methods§

Source

fn recv_limited( &self, limits: RecvLimits, ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send

Byte-aware receive: bound a single poll by BOTH a record cap and a payload-byte cap (see RecvLimits).

The governed driver uses this so the self-regulation byte budget bounds RECEIVE memory, not just the post-recv sub-block lease: a poll retains at most limits.max_bytes of payload (plus one oversized record), so the inbound footprint is bounded BEFORE the sub-block split, never after.

Default impl: falls back to recv(limits.max_records) – record-bounded only, byte cap ignored. Only transports that buffer a whole poll’s bytes in one allocation (Kafka’s recv-arena) override it. Channel/stream transports (Memory, gRPC, …) already retain only one record’s bytes at a time, so the fallback is correct for them.

Filter behaviour and the commit_tokens contract match recv.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety".

Implementors§