kafkit-client 0.1.9

Kafka 4.0+ pure Rust client.
Documentation
//! Share group consumer API.
//!
//! Share groups let multiple consumers cooperatively process records from the
//! same partitions. Records are explicitly acknowledged as accepted, released,
//! or rejected.
//!
//! ```no_run
//! # async fn example() -> kafkit_client::Result<()> {
//! use kafkit_client::{AcknowledgeType, ConsumerConfig, KafkaShareConsumer};
//!
//! let mut consumer = KafkaShareConsumer::connect(
//!     ConsumerConfig::new("localhost:9092", "orders-share"),
//! )
//! .await?;
//!
//! consumer.subscribe(vec!["orders".to_owned()]).await?;
//! let records = consumer.poll().await?;
//! for record in records.iter() {
//!     consumer.acknowledge(record, AcknowledgeType::Accept);
//! }
//! consumer.commit_sync().await?;
//! consumer.shutdown().await?;
//! # Ok(())
//! # }
//! ```
//!
mod acknowledgements;
mod commit;
mod coordinator;
mod fetch;
mod types;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use tracing::{debug, instrument};

use crate::Result;
use crate::config::ConsumerConfig;
use crate::consumer::util::kafka_uuid_string;
use crate::metadata::MetadataCache;
use crate::network::BrokerConnection;
use crate::telemetry;
use crate::types::{TopicPartition, TopicPartitionKey};

use self::acknowledgements::Acknowledgements;
pub use self::types::{
    AcknowledgeType, AcknowledgementCommitCallback, ShareAcknowledgementCommit, ShareAcquireMode,
    ShareConsumerOptions, ShareRecord, ShareRecords,
};
use self::types::{ShareAssignment, TopicIdPartitionKey};

// --- Share group membership epochs ---------------------------------------

/// Member epoch a freshly constructed consumer starts with
/// (assigned to `member_epoch` in `connect_with_options`).
const SHARE_MEMBER_JOIN_EPOCH: i32 = 0;
/// Member epoch presumably sent when leaving the share group (KIP-932 uses -1
/// for this) — confirm at the use site outside this file's visible code.
const SHARE_MEMBER_LEAVE_EPOCH: i32 = -1;

// --- Share fetch session epochs ------------------------------------------

/// Share session epoch presumably used to open a new session with a leader
/// (KIP-932 uses 0 for this) — confirm at the use site.
const SHARE_SESSION_OPEN_EPOCH: i32 = 0;
/// Share session epoch presumably used to close a session with a leader
/// (KIP-932 uses -1 for this) — confirm at the use site.
const SHARE_SESSION_CLOSE_EPOCH: i32 = -1;

// --- Coordinator discovery -----------------------------------------------

/// Retry budget for coordinator operations (name suggests number of attempts;
/// consumed outside this file's visible code).
const SHARE_COORDINATOR_RETRY_ATTEMPTS: usize = 120;

/// A Kafka share group consumer.
///
/// Created with [`KafkaShareConsumer::connect`] or
/// [`KafkaShareConsumer::connect_with_options`]. Records are obtained with
/// `poll`/`poll_for`, marked with `acknowledge`, and the queued
/// acknowledgements are sent with `commit_sync`. `shutdown` commits what is
/// still queued, sends a final request to every leader with a share session,
/// and leaves the group.
pub struct KafkaShareConsumer {
    // Client configuration. `fetch_max_wait` is the default `poll` timeout;
    // `client_id` / `group_id` label the telemetry emitted by this consumer.
    config: ConsumerConfig,
    // Cached cluster metadata, used to resolve topic ids and partition leaders.
    metadata: MetadataCache,
    // Coordinator connection; `None` at construction, presumably populated by
    // `ensure_coordinator_with_retries` (not visible here — confirm).
    coordinator: Option<BrokerConnection>,
    // Broker connections, presumably keyed by leader broker id — confirm.
    leader_connections: HashMap<i32, BrokerConnection>,
    // Topics passed to the most recent `subscribe` call.
    subscriptions: Vec<String>,
    // Partitions currently assigned to this member (reported by `assignment`).
    assignments: HashMap<TopicPartitionKey, ShareAssignment>,
    // Member id generated once at construction via `kafka_uuid_string`.
    member_id: String,
    // Current member epoch; starts at `SHARE_MEMBER_JOIN_EPOCH`.
    member_epoch: i32,
    // Heartbeat interval; initialized to 5 seconds (may be updated by the
    // coordinator code outside this file's view — confirm).
    heartbeat_interval: Duration,
    // Share session epoch per leader id; `shutdown` sends a closing request to
    // every leader present as a key here.
    share_session_epochs: HashMap<i32, i32>,
    // Acknowledgements queued by `acknowledge` and drained by `commit_sync`.
    pending_acks: HashMap<TopicIdPartitionKey, Acknowledgements>,
    // Selects how `poll_for` fetches: from every leader, or records from one
    // leader per call (round-robin).
    share_acquire_mode: ShareAcquireMode,
    // Per-poll record limit; clamped to at least 1 at construction.
    max_poll_records: i32,
    // Round-robin cursor choosing which leader a `RecordLimit` poll fetches
    // records from.
    next_record_limit_leader_index: usize,
    // Optional callback supplied via options or
    // `set_acknowledgement_commit_callback`.
    acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
}

