// kafkit_client/consumer/share/types.rs
use std::fmt;
use std::sync::Arc;

use uuid::Uuid;

use crate::types::ConsumerRecord;
6
7pub(super) const DEFAULT_SHARE_MAX_POLL_RECORDS: i32 = 500;
8
9pub type AcknowledgementCommitCallback =
11 Arc<dyn Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static>;
12
/// How a record delivered to a share consumer is acknowledged.
///
/// The variant names follow Apache Kafka's share-group `AcknowledgeType`
/// (KIP-932); the per-variant descriptions below paraphrase that
/// documentation — confirm against the broker version in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AcknowledgeType {
    /// The record was processed successfully.
    Accept,
    /// The record is handed back so it can be delivered again.
    Release,
    /// The record could not be processed and should not be delivered again.
    Reject,
}
23
24impl AcknowledgeType {
25 pub(super) fn protocol_value(self) -> i8 {
26 match self {
27 Self::Accept => 1,
28 Self::Release => 2,
29 Self::Reject => 3,
30 }
31 }
32}
33
34#[derive(Debug, Clone)]
35pub struct ShareRecord {
37 pub record: ConsumerRecord,
39 pub delivery_count: i16,
41}
42
43#[derive(Debug, Clone, Default)]
44pub struct ShareRecords {
46 records: Vec<ShareRecord>,
47}
48
49impl ShareRecords {
50 pub fn new(records: Vec<ShareRecord>) -> Self {
52 Self { records }
53 }
54
55 pub fn is_empty(&self) -> bool {
57 self.records.is_empty()
58 }
59
60 pub fn len(&self) -> usize {
62 self.records.len()
63 }
64
65 pub fn iter(&self) -> impl Iterator<Item = &ShareRecord> {
67 self.records.iter()
68 }
69
70 pub fn into_inner(self) -> Vec<ShareRecord> {
72 self.records
73 }
74}
75
/// Record-acquisition mode requested by a share consumer.
///
/// The names and ids are kept in step with the Java client (see the unit test
/// in this file). The per-variant notes paraphrase Kafka's
/// `share.acquire.mode` setting (KIP-1206) — confirm against the broker
/// version in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShareAcquireMode {
    /// Default mode; presumably lets the broker hand out whole batches.
    #[default]
    BatchOptimized,
    /// Presumably treats the configured record count as a strict limit.
    RecordLimit,
}

impl ShareAcquireMode {
    /// Configuration-style name of the mode: `"batch_optimized"` or
    /// `"record_limit"`.
    ///
    /// `const` so the name can be used when building constants.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::BatchOptimized => "batch_optimized",
            Self::RecordLimit => "record_limit",
        }
    }

    /// Numeric id of the mode: 0 for `BatchOptimized`, 1 for `RecordLimit`.
    ///
    /// `const` so the id can be used when building constants.
    pub const fn protocol_id(self) -> i8 {
        match self {
            Self::BatchOptimized => 0,
            Self::RecordLimit => 1,
        }
    }
}
103
104#[derive(Clone)]
105pub struct ShareConsumerOptions {
107 pub share_acquire_mode: ShareAcquireMode,
109 pub max_poll_records: i32,
111 pub acknowledgement_commit_callback: Option<AcknowledgementCommitCallback>,
113}
114
115impl Default for ShareConsumerOptions {
116 fn default() -> Self {
117 Self {
118 share_acquire_mode: ShareAcquireMode::BatchOptimized,
119 max_poll_records: DEFAULT_SHARE_MAX_POLL_RECORDS,
120 acknowledgement_commit_callback: None,
121 }
122 }
123}
124
125impl ShareConsumerOptions {
126 pub fn with_share_acquire_mode(mut self, share_acquire_mode: ShareAcquireMode) -> Self {
128 self.share_acquire_mode = share_acquire_mode;
129 self
130 }
131
132 pub fn with_max_poll_records(mut self, max_poll_records: i32) -> Self {
134 self.max_poll_records = max_poll_records.max(1);
135 self
136 }
137
138 pub fn with_acknowledgement_commit_callback<F>(mut self, callback: F) -> Self
140 where
141 F: Fn(Vec<ShareAcknowledgementCommit>) + Send + Sync + 'static,
142 {
143 self.acknowledgement_commit_callback = Some(Arc::new(callback));
144 self
145 }
146}
147
148#[derive(Debug, Clone, PartialEq, Eq)]
149pub struct ShareAcknowledgementCommit {
151 pub topic: String,
153 pub topic_id: Uuid,
155 pub partition: i32,
157 pub offsets: Vec<i64>,
159 pub error: Option<String>,
161}
162
163#[derive(Debug, Clone)]
164pub(super) struct ShareAssignment {
165 pub(super) topic_id: Uuid,
166 pub(super) topic: String,
167 pub(super) partition: i32,
168 pub(super) leader_id: i32,
169 pub(super) leader_epoch: i32,
170}
171
172#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
173pub(super) struct TopicIdPartitionKey {
174 pub(super) topic_id: Uuid,
175 pub(super) topic: String,
176 pub(super) partition: i32,
177}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Names and ids must stay aligned with the Java client's values for the
    /// same modes (as this test's name states).
    #[test]
    fn share_acquire_mode_matches_java_names_and_ids() {
        assert_eq!(ShareAcquireMode::BatchOptimized.as_str(), "batch_optimized");
        assert_eq!(ShareAcquireMode::BatchOptimized.protocol_id(), 0);
        assert_eq!(ShareAcquireMode::RecordLimit.as_str(), "record_limit");
        assert_eq!(ShareAcquireMode::RecordLimit.protocol_id(), 1);
    }

    /// Pins the numeric codes so an accidental renumbering is caught.
    #[test]
    fn acknowledge_type_protocol_values_are_stable() {
        assert_eq!(AcknowledgeType::Accept.protocol_value(), 1);
        assert_eq!(AcknowledgeType::Release.protocol_value(), 2);
        assert_eq!(AcknowledgeType::Reject.protocol_value(), 3);
    }

    /// Defaults come from the module constant and the enum default.
    #[test]
    fn share_consumer_options_defaults() {
        let options = ShareConsumerOptions::default();
        assert_eq!(options.share_acquire_mode, ShareAcquireMode::BatchOptimized);
        assert_eq!(options.max_poll_records, DEFAULT_SHARE_MAX_POLL_RECORDS);
        assert!(options.acknowledgement_commit_callback.is_none());
    }

    /// The builder never stores a poll size below 1.
    #[test]
    fn with_max_poll_records_clamps_to_at_least_one() {
        let clamp = |requested: i32| {
            ShareConsumerOptions::default()
                .with_max_poll_records(requested)
                .max_poll_records
        };
        assert_eq!(clamp(0), 1);
        assert_eq!(clamp(-5), 1);
        assert_eq!(clamp(1), 1);
        assert_eq!(clamp(42), 42);
    }
}
190}