1use crate::primitives::fixed::{get_i8, get_i32, get_i64, put_i8, put_i32, put_i64};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::tagged_fields::{
10 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
11};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13use bytes::{Buf, BufMut};
/// API key identifying this request type; reported in
/// `ProtocolError::UnsupportedVersion` when a version check fails.
pub const API_KEY: i16 = 1;
/// Lowest request version accepted by `encode` / `decode`.
pub const MIN_VERSION: i16 = 4;
/// Highest request version accepted by `encode` / `decode`.
pub const MAX_VERSION: i16 = 18;
/// First version that uses the flexible encoding (compact lengths and tagged fields).
pub const FLEXIBLE_MIN: i16 = 12;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
/// Body of a fetch request.
///
/// Which fields actually travel on the wire depends on the version passed to
/// `encode` / `decode`; fields absent at a given version keep their `Default` value
/// after decoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchRequest {
    /// Written only for versions `0..=14`.
    pub replica_id: i32,
    /// Written for every version.
    pub max_wait_ms: i32,
    /// Written for every version.
    pub min_bytes: i32,
    /// Written from version 3.
    pub max_bytes: i32,
    /// Written from version 4.
    pub isolation_level: i8,
    /// Written from version 7.
    pub session_id: i32,
    /// Written from version 7.
    pub session_epoch: i32,
    /// Topics to fetch; written for every version.
    pub topics: Vec<FetchTopic>,
    /// Written from version 7.
    pub forgotten_topics_data: Vec<ForgottenTopic>,
    /// Written from version 11.
    pub rack_id: String,
    /// Carried as tagged field 0; left off the wire when `None`.
    pub cluster_id: Option<String>,
    /// Carried as tagged field 1; left off the wire when equal to its default.
    pub replica_state: ReplicaState,
    /// Tagged fields that `decode` did not recognise; passed back to the tagged-field
    /// writer on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
