reddb_server/storage/queue/
consumer_group.rs1use std::collections::{BTreeMap, HashMap};
7
/// Bookkeeping record for a message that has been handed to a consumer and
/// is still tracked as pending.
#[derive(Clone, Debug)]
pub struct PendingEntry {
    /// Sequence number of the delivered message.
    pub seq: u64,
    /// Name of the consumer the message was delivered to.
    pub consumer: String,
    /// Wall-clock time of the delivery, in nanoseconds since the Unix epoch.
    pub delivered_at_ns: u64,
    /// How many times this sequence number has been delivered while it
    /// stayed pending (the first delivery counts as 1).
    pub delivery_count: u32,
}
20
/// Per-consumer bookkeeping kept by a consumer group.
#[derive(Clone, Debug)]
pub struct ConsumerState {
    /// Consumer name; also the key the group stores this state under.
    pub name: String,
    /// Counter of deliveries to this consumer that are still tracked as
    /// pending.
    pub pending_count: usize,
}
29
30pub struct ConsumerGroup {
33 pub name: String,
35 pending: BTreeMap<u64, PendingEntry>,
37 consumers: HashMap<String, ConsumerState>,
39 last_delivered_seq: u64,
41}
42
43impl ConsumerGroup {
44 pub fn new(name: impl Into<String>) -> Self {
46 Self {
47 name: name.into(),
48 pending: BTreeMap::new(),
49 consumers: HashMap::new(),
50 last_delivered_seq: 0,
51 }
52 }
53
54 pub fn add_consumer(&mut self, name: &str) {
56 self.consumers
57 .entry(name.to_string())
58 .or_insert(ConsumerState {
59 name: name.to_string(),
60 pending_count: 0,
61 });
62 }
63
64 pub fn deliver(&mut self, seq: u64, consumer: &str) {
66 let now_ns = std::time::SystemTime::now()
67 .duration_since(std::time::UNIX_EPOCH)
68 .unwrap_or_default()
69 .as_nanos() as u64;
70
71 let delivery_count = self
72 .pending
73 .get(&seq)
74 .map(|p| p.delivery_count + 1)
75 .unwrap_or(1);
76
77 self.pending.insert(
78 seq,
79 PendingEntry {
80 seq,
81 consumer: consumer.to_string(),
82 delivered_at_ns: now_ns,
83 delivery_count,
84 },
85 );
86
87 if let Some(state) = self.consumers.get_mut(consumer) {
88 state.pending_count += 1;
89 }
90
91 if seq > self.last_delivered_seq {
92 self.last_delivered_seq = seq;
93 }
94 }
95
96 pub fn ack(&mut self, seq: u64) -> bool {
98 if let Some(entry) = self.pending.remove(&seq) {
99 if let Some(state) = self.consumers.get_mut(&entry.consumer) {
100 state.pending_count = state.pending_count.saturating_sub(1);
101 }
102 true
103 } else {
104 false
105 }
106 }
107
108 pub fn nack(&mut self, seq: u64) -> bool {
110 self.pending.remove(&seq).is_some()
111 }
112
113 pub fn pending_for_consumer(&self, consumer: &str) -> Vec<&PendingEntry> {
115 self.pending
116 .values()
117 .filter(|p| p.consumer == consumer)
118 .collect()
119 }
120
121 pub fn all_pending(&self) -> Vec<&PendingEntry> {
123 self.pending.values().collect()
124 }
125
126 pub fn pending_count(&self) -> usize {
128 self.pending.len()
129 }
130
131 pub fn consumers(&self) -> Vec<&ConsumerState> {
133 self.consumers.values().collect()
134 }
135
136 pub fn is_pending(&self, seq: u64) -> bool {
138 self.pending.contains_key(&seq)
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a group named "workers" with the given consumers registered.
    fn workers_with(consumers: &[&str]) -> ConsumerGroup {
        let mut group = ConsumerGroup::new("workers");
        for &consumer in consumers {
            group.add_consumer(consumer);
        }
        group
    }

    /// Two deliveries to two consumers leave both sequences pending.
    #[test]
    fn test_consumer_group_basic() {
        let mut group = workers_with(&["worker1", "worker2"]);

        group.deliver(1, "worker1");
        group.deliver(2, "worker2");

        assert_eq!(group.pending_count(), 2);
        for seq in [1, 2] {
            assert!(group.is_pending(seq));
        }
    }

    /// Acking one sequence removes only that sequence from the pending set.
    #[test]
    fn test_consumer_group_ack() {
        let mut group = workers_with(&["worker1"]);

        for seq in [1, 2] {
            group.deliver(seq, "worker1");
        }

        assert!(group.ack(1));
        assert!(!group.is_pending(1));
        assert!(group.is_pending(2));
        assert_eq!(group.pending_count(), 1);
    }

    /// Nacking a pending sequence succeeds and clears it from the pending set.
    #[test]
    fn test_consumer_group_nack() {
        let mut group = workers_with(&["worker1"]);

        group.deliver(1, "worker1");

        assert!(group.nack(1));
        assert!(!group.is_pending(1));
    }

    /// Filtering by consumer returns only that consumer's pending entries.
    #[test]
    fn test_consumer_group_pending_for_consumer() {
        let mut group = workers_with(&["w1", "w2"]);

        for (seq, consumer) in [(1, "w1"), (2, "w2"), (3, "w1")] {
            group.deliver(seq, consumer);
        }

        let w1_pending = group.pending_for_consumer("w1");
        assert_eq!(w1_pending.len(), 2);
    }
}