kafkit-client 0.1.5

Kafka 4.0+ pure Rust client.
Documentation
//! Share group consumer API.
//!
//! Share groups let multiple consumers cooperatively process records from the
//! same partitions. Records are explicitly acknowledged as accepted, released,
//! or rejected.
//!
//! ```no_run
//! # async fn example() -> kafkit_client::Result<()> {
//! use kafkit_client::{AcknowledgeType, ConsumerConfig, KafkaShareConsumer};
//!
//! let mut consumer = KafkaShareConsumer::connect(
//!     ConsumerConfig::new("localhost:9092", "orders-share"),
//! )
//! .await?;
//!
//! consumer.subscribe(vec!["orders".to_owned()]).await?;
//! let records = consumer.poll().await?;
//! for record in records.iter() {
//!     consumer.acknowledge(record, AcknowledgeType::Accept);
//! }
//! consumer.commit_sync().await?;
//! consumer.shutdown().await?;
//! # Ok(())
//! # }
//! ```
//!
mod acknowledgements;
mod commit;
mod coordinator;
mod fetch;
mod types;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use tracing::{debug, instrument};
use uuid::Uuid;

use crate::Result;
use crate::config::ConsumerConfig;
use crate::metadata::MetadataCache;
use crate::network::BrokerConnection;
use crate::types::{TopicPartition, TopicPartitionKey};

use self::acknowledgements::Acknowledgements;
pub use self::types::{
    AcknowledgeType, AcknowledgementCommitCallback, ShareAcknowledgementCommit, ShareAcquireMode,
    ShareConsumerOptions, ShareRecord, ShareRecords,
};
use self::types::{ShareAssignment, TopicIdPartitionKey};

/// Upper bound on attempts for retried coordinator operations (presumably consumed by
/// the `*_with_retries` helpers defined in the submodules — TODO confirm).
const SHARE_COORDINATOR_RETRY_ATTEMPTS: usize = 120;

/// Share session epoch that opens a new share session with a partition leader
/// (KIP-932 share-fetch protocol convention).
const SHARE_SESSION_OPEN_EPOCH: i32 = 0;
/// Share session epoch that closes the share session with a partition leader
/// (KIP-932 share-fetch protocol convention).
const SHARE_SESSION_CLOSE_EPOCH: i32 = -1;

/// Member epoch used to join the share group; a freshly connected consumer starts here.
const SHARE_MEMBER_JOIN_EPOCH: i32 = 0;
/// Member epoch used to leave the share group (KIP-932 heartbeat convention).
const SHARE_MEMBER_LEAVE_EPOCH: i32 = -1;

/// A Kafka share group consumer.
///
/// Records returned by [`KafkaShareConsumer::poll`] are acknowledged with
/// [`KafkaShareConsumer::acknowledge`]. Acknowledgements are buffered in memory and
/// sent to the partition leaders by [`KafkaShareConsumer::commit_sync`].
pub struct KafkaShareConsumer {
    /// Configuration supplied at connect time (its `fetch_max_wait` is the default
    /// timeout used by `poll`).
    config: ConsumerConfig,
    /// Cached cluster metadata, used to resolve topic ids and partition leaders.
    metadata: MetadataCache,
    /// Connection to the share group coordinator; `None` until one is established.
    coordinator: Option<BrokerConnection>,
    /// Connections to partition leaders, presumably keyed by broker (node) id.
    leader_connections: HashMap<i32, BrokerConnection>,
    /// Topics passed to the most recent `subscribe` call.
    subscriptions: Vec<String>,
    /// Partitions currently assigned to this member.
    assignments: HashMap<TopicPartitionKey, ShareAssignment>,
    /// Member id, generated as a random UUID v4 at connect time.
    member_id: String,
    /// Current member epoch; starts at `SHARE_MEMBER_JOIN_EPOCH`.
    member_epoch: i32,
    /// Heartbeat interval; starts at 5 seconds (presumably updated from coordinator
    /// responses — TODO confirm).
    heartbeat_interval: Duration,
    /// Share session epoch per leader id; `shutdown` closes a session for every key.
    share_session_epochs: HashMap<i32, i32>,
    /// Acknowledgements recorded by `acknowledge` that have not been sent yet.
    pending_acks: HashMap<TopicIdPartitionKey, Acknowledgements>,
    /// Strategy `poll_for` uses to acquire records.
    share_acquire_mode: ShareAcquireMode,
    /// Maximum records per poll, clamped to at least 1 at connect time.
    max_poll_records: i32,
    /// Round-robin cursor choosing which leader is asked for records in
    /// `ShareAcquireMode::RecordLimit` mode.
    next_record_limit_leader_index: usize,
    /// Optional user callback for acknowledgement commit results (invocation site is
    /// not visible in this file).
    acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
}

