1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_bool, get_i32, get_i64, get_i8, put_bool, put_i32, put_i64, put_i8};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
10 string_len,
11};
12use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
13use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
14
15pub const API_KEY: i16 = 78;
16pub const MIN_VERSION: i16 = 1;
17pub const MAX_VERSION: i16 = 2;
18pub const FLEXIBLE_MIN: i16 = 0;
19
20#[inline]
21fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct ShareFetchRequest {
25 pub group_id: Option<String>,
26 pub member_id: Option<String>,
27 pub share_session_epoch: i32,
28 pub max_wait_ms: i32,
29 pub min_bytes: i32,
30 pub max_bytes: i32,
31 pub max_records: i32,
32 pub batch_size: i32,
33 pub share_acquire_mode: i8,
34 pub is_renew_ack: bool,
35 pub topics: Vec<FetchTopic>,
36 pub forgotten_topics_data: Vec<ForgottenTopic>,
37 pub unknown_tagged_fields: UnknownTaggedFields,
38}
39
40impl Default for ShareFetchRequest {
41 fn default() -> Self {
42 Self {
43 group_id: None,
44 member_id: None,
45 share_session_epoch: 0i32,
46 max_wait_ms: 0i32,
47 min_bytes: 0i32,
48 max_bytes: 2_147_483_647i32,
49 max_records: 0i32,
50 batch_size: 0i32,
51 share_acquire_mode: 0i8,
52 is_renew_ack: false,
53 topics: Vec::new(),
54 forgotten_topics_data: Vec::new(),
55 unknown_tagged_fields: Default::default(),
56 }
57 }
58}
59
60impl Encode for ShareFetchRequest {
61 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
62 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
63 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
64 }
65 let flex = is_flexible(version);
66 if version >= 0 { if flex { put_compact_nullable_string(buf, self.group_id.as_deref()) } else { put_nullable_string(buf, self.group_id.as_deref()) } }
67 if version >= 0 { if flex { put_compact_nullable_string(buf, self.member_id.as_deref()) } else { put_nullable_string(buf, self.member_id.as_deref()) } }
68 if version >= 0 { put_i32(buf, self.share_session_epoch) }
69 if version >= 0 { put_i32(buf, self.max_wait_ms) }
70 if version >= 0 { put_i32(buf, self.min_bytes) }
71 if version >= 0 { put_i32(buf, self.max_bytes) }
72 if version >= 1 { put_i32(buf, self.max_records) }
73 if version >= 1 { put_i32(buf, self.batch_size) }
74 if version >= 2 { put_i8(buf, self.share_acquire_mode) }
75 if version >= 2 { put_bool(buf, self.is_renew_ack) }
76 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
77 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.forgotten_topics_data).len(), flex); for it in &self.forgotten_topics_data { it.encode(buf, version)?; } } }
78 if flex {
79 let tagged = WriteTaggedFields::new();
80 tagged.write(buf, &self.unknown_tagged_fields);
81 }
82 Ok(())
83 }
84 fn encoded_len(&self, version: i16) -> usize {
85 let flex = is_flexible(version);
86 let mut n: usize = 0;
87 if version >= 0 { n += if flex { compact_nullable_string_len(self.group_id.as_deref()) } else { nullable_string_len(self.group_id.as_deref()) }; }
88 if version >= 0 { n += if flex { compact_nullable_string_len(self.member_id.as_deref()) } else { nullable_string_len(self.member_id.as_deref()) }; }
89 if version >= 0 { n += 4; }
90 if version >= 0 { n += 4; }
91 if version >= 0 { n += 4; }
92 if version >= 0 { n += 4; }
93 if version >= 1 { n += 4; }
94 if version >= 1 { n += 4; }
95 if version >= 2 { n += 1; }
96 if version >= 2 { n += 1; }
97 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
98 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.forgotten_topics_data).len(), flex); let body: usize = (self.forgotten_topics_data).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
99 if flex {
100 let known_pairs: Vec<(u32, usize)> = Vec::new();
101 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
102 }
103 n
104 }
105}
106
107impl<'de> Decode<'de> for ShareFetchRequest {
108 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
109 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
110 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
111 }
112 let flex = is_flexible(version);
113 let mut out = Self::default();
114 if version >= 0 { out.group_id = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
115 if version >= 0 { out.member_id = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
116 if version >= 0 { out.share_session_epoch = get_i32(buf)?; }
117 if version >= 0 { out.max_wait_ms = get_i32(buf)?; }
118 if version >= 0 { out.min_bytes = get_i32(buf)?; }
119 if version >= 0 { out.max_bytes = get_i32(buf)?; }
120 if version >= 1 { out.max_records = get_i32(buf)?; }
121 if version >= 1 { out.batch_size = get_i32(buf)?; }
122 if version >= 2 { out.share_acquire_mode = get_i8(buf)?; }
123 if version >= 2 { out.is_renew_ack = get_bool(buf)?; }
124 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchTopic::decode(buf, version)?); } v }; }
125 if version >= 0 { out.forgotten_topics_data = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(ForgottenTopic::decode(buf, version)?); } v }; }
126 if flex {
127 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
128 Ok(false)
129 })?;
130 }
131 Ok(out)
132 }
133}
134
135#[derive(Debug, Clone, PartialEq, Eq, Default)]
136pub struct FetchTopic {
137 pub topic_id: crate::primitives::uuid::Uuid,
138 pub partitions: Vec<FetchPartition>,
139 pub unknown_tagged_fields: UnknownTaggedFields,
140}
141
142impl Encode for FetchTopic {
143 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
144 let flex = version >= 0;
145 if version >= 0 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
146 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
147 if flex {
148 let tagged = WriteTaggedFields::new();
149 tagged.write(buf, &self.unknown_tagged_fields);
150 }
151 Ok(())
152 }
153 fn encoded_len(&self, version: i16) -> usize {
154 let flex = version >= 0;
155 let mut n: usize = 0;
156 if version >= 0 { n += 16; }
157 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
158 if flex {
159 let known_pairs: Vec<(u32, usize)> = Vec::new();
160 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
161 }
162 n
163 }
164}
165
166impl<'de> Decode<'de> for FetchTopic {
167 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
168 let flex = version >= 0;
169 let mut out = Self::default();
170 if version >= 0 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
171 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchPartition::decode(buf, version)?); } v }; }
172 if flex {
173 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
174 Ok(false)
175 })?;
176 }
177 Ok(out)
178 }
179}
180
181#[derive(Debug, Clone, PartialEq, Eq, Default)]
182pub struct FetchPartition {
183 pub partition_index: i32,
184 pub partition_max_bytes: i32,
185 pub acknowledgement_batches: Vec<AcknowledgementBatch>,
186 pub unknown_tagged_fields: UnknownTaggedFields,
187}
188
189impl Encode for FetchPartition {
190 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
191 let flex = version >= 0;
192 if version >= 0 { put_i32(buf, self.partition_index) }
193 if version >= 0 && version <= 0 { put_i32(buf, self.partition_max_bytes) }
194 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.acknowledgement_batches).len(), flex); for it in &self.acknowledgement_batches { it.encode(buf, version)?; } } }
195 if flex {
196 let tagged = WriteTaggedFields::new();
197 tagged.write(buf, &self.unknown_tagged_fields);
198 }
199 Ok(())
200 }
201 fn encoded_len(&self, version: i16) -> usize {
202 let flex = version >= 0;
203 let mut n: usize = 0;
204 if version >= 0 { n += 4; }
205 if version >= 0 && version <= 0 { n += 4; }
206 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.acknowledgement_batches).len(), flex); let body: usize = (self.acknowledgement_batches).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
207 if flex {
208 let known_pairs: Vec<(u32, usize)> = Vec::new();
209 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
210 }
211 n
212 }
213}
214
215impl<'de> Decode<'de> for FetchPartition {
216 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
217 let flex = version >= 0;
218 let mut out = Self::default();
219 if version >= 0 { out.partition_index = get_i32(buf)?; }
220 if version >= 0 && version <= 0 { out.partition_max_bytes = get_i32(buf)?; }
221 if version >= 0 { out.acknowledgement_batches = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(AcknowledgementBatch::decode(buf, version)?); } v }; }
222 if flex {
223 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
224 Ok(false)
225 })?;
226 }
227 Ok(out)
228 }
229}
230
231#[derive(Debug, Clone, PartialEq, Eq, Default)]
232pub struct AcknowledgementBatch {
233 pub first_offset: i64,
234 pub last_offset: i64,
235 pub acknowledge_types: Vec<i8>,
236 pub unknown_tagged_fields: UnknownTaggedFields,
237}
238
239impl Encode for AcknowledgementBatch {
240 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
241 let flex = version >= 0;
242 if version >= 0 { put_i64(buf, self.first_offset) }
243 if version >= 0 { put_i64(buf, self.last_offset) }
244 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.acknowledge_types).len(), flex); for it in &self.acknowledge_types { put_i8(buf, *it); } } }
245 if flex {
246 let tagged = WriteTaggedFields::new();
247 tagged.write(buf, &self.unknown_tagged_fields);
248 }
249 Ok(())
250 }
251 fn encoded_len(&self, version: i16) -> usize {
252 let flex = version >= 0;
253 let mut n: usize = 0;
254 if version >= 0 { n += 8; }
255 if version >= 0 { n += 8; }
256 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.acknowledge_types).len(), flex); let body: usize = (self.acknowledge_types).iter().map(|_| 1).sum(); prefix + body }; }
257 if flex {
258 let known_pairs: Vec<(u32, usize)> = Vec::new();
259 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
260 }
261 n
262 }
263}
264
265impl<'de> Decode<'de> for AcknowledgementBatch {
266 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
267 let flex = version >= 0;
268 let mut out = Self::default();
269 if version >= 0 { out.first_offset = get_i64(buf)?; }
270 if version >= 0 { out.last_offset = get_i64(buf)?; }
271 if version >= 0 { out.acknowledge_types = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i8(buf)?); } v }; }
272 if flex {
273 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
274 Ok(false)
275 })?;
276 }
277 Ok(out)
278 }
279}
280
281#[derive(Debug, Clone, PartialEq, Eq, Default)]
282pub struct ForgottenTopic {
283 pub topic_id: crate::primitives::uuid::Uuid,
284 pub partitions: Vec<i32>,
285 pub unknown_tagged_fields: UnknownTaggedFields,
286}
287
288impl Encode for ForgottenTopic {
289 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
290 let flex = version >= 0;
291 if version >= 0 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
292 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { put_i32(buf, *it); } } }
293 if flex {
294 let tagged = WriteTaggedFields::new();
295 tagged.write(buf, &self.unknown_tagged_fields);
296 }
297 Ok(())
298 }
299 fn encoded_len(&self, version: i16) -> usize {
300 let flex = version >= 0;
301 let mut n: usize = 0;
302 if version >= 0 { n += 16; }
303 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|_| 4).sum(); prefix + body }; }
304 if flex {
305 let known_pairs: Vec<(u32, usize)> = Vec::new();
306 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
307 }
308 n
309 }
310}
311
312impl<'de> Decode<'de> for ForgottenTopic {
313 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
314 let flex = version >= 0;
315 let mut out = Self::default();
316 if version >= 0 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
317 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
318 if flex {
319 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
320 Ok(false)
321 })?;
322 }
323 Ok(out)
324 }
325}
326
327#[must_use]
330#[allow(unused_comparisons)]
331pub fn default_json(version: i16) -> ::serde_json::Value {
332 let mut obj = ::serde_json::Map::new();
333 obj.insert("groupId".to_string(), ::serde_json::Value::Null);
334 obj.insert("memberId".to_string(), ::serde_json::Value::Null);
335 obj.insert("shareSessionEpoch".to_string(), ::serde_json::json!(0));
336 obj.insert("maxWaitMs".to_string(), ::serde_json::json!(0));
337 obj.insert("minBytes".to_string(), ::serde_json::json!(0));
338 obj.insert("maxBytes".to_string(), ::serde_json::json!(2147483647));
339 if version >= 1 {
340 obj.insert("maxRecords".to_string(), ::serde_json::json!(0));
341 }
342 if version >= 1 {
343 obj.insert("batchSize".to_string(), ::serde_json::json!(0));
344 }
345 if version >= 2 {
346 obj.insert("shareAcquireMode".to_string(), ::serde_json::json!(0));
347 }
348 if version >= 2 {
349 obj.insert("isRenewAck".to_string(), ::serde_json::Value::Bool(false));
350 }
351 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
352 obj.insert("forgottenTopicsData".to_string(), ::serde_json::Value::Array(vec![]));
353 ::serde_json::Value::Object(obj)
354}
355
356impl crate::ProtocolRequest for ShareFetchRequest {
357 const API_KEY: i16 = API_KEY;
358 const MIN_VERSION: i16 = MIN_VERSION;
359 const MAX_VERSION: i16 = MAX_VERSION;
360 const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
361 type Response = super::share_fetch_response::ShareFetchResponse;
362}