// kafkit-client 0.1.2
//
// Kafka 4.0+ pure Rust client.
// Documentation
use std::sync::Arc;

use uuid::Uuid;

use crate::types::ConsumerRecord;

/// Value assigned to `max_poll_records` by `ShareConsumerOptions::default()`.
///
/// NOTE(review): presumably mirrors the Java client's `max.poll.records`
/// default of 500 — confirm against the Kafka configuration reference.
pub(super) const DEFAULT_SHARE_MAX_POLL_RECORDS: i32 = 500;

/// Shared callback that receives a batch of [`ShareAcknowledgementCommit`] values.
///
/// The closure is reference-counted (`Arc`) and bounded by `Send + Sync + 'static`,
/// so a single instance can be cloned cheaply and shared across threads.
///
/// NOTE(review): when, and on which thread or task, the callback is invoked is
/// decided by the consumer implementation, which is not visible in this file.
pub type AcknowledgementCommitCallback =
    Arc<dyn Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static>;

/// Acknowledgement that can be attached to a delivered share record.
///
/// `protocol_value` maps the variants to 1 (`Accept`), 2 (`Release`) and
/// 3 (`Reject`).
///
/// NOTE(review): the per-variant descriptions below follow Kafka's share-group
/// (KIP-932) acknowledgement semantics; nothing in this file enforces them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcknowledgeType {
    /// The record was processed successfully.
    Accept,
    /// The record was not processed and may be delivered again.
    Release,
    /// The record cannot be processed and should not be delivered again.
    Reject,
}

impl AcknowledgeType {
    /// Returns the numeric protocol value of this acknowledgement type.
    ///
    /// The mapping is `Accept` → 1, `Release` → 2, `Reject` → 3.
    pub(super) fn protocol_value(self) -> i8 {
        // Named codes keep the mapping readable; the match stays exhaustive so
        // adding a variant forces a decision about its code at compile time.
        const ACCEPT: i8 = 1;
        const RELEASE: i8 = 2;
        const REJECT: i8 = 3;

        match self {
            Self::Accept => ACCEPT,
            Self::Release => RELEASE,
            Self::Reject => REJECT,
        }
    }
}

/// A [`ConsumerRecord`] paired with its delivery count.
#[derive(Debug, Clone)]
pub struct ShareRecord {
    /// The wrapped consumer record.
    pub record: ConsumerRecord,
    /// Delivery count carried alongside the record.
    ///
    /// NOTE(review): presumably the number of times the record has been
    /// delivered to a share-group member — confirm against the fetch path.
    pub delivery_count: i16,
}

/// An ordered batch of [`ShareRecord`]s.
///
/// Records keep the order in which they were passed to [`ShareRecords::new`].
/// The batch can be walked by reference (`iter()` or `for r in &batch`) or
/// consumed (`into_inner()` or `for r in batch`).
#[derive(Debug, Clone, Default)]
pub struct ShareRecords {
    records: Vec<ShareRecord>,
}

impl ShareRecords {
    /// Wraps `records` in a batch, preserving their order.
    pub fn new(records: Vec<ShareRecord>) -> Self {
        Self { records }
    }

    /// Returns `true` when the batch holds no records.
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }

    /// Returns the number of records in the batch.
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Iterates over the records by reference, in order, without consuming the batch.
    pub fn iter(&self) -> impl Iterator<Item = &ShareRecord> {
        self.records.iter()
    }

    /// Consumes the batch and returns the underlying vector of records.
    pub fn into_inner(self) -> Vec<ShareRecord> {
        self.records
    }
}

/// Consuming iteration: `for record in batch` yields owned [`ShareRecord`]s in order.
impl IntoIterator for ShareRecords {
    type Item = ShareRecord;
    type IntoIter = std::vec::IntoIter<ShareRecord>;

    fn into_iter(self) -> Self::IntoIter {
        self.records.into_iter()
    }
}

/// Borrowing iteration: `for record in &batch` yields `&ShareRecord` in order.
impl<'a> IntoIterator for &'a ShareRecords {
    type Item = &'a ShareRecord;
    type IntoIter = std::slice::Iter<'a, ShareRecord>;

    fn into_iter(self) -> Self::IntoIter {
        self.records.iter()
    }
}

/// Share acquire mode selectable through `ShareConsumerOptions`.
///
/// Every variant has a Kafka-style name (see [`ShareAcquireMode::as_str`]) and
/// a numeric protocol id (see [`ShareAcquireMode::protocol_id`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShareAcquireMode {
    /// Batch-optimized mode (`"batch_optimized"`, id 0); this is the default variant.
    #[default]
    BatchOptimized,
    /// Record-limit mode (`"record_limit"`, id 1).
    RecordLimit,
}

impl ShareAcquireMode {
    /// Pairs each variant with its name and protocol id.
    ///
    /// Keeping both in a single exhaustive match means a new variant cannot be
    /// given a name without also being given an id.
    fn name_and_id(self) -> (&'static str, i8) {
        match self {
            Self::BatchOptimized => ("batch_optimized", 0),
            Self::RecordLimit => ("record_limit", 1),
        }
    }

