Skip to main content

oxirs_stream/
consumer_group.rs

1//! Kafka-style consumer group coordination.
2//!
3//! Provides group membership management (join/leave/heartbeat), partition
4//! assignment strategies (Range, RoundRobin, Sticky), cooperative incremental
5//! rebalancing, offset commit tracking, heartbeat monitoring, coordinator
6//! election, consumer lag calculation, and assignment history tracking.
7
8use std::collections::{HashMap, VecDeque};
9
10// ─────────────────────────────────────────────────────────────────────────────
11// Public types
12// ─────────────────────────────────────────────────────────────────────────────
13
/// Strategy the group uses to map partitions onto consumers when it rebalances.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AssignmentStrategy {
    /// Each consumer (in consumer-ID order) receives one contiguous block of partitions.
    Range,
    /// Partitions are dealt out one at a time, cycling through the consumers.
    RoundRobin,
    /// Existing assignments are kept where possible to limit partition movement.
    Sticky,
}
24
/// Lifecycle state of a single consumer inside the group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsumerState {
    /// The consumer is a healthy member whose heartbeats arrive in time.
    Active,
    /// The consumer is past the heartbeat deadline but has not been evicted yet.
    Lagging,
    /// The consumer was evicted or left the group.
    Dead,
}
35
36/// A member of the consumer group.
37#[derive(Debug, Clone)]
38pub struct ConsumerMember {
39    /// Unique consumer identifier.
40    pub consumer_id: String,
41    /// Millisecond timestamp of the last heartbeat received.
42    pub last_heartbeat_ms: u64,
43    /// Current lifecycle state.
44    pub state: ConsumerState,
45    /// Partitions currently assigned to this consumer.
46    pub assigned_partitions: Vec<u32>,
47    /// Millisecond timestamp when this consumer joined the group.
48    pub joined_at_ms: u64,
49}
50
/// Offset bookkeeping for a single partition.
#[derive(Debug, Clone)]
pub struct PartitionOffset {
    /// Identifier of the partition this record describes.
    pub partition: u32,
    /// Most recently committed consumer offset.
    pub committed_offset: u64,
    /// Offset of the end of the log (latest produced position).
    pub log_end_offset: u64,
    /// ID of the consumer that owns the partition, or `None` when unassigned.
    pub owner: Option<String>,
}

impl PartitionOffset {
    /// Number of messages produced but not yet committed.
    ///
    /// The subtraction saturates, so a committed offset that is ahead of the
    /// log end yields `0` rather than wrapping.
    pub fn lag(&self) -> u64 {
        let Self {
            committed_offset,
            log_end_offset,
            ..
        } = self;
        log_end_offset.saturating_sub(*committed_offset)
    }
}
70
71/// Snapshot of a single rebalance event.
72#[derive(Debug, Clone)]
73pub struct RebalanceEvent {
74    /// Monotonic generation number.
75    pub generation: u64,
76    /// Timestamp when the rebalance occurred (ms).
77    pub timestamp_ms: u64,
78    /// Assignment strategy that was in effect.
79    pub strategy: AssignmentStrategy,
80    /// Partition assignments after the rebalance: consumer_id -> partitions.
81    pub assignments: HashMap<String, Vec<u32>>,
82    /// Number of partitions that moved between consumers.
83    pub partitions_moved: usize,
84}
85
/// Outcome returned to the caller of a rebalance.
#[derive(Debug, Clone)]
pub struct RebalanceResult {
    /// Partitions assigned to each consumer, keyed by consumer ID.
    pub assignments: HashMap<String, Vec<u32>>,
    /// How many partitions changed owner in this rebalance.
    pub partitions_moved: usize,
    /// Generation number of this rebalance.
    pub generation: u64,
}
96
/// Summary counters describing the current state of the group.
#[derive(Debug, Clone, Default)]
pub struct GroupStats {
    /// Members currently in the `Active` state.
    pub active_consumers: usize,
    /// Members currently in the `Lagging` state.
    pub lagging_consumers: usize,
    /// Number of partitions the group manages.
    pub total_partitions: u32,
    /// Sum of the lag of every partition.
    pub total_lag: u64,
    /// How many rebalances have taken place.
    pub rebalance_count: u64,
    /// Partitions that currently have no owner.
    pub unassigned_partitions: u32,
}
113
/// Errors returned by consumer group operations.
///
/// The type is `Clone` and comparable so callers and tests can match on a
/// specific failure with `assert_eq!` instead of only checking `is_err()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupError {
    /// A consumer with this ID is already registered.
    DuplicateConsumer(String),
    /// No consumer with this ID is registered.
    ConsumerNotFound(String),
    /// The partition number is not managed by the group.
    InvalidPartition(u32),
    /// There is no active consumer to assign partitions to.
    NoActiveConsumers,
    /// The group currently has no elected coordinator.
    NoCoordinator,
}

impl std::fmt::Display for GroupError {
    /// Write the human-readable message for this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DuplicateConsumer(id) => write!(f, "consumer already exists: {id}"),
            Self::ConsumerNotFound(id) => write!(f, "consumer not found: {id}"),
            Self::InvalidPartition(p) => write!(f, "invalid partition: {p}"),
            Self::NoActiveConsumers => f.write_str("no active consumers in group"),
            Self::NoCoordinator => f.write_str("no group coordinator elected"),
        }
    }
}

