1use std::collections::{BTreeMap, BTreeSet, VecDeque};
2
3use serde::{Deserialize, Serialize};
4
5use crate::raft::{AppRequest, AppResponse, Delivered, ProduceItem};
6use crate::types::{LeaseId, Message, Offset, Priority};
7
8const DEDUP_CAPACITY: usize = 100_000;
9const RESERVED_DEN: u64 = 4;
10const DEFAULT_RETAIN_MAX_MESSAGES: u64 = 1_000_000;
11const GROUP_GC_THRESHOLD: usize = 1024;
12
/// One outstanding delivery: the offset handed out under a lease and the time
/// at which that lease lapses.
///
/// NOTE(review): this type is serde-serialized as part of the queue state; keep
/// field order stable in case snapshots use an order-sensitive format.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Lease {
    /// Offset of the leased message.
    offset: Offset,
    /// Absolute expiry time in ms (`ts_ms + visibility_timeout_ms` of the poll
    /// that created it); the lease counts as expired once `deadline_ms <= now_ms`.
    deadline_ms: u64,
}
18
/// Per-consumer-group delivery state for one topic.
///
/// Every offset below `ack_watermark` is consumed by the group. Offsets at or
/// above it are tracked individually: out-of-order acks in `acked_above`,
/// outstanding leases in `in_flight` (with `leased_offsets` as its reverse
/// index).
///
/// NOTE(review): serde-serialized as part of the queue state; keep field order
/// stable in case snapshots use an order-sensitive format.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct GroupState {
    /// First offset not yet known to be consumed; all lower offsets are done.
    ack_watermark: Offset,
    /// Offsets acked out of order; every entry is `>= ack_watermark`.
    acked_above: BTreeSet<Offset>,
    /// Outstanding leases keyed by lease id.
    in_flight: BTreeMap<LeaseId, Lease>,
    /// Reverse index of `in_flight`: leased offset -> lease deadline (ms).
    leased_offsets: BTreeMap<Offset, u64>,
    /// Number of polls served so far; drives the anti-starvation cadence in `poll`.
    poll_count: u64,
}
27
28impl GroupState {
29 fn mark_acked(&mut self, offset: Offset) {
30 if offset < self.ack_watermark {
31 return;
32 }
33 self.acked_above.insert(offset);
34 while self.acked_above.remove(&self.ack_watermark) {
35 self.ack_watermark += 1;
36 }
37 }
38
39 fn expire(&mut self, now_ms: u64) {
40 let expired: Vec<LeaseId> = self
41 .in_flight
42 .iter()
43 .filter(|(_, l)| l.deadline_ms <= now_ms)
44 .map(|(id, _)| *id)
45 .collect();
46 for id in expired {
47 if let Some(l) = self.in_flight.remove(&id) {
48 self.leased_offsets.remove(&l.offset);
49 }
50 }
51 }
52
53 fn advance_watermark(&mut self, to: Offset) {
54 if to > self.ack_watermark {
55 self.ack_watermark = to;
56 }
57 let watermark = self.ack_watermark;
58 self.acked_above.retain(|o| *o >= watermark);
59 let drop: Vec<LeaseId> = self
60 .in_flight
61 .iter()
62 .filter(|(_, l)| l.offset < watermark)
63 .map(|(id, _)| *id)
64 .collect();
65 for id in drop {
66 if let Some(l) = self.in_flight.remove(&id) {
67 self.leased_offsets.remove(&l.offset);
68 }
69 }
70 while self.acked_above.remove(&self.ack_watermark) {
71 self.ack_watermark += 1;
72 }
73 }
74}
75
/// All state for one topic: the retained message log, the producer dedup
/// table, consumer groups, and the per-topic rate-limit and retention settings.
///
/// NOTE(review): serde-serialized as part of the queue state; keep field order
/// stable in case snapshots use an order-sensitive format.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct TopicState {
    /// Offset the next produced message will receive.
    next_offset: Offset,
    /// Retained messages keyed by offset (ascending iteration = log order).
    messages: BTreeMap<Offset, Message>,
    /// `(producer_id, seq)` -> offset originally assigned; makes produce idempotent.
    dedup: BTreeMap<(String, u64), Offset>,
    /// Insertion order of `dedup` keys; the oldest are evicted beyond `DEDUP_CAPACITY`.
    dedup_order: VecDeque<(String, u64)>,
    /// Consumer groups by name.
    groups: BTreeMap<String, GroupState>,
    /// Configured rate (presumably milli-units per second; enforced outside this
    /// file). 0 means no limit is configured, see `Queue::rate_config`.
    rate_milli_per_sec: u64,
    /// Burst size paired with `rate_milli_per_sec`.
    burst: u32,
    /// Count-based retention cap; `effective_retention` decides how 0 is treated.
    retain_max_messages: u64,
    /// Age-based retention in ms; 0 disables age-based purging.
    retain_max_age_ms: u64,
}
88
/// The whole queue: every topic plus the lease-id counter.
///
/// All mutations go through `Queue::apply`. `AppRequest`/`AppResponse` come
/// from `crate::raft`, so this is presumably the replicated state machine and
/// must stay deterministic. NOTE(review): confirm against the Raft layer.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
pub struct Queue {
    /// Topics by name.
    topics: BTreeMap<String, TopicState>,
    /// Next lease id to hand out; one counter shared by all topics and groups.
    next_lease_id: LeaseId,
}
94
95impl Queue {
96 pub fn apply(&mut self, req: AppRequest) -> AppResponse {
97 match req {
98 AppRequest::CreateTopic { topic } => {
99 self.topics.entry(topic.0).or_default();
100 AppResponse::TopicCreated
101 }
102 AppRequest::DeleteTopic { topic } => {
103 self.topics.remove(&topic.0);
104 AppResponse::TopicDeleted
105 }
106 AppRequest::Produce {
107 topic,
108 priority,
109 content_type,
110 payload,
111 producer_id,
112 seq,
113 ts_ms,
114 } => {
115 let offset = self.produce_one(ProduceItem {
116 topic,
117 priority,
118 content_type,
119 payload,
120 producer_id,
121 seq,
122 ts_ms,
123 });
124 AppResponse::Produced { offset }
125 }
126 AppRequest::ProduceMany { items } => {
127 let offsets = items.into_iter().map(|item| self.produce_one(item)).collect();
128 AppResponse::ProducedMany { offsets }
129 }
130 AppRequest::Poll {
131 topic,
132 group,
133 max,
134 visibility_timeout_ms,
135 ts_ms,
136 } => {
137 let mut next_lease = self.next_lease_id;
138 let items = poll(
139 &mut self.topics,
140 &topic.0,
141 &group.0,
142 max,
143 visibility_timeout_ms,
144 ts_ms,
145 &mut next_lease,
146 );
147 self.next_lease_id = next_lease;
148 AppResponse::Polled { items }
149 }
150 AppRequest::Ack {
151 topic,
152 group,
153 lease_id,
154 } => {
155 ack(&mut self.topics, &topic.0, &group.0, lease_id);
156 reclaim_topic(&mut self.topics, &topic.0);
157 AppResponse::Acked
158 }
159 AppRequest::AckMany {
160 topic,
161 group,
162 lease_ids,
163 } => {
164 for lease_id in lease_ids {
165 ack(&mut self.topics, &topic.0, &group.0, lease_id);
166 }
167 reclaim_topic(&mut self.topics, &topic.0);
168 AppResponse::Acked
169 }
170 AppRequest::NackMany {
171 topic,
172 group,
173 lease_ids,
174 } => {
175 for lease_id in lease_ids {
176 nack(&mut self.topics, &topic.0, &group.0, lease_id);
177 }
178 AppResponse::Nacked
179 }
180 AppRequest::Nack {
181 topic,
182 group,
183 lease_id,
184 } => {
185 nack(&mut self.topics, &topic.0, &group.0, lease_id);
186 AppResponse::Nacked
187 }
188 AppRequest::CommitOffset {
189 topic,
190 group,
191 offset,
192 } => {
193 commit(&mut self.topics, &topic.0, &group.0, offset);
194 reclaim_topic(&mut self.topics, &topic.0);
195 AppResponse::Committed
196 }
197 AppRequest::SetRateLimit {
198 topic,
199 rate_milli_per_sec,
200 burst,
201 } => {
202 let t = self.topics.entry(topic.0).or_default();
203 t.rate_milli_per_sec = rate_milli_per_sec;
204 t.burst = burst;
205 AppResponse::RateLimitSet
206 }
207 AppRequest::SetRetention {
208 topic,
209 max_messages,
210 max_age_ms,
211 } => {
212 let t = self.topics.entry(topic.0).or_default();
213 t.retain_max_messages = max_messages;
214 t.retain_max_age_ms = max_age_ms;
215 AppResponse::RetentionSet
216 }
217 }
218 }
219
220 fn produce_one(&mut self, item: ProduceItem) -> Offset {
221 let t = self.topics.entry(item.topic.0).or_default();
222 let dedup = !item.producer_id.is_empty();
223 if dedup {
224 if let Some(offset) = t.dedup.get(&(item.producer_id.clone(), item.seq)) {
225 return *offset;
226 }
227 }
228 let offset = t.next_offset;
229 t.next_offset += 1;
230 t.messages.insert(
231 offset,
232 Message {
233 offset,
234 priority: item.priority,
235 content_type: item.content_type,
236 payload: item.payload,
237 ts_ms: item.ts_ms,
238 },
239 );
240 if dedup {
241 let key = (item.producer_id, item.seq);
242 t.dedup.insert(key.clone(), offset);
243 t.dedup_order.push_back(key);
244 while t.dedup_order.len() > DEDUP_CAPACITY {
245 if let Some(old) = t.dedup_order.pop_front() {
246 t.dedup.remove(&old);
247 }
248 }
249 }
250 purge_retained(t, item.ts_ms);
251 offset
252 }
253
254 pub fn rate_config(&self, topic: &str) -> Option<(u64, u32)> {
255 self.topics.get(topic).and_then(|t| {
256 if t.rate_milli_per_sec > 0 {
257 Some((t.rate_milli_per_sec, t.burst))
258 } else {
259 None
260 }
261 })
262 }
263
264 pub fn has_deliverable(&self, topic: &str, group: &str, now_ms: u64) -> bool {
265 let Some(t) = self.topics.get(topic) else {
266 return false;
267 };
268 match t.groups.get(group) {
269 None => !t.messages.is_empty(),
270 Some(g) => t.messages.range(g.ack_watermark..).any(|(offset, _)| {
271 !g.acked_above.contains(offset)
272 && g.leased_offsets
273 .get(offset)
274 .is_none_or(|deadline| *deadline <= now_ms)
275 }),
276 }
277 }
278
279 pub fn metrics(&self) -> QueueMetrics {
280 let mut metrics = QueueMetrics {
281 topics: self.topics.len() as u64,
282 messages: 0,
283 in_flight: 0,
284 };
285 for t in self.topics.values() {
286 metrics.messages += t.messages.len() as u64;
287 for g in t.groups.values() {
288 metrics.in_flight += g.in_flight.len() as u64;
289 }
290 }
291 metrics
292 }
293}
294
/// Point-in-time counters for the whole queue, as produced by `Queue::metrics`.
#[derive(Debug, Clone, Copy, Default)]
pub struct QueueMetrics {
    /// Number of topics.
    pub topics: u64,
    /// Messages currently retained in memory, summed over all topics.
    pub messages: u64,
    /// Outstanding leases, summed over every group of every topic.
    pub in_flight: u64,
}
301
302fn poll(
303 topics: &mut BTreeMap<String, TopicState>,
304 topic: &str,
305 group: &str,
306 max: u32,
307 visibility_timeout_ms: u64,
308 ts_ms: u64,
309 next_lease: &mut LeaseId,
310) -> Vec<Delivered> {
311 let Some(t) = topics.get_mut(topic) else {
312 return Vec::new();
313 };
314 let first = t.messages.keys().next().copied().unwrap_or(t.next_offset);
315 let g = t.groups.entry(group.to_string()).or_insert_with(|| GroupState {
316 ack_watermark: first,
317 ..GroupState::default()
318 });
319 g.expire(ts_ms);
320
321 let max = max as usize;
322 let mut candidates: Vec<(Priority, Offset)> = Vec::new();
323 for (offset, message) in t.messages.range(g.ack_watermark..) {
324 if !g.acked_above.contains(offset) && !g.leased_offsets.contains_key(offset) {
325 candidates.push((message.priority, *offset));
326 }
327 }
328
329 let poll_count = g.poll_count;
330 g.poll_count += 1;
331
332 if candidates.is_empty() || max == 0 {
333 return Vec::new();
334 }
335
336 let base = max / RESERVED_DEN as usize;
337 let bonus = usize::from((poll_count + 1) % RESERVED_DEN == 0);
338 let reserved = (base + bonus).min(max);
339 let priority_slots = max - reserved;
340
341 let by_offset = candidates.clone();
342 let mut by_priority = candidates;
343 by_priority.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
344
345 let mut chosen_set: BTreeSet<Offset> = BTreeSet::new();
346 let mut chosen: Vec<(Priority, Offset)> = Vec::new();
347 for item in by_priority.iter() {
348 if chosen.len() >= priority_slots {
349 break;
350 }
351 if chosen_set.insert(item.1) {
352 chosen.push(*item);
353 }
354 }
355 for item in by_offset.iter() {
356 if chosen.len() >= max {
357 break;
358 }
359 if chosen_set.insert(item.1) {
360 chosen.push(*item);
361 }
362 }
363 for item in by_priority.iter() {
364 if chosen.len() >= max {
365 break;
366 }
367 if chosen_set.insert(item.1) {
368 chosen.push(*item);
369 }
370 }
371 chosen.sort_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
372
373 let mut items = Vec::new();
374 for (_, offset) in chosen {
375 let lease_id = *next_lease;
376 *next_lease += 1;
377 let deadline_ms = ts_ms.saturating_add(visibility_timeout_ms);
378 g.in_flight.insert(lease_id, Lease { offset, deadline_ms });
379 g.leased_offsets.insert(offset, deadline_ms);
380 let message = &t.messages[&offset];
381 items.push(Delivered {
382 lease_id,
383 offset,
384 priority: message.priority,
385 content_type: message.content_type,
386 payload: message.payload.clone(),
387 ts_ms: message.ts_ms,
388 });
389 }
390 items
391}
392
393fn ack(topics: &mut BTreeMap<String, TopicState>, topic: &str, group: &str, lease_id: LeaseId) {
394 if let Some(t) = topics.get_mut(topic) {
395 if let Some(g) = t.groups.get_mut(group) {
396 if let Some(lease) = g.in_flight.remove(&lease_id) {
397 g.leased_offsets.remove(&lease.offset);
398 g.mark_acked(lease.offset);
399 }
400 }
401 }
402}
403
404fn nack(topics: &mut BTreeMap<String, TopicState>, topic: &str, group: &str, lease_id: LeaseId) {
405 if let Some(t) = topics.get_mut(topic) {
406 if let Some(g) = t.groups.get_mut(group) {
407 if let Some(lease) = g.in_flight.remove(&lease_id) {
408 g.leased_offsets.remove(&lease.offset);
409 }
410 }
411 }
412}
413
414fn effective_retention(t: &TopicState) -> (u64, u64) {
415 if t.retain_max_messages == 0 && t.retain_max_age_ms == 0 {
416 (DEFAULT_RETAIN_MAX_MESSAGES, 0)
417 } else {
418 (t.retain_max_messages, t.retain_max_age_ms)
419 }
420}
421
422fn purge_retained(t: &mut TopicState, now_ms: u64) {
423 let (max_messages, max_age_ms) = effective_retention(t);
424 let mut purge: BTreeSet<Offset> = BTreeSet::new();
425 if max_age_ms > 0 {
426 for (offset, message) in t.messages.iter() {
427 if now_ms.saturating_sub(message.ts_ms) > max_age_ms {
428 purge.insert(*offset);
429 } else {
430 break;
431 }
432 }
433 }
434 if max_messages > 0 {
435 let target = max_messages as usize;
436 let kept = t.messages.len() - purge.len();
437 if kept > target {
438 let mut need = kept - target;
439 for offset in t.messages.keys() {
440 if need == 0 {
441 break;
442 }
443 if purge.insert(*offset) {
444 need -= 1;
445 }
446 }
447 }
448 }
449 if purge.is_empty() {
450 return;
451 }
452 for offset in &purge {
453 t.messages.remove(offset);
454 }
455 let boundary = t.messages.keys().next().copied().unwrap_or(t.next_offset);
456 for g in t.groups.values_mut() {
457 if boundary > g.ack_watermark {
458 g.advance_watermark(boundary);
459 }
460 }
461}
462
463fn reclaim_topic(topics: &mut BTreeMap<String, TopicState>, topic: &str) {
464 if let Some(t) = topics.get_mut(topic) {
465 reclaim(t);
466 }
467}
468
469fn reclaim(t: &mut TopicState) {
470 if let Some(boundary) = t.groups.values().map(|g| g.ack_watermark).min() {
471 if t.messages.keys().next().is_some_and(|first| boundary > *first) {
472 t.messages = t.messages.split_off(&boundary);
473 }
474 }
475 gc_idle_groups(t);
476}
477
478fn gc_idle_groups(t: &mut TopicState) {
479 if t.groups.len() <= GROUP_GC_THRESHOLD || !t.messages.is_empty() {
480 return;
481 }
482 let next = t.next_offset;
483 t.groups.retain(|_, g| {
484 !(g.ack_watermark >= next
485 && g.in_flight.is_empty()
486 && g.leased_offsets.is_empty()
487 && g.acked_above.is_empty())
488 });
489}
490
491fn group_entry<'a>(t: &'a mut TopicState, group: &str) -> &'a mut GroupState {
492 let first = t.messages.keys().next().copied().unwrap_or(t.next_offset);
493 t.groups.entry(group.to_string()).or_insert_with(|| GroupState {
494 ack_watermark: first,
495 ..GroupState::default()
496 })
497}
498
499fn commit(topics: &mut BTreeMap<String, TopicState>, topic: &str, group: &str, offset: Offset) {
500 if let Some(t) = topics.get_mut(topic) {
501 let g = group_entry(t, group);
502 g.advance_watermark(offset);
503 }
504}
505
#[cfg(test)]
mod tests {
    // Most tests drive the state machine through `Queue::apply`; a few inspect
    // module-private fields or call private helpers directly.
    use super::*;
    use crate::types::{ContentType, GroupId, Priority, TopicId};
    use bytes::Bytes;
    use proptest::prelude::*;

