Trait sea_streamer::Consumer

source ·
/// Common interface of consumers, to be implemented by all backends.
///
/// Implementors must be `Sized + Send + Sync`. Every fallible operation
/// reports failure as `StreamErr<Self::Error>`.
pub trait Consumer: Sized + Send + Sync {
    /// Backend-specific error type; it is carried inside `StreamErr` in the
    /// results of the methods below.
    type Error: Error;
    /// Message type produced by [`Consumer::next`] and [`Consumer::stream`].
    /// It may borrow from the consumer for `'a`.
    type Message<'a>: Message
       where Self: 'a;
    /// Future returned by [`Consumer::next`]; resolves to one message or an error.
    type NextFuture<'a>: Future<Output = Result<Self::Message<'a>, StreamErr<Self::Error>>>
       where Self: 'a;
    /// Async stream returned by [`Consumer::stream`]; each item is one message or an error.
    type Stream<'a>: Stream<Item = Result<Self::Message<'a>, StreamErr<Self::Error>>>
       where Self: 'a;

    // Required methods
    /// Seek all streams to an arbitrary point in time. Consumption starts from
    /// the earliest message with a timestamp later than `to`.
    ///
    /// If the consumer is not already assigned, shard ZERO will be used.
    ///
    /// Returns a pinned, boxed `Send` future.
    // NOTE(review): the `'life0` / `'async_trait` lifetimes look like the
    // expansion of an `#[async_trait]` async method — confirm against the source.
    fn seek<'life0, 'async_trait>(
        &'life0 mut self,
        to: OffsetDateTime
    ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    /// Rewind all streams to a particular sequence number.
    ///
    /// If the consumer is not already assigned, shard ZERO will be used.
    ///
    /// Returns a pinned, boxed `Send` future, like [`Consumer::seek`].
    fn rewind<'life0, 'async_trait>(
        &'life0 mut self,
        offset: SeqPos
    ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<Self::Error>>> + Send + 'async_trait, Global>>
       where 'life0: 'async_trait,
             Self: 'async_trait;
    /// Assign this consumer to a particular shard, given as a
    /// `(StreamKey, ShardId)` pair. Can be called multiple times to assign to
    /// multiple shards.
    ///
    /// It will only take effect on the next [`Consumer::seek`] or
    /// [`Consumer::rewind`].
    ///
    /// # Errors
    ///
    /// Returns error `StreamKeyNotFound` if the stream is not currently subscribed.
    fn assign(
        &mut self,
        ss: (StreamKey, ShardId)
    ) -> Result<(), StreamErr<Self::Error>>;
    /// Unassign a shard, given as a `(StreamKey, ShardId)` pair.
    ///
    /// # Errors
    ///
    /// Returns error `StreamKeyNotFound` if the stream is not currently
    /// subscribed, and `StreamKeyEmpty` if all streams have been unassigned.
    fn unassign(
        &mut self,
        ss: (StreamKey, ShardId)
    ) -> Result<(), StreamErr<Self::Error>>;
    /// Poll and receive one message: the returned future waits until there are
    /// new messages. Takes `&self`, so it can be called from multiple threads.
    fn next(&self) -> Self::NextFuture<'_>;
    /// Returns an async stream. The stream holds the `&mut self` borrow, so you
    /// cannot create multiple streams from the same consumer, nor perform any
    /// other operation while streaming.
    fn stream<'a, 'b>(&'b mut self) -> Self::Stream<'a>
       where 'b: 'a;
}
Expand description

Common interface of consumers, to be implemented by all backends.

Required Associated Types§

source

type Error: Error

source

type Message<'a>: Message where Self: 'a

source

type NextFuture<'a>: Future<Output = Result<Self::Message<'a>, StreamErr<Self::Error>>> where Self: 'a

source

type Stream<'a>: Stream<Item = Result<Self::Message<'a>, StreamErr<Self::Error>>> where Self: 'a

Required Methods§

source

fn seek<'life0, 'async_trait>( &'life0 mut self, to: OffsetDateTime ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, Self: 'async_trait,

Seek all streams to an arbitrary point in time. It will start consuming from the earliest message with a timestamp later than `to`.

If the consumer is not already assigned, shard ZERO will be used.

source

fn rewind<'life0, 'async_trait>( &'life0 mut self, offset: SeqPos ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<Self::Error>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, Self: 'async_trait,

Rewind all streams to a particular sequence number.

If the consumer is not already assigned, shard ZERO will be used.

source

fn assign( &mut self, ss: (StreamKey, ShardId) ) -> Result<(), StreamErr<Self::Error>>

Assign this consumer to a particular shard. Can be called multiple times to assign to multiple shards. Returns error StreamKeyNotFound if the stream is not currently subscribed.

It will only take effect on the next Consumer::seek or Consumer::rewind.

source

fn unassign( &mut self, ss: (StreamKey, ShardId) ) -> Result<(), StreamErr<Self::Error>>

Unassign a shard. Returns error StreamKeyNotFound if the stream is not currently subscribed. Returns error StreamKeyEmpty if all streams have been unassigned.

source

fn next(&self) -> Self::NextFuture<'_>

Poll and receive one message: it awaits until there are new messages. This method can be called from multiple threads.

source

fn stream<'a, 'b>(&'b mut self) -> Self::Stream<'a>where 'b: 'a,

Returns an async stream. You cannot create multiple streams from the same consumer, nor perform any operation while streaming.

Implementations on Foreign Types§

source§

impl Consumer for KafkaConsumer

source§

fn seek<'life0, 'async_trait>( &'life0 mut self, timestamp: OffsetDateTime ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<KafkaError>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, KafkaConsumer: 'async_trait,

Seek all streams to the given point in time, with all partitions assigned.

Warning

This async method is not cancel safe. You must await this future, and this Consumer will be unusable for any operations until it finishes.

source§

fn rewind<'life0, 'async_trait>( &'life0 mut self, offset: SeqPos ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<KafkaError>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, KafkaConsumer: 'async_trait,

Rewind all streams across all assigned partitions. Call Consumer::assign to assign a partition beforehand, or KafkaConsumer::reassign_partitions to assign all partitions.

source§

fn stream<'a, 'b>(&'b mut self) -> <KafkaConsumer as Consumer>::Stream<'a>where 'b: 'a,

Note: a Kafka stream never ends.

§

type Error = KafkaError

§

type Message<'a> = KafkaMessage<'a>

§

type NextFuture<'a> = Map<StreamFuture<MessageStream<'a>>, fn(_: (Option<Result<BorrowedMessage<'a>, KafkaError>>, MessageStream<'a>)) -> Result<KafkaMessage<'a>, StreamErr<KafkaError>>>

§

type Stream<'a> = Map<MessageStream<'a>, fn(_: Result<BorrowedMessage<'a>, KafkaError>) -> Result<KafkaMessage<'a>, StreamErr<KafkaError>>>

source§

fn assign( &mut self, _: (StreamKey, ShardId) ) -> Result<(), StreamErr<KafkaError>>

source§

fn unassign( &mut self, s: (StreamKey, ShardId) ) -> Result<(), StreamErr<KafkaError>>

source§

fn next(&self) -> <KafkaConsumer as Consumer>::NextFuture<'_>

source§

impl Consumer for StdioConsumer

source§

fn assign(&mut self, _: (StreamKey, ShardId)) -> Result<(), StreamErr<StdioErr>>

Always succeeds if the stream exists. There is only shard ZERO anyway.

source§

fn unassign( &mut self, _: (StreamKey, ShardId) ) -> Result<(), StreamErr<StdioErr>>

Always fails. There is only shard ZERO anyway.

§

type Error = StdioErr

§

type Message<'a> = SharedMessage

§

type NextFuture<'a> = MapErr<RecvFut<'a, SharedMessage>, fn(_: RecvError) -> StreamErr<StdioErr>>

§

type Stream<'a> = Map<RecvStream<'a, SharedMessage>, fn(_: SharedMessage) -> Result<SharedMessage, StreamErr<StdioErr>>>

source§

fn seek<'life0, 'async_trait>( &'life0 mut self, __arg1: OffsetDateTime ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<StdioErr>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, StdioConsumer: 'async_trait,

source§

fn rewind<'life0, 'async_trait>( &'life0 mut self, __arg1: SeqPos ) -> Pin<Box<dyn Future<Output = Result<(), StreamErr<StdioErr>>> + Send + 'async_trait, Global>>where 'life0: 'async_trait, StdioConsumer: 'async_trait,

source§

fn next(&self) -> <StdioConsumer as Consumer>::NextFuture<'_>

source§

fn stream<'a, 'b>(&'b mut self) -> <StdioConsumer as Consumer>::Stream<'a>where 'b: 'a,

Implementors§