    /// Returns the Kafka-style name of this mode.
    pub fn as_str(self) -> &'static str {
        let (name, _) = self.name_and_id();
        name
    }

    /// Returns the numeric protocol id Kafka uses for this mode.
    pub fn protocol_id(self) -> i8 {
        let (_, id) = self.name_and_id();
        id
    }
}

/// Settings for a share consumer.
///
/// Start from [`Default::default`] and refine the value with the consuming
/// `with_*` methods; each one returns the updated options.
#[derive(Clone)]
pub struct ShareConsumerOptions {
    /// Acquire mode; [`ShareAcquireMode::BatchOptimized`] by default.
    pub share_acquire_mode: ShareAcquireMode,
    /// Max poll records; `DEFAULT_SHARE_MAX_POLL_RECORDS` by default.
    ///
    /// [`ShareConsumerOptions::with_max_poll_records`] never stores a value
    /// below 1. Assigning the field directly bypasses that clamp.
    pub max_poll_records: i32,
    /// Optional acknowledgement commit callback; `None` by default.
    pub acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
}

// `Debug` cannot be derived because the callback is an opaque `dyn Fn`, so it
// is written by hand and only reports whether a callback is installed.
impl std::fmt::Debug for ShareConsumerOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ShareConsumerOptions")
            .field("share_acquire_mode", &self.share_acquire_mode)
            .field("max_poll_records", &self.max_poll_records)
            .field(
                "acknowledgement_commit_callback",
                &self
                    .acknowledgement_commit_callback
                    .as_ref()
                    .map(|_| "<callback>"),
            )
            .finish()
    }
}

impl Default for ShareConsumerOptions {
    /// Batch-optimized acquire mode, `DEFAULT_SHARE_MAX_POLL_RECORDS` max poll
    /// records, and no acknowledgement commit callback.
    fn default() -> Self {
        Self {
            share_acquire_mode: ShareAcquireMode::BatchOptimized,
            max_poll_records: DEFAULT_SHARE_MAX_POLL_RECORDS,
            acknowledgement_commit_callback: None,
        }
    }
}

impl ShareConsumerOptions {
    /// Sets the share acquire mode and returns the updated options.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_share_acquire_mode(mut self, share_acquire_mode: ShareAcquireMode) -> Self {
        self.share_acquire_mode = share_acquire_mode;
        self
    }

    /// Sets max poll records and returns the updated options.
    ///
    /// Values below 1 (zero or negative) are raised to 1.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_max_poll_records(mut self, max_poll_records: i32) -> Self {
        self.max_poll_records = max_poll_records.max(1);
        self
    }

    /// Installs the acknowledgement commit callback and returns the updated options.
    ///
    /// The closure is wrapped in an `Arc`, so cloning the options shares the
    /// same callback instance rather than copying it.
    #[must_use = "builder methods return the updated options instead of mutating in place"]
    pub fn with_acknowledgement_commit_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
    {
        self.acknowledgement_commit_callback = Some(Arc::new(callback));
        self
    }
}

/// Acknowledgement commit information for a single topic partition.
///
/// Values of this type are what an [`AcknowledgementCommitCallback`] receives.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShareAcknowledgementCommit {
    /// Topic name.
    pub topic: String,
    /// Topic id (UUID).
    pub topic_id: Uuid,
    /// Partition number within the topic.
    pub partition: i32,
    /// Offsets carried by this commit.
    ///
    /// NOTE(review): presumably the offsets of the acknowledged records — confirm.
    pub offsets: Vec<i64>,
    /// Error text, if any.
    ///
    /// NOTE(review): presumably `None` means the commit succeeded — confirm.
    pub error: Option<String>,
}

/// Module-internal description of one assigned topic partition together with
/// the leader information recorded for it.
#[derive(Debug, Clone)]
pub(super) struct ShareAssignment {
    /// Topic id (UUID).
    pub(super) topic_id: Uuid,
    /// Topic name.
    pub(super) topic: String,
    /// Partition number within the topic.
    pub(super) partition: i32,
    /// Leader id recorded for the partition.
    /// NOTE(review): presumably the broker node id of the partition leader — confirm.
    pub(super) leader_id: i32,
    /// Leader epoch recorded for the partition.
    pub(super) leader_epoch: i32,
}

/// Module-internal key identifying a topic partition by topic id, topic name
/// and partition number.
///
/// It derives `Hash`, `Eq` and `Ord`, so it can be used as a key in hashed or
/// ordered collections. The derived ordering compares fields in declaration
/// order: `topic_id`, then `topic`, then `partition`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(super) struct TopicIdPartitionKey {
    /// Topic id (UUID).
    pub(super) topic_id: Uuid,
    /// Topic name.
    pub(super) topic: String,
    /// Partition number within the topic.
    pub(super) partition: i32,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the name and protocol id of every acquire mode to the values the
    /// Java client uses.
    #[test]
    fn share_acquire_mode_matches_java_names_and_ids() {
        // One row per variant: (mode, expected name, expected protocol id).
        let expectations = [
            (ShareAcquireMode::BatchOptimized, "batch_optimized", 0_i8),
            (ShareAcquireMode::RecordLimit, "record_limit", 1_i8),
        ];

        for (mode, name, id) in expectations {
            assert_eq!(mode.as_str(), name, "name of {:?}", mode);
            assert_eq!(mode.protocol_id(), id, "protocol id of {:?}", mode);
        }
    }
}