Struct sea_streamer_redis::RedisConsumerOptions
source ยท pub struct RedisConsumerOptions { /* private fields */ }
Expand description
Options for Consumers, including mode, group and other streaming mechanisms.
Implementationsยง
sourceยงimpl RedisConsumerOptions
impl RedisConsumerOptions
sourcepub fn consumer_id(&self) -> Option<&ConsumerId>
pub fn consumer_id(&self) -> Option<&ConsumerId>
Unlike Kafka, Redis requires consumers to self-assign consumer IDs.
If unset, SeaStreamer uses a combination of host id
+ process id
+ thread id
+ timestamp
.
pub fn set_consumer_id(&mut self, consumer_id: ConsumerId) -> &mut Self
sourcepub fn consumer_timeout(&self) -> Option<&Duration>
pub fn consumer_timeout(&self) -> Option<&Duration>
If None, defaults to crate::DEFAULT_TIMEOUT
.
pub fn set_consumer_timeout( &mut self, consumer_timeout: Option<Duration> ) -> &mut Self
sourcepub fn set_auto_stream_reset(&mut self, v: AutoStreamReset) -> &mut Self
pub fn set_auto_stream_reset(&mut self, v: AutoStreamReset) -> &mut Self
Where to stream from when the consumer group does not exists.
If unset, defaults to Latest
.
pub fn auto_stream_reset(&self) -> &AutoStreamReset
sourcepub fn set_auto_commit(&mut self, v: AutoCommit) -> &mut Self
pub fn set_auto_commit(&mut self, v: AutoCommit) -> &mut Self
There are two moving parts: auto-ack and auto-commit. You can enable one, or both. You can also opt out.
If unset, defaults to Delayed
.
pub fn auto_commit(&self) -> &AutoCommit
sourcepub fn set_auto_commit_delay(&mut self, v: Duration) -> &mut Self
pub fn set_auto_commit_delay(&mut self, v: Duration) -> &mut Self
The time needed for an ACK to realize.
It is timed from the moment next
returns.
This option is only relevant when auto_commit
is Delayed
.
If unset, defaults to DEFAULT_AUTO_COMMIT_DELAY
.
pub fn auto_commit_delay(&self) -> &Duration
sourcepub fn set_auto_commit_interval(&mut self, v: Duration) -> &mut Self
pub fn set_auto_commit_interval(&mut self, v: Duration) -> &mut Self
The minimum interval for acks to be committed to the server.
This option is only relevant when auto_commit
is Rolling
.
If unset, defaults to DEFAULT_AUTO_COMMIT_INTERVAL
.
pub fn auto_commit_interval(&self) -> &Duration
sourcepub fn set_auto_claim_interval(&mut self, v: Option<Duration>) -> &mut Self
pub fn set_auto_claim_interval(&mut self, v: Option<Duration>) -> &mut Self
The minimum interval for checking the XPENDING of others in the group.
This option is only relevant when mode
is LoadBalanced
.
Defaults to DEFAULT_AUTO_CLAIM_INTERVAL
. None means never.
pub fn auto_claim_interval(&self) -> Option<&Duration>
sourcepub fn set_auto_claim_idle(&mut self, v: Duration) -> &mut Self
pub fn set_auto_claim_idle(&mut self, v: Duration) -> &mut Self
The idle time for a consumer to be considered dead and allow others to XCLAIM its messages.
This option is only relevant when mode
is LoadBalanced
.
Defaults to DEFAULT_AUTO_CLAIM_IDLE
. None means never.
pub fn auto_claim_idle(&self) -> &Duration
sourcepub fn set_batch_size(&mut self, v: usize) -> &mut Self
pub fn set_batch_size(&mut self, v: usize) -> &mut Self
Maximum number of messages to read from Redis in one request. A larger N would reduce the number of roundtrips. However, this also prevent messages from being chunked properly to load balance among consumers.
Choose this number by considering the throughput of the stream, number of consumers in one group, and the time required to process each message.
Cannot be 0
. If unset: if mode is LoadBalanced
, defaults to DEFAULT_LOAD_BALANCED_BATCH_SIZE
.
Otherwise, defaults to DEFAULT_BATCH_SIZE
.
pub fn batch_size(&self) -> &usize
sourcepub fn shard_ownership(&self) -> &ShardOwnership
pub fn shard_ownership(&self) -> &ShardOwnership
Default is Shared
.
pub fn set_shard_ownership( &mut self, shard_ownership: ShardOwnership ) -> &mut Self
sourcepub fn set_mkstream(&mut self, enabled: bool) -> &mut Self
pub fn set_mkstream(&mut self, enabled: bool) -> &mut Self
If set to false
, a XGROUP CREATE <key> <groupname> <id or $>
command will be used create consumer
groups on the stream.
If the stream key does not already exist, the command will fail and the consumer will fail to initialize.
Setting this to true
will cause the consumer to run
XGROUP CREATE <key> <groupname> <id or $> MKSTREAM
commands when first getting new messages,
allowing the consumer to initialize even if a producer has never written a message
to a stream at the same key.
Default is false
.
pub fn mkstream(&self) -> &bool
sourcepub fn pre_fetch(&self) -> bool
pub fn pre_fetch(&self) -> bool
Whether to pre-fetch the next page as the consumer is streaming, which results in less jitter.
If false, it only fetches when Consumer::next
is called, aka. on demand.
This is a side effect of currently set consumer mode and auto commit options.
e.g. when reading with NOACK
the consumer should not pre-fetch.
Trait Implementationsยง
sourceยงimpl Clone for RedisConsumerOptions
impl Clone for RedisConsumerOptions
sourceยงfn clone(&self) -> RedisConsumerOptions
fn clone(&self) -> RedisConsumerOptions
1.0.0 ยท sourceยงfn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source
. Read moresourceยงimpl ConsumerOptions for RedisConsumerOptions
impl ConsumerOptions for RedisConsumerOptions
sourceยงfn set_consumer_group(
&mut self,
group_id: ConsumerGroup
) -> RedisResult<&mut Self>
fn set_consumer_group( &mut self, group_id: ConsumerGroup ) -> RedisResult<&mut Self>
SeaStreamer Redis offers two load-balancing mechanisms:
ยง(Fine-grained) Shared shard
Multiple consumers in the same group can share the same shard. This is load-balanced in a first-ask-first-served manner, according to the Redis documentation. This can be considered dynamic load-balancing: faster consumers will consume more messages.
This is the vanilla Redis consumer group behaviour.
ยง(Coarse) Owned shard
This feature is still WIP
Multiple consumers within the same group do not share a shard.
Each consumer will attempt to claim ownership of a shard, and other consumers will not step in.
However, if a consumer has been idle for too long (defined by auto_claim_idle
),
another consumer will step in and claim ownership of the shard.
As new consumers join the group, if the shard distribution is not in a โfair stateโ (at most 1 shard per consumer), ownership of a shard may be transferred among consumers.
This mimicks Kafkaโs consumer group behaviour.
This is implemented via a dedicated key (for each stream) in the same Redis db, and managed co-operatively among consumers.
type Error = RedisErr
fn new(mode: ConsumerMode) -> Self
sourceยงfn mode(&self) -> RedisResult<&ConsumerMode>
fn mode(&self) -> RedisResult<&ConsumerMode>
sourceยงfn consumer_group(&self) -> RedisResult<&ConsumerGroup>
fn consumer_group(&self) -> RedisResult<&ConsumerGroup>
StreamErr::ConsumerGroupNotSet
.