use std::sync::Arc;
use uuid::Uuid;
use crate::types::ConsumerRecord;
/// Default value for `ShareConsumerOptions::max_poll_records`, applied by
/// `ShareConsumerOptions::default()`.
pub(super) const DEFAULT_SHARE_MAX_POLL_RECORDS: i32 = 500;
/// Shared, thread-safe (`Send + Sync`) callback that receives a batch of
/// [`ShareAcknowledgementCommit`] values by value.
///
/// NOTE(review): presumably invoked after acknowledgements have been committed;
/// the call site is not in this file, so confirm against the consumer code.
pub type AcknowledgementCommitCallback =
Arc<dyn Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static>;
/// Kind of acknowledgement that can be attached to a record: accept, release
/// or reject.
///
/// Every variant maps to a fixed `i8` wire value through `protocol_value`.
///
/// NOTE(review): the exact broker-side meaning of each variant is not visible
/// in this file — presumably it follows the Kafka share-group protocol; confirm.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AcknowledgeType {
    /// Encoded as `1` on the wire.
    Accept,
    /// Encoded as `2` on the wire.
    Release,
    /// Encoded as `3` on the wire.
    Reject,
}
impl AcknowledgeType {
pub(super) fn protocol_value(self) -> i8 {
match self {
Self::Accept => 1,
Self::Release => 2,
Self::Reject => 3,
}
}
}
/// A [`ConsumerRecord`] bundled with its delivery count.
#[derive(Clone, Debug)]
pub struct ShareRecord {
    /// The wrapped consumer record.
    pub record: ConsumerRecord,
    /// Delivery count associated with `record`.
    ///
    /// NOTE(review): presumably the number of times the broker has delivered
    /// this record — the producer of this value is not in this file; confirm.
    pub delivery_count: i16,
}
/// An ordered collection of [`ShareRecord`]s.
///
/// The `Default` value is an empty collection.
#[derive(Clone, Debug, Default)]
pub struct ShareRecords {
    /// Backing storage; private, so outside code goes through the methods on
    /// this type.
    records: Vec<ShareRecord>,
}
impl ShareRecords {
    /// Wraps an already-built vector of records, preserving its order.
    pub fn new(records: Vec<ShareRecord>) -> Self {
        Self { records }
    }

    /// Returns `true` when the collection holds no records.
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }

    /// Returns the number of records held.
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Iterates over the records by reference, in stored order.
    pub fn iter(&self) -> impl Iterator<Item = &ShareRecord> {
        self.records.iter()
    }

    /// Consumes the collection and returns the underlying vector.
    pub fn into_inner(self) -> Vec<ShareRecord> {
        self.records
    }
}

/// Lets callers write `for record in records` to consume the collection.
impl IntoIterator for ShareRecords {
    type Item = ShareRecord;
    type IntoIter = std::vec::IntoIter<ShareRecord>;

    fn into_iter(self) -> Self::IntoIter {
        self.records.into_iter()
    }
}

/// Lets callers write `for record in &records`, mirroring [`ShareRecords::iter`].
impl<'a> IntoIterator for &'a ShareRecords {
    type Item = &'a ShareRecord;
    type IntoIter = std::slice::Iter<'a, ShareRecord>;

    fn into_iter(self) -> Self::IntoIter {
        self.records.iter()
    }
}
/// Selects the acquire mode used by a share consumer.
///
/// `BatchOptimized` is the `Default` variant.
///
/// NOTE(review): the unit test in this file states that the names and ids are
/// meant to match the Java client; the behavioural difference between the two
/// modes is not visible here.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ShareAcquireMode {
    /// Name `"batch_optimized"`, protocol id `0`.
    #[default]
    BatchOptimized,
    /// Name `"record_limit"`, protocol id `1`.
    RecordLimit,
}

impl ShareAcquireMode {
    /// Returns the lower snake-case name of this mode.
    pub fn as_str(self) -> &'static str {
        use ShareAcquireMode::{BatchOptimized, RecordLimit};

