1use std::collections::{HashMap, VecDeque};
9
/// Policy used by [`ConsumerGroup::rebalance`] to distribute partitions
/// across the active consumers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// Contiguous blocks of partitions handed out in sorted consumer order;
    /// when the division is uneven the first consumers get one extra.
    Range,
    /// Partition `p` goes to the consumer at index `p % n` of the sorted
    /// consumer list.
    RoundRobin,
    /// Keeps the partitions active consumers already own, then evens out
    /// the load.
    Sticky,
}
24
/// Liveness of a group member as derived from its heartbeats.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsumerState {
    /// Heartbeating normally; the state assigned on join and restored by a
    /// heartbeat from a lagging member.
    Active,
    /// Silent for at least the heartbeat timeout but still within the
    /// session timeout.
    Lagging,
    /// Silent for at least the session timeout; such members are removed
    /// from the group by `check_heartbeats`.
    Dead,
}
35
36#[derive(Debug, Clone)]
38pub struct ConsumerMember {
39 pub consumer_id: String,
41 pub last_heartbeat_ms: u64,
43 pub state: ConsumerState,
45 pub assigned_partitions: Vec<u32>,
47 pub joined_at_ms: u64,
49}
50
/// Offset bookkeeping for a single partition.
#[derive(Debug, Clone)]
pub struct PartitionOffset {
    /// Partition number this record describes.
    pub partition: u32,
    /// Last offset committed for the partition.
    pub committed_offset: u64,
    /// Latest known end-of-log offset for the partition.
    pub log_end_offset: u64,
    /// Consumer currently owning the partition, if any.
    pub owner: Option<String>,
}

impl PartitionOffset {
    /// Returns how far the committed offset trails the log end.
    ///
    /// The result is clamped to `0` when the committed offset is ahead of
    /// the recorded log end.
    pub fn lag(&self) -> u64 {
        self.log_end_offset
            .checked_sub(self.committed_offset)
            .unwrap_or(0)
    }
}
70
71#[derive(Debug, Clone)]
73pub struct RebalanceEvent {
74 pub generation: u64,
76 pub timestamp_ms: u64,
78 pub strategy: AssignmentStrategy,
80 pub assignments: HashMap<String, Vec<u32>>,
82 pub partitions_moved: usize,
84}
85
/// Outcome returned to the caller of `ConsumerGroup::rebalance`.
#[derive(Debug, Clone)]
pub struct RebalanceResult {
    /// New partitions per consumer id.
    pub assignments: HashMap<String, Vec<u32>>,
    /// Number of partitions whose owner differs from before the rebalance.
    pub partitions_moved: usize,
    /// Group generation after the rebalance.
    pub generation: u64,
}
96
/// Point-in-time summary of a consumer group, as produced by
/// `ConsumerGroup::stats`.
#[derive(Debug, Clone, Default)]
pub struct GroupStats {
    /// Members currently in the `Active` state.
    pub active_consumers: usize,
    /// Members currently in the `Lagging` state.
    pub lagging_consumers: usize,
    /// Number of partitions the group was created with.
    pub total_partitions: u32,
    /// Sum of the lag of every partition.
    pub total_lag: u64,
    /// Number of rebalances performed so far (the group generation).
    pub rebalance_count: u64,
    /// Partitions that currently have no owner.
    pub unassigned_partitions: u32,
}
113
/// Errors reported by consumer-group operations.
///
/// The type is `Clone` and comparable so callers and tests can match on an
/// exact error value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupError {
    /// A consumer with this id is already a member (returned by `join`).
    DuplicateConsumer(String),
    /// No member has this id (returned by `leave` and `heartbeat`).
    ConsumerNotFound(String),
    /// The partition number is not tracked by the group (returned by the
    /// offset setters).
    InvalidPartition(u32),
    /// A rebalance was requested while no member is `Active`.
    NoActiveConsumers,
    /// No coordinator is elected.
    // NOTE(review): nothing in this file returns this variant — confirm
    // whether external callers rely on it.
    NoCoordinator,
}

impl std::fmt::Display for GroupError {
    /// Writes the human-readable message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DuplicateConsumer(id) => write!(f, "consumer already exists: {id}"),
            Self::ConsumerNotFound(id) => write!(f, "consumer not found: {id}"),
            Self::InvalidPartition(p) => write!(f, "invalid partition: {p}"),
            Self::NoActiveConsumers => f.write_str("no active consumers in group"),
            Self::NoCoordinator => f.write_str("no group coordinator elected"),
        }
    }
}

