// hermesmq_core/queue.rs

use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, VecDeque};

use serde::{Deserialize, Serialize};

use crate::raft::{AppRequest, AppResponse, Delivered, ProduceItem};
use crate::types::{LeaseId, Message, Offset, Priority};

/// Maximum number of `(producer_id, seq)` keys remembered per topic for
/// idempotent produce; once exceeded, the oldest key is forgotten first.
const DEDUP_CAPACITY: usize = 100_000;
/// Divisor for the anti-starvation share of each poll: `max / RESERVED_DEN`
/// slots (plus one extra on every `RESERVED_DEN`-th poll of a group) are given
/// to the oldest deliverable messages instead of the highest-priority ones.
const RESERVED_DEN: u64 = 4;
/// Message-count cap applied to a topic whose retention was never configured
/// (both `retain_max_messages` and `retain_max_age_ms` are 0).
const DEFAULT_RETAIN_MAX_MESSAGES: u64 = 1_000_000;
/// Consumer-group count above which a fully drained topic garbage-collects
/// its idle, caught-up groups.
const GROUP_GC_THRESHOLD: usize = 1024;

13#[derive(Debug, Clone, Serialize, Deserialize)]
14struct Lease {
15    offset: Offset,
16    deadline_ms: u64,
17}
18
19#[derive(Debug, Default, Clone, Serialize, Deserialize)]
20struct GroupState {
21    ack_watermark: Offset,
22    acked_above: BTreeSet<Offset>,
23    in_flight: BTreeMap<LeaseId, Lease>,
24    leased_offsets: BTreeMap<Offset, u64>,
25    poll_count: u64,
26}
27
28impl GroupState {
29    fn mark_acked(&mut self, offset: Offset) {
30        if offset < self.ack_watermark {
31            return;
32        }
33        self.acked_above.insert(offset);
34        while self.acked_above.remove(&self.ack_watermark) {
35            self.ack_watermark += 1;
36        }
37    }
38
39    fn expire(&mut self, now_ms: u64) {
40        let expired: Vec<LeaseId> = self
41            .in_flight
42            .iter()
43            .filter(|(_, l)| l.deadline_ms <= now_ms)
44            .map(|(id, _)| *id)
45            .collect();
46        for id in expired {
47            if let Some(l) = self.in_flight.remove(&id) {
48                self.leased_offsets.remove(&l.offset);
49            }
50        }
51    }
52
53    fn advance_watermark(&mut self, to: Offset) {
54        if to > self.ack_watermark {
55            self.ack_watermark = to;
56        }
57        let watermark = self.ack_watermark;
58        self.acked_above.retain(|o| *o >= watermark);
59        let drop: Vec<LeaseId> = self
60            .in_flight
61            .iter()
62            .filter(|(_, l)| l.offset < watermark)
63            .map(|(id, _)| *id)
64            .collect();
65        for id in drop {
66            if let Some(l) = self.in_flight.remove(&id) {
67                self.leased_offsets.remove(&l.offset);
68            }
69        }
70        while self.acked_above.remove(&self.ack_watermark) {
71            self.ack_watermark += 1;
72        }
73    }
74}
75
76#[derive(Debug, Default, Clone, Serialize, Deserialize)]
77struct TopicState {
78    next_offset: Offset,
79    messages: BTreeMap<Offset, Message>,
80    dedup: BTreeMap<(String, u64), Offset>,
81    dedup_order: VecDeque<(String, u64)>,
82    groups: BTreeMap<String, GroupState>,
83    rate_milli_per_sec: u64,
84    burst: u32,
85    retain_max_messages: u64,
86    retain_max_age_ms: u64,
87}
88
89#[derive(Debug, Default, Clone, Serialize, Deserialize)]
90pub struct Queue {
91    topics: BTreeMap<String, TopicState>,
92    next_lease_id: LeaseId,
93}
94
95impl Queue {
96    pub fn apply(&mut self, req: AppRequest) -> AppResponse {
97        match req {
98            AppRequest::CreateTopic { topic } => {
99                self.topics.entry(topic.0).or_default();
100                AppResponse::TopicCreated
101            }
102            AppRequest::DeleteTopic { topic } => {
103                self.topics.remove(&topic.0);
104                AppResponse::TopicDeleted
105            }
106            AppRequest::Produce {
107                topic,
108                priority,
109                content_type,
110                payload,
111                producer_id,
112                seq,
113                ts_ms,
114            } => {
115                let offset = self.produce_one(ProduceItem {
116                    topic,
117                    priority,
118                    content_type,
119                    payload,
120                    producer_id,
121                    seq,
122                    ts_ms,
123                });
124                AppResponse::Produced { offset }
125            }
126            AppRequest::ProduceMany { items } => {
127                let offsets = items.into_iter().map(|item| self.produce_one(item)).collect();
128                AppResponse::ProducedMany { offsets }
129            }
130            AppRequest::Poll {
131                topic,
132                group,
133                max,
134                visibility_timeout_ms,
135                ts_ms,
136            } => {
137                let mut next_lease = self.next_lease_id;
138                let items = poll(
139                    &mut self.topics,
140                    &topic.0,
141                    &group.0,
142                    max,
143                    visibility_timeout_ms,
144                    ts_ms,
145                    &mut next_lease,
146                );
147                self.next_lease_id = next_lease;
148                AppResponse::Polled { items }
149            }
150            AppRequest::Ack {
151                topic,
152                group,
153                lease_id,
154            } => {
155                ack(&mut self.topics, &topic.0, &group.0, lease_id);
156                reclaim_topic(&mut self.topics, &topic.0);
157                AppResponse::Acked
158            }
159            AppRequest::AckMany {
160                topic,
161                group,
162                lease_ids,
163            } => {
164                for lease_id in lease_ids {
165                    ack(&mut self.topics, &topic.0, &group.0, lease_id);
166                }
167                reclaim_topic(&mut self.topics, &topic.0);
168                AppResponse::Acked
169            }
170            AppRequest::NackMany {
171                topic,
172                group,
173                lease_ids,
174            } => {
175                for lease_id in lease_ids {
176                    nack(&mut self.topics, &topic.0, &group.0, lease_id);
177                }
178                AppResponse::Nacked
179            }
180            AppRequest::Nack {
181                topic,
182                group,
183                lease_id,
184            } => {
185                nack(&mut self.topics, &topic.0, &group.0, lease_id);
186                AppResponse::Nacked
187            }
188            AppRequest::CommitOffset {
189                topic,
190                group,
191                offset,
192            } => {
193                commit(&mut self.topics, &topic.0, &group.0, offset);
194                reclaim_topic(&mut self.topics, &topic.0);
195                AppResponse::Committed
196            }
197            AppRequest::SetRateLimit {
198                topic,
199                rate_milli_per_sec,
200                burst,
201            } => {
202                let t = self.topics.entry(topic.0).or_default();
203                t.rate_milli_per_sec = rate_milli_per_sec;
204                t.burst = burst;
205                AppResponse::RateLimitSet
206            }
207            AppRequest::SetRetention {
208                topic,
209                max_messages,
210                max_age_ms,
211            } => {
212                let t = self.topics.entry(topic.0).or_default();
213                t.retain_max_messages = max_messages;
214                t.retain_max_age_ms = max_age_ms;
215                AppResponse::RetentionSet
216            }
217        }
218    }
219
220    fn produce_one(&mut self, item: ProduceItem) -> Offset {
221        let t = self.topics.entry(item.topic.0).or_default();
222        let dedup = !item.producer_id.is_empty();
223        if dedup {
224            if let Some(offset) = t.dedup.get(&(item.producer_id.clone(), item.seq)) {
225                return *offset;
226            }
227        }
228        let offset = t.next_offset;
229        t.next_offset += 1;
230        t.messages.insert(
231            offset,
232            Message {
233                offset,
234                priority: item.priority,
235                content_type: item.content_type,
236                payload: item.payload,
237                ts_ms: item.ts_ms,
238            },
239        );
240        if dedup {
241            let key = (item.producer_id, item.seq);
242            t.dedup.insert(key.clone(), offset);
243            t.dedup_order.push_back(key);
244            while t.dedup_order.len() > DEDUP_CAPACITY {
245                if let Some(old) = t.dedup_order.pop_front() {
246                    t.dedup.remove(&old);
247                }
248            }
249        }
250        purge_retained(t, item.ts_ms);
251        offset
252    }
253
254    pub fn rate_config(&self, topic: &str) -> Option<(u64, u32)> {
255        self.topics.get(topic).and_then(|t| {
256            if t.rate_milli_per_sec > 0 {
257                Some((t.rate_milli_per_sec, t.burst))
258            } else {
259                None
260            }
261        })
262    }
263
264    pub fn has_deliverable(&self, topic: &str, group: &str, now_ms: u64) -> bool {
265        let Some(t) = self.topics.get(topic) else {
266            return false;
267        };
268        match t.groups.get(group) {
269            None => !t.messages.is_empty(),
270            Some(g) => t.messages.range(g.ack_watermark..).any(|(offset, _)| {
271                !g.acked_above.contains(offset)
272                    && g.leased_offsets
273                        .get(offset)
274                        .is_none_or(|deadline| *deadline <= now_ms)
275            }),
276        }
277    }
278
279    pub fn metrics(&self) -> QueueMetrics {
280        let mut metrics = QueueMetrics {
281            topics: self.topics.len() as u64,
282            messages: 0,
283            in_flight: 0,
284        };
285        for t in self.topics.values() {
286            metrics.messages += t.messages.len() as u64;
287            for g in t.groups.values() {
288                metrics.in_flight += g.in_flight.len() as u64;
289            }
290        }
291        metrics
292    }
293}
294
/// Point-in-time counters produced by `Queue::metrics`.
#[derive(Clone, Copy, Debug, Default)]
pub struct QueueMetrics {
    /// Number of topics.
    pub topics: u64,
    /// Messages currently retained in memory, summed over all topics.
    pub messages: u64,
    /// Outstanding leases, summed over every group of every topic.
    pub in_flight: u64,
}

