pub struct StreamData { /* private fields */ }Expand description
One stream’s storage: every entry in entries plus the per-stream
scalar state Redis exposes via XINFO STREAM, plus the consumer
groups map (sprint B). An empty groups map costs ~8 bytes and
makes the no-group fast path (sprint A XADD/XREAD) zero-overhead.
Implementations§
Source§impl StreamData
impl StreamData
Sourcepub fn claim(
&mut self,
group: &[u8],
new_owner: &[u8],
ids: &[StreamId],
opts: &XClaimOpts,
now_ms: u64,
) -> Result<Vec<StreamId>, StoreError>
pub fn claim( &mut self, group: &[u8], new_owner: &[u8], ids: &[StreamId], opts: &XClaimOpts, now_ms: u64, ) -> Result<Vec<StreamId>, StoreError>
XCLAIM key group consumer min-idle-ms id [id ...] [...].
Returns the IDs successfully claimed (the dispatcher decides
whether to emit JUSTID or full entries).
Sourcepub fn autoclaim(
&mut self,
group: &[u8],
new_owner: &[u8],
min_idle_ms: u64,
start: StreamId,
count: usize,
justid: bool,
now_ms: u64,
) -> Result<AutoclaimResult, StoreError>
pub fn autoclaim( &mut self, group: &[u8], new_owner: &[u8], min_idle_ms: u64, start: StreamId, count: usize, justid: bool, now_ms: u64, ) -> Result<AutoclaimResult, StoreError>
XAUTOCLAIM key group consumer min-idle-ms start [COUNT n] [JUSTID]. Walks the PEL from start onward, claiming the
first count entries whose idle ≥ min_idle_ms. Returns
(next_cursor_id, claimed_ids, deleted_ids).
Sourcepub fn payloads_for(&self, ids: &[StreamId]) -> EntryBatch
pub fn payloads_for(&self, ids: &[StreamId]) -> EntryBatch
Field-value payload list pairing with ids (from
Self::claim / Self::autoclaim). Skips IDs that were
XDELed between claim and emit.
Source§impl StreamData
impl StreamData
Sourcepub fn group_create(
&mut self,
name: &[u8],
mode: GroupCreateMode,
) -> Result<bool, StoreError>
pub fn group_create( &mut self, name: &[u8], mode: GroupCreateMode, ) -> Result<bool, StoreError>
XGROUP CREATE key group <id|$> [MKSTREAM]. Returns true if
a new group was created; false if the group already existed
(caller should report Redis’s -BUSYGROUP error in that case).
Sourcepub fn group_destroy(&mut self, name: &[u8]) -> bool
pub fn group_destroy(&mut self, name: &[u8]) -> bool
XGROUP DESTROY key group. Returns true if a group was dropped.
Sourcepub fn group_setid(&mut self, name: &[u8], mode: GroupCreateMode) -> bool
pub fn group_setid(&mut self, name: &[u8], mode: GroupCreateMode) -> bool
XGROUP SETID key group <id|$>. Returns false if the group
doesn’t exist.
Sourcepub fn group_create_consumer(
&mut self,
group: &[u8],
consumer: &[u8],
now_ms: u64,
) -> bool
pub fn group_create_consumer( &mut self, group: &[u8], consumer: &[u8], now_ms: u64, ) -> bool
XGROUP CREATECONSUMER key group consumer. Returns true if a
new consumer was inserted, false if it already existed or the
group is missing.
Sourcepub fn group_del_consumer(&mut self, group: &[u8], consumer: &[u8]) -> u64
pub fn group_del_consumer(&mut self, group: &[u8], consumer: &[u8]) -> u64
XGROUP DELCONSUMER key group consumer. Returns the number of
PEL entries dropped along with the consumer (matches Redis).
Sourcepub fn readgroup(
&mut self,
group: &[u8],
consumer: &[u8],
last_seen_arg: ReadGroupId,
count: Option<usize>,
noack: bool,
now_ms: u64,
) -> Result<EntryBatch, StoreError>
pub fn readgroup( &mut self, group: &[u8], consumer: &[u8], last_seen_arg: ReadGroupId, count: Option<usize>, noack: bool, now_ms: u64, ) -> Result<EntryBatch, StoreError>
XREADGROUP GROUP g c [COUNT n] STREAMS key id. ID > →
“new entries since last_delivered_id” (updates last_delivered);
ID <x> → “PEL entries for this consumer with id > x” (does
NOT update last_delivered, used for replay).
Sourcepub fn ack(&mut self, group: &[u8], ids: &[StreamId]) -> u64
pub fn ack(&mut self, group: &[u8], ids: &[StreamId]) -> u64
XACK key group id [...]. Returns count of PEL entries removed.
Sourcepub fn pending_summary(&self, group: &[u8]) -> Option<PendingSummary>
pub fn pending_summary(&self, group: &[u8]) -> Option<PendingSummary>
XPENDING key group — the summary form (4-tuple).
Source§impl StreamData
impl StreamData
Sourcepub fn contains_entry(&self, id: StreamId) -> bool
pub fn contains_entry(&self, id: StreamId) -> bool
Does an entry with id currently exist? AOF rewrite uses this
to filter tombstone PEL rows (XCLAIM can’t re-create those).
Sourcepub fn export_groups(&self) -> Vec<LoadedGroup>
pub fn export_groups(&self) -> Vec<LoadedGroup>
Dump every group into the primitive exchange form.
Sourcepub fn import_groups(&mut self, groups: Vec<LoadedGroup>)
pub fn import_groups(&mut self, groups: Vec<LoadedGroup>)
Rebuild the group map from the exchange form (loader-side twin of
Self::export_groups). Per-consumer pel_count is recomputed;
a PEL owner missing from the consumer roster (hand-built or
corrupt file) gets a roster slot rather than a panic.
Sourcepub fn xsetid(
&mut self,
last_id: StreamId,
entries_added: Option<u64>,
max_deleted_id: Option<StreamId>,
) -> Result<(), StoreError>
pub fn xsetid( &mut self, last_id: StreamId, entries_added: Option<u64>, max_deleted_id: Option<StreamId>, ) -> Result<(), StoreError>
XSETID key last-id [ENTRIESADDED n] [MAXDELETEDID id] — overwrite
the stream’s scalar state. Rejects a last_id below the current
top entry (Redis: “smaller than the target stream top item”).
Source§impl StreamData
impl StreamData
Sourcepub fn last_id(&self) -> StreamId
pub fn last_id(&self) -> StreamId
Last ID ever assigned. Resets to MIN only when the whole key
is deleted (we never down-rev a stream).
Sourcepub fn entries_added(&self) -> u64
pub fn entries_added(&self) -> u64
XINFO STREAM helpers.
pub fn max_deleted_id(&self) -> StreamId
Sourcepub fn iter_entries(
&self,
) -> impl Iterator<Item = (StreamId, &[(SmallBytes, SmallBytes)])>
pub fn iter_entries( &self, ) -> impl Iterator<Item = (StreamId, &[(SmallBytes, SmallBytes)])>
Iterate every entry in ID-ascending order. Snapshot serializers walk this to dump the stream.
Sourcepub fn first_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])>
pub fn first_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])>
First (smallest-ID) entry — None if empty.
Sourcepub fn last_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])>
pub fn last_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])>
Last (largest-ID) entry — None if empty.
Sourcepub fn groups_iter(&self) -> impl Iterator<Item = (&[u8], &ConsumerGroup)>
pub fn groups_iter(&self) -> impl Iterator<Item = (&[u8], &ConsumerGroup)>
Iterate (group_name, group) pairs — used by XINFO GROUPS.
Sourcepub fn group(&self, name: &[u8]) -> Option<&ConsumerGroup>
pub fn group(&self, name: &[u8]) -> Option<&ConsumerGroup>
Lookup one group by name (for XINFO CONSUMERS).
Sourcepub fn group_count(&self) -> usize
pub fn group_count(&self) -> usize
Group count — XINFO STREAM’s groups field.
Sourcepub fn load_entry(
&mut self,
id: StreamId,
fields: Vec<(SmallBytes, SmallBytes)>,
)
pub fn load_entry( &mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>, )
Snapshot-loader entry-point: insert a pre-existing entry without
touching scalar state. Used by Store::load_stream; the loader
pumps every entry then calls Self::set_loaded_state once.
Sourcepub fn set_loaded_state(
&mut self,
last_id: StreamId,
max_deleted_id: StreamId,
entries_added: u64,
)
pub fn set_loaded_state( &mut self, last_id: StreamId, max_deleted_id: StreamId, entries_added: u64, )
Snapshot-loader: restore the per-stream scalars after every
entry has been pushed via Self::load_entry.
Sourcepub fn resolve_xadd_id(
&self,
spec: XAddIdSpec,
now_ms: u64,
) -> Result<StreamId, StoreError>
pub fn resolve_xadd_id( &self, spec: XAddIdSpec, now_ms: u64, ) -> Result<StreamId, StoreError>
Translate XADD’s XAddIdSpec into a concrete StreamId,
rejecting any spec that would not be strictly greater than
self.last_id. now_ms is injected so tests can pin wall-clock.
Sourcepub fn range(
&self,
start: StreamId,
end: StreamId,
count: Option<usize>,
) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>
pub fn range( &self, start: StreamId, end: StreamId, count: Option<usize>, ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>
XRANGE — inclusive [start, end], optionally COUNT-bounded.
Sourcepub fn revrange(
&self,
start: StreamId,
end: StreamId,
count: Option<usize>,
) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>
pub fn revrange( &self, start: StreamId, end: StreamId, count: Option<usize>, ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>
XREVRANGE — same [start, end] interval, descending order.
Sourcepub fn read_after(
&self,
last_seen: StreamId,
count: Option<usize>,
) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>
pub fn read_after( &self, last_seen: StreamId, count: Option<usize>, ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>
XREAD — entries strictly after last_seen, optionally COUNT-bounded.
Trait Implementations§
Source§impl Clone for StreamData
impl Clone for StreamData
Source§fn clone(&self) -> StreamData
fn clone(&self) -> StreamData
1.0.0 (const: unstable) · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read more