kevy_store/stream/
claim.rs1use super::group::{ConsumerGroup, ensure_consumer};
6use super::{EntryBatch, PelEntry, StreamData, StreamId, XClaimOpts};
7use crate::value::SmallBytes;
8use crate::StoreError;
9
10pub struct AutoclaimResult {
14 pub next_cursor: StreamId,
15 pub claimed_ids: Vec<StreamId>,
16 pub deleted_ids: Vec<StreamId>,
17}
18
19impl StreamData {
20 pub fn claim(
24 &mut self,
25 group: &[u8],
26 new_owner: &[u8],
27 ids: &[StreamId],
28 opts: &XClaimOpts,
29 now_ms: u64,
30 ) -> Result<Vec<StreamId>, StoreError> {
31 let Some(g) = self.groups.get_mut(group) else {
32 return Err(StoreError::NoSuchKey);
33 };
34 let new_owner_smb = SmallBytes::from_slice(new_owner);
35 ensure_consumer(g, &new_owner_smb, now_ms);
36 let mut claimed = Vec::new();
37 for id in ids {
38 if !claim_one(g, &self.entries, *id, &new_owner_smb, opts, now_ms) {
39 continue;
40 }
41 claimed.push(*id);
42 }
43 Ok(claimed)
44 }
45
46 #[allow(clippy::too_many_arguments)]
51 pub fn autoclaim(
52 &mut self,
53 group: &[u8],
54 new_owner: &[u8],
55 min_idle_ms: u64,
56 start: StreamId,
57 count: usize,
58 justid: bool,
59 now_ms: u64,
60 ) -> Result<AutoclaimResult, StoreError> {
61 let opts = XClaimOpts {
62 min_idle_ms,
63 idle_override_ms: None,
64 time_override_ms: None,
65 retrycount_override: None,
66 force: false,
67 justid,
68 };
69 let candidates: Vec<StreamId> = {
70 let Some(g) = self.groups.get(group) else {
71 return Err(StoreError::NoSuchKey);
72 };
73 g.pel
74 .range(start..=StreamId::MAX)
75 .filter(|(_, p)| now_ms.saturating_sub(p.delivery_time_ms) >= min_idle_ms)
76 .take(count)
77 .map(|(id, _)| *id)
78 .collect()
79 };
80 let next_cursor = candidates
81 .last()
82 .map_or(StreamId::MIN, |id| id.next());
83 let claimed = self.claim(group, new_owner, &candidates, &opts, now_ms)?;
84 let mut deleted = Vec::new();
85 for id in &candidates {
86 if !self.entries.contains_key(id) && !claimed.contains(id) {
87 deleted.push(*id);
88 }
89 }
90 Ok(AutoclaimResult { next_cursor, claimed_ids: claimed, deleted_ids: deleted })
91 }
92
93 pub fn payloads_for(&self, ids: &[StreamId]) -> EntryBatch {
97 ids.iter()
98 .filter_map(|id| {
99 self.entries.get(id).map(|fv| {
100 (
101 *id,
102 fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
103 )
104 })
105 })
106 .collect()
107 }
108}
109
110fn claim_one(
115 g: &mut ConsumerGroup,
116 entries: &std::collections::BTreeMap<StreamId, Vec<(SmallBytes, SmallBytes)>>,
117 id: StreamId,
118 new_owner: &SmallBytes,
119 opts: &XClaimOpts,
120 now_ms: u64,
121) -> bool {
122 let entry_present = g.pel.contains_key(&id);
123 if !entry_present && !opts.force {
124 return false;
125 }
126 if !entries.contains_key(&id) {
127 if let Some(p) = g.pel.remove(&id)
128 && let Some(cs) = g.consumers.get_mut(p.consumer.as_slice())
129 {
130 cs.pel_count = cs.pel_count.saturating_sub(1);
131 }
132 return false;
133 }
134 if let Some(existing) = g.pel.get(&id) {
135 let idle = now_ms.saturating_sub(existing.delivery_time_ms);
136 if idle < opts.min_idle_ms {
137 return false;
138 }
139 }
140 let new_dt = opts
141 .time_override_ms
142 .or_else(|| opts.idle_override_ms.map(|i| now_ms.saturating_sub(i)))
143 .unwrap_or(now_ms);
144 let new_dc = opts.retrycount_override.unwrap_or_else(|| {
145 let base = g.pel.get(&id).map_or(0, |p| p.delivery_count);
146 if opts.justid { base.max(1) } else { base.saturating_add(1) }
147 });
148 let prev = g.pel.insert(
149 id,
150 PelEntry {
151 consumer: new_owner.clone(),
152 delivery_time_ms: new_dt,
153 delivery_count: new_dc,
154 },
155 );
156 transfer_ownership_counts(g, prev.as_ref(), new_owner);
157 true
158}
159
160fn transfer_ownership_counts(
161 g: &mut ConsumerGroup,
162 prev: Option<&PelEntry>,
163 new_owner: &SmallBytes,
164) {
165 match prev {
166 Some(p) if p.consumer != *new_owner => {
167 if let Some(cs) = g.consumers.get_mut(p.consumer.as_slice()) {
168 cs.pel_count = cs.pel_count.saturating_sub(1);
169 }
170 if let Some(cs) = g.consumers.get_mut(new_owner.as_slice()) {
171 cs.pel_count = cs.pel_count.saturating_add(1);
172 }
173 }
174 Some(_) => {}
175 None => {
176 if let Some(cs) = g.consumers.get_mut(new_owner.as_slice()) {
177 cs.pel_count = cs.pel_count.saturating_add(1);
178 }
179 }
180 }
181}