        match self {
            BatchOptimized => "batch_optimized",
            RecordLimit => "record_limit",
        }
    }

    /// Returns the numeric identifier of this mode.
    pub fn protocol_id(self) -> i8 {
        use ShareAcquireMode::{BatchOptimized, RecordLimit};

        match self {
            BatchOptimized => 0,
            RecordLimit => 1,
        }
    }
}
/// User-facing configuration for a share consumer.
///
/// Cloning is cheap for the callback: it is held behind an `Arc`, so clones
/// share the same closure.
#[derive(Clone)]
pub struct ShareConsumerOptions {
    /// Acquire mode to use.
    pub share_acquire_mode: ShareAcquireMode,
    /// Maximum-poll-records setting. The `with_max_poll_records` builder clamps
    /// its argument to at least `1`; writing this field directly bypasses that
    /// clamp.
    pub max_poll_records: i32,
    /// Optional callback receiving [`ShareAcknowledgementCommit`] batches.
    pub acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
}

// `Debug` cannot be derived because `Arc<dyn Fn(..)>` has no `Debug` impl.
// Implement it by hand so the options can be logged and embedded in other
// `#[derive(Debug)]` types; the callback is shown as a placeholder that only
// reveals whether one is set.
impl std::fmt::Debug for ShareConsumerOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let callback = self
            .acknowledgement_commit_callback
            .as_ref()
            .map(|_| "<callback>");

        f.debug_struct("ShareConsumerOptions")
            .field("share_acquire_mode", &self.share_acquire_mode)
            .field("max_poll_records", &self.max_poll_records)
            .field("acknowledgement_commit_callback", &callback)
            .finish()
    }
}
impl Default for ShareConsumerOptions {
    /// Builds the baseline options: `BatchOptimized` acquire mode,
    /// `DEFAULT_SHARE_MAX_POLL_RECORDS` as the poll limit, and no
    /// acknowledgement-commit callback.
    fn default() -> Self {
        let share_acquire_mode = ShareAcquireMode::BatchOptimized;
        let max_poll_records = DEFAULT_SHARE_MAX_POLL_RECORDS;

        Self {
            share_acquire_mode,
            max_poll_records,
            acknowledgement_commit_callback: None,
        }
    }
}
impl ShareConsumerOptions {
pub fn with_share_acquire_mode(mut self, share_acquire_mode: ShareAcquireMode) -> Self {
self.share_acquire_mode = share_acquire_mode;
self
}
pub fn with_max_poll_records(mut self, max_poll_records: i32) -> Self {
self.max_poll_records = max_poll_records.max(1);
self
}
pub fn with_acknowledgement_commit_callback<F>(mut self, callback: F) -> Self
where
F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
{
self.acknowledgement_commit_callback = Some(Arc::new(callback));
self
}
}
/// Item type delivered (in a `Vec`) to an `AcknowledgementCommitCallback`.
///
/// Identifies a topic partition, a list of offsets, and an optional error
/// message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ShareAcknowledgementCommit {
    /// Topic name.
    pub topic: String,
    /// Topic id.
    pub topic_id: Uuid,
    /// Partition index.
    pub partition: i32,
    /// Offsets this entry refers to.
    pub offsets: Vec<i64>,
    /// Error text, if any.
    ///
    /// NOTE(review): presumably `None` means the commit succeeded — the code
    /// that fills this in is not in this file; confirm.
    pub error: Option<String>,
}
/// Module-internal description of one topic partition together with its
/// leader id and leader epoch.
#[derive(Clone, Debug)]
pub(super) struct ShareAssignment {
    /// Topic id.
    pub(super) topic_id: Uuid,
    /// Topic name.
    pub(super) topic: String,
    /// Partition index.
    pub(super) partition: i32,
    /// Leader id for the partition (presumably a broker node id — confirm).
    pub(super) leader_id: i32,
    /// Leader epoch for the partition.
    pub(super) leader_epoch: i32,
}
/// Module-internal key identifying a topic partition by id, name and index.
///
/// Derives `Hash` and `Ord`, so it can be used as a key in both hashed and
/// ordered collections. The derived ordering compares fields in declaration
/// order (`topic_id`, then `topic`, then `partition`), so the field order
/// below is significant.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub(super) struct TopicIdPartitionKey {
    /// Topic id; compared first.
    pub(super) topic_id: Uuid,
    /// Topic name; compared second.
    pub(super) topic: String,
    /// Partition index; compared last.
    pub(super) partition: i32,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the string name and numeric id of every `ShareAcquireMode` variant.
    #[test]
    fn share_acquire_mode_matches_java_names_and_ids() {
        let expectations = [
            (ShareAcquireMode::BatchOptimized, "batch_optimized", 0),
            (ShareAcquireMode::RecordLimit, "record_limit", 1),
        ];

        for (mode, name, id) in expectations {
            assert_eq!(mode.as_str(), name);
            assert_eq!(mode.protocol_id(), id);
        }
    }
}