Skip to main content

kafkit_client/consumer/share/
types.rs

1use std::sync::Arc;
2
3use uuid::Uuid;
4
5use crate::types::ConsumerRecord;
6
7pub(super) const DEFAULT_SHARE_MAX_POLL_RECORDS: i32 = 500;
8
9/// Alias for acknowledgement commit callback.
10pub type AcknowledgementCommitCallback =
11    Arc<dyn Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static>;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14/// Acknowledge Type.
15pub enum AcknowledgeType {
16    /// Accept.
17    Accept,
18    /// Release.
19    Release,
20    /// Reject.
21    Reject,
22}
23
24impl AcknowledgeType {
25    pub(super) fn protocol_value(self) -> i8 {
26        match self {
27            Self::Accept => 1,
28            Self::Release => 2,
29            Self::Reject => 3,
30        }
31    }
32}
33
34#[derive(Debug, Clone)]
35/// Share Record.
36pub struct ShareRecord {
37    /// Record.
38    pub record: ConsumerRecord,
39    /// Delivery Count.
40    pub delivery_count: i16,
41}
42
43#[derive(Debug, Clone, Default)]
44/// Share Records.
45pub struct ShareRecords {
46    records: Vec<ShareRecord>,
47}
48
49impl ShareRecords {
50    /// Creates a new value.
51    pub fn new(records: Vec<ShareRecord>) -> Self {
52        Self { records }
53    }
54
55    /// Returns whether empty.
56    pub fn is_empty(&self) -> bool {
57        self.records.is_empty()
58    }
59
60    /// Returns the number of records.
61    pub fn len(&self) -> usize {
62        self.records.len()
63    }
64
65    /// Iterates over the records without taking ownership.
66    pub fn iter(&self) -> impl Iterator<Item = &ShareRecord> {
67        self.records.iter()
68    }
69
70    /// Converts this value into inner.
71    pub fn into_inner(self) -> Vec<ShareRecord> {
72        self.records
73    }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
77/// Share Acquire Mode.
78pub enum ShareAcquireMode {
79    /// Batch optimized.
80    #[default]
81    BatchOptimized,
82    /// Record limit.
83    RecordLimit,
84}
85
86impl ShareAcquireMode {
87    /// Returns the Kafka-style name for this value.
88    pub fn as_str(self) -> &'static str {
89        match self {
90            Self::BatchOptimized => "batch_optimized",
91            Self::RecordLimit => "record_limit",
92        }
93    }
94
95    /// Returns the protocol id used by Kafka.
96    pub fn protocol_id(self) -> i8 {
97        match self {
98            Self::BatchOptimized => 0,
99            Self::RecordLimit => 1,
100        }
101    }
102}
103
104#[derive(Clone)]
105/// Share Consumer Options.
106pub struct ShareConsumerOptions {
107    /// Share Acquire Mode.
108    pub share_acquire_mode: ShareAcquireMode,
109    /// Max Poll Records.
110    pub max_poll_records: i32,
111    /// Acknowledgement Commit Callback.
112    pub acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
113}
114
115impl Default for ShareConsumerOptions {
116    fn default() -> Self {
117        Self {
118            share_acquire_mode: ShareAcquireMode::BatchOptimized,
119            max_poll_records: DEFAULT_SHARE_MAX_POLL_RECORDS,
120            acknowledgement_commit_callback: None,
121        }
122    }
123}
124
125impl ShareConsumerOptions {
126    /// Sets share acquire mode and returns the updated value.
127    pub fn with_share_acquire_mode(mut self, share_acquire_mode: ShareAcquireMode) -> Self {
128        self.share_acquire_mode = share_acquire_mode;
129        self
130    }
131
132    /// Sets max poll records and returns the updated value.
133    pub fn with_max_poll_records(mut self, max_poll_records: i32) -> Self {
134        self.max_poll_records = max_poll_records.max(1);
135        self
136    }
137
138    /// Sets acknowledgement commit callback and returns the updated value.
139    pub fn with_acknowledgement_commit_callback<F>(mut self, callback: F) -> Self
140    where
141        F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
142    {
143        self.acknowledgement_commit_callback = Some(Arc::new(callback));
144        self
145    }
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
149/// Share Acknowledgement Commit.
150pub struct ShareAcknowledgementCommit {
151    /// Topic name.
152    pub topic: String,
153    /// Topic Id.
154    pub topic_id: Uuid,
155    /// Partition number.
156    pub partition: i32,
157    /// Offsets.
158    pub offsets: Vec<i64>,
159    /// Error.
160    pub error: Option<String>,
161}
162
163#[derive(Debug, Clone)]
164pub(super) struct ShareAssignment {
165    pub(super) topic_id: Uuid,
166    pub(super) topic: String,
167    pub(super) partition: i32,
168    pub(super) leader_id: i32,
169    pub(super) leader_epoch: i32,
170}
171
172#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
173pub(super) struct TopicIdPartitionKey {
174    pub(super) topic_id: Uuid,
175    pub(super) topic: String,
176    pub(super) partition: i32,
177}
178
179#[cfg(test)]
180mod tests {
181    use super::*;
182
183    #[test]
184    fn share_acquire_mode_matches_java_names_and_ids() {
185        assert_eq!(ShareAcquireMode::BatchOptimized.as_str(), "batch_optimized");
186        assert_eq!(ShareAcquireMode::BatchOptimized.protocol_id(), 0);
187        assert_eq!(ShareAcquireMode::RecordLimit.as_str(), "record_limit");
188        assert_eq!(ShareAcquireMode::RecordLimit.protocol_id(), 1);
189    }
190}