pub struct RedisConsumerOptions { /* private fields */ }

Options for Consumers, including mode, group and other streaming mechanisms.

Implementations

impl RedisConsumerOptions

pub fn consumer_id(&self) -> Option<&ConsumerId>

Unlike Kafka, Redis requires consumers to self-assign consumer IDs. If unset, SeaStreamer uses a combination of host id + process id + thread id + timestamp.
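
As a rough illustration of the fallback described above, the sketch below assembles an ID from a host id, the process id, the thread id and a timestamp using only the standard library. The function name fallback_consumer_id, the ":" separator and the field order are assumptions made for this example; the actual format SeaStreamer produces is not stated here.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Build an ID of the form `host:pid:thread:timestamp`.
/// Hypothetical helper: separator and field order are illustrative only.
fn fallback_consumer_id(host_id: &str) -> String {
    let pid = std::process::id();
    // `ThreadId` has no stable integer accessor, so use its Debug form.
    let thread = format!("{:?}", std::thread::current().id());
    // Milliseconds since the Unix epoch; falls back to 0 if the clock is odd.
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis())
        .unwrap_or(0);
    format!("{host_id}:{pid}:{thread}:{millis}")
}

fn main() {
    println!("{}", fallback_consumer_id("host-a"));
}
```

If a stable identity matters (for example, to resume pending messages after a restart), setting the ID explicitly with set_consumer_id avoids depending on any generated value.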

pub fn set_consumer_id(&mut self, consumer_id: ConsumerId) -> &mut Self

pub fn consumer_timeout(&self) -> Option<&Duration>

If None, defaults to crate::DEFAULT_TIMEOUT.

pub fn set_consumer_timeout( &mut self, consumer_timeout: Option<Duration> ) -> &mut Self
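
The getter/setter pair above follows a common pattern: the option is stored as an Option, and None is resolved to a default at the point of use. The sketch below models that pattern with a stand-in struct; OptionsSketch, effective_timeout and the 5-second value of DEFAULT_TIMEOUT are assumptions for illustration, not the crate's definitions.

```rust
use std::time::Duration;

// Hypothetical stand-in for `crate::DEFAULT_TIMEOUT`; the real value may differ.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);

/// Stand-in for the options struct, holding only the timeout.
#[derive(Default)]
struct OptionsSketch {
    consumer_timeout: Option<Duration>,
}

impl OptionsSketch {
    fn consumer_timeout(&self) -> Option<&Duration> {
        self.consumer_timeout.as_ref()
    }

    /// Returns `&mut Self` so that setters can be chained.
    fn set_consumer_timeout(&mut self, t: Option<Duration>) -> &mut Self {
        self.consumer_timeout = t;
        self
    }

    /// Resolve `None` to the default at the point of use.
    fn effective_timeout(&self) -> Duration {
        self.consumer_timeout().copied().unwrap_or(DEFAULT_TIMEOUT)
    }
}

fn main() {
    let mut options = OptionsSketch::default();
    options.set_consumer_timeout(Some(Duration::from_secs(30)));
    println!("{:?}", options.effective_timeout());
}
```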

pub fn set_auto_stream_reset(&mut self, v: AutoStreamReset) -> &mut Self

Where to stream from when the consumer group does not exist.

If unset, defaults to Latest.

pub fn auto_stream_reset(&self) -> &AutoStreamReset

pub fn set_auto_commit(&mut self, v: AutoCommit) -> &mut Self

There are two moving parts: auto-ack and auto-commit. You can enable one, or both. You can also opt out.

If unset, defaults to Delayed.

pub fn auto_commit(&self) -> &AutoCommit

pub fn set_auto_commit_delay(&mut self, v: Duration) -> &mut Self

The delay before an ACK takes effect, timed from the moment next returns. This option is only relevant when auto_commit is Delayed.

If unset, defaults to DEFAULT_AUTO_COMMIT_DELAY.

pub fn auto_commit_delay(&self) -> &Duration

pub fn set_auto_commit_interval(&mut self, v: Duration) -> &mut Self

The minimum interval for acks to be committed to the server. This option is only relevant when auto_commit is Rolling.

If unset, defaults to DEFAULT_AUTO_COMMIT_INTERVAL.
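
To make the "minimum interval" idea concrete, here is a small model of a rate-limited commit: acks accumulate locally and are flushed no more often than the configured interval. The RollingCommit type and its methods are hypothetical and only illustrate the timing rule; they are not how the crate implements Rolling.

```rust
use std::time::{Duration, Instant};

/// Hypothetical model of interval-limited commits.
struct RollingCommit {
    interval: Duration,
    last_commit: Option<Instant>,
    pending_acks: Vec<String>,
}

impl RollingCommit {
    fn new(interval: Duration) -> Self {
        Self { interval, last_commit: None, pending_acks: Vec::new() }
    }

    /// Record an ack locally; nothing is sent yet.
    fn ack(&mut self, id: &str) {
        self.pending_acks.push(id.to_owned());
    }

    /// Return the IDs to commit now, or an empty Vec if the interval
    /// has not elapsed since the previous commit.
    fn poll(&mut self, now: Instant) -> Vec<String> {
        let due = match self.last_commit {
            None => true,
            Some(t) => now.duration_since(t) >= self.interval,
        };
        if due && !self.pending_acks.is_empty() {
            self.last_commit = Some(now);
            std::mem::take(&mut self.pending_acks)
        } else {
            Vec::new()
        }
    }
}

fn main() {
    let mut rolling = RollingCommit::new(Duration::from_secs(1));
    rolling.ack("1-0");
    println!("{:?}", rolling.poll(Instant::now()));
}
```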

pub fn auto_commit_interval(&self) -> &Duration

pub fn set_auto_claim_interval(&mut self, v: Option<Duration>) -> &mut Self

The minimum interval for checking the XPENDING of others in the group. This option is only relevant when mode is LoadBalanced.

Defaults to DEFAULT_AUTO_CLAIM_INTERVAL. None means never.

pub fn auto_claim_interval(&self) -> Option<&Duration>

pub fn set_auto_claim_idle(&mut self, v: Duration) -> &mut Self

The idle time for a consumer to be considered dead and allow others to XCLAIM its messages. This option is only relevant when mode is LoadBalanced.

Defaults to DEFAULT_AUTO_CLAIM_IDLE.
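
The idle-time rule can be pictured as a filter over the group's pending entries: a message held by another consumer becomes eligible for XCLAIM once it has been idle for at least auto_claim_idle. The sketch below is an assumption-laden model: PendingEntry and claimable are hypothetical names, and the real consumer works from Redis replies rather than an in-memory slice.

```rust
use std::time::Duration;

/// One pending message as a consumer might see it (hypothetical shape).
struct PendingEntry {
    id: String,
    consumer: String,
    idle: Duration,
}

/// IDs that `me` may claim: held by someone else and idle long enough.
fn claimable<'a>(
    pending: &'a [PendingEntry],
    me: &str,
    auto_claim_idle: Duration,
) -> Vec<&'a str> {
    pending
        .iter()
        .filter(|p| p.consumer != me && p.idle >= auto_claim_idle)
        .map(|p| p.id.as_str())
        .collect()
}

fn main() {
    let pending = vec![PendingEntry {
        id: "1-0".into(),
        consumer: "other".into(),
        idle: Duration::from_secs(90),
    }];
    println!("{:?}", claimable(&pending, "me", Duration::from_secs(60)));
}
```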

pub fn auto_claim_idle(&self) -> &Duration

pub fn set_batch_size(&mut self, v: usize) -> &mut Self

Maximum number of messages to read from Redis in one request. A larger batch size reduces the number of roundtrips. However, it also prevents messages from being chunked finely enough to load-balance among consumers.

Choose this number by considering the throughput of the stream, number of consumers in one group, and the time required to process each message.

Cannot be 0. If unset: if mode is LoadBalanced, defaults to DEFAULT_LOAD_BALANCED_BATCH_SIZE. Otherwise, defaults to DEFAULT_BATCH_SIZE.
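
The defaulting rule above can be written as a single match. In this sketch the Mode enum stands in for ConsumerMode (only LoadBalanced is named in the text; the other variants are assumptions), and both constant values are placeholders rather than the crate's real defaults.

```rust
/// Stand-in for the consumer mode; only `LoadBalanced` matters here.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Mode {
    RealTime,
    Resumable,
    LoadBalanced,
}

// Placeholder values; the crate's actual constants may differ.
const DEFAULT_BATCH_SIZE: usize = 100;
const DEFAULT_LOAD_BALANCED_BATCH_SIZE: usize = 10;

/// Resolve the batch size: reject 0, otherwise fall back by mode.
fn effective_batch_size(mode: Mode, requested: Option<usize>) -> Result<usize, String> {
    match requested {
        Some(0) => Err("batch size cannot be 0".to_owned()),
        Some(n) => Ok(n),
        None if mode == Mode::LoadBalanced => Ok(DEFAULT_LOAD_BALANCED_BATCH_SIZE),
        None => Ok(DEFAULT_BATCH_SIZE),
    }
}

fn main() {
    for mode in [Mode::RealTime, Mode::Resumable, Mode::LoadBalanced] {
        println!("{:?}: {:?}", mode, effective_batch_size(mode, None));
    }
}
```

A smaller default for the load-balanced case fits the trade-off described above: each request takes fewer messages, leaving more for the other consumers in the group.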

pub fn batch_size(&self) -> &usize

pub fn shard_ownership(&self) -> &ShardOwnership

Default is Shared.

pub fn set_shard_ownership( &mut self, shard_ownership: ShardOwnership ) -> &mut Self

pub fn set_mkstream(&mut self, enabled: bool) -> &mut Self

If set to false, an XGROUP CREATE <key> <groupname> <id or $> command will be used to create consumer groups on the stream. If the stream key does not already exist, the command will fail and the consumer will fail to initialize.

Setting this to true will cause the consumer to run XGROUP CREATE <key> <groupname> <id or $> MKSTREAM commands when first getting new messages, allowing the consumer to initialize even if a producer has never written a message to a stream at the same key.

Default is false.
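
The two command shapes differ only in the trailing MKSTREAM token. The helper below, a hypothetical function written for this example, builds the argument list for either case; it does not talk to Redis and is not the crate's internal code.

```rust
/// Build the arguments of `XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]`.
/// Hypothetical helper for illustration.
fn xgroup_create_args(key: &str, group: &str, start_id: &str, mkstream: bool) -> Vec<String> {
    let mut args = vec![
        "XGROUP".to_owned(),
        "CREATE".to_owned(),
        key.to_owned(),
        group.to_owned(),
        start_id.to_owned(),
    ];
    if mkstream {
        // Ask Redis to create an empty stream if the key does not exist yet.
        args.push("MKSTREAM".to_owned());
    }
    args
}

fn main() {
    println!("{}", xgroup_create_args("my_stream", "my_group", "$", true).join(" "));
}
```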

pub fn mkstream(&self) -> &bool

pub fn pre_fetch(&self) -> bool

Whether to pre-fetch the next page as the consumer is streaming, which results in less jitter.

If false, it only fetches when Consumer::next is called, i.e. on demand.

This is a side effect of the currently set consumer mode and auto-commit options. For example, when reading with NOACK the consumer should not pre-fetch.

Trait Implementations

impl Clone for RedisConsumerOptions

fn clone(&self) -> RedisConsumerOptions

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl ConsumerOptions for RedisConsumerOptions

fn set_consumer_group( &mut self, group_id: ConsumerGroup ) -> RedisResult<&mut Self>

SeaStreamer Redis offers two load-balancing mechanisms:

(Fine-grained) Shared shard

Multiple consumers in the same group can share the same shard. This is load-balanced in a first-ask-first-served manner, according to the Redis documentation. This can be considered dynamic load-balancing: faster consumers will consume more messages.

This is the vanilla Redis consumer group behaviour.

(Coarse) Owned shard

This feature is still a work in progress.

Multiple consumers within the same group do not share a shard. Each consumer will attempt to claim ownership of a shard, and other consumers will not step in. However, if a consumer has been idle for too long (defined by auto_claim_idle), another consumer will step in and claim ownership of the shard.

As new consumers join the group, if the shard distribution is not in a ‘fair state’ (at most 1 shard per consumer), ownership of a shard may be transferred among consumers.

This mimics Kafka’s consumer group behaviour.

This is implemented via a dedicated key (for each stream) in the same Redis db, and managed co-operatively among consumers.
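
The ‘fair state’ condition for owned shards (at most 1 shard per consumer) is easy to state as a check over an ownership table. The sketch below is only a model of that condition: the (shard number, consumer id) pair representation and both function names are assumptions, and since the feature is still in progress the real bookkeeping in Redis may look quite different.

```rust
use std::collections::HashMap;

/// Count how many shards each consumer owns.
/// `ownership` pairs a shard number with its owner's ID (hypothetical shape).
fn shards_per_consumer(ownership: &[(u64, &str)]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for (_shard, consumer) in ownership {
        *counts.entry(consumer.to_string()).or_insert(0) += 1;
    }
    counts
}

/// The distribution is 'fair' when no consumer owns more than one shard.
fn is_fair(ownership: &[(u64, &str)]) -> bool {
    shards_per_consumer(ownership).values().all(|&n| n <= 1)
}

fn main() {
    // Consumer "a" owns two shards, so a newly joined consumer could take one.
    let ownership = [(0, "a"), (1, "a"), (2, "b")];
    println!("fair: {}", is_fair(&ownership));
}
```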

type Error = RedisErr

fn new(mode: ConsumerMode) -> Self

fn mode(&self) -> RedisResult<&ConsumerMode>

Get currently set ConsumerMode.

fn consumer_group(&self) -> RedisResult<&ConsumerGroup>

Get currently set consumer group; may return StreamErr::ConsumerGroupNotSet.

impl Debug for RedisConsumerOptions

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for RedisConsumerOptions

fn default() -> Self

Returns the “default value” for a type.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.