Skip to main content

kevy_store/stream/
group.rs

1//! Consumer groups for v2-7 streams (sprint B). The group state lives
2//! inside its parent [`crate::stream::StreamData`] so XADD / XDEL can
3//! see the group map without an extra lookup. This file owns the
4//! types + the in-stream operations; the `Store`-side wrappers live in
5//! `stream/store.rs` next to the rest of the public API.
6
7use std::collections::BTreeMap;
8
9use kevy_map::KevyMap;
10
11use super::{EntryBatch, StreamData, StreamId};
12pub(super) use super::claim::AutoclaimResult;
13use crate::value::SmallBytes;
14use crate::StoreError;
15
16/// One consumer group's state. Sorted PEL plus a map of known
17/// consumers (with cached pel_count for O(1) XINFO answers).
18#[derive(Clone)]
19pub struct ConsumerGroup {
20    /// Highest ID delivered to any consumer in this group. Bumped by
21    /// XREADGROUP with `>`; settable via XGROUP SETID.
22    pub last_delivered_id: StreamId,
23    /// Pending-Entries List: every ID delivered but not yet ACKed.
24    /// Sorted by ID for `XPENDING start end` range queries.
25    pub pel: BTreeMap<StreamId, PelEntry>,
26    /// Consumers known to this group (by name).
27    pub consumers: KevyMap<SmallBytes, Box<ConsumerState>>,
28}
29
30impl ConsumerGroup {
31    /// Highest ID delivered by this group — for `XINFO GROUPS`.
32    pub fn last_delivered_id(&self) -> StreamId {
33        self.last_delivered_id
34    }
35    /// Total pending entries — `XINFO GROUPS`'s `pending`.
36    pub fn pending_count(&self) -> usize {
37        self.pel.len()
38    }
39    /// Known consumer count — `XINFO GROUPS`'s `consumers`.
40    pub fn consumer_count(&self) -> usize {
41        self.consumers.len()
42    }
43    /// Iterate `(consumer_name, consumer)` pairs — `XINFO CONSUMERS`.
44    pub fn consumers_iter(&self) -> impl Iterator<Item = (&[u8], &ConsumerState)> {
45        self.consumers.iter().map(|(k, v)| (k.as_slice(), v.as_ref()))
46    }
47}
48
49impl ConsumerState {
50    /// `XINFO CONSUMERS`' `pending` field.
51    pub fn pending_count(&self) -> usize {
52        self.pel_count
53    }
54    /// Last unix-ms this consumer interacted with the group.
55    pub fn last_seen_ms(&self) -> u64 {
56        self.last_seen_ms
57    }
58}
59
60impl Default for ConsumerGroup {
61    fn default() -> Self {
62        Self {
63            last_delivered_id: StreamId::MIN,
64            pel: BTreeMap::new(),
65            consumers: KevyMap::default(),
66        }
67    }
68}
69
70/// One pending entry: who got it, when, and how many times.
71#[derive(Clone, Debug)]
72pub struct PelEntry {
73    /// Owning consumer's name. Used by XPENDING's `consumer` filter
74    /// and XCLAIM's ownership transfer.
75    pub consumer: SmallBytes,
76    /// Last delivery wall-clock (unix-ms). XCLAIM compares idle =
77    /// `now - delivery_time_ms` against its `min-idle-ms` arg.
78    pub delivery_time_ms: u64,
79    /// Number of times this entry has been delivered (=1 on first
80    /// XREADGROUP, +=1 on each XCLAIM that doesn't have JUSTID).
81    pub delivery_count: u32,
82}
83
84/// Per-consumer cached counters so `XINFO CONSUMERS` answers in O(1).
85#[derive(Clone, Debug)]
86#[allow(dead_code)]
87pub struct ConsumerState {
88    /// Consumer name. Read by XINFO CONSUMERS (sprint C).
89    pub name: SmallBytes,
90    /// Last wall-clock (unix-ms) the consumer interacted with the
91    /// group (any XREADGROUP / XACK / XCLAIM touch).
92    pub last_seen_ms: u64,
93    /// Cached size of this consumer's slice of the PEL.
94    pub pel_count: usize,
95}
96
97/// `XGROUP CREATE` ID argument: either an explicit ID or `$`
98/// (= current stream's `last_id`, resolved by the caller).
99#[derive(Clone, Copy, Debug, PartialEq, Eq)]
100pub enum GroupCreateMode {
101    /// `<ms>-<seq>` literal — the group's `last_delivered_id` starts here.
102    AtId(StreamId),
103    /// `$` — resolve to the stream's current `last_id` at create time.
104    AtCurrent,
105}
106
107/// Summary form of `XPENDING key group` (only 3 args): total pending,
108/// min/max IDs across the PEL, and per-consumer aggregate counts.
109pub struct PendingSummary {
110    /// Total pending entries across all consumers.
111    pub total: u64,
112    /// Smallest and largest pending IDs, or `None` if the PEL is empty.
113    pub id_range: Option<(StreamId, StreamId)>,
114    /// `(consumer, count)` pairs in arbitrary order.
115    pub by_consumer: Vec<(Vec<u8>, u64)>,
116}
117
118/// Extended form of `XPENDING key group [IDLE ms] start end count
119/// [consumer]`: one row per matching PEL entry.
120pub struct PendingExtended {
121    /// Per-entry rows in ID-ascending order.
122    pub rows: Vec<PendingExtendedRow>,
123}
124
125/// One row of the extended XPENDING reply.
126pub struct PendingExtendedRow {
127    /// Entry ID.
128    pub id: StreamId,
129    /// Owning consumer's name.
130    pub consumer: Vec<u8>,
131    /// Idle time in milliseconds (now - delivery_time_ms).
132    pub idle_ms: u64,
133    /// Delivery count.
134    pub delivery_count: u32,
135}
136
137/// Knobs for [`crate::StreamData`]'s `xclaim`: `min-idle-ms` plus the
138/// `IDLE`/`TIME`/`RETRYCOUNT`/`FORCE`/`JUSTID` flag tail.
139pub struct XClaimOpts {
140    /// Only claim entries idle for at least this many ms.
141    pub min_idle_ms: u64,
142    /// Override post-claim idle to this many ms (else 0 — XCLAIM resets
143    /// the clock so the new owner has the full idle window).
144    pub idle_override_ms: Option<u64>,
145    /// Override post-claim delivery_time_ms to this absolute unix-ms.
146    /// Takes precedence over `idle_override_ms` if both set.
147    pub time_override_ms: Option<u64>,
148    /// Override post-claim `delivery_count` (else +=1).
149    pub retrycount_override: Option<u32>,
150    /// `FORCE`: claim even if the entry isn't in the PEL yet (creates
151    /// a fresh PEL row with delivery_count=1).
152    pub force: bool,
153    /// `JUSTID`: skip the +=1 on `delivery_count` (used by tools that
154    /// don't intend a real redelivery).
155    pub justid: bool,
156}
157
158impl StreamData {
159    /// `XGROUP CREATE key group <id|$> [MKSTREAM]`. Returns `true` if
160    /// a new group was created; `false` if the group already existed
161    /// (caller should report Redis's `-BUSYGROUP` error in that case).
162    pub fn group_create(
163        &mut self,
164        name: &[u8],
165        mode: GroupCreateMode,
166    ) -> Result<bool, StoreError> {
167        if self.groups.contains_key(name) {
168            return Ok(false);
169        }
170        let last_delivered_id = match mode {
171            GroupCreateMode::AtId(id) => id,
172            GroupCreateMode::AtCurrent => self.last_id,
173        };
174        self.groups.insert(
175            SmallBytes::from_slice(name),
176            Box::new(ConsumerGroup {
177                last_delivered_id,
178                pel: BTreeMap::new(),
179                consumers: KevyMap::default(),
180            }),
181        );
182        Ok(true)
183    }
184
185    /// `XGROUP DESTROY key group`. Returns `true` if a group was dropped.
186    pub fn group_destroy(&mut self, name: &[u8]) -> bool {
187        self.groups.remove(name).is_some()
188    }
189
190    /// `XGROUP SETID key group <id|$>`. Returns `false` if the group
191    /// doesn't exist.
192    pub fn group_setid(&mut self, name: &[u8], mode: GroupCreateMode) -> bool {
193        let Some(g) = self.groups.get_mut(name) else {
194            return false;
195        };
196        g.last_delivered_id = match mode {
197            GroupCreateMode::AtId(id) => id,
198            GroupCreateMode::AtCurrent => self.last_id,
199        };
200        true
201    }
202
203    /// `XGROUP CREATECONSUMER key group consumer`. Returns `true` if a
204    /// new consumer was inserted, `false` if it already existed or the
205    /// group is missing.
206    pub fn group_create_consumer(&mut self, group: &[u8], consumer: &[u8], now_ms: u64) -> bool {
207        let Some(g) = self.groups.get_mut(group) else {
208            return false;
209        };
210        if g.consumers.contains_key(consumer) {
211            return false;
212        }
213        g.consumers.insert(
214            SmallBytes::from_slice(consumer),
215            Box::new(ConsumerState {
216                name: SmallBytes::from_slice(consumer),
217                last_seen_ms: now_ms,
218                pel_count: 0,
219            }),
220        );
221        true
222    }
223
224    /// `XGROUP DELCONSUMER key group consumer`. Returns the number of
225    /// PEL entries dropped along with the consumer (matches Redis).
226    pub fn group_del_consumer(&mut self, group: &[u8], consumer: &[u8]) -> u64 {
227        let Some(g) = self.groups.get_mut(group) else {
228            return 0;
229        };
230        let dropped = g.pel.len();
231        g.pel.retain(|_, p| p.consumer.as_slice() != consumer);
232        let dropped = dropped - g.pel.len();
233        g.consumers.remove(consumer);
234        dropped as u64
235    }
236
237    /// `XREADGROUP GROUP g c [COUNT n] STREAMS key id`. ID `>` →
238    /// "new entries since last_delivered_id" (updates last_delivered);
239    /// ID `<x>` → "PEL entries for this consumer with id > x" (does
240    /// NOT update last_delivered, used for replay).
241    pub fn readgroup(
242        &mut self,
243        group: &[u8],
244        consumer: &[u8],
245        last_seen_arg: ReadGroupId,
246        count: Option<usize>,
247        noack: bool,
248        now_ms: u64,
249    ) -> Result<EntryBatch, StoreError> {
250        let Some(g) = self.groups.get_mut(group) else {
251            return Err(StoreError::NoSuchKey);
252        };
253        let consumer_smb = SmallBytes::from_slice(consumer);
254        ensure_consumer(g, &consumer_smb, now_ms);
255        if let Some(cs) = g.consumers.get_mut(consumer_smb.as_slice()) {
256            cs.last_seen_ms = now_ms;
257        }
258        match last_seen_arg {
259            ReadGroupId::New => {
260                let start = g.last_delivered_id.next();
261                let entries: Vec<(StreamId, &[(SmallBytes, SmallBytes)])> = self
262                    .entries
263                    .range(start..=StreamId::MAX)
264                    .map(|(id, fv)| (*id, fv.as_slice()))
265                    .collect();
266                let take = match count {
267                    Some(n) => entries.into_iter().take(n).collect::<Vec<_>>(),
268                    None => entries,
269                };
270                if take.is_empty() {
271                    return Ok(Vec::new());
272                }
273                if !noack {
274                    record_deliveries(g, &consumer_smb, &take, now_ms);
275                }
276                let g_mut = self.groups.get_mut(group).expect("present");
277                if let Some((last_id, _)) = take.last() {
278                    g_mut.last_delivered_id = *last_id;
279                }
280                Ok(super::clone_entries(take))
281            }
282            ReadGroupId::ReplayAfter(after) => {
283                let mut hit: Vec<(StreamId, Vec<(SmallBytes, SmallBytes)>)> = Vec::new();
284                let consumer_match = consumer_smb.clone();
285                for (id, pel_entry) in g.pel.range(after.next()..=StreamId::MAX) {
286                    if pel_entry.consumer != consumer_match {
287                        continue;
288                    }
289                    if let Some(fv) = self.entries.get(id) {
290                        hit.push((*id, fv.clone()));
291                    }
292                    if let Some(n) = count
293                        && hit.len() >= n
294                    {
295                        break;
296                    }
297                }
298                Ok(hit
299                    .into_iter()
300                    .map(|(id, fv)| {
301                        (
302                            id,
303                            fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
304                        )
305                    })
306                    .collect())
307            }
308        }
309    }
310
311    /// `XACK key group id [...]`. Returns count of PEL entries removed.
312    pub fn ack(&mut self, group: &[u8], ids: &[StreamId]) -> u64 {
313        let Some(g) = self.groups.get_mut(group) else {
314            return 0;
315        };
316        let mut n = 0u64;
317        for id in ids {
318            if let Some(p) = g.pel.remove(id) {
319                if let Some(cs) = g.consumers.get_mut(p.consumer.as_slice()) {
320                    cs.pel_count = cs.pel_count.saturating_sub(1);
321                }
322                n += 1;
323            }
324        }
325        n
326    }
327
328    /// `XPENDING key group` — the summary form (4-tuple).
329    pub fn pending_summary(&self, group: &[u8]) -> Option<PendingSummary> {
330        let g = self.groups.get(group)?;
331        let total = g.pel.len() as u64;
332        let id_range = match (g.pel.keys().next(), g.pel.keys().next_back()) {
333            (Some(lo), Some(hi)) => Some((*lo, *hi)),
334            _ => None,
335        };
336        let mut counts: Vec<(Vec<u8>, u64)> = Vec::new();
337        for p in g.pel.values() {
338            if let Some((_, n)) = counts.iter_mut().find(|(name, _)| name == p.consumer.as_slice()) {
339                *n += 1;
340            } else {
341                counts.push((p.consumer.to_vec(), 1));
342            }
343        }
344        Some(PendingSummary { total, id_range, by_consumer: counts })
345    }
346
347    /// `XPENDING key group [IDLE ms] start end count [consumer]`.
348    #[allow(clippy::too_many_arguments)]
349    pub fn pending_extended(
350        &self,
351        group: &[u8],
352        idle_min_ms: Option<u64>,
353        start: StreamId,
354        end: StreamId,
355        count: usize,
356        consumer_filter: Option<&[u8]>,
357        now_ms: u64,
358    ) -> Option<PendingExtended> {
359        let g = self.groups.get(group)?;
360        let mut rows = Vec::with_capacity(count.min(g.pel.len()));
361        for (id, p) in g.pel.range(start..=end) {
362            if rows.len() >= count {
363                break;
364            }
365            let idle = now_ms.saturating_sub(p.delivery_time_ms);
366            if let Some(min) = idle_min_ms
367                && idle < min
368            {
369                continue;
370            }
371            if let Some(c) = consumer_filter
372                && p.consumer.as_slice() != c
373            {
374                continue;
375            }
376            rows.push(PendingExtendedRow {
377                id: *id,
378                consumer: p.consumer.to_vec(),
379                idle_ms: idle,
380                delivery_count: p.delivery_count,
381            });
382        }
383        Some(PendingExtended { rows })
384    }
385
386}
387
388/// XREADGROUP's per-stream ID: either `>` (= new entries) or an explicit
389/// "after this id" for PEL replay.
390#[derive(Clone, Copy, Debug, PartialEq, Eq)]
391pub enum ReadGroupId {
392    /// `>` — new entries only.
393    New,
394    /// `<id>` — replay PEL entries strictly after this id.
395    ReplayAfter(StreamId),
396}
397
398/// Idempotent insert: ensure the named consumer exists in this group's
399/// roster so subsequent `pel_count`/`last_seen_ms` updates have a slot.
400pub(super) fn ensure_consumer(g: &mut ConsumerGroup, name: &SmallBytes, now_ms: u64) {
401    if g.consumers.get(name.as_slice()).is_none() {
402        g.consumers.insert(
403            name.clone(),
404            Box::new(ConsumerState {
405                name: name.clone(),
406                last_seen_ms: now_ms,
407                pel_count: 0,
408            }),
409        );
410    }
411}
412
413fn record_deliveries(
414    g: &mut ConsumerGroup,
415    consumer: &SmallBytes,
416    entries: &[(StreamId, &[(SmallBytes, SmallBytes)])],
417    now_ms: u64,
418) {
419    let mut new_for_consumer = 0usize;
420    for (id, _) in entries {
421        let entry = g.pel.entry(*id).or_insert_with(|| {
422            new_for_consumer += 1;
423            PelEntry {
424                consumer: consumer.clone(),
425                delivery_time_ms: now_ms,
426                delivery_count: 0,
427            }
428        });
429        if entry.consumer != *consumer {
430            // Ownership transfer via the read path is unusual; Redis
431            // does it on `>` reads only when the PEL already had an
432            // entry from a previous owner — treat as XCLAIM-style.
433            if let Some(prev) = g.consumers.get_mut(entry.consumer.as_slice()) {
434                prev.pel_count = prev.pel_count.saturating_sub(1);
435            }
436            entry.consumer = consumer.clone();
437            new_for_consumer += 1;
438        }
439        entry.delivery_time_ms = now_ms;
440        entry.delivery_count = entry.delivery_count.saturating_add(1);
441    }
442    if let Some(cs) = g.consumers.get_mut(consumer.as_slice()) {
443        cs.pel_count = cs.pel_count.saturating_add(new_for_consumer);
444    }
445}