mod acknowledgements;
mod commit;
mod coordinator;
mod fetch;
mod types;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use tracing::{debug, instrument};
use uuid::Uuid;
use crate::Result;
use crate::config::ConsumerConfig;
use crate::metadata::MetadataCache;
use crate::network::BrokerConnection;
use crate::types::{TopicPartition, TopicPartitionKey};
use self::acknowledgements::Acknowledgements;
pub use self::types::{
AcknowledgeType, AcknowledgementCommitCallback, ShareAcknowledgementCommit, ShareAcquireMode,
ShareConsumerOptions, ShareRecord, ShareRecords,
};
use self::types::{ShareAssignment, TopicIdPartitionKey};
/// Initial value of `member_epoch` for a newly constructed consumer.
const SHARE_MEMBER_JOIN_EPOCH: i32 = 0;
/// NOTE(review): not referenced in this part of the file; presumably the member
/// epoch sent when the consumer leaves the share group — confirm in the
/// coordinator module.
const SHARE_MEMBER_LEAVE_EPOCH: i32 = -1;
/// NOTE(review): not referenced in this part of the file; presumably the epoch
/// that opens a share session with a partition leader — confirm in the fetch
/// module.
const SHARE_SESSION_OPEN_EPOCH: i32 = 0;
/// NOTE(review): not referenced in this part of the file; presumably the epoch
/// that closes a share session with a partition leader — confirm in the
/// fetch/commit modules.
const SHARE_SESSION_CLOSE_EPOCH: i32 = -1;
/// NOTE(review): not referenced in this part of the file; by its name, an upper
/// bound on coordinator lookup retries — confirm in the coordinator module.
const SHARE_COORDINATOR_RETRY_ATTEMPTS: usize = 120;
/// Consumer that reads records through a Kafka share group.
///
/// Instances are created with [`KafkaShareConsumer::connect`] or
/// [`KafkaShareConsumer::connect_with_options`]. Records returned by
/// [`KafkaShareConsumer::poll`] are acknowledged with
/// [`KafkaShareConsumer::acknowledge`] and the acknowledgements are sent with
/// [`KafkaShareConsumer::commit_sync`].
pub struct KafkaShareConsumer {
// Configuration supplied by the caller at construction time.
config: ConsumerConfig,
// Metadata cache; consulted here for topic ids and partition leaders.
metadata: MetadataCache,
// Connection to the group coordinator; `None` until one is established.
coordinator: Option<BrokerConnection>,
// Broker connections keyed by an `i32` — presumably the leader broker id; confirm in the fetch module.
leader_connections: HashMap<i32, BrokerConnection>,
// Topic names most recently passed to `subscribe`.
subscriptions: Vec<String>,
// Current partition assignments; reported (sorted) by `assignment`.
assignments: HashMap<TopicPartitionKey, ShareAssignment>,
// Member id; a random UUID v4 string generated when the consumer is built.
member_id: String,
// Current member epoch; starts at `SHARE_MEMBER_JOIN_EPOCH`.
member_epoch: i32,
// Heartbeat interval; initialised to 5 seconds at construction.
heartbeat_interval: Duration,
// Share session epochs; the keys are used as leader ids during `shutdown`.
share_session_epochs: HashMap<i32, i32>,
// Acknowledgements recorded by `acknowledge` and drained by `commit_sync`.
pending_acks: HashMap<TopicIdPartitionKey, Acknowledgements>,
// Selects the fetch strategy used by `poll_for`.
share_acquire_mode: ShareAcquireMode,
// Value taken from the options, clamped to at least 1 at construction.
max_poll_records: i32,
// Round-robin cursor choosing which leader `poll_for` fetches from in `RecordLimit` mode.
next_record_limit_leader_index: usize,
// Optional callback for acknowledgement commits; where it is invoked is not visible in this part of the file.
acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
}
impl KafkaShareConsumer {
/// Connects a share consumer using the default [`ShareConsumerOptions`].
///
/// This is a thin wrapper around [`Self::connect_with_options`]; any error
/// returned by that function is passed through unchanged.
#[instrument(
    name = "share_consumer.connect",
    level = "debug",
    skip(config),
    fields(
        bootstrap_server_count = config.bootstrap_servers.len(),
        client_id = %config.client_id,
        group_id = %config.group_id
    )
)]
pub async fn connect(config: ConsumerConfig) -> Result<Self> {
    let default_options = ShareConsumerOptions::default();
    Self::connect_with_options(config, default_options).await
}
/// Builds a share consumer from `config` and `options`, then locates the
/// group coordinator.
///
/// The new consumer starts with a random UUID v4 member id, the join member
/// epoch, a 5 second heartbeat interval, and no subscriptions or assignments.
/// `options.max_poll_records` is raised to `1` if it is smaller.
///
/// # Errors
///
/// Returns the error produced by `ensure_coordinator_with_retries`.
pub async fn connect_with_options(
    config: ConsumerConfig,
    options: ShareConsumerOptions,
) -> Result<Self> {
    // A record limit below one is clamped up to one.
    let max_poll_records = options.max_poll_records.max(1);

    let mut this = Self {
        config,
        metadata: MetadataCache::default(),
        coordinator: None,
        leader_connections: HashMap::new(),
        subscriptions: Vec::new(),
        assignments: HashMap::new(),
        member_id: Uuid::new_v4().to_string(),
        member_epoch: SHARE_MEMBER_JOIN_EPOCH,
        heartbeat_interval: Duration::from_secs(5),
        share_session_epochs: HashMap::new(),
        pending_acks: HashMap::new(),
        share_acquire_mode: options.share_acquire_mode,
        max_poll_records,
        next_record_limit_leader_index: 0,
        acknowledgement_commit_callback: options.acknowledgement_commit_callback,
    };

    this.ensure_coordinator_with_retries().await?;
    debug!("share consumer connected");
    Ok(this)
}
/// Builder-style setter: consumes the consumer and returns it with
/// `share_acquire_mode` replaced.
pub fn with_share_acquire_mode(self, share_acquire_mode: ShareAcquireMode) -> Self {
    let mut consumer = self;
    consumer.share_acquire_mode = share_acquire_mode;
    consumer
}
/// Returns the acquire mode this consumer is configured with; `poll_for`
/// uses it to choose its fetch strategy.
pub fn share_acquire_mode(&self) -> ShareAcquireMode {
self.share_acquire_mode
}
pub fn set_acknowledgement_commit_callback<F>(&mut self, callback: F)
where
F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
{
self.acknowledgement_commit_callback = Some(Arc::new(callback));
}
/// Replaces the current subscription with `topics`, refreshes metadata if
/// needed and then runs a heartbeat (`heartbeat_with_retries(true)`).
///
/// The subscription list is overwritten before any network call, so it stays
/// replaced even when this method returns an error.
///
/// # Errors
///
/// Returns the first error from the metadata refresh or the heartbeat.
pub async fn subscribe(&mut self, topics: Vec<String>) -> Result<()> {
    self.subscriptions = topics;

    self.refresh_metadata_if_needed().await?;
    self.heartbeat_with_retries(true).await?;

    Ok(())
}
/// Returns the topic names most recently passed to `subscribe`.
pub fn subscription(&self) -> &[String] {
    self.subscriptions.as_slice()
}
/// Returns this consumer's member id (a UUID v4 string generated when the
/// consumer was built).
pub fn member_id(&self) -> &str {
    self.member_id.as_str()
}
/// Returns the member epoch currently stored on this consumer.
pub fn member_epoch(&self) -> i32 {
self.member_epoch
}
/// Returns the currently assigned partitions, sorted by topic and then by
/// partition.
pub fn assignment(&self) -> Vec<TopicPartition> {
    let mut partitions: Vec<TopicPartition> = Vec::with_capacity(self.assignments.len());
    for assigned in self.assignments.values() {
        partitions.push(TopicPartition::new(
            assigned.topic.clone(),
            assigned.partition,
        ));
    }

    // The map iterates in arbitrary order; sort so callers see a stable listing.
    partitions.sort_by(|a, b| {
        a.topic
            .cmp(&b.topic)
            .then_with(|| a.partition.cmp(&b.partition))
    });
    partitions
}
/// Polls for records using the configured `fetch_max_wait` as the timeout.
///
/// See [`Self::poll_for`] for the behaviour and the possible errors.
pub async fn poll(&mut self) -> Result<ShareRecords> {
    let timeout = self.config.fetch_max_wait;
    self.poll_for(timeout).await
}
/// Refreshes metadata, heartbeats, and then fetches records from the
/// partition leaders, passing `timeout` to every fetch.
///
/// Leaders are always visited in ascending leader-id order. What happens per
/// leader depends on the acquire mode:
///
/// * `BatchOptimized` — every leader is fetched from.
/// * `RecordLimit` — exactly one leader per call is asked for records,
///   chosen round-robin via `next_record_limit_leader_index`. Any other
///   leader is contacted only when `has_acks_for_assignments` reports
///   pending acknowledgements for it, and then with the final
///   `fetch_from_leader` flag set to `false` (presumably so that no new
///   records are acquired — confirm in the fetch module).
///
/// # Errors
///
/// Returns the first error from the metadata refresh, the heartbeat, or any
/// fetch. Records gathered from earlier leaders in the same call are not
/// returned in that case.
pub async fn poll_for(&mut self, timeout: Duration) -> Result<ShareRecords> {
    self.refresh_metadata_if_needed().await?;
    self.heartbeat_with_retries(false).await?;

    let mut by_leader: Vec<_> = self.assignments_by_leader().into_iter().collect();
    by_leader.sort_by_key(|entry| entry.0);

    let mut collected = Vec::new();
    match self.share_acquire_mode {
        ShareAcquireMode::BatchOptimized => {
            for (leader_id, assignments) in by_leader {
                let fetched = self
                    .fetch_from_leader(leader_id, assignments, timeout, true)
                    .await?;
                collected.extend(fetched);
            }
        }
        ShareAcquireMode::RecordLimit => {
            let leader_count = by_leader.len();
            if leader_count == 0 {
                return Ok(ShareRecords::new(collected));
            }

            // Pick this call's leader and advance the cursor for the next call.
            let turn = self.next_record_limit_leader_index % leader_count;
            self.next_record_limit_leader_index =
                (self.next_record_limit_leader_index + 1) % leader_count;

            for (position, (leader_id, assignments)) in by_leader.into_iter().enumerate() {
                let is_turn = position == turn;
                if !is_turn && !self.has_acks_for_assignments(&assignments) {
                    continue;
                }
                let fetched = self
                    .fetch_from_leader(leader_id, assignments, timeout, is_turn)
                    .await?;
                collected.extend(fetched);
            }
        }
    }

    Ok(ShareRecords::new(collected))
}
pub fn acknowledge(&mut self, record: &ShareRecord, acknowledge_type: AcknowledgeType) {
let Some(topic_id) = self.metadata.topic_id(&record.record.topic) else {
return;
};
let key = TopicIdPartitionKey {
topic_id,
topic: record.record.topic.clone(),
partition: record.record.partition,
};
self.pending_acks
.entry(key)
.or_default()
.add(record.record.offset, acknowledge_type);
}
/// Sends all pending acknowledgements to the current leaders of their
/// partitions.
///
/// Pending acknowledgements are taken out of the consumer, grouped by
/// partition leader, and sent one leader at a time. Empty acknowledgement
/// sets are discarded. Nothing is sent when there are no pending
/// acknowledgements.
///
/// # Errors
///
/// * If a partition with pending acknowledgements has no leader in the
///   metadata cache, nothing is sent, every non-empty pending
///   acknowledgement is put back, and an error is returned.
/// * If sending to a leader fails, the groups that were not sent yet are
///   restored and the error is returned. The failed group itself was handed
///   to `send_acknowledge_to_leader`; whether it is restored there is not
///   visible from this method.
pub async fn commit_sync(&mut self) -> Result<()> {
    let acknowledgements = std::mem::take(&mut self.pending_acks);
    if acknowledgements.is_empty() {
        return Ok(());
    }

    let mut grouped = HashMap::<i32, HashMap<TopicIdPartitionKey, Acknowledgements>>::new();

    // Drive the iterator by hand: if grouping fails part-way, the entries
    // that have not been visited yet are still inside it and must be put
    // back rather than dropped along with the iterator.
    let mut unprocessed = acknowledgements.into_iter();
    while let Some((key, acks)) = unprocessed.next() {
        if acks.is_empty() {
            continue;
        }
        let Some(leader_id) = self.metadata.leader_for(&key.topic, key.partition) else {
            // Put back the entry that failed, ...
            self.pending_acks.insert(key, acks);
            // ... everything not looked at yet (still skipping empty sets), ...
            self.pending_acks
                .extend(unprocessed.filter(|(_, rest)| !rest.is_empty()));
            // ... and everything that was already grouped.
            for (_, remaining_acks) in grouped {
                self.restore_acknowledgements(remaining_acks);
            }
            return Err(anyhow!(
                "cannot acknowledge share record without a current partition leader"
            )
            .into());
        };
        grouped.entry(leader_id).or_default().insert(key, acks);
    }

    let mut groups = grouped.into_iter().collect::<Vec<_>>();
    while let Some((leader_id, acks)) = groups.pop() {
        if let Err(error) = self
            .send_acknowledge_to_leader(leader_id, acks, false)
            .await
        {
            // Keep the groups that were never attempted for a later commit.
            for (_, remaining_acks) in groups {
                self.restore_acknowledgements(remaining_acks);
            }
            return Err(error.into());
        }
    }
    Ok(())
}
/// Shuts the consumer down on a best-effort basis and consumes it.
///
/// In order: commits pending acknowledgements, calls
/// `send_acknowledge_to_leader` with no acknowledgements and the final flag
/// set to `true` for every leader that has a share session epoch (presumably
/// closing that session — confirm in the commit module), and finally leaves
/// the group. Errors from each of these steps are deliberately ignored, so
/// this method always returns `Ok(())`.
pub async fn shutdown(mut self) -> Result<()> {
    let _ = self.commit_sync().await;

    // Copy the ids out first so the map is not borrowed across the awaits below.
    let session_leaders: Vec<_> = self.share_session_epochs.keys().copied().collect();
    for leader_id in session_leaders {
        let _ = self
            .send_acknowledge_to_leader(leader_id, HashMap::new(), true)
            .await;
    }

    let _ = self.leave_group().await;
    Ok(())
}
}