1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_bool, get_i32, get_i64, get_i8, put_bool, put_i32, put_i64, put_i8};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
9 string_len,
10};
11use crate::primitives::string_bytes_borrowed::{
12 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
13 get_nullable_string_borrowed, get_string_borrowed,
14};
15use crate::tagged_fields::{read_tagged_fields, tagged_fields_len, WriteTaggedFields};
16use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
18pub const API_KEY: i16 = 78;
19pub const MIN_VERSION: i16 = 1;
20pub const MAX_VERSION: i16 = 2;
21pub const FLEXIBLE_MIN: i16 = 0;
22
23#[inline]
24fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct ShareFetchRequest<'a> {
28 pub group_id: Option<&'a str>,
29 pub member_id: Option<&'a str>,
30 pub share_session_epoch: i32,
31 pub max_wait_ms: i32,
32 pub min_bytes: i32,
33 pub max_bytes: i32,
34 pub max_records: i32,
35 pub batch_size: i32,
36 pub share_acquire_mode: i8,
37 pub is_renew_ack: bool,
38 pub topics: Vec<FetchTopic>,
39 pub forgotten_topics_data: Vec<ForgottenTopic>,
40 pub unknown_tagged_fields: UnknownTaggedFields,
41}
42
43impl<'a> Default for ShareFetchRequest<'a> {
44 fn default() -> Self {
45 Self {
46 group_id: None,
47 member_id: None,
48 share_session_epoch: 0i32,
49 max_wait_ms: 0i32,
50 min_bytes: 0i32,
51 max_bytes: 2_147_483_647i32,
52 max_records: 0i32,
53 batch_size: 0i32,
54 share_acquire_mode: 0i8,
55 is_renew_ack: false,
56 topics: Vec::new(),
57 forgotten_topics_data: Vec::new(),
58 unknown_tagged_fields: Default::default(),
59 }
60 }
61}
62
63impl<'a> ShareFetchRequest<'a> {
64 pub fn to_owned(&self) -> crate::owned::share_fetch_request::ShareFetchRequest {
65 crate::owned::share_fetch_request::ShareFetchRequest {
66 group_id: (self.group_id).map(|s| s.to_string()),
67 member_id: (self.member_id).map(|s| s.to_string()),
68 share_session_epoch: (self.share_session_epoch),
69 max_wait_ms: (self.max_wait_ms),
70 min_bytes: (self.min_bytes),
71 max_bytes: (self.max_bytes),
72 max_records: (self.max_records),
73 batch_size: (self.batch_size),
74 share_acquire_mode: (self.share_acquire_mode),
75 is_renew_ack: (self.is_renew_ack),
76 topics: (self.topics).iter().map(|it| it.to_owned()).collect(),
77 forgotten_topics_data: (self.forgotten_topics_data).iter().map(|it| it.to_owned()).collect(),
78 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
79 }
80 }
81}
82
83impl<'a> Encode for ShareFetchRequest<'a> {
84 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
85 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
86 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
87 }
88 let flex = is_flexible(version);
89 if version >= 0 { if flex { put_compact_nullable_string(buf, self.group_id) } else { put_nullable_string(buf, self.group_id) } }
90 if version >= 0 { if flex { put_compact_nullable_string(buf, self.member_id) } else { put_nullable_string(buf, self.member_id) } }
91 if version >= 0 { put_i32(buf, self.share_session_epoch) }
92 if version >= 0 { put_i32(buf, self.max_wait_ms) }
93 if version >= 0 { put_i32(buf, self.min_bytes) }
94 if version >= 0 { put_i32(buf, self.max_bytes) }
95 if version >= 1 { put_i32(buf, self.max_records) }
96 if version >= 1 { put_i32(buf, self.batch_size) }
97 if version >= 2 { put_i8(buf, self.share_acquire_mode) }
98 if version >= 2 { put_bool(buf, self.is_renew_ack) }
99 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.topics).len(), flex); for it in &self.topics { it.encode(buf, version)?; } } }
100 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.forgotten_topics_data).len(), flex); for it in &self.forgotten_topics_data { it.encode(buf, version)?; } } }
101 if flex {
102 let tagged = WriteTaggedFields::new();
103 tagged.write(buf, &self.unknown_tagged_fields);
104 }
105 Ok(())
106 }
107 fn encoded_len(&self, version: i16) -> usize {
108 let flex = is_flexible(version);
109 let mut n: usize = 0;
110 if version >= 0 { n += if flex { compact_nullable_string_len(self.group_id) } else { nullable_string_len(self.group_id) }; }
111 if version >= 0 { n += if flex { compact_nullable_string_len(self.member_id) } else { nullable_string_len(self.member_id) }; }
112 if version >= 0 { n += 4; }
113 if version >= 0 { n += 4; }
114 if version >= 0 { n += 4; }
115 if version >= 0 { n += 4; }
116 if version >= 1 { n += 4; }
117 if version >= 1 { n += 4; }
118 if version >= 2 { n += 1; }
119 if version >= 2 { n += 1; }
120 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.topics).len(), flex); let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
121 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.forgotten_topics_data).len(), flex); let body: usize = (self.forgotten_topics_data).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
122 if flex {
123 let known_pairs: Vec<(u32, usize)> = Vec::new();
124 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
125 }
126 n
127 }
128}
129
130impl<'de> DecodeBorrow<'de> for ShareFetchRequest<'de> {
131 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
132 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
133 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
134 }
135 let flex = is_flexible(version);
136 let mut out = Self::default();
137 if version >= 0 { out.group_id = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
138 if version >= 0 { out.member_id = if flex { get_compact_nullable_string_borrowed(buf)? } else { get_nullable_string_borrowed(buf)? }; }
139 if version >= 0 { out.share_session_epoch = get_i32(buf)?; }
140 if version >= 0 { out.max_wait_ms = get_i32(buf)?; }
141 if version >= 0 { out.min_bytes = get_i32(buf)?; }
142 if version >= 0 { out.max_bytes = get_i32(buf)?; }
143 if version >= 1 { out.max_records = get_i32(buf)?; }
144 if version >= 1 { out.batch_size = get_i32(buf)?; }
145 if version >= 2 { out.share_acquire_mode = get_i8(buf)?; }
146 if version >= 2 { out.is_renew_ack = get_bool(buf)?; }
147 if version >= 0 { out.topics = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchTopic::decode_borrow(buf, version)?); } v }; }
148 if version >= 0 { out.forgotten_topics_data = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(ForgottenTopic::decode_borrow(buf, version)?); } v }; }
149 if flex {
150 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
151 Ok(false)
152 })?;
153 }
154 Ok(out)
155 }
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
159pub struct FetchTopic {
160 pub topic_id: crate::primitives::uuid::Uuid,
161 pub partitions: Vec<FetchPartition>,
162 pub unknown_tagged_fields: UnknownTaggedFields,
163}
164
165impl Default for FetchTopic {
166 fn default() -> Self {
167 Self {
168 topic_id: Default::default(),
169 partitions: Vec::new(),
170 unknown_tagged_fields: Default::default(),
171 }
172 }
173}
174
175impl FetchTopic {
176 pub fn to_owned(&self) -> crate::owned::share_fetch_request::FetchTopic {
177 crate::owned::share_fetch_request::FetchTopic {
178 topic_id: (self.topic_id),
179 partitions: (self.partitions).iter().map(|it| it.to_owned()).collect(),
180 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
181 }
182 }
183}
184
185impl Encode for FetchTopic {
186 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
187 let flex = version >= 0;
188 if version >= 0 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
189 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { it.encode(buf, version)?; } } }
190 if flex {
191 let tagged = WriteTaggedFields::new();
192 tagged.write(buf, &self.unknown_tagged_fields);
193 }
194 Ok(())
195 }
196 fn encoded_len(&self, version: i16) -> usize {
197 let flex = version >= 0;
198 let mut n: usize = 0;
199 if version >= 0 { n += 16; }
200 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
201 if flex {
202 let known_pairs: Vec<(u32, usize)> = Vec::new();
203 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
204 }
205 n
206 }
207}
208
209impl<'de> DecodeBorrow<'de> for FetchTopic {
210 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
211 let flex = version >= 0;
212 let mut out = Self::default();
213 if version >= 0 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
214 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(FetchPartition::decode_borrow(buf, version)?); } v }; }
215 if flex {
216 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
217 Ok(false)
218 })?;
219 }
220 Ok(out)
221 }
222}
223
224#[derive(Debug, Clone, PartialEq, Eq)]
225pub struct FetchPartition {
226 pub partition_index: i32,
227 pub partition_max_bytes: i32,
228 pub acknowledgement_batches: Vec<AcknowledgementBatch>,
229 pub unknown_tagged_fields: UnknownTaggedFields,
230}
231
232impl Default for FetchPartition {
233 fn default() -> Self {
234 Self {
235 partition_index: 0i32,
236 partition_max_bytes: 0i32,
237 acknowledgement_batches: Vec::new(),
238 unknown_tagged_fields: Default::default(),
239 }
240 }
241}
242
243impl FetchPartition {
244 pub fn to_owned(&self) -> crate::owned::share_fetch_request::FetchPartition {
245 crate::owned::share_fetch_request::FetchPartition {
246 partition_index: (self.partition_index),
247 partition_max_bytes: (self.partition_max_bytes),
248 acknowledgement_batches: (self.acknowledgement_batches).iter().map(|it| it.to_owned()).collect(),
249 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
250 }
251 }
252}
253
254impl Encode for FetchPartition {
255 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
256 let flex = version >= 0;
257 if version >= 0 { put_i32(buf, self.partition_index) }
258 if version >= 0 && version <= 0 { put_i32(buf, self.partition_max_bytes) }
259 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.acknowledgement_batches).len(), flex); for it in &self.acknowledgement_batches { it.encode(buf, version)?; } } }
260 if flex {
261 let tagged = WriteTaggedFields::new();
262 tagged.write(buf, &self.unknown_tagged_fields);
263 }
264 Ok(())
265 }
266 fn encoded_len(&self, version: i16) -> usize {
267 let flex = version >= 0;
268 let mut n: usize = 0;
269 if version >= 0 { n += 4; }
270 if version >= 0 && version <= 0 { n += 4; }
271 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.acknowledgement_batches).len(), flex); let body: usize = (self.acknowledgement_batches).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
272 if flex {
273 let known_pairs: Vec<(u32, usize)> = Vec::new();
274 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
275 }
276 n
277 }
278}
279
280impl<'de> DecodeBorrow<'de> for FetchPartition {
281 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
282 let flex = version >= 0;
283 let mut out = Self::default();
284 if version >= 0 { out.partition_index = get_i32(buf)?; }
285 if version >= 0 && version <= 0 { out.partition_max_bytes = get_i32(buf)?; }
286 if version >= 0 { out.acknowledgement_batches = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(AcknowledgementBatch::decode_borrow(buf, version)?); } v }; }
287 if flex {
288 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
289 Ok(false)
290 })?;
291 }
292 Ok(out)
293 }
294}
295
296#[derive(Debug, Clone, PartialEq, Eq)]
297pub struct AcknowledgementBatch {
298 pub first_offset: i64,
299 pub last_offset: i64,
300 pub acknowledge_types: Vec<i8>,
301 pub unknown_tagged_fields: UnknownTaggedFields,
302}
303
304impl Default for AcknowledgementBatch {
305 fn default() -> Self {
306 Self {
307 first_offset: 0i64,
308 last_offset: 0i64,
309 acknowledge_types: Vec::new(),
310 unknown_tagged_fields: Default::default(),
311 }
312 }
313}
314
315impl AcknowledgementBatch {
316 pub fn to_owned(&self) -> crate::owned::share_fetch_request::AcknowledgementBatch {
317 crate::owned::share_fetch_request::AcknowledgementBatch {
318 first_offset: (self.first_offset),
319 last_offset: (self.last_offset),
320 acknowledge_types: (self.acknowledge_types).clone(),
321 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
322 }
323 }
324}
325
326impl Encode for AcknowledgementBatch {
327 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
328 let flex = version >= 0;
329 if version >= 0 { put_i64(buf, self.first_offset) }
330 if version >= 0 { put_i64(buf, self.last_offset) }
331 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.acknowledge_types).len(), flex); for it in &self.acknowledge_types { put_i8(buf, *it); } } }
332 if flex {
333 let tagged = WriteTaggedFields::new();
334 tagged.write(buf, &self.unknown_tagged_fields);
335 }
336 Ok(())
337 }
338 fn encoded_len(&self, version: i16) -> usize {
339 let flex = version >= 0;
340 let mut n: usize = 0;
341 if version >= 0 { n += 8; }
342 if version >= 0 { n += 8; }
343 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.acknowledge_types).len(), flex); let body: usize = (self.acknowledge_types).iter().map(|_| 1).sum(); prefix + body }; }
344 if flex {
345 let known_pairs: Vec<(u32, usize)> = Vec::new();
346 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
347 }
348 n
349 }
350}
351
352impl<'de> DecodeBorrow<'de> for AcknowledgementBatch {
353 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
354 let flex = version >= 0;
355 let mut out = Self::default();
356 if version >= 0 { out.first_offset = get_i64(buf)?; }
357 if version >= 0 { out.last_offset = get_i64(buf)?; }
358 if version >= 0 { out.acknowledge_types = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i8(buf)?); } v }; }
359 if flex {
360 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
361 Ok(false)
362 })?;
363 }
364 Ok(out)
365 }
366}
367
368#[derive(Debug, Clone, PartialEq, Eq)]
369pub struct ForgottenTopic {
370 pub topic_id: crate::primitives::uuid::Uuid,
371 pub partitions: Vec<i32>,
372 pub unknown_tagged_fields: UnknownTaggedFields,
373}
374
375impl Default for ForgottenTopic {
376 fn default() -> Self {
377 Self {
378 topic_id: Default::default(),
379 partitions: Vec::new(),
380 unknown_tagged_fields: Default::default(),
381 }
382 }
383}
384
385impl ForgottenTopic {
386 pub fn to_owned(&self) -> crate::owned::share_fetch_request::ForgottenTopic {
387 crate::owned::share_fetch_request::ForgottenTopic {
388 topic_id: (self.topic_id),
389 partitions: (self.partitions).clone(),
390 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
391 }
392 }
393}
394
395impl Encode for ForgottenTopic {
396 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
397 let flex = version >= 0;
398 if version >= 0 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
399 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex); for it in &self.partitions { put_i32(buf, *it); } } }
400 if flex {
401 let tagged = WriteTaggedFields::new();
402 tagged.write(buf, &self.unknown_tagged_fields);
403 }
404 Ok(())
405 }
406 fn encoded_len(&self, version: i16) -> usize {
407 let flex = version >= 0;
408 let mut n: usize = 0;
409 if version >= 0 { n += 16; }
410 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex); let body: usize = (self.partitions).iter().map(|_| 4).sum(); prefix + body }; }
411 if flex {
412 let known_pairs: Vec<(u32, usize)> = Vec::new();
413 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
414 }
415 n
416 }
417}
418
419impl<'de> DecodeBorrow<'de> for ForgottenTopic {
420 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
421 let flex = version >= 0;
422 let mut out = Self::default();
423 if version >= 0 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
424 if version >= 0 { out.partitions = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(get_i32(buf)?); } v }; }
425 if flex {
426 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
427 Ok(false)
428 })?;
429 }
430 Ok(out)
431 }
432}