    // Produces one message with `ts_ms = 0` and returns its assigned offset.
    fn produce(q: &mut Queue, topic: &str, priority: u8, body: &[u8], producer: &str, seq: u64) -> Offset {
        match q.apply(AppRequest::Produce {
            topic: TopicId::from(topic),
            priority: Priority(priority),
            content_type: ContentType::Raw,
            payload: Bytes::copy_from_slice(body),
            producer_id: producer.to_string(),
            seq,
            ts_ms: 0,
        }) {
            AppResponse::Produced { offset } => offset,
            other => panic!("expected Produced, got {other:?}"),
        }
    }

    // Polls once and returns the `(lease_id, offset)` pairs in delivery order.
    fn poll_offsets(q: &mut Queue, topic: &str, group: &str, max: u32, vis: u64, ts: u64) -> Vec<(LeaseId, Offset)> {
        match q.apply(AppRequest::Poll {
            topic: TopicId::from(topic),
            group: GroupId::from(group),
            max,
            visibility_timeout_ms: vis,
            ts_ms: ts,
        }) {
            AppResponse::Polled { items } => items.into_iter().map(|d| (d.lease_id, d.offset)).collect(),
            other => panic!("expected Polled, got {other:?}"),
        }
    }

    #[test]
    fn produce_assigns_monotonic_offsets() {
        let mut q = Queue::default();
        assert_eq!(produce(&mut q, "t", 0, b"a", "p", 1), 0);
        assert_eq!(produce(&mut q, "t", 0, b"b", "p", 2), 1);
        assert_eq!(produce(&mut q, "t", 0, b"c", "p", 3), 2);
    }

