Skip to main content

kevy_store/stream/
claim.rs

1//! `XCLAIM` / `XAUTOCLAIM` impls — split out of `stream/group.rs` so
2//! both files stay under the project's ≤500-LOC rule. Owns the
3//! `AutoclaimResult` return type alongside the methods that produce it.
4
5use super::group::{ConsumerGroup, ensure_consumer};
6use super::{EntryBatch, PelEntry, StreamData, StreamId, XClaimOpts};
7use crate::value::SmallBytes;
8use crate::StoreError;
9
10/// Snapshot of `XAUTOCLAIM` work in progress: cursor for the next
11/// call, IDs successfully transferred, and IDs skipped because the
12/// stream has since deleted them.
13pub struct AutoclaimResult {
14    pub next_cursor: StreamId,
15    pub claimed_ids: Vec<StreamId>,
16    pub deleted_ids: Vec<StreamId>,
17}
18
19impl StreamData {
20    /// `XCLAIM key group consumer min-idle-ms id [id ...] [...]`.
21    /// Returns the IDs successfully claimed (the dispatcher decides
22    /// whether to emit JUSTID or full entries).
23    pub fn claim(
24        &mut self,
25        group: &[u8],
26        new_owner: &[u8],
27        ids: &[StreamId],
28        opts: &XClaimOpts,
29        now_ms: u64,
30    ) -> Result<Vec<StreamId>, StoreError> {
31        let Some(g) = self.groups.get_mut(group) else {
32            return Err(StoreError::NoSuchKey);
33        };
34        let new_owner_smb = SmallBytes::from_slice(new_owner);
35        ensure_consumer(g, &new_owner_smb, now_ms);
36        let mut claimed = Vec::new();
37        for id in ids {
38            if !claim_one(g, &self.entries, *id, &new_owner_smb, opts, now_ms) {
39                continue;
40            }
41            claimed.push(*id);
42        }
43        Ok(claimed)
44    }
45
46    /// `XAUTOCLAIM key group consumer min-idle-ms start [COUNT n]
47    /// [JUSTID]`. Walks the PEL from `start` onward, claiming the
48    /// first `count` entries whose idle ≥ `min_idle_ms`. Returns
49    /// `(next_cursor_id, claimed_ids, deleted_ids)`.
50    #[allow(clippy::too_many_arguments)]
51    pub fn autoclaim(
52        &mut self,
53        group: &[u8],
54        new_owner: &[u8],
55        min_idle_ms: u64,
56        start: StreamId,
57        count: usize,
58        justid: bool,
59        now_ms: u64,
60    ) -> Result<AutoclaimResult, StoreError> {
61        let opts = XClaimOpts {
62            min_idle_ms,
63            idle_override_ms: None,
64            time_override_ms: None,
65            retrycount_override: None,
66            force: false,
67            justid,
68        };
69        let candidates: Vec<StreamId> = {
70            let Some(g) = self.groups.get(group) else {
71                return Err(StoreError::NoSuchKey);
72            };
73            g.pel
74                .range(start..=StreamId::MAX)
75                .filter(|(_, p)| now_ms.saturating_sub(p.delivery_time_ms) >= min_idle_ms)
76                .take(count)
77                .map(|(id, _)| *id)
78                .collect()
79        };
80        let next_cursor = candidates
81            .last()
82            .map_or(StreamId::MIN, |id| id.next());
83        let claimed = self.claim(group, new_owner, &candidates, &opts, now_ms)?;
84        let mut deleted = Vec::new();
85        for id in &candidates {
86            if !self.entries.contains_key(id) && !claimed.contains(id) {
87                deleted.push(*id);
88            }
89        }
90        Ok(AutoclaimResult { next_cursor, claimed_ids: claimed, deleted_ids: deleted })
91    }
92
93    /// Field-value payload list pairing with `ids` (from
94    /// [`Self::claim`] / [`Self::autoclaim`]). Skips IDs that were
95    /// XDELed between claim and emit.
96    pub fn payloads_for(&self, ids: &[StreamId]) -> EntryBatch {
97        ids.iter()
98            .filter_map(|id| {
99                self.entries.get(id).map(|fv| {
100                    (
101                        *id,
102                        fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
103                    )
104                })
105            })
106            .collect()
107    }
108}
109
110/// Attempt one XCLAIM. Returns `true` if the entry was successfully
111/// transferred to `new_owner`. The `entries` ref is the stream's
112/// entry map (passed in to avoid an extra `&mut self` borrow when
113/// `claim` is called over a slice of IDs).
114fn claim_one(
115    g: &mut ConsumerGroup,
116    entries: &std::collections::BTreeMap<StreamId, Vec<(SmallBytes, SmallBytes)>>,
117    id: StreamId,
118    new_owner: &SmallBytes,
119    opts: &XClaimOpts,
120    now_ms: u64,
121) -> bool {
122    let entry_present = g.pel.contains_key(&id);
123    if !entry_present && !opts.force {
124        return false;
125    }
126    if !entries.contains_key(&id) {
127        if let Some(p) = g.pel.remove(&id)
128            && let Some(cs) = g.consumers.get_mut(p.consumer.as_slice())
129        {
130            cs.pel_count = cs.pel_count.saturating_sub(1);
131        }
132        return false;
133    }
134    if let Some(existing) = g.pel.get(&id) {
135        let idle = now_ms.saturating_sub(existing.delivery_time_ms);
136        if idle < opts.min_idle_ms {
137            return false;
138        }
139    }
140    let new_dt = opts
141        .time_override_ms
142        .or_else(|| opts.idle_override_ms.map(|i| now_ms.saturating_sub(i)))
143        .unwrap_or(now_ms);
144    let new_dc = opts.retrycount_override.unwrap_or_else(|| {
145        let base = g.pel.get(&id).map_or(0, |p| p.delivery_count);
146        if opts.justid { base.max(1) } else { base.saturating_add(1) }
147    });
148    let prev = g.pel.insert(
149        id,
150        PelEntry {
151            consumer: new_owner.clone(),
152            delivery_time_ms: new_dt,
153            delivery_count: new_dc,
154        },
155    );
156    transfer_ownership_counts(g, prev.as_ref(), new_owner);
157    true
158}
159
160fn transfer_ownership_counts(
161    g: &mut ConsumerGroup,
162    prev: Option<&PelEntry>,
163    new_owner: &SmallBytes,
164) {
165    match prev {
166        Some(p) if p.consumer != *new_owner => {
167            if let Some(cs) = g.consumers.get_mut(p.consumer.as_slice()) {
168                cs.pel_count = cs.pel_count.saturating_sub(1);
169            }
170            if let Some(cs) = g.consumers.get_mut(new_owner.as_slice()) {
171                cs.pel_count = cs.pel_count.saturating_add(1);
172            }
173        }
174        Some(_) => {}
175        None => {
176            if let Some(cs) = g.consumers.get_mut(new_owner.as_slice()) {
177                cs.pel_count = cs.pel_count.saturating_add(1);
178            }
179        }
180    }
181}