Skip to main content

kevy_store/stream/
mod.rs

1//! Redis-compatible Streams storage. Each stream is an append-only log
2//! of (ID, field-value-list) entries keyed by a monotonically increasing
3//! `<ms>-<seq>` ID. The entries live in a `BTreeMap<StreamId, _>` so
//! range queries are O(log n + k) and the iterator's natural order is the
//! ID order (ascending).
6//!
7//! Sprint A scope: bare stream (no consumer groups). The `StreamData`
8//! type carries a `groups` slot reserved for sprint B; this file only
9//! implements the entry-side ops.
10
11use std::collections::BTreeMap;
12#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
13use std::time::{SystemTime, UNIX_EPOCH};
14
15use kevy_map::KevyMap;
16
17use crate::value::{SmallBytes, BTREE_SLOT_BYTES};
18use crate::StoreError;
19
20// ───────────── StreamId ─────────────
21
/// Identifier of one stream entry, written `<ms>-<seq>` on the wire.
///
/// Field order matters: the derived `Ord`/`PartialOrd` compare `ms` first
/// and fall back to `seq`, so IDs sort in the order the stream protocol
/// expects. The derived `Default` is `0-0`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct StreamId {
    /// Millisecond Unix timestamp part of the ID.
    pub ms: u64,
    /// Sequence number within one millisecond, starting at 0.
    pub seq: u64,
}
32
33impl StreamId {
34    /// The numerically smallest ID; XRANGE `-` start.
35    pub const MIN: StreamId = StreamId { ms: 0, seq: 0 };
36    /// The numerically largest representable ID; XRANGE `+` end.
37    pub const MAX: StreamId = StreamId { ms: u64::MAX, seq: u64::MAX };
38
39    /// Render as the canonical `<ms>-<seq>` wire form.
40    pub fn encode(self) -> Vec<u8> {
41        format!("{}-{}", self.ms, self.seq).into_bytes()
42    }
43
44    /// Step one ID past `self`. Saturates at [`Self::MAX`].
45    #[must_use]
46    pub fn next(self) -> Self {
47        if self.seq < u64::MAX {
48            StreamId { ms: self.ms, seq: self.seq + 1 }
49        } else if self.ms < u64::MAX {
50            StreamId { ms: self.ms + 1, seq: 0 }
51        } else {
52            StreamId::MAX
53        }
54    }
55}
56
57/// XADD's ID argument: either an explicit `<ms>-<seq>` (both parts may
58/// be `*` to auto-fill `seq` only) or fully auto-generate via `*`.
59#[derive(Clone, Copy, Debug, Eq, PartialEq)]
60pub enum XAddIdSpec {
61    /// `*` — generate both `ms` (= current wall-clock) and `seq`.
62    AutoAll,
63    /// `<ms>-*` — caller fixes `ms`, server picks the next free `seq`.
64    AutoSeq(u64),
65    /// `<ms>-<seq>` — caller fully specifies the ID.
66    Explicit(StreamId),
67}
68
69/// Parse an XADD ID argument (`*`, `ms`, `ms-*`, `ms-seq`).
70pub fn parse_xadd_id(s: &[u8]) -> Result<XAddIdSpec, StreamIdError> {
71    if s == b"*" {
72        return Ok(XAddIdSpec::AutoAll);
73    }
74    let txt = std::str::from_utf8(s).map_err(|_| StreamIdError::Invalid)?;
75    match txt.split_once('-') {
76        None => {
77            let ms = txt.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
78            Ok(XAddIdSpec::Explicit(StreamId { ms, seq: 0 }))
79        }
80        Some((ms_s, seq_s)) => {
81            let ms = ms_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
82            if seq_s == "*" {
83                Ok(XAddIdSpec::AutoSeq(ms))
84            } else {
85                let seq = seq_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
86                Ok(XAddIdSpec::Explicit(StreamId { ms, seq }))
87            }
88        }
89    }
90}
91
92/// Parse an XRANGE `start` ID. Accepts `-` (= [`StreamId::MIN`]), bare
93/// `ms` (seq=0), and full `ms-seq`.
94pub fn parse_range_start(s: &[u8]) -> Result<StreamId, StreamIdError> {
95    if s == b"-" {
96        return Ok(StreamId::MIN);
97    }
98    parse_explicit_id(s, /*end=*/ false)
99}
100
101/// Parse an XRANGE `end` ID. Accepts `+` (= [`StreamId::MAX`]), bare `ms`
102/// (seq=u64::MAX so the entire ms is included), and full `ms-seq`.
103pub fn parse_range_end(s: &[u8]) -> Result<StreamId, StreamIdError> {
104    if s == b"+" {
105        return Ok(StreamId::MAX);
106    }
107    parse_explicit_id(s, /*end=*/ true)
108}
109
110/// Parse a fully-explicit ID for XREAD's per-stream "last-seen" arg
111/// (`0`, `0-0`, `5-2`). `$` is handled by the caller (it means "the
112/// stream's current `last_id`", which only Store can resolve).
113pub fn parse_explicit_id(s: &[u8], end: bool) -> Result<StreamId, StreamIdError> {
114    let txt = std::str::from_utf8(s).map_err(|_| StreamIdError::Invalid)?;
115    let (ms_s, seq_s) = match txt.split_once('-') {
116        Some(p) => p,
117        None => (txt, if end { "" } else { "0" }),
118    };
119    let ms = ms_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?;
120    let seq = if seq_s.is_empty() {
121        u64::MAX
122    } else {
123        seq_s.parse::<u64>().map_err(|_| StreamIdError::Invalid)?
124    };
125    Ok(StreamId { ms, seq })
126}
127
/// Failure type shared by the stream-ID parsers in this module.
///
/// Kept separate from `StoreError::NotInteger` so a caller can answer with
/// the stream-specific Redis error (`ERR Invalid stream ID specified as
/// stream command argument`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum StreamIdError {
    /// The input is not one of the accepted ID spellings
    /// (`<ms>[-<seq>]`, `*`, `-`, `+`).
    Invalid,
}
136
137// ───────────── StreamData ─────────────
138
139/// One stream's storage: every entry in `entries` plus the per-stream
140/// scalar state Redis exposes via `XINFO STREAM`, plus the consumer
141/// groups map (sprint B). An empty `groups` map costs ~8 bytes and
142/// makes the no-group fast path (sprint A XADD/XREAD) zero-overhead.
143#[derive(Default, Clone)]
144pub struct StreamData {
145    /// Sorted entries; the `BTreeMap` enforces strict-increasing IDs.
146    pub(super) entries: BTreeMap<StreamId, Vec<(SmallBytes, SmallBytes)>>,
147    /// Largest ID **ever** seen on this stream, even after the entry
148    /// has been deleted (XDEL doesn't roll the clock back).
149    pub(super) last_id: StreamId,
150    /// Largest ID that has been deleted (`max_deleted_entry_id` in
151    /// Redis XINFO). Used to detect "deletion-only" gaps for clients.
152    pub(super) max_deleted_id: StreamId,
153    /// Cumulative number of entries ever added — never decreases. Used
154    /// by XINFO STREAM's `entries-added`.
155    pub(super) entries_added: u64,
156    /// Consumer groups keyed by name (sprint B). Boxed so the
157    /// `StreamData` struct stays compact when no groups are attached.
158    pub(super) groups: KevyMap<SmallBytes, Box<group::ConsumerGroup>>,
159}
160
161impl StreamData {
162    /// Current entry count (never larger than `entries_added`).
163    pub fn length(&self) -> u64 {
164        self.entries.len() as u64
165    }
166
167    /// Last ID ever assigned. Resets to `MIN` only when the whole key
168    /// is deleted (we never down-rev a stream).
169    pub fn last_id(&self) -> StreamId {
170        self.last_id
171    }
172
173    /// XINFO STREAM helpers.
174    pub fn entries_added(&self) -> u64 {
175        self.entries_added
176    }
177
178    pub fn max_deleted_id(&self) -> StreamId {
179        self.max_deleted_id
180    }
181
182    /// Iterate every entry in ID-ascending order. Snapshot serializers
183    /// walk this to dump the stream.
184    pub fn iter_entries(
185        &self,
186    ) -> impl Iterator<Item = (StreamId, &[(SmallBytes, SmallBytes)])> {
187        self.entries.iter().map(|(id, fv)| (*id, fv.as_slice()))
188    }
189
190    /// First (smallest-ID) entry — `None` if empty.
191    pub fn first_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])> {
192        self.entries.iter().next().map(|(id, fv)| (*id, fv.as_slice()))
193    }
194
195    /// Last (largest-ID) entry — `None` if empty.
196    pub fn last_entry(&self) -> Option<(StreamId, &[(SmallBytes, SmallBytes)])> {
197        self.entries.iter().next_back().map(|(id, fv)| (*id, fv.as_slice()))
198    }
199
200    /// Iterate `(group_name, group)` pairs — used by `XINFO GROUPS`.
201    pub fn groups_iter(&self) -> impl Iterator<Item = (&[u8], &group::ConsumerGroup)> {
202        self.groups.iter().map(|(k, v)| (k.as_slice(), v.as_ref()))
203    }
204
205    /// Lookup one group by name (for `XINFO CONSUMERS`).
206    pub fn group(&self, name: &[u8]) -> Option<&group::ConsumerGroup> {
207        self.groups.get(name).map(std::convert::AsRef::as_ref)
208    }
209
210    /// Group count — `XINFO STREAM`'s `groups` field.
211    pub fn group_count(&self) -> usize {
212        self.groups.len()
213    }
214
215    /// Snapshot-loader entry-point: insert a pre-existing entry without
216    /// touching scalar state. Used by `Store::load_stream`; the loader
217    /// pumps every entry then calls [`Self::set_loaded_state`] once.
218    pub fn load_entry(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>) {
219        self.entries.insert(id, fields);
220    }
221
222    /// Snapshot-loader: restore the per-stream scalars after every
223    /// entry has been pushed via [`Self::load_entry`].
224    pub fn set_loaded_state(
225        &mut self,
226        last_id: StreamId,
227        max_deleted_id: StreamId,
228        entries_added: u64,
229    ) {
230        self.last_id = last_id;
231        self.max_deleted_id = max_deleted_id;
232        self.entries_added = entries_added;
233    }
234
235    /// Insert a pre-resolved entry. Caller is responsible for picking
236    /// the ID via [`StreamData::resolve_xadd_id`] so monotonicity holds.
237    pub(crate) fn insert(&mut self, id: StreamId, fields: Vec<(SmallBytes, SmallBytes)>) {
238        debug_assert!(id > self.last_id || (id == StreamId::MIN && self.last_id == StreamId::MIN));
239        self.entries.insert(id, fields);
240        self.last_id = id;
241        self.entries_added += 1;
242    }
243
244    /// Translate XADD's `XAddIdSpec` into a concrete `StreamId`,
245    /// rejecting any spec that would not be strictly greater than
246    /// `self.last_id`. `now_ms` is injected so tests can pin wall-clock.
247    pub fn resolve_xadd_id(
248        &self,
249        spec: XAddIdSpec,
250        now_ms: u64,
251    ) -> Result<StreamId, StoreError> {
252        let candidate = match spec {
253            XAddIdSpec::AutoAll => {
254                let ms = now_ms.max(self.last_id.ms);
255                if ms == self.last_id.ms {
256                    StreamId { ms, seq: self.last_id.seq + 1 }
257                } else {
258                    StreamId { ms, seq: 0 }
259                }
260            }
261            XAddIdSpec::AutoSeq(ms) => {
262                if ms < self.last_id.ms {
263                    return Err(StoreError::OutOfRange);
264                }
265                if ms == self.last_id.ms {
266                    StreamId { ms, seq: self.last_id.seq + 1 }
267                } else {
268                    StreamId { ms, seq: 0 }
269                }
270            }
271            XAddIdSpec::Explicit(id) => {
272                if id <= self.last_id {
273                    return Err(StoreError::OutOfRange);
274                }
275                if id == StreamId::MIN {
276                    return Err(StoreError::OutOfRange);
277                }
278                id
279            }
280        };
281        Ok(candidate)
282    }
283
284    /// XRANGE — inclusive `[start, end]`, optionally COUNT-bounded.
285    pub fn range(
286        &self,
287        start: StreamId,
288        end: StreamId,
289        count: Option<usize>,
290    ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
291        let iter = self.entries.range(start..=end).map(|(id, fv)| (*id, fv.as_slice()));
292        match count {
293            Some(n) => iter.take(n).collect(),
294            None => iter.collect(),
295        }
296    }
297
298    /// XREVRANGE — same `[start, end]` interval, descending order.
299    pub fn revrange(
300        &self,
301        start: StreamId,
302        end: StreamId,
303        count: Option<usize>,
304    ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
305        let iter = self.entries.range(start..=end).rev().map(|(id, fv)| (*id, fv.as_slice()));
306        match count {
307            Some(n) => iter.take(n).collect(),
308            None => iter.collect(),
309        }
310    }
311
312    /// XREAD — entries strictly after `last_seen`, optionally COUNT-bounded.
313    pub fn read_after(
314        &self,
315        last_seen: StreamId,
316        count: Option<usize>,
317    ) -> Vec<(StreamId, &[(SmallBytes, SmallBytes)])> {
318        if last_seen == StreamId::MAX {
319            return Vec::new();
320        }
321        self.range(last_seen.next(), StreamId::MAX, count)
322    }
323
324    /// XDEL — remove `ids`. Returns the count actually removed (missing
325    /// IDs silently skipped). Updates `max_deleted_id` so XINFO can
326    /// report it.
327    pub(crate) fn del_ids(&mut self, ids: &[StreamId]) -> usize {
328        let mut removed = 0usize;
329        for id in ids {
330            if self.entries.remove(id).is_some() {
331                removed += 1;
332                if *id > self.max_deleted_id {
333                    self.max_deleted_id = *id;
334                }
335            }
336        }
337        removed
338    }
339
340    /// XTRIM MAXLEN — keep the most recent `n` entries.
341    pub(crate) fn trim_maxlen(&mut self, n: usize) -> usize {
342        let len = self.entries.len();
343        if len <= n {
344            return 0;
345        }
346        let drop = len - n;
347        let mut removed = 0;
348        let drop_ids: Vec<StreamId> = self.entries.keys().copied().take(drop).collect();
349        for id in drop_ids {
350            self.entries.remove(&id);
351            if id > self.max_deleted_id {
352                self.max_deleted_id = id;
353            }
354            removed += 1;
355        }
356        removed
357    }
358
359    /// Approximate heap footprint for `Value::weight`. Walks the entry
360    /// list once; cheap relative to the size of the stream itself.
361    pub fn weight(&self) -> u64 {
362        let entry_sum: u64 = self
363            .entries
364            .values()
365            .map(|fv| {
366                24 + fv
367                    .iter()
368                    .map(|(f, v)| 48 + f.heap_bytes() as u64 + v.heap_bytes() as u64)
369                    .sum::<u64>()
370            })
371            .sum();
372        (self.entries.len() as u64).saturating_mul(BTREE_SLOT_BYTES) + entry_sum
373    }
374
375    /// XTRIM MINID — drop every entry with ID < `floor`.
376    pub(crate) fn trim_minid(&mut self, floor: StreamId) -> usize {
377        let drop_ids: Vec<StreamId> = self
378            .entries
379            .range(..floor)
380            .map(|(id, _)| *id)
381            .collect();
382        let removed = drop_ids.len();
383        for id in drop_ids {
384            self.entries.remove(&id);
385            if id > self.max_deleted_id {
386                self.max_deleted_id = id;
387            }
388        }
389        removed
390    }
391}
392
393mod claim;
394mod group;
395mod load;
396mod store;
397#[allow(unused_imports)]
398pub use claim::AutoclaimResult;
399pub use load::{LoadedGroup, LoadedPelEntry};
400#[allow(unused_imports)]
401pub use group::{
402    ConsumerGroup, ConsumerState, GroupCreateMode, PelEntry, PendingExtended,
403    PendingExtendedRow, PendingSummary, ReadGroupId, XClaimOpts,
404};
405pub use store::EntryBatch;
406
/// One stream entry in the primitive shape the snapshot loader works
/// with: `(ms, seq, [(field, value), ...])`. Produced by the persist crate
/// and consumed by `Store::load_stream`.
pub type LoadedStreamEntry = (u64, u64, Vec<(Vec<u8>, Vec<u8>)>);
411
412// ───────────── small helpers (shared with `store.rs`) ─────────────
413
/// Current wall-clock time in Unix milliseconds (native targets).
///
/// Shared with dispatchers so every XADD on a shard reads the same clock
/// source. Reads `SystemTime::now()`; if the clock reports a time before
/// the Unix epoch the function returns 0 instead of failing.
#[cfg(not(all(target_arch = "wasm32", target_os = "unknown")))]
pub fn now_unix_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}

/// Current wall-clock time in Unix milliseconds
/// (`wasm32-unknown-unknown`).
///
/// `SystemTime::now()` traps on this target, so the value comes from the
/// host-fed wall clock in `crate::clock` (see `crate::set_wall_clock_ms`,
/// wasm-only).
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
pub fn now_unix_ms() -> u64 {
    crate::clock::wall_now_unix_ms()
}
430
431pub(super) fn stream_entry_weight(fields: &[(SmallBytes, SmallBytes)]) -> u64 {
432    // BTreeMap slot + Vec header + each (field, value) cell + their heap.
433    BTREE_SLOT_BYTES
434        + 24
435        + fields
436            .iter()
437            .map(|(f, v)| 48 + f.heap_bytes() as u64 + v.heap_bytes() as u64)
438            .sum::<u64>()
439}
440
441pub(super) fn clone_entries(
442    src: Vec<(StreamId, &[(SmallBytes, SmallBytes)])>,
443) -> EntryBatch {
444    src.into_iter()
445        .map(|(id, fv)| (id, fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect()))
446        .collect()
447}