38impl Default for FetchRequest {
39 fn default() -> Self {
40 Self {
41 replica_id: -1i32,
42 max_wait_ms: 0i32,
43 min_bytes: 0i32,
44 max_bytes: 2_147_483_647i32,
45 isolation_level: 0i8,
46 session_id: 0i32,
47 session_epoch: -1i32,
48 topics: Vec::new(),
49 forgotten_topics_data: Vec::new(),
50 rack_id: String::new(),
51 cluster_id: None,
52 replica_state: Default::default(),
53 unknown_tagged_fields: Default::default(),
54 }
55 }
56}
57impl Encode for FetchRequest {
58 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
59 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
60 return Err(ProtocolError::UnsupportedVersion {
61 api_key: API_KEY,
62 version,
63 });
64 }
65 let flex = is_flexible(version);
66 if (0..=14).contains(&version) {
67 put_i32(buf, self.replica_id);
68 }
69 if version >= 0 {
70 put_i32(buf, self.max_wait_ms);
71 }
72 if version >= 0 {
73 put_i32(buf, self.min_bytes);
74 }
75 if version >= 3 {
76 put_i32(buf, self.max_bytes);
77 }
78 if version >= 4 {
79 put_i8(buf, self.isolation_level);
80 }
81 if version >= 7 {
82 put_i32(buf, self.session_id);
83 }
84 if version >= 7 {
85 put_i32(buf, self.session_epoch);
86 }
87 if version >= 0 {
88 {
89 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
90 for it in &self.topics {
91 it.encode(buf, version)?;
92 }
93 }
94 }
95 if version >= 7 {
96 {
97 crate::primitives::array::put_array_len(
98 buf,
99 (self.forgotten_topics_data).len(),
100 flex,
101 );
102 for it in &self.forgotten_topics_data {
103 it.encode(buf, version)?;
104 }
105 }
106 }
107 if version >= 11 {
108 if flex {
109 put_compact_string(buf, &self.rack_id);
110 } else {
111 put_string(buf, &self.rack_id);
112 }
113 }
114 if flex {
115 let mut tagged = WriteTaggedFields::new();
116 if !(self.cluster_id.is_none()) {
117 let payload = encode_to_bytes(
118 if flex {
119 compact_nullable_string_len(self.cluster_id.as_deref())
120 } else {
121 nullable_string_len(self.cluster_id.as_deref())
122 },
123 |b| {
124 if flex {
125 put_compact_nullable_string(b, self.cluster_id.as_deref());
126 } else {
127 put_nullable_string(b, self.cluster_id.as_deref());
128 }
129 Ok(())
130 },
131 );
132 tagged.add(0, payload);
133 }
134 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
135 let payload = encode_to_bytes(self.replica_state.encoded_len(version), |b| {
136 self.replica_state.encode(b, version)?;
137 Ok(())
138 });
139 tagged.add(1, payload);
140 }
141 tagged.write(buf, &self.unknown_tagged_fields);
142 }
143 Ok(())
144 }
145 fn encoded_len(&self, version: i16) -> usize {
146 let flex = is_flexible(version);
147 let mut n: usize = 0;
148 if (0..=14).contains(&version) {
149 n += 4;
150 }
151 if version >= 0 {
152 n += 4;
153 }
154 if version >= 0 {
155 n += 4;
156 }
157 if version >= 3 {
158 n += 4;
159 }
160 if version >= 4 {
161 n += 1;
162 }
163 if version >= 7 {
164 n += 4;
165 }
166 if version >= 7 {
167 n += 4;
168 }
169 if version >= 0 {
170 n += {
171 let prefix =
172 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
173 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
174 prefix + body
175 };
176 }
177 if version >= 7 {
178 n += {
179 let prefix = crate::primitives::array::array_len_prefix_len(
180 (self.forgotten_topics_data).len(),
181 flex,
182 );
183 let body: usize = (self.forgotten_topics_data)
184 .iter()
185 .map(|it| it.encoded_len(version))
186 .sum();
187 prefix + body
188 };
189 }
190 if version >= 11 {
191 n += if flex {
192 compact_string_len(&self.rack_id)
193 } else {
194 string_len(&self.rack_id)
195 };
196 }
197 if flex {
198 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
199 if !(self.cluster_id.is_none()) {
200 known_pairs.push((
201 0,
202 if flex {
203 compact_nullable_string_len(self.cluster_id.as_deref())
204 } else {
205 nullable_string_len(self.cluster_id.as_deref())
206 },
207 ));
208 }
209 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
210 known_pairs.push((1, self.replica_state.encoded_len(version)));
211 }
212 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
213 }
214 n
215 }
216}
217impl Decode<'_> for FetchRequest {
218 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
219 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
220 return Err(ProtocolError::UnsupportedVersion {
221 api_key: API_KEY,
222 version,
223 });
224 }
225 let flex = is_flexible(version);
226 let mut out = Self::default();
227 if (0..=14).contains(&version) {
228 out.replica_id = get_i32(buf)?;
229 }
230 if version >= 0 {
231 out.max_wait_ms = get_i32(buf)?;
232 }
233 if version >= 0 {
234 out.min_bytes = get_i32(buf)?;
235 }
236 if version >= 3 {
237 out.max_bytes = get_i32(buf)?;
238 }
239 if version >= 4 {
240 out.isolation_level = get_i8(buf)?;
241 }
242 if version >= 7 {
243 out.session_id = get_i32(buf)?;
244 }
245 if version >= 7 {
246 out.session_epoch = get_i32(buf)?;
247 }
248 if version >= 0 {
249 out.topics = {
250 let n = crate::primitives::array::get_array_len(buf, flex)?;
251 let mut v = Vec::with_capacity(n);
252 for _ in 0..n {
253 v.push(FetchTopic::decode(buf, version)?);
254 }
255 v
256 };
257 }
258 if version >= 7 {
259 out.forgotten_topics_data = {
260 let n = crate::primitives::array::get_array_len(buf, flex)?;
261 let mut v = Vec::with_capacity(n);
262 for _ in 0..n {
263 v.push(ForgottenTopic::decode(buf, version)?);
264 }
265 v
266 };
267 }
268 if version >= 11 {
269 out.rack_id = if flex {
270 get_compact_string_owned(buf)?
271 } else {
272 get_string_owned(buf)?
273 };
274 }
275 if flex {
276 let mut tag_cluster_id = None;
277 let mut tag_replica_state = None;
278 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
279 0 => {
280 tag_cluster_id = Some({
281 let b: &mut &[u8] = payload;
282 if flex {
283 get_compact_nullable_string_owned(b)?
284 } else {
285 get_nullable_string_owned(b)?
286 }
287 });
288 Ok(true)
289 }
290 1 => {
291 tag_replica_state = Some({
292 let b: &mut &[u8] = payload;
293 ReplicaState::decode(b, version)?
294 });
295 Ok(true)
296 }
297 _ => Ok(false),
298 })?;
299 if let Some(v) = tag_cluster_id {
300 out.cluster_id = v;
301 }
302 if let Some(v) = tag_replica_state {
303 out.replica_state = v;
304 }
305 }
306 Ok(out)
307 }
308}
#[cfg(test)]
impl FetchRequest {
    /// Test fixture: starts from `Default` and overrides each field with a
    /// non-default sample value at the versions selected by the guards below.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (0..=14).contains(&version) {
            sample.replica_id = 1;
        }
        if version >= 0 {
            sample.max_wait_ms = 1;
            sample.min_bytes = 1;
            sample.topics = vec![FetchTopic::populated(version)];
        }
        if version >= 3 {
            sample.max_bytes = 1;
        }
        if version >= 4 {
            sample.isolation_level = 1;
        }
        if version >= 7 {
            sample.session_id = 1;
            sample.session_epoch = 1;
            sample.forgotten_topics_data = vec![ForgottenTopic::populated(version)];
        }
        if version >= 11 {
            sample.rack_id = String::from("x");
        }
        if version >= 12 {
            sample.cluster_id = Some(String::from("x"));
        }
        if version >= 15 {
            sample.replica_state = ReplicaState::populated(version);
        }
        sample
    }
}
/// Replica identity nested in `FetchRequest::replica_state`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaState {
    /// Written from version 15; defaults to `-1`.
    pub replica_id: i32,
    /// Written from version 15; defaults to `-1`.
    pub replica_epoch: i64,
    /// Tagged fields collected by `decode` and passed back to the tagged-field writer
    /// on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
