pub struct RedisStream { /* private fields */ }
Describes one Redis Streams subscription against crate::RedisBroker.
§Examples
use std::time::Duration;
use ruststream_fred::RedisStream;
// Fresh tail: a normal worker reading new entries.
let fresh = RedisStream::new("orders").group("workers").count(128);
// Recovery: reclaim entries a crashed worker left pending for over 30s.
let recover = RedisStream::reclaim("orders", Duration::from_secs(30)).group("workers");
Implementations§
impl RedisStream
pub fn new(key: impl Into<String>) -> Self
A fresh-tail subscription on key: reads new entries via XREADGROUP >.
A consumer group is required; set it with group.
pub fn reclaim(key: impl Into<String>, min_idle: Duration) -> Self
A recovery subscription on key: reclaims pending entries idle at least min_idle via
XAUTOCLAIM. Run it alongside a fresh-tail (new) subscriber on the same group to pick up
messages that a consumer fetched but did not ack before it died.
min_idle has no default and must exceed the longest legitimate handler runtime: set it too
low and a healthy consumer’s in-flight message gets reclaimed and processed twice.
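The effect of a too-low min_idle can be seen in a small standalone model. This is only a sketch: the Pending type and the reclaimable function are hypothetical names invented for illustration, they are not part of this crate, and the real filtering is done server-side by XAUTOCLAIM.

```rust
use std::time::Duration;

/// One pending entry as a reclaim scan would see it: its ID and how long
/// ago it was last delivered. Hypothetical type, for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct Pending {
    pub id: &'static str,
    pub idle: Duration,
}

/// Returns the IDs a reclaim scan would take over: every pending entry
/// that has been idle for at least `min_idle`.
pub fn reclaimable(pending: &[Pending], min_idle: Duration) -> Vec<&'static str> {
    pending
        .iter()
        .filter(|p| p.idle >= min_idle)
        .map(|p| p.id)
        .collect()
}
```

With one entry idle for 45s (its consumer crashed) and one idle for 10s (its handler is still running), a min_idle of 30s selects only the first, while a min_idle of 5s also selects the in-flight one, which would then be processed twice.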
pub fn group(self, group: impl Into<String>) -> Self
Sets the consumer group. Required for every subscription.
pub fn consumer(self, consumer: impl Into<String>) -> Self
Sets this consumer’s name within the group. Defaults to an auto-generated unique name.
pub const fn count(self, count: u64) -> Self
Upper bound on entries fetched per read. Defaults to 64.
pub const fn block(self, block: Duration) -> Self
How long one read blocks waiting for entries. Defaults to 5 seconds. In fresh-tail mode this
is the XREADGROUP server-side block; in reclaim mode XAUTOCLAIM does not block, so this is
the poll interval slept between scans that find nothing to reclaim.
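The two meanings of block can be summarised as a client-side wait rule. The following is a rough model under stated assumptions, not the crate's read loop: Mode and client_sleep are hypothetical names, and it is assumed that a reclaim scan which finds entries is followed by another scan without sleeping.

```rust
use std::time::Duration;

/// The two subscription modes described above. Hypothetical enum.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Mode {
    FreshTail,
    Reclaim,
}

/// How long the client itself sleeps after one read.
///
/// In fresh-tail mode the wait happens on the server (the XREADGROUP
/// block), so the client never sleeps. In reclaim mode the client sleeps
/// for `block` only when the scan found nothing to reclaim.
pub fn client_sleep(mode: Mode, entries_found: usize, block: Duration) -> Duration {
    match mode {
        Mode::FreshTail => Duration::ZERO,
        Mode::Reclaim if entries_found == 0 => block,
        Mode::Reclaim => Duration::ZERO,
    }
}
```

Under this model a short block makes a recovery subscription notice stuck messages sooner, at the cost of more frequent XAUTOCLAIM scans against an idle group.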
pub fn start_id(self, start: StreamStart) -> Self
Where a newly created group starts reading. Ignored if the group already exists. Only
meaningful for the fresh-tail new mode.
pub fn dead_letter(self, key: impl Into<String>) -> Self
Routes dropped and poison messages to the named dead-letter stream instead of discarding
them. Off by default. The copy is tagged with DEAD_LETTER_REASON_HEADER. See
crate::deadletter.
pub const fn max_deliveries(self, max: u64) -> Self
Caps how many times a message may be delivered before it is treated as poison (dead-lettered or, with no dead-letter stream, discarded). Off by default.
The cap is checked against both the framework retry-count header (the nack/republish loop)
and the native stream delivery count (the reclaim loop), so a message poisoning either way is
caught.
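The double check can be sketched as a pure function. Everything here is an illustrative assumption rather than the crate's code: Outcome and classify are hypothetical names, and whether the real comparison is "reaches the cap" or "exceeds the cap" is not stated above, so this sketch assumes a message is poison once either counter reaches the cap.

```rust
/// What happens to a message before it is handed to the handler.
/// Hypothetical type, for illustration only.
#[derive(Debug, PartialEq)]
pub enum Outcome<'a> {
    /// Below the cap (or no cap set): deliver normally.
    Deliver,
    /// Poison, and a dead-letter stream is configured: copy it there.
    DeadLetter(&'a str),
    /// Poison, and no dead-letter stream is configured: drop it.
    Discard,
}

/// Classifies one message.
///
/// `retry_count` stands for the framework retry-count header and
/// `delivery_count` for the native stream delivery count. The larger of
/// the two is compared with the cap, so either loop can trip it.
pub fn classify<'a>(
    max_deliveries: Option<u64>,
    dead_letter: Option<&'a str>,
    retry_count: u64,
    delivery_count: u64,
) -> Outcome<'a> {
    let poison = match max_deliveries {
        None => false, // the cap is off by default
        Some(max) => retry_count.max(delivery_count) >= max, // assumed comparison
    };
    match (poison, dead_letter) {
        (false, _) => Outcome::Deliver,
        (true, Some(key)) => Outcome::DeadLetter(key),
        (true, None) => Outcome::Discard,
    }
}
```

For example, with a cap of 3 and a dead-letter key of "orders.dead", a message whose retry header is 0 but whose native delivery count is 3 is routed to "orders.dead", which is the reclaim-loop case that a header-only check would miss.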
pub fn delayed_retry(self, retry: DelayedRetry) -> Self
Opts this subscription into durable, crash-safe delayed retry backed by a ZSET delay queue.
Off by default: without it, retry_after(delay) / nack_after(delay) degrade to the
runtime’s broker-agnostic deferred re-publish (at-most-once over the delay window). With it,
a delayed delivery is ZADDed to the named ZSET and replayed from there once due, so the
retry survives a process crash. See DelayedRetry for the key and TTL requirements.
The sweeper that replays due entries runs inside this subscription’s read loop, so its
granularity is the read block interval.
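The delay-queue idea can be modelled in memory with a sorted set keyed by due time. This is a sketch under assumptions, not the crate's implementation: DelayQueue, schedule and sweep are hypothetical names, a BTreeSet stands in for the Redis ZSET, and the score is assumed to be the due time in milliseconds.

```rust
use std::collections::BTreeSet;

/// In-memory stand-in for the ZSET delay queue. Entries are ordered by
/// (due time in ms, payload), mirroring a ZSET ordered by score.
#[derive(Debug, Default)]
pub struct DelayQueue {
    entries: BTreeSet<(u64, String)>,
}

impl DelayQueue {
    /// Schedules a delayed delivery (the ZADD step in the real queue).
    pub fn schedule(&mut self, due_ms: u64, payload: impl Into<String>) {
        self.entries.insert((due_ms, payload.into()));
    }

    /// Removes and returns every entry whose due time is at or before
    /// `now_ms`, in due-time order. Entries due later stay queued.
    pub fn sweep(&mut self, now_ms: u64) -> Vec<String> {
        let mut due = Vec::new();
        while let Some((score, _)) = self.entries.first() {
            if *score > now_ms {
                break; // the earliest remaining entry is not due yet
            }
            let (_, payload) = self.entries.pop_first().expect("set is non-empty");
            due.push(payload);
        }
        due
    }
}
```

Because sweep would be called once per iteration of the read loop, an entry that becomes due just after a sweep waits for the next one, which is why the replay granularity follows the read block interval.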
Trait Implementations§
impl Clone for RedisStream
fn clone(&self) -> RedisStream
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for RedisStream
impl SubscriptionSource<RedisBroker> for RedisStream
type Subscriber = RedisSubscriber
async fn subscribe(self, broker: &RedisBroker) -> Result<Self::Subscriber, RedisError>
Auto Trait Implementations§
impl Freeze for RedisStream
impl RefUnwindSafe for RedisStream
impl Send for RedisStream
impl Sync for RedisStream
impl Unpin for RedisStream
impl UnsafeUnpin for RedisStream
impl UnwindSafe for RedisStream
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<ST, DT> CastableFrom<ST, Initialized, Initialized> for DT
impl<ST, DT> CastableFrom<ST, Uninit, Uninit> for DT
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise.