Skip to main content

kora_cdc/
consumer.rs

//! CDC consumer groups with acknowledgement tracking and redelivery.
//!
//! A [`ConsumerGroup`] coordinates multiple named consumers reading from the
//! same [`CdcRing`](crate::ring::CdcRing). The group keeps a single shared
//! delivery cursor, while each consumer keeps its own pending-entry list.
//! Events are delivered at-least-once: delivered entries remain pending until
//! explicitly acknowledged. Entries idle for longer than the group's idle
//! timeout are reclaimed automatically by other consumers on their next
//! [`read_group`](ConsumerGroupManager::read_group). Entries can also be taken
//! over explicitly with the [`claim`](ConsumerGroupManager::claim) operation,
//! which applies its own minimum-idle threshold.
//!
//! The [`ConsumerGroupManager`] owns all groups for a single ring and
//! exposes the full lifecycle: group creation, consumer reads, acknowledgement,
//! pending-entry inspection, ownership transfer, and gap detection.
13
14use std::collections::{HashMap, VecDeque};
15use std::time::{Duration, Instant};
16
17use crate::ring::{CdcEvent, CdcRing};
18
/// Delivery bookkeeping for an event that was handed to a consumer and has
/// not been acknowledged yet.
///
/// The two timestamps let the group notice entries that have sat idle for too
/// long, so they can be handed to a different consumer.
#[derive(Clone, Debug)]
pub struct PendingEntry {
    /// `seq` of the [`CdcEvent`](crate::ring::CdcEvent) this entry refers to.
    pub event_seq: u64,
    /// Number of delivery attempts so far, the first one included.
    pub delivery_count: u32,
    /// When the event was handed out for the first time.
    pub first_delivery: Instant,
    /// When the event was handed out most recently.
    pub last_delivery: Instant,
}
34
35/// Mutable state for one consumer within a [`ConsumerGroup`].
36#[derive(Debug)]
37pub struct ConsumerState {
38    /// Highest sequence number that has been acknowledged.
39    pub last_acked_seq: u64,
40    /// Ordered queue of entries delivered but not yet acknowledged.
41    pub pending: VecDeque<PendingEntry>,
42    /// Set to `true` when a gap is detected, indicating the consumer
43    /// missed events and may need to reconcile externally.
44    pub needs_resync: bool,
45}
46
47impl ConsumerState {
48    fn new(start_seq: u64) -> Self {
49        Self {
50            last_acked_seq: start_seq,
51            pending: VecDeque::new(),
52            needs_resync: false,
53        }
54    }
55}
56
57/// A named group of consumers sharing a single CDC stream.
58///
59/// The group maintains a high-watermark (`last_delivered_seq`) so that
60/// each new read delivers only events that no consumer in the group has
61/// seen yet. Individual consumer state is tracked in [`ConsumerState`].
62#[derive(Debug)]
63pub struct ConsumerGroup {
64    /// Human-readable group name.
65    pub name: String,
66    /// Per-consumer tracking state, keyed by consumer name.
67    pub consumers: HashMap<String, ConsumerState>,
68    /// Next sequence to deliver (high watermark for the group).
69    pub last_delivered_seq: u64,
70    /// Duration after which an unacknowledged entry becomes reclaimable.
71    pub idle_timeout: Duration,
72}
73
74impl ConsumerGroup {
75    /// Create a consumer group that begins reading at `start_seq`.
76    pub fn new(name: String, start_seq: u64, idle_timeout: Duration) -> Self {
77        Self {
78            name,
79            consumers: HashMap::new(),
80            last_delivered_seq: start_seq,
81            idle_timeout,
82        }
83    }
84}
85
86/// Outcome of a [`ConsumerGroupManager::read_group`] call.
87#[derive(Debug)]
88pub struct GroupReadResult {
89    /// Events delivered to the consumer in this batch.
90    pub events: Vec<CdcEvent>,
91    /// `true` when the ring evicted events before they could be delivered,
92    /// indicating the consumer has fallen behind.
93    pub gap: bool,
94}
95
/// Read-only view of one pending entry, produced for inspection.
#[derive(Clone, Debug)]
pub struct PendingSummary {
    /// Sequence number of the event that still awaits acknowledgement.
    pub seq: u64,
    /// Consumer that currently owns the entry.
    pub consumer: String,
    /// Time since the entry was last delivered, in milliseconds.
    pub idle_ms: u64,
    /// How many times the entry has been delivered so far.
    pub delivery_count: u32,
}
108
109/// Errors returned by [`ConsumerGroupManager`] operations.
110#[derive(Debug, thiserror::Error)]
111pub enum ConsumerGroupError {
112    /// Attempted to create a group whose name is already taken.
113    #[error("BUSYGROUP consumer group name already exists")]
114    GroupExists,
115    /// Referenced a group name that does not exist.
116    #[error("NOGROUP no such consumer group '{0}'")]
117    NoGroup(String),
118    /// The backing CDC ring buffer is unavailable.
119    #[error("ERR CDC ring not available")]
120    NoRing,
121}
122
123/// Owns all consumer groups for a single CDC ring.
124///
125/// Provides the full group lifecycle: creation, per-consumer reads,
126/// acknowledgement, pending-entry inspection, ownership transfer (claim),
127/// and gap detection.
128pub struct ConsumerGroupManager {
129    groups: HashMap<String, ConsumerGroup>,
130    default_idle_timeout: Duration,
131}
132
133impl ConsumerGroupManager {
134    /// Create a manager whose newly created groups use `default_idle_timeout`
135    /// for pending-entry redelivery.
136    pub fn new(default_idle_timeout: Duration) -> Self {
137        Self {
138            groups: HashMap::new(),
139            default_idle_timeout,
140        }
141    }
142
143    /// Create a consumer group that begins delivering events at `start_seq`.
144    ///
145    /// Returns [`ConsumerGroupError::GroupExists`] if `name` is already taken.
146    pub fn create_group(&mut self, name: &str, start_seq: u64) -> Result<(), ConsumerGroupError> {
147        if self.groups.contains_key(name) {
148            return Err(ConsumerGroupError::GroupExists);
149        }
150        self.groups.insert(
151            name.to_string(),
152            ConsumerGroup::new(name.to_string(), start_seq, self.default_idle_timeout),
153        );
154        Ok(())
155    }
156
157    /// Deliver up to `count` events to `consumer_name` within `group_name`.
158    ///
159    /// Before reading fresh events from the ring, timed-out pending entries
160    /// owned by *other* consumers are reclaimed and delivered first. The
161    /// consumer is auto-created if it does not already exist.
162    pub fn read_group(
163        &mut self,
164        ring: &CdcRing,
165        group_name: &str,
166        consumer_name: &str,
167        count: usize,
168    ) -> Result<GroupReadResult, ConsumerGroupError> {
169        let group = self
170            .groups
171            .get_mut(group_name)
172            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
173
174        let now = Instant::now();
175
176        if !group.consumers.contains_key(consumer_name) {
177            group.consumers.insert(
178                consumer_name.to_string(),
179                ConsumerState::new(group.last_delivered_seq),
180            );
181        }
182
183        let mut delivered = Vec::new();
184        let mut gap = false;
185
186        let idle_timeout = group.idle_timeout;
187        let timed_out = collect_timed_out_entries(group, consumer_name, idle_timeout, now, count);
188
189        for (seq, original_consumer) in &timed_out {
190            if let Some(event) = ring.get(*seq) {
191                delivered.push(event.clone());
192
193                if let Some(consumer_state) = group.consumers.get_mut(original_consumer.as_str()) {
194                    consumer_state.pending.retain(|p| p.event_seq != *seq);
195                }
196            }
197        }
198
199        let consumer = group
200            .consumers
201            .get_mut(consumer_name)
202            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
203
204        for (seq, _) in &timed_out {
205            consumer.pending.push_back(PendingEntry {
206                event_seq: *seq,
207                delivery_count: 2,
208                first_delivery: now,
209                last_delivery: now,
210            });
211        }
212
213        let remaining = count.saturating_sub(delivered.len());
214        if remaining > 0 {
215            let read_result = ring.read(group.last_delivered_seq, remaining);
216            if read_result.gap {
217                gap = true;
218                consumer.needs_resync = true;
219            }
220
221            for event in &read_result.events {
222                consumer.pending.push_back(PendingEntry {
223                    event_seq: event.seq,
224                    delivery_count: 1,
225                    first_delivery: now,
226                    last_delivery: now,
227                });
228                delivered.push(event.clone());
229            }
230
231            group.last_delivered_seq = read_result.next_seq;
232        }
233
234        Ok(GroupReadResult {
235            events: delivered,
236            gap,
237        })
238    }
239
240    /// Acknowledge one or more events by sequence number.
241    ///
242    /// Acknowledged entries are removed from every consumer's pending list
243    /// within the group. Returns the total number of entries removed.
244    pub fn ack(&mut self, group_name: &str, seqs: &[u64]) -> Result<usize, ConsumerGroupError> {
245        let group = self
246            .groups
247            .get_mut(group_name)
248            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
249
250        let mut acked = 0;
251        for consumer in group.consumers.values_mut() {
252            let before = consumer.pending.len();
253            consumer.pending.retain(|p| !seqs.contains(&p.event_seq));
254            let removed = before - consumer.pending.len();
255            acked += removed;
256
257            for &seq in seqs {
258                if seq > consumer.last_acked_seq {
259                    consumer.last_acked_seq = seq;
260                }
261            }
262        }
263
264        Ok(acked)
265    }
266
267    /// Return a snapshot of every pending entry across all consumers in the group,
268    /// sorted by sequence number.
269    pub fn pending(&self, group_name: &str) -> Result<Vec<PendingSummary>, ConsumerGroupError> {
270        let group = self
271            .groups
272            .get(group_name)
273            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
274
275        let now = Instant::now();
276        let mut result = Vec::new();
277
278        for (consumer_name, state) in &group.consumers {
279            for entry in &state.pending {
280                let idle = now.duration_since(entry.last_delivery);
281                result.push(PendingSummary {
282                    seq: entry.event_seq,
283                    consumer: consumer_name.clone(),
284                    idle_ms: idle.as_millis() as u64,
285                    delivery_count: entry.delivery_count,
286                });
287            }
288        }
289
290        result.sort_by_key(|p| p.seq);
291        Ok(result)
292    }
293
294    /// Transfer ownership of pending entries to `target_consumer`.
295    ///
296    /// Only entries whose idle time meets or exceeds `min_idle` are
297    /// transferred. Returns the sequence numbers that were claimed.
298    pub fn claim(
299        &mut self,
300        group_name: &str,
301        target_consumer: &str,
302        min_idle: Duration,
303        seqs: &[u64],
304    ) -> Result<Vec<u64>, ConsumerGroupError> {
305        let group = self
306            .groups
307            .get_mut(group_name)
308            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
309
310        let now = Instant::now();
311        let mut claimed_entries: Vec<(u64, PendingEntry)> = Vec::new();
312
313        for consumer in group.consumers.values_mut() {
314            let mut remaining = VecDeque::new();
315            for entry in consumer.pending.drain(..) {
316                let idle = now.duration_since(entry.last_delivery);
317                if seqs.contains(&entry.event_seq) && idle >= min_idle {
318                    claimed_entries.push((
319                        entry.event_seq,
320                        PendingEntry {
321                            event_seq: entry.event_seq,
322                            delivery_count: entry.delivery_count + 1,
323                            first_delivery: entry.first_delivery,
324                            last_delivery: now,
325                        },
326                    ));
327                } else {
328                    remaining.push_back(entry);
329                }
330            }
331            consumer.pending = remaining;
332        }
333
334        if !group.consumers.contains_key(target_consumer) {
335            group.consumers.insert(
336                target_consumer.to_string(),
337                ConsumerState::new(group.last_delivered_seq),
338            );
339        }
340
341        let claimed_seqs: Vec<u64> = claimed_entries.iter().map(|(seq, _)| *seq).collect();
342
343        let target = group
344            .consumers
345            .get_mut(target_consumer)
346            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
347
348        for (_, entry) in claimed_entries {
349            target.pending.push_back(entry);
350        }
351
352        Ok(claimed_seqs)
353    }
354
355    /// Return `true` if a group with the given name exists.
356    pub fn has_group(&self, name: &str) -> bool {
357        self.groups.contains_key(name)
358    }
359
360    /// Return the number of registered groups.
361    pub fn group_count(&self) -> usize {
362        self.groups.len()
363    }
364
365    /// Check whether `consumer_name` in `group_name` has fallen behind the ring.
366    ///
367    /// Returns `true` if the consumer's last acknowledged sequence has been
368    /// evicted from the ring, or if the consumer was already marked for resync.
369    pub fn check_gap(
370        &mut self,
371        ring: &CdcRing,
372        group_name: &str,
373        consumer_name: &str,
374    ) -> Result<bool, ConsumerGroupError> {
375        let group = self
376            .groups
377            .get_mut(group_name)
378            .ok_or_else(|| ConsumerGroupError::NoGroup(group_name.to_string()))?;
379
380        let consumer = match group.consumers.get_mut(consumer_name) {
381            Some(c) => c,
382            None => return Ok(false),
383        };
384
385        if ring.start_seq() > consumer.last_acked_seq {
386            consumer.needs_resync = true;
387            return Ok(true);
388        }
389
390        Ok(consumer.needs_resync)
391    }
392}
393
394impl Default for ConsumerGroupManager {
395    fn default() -> Self {
396        Self::new(Duration::from_secs(30))
397    }
398}
399
400fn collect_timed_out_entries(
401    group: &ConsumerGroup,
402    exclude_consumer: &str,
403    idle_timeout: Duration,
404    now: Instant,
405    max: usize,
406) -> Vec<(u64, String)> {
407    let mut timed_out = Vec::new();
408    for (name, state) in &group.consumers {
409        if name == exclude_consumer {
410            continue;
411        }
412        for entry in &state.pending {
413            if now.duration_since(entry.last_delivery) >= idle_timeout {
414                timed_out.push((entry.event_seq, name.clone()));
415                if timed_out.len() >= max {
416                    return timed_out;
417                }
418            }
419        }
420    }
421    timed_out
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ring::{CdcOp, CdcRing};

    /// Build a 100-slot ring holding `n` `Set` events for `key:i` / `val:i`.
    fn make_ring(n: usize) -> CdcRing {
        let mut ring = CdcRing::new(100);
        for idx in 0..n {
            let key = format!("key:{}", idx).into_bytes();
            let value = format!("val:{}", idx).into_bytes();
            let stamp = idx as u64 * 100;
            ring.push(CdcOp::Set, key, Some(value), stamp);
        }
        ring
    }

    /// Default-configured manager with a single group `g1` starting at `start_seq`.
    fn manager_with_g1(start_seq: u64) -> ConsumerGroupManager {
        let mut manager = ConsumerGroupManager::default();
        manager.create_group("g1", start_seq).unwrap();
        manager
    }

    #[test]
    fn test_create_group() {
        let mut manager = ConsumerGroupManager::default();
        let created = manager.create_group("mygroup", 0);
        assert!(created.is_ok());
        assert!(manager.has_group("mygroup"));
        assert_eq!(manager.group_count(), 1);
    }

    #[test]
    fn test_create_duplicate_group() {
        let mut manager = ConsumerGroupManager::default();
        manager.create_group("mygroup", 0).unwrap();
        let second = manager.create_group("mygroup", 0);
        assert!(matches!(second, Err(ConsumerGroupError::GroupExists)));
    }

    #[test]
    fn test_read_group_basic() {
        let ring = make_ring(5);
        let mut manager = manager_with_g1(0);

        let batch = manager.read_group(&ring, "g1", "c1", 3).unwrap();
        assert!(!batch.gap);
        assert_eq!(batch.events.len(), 3);
        assert_eq!(batch.events[0].seq, 0);
        assert_eq!(batch.events[2].seq, 2);

        let batch = manager.read_group(&ring, "g1", "c1", 10).unwrap();
        assert_eq!(batch.events.len(), 2);
        assert_eq!(batch.events[0].seq, 3);

        let batch = manager.read_group(&ring, "g1", "c1", 10).unwrap();
        assert!(batch.events.is_empty());
    }

    #[test]
    fn test_ack_removes_pending() {
        let ring = make_ring(5);
        let mut manager = manager_with_g1(0);

        manager.read_group(&ring, "g1", "c1", 3).unwrap();
        assert_eq!(manager.pending("g1").unwrap().len(), 3);

        assert_eq!(manager.ack("g1", &[0, 1]).unwrap(), 2);

        let left = manager.pending("g1").unwrap();
        assert_eq!(left.len(), 1);
        assert_eq!(left[0].seq, 2);
    }

    #[test]
    fn test_ack_nonexistent_group() {
        let mut manager = ConsumerGroupManager::default();
        let outcome = manager.ack("nogroup", &[0]);
        assert!(matches!(outcome, Err(ConsumerGroupError::NoGroup(_))));
    }

    #[test]
    fn test_pending_lists_all_consumers() {
        let ring = make_ring(10);
        let mut manager = manager_with_g1(0);

        for &consumer in &["c1", "c2"] {
            manager.read_group(&ring, "g1", consumer, 3).unwrap();
        }

        assert_eq!(manager.pending("g1").unwrap().len(), 6);
    }

    #[test]
    fn test_multiple_consumers_get_different_events() {
        let ring = make_ring(6);
        let mut manager = manager_with_g1(0);

        let first = manager.read_group(&ring, "g1", "c1", 3).unwrap();
        assert_eq!(first.events.len(), 3);
        assert_eq!(first.events[0].seq, 0);

        let second = manager.read_group(&ring, "g1", "c2", 3).unwrap();
        assert_eq!(second.events.len(), 3);
        assert_eq!(second.events[0].seq, 3);
    }

    #[test]
    fn test_gap_detection() {
        let mut ring = CdcRing::new(4);
        for idx in 0..10 {
            ring.push(CdcOp::Set, format!("k{}", idx).into_bytes(), None, idx as u64);
        }

        let mut manager = manager_with_g1(0);

        let batch = manager.read_group(&ring, "g1", "c1", 10).unwrap();
        assert!(batch.gap);
        assert!(manager.check_gap(&ring, "g1", "c1").unwrap());
    }

    #[test]
    fn test_claim_transfers_entries() {
        let ring = make_ring(5);
        let mut manager = ConsumerGroupManager::new(Duration::from_millis(0));
        manager.create_group("g1", 0).unwrap();

        manager.read_group(&ring, "g1", "c1", 3).unwrap();

        let claimed = manager
            .claim("g1", "c2", Duration::from_millis(0), &[0, 1])
            .unwrap();
        assert_eq!(claimed.len(), 2);

        let pending = manager.pending("g1").unwrap();
        let owned_by = |name: &str| pending.iter().filter(|p| p.consumer == name).count();
        assert_eq!(owned_by("c1"), 1);
        assert_eq!(owned_by("c2"), 2);
    }

    #[test]
    fn test_claim_respects_min_idle() {
        let ring = make_ring(5);
        let mut manager = manager_with_g1(0);

        manager.read_group(&ring, "g1", "c1", 3).unwrap();

        let claimed = manager
            .claim("g1", "c2", Duration::from_secs(9999), &[0, 1])
            .unwrap();
        assert!(claimed.is_empty());
    }

    #[test]
    fn test_read_nonexistent_group() {
        let ring = make_ring(5);
        let mut manager = ConsumerGroupManager::default();
        let outcome = manager.read_group(&ring, "nogroup", "c1", 5);
        assert!(matches!(outcome, Err(ConsumerGroupError::NoGroup(_))));
    }

    #[test]
    fn test_group_start_seq() {
        let ring = make_ring(10);
        let mut manager = manager_with_g1(5);

        let batch = manager.read_group(&ring, "g1", "c1", 10).unwrap();
        assert_eq!(batch.events.len(), 5);
        assert_eq!(batch.events[0].seq, 5);
    }

    #[test]
    fn test_ack_advances_last_acked() {
        let ring = make_ring(5);
        let mut manager = manager_with_g1(0);

        manager.read_group(&ring, "g1", "c1", 5).unwrap();
        manager.ack("g1", &[0, 1, 4]).unwrap();

        let seqs: Vec<u64> = manager
            .pending("g1")
            .unwrap()
            .iter()
            .map(|p| p.seq)
            .collect();
        assert_eq!(seqs.len(), 2);
        assert!(seqs.contains(&2));
        assert!(seqs.contains(&3));
    }
}
617}