359impl Default for ReplicaState {
360 fn default() -> Self {
361 Self {
362 replica_id: -1i32,
363 replica_epoch: -1i64,
364 unknown_tagged_fields: Default::default(),
365 }
366 }
367}
368impl Encode for ReplicaState {
369 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
370 let flex = version >= 12;
371 if version >= 15 {
372 put_i32(buf, self.replica_id);
373 }
374 if version >= 15 {
375 put_i64(buf, self.replica_epoch);
376 }
377 if flex {
378 let tagged = WriteTaggedFields::new();
379 tagged.write(buf, &self.unknown_tagged_fields);
380 }
381 Ok(())
382 }
383 fn encoded_len(&self, version: i16) -> usize {
384 let flex = version >= 12;
385 let mut n: usize = 0;
386 if version >= 15 {
387 n += 4;
388 }
389 if version >= 15 {
390 n += 8;
391 }
392 if flex {
393 let known_pairs: Vec<(u32, usize)> = Vec::new();
394 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
395 }
396 n
397 }
398}
399impl Decode<'_> for ReplicaState {
400 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
401 let flex = version >= 12;
402 let mut out = Self::default();
403 if version >= 15 {
404 out.replica_id = get_i32(buf)?;
405 }
406 if version >= 15 {
407 out.replica_epoch = get_i64(buf)?;
408 }
409 if flex {
410 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
411 }
412 Ok(out)
413 }
414}
#[cfg(test)]
impl ReplicaState {
    /// Test fixture: default value with both numeric fields set to `1` from
    /// version 15.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 15 {
            sample.replica_id = 1;
            sample.replica_epoch = 1;
        }
        sample
    }
}
/// One topic entry of `FetchRequest::topics`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FetchTopic {
    /// Topic name; written only for versions `0..=12`.
    pub topic: String,
    /// Topic id; written from version 13.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Partitions to fetch; written for every version.
    pub partitions: Vec<FetchPartition>,
    /// Tagged fields collected by `decode` and passed back to the tagged-field writer
    /// on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
