pub trait TransportReceiver: TransportBase {
type Token: CommitToken;
// Required methods
fn recv(
&self,
max: usize,
) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send;
fn commit(
&self,
tokens: &[Self::Token],
) -> impl Future<Output = TransportResult<()>> + Send;
// Provided method
fn recv_limited(
&self,
limits: RecvLimits,
) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send { ... }
}transport only.Expand description
Receive-side transport – generic over commit token type.
Input stages (receiver, fetcher) use concrete implementations directly for type-safe token handling.
Required Associated Types§
Sourcetype Token: CommitToken
type Token: CommitToken
The token type for this transport.
Required Methods§
Sourcefn recv(
&self,
max: usize,
) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send
fn recv( &self, max: usize, ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send
Receive up to max records as one WorkBatch.
Returns immediately with available records (may be fewer than max).
Returns an empty batch if no records are available. The source acks for
the whole block live on WorkBatch::commit_tokens – they are decoupled
from records.len() so a downstream fan-out cannot disturb them.
Filter behaviour: if the transport has inbound filters configured,
recv() removes records matching action: drop filters and carries
records matching action: dlq filters in WorkBatch.dlq_entries
alongside the passing WorkBatch.records. Route the DLQ entries via
your own DLQ handle – they cannot be silently lost.
§Example
let batch = transport.recv(100).await?;
for entry in batch.dlq_entries {
dlq.send(DlqEntry::new("filter", entry.reason, entry.payload)).await?;
}
for record in batch.records { /* process */ }§Cancel-safety (REQUIRED of implementors)
The governed run loop polls recv() inside a tokio::select! and DROPS
the future when shutdown or a ticker wins, so it must not leave records
half-consumed at an .await – either gather records synchronously (no
.await between taking a record off the wire and returning it) or buffer
internally. The in-tree Kafka (synchronous poll) and memory (awaits only
on an empty buffer) impls satisfy this; a custom impl that holds records
across an .await will drop data on cancellation.
Provided Methods§
Sourcefn recv_limited(
&self,
limits: RecvLimits,
) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send
fn recv_limited( &self, limits: RecvLimits, ) -> impl Future<Output = TransportResult<WorkBatch<Self::Token>>> + Send
Byte-aware receive: bound a single poll by BOTH a record cap and a
payload-byte cap (see RecvLimits).
The governed driver uses this so the self-regulation byte budget bounds
RECEIVE memory, not just the post-recv sub-block lease: a poll retains at
most limits.max_bytes of payload (plus one oversized record), so the
inbound footprint is bounded BEFORE the sub-block split, never after.
Default impl: falls back to recv(limits.max_records)
– record-bounded only, byte cap ignored. Only transports that buffer a
whole poll’s bytes in one allocation (Kafka’s recv-arena) override it.
Channel/stream transports (Memory, gRPC, …) already retain only one
record’s bytes at a time, so the fallback is correct for them.
Filter behaviour and the commit_tokens contract match
recv.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".