pub trait Consumer: Sized + Send + Sync {
    type Error: Error;
    type Message<'a>: Message
       where Self: 'a;
    type NextFuture<'a>: Future<Output = Result<Self::Message<'a>, StreamErr<Self::Error>>>
       where Self: 'a;
    type Stream<'a>: Stream<Item = Result<Self::Message<'a>, StreamErr<Self::Error>>>
       where Self: 'a;

    // Required methods
    fn seek(
        &mut self,
        to: OffsetDateTime,
    ) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send;
    fn rewind(
        &mut self,
        offset: SeqPos,
    ) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send;
    fn assign(
        &mut self,
        ss: (StreamKey, ShardId),
    ) -> Result<(), StreamErr<Self::Error>>;
    fn unassign(
        &mut self,
        ss: (StreamKey, ShardId),
    ) -> Result<(), StreamErr<Self::Error>>;
    fn next(&self) -> Self::NextFuture<'_>;
    fn stream<'a, 'b>(&'b mut self) -> Self::Stream<'a>
       where 'b: 'a;
}
Common interface of consumers, to be implemented by all backends.
Required Associated Types
type Error: Error
type Message<'a>: Message where Self: 'a
type NextFuture<'a>: Future<Output = Result<Self::Message<'a>, StreamErr<Self::Error>>> where Self: 'a
type Stream<'a>: Stream<Item = Result<Self::Message<'a>, StreamErr<Self::Error>>> where Self: 'a
Required Methods
fn seek(
    &mut self,
    to: OffsetDateTime,
) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send
Seek all streams to an arbitrary point in time. Consumption starts from the earliest message
whose timestamp is later than the given time.
If the consumer is not already assigned, shard ZERO will be used.
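The "earliest message with a later timestamp" rule can be illustrated with a small sketch. It does not use the real consumer: StoredMessage and seek_index are hypothetical names, timestamps are plain millisecond integers standing in for OffsetDateTime, and the log is assumed to be sorted by timestamp. Whether a message stamped exactly at the seek time is included is backend-specific; this sketch assumes it is skipped.

```rust
/// Hypothetical stand-in for a message stored in a stream.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredMessage {
    /// Milliseconds since some epoch (stand-in for `OffsetDateTime`).
    pub timestamp_ms: u64,
    pub payload: &'static str,
}

/// Returns the index of the first message whose timestamp is strictly
/// later than `to_ms`, or `log.len()` if there is no such message.
/// Assumption: `log` is sorted by `timestamp_ms` in ascending order.
pub fn seek_index(log: &[StoredMessage], to_ms: u64) -> usize {
    // Binary search for the boundary between "not later" and "later".
    log.partition_point(|m| m.timestamp_ms <= to_ms)
}
```

A consumer modelled this way would resume reading at log[seek_index(log, to_ms)..], which is empty when every stored message is at or before the seek time.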
fn rewind(
    &mut self,
    offset: SeqPos,
) -> impl Future<Output = Result<(), StreamErr<Self::Error>>> + Send
Rewind all streams to a particular sequence number.
If the consumer is not already assigned, shard ZERO will be used.
fn assign(
    &mut self,
    ss: (StreamKey, ShardId),
) -> Result<(), StreamErr<Self::Error>>
Assign this consumer to a particular shard. Can be called multiple times to assign
to multiple shards. Returns error StreamKeyNotFound if the stream is not currently subscribed.
The assignment only takes effect on the next Consumer::seek or Consumer::rewind.
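The deferred nature of assign can be sketched with a toy model. Everything here is hypothetical and is not the real consumer: AssignmentModel, AssignError and seek_or_rewind are illustrative names, shards are plain u64 values, and the fallback to shard 0 is assumed to apply to every subscribed stream.

```rust
use std::collections::BTreeSet;

/// Hypothetical error type mirroring the error name used in the text.
#[derive(Debug, PartialEq, Eq)]
pub enum AssignError {
    StreamKeyNotFound,
}

/// Hypothetical in-memory model of shard assignment.
#[derive(Debug, Default)]
pub struct AssignmentModel {
    subscribed: BTreeSet<String>,
    requested: BTreeSet<(String, u64)>,
    active: BTreeSet<(String, u64)>,
}

impl AssignmentModel {
    pub fn new(streams: &[&str]) -> Self {
        Self {
            subscribed: streams.iter().map(|s| s.to_string()).collect(),
            ..Self::default()
        }
    }

    /// Records the request only; nothing becomes active yet.
    pub fn assign(&mut self, stream: &str, shard: u64) -> Result<(), AssignError> {
        if !self.subscribed.contains(stream) {
            return Err(AssignError::StreamKeyNotFound);
        }
        self.requested.insert((stream.to_string(), shard));
        Ok(())
    }

    /// Stand-in for the point where a seek or rewind runs: requested
    /// assignments become active. Assumption: with no prior assignment,
    /// shard 0 of every subscribed stream is used.
    pub fn seek_or_rewind(&mut self) {
        if self.requested.is_empty() {
            self.requested = self.subscribed.iter().map(|s| (s.clone(), 0)).collect();
        }
        self.active = self.requested.clone();
    }

    /// Currently active (stream, shard) pairs, in sorted order.
    pub fn active(&self) -> Vec<(String, u64)> {
        self.active.iter().cloned().collect()
    }
}
```

In this model, calling assign twice and then seek_or_rewind once activates both shards together, while calling seek_or_rewind on a fresh model falls back to shard 0.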
fn unassign(
    &mut self,
    ss: (StreamKey, ShardId),
) -> Result<(), StreamErr<Self::Error>>
Unassign a shard. Returns error StreamKeyNotFound if the stream is not currently subscribed.
Returns error StreamKeyEmpty if all streams have been unassigned.
fn next(&self) -> Self::NextFuture<'_>
Poll and receive one message: it awaits until there are new messages. This method can be called from multiple threads.
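Because next takes &self and the trait requires Send + Sync, one consumer can be shared between threads. The sketch below shows that sharing pattern with a synchronous stand-in built only on the standard library. SharedQueue, next_message and drain_concurrently are hypothetical names, and the real method returns a future rather than an Option.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical, synchronous stand-in for a consumer holding queued messages.
pub struct SharedQueue {
    inner: Mutex<VecDeque<u32>>,
}

impl SharedQueue {
    pub fn new(items: impl IntoIterator<Item = u32>) -> Self {
        Self { inner: Mutex::new(items.into_iter().collect()) }
    }

    /// Takes `&self`, like `Consumer::next`, so callers only need a shared reference.
    pub fn next_message(&self) -> Option<u32> {
        self.inner.lock().unwrap().pop_front()
    }
}

/// Drains the queue from `workers` threads and returns all messages, sorted.
pub fn drain_concurrently(queue: Arc<SharedQueue>, workers: usize) -> Vec<u32> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || {
                let mut got = Vec::new();
                // Each message is handed to exactly one worker.
                while let Some(m) = q.next_message() {
                    got.push(m);
                }
                got
            })
        })
        .collect();
    let mut all: Vec<u32> = handles
        .into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    all.sort_unstable();
    all
}
```

Which worker receives which message is not deterministic, so the helper sorts the combined result before returning it.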
Dyn Compatibility
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".
Implementations on Foreign Types
impl Consumer for KafkaConsumer
async fn seek(
    &mut self,
    timestamp: OffsetDateTime,
) -> Result<(), StreamErr<KafkaError>>
Seek all streams to the given point in time, with all partitions assigned.
Warning
This async method is not cancel safe. You must await this future, and this Consumer will be unusable for any operations until it finishes.
async fn rewind(&mut self, offset: SeqPos) -> Result<(), StreamErr<KafkaError>>
Rewind all streams across all assigned partitions.
Call Consumer::assign to assign a partition beforehand,
or KafkaConsumer::reassign_partitions to assign all partitions.
fn stream<'a, 'b>(&'b mut self) -> <KafkaConsumer as Consumer>::Stream<'a>
where
    'b: 'a,
Note: Kafka stream never ends.
type Error = KafkaError
type Message<'a> = KafkaMessage<'a>
type NextFuture<'a> = Map<StreamFuture<MessageStream<'a, DefaultConsumerContext>>, fn((Option<Result<BorrowedMessage<'a>, KafkaError>>, MessageStream<'a, DefaultConsumerContext>)) -> Result<KafkaMessage<'a>, StreamErr<KafkaError>>>
type Stream<'a> = Map<MessageStream<'a, DefaultConsumerContext>, fn(Result<BorrowedMessage<'a>, KafkaError>) -> Result<KafkaMessage<'a>, StreamErr<KafkaError>>>
fn assign(&mut self, _: (StreamKey, ShardId)) -> Result<(), StreamErr<KafkaError>>
fn unassign(&mut self, s: (StreamKey, ShardId)) -> Result<(), StreamErr<KafkaError>>
fn next(&self) -> <KafkaConsumer as Consumer>::NextFuture<'_>
impl Consumer for StdioConsumer
fn assign(&mut self, _: (StreamKey, ShardId)) -> Result<(), StreamErr<StdioErr>>
Always succeed if the stream exists. There is only shard ZERO anyway.
fn unassign(
    &mut self,
    _: (StreamKey, ShardId),
) -> Result<(), StreamErr<StdioErr>>
Always fail. There is only shard ZERO anyway.