436impl Encode for FetchTopic {
437 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
438 let flex = version >= 12;
439 if (0..=12).contains(&version) {
440 if flex {
441 put_compact_string(buf, &self.topic);
442 } else {
443 put_string(buf, &self.topic);
444 }
445 }
446 if version >= 13 {
447 crate::primitives::uuid::put_uuid(buf, self.topic_id);
448 }
449 if version >= 0 {
450 {
451 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
452 for it in &self.partitions {
453 it.encode(buf, version)?;
454 }
455 }
456 }
457 if flex {
458 let tagged = WriteTaggedFields::new();
459 tagged.write(buf, &self.unknown_tagged_fields);
460 }
461 Ok(())
462 }
463 fn encoded_len(&self, version: i16) -> usize {
464 let flex = version >= 12;
465 let mut n: usize = 0;
466 if (0..=12).contains(&version) {
467 n += if flex {
468 compact_string_len(&self.topic)
469 } else {
470 string_len(&self.topic)
471 };
472 }
473 if version >= 13 {
474 n += 16;
475 }
476 if version >= 0 {
477 n += {
478 let prefix =
479 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
480 let body: usize = (self.partitions)
481 .iter()
482 .map(|it| it.encoded_len(version))
483 .sum();
484 prefix + body
485 };
486 }
487 if flex {
488 let known_pairs: Vec<(u32, usize)> = Vec::new();
489 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
490 }
491 n
492 }
493}
494impl Decode<'_> for FetchTopic {
495 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
496 let flex = version >= 12;
497 let mut out = Self::default();
498 if (0..=12).contains(&version) {
499 out.topic = if flex {
500 get_compact_string_owned(buf)?
501 } else {
502 get_string_owned(buf)?
503 };
504 }
505 if version >= 13 {
506 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
507 }
508 if version >= 0 {
509 out.partitions = {
510 let n = crate::primitives::array::get_array_len(buf, flex)?;
511 let mut v = Vec::with_capacity(n);
512 for _ in 0..n {
513 v.push(FetchPartition::decode(buf, version)?);
514 }
515 v
516 };
517 }
518 if flex {
519 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
520 }
521 Ok(out)
522 }
523}
#[cfg(test)]
impl FetchTopic {
    /// Test fixture: default value with the fields carried at `version` replaced by
    /// non-default samples.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (0..=12).contains(&version) {
            sample.topic = String::from("x");
        }
        if version >= 13 {
            sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 0 {
            sample.partitions = vec![FetchPartition::populated(version)];
        }
        sample
    }
}
/// One partition entry of `FetchTopic::partitions`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchPartition {
    /// Written for every version.
    pub partition: i32,
    /// Written from version 9; defaults to `-1`.
    pub current_leader_epoch: i32,
    /// Written for every version.
    pub fetch_offset: i64,
    /// Written from version 12; defaults to `-1`.
    pub last_fetched_epoch: i32,
    /// Written from version 5; defaults to `-1`.
    pub log_start_offset: i64,
    /// Written for every version.
    pub partition_max_bytes: i32,
    /// Carried as tagged field 0; left off the wire when equal to its default.
    pub replica_directory_id: crate::primitives::uuid::Uuid,
    /// Carried as tagged field 1; left off the wire when equal to its default,
    /// `i64::MAX`.
    pub high_watermark: i64,
    /// Tagged fields that `decode` did not recognise; passed back to the tagged-field
    /// writer on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
