pub mod coordinator;
#[cfg(test)]
mod tests;
use crate::protocol::{PartitionId, TopicName};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::SystemTime;
pub use coordinator::{ConsumerGroupCoordinator, ConsumerGroupManager};
pub type ConsumerGroupId = String;
pub type ConsumerId = String;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ConsumerGroupState {
PreparingRebalance,
CompletingRebalance,
Stable,
Empty,
Dead,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerGroupMember {
pub consumer_id: ConsumerId,
pub group_id: ConsumerGroupId,
pub client_id: String,
pub client_host: String,
pub session_timeout_ms: u64,
pub rebalance_timeout_ms: u64,
pub subscribed_topics: Vec<TopicName>,
pub assigned_partitions: Vec<TopicPartition>,
pub last_heartbeat: SystemTime,
pub is_leader: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct TopicPartition {
pub topic: TopicName,
pub partition: PartitionId,
}
impl TopicPartition {
pub fn new(topic: impl Into<String>, partition: PartitionId) -> Self {
Self {
topic: topic.into(),
partition,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerGroupMetadata {
pub group_id: ConsumerGroupId,
pub state: ConsumerGroupState,
pub protocol_type: String,
pub protocol_name: String,
pub leader_id: Option<ConsumerId>,
pub members: HashMap<ConsumerId, ConsumerGroupMember>,
pub generation_id: i32,
pub created_at: SystemTime,
pub state_timestamp: SystemTime,
}
#[derive(Debug, Clone, PartialEq)]
pub enum AssignmentStrategy {
RoundRobin,
Range,
Sticky,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerOffset {
pub group_id: ConsumerGroupId,
pub topic: TopicName,
pub partition: PartitionId,
pub offset: i64,
pub metadata: Option<String>,
pub commit_timestamp: SystemTime,
pub expire_timestamp: Option<SystemTime>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OffsetCommitRequest {
pub group_id: ConsumerGroupId,
pub consumer_id: ConsumerId,
pub generation_id: i32,
pub retention_time_ms: i64,
pub offsets: Vec<TopicPartitionOffset>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OffsetFetchRequest {
pub group_id: ConsumerGroupId,
pub topic_partitions: Option<Vec<TopicPartition>>, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicPartitionOffset {
pub topic: TopicName,
pub partition: PartitionId,
pub offset: i64,
pub metadata: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsumerGroupMessage {
JoinGroup {
group_id: ConsumerGroupId,
consumer_id: ConsumerId,
client_id: String,
client_host: String,
session_timeout_ms: u64,
rebalance_timeout_ms: u64,
protocol_type: String,
group_protocols: Vec<GroupProtocol>,
},
JoinGroupResponse {
error_code: i16,
generation_id: i32,
group_protocol: String,
leader_id: ConsumerId,
consumer_id: ConsumerId,
members: Vec<ConsumerGroupMember>,
},
SyncGroup {
group_id: ConsumerGroupId,
consumer_id: ConsumerId,
generation_id: i32,
group_assignments: HashMap<ConsumerId, Vec<TopicPartition>>,
},
SyncGroupResponse {
error_code: i16,
assignment: Vec<TopicPartition>,
},
Heartbeat {
group_id: ConsumerGroupId,
consumer_id: ConsumerId,
generation_id: i32,
},
HeartbeatResponse { error_code: i16 },
LeaveGroup {
group_id: ConsumerGroupId,
consumer_id: ConsumerId,
},
LeaveGroupResponse { error_code: i16 },
ListGroups,
ListGroupsResponse {
error_code: i16,
groups: Vec<GroupOverview>,
},
DescribeGroups { group_ids: Vec<ConsumerGroupId> },
DescribeGroupsResponse {
groups: Vec<ConsumerGroupDescription>,
},
OffsetCommit {
group_id: ConsumerGroupId,
consumer_id: ConsumerId,
generation_id: i32,
retention_time_ms: i64,
offsets: Vec<TopicPartitionOffset>,
},
OffsetCommitResponse {
error_code: i16,
topic_partition_errors: Vec<TopicPartitionError>,
},
OffsetFetch {
group_id: ConsumerGroupId,
topic_partitions: Option<Vec<TopicPartition>>,
},
OffsetFetchResponse {
error_code: i16,
offsets: Vec<TopicPartitionOffsetResult>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupProtocol {
pub name: String,
pub metadata: Vec<u8>, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupOverview {
pub group_id: ConsumerGroupId,
pub protocol_type: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerGroupDescription {
pub error_code: i16,
pub group_id: ConsumerGroupId,
pub state: ConsumerGroupState,
pub protocol_type: String,
pub protocol_data: String,
pub members: Vec<MemberDescription>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemberDescription {
pub consumer_id: ConsumerId,
pub client_id: String,
pub client_host: String,
pub member_metadata: Vec<u8>,
pub member_assignment: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicPartitionError {
pub topic: TopicName,
pub partition: PartitionId,
pub error_code: i16,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicPartitionOffsetResult {
pub topic: TopicName,
pub partition: PartitionId,
pub offset: i64,
pub leader_epoch: i32,
pub metadata: Option<String>,
pub error_code: i16,
}
#[derive(Debug, Clone)]
pub struct GroupStats {
pub group_id: ConsumerGroupId,
pub state: ConsumerGroupState,
pub member_count: usize,
pub generation_id: i32,
pub total_assigned_partitions: usize,
pub leader_id: Option<ConsumerId>,
pub assignment_strategy: AssignmentStrategy,
pub created_at: SystemTime,
pub last_state_change: SystemTime,
}
#[derive(Debug, Clone)]
pub struct ConsumerGroupConfig {
pub default_session_timeout_ms: u64,
pub default_rebalance_timeout_ms: u64,
pub min_session_timeout_ms: u64,
pub max_session_timeout_ms: u64,
pub consumer_expiration_check_interval_ms: u64,
pub default_assignment_strategy: AssignmentStrategy,
pub group_metadata_retention_ms: u64,
}
impl Default for ConsumerGroupConfig {
fn default() -> Self {
Self {
default_session_timeout_ms: 10000, default_rebalance_timeout_ms: 60000, min_session_timeout_ms: 6000, max_session_timeout_ms: 1800000, consumer_expiration_check_interval_ms: 3000, default_assignment_strategy: AssignmentStrategy::RoundRobin,
group_metadata_retention_ms: 86400000, }
}
}
pub struct PartitionAssignor {
strategy: AssignmentStrategy,
}
impl PartitionAssignor {
pub fn new(strategy: AssignmentStrategy) -> Self {
Self { strategy }
}
pub fn assign(
&self,
consumers: &[ConsumerId],
partitions: &[TopicPartition],
) -> HashMap<ConsumerId, Vec<TopicPartition>> {
if consumers.is_empty() || partitions.is_empty() {
return HashMap::new();
}
match self.strategy {
AssignmentStrategy::RoundRobin => self.round_robin_assign(consumers, partitions),
AssignmentStrategy::Range => self.range_assign(consumers, partitions),
AssignmentStrategy::Sticky => self.sticky_assign(consumers, partitions),
}
}
fn round_robin_assign(
&self,
consumers: &[ConsumerId],
partitions: &[TopicPartition],
) -> HashMap<ConsumerId, Vec<TopicPartition>> {
let mut assignments = HashMap::new();
for consumer in consumers {
assignments.insert(consumer.clone(), Vec::new());
}
for (index, partition) in partitions.iter().enumerate() {
let consumer_index = index % consumers.len();
let consumer = &consumers[consumer_index];
assignments
.get_mut(consumer)
.unwrap()
.push(partition.clone());
}
assignments
}
fn range_assign(
&self,
consumers: &[ConsumerId],
partitions: &[TopicPartition],
) -> HashMap<ConsumerId, Vec<TopicPartition>> {
let mut assignments = HashMap::new();
let mut topic_partitions: HashMap<String, Vec<TopicPartition>> = HashMap::new();
for partition in partitions {
topic_partitions
.entry(partition.topic.clone())
.or_insert_with(Vec::new)
.push(partition.clone());
}
for consumer in consumers {
assignments.insert(consumer.clone(), Vec::new());
}
for (_topic, mut topic_parts) in topic_partitions {
topic_parts.sort_by_key(|tp| tp.partition);
let partitions_per_consumer = topic_parts.len() / consumers.len();
let remainder = topic_parts.len() % consumers.len();
let mut partition_index = 0;
for (consumer_index, consumer) in consumers.iter().enumerate() {
let extra = if consumer_index < remainder { 1 } else { 0 };
let count = partitions_per_consumer + extra;
for _ in 0..count {
if partition_index < topic_parts.len() {
assignments
.get_mut(consumer)
.unwrap()
.push(topic_parts[partition_index].clone());
partition_index += 1;
}
}
}
}
assignments
}
fn sticky_assign(
&self,
consumers: &[ConsumerId],
partitions: &[TopicPartition],
) -> HashMap<ConsumerId, Vec<TopicPartition>> {
let mut assignments: HashMap<ConsumerId, Vec<TopicPartition>> = HashMap::new();
for consumer in consumers {
assignments.insert(consumer.clone(), Vec::new());
}
if partitions.is_empty() {
return assignments;
}
let mut topic_partitions: HashMap<String, Vec<TopicPartition>> = HashMap::new();
for partition in partitions {
topic_partitions
.entry(partition.topic.clone())
.or_insert_with(Vec::new)
.push(partition.clone());
}
for (_topic, mut topic_parts) in topic_partitions {
topic_parts.sort_by_key(|tp| tp.partition);
for partition in topic_parts {
let target_consumer = consumers
.iter()
.min_by_key(|consumer| assignments.get(*consumer).unwrap().len())
.unwrap();
assignments
.get_mut(target_consumer)
.unwrap()
.push(partition);
}
}
assignments
}
}
pub mod error_codes {
pub const NONE: i16 = 0;
pub const UNKNOWN_CONSUMER_ID: i16 = 25;
pub const CONSUMER_COORDINATOR_NOT_AVAILABLE: i16 = 15;
pub const NOT_COORDINATOR: i16 = 16;
pub const ILLEGAL_GENERATION: i16 = 22;
pub const INCONSISTENT_GROUP_PROTOCOL: i16 = 23;
pub const INVALID_GROUP_ID: i16 = 24;
pub const UNKNOWN_GROUP_ID: i16 = 25;
pub const INVALID_SESSION_TIMEOUT: i16 = 26;
pub const REBALANCE_IN_PROGRESS: i16 = 27;
pub const INVALID_COMMIT_OFFSET_SIZE: i16 = 28;
pub const TOPIC_AUTHORIZATION_FAILED: i16 = 29;
pub const GROUP_AUTHORIZATION_FAILED: i16 = 30;
}