    #[test]
    fn produce_dedup_returns_same_offset() {
        let mut q = Queue::default();
        let first = produce(&mut q, "t", 0, b"a", "p", 1);
        // Same (producer, seq): the new payload is ignored and the old offset returned.
        let dup = produce(&mut q, "t", 0, b"a-again", "p", 1);
        assert_eq!(first, dup);
        // The duplicate consumed no offset.
        assert_eq!(produce(&mut q, "t", 0, b"b", "p", 2), 1);
    }

    #[test]
    fn empty_producer_id_disables_dedup() {
        let mut q = Queue::default();
        assert_eq!(produce(&mut q, "t", 0, b"a", "", 0), 0);
        assert_eq!(produce(&mut q, "t", 0, b"b", "", 0), 1);
        assert_eq!(produce(&mut q, "t", 0, b"c", "", 0), 2);
    }

    #[test]
    fn poll_orders_by_priority_then_offset() {
        let mut q = Queue::default();
        produce(&mut q, "t", 0, b"a", "p", 1);
        produce(&mut q, "t", 5, b"b", "p", 2);
        produce(&mut q, "t", 3, b"c", "p", 3);
        let got: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 0)
            .into_iter()
            .map(|(_, o)| o)
            .collect();
        assert_eq!(got, vec![1, 2, 0]);
    }

    #[test]
    fn leased_messages_are_not_redelivered_until_expiry() {
        let mut q = Queue::default();
        produce(&mut q, "t", 0, b"a", "p", 1);
        let first = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
        assert_eq!(first.len(), 1);
        // Lease deadline is 0 + 1000; at ts 500 it is still held.
        let again = poll_offsets(&mut q, "t", "g", 10, 1000, 500);
        assert!(again.is_empty());
        let after_expiry = poll_offsets(&mut q, "t", "g", 10, 1000, 2000);
        assert_eq!(after_expiry.len(), 1);
    }

    #[test]
    fn ack_makes_message_done() {
        let mut q = Queue::default();
        produce(&mut q, "t", 0, b"a", "p", 1);
        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
        let (lease_id, _) = leased[0];
        q.apply(AppRequest::Ack {
            topic: TopicId::from("t"),
            group: GroupId::from("g"),
            lease_id,
        });
        // Well past the lease deadline: an acked message must not come back.
        let after = poll_offsets(&mut q, "t", "g", 10, 1000, 5000);
        assert!(after.is_empty());
    }

    #[test]
    fn nack_makes_message_immediately_redeliverable() {
        let mut q = Queue::default();
        produce(&mut q, "t", 0, b"a", "p", 1);
        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
        let (lease_id, _) = leased[0];
        q.apply(AppRequest::Nack {
            topic: TopicId::from("t"),
            group: GroupId::from("g"),
            lease_id,
        });
        // ts 1 is well before the original deadline (1000).
        let after = poll_offsets(&mut q, "t", "g", 10, 1000, 1);
        assert_eq!(after.len(), 1);
    }

    // With `max = 1`, `max / RESERVED_DEN` is 0, so only the bonus reserved slot
    // (on the group's RESERVED_DEN-th poll) can serve the oldest low-priority
    // message ahead of the high-priority ones.
    #[test]
    fn anti_starvation_serves_oldest_within_reserved_cadence() {
        let mut q = Queue::default();
        produce(&mut q, "t", 0, b"low", "", 0);
        for _ in 0..5 {
            produce(&mut q, "t", 7, b"high", "", 0);
        }
        let mut delivered = Vec::new();
        for k in 0..4u64 {
            let items = poll_offsets(&mut q, "t", "g", 1, 1_000_000, k);
            if let Some((_, offset)) = items.first() {
                delivered.push(*offset);
            }
        }
        assert!(
            delivered.contains(&0),
            "low-priority oldest message must be served within the reserved cadence, got {delivered:?}"
        );
    }

    #[test]
    fn retention_by_count_keeps_newest() {
        let mut q = Queue::default();
        q.apply(AppRequest::SetRetention {
            topic: TopicId::from("t"),
            max_messages: 3,
            max_age_ms: 0,
        });
        for i in 0..5 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let offsets: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 0)
            .into_iter()
            .map(|(_, o)| o)
            .collect();
        assert_eq!(offsets, vec![2, 3, 4]);
    }

    // The second produce (ts 5000) purges the first (ts 0) as older than 1000 ms.
    #[test]
    fn retention_by_age_drops_old() {
        let mut q = Queue::default();
        q.apply(AppRequest::SetRetention {
            topic: TopicId::from("t"),
            max_messages: 0,
            max_age_ms: 1000,
        });
        q.apply(AppRequest::Produce {
            topic: TopicId::from("t"),
            priority: Priority(0),
            content_type: ContentType::Raw,
            payload: Bytes::from_static(b"old"),
            producer_id: String::new(),
            seq: 0,
            ts_ms: 0,
        });
        q.apply(AppRequest::Produce {
            topic: TopicId::from("t"),
            priority: Priority(0),
            content_type: ContentType::Raw,
            payload: Bytes::from_static(b"new"),
            producer_id: String::new(),
            seq: 0,
            ts_ms: 5000,
        });
        let offsets: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 5000)
            .into_iter()
            .map(|(_, o)| o)
            .collect();
        assert_eq!(offsets, vec![1]);
    }

    // Registers group "g" (leasing offset 0), then produces past the count cap so
    // offsets 0..=2 are purged. The purge must drop the group's stale lease and
    // move its watermark; acking the survivors must then fully drain the group's
    // bookkeeping.
    #[test]
    fn retention_purge_advances_watermark_and_releases_ack_tracking() {
        let mut q = Queue::default();
        q.apply(AppRequest::SetRetention {
            topic: TopicId::from("t"),
            max_messages: 3,
            max_age_ms: 0,
        });
        produce(&mut q, "t", 0, b"seed", "", 0);
        poll_offsets(&mut q, "t", "g", 1, 1, 0);
        for i in 0..5 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 10);
        let offsets: Vec<Offset> = leased.iter().map(|(_, o)| *o).collect();
        assert_eq!(offsets, vec![3, 4, 5]);
        for (lease_id, _) in leased {
            q.apply(AppRequest::Ack {
                topic: TopicId::from("t"),
                group: GroupId::from("g"),
                lease_id,
            });
        }
        let g = &q.topics["t"].groups["g"];
        assert_eq!(g.ack_watermark, 6, "watermark must advance past purged offsets");
        assert!(g.acked_above.is_empty(), "acked_above must drain once the watermark advances");
        assert!(g.in_flight.is_empty());
        assert!(g.leased_offsets.is_empty());
    }

    #[test]
    fn fully_acked_messages_are_reclaimed_from_memory() {
        let mut q = Queue::default();
        for i in 0..10 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        assert_eq!(q.metrics().messages, 10);
        let leased = poll_offsets(&mut q, "t", "g", 100, 1000, 0);
        for (lease_id, _) in leased {
            q.apply(AppRequest::Ack {
                topic: TopicId::from("t"),
                group: GroupId::from("g"),
                lease_id,
            });
        }
        assert_eq!(q.metrics().messages, 0, "fully acked messages must be freed from RAM");
    }

    // Reclaim cuts at the minimum group watermark: g2 (which never acks) pins
    // every message until it commits past them.
    #[test]
    fn reclaim_holds_the_line_for_the_slowest_group() {
        let mut q = Queue::default();
        for i in 0..10 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let _g2_registers = poll_offsets(&mut q, "t", "g2", 1, 1_000_000, 0);
        let leased = poll_offsets(&mut q, "t", "g1", 100, 1000, 0);
        for (lease_id, _) in leased {
            q.apply(AppRequest::Ack {
                topic: TopicId::from("t"),
                group: GroupId::from("g1"),
                lease_id,
            });
        }
        assert_eq!(
            q.metrics().messages,
            10,
            "messages must be retained while g2 still lags"
        );
        q.apply(AppRequest::CommitOffset {
            topic: TopicId::from("t"),
            group: GroupId::from("g2"),
            offset: 10,
        });
        assert_eq!(
            q.metrics().messages,
            0,
            "memory is freed once every group has consumed past the messages"
        );
    }

    #[test]
    fn unconsumed_topic_is_never_reclaimed() {
        let mut q = Queue::default();
        for i in 0..5 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        assert_eq!(q.metrics().messages, 5, "no groups means nothing is consumed yet");
    }

    #[test]
    fn drained_topic_sheds_idle_caught_up_groups() {
        let mut t = TopicState::default();
        t.next_offset = 10;
        for i in 0..(GROUP_GC_THRESHOLD + 5) {
            t.groups.insert(
                format!("g{i}"),
                GroupState {
                    ack_watermark: 10,
                    ..GroupState::default()
                },
            );
        }
        gc_idle_groups(&mut t);
        assert!(t.groups.is_empty(), "a drained topic must forget caught-up idle groups");
    }

    #[test]
    fn small_group_counts_are_left_alone() {
        let mut t = TopicState::default();
        t.next_offset = 10;
        t.groups.insert(
            "g".to_string(),
            GroupState {
                ack_watermark: 10,
                ..GroupState::default()
            },
        );
        gc_idle_groups(&mut t);
        assert_eq!(t.groups.len(), 1, "below the threshold nothing is touched");
    }

    #[test]
    fn group_holding_a_lease_or_lagging_survives_gc() {
        let mut t = TopicState::default();
        t.next_offset = 10;
        for i in 0..(GROUP_GC_THRESHOLD + 1) {
            t.groups.insert(
                format!("g{i}"),
                GroupState {
                    ack_watermark: 10,
                    ..GroupState::default()
                },
            );
        }
        let mut leased = GroupState {
            ack_watermark: 10,
            ..GroupState::default()
        };
        leased.in_flight.insert(1, Lease { offset: 5, deadline_ms: 0 });
        t.groups.insert("leased".to_string(), leased);
        t.groups.insert(
            "lagging".to_string(),
            GroupState {
                ack_watermark: 3,
                ..GroupState::default()
            },
        );
        gc_idle_groups(&mut t);
        assert!(t.groups.contains_key("leased"), "a group with an in-flight lease must be kept");
        assert!(t.groups.contains_key("lagging"), "a lagging group must be kept");
        assert_eq!(t.groups.len(), 2);
    }

    #[test]
    fn default_retention_cap_applies_when_unset() {
        let t = TopicState::default();
        assert_eq!(effective_retention(&t), (DEFAULT_RETAIN_MAX_MESSAGES, 0));
    }

    #[test]
    fn explicit_retention_overrides_default_cap() {
        let mut by_count = TopicState::default();
        by_count.retain_max_messages = 50;
        assert_eq!(effective_retention(&by_count), (50, 0));

        let mut by_age = TopicState::default();
        by_age.retain_max_age_ms = 1000;
        assert_eq!(effective_retention(&by_age), (0, 1000));
    }

    // Builds DEFAULT_RETAIN_MAX_MESSAGES + 5 messages directly, then checks that
    // one purge trims exactly the 5 oldest.
    #[test]
    fn default_cap_bounds_an_unconsumed_topic() {
        let mut t = TopicState::default();
        for offset in 0..(DEFAULT_RETAIN_MAX_MESSAGES + 5) {
            t.messages.insert(
                offset,
                Message {
                    offset,
                    priority: Priority(0),
                    content_type: ContentType::Raw,
                    payload: Bytes::new(),
                    ts_ms: 0,
                },
            );
            t.next_offset = offset + 1;
        }
        purge_retained(&mut t, 0);
        assert_eq!(t.messages.len() as u64, DEFAULT_RETAIN_MAX_MESSAGES);
        assert_eq!(t.messages.keys().next().copied(), Some(5));
    }

    #[test]
    fn produce_many_assigns_offsets_in_order_and_dedups() {
        let mut q = Queue::default();
        let item = |payload: &[u8], producer: &str, seq: u64| ProduceItem {
            topic: TopicId::from("t"),
            priority: Priority(0),
            content_type: ContentType::Raw,
            payload: Bytes::copy_from_slice(payload),
            producer_id: producer.to_string(),
            seq,
            ts_ms: 0,
        };
        let resp = q.apply(AppRequest::ProduceMany {
            items: vec![
                item(b"a", "p", 1),
                item(b"b", "p", 2),
                item(b"a-retry", "p", 1),
                item(b"c", "", 0),
            ],
        });
        match resp {
            AppResponse::ProducedMany { offsets } => {
                assert_eq!(offsets, vec![0, 1, 0, 2], "dedup inside a batch must return the original offset");
            }
            other => panic!("expected ProducedMany, got {other:?}"),
        }
        let polled = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
        assert_eq!(polled.len(), 3, "the dedup re-send must not create a fourth message");
    }

    #[test]
    fn ack_many_acks_all_leases() {
        let mut q = Queue::default();
        for i in 0..3 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
        let lease_ids: Vec<LeaseId> = leased.iter().map(|(id, _)| *id).collect();
        q.apply(AppRequest::AckMany {
            topic: TopicId::from("t"),
            group: GroupId::from("g"),
            lease_ids,
        });
        let after = poll_offsets(&mut q, "t", "g", 10, 1000, 5000);
        assert!(after.is_empty(), "all leases must be acked in one request");
    }

    #[test]
    fn nack_many_releases_all_leases() {
        let mut q = Queue::default();
        for i in 0..3 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let leased = poll_offsets(&mut q, "t", "g", 10, 1000, 0);
        let lease_ids: Vec<LeaseId> = leased.iter().map(|(id, _)| *id).collect();
        q.apply(AppRequest::NackMany {
            topic: TopicId::from("t"),
            group: GroupId::from("g"),
            lease_ids,
        });
        let again = poll_offsets(&mut q, "t", "g", 10, 1000, 1);
        assert_eq!(again.len(), 3, "nacked messages must be immediately redeliverable");
    }

    #[test]
    fn separate_groups_each_see_every_message() {
        let mut q = Queue::default();
        produce(&mut q, "t", 0, b"a", "p", 1);
        let g1 = poll_offsets(&mut q, "t", "g1", 10, 1000, 0);
        let g2 = poll_offsets(&mut q, "t", "g2", 10, 1000, 0);
        assert_eq!(g1.len(), 1);
        assert_eq!(g2.len(), 1);
    }

    #[test]
    fn competing_consumers_in_group_split_work() {
        let mut q = Queue::default();
        for i in 0..4 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let a = poll_offsets(&mut q, "t", "g", 2, 100_000, 0);
        let b = poll_offsets(&mut q, "t", "g", 2, 100_000, 1);
        assert_eq!(a.len(), 2);
        assert_eq!(b.len(), 2);
        let mut all: Vec<Offset> = a.iter().chain(b.iter()).map(|(_, o)| *o).collect();
        all.sort();
        assert_eq!(all, vec![0, 1, 2, 3], "each message delivered to exactly one consumer");
    }

    #[test]
    fn single_consumer_sees_fifo() {
        let mut q = Queue::default();
        for i in 0..5 {
            produce(&mut q, "t", 0, &[i], "", 0);
        }
        let offsets: Vec<Offset> = poll_offsets(&mut q, "t", "g", 10, 1000, 0)
            .into_iter()
            .map(|(_, o)| o)
            .collect();
        assert_eq!(offsets, vec![0, 1, 2, 3, 4]);
    }

    proptest! {
        // Every produce uses seq 0, so each distinct producer stores exactly one
        // message. Draining with acks must then deliver each stored message once.
        #[test]
        fn dedup_then_drain_is_exactly_once(keys in prop::collection::vec(0u8..6, 1..40)) {
            let mut q = Queue::default();
            let mut distinct = BTreeSet::new();
            for (i, k) in keys.iter().enumerate() {
                let producer = format!("p{k}");
                distinct.insert(producer.clone());
                q.apply(AppRequest::Produce {
                    topic: TopicId::from("t"),
                    priority: Priority(0),
                    content_type: ContentType::Raw,
                    payload: Bytes::from(vec![*k]),
                    producer_id: producer,
                    seq: 0,
                    ts_ms: i as u64,
                });
            }

            let mut drained = 0usize;
            let mut ts = 1_000u64;
            loop {
                let resp = q.apply(AppRequest::Poll {
                    topic: TopicId::from("t"),
                    group: GroupId::from("g"),
                    max: 8,
                    visibility_timeout_ms: 1000,
                    ts_ms: ts,
                });
                ts += 2000;
                let items = match resp {
                    AppResponse::Polled { items } => items,
                    other => panic!("expected Polled, got {other:?}"),
                };
                if items.is_empty() {
                    break;
                }
                for d in items {
                    drained += 1;
                    q.apply(AppRequest::Ack {
                        topic: TopicId::from("t"),
                        group: GroupId::from("g"),
                        lease_id: d.lease_id,
                    });
                }
                // Guard against an infinite loop if acks ever stop draining.
                prop_assert!(ts < 10_000_000);
            }
            prop_assert_eq!(drained, distinct.len());
        }
    }
}