1use std::collections::{HashMap, VecDeque};
15use std::time::{Duration, Instant};
16
17use crate::ring::{CdcEvent, CdcRing};
18
/// Bookkeeping for one event that has been delivered to a consumer but not
/// yet acknowledged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingEntry {
    /// Sequence number of the delivered event.
    pub event_seq: u64,
    /// How many times the event has been handed out (1 on the first read,
    /// incremented when the entry is claimed by another consumer).
    pub delivery_count: u32,
    /// Instant recorded when the entry was first created.
    pub first_delivery: Instant,
    /// Instant of the most recent delivery; idle time is measured from here.
    pub last_delivery: Instant,
}
34
35#[derive(Debug)]
37pub struct ConsumerState {
38 pub last_acked_seq: u64,
40 pub pending: VecDeque<PendingEntry>,
42 pub needs_resync: bool,
45}
46
47impl ConsumerState {
48 fn new(start_seq: u64) -> Self {
49 Self {
50 last_acked_seq: start_seq,
51 pending: VecDeque::new(),
52 needs_resync: false,
53 }
54 }
55}
56
57#[derive(Debug)]
63pub struct ConsumerGroup {
64 pub name: String,
66 pub consumers: HashMap<String, ConsumerState>,
68 pub last_delivered_seq: u64,
70 pub idle_timeout: Duration,
72}
73
74impl ConsumerGroup {
75 pub fn new(name: String, start_seq: u64, idle_timeout: Duration) -> Self {
77 Self {
78 name,
79 consumers: HashMap::new(),
80 last_delivered_seq: start_seq,
81 idle_timeout,
82 }
83 }
84}
85
86#[derive(Debug)]
88pub struct GroupReadResult {
89 pub events: Vec<CdcEvent>,
91 pub gap: bool,
94}
95
/// Read-only view of one pending entry, as reported by a pending listing.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSummary {
    /// Sequence number of the pending event.
    pub seq: u64,
    /// Name of the consumer currently holding the entry.
    pub consumer: String,
    /// Milliseconds elapsed since the entry's last delivery.
    pub idle_ms: u64,
    /// Number of times the event has been delivered.
    pub delivery_count: u32,
}
108
109#[derive(Debug, thiserror::Error)]
111pub enum ConsumerGroupError {
112 #[error("BUSYGROUP consumer group name already exists")]
114 GroupExists,
115 #[error("NOGROUP no such consumer group '{0}'")]
117 NoGroup(String),
118 #[error("ERR CDC ring not available")]
120 NoRing,
121}
122
123pub struct ConsumerGroupManager {
129 groups: HashMap<String, ConsumerGroup>,
130 default_idle_timeout: Duration,
131}
132
133impl ConsumerGroupManager {
134 pub fn new(default_idle_timeout: Duration) -> Self {
137 Self {
138 groups: HashMap::new(),
139 default_idle_timeout,
140 }
141 }
142
143 pub fn create_group(&mut self, name: &str, start_seq: u64) -> Result<(), ConsumerGroupError> {
147 if self.groups.contains_key(name) {
148 return Err(ConsumerGroupError::GroupExists);
149 }
150 self.groups.insert(
151 name.to_string(),
152 ConsumerGroup::new(name.to_string(), start_seq, self.default_idle_timeout),
153 );
154 Ok(())
155 }
156
157 pub fn read_group(
163 &mut self,
164 ring: &CdcRing,
165 group_name: &str,
166 consumer_name: &str,
167 count: usize,
168 ) -> Result<GroupReadResult, ConsumerGroupError> {
169 let group = self
170 .groups
171 .get_mut(group_name)
172 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
173
174 let now = Instant::now();
175
176 if !group.consumers.contains_key(consumer_name) {
177 group.consumers.insert(
178 consumer_name.to_string(),
179 ConsumerState::new(group.last_delivered_seq),
180 );
181 }
182
183 let mut delivered = Vec::new();
184 let mut gap = false;
185
186 let idle_timeout = group.idle_timeout;
187 let timed_out = collect_timed_out_entries(group, consumer_name, idle_timeout, now, count);
188
189 for (seq, original_consumer) in &timed_out {
190 if let Some(event) = ring.get(*seq) {
191 delivered.push(event.clone());
192
193 if let Some(consumer_state) = group.consumers.get_mut(original_consumer.as_str()) {
194 consumer_state.pending.retain(|p| p.event_seq != *seq);
195 }
196 }
197 }
198
199 let consumer = group
200 .consumers
201 .get_mut(consumer_name)
202 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
203
204 for (seq, _) in &timed_out {
205 consumer.pending.push_back(PendingEntry {
206 event_seq: *seq,
207 delivery_count: 2,
208 first_delivery: now,
209 last_delivery: now,
210 });
211 }
212
213 let remaining = count.saturating_sub(delivered.len());
214 if remaining > 0 {
215 let read_result = ring.read(group.last_delivered_seq, remaining);
216 if read_result.gap {
217 gap = true;
218 consumer.needs_resync = true;
219 }
220
221 for event in &read_result.events {
222 consumer.pending.push_back(PendingEntry {
223 event_seq: event.seq,
224 delivery_count: 1,
225 first_delivery: now,
226 last_delivery: now,
227 });
228 delivered.push(event.clone());
229 }
230
231 group.last_delivered_seq = read_result.next_seq;
232 }
233
234 Ok(GroupReadResult {
235 events: delivered,
236 gap,
237 })
238 }
239
240 pub fn ack(&mut self, group_name: &str, seqs: &[u64]) -> Result<usize, ConsumerGroupError> {
245 let group = self
246 .groups
247 .get_mut(group_name)
248 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
249
250 let mut acked = 0;
251 for consumer in group.consumers.values_mut() {
252 let before = consumer.pending.len();
253 consumer.pending.retain(|p| !seqs.contains(&p.event_seq));
254 let removed = before - consumer.pending.len();
255 acked += removed;
256
257 for &seq in seqs {
258 if seq > consumer.last_acked_seq {
259 consumer.last_acked_seq = seq;
260 }
261 }
262 }
263
264 Ok(acked)
265 }
266
267 pub fn pending(&self, group_name: &str) -> Result<Vec<PendingSummary>, ConsumerGroupError> {
270 let group = self
271 .groups
272 .get(group_name)
273 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
274
275 let now = Instant::now();
276 let mut result = Vec::new();
277
278 for (consumer_name, state) in &group.consumers {
279 for entry in &state.pending {
280 let idle = now.duration_since(entry.last_delivery);
281 result.push(PendingSummary {
282 seq: entry.event_seq,
283 consumer: consumer_name.clone(),
284 idle_ms: idle.as_millis() as u64,
285 delivery_count: entry.delivery_count,
286 });
287 }
288 }
289
290 result.sort_by_key(|p| p.seq);
291 Ok(result)
292 }
293
294 pub fn claim(
299 &mut self,
300 group_name: &str,
301 target_consumer: &str,
302 min_idle: Duration,
303 seqs: &[u64],
304 ) -> Result<Vec<u64>, ConsumerGroupError> {
305 let group = self
306 .groups
307 .get_mut(group_name)
308 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
309
310 let now = Instant::now();
311 let mut claimed_entries: Vec<(u64, PendingEntry)> = Vec::new();
312
313 for consumer in group.consumers.values_mut() {
314 let mut remaining = VecDeque::new();
315 for entry in consumer.pending.drain(..) {
316 let idle = now.duration_since(entry.last_delivery);
317 if seqs.contains(&entry.event_seq) && idle >= min_idle {
318 claimed_entries.push((
319 entry.event_seq,
320 PendingEntry {
321 event_seq: entry.event_seq,
322 delivery_count: entry.delivery_count + 1,
323 first_delivery: entry.first_delivery,
324 last_delivery: now,
325 },
326 ));
327 } else {
328 remaining.push_back(entry);
329 }
330 }
331 consumer.pending = remaining;
332 }
333
334 if !group.consumers.contains_key(target_consumer) {
335 group.consumers.insert(
336 target_consumer.to_string(),
337 ConsumerState::new(group.last_delivered_seq),
338 );
339 }
340
341 let claimed_seqs: Vec<u64> = claimed_entries.iter().map(|(seq, _)| *seq).collect();
342
343 let target = group
344 .consumers
345 .get_mut(target_consumer)
346 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
347
348 for (_, entry) in claimed_entries {
349 target.pending.push_back(entry);
350 }
351
352 Ok(claimed_seqs)
353 }
354
355 pub fn has_group(&self, name: &str) -> bool {
357 self.groups.contains_key(name)
358 }
359
360 pub fn group_count(&self) -> usize {
362 self.groups.len()
363 }
364
365 pub fn check_gap(
370 &mut self,
371 ring: &CdcRing,
372 group_name: &str,
373 consumer_name: &str,
374 ) -> Result<bool, ConsumerGroupError> {
375 let group = self
376 .groups
377 .get_mut(group_name)
378 .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
379
380 let consumer = match group.consumers.get_mut(consumer_name) {
381 Some(c) => c,
382 None => return Ok(false),
383 };
384
385 if ring.start_seq() > consumer.last_acked_seq {
386 consumer.needs_resync = true;
387 return Ok(true);
388 }
389
390 Ok(consumer.needs_resync)
391 }
392}
393
394impl Default for ConsumerGroupManager {
395 fn default() -> Self {
396 Self::new(Duration::from_secs(30))
397 }
398}
399
400fn collect_timed_out_entries(
401 group: &ConsumerGroup,
402 exclude_consumer: &str,
403 idle_timeout: Duration,
404 now: Instant,
405 max: usize,
406) -> Vec<(u64, String)> {
407 let mut timed_out = Vec::new();
408 for (name, state) in &group.consumers {
409 if name == exclude_consumer {
410 continue;
411 }
412 for entry in &state.pending {
413 if now.duration_since(entry.last_delivery) >= idle_timeout {
414 timed_out.push((entry.event_seq, name.clone()));
415 if timed_out.len() >= max {
416 return timed_out;
417 }
418 }
419 }
420 }
421 timed_out
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ring::{CdcOp, CdcRing};

    /// Builds a ring with capacity 100 holding `n` `Set` events.
    fn make_ring(n: usize) -> CdcRing {
        let mut ring = CdcRing::new(100);
        for i in 0..n {
            ring.push(
                CdcOp::Set,
                format!("key:{}", i).into_bytes(),
                Some(format!("val:{}", i).into_bytes()),
                i as u64 * 100,
            );
        }
        ring
    }

    #[test]
    fn test_create_group() {
        let mut mgr = ConsumerGroupManager::default();
        assert!(mgr.create_group("mygroup", 0).is_ok());
        assert!(mgr.has_group("mygroup"));
        assert_eq!(mgr.group_count(), 1);
    }

    #[test]
    fn test_create_duplicate_group() {
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("mygroup", 0).unwrap();
        let result = mgr.create_group("mygroup", 0);
        assert!(matches!(result, Err(ConsumerGroupError::GroupExists)));
        // The failed attempt must not register a second group.
        assert_eq!(mgr.group_count(), 1);
    }

    #[test]
    fn test_read_group_basic() {
        let ring = make_ring(5);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        let result = mgr.read_group(&ring, "g1", "c1", 3).unwrap();
        assert_eq!(result.events.len(), 3);
        assert!(!result.gap);
        assert_eq!(result.events[0].seq, 0);
        assert_eq!(result.events[2].seq, 2);

        let result = mgr.read_group(&ring, "g1", "c1", 10).unwrap();
        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].seq, 3);

        let result = mgr.read_group(&ring, "g1", "c1", 10).unwrap();
        assert_eq!(result.events.len(), 0);
    }

    #[test]
    fn test_ack_removes_pending() {
        let ring = make_ring(5);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        mgr.read_group(&ring, "g1", "c1", 3).unwrap();
        let pending = mgr.pending("g1").unwrap();
        assert_eq!(pending.len(), 3);

        let acked = mgr.ack("g1", &[0, 1]).unwrap();
        assert_eq!(acked, 2);

        let pending = mgr.pending("g1").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].seq, 2);
    }

    #[test]
    fn test_ack_nonexistent_group() {
        let mut mgr = ConsumerGroupManager::default();
        let result = mgr.ack("nogroup", &[0]);
        assert!(matches!(result, Err(ConsumerGroupError::NoGroup(_))));
    }

    #[test]
    fn test_pending_lists_all_consumers() {
        let ring = make_ring(10);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        mgr.read_group(&ring, "g1", "c1", 3).unwrap();
        mgr.read_group(&ring, "g1", "c2", 3).unwrap();

        let pending = mgr.pending("g1").unwrap();
        assert_eq!(pending.len(), 6);

        // The listing is ordered by sequence number.
        let seqs: Vec<u64> = pending.iter().map(|p| p.seq).collect();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_multiple_consumers_get_different_events() {
        let ring = make_ring(6);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        let r1 = mgr.read_group(&ring, "g1", "c1", 3).unwrap();
        assert_eq!(r1.events.len(), 3);
        assert_eq!(r1.events[0].seq, 0);

        let r2 = mgr.read_group(&ring, "g1", "c2", 3).unwrap();
        assert_eq!(r2.events.len(), 3);
        assert_eq!(r2.events[0].seq, 3);
    }

    #[test]
    fn test_gap_detection() {
        let mut ring = CdcRing::new(4);
        for i in 0..10 {
            ring.push(CdcOp::Set, format!("k{}", i).into_bytes(), None, i as u64);
        }

        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        let result = mgr.read_group(&ring, "g1", "c1", 10).unwrap();
        assert!(result.gap);

        let has_gap = mgr.check_gap(&ring, "g1", "c1").unwrap();
        assert!(has_gap);
    }

    #[test]
    fn test_claim_transfers_entries() {
        let ring = make_ring(5);
        let mut mgr = ConsumerGroupManager::new(Duration::from_millis(0));
        mgr.create_group("g1", 0).unwrap();

        mgr.read_group(&ring, "g1", "c1", 3).unwrap();

        let claimed = mgr
            .claim("g1", "c2", Duration::from_millis(0), &[0, 1])
            .unwrap();
        assert_eq!(claimed.len(), 2);

        let pending = mgr.pending("g1").unwrap();
        let c1_pending: Vec<_> = pending.iter().filter(|p| p.consumer == "c1").collect();
        let c2_pending: Vec<_> = pending.iter().filter(|p| p.consumer == "c2").collect();
        assert_eq!(c1_pending.len(), 1);
        assert_eq!(c2_pending.len(), 2);

        // A claim counts as one more delivery of the entry.
        assert!(c2_pending.iter().all(|p| p.delivery_count == 2));
        assert_eq!(c1_pending[0].delivery_count, 1);
    }

    #[test]
    fn test_claim_respects_min_idle() {
        let ring = make_ring(5);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        mgr.read_group(&ring, "g1", "c1", 3).unwrap();

        let claimed = mgr
            .claim("g1", "c2", Duration::from_secs(9999), &[0, 1])
            .unwrap();
        assert_eq!(claimed.len(), 0);

        // Nothing moved: all three entries still belong to c1.
        let pending = mgr.pending("g1").unwrap();
        assert_eq!(pending.len(), 3);
        assert!(pending.iter().all(|p| p.consumer == "c1"));
    }

    #[test]
    fn test_read_nonexistent_group() {
        let ring = make_ring(5);
        let mut mgr = ConsumerGroupManager::default();
        let result = mgr.read_group(&ring, "nogroup", "c1", 5);
        assert!(matches!(result, Err(ConsumerGroupError::NoGroup(_))));
    }

    #[test]
    fn test_group_start_seq() {
        let ring = make_ring(10);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 5).unwrap();

        let result = mgr.read_group(&ring, "g1", "c1", 10).unwrap();
        assert_eq!(result.events.len(), 5);
        assert_eq!(result.events[0].seq, 5);
    }

    #[test]
    fn test_ack_advances_last_acked() {
        let ring = make_ring(5);
        let mut mgr = ConsumerGroupManager::default();
        mgr.create_group("g1", 0).unwrap();

        mgr.read_group(&ring, "g1", "c1", 5).unwrap();
        mgr.ack("g1", &[0, 1, 4]).unwrap();

        let pending = mgr.pending("g1").unwrap();
        assert_eq!(pending.len(), 2);

        let seqs: Vec<u64> = pending.iter().map(|p| p.seq).collect();
        assert!(seqs.contains(&2));
        assert!(seqs.contains(&3));

        // The ack cursor moves to the highest acknowledged sequence.
        assert_eq!(mgr.groups["g1"].consumers["c1"].last_acked_seq, 4);
    }
}