impl std::error::Error for GroupError {}
142
143pub struct ConsumerGroup {
152 group_id: String,
154 members: HashMap<String, ConsumerMember>,
156 offsets: HashMap<u32, PartitionOffset>,
158 partition_count: u32,
160 strategy: AssignmentStrategy,
162 coordinator: Option<String>,
164 heartbeat_timeout_ms: u64,
166 session_timeout_ms: u64,
168 generation: u64,
170 rebalance_history: VecDeque<RebalanceEvent>,
172 max_history: usize,
174}
175
176impl ConsumerGroup {
177 pub fn new(
183 group_id: impl Into<String>,
184 partition_count: u32,
185 strategy: AssignmentStrategy,
186 heartbeat_timeout_ms: u64,
187 session_timeout_ms: u64,
188 ) -> Self {
189 let mut offsets = HashMap::new();
190 for p in 0..partition_count {
191 offsets.insert(
192 p,
193 PartitionOffset {
194 partition: p,
195 committed_offset: 0,
196 log_end_offset: 0,
197 owner: None,
198 },
199 );
200 }
201
202 Self {
203 group_id: group_id.into(),
204 members: HashMap::new(),
205 offsets,
206 partition_count,
207 strategy,
208 coordinator: None,
209 heartbeat_timeout_ms,
210 session_timeout_ms,
211 generation: 0,
212 rebalance_history: VecDeque::new(),
213 max_history: 100,
214 }
215 }
216
217 pub fn group_id(&self) -> &str {
219 &self.group_id
220 }
221
222 pub fn partition_count(&self) -> u32 {
224 self.partition_count
225 }
226
227 pub fn generation(&self) -> u64 {
229 self.generation
230 }
231
232 pub fn coordinator(&self) -> Option<&str> {
234 self.coordinator.as_deref()
235 }
236
237 pub fn strategy(&self) -> &AssignmentStrategy {
239 &self.strategy
240 }
241
242 pub fn set_strategy(&mut self, strategy: AssignmentStrategy) {
244 self.strategy = strategy;
245 }
246
247 pub fn member_count(&self) -> usize {
249 self.members.len()
250 }
251
252 pub fn get_member(&self, consumer_id: &str) -> Option<&ConsumerMember> {
254 self.members.get(consumer_id)
255 }
256
257 pub fn get_offset(&self, partition: u32) -> Option<&PartitionOffset> {
259 self.offsets.get(&partition)
260 }
261
262 pub fn rebalance_history(&self) -> &VecDeque<RebalanceEvent> {
264 &self.rebalance_history
265 }
266
267 pub fn join(&mut self, consumer_id: impl Into<String>, now_ms: u64) -> Result<(), GroupError> {
274 let id = consumer_id.into();
275 if self.members.contains_key(&id) {
276 return Err(GroupError::DuplicateConsumer(id));
277 }
278
279 let member = ConsumerMember {
280 consumer_id: id.clone(),
281 last_heartbeat_ms: now_ms,
282 state: ConsumerState::Active,
283 assigned_partitions: Vec::new(),
284 joined_at_ms: now_ms,
285 };
286 self.members.insert(id.clone(), member);
287
288 if self.coordinator.is_none() {
290 self.coordinator = Some(id);
291 }
292
293 Ok(())
294 }
295
296 pub fn leave(&mut self, consumer_id: &str) -> Result<(), GroupError> {
300 if self.members.remove(consumer_id).is_none() {
301 return Err(GroupError::ConsumerNotFound(consumer_id.to_string()));
302 }
303
304 for offset in self.offsets.values_mut() {
306 if offset.owner.as_deref() == Some(consumer_id) {
307 offset.owner = None;
308 }
309 }
310
311 if self.coordinator.as_deref() == Some(consumer_id) {
313 self.elect_coordinator();
314 }
315
316 Ok(())
317 }
318
319 pub fn heartbeat(&mut self, consumer_id: &str, now_ms: u64) -> Result<(), GroupError> {
321 let member = self
322 .members
323 .get_mut(consumer_id)
324 .ok_or_else(|| GroupError::ConsumerNotFound(consumer_id.to_string()))?;
325
326 member.last_heartbeat_ms = now_ms;
327 if member.state == ConsumerState::Lagging {
328 member.state = ConsumerState::Active;
329 }
330
331 Ok(())
332 }
333
334 pub fn check_heartbeats(&mut self, now_ms: u64) -> Vec<String> {
342 let mut evicted = Vec::new();
343
344 let member_ids: Vec<String> = self.members.keys().cloned().collect();
345 for id in member_ids {
346 if let Some(member) = self.members.get_mut(&id) {
347 let elapsed = now_ms.saturating_sub(member.last_heartbeat_ms);
348
349 if elapsed >= self.session_timeout_ms {
350 member.state = ConsumerState::Dead;
351 evicted.push(id.clone());
352 } else if elapsed >= self.heartbeat_timeout_ms {
353 member.state = ConsumerState::Lagging;
354 }
355 }
356 }
357
358 for id in &evicted {
360 self.members.remove(id);
361 for offset in self.offsets.values_mut() {
362 if offset.owner.as_deref() == Some(id.as_str()) {
363 offset.owner = None;
364 }
365 }
366 }
367
368 if let Some(ref coord) = self.coordinator {
370 if evicted.contains(coord) {
371 self.elect_coordinator();
372 }
373 }
374
375 evicted
376 }
377
378 pub fn elect_coordinator(&mut self) {
384 let mut candidates: Vec<&str> = self
385 .members
386 .values()
387 .filter(|m| m.state == ConsumerState::Active)
388 .map(|m| m.consumer_id.as_str())
389 .collect();
390
391 candidates.sort();
392 self.coordinator = candidates.first().map(|s| s.to_string());
393 }
394
395 pub fn commit_offset(&mut self, partition: u32, offset: u64) -> Result<(), GroupError> {
399 let po = self
400 .offsets
401 .get_mut(&partition)
402 .ok_or(GroupError::InvalidPartition(partition))?;
403 po.committed_offset = offset;
404 Ok(())
405 }
406
407 pub fn update_log_end_offset(&mut self, partition: u32, offset: u64) -> Result<(), GroupError> {
409 let po = self
410 .offsets
411 .get_mut(&partition)
412 .ok_or(GroupError::InvalidPartition(partition))?;
413 po.log_end_offset = offset;
414 Ok(())
415 }
416
417 pub fn total_lag(&self) -> u64 {
419 self.offsets.values().map(|po| po.lag()).sum()
420 }
421
422 pub fn consumer_lag(&self) -> HashMap<String, u64> {
424 let mut lags: HashMap<String, u64> = HashMap::new();
425 for po in self.offsets.values() {
426 if let Some(ref owner) = po.owner {
427 *lags.entry(owner.clone()).or_insert(0) += po.lag();
428 }
429 }
430 lags
431 }
432
433 pub fn rebalance(&mut self, now_ms: u64) -> Result<RebalanceResult, GroupError> {
439 let active_ids = self.active_consumer_ids();
440 if active_ids.is_empty() {
441 return Err(GroupError::NoActiveConsumers);
442 }
443
444 let old_assignments = self.current_assignments();
446
447 let new_assignments = match &self.strategy {
448 AssignmentStrategy::Range => self.assign_range(&active_ids),
449 AssignmentStrategy::RoundRobin => self.assign_round_robin(&active_ids),
450 AssignmentStrategy::Sticky => self.assign_sticky(&active_ids, &old_assignments),
451 };
452
453 let partitions_moved = self.count_moved(&old_assignments, &new_assignments);
454
455 self.apply_assignments(&new_assignments);
457
458 self.generation += 1;
459
460 let event = RebalanceEvent {
462 generation: self.generation,
463 timestamp_ms: now_ms,
464 strategy: self.strategy.clone(),
465 assignments: new_assignments.clone(),
466 partitions_moved,
467 };
468 self.rebalance_history.push_front(event);
469 while self.rebalance_history.len() > self.max_history {
470 self.rebalance_history.pop_back();
471 }
472
473 Ok(RebalanceResult {
474 assignments: new_assignments,
475 partitions_moved,
476 generation: self.generation,
477 })
478 }
479
480 pub fn current_assignments(&self) -> HashMap<String, Vec<u32>> {
482 let mut assignments: HashMap<String, Vec<u32>> = HashMap::new();
483 for member in self.members.values() {
484 assignments.insert(
485 member.consumer_id.clone(),
486 member.assigned_partitions.clone(),
487 );
488 }
489 assignments
490 }
491
492 pub fn stats(&self) -> GroupStats {
494 let active_consumers = self
495 .members
496 .values()
497 .filter(|m| m.state == ConsumerState::Active)
498 .count();
499 let lagging_consumers = self
500 .members
501 .values()
502 .filter(|m| m.state == ConsumerState::Lagging)
503 .count();
504 let unassigned_partitions = self
505 .offsets
506 .values()
507 .filter(|po| po.owner.is_none())
508 .count() as u32;
509
510 GroupStats {
511 active_consumers,
512 lagging_consumers,
513 total_partitions: self.partition_count,
514 total_lag: self.total_lag(),
515 rebalance_count: self.generation,
516 unassigned_partitions,
517 }
518 }
519
520 fn active_consumer_ids(&self) -> Vec<String> {
523 let mut ids: Vec<String> = self
524 .members
525 .values()
526 .filter(|m| m.state == ConsumerState::Active)
527 .map(|m| m.consumer_id.clone())
528 .collect();
529 ids.sort();
530 ids
531 }
532
533 fn assign_range(&self, consumer_ids: &[String]) -> HashMap<String, Vec<u32>> {
535 let n = consumer_ids.len();
536 let mut result: HashMap<String, Vec<u32>> = HashMap::new();
537 if n == 0 {
538 return result;
539 }
540
541 let per_consumer = self.partition_count as usize / n;
542 let remainder = self.partition_count as usize % n;
543
544 let mut partition = 0u32;
545 for (i, cid) in consumer_ids.iter().enumerate() {
546 let count = per_consumer + if i < remainder { 1 } else { 0 };
547 let mut parts = Vec::new();
548 for _ in 0..count {
549 parts.push(partition);
550 partition += 1;
551 }
552 result.insert(cid.clone(), parts);
553 }
554
555 result
556 }
557
558 fn assign_round_robin(&self, consumer_ids: &[String]) -> HashMap<String, Vec<u32>> {
560 let n = consumer_ids.len();
561 let mut result: HashMap<String, Vec<u32>> = HashMap::new();
562 if n == 0 {
563 return result;
564 }
565
566 for cid in consumer_ids {
567 result.insert(cid.clone(), Vec::new());
568 }
569
570 for p in 0..self.partition_count {
571 let idx = p as usize % n;
572 if let Some(parts) = result.get_mut(&consumer_ids[idx]) {
573 parts.push(p);
574 }
575 }
576
577 result
578 }
579
580 fn assign_sticky(
583 &self,
584 consumer_ids: &[String],
585 old_assignments: &HashMap<String, Vec<u32>>,
586 ) -> HashMap<String, Vec<u32>> {
587 let n = consumer_ids.len();
588 let mut result: HashMap<String, Vec<u32>> = HashMap::new();
589 if n == 0 {
590 return result;
591 }
592
593 for cid in consumer_ids {
594 result.insert(cid.clone(), Vec::new());
595 }
596
597 let mut assigned: Vec<bool> = vec![false; self.partition_count as usize];
599 for (cid, parts) in old_assignments {
600 if result.contains_key(cid) {
601 for &p in parts {
602 if (p as usize) < assigned.len() {
603 assigned[p as usize] = true;
604 if let Some(v) = result.get_mut(cid) {
605 v.push(p);
606 }
607 }
608 }
609 }
610 }
611
612 let mut rr_idx = 0usize;
614 for p in 0..self.partition_count {
615 if !assigned[p as usize] {
616 let cid = &consumer_ids[rr_idx % n];
617 if let Some(v) = result.get_mut(cid) {
618 v.push(p);
619 }
620 rr_idx += 1;
621 }
622 }
623
624 let target = self.partition_count as usize / n;
626 let target_extra = self.partition_count as usize % n;
627
628 let mut surplus: Vec<u32> = Vec::new();
630 let mut sorted_ids = consumer_ids.to_vec();
631 sorted_ids.sort();
632
633 for (i, cid) in sorted_ids.iter().enumerate() {
634 let max_allowed = target + if i < target_extra { 1 } else { 0 };
635 if let Some(v) = result.get_mut(cid) {
636 while v.len() > max_allowed {
637 if let Some(p) = v.pop() {
638 surplus.push(p);
639 }
640 }
641 }
642 }
643
644 let mut surplus_iter = surplus.into_iter();
646 for (i, cid) in sorted_ids.iter().enumerate() {
647 let max_allowed = target + if i < target_extra { 1 } else { 0 };
648 if let Some(v) = result.get_mut(cid) {
649 while v.len() < max_allowed {
650 if let Some(p) = surplus_iter.next() {
651 v.push(p);
652 } else {
653 break;
654 }
655 }
656 }
657 }
658
659 result
660 }
661
662 fn count_moved(
664 &self,
665 old: &HashMap<String, Vec<u32>>,
666 new: &HashMap<String, Vec<u32>>,
667 ) -> usize {
668 let mut old_owner: HashMap<u32, &str> = HashMap::new();
670 for (cid, parts) in old {
671 for &p in parts {
672 old_owner.insert(p, cid.as_str());
673 }
674 }
675
676 let mut new_owner: HashMap<u32, &str> = HashMap::new();
677 for (cid, parts) in new {
678 for &p in parts {
679 new_owner.insert(p, cid.as_str());
680 }
681 }
682
683 let mut moved = 0usize;
684 for p in 0..self.partition_count {
685 let old_o = old_owner.get(&p).copied();
686 let new_o = new_owner.get(&p).copied();
687 if old_o != new_o {
688 moved += 1;
689 }
690 }
691 moved
692 }
693
694 fn apply_assignments(&mut self, assignments: &HashMap<String, Vec<u32>>) {
696 for member in self.members.values_mut() {
698 member.assigned_partitions.clear();
699 }
700 for offset in self.offsets.values_mut() {
701 offset.owner = None;
702 }
703
704 for (cid, parts) in assignments {
706 if let Some(member) = self.members.get_mut(cid) {
707 member.assigned_partitions = parts.clone();
708 }
709 for &p in parts {
710 if let Some(offset) = self.offsets.get_mut(&p) {
711 offset.owner = Some(cid.clone());
712 }
713 }
714 }
715 }
716}
717
/// Unit tests for the consumer group.
///
/// Setup steps use `expect` so that a failing precondition aborts the test
/// instead of being silently ignored.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a group with a 3 s heartbeat timeout and a 10 s session timeout.
    fn make_group(partitions: u32, strategy: AssignmentStrategy) -> ConsumerGroup {
        ConsumerGroup::new("test-group", partitions, strategy, 3000, 10_000)
    }

    /// Builds a group and joins every id in `ids` at time 0.
    fn group_with(
        partitions: u32,
        strategy: AssignmentStrategy,
        ids: &[&str],
    ) -> ConsumerGroup {
        let mut g = make_group(partitions, strategy);
        for id in ids {
            g.join(*id, 0).expect("fresh consumer id must be able to join");
        }
        g
    }

    /// Partitions assigned to `id` in a rebalance result (empty if absent).
    fn parts(result: &RebalanceResult, id: &str) -> Vec<u32> {
        result.assignments.get(id).cloned().unwrap_or_default()
    }

    /// Total number of partitions handed out by a rebalance result.
    fn assigned_total(result: &RebalanceResult) -> usize {
        result.assignments.values().map(|v| v.len()).sum()
    }

    /// Current state of member `id`, if it exists.
    fn state_of(g: &ConsumerGroup, id: &str) -> Option<ConsumerState> {
        g.get_member(id).map(|m| m.state.clone())
    }

    /// Current owner of partition `p`, if any.
    fn owner_of(g: &ConsumerGroup, p: u32) -> Option<&str> {
        g.get_offset(p).and_then(|o| o.owner.as_deref())
    }

    // ----- membership and coordinator -----

    #[test]
    fn test_join_single_consumer() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        assert!(g.join("c1", 1000).is_ok());
        assert_eq!(g.member_count(), 1);
        assert_eq!(g.coordinator(), Some("c1"));
    }

    #[test]
    fn test_join_duplicate_consumer_error() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        g.join("c1", 1000).expect("first join");
        let err = g.join("c1", 2000);
        assert!(matches!(err, Err(GroupError::DuplicateConsumer(ref id)) if id == "c1"));
        assert_eq!(g.member_count(), 1);
    }

    #[test]
    fn test_leave_consumer() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1", "c2"]);
        assert!(g.leave("c1").is_ok());
        assert_eq!(g.member_count(), 1);
        assert!(g.get_member("c1").is_none());
    }

    #[test]
    fn test_leave_nonexistent_error() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        let err = g.leave("nobody");
        assert!(matches!(err, Err(GroupError::ConsumerNotFound(ref id)) if id == "nobody"));
    }

    #[test]
    fn test_coordinator_election_on_join() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        g.join("c2", 1000).expect("join c2");
        assert_eq!(g.coordinator(), Some("c2"));
        // A later joiner never displaces the sitting coordinator.
        g.join("c1", 1000).expect("join c1");
        assert_eq!(g.coordinator(), Some("c2"));
    }

    #[test]
    fn test_coordinator_reelection_on_leave() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1", "c2"]);
        assert_eq!(g.coordinator(), Some("c1"));
        g.leave("c1").expect("leave c1");
        assert_eq!(g.coordinator(), Some("c2"));
    }

    #[test]
    fn test_coordinator_none_after_all_leave() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        g.leave("c1").expect("leave c1");
        assert_eq!(g.coordinator(), None);
    }

    // ----- heartbeats -----

    #[test]
    fn test_heartbeat_updates_timestamp() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        g.join("c1", 1000).expect("join c1");
        g.heartbeat("c1", 5000).expect("heartbeat c1");
        assert_eq!(g.get_member("c1").map(|m| m.last_heartbeat_ms), Some(5000));
    }

    #[test]
    fn test_heartbeat_nonexistent_error() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        let err = g.heartbeat("nobody", 1000);
        assert!(matches!(err, Err(GroupError::ConsumerNotFound(ref id)) if id == "nobody"));
    }

    #[test]
    fn test_check_heartbeats_marks_lagging() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        // 4 s of silence: past the heartbeat timeout, within the session.
        let evicted = g.check_heartbeats(4000);
        assert!(evicted.is_empty());
        assert_eq!(state_of(&g, "c1"), Some(ConsumerState::Lagging));
    }

    #[test]
    fn test_check_heartbeats_evicts_dead() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        // 11 s of silence exceeds the session timeout.
        let evicted = g.check_heartbeats(11_000);
        assert_eq!(evicted, vec!["c1"]);
        assert_eq!(g.member_count(), 0);
    }

    #[test]
    fn test_heartbeat_restores_lagging_to_active() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        g.check_heartbeats(4000);
        assert_eq!(state_of(&g, "c1"), Some(ConsumerState::Lagging));
        g.heartbeat("c1", 4500).expect("heartbeat c1");
        assert_eq!(state_of(&g, "c1"), Some(ConsumerState::Active));
    }

    #[test]
    fn test_multiple_heartbeats() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        for t in (1000..5000).step_by(500) {
            assert!(g.heartbeat("c1", t).is_ok());
        }
        assert_eq!(g.get_member("c1").map(|m| m.last_heartbeat_ms), Some(4500));
    }

    #[test]
    fn test_eviction_clears_partition_ownership() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        g.rebalance(0).expect("rebalance");
        assert_eq!(owner_of(&g, 0), Some("c1"));
        g.check_heartbeats(11_000);
        for p in 0..4 {
            assert_eq!(owner_of(&g, p), None);
        }
    }

    // ----- offsets and lag -----

    #[test]
    fn test_commit_offset() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        assert!(g.commit_offset(0, 100).is_ok());
        assert_eq!(g.get_offset(0).map(|o| o.committed_offset), Some(100));
    }

    #[test]
    fn test_commit_offset_invalid_partition() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        assert!(matches!(
            g.commit_offset(99, 100),
            Err(GroupError::InvalidPartition(99))
        ));
    }

    #[test]
    fn test_update_log_end_offset() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        assert!(g.update_log_end_offset(2, 500).is_ok());
        assert_eq!(g.get_offset(2).map(|o| o.log_end_offset), Some(500));
    }

    #[test]
    fn test_partition_lag() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        g.update_log_end_offset(0, 1000).expect("log end");
        g.commit_offset(0, 300).expect("commit");
        assert_eq!(g.get_offset(0).map(|o| o.lag()), Some(700));
    }

    #[test]
    fn test_total_lag() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        for p in 0..4 {
            g.update_log_end_offset(p, 100).expect("log end");
            g.commit_offset(p, 30).expect("commit");
        }
        // 4 partitions * (100 - 30)
        assert_eq!(g.total_lag(), 280);
    }

    #[test]
    fn test_consumer_lag_per_consumer() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1", "c2"]);
        g.rebalance(0).expect("rebalance");

        for p in 0..4 {
            g.update_log_end_offset(p, 100).expect("log end");
            g.commit_offset(p, 50).expect("commit");
        }

        let lags = g.consumer_lag();
        // Two partitions each, 50 behind on every partition.
        assert_eq!(lags.get("c1").copied(), Some(100));
        assert_eq!(lags.get("c2").copied(), Some(100));
        let total: u64 = lags.values().sum();
        assert_eq!(total, 200);
    }

    #[test]
    fn test_partition_offset_lag_zero() {
        let po = PartitionOffset {
            partition: 0,
            committed_offset: 100,
            log_end_offset: 100,
            owner: None,
        };
        assert_eq!(po.lag(), 0);
    }

    #[test]
    fn test_partition_offset_lag_underflow() {
        let po = PartitionOffset {
            partition: 0,
            committed_offset: 200,
            log_end_offset: 100,
            owner: None,
        };
        // Committed ahead of the log end clamps to zero instead of wrapping.
        assert_eq!(po.lag(), 0);
    }

    // ----- range strategy -----

    #[test]
    fn test_range_assignment_even() {
        let mut g = group_with(6, AssignmentStrategy::Range, &["c1", "c2", "c3"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(parts(&r, "c1").len(), 2);
        assert_eq!(parts(&r, "c2").len(), 2);
        assert_eq!(parts(&r, "c3").len(), 2);
    }

    #[test]
    fn test_range_assignment_uneven() {
        let mut g = group_with(7, AssignmentStrategy::Range, &["c1", "c2", "c3"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(assigned_total(&r), 7);
        // The remainder goes to the first consumer.
        assert_eq!(parts(&r, "c1").len(), 3);
        assert_eq!(parts(&r, "c2").len(), 2);
        assert_eq!(parts(&r, "c3").len(), 2);
    }

    #[test]
    fn test_range_assignment_single_consumer() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(parts(&r, "c1").len(), 4);
    }

    #[test]
    fn test_range_assignment_contiguous() {
        let mut g = group_with(6, AssignmentStrategy::Range, &["c1", "c2"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(parts(&r, "c1"), vec![0, 1, 2]);
        assert_eq!(parts(&r, "c2"), vec![3, 4, 5]);
    }

    // ----- round-robin strategy -----

    #[test]
    fn test_roundrobin_assignment() {
        let mut g = group_with(6, AssignmentStrategy::RoundRobin, &["c1", "c2"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(parts(&r, "c1").len(), 3);
        assert_eq!(parts(&r, "c2").len(), 3);
    }

    #[test]
    fn test_roundrobin_interleaved() {
        let mut g = group_with(4, AssignmentStrategy::RoundRobin, &["c1", "c2"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(parts(&r, "c1"), vec![0, 2]);
        assert_eq!(parts(&r, "c2"), vec![1, 3]);
    }

    #[test]
    fn test_roundrobin_three_consumers_five_partitions() {
        let mut g = group_with(5, AssignmentStrategy::RoundRobin, &["c1", "c2", "c3"]);
        let r = g.rebalance(0).expect("rebalance result");
        let sizes = [
            parts(&r, "c1").len(),
            parts(&r, "c2").len(),
            parts(&r, "c3").len(),
        ];
        assert_eq!(sizes.iter().sum::<usize>(), 5);
        let largest = sizes.iter().copied().max().unwrap_or(0);
        let smallest = sizes.iter().copied().min().unwrap_or(0);
        assert!(largest - smallest <= 1);
    }

    // ----- sticky strategy -----

    #[test]
    fn test_sticky_preserves_existing() {
        let mut g = group_with(4, AssignmentStrategy::Sticky, &["c1", "c2"]);
        let before = g.rebalance(0).expect("initial rebalance");

        g.join("c3", 100).expect("join c3");
        let after = g.rebalance(100).expect("rebalance result");
        assert_eq!(assigned_total(&after), 4);

        // Existing members keep only partitions they already owned.
        for id in &["c1", "c2"] {
            let old = parts(&before, id);
            for p in parts(&after, id) {
                assert!(old.contains(&p), "{} gained partition {}", id, p);
            }
        }
        assert_eq!(parts(&after, "c1"), vec![0, 2]);
        assert_eq!(parts(&after, "c2"), vec![1]);
        assert_eq!(parts(&after, "c3"), vec![3]);
    }

    #[test]
    fn test_sticky_minimal_movement() {
        let mut g = group_with(6, AssignmentStrategy::Sticky, &["c1", "c2"]);
        let r1 = g.rebalance(0).expect("rebalance result");

        g.join("c3", 100).expect("join c3");
        let r2 = g.rebalance(100).expect("rebalance result");

        assert!(r2.partitions_moved <= assigned_total(&r1));
        // Only the two partitions handed to the new member change owner.
        assert_eq!(r2.partitions_moved, 2);
    }

    // ----- rebalance bookkeeping -----

    #[test]
    fn test_rebalance_no_consumers_error() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        assert!(matches!(g.rebalance(0), Err(GroupError::NoActiveConsumers)));
        assert_eq!(g.generation(), 0);
    }

    #[test]
    fn test_rebalance_increments_generation() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        assert_eq!(g.generation(), 0);
        g.rebalance(0).expect("first rebalance");
        assert_eq!(g.generation(), 1);
        g.rebalance(100).expect("second rebalance");
        assert_eq!(g.generation(), 2);
    }

    #[test]
    fn test_rebalance_history_recorded() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        g.rebalance(0).expect("first rebalance");
        g.rebalance(100).expect("second rebalance");
        assert_eq!(g.rebalance_history().len(), 2);
        // Newest event sits at the front.
        assert_eq!(g.rebalance_history().front().map(|e| e.generation), Some(2));
    }

    #[test]
    fn test_rebalance_history_max_limit() {
        let mut g = ConsumerGroup::new("test", 4, AssignmentStrategy::Range, 3000, 10_000);
        g.join("c1", 0).expect("join c1");
        g.max_history = 3;
        for i in 0..10u64 {
            g.rebalance(i * 100).expect("rebalance");
        }
        assert_eq!(g.rebalance_history().len(), 3);
        assert_eq!(g.rebalance_history().front().map(|e| e.generation), Some(10));
        assert_eq!(g.rebalance_history().back().map(|e| e.generation), Some(8));
    }

    #[test]
    fn test_rebalance_updates_partition_owners() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        g.rebalance(0).expect("rebalance");
        for p in 0..4 {
            assert_eq!(owner_of(&g, p), Some("c1"));
        }
    }

    #[test]
    fn test_rebalance_after_leave() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1", "c2"]);
        g.rebalance(0).expect("initial rebalance");
        g.leave("c2").expect("leave c2");
        let r = g.rebalance(100).expect("rebalance result");
        assert_eq!(parts(&r, "c1").len(), 4);
        assert!(!r.assignments.contains_key("c2"));
    }

    // ----- stats -----

    #[test]
    fn test_group_stats() {
        let g = group_with(4, AssignmentStrategy::Range, &["c1", "c2"]);
        let stats = g.stats();
        assert_eq!(stats.active_consumers, 2);
        assert_eq!(stats.lagging_consumers, 0);
        assert_eq!(stats.total_partitions, 4);
        // Nothing is assigned before the first rebalance.
        assert_eq!(stats.unassigned_partitions, 4);
    }

    #[test]
    fn test_stats_after_rebalance() {
        let mut g = group_with(4, AssignmentStrategy::Range, &["c1"]);
        g.rebalance(0).expect("rebalance");
        let stats = g.stats();
        assert_eq!(stats.unassigned_partitions, 0);
        assert_eq!(stats.rebalance_count, 1);
    }

    #[test]
    fn test_stats_with_lag() {
        let mut g = group_with(2, AssignmentStrategy::Range, &["c1"]);
        g.rebalance(0).expect("rebalance");
        g.update_log_end_offset(0, 100).expect("log end 0");
        g.update_log_end_offset(1, 200).expect("log end 1");
        assert_eq!(g.stats().total_lag, 300);
    }

    // ----- edge cases -----

    #[test]
    fn test_zero_partitions() {
        let mut g = group_with(0, AssignmentStrategy::Range, &["c1"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(r.assignments.get("c1").map(|v| v.len()), Some(0));
    }

    #[test]
    fn test_more_consumers_than_partitions() {
        let mut g = group_with(2, AssignmentStrategy::Range, &["c1", "c2", "c3"]);
        let r = g.rebalance(0).expect("rebalance result");
        assert_eq!(assigned_total(&r), 2);
        // The surplus consumer is still listed, with nothing assigned.
        assert_eq!(r.assignments.get("c3").map(|v| v.len()), Some(0));
    }

    #[test]
    fn test_set_strategy() {
        let mut g = make_group(4, AssignmentStrategy::Range);
        g.set_strategy(AssignmentStrategy::RoundRobin);
        assert_eq!(g.strategy(), &AssignmentStrategy::RoundRobin);
    }

    #[test]
    fn test_current_assignments_empty() {
        let g = make_group(4, AssignmentStrategy::Range);
        assert!(g.current_assignments().is_empty());
    }

    // ----- accessors and error formatting -----

    #[test]
    fn test_group_id_accessor() {
        let g = make_group(4, AssignmentStrategy::Range);
        assert_eq!(g.group_id(), "test-group");
    }

    #[test]
    fn test_partition_count_accessor() {
        let g = make_group(8, AssignmentStrategy::Range);
        assert_eq!(g.partition_count(), 8);
    }

    #[test]
    fn test_group_error_display() {
        let e = GroupError::DuplicateConsumer("c1".to_string());
        assert!(e.to_string().contains("c1"));

        let e = GroupError::ConsumerNotFound("c2".to_string());
        assert!(e.to_string().contains("c2"));

        let e = GroupError::InvalidPartition(99);
        assert!(e.to_string().contains("99"));

        let e = GroupError::NoActiveConsumers;
        assert!(!e.to_string().is_empty());

        let e = GroupError::NoCoordinator;
        assert!(!e.to_string().is_empty());
    }
}