impl KafkaShareConsumer {
    #[instrument(
        name = "share_consumer.connect",
        level = "debug",
        skip(config),
        fields(
            bootstrap_server_count = config.bootstrap_servers.len(),
            client_id = %config.client_id,
            group_id = %config.group_id
        )
    )]
    /// Connects to Kafka and returns the client.
    pub async fn connect(config: ConsumerConfig) -> Result<Self> {
        Self::connect_with_options(config, ShareConsumerOptions::default()).await
    }

    /// Connect With Options.
    pub async fn connect_with_options(
        config: ConsumerConfig,
        options: ShareConsumerOptions,
    ) -> Result<Self> {
        let mut consumer = Self {
            config,
            metadata: MetadataCache::default(),
            coordinator: None,
            leader_connections: HashMap::new(),
            subscriptions: Vec::new(),
            assignments: HashMap::new(),
            member_id: kafka_uuid_string(),
            member_epoch: SHARE_MEMBER_JOIN_EPOCH,
            heartbeat_interval: Duration::from_secs(5),
            share_session_epochs: HashMap::new(),
            pending_acks: HashMap::new(),
            share_acquire_mode: options.share_acquire_mode,
            max_poll_records: options.max_poll_records.max(1),
            next_record_limit_leader_index: 0,
            acknowledgement_commit_callback: options.acknowledgement_commit_callback,
        };
        consumer.ensure_coordinator_with_retries().await?;
        debug!("share consumer connected");
        Ok(consumer)
    }

    /// Sets share acquire mode and returns the updated value.
    pub fn with_share_acquire_mode(mut self, share_acquire_mode: ShareAcquireMode) -> Self {
        self.share_acquire_mode = share_acquire_mode;
        self
    }

    /// Share Acquire Mode.
    pub fn share_acquire_mode(&self) -> ShareAcquireMode {
        self.share_acquire_mode
    }

    /// Set Acknowledgement Commit Callback.
    pub fn set_acknowledgement_commit_callback<F>(&mut self, callback: F)
    where
        F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
    {
        self.acknowledgement_commit_callback = Some(Arc::new(callback));
    }

    /// Subscribe.
    pub async fn subscribe(&mut self, topics: Vec<String>) -> Result<()> {
        self.subscriptions = topics;
        self.refresh_metadata_if_needed().await?;
        self.heartbeat_with_retries(true).await?;
        Ok(())
    }

    /// Subscription.
    pub fn subscription(&self) -> &[String] {
        &self.subscriptions
    }

    /// Member Id.
    pub fn member_id(&self) -> &str {
        &self.member_id
    }

    /// Member Epoch.
    pub fn member_epoch(&self) -> i32 {
        self.member_epoch
    }

    /// Assignment.
    pub fn assignment(&self) -> Vec<TopicPartition> {
        let mut assignment = self
            .assignments
            .values()
            .map(|partition| TopicPartition::new(partition.topic.clone(), partition.partition))
            .collect::<Vec<_>>();
        assignment.sort_by(|left, right| {
            left.topic
                .cmp(&right.topic)
                .then(left.partition.cmp(&right.partition))
        });
        assignment
    }

    /// Polls Kafka for records.
    pub async fn poll(&mut self) -> Result<ShareRecords> {
        self.poll_for(self.config.fetch_max_wait).await
    }

    /// Poll For.
    pub async fn poll_for(&mut self, timeout: Duration) -> Result<ShareRecords> {
        let started_at = Instant::now();
        let result = async {
            self.refresh_metadata_if_needed().await?;
            self.heartbeat_with_retries(false).await?;

            let mut records = Vec::new();
            let mut assignments_by_leader =
                self.assignments_by_leader().into_iter().collect::<Vec<_>>();
            assignments_by_leader.sort_by_key(|(leader_id, _)| *leader_id);

            match self.share_acquire_mode {
                ShareAcquireMode::BatchOptimized => {
                    for (leader_id, assignments) in assignments_by_leader {
                        records.extend(
                            self.fetch_from_leader(leader_id, assignments, timeout, true)
                                .await?,
                        );
                    }
                }
                ShareAcquireMode::RecordLimit => {
                    if assignments_by_leader.is_empty() {
                        return Ok(ShareRecords::new(records));
                    }

                    let fetch_index =
                        self.next_record_limit_leader_index % assignments_by_leader.len();
                    self.next_record_limit_leader_index =
                        (self.next_record_limit_leader_index + 1) % assignments_by_leader.len();

                    for (index, (leader_id, assignments)) in
                        assignments_by_leader.into_iter().enumerate()
                    {
                        let fetch_records = index == fetch_index;
                        if fetch_records || self.has_acks_for_assignments(&assignments) {
                            records.extend(
                                self.fetch_from_leader(
                                    leader_id,
                                    assignments,
                                    timeout,
                                    fetch_records,
                                )
                                .await?,
                            );
                        }
                    }
                }
            }
            Ok(ShareRecords::new(records))
        }
        .await;
        let record_count = result.as_ref().map(ShareRecords::len).unwrap_or(0);
        let poll_result = match &result {
            Ok(records) if records.is_empty() => "empty",
            Ok(_) => "records",
            Err(_) => "error",
        };
        telemetry::record_share_poll_completed(
            &self.config.client_id,
            &self.config.group_id,
            started_at.elapsed(),
            record_count,
            poll_result,
        );
        result
    }

    /// Acknowledge.
    pub fn acknowledge(&mut self, record: &ShareRecord, acknowledge_type: AcknowledgeType) {
        let Some(topic_id) = self.metadata.topic_id(&record.record.topic) else {
            return;
        };
        let key = TopicIdPartitionKey {
            topic_id,
            topic: record.record.topic.clone(),
            partition: record.record.partition,
        };
        self.pending_acks
            .entry(key)
            .or_default()
            .add(record.record.offset, acknowledge_type);
        telemetry::record_share_acknowledgement_queued(
            &self.config.client_id,
            &self.config.group_id,
            &record.record.topic,
            record.record.partition,
            acknowledge_type_label(acknowledge_type),
        );
    }

    /// Commit Sync.
    pub async fn commit_sync(&mut self) -> Result<()> {
        let acknowledgements = std::mem::take(&mut self.pending_acks);
        if acknowledgements.is_empty() {
            return Ok(());
        }

        let mut grouped = HashMap::<i32, HashMap<TopicIdPartitionKey, Acknowledgements>>::new();
        for (key, acks) in acknowledgements {
            if acks.is_empty() {
                continue;
            }
            let Some(leader_id) = self.metadata.leader_for(&key.topic, key.partition) else {
                self.pending_acks.insert(key, acks);
                for (_, remaining_acks) in grouped {
                    self.restore_acknowledgements(remaining_acks);
                }
                return Err(anyhow!(
                    "cannot acknowledge share record without a current partition leader"
                )
                .into());
            };
            grouped.entry(leader_id).or_default().insert(key, acks);
        }

        let mut groups = grouped.into_iter().collect::<Vec<_>>();
        while let Some((leader_id, acks)) = groups.pop() {
            if let Err(error) = self
                .send_acknowledge_to_leader(leader_id, acks, false)
                .await
            {
                for (_, remaining_acks) in groups {
                    self.restore_acknowledgements(remaining_acks);
                }
                return Err(error.into());
            }
        }
        Ok(())
    }

    /// Shuts the client down and waits for in-flight work to finish.
    pub async fn shutdown(mut self) -> Result<()> {
        let _ = self.commit_sync().await;
        let leader_ids = self
            .share_session_epochs
            .keys()
            .copied()
            .collect::<Vec<_>>();
        for leader_id in leader_ids {
            let _ = self
                .send_acknowledge_to_leader(leader_id, HashMap::new(), true)
                .await;
        }
        let _ = self.leave_group().await;
        Ok(())
    }
}

fn acknowledge_type_label(acknowledge_type: AcknowledgeType) -> &'static str {
    match acknowledge_type {
        AcknowledgeType::Accept => "accept",
        AcknowledgeType::Release => "release",
        AcknowledgeType::Reject => "reject",
    }
}