553impl Default for FetchPartition {
554 fn default() -> Self {
555 Self {
556 partition: 0i32,
557 current_leader_epoch: -1i32,
558 fetch_offset: 0i64,
559 last_fetched_epoch: -1i32,
560 log_start_offset: -1i64,
561 partition_max_bytes: 0i32,
562 replica_directory_id: Default::default(),
563 high_watermark: 9_223_372_036_854_775_807i64,
564 unknown_tagged_fields: Default::default(),
565 }
566 }
567}
568impl Encode for FetchPartition {
569 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
570 let flex = version >= 12;
571 if version >= 0 {
572 put_i32(buf, self.partition);
573 }
574 if version >= 9 {
575 put_i32(buf, self.current_leader_epoch);
576 }
577 if version >= 0 {
578 put_i64(buf, self.fetch_offset);
579 }
580 if version >= 12 {
581 put_i32(buf, self.last_fetched_epoch);
582 }
583 if version >= 5 {
584 put_i64(buf, self.log_start_offset);
585 }
586 if version >= 0 {
587 put_i32(buf, self.partition_max_bytes);
588 }
589 if flex {
590 let mut tagged = WriteTaggedFields::new();
591 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
592 let payload = encode_to_bytes(16, |b| {
593 crate::primitives::uuid::put_uuid(b, self.replica_directory_id);
594 Ok(())
595 });
596 tagged.add(0, payload);
597 }
598 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
599 let payload = encode_to_bytes(8, |b| {
600 put_i64(b, self.high_watermark);
601 Ok(())
602 });
603 tagged.add(1, payload);
604 }
605 tagged.write(buf, &self.unknown_tagged_fields);
606 }
607 Ok(())
608 }
609 fn encoded_len(&self, version: i16) -> usize {
610 let flex = version >= 12;
611 let mut n: usize = 0;
612 if version >= 0 {
613 n += 4;
614 }
615 if version >= 9 {
616 n += 4;
617 }
618 if version >= 0 {
619 n += 8;
620 }
621 if version >= 12 {
622 n += 4;
623 }
624 if version >= 5 {
625 n += 8;
626 }
627 if version >= 0 {
628 n += 4;
629 }
630 if flex {
631 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
632 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
633 known_pairs.push((0, 16));
634 }
635 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
636 known_pairs.push((1, 8));
637 }
638 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
639 }
640 n
641 }
642}
643impl Decode<'_> for FetchPartition {
644 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
645 let flex = version >= 12;
646 let mut out = Self::default();
647 if version >= 0 {
648 out.partition = get_i32(buf)?;
649 }
650 if version >= 9 {
651 out.current_leader_epoch = get_i32(buf)?;
652 }
653 if version >= 0 {
654 out.fetch_offset = get_i64(buf)?;
655 }
656 if version >= 12 {
657 out.last_fetched_epoch = get_i32(buf)?;
658 }
659 if version >= 5 {
660 out.log_start_offset = get_i64(buf)?;
661 }
662 if version >= 0 {
663 out.partition_max_bytes = get_i32(buf)?;
664 }
665 if flex {
666 let mut tag_replica_directory_id = None;
667 let mut tag_high_watermark = None;
668 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
669 0 => {
670 tag_replica_directory_id = Some({
671 let b: &mut &[u8] = payload;
672 crate::primitives::uuid::get_uuid(b)?
673 });
674 Ok(true)
675 }
676 1 => {
677 tag_high_watermark = Some({
678 let b: &mut &[u8] = payload;
679 get_i64(b)?
680 });
681 Ok(true)
682 }
683 _ => Ok(false),
684 })?;
685 if let Some(v) = tag_replica_directory_id {
686 out.replica_directory_id = v;
687 }
688 if let Some(v) = tag_high_watermark {
689 out.high_watermark = v;
690 }
691 }
692 Ok(out)
693 }
694}
#[cfg(test)]
impl FetchPartition {
    /// Test fixture: default value with each field replaced by a non-default sample
    /// at the versions selected by the guards below.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.partition = 1;
            sample.fetch_offset = 1;
            sample.partition_max_bytes = 1;
        }
        if version >= 5 {
            sample.log_start_offset = 1;
        }
        if version >= 9 {
            sample.current_leader_epoch = 1;
        }
        if version >= 12 {
            sample.last_fetched_epoch = 1;
        }
        if version >= 17 {
            sample.replica_directory_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 18 {
            sample.high_watermark = 1;
        }
        sample
    }
}
/// One entry of `FetchRequest::forgotten_topics_data`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ForgottenTopic {
    /// Topic name; written only for versions `7..=12`.
    pub topic: String,
    /// Topic id; written from version 13.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Partition indexes; written from version 7.
    pub partitions: Vec<i32>,
    /// Tagged fields collected by `decode` and passed back to the tagged-field writer
    /// on encode.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
