pub trait ConsumerOptions: Default + Clone + Send {
    type Error: Error;

    // Required methods
    fn new(mode: ConsumerMode) -> Self;
    fn mode(&self) -> Result<&ConsumerMode, StreamErr<Self::Error>>;
    fn consumer_group(&self) -> Result<&ConsumerGroup, StreamErr<Self::Error>>;
    fn set_consumer_group(
        &mut self,
        group_id: ConsumerGroup
    ) -> Result<&mut Self, StreamErr<Self::Error>>;
}

Common options of a Consumer.

Required Associated Types§

type Error: Error

Required Methods§

fn new(mode: ConsumerMode) -> Self

fn mode(&self) -> Result<&ConsumerMode, StreamErr<Self::Error>>

Get the currently set ConsumerMode.

fn consumer_group(&self) -> Result<&ConsumerGroup, StreamErr<Self::Error>>

Get the currently set consumer group; may return StreamErr::ConsumerGroupNotSet.

fn set_consumer_group( &mut self, group_id: ConsumerGroup ) -> Result<&mut Self, StreamErr<Self::Error>>

Set the consumer group for this consumer. Note that the semantics are implementation-specific.

Implementations on Foreign Types§

impl ConsumerOptions for KafkaConsumerOptions

fn set_consumer_group( &mut self, group: ConsumerGroup ) -> Result<&mut KafkaConsumerOptions, StreamErr<KafkaError>>

If multiple consumers share the same group, only one consumer in the group will receive each message, i.e. delivery is load-balanced.

However, the load-balancing mechanism is what makes Kafka different:

Each stream is divided into multiple shards (known as partitions), and each partition is assigned to only one consumer in a group.

Say there are 2 consumers in the group and 2 partitions: each consumer then receives messages from one partition, so the load is balanced.

If there are 2 consumers and 3 partitions, then one consumer will be assigned 2 partitions, and the other will be assigned only 1.

However, if the stream has only 1 partition, then no matter how many consumers there are, all messages will be received by the single assigned consumer, and the other consumers will be on stand-by, resulting in a hot-failover setup.
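
The three scenarios above can be reproduced with a small, self-contained sketch. It does not use sea-streamer or Kafka at all: `assign_partitions` is a hypothetical helper that spreads partition numbers over consumers in a simple round-robin pass. Real assignment is decided by the Kafka group coordinator and the configured assignor, so the exact partition-to-consumer mapping may differ; only the counts per consumer are meant to match the description.

```rust
use std::collections::BTreeMap;

/// Assign partitions `0..num_partitions` to the given consumers, one at a time.
/// Hypothetical helper for illustration only; not part of sea-streamer or Kafka.
fn assign_partitions(consumers: &[&str], num_partitions: u32) -> BTreeMap<String, Vec<u32>> {
    // Every consumer starts with no partitions; some may stay on stand-by.
    let mut assignment: BTreeMap<String, Vec<u32>> = consumers
        .iter()
        .map(|c| (c.to_string(), Vec::new()))
        .collect();
    if consumers.is_empty() {
        return assignment;
    }
    for partition in 0..num_partitions {
        // Each partition gets exactly one owner within the group.
        let owner = consumers[partition as usize % consumers.len()];
        assignment
            .get_mut(owner)
            .expect("owner was inserted above")
            .push(partition);
    }
    assignment
}
```

Calling `assign_partitions(&["c1", "c2"], 3)` gives one consumer two partitions and the other one, while `assign_partitions(&["c1", "c2", "c3"], 1)` leaves two consumers with an empty list, which corresponds to the stand-by case.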

type Error = KafkaError

fn new(mode: ConsumerMode) -> KafkaConsumerOptions

fn mode(&self) -> Result<&ConsumerMode, StreamErr<KafkaError>>

fn consumer_group(&self) -> Result<&ConsumerGroup, StreamErr<KafkaError>>

impl ConsumerOptions for StdioConsumerOptions

fn set_consumer_group( &mut self, group: ConsumerGroup ) -> Result<&mut StdioConsumerOptions, StreamErr<StdioErr>>

If multiple consumers share the same group, only one consumer in the group will receive each message. Messages are load-balanced in a round-robin fashion.
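
As a rough illustration of round-robin load balancing within a group, the sketch below hands each message to the next member in turn. `RoundRobinGroup` and `dispatch` are hypothetical names invented for this example; they are not part of the Stdio backend, whose actual dispatch logic may be organized differently.

```rust
/// Hypothetical dispatcher: each message goes to exactly one member of the group.
/// Illustration only; not the Stdio backend's implementation.
struct RoundRobinGroup {
    members: Vec<String>,
    next: usize,
}

impl RoundRobinGroup {
    fn new(members: &[&str]) -> Self {
        Self {
            members: members.iter().map(|m| m.to_string()).collect(),
            next: 0,
        }
    }

    /// Return the member that receives the next message,
    /// or `None` if the group has no members.
    fn dispatch(&mut self) -> Option<&str> {
        if self.members.is_empty() {
            return None;
        }
        let idx = self.next;
        // Advance the cursor, wrapping back to the first member.
        self.next = (self.next + 1) % self.members.len();
        Some(self.members[idx].as_str())
    }
}
```

With members `a`, `b` and `c`, five consecutive calls to `dispatch` select `a`, `b`, `c`, `a`, `b`, so no single consumer sees every message.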

type Error = StdioErr

fn new(mode: ConsumerMode) -> StdioConsumerOptions

fn mode(&self) -> Result<&ConsumerMode, StreamErr<StdioErr>>

fn consumer_group(&self) -> Result<&ConsumerGroup, StreamErr<StdioErr>>

Implementors§