Struct StreamData 

pub struct StreamData { /* private fields */ }

Storage for a single stream: every entry in entries, the per-stream scalar state that Redis exposes via XINFO STREAM, and the consumer-group map (sprint B). An empty groups map costs about 8 bytes, so the no-group fast path (sprint A XADD/XREAD) carries effectively no overhead.

Implementations

impl StreamData

pub fn claim(&mut self, group: &[u8], new_owner: &[u8], ids: &[StreamId], opts: &XClaimOpts, now_ms: u64) -> Result<Vec<StreamId>, StoreError>

XCLAIM key group consumer min-idle-ms id [id ...] [...]. Returns the IDs successfully claimed (the dispatcher decides whether to emit JUSTID or full entries).

pub fn autoclaim(&mut self, group: &[u8], new_owner: &[u8], min_idle_ms: u64, start: StreamId, count: usize, justid: bool, now_ms: u64) -> Result<AutoclaimResult, StoreError>

XAUTOCLAIM key group consumer min-idle-ms start [COUNT n] [JUSTID]. Walks the PEL from start onward and claims the first count entries whose idle time is at least min_idle_ms. Returns (next_cursor_id, claimed_ids, deleted_ids).
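
The PEL walk described above can be sketched as follows. This is a minimal illustration, not the crate's implementation: Id, Pending and autoclaim_sketch are hypothetical names, the PEL is modelled as a BTreeMap, deleted_ids is left out, and returning (0, 0) as the end-of-scan cursor is an assumption borrowed from Redis's 0-0 convention.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Hypothetical PEL row: current owner and time of last delivery.
#[derive(Debug, Clone, PartialEq)]
struct Pending {
    owner: Vec<u8>,
    delivered_ms: u64,
}

/// Walk `pel` from `start`, claim up to `count` rows that have been idle for
/// at least `min_idle_ms`, and return (next_cursor, claimed_ids).
fn autoclaim_sketch(
    pel: &mut BTreeMap<Id, Pending>,
    new_owner: &[u8],
    min_idle_ms: u64,
    start: Id,
    count: usize,
    now_ms: u64,
) -> (Id, Vec<Id>) {
    let mut claimed = Vec::new();
    // Assumption: (0, 0) tells the caller that the scan reached the end.
    let mut next_cursor = (0, 0);
    for (id, row) in pel.range_mut(start..) {
        if claimed.len() == count {
            next_cursor = *id; // the next call resumes from this row
            break;
        }
        if now_ms.saturating_sub(row.delivered_ms) >= min_idle_ms {
            row.owner = new_owner.to_vec();
            row.delivered_ms = now_ms; // claiming restarts the idle clock
            claimed.push(*id);
        }
    }
    (next_cursor, claimed)
}
```

Rows that are not idle long enough are skipped without counting toward count, so a single call may scan more than count rows before it stops.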

pub fn payloads_for(&self, ids: &[StreamId]) -> EntryBatch

Builds the field-value payload list that pairs with ids (as returned by Self::claim / Self::autoclaim). IDs whose entries were removed by XDEL between claim and emit are skipped.
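
A minimal sketch of the skip-on-missing behaviour, under stated assumptions: Id and payloads_sketch are hypothetical, entries are modelled as a BTreeMap with String fields rather than SmallBytes, and the real EntryBatch type is replaced by a plain Vec.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Pair each claimed ID with its field-value list, silently dropping IDs
/// that no longer have an entry (for example after an XDEL).
fn payloads_sketch<'a>(
    entries: &'a BTreeMap<Id, Vec<(String, String)>>,
    ids: &[Id],
) -> Vec<(Id, &'a [(String, String)])> {
    ids.iter()
        .filter_map(|id| entries.get(id).map(|fields| (*id, fields.as_slice())))
        .collect()
}
```

Because missing IDs are dropped rather than reported, the result can be shorter than ids; a caller that needs to know which IDs vanished would have to compare the two lists.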

impl StreamData

pub fn group_create(&mut self, name: &[u8], mode: GroupCreateMode) -> Result<bool, StoreError>

XGROUP CREATE key group <id|$> [MKSTREAM]. Returns true if a new group was created; false if the group already existed (caller should report Redis’s -BUSYGROUP error in that case).

pub fn group_destroy(&mut self, name: &[u8]) -> bool

XGROUP DESTROY key group. Returns true if a group was dropped.

pub fn group_setid(&mut self, name: &[u8], mode: GroupCreateMode) -> bool

XGROUP SETID key group <id|$>. Returns false if the group doesn’t exist.

pub fn group_create_consumer(&mut self, group: &[u8], consumer: &[u8], now_ms: u64) -> bool

XGROUP CREATECONSUMER key group consumer. Returns true if a new consumer was inserted, false if it already existed or the group is missing.

pub fn group_del_consumer(&mut self, group: &[u8], consumer: &[u8]) -> u64

XGROUP DELCONSUMER key group consumer. Returns the number of PEL entries dropped along with the consumer (matches Redis).

pub fn readgroup(&mut self, group: &[u8], consumer: &[u8], last_seen_arg: ReadGroupId, count: Option<usize>, noack: bool, now_ms: u64) -> Result<EntryBatch, StoreError>

XREADGROUP GROUP group consumer [COUNT n] STREAMS key id. The special ID > means “new entries since last_delivered_id” and advances last_delivered_id. An explicit ID x means “PEL entries for this consumer with id > x”; it does NOT update last_delivered_id and is used for replay.
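
The two ID forms can be sketched like this. Everything here is a hypothetical model rather than the crate's types: ReadFrom stands in for ReadGroupId (whose real variants are not shown on this page), Group for ConsumerGroup, and COUNT, NOACK and now_ms are left out for brevity.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Hypothetical model of the two XREADGROUP ID forms.
enum ReadFrom {
    New,       // the special ID `>`
    After(Id), // an explicit ID: replay from the PEL
}

/// Hypothetical single-group state.
struct Group {
    last_delivered: Id,
    pel: BTreeMap<Id, Vec<u8>>, // pending ID -> owning consumer
}

fn readgroup_sketch(
    entries: &BTreeMap<Id, &'static str>,
    group: &mut Group,
    consumer: &[u8],
    from: ReadFrom,
) -> Vec<Id> {
    match from {
        ReadFrom::New => {
            // New entries: everything strictly after last_delivered.
            let ids: Vec<Id> = entries
                .range((Excluded(group.last_delivered), Unbounded))
                .map(|(id, _)| *id)
                .collect();
            for id in &ids {
                group.pel.insert(*id, consumer.to_vec());
            }
            if let Some(last) = ids.last() {
                group.last_delivered = *last; // only this branch advances it
            }
            ids
        }
        // Replay: this consumer's pending rows after `x`, no state change.
        ReadFrom::After(x) => group
            .pel
            .range((Excluded(x), Unbounded))
            .filter(|(_, owner)| owner.as_slice() == consumer)
            .map(|(id, _)| *id)
            .collect(),
    }
}
```

In this model a replay read is side-effect free, which is why it can be repeated safely after a consumer restart.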

pub fn ack(&mut self, group: &[u8], ids: &[StreamId]) -> u64

XACK key group id [...]. Returns count of PEL entries removed.
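
As a rough illustration of the return value, the sketch below counts only the IDs that were actually pending. Id and ack_sketch are hypothetical, and the PEL is modelled as a plain BTreeMap from ID to owning consumer.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Remove each ID from the PEL and count the removals that hit a row.
/// Unknown IDs and repeated IDs do not contribute to the count.
fn ack_sketch(pel: &mut BTreeMap<Id, Vec<u8>>, ids: &[Id]) -> u64 {
    ids.iter().filter(|id| pel.remove(*id).is_some()).count() as u64
}
```

Acknowledging the same ID twice in one call counts once in this model, because the second removal finds nothing.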

pub fn pending_summary(&self, group: &[u8]) -> Option<PendingSummary>

XPENDING key group — the summary form (4-tuple).

pub fn pending_extended(&self, group: &[u8], idle_min_ms: Option<u64>, start: StreamId, end: StreamId, count: usize, consumer_filter: Option<&[u8]>, now_ms: u64) -> Option<PendingExtended>

XPENDING key group [IDLE ms] start end count [consumer].

impl StreamData

pub fn contains_entry(&self, id: StreamId) -> bool

Returns true if an entry with id currently exists. AOF rewrite uses this to filter out tombstone PEL rows, that is, PEL rows whose entry no longer exists (XCLAIM can’t re-create those).

pub fn export_groups(&self) -> Vec<LoadedGroup>

Dump every group into the primitive exchange form.

pub fn import_groups(&mut self, groups: Vec<LoadedGroup>)

Rebuild the group map from the exchange form (loader-side twin of Self::export_groups). Per-consumer pel_count is recomputed; a PEL owner missing from the consumer roster (hand-built or corrupt file) gets a roster slot rather than a panic.
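
The pel_count recomputation can be sketched as below. This is an assumption-laden model, not the crate's loader: rebuild_pel_counts is a hypothetical helper, and the LoadedGroup layout is replaced by a roster slice plus a list of (id, owner) PEL rows.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Recompute per-consumer pending counts from the PEL rows. A PEL owner that
/// is missing from the roster gets a slot instead of causing a panic.
fn rebuild_pel_counts(roster: &[Vec<u8>], pel: &[(Id, Vec<u8>)]) -> BTreeMap<Vec<u8>, u64> {
    // Every rostered consumer starts at zero, even with nothing pending.
    let mut counts: BTreeMap<Vec<u8>, u64> =
        roster.iter().cloned().map(|c| (c, 0)).collect();
    for (_, owner) in pel {
        *counts.entry(owner.clone()).or_insert(0) += 1;
    }
    counts
}
```

Recounting from the PEL rows means a stale or hand-edited count in the file cannot disagree with the rows that were actually loaded.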

pub fn xsetid(&mut self, last_id: StreamId, entries_added: Option<u64>, max_deleted_id: Option<StreamId>) -> Result<(), StoreError>

XSETID key last-id [ENTRIESADDED n] [MAXDELETEDID id] — overwrite the stream’s scalar state. Rejects a last_id below the current top entry (Redis: “smaller than the target stream top item”).
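
A minimal sketch of the documented check, assuming hypothetical names throughout (Scalars, SetIdError, xsetid_sketch). It models only the rejection stated above; any further validation the real method or Redis performs is not shown.

```rust
/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Hypothetical holder for the per-stream scalar state.
#[derive(Default)]
struct Scalars {
    last_id: Id,
    entries_added: u64,
    max_deleted_id: Id,
}

#[derive(Debug, PartialEq)]
enum SetIdError {
    BelowTopEntry,
}

/// Overwrite the scalars, refusing a last_id below the current top entry.
/// Optional arguments leave their field untouched when absent.
fn xsetid_sketch(
    state: &mut Scalars,
    top_entry: Option<Id>,
    last_id: Id,
    entries_added: Option<u64>,
    max_deleted_id: Option<Id>,
) -> Result<(), SetIdError> {
    if let Some(top) = top_entry {
        if last_id < top {
            return Err(SetIdError::BelowTopEntry);
        }
    }
    state.last_id = last_id;
    if let Some(n) = entries_added {
        state.entries_added = n;
    }
    if let Some(id) = max_deleted_id {
        state.max_deleted_id = id;
    }
    Ok(())
}
```

The check runs before any field is written, so a rejected call leaves the state unchanged.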

impl StreamData

pub fn length(&self) -> u64

Current entry count (never larger than entries_added).

pub fn last_id(&self) -> StreamId

The last ID ever assigned. It resets to MIN only when the whole key is deleted; the ID of a live stream is never rolled back.

pub fn entries_added(&self) -> u64

XINFO STREAM helper: the stream’s entries_added counter.

pub fn max_deleted_id(&self) -> StreamId

pub fn iter_entries(&self) -> impl Iterator<Item = (StreamId, &[(SmallBytes, SmallBytes)])>

Iterate every entry in ID-ascending order. Snapshot serializers walk this to dump the stream.

pub fn first_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])>

First (smallest-ID) entry — None if empty.

pub fn last_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])>

Last (largest-ID) entry — None if empty.

pub fn groups_iter(&self) -> impl Iterator<Item = (&[u8], &ConsumerGroup)>

Iterate (group_name, group) pairs — used by XINFO GROUPS.

pub fn group(&self, name: &[u8]) -> Option<&ConsumerGroup>

Lookup one group by name (for XINFO CONSUMERS).

pub fn group_count(&self) -> usize

Group count — XINFO STREAM’s groups field.

pub fn load_entry(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>)

Snapshot-loader entry point: inserts a pre-existing entry without touching the scalar state. Used by Store::load_stream; the loader pushes every entry and then calls Self::set_loaded_state once.

pub fn set_loaded_state(&mut self, last_id: StreamId, max_deleted_id: StreamId, entries_added: u64)

Snapshot-loader: restore the per-stream scalars after every entry has been pushed via Self::load_entry.

pub fn resolve_xadd_id(&self, spec: XAddIdSpec, now_ms: u64) -> Result<StreamId, StoreError>

Translates XADD’s XAddIdSpec into a concrete StreamId, rejecting any spec that would not yield an ID strictly greater than self.last_id. now_ms is injected so that tests can pin the wall-clock time.
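
The resolution rule can be sketched as follows. The variants of the real XAddIdSpec are not shown on this page, so IdSpec below is a hypothetical model of the three ID forms XADD accepts in Redis (*, ms-*, and ms-seq); IdError, next_after and resolve_sketch are likewise illustrative names.

```rust
/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);

/// Hypothetical model of XAddIdSpec; the real variants may differ.
enum IdSpec {
    Auto,               // `*`
    AutoSeq(u64),       // `<ms>-*`
    Explicit(u64, u64), // `<ms>-<seq>`
}

#[derive(Debug, PartialEq)]
enum IdError {
    NotGreaterThanLast,
    Exhausted,
}

/// Smallest ID strictly greater than the given one, if any exists.
fn next_after((ms, seq): Id) -> Result<Id, IdError> {
    match seq.checked_add(1) {
        Some(s) => Ok((ms, s)),
        None => ms.checked_add(1).map(|m| (m, 0)).ok_or(IdError::Exhausted),
    }
}

fn resolve_sketch(last_id: Id, spec: IdSpec, now_ms: u64) -> Result<Id, IdError> {
    let (last_ms, last_seq) = last_id;
    match spec {
        // Use the clock when it is ahead; otherwise step past last_id.
        IdSpec::Auto if now_ms > last_ms => Ok((now_ms, 0)),
        IdSpec::Auto => next_after(last_id),
        IdSpec::AutoSeq(ms) if ms > last_ms => Ok((ms, 0)),
        IdSpec::AutoSeq(ms) if ms == last_ms => last_seq
            .checked_add(1)
            .map(|s| (ms, s))
            .ok_or(IdError::Exhausted),
        IdSpec::AutoSeq(_) => Err(IdError::NotGreaterThanLast),
        IdSpec::Explicit(ms, seq) if (ms, seq) > last_id => Ok((ms, seq)),
        IdSpec::Explicit(..) => Err(IdError::NotGreaterThanLast),
    }
}
```

Passing now_ms as an argument, as the real method does, keeps the function deterministic: a test can call it with a fixed clock value and assert on the exact ID.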

pub fn range(&self, start: StreamId, end: StreamId, count: Option<usize>) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>

XRANGE — inclusive [start, end], optionally COUNT-bounded.

pub fn revrange(&self, start: StreamId, end: StreamId, count: Option<usize>) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>

XREVRANGE — same [start, end] interval, descending order.

pub fn read_after(&self, last_seen: StreamId, count: Option<usize>) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])>

XREAD — entries strictly after last_seen, optionally COUNT-bounded.
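
The three read shapes (inclusive range, reversed range, strictly-after) can be illustrated over an ordered map. This is a sketch under assumptions: the page does not say how entries are stored, so a BTreeMap is used here, the function names are hypothetical, and only IDs are returned to keep the example short.

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};

/// Hypothetical stand-in for StreamId: (milliseconds, sequence).
type Id = (u64, u64);
type Entries = BTreeMap<Id, Vec<(String, String)>>;

/// XRANGE shape: inclusive [start, end], ascending, optionally bounded.
fn range_sketch(entries: &Entries, start: Id, end: Id, count: Option<usize>) -> Vec<Id> {
    if start > end {
        return Vec::new(); // BTreeMap::range panics on inverted bounds
    }
    entries
        .range(start..=end)
        .take(count.unwrap_or(usize::MAX))
        .map(|(id, _)| *id)
        .collect()
}

/// XREVRANGE shape: the same interval walked from the largest ID down.
fn revrange_sketch(entries: &Entries, start: Id, end: Id, count: Option<usize>) -> Vec<Id> {
    if start > end {
        return Vec::new();
    }
    entries
        .range(start..=end)
        .rev()
        .take(count.unwrap_or(usize::MAX))
        .map(|(id, _)| *id)
        .collect()
}

/// XREAD shape: entries strictly after `last_seen`.
fn read_after_sketch(entries: &Entries, last_seen: Id, count: Option<usize>) -> Vec<Id> {
    entries
        .range((Excluded(last_seen), Unbounded))
        .take(count.unwrap_or(usize::MAX))
        .map(|(id, _)| *id)
        .collect()
}
```

Note the difference in bounds: the range forms include both endpoints, while the read form excludes last_seen so that a client polling with the last ID it saw never receives that entry twice.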

pub fn weight(&self) -> u64

Approximate heap footprint for Value::weight. Walks the entry list once; cheap relative to the size of the stream itself.

Trait Implementations

impl Clone for StreamData

fn clone(&self) -> StreamData

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Default for StreamData

fn default() -> StreamData

Returns the “default value” for a type.
