Skip to main content

kevy_store/stream/
store.rs

1//! `Store::xadd` / `xlen` / `xrange` / `xrevrange` / `xread` / `xdel` /
2//! `xtrim_*` — the per-keyspace surface for sprint A of v2-7 Streams.
3//! Kept separate from `stream/mod.rs` (which owns the `StreamData` /
4//! `StreamId` types + entry-side ops) so each file stays under the
5//! project's ≤500-LOC rule.
6
7use super::group::{AutoclaimResult, ReadGroupId};
8use super::{
9    GroupCreateMode, PendingExtended, PendingSummary, StreamData, StreamId, XAddIdSpec, XClaimOpts,
10};
11use crate::value::{SmallBytes, Value};
12use crate::{Entry, Store, StoreError};
13use std::sync::Arc;
14
15/// Cloned-out view of stream entries, the cross-module wire form. Keeps
16/// the same shape Redis sends and lets the callers stay decoupled from
17/// the `SmallBytes` interning the store uses internally.
18pub type EntryBatch = Vec<(StreamId, Vec<(Vec<u8>, Vec<u8>)>)>;
19
20impl Store {
21    fn stream_mut(
22        &mut self,
23        key: &[u8],
24        create: bool,
25    ) -> Result<Option<&mut StreamData>, StoreError> {
26        if self.live_entry_mut(key).is_none() {
27            if !create {
28                return Ok(None);
29            }
30            self.insert_entry(
31                SmallBytes::from_slice(key),
32                Entry::new(Value::Stream(Arc::default()), None),
33            );
34        }
35        match &mut self.map.get_mut(key).expect("present").value {
36            Value::Stream(s) => Ok(Some(Arc::make_mut(s))),
37            _ => Err(StoreError::WrongType),
38        }
39    }
40
41    fn stream_ref(&mut self, key: &[u8]) -> Result<Option<&StreamData>, StoreError> {
42        match self.live_entry(key) {
43            None => Ok(None),
44            Some(e) => match &e.value {
45                Value::Stream(s) => Ok(Some(s.as_ref())),
46                _ => Err(StoreError::WrongType),
47            },
48        }
49    }
50
51    /// Read-only access to a stream's `StreamData`, used by `XINFO`
52    /// to inspect entries / groups / consumers without going through
53    /// the wrapper layer. Returns `Ok(None)` for a missing key,
54    /// `WrongType` for a non-stream value at `key`.
55    pub fn stream_view(&mut self, key: &[u8]) -> Result<Option<&StreamData>, StoreError> {
56        self.stream_ref(key)
57    }
58
59    /// `XADD key <spec> field value [field value ...]`. Returns the
60    /// assigned ID. `nomkstream` matches Redis's `NOMKSTREAM` flag —
61    /// suppress key creation, returning `Ok(None)`. `now_ms` is the
62    /// wall-clock used for `XAddIdSpec::AutoAll`.
63    pub fn xadd(
64        &mut self,
65        key: &[u8],
66        spec: XAddIdSpec,
67        fields: Vec<(Vec<u8>, Vec<u8>)>,
68        nomkstream: bool,
69        now_ms: u64,
70    ) -> Result<Option<StreamId>, StoreError> {
71        if nomkstream && self.live_entry(key).is_none() {
72            return Ok(None);
73        }
74        let id;
75        let weight_delta;
76        {
77            let s = self.stream_mut(key, true)?.expect("created");
78            id = s.resolve_xadd_id(spec, now_ms)?;
79            let smb_fields: Vec<(SmallBytes, SmallBytes)> = fields
80                .into_iter()
81                .map(|(f, v)| (SmallBytes::from_slice(&f), SmallBytes::from_slice(&v)))
82                .collect();
83            weight_delta = super::stream_entry_weight(&smb_fields);
84            s.insert(id, smb_fields);
85        }
86        self.bump_if_watched(key);
87        self.account_delta(key, weight_delta as i64);
88        Ok(Some(id))
89    }
90
91    /// `XLEN key`. Returns 0 for a missing key.
92    pub fn xlen(&mut self, key: &[u8]) -> Result<u64, StoreError> {
93        Ok(self.stream_ref(key)?.map_or(0, super::StreamData::length))
94    }
95
96    /// `XRANGE key start end [COUNT n]`.
97    pub fn xrange(
98        &mut self,
99        key: &[u8],
100        start: StreamId,
101        end: StreamId,
102        count: Option<usize>,
103    ) -> Result<EntryBatch, StoreError> {
104        Ok(self
105            .stream_ref(key)?
106            .map_or_else(Vec::new, |s| super::clone_entries(s.range(start, end, count))))
107    }
108
109    /// `XREVRANGE key end start [COUNT n]`.
110    pub fn xrevrange(
111        &mut self,
112        key: &[u8],
113        start: StreamId,
114        end: StreamId,
115        count: Option<usize>,
116    ) -> Result<EntryBatch, StoreError> {
117        Ok(self
118            .stream_ref(key)?
119            .map_or_else(Vec::new, |s| super::clone_entries(s.revrange(start, end, count))))
120    }
121
122    /// `XREAD ... STREAMS key last_seen [...]` — per-key part.
123    pub fn xread(
124        &mut self,
125        key: &[u8],
126        last_seen: StreamId,
127        count: Option<usize>,
128    ) -> Result<EntryBatch, StoreError> {
129        Ok(self
130            .stream_ref(key)?
131            .map_or_else(Vec::new, |s| super::clone_entries(s.read_after(last_seen, count))))
132    }
133
134    /// Resolve `$` as XREAD's "last-seen" to the stream's current last
135    /// ID. Returns `MIN` for a missing key.
136    pub fn xread_dollar_last_id(&mut self, key: &[u8]) -> Result<StreamId, StoreError> {
137        Ok(self.stream_ref(key)?.map_or(StreamId::MIN, super::StreamData::last_id))
138    }
139
140    /// `XDEL key id [...]`. Returns count actually removed.
141    pub fn xdel(&mut self, key: &[u8], ids: &[StreamId]) -> Result<u64, StoreError> {
142        let n;
143        {
144            let Some(s) = self.stream_mut(key, false)? else {
145                return Ok(0);
146            };
147            n = s.del_ids(ids);
148        }
149        if n > 0 {
150            self.bump_if_watched(key);
151            self.reweigh_entry(key);
152        }
153        Ok(n as u64)
154    }
155
156    /// `XTRIM key MAXLEN n`. Returns number removed.
157    pub fn xtrim_maxlen(&mut self, key: &[u8], maxlen: u64) -> Result<u64, StoreError> {
158        let n;
159        {
160            let Some(s) = self.stream_mut(key, false)? else {
161                return Ok(0);
162            };
163            n = s.trim_maxlen(maxlen as usize);
164        }
165        if n > 0 {
166            self.bump_if_watched(key);
167            self.reweigh_entry(key);
168        }
169        Ok(n as u64)
170    }
171
172    /// `XTRIM key MINID id`. Returns number removed.
173    pub fn xtrim_minid(&mut self, key: &[u8], minid: StreamId) -> Result<u64, StoreError> {
174        let n;
175        {
176            let Some(s) = self.stream_mut(key, false)? else {
177                return Ok(0);
178            };
179            n = s.trim_minid(minid);
180        }
181        if n > 0 {
182            self.bump_if_watched(key);
183            self.reweigh_entry(key);
184        }
185        Ok(n as u64)
186    }
187
188    /// `XSETID key last-id [ENTRIESADDED n] [MAXDELETEDID id]`. Returns
189    /// `NoSuchKey` for a missing key (dispatch maps it to Redis's
190    /// "requires the key to exist" wording), `OutOfRange` when `last_id`
191    /// is below the stream's top entry.
192    pub fn xsetid(
193        &mut self,
194        key: &[u8],
195        last_id: StreamId,
196        entries_added: Option<u64>,
197        max_deleted_id: Option<StreamId>,
198    ) -> Result<(), StoreError> {
199        {
200            let Some(s) = self.stream_mut(key, false)? else {
201                return Err(StoreError::NoSuchKey);
202            };
203            s.xsetid(last_id, entries_added, max_deleted_id)?;
204        }
205        self.bump_if_watched(key);
206        Ok(())
207    }
208
209    // ─────── consumer-group surface (sprint B) ───────
210
211    /// `XGROUP CREATE key group <id|$> [MKSTREAM]`. Returns `Ok(true)`
212    /// when a fresh group was added; `Ok(false)` if the group already
213    /// existed (caller emits `-BUSYGROUP`). `mkstream` matches Redis:
214    /// auto-create the stream key when missing.
215    pub fn xgroup_create(
216        &mut self,
217        key: &[u8],
218        group: &[u8],
219        mode: GroupCreateMode,
220        mkstream: bool,
221    ) -> Result<bool, StoreError> {
222        let exists = self.live_entry(key).is_some();
223        if !exists && !mkstream {
224            return Err(StoreError::NoSuchKey);
225        }
226        let s = self.stream_mut(key, true)?.expect("created");
227        let created = s.group_create(group, mode)?;
228        self.bump_if_watched(key);
229        self.reweigh_entry(key);
230        Ok(created)
231    }
232
233    /// `XGROUP DESTROY key group`. Returns `true` if a group was dropped.
234    pub fn xgroup_destroy(&mut self, key: &[u8], group: &[u8]) -> Result<bool, StoreError> {
235        let dropped;
236        {
237            let Some(s) = self.stream_mut(key, false)? else {
238                return Ok(false);
239            };
240            dropped = s.group_destroy(group);
241        }
242        if dropped {
243            self.bump_if_watched(key);
244            self.reweigh_entry(key);
245        }
246        Ok(dropped)
247    }
248
249    /// `XGROUP SETID key group <id|$>`.
250    pub fn xgroup_setid(
251        &mut self,
252        key: &[u8],
253        group: &[u8],
254        mode: GroupCreateMode,
255    ) -> Result<bool, StoreError> {
256        let touched;
257        {
258            let Some(s) = self.stream_mut(key, false)? else {
259                return Ok(false);
260            };
261            touched = s.group_setid(group, mode);
262        }
263        if touched {
264            self.bump_if_watched(key);
265        }
266        Ok(touched)
267    }
268
269    /// `XGROUP CREATECONSUMER key group consumer`.
270    pub fn xgroup_create_consumer(
271        &mut self,
272        key: &[u8],
273        group: &[u8],
274        consumer: &[u8],
275        now_ms: u64,
276    ) -> Result<bool, StoreError> {
277        let Some(s) = self.stream_mut(key, false)? else {
278            return Ok(false);
279        };
280        Ok(s.group_create_consumer(group, consumer, now_ms))
281    }
282
283    /// `XGROUP DELCONSUMER key group consumer`. Returns dropped PEL count.
284    pub fn xgroup_del_consumer(
285        &mut self,
286        key: &[u8],
287        group: &[u8],
288        consumer: &[u8],
289    ) -> Result<u64, StoreError> {
290        let Some(s) = self.stream_mut(key, false)? else {
291            return Ok(0);
292        };
293        Ok(s.group_del_consumer(group, consumer))
294    }
295
296    /// `XREADGROUP GROUP g c [COUNT n] [NOACK] STREAMS key id`.
297    #[allow(clippy::too_many_arguments)]
298    pub fn xreadgroup(
299        &mut self,
300        key: &[u8],
301        group: &[u8],
302        consumer: &[u8],
303        last_seen: ReadGroupId,
304        count: Option<usize>,
305        noack: bool,
306        now_ms: u64,
307    ) -> Result<EntryBatch, StoreError> {
308        let result;
309        {
310            let Some(s) = self.stream_mut(key, false)? else {
311                return Err(StoreError::NoSuchKey);
312            };
313            result = s.readgroup(group, consumer, last_seen, count, noack, now_ms)?;
314        }
315        if !result.is_empty() {
316            self.bump_if_watched(key);
317        }
318        Ok(result)
319    }
320
321    /// Non-destructive: would `XREADGROUP … STREAMS key >` yield new
322    /// entries for `group` right now? True iff the stream's last id is
323    /// past the group's last-delivered id. Used by the cross-shard BLOCK
324    /// arbiter's readiness peek — never advances the group cursor. False
325    /// for a missing key / group.
326    pub fn xreadgroup_has_new(&mut self, key: &[u8], group: &[u8]) -> Result<bool, StoreError> {
327        Ok(self
328            .stream_ref(key)?
329            .and_then(|s| s.group(group).map(|g| s.last_id() > g.last_delivered_id()))
330            .unwrap_or(false))
331    }
332
333    /// `XACK key group id [id ...]`. Returns count of PEL removals.
334    pub fn xack(&mut self, key: &[u8], group: &[u8], ids: &[StreamId]) -> Result<u64, StoreError> {
335        let n;
336        {
337            let Some(s) = self.stream_mut(key, false)? else {
338                return Ok(0);
339            };
340            n = s.ack(group, ids);
341        }
342        if n > 0 {
343            self.bump_if_watched(key);
344        }
345        Ok(n)
346    }
347
348    /// `XPENDING key group` — summary form.
349    pub fn xpending_summary(
350        &mut self,
351        key: &[u8],
352        group: &[u8],
353    ) -> Result<Option<PendingSummary>, StoreError> {
354        Ok(self.stream_ref(key)?.and_then(|s| s.pending_summary(group)))
355    }
356
357    /// `XPENDING key group [IDLE ms] start end count [consumer]` —
358    /// extended form.
359    #[allow(clippy::too_many_arguments)]
360    pub fn xpending_extended(
361        &mut self,
362        key: &[u8],
363        group: &[u8],
364        idle_min_ms: Option<u64>,
365        start: StreamId,
366        end: StreamId,
367        count: usize,
368        consumer_filter: Option<&[u8]>,
369        now_ms: u64,
370    ) -> Result<Option<PendingExtended>, StoreError> {
371        Ok(self.stream_ref(key)?.and_then(|s| {
372            s.pending_extended(group, idle_min_ms, start, end, count, consumer_filter, now_ms)
373        }))
374    }
375
376    /// `XCLAIM key group consumer min-idle-ms id [id ...] [...]`.
377    /// Returns the (id, field-value) pairs successfully claimed —
378    /// dispatcher trims to ID-only when `JUSTID` is set.
379    pub fn xclaim(
380        &mut self,
381        key: &[u8],
382        group: &[u8],
383        new_owner: &[u8],
384        ids: &[StreamId],
385        opts: &XClaimOpts,
386        now_ms: u64,
387    ) -> Result<EntryBatch, StoreError> {
388        let claimed;
389        let payloads;
390        {
391            let Some(s) = self.stream_mut(key, false)? else {
392                return Err(StoreError::NoSuchKey);
393            };
394            claimed = s.claim(group, new_owner, ids, opts, now_ms)?;
395            payloads = s.payloads_for(&claimed);
396        }
397        if !claimed.is_empty() {
398            self.bump_if_watched(key);
399        }
400        Ok(payloads)
401    }
402
403    /// `XAUTOCLAIM key group consumer min-idle-ms start [COUNT n]
404    /// [JUSTID]`. Returns the cursor + claimed payloads + deleted IDs.
405    #[allow(clippy::too_many_arguments)]
406    pub fn xautoclaim(
407        &mut self,
408        key: &[u8],
409        group: &[u8],
410        new_owner: &[u8],
411        min_idle_ms: u64,
412        start: StreamId,
413        count: usize,
414        justid: bool,
415        now_ms: u64,
416    ) -> Result<(StreamId, EntryBatch, Vec<StreamId>), StoreError> {
417        let payloads;
418        let next_cursor;
419        let deleted_ids;
420        {
421            let Some(s) = self.stream_mut(key, false)? else {
422                return Err(StoreError::NoSuchKey);
423            };
424            let AutoclaimResult { next_cursor: nc, claimed_ids, deleted_ids: di } =
425                s.autoclaim(group, new_owner, min_idle_ms, start, count, justid, now_ms)?;
426            payloads = s.payloads_for(&claimed_ids);
427            next_cursor = nc;
428            deleted_ids = di;
429        }
430        if !payloads.is_empty() {
431            self.bump_if_watched(key);
432        }
433        Ok((next_cursor, payloads, deleted_ids))
434    }
435}