impl KafkaShareConsumer {
    #[instrument(
        name = "share_consumer.connect",
        level = "debug",
        skip(config),
        fields(
            bootstrap_server_count = config.bootstrap_servers.len(),
            client_id = %config.client_id,
            group_id = %config.group_id
        )
    )]
    /// Connects to Kafka and returns the client.
    pub async fn connect(config: ConsumerConfig) -> Result<Self> {
        Self::connect_with_options(config, ShareConsumerOptions::default()).await
    }

    /// Connect With Options.
    pub async fn connect_with_options(
        config: ConsumerConfig,
        options: ShareConsumerOptions,
    ) -> Result<Self> {
        let mut consumer = Self {
            config,
            metadata: MetadataCache::default(),
            coordinator: None,
            leader_connections: HashMap::new(),
            subscriptions: Vec::new(),
            assignments: HashMap::new(),
            member_id: Uuid::new_v4().to_string(),
            member_epoch: SHARE_MEMBER_JOIN_EPOCH,
            heartbeat_interval: Duration::from_secs(5),
            share_session_epochs: HashMap::new(),
            pending_acks: HashMap::new(),
            share_acquire_mode: options.share_acquire_mode,
            max_poll_records: options.max_poll_records.max(1),
            next_record_limit_leader_index: 0,
            acknowledgement_commit_callback: options.acknowledgement_commit_callback,
        };
        consumer.ensure_coordinator_with_retries().await?;
        debug!("share consumer connected");
        Ok(consumer)
    }

    /// Sets share acquire mode and returns the updated value.
    pub fn with_share_acquire_mode(mut self, share_acquire_mode: ShareAcquireMode) -> Self {
        self.share_acquire_mode = share_acquire_mode;
        self
    }

    /// Share Acquire Mode.
    pub fn share_acquire_mode(&self) -> ShareAcquireMode {
        self.share_acquire_mode
    }

    /// Set Acknowledgement Commit Callback.
    pub fn set_acknowledgement_commit_callback<F>(&mut self, callback: F)
    where
        F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
    {
        self.acknowledgement_commit_callback = Some(Arc::new(callback));
    }

    /// Subscribe.
    pub async fn subscribe(&mut self, topics: Vec<String>) -> Result<()> {
        self.subscriptions = topics;
        self.refresh_metadata_if_needed().await?;
        self.heartbeat_with_retries(true).await?;
        Ok(())
    }

    /// Subscription.
    pub fn subscription(&self) -> &[String] {
        &self.subscriptions
    }

    /// Member Id.
    pub fn member_id(&self) -> &str {
        &self.member_id
    }

    /// Member Epoch.
    pub fn member_epoch(&self) -> i32 {
        self.member_epoch
    }

    /// Assignment.
    pub fn assignment(&self) -> Vec<TopicPartition> {
        let mut assignment = self
            .assignments
            .values()
            .map(|partition| TopicPartition::new(partition.topic.clone(), partition.partition))
            .collect::<Vec<_>>();
        assignment.sort_by(|left, right| {
            left.topic
                .cmp(&right.topic)
                .then(left.partition.cmp(&right.partition))
        });
        assignment
    }

    /// Polls Kafka for records.
    pub async fn poll(&mut self) -> Result<ShareRecords> {
        self.poll_for(self.config.fetch_max_wait).await
    }

    /// Poll For.
    pub async fn poll_for(&mut self, timeout: Duration) -> Result<ShareRecords> {
        self.refresh_metadata_if_needed().await?;
        self.heartbeat_with_retries(false).await?;

        let mut records = Vec::new();
        let mut assignments_by_leader =
            self.assignments_by_leader().into_iter().collect::<Vec<_>>();
        assignments_by_leader.sort_by_key(|(leader_id, _)| *leader_id);

        match self.share_acquire_mode {
            ShareAcquireMode::BatchOptimized => {
                for (leader_id, assignments) in assignments_by_leader {
                    records.extend(
                        self.fetch_from_leader(leader_id, assignments, timeout, true)
                            .await?,
                    );
                }
            }
            ShareAcquireMode::RecordLimit => {
                if assignments_by_leader.is_empty() {
                    return Ok(ShareRecords::new(records));
                }

                let fetch_index = self.next_record_limit_leader_index % assignments_by_leader.len();
                self.next_record_limit_leader_index =
                    (self.next_record_limit_leader_index + 1) % assignments_by_leader.len();

                for (index, (leader_id, assignments)) in
                    assignments_by_leader.into_iter().enumerate()
                {
                    let fetch_records = index == fetch_index;
                    if fetch_records || self.has_acks_for_assignments(&assignments) {
                        records.extend(
                            self.fetch_from_leader(leader_id, assignments, timeout, fetch_records)
                                .await?,
                        );
                    }
                }
            }
        }
        Ok(ShareRecords::new(records))
    }

    /// Acknowledge.
    pub fn acknowledge(&mut self, record: &ShareRecord, acknowledge_type: AcknowledgeType) {
        let Some(topic_id) = self.metadata.topic_id(&record.record.topic) else {
            return;
        };
        let key = TopicIdPartitionKey {
            topic_id,
            topic: record.record.topic.clone(),
            partition: record.record.partition,
        };
        self.pending_acks
            .entry(key)
            .or_default()
            .add(record.record.offset, acknowledge_type);
    }

    /// Commit Sync.
    pub async fn commit_sync(&mut self) -> Result<()> {
        let acknowledgements = std::mem::take(&mut self.pending_acks);
        if acknowledgements.is_empty() {
            return Ok(());
        }

        let mut grouped = HashMap::<i32, HashMap<TopicIdPartitionKey, Acknowledgements>>::new();
        for (key, acks) in acknowledgements {
            if acks.is_empty() {
                continue;
            }
            let Some(leader_id) = self.metadata.leader_for(&key.topic, key.partition) else {
                self.pending_acks.insert(key, acks);
                for (_, remaining_acks) in grouped {
                    self.restore_acknowledgements(remaining_acks);
                }
                return Err(anyhow!(
                    "cannot acknowledge share record without a current partition leader"
                )
                .into());
            };
            grouped.entry(leader_id).or_default().insert(key, acks);
        }

        let mut groups = grouped.into_iter().collect::<Vec<_>>();
        while let Some((leader_id, acks)) = groups.pop() {
            if let Err(error) = self
                .send_acknowledge_to_leader(leader_id, acks, false)
                .await
            {
                for (_, remaining_acks) in groups {
                    self.restore_acknowledgements(remaining_acks);
                }
                return Err(error.into());
            }
        }
        Ok(())
    }

    /// Shuts the client down and waits for in-flight work to finish.
    pub async fn shutdown(mut self) -> Result<()> {
        let _ = self.commit_sync().await;
        let leader_ids = self
            .share_session_epochs
            .keys()
            .copied()
            .collect::<Vec<_>>();
        for leader_id in leader_ids {
            let _ = self
                .send_acknowledge_to_leader(leader_id, HashMap::new(), true)
                .await;
        }
        let _ = self.leave_group().await;
        Ok(())
    }
}