734impl Encode for ForgottenTopic {
735 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
736 let flex = version >= 12;
737 if (7..=12).contains(&version) {
738 if flex {
739 put_compact_string(buf, &self.topic);
740 } else {
741 put_string(buf, &self.topic);
742 }
743 }
744 if version >= 13 {
745 crate::primitives::uuid::put_uuid(buf, self.topic_id);
746 }
747 if version >= 7 {
748 {
749 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
750 for it in &self.partitions {
751 put_i32(buf, *it);
752 }
753 }
754 }
755 if flex {
756 let tagged = WriteTaggedFields::new();
757 tagged.write(buf, &self.unknown_tagged_fields);
758 }
759 Ok(())
760 }
761 fn encoded_len(&self, version: i16) -> usize {
762 let flex = version >= 12;
763 let mut n: usize = 0;
764 if (7..=12).contains(&version) {
765 n += if flex {
766 compact_string_len(&self.topic)
767 } else {
768 string_len(&self.topic)
769 };
770 }
771 if version >= 13 {
772 n += 16;
773 }
774 if version >= 7 {
775 n += {
776 let prefix =
777 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
778 let body: usize = (self.partitions).iter().map(|_| 4).sum();
779 prefix + body
780 };
781 }
782 if flex {
783 let known_pairs: Vec<(u32, usize)> = Vec::new();
784 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
785 }
786 n
787 }
788}
789impl Decode<'_> for ForgottenTopic {
790 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
791 let flex = version >= 12;
792 let mut out = Self::default();
793 if (7..=12).contains(&version) {
794 out.topic = if flex {
795 get_compact_string_owned(buf)?
796 } else {
797 get_string_owned(buf)?
798 };
799 }
800 if version >= 13 {
801 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
802 }
803 if version >= 7 {
804 out.partitions = {
805 let n = crate::primitives::array::get_array_len(buf, flex)?;
806 let mut v = Vec::with_capacity(n);
807 for _ in 0..n {
808 v.push(get_i32(buf)?);
809 }
810 v
811 };
812 }
813 if flex {
814 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
815 }
816 Ok(out)
817 }
818}
#[cfg(test)]
impl ForgottenTopic {
    /// Test fixture: default value with the fields carried at `version` replaced by
    /// non-default samples.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (7..=12).contains(&version) {
            sample.topic = String::from("x");
        }
        if version >= 13 {
            sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 7 {
            sample.partitions = vec![1];
        }
        sample
    }
}
836#[must_use]
839#[allow(unused_comparisons)]
840pub fn default_json(version: i16) -> ::serde_json::Value {
841 let mut obj = ::serde_json::Map::new();
842 if version >= 12 {
843 obj.insert("clusterId".to_string(), ::serde_json::Value::Null);
844 }
845 if version <= 14 {
846 obj.insert("replicaId".to_string(), ::serde_json::json!(-1));
847 }
848 if version >= 15 {
849 obj.insert("replicaState".to_string(), {
850 let mut m = ::serde_json::Map::new();
851 m.insert("replicaId".to_string(), ::serde_json::json!(-1));
852 m.insert("replicaEpoch".to_string(), ::serde_json::json!(-1));
853 ::serde_json::Value::Object(m)
854 });
855 }
856 obj.insert("maxWaitMs".to_string(), ::serde_json::json!(0));
857 obj.insert("minBytes".to_string(), ::serde_json::json!(0));
858 if version >= 3 {
859 obj.insert("maxBytes".to_string(), ::serde_json::json!(2147483647));
860 }
861 if version >= 4 {
862 obj.insert("isolationLevel".to_string(), ::serde_json::json!(0));
863 }
864 if version >= 7 {
865 obj.insert("sessionId".to_string(), ::serde_json::json!(0));
866 }
867 if version >= 7 {
868 obj.insert("sessionEpoch".to_string(), ::serde_json::json!(-1));
869 }
870 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
871 if version >= 7 {
872 obj.insert(
873 "forgottenTopicsData".to_string(),
874 ::serde_json::Value::Array(vec![]),
875 );
876 }
877 if version >= 11 {
878 obj.insert(
879 "rackId".to_string(),
880 ::serde_json::Value::String(String::new()),
881 );
882 }
883 ::serde_json::Value::Object(obj)
884}
/// Exposes this module's API key and version bounds through `ProtocolRequest`, and
/// pairs the request with its response type.
impl crate::ProtocolRequest for FetchRequest {
    // Each associated constant simply re-exports the module-level constant of the
    // same name.
    const API_KEY: i16 = API_KEY;
    const MIN_VERSION: i16 = MIN_VERSION;
    const MAX_VERSION: i16 = MAX_VERSION;
    const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
    // Response type defined in the sibling `fetch_response` module.
    type Response = super::fetch_response::FetchResponse;
}