Skip to main content

reddb_server/storage/queue/
consumer_group.rs

1//! Consumer Groups for Queue
2//!
3//! Multiple consumers can read from the same queue. Each consumer group
4//! tracks which messages have been delivered and acknowledged.
5
6use std::collections::{BTreeMap, HashMap};
7
8/// A pending message awaiting acknowledgment
9#[derive(Debug, Clone)]
10pub struct PendingEntry {
11    /// Message sequence number
12    pub seq: u64,
13    /// Consumer that received it
14    pub consumer: String,
15    /// When it was delivered (nanoseconds)
16    pub delivered_at_ns: u64,
17    /// How many times this message has been delivered
18    pub delivery_count: u32,
19}
20
21/// Consumer state within a group
22#[derive(Debug, Clone)]
23pub struct ConsumerState {
24    /// Consumer name
25    pub name: String,
26    /// Number of pending (unacked) messages
27    pub pending_count: usize,
28}
29
30/// A consumer group manages message delivery and acknowledgment
31/// for a set of consumers reading from the same queue.
32pub struct ConsumerGroup {
33    /// Group name
34    pub name: String,
35    /// Pending messages: seq → PendingEntry
36    pending: BTreeMap<u64, PendingEntry>,
37    /// Consumer states
38    consumers: HashMap<String, ConsumerState>,
39    /// Last delivered sequence number
40    last_delivered_seq: u64,
41}
42
43impl ConsumerGroup {
44    /// Create a new consumer group
45    pub fn new(name: impl Into<String>) -> Self {
46        Self {
47            name: name.into(),
48            pending: BTreeMap::new(),
49            consumers: HashMap::new(),
50            last_delivered_seq: 0,
51        }
52    }
53
54    /// Register a consumer (or return existing)
55    pub fn add_consumer(&mut self, name: &str) {
56        self.consumers
57            .entry(name.to_string())
58            .or_insert(ConsumerState {
59                name: name.to_string(),
60                pending_count: 0,
61            });
62    }
63
64    /// Record that a message was delivered to a consumer
65    pub fn deliver(&mut self, seq: u64, consumer: &str) {
66        let now_ns = std::time::SystemTime::now()
67            .duration_since(std::time::UNIX_EPOCH)
68            .unwrap_or_default()
69            .as_nanos() as u64;
70
71        let delivery_count = self
72            .pending
73            .get(&seq)
74            .map(|p| p.delivery_count + 1)
75            .unwrap_or(1);
76
77        self.pending.insert(
78            seq,
79            PendingEntry {
80                seq,
81                consumer: consumer.to_string(),
82                delivered_at_ns: now_ns,
83                delivery_count,
84            },
85        );
86
87        if let Some(state) = self.consumers.get_mut(consumer) {
88            state.pending_count += 1;
89        }
90
91        if seq > self.last_delivered_seq {
92            self.last_delivered_seq = seq;
93        }
94    }
95
96    /// Acknowledge a message (remove from pending)
97    pub fn ack(&mut self, seq: u64) -> bool {
98        if let Some(entry) = self.pending.remove(&seq) {
99            if let Some(state) = self.consumers.get_mut(&entry.consumer) {
100                state.pending_count = state.pending_count.saturating_sub(1);
101            }
102            true
103        } else {
104            false
105        }
106    }
107
108    /// Negative acknowledge: re-make the message available
109    pub fn nack(&mut self, seq: u64) -> bool {
110        self.pending.remove(&seq).is_some()
111    }
112
113    /// Get all pending entries for a consumer
114    pub fn pending_for_consumer(&self, consumer: &str) -> Vec<&PendingEntry> {
115        self.pending
116            .values()
117            .filter(|p| p.consumer == consumer)
118            .collect()
119    }
120
121    /// Get all pending entries
122    pub fn all_pending(&self) -> Vec<&PendingEntry> {
123        self.pending.values().collect()
124    }
125
126    /// Number of pending messages
127    pub fn pending_count(&self) -> usize {
128        self.pending.len()
129    }
130
131    /// List consumers
132    pub fn consumers(&self) -> Vec<&ConsumerState> {
133        self.consumers.values().collect()
134    }
135
136    /// Check if a message is pending
137    pub fn is_pending(&self, seq: u64) -> bool {
138        self.pending.contains_key(&seq)
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145
146    #[test]
147    fn test_consumer_group_basic() {
148        let mut group = ConsumerGroup::new("workers");
149        group.add_consumer("worker1");
150        group.add_consumer("worker2");
151
152        group.deliver(1, "worker1");
153        group.deliver(2, "worker2");
154
155        assert_eq!(group.pending_count(), 2);
156        assert!(group.is_pending(1));
157        assert!(group.is_pending(2));
158    }
159
160    #[test]
161    fn test_consumer_group_ack() {
162        let mut group = ConsumerGroup::new("workers");
163        group.add_consumer("worker1");
164
165        group.deliver(1, "worker1");
166        group.deliver(2, "worker1");
167
168        assert!(group.ack(1));
169        assert!(!group.is_pending(1));
170        assert!(group.is_pending(2));
171        assert_eq!(group.pending_count(), 1);
172    }
173
174    #[test]
175    fn test_consumer_group_nack() {
176        let mut group = ConsumerGroup::new("workers");
177        group.add_consumer("worker1");
178
179        group.deliver(1, "worker1");
180        assert!(group.nack(1));
181        assert!(!group.is_pending(1));
182    }
183
184    #[test]
185    fn test_consumer_group_pending_for_consumer() {
186        let mut group = ConsumerGroup::new("workers");
187        group.add_consumer("w1");
188        group.add_consumer("w2");
189
190        group.deliver(1, "w1");
191        group.deliver(2, "w2");
192        group.deliver(3, "w1");
193
194        let w1_pending = group.pending_for_consumer("w1");
195        assert_eq!(w1_pending.len(), 2);
196    }
197}