1use crate::primitives::fixed::{
3 get_bool, get_i8, get_i32, get_i64, put_bool, put_i8, put_i32, put_i64,
4};
5use crate::primitives::string_bytes::{
6 compact_nullable_string_len, nullable_string_len, put_compact_nullable_string,
7 put_nullable_string,
8};
9use crate::primitives::string_bytes_borrowed::{
10 get_compact_nullable_string_borrowed, get_nullable_string_borrowed,
11};
12use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
13use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
14use bytes::BufMut;
15pub const API_KEY: i16 = 78;
16pub const MIN_VERSION: i16 = 1;
17pub const MAX_VERSION: i16 = 2;
18pub const FLEXIBLE_MIN: i16 = 0;
19#[inline]
20fn is_flexible(version: i16) -> bool {
21 version >= FLEXIBLE_MIN
22}
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct ShareFetchRequest<'a> {
25 pub group_id: Option<&'a str>,
26 pub member_id: Option<&'a str>,
27 pub share_session_epoch: i32,
28 pub max_wait_ms: i32,
29 pub min_bytes: i32,
30 pub max_bytes: i32,
31 pub max_records: i32,
32 pub batch_size: i32,
33 pub share_acquire_mode: i8,
34 pub is_renew_ack: bool,
35 pub topics: Vec<FetchTopic>,
36 pub forgotten_topics_data: Vec<ForgottenTopic>,
37 pub unknown_tagged_fields: UnknownTaggedFields,
38}
39impl Default for ShareFetchRequest<'_> {
40 fn default() -> Self {
41 Self {
42 group_id: None,
43 member_id: None,
44 share_session_epoch: 0i32,
45 max_wait_ms: 0i32,
46 min_bytes: 0i32,
47 max_bytes: 2_147_483_647i32,
48 max_records: 0i32,
49 batch_size: 0i32,
50 share_acquire_mode: 0i8,
51 is_renew_ack: false,
52 topics: Vec::new(),
53 forgotten_topics_data: Vec::new(),
54 unknown_tagged_fields: Default::default(),
55 }
56 }
57}
58impl ShareFetchRequest<'_> {
59 pub fn to_owned(&self) -> crate::owned::share_fetch_request::ShareFetchRequest {
60 crate::owned::share_fetch_request::ShareFetchRequest {
61 group_id: (self.group_id).map(std::string::ToString::to_string),
62 member_id: (self.member_id).map(std::string::ToString::to_string),
63 share_session_epoch: (self.share_session_epoch),
64 max_wait_ms: (self.max_wait_ms),
65 min_bytes: (self.min_bytes),
66 max_bytes: (self.max_bytes),
67 max_records: (self.max_records),
68 batch_size: (self.batch_size),
69 share_acquire_mode: (self.share_acquire_mode),
70 is_renew_ack: (self.is_renew_ack),
71 topics: (self.topics).iter().map(FetchTopic::to_owned).collect(),
72 forgotten_topics_data: (self.forgotten_topics_data)
73 .iter()
74 .map(ForgottenTopic::to_owned)
75 .collect(),
76 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
77 }
78 }
79}
80impl Encode for ShareFetchRequest<'_> {
81 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
82 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
83 return Err(ProtocolError::UnsupportedVersion {
84 api_key: API_KEY,
85 version,
86 });
87 }
88 let flex = is_flexible(version);
89 if version >= 0 {
90 if flex {
91 put_compact_nullable_string(buf, self.group_id);
92 } else {
93 put_nullable_string(buf, self.group_id);
94 }
95 }
96 if version >= 0 {
97 if flex {
98 put_compact_nullable_string(buf, self.member_id);
99 } else {
100 put_nullable_string(buf, self.member_id);
101 }
102 }
103 if version >= 0 {
104 put_i32(buf, self.share_session_epoch);
105 }
106 if version >= 0 {
107 put_i32(buf, self.max_wait_ms);
108 }
109 if version >= 0 {
110 put_i32(buf, self.min_bytes);
111 }
112 if version >= 0 {
113 put_i32(buf, self.max_bytes);
114 }
115 if version >= 1 {
116 put_i32(buf, self.max_records);
117 }
118 if version >= 1 {
119 put_i32(buf, self.batch_size);
120 }
121 if version >= 2 {
122 put_i8(buf, self.share_acquire_mode);
123 }
124 if version >= 2 {
125 put_bool(buf, self.is_renew_ack);
126 }
127 if version >= 0 {
128 {
129 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
130 for it in &self.topics {
131 it.encode(buf, version)?;
132 }
133 }
134 }
135 if version >= 0 {
136 {
137 crate::primitives::array::put_array_len(
138 buf,
139 (self.forgotten_topics_data).len(),
140 flex,
141 );
142 for it in &self.forgotten_topics_data {
143 it.encode(buf, version)?;
144 }
145 }
146 }
147 if flex {
148 let tagged = WriteTaggedFields::new();
149 tagged.write(buf, &self.unknown_tagged_fields);
150 }
151 Ok(())
152 }
153 fn encoded_len(&self, version: i16) -> usize {
154 let flex = is_flexible(version);
155 let mut n: usize = 0;
156 if version >= 0 {
157 n += if flex {
158 compact_nullable_string_len(self.group_id)
159 } else {
160 nullable_string_len(self.group_id)
161 };
162 }
163 if version >= 0 {
164 n += if flex {
165 compact_nullable_string_len(self.member_id)
166 } else {
167 nullable_string_len(self.member_id)
168 };
169 }
170 if version >= 0 {
171 n += 4;
172 }
173 if version >= 0 {
174 n += 4;
175 }
176 if version >= 0 {
177 n += 4;
178 }
179 if version >= 0 {
180 n += 4;
181 }
182 if version >= 1 {
183 n += 4;
184 }
185 if version >= 1 {
186 n += 4;
187 }
188 if version >= 2 {
189 n += 1;
190 }
191 if version >= 2 {
192 n += 1;
193 }
194 if version >= 0 {
195 n += {
196 let prefix =
197 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
198 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
199 prefix + body
200 };
201 }
202 if version >= 0 {
203 n += {
204 let prefix = crate::primitives::array::array_len_prefix_len(
205 (self.forgotten_topics_data).len(),
206 flex,
207 );
208 let body: usize = (self.forgotten_topics_data)
209 .iter()
210 .map(|it| it.encoded_len(version))
211 .sum();
212 prefix + body
213 };
214 }
215 if flex {
216 let known_pairs: Vec<(u32, usize)> = Vec::new();
217 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
218 }
219 n
220 }
221}
222impl<'de> DecodeBorrow<'de> for ShareFetchRequest<'de> {
223 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
224 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
225 return Err(ProtocolError::UnsupportedVersion {
226 api_key: API_KEY,
227 version,
228 });
229 }
230 let flex = is_flexible(version);
231 let mut out = Self::default();
232 if version >= 0 {
233 out.group_id = if flex {
234 get_compact_nullable_string_borrowed(buf)?
235 } else {
236 get_nullable_string_borrowed(buf)?
237 };
238 }
239 if version >= 0 {
240 out.member_id = if flex {
241 get_compact_nullable_string_borrowed(buf)?
242 } else {
243 get_nullable_string_borrowed(buf)?
244 };
245 }
246 if version >= 0 {
247 out.share_session_epoch = get_i32(buf)?;
248 }
249 if version >= 0 {
250 out.max_wait_ms = get_i32(buf)?;
251 }
252 if version >= 0 {
253 out.min_bytes = get_i32(buf)?;
254 }
255 if version >= 0 {
256 out.max_bytes = get_i32(buf)?;
257 }
258 if version >= 1 {
259 out.max_records = get_i32(buf)?;
260 }
261 if version >= 1 {
262 out.batch_size = get_i32(buf)?;
263 }
264 if version >= 2 {
265 out.share_acquire_mode = get_i8(buf)?;
266 }
267 if version >= 2 {
268 out.is_renew_ack = get_bool(buf)?;
269 }
270 if version >= 0 {
271 out.topics = {
272 let n = crate::primitives::array::get_array_len(buf, flex)?;
273 let mut v = Vec::with_capacity(n);
274 for _ in 0..n {
275 v.push(FetchTopic::decode_borrow(buf, version)?);
276 }
277 v
278 };
279 }
280 if version >= 0 {
281 out.forgotten_topics_data = {
282 let n = crate::primitives::array::get_array_len(buf, flex)?;
283 let mut v = Vec::with_capacity(n);
284 for _ in 0..n {
285 v.push(ForgottenTopic::decode_borrow(buf, version)?);
286 }
287 v
288 };
289 }
290 if flex {
291 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
292 }
293 Ok(out)
294 }
295}
296#[cfg(test)]
297impl ShareFetchRequest<'_> {
298 #[must_use]
299 pub fn populated(version: i16) -> Self {
300 let mut m = Self::default();
301 if version >= 0 {
302 m.group_id = Some("x");
303 }
304 if version >= 0 {
305 m.member_id = Some("x");
306 }
307 if version >= 0 {
308 m.share_session_epoch = 1i32;
309 }
310 if version >= 0 {
311 m.max_wait_ms = 1i32;
312 }
313 if version >= 0 {
314 m.min_bytes = 1i32;
315 }
316 if version >= 0 {
317 m.max_bytes = 1i32;
318 }
319 if version >= 1 {
320 m.max_records = 1i32;
321 }
322 if version >= 1 {
323 m.batch_size = 1i32;
324 }
325 if version >= 2 {
326 m.share_acquire_mode = 1i8;
327 }
328 if version >= 2 {
329 m.is_renew_ack = true;
330 }
331 if version >= 0 {
332 m.topics = vec![FetchTopic::populated(version)];
333 }
334 if version >= 0 {
335 m.forgotten_topics_data = vec![ForgottenTopic::populated(version)];
336 }
337 m
338 }
339}
340#[derive(Debug, Clone, PartialEq, Eq, Default)]
341pub struct FetchTopic {
342 pub topic_id: crate::primitives::uuid::Uuid,
343 pub partitions: Vec<FetchPartition>,
344 pub unknown_tagged_fields: UnknownTaggedFields,
345}
346impl FetchTopic {
347 pub fn to_owned(&self) -> crate::owned::share_fetch_request::FetchTopic {
348 crate::owned::share_fetch_request::FetchTopic {
349 topic_id: (self.topic_id),
350 partitions: (self.partitions)
351 .iter()
352 .map(FetchPartition::to_owned)
353 .collect(),
354 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
355 }
356 }
357}
358impl Encode for FetchTopic {
359 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
360 let flex = version >= 0;
361 if version >= 0 {
362 crate::primitives::uuid::put_uuid(buf, self.topic_id);
363 }
364 if version >= 0 {
365 {
366 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
367 for it in &self.partitions {
368 it.encode(buf, version)?;
369 }
370 }
371 }
372 if flex {
373 let tagged = WriteTaggedFields::new();
374 tagged.write(buf, &self.unknown_tagged_fields);
375 }
376 Ok(())
377 }
378 fn encoded_len(&self, version: i16) -> usize {
379 let flex = version >= 0;
380 let mut n: usize = 0;
381 if version >= 0 {
382 n += 16;
383 }
384 if version >= 0 {
385 n += {
386 let prefix =
387 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
388 let body: usize = (self.partitions)
389 .iter()
390 .map(|it| it.encoded_len(version))
391 .sum();
392 prefix + body
393 };
394 }
395 if flex {
396 let known_pairs: Vec<(u32, usize)> = Vec::new();
397 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
398 }
399 n
400 }
401}
402impl<'de> DecodeBorrow<'de> for FetchTopic {
403 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
404 let flex = version >= 0;
405 let mut out = Self::default();
406 if version >= 0 {
407 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
408 }
409 if version >= 0 {
410 out.partitions = {
411 let n = crate::primitives::array::get_array_len(buf, flex)?;
412 let mut v = Vec::with_capacity(n);
413 for _ in 0..n {
414 v.push(FetchPartition::decode_borrow(buf, version)?);
415 }
416 v
417 };
418 }
419 if flex {
420 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
421 }
422 Ok(out)
423 }
424}
425#[cfg(test)]
426impl FetchTopic {
427 #[must_use]
428 pub fn populated(version: i16) -> Self {
429 let mut m = Self::default();
430 if version >= 0 {
431 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
432 }
433 if version >= 0 {
434 m.partitions = vec![FetchPartition::populated(version)];
435 }
436 m
437 }
438}
439#[derive(Debug, Clone, PartialEq, Eq, Default)]
440pub struct FetchPartition {
441 pub partition_index: i32,
442 pub partition_max_bytes: i32,
443 pub acknowledgement_batches: Vec<AcknowledgementBatch>,
444 pub unknown_tagged_fields: UnknownTaggedFields,
445}
446impl FetchPartition {
447 pub fn to_owned(&self) -> crate::owned::share_fetch_request::FetchPartition {
448 crate::owned::share_fetch_request::FetchPartition {
449 partition_index: (self.partition_index),
450 partition_max_bytes: (self.partition_max_bytes),
451 acknowledgement_batches: (self.acknowledgement_batches)
452 .iter()
453 .map(AcknowledgementBatch::to_owned)
454 .collect(),
455 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
456 }
457 }
458}
459impl Encode for FetchPartition {
460 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
461 let flex = version >= 0;
462 if version >= 0 {
463 put_i32(buf, self.partition_index);
464 }
465 if version == 0 {
466 put_i32(buf, self.partition_max_bytes);
467 }
468 if version >= 0 {
469 {
470 crate::primitives::array::put_array_len(
471 buf,
472 (self.acknowledgement_batches).len(),
473 flex,
474 );
475 for it in &self.acknowledgement_batches {
476 it.encode(buf, version)?;
477 }
478 }
479 }
480 if flex {
481 let tagged = WriteTaggedFields::new();
482 tagged.write(buf, &self.unknown_tagged_fields);
483 }
484 Ok(())
485 }
486 fn encoded_len(&self, version: i16) -> usize {
487 let flex = version >= 0;
488 let mut n: usize = 0;
489 if version >= 0 {
490 n += 4;
491 }
492 if version == 0 {
493 n += 4;
494 }
495 if version >= 0 {
496 n += {
497 let prefix = crate::primitives::array::array_len_prefix_len(
498 (self.acknowledgement_batches).len(),
499 flex,
500 );
501 let body: usize = (self.acknowledgement_batches)
502 .iter()
503 .map(|it| it.encoded_len(version))
504 .sum();
505 prefix + body
506 };
507 }
508 if flex {
509 let known_pairs: Vec<(u32, usize)> = Vec::new();
510 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
511 }
512 n
513 }
514}
515impl<'de> DecodeBorrow<'de> for FetchPartition {
516 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
517 let flex = version >= 0;
518 let mut out = Self::default();
519 if version >= 0 {
520 out.partition_index = get_i32(buf)?;
521 }
522 if version == 0 {
523 out.partition_max_bytes = get_i32(buf)?;
524 }
525 if version >= 0 {
526 out.acknowledgement_batches = {
527 let n = crate::primitives::array::get_array_len(buf, flex)?;
528 let mut v = Vec::with_capacity(n);
529 for _ in 0..n {
530 v.push(AcknowledgementBatch::decode_borrow(buf, version)?);
531 }
532 v
533 };
534 }
535 if flex {
536 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
537 }
538 Ok(out)
539 }
540}
541#[cfg(test)]
542impl FetchPartition {
543 #[must_use]
544 pub fn populated(version: i16) -> Self {
545 let mut m = Self::default();
546 if version >= 0 {
547 m.partition_index = 1i32;
548 }
549 if version == 0 {
550 m.partition_max_bytes = 1i32;
551 }
552 if version >= 0 {
553 m.acknowledgement_batches = vec![AcknowledgementBatch::populated(version)];
554 }
555 m
556 }
557}
558#[derive(Debug, Clone, PartialEq, Eq, Default)]
559pub struct AcknowledgementBatch {
560 pub first_offset: i64,
561 pub last_offset: i64,
562 pub acknowledge_types: Vec<i8>,
563 pub unknown_tagged_fields: UnknownTaggedFields,
564}
565impl AcknowledgementBatch {
566 pub fn to_owned(&self) -> crate::owned::share_fetch_request::AcknowledgementBatch {
567 crate::owned::share_fetch_request::AcknowledgementBatch {
568 first_offset: (self.first_offset),
569 last_offset: (self.last_offset),
570 acknowledge_types: (self.acknowledge_types).clone(),
571 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
572 }
573 }
574}
575impl Encode for AcknowledgementBatch {
576 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
577 let flex = version >= 0;
578 if version >= 0 {
579 put_i64(buf, self.first_offset);
580 }
581 if version >= 0 {
582 put_i64(buf, self.last_offset);
583 }
584 if version >= 0 {
585 {
586 crate::primitives::array::put_array_len(buf, (self.acknowledge_types).len(), flex);
587 for it in &self.acknowledge_types {
588 put_i8(buf, *it);
589 }
590 }
591 }
592 if flex {
593 let tagged = WriteTaggedFields::new();
594 tagged.write(buf, &self.unknown_tagged_fields);
595 }
596 Ok(())
597 }
598 fn encoded_len(&self, version: i16) -> usize {
599 let flex = version >= 0;
600 let mut n: usize = 0;
601 if version >= 0 {
602 n += 8;
603 }
604 if version >= 0 {
605 n += 8;
606 }
607 if version >= 0 {
608 n += {
609 let prefix = crate::primitives::array::array_len_prefix_len(
610 (self.acknowledge_types).len(),
611 flex,
612 );
613 let body: usize = (self.acknowledge_types).iter().map(|_| 1).sum();
614 prefix + body
615 };
616 }
617 if flex {
618 let known_pairs: Vec<(u32, usize)> = Vec::new();
619 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
620 }
621 n
622 }
623}
624impl<'de> DecodeBorrow<'de> for AcknowledgementBatch {
625 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
626 let flex = version >= 0;
627 let mut out = Self::default();
628 if version >= 0 {
629 out.first_offset = get_i64(buf)?;
630 }
631 if version >= 0 {
632 out.last_offset = get_i64(buf)?;
633 }
634 if version >= 0 {
635 out.acknowledge_types = {
636 let n = crate::primitives::array::get_array_len(buf, flex)?;
637 let mut v = Vec::with_capacity(n);
638 for _ in 0..n {
639 v.push(get_i8(buf)?);
640 }
641 v
642 };
643 }
644 if flex {
645 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
646 }
647 Ok(out)
648 }
649}
650#[cfg(test)]
651impl AcknowledgementBatch {
652 #[must_use]
653 pub fn populated(version: i16) -> Self {
654 let mut m = Self::default();
655 if version >= 0 {
656 m.first_offset = 1i64;
657 }
658 if version >= 0 {
659 m.last_offset = 1i64;
660 }
661 if version >= 0 {
662 m.acknowledge_types = vec![1i8];
663 }
664 m
665 }
666}
667#[derive(Debug, Clone, PartialEq, Eq, Default)]
668pub struct ForgottenTopic {
669 pub topic_id: crate::primitives::uuid::Uuid,
670 pub partitions: Vec<i32>,
671 pub unknown_tagged_fields: UnknownTaggedFields,
672}
673impl ForgottenTopic {
674 pub fn to_owned(&self) -> crate::owned::share_fetch_request::ForgottenTopic {
675 crate::owned::share_fetch_request::ForgottenTopic {
676 topic_id: (self.topic_id),
677 partitions: (self.partitions).clone(),
678 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
679 }
680 }
681}
682impl Encode for ForgottenTopic {
683 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
684 let flex = version >= 0;
685 if version >= 0 {
686 crate::primitives::uuid::put_uuid(buf, self.topic_id);
687 }
688 if version >= 0 {
689 {
690 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
691 for it in &self.partitions {
692 put_i32(buf, *it);
693 }
694 }
695 }
696 if flex {
697 let tagged = WriteTaggedFields::new();
698 tagged.write(buf, &self.unknown_tagged_fields);
699 }
700 Ok(())
701 }
702 fn encoded_len(&self, version: i16) -> usize {
703 let flex = version >= 0;
704 let mut n: usize = 0;
705 if version >= 0 {
706 n += 16;
707 }
708 if version >= 0 {
709 n += {
710 let prefix =
711 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
712 let body: usize = (self.partitions).iter().map(|_| 4).sum();
713 prefix + body
714 };
715 }
716 if flex {
717 let known_pairs: Vec<(u32, usize)> = Vec::new();
718 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
719 }
720 n
721 }
722}
723impl<'de> DecodeBorrow<'de> for ForgottenTopic {
724 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
725 let flex = version >= 0;
726 let mut out = Self::default();
727 if version >= 0 {
728 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
729 }
730 if version >= 0 {
731 out.partitions = {
732 let n = crate::primitives::array::get_array_len(buf, flex)?;
733 let mut v = Vec::with_capacity(n);
734 for _ in 0..n {
735 v.push(get_i32(buf)?);
736 }
737 v
738 };
739 }
740 if flex {
741 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
742 }
743 Ok(out)
744 }
745}
746#[cfg(test)]
747impl ForgottenTopic {
748 #[must_use]
749 pub fn populated(version: i16) -> Self {
750 let mut m = Self::default();
751 if version >= 0 {
752 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
753 }
754 if version >= 0 {
755 m.partitions = vec![1i32];
756 }
757 m
758 }
759}