302fn poll(
303    topics: &mut BTreeMap<String, TopicState>,
304    topic: &str,
305    group: &str,
306    max: u32,
307    visibility_timeout_ms: u64,
308    ts_ms: u64,
309    next_lease: &mut LeaseId,
310) -> Vec<Delivered> {
311    let Some(t) = topics.get_mut(topic) else {
312        return Vec::new();
313    };
314    let first = t.messages.keys().next().copied().unwrap_or(t.next_offset);
315    let g = t.groups.entry(group.to_string()).or_insert_with(|| GroupState {
316        ack_watermark: first,
317        ..GroupState::default()
318    });
319    g.expire(ts_ms);
320
321    let max = max as usize;
322    let mut candidates: Vec<(Priority, Offset)> = Vec::new();
323    for (offset, message) in t.messages.range(g.ack_watermark..) {
324        if !g.acked_above.contains(offset) && !g.leased_offsets.contains_key(offset) {
325            candidates.push((message.priority, *offset));
326        }
327    }
328
329    let poll_count = g.poll_count;
330    g.poll_count += 1;
331
332    if candidates.is_empty() || max == 0 {
333        return Vec::new();
334    }
335
336    let base = max / RESERVED_DEN as usize;
337    let bonus = usize::from((poll_count + 1) % RESERVED_DEN == 0);
338    let reserved = (base + bonus).min(max);
339    let priority_slots = max - reserved;
340
341    let by_offset = candidates.clone();
342    let mut by_priority = candidates;
343    by_priority.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
344
345    let mut chosen_set: BTreeSet<Offset> = BTreeSet::new();
346    let mut chosen: Vec<(Priority, Offset)> = Vec::new();
347    for item in by_priority.iter() {
348        if chosen.len() >= priority_slots {
349            break;
350        }
351        if chosen_set.insert(item.1) {
352            chosen.push(*item);
353        }
354    }
355    for item in by_offset.iter() {
356        if chosen.len() >= max {
357            break;
358        }
359        if chosen_set.insert(item.1) {
360            chosen.push(*item);
361        }
362    }
363    for item in by_priority.iter() {
364        if chosen.len() >= max {
365            break;
366        }
367        if chosen_set.insert(item.1) {
368            chosen.push(*item);
369        }
370    }
371    chosen.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
372
373    let mut items = Vec::new();
374    for (_, offset) in chosen {
375        let lease_id = *next_lease;
376        *next_lease += 1;
377        let deadline_ms = ts_ms.saturating_add(visibility_timeout_ms);
378        g.in_flight.insert(lease_id, Lease { offset, deadline_ms });
379        g.leased_offsets.insert(offset, deadline_ms);
380        let message = &t.messages[&offset];
381        items.push(Delivered {
382            lease_id,
383            offset,
384            priority: message.priority,
385            content_type: message.content_type,
386            payload: message.payload.clone(),
387            ts_ms: message.ts_ms,
388        });
389    }
390    items
391}
392
393fn ack(topics: &mut BTreeMap<String, TopicState>, topic: &str, group: &str, lease_id: LeaseId) {
394    if let Some(t) = topics.get_mut(topic) {
395        if let Some(g) = t.groups.get_mut(group) {
396            if let Some(lease) = g.in_flight.remove(&lease_id) {
397                g.leased_offsets.remove(&lease.offset);
398                g.mark_acked(lease.offset);
399            }
400        }
401    }
402}
403
404fn nack(topics: &mut BTreeMap<String, TopicState>, topic: &str, group: &str, lease_id: LeaseId) {
405    if let Some(t) = topics.get_mut(topic) {
406        if let Some(g) = t.groups.get_mut(group) {
407            if let Some(lease) = g.in_flight.remove(&lease_id) {
408                g.leased_offsets.remove(&lease.offset);
409            }
410        }
411    }
412}
413
414fn effective_retention(t: &TopicState) -> (u64, u64) {
415    if t.retain_max_messages == 0 && t.retain_max_age_ms == 0 {
416        (DEFAULT_RETAIN_MAX_MESSAGES, 0)
417    } else {
418        (t.retain_max_messages, t.retain_max_age_ms)
419    }
420}
421
422fn purge_retained(t: &mut TopicState, now_ms: u64) {
423    let (max_messages, max_age_ms) = effective_retention(t);
424    let mut purge: BTreeSet<Offset> = BTreeSet::new();
425    if max_age_ms > 0 {
426        for (offset, message) in t.messages.iter() {
427            if now_ms.saturating_sub(message.ts_ms) > max_age_ms {
428                purge.insert(*offset);
429            } else {
430                break;
431            }
432        }
433    }
434    if max_messages > 0 {
435        let target = max_messages as usize;
436        let kept = t.messages.len() - purge.len();
437        if kept > target {
438            let mut need = kept - target;
439            for offset in t.messages.keys() {
440                if need == 0 {
441                    break;
442                }
443                if purge.insert(*offset) {
444                    need -= 1;
445                }
446            }
447        }
448    }
449    if purge.is_empty() {
450        return;
451    }
452    for offset in &purge {
453        t.messages.remove(offset);
454    }
455    let boundary = t.messages.keys().next().copied().unwrap_or(t.next_offset);
456    for g in t.groups.values_mut() {
457        if boundary > g.ack_watermark {
458            g.advance_watermark(boundary);
459        }
460    }
461}
462
463fn reclaim_topic(topics: &mut BTreeMap<String, TopicState>, topic: &str) {
464    if let Some(t) = topics.get_mut(topic) {
465        reclaim(t);
466    }
467}
468
469fn reclaim(t: &mut TopicState) {
470    if let Some(boundary) = t.groups.values().map(|g| g.ack_watermark).min() {
471        if t.messages.keys().next().is_some_and(|first| boundary > *first) {
472            t.messages = t.messages.split_off(&boundary);
473        }
474    }
475    gc_idle_groups(t);
476}
477
478fn gc_idle_groups(t: &mut TopicState) {
479    if t.groups.len() <= GROUP_GC_THRESHOLD || !t.messages.is_empty() {
480        return;
481    }
482    let next = t.next_offset;
483    t.groups.retain(|_, g| {
484        !(g.ack_watermark >= next
485            && g.in_flight.is_empty()
486            && g.leased_offsets.is_empty()
487            && g.acked_above.is_empty())
488    });
489}
490
491fn group_entry<'a>(t: &'a mut TopicState, group: &str) -> &'a mut GroupState {
492    let first = t.messages.keys().next().copied().unwrap_or(t.next_offset);
493    t.groups.entry(group.to_string()).or_insert_with(|| GroupState {
494        ack_watermark: first,
495        ..GroupState::default()
496    })
497}
498
499fn commit(topics: &mut BTreeMap<String, TopicState>, topic: &str, group: &str, offset: Offset) {
500    if let Some(t) = topics.get_mut(topic) {
501        let g = group_entry(t, group);
502        g.advance_watermark(offset);
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use super::*;
509    use crate::types::{ContentType, GroupId, Priority, TopicId};
510    use bytes::Bytes;
511    use proptest::prelude::*;
512
513    fn produce(q: &mut Queue, topic: &str, priority: u8, body: &[u8], producer: &str, seq: u64) -> Offset {
514        match q.apply(AppRequest::Produce {
515            topic: TopicId::from(topic),
516            priority: Priority(priority),
517            content_type: ContentType::Raw,
518            payload: Bytes::copy_from_slice(body),
519            producer_id: producer.to_string(),
520            seq,
521            ts_ms: 0,
522        }) {
523            AppResponse::Produced { offset } => offset,
524            other => panic!("expected Produced, got {other:?}"),
525        }
526    }
527
528    fn poll_offsets(q: &mut Queue, topic: &str, group: &str, max: u32, vis: u64, ts: u64) -> Vec<(LeaseId, Offset)> {
529        match q.apply(AppRequest::Poll {
530            topic: TopicId::from(topic),
531            group: GroupId::from(group),
532            max,
533            visibility_timeout_ms: vis,
534            ts_ms: ts,
535        }) {
536            AppResponse::Polled { items } => items.into_iter().map(|d| (d.lease_id, d.offset)).collect(),
537            other => panic!("expected Polled, got {other:?}"),
538        }
539    }
540
541    #[test]
542    fn produce_assigns_monotonic_offsets() {
543        let mut q = Queue::default();
544        assert_eq!(produce(&mut q, "t", 0, b"a", "p", 1), 0);
545        assert_eq!(produce(&mut q, "t", 0, b"b", "p", 2), 1);
546        assert_eq!(produce(&mut q, "t", 0, b"c", "p", 3), 2);
547    }
548
549    #[test]
550    fn produce_dedup_returns_same_offset() {
551        let mut q = Queue::default();
552        let first = produce(&mut q, "t", 0, b"a", "p", 1);
553        let dup = produce(&mut q, "t", 0, b"a-again", "p", 1);
554        assert_eq!(first, dup);
555        assert_eq!(produce(&mut q, "t", 0, b"b", "p", 2), 1);
556    }
557
558    #[test]
559    fn empty_producer_id_disables_dedup() {
560        let mut q = Queue::default();
561        assert_eq!(produce(&mut q, "t", 0, b"a", "", 0), 0);
562        assert_eq!(produce(&mut q, "t", 0, b"b", "", 0), 1);
563        assert_eq!(produce(&mut q, "t", 0, b"c", "", 0), 2);
564    }
565
566    #[test]
567    fn poll_orders_by_priority_then_offset() {
568        let mut q = Queue::default();
569        produce(&mut q, "t", 0, b"a", "p", 1);
570        produce(&mut q, "t", 5, b"b", "p", 2);
571        produce(&mut q, "t", 3, b"c", "p", 3);
572        let got: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 0)
573            .into_iter()
574            .map(|(_, o)| o)
575            .collect();
576        assert_eq!(got, vec![1, 2, 0]);
577    }
578
579    #[test]
580    fn leased_messages_are_not_redelivered_until_expiry() {
581        let mut q = Queue::default();
582        produce(&mut q, "t", 0, b"a", "p", 1);
583        let first = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
584        assert_eq!(first.len(), 1);
585        let again = poll_offsets(&mut q, "t", "g", 10, 1000, 500);
586        assert!(again.is_empty());
587        let after_expiry = poll_offsets(&mut q, "t", "g", 10, 1000, 2000);
588        assert_eq!(after_expiry.len(), 1);
589    }
590
591    #[test]
592    fn ack_makes_message_done() {
593        let mut q = Queue::default();
594        produce(&mut q, "t", 0, b"a", "p", 1);
595        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
596        let (lease_id, _) = leased[0];
597        q.apply(AppRequest::Ack {
598            topic: TopicId::from("t"),
599            group: GroupId::from("g"),
600            lease_id,
601        });
602        let after = poll_offsets(&mut q, "t", "g", 10, 1000, 5000);
603        assert!(after.is_empty());
604    }
605
606    #[test]
607    fn nack_makes_message_immediately_redeliverable() {
608        let mut q = Queue::default();
609        produce(&mut q, "t", 0, b"a", "p", 1);
610        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
611        let (lease_id, _) = leased[0];
612        q.apply(AppRequest::Nack {
613            topic: TopicId::from("t"),
614            group: GroupId::from("g"),
615            lease_id,
616        });
617        let after = poll_offsets(&mut q, "t", "g", 10, 1000, 1);
618        assert_eq!(after.len(), 1);
619    }
620
621    #[test]
622    fn anti_starvation_serves_oldest_within_reserved_cadence() {
623        let mut q = Queue::default();
624        produce(&mut q, "t", 0, b"low", "", 0);
625        for _ in 0..5 {
626            produce(&mut q, "t", 7, b"high", "", 0);
627        }
628        let mut delivered = Vec::new();
629        for k in 0..4u64 {
630            let items = poll_offsets(&mut q, "t", "g", 1, 1_000_000, k);
631            if let Some((_, offset)) = items.first() {
632                delivered.push(*offset);
633            }
634        }
635        assert!(
636            delivered.contains(&0),
637            "low-priority oldest message must be served within the reserved cadence, got {delivered:?}"
638        );
639    }
640
641    #[test]
642    fn retention_by_count_keeps_newest() {
643        let mut q = Queue::default();
644        q.apply(AppRequest::SetRetention {
645            topic: TopicId::from("t"),
646            max_messages: 3,
647            max_age_ms: 0,
648        });
649        for i in 0..5 {
650            produce(&mut q, "t", 0, &[i], "", 0);
651        }
652        let offsets: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 0)
653            .into_iter()
654            .map(|(_, o)| o)
655            .collect();
656        assert_eq!(offsets, vec![2, 3, 4]);
657    }
658
659    #[test]
660    fn retention_by_age_drops_old() {
661        let mut q = Queue::default();
662        q.apply(AppRequest::SetRetention {
663            topic: TopicId::from("t"),
664            max_messages: 0,
665            max_age_ms: 1000,
666        });
667        q.apply(AppRequest::Produce {
668            topic: TopicId::from("t"),
669            priority: Priority(0),
670            content_type: ContentType::Raw,
671            payload: Bytes::from_static(b"old"),
672            producer_id: String::new(),
673            seq: 0,
674            ts_ms: 0,
675        });
676        q.apply(AppRequest::Produce {
677            topic: TopicId::from("t"),
678            priority: Priority(0),
679            content_type: ContentType::Raw,
680            payload: Bytes::from_static(b"new"),
681            producer_id: String::new(),
682            seq: 0,
683            ts_ms: 5000,
684        });
685        let offsets: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 5000)
686            .into_iter()
687            .map(|(_, o)| o)
688            .collect();
689        assert_eq!(offsets, vec![1]);
690    }
691
692    #[test]
693    fn retention_purge_advances_watermark_and_releases_ack_tracking() {
694        let mut q = Queue::default();
695        q.apply(AppRequest::SetRetention {
696            topic: TopicId::from("t"),
697            max_messages: 3,
698            max_age_ms: 0,
699        });
700        produce(&mut q, "t", 0, b"seed", "", 0);
701        poll_offsets(&mut q, "t", "g", 1, 1, 0);
702        for i in 0..5 {
703            produce(&mut q, "t", 0, &[i], "", 0);
704        }
705        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 10);
706        let offsets: Vec<Offset> = leased.iter().map(|(_, o)| *o).collect();
707        assert_eq!(offsets, vec![3, 4, 5]);
708        for (lease_id, _) in leased {
709            q.apply(AppRequest::Ack {
710                topic: TopicId::from("t"),
711                group: GroupId::from("g"),
712                lease_id,
713            });
714        }
715        let g = &q.topics["t"].groups["g"];
716        assert_eq!(g.ack_watermark, 6, "watermark must advance past purged offsets");
717        assert!(g.acked_above.is_empty(), "acked_above must drain once the watermark advances");
718        assert!(g.in_flight.is_empty());
719        assert!(g.leased_offsets.is_empty());
720    }
721
722    #[test]
723    fn fully_acked_messages_are_reclaimed_from_memory() {
724        let mut q = Queue::default();
725        for i in 0..10 {
726            produce(&mut q, "t", 0, &[i], "", 0);
727        }
728        assert_eq!(q.metrics().messages, 10);
729        let leased = poll_offsets(&mut q, "t", "g", 100, 1000, 0);
730        for (lease_id, _) in leased {
731            q.apply(AppRequest::Ack {
732                topic: TopicId::from("t"),
733                group: GroupId::from("g"),
734                lease_id,
735            });
736        }
737        assert_eq!(q.metrics().messages, 0, "fully acked messages must be freed from RAM");
738    }
739
740    #[test]
741    fn reclaim_holds_the_line_for_the_slowest_group() {
742        let mut q = Queue::default();
743        for i in 0..10 {
744            produce(&mut q, "t", 0, &[i], "", 0);
745        }
746        let _g2_registers = poll_offsets(&mut q, "t", "g2", 1, 1_000_000, 0);
747        let leased = poll_offsets(&mut q, "t", "g1", 100, 1000, 0);
748        for (lease_id, _) in leased {
749            q.apply(AppRequest::Ack {
750                topic: TopicId::from("t"),
751                group: GroupId::from("g1"),
752                lease_id,
753            });
754        }
755        assert_eq!(
756            q.metrics().messages,
757            10,
758            "messages must be retained while g2 still lags"
759        );
760        q.apply(AppRequest::CommitOffset {
761            topic: TopicId::from("t"),
762            group: GroupId::from("g2"),
763            offset: 10,
764        });
765        assert_eq!(
766            q.metrics().messages,
767            0,
768            "memory is freed once every group has consumed past the messages"
769        );
770    }
771
772    #[test]
773    fn unconsumed_topic_is_never_reclaimed() {
774        let mut q = Queue::default();
775        for i in 0..5 {
776            produce(&mut q, "t", 0, &[i], "", 0);
777        }
778        assert_eq!(q.metrics().messages, 5, "no groups means nothing is consumed yet");
779    }
780
781    #[test]
782    fn drained_topic_sheds_idle_caught_up_groups() {
783        let mut t = TopicState::default();
784        t.next_offset = 10;
785        for i in 0..(GROUP_GC_THRESHOLD + 5) {
786            t.groups.insert(
787                format!("g{i}"),
788                GroupState {
789                    ack_watermark: 10,
790                    ..GroupState::default()
791                },
792            );
793        }
794        gc_idle_groups(&mut t);
795        assert!(t.groups.is_empty(), "a drained topic must forget caught-up idle groups");
796    }
797
798    #[test]
799    fn small_group_counts_are_left_alone() {
800        let mut t = TopicState::default();
801        t.next_offset = 10;
802        t.groups.insert(
803            "g".to_string(),
804            GroupState {
805                ack_watermark: 10,
806                ..GroupState::default()
807            },
808        );
809        gc_idle_groups(&mut t);
810        assert_eq!(t.groups.len(), 1, "below the threshold nothing is touched");
811    }
812
813    #[test]
814    fn group_holding_a_lease_or_lagging_survives_gc() {
815        let mut t = TopicState::default();
816        t.next_offset = 10;
817        for i in 0..(GROUP_GC_THRESHOLD + 1) {
818            t.groups.insert(
819                format!("g{i}"),
820                GroupState {
821                    ack_watermark: 10,
822                    ..GroupState::default()
823                },
824            );
825        }
826        let mut leased = GroupState {
827            ack_watermark: 10,
828            ..GroupState::default()
829        };
830        leased.in_flight.insert(1, Lease { offset: 5, deadline_ms: 0 });
831        t.groups.insert("leased".to_string(), leased);
832        t.groups.insert(
833            "lagging".to_string(),
834            GroupState {
835                ack_watermark: 3,
836                ..GroupState::default()
837            },
838        );
839        gc_idle_groups(&mut t);
840        assert!(t.groups.contains_key("leased"), "a group with an in-flight lease must be kept");
841        assert!(t.groups.contains_key("lagging"), "a lagging group must be kept");
842        assert_eq!(t.groups.len(), 2);
843    }
844
845    #[test]
846    fn default_retention_cap_applies_when_unset() {
847        let t = TopicState::default();
848        assert_eq!(effective_retention(&t), (DEFAULT_RETAIN_MAX_MESSAGES, 0));
849    }
850
851    #[test]
852    fn explicit_retention_overrides_default_cap() {
853        let mut by_count = TopicState::default();
854        by_count.retain_max_messages = 50;
855        assert_eq!(effective_retention(&by_count), (50, 0));
856
857        let mut by_age = TopicState::default();
858        by_age.retain_max_age_ms = 1000;
859        assert_eq!(effective_retention(&by_age), (0, 1000));
860    }
861
862    #[test]
863    fn default_cap_bounds_an_unconsumed_topic() {
864        let mut t = TopicState::default();
865        for offset in 0..(DEFAULT_RETAIN_MAX_MESSAGES + 5) {
866            t.messages.insert(
867                offset,
868                Message {
869                    offset,
870                    priority: Priority(0),
871                    content_type: ContentType::Raw,
872                    payload: Bytes::new(),
873                    ts_ms: 0,
874                },
875            );
876            t.next_offset = offset + 1;
877        }
878        purge_retained(&mut t, 0);
879        assert_eq!(t.messages.len() as u64, DEFAULT_RETAIN_MAX_MESSAGES);
880        assert_eq!(t.messages.keys().next().copied(), Some(5));
881    }
882
883    #[test]
884    fn produce_many_assigns_offsets_in_order_and_dedups() {
885        let mut q = Queue::default();
886        let item = |payload: &[u8], producer: &str, seq: u64| ProduceItem {
887            topic: TopicId::from("t"),
888            priority: Priority(0),
889            content_type: ContentType::Raw,
890            payload: Bytes::copy_from_slice(payload),
891            producer_id: producer.to_string(),
892            seq,
893            ts_ms: 0,
894        };
895        let resp = q.apply(AppRequest::ProduceMany {
896            items: vec![
897                item(b"a", "p", 1),
898                item(b"b", "p", 2),
899                item(b"a-retry", "p", 1),
900                item(b"c", "", 0),
901            ],
902        });
903        match resp {
904            AppResponse::ProducedMany { offsets } => {
905                assert_eq!(offsets, vec![0, 1, 0, 2], "dedup inside a batch must return the original offset");
906            }
907            other => panic!("expected ProducedMany, got {other:?}"),
908        }
909        let polled = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
910        assert_eq!(polled.len(), 3, "the dedup re-send must not create a fourth message");
911    }
912
913    #[test]
914    fn ack_many_acks_all_leases() {
915        let mut q = Queue::default();
916        for i in 0..3 {
917            produce(&mut q, "t", 0, &[i], "", 0);
918        }
919        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
920        let lease_ids: Vec<LeaseId> = leased.iter().map(|(id, _)| *id).collect();
921        q.apply(AppRequest::AckMany {
922            topic: TopicId::from("t"),
923            group: GroupId::from("g"),
924            lease_ids,
925        });
926        let after = poll_offsets(&mut q, "t", "g", 10, 1000, 5000);
927        assert!(after.is_empty(), "all leases must be acked in one request");
928    }
929
930    #[test]
931    fn nack_many_releases_all_leases() {
932        let mut q = Queue::default();
933        for i in 0..3 {
934            produce(&mut q, "t", 0, &[i], "", 0);
935        }
936        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
937        let lease_ids: Vec<LeaseId> = leased.iter().map(|(id, _)| *id).collect();
938        q.apply(AppRequest::NackMany {
939            topic: TopicId::from("t"),
940            group: GroupId::from("g"),
941            lease_ids,
942        });
943        let again = poll_offsets(&mut q, "t", "g", 10, 1000, 1);
944        assert_eq!(again.len(), 3, "nacked messages must be immediately redeliverable");
945    }
946
947    #[test]
948    fn separate_groups_each_see_every_message() {
949        let mut q = Queue::default();
950        produce(&mut q, "t", 0, b"a", "p", 1);
951        let g1 = poll_offsets(&mut q, "t", "g1", 10, 1000, 0);
952        let g2 = poll_offsets(&mut q, "t", "g2", 10, 1000, 0);
953        assert_eq!(g1.len(), 1);
954        assert_eq!(g2.len(), 1);
955    }
956
957    #[test]
958    fn competing_consumers_in_group_split_work() {
959        let mut q = Queue::default();
960        for i in 0..4 {
961            produce(&mut q, "t", 0, &[i], "", 0);
962        }
963        let a = poll_offsets(&mut q, "t", "g", 2, 100_000, 0);
964        let b = poll_offsets(&mut q, "t", "g", 2, 100_000, 1);
965        assert_eq!(a.len(), 2);
966        assert_eq!(b.len(), 2);
967        let mut all: Vec<Offset> = a.iter().chain(b.iter()).map(|(_, o)| *o).collect();
968        all.sort();
969        assert_eq!(all, vec![0, 1, 2, 3], "each message delivered to exactly one consumer");
970    }
971
972    #[test]
973    fn single_consumer_sees_fifo() {
974        let mut q = Queue::default();
975        for i in 0..5 {
976            produce(&mut q, "t", 0, &[i], "", 0);
977        }
978        let offsets: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 0)
979            .into_iter()
980            .map(|(_, o)| o)
981            .collect();
982        assert_eq!(offsets, vec![0, 1, 2, 3, 4]);
983    }
984
985    proptest! {
986        #[test]
987        fn dedup_then_drain_is_exactly_once(keys in prop::collection::vec(0u8..6, 1..40)) {
988            let mut q = Queue::default();
989            let mut distinct = BTreeSet::new();
990            for (i, k) in keys.iter().enumerate() {
991                let producer = format!("p{k}");
992                distinct.insert(producer.clone());
993                q.apply(AppRequest::Produce {
994                    topic: TopicId::from("t"),
995                    priority: Priority(0),
996                    content_type: ContentType::Raw,
997                    payload: Bytes::from(vec![*k]),
998                    producer_id: producer,
999                    seq: 0,
1000                    ts_ms: i as u64,
1001                });
1002            }
1003
1004            let mut drained = 0usize;
1005            let mut ts = 1_000u64;
1006            loop {
1007                let resp = q.apply(AppRequest::Poll {
1008                    topic: TopicId::from("t"),
1009                    group: GroupId::from("g"),
1010                    max: 8,
1011                    visibility_timeout_ms: 1000,
1012                    ts_ms: ts,
1013                });
1014                ts += 2000;
1015                let items = match resp {
1016                    AppResponse::Polled { items } => items,
1017                    other => panic!("expected Polled, got {other:?}"),
1018                };
1019                if items.is_empty() {
1020                    break;
1021                }
1022                for d in items {
1023                    drained += 1;
1024                    q.apply(AppRequest::Ack {
1025                        topic: TopicId::from("t"),
1026                        group: GroupId::from("g"),
1027                        lease_id: d.lease_id,
1028                    });
1029                }
1030                prop_assert!(ts < 10_000_000);
1031            }
1032            prop_assert_eq!(drained, distinct.len());
1033        }
1034    }
1035}