1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i8, get_i32, get_i64, put_i8, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{
12 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
13};
14use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
15
16pub const API_KEY: i16 = 1;
17pub const MIN_VERSION: i16 = 4;
18pub const MAX_VERSION: i16 = 18;
19pub const FLEXIBLE_MIN: i16 = 12;
20
21#[inline]
22fn is_flexible(version: i16) -> bool {
23 version >= FLEXIBLE_MIN
24}
25
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct FetchRequest {
28 pub replica_id: i32,
29 pub max_wait_ms: i32,
30 pub min_bytes: i32,
31 pub max_bytes: i32,
32 pub isolation_level: i8,
33 pub session_id: i32,
34 pub session_epoch: i32,
35 pub topics: Vec<FetchTopic>,
36 pub forgotten_topics_data: Vec<ForgottenTopic>,
37 pub rack_id: String,
38 pub cluster_id: Option<String>,
39 pub replica_state: ReplicaState,
40 pub unknown_tagged_fields: UnknownTaggedFields,
41}
42impl Default for FetchRequest {
43 fn default() -> Self {
44 Self {
45 replica_id: -1i32,
46 max_wait_ms: 0i32,
47 min_bytes: 0i32,
48 max_bytes: 2_147_483_647i32,
49 isolation_level: 0i8,
50 session_id: 0i32,
51 session_epoch: -1i32,
52 topics: Vec::new(),
53 forgotten_topics_data: Vec::new(),
54 rack_id: String::new(),
55 cluster_id: None,
56 replica_state: Default::default(),
57 unknown_tagged_fields: Default::default(),
58 }
59 }
60}
61impl Encode for FetchRequest {
62 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
63 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
64 return Err(ProtocolError::UnsupportedVersion {
65 api_key: API_KEY,
66 version,
67 });
68 }
69 let flex = is_flexible(version);
70 if (0..=14).contains(&version) {
71 put_i32(buf, self.replica_id);
72 }
73 if version >= 0 {
74 put_i32(buf, self.max_wait_ms);
75 }
76 if version >= 0 {
77 put_i32(buf, self.min_bytes);
78 }
79 if version >= 3 {
80 put_i32(buf, self.max_bytes);
81 }
82 if version >= 4 {
83 put_i8(buf, self.isolation_level);
84 }
85 if version >= 7 {
86 put_i32(buf, self.session_id);
87 }
88 if version >= 7 {
89 put_i32(buf, self.session_epoch);
90 }
91 if version >= 0 {
92 {
93 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
94 for it in &self.topics {
95 it.encode(buf, version)?;
96 }
97 }
98 }
99 if version >= 7 {
100 {
101 crate::primitives::array::put_array_len(
102 buf,
103 (self.forgotten_topics_data).len(),
104 flex,
105 );
106 for it in &self.forgotten_topics_data {
107 it.encode(buf, version)?;
108 }
109 }
110 }
111 if version >= 11 {
112 if flex {
113 put_compact_string(buf, &self.rack_id);
114 } else {
115 put_string(buf, &self.rack_id);
116 }
117 }
118 if flex {
119 let mut tagged = WriteTaggedFields::new();
120 if !(self.cluster_id.is_none()) {
121 let payload = encode_to_bytes(
122 if flex {
123 compact_nullable_string_len(self.cluster_id.as_deref())
124 } else {
125 nullable_string_len(self.cluster_id.as_deref())
126 },
127 |b| {
128 if flex {
129 put_compact_nullable_string(b, self.cluster_id.as_deref());
130 } else {
131 put_nullable_string(b, self.cluster_id.as_deref());
132 }
133 Ok(())
134 },
135 );
136 tagged.add(0, payload);
137 }
138 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
139 let payload = encode_to_bytes(self.replica_state.encoded_len(version), |b| {
140 self.replica_state.encode(b, version)?;
141 Ok(())
142 });
143 tagged.add(1, payload);
144 }
145 tagged.write(buf, &self.unknown_tagged_fields);
146 }
147 Ok(())
148 }
149 fn encoded_len(&self, version: i16) -> usize {
150 let flex = is_flexible(version);
151 let mut n: usize = 0;
152 if (0..=14).contains(&version) {
153 n += 4;
154 }
155 if version >= 0 {
156 n += 4;
157 }
158 if version >= 0 {
159 n += 4;
160 }
161 if version >= 3 {
162 n += 4;
163 }
164 if version >= 4 {
165 n += 1;
166 }
167 if version >= 7 {
168 n += 4;
169 }
170 if version >= 7 {
171 n += 4;
172 }
173 if version >= 0 {
174 n += {
175 let prefix =
176 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
177 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
178 prefix + body
179 };
180 }
181 if version >= 7 {
182 n += {
183 let prefix = crate::primitives::array::array_len_prefix_len(
184 (self.forgotten_topics_data).len(),
185 flex,
186 );
187 let body: usize = (self.forgotten_topics_data)
188 .iter()
189 .map(|it| it.encoded_len(version))
190 .sum();
191 prefix + body
192 };
193 }
194 if version >= 11 {
195 n += if flex {
196 compact_string_len(&self.rack_id)
197 } else {
198 string_len(&self.rack_id)
199 };
200 }
201 if flex {
202 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
203 if !(self.cluster_id.is_none()) {
204 known_pairs.push((
205 0,
206 if flex {
207 compact_nullable_string_len(self.cluster_id.as_deref())
208 } else {
209 nullable_string_len(self.cluster_id.as_deref())
210 },
211 ));
212 }
213 if !(crate::codegen_helpers::is_default(&self.replica_state)) {
214 known_pairs.push((1, self.replica_state.encoded_len(version)));
215 }
216 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
217 }
218 n
219 }
220}
221impl Decode<'_> for FetchRequest {
222 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
223 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
224 return Err(ProtocolError::UnsupportedVersion {
225 api_key: API_KEY,
226 version,
227 });
228 }
229 let flex = is_flexible(version);
230 let mut out = Self::default();
231 if (0..=14).contains(&version) {
232 out.replica_id = get_i32(buf)?;
233 }
234 if version >= 0 {
235 out.max_wait_ms = get_i32(buf)?;
236 }
237 if version >= 0 {
238 out.min_bytes = get_i32(buf)?;
239 }
240 if version >= 3 {
241 out.max_bytes = get_i32(buf)?;
242 }
243 if version >= 4 {
244 out.isolation_level = get_i8(buf)?;
245 }
246 if version >= 7 {
247 out.session_id = get_i32(buf)?;
248 }
249 if version >= 7 {
250 out.session_epoch = get_i32(buf)?;
251 }
252 if version >= 0 {
253 out.topics = {
254 let n = crate::primitives::array::get_array_len(buf, flex)?;
255 let mut v = Vec::with_capacity(n);
256 for _ in 0..n {
257 v.push(FetchTopic::decode(buf, version)?);
258 }
259 v
260 };
261 }
262 if version >= 7 {
263 out.forgotten_topics_data = {
264 let n = crate::primitives::array::get_array_len(buf, flex)?;
265 let mut v = Vec::with_capacity(n);
266 for _ in 0..n {
267 v.push(ForgottenTopic::decode(buf, version)?);
268 }
269 v
270 };
271 }
272 if version >= 11 {
273 out.rack_id = if flex {
274 get_compact_string_owned(buf)?
275 } else {
276 get_string_owned(buf)?
277 };
278 }
279 if flex {
280 let mut tag_cluster_id = None;
281 let mut tag_replica_state = None;
282 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
283 0 => {
284 tag_cluster_id = Some({
285 let b: &mut &[u8] = payload;
286 if flex {
287 get_compact_nullable_string_owned(b)?
288 } else {
289 get_nullable_string_owned(b)?
290 }
291 });
292 Ok(true)
293 }
294 1 => {
295 tag_replica_state = Some({
296 let b: &mut &[u8] = payload;
297 ReplicaState::decode(b, version)?
298 });
299 Ok(true)
300 }
301 _ => Ok(false),
302 })?;
303 if let Some(v) = tag_cluster_id {
304 out.cluster_id = v;
305 }
306 if let Some(v) = tag_replica_state {
307 out.replica_state = v;
308 }
309 }
310 Ok(out)
311 }
312}
313#[cfg(test)]
314impl FetchRequest {
315 #[must_use]
316 pub fn populated(version: i16) -> Self {
317 let mut m = Self::default();
318 if (0..=14).contains(&version) {
319 m.replica_id = 1i32;
320 }
321 if version >= 0 {
322 m.max_wait_ms = 1i32;
323 }
324 if version >= 0 {
325 m.min_bytes = 1i32;
326 }
327 if version >= 3 {
328 m.max_bytes = 1i32;
329 }
330 if version >= 4 {
331 m.isolation_level = 1i8;
332 }
333 if version >= 7 {
334 m.session_id = 1i32;
335 }
336 if version >= 7 {
337 m.session_epoch = 1i32;
338 }
339 if version >= 0 {
340 m.topics = vec![FetchTopic::populated(version)];
341 }
342 if version >= 7 {
343 m.forgotten_topics_data = vec![ForgottenTopic::populated(version)];
344 }
345 if version >= 11 {
346 m.rack_id = "x".to_string();
347 }
348 if version >= 12 {
349 m.cluster_id = Some("x".to_string());
350 }
351 if version >= 15 {
352 m.replica_state = ReplicaState::populated(version);
353 }
354 m
355 }
356}
357#[derive(Debug, Clone, PartialEq, Eq)]
358pub struct ReplicaState {
359 pub replica_id: i32,
360 pub replica_epoch: i64,
361 pub unknown_tagged_fields: UnknownTaggedFields,
362}
363impl Default for ReplicaState {
364 fn default() -> Self {
365 Self {
366 replica_id: -1i32,
367 replica_epoch: -1i64,
368 unknown_tagged_fields: Default::default(),
369 }
370 }
371}
372impl Encode for ReplicaState {
373 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
374 let flex = version >= 12;
375 if version >= 15 {
376 put_i32(buf, self.replica_id);
377 }
378 if version >= 15 {
379 put_i64(buf, self.replica_epoch);
380 }
381 if flex {
382 let tagged = WriteTaggedFields::new();
383 tagged.write(buf, &self.unknown_tagged_fields);
384 }
385 Ok(())
386 }
387 fn encoded_len(&self, version: i16) -> usize {
388 let flex = version >= 12;
389 let mut n: usize = 0;
390 if version >= 15 {
391 n += 4;
392 }
393 if version >= 15 {
394 n += 8;
395 }
396 if flex {
397 let known_pairs: Vec<(u32, usize)> = Vec::new();
398 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
399 }
400 n
401 }
402}
403impl Decode<'_> for ReplicaState {
404 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
405 let flex = version >= 12;
406 let mut out = Self::default();
407 if version >= 15 {
408 out.replica_id = get_i32(buf)?;
409 }
410 if version >= 15 {
411 out.replica_epoch = get_i64(buf)?;
412 }
413 if flex {
414 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
415 }
416 Ok(out)
417 }
418}
419#[cfg(test)]
420impl ReplicaState {
421 #[must_use]
422 pub fn populated(version: i16) -> Self {
423 let mut m = Self::default();
424 if version >= 15 {
425 m.replica_id = 1i32;
426 }
427 if version >= 15 {
428 m.replica_epoch = 1i64;
429 }
430 m
431 }
432}
433#[derive(Debug, Clone, PartialEq, Eq, Default)]
434pub struct FetchTopic {
435 pub topic: String,
436 pub topic_id: crate::primitives::uuid::Uuid,
437 pub partitions: Vec<FetchPartition>,
438 pub unknown_tagged_fields: UnknownTaggedFields,
439}
440impl Encode for FetchTopic {
441 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
442 let flex = version >= 12;
443 if (0..=12).contains(&version) {
444 if flex {
445 put_compact_string(buf, &self.topic);
446 } else {
447 put_string(buf, &self.topic);
448 }
449 }
450 if version >= 13 {
451 crate::primitives::uuid::put_uuid(buf, self.topic_id);
452 }
453 if version >= 0 {
454 {
455 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
456 for it in &self.partitions {
457 it.encode(buf, version)?;
458 }
459 }
460 }
461 if flex {
462 let tagged = WriteTaggedFields::new();
463 tagged.write(buf, &self.unknown_tagged_fields);
464 }
465 Ok(())
466 }
467 fn encoded_len(&self, version: i16) -> usize {
468 let flex = version >= 12;
469 let mut n: usize = 0;
470 if (0..=12).contains(&version) {
471 n += if flex {
472 compact_string_len(&self.topic)
473 } else {
474 string_len(&self.topic)
475 };
476 }
477 if version >= 13 {
478 n += 16;
479 }
480 if version >= 0 {
481 n += {
482 let prefix =
483 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
484 let body: usize = (self.partitions)
485 .iter()
486 .map(|it| it.encoded_len(version))
487 .sum();
488 prefix + body
489 };
490 }
491 if flex {
492 let known_pairs: Vec<(u32, usize)> = Vec::new();
493 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
494 }
495 n
496 }
497}
498impl Decode<'_> for FetchTopic {
499 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
500 let flex = version >= 12;
501 let mut out = Self::default();
502 if (0..=12).contains(&version) {
503 out.topic = if flex {
504 get_compact_string_owned(buf)?
505 } else {
506 get_string_owned(buf)?
507 };
508 }
509 if version >= 13 {
510 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
511 }
512 if version >= 0 {
513 out.partitions = {
514 let n = crate::primitives::array::get_array_len(buf, flex)?;
515 let mut v = Vec::with_capacity(n);
516 for _ in 0..n {
517 v.push(FetchPartition::decode(buf, version)?);
518 }
519 v
520 };
521 }
522 if flex {
523 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
524 }
525 Ok(out)
526 }
527}
528#[cfg(test)]
529impl FetchTopic {
530 #[must_use]
531 pub fn populated(version: i16) -> Self {
532 let mut m = Self::default();
533 if (0..=12).contains(&version) {
534 m.topic = "x".to_string();
535 }
536 if version >= 13 {
537 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
538 }
539 if version >= 0 {
540 m.partitions = vec![FetchPartition::populated(version)];
541 }
542 m
543 }
544}
545#[derive(Debug, Clone, PartialEq, Eq)]
546pub struct FetchPartition {
547 pub partition: i32,
548 pub current_leader_epoch: i32,
549 pub fetch_offset: i64,
550 pub last_fetched_epoch: i32,
551 pub log_start_offset: i64,
552 pub partition_max_bytes: i32,
553 pub replica_directory_id: crate::primitives::uuid::Uuid,
554 pub high_watermark: i64,
555 pub unknown_tagged_fields: UnknownTaggedFields,
556}
557impl Default for FetchPartition {
558 fn default() -> Self {
559 Self {
560 partition: 0i32,
561 current_leader_epoch: -1i32,
562 fetch_offset: 0i64,
563 last_fetched_epoch: -1i32,
564 log_start_offset: -1i64,
565 partition_max_bytes: 0i32,
566 replica_directory_id: Default::default(),
567 high_watermark: 9_223_372_036_854_775_807i64,
568 unknown_tagged_fields: Default::default(),
569 }
570 }
571}
572impl Encode for FetchPartition {
573 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
574 let flex = version >= 12;
575 if version >= 0 {
576 put_i32(buf, self.partition);
577 }
578 if version >= 9 {
579 put_i32(buf, self.current_leader_epoch);
580 }
581 if version >= 0 {
582 put_i64(buf, self.fetch_offset);
583 }
584 if version >= 12 {
585 put_i32(buf, self.last_fetched_epoch);
586 }
587 if version >= 5 {
588 put_i64(buf, self.log_start_offset);
589 }
590 if version >= 0 {
591 put_i32(buf, self.partition_max_bytes);
592 }
593 if flex {
594 let mut tagged = WriteTaggedFields::new();
595 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
596 let payload = encode_to_bytes(16, |b| {
597 crate::primitives::uuid::put_uuid(b, self.replica_directory_id);
598 Ok(())
599 });
600 tagged.add(0, payload);
601 }
602 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
603 let payload = encode_to_bytes(8, |b| {
604 put_i64(b, self.high_watermark);
605 Ok(())
606 });
607 tagged.add(1, payload);
608 }
609 tagged.write(buf, &self.unknown_tagged_fields);
610 }
611 Ok(())
612 }
613 fn encoded_len(&self, version: i16) -> usize {
614 let flex = version >= 12;
615 let mut n: usize = 0;
616 if version >= 0 {
617 n += 4;
618 }
619 if version >= 9 {
620 n += 4;
621 }
622 if version >= 0 {
623 n += 8;
624 }
625 if version >= 12 {
626 n += 4;
627 }
628 if version >= 5 {
629 n += 8;
630 }
631 if version >= 0 {
632 n += 4;
633 }
634 if flex {
635 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
636 if !(crate::codegen_helpers::is_default(&self.replica_directory_id)) {
637 known_pairs.push((0, 16));
638 }
639 if !(self.high_watermark == 9_223_372_036_854_775_807i64) {
640 known_pairs.push((1, 8));
641 }
642 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
643 }
644 n
645 }
646}
647impl Decode<'_> for FetchPartition {
648 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
649 let flex = version >= 12;
650 let mut out = Self::default();
651 if version >= 0 {
652 out.partition = get_i32(buf)?;
653 }
654 if version >= 9 {
655 out.current_leader_epoch = get_i32(buf)?;
656 }
657 if version >= 0 {
658 out.fetch_offset = get_i64(buf)?;
659 }
660 if version >= 12 {
661 out.last_fetched_epoch = get_i32(buf)?;
662 }
663 if version >= 5 {
664 out.log_start_offset = get_i64(buf)?;
665 }
666 if version >= 0 {
667 out.partition_max_bytes = get_i32(buf)?;
668 }
669 if flex {
670 let mut tag_replica_directory_id = None;
671 let mut tag_high_watermark = None;
672 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
673 0 => {
674 tag_replica_directory_id = Some({
675 let b: &mut &[u8] = payload;
676 crate::primitives::uuid::get_uuid(b)?
677 });
678 Ok(true)
679 }
680 1 => {
681 tag_high_watermark = Some({
682 let b: &mut &[u8] = payload;
683 get_i64(b)?
684 });
685 Ok(true)
686 }
687 _ => Ok(false),
688 })?;
689 if let Some(v) = tag_replica_directory_id {
690 out.replica_directory_id = v;
691 }
692 if let Some(v) = tag_high_watermark {
693 out.high_watermark = v;
694 }
695 }
696 Ok(out)
697 }
698}
699#[cfg(test)]
700impl FetchPartition {
701 #[must_use]
702 pub fn populated(version: i16) -> Self {
703 let mut m = Self::default();
704 if version >= 0 {
705 m.partition = 1i32;
706 }
707 if version >= 9 {
708 m.current_leader_epoch = 1i32;
709 }
710 if version >= 0 {
711 m.fetch_offset = 1i64;
712 }
713 if version >= 12 {
714 m.last_fetched_epoch = 1i32;
715 }
716 if version >= 5 {
717 m.log_start_offset = 1i64;
718 }
719 if version >= 0 {
720 m.partition_max_bytes = 1i32;
721 }
722 if version >= 17 {
723 m.replica_directory_id = crate::primitives::uuid::Uuid([1u8; 16]);
724 }
725 if version >= 18 {
726 m.high_watermark = 1i64;
727 }
728 m
729 }
730}
731#[derive(Debug, Clone, PartialEq, Eq, Default)]
732pub struct ForgottenTopic {
733 pub topic: String,
734 pub topic_id: crate::primitives::uuid::Uuid,
735 pub partitions: Vec<i32>,
736 pub unknown_tagged_fields: UnknownTaggedFields,
737}
738impl Encode for ForgottenTopic {
739 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
740 let flex = version >= 12;
741 if (7..=12).contains(&version) {
742 if flex {
743 put_compact_string(buf, &self.topic);
744 } else {
745 put_string(buf, &self.topic);
746 }
747 }
748 if version >= 13 {
749 crate::primitives::uuid::put_uuid(buf, self.topic_id);
750 }
751 if version >= 7 {
752 {
753 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
754 for it in &self.partitions {
755 put_i32(buf, *it);
756 }
757 }
758 }
759 if flex {
760 let tagged = WriteTaggedFields::new();
761 tagged.write(buf, &self.unknown_tagged_fields);
762 }
763 Ok(())
764 }
765 fn encoded_len(&self, version: i16) -> usize {
766 let flex = version >= 12;
767 let mut n: usize = 0;
768 if (7..=12).contains(&version) {
769 n += if flex {
770 compact_string_len(&self.topic)
771 } else {
772 string_len(&self.topic)
773 };
774 }
775 if version >= 13 {
776 n += 16;
777 }
778 if version >= 7 {
779 n += {
780 let prefix =
781 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
782 let body: usize = (self.partitions).iter().map(|_| 4).sum();
783 prefix + body
784 };
785 }
786 if flex {
787 let known_pairs: Vec<(u32, usize)> = Vec::new();
788 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
789 }
790 n
791 }
792}
793impl Decode<'_> for ForgottenTopic {
794 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
795 let flex = version >= 12;
796 let mut out = Self::default();
797 if (7..=12).contains(&version) {
798 out.topic = if flex {
799 get_compact_string_owned(buf)?
800 } else {
801 get_string_owned(buf)?
802 };
803 }
804 if version >= 13 {
805 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
806 }
807 if version >= 7 {
808 out.partitions = {
809 let n = crate::primitives::array::get_array_len(buf, flex)?;
810 let mut v = Vec::with_capacity(n);
811 for _ in 0..n {
812 v.push(get_i32(buf)?);
813 }
814 v
815 };
816 }
817 if flex {
818 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
819 }
820 Ok(out)
821 }
822}
823#[cfg(test)]
824impl ForgottenTopic {
825 #[must_use]
826 pub fn populated(version: i16) -> Self {
827 let mut m = Self::default();
828 if (7..=12).contains(&version) {
829 m.topic = "x".to_string();
830 }
831 if version >= 13 {
832 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
833 }
834 if version >= 7 {
835 m.partitions = vec![1i32];
836 }
837 m
838 }
839}
840
841#[must_use]
844#[allow(unused_comparisons)]
845pub fn default_json(version: i16) -> ::serde_json::Value {
846 let mut obj = ::serde_json::Map::new();
847 if version >= 12 {
848 obj.insert("clusterId".to_string(), ::serde_json::Value::Null);
849 }
850 if version <= 14 {
851 obj.insert("replicaId".to_string(), ::serde_json::json!(-1));
852 }
853 if version >= 15 {
854 obj.insert("replicaState".to_string(), {
855 let mut m = ::serde_json::Map::new();
856 m.insert("replicaId".to_string(), ::serde_json::json!(-1));
857 m.insert("replicaEpoch".to_string(), ::serde_json::json!(-1));
858 ::serde_json::Value::Object(m)
859 });
860 }
861 obj.insert("maxWaitMs".to_string(), ::serde_json::json!(0));
862 obj.insert("minBytes".to_string(), ::serde_json::json!(0));
863 if version >= 3 {
864 obj.insert("maxBytes".to_string(), ::serde_json::json!(2147483647));
865 }
866 if version >= 4 {
867 obj.insert("isolationLevel".to_string(), ::serde_json::json!(0));
868 }
869 if version >= 7 {
870 obj.insert("sessionId".to_string(), ::serde_json::json!(0));
871 }
872 if version >= 7 {
873 obj.insert("sessionEpoch".to_string(), ::serde_json::json!(-1));
874 }
875 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
876 if version >= 7 {
877 obj.insert(
878 "forgottenTopicsData".to_string(),
879 ::serde_json::Value::Array(vec![]),
880 );
881 }
882 if version >= 11 {
883 obj.insert(
884 "rackId".to_string(),
885 ::serde_json::Value::String(String::new()),
886 );
887 }
888 ::serde_json::Value::Object(obj)
889}
890
891impl crate::ProtocolRequest for FetchRequest {
892 const API_KEY: i16 = API_KEY;
893 const MIN_VERSION: i16 = MIN_VERSION;
894 const MAX_VERSION: i16 = MAX_VERSION;
895 const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
896 type Response = super::fetch_response::FetchResponse;
897}