impl std::error::Error for GroupError {}
142
143// ─────────────────────────────────────────────────────────────────────────────
144// ConsumerGroup
145// ─────────────────────────────────────────────────────────────────────────────
146
147/// Kafka-style consumer group coordinator.
148///
149/// Manages consumer membership, partition assignment, offset tracking, and
150/// rebalancing across the group.
151pub struct ConsumerGroup {
152    /// Group identifier.
153    group_id: String,
154    /// Registered consumers keyed by consumer_id.
155    members: HashMap<String, ConsumerMember>,
156    /// Per-partition offset tracking.
157    offsets: HashMap<u32, PartitionOffset>,
158    /// Total number of partitions in the topic.
159    partition_count: u32,
160    /// Assignment strategy in effect.
161    strategy: AssignmentStrategy,
162    /// Current coordinator consumer ID (if elected).
163    coordinator: Option<String>,
164    /// Heartbeat timeout in milliseconds; consumers exceeding this are marked lagging.
165    heartbeat_timeout_ms: u64,
166    /// Session timeout in milliseconds; consumers exceeding this are evicted.
167    session_timeout_ms: u64,
168    /// Current rebalance generation counter.
169    generation: u64,
170    /// History of rebalance events (most recent first).
171    rebalance_history: VecDeque<RebalanceEvent>,
172    /// Maximum number of rebalance events to keep in history.
173    max_history: usize,
174}
175
176impl ConsumerGroup {
177    /// Create a new consumer group.
178    ///
179    /// `partition_count` is the number of topic partitions to manage.
180    /// `heartbeat_timeout_ms` controls when a consumer is marked lagging.
181    /// `session_timeout_ms` controls when a consumer is evicted.
182    pub fn new(
183        group_id: impl Into<String>,
184        partition_count: u32,
185        strategy: AssignmentStrategy,
186        heartbeat_timeout_ms: u64,
187        session_timeout_ms: u64,
188    ) -> Self {
189        let mut offsets = HashMap::new();
190        for p in 0..partition_count {
191            offsets.insert(
192                p,
193                PartitionOffset {
194                    partition: p,
195                    committed_offset: 0,
196                    log_end_offset: 0,
197                    owner: None,
198                },
199            );
200        }
201
202        Self {
203            group_id: group_id.into(),
204            members: HashMap::new(),
205            offsets,
206            partition_count,
207            strategy,
208            coordinator: None,
209            heartbeat_timeout_ms,
210            session_timeout_ms,
211            generation: 0,
212            rebalance_history: VecDeque::new(),
213            max_history: 100,
214        }
215    }
216
217    /// Return the group identifier.
218    pub fn group_id(&self) -> &str {
219        &self.group_id
220    }
221
222    /// Return the number of partitions.
223    pub fn partition_count(&self) -> u32 {
224        self.partition_count
225    }
226
227    /// Return the current generation.
228    pub fn generation(&self) -> u64 {
229        self.generation
230    }
231
232    /// Return the current coordinator, if any.
233    pub fn coordinator(&self) -> Option<&str> {
234        self.coordinator.as_deref()
235    }
236
237    /// Return the current assignment strategy.
238    pub fn strategy(&self) -> &AssignmentStrategy {
239        &self.strategy
240    }
241
242    /// Set the assignment strategy.
243    pub fn set_strategy(&mut self, strategy: AssignmentStrategy) {
244        self.strategy = strategy;
245    }
246
247    /// Number of registered members.
248    pub fn member_count(&self) -> usize {
249        self.members.len()
250    }
251
252    /// Get a member by ID.
253    pub fn get_member(&self, consumer_id: &str) -> Option<&ConsumerMember> {
254        self.members.get(consumer_id)
255    }
256
257    /// Get partition offset information.
258    pub fn get_offset(&self, partition: u32) -> Option<&PartitionOffset> {
259        self.offsets.get(&partition)
260    }
261
262    /// Return the rebalance history.
263    pub fn rebalance_history(&self) -> &VecDeque<RebalanceEvent> {
264        &self.rebalance_history
265    }
266
267    // ─── Membership ──────────────────────────────────────────────────────────
268
269    /// Join a consumer to the group.
270    ///
271    /// Returns an error if the consumer already exists. The first consumer
272    /// to join becomes the coordinator.
273    pub fn join(&mut self, consumer_id: impl Into<String>, now_ms: u64) -> Result<(), GroupError> {
274        let id = consumer_id.into();
275        if self.members.contains_key(&id) {
276            return Err(GroupError::DuplicateConsumer(id));
277        }
278
279        let member = ConsumerMember {
280            consumer_id: id.clone(),
281            last_heartbeat_ms: now_ms,
282            state: ConsumerState::Active,
283            assigned_partitions: Vec::new(),
284            joined_at_ms: now_ms,
285        };
286        self.members.insert(id.clone(), member);
287
288        // First consumer becomes coordinator
289        if self.coordinator.is_none() {
290            self.coordinator = Some(id);
291        }
292
293        Ok(())
294    }
295
296    /// Remove a consumer from the group.
297    ///
298    /// If the removed consumer was the coordinator, elects a new one.
299    pub fn leave(&mut self, consumer_id: &str) -> Result<(), GroupError> {
300        if self.members.remove(consumer_id).is_none() {
301            return Err(GroupError::ConsumerNotFound(consumer_id.to_string()));
302        }
303
304        // Clear partition ownership for this consumer
305        for offset in self.offsets.values_mut() {
306            if offset.owner.as_deref() == Some(consumer_id) {
307                offset.owner = None;
308            }
309        }
310
311        // Re-elect coordinator if needed
312        if self.coordinator.as_deref() == Some(consumer_id) {
313            self.elect_coordinator();
314        }
315
316        Ok(())
317    }
318
319    /// Record a heartbeat from a consumer.
320    pub fn heartbeat(&mut self, consumer_id: &str, now_ms: u64) -> Result<(), GroupError> {
321        let member = self
322            .members
323            .get_mut(consumer_id)
324            .ok_or_else(|| GroupError::ConsumerNotFound(consumer_id.to_string()))?;
325
326        member.last_heartbeat_ms = now_ms;
327        if member.state == ConsumerState::Lagging {
328            member.state = ConsumerState::Active;
329        }
330
331        Ok(())
332    }
333
334    /// Check all consumers for heartbeat timeouts.
335    ///
336    /// Marks consumers as `Lagging` if they exceed `heartbeat_timeout_ms`,
337    /// and evicts (marks `Dead` and removes partitions) those exceeding
338    /// `session_timeout_ms`.
339    ///
340    /// Returns the IDs of consumers that were evicted.
341    pub fn check_heartbeats(&mut self, now_ms: u64) -> Vec<String> {
342        let mut evicted = Vec::new();
343
344        let member_ids: Vec<String> = self.members.keys().cloned().collect();
345        for id in member_ids {
346            if let Some(member) = self.members.get_mut(&id) {
347                let elapsed = now_ms.saturating_sub(member.last_heartbeat_ms);
348
349                if elapsed >= self.session_timeout_ms {
350                    member.state = ConsumerState::Dead;
351                    evicted.push(id.clone());
352                } else if elapsed >= self.heartbeat_timeout_ms {
353                    member.state = ConsumerState::Lagging;
354                }
355            }
356        }
357
358        // Remove evicted consumers and clear their partition ownership
359        for id in &evicted {
360            self.members.remove(id);
361            for offset in self.offsets.values_mut() {
362                if offset.owner.as_deref() == Some(id.as_str()) {
363                    offset.owner = None;
364                }
365            }
366        }
367
368        // Re-elect coordinator if evicted
369        if let Some(ref coord) = self.coordinator {
370            if evicted.contains(coord) {
371                self.elect_coordinator();
372            }
373        }
374
375        evicted
376    }
377
378    // ─── Coordinator Election ────────────────────────────────────────────────
379
380    /// Elect a coordinator from the active members.
381    ///
382    /// Selects the lexicographically smallest active consumer ID.
383    pub fn elect_coordinator(&mut self) {
384        let mut candidates: Vec<&str> = self
385            .members
386            .values()
387            .filter(|m| m.state == ConsumerState::Active)
388            .map(|m| m.consumer_id.as_str())
389            .collect();
390
391        candidates.sort();
392        self.coordinator = candidates.first().map(|s| s.to_string());
393    }
394
395    // ─── Offset Management ──────────────────────────────────────────────────
396
397    /// Commit an offset for a partition.
398    pub fn commit_offset(&mut self, partition: u32, offset: u64) -> Result<(), GroupError> {
399        let po = self
400            .offsets
401            .get_mut(&partition)
402            .ok_or(GroupError::InvalidPartition(partition))?;
403        po.committed_offset = offset;
404        Ok(())
405    }
406
407    /// Update the log-end offset for a partition (producer side).
408    pub fn update_log_end_offset(&mut self, partition: u32, offset: u64) -> Result<(), GroupError> {
409        let po = self
410            .offsets
411            .get_mut(&partition)
412            .ok_or(GroupError::InvalidPartition(partition))?;
413        po.log_end_offset = offset;
414        Ok(())
415    }
416
417    /// Calculate the total consumer lag across all partitions.
418    pub fn total_lag(&self) -> u64 {
419        self.offsets.values().map(|po| po.lag()).sum()
420    }
421
422    /// Calculate per-consumer lag: sum of lags on partitions assigned to each consumer.
423    pub fn consumer_lag(&self) -> HashMap<String, u64> {
424        let mut lags: HashMap<String, u64> = HashMap::new();
425        for po in self.offsets.values() {
426            if let Some(ref owner) = po.owner {
427                *lags.entry(owner.clone()).or_insert(0) += po.lag();
428            }
429        }
430        lags
431    }
432
433    // ─── Rebalancing ────────────────────────────────────────────────────────
434
435    /// Trigger a rebalance using the current assignment strategy.
436    ///
437    /// Returns the result with partition assignments and movement count.
438    pub fn rebalance(&mut self, now_ms: u64) -> Result<RebalanceResult, GroupError> {
439        let active_ids = self.active_consumer_ids();
440        if active_ids.is_empty() {
441            return Err(GroupError::NoActiveConsumers);
442        }
443
444        // Build old assignments for movement calculation
445        let old_assignments = self.current_assignments();
446
447        let new_assignments = match &self.strategy {
448            AssignmentStrategy::Range => self.assign_range(&active_ids),
449            AssignmentStrategy::RoundRobin => self.assign_round_robin(&active_ids),
450            AssignmentStrategy::Sticky => self.assign_sticky(&active_ids, &old_assignments),
451        };
452
453        let partitions_moved = self.count_moved(&old_assignments, &new_assignments);
454
455        // Apply assignments to members and offsets
456        self.apply_assignments(&new_assignments);
457
458        self.generation += 1;
459
460        // Record in history
461        let event = RebalanceEvent {
462            generation: self.generation,
463            timestamp_ms: now_ms,
464            strategy: self.strategy.clone(),
465            assignments: new_assignments.clone(),
466            partitions_moved,
467        };
468        self.rebalance_history.push_front(event);
469        while self.rebalance_history.len() > self.max_history {
470            self.rebalance_history.pop_back();
471        }
472
473        Ok(RebalanceResult {
474            assignments: new_assignments,
475            partitions_moved,
476            generation: self.generation,
477        })
478    }
479
480    /// Get current partition assignments as consumer_id -> partitions.
481    pub fn current_assignments(&self) -> HashMap<String, Vec<u32>> {
482        let mut assignments: HashMap<String, Vec<u32>> = HashMap::new();
483        for member in self.members.values() {
484            assignments.insert(
485                member.consumer_id.clone(),
486                member.assigned_partitions.clone(),
487            );
488        }
489        assignments
490    }
491
492    /// Aggregate statistics for the group.
493    pub fn stats(&self) -> GroupStats {
494        let active_consumers = self
495            .members
496            .values()
497            .filter(|m| m.state == ConsumerState::Active)
498            .count();
499        let lagging_consumers = self
500            .members
501            .values()
502            .filter(|m| m.state == ConsumerState::Lagging)
503            .count();
504        let unassigned_partitions = self
505            .offsets
506            .values()
507            .filter(|po| po.owner.is_none())
508            .count() as u32;
509
510        GroupStats {
511            active_consumers,
512            lagging_consumers,
513            total_partitions: self.partition_count,
514            total_lag: self.total_lag(),
515            rebalance_count: self.generation,
516            unassigned_partitions,
517        }
518    }
519
520    // ─── Private helpers ─────────────────────────────────────────────────────
521
522    fn active_consumer_ids(&self) -> Vec<String> {
523        let mut ids: Vec<String> = self
524            .members
525            .values()
526            .filter(|m| m.state == ConsumerState::Active)
527            .map(|m| m.consumer_id.clone())
528            .collect();
529        ids.sort();
530        ids
531    }
532
533    /// Range assignment: partitions are divided into contiguous ranges.
534    fn assign_range(&self, consumer_ids: &[String]) -> HashMap<String, Vec<u32>> {
535        let n = consumer_ids.len();
536        let mut result: HashMap<String, Vec<u32>> = HashMap::new();
537        if n == 0 {
538            return result;
539        }
540
541        let per_consumer = self.partition_count as usize / n;
542        let remainder = self.partition_count as usize % n;
543
544        let mut partition = 0u32;
545        for (i, cid) in consumer_ids.iter().enumerate() {
546            let count = per_consumer + if i < remainder { 1 } else { 0 };
547            let mut parts = Vec::new();
548            for _ in 0..count {
549                parts.push(partition);
550                partition += 1;
551            }
552            result.insert(cid.clone(), parts);
553        }
554
555        result
556    }
557
558    /// Round-robin assignment: partitions distributed one-by-one across consumers.
559    fn assign_round_robin(&self, consumer_ids: &[String]) -> HashMap<String, Vec<u32>> {
560        let n = consumer_ids.len();
561        let mut result: HashMap<String, Vec<u32>> = HashMap::new();
562        if n == 0 {
563            return result;
564        }
565
566        for cid in consumer_ids {
567            result.insert(cid.clone(), Vec::new());
568        }
569
570        for p in 0..self.partition_count {
571            let idx = p as usize % n;
572            if let Some(parts) = result.get_mut(&consumer_ids[idx]) {
573                parts.push(p);
574            }
575        }
576
577        result
578    }
579
580    /// Sticky assignment: try to keep existing assignments, only move unassigned
581    /// or partitions from departed consumers.
582    fn assign_sticky(
583        &self,
584        consumer_ids: &[String],
585        old_assignments: &HashMap<String, Vec<u32>>,
586    ) -> HashMap<String, Vec<u32>> {
587        let n = consumer_ids.len();
588        let mut result: HashMap<String, Vec<u32>> = HashMap::new();
589        if n == 0 {
590            return result;
591        }
592
593        for cid in consumer_ids {
594            result.insert(cid.clone(), Vec::new());
595        }
596
597        // Phase 1: Keep existing assignments for consumers that are still active.
598        let mut assigned: Vec<bool> = vec![false; self.partition_count as usize];
599        for (cid, parts) in old_assignments {
600            if result.contains_key(cid) {
601                for &p in parts {
602                    if (p as usize) < assigned.len() {
603                        assigned[p as usize] = true;
604                        if let Some(v) = result.get_mut(cid) {
605                            v.push(p);
606                        }
607                    }
608                }
609            }
610        }
611
612        // Phase 2: Distribute unassigned partitions round-robin.
613        let mut rr_idx = 0usize;
614        for p in 0..self.partition_count {
615            if !assigned[p as usize] {
616                let cid = &consumer_ids[rr_idx % n];
617                if let Some(v) = result.get_mut(cid) {
618                    v.push(p);
619                }
620                rr_idx += 1;
621            }
622        }
623
624        // Phase 3: Rebalance if any consumer has too many.
625        let target = self.partition_count as usize / n;
626        let target_extra = self.partition_count as usize % n;
627
628        // Collect surplus partitions from over-loaded consumers.
629        let mut surplus: Vec<u32> = Vec::new();
630        let mut sorted_ids = consumer_ids.to_vec();
631        sorted_ids.sort();
632
633        for (i, cid) in sorted_ids.iter().enumerate() {
634            let max_allowed = target + if i < target_extra { 1 } else { 0 };
635            if let Some(v) = result.get_mut(cid) {
636                while v.len() > max_allowed {
637                    if let Some(p) = v.pop() {
638                        surplus.push(p);
639                    }
640                }
641            }
642        }
643
644        // Distribute surplus to under-loaded consumers.
645        let mut surplus_iter = surplus.into_iter();
646        for (i, cid) in sorted_ids.iter().enumerate() {
647            let max_allowed = target + if i < target_extra { 1 } else { 0 };
648            if let Some(v) = result.get_mut(cid) {
649                while v.len() < max_allowed {
650                    if let Some(p) = surplus_iter.next() {
651                        v.push(p);
652                    } else {
653                        break;
654                    }
655                }
656            }
657        }
658
659        result
660    }
661
662    /// Count how many partitions changed owner between old and new assignments.
663    fn count_moved(
664        &self,
665        old: &HashMap<String, Vec<u32>>,
666        new: &HashMap<String, Vec<u32>>,
667    ) -> usize {
668        // Build partition -> owner maps.
669        let mut old_owner: HashMap<u32, &str> = HashMap::new();
670        for (cid, parts) in old {
671            for &p in parts {
672                old_owner.insert(p, cid.as_str());
673            }
674        }
675
676        let mut new_owner: HashMap<u32, &str> = HashMap::new();
677        for (cid, parts) in new {
678            for &p in parts {
679                new_owner.insert(p, cid.as_str());
680            }
681        }
682
683        let mut moved = 0usize;
684        for p in 0..self.partition_count {
685            let old_o = old_owner.get(&p).copied();
686            let new_o = new_owner.get(&p).copied();
687            if old_o != new_o {
688                moved += 1;
689            }
690        }
691        moved
692    }
693
694    /// Apply computed assignments to members and partition offsets.
695    fn apply_assignments(&mut self, assignments: &HashMap<String, Vec<u32>>) {
696        // Clear all member assignments and partition owners first.
697        for member in self.members.values_mut() {
698            member.assigned_partitions.clear();
699        }
700        for offset in self.offsets.values_mut() {
701            offset.owner = None;
702        }
703
704        // Apply new assignments.
705        for (cid, parts) in assignments {
706            if let Some(member) = self.members.get_mut(cid) {
707                member.assigned_partitions = parts.clone();
708            }
709            for &p in parts {
710                if let Some(offset) = self.offsets.get_mut(&p) {
711                    offset.owner = Some(cid.clone());
712                }
713            }
714        }
715    }
716}
717
718// ─────────────────────────────────────────────────────────────────────────────
719// Tests
720// ─────────────────────────────────────────────────────────────────────────────
721
722#[cfg(test)]
723mod tests {
724    use super::*;
725
726    fn make_group(partitions: u32, strategy: AssignmentStrategy) -> ConsumerGroup {
727        ConsumerGroup::new("test-group", partitions, strategy, 3000, 10_000)
728    }
729
730    // ── Membership Tests ─────────────────────────────────────────────────
731
732    #[test]
733    fn test_join_single_consumer() {
734        let mut g = make_group(4, AssignmentStrategy::Range);
735        assert!(g.join("c1", 1000).is_ok());
736        assert_eq!(g.member_count(), 1);
737        assert_eq!(g.coordinator(), Some("c1"));
738    }
739
740    #[test]
741    fn test_join_duplicate_consumer_error() {
742        let mut g = make_group(4, AssignmentStrategy::Range);
743        g.join("c1", 1000).ok();
744        let err = g.join("c1", 2000);
745        assert!(err.is_err());
746    }
747
748    #[test]
749    fn test_leave_consumer() {
750        let mut g = make_group(4, AssignmentStrategy::Range);
751        g.join("c1", 1000).ok();
752        g.join("c2", 1000).ok();
753        assert!(g.leave("c1").is_ok());
754        assert_eq!(g.member_count(), 1);
755    }
756
757    #[test]
758    fn test_leave_nonexistent_error() {
759        let mut g = make_group(4, AssignmentStrategy::Range);
760        let err = g.leave("nobody");
761        assert!(err.is_err());
762    }
763
764    #[test]
765    fn test_coordinator_election_on_join() {
766        let mut g = make_group(4, AssignmentStrategy::Range);
767        g.join("c2", 1000).ok();
768        assert_eq!(g.coordinator(), Some("c2"));
769        g.join("c1", 1000).ok();
770        // Coordinator stays as the first joiner
771        assert_eq!(g.coordinator(), Some("c2"));
772    }
773
774    #[test]
775    fn test_coordinator_reelection_on_leave() {
776        let mut g = make_group(4, AssignmentStrategy::Range);
777        g.join("c1", 1000).ok();
778        g.join("c2", 1000).ok();
779        assert_eq!(g.coordinator(), Some("c1"));
780        g.leave("c1").ok();
781        // c2 becomes coordinator
782        assert_eq!(g.coordinator(), Some("c2"));
783    }
784
785    // ── Heartbeat Tests ──────────────────────────────────────────────────
786
787    #[test]
788    fn test_heartbeat_updates_timestamp() {
789        let mut g = make_group(4, AssignmentStrategy::Range);
790        g.join("c1", 1000).ok();
791        g.heartbeat("c1", 5000).ok();
792        let member = g.get_member("c1");
793        assert!(member.is_some());
794        assert_eq!(member.map(|m| m.last_heartbeat_ms), Some(5000));
795    }
796
797    #[test]
798    fn test_heartbeat_nonexistent_error() {
799        let mut g = make_group(4, AssignmentStrategy::Range);
800        let err = g.heartbeat("nobody", 1000);
801        assert!(err.is_err());
802    }
803
804    #[test]
805    fn test_check_heartbeats_marks_lagging() {
806        let mut g = make_group(4, AssignmentStrategy::Range);
807        g.join("c1", 0).ok();
808        // 4000ms elapsed > 3000ms heartbeat timeout
809        let evicted = g.check_heartbeats(4000);
810        assert!(evicted.is_empty());
811        assert_eq!(
812            g.get_member("c1").map(|m| m.state.clone()),
813            Some(ConsumerState::Lagging)
814        );
815    }
816
817    #[test]
818    fn test_check_heartbeats_evicts_dead() {
819        let mut g = make_group(4, AssignmentStrategy::Range);
820        g.join("c1", 0).ok();
821        // 11000ms > 10000ms session timeout
822        let evicted = g.check_heartbeats(11_000);
823        assert_eq!(evicted, vec!["c1"]);
824        assert_eq!(g.member_count(), 0);
825    }
826
827    #[test]
828    fn test_heartbeat_restores_lagging_to_active() {
829        let mut g = make_group(4, AssignmentStrategy::Range);
830        g.join("c1", 0).ok();
831        g.check_heartbeats(4000);
832        assert_eq!(
833            g.get_member("c1").map(|m| m.state.clone()),
834            Some(ConsumerState::Lagging)
835        );
836        g.heartbeat("c1", 4500).ok();
837        assert_eq!(
838            g.get_member("c1").map(|m| m.state.clone()),
839            Some(ConsumerState::Active)
840        );
841    }
842
843    // ── Offset Management Tests ──────────────────────────────────────────
844
845    #[test]
846    fn test_commit_offset() {
847        let mut g = make_group(4, AssignmentStrategy::Range);
848        assert!(g.commit_offset(0, 100).is_ok());
849        assert_eq!(g.get_offset(0).map(|o| o.committed_offset), Some(100));
850    }
851
852    #[test]
853    fn test_commit_offset_invalid_partition() {
854        let mut g = make_group(4, AssignmentStrategy::Range);
855        assert!(g.commit_offset(99, 100).is_err());
856    }
857
858    #[test]
859    fn test_update_log_end_offset() {
860        let mut g = make_group(4, AssignmentStrategy::Range);
861        assert!(g.update_log_end_offset(2, 500).is_ok());
862        assert_eq!(g.get_offset(2).map(|o| o.log_end_offset), Some(500));
863    }
864
865    #[test]
866    fn test_partition_lag() {
867        let mut g = make_group(4, AssignmentStrategy::Range);
868        g.update_log_end_offset(0, 1000).ok();
869        g.commit_offset(0, 300).ok();
870        assert_eq!(g.get_offset(0).map(|o| o.lag()), Some(700));
871    }
872
873    #[test]
874    fn test_total_lag() {
875        let mut g = make_group(4, AssignmentStrategy::Range);
876        for p in 0..4 {
877            g.update_log_end_offset(p, 100).ok();
878            g.commit_offset(p, 30).ok();
879        }
880        assert_eq!(g.total_lag(), 280); // 70 * 4
881    }
882
883    #[test]
884    fn test_consumer_lag_per_consumer() {
885        let mut g = make_group(4, AssignmentStrategy::Range);
886        g.join("c1", 0).ok();
887        g.join("c2", 0).ok();
888        g.rebalance(0).ok();
889
890        for p in 0..4 {
891            g.update_log_end_offset(p, 100).ok();
892            g.commit_offset(p, 50).ok();
893        }
894
895        let lags = g.consumer_lag();
896        assert!(lags.contains_key("c1"));
897        assert!(lags.contains_key("c2"));
898        let total: u64 = lags.values().sum();
899        assert_eq!(total, 200);
900    }
901
902    // ── Range Assignment Tests ───────────────────────────────────────────
903
904    #[test]
905    fn test_range_assignment_even() {
906        let mut g = make_group(6, AssignmentStrategy::Range);
907        g.join("c1", 0).ok();
908        g.join("c2", 0).ok();
909        g.join("c3", 0).ok();
910        let result = g.rebalance(0);
911        assert!(result.is_ok());
912        let r = result.ok();
913        assert!(r.is_some());
914        let r = r.expect("rebalance result");
915        assert_eq!(r.assignments.get("c1").map(|v| v.len()), Some(2));
916        assert_eq!(r.assignments.get("c2").map(|v| v.len()), Some(2));
917        assert_eq!(r.assignments.get("c3").map(|v| v.len()), Some(2));
918    }
919
920    #[test]
921    fn test_range_assignment_uneven() {
922        let mut g = make_group(7, AssignmentStrategy::Range);
923        g.join("c1", 0).ok();
924        g.join("c2", 0).ok();
925        g.join("c3", 0).ok();
926        let r = g.rebalance(0).expect("rebalance result");
927        // 7 / 3 = 2 remainder 1, first consumer gets extra
928        let total: usize = r.assignments.values().map(|v| v.len()).sum();
929        assert_eq!(total, 7);
930    }
931
932    #[test]
933    fn test_range_assignment_single_consumer() {
934        let mut g = make_group(4, AssignmentStrategy::Range);
935        g.join("c1", 0).ok();
936        let r = g.rebalance(0).expect("rebalance result");
937        assert_eq!(r.assignments.get("c1").map(|v| v.len()), Some(4));
938    }
939
940    #[test]
941    fn test_range_assignment_contiguous() {
942        let mut g = make_group(6, AssignmentStrategy::Range);
943        g.join("c1", 0).ok();
944        g.join("c2", 0).ok();
945        let r = g.rebalance(0).expect("rebalance result");
946        let c1_parts = r.assignments.get("c1").cloned().unwrap_or_default();
947        let c2_parts = r.assignments.get("c2").cloned().unwrap_or_default();
948        // c1 should get [0,1,2] and c2 should get [3,4,5]
949        assert_eq!(c1_parts, vec![0, 1, 2]);
950        assert_eq!(c2_parts, vec![3, 4, 5]);
951    }
952
953    // ── RoundRobin Assignment Tests ──────────────────────────────────────
954
955    #[test]
956    fn test_roundrobin_assignment() {
957        let mut g = make_group(6, AssignmentStrategy::RoundRobin);
958        g.join("c1", 0).ok();
959        g.join("c2", 0).ok();
960        let r = g.rebalance(0).expect("rebalance result");
961        assert_eq!(r.assignments.get("c1").map(|v| v.len()), Some(3));
962        assert_eq!(r.assignments.get("c2").map(|v| v.len()), Some(3));
963    }
964
965    #[test]
966    fn test_roundrobin_interleaved() {
967        let mut g = make_group(4, AssignmentStrategy::RoundRobin);
968        g.join("c1", 0).ok();
969        g.join("c2", 0).ok();
970        let r = g.rebalance(0).expect("rebalance result");
971        let c1_parts = r.assignments.get("c1").cloned().unwrap_or_default();
972        let c2_parts = r.assignments.get("c2").cloned().unwrap_or_default();
973        assert_eq!(c1_parts, vec![0, 2]);
974        assert_eq!(c2_parts, vec![1, 3]);
975    }
976
977    // ── Sticky Assignment Tests ──────────────────────────────────────────
978
979    #[test]
980    fn test_sticky_preserves_existing() {
981        let mut g = make_group(4, AssignmentStrategy::Sticky);
982        g.join("c1", 0).ok();
983        g.join("c2", 0).ok();
984        g.rebalance(0).ok();
985
986        // Add a third consumer and rebalance
987        g.join("c3", 100).ok();
988        let r = g.rebalance(100).expect("rebalance result");
989        let total: usize = r.assignments.values().map(|v| v.len()).sum();
990        assert_eq!(total, 4);
991    }
992
993    #[test]
994    fn test_sticky_minimal_movement() {
995        let mut g = make_group(6, AssignmentStrategy::Sticky);
996        g.join("c1", 0).ok();
997        g.join("c2", 0).ok();
998        let r1 = g.rebalance(0).expect("rebalance result");
999
1000        // Add a third consumer
1001        g.join("c3", 100).ok();
1002        let r2 = g.rebalance(100).expect("rebalance result");
1003
1004        // Sticky should move fewer partitions than total
1005        assert!(r2.partitions_moved <= r1.assignments.values().map(|v| v.len()).sum::<usize>());
1006    }
1007
1008    // ── Rebalance Protocol Tests ─────────────────────────────────────────
1009
1010    #[test]
1011    fn test_rebalance_no_consumers_error() {
1012        let mut g = make_group(4, AssignmentStrategy::Range);
1013        let err = g.rebalance(0);
1014        assert!(err.is_err());
1015    }
1016
1017    #[test]
1018    fn test_rebalance_increments_generation() {
1019        let mut g = make_group(4, AssignmentStrategy::Range);
1020        g.join("c1", 0).ok();
1021        assert_eq!(g.generation(), 0);
1022        g.rebalance(0).ok();
1023        assert_eq!(g.generation(), 1);
1024        g.rebalance(100).ok();
1025        assert_eq!(g.generation(), 2);
1026    }
1027
1028    #[test]
1029    fn test_rebalance_history_recorded() {
1030        let mut g = make_group(4, AssignmentStrategy::Range);
1031        g.join("c1", 0).ok();
1032        g.rebalance(0).ok();
1033        g.rebalance(100).ok();
1034        assert_eq!(g.rebalance_history().len(), 2);
1035        // Most recent first
1036        assert_eq!(g.rebalance_history().front().map(|e| e.generation), Some(2));
1037    }
1038
1039    #[test]
1040    fn test_rebalance_updates_partition_owners() {
1041        let mut g = make_group(4, AssignmentStrategy::Range);
1042        g.join("c1", 0).ok();
1043        g.rebalance(0).ok();
1044        for p in 0..4 {
1045            assert_eq!(g.get_offset(p).and_then(|o| o.owner.as_deref()), Some("c1"));
1046        }
1047    }
1048
1049    #[test]
1050    fn test_rebalance_after_leave() {
1051        let mut g = make_group(4, AssignmentStrategy::Range);
1052        g.join("c1", 0).ok();
1053        g.join("c2", 0).ok();
1054        g.rebalance(0).ok();
1055        g.leave("c2").ok();
1056        let r = g.rebalance(100).expect("rebalance result");
1057        // All partitions should go to c1
1058        assert_eq!(r.assignments.get("c1").map(|v| v.len()), Some(4));
1059    }
1060
1061    // ── Stats Tests ──────────────────────────────────────────────────────
1062
1063    #[test]
1064    fn test_group_stats() {
1065        let mut g = make_group(4, AssignmentStrategy::Range);
1066        g.join("c1", 0).ok();
1067        g.join("c2", 0).ok();
1068        let stats = g.stats();
1069        assert_eq!(stats.active_consumers, 2);
1070        assert_eq!(stats.total_partitions, 4);
1071        assert_eq!(stats.unassigned_partitions, 4); // not yet rebalanced
1072    }
1073
1074    #[test]
1075    fn test_stats_after_rebalance() {
1076        let mut g = make_group(4, AssignmentStrategy::Range);
1077        g.join("c1", 0).ok();
1078        g.rebalance(0).ok();
1079        let stats = g.stats();
1080        assert_eq!(stats.unassigned_partitions, 0);
1081        assert_eq!(stats.rebalance_count, 1);
1082    }
1083
1084    #[test]
1085    fn test_stats_with_lag() {
1086        let mut g = make_group(2, AssignmentStrategy::Range);
1087        g.join("c1", 0).ok();
1088        g.rebalance(0).ok();
1089        g.update_log_end_offset(0, 100).ok();
1090        g.update_log_end_offset(1, 200).ok();
1091        let stats = g.stats();
1092        assert_eq!(stats.total_lag, 300);
1093    }
1094
1095    // ── Edge Cases ───────────────────────────────────────────────────────
1096
1097    #[test]
1098    fn test_zero_partitions() {
1099        let mut g = make_group(0, AssignmentStrategy::Range);
1100        g.join("c1", 0).ok();
1101        let r = g.rebalance(0).expect("rebalance result");
1102        assert_eq!(r.assignments.get("c1").map(|v| v.len()), Some(0));
1103    }
1104
1105    #[test]
1106    fn test_more_consumers_than_partitions() {
1107        let mut g = make_group(2, AssignmentStrategy::Range);
1108        g.join("c1", 0).ok();
1109        g.join("c2", 0).ok();
1110        g.join("c3", 0).ok();
1111        let r = g.rebalance(0).expect("rebalance result");
1112        let total: usize = r.assignments.values().map(|v| v.len()).sum();
1113        assert_eq!(total, 2);
1114    }
1115
1116    #[test]
1117    fn test_set_strategy() {
1118        let mut g = make_group(4, AssignmentStrategy::Range);
1119        g.set_strategy(AssignmentStrategy::RoundRobin);
1120        assert_eq!(g.strategy(), &AssignmentStrategy::RoundRobin);
1121    }
1122
1123    #[test]
1124    fn test_current_assignments_empty() {
1125        let g = make_group(4, AssignmentStrategy::Range);
1126        let a = g.current_assignments();
1127        assert!(a.is_empty());
1128    }
1129
1130    #[test]
1131    fn test_partition_offset_lag_zero() {
1132        let po = PartitionOffset {
1133            partition: 0,
1134            committed_offset: 100,
1135            log_end_offset: 100,
1136            owner: None,
1137        };
1138        assert_eq!(po.lag(), 0);
1139    }
1140
1141    #[test]
1142    fn test_partition_offset_lag_underflow() {
1143        let po = PartitionOffset {
1144            partition: 0,
1145            committed_offset: 200,
1146            log_end_offset: 100,
1147            owner: None,
1148        };
1149        assert_eq!(po.lag(), 0); // saturating_sub
1150    }
1151
1152    #[test]
1153    fn test_eviction_clears_partition_ownership() {
1154        let mut g = make_group(4, AssignmentStrategy::Range);
1155        g.join("c1", 0).ok();
1156        g.rebalance(0).ok();
1157        // Verify c1 owns partitions
1158        assert!(g.get_offset(0).and_then(|o| o.owner.as_deref()).is_some());
1159        // Evict via session timeout
1160        g.check_heartbeats(11_000);
1161        // Ownership should be cleared
1162        for p in 0..4 {
1163            assert_eq!(g.get_offset(p).and_then(|o| o.owner.as_deref()), None);
1164        }
1165    }
1166
1167    #[test]
1168    fn test_coordinator_none_after_all_leave() {
1169        let mut g = make_group(4, AssignmentStrategy::Range);
1170        g.join("c1", 0).ok();
1171        g.leave("c1").ok();
1172        assert_eq!(g.coordinator(), None);
1173    }
1174
1175    #[test]
1176    fn test_roundrobin_three_consumers_five_partitions() {
1177        let mut g = make_group(5, AssignmentStrategy::RoundRobin);
1178        g.join("c1", 0).ok();
1179        g.join("c2", 0).ok();
1180        g.join("c3", 0).ok();
1181        let r = g.rebalance(0).expect("rebalance result");
1182        let c1 = r.assignments.get("c1").map(|v| v.len()).unwrap_or(0);
1183        let c2 = r.assignments.get("c2").map(|v| v.len()).unwrap_or(0);
1184        let c3 = r.assignments.get("c3").map(|v| v.len()).unwrap_or(0);
1185        assert_eq!(c1 + c2 + c3, 5);
1186        // At most 1 difference between min and max
1187        let max_val = c1.max(c2).max(c3);
1188        let min_val = c1.min(c2).min(c3);
1189        assert!(max_val - min_val <= 1);
1190    }
1191
1192    #[test]
1193    fn test_rebalance_history_max_limit() {
1194        let mut g = ConsumerGroup::new("test", 4, AssignmentStrategy::Range, 3000, 10_000);
1195        g.join("c1", 0).ok();
1196        // Set small history limit
1197        g.max_history = 3;
1198        for i in 0..10 {
1199            g.rebalance(i * 100).ok();
1200        }
1201        assert!(g.rebalance_history().len() <= 3);
1202    }
1203
1204    #[test]
1205    fn test_multiple_heartbeats() {
1206        let mut g = make_group(4, AssignmentStrategy::Range);
1207        g.join("c1", 0).ok();
1208        for t in (1000..5000).step_by(500) {
1209            assert!(g.heartbeat("c1", t).is_ok());
1210        }
1211        assert_eq!(g.get_member("c1").map(|m| m.last_heartbeat_ms), Some(4500));
1212    }
1213
1214    #[test]
1215    fn test_group_id_accessor() {
1216        let g = make_group(4, AssignmentStrategy::Range);
1217        assert_eq!(g.group_id(), "test-group");
1218    }
1219
1220    #[test]
1221    fn test_partition_count_accessor() {
1222        let g = make_group(8, AssignmentStrategy::Range);
1223        assert_eq!(g.partition_count(), 8);
1224    }
1225
1226    #[test]
1227    fn test_group_error_display() {
1228        let e = GroupError::DuplicateConsumer("c1".to_string());
1229        assert!(format!("{e}").contains("c1"));
1230
1231        let e = GroupError::ConsumerNotFound("c2".to_string());
1232        assert!(format!("{e}").contains("c2"));
1233
1234        let e = GroupError::InvalidPartition(99);
1235        assert!(format!("{e}").contains("99"));
1236
1237        let e = GroupError::NoActiveConsumers;
1238        assert!(!format!("{e}").is_empty());
1239
1240        let e = GroupError::NoCoordinator;
1241        assert!(!format!("{e}").is_empty());
1242    }
1243}