rivven_core/consumer_group.rs

1//! Consumer Group Coordination
2//!
3//! Kafka-compatible consumer groups with automatic partition rebalancing.
4//!
5//! ## Features
6//!
7//! - Multiple assignment strategies (range, round-robin, sticky)
8//! - Automatic rebalancing on member join/leave
9//! - Offset commit/fetch with Raft-based durability
10//! - Heartbeat-based failure detection
11//! - Generation-based protocol (prevents split-brain)
12//! - **Static Membership (KIP-345)**: Stable consumer identity for K8s
13//! - **Cooperative Rebalancing (KIP-429)**: Incremental partition reassignment
14//!
15//! ## Static Membership (KIP-345)
16//!
17//! Static membership allows consumers to maintain their identity across restarts,
18//! preventing unnecessary rebalances in Kubernetes environments.
19//!
20//! ```text
21//! Without static membership:
22//!   Pod restart → new member_id → REBALANCE (stop-the-world)
23//!
24//! With static membership:
25//!   Pod restart → same group.instance.id → REJOIN (keep assignments)
26//! ```
27//!
28//! ### Configuration
29//!
30//! Set `group.instance.id` on the consumer to enable static membership:
31//! - Instance ID must be unique within the group
32//! - On rejoin, previous member is fenced and new member inherits assignments
33//! - Session timeout still applies for failure detection
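//!
//! A minimal sketch of such a restart using this module's API (topic names, instance IDs,
//! and the `rivven_core::consumer_group` path are illustrative assumptions):
//!
//! ```ignore
//! use std::collections::HashMap;
//! use std::time::Duration;
//! use rivven_core::consumer_group::{ConsumerGroup, PartitionAssignment};
//!
//! let mut group = ConsumerGroup::new(
//!     "orders".into(),
//!     Duration::from_secs(30),
//!     Duration::from_secs(60),
//! );
//! group.add_member_with_instance_id(
//!     "member-1".into(),
//!     Some("orders-pod-0".into()), // group.instance.id
//!     "client-1".into(),
//!     vec!["orders".into()],
//!     vec![],
//! );
//! group.add_member("member-2".into(), "client-2".into(), vec!["orders".into()], vec![]);
//!
//! let mut assignments = HashMap::new();
//! assignments.insert(
//!     "member-1".to_string(),
//!     vec![PartitionAssignment { topic: "orders".into(), partition: 0 }],
//! );
//! group.complete_rebalance(assignments);
//! let generation = group.generation_id;
//!
//! // Pod restart: the consumer leaves and rejoins under the same instance ID.
//! group.remove_member(&"member-1".to_string());
//! group.add_member_with_instance_id(
//!     "member-1-restarted".into(),
//!     Some("orders-pod-0".into()),
//!     "client-1".into(),
//!     vec!["orders".into()],
//!     vec![],
//! );
//!
//! // The saved assignment is restored and no new rebalance generation was started.
//! assert_eq!(group.members["member-1-restarted"].assignment.len(), 1);
//! assert_eq!(group.generation_id, generation);
//! ```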
34//!
35//! ## Cooperative Rebalancing (KIP-429)
36//!
37//! Cooperative rebalancing eliminates "stop-the-world" rebalances by using
38//! incremental partition reassignment:
39//!
40//! ```text
41//! Eager (traditional):
42//!   All consumers STOP → Reassign ALL → All consumers RESTART
43//!   (service interruption during entire rebalance)
44//!
45//! Cooperative (KIP-429):
46//!   1st rebalance: Revoke ONLY partitions that need moving
47//!   2nd rebalance: Assign revoked partitions to new owners
48//!   (consumers keep processing non-moving partitions)
49//! ```
50//!
51//! ### Rebalance Protocol Selection
52//!
53//! - **Eager**: All partitions revoked immediately (Kafka default pre-2.4)
//! - **Cooperative**: Only changed partitions revoked (requires all members to support it)
55//! - The group uses the highest common protocol (cooperative if all support it)
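//!
//! A small sketch of that selection rule via `RebalanceProtocol::select_common` (the module
//! path is an assumption):
//!
//! ```ignore
//! use rivven_core::consumer_group::RebalanceProtocol;
//!
//! // Every member supports cooperative → the group runs cooperative.
//! let all_cooperative = [RebalanceProtocol::Cooperative, RebalanceProtocol::Cooperative];
//! assert_eq!(
//!     RebalanceProtocol::select_common(&all_cooperative),
//!     RebalanceProtocol::Cooperative
//! );
//!
//! // A single eager-only member drops the whole group back to eager.
//! let mixed = [RebalanceProtocol::Cooperative, RebalanceProtocol::Eager];
//! assert_eq!(RebalanceProtocol::select_common(&mixed), RebalanceProtocol::Eager);
//! ```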
56//!
57//! ## Protocol
58//!
59//! 1. **JoinGroup**: Consumer joins group, triggers rebalance
60//! 2. **SyncGroup**: Leader assigns partitions, followers receive assignments
61//! 3. **Heartbeat**: Keep-alive messages, detect failures
62//! 4. **LeaveGroup**: Graceful departure, triggers rebalance
63//! 5. **OffsetCommit**: Persist consumer offsets
64//! 6. **OffsetFetch**: Retrieve committed offsets
65//!
66//! ## State Machine
67//!
68//! ```text
69//! Empty → PreparingRebalance → CompletingRebalance → Stable
70//!   ↑                                                   │
71//!   └───────────────────────────────────────────────────┘
72//!           (member leave/timeout/join)
73//! ```
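//!
//! A condensed walk through these steps and states with this module's types (names and the
//! `rivven_core::consumer_group` path are illustrative assumptions):
//!
//! ```ignore
//! use std::collections::HashMap;
//! use std::time::Duration;
//! use rivven_core::consumer_group::{assignment, ConsumerGroup, GroupState};
//!
//! let mut group = ConsumerGroup::new(
//!     "analytics".into(),
//!     Duration::from_secs(30),
//!     Duration::from_secs(60),
//! );
//!
//! // JoinGroup: the first member arrives → PreparingRebalance.
//! group.add_member("m1".into(), "c1".into(), vec!["events".into()], vec![]);
//! assert_eq!(group.state, GroupState::PreparingRebalance);
//!
//! // SyncGroup: the leader computes and applies assignments → Stable, new generation.
//! let topic_partitions = HashMap::from([("events".to_string(), 4u32)]);
//! let plan = assignment::range_assignment(&["m1".to_string()], &topic_partitions);
//! group.complete_rebalance(plan);
//! assert_eq!(group.state, GroupState::Stable);
//! assert_eq!(group.generation_id, 1);
//!
//! // Heartbeat keeps the member alive; OffsetCommit/OffsetFetch persist progress.
//! group.heartbeat(&"m1".to_string()).unwrap();
//! group.commit_offset("events", 0, 42);
//! assert_eq!(group.fetch_offset("events", 0), Some(42));
//!
//! // LeaveGroup: the last member departs → Empty.
//! group.remove_member(&"m1".to_string());
//! assert_eq!(group.state, GroupState::Empty);
//! ```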
74
75use serde::{Deserialize, Serialize};
76use std::collections::{HashMap, HashSet};
77use std::time::{Duration, SystemTime};
78
79/// Unique consumer group identifier
80pub type GroupId = String;
81
82/// Unique consumer member identifier (generated by coordinator)
83pub type MemberId = String;
84
85/// Static group instance identifier (KIP-345)
86///
87/// When set, this provides a stable identity for the consumer across restarts.
88/// The coordinator maps `group_instance_id` → `member_id` to recognize returning members.
89pub type GroupInstanceId = String;
90
91/// Consumer group state
92#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
93pub enum GroupState {
94    /// No members, awaiting first join
95    Empty,
96
97    /// Rebalance triggered, waiting for all members to rejoin
98    PreparingRebalance,
99
100    /// Leader computing assignments, followers waiting
101    CompletingRebalance,
102
103    /// All members have assignments, operating normally
104    Stable,
105
106    /// Group marked for deletion (all members left)
107    Dead,
108}
109
110/// Partition assignment strategy
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
112pub enum AssignmentStrategy {
113    /// Range: Assign contiguous partition ranges (default)
114    #[default]
115    Range,
116
117    /// RoundRobin: Distribute partitions evenly in round-robin
118    RoundRobin,
119
120    /// Sticky: Minimize partition movement during rebalance
121    Sticky,
122}
123
124/// Rebalance protocol (KIP-429)
125///
126/// Determines how partition reassignment is handled during rebalance.
127#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
128pub enum RebalanceProtocol {
129    /// Eager: All partitions revoked during rebalance (stop-the-world)
130    /// This is the traditional Kafka behavior before version 2.4
131    #[default]
132    Eager,
133
134    /// Cooperative: Incremental rebalance, only moved partitions are revoked
135    /// Requires all group members to support cooperative protocol
136    Cooperative,
137}
138
139impl RebalanceProtocol {
140    /// Select the highest common protocol supported by all members
141    /// Returns Cooperative only if ALL members support it
142    pub fn select_common(protocols: &[Self]) -> Self {
143        if protocols.is_empty() {
144            return RebalanceProtocol::Eager;
145        }
146
147        // Cooperative requires all members to support it
148        if protocols
149            .iter()
150            .all(|p| *p == RebalanceProtocol::Cooperative)
151        {
152            RebalanceProtocol::Cooperative
153        } else {
154            RebalanceProtocol::Eager
155        }
156    }
157}
158
159/// Consumer group member
160#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
161pub struct GroupMember {
162    /// Unique member ID (generated by coordinator)
163    pub member_id: MemberId,
164
165    /// Static instance ID (KIP-345) - stable identity across restarts
166    /// When set, enables static membership for this consumer
167    pub group_instance_id: Option<GroupInstanceId>,
168
169    /// Client-provided ID (for sticky assignment)
170    pub client_id: String,
171
172    /// Topics subscribed to
173    pub subscriptions: Vec<String>,
174
175    /// Current partition assignment
176    pub assignment: Vec<PartitionAssignment>,
177
178    /// Partitions pending revocation (cooperative protocol only)
179    /// These partitions must be explicitly revoked before being reassigned
180    pub pending_revocation: Vec<PartitionAssignment>,
181
182    /// Last heartbeat timestamp (milliseconds since epoch for serialization)
183    #[serde(
184        serialize_with = "serialize_systemtime",
185        deserialize_with = "deserialize_systemtime"
186    )]
187    pub last_heartbeat: SystemTime,
188
189    /// Member metadata (client version, etc.)
190    pub metadata: Vec<u8>,
191
192    /// Is this a static member (has group_instance_id)
193    pub is_static: bool,
194
195    /// Supported rebalance protocols (KIP-429)
196    /// The group selects the highest common protocol
197    pub supported_protocols: Vec<RebalanceProtocol>,
198}
199
200// Helper functions for SystemTime serialization
201fn serialize_systemtime<S>(time: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
202where
203    S: serde::Serializer,
204{
205    let duration = time
206        .duration_since(SystemTime::UNIX_EPOCH)
207        .map_err(serde::ser::Error::custom)?;
208    serializer.serialize_u128(duration.as_millis())
209}
210
211fn deserialize_systemtime<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
212where
213    D: serde::Deserializer<'de>,
214{
215    let millis = u128::deserialize(deserializer)?;
216    Ok(SystemTime::UNIX_EPOCH + std::time::Duration::from_millis(millis as u64))
217}
218
219/// Partition assignment for a member
220#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
221pub struct PartitionAssignment {
222    pub topic: String,
223    pub partition: u32,
224}
225
226/// Result of a rebalance operation (KIP-429)
227#[derive(Debug, Clone, PartialEq, Eq)]
228pub enum RebalanceResult {
229    /// Rebalance completed immediately (eager protocol or no revocations needed)
230    Complete,
231
232    /// Awaiting partition revocations (cooperative protocol)
233    /// Contains the revocations requested and the pending final assignments
234    AwaitingRevocations {
235        /// Partitions each member needs to revoke
236        revocations: HashMap<MemberId, Vec<PartitionAssignment>>,
237        /// Final assignments to apply after revocations are acknowledged
238        pending_assignments: HashMap<MemberId, Vec<PartitionAssignment>>,
239    },
240}
241
242/// Consumer group metadata
243#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
244pub struct ConsumerGroup {
245    /// Group identifier
246    pub group_id: GroupId,
247
248    /// Current state
249    pub state: GroupState,
250
251    /// Generation ID (increments on each rebalance)
252    pub generation_id: u32,
253
254    /// Leader member ID (computes assignments)
255    pub leader_id: Option<MemberId>,
256
257    /// Protocol name (for compatibility)
258    pub protocol_name: String,
259
260    /// Assignment strategy
261    pub assignment_strategy: AssignmentStrategy,
262
263    /// Selected rebalance protocol (KIP-429)
264    /// Determined by the highest common protocol all members support
265    pub rebalance_protocol: RebalanceProtocol,
266
267    /// Active members
268    pub members: HashMap<MemberId, GroupMember>,
269
270    /// Static membership mapping: group_instance_id → member_id (KIP-345)
271    /// Used to recognize returning static members without triggering rebalance
272    pub static_members: HashMap<GroupInstanceId, MemberId>,
273
274    /// Pending static members awaiting sync (instance_id → saved assignment)
275    /// Stores assignments for static members that disconnected but haven't timed out
276    pub pending_static_members: HashMap<GroupInstanceId, Vec<PartitionAssignment>>,
277
278    /// Partitions awaiting revocation acknowledgment (cooperative protocol)
279    /// Maps member_id → partitions that need to be revoked before reassignment
280    pub awaiting_revocation: HashMap<MemberId, Vec<PartitionAssignment>>,
281
282    /// Committed offsets (topic → partition → offset)
283    pub offsets: HashMap<String, HashMap<u32, i64>>,
284
285    /// Session timeout (member considered dead if no heartbeat)
286    #[serde(
287        serialize_with = "serialize_duration",
288        deserialize_with = "deserialize_duration"
289    )]
290    pub session_timeout: Duration,
291
292    /// Rebalance timeout (max time for rebalance completion)
293    #[serde(
294        serialize_with = "serialize_duration",
295        deserialize_with = "deserialize_duration"
296    )]
297    pub rebalance_timeout: Duration,
298}
299
300// Helper functions for Duration serialization
301fn serialize_duration<S>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error>
302where
303    S: serde::Serializer,
304{
305    serializer.serialize_u64(duration.as_millis() as u64)
306}
307
308fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
309where
310    D: serde::Deserializer<'de>,
311{
312    let millis = u64::deserialize(deserializer)?;
313    Ok(Duration::from_millis(millis))
314}
315
316impl ConsumerGroup {
317    /// Create a new consumer group
318    pub fn new(group_id: GroupId, session_timeout: Duration, rebalance_timeout: Duration) -> Self {
319        Self {
320            group_id,
321            state: GroupState::Empty,
322            generation_id: 0,
323            leader_id: None,
324            protocol_name: "consumer".to_string(),
325            assignment_strategy: AssignmentStrategy::default(),
326            rebalance_protocol: RebalanceProtocol::Eager,
327            members: HashMap::new(),
328            static_members: HashMap::new(),
329            pending_static_members: HashMap::new(),
330            awaiting_revocation: HashMap::new(),
331            offsets: HashMap::new(),
332            session_timeout,
333            rebalance_timeout,
334        }
335    }
336
337    /// Add a member to the group
338    ///
339    /// For static members (with `group_instance_id`):
340    /// - If the instance ID is known and has a pending assignment, restore it without rebalance
341    /// - If the instance ID is known but has an active member, fence the old member
342    /// - Otherwise, treat as new member and trigger rebalance
343    ///
344    /// For dynamic members (without `group_instance_id`):
345    /// - Always trigger rebalance
346    pub fn add_member(
347        &mut self,
348        member_id: MemberId,
349        client_id: String,
350        subscriptions: Vec<String>,
351        metadata: Vec<u8>,
352    ) {
353        self.add_member_full(
354            member_id,
355            None,
356            client_id,
357            subscriptions,
358            metadata,
359            vec![RebalanceProtocol::Eager],
360        )
361    }
362
363    /// Add a member with optional static instance ID (KIP-345)
364    pub fn add_member_with_instance_id(
365        &mut self,
366        member_id: MemberId,
367        group_instance_id: Option<GroupInstanceId>,
368        client_id: String,
369        subscriptions: Vec<String>,
370        metadata: Vec<u8>,
371    ) {
372        self.add_member_full(
373            member_id,
374            group_instance_id,
375            client_id,
376            subscriptions,
377            metadata,
378            vec![RebalanceProtocol::Eager],
379        )
380    }
381
382    /// Add a member with full configuration including supported protocols (KIP-429)
383    pub fn add_member_full(
384        &mut self,
385        member_id: MemberId,
386        group_instance_id: Option<GroupInstanceId>,
387        client_id: String,
388        subscriptions: Vec<String>,
389        metadata: Vec<u8>,
390        supported_protocols: Vec<RebalanceProtocol>,
391    ) {
392        let is_static = group_instance_id.is_some();
393        let supported_protocols = if supported_protocols.is_empty() {
394            vec![RebalanceProtocol::Eager]
395        } else {
396            supported_protocols
397        };
398
399        // Check if this is a returning static member
400        if let Some(ref instance_id) = group_instance_id {
401            // Check for existing member with same instance ID (fencing)
402            if let Some(old_member_id) = self.static_members.get(instance_id).cloned() {
                if old_member_id != member_id {
                    // Fence the old member - remove it without triggering rebalance
                    self.members.remove(&old_member_id);
                    // If the fenced member was the leader, let the new member take over below
                    if self.leader_id.as_ref() == Some(&old_member_id) {
                        self.leader_id = None;
                    }
                }
407            }
408
409            // Check for pending assignment (member returning after disconnect)
410            if let Some(saved_assignment) = self.pending_static_members.remove(instance_id) {
411                // Static member rejoining - restore assignment without rebalance
412                let member = GroupMember {
413                    member_id: member_id.clone(),
414                    group_instance_id: Some(instance_id.clone()),
415                    client_id,
416                    subscriptions,
417                    assignment: saved_assignment,
418                    pending_revocation: Vec::new(),
419                    last_heartbeat: SystemTime::now(),
420                    metadata,
421                    is_static: true,
422                    supported_protocols: supported_protocols.clone(),
423                };
424
425                self.members.insert(member_id.clone(), member);
426                self.static_members
427                    .insert(instance_id.clone(), member_id.clone());
428
429                // Update group's selected protocol
430                self.update_rebalance_protocol();
431
432                // First member becomes leader
433                if self.leader_id.is_none() {
434                    self.leader_id = Some(member_id);
435                }
436
437                // No rebalance needed - member restored with previous assignment
438                if self.state == GroupState::Empty {
439                    self.state = GroupState::Stable;
440                }
441                return;
442            }
443
444            // Register static member mapping
445            self.static_members
446                .insert(instance_id.clone(), member_id.clone());
447        }
448
449        // Create new member
450        let member = GroupMember {
451            member_id: member_id.clone(),
452            group_instance_id,
453            client_id,
454            subscriptions,
455            assignment: Vec::new(),
456            pending_revocation: Vec::new(),
457            last_heartbeat: SystemTime::now(),
458            metadata,
459            is_static,
460            supported_protocols,
461        };
462
463        self.members.insert(member_id.clone(), member);
464
465        // Update group's selected protocol
466        self.update_rebalance_protocol();
467
468        // First member becomes leader
469        if self.leader_id.is_none() {
470            self.leader_id = Some(member_id);
471        }
472
473        // Trigger rebalance (but only if not Empty state)
474        if self.state != GroupState::Empty {
475            self.transition_to_preparing_rebalance();
476        } else if self.members.len() == 1 {
477            // First member added - move to PreparingRebalance
478            self.state = GroupState::PreparingRebalance;
479        }
480    }
481
482    /// Check if a static member with the given instance ID exists
483    pub fn has_static_member(&self, instance_id: &GroupInstanceId) -> bool {
484        self.static_members.contains_key(instance_id)
485    }
486
487    /// Get member ID for a static instance ID
488    pub fn get_member_for_instance(&self, instance_id: &GroupInstanceId) -> Option<&MemberId> {
489        self.static_members.get(instance_id)
490    }
491
492    /// Fence a static member (remove and replace with new member)
493    ///
494    /// Called when a new member joins with the same group.instance.id as an existing member.
495    /// This prevents split-brain scenarios.
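    ///
    /// A brief sketch (identifiers and the module path are illustrative assumptions):
    ///
    /// ```ignore
    /// use std::time::Duration;
    /// use rivven_core::consumer_group::ConsumerGroup;
    ///
    /// let mut group = ConsumerGroup::new("g".into(), Duration::from_secs(30), Duration::from_secs(60));
    /// group.add_member_with_instance_id("member-old".into(), Some("pod-0".into()), "client".into(), vec![], vec![]);
    ///
    /// // A replacement consumer is about to join with the same group.instance.id.
    /// let fenced = group.fence_static_member(&"pod-0".to_string());
    /// assert_eq!(fenced, Some("member-old".to_string()));
    /// assert!(!group.members.contains_key("member-old"));
    /// ```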
496    pub fn fence_static_member(&mut self, instance_id: &GroupInstanceId) -> Option<MemberId> {
497        if let Some(old_member_id) = self.static_members.get(instance_id).cloned() {
498            // Save the old member's assignment for potential restoration
499            if let Some(old_member) = self.members.get(&old_member_id) {
500                if !old_member.assignment.is_empty() {
501                    self.pending_static_members
502                        .insert(instance_id.clone(), old_member.assignment.clone());
503                }
504            }
505
506            // Remove the old member
507            self.members.remove(&old_member_id);
508
509            // If leader left, elect new leader
510            if self.leader_id.as_ref() == Some(&old_member_id) {
511                self.leader_id = self.members.keys().next().cloned();
512            }
513
514            Some(old_member_id)
515        } else {
516            None
517        }
518    }
519
520    /// Remove a member from the group
521    ///
522    /// For static members: Saves assignment for potential restoration (no rebalance until timeout)
523    /// For dynamic members: Triggers immediate rebalance
524    pub fn remove_member(&mut self, member_id: &MemberId) -> bool {
525        if let Some(member) = self.members.remove(member_id) {
526            // Handle static member removal differently
527            if member.is_static {
528                if let Some(ref instance_id) = member.group_instance_id {
529                    // Save assignment for potential restoration
530                    if !member.assignment.is_empty() {
531                        self.pending_static_members
532                            .insert(instance_id.clone(), member.assignment);
533                    }
534                    // Keep the static_members mapping until timeout
535                    // This allows the member to rejoin without rebalance
536                }
537            } else {
538                // Dynamic member - trigger rebalance immediately
539                if !self.members.is_empty() {
540                    self.transition_to_preparing_rebalance();
541                }
542            }
543
544            // If leader left, elect new leader
545            if self.leader_id.as_ref() == Some(member_id) {
546                self.leader_id = self.members.keys().next().cloned();
547            }
548
549            // If no members left, transition to Empty
550            if self.members.is_empty() {
551                self.state = GroupState::Empty;
552                self.generation_id = 0;
553                self.leader_id = None;
554                // Clear all static member mappings
555                self.static_members.clear();
556                self.pending_static_members.clear();
557            }
558
559            true
560        } else {
561            false
562        }
563    }
564
565    /// Remove a static member permanently (triggers rebalance)
566    ///
567    /// Called when a static member's session times out or explicitly leaves.
568    pub fn remove_static_member(&mut self, instance_id: &GroupInstanceId) -> bool {
569        if let Some(member_id) = self.static_members.remove(instance_id) {
570            self.pending_static_members.remove(instance_id);
571
572            if self.members.remove(&member_id).is_some() {
573                // If leader left, elect new leader
574                if self.leader_id.as_ref() == Some(&member_id) {
575                    self.leader_id = self.members.keys().next().cloned();
576                }
577
578                // If no members left, transition to Empty
579                if self.members.is_empty() {
580                    self.state = GroupState::Empty;
581                    self.generation_id = 0;
582                    self.leader_id = None;
583                    self.static_members.clear();
584                    self.pending_static_members.clear();
585                } else {
586                    // Trigger rebalance
587                    self.transition_to_preparing_rebalance();
588                }
589
590                return true;
591            }
592        }
593
594        // Also check pending members
595        if self.pending_static_members.remove(instance_id).is_some() {
596            self.static_members.remove(instance_id);
597            // Pending member timeout doesn't trigger rebalance since they weren't active
598            return true;
599        }
600
601        false
602    }
603
604    /// Update member heartbeat
605    pub fn heartbeat(&mut self, member_id: &MemberId) -> Result<(), String> {
606        if let Some(member) = self.members.get_mut(member_id) {
607            member.last_heartbeat = SystemTime::now();
608            Ok(())
609        } else {
610            Err(format!("Unknown member: {}", member_id))
611        }
612    }
613
614    /// Check for timed-out members
615    ///
    /// For static members: removed permanently (and a rebalance is triggered) once the session timeout elapses without a rejoin
    /// For dynamic members: removed immediately on heartbeat timeout
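    ///
    /// A small sketch with a deliberately tiny session timeout (values and the module path are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// use std::thread::sleep;
    /// use std::time::Duration;
    /// use rivven_core::consumer_group::ConsumerGroup;
    ///
    /// let mut group = ConsumerGroup::new("g".into(), Duration::from_millis(5), Duration::from_secs(60));
    /// group.add_member("m1".into(), "c1".into(), vec![], vec![]);
    ///
    /// sleep(Duration::from_millis(20)); // let the heartbeat lapse past the 5 ms session timeout
    /// let expired = group.check_timeouts();
    /// assert_eq!(expired, vec!["m1".to_string()]);
    /// assert!(group.members.is_empty());
    /// ```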
618    pub fn check_timeouts(&mut self) -> Vec<MemberId> {
619        let now = SystemTime::now();
620        let mut timed_out = Vec::new();
621        let mut static_timeouts: Vec<GroupInstanceId> = Vec::new();
622
623        // Check active members
624        for (member_id, member) in &self.members {
625            if let Ok(elapsed) = now.duration_since(member.last_heartbeat) {
626                if elapsed > self.session_timeout {
627                    timed_out.push(member_id.clone());
628
629                    // Track static member timeouts separately
630                    if let Some(ref instance_id) = member.group_instance_id {
631                        static_timeouts.push(instance_id.clone());
632                    }
633                }
634            }
635        }
636
637        // Remove timed-out dynamic members immediately
638        for member_id in &timed_out {
639            let member = self.members.get(member_id);
640            if let Some(m) = member {
641                if !m.is_static {
642                    self.remove_member(member_id);
643                }
644            }
645        }
646
        // Timed-out static members are removed permanently; their saved assignments are not kept
648        for instance_id in &static_timeouts {
649            // The remove_static_member call will trigger rebalance
650            self.remove_static_member(instance_id);
651        }
652
653        timed_out
654    }
655
656    /// Check for timed-out pending static members
657    ///
658    /// Pending static members are those that disconnected but haven't timed out yet.
659    /// Their assignments are preserved for potential restoration.
660    pub fn check_pending_static_timeouts(
661        &mut self,
662        _pending_timeout: Duration,
663    ) -> Vec<GroupInstanceId> {
664        // This would typically be called periodically to clean up
665        // pending static members that never rejoined.
666        // For now, we just return an empty vec as we don't track
667        // pending timestamps separately (could be enhanced).
668        Vec::new()
669    }
670
671    // =========================================================================
672    // Cooperative Rebalancing (KIP-429)
673    // =========================================================================
674
675    /// Update the group's selected rebalance protocol based on member support
676    ///
677    /// The group uses cooperative protocol only if ALL members support it.
678    /// Otherwise, falls back to eager protocol.
679    fn update_rebalance_protocol(&mut self) {
680        let protocols: Vec<RebalanceProtocol> = self
681            .members
682            .values()
683            .flat_map(|m| {
684                // Find the highest protocol each member supports
685                if m.supported_protocols
686                    .contains(&RebalanceProtocol::Cooperative)
687                {
688                    Some(RebalanceProtocol::Cooperative)
689                } else {
690                    Some(RebalanceProtocol::Eager)
691                }
692            })
693            .collect();
694
695        self.rebalance_protocol = RebalanceProtocol::select_common(&protocols);
696    }
697
698    /// Check if the group uses cooperative rebalancing
699    pub fn is_cooperative(&self) -> bool {
700        self.rebalance_protocol == RebalanceProtocol::Cooperative
701    }
702
703    /// Compute partitions that need to be revoked for cooperative rebalance
704    ///
705    /// Returns a map of member_id → partitions to revoke.
706    /// Only partitions that are moving to a different consumer are revoked.
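    ///
    /// A sketch of that rule (identifiers and the module path are illustrative assumptions):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::time::Duration;
    /// use rivven_core::consumer_group::{ConsumerGroup, PartitionAssignment};
    ///
    /// let part = |p: u32| PartitionAssignment { topic: "t".to_string(), partition: p };
    /// let mut group = ConsumerGroup::new("g".into(), Duration::from_secs(30), Duration::from_secs(60));
    /// group.add_member("m1".into(), "c1".into(), vec!["t".into()], vec![]);
    /// group.add_member("m2".into(), "c2".into(), vec!["t".into()], vec![]);
    /// group.complete_rebalance(HashMap::from([
    ///     ("m1".to_string(), vec![part(0), part(1)]),
    ///     ("m2".to_string(), vec![part(2)]),
    /// ]));
    ///
    /// // New plan: partition 1 moves from m1 to m2; everything else stays where it is.
    /// let new_plan = HashMap::from([
    ///     ("m1".to_string(), vec![part(0)]),
    ///     ("m2".to_string(), vec![part(1), part(2)]),
    /// ]);
    /// let revocations = group.compute_revocations(&new_plan);
    /// assert_eq!(revocations.get("m1"), Some(&vec![part(1)]));
    /// assert!(!revocations.contains_key("m2")); // m2 only gains, so it revokes nothing
    /// ```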
707    pub fn compute_revocations(
708        &self,
709        new_assignments: &HashMap<MemberId, Vec<PartitionAssignment>>,
710    ) -> HashMap<MemberId, Vec<PartitionAssignment>> {
711        let mut revocations: HashMap<MemberId, Vec<PartitionAssignment>> = HashMap::new();
712
713        for (member_id, member) in &self.members {
714            let new_assignment = new_assignments.get(member_id);
715
716            // Find partitions that are being taken away
717            let mut to_revoke = Vec::new();
718            for partition in &member.assignment {
719                let still_assigned = new_assignment
720                    .map(|a| a.contains(partition))
721                    .unwrap_or(false);
722
723                if !still_assigned {
724                    to_revoke.push(partition.clone());
725                }
726            }
727
728            if !to_revoke.is_empty() {
729                revocations.insert(member_id.clone(), to_revoke);
730            }
731        }
732
733        revocations
734    }
735
736    /// Start cooperative rebalance phase 1: Request revocations
737    ///
738    /// In cooperative rebalancing, we don't immediately reassign all partitions.
739    /// Instead, we first ask members to revoke partitions that need moving.
740    pub fn request_revocations(
741        &mut self,
742        revocations: HashMap<MemberId, Vec<PartitionAssignment>>,
743    ) {
744        // Store pending revocations
745        self.awaiting_revocation = revocations.clone();
746
747        // Mark partitions as pending revocation on each member
748        for (member_id, partitions) in revocations {
749            if let Some(member) = self.members.get_mut(&member_id) {
750                member.pending_revocation = partitions;
751            }
752        }
753
754        // Transition to waiting for revocation acknowledgments
755        // (reuse CompletingRebalance state for this phase)
756        self.state = GroupState::CompletingRebalance;
757    }
758
759    /// Acknowledge revocation from a member (cooperative protocol)
760    ///
761    /// Called when a member confirms it has stopped processing revoked partitions.
762    /// Returns true if all pending revocations have been acknowledged.
763    pub fn acknowledge_revocation(&mut self, member_id: &MemberId) -> bool {
764        // Remove from awaiting list
765        self.awaiting_revocation.remove(member_id);
766
767        // Clear pending revocation on member
768        if let Some(member) = self.members.get_mut(member_id) {
769            // Remove revoked partitions from assignment
770            let revoked: HashSet<_> = member.pending_revocation.drain(..).collect();
771            member.assignment.retain(|p| !revoked.contains(p));
772        }
773
774        // Check if all revocations are complete
775        self.awaiting_revocation.is_empty()
776    }
777
778    /// Check if there are pending revocations
779    pub fn has_pending_revocations(&self) -> bool {
780        !self.awaiting_revocation.is_empty()
781    }
782
783    /// Get pending revocations for a member
784    pub fn get_pending_revocations(&self, member_id: &MemberId) -> Vec<PartitionAssignment> {
785        self.members
786            .get(member_id)
787            .map(|m| m.pending_revocation.clone())
788            .unwrap_or_default()
789    }
790
791    /// Complete cooperative rebalance with final assignments
792    ///
793    /// Called after all revocations are acknowledged. Assigns partitions
794    /// that were revoked to their new owners.
795    pub fn complete_cooperative_rebalance(
796        &mut self,
797        final_assignments: HashMap<MemberId, Vec<PartitionAssignment>>,
798    ) {
799        // Merge new assignments with existing (non-revoked) assignments
800        for (member_id, new_partitions) in final_assignments {
801            if let Some(member) = self.members.get_mut(&member_id) {
802                // Add new partitions to existing assignment
803                for partition in new_partitions {
804                    if !member.assignment.contains(&partition) {
805                        member.assignment.push(partition);
806                    }
807                }
808            }
809        }
810
811        // Clear revocation state
812        self.awaiting_revocation.clear();
813
814        // Increment generation
815        self.generation_id += 1;
816        self.state = GroupState::Stable;
817    }
818
819    /// Perform a full rebalance (handles both eager and cooperative)
820    ///
821    /// For eager: Immediately assigns all partitions
822    /// For cooperative: Initiates two-phase revocation process
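    ///
    /// A sketch of the cooperative two-phase flow (identifiers and the module path are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use std::time::Duration;
    /// use rivven_core::consumer_group::{
    ///     ConsumerGroup, GroupState, PartitionAssignment, RebalanceProtocol, RebalanceResult,
    /// };
    ///
    /// let part = |p: u32| PartitionAssignment { topic: "t".to_string(), partition: p };
    /// let mut group = ConsumerGroup::new("g".into(), Duration::from_secs(30), Duration::from_secs(60));
    /// for (member, client) in [("m1", "c1"), ("m2", "c2")] {
    ///     group.add_member_full(
    ///         member.into(),
    ///         None,
    ///         client.into(),
    ///         vec!["t".into()],
    ///         vec![],
    ///         vec![RebalanceProtocol::Cooperative],
    ///     );
    /// }
    /// assert!(group.is_cooperative());
    ///
    /// // Start from a stable assignment: m1 = {0, 1}, m2 = {2}.
    /// group.complete_rebalance(HashMap::from([
    ///     ("m1".to_string(), vec![part(0), part(1)]),
    ///     ("m2".to_string(), vec![part(2)]),
    /// ]));
    ///
    /// // The new plan moves partition 1 to m2, so phase 1 only asks m1 to revoke it.
    /// let plan = HashMap::from([
    ///     ("m1".to_string(), vec![part(0)]),
    ///     ("m2".to_string(), vec![part(1), part(2)]),
    /// ]);
    /// if let RebalanceResult::AwaitingRevocations { pending_assignments, .. } =
    ///     group.rebalance_with_strategy(plan)
    /// {
    ///     // m1 acknowledges the revocation, then phase 2 applies the final plan.
    ///     assert!(group.acknowledge_revocation(&"m1".to_string()));
    ///     group.complete_cooperative_rebalance(pending_assignments);
    /// }
    /// assert_eq!(group.state, GroupState::Stable);
    /// ```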
823    pub fn rebalance_with_strategy(
824        &mut self,
825        new_assignments: HashMap<MemberId, Vec<PartitionAssignment>>,
826    ) -> RebalanceResult {
827        match self.rebalance_protocol {
828            RebalanceProtocol::Eager => {
829                // Eager: Immediately replace all assignments
830                self.complete_rebalance(new_assignments);
831                RebalanceResult::Complete
832            }
833            RebalanceProtocol::Cooperative => {
834                // Cooperative: Check if revocations are needed
835                let revocations = self.compute_revocations(&new_assignments);
836
837                if revocations.is_empty() {
838                    // No revocations needed - can complete immediately
839                    self.complete_cooperative_rebalance(new_assignments);
840                    RebalanceResult::Complete
841                } else {
842                    // Need to revoke first
843                    self.request_revocations(revocations.clone());
844                    RebalanceResult::AwaitingRevocations {
845                        revocations,
846                        pending_assignments: new_assignments,
847                    }
848                }
849            }
850        }
851    }
852
853    /// Transition to PreparingRebalance state
854    fn transition_to_preparing_rebalance(&mut self) {
855        if self.state != GroupState::Empty {
856            self.state = GroupState::PreparingRebalance;
857        }
858    }
859
860    /// Complete rebalance with assignments
861    pub fn complete_rebalance(&mut self, assignments: HashMap<MemberId, Vec<PartitionAssignment>>) {
862        // Update member assignments
863        for (member_id, partitions) in assignments {
864            if let Some(member) = self.members.get_mut(&member_id) {
865                member.assignment = partitions;
866            }
867        }
868
869        // Increment generation
870        self.generation_id += 1;
871        self.state = GroupState::Stable;
872    }
873
874    /// Commit offset for a partition
875    pub fn commit_offset(&mut self, topic: &str, partition: u32, offset: i64) {
876        self.offsets
877            .entry(topic.to_string())
878            .or_default()
879            .insert(partition, offset);
880    }
881
882    /// Fetch committed offset for a partition
883    pub fn fetch_offset(&self, topic: &str, partition: u32) -> Option<i64> {
884        self.offsets.get(topic)?.get(&partition).copied()
885    }
886
887    /// Get all partition assignments for the group
888    pub fn all_assignments(&self) -> HashMap<PartitionAssignment, MemberId> {
889        let mut assignments = HashMap::new();
890        for (member_id, member) in &self.members {
891            for partition in &member.assignment {
892                assignments.insert(partition.clone(), member_id.clone());
893            }
894        }
895        assignments
896    }
897}
898
899/// Partition assignment strategies
900pub mod assignment {
901    use super::*;
902
903    /// Range assignment: Assign contiguous partition ranges
904    ///
905    /// Example: 3 consumers, 10 partitions
906    /// - Consumer 0: partitions 0-3
907    /// - Consumer 1: partitions 4-6
908    /// - Consumer 2: partitions 7-9
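    ///
    /// A small sketch of that split (the module path is an assumption):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use rivven_core::consumer_group::assignment::range_assignment;
    ///
    /// let members = vec!["c0".to_string(), "c1".to_string(), "c2".to_string()];
    /// let topics = HashMap::from([("orders".to_string(), 10u32)]);
    ///
    /// let plan = range_assignment(&members, &topics);
    /// // 10 partitions over 3 members: members earlier in the list absorb the remainder.
    /// assert_eq!(plan["c0"].len(), 4);
    /// assert_eq!(plan["c1"].len(), 3);
    /// assert_eq!(plan["c2"].len(), 3);
    /// ```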
909    pub fn range_assignment(
910        members: &[MemberId],
911        topic_partitions: &HashMap<String, u32>,
912    ) -> HashMap<MemberId, Vec<PartitionAssignment>> {
913        let mut assignments: HashMap<MemberId, Vec<PartitionAssignment>> = HashMap::new();
914
915        if members.is_empty() {
916            return assignments;
917        }
918
919        for (topic, partition_count) in topic_partitions {
920            let partitions_per_member = partition_count / members.len() as u32;
921            let extra_partitions = partition_count % members.len() as u32;
922
923            let mut current_partition = 0;
924
925            for (idx, member_id) in members.iter().enumerate() {
926                let mut member_partitions = partitions_per_member;
927                if (idx as u32) < extra_partitions {
928                    member_partitions += 1;
929                }
930
931                for _ in 0..member_partitions {
932                    assignments
933                        .entry(member_id.clone())
934                        .or_default()
935                        .push(PartitionAssignment {
936                            topic: topic.clone(),
937                            partition: current_partition,
938                        });
939                    current_partition += 1;
940                }
941            }
942        }
943
944        assignments
945    }
946
947    /// Round-robin assignment: Distribute partitions evenly
948    ///
949    /// Example: 3 consumers, 10 partitions
950    /// - Consumer 0: partitions 0, 3, 6, 9
951    /// - Consumer 1: partitions 1, 4, 7
952    /// - Consumer 2: partitions 2, 5, 8
953    pub fn round_robin_assignment(
954        members: &[MemberId],
955        topic_partitions: &HashMap<String, u32>,
956    ) -> HashMap<MemberId, Vec<PartitionAssignment>> {
957        let mut assignments: HashMap<MemberId, Vec<PartitionAssignment>> = HashMap::new();
958
959        if members.is_empty() {
960            return assignments;
961        }
962
963        let mut member_idx = 0;
964
965        for (topic, partition_count) in topic_partitions {
966            for partition in 0..*partition_count {
967                assignments
968                    .entry(members[member_idx].clone())
969                    .or_default()
970                    .push(PartitionAssignment {
971                        topic: topic.clone(),
972                        partition,
973                    });
974
975                member_idx = (member_idx + 1) % members.len();
976            }
977        }
978
979        assignments
980    }
981
982    /// Sticky assignment: Minimize partition movement during rebalance
983    ///
984    /// Preserves previous assignments as much as possible.
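    ///
    /// For example, preserving m1's previous partitions while newly available partitions are
    /// spread round-robin (identifiers and the module path are illustrative assumptions):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    /// use rivven_core::consumer_group::assignment::sticky_assignment;
    /// use rivven_core::consumer_group::PartitionAssignment;
    ///
    /// let part = |p: u32| PartitionAssignment { topic: "t".to_string(), partition: p };
    /// let members = vec!["m1".to_string(), "m2".to_string()];
    /// let topics = HashMap::from([("t".to_string(), 4u32)]);
    /// // m1 previously owned partitions 0 and 1; m2 is new and partitions 2 and 3 are unowned.
    /// let previous = HashMap::from([("m1".to_string(), vec![part(0), part(1)])]);
    ///
    /// let plan = sticky_assignment(&members, &topics, &previous);
    /// // m1 keeps what it had; only the unowned partitions are spread round-robin.
    /// assert!(plan["m1"].contains(&part(0)) && plan["m1"].contains(&part(1)));
    /// assert_eq!(plan.values().map(|v| v.len()).sum::<usize>(), 4);
    /// ```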
985    pub fn sticky_assignment(
986        members: &[MemberId],
987        topic_partitions: &HashMap<String, u32>,
988        previous_assignments: &HashMap<MemberId, Vec<PartitionAssignment>>,
989    ) -> HashMap<MemberId, Vec<PartitionAssignment>> {
990        let mut assignments: HashMap<MemberId, Vec<PartitionAssignment>> = HashMap::new();
991
992        if members.is_empty() {
993            return assignments;
994        }
995
996        // Collect all partitions
997        let mut all_partitions = Vec::new();
998        for (topic, partition_count) in topic_partitions {
999            for partition in 0..*partition_count {
1000                all_partitions.push(PartitionAssignment {
1001                    topic: topic.clone(),
1002                    partition,
1003                });
1004            }
1005        }
1006
1007        // Track which partitions are assigned
1008        let mut assigned: HashSet<PartitionAssignment> = HashSet::new();
1009
1010        // Step 1: Preserve previous assignments for existing members
1011        for member_id in members {
1012            if let Some(prev_partitions) = previous_assignments.get(member_id) {
1013                let valid_partitions: Vec<_> = prev_partitions
1014                    .iter()
1015                    .filter(|p| all_partitions.contains(p) && !assigned.contains(p))
1016                    .cloned()
1017                    .collect();
1018
1019                for partition in &valid_partitions {
1020                    assigned.insert(partition.clone());
1021                }
1022
1023                assignments.insert(member_id.clone(), valid_partitions);
1024            }
1025        }
1026
1027        // Step 2: Distribute unassigned partitions using round-robin
1028        let unassigned: Vec<_> = all_partitions
1029            .into_iter()
1030            .filter(|p| !assigned.contains(p))
1031            .collect();
1032
1033        let mut member_idx = 0;
1034        for partition in unassigned {
1035            assignments
1036                .entry(members[member_idx].clone())
1037                .or_default()
1038                .push(partition);
1039
1040            member_idx = (member_idx + 1) % members.len();
1041        }
1042
1043        assignments
1044    }
1045}
1046
1047#[cfg(test)]
1048mod tests {
1049    use super::*;
1050
1051    #[test]
1052    fn test_consumer_group_creation() {
1053        let group = ConsumerGroup::new(
1054            "test-group".to_string(),
1055            Duration::from_secs(30),
1056            Duration::from_secs(60),
1057        );
1058
1059        assert_eq!(group.group_id, "test-group");
1060        assert_eq!(group.state, GroupState::Empty);
1061        assert_eq!(group.generation_id, 0);
1062        assert!(group.leader_id.is_none());
1063        assert!(group.members.is_empty());
1064    }
1065
1066    #[test]
1067    fn test_add_first_member_becomes_leader() {
1068        let mut group = ConsumerGroup::new(
1069            "test-group".to_string(),
1070            Duration::from_secs(30),
1071            Duration::from_secs(60),
1072        );
1073
1074        group.add_member(
1075            "member-1".to_string(),
1076            "client-1".to_string(),
1077            vec!["topic-1".to_string()],
1078            vec![],
1079        );
1080
1081        assert_eq!(group.members.len(), 1);
1082        assert_eq!(group.leader_id, Some("member-1".to_string()));
1083        assert_eq!(group.state, GroupState::PreparingRebalance);
1084    }
1085
1086    #[test]
1087    fn test_remove_member_triggers_rebalance() {
1088        let mut group = ConsumerGroup::new(
1089            "test-group".to_string(),
1090            Duration::from_secs(30),
1091            Duration::from_secs(60),
1092        );
1093
1094        group.add_member(
1095            "member-1".to_string(),
1096            "client-1".to_string(),
1097            vec![],
1098            vec![],
1099        );
1100        group.add_member(
1101            "member-2".to_string(),
1102            "client-2".to_string(),
1103            vec![],
1104            vec![],
1105        );
1106
1107        group.state = GroupState::Stable;
1108
1109        group.remove_member(&"member-2".to_string());
1110
1111        assert_eq!(group.members.len(), 1);
1112        assert_eq!(group.state, GroupState::PreparingRebalance);
1113    }
1114
1115    #[test]
1116    fn test_remove_last_member_transitions_to_empty() {
1117        let mut group = ConsumerGroup::new(
1118            "test-group".to_string(),
1119            Duration::from_secs(30),
1120            Duration::from_secs(60),
1121        );
1122
1123        group.add_member(
1124            "member-1".to_string(),
1125            "client-1".to_string(),
1126            vec![],
1127            vec![],
1128        );
1129        group.remove_member(&"member-1".to_string());
1130
1131        assert_eq!(group.state, GroupState::Empty);
1132        assert_eq!(group.generation_id, 0);
1133        assert!(group.leader_id.is_none());
1134    }
1135
1136    #[test]
1137    fn test_offset_commit_and_fetch() {
1138        let mut group = ConsumerGroup::new(
1139            "test-group".to_string(),
1140            Duration::from_secs(30),
1141            Duration::from_secs(60),
1142        );
1143
1144        group.commit_offset("topic-1", 0, 100);
1145        group.commit_offset("topic-1", 1, 200);
1146        group.commit_offset("topic-2", 0, 300);
1147
1148        assert_eq!(group.fetch_offset("topic-1", 0), Some(100));
1149        assert_eq!(group.fetch_offset("topic-1", 1), Some(200));
1150        assert_eq!(group.fetch_offset("topic-2", 0), Some(300));
1151        assert_eq!(group.fetch_offset("topic-1", 2), None);
1152    }
1153
1154    #[test]
1155    fn test_range_assignment() {
1156        let members = vec!["m1".to_string(), "m2".to_string(), "m3".to_string()];
1157        let mut topic_partitions = HashMap::new();
1158        topic_partitions.insert("topic-1".to_string(), 10);
1159
1160        let assignments = assignment::range_assignment(&members, &topic_partitions);
1161
1162        // m1: 0-3 (4 partitions)
1163        // m2: 4-6 (3 partitions)
1164        // m3: 7-9 (3 partitions)
1165        assert_eq!(assignments.get("m1").unwrap().len(), 4);
1166        assert_eq!(assignments.get("m2").unwrap().len(), 3);
1167        assert_eq!(assignments.get("m3").unwrap().len(), 3);
1168
1169        // Verify m1 has partitions 0-3
1170        let m1_partitions: Vec<u32> = assignments
1171            .get("m1")
1172            .unwrap()
1173            .iter()
1174            .map(|p| p.partition)
1175            .collect();
1176        assert_eq!(m1_partitions, vec![0, 1, 2, 3]);
1177    }
1178
1179    #[test]
1180    fn test_round_robin_assignment() {
1181        let members = vec!["m1".to_string(), "m2".to_string(), "m3".to_string()];
1182        let mut topic_partitions = HashMap::new();
1183        topic_partitions.insert("topic-1".to_string(), 10);
1184
1185        let assignments = assignment::round_robin_assignment(&members, &topic_partitions);
1186
1187        // m1: 0, 3, 6, 9 (4 partitions)
1188        // m2: 1, 4, 7 (3 partitions)
1189        // m3: 2, 5, 8 (3 partitions)
1190        assert_eq!(assignments.get("m1").unwrap().len(), 4);
1191        assert_eq!(assignments.get("m2").unwrap().len(), 3);
1192        assert_eq!(assignments.get("m3").unwrap().len(), 3);
1193
1194        let m1_partitions: Vec<u32> = assignments
1195            .get("m1")
1196            .unwrap()
1197            .iter()
1198            .map(|p| p.partition)
1199            .collect();
1200        assert_eq!(m1_partitions, vec![0, 3, 6, 9]);
1201    }
1202
1203    #[test]
1204    fn test_sticky_assignment_preserves_assignments() {
1205        let members = vec!["m1".to_string(), "m2".to_string()];
1206        let mut topic_partitions = HashMap::new();
1207        topic_partitions.insert("topic-1".to_string(), 4);
1208
1209        // Previous assignment: m1=[0,1], m2=[2,3]
1210        let mut previous = HashMap::new();
1211        previous.insert(
1212            "m1".to_string(),
1213            vec![
1214                PartitionAssignment {
1215                    topic: "topic-1".to_string(),
1216                    partition: 0,
1217                },
1218                PartitionAssignment {
1219                    topic: "topic-1".to_string(),
1220                    partition: 1,
1221                },
1222            ],
1223        );
1224        previous.insert(
1225            "m2".to_string(),
1226            vec![
1227                PartitionAssignment {
1228                    topic: "topic-1".to_string(),
1229                    partition: 2,
1230                },
1231                PartitionAssignment {
1232                    topic: "topic-1".to_string(),
1233                    partition: 3,
1234                },
1235            ],
1236        );
1237
1238        let assignments = assignment::sticky_assignment(&members, &topic_partitions, &previous);
1239
1240        // Should preserve previous assignments
1241        assert_eq!(assignments.get("m1").unwrap().len(), 2);
1242        assert_eq!(assignments.get("m2").unwrap().len(), 2);
1243
1244        let m1_partitions: HashSet<u32> = assignments
1245            .get("m1")
1246            .unwrap()
1247            .iter()
1248            .map(|p| p.partition)
1249            .collect();
1250        assert!(m1_partitions.contains(&0));
1251        assert!(m1_partitions.contains(&1));
1252    }
1253
1254    #[test]
1255    fn test_sticky_assignment_redistributes_on_new_member() {
1256        let members = vec!["m1".to_string(), "m2".to_string(), "m3".to_string()];
1257        let mut topic_partitions = HashMap::new();
1258        topic_partitions.insert("topic-1".to_string(), 6);
1259
1260        // Previous: m1=[0,1,2], m2=[3,4,5], m3 is new
1261        let mut previous = HashMap::new();
1262        previous.insert(
1263            "m1".to_string(),
1264            vec![
1265                PartitionAssignment {
1266                    topic: "topic-1".to_string(),
1267                    partition: 0,
1268                },
1269                PartitionAssignment {
1270                    topic: "topic-1".to_string(),
1271                    partition: 1,
1272                },
1273                PartitionAssignment {
1274                    topic: "topic-1".to_string(),
1275                    partition: 2,
1276                },
1277            ],
1278        );
1279        previous.insert(
1280            "m2".to_string(),
1281            vec![
1282                PartitionAssignment {
1283                    topic: "topic-1".to_string(),
1284                    partition: 3,
1285                },
1286                PartitionAssignment {
1287                    topic: "topic-1".to_string(),
1288                    partition: 4,
1289                },
1290                PartitionAssignment {
1291                    topic: "topic-1".to_string(),
1292                    partition: 5,
1293                },
1294            ],
1295        );
1296
1297        let assignments = assignment::sticky_assignment(&members, &topic_partitions, &previous);
1298
        // Sticky assignment preserves previous ownership, so m1 keeps 0,1,2 and m2 keeps 3,4,5;
        // the new member m3 receives nothing because no partitions are left unassigned.
        // Total: 6 partitions across 3 members
1302        let total_assigned: usize = assignments.values().map(|v| v.len()).sum();
1303        assert_eq!(total_assigned, 6, "All 6 partitions should be assigned");
1304
1305        // m1 and m2 should keep some of their previous assignments
1306        let m1_partitions: HashSet<u32> = assignments
1307            .get("m1")
1308            .unwrap()
1309            .iter()
1310            .map(|p| p.partition)
1311            .collect();
1312        let m2_partitions: HashSet<u32> = assignments
1313            .get("m2")
1314            .unwrap()
1315            .iter()
1316            .map(|p| p.partition)
1317            .collect();
1318
1319        // Should have some overlap with previous
1320        let m1_kept = m1_partitions.iter().filter(|p| **p <= 2).count();
1321        let m2_kept = m2_partitions.iter().filter(|p| **p >= 3).count();
1322
1323        assert!(
1324            m1_kept > 0 || m2_kept > 0,
1325            "Sticky assignment should preserve some assignments"
1326        );
1327    }
1328
1329    // =========================================================================
1330    // Static Membership (KIP-345) Tests
1331    // =========================================================================
1332
1333    #[test]
1334    fn test_static_member_add() {
1335        let mut group = ConsumerGroup::new(
1336            "test-group".to_string(),
1337            Duration::from_secs(30),
1338            Duration::from_secs(60),
1339        );
1340
1341        // Add static member with instance ID
1342        group.add_member_with_instance_id(
1343            "member-1".to_string(),
1344            Some("instance-1".to_string()),
1345            "client-1".to_string(),
1346            vec!["topic-1".to_string()],
1347            vec![],
1348        );
1349
1350        assert_eq!(group.members.len(), 1);
1351        assert!(group.has_static_member(&"instance-1".to_string()));
1352        assert_eq!(
1353            group.get_member_for_instance(&"instance-1".to_string()),
1354            Some(&"member-1".to_string())
1355        );
1356
1357        let member = group.members.get("member-1").unwrap();
1358        assert!(member.is_static);
1359        assert_eq!(member.group_instance_id, Some("instance-1".to_string()));
1360    }
1361
1362    #[test]
1363    fn test_static_member_rejoin_no_rebalance() {
1364        let mut group = ConsumerGroup::new(
1365            "test-group".to_string(),
1366            Duration::from_secs(30),
1367            Duration::from_secs(60),
1368        );
1369
1370        // Add two members
1371        group.add_member_with_instance_id(
1372            "member-1".to_string(),
1373            Some("instance-1".to_string()),
1374            "client-1".to_string(),
1375            vec!["topic-1".to_string()],
1376            vec![],
1377        );
1378        group.add_member(
1379            "member-2".to_string(),
1380            "client-2".to_string(),
1381            vec!["topic-1".to_string()],
1382            vec![],
1383        );
1384
1385        // Complete rebalance with assignments
1386        let mut assignments = HashMap::new();
1387        assignments.insert(
1388            "member-1".to_string(),
1389            vec![PartitionAssignment {
1390                topic: "topic-1".to_string(),
1391                partition: 0,
1392            }],
1393        );
1394        assignments.insert(
1395            "member-2".to_string(),
1396            vec![PartitionAssignment {
1397                topic: "topic-1".to_string(),
1398                partition: 1,
1399            }],
1400        );
1401        group.complete_rebalance(assignments);
1402        assert_eq!(group.state, GroupState::Stable);
1403        let gen_before = group.generation_id;
1404
1405        // Remove static member (simulates disconnect)
1406        group.remove_member(&"member-1".to_string());
1407
1408        // Assignment should be saved for restoration
1409        assert!(group.pending_static_members.contains_key("instance-1"));
1410
1411        // Static member rejoins with same instance ID but new member ID
1412        group.add_member_with_instance_id(
1413            "member-1-new".to_string(),
1414            Some("instance-1".to_string()),
1415            "client-1".to_string(),
1416            vec!["topic-1".to_string()],
1417            vec![],
1418        );
1419
1420        // Should restore assignment without rebalance
1421        let member = group.members.get("member-1-new").unwrap();
1422        assert_eq!(member.assignment.len(), 1);
1423        assert_eq!(member.assignment[0].partition, 0);
1424
1425        // Generation should not have changed (no rebalance)
1426        assert_eq!(group.generation_id, gen_before);
1427
1428        // Pending assignment should be cleared
1429        assert!(!group.pending_static_members.contains_key("instance-1"));
1430    }
1431
1432    #[test]
1433    fn test_static_member_fencing() {
1434        let mut group = ConsumerGroup::new(
1435            "test-group".to_string(),
1436            Duration::from_secs(30),
1437            Duration::from_secs(60),
1438        );
1439
1440        // Add static member
1441        group.add_member_with_instance_id(
1442            "member-1".to_string(),
1443            Some("instance-1".to_string()),
1444            "client-1".to_string(),
1445            vec!["topic-1".to_string()],
1446            vec![],
1447        );
1448
1449        assert!(group.members.contains_key("member-1"));
1450
1451        // New member joins with same instance ID (fencing)
1452        group.add_member_with_instance_id(
1453            "member-1-new".to_string(),
1454            Some("instance-1".to_string()),
1455            "client-1".to_string(),
1456            vec!["topic-1".to_string()],
1457            vec![],
1458        );
1459
1460        // Old member should be fenced (removed)
1461        assert!(!group.members.contains_key("member-1"));
1462        // New member should be active
1463        assert!(group.members.contains_key("member-1-new"));
1464        // Instance mapping should point to new member
1465        assert_eq!(
1466            group.get_member_for_instance(&"instance-1".to_string()),
1467            Some(&"member-1-new".to_string())
1468        );
1469    }
1470
1471    #[test]
1472    fn test_dynamic_member_removal_triggers_rebalance() {
1473        let mut group = ConsumerGroup::new(
1474            "test-group".to_string(),
1475            Duration::from_secs(30),
1476            Duration::from_secs(60),
1477        );
1478
1479        // Add one static and one dynamic member
1480        group.add_member_with_instance_id(
1481            "static-member".to_string(),
1482            Some("instance-1".to_string()),
1483            "client-1".to_string(),
1484            vec!["topic-1".to_string()],
1485            vec![],
1486        );
1487        group.add_member(
1488            "dynamic-member".to_string(),
1489            "client-2".to_string(),
1490            vec!["topic-1".to_string()],
1491            vec![],
1492        );
1493
1494        group.state = GroupState::Stable;
1495
1496        // Remove dynamic member
1497        group.remove_member(&"dynamic-member".to_string());
1498
1499        // Should trigger rebalance for dynamic member removal
1500        assert_eq!(group.state, GroupState::PreparingRebalance);
1501    }

    #[test]
    fn test_static_member_timeout_triggers_rebalance() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_millis(10), // Short timeout for testing
            Duration::from_secs(60),
        );

        // Add static member
        group.add_member_with_instance_id(
            "member-1".to_string(),
            Some("instance-1".to_string()),
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );
        group.add_member(
            "member-2".to_string(),
            "client-2".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );

        group.state = GroupState::Stable;

        // Simulate timeout by removing static member permanently
        group.remove_static_member(&"instance-1".to_string());

        // Should trigger rebalance
        assert_eq!(group.state, GroupState::PreparingRebalance);
        // Static member mapping should be cleared
        assert!(!group.has_static_member(&"instance-1".to_string()));
    }

    #[test]
    fn test_mixed_static_and_dynamic_members() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        // Add mix of static and dynamic members
        group.add_member_with_instance_id(
            "static-1".to_string(),
            Some("instance-1".to_string()),
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );
        group.add_member_with_instance_id(
            "static-2".to_string(),
            Some("instance-2".to_string()),
            "client-2".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );
        group.add_member(
            "dynamic-1".to_string(),
            "client-3".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );

        assert_eq!(group.members.len(), 3);
        assert_eq!(group.static_members.len(), 2);

        // Static members should be identified correctly
        let static1 = group.members.get("static-1").unwrap();
        let static2 = group.members.get("static-2").unwrap();
        let dynamic1 = group.members.get("dynamic-1").unwrap();

        assert!(static1.is_static);
        assert!(static2.is_static);
        assert!(!dynamic1.is_static);
    }

    #[test]
    fn test_all_members_leave_clears_static_mappings() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        group.add_member_with_instance_id(
            "member-1".to_string(),
            Some("instance-1".to_string()),
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );

        group.remove_member(&"member-1".to_string());

        // The member has left, but its pending assignment is preserved for a static rejoin.
        // Remove the static mapping as well to verify full cleanup.
        group.remove_static_member(&"instance-1".to_string());

        // Group should be empty and all mappings cleared
        assert_eq!(group.state, GroupState::Empty);
        assert!(group.static_members.is_empty());
        assert!(group.pending_static_members.is_empty());
    }

    // =========================================================================
    // Cooperative Rebalancing (KIP-429) Tests
    // =========================================================================
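    //
    // Rough shape of the cooperative flow exercised by the tests below (an
    // informal sketch read off the assertions, not a protocol specification):
    //   1. rebalance_with_strategy(new) returns RebalanceResult::AwaitingRevocations
    //      when some partitions must change owner;
    //   2. acknowledge_revocation(member) is called for each member with pending
    //      revocations, stripping the revoked partitions from that member;
    //   3. complete_cooperative_rebalance(new) applies the final assignment,
    //      bumps the generation, and returns the group to Stable.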

    #[test]
    fn test_rebalance_protocol_selection_all_eager() {
        let protocols = vec![RebalanceProtocol::Eager, RebalanceProtocol::Eager];
        assert_eq!(
            RebalanceProtocol::select_common(&protocols),
            RebalanceProtocol::Eager
        );
    }

    #[test]
    fn test_rebalance_protocol_selection_all_cooperative() {
        let protocols = vec![
            RebalanceProtocol::Cooperative,
            RebalanceProtocol::Cooperative,
        ];
        assert_eq!(
            RebalanceProtocol::select_common(&protocols),
            RebalanceProtocol::Cooperative
        );
    }

    #[test]
    fn test_rebalance_protocol_selection_mixed() {
        // Mixed protocols should fall back to eager
        let protocols = vec![RebalanceProtocol::Cooperative, RebalanceProtocol::Eager];
        assert_eq!(
            RebalanceProtocol::select_common(&protocols),
            RebalanceProtocol::Eager
        );
    }
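
    // Additional sketch (not part of the original suite): with several members
    // where only one lacks cooperative support, the same downgrade to eager is
    // expected, since the selection considers every advertised protocol.
    #[test]
    fn test_rebalance_protocol_selection_single_legacy_member() {
        let protocols = vec![
            RebalanceProtocol::Cooperative,
            RebalanceProtocol::Cooperative,
            RebalanceProtocol::Eager, // one legacy member is enough to downgrade
        ];
        assert_eq!(
            RebalanceProtocol::select_common(&protocols),
            RebalanceProtocol::Eager
        );
    }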

    #[test]
    fn test_cooperative_member_add_updates_protocol() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        // Add member with cooperative support
        group.add_member_full(
            "member-1".to_string(),
            None,
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Cooperative],
        );

        // Single cooperative member means cooperative protocol
        assert!(group.is_cooperative());

        // Add member with only eager support
        group.add_member_full(
            "member-2".to_string(),
            None,
            "client-2".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Eager],
        );

        // Mixed members means eager protocol
        assert!(!group.is_cooperative());
        assert_eq!(group.rebalance_protocol, RebalanceProtocol::Eager);
    }

    #[test]
    fn test_compute_revocations() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        // Add two cooperative members
        group.add_member_full(
            "member-1".to_string(),
            None,
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Cooperative],
        );
        group.add_member_full(
            "member-2".to_string(),
            None,
            "client-2".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Cooperative],
        );

        // Assign partitions
        let mut initial = HashMap::new();
        initial.insert(
            "member-1".to_string(),
            vec![
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 0,
                },
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 1,
                },
            ],
        );
        initial.insert(
            "member-2".to_string(),
            vec![
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 2,
                },
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 3,
                },
            ],
        );
        group.complete_rebalance(initial);

        // New assignment moves partition 1 from member-1 to member-2
        let mut new_assignment = HashMap::new();
        new_assignment.insert(
            "member-1".to_string(),
            vec![PartitionAssignment {
                topic: "topic-1".to_string(),
                partition: 0,
            }],
        );
        new_assignment.insert(
            "member-2".to_string(),
            vec![
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 1,
                },
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 2,
                },
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 3,
                },
            ],
        );

        let revocations = group.compute_revocations(&new_assignment);

        // Only member-1 should have revocations (partition 1)
        assert!(revocations.contains_key("member-1"));
        assert!(!revocations.contains_key("member-2"));

        let m1_revoked = revocations.get("member-1").unwrap();
        assert_eq!(m1_revoked.len(), 1);
        assert_eq!(m1_revoked[0].partition, 1);
    }
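    // Reading of the test above: compute_revocations reports, per member, only
    // the partitions it currently owns that are absent from the proposed
    // assignment; members that merely gain partitions (member-2 here) do not
    // appear in the map at all.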

    #[test]
    fn test_cooperative_rebalance_two_phase() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        // Add two cooperative members
        group.add_member_full(
            "member-1".to_string(),
            None,
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Cooperative],
        );
        group.add_member_full(
            "member-2".to_string(),
            None,
            "client-2".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Cooperative],
        );

        assert!(group.is_cooperative());

        // Initial assignment
        let mut initial = HashMap::new();
        initial.insert(
            "member-1".to_string(),
            vec![
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 0,
                },
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 1,
                },
            ],
        );
        initial.insert("member-2".to_string(), vec![]);
        group.complete_rebalance(initial);

        let gen_before = group.generation_id;

        // Reassign so that member-2 (currently unassigned) picks up partition 1
        let mut new_assignment = HashMap::new();
        new_assignment.insert(
            "member-1".to_string(),
            vec![PartitionAssignment {
                topic: "topic-1".to_string(),
                partition: 0,
            }],
        );
        new_assignment.insert(
            "member-2".to_string(),
            vec![PartitionAssignment {
                topic: "topic-1".to_string(),
                partition: 1,
            }],
        );

        // Phase 1: Request revocations
        let result = group.rebalance_with_strategy(new_assignment.clone());

        match result {
            RebalanceResult::AwaitingRevocations {
                revocations,
                pending_assignments: _,
            } => {
                // Should have revocation for member-1
                assert!(revocations.contains_key("member-1"));
                assert!(group.has_pending_revocations());
                assert_eq!(group.state, GroupState::CompletingRebalance);
            }
            RebalanceResult::Complete => panic!("Expected AwaitingRevocations"),
        }

        // Member-1 still has both partitions (hasn't acked revocation yet)
        let m1 = group.members.get("member-1").unwrap();
        assert_eq!(m1.assignment.len(), 2);
        assert_eq!(m1.pending_revocation.len(), 1);

        // Phase 2: Acknowledge revocation
        let all_acked = group.acknowledge_revocation(&"member-1".to_string());
        assert!(all_acked);

        // After ack, member-1 should only have partition 0
        let m1 = group.members.get("member-1").unwrap();
        assert_eq!(m1.assignment.len(), 1);
        assert_eq!(m1.assignment[0].partition, 0);

        // Complete with final assignments
        group.complete_cooperative_rebalance(new_assignment);

        // Member-2 should now have partition 1
        let m2 = group.members.get("member-2").unwrap();
        assert_eq!(m2.assignment.len(), 1);
        assert_eq!(m2.assignment[0].partition, 1);

        // Generation should have incremented
        assert_eq!(group.generation_id, gen_before + 1);
        assert_eq!(group.state, GroupState::Stable);
    }
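    // Behaviour pinned down by the two-phase test above: revoked partitions stay
    // with their current owner until that member acknowledges the revocation,
    // only complete_cooperative_rebalance() hands them to the new owner and bumps
    // the generation, and partitions that do not move are never revoked at all
    // (see test_compute_revocations and test_cooperative_no_revocations_needed).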

    #[test]
    fn test_eager_rebalance_immediate() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        // Add two eager members
        group.add_member(
            "member-1".to_string(),
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );
        group.add_member(
            "member-2".to_string(),
            "client-2".to_string(),
            vec!["topic-1".to_string()],
            vec![],
        );

        assert!(!group.is_cooperative());

        // Eager rebalance should complete immediately
        let mut new_assignment = HashMap::new();
        new_assignment.insert(
            "member-1".to_string(),
            vec![PartitionAssignment {
                topic: "topic-1".to_string(),
                partition: 0,
            }],
        );
        new_assignment.insert(
            "member-2".to_string(),
            vec![PartitionAssignment {
                topic: "topic-1".to_string(),
                partition: 1,
            }],
        );

        let result = group.rebalance_with_strategy(new_assignment);

        assert_eq!(result, RebalanceResult::Complete);
        assert_eq!(group.state, GroupState::Stable);
        assert!(!group.has_pending_revocations());
    }
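    // Contrast with the cooperative path: on the eager protocol,
    // rebalance_with_strategy returns RebalanceResult::Complete synchronously,
    // so there is no revocation phase to acknowledge.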

    #[test]
    fn test_cooperative_no_revocations_needed() {
        let mut group = ConsumerGroup::new(
            "test-group".to_string(),
            Duration::from_secs(30),
            Duration::from_secs(60),
        );

        // Add cooperative member
        group.add_member_full(
            "member-1".to_string(),
            None,
            "client-1".to_string(),
            vec!["topic-1".to_string()],
            vec![],
            vec![RebalanceProtocol::Cooperative],
        );

        // Assign partition 0
        let mut initial = HashMap::new();
        initial.insert(
            "member-1".to_string(),
            vec![PartitionAssignment {
                topic: "topic-1".to_string(),
                partition: 0,
            }],
        );
        group.complete_rebalance(initial);

        // Add partition 1 (no revocation needed, just addition)
        let mut new_assignment = HashMap::new();
        new_assignment.insert(
            "member-1".to_string(),
            vec![
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 0,
                },
                PartitionAssignment {
                    topic: "topic-1".to_string(),
                    partition: 1,
                },
            ],
        );

        // Should complete immediately since no partitions need revocation
        let result = group.rebalance_with_strategy(new_assignment);

        assert_eq!(result, RebalanceResult::Complete);
        assert_eq!(group.state, GroupState::Stable);

        let m1 = group.members.get("member-1").unwrap();
        assert_eq!(m1.assignment.len(), 2);
    }
}