1use std::collections::BTreeMap;
8
9use kevy_map::KevyMap;
10
11use super::{EntryBatch, StreamData, StreamId};
12pub(super) use super::claim::AutoclaimResult;
13use crate::value::SmallBytes;
14use crate::StoreError;
15
16#[derive(Clone)]
19pub struct ConsumerGroup {
20 pub last_delivered_id: StreamId,
23 pub pel: BTreeMap<StreamId, PelEntry>,
26 pub consumers: KevyMap<SmallBytes, Box<ConsumerState>>,
28}
29
30impl ConsumerGroup {
31 pub fn last_delivered_id(&self) -> StreamId {
33 self.last_delivered_id
34 }
35 pub fn pending_count(&self) -> usize {
37 self.pel.len()
38 }
39 pub fn consumer_count(&self) -> usize {
41 self.consumers.len()
42 }
43 pub fn consumers_iter(&self) -> impl Iterator<Item = (&[u8], &ConsumerState)> {
45 self.consumers.iter().map(|(k, v)| (k.as_slice(), v.as_ref()))
46 }
47}
48
49impl ConsumerState {
50 pub fn pending_count(&self) -> usize {
52 self.pel_count
53 }
54 pub fn last_seen_ms(&self) -> u64 {
56 self.last_seen_ms
57 }
58}
59
60impl Default for ConsumerGroup {
61 fn default() -> Self {
62 Self {
63 last_delivered_id: StreamId::MIN,
64 pel: BTreeMap::new(),
65 consumers: KevyMap::default(),
66 }
67 }
68}
69
70#[derive(Clone, Debug)]
72pub struct PelEntry {
73 pub consumer: SmallBytes,
76 pub delivery_time_ms: u64,
79 pub delivery_count: u32,
82}
83
84#[derive(Clone, Debug)]
86#[allow(dead_code)]
87pub struct ConsumerState {
88 pub name: SmallBytes,
90 pub last_seen_ms: u64,
93 pub pel_count: usize,
95}
96
97#[derive(Clone, Copy, Debug, PartialEq, Eq)]
100pub enum GroupCreateMode {
101 AtId(StreamId),
103 AtCurrent,
105}
106
107pub struct PendingSummary {
110 pub total: u64,
112 pub id_range: Option<(StreamId, StreamId)>,
114 pub by_consumer: Vec<(Vec<u8>, u64)>,
116}
117
118pub struct PendingExtended {
121 pub rows: Vec<PendingExtendedRow>,
123}
124
125pub struct PendingExtendedRow {
127 pub id: StreamId,
129 pub consumer: Vec<u8>,
131 pub idle_ms: u64,
133 pub delivery_count: u32,
135}
136
/// Option bundle for a claim operation.
///
/// NOTE(review): nothing in this file reads these fields; the descriptions
/// below follow the Redis `XCLAIM` options that the names mirror and should
/// be confirmed against the claim implementation.
#[derive(Clone, Debug)]
pub struct XClaimOpts {
    /// Presumably the minimum idle time (ms) an entry must have to be claimed.
    pub min_idle_ms: u64,
    /// Presumably an explicit idle time (ms) to record instead of resetting it.
    pub idle_override_ms: Option<u64>,
    /// Presumably an explicit delivery timestamp (ms) to record.
    pub time_override_ms: Option<u64>,
    /// Presumably an explicit delivery count to record.
    pub retrycount_override: Option<u32>,
    /// Presumably: claim entries even when they are not yet pending.
    pub force: bool,
    /// Presumably: return only IDs, not entry bodies.
    pub justid: bool,
}
157
158impl StreamData {
159 pub fn group_create(
163 &mut self,
164 name: &[u8],
165 mode: GroupCreateMode,
166 ) -> Result<bool, StoreError> {
167 if self.groups.contains_key(name) {
168 return Ok(false);
169 }
170 let last_delivered_id = match mode {
171 GroupCreateMode::AtId(id) => id,
172 GroupCreateMode::AtCurrent => self.last_id,
173 };
174 self.groups.insert(
175 SmallBytes::from_slice(name),
176 Box::new(ConsumerGroup {
177 last_delivered_id,
178 pel: BTreeMap::new(),
179 consumers: KevyMap::default(),
180 }),
181 );
182 Ok(true)
183 }
184
185 pub fn group_destroy(&mut self, name: &[u8]) -> bool {
187 self.groups.remove(name).is_some()
188 }
189
190 pub fn group_setid(&mut self, name: &[u8], mode: GroupCreateMode) -> bool {
193 let Some(g) = self.groups.get_mut(name) else {
194 return false;
195 };
196 g.last_delivered_id = match mode {
197 GroupCreateMode::AtId(id) => id,
198 GroupCreateMode::AtCurrent => self.last_id,
199 };
200 true
201 }
202
203 pub fn group_create_consumer(&mut self, group: &[u8], consumer: &[u8], now_ms: u64) -> bool {
207 let Some(g) = self.groups.get_mut(group) else {
208 return false;
209 };
210 if g.consumers.contains_key(consumer) {
211 return false;
212 }
213 g.consumers.insert(
214 SmallBytes::from_slice(consumer),
215 Box::new(ConsumerState {
216 name: SmallBytes::from_slice(consumer),
217 last_seen_ms: now_ms,
218 pel_count: 0,
219 }),
220 );
221 true
222 }
223
224 pub fn group_del_consumer(&mut self, group: &[u8], consumer: &[u8]) -> u64 {
227 let Some(g) = self.groups.get_mut(group) else {
228 return 0;
229 };
230 let dropped = g.pel.len();
231 g.pel.retain(|_, p| p.consumer.as_slice() != consumer);
232 let dropped = dropped - g.pel.len();
233 g.consumers.remove(consumer);
234 dropped as u64
235 }
236
237 pub fn readgroup(
242 &mut self,
243 group: &[u8],
244 consumer: &[u8],
245 last_seen_arg: ReadGroupId,
246 count: Option<usize>,
247 noack: bool,
248 now_ms: u64,
249 ) -> Result<EntryBatch, StoreError> {
250 let Some(g) = self.groups.get_mut(group) else {
251 return Err(StoreError::NoSuchKey);
252 };
253 let consumer_smb = SmallBytes::from_slice(consumer);
254 ensure_consumer(g, &consumer_smb, now_ms);
255 if let Some(cs) = g.consumers.get_mut(consumer_smb.as_slice()) {
256 cs.last_seen_ms = now_ms;
257 }
258 match last_seen_arg {
259 ReadGroupId::New => {
260 let start = g.last_delivered_id.next();
261 let entries: Vec<(StreamId, &[(SmallBytes, SmallBytes)])> = self
262 .entries
263 .range(start..=StreamId::MAX)
264 .map(|(id, fv)| (*id, fv.as_slice()))
265 .collect();
266 let take = match count {
267 Some(n) => entries.into_iter().take(n).collect::<Vec<_>>(),
268 None => entries,
269 };
270 if take.is_empty() {
271 return Ok(Vec::new());
272 }
273 if !noack {
274 record_deliveries(g, &consumer_smb, &take, now_ms);
275 }
276 let g_mut = self.groups.get_mut(group).expect("present");
277 if let Some((last_id, _)) = take.last() {
278 g_mut.last_delivered_id = *last_id;
279 }
280 Ok(super::clone_entries(take))
281 }
282 ReadGroupId::ReplayAfter(after) => {
283 let mut hit: Vec<(StreamId, Vec<(SmallBytes, SmallBytes)>)> = Vec::new();
284 let consumer_match = consumer_smb.clone();
285 for (id, pel_entry) in g.pel.range(after.next()..=StreamId::MAX) {
286 if pel_entry.consumer != consumer_match {
287 continue;
288 }
289 if let Some(fv) = self.entries.get(id) {
290 hit.push((*id, fv.clone()));
291 }
292 if let Some(n) = count
293 && hit.len() >= n
294 {
295 break;
296 }
297 }
298 Ok(hit
299 .into_iter()
300 .map(|(id, fv)| {
301 (
302 id,
303 fv.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
304 )
305 })
306 .collect())
307 }
308 }
309 }
310
311 pub fn ack(&mut self, group: &[u8], ids: &[StreamId]) -> u64 {
313 let Some(g) = self.groups.get_mut(group) else {
314 return 0;
315 };
316 let mut n = 0u64;
317 for id in ids {
318 if let Some(p) = g.pel.remove(id) {
319 if let Some(cs) = g.consumers.get_mut(p.consumer.as_slice()) {
320 cs.pel_count = cs.pel_count.saturating_sub(1);
321 }
322 n += 1;
323 }
324 }
325 n
326 }
327
328 pub fn pending_summary(&self, group: &[u8]) -> Option<PendingSummary> {
330 let g = self.groups.get(group)?;
331 let total = g.pel.len() as u64;
332 let id_range = match (g.pel.keys().next(), g.pel.keys().next_back()) {
333 (Some(lo), Some(hi)) => Some((*lo, *hi)),
334 _ => None,
335 };
336 let mut counts: Vec<(Vec<u8>, u64)> = Vec::new();
337 for p in g.pel.values() {
338 if let Some((_, n)) = counts.iter_mut().find(|(name, _)| name == p.consumer.as_slice()) {
339 *n += 1;
340 } else {
341 counts.push((p.consumer.to_vec(), 1));
342 }
343 }
344 Some(PendingSummary { total, id_range, by_consumer: counts })
345 }
346
347 #[allow(clippy::too_many_arguments)]
349 pub fn pending_extended(
350 &self,
351 group: &[u8],
352 idle_min_ms: Option<u64>,
353 start: StreamId,
354 end: StreamId,
355 count: usize,
356 consumer_filter: Option<&[u8]>,
357 now_ms: u64,
358 ) -> Option<PendingExtended> {
359 let g = self.groups.get(group)?;
360 let mut rows = Vec::with_capacity(count.min(g.pel.len()));
361 for (id, p) in g.pel.range(start..=end) {
362 if rows.len() >= count {
363 break;
364 }
365 let idle = now_ms.saturating_sub(p.delivery_time_ms);
366 if let Some(min) = idle_min_ms
367 && idle < min
368 {
369 continue;
370 }
371 if let Some(c) = consumer_filter
372 && p.consumer.as_slice() != c
373 {
374 continue;
375 }
376 rows.push(PendingExtendedRow {
377 id: *id,
378 consumer: p.consumer.to_vec(),
379 idle_ms: idle,
380 delivery_count: p.delivery_count,
381 });
382 }
383 Some(PendingExtended { rows })
384 }
385
386}
387
388#[derive(Clone, Copy, Debug, PartialEq, Eq)]
391pub enum ReadGroupId {
392 New,
394 ReplayAfter(StreamId),
396}
397
398pub(super) fn ensure_consumer(g: &mut ConsumerGroup, name: &SmallBytes, now_ms: u64) {
401 if g.consumers.get(name.as_slice()).is_none() {
402 g.consumers.insert(
403 name.clone(),
404 Box::new(ConsumerState {
405 name: name.clone(),
406 last_seen_ms: now_ms,
407 pel_count: 0,
408 }),
409 );
410 }
411}
412
413fn record_deliveries(
414 g: &mut ConsumerGroup,
415 consumer: &SmallBytes,
416 entries: &[(StreamId, &[(SmallBytes, SmallBytes)])],
417 now_ms: u64,
418) {
419 let mut new_for_consumer = 0usize;
420 for (id, _) in entries {
421 let entry = g.pel.entry(*id).or_insert_with(|| {
422 new_for_consumer += 1;
423 PelEntry {
424 consumer: consumer.clone(),
425 delivery_time_ms: now_ms,
426 delivery_count: 0,
427 }
428 });
429 if entry.consumer != *consumer {
430 if let Some(prev) = g.consumers.get_mut(entry.consumer.as_slice()) {
434 prev.pel_count = prev.pel_count.saturating_sub(1);
435 }
436 entry.consumer = consumer.clone();
437 new_for_consumer += 1;
438 }
439 entry.delivery_time_ms = now_ms;
440 entry.delivery_count = entry.delivery_count.saturating_add(1);
441 }
442 if let Some(cs) = g.consumers.get_mut(consumer.as_slice()) {
443 cs.pel_count = cs.pel_count.saturating_add(new_for_consumer);
444 }
445}