1use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
4use crate::primitives::string_bytes::{
5 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
6 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
7 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
8};
9use crate::primitives::string_bytes::{
10 get_compact_nullable_bytes_owned, get_nullable_bytes_owned, put_bytes, put_compact_bytes,
11 put_compact_nullable_bytes, put_nullable_bytes,
12};
13use crate::tagged_fields::{
14 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
15};
16use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
17use bytes::{Buf, BufMut};
/// Numeric API key carried in `ProtocolError::UnsupportedVersion` for this message.
pub const API_KEY: i16 = 1;
/// Lowest message version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 4;
/// Highest message version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 18;
/// First version that uses the flexible layout (compact lengths and tagged fields).
pub const FLEXIBLE_MIN: i16 = 12;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
26#[derive(Debug, Clone, PartialEq, Eq, Default)]
27pub struct FetchResponse {
28 pub throttle_time_ms: i32,
29 pub error_code: i16,
30 pub session_id: i32,
31 pub responses: Vec<FetchableTopicResponse>,
32 pub node_endpoints: Vec<NodeEndpoint>,
33 pub unknown_tagged_fields: UnknownTaggedFields,
34}
35impl Encode for FetchResponse {
36 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
37 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
38 return Err(ProtocolError::UnsupportedVersion {
39 api_key: API_KEY,
40 version,
41 });
42 }
43 let flex = is_flexible(version);
44 if version >= 1 {
45 put_i32(buf, self.throttle_time_ms);
46 }
47 if version >= 7 {
48 put_i16(buf, self.error_code);
49 }
50 if version >= 7 {
51 put_i32(buf, self.session_id);
52 }
53 if version >= 0 {
54 {
55 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
56 for it in &self.responses {
57 it.encode(buf, version)?;
58 }
59 }
60 }
61 if flex {
62 let mut tagged = WriteTaggedFields::new();
63 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
64 let payload = encode_to_bytes(
65 {
66 let prefix = crate::primitives::array::array_len_prefix_len(
67 (self.node_endpoints).len(),
68 flex,
69 );
70 let body: usize = (self.node_endpoints)
71 .iter()
72 .map(|it| it.encoded_len(version))
73 .sum();
74 prefix + body
75 },
76 |b| {
77 {
78 crate::primitives::array::put_array_len(
79 b,
80 (self.node_endpoints).len(),
81 flex,
82 );
83 for it in &self.node_endpoints {
84 it.encode(b, version)?;
85 }
86 };
87 Ok(())
88 },
89 );
90 tagged.add(0, payload);
91 }
92 tagged.write(buf, &self.unknown_tagged_fields);
93 }
94 Ok(())
95 }
96 fn encoded_len(&self, version: i16) -> usize {
97 let flex = is_flexible(version);
98 let mut n: usize = 0;
99 if version >= 1 {
100 n += 4;
101 }
102 if version >= 7 {
103 n += 2;
104 }
105 if version >= 7 {
106 n += 4;
107 }
108 if version >= 0 {
109 n += {
110 let prefix =
111 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
112 let body: usize = (self.responses)
113 .iter()
114 .map(|it| it.encoded_len(version))
115 .sum();
116 prefix + body
117 };
118 }
119 if flex {
120 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
121 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
122 known_pairs.push((0, {
123 let prefix = crate::primitives::array::array_len_prefix_len(
124 (self.node_endpoints).len(),
125 flex,
126 );
127 let body: usize = (self.node_endpoints)
128 .iter()
129 .map(|it| it.encoded_len(version))
130 .sum();
131 prefix + body
132 }));
133 }
134 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
135 }
136 n
137 }
138}
139impl Decode<'_> for FetchResponse {
140 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
141 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
142 return Err(ProtocolError::UnsupportedVersion {
143 api_key: API_KEY,
144 version,
145 });
146 }
147 let flex = is_flexible(version);
148 let mut out = Self::default();
149 if version >= 1 {
150 out.throttle_time_ms = get_i32(buf)?;
151 }
152 if version >= 7 {
153 out.error_code = get_i16(buf)?;
154 }
155 if version >= 7 {
156 out.session_id = get_i32(buf)?;
157 }
158 if version >= 0 {
159 out.responses = {
160 let n = crate::primitives::array::get_array_len(buf, flex)?;
161 let mut v = Vec::with_capacity(n);
162 for _ in 0..n {
163 v.push(FetchableTopicResponse::decode(buf, version)?);
164 }
165 v
166 };
167 }
168 if flex {
169 let mut tag_node_endpoints = None;
170 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
171 0 => {
172 tag_node_endpoints = Some({
173 let b: &mut &[u8] = payload;
174 {
175 let n = crate::primitives::array::get_array_len(b, flex)?;
176 let mut v = Vec::with_capacity(n);
177 for _ in 0..n {
178 v.push(NodeEndpoint::decode(b, version)?);
179 }
180 v
181 }
182 });
183 Ok(true)
184 }
185 _ => Ok(false),
186 })?;
187 if let Some(v) = tag_node_endpoints {
188 out.node_endpoints = v;
189 }
190 }
191 Ok(out)
192 }
193}
#[cfg(test)]
impl FetchResponse {
    /// Builds a test fixture in which every field gated in at `version` holds
    /// a non-default sample value.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 1 {
            sample.throttle_time_ms = 1;
        }
        if version >= 7 {
            sample.error_code = 1;
            sample.session_id = 1;
        }
        if version >= 0 {
            sample.responses = vec![FetchableTopicResponse::populated(version)];
        }
        if version >= 16 {
            sample.node_endpoints = vec![NodeEndpoint::populated(version)];
        }
        sample
    }
}
217#[derive(Debug, Clone, PartialEq, Eq, Default)]
218pub struct FetchableTopicResponse {
219 pub topic: String,
220 pub topic_id: crate::primitives::uuid::Uuid,
221 pub partitions: Vec<PartitionData>,
222 pub unknown_tagged_fields: UnknownTaggedFields,
223}
224impl Encode for FetchableTopicResponse {
225 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
226 let flex = version >= 12;
227 if (0..=12).contains(&version) {
228 if flex {
229 put_compact_string(buf, &self.topic);
230 } else {
231 put_string(buf, &self.topic);
232 }
233 }
234 if version >= 13 {
235 crate::primitives::uuid::put_uuid(buf, self.topic_id);
236 }
237 if version >= 0 {
238 {
239 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
240 for it in &self.partitions {
241 it.encode(buf, version)?;
242 }
243 }
244 }
245 if flex {
246 let tagged = WriteTaggedFields::new();
247 tagged.write(buf, &self.unknown_tagged_fields);
248 }
249 Ok(())
250 }
251 fn encoded_len(&self, version: i16) -> usize {
252 let flex = version >= 12;
253 let mut n: usize = 0;
254 if (0..=12).contains(&version) {
255 n += if flex {
256 compact_string_len(&self.topic)
257 } else {
258 string_len(&self.topic)
259 };
260 }
261 if version >= 13 {
262 n += 16;
263 }
264 if version >= 0 {
265 n += {
266 let prefix =
267 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
268 let body: usize = (self.partitions)
269 .iter()
270 .map(|it| it.encoded_len(version))
271 .sum();
272 prefix + body
273 };
274 }
275 if flex {
276 let known_pairs: Vec<(u32, usize)> = Vec::new();
277 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
278 }
279 n
280 }
281}
282impl Decode<'_> for FetchableTopicResponse {
283 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
284 let flex = version >= 12;
285 let mut out = Self::default();
286 if (0..=12).contains(&version) {
287 out.topic = if flex {
288 get_compact_string_owned(buf)?
289 } else {
290 get_string_owned(buf)?
291 };
292 }
293 if version >= 13 {
294 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
295 }
296 if version >= 0 {
297 out.partitions = {
298 let n = crate::primitives::array::get_array_len(buf, flex)?;
299 let mut v = Vec::with_capacity(n);
300 for _ in 0..n {
301 v.push(PartitionData::decode(buf, version)?);
302 }
303 v
304 };
305 }
306 if flex {
307 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
308 }
309 Ok(out)
310 }
311}
#[cfg(test)]
impl FetchableTopicResponse {
    /// Builds a test fixture with a sample value in every field gated in at
    /// `version`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if (0..=12).contains(&version) {
            sample.topic = String::from("x");
        }
        if version >= 13 {
            sample.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 0 {
            sample.partitions = vec![PartitionData::populated(version)];
        }
        sample
    }
}
329#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct PartitionData {
331 pub partition_index: i32,
332 pub error_code: i16,
333 pub high_watermark: i64,
334 pub last_stable_offset: i64,
335 pub log_start_offset: i64,
336 pub aborted_transactions: Option<Vec<AbortedTransaction>>,
337 pub preferred_read_replica: i32,
338 pub records: Option<crate::records::RecordsPayload>,
339 pub diverging_epoch: EpochEndOffset,
340 pub current_leader: LeaderIdAndEpoch,
341 pub snapshot_id: SnapshotId,
342 pub unknown_tagged_fields: UnknownTaggedFields,
343}
344impl Default for PartitionData {
345 fn default() -> Self {
346 Self {
347 partition_index: 0i32,
348 error_code: 0i16,
349 high_watermark: 0i64,
350 last_stable_offset: -1i64,
351 log_start_offset: -1i64,
352 aborted_transactions: None,
353 preferred_read_replica: -1i32,
354 records: None,
355 diverging_epoch: Default::default(),
356 current_leader: Default::default(),
357 snapshot_id: Default::default(),
358 unknown_tagged_fields: Default::default(),
359 }
360 }
361}
362impl Encode for PartitionData {
363 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
364 let flex = version >= 12;
365 if version >= 0 {
366 put_i32(buf, self.partition_index);
367 }
368 if version >= 0 {
369 put_i16(buf, self.error_code);
370 }
371 if version >= 0 {
372 put_i64(buf, self.high_watermark);
373 }
374 if version >= 4 {
375 put_i64(buf, self.last_stable_offset);
376 }
377 if version >= 5 {
378 put_i64(buf, self.log_start_offset);
379 }
380 if version >= 4 {
381 {
382 let len = (self.aborted_transactions).as_ref().map(Vec::len);
383 crate::primitives::array::put_nullable_array_len(buf, len, flex);
384 if let Some(v) = &self.aborted_transactions {
385 for it in v {
386 it.encode(buf, version)?;
387 }
388 }
389 }
390 }
391 if version >= 11 {
392 put_i32(buf, self.preferred_read_replica);
393 }
394 if version >= 0 {
395 match &self.records {
396 None => {
397 if flex {
398 put_compact_nullable_bytes(buf, None);
399 } else {
400 put_nullable_bytes(buf, None);
401 }
402 }
403 Some(__rb) => {
404 let mut __rb_buf = bytes::BytesMut::new();
405 <crate::records::RecordsPayload as crate::Encode>::encode(
406 __rb,
407 &mut __rb_buf,
408 version,
409 )?;
410 if flex {
411 put_compact_bytes(buf, &__rb_buf);
412 } else {
413 put_bytes(buf, &__rb_buf);
414 }
415 }
416 }
417 }
418 if flex {
419 let mut tagged = WriteTaggedFields::new();
420 if !(crate::codegen_helpers::is_default(&self.diverging_epoch)) {
421 let payload = encode_to_bytes(self.diverging_epoch.encoded_len(version), |b| {
422 self.diverging_epoch.encode(b, version)?;
423 Ok(())
424 });
425 tagged.add(0, payload);
426 }
427 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
428 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| {
429 self.current_leader.encode(b, version)?;
430 Ok(())
431 });
432 tagged.add(1, payload);
433 }
434 if !(crate::codegen_helpers::is_default(&self.snapshot_id)) {
435 let payload = encode_to_bytes(self.snapshot_id.encoded_len(version), |b| {
436 self.snapshot_id.encode(b, version)?;
437 Ok(())
438 });
439 tagged.add(2, payload);
440 }
441 tagged.write(buf, &self.unknown_tagged_fields);
442 }
443 Ok(())
444 }
445 fn encoded_len(&self, version: i16) -> usize {
446 let flex = version >= 12;
447 let mut n: usize = 0;
448 if version >= 0 {
449 n += 4;
450 }
451 if version >= 0 {
452 n += 2;
453 }
454 if version >= 0 {
455 n += 8;
456 }
457 if version >= 4 {
458 n += 8;
459 }
460 if version >= 5 {
461 n += 8;
462 }
463 if version >= 4 {
464 n += {
465 let opt: Option<&Vec<_>> = (self.aborted_transactions).as_ref();
466 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
467 opt.map(std::vec::Vec::len),
468 flex,
469 );
470 let body: usize =
471 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
472 prefix + body
473 };
474 }
475 if version >= 11 {
476 n += 4;
477 }
478 if version >= 0 {
479 n += match &self.records {
480 None => {
481 if flex {
482 crate::primitives::varint::uvarint_len(0)
483 } else {
484 4
485 }
486 }
487 Some(__rb) => {
488 let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(
489 __rb, version,
490 );
491 if flex {
492 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
493 } else {
494 4 + __rb_len
495 }
496 }
497 };
498 }
499 if flex {
500 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
501 if !(crate::codegen_helpers::is_default(&self.diverging_epoch)) {
502 known_pairs.push((0, self.diverging_epoch.encoded_len(version)));
503 }
504 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
505 known_pairs.push((1, self.current_leader.encoded_len(version)));
506 }
507 if !(crate::codegen_helpers::is_default(&self.snapshot_id)) {
508 known_pairs.push((2, self.snapshot_id.encoded_len(version)));
509 }
510 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
511 }
512 n
513 }
514}
515impl Decode<'_> for PartitionData {
516 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
517 let flex = version >= 12;
518 let mut out = Self::default();
519 if version >= 0 {
520 out.partition_index = get_i32(buf)?;
521 }
522 if version >= 0 {
523 out.error_code = get_i16(buf)?;
524 }
525 if version >= 0 {
526 out.high_watermark = get_i64(buf)?;
527 }
528 if version >= 4 {
529 out.last_stable_offset = get_i64(buf)?;
530 }
531 if version >= 5 {
532 out.log_start_offset = get_i64(buf)?;
533 }
534 if version >= 4 {
535 out.aborted_transactions = {
536 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
537 match opt {
538 None => None,
539 Some(n) => {
540 let mut v = Vec::with_capacity(n);
541 for _ in 0..n {
542 v.push(AbortedTransaction::decode(buf, version)?);
543 }
544 Some(v)
545 }
546 }
547 };
548 }
549 if version >= 11 {
550 out.preferred_read_replica = get_i32(buf)?;
551 }
552 if version >= 0 {
553 out.records = {
554 let __rb_opt = if flex {
555 get_compact_nullable_bytes_owned(buf)?
556 } else {
557 get_nullable_bytes_owned(buf)?
558 };
559 match __rb_opt {
560 None => None,
561 Some(__rb_bytes) => {
562 let mut __rb_cur: &[u8] = &__rb_bytes;
563 Some(crate::records::RecordsPayload::decode_lenient(
564 &mut __rb_cur,
565 version,
566 )?)
567 }
568 }
569 };
570 }
571 if flex {
572 let mut tag_diverging_epoch = None;
573 let mut tag_current_leader = None;
574 let mut tag_snapshot_id = None;
575 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
576 0 => {
577 tag_diverging_epoch = Some({
578 let b: &mut &[u8] = payload;
579 EpochEndOffset::decode(b, version)?
580 });
581 Ok(true)
582 }
583 1 => {
584 tag_current_leader = Some({
585 let b: &mut &[u8] = payload;
586 LeaderIdAndEpoch::decode(b, version)?
587 });
588 Ok(true)
589 }
590 2 => {
591 tag_snapshot_id = Some({
592 let b: &mut &[u8] = payload;
593 SnapshotId::decode(b, version)?
594 });
595 Ok(true)
596 }
597 _ => Ok(false),
598 })?;
599 if let Some(v) = tag_diverging_epoch {
600 out.diverging_epoch = v;
601 }
602 if let Some(v) = tag_current_leader {
603 out.current_leader = v;
604 }
605 if let Some(v) = tag_snapshot_id {
606 out.snapshot_id = v;
607 }
608 }
609 Ok(out)
610 }
611}
#[cfg(test)]
impl PartitionData {
    /// Builds a test fixture with a sample value in every field this helper
    /// covers at `version`; `records` is left at its default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut sample = Self::default();
        if version >= 0 {
            sample.partition_index = 1;
            sample.error_code = 1;
            sample.high_watermark = 1;
        }
        if version >= 4 {
            sample.last_stable_offset = 1;
            sample.aborted_transactions = Some(vec![AbortedTransaction::populated(version)]);
        }
        if version >= 5 {
            sample.log_start_offset = 1;
        }
        if version >= 11 {
            sample.preferred_read_replica = 1;
        }
        if version >= 12 {
            sample.diverging_epoch = EpochEndOffset::populated(version);
            sample.current_leader = LeaderIdAndEpoch::populated(version);
            sample.snapshot_id = SnapshotId::populated(version);
        }
        sample
    }
}
650#[derive(Debug, Clone, PartialEq, Eq)]
651pub struct EpochEndOffset {
652 pub epoch: i32,
653 pub end_offset: i64,
654 pub unknown_tagged_fields: UnknownTaggedFields,
655}
656impl Default for EpochEndOffset {
657 fn default() -> Self {
658 Self {
659 epoch: -1i32,
660 end_offset: -1i64,
661 unknown_tagged_fields: Default::default(),
662 }
663 }
664}
665impl Encode for EpochEndOffset {
666 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
667 let flex = version >= 12;
668 if version >= 12 {
669 put_i32(buf, self.epoch);
670 }
671 if version >= 12 {
672 put_i64(buf, self.end_offset);
673 }
674 if flex {
675 let tagged = WriteTaggedFields::new();
676 tagged.write(buf, &self.unknown_tagged_fields);
677 }
678 Ok(())
679 }
680 fn encoded_len(&self, version: i16) -> usize {
681 let flex = version >= 12;
682 let mut n: usize = 0;
683 if version >= 12 {
684 n += 4;
685 }
686 if version >= 12 {
687 n += 8;
688 }
689 if flex {
690 let known_pairs: Vec<(u32, usize)> = Vec::new();
691 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
692 }
693 n
694 }
695}
696impl Decode<'_> for EpochEndOffset {
697 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
698 let flex = version >= 12;
699 let mut out = Self::default();
700 if version >= 12 {
701 out.epoch = get_i32(buf)?;
702 }
703 if version >= 12 {
704 out.end_offset = get_i64(buf)?;
705 }
706 if flex {
707 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
708 }
709 Ok(out)
710 }
711}
#[cfg(test)]
impl EpochEndOffset {
    /// Builds a test fixture: sample values from version 12, defaults below it.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 12 {
            Self {
                epoch: 1,
                end_offset: 1,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
726#[derive(Debug, Clone, PartialEq, Eq)]
727pub struct LeaderIdAndEpoch {
728 pub leader_id: i32,
729 pub leader_epoch: i32,
730 pub unknown_tagged_fields: UnknownTaggedFields,
731}
732impl Default for LeaderIdAndEpoch {
733 fn default() -> Self {
734 Self {
735 leader_id: -1i32,
736 leader_epoch: -1i32,
737 unknown_tagged_fields: Default::default(),
738 }
739 }
740}
741impl Encode for LeaderIdAndEpoch {
742 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
743 let flex = version >= 12;
744 if version >= 12 {
745 put_i32(buf, self.leader_id);
746 }
747 if version >= 12 {
748 put_i32(buf, self.leader_epoch);
749 }
750 if flex {
751 let tagged = WriteTaggedFields::new();
752 tagged.write(buf, &self.unknown_tagged_fields);
753 }
754 Ok(())
755 }
756 fn encoded_len(&self, version: i16) -> usize {
757 let flex = version >= 12;
758 let mut n: usize = 0;
759 if version >= 12 {
760 n += 4;
761 }
762 if version >= 12 {
763 n += 4;
764 }
765 if flex {
766 let known_pairs: Vec<(u32, usize)> = Vec::new();
767 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
768 }
769 n
770 }
771}
772impl Decode<'_> for LeaderIdAndEpoch {
773 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
774 let flex = version >= 12;
775 let mut out = Self::default();
776 if version >= 12 {
777 out.leader_id = get_i32(buf)?;
778 }
779 if version >= 12 {
780 out.leader_epoch = get_i32(buf)?;
781 }
782 if flex {
783 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
784 }
785 Ok(out)
786 }
787}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Builds a test fixture: sample values from version 12, defaults below it.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 12 {
            Self {
                leader_id: 1,
                leader_epoch: 1,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
802#[derive(Debug, Clone, PartialEq, Eq)]
803pub struct SnapshotId {
804 pub end_offset: i64,
805 pub epoch: i32,
806 pub unknown_tagged_fields: UnknownTaggedFields,
807}
808impl Default for SnapshotId {
809 fn default() -> Self {
810 Self {
811 end_offset: -1i64,
812 epoch: -1i32,
813 unknown_tagged_fields: Default::default(),
814 }
815 }
816}
817impl Encode for SnapshotId {
818 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
819 let flex = version >= 12;
820 if version >= 0 {
821 put_i64(buf, self.end_offset);
822 }
823 if version >= 0 {
824 put_i32(buf, self.epoch);
825 }
826 if flex {
827 let tagged = WriteTaggedFields::new();
828 tagged.write(buf, &self.unknown_tagged_fields);
829 }
830 Ok(())
831 }
832 fn encoded_len(&self, version: i16) -> usize {
833 let flex = version >= 12;
834 let mut n: usize = 0;
835 if version >= 0 {
836 n += 8;
837 }
838 if version >= 0 {
839 n += 4;
840 }
841 if flex {
842 let known_pairs: Vec<(u32, usize)> = Vec::new();
843 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
844 }
845 n
846 }
847}
848impl Decode<'_> for SnapshotId {
849 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
850 let flex = version >= 12;
851 let mut out = Self::default();
852 if version >= 0 {
853 out.end_offset = get_i64(buf)?;
854 }
855 if version >= 0 {
856 out.epoch = get_i32(buf)?;
857 }
858 if flex {
859 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
860 }
861 Ok(out)
862 }
863}
#[cfg(test)]
impl SnapshotId {
    /// Builds a test fixture: sample values for versions >= 0, defaults otherwise.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 0 {
            Self {
                end_offset: 1,
                epoch: 1,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
878#[derive(Debug, Clone, PartialEq, Eq, Default)]
879pub struct AbortedTransaction {
880 pub producer_id: i64,
881 pub first_offset: i64,
882 pub unknown_tagged_fields: UnknownTaggedFields,
883}
884impl Encode for AbortedTransaction {
885 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
886 let flex = version >= 12;
887 if version >= 4 {
888 put_i64(buf, self.producer_id);
889 }
890 if version >= 4 {
891 put_i64(buf, self.first_offset);
892 }
893 if flex {
894 let tagged = WriteTaggedFields::new();
895 tagged.write(buf, &self.unknown_tagged_fields);
896 }
897 Ok(())
898 }
899 fn encoded_len(&self, version: i16) -> usize {
900 let flex = version >= 12;
901 let mut n: usize = 0;
902 if version >= 4 {
903 n += 8;
904 }
905 if version >= 4 {
906 n += 8;
907 }
908 if flex {
909 let known_pairs: Vec<(u32, usize)> = Vec::new();
910 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
911 }
912 n
913 }
914}
915impl Decode<'_> for AbortedTransaction {
916 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
917 let flex = version >= 12;
918 let mut out = Self::default();
919 if version >= 4 {
920 out.producer_id = get_i64(buf)?;
921 }
922 if version >= 4 {
923 out.first_offset = get_i64(buf)?;
924 }
925 if flex {
926 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
927 }
928 Ok(out)
929 }
930}
#[cfg(test)]
impl AbortedTransaction {
    /// Builds a test fixture: sample values from version 4, defaults below it.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 4 {
            Self {
                producer_id: 1,
                first_offset: 1,
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
945#[derive(Debug, Clone, PartialEq, Eq, Default)]
946pub struct NodeEndpoint {
947 pub node_id: i32,
948 pub host: String,
949 pub port: i32,
950 pub rack: Option<String>,
951 pub unknown_tagged_fields: UnknownTaggedFields,
952}
953impl Encode for NodeEndpoint {
954 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
955 let flex = version >= 12;
956 if version >= 16 {
957 put_i32(buf, self.node_id);
958 }
959 if version >= 16 {
960 if flex {
961 put_compact_string(buf, &self.host);
962 } else {
963 put_string(buf, &self.host);
964 }
965 }
966 if version >= 16 {
967 put_i32(buf, self.port);
968 }
969 if version >= 16 {
970 if flex {
971 put_compact_nullable_string(buf, self.rack.as_deref());
972 } else {
973 put_nullable_string(buf, self.rack.as_deref());
974 }
975 }
976 if flex {
977 let tagged = WriteTaggedFields::new();
978 tagged.write(buf, &self.unknown_tagged_fields);
979 }
980 Ok(())
981 }
982 fn encoded_len(&self, version: i16) -> usize {
983 let flex = version >= 12;
984 let mut n: usize = 0;
985 if version >= 16 {
986 n += 4;
987 }
988 if version >= 16 {
989 n += if flex {
990 compact_string_len(&self.host)
991 } else {
992 string_len(&self.host)
993 };
994 }
995 if version >= 16 {
996 n += 4;
997 }
998 if version >= 16 {
999 n += if flex {
1000 compact_nullable_string_len(self.rack.as_deref())
1001 } else {
1002 nullable_string_len(self.rack.as_deref())
1003 };
1004 }
1005 if flex {
1006 let known_pairs: Vec<(u32, usize)> = Vec::new();
1007 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1008 }
1009 n
1010 }
1011}
1012impl Decode<'_> for NodeEndpoint {
1013 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
1014 let flex = version >= 12;
1015 let mut out = Self::default();
1016 if version >= 16 {
1017 out.node_id = get_i32(buf)?;
1018 }
1019 if version >= 16 {
1020 out.host = if flex {
1021 get_compact_string_owned(buf)?
1022 } else {
1023 get_string_owned(buf)?
1024 };
1025 }
1026 if version >= 16 {
1027 out.port = get_i32(buf)?;
1028 }
1029 if version >= 16 {
1030 out.rack = if flex {
1031 get_compact_nullable_string_owned(buf)?
1032 } else {
1033 get_nullable_string_owned(buf)?
1034 };
1035 }
1036 if flex {
1037 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1038 }
1039 Ok(out)
1040 }
1041}
#[cfg(test)]
impl NodeEndpoint {
    /// Builds a test fixture: sample values from version 16, defaults below it.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version >= 16 {
            Self {
                node_id: 1,
                host: String::from("x"),
                port: 1,
                rack: Some(String::from("x")),
                ..Self::default()
            }
        } else {
            Self::default()
        }
    }
}
1062#[must_use]
1065#[allow(unused_comparisons)]
1066pub fn default_json(version: i16) -> ::serde_json::Value {
1067 let mut obj = ::serde_json::Map::new();
1068 if version >= 1 {
1069 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
1070 }
1071 if version >= 7 {
1072 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
1073 }
1074 if version >= 7 {
1075 obj.insert("sessionId".to_string(), ::serde_json::json!(0));
1076 }
1077 obj.insert("responses".to_string(), ::serde_json::Value::Array(vec![]));
1078 if version >= 16 {
1079 obj.insert(
1080 "nodeEndpoints".to_string(),
1081 ::serde_json::Value::Array(vec![]),
1082 );
1083 }
1084 ::serde_json::Value::Object(obj)
1085}