1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::primitives::string_bytes::{
12 get_compact_nullable_bytes_owned, get_nullable_bytes_owned, put_bytes, put_compact_bytes,
13 put_compact_nullable_bytes, put_nullable_bytes,
14};
15use crate::tagged_fields::{
16 WriteTaggedFields, encode_to_bytes, read_tagged_fields, tagged_fields_len,
17};
18use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
19
/// Numeric API key reported in `ProtocolError::UnsupportedVersion` when a version is rejected.
pub const API_KEY: i16 = 1;
/// Lowest message version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 4;
/// Highest message version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 18;
/// First message version that uses the flexible (compact lengths + tagged fields) layout.
pub const FLEXIBLE_MIN: i16 = 12;

/// Returns `true` when `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
29
30#[derive(Debug, Clone, PartialEq, Eq, Default)]
31pub struct FetchResponse {
32 pub throttle_time_ms: i32,
33 pub error_code: i16,
34 pub session_id: i32,
35 pub responses: Vec<FetchableTopicResponse>,
36 pub node_endpoints: Vec<NodeEndpoint>,
37 pub unknown_tagged_fields: UnknownTaggedFields,
38}
39impl Encode for FetchResponse {
40 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
41 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
42 return Err(ProtocolError::UnsupportedVersion {
43 api_key: API_KEY,
44 version,
45 });
46 }
47 let flex = is_flexible(version);
48 if version >= 1 {
49 put_i32(buf, self.throttle_time_ms);
50 }
51 if version >= 7 {
52 put_i16(buf, self.error_code);
53 }
54 if version >= 7 {
55 put_i32(buf, self.session_id);
56 }
57 if version >= 0 {
58 {
59 crate::primitives::array::put_array_len(buf, (self.responses).len(), flex);
60 for it in &self.responses {
61 it.encode(buf, version)?;
62 }
63 }
64 }
65 if flex {
66 let mut tagged = WriteTaggedFields::new();
67 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
68 let payload = encode_to_bytes(
69 {
70 let prefix = crate::primitives::array::array_len_prefix_len(
71 (self.node_endpoints).len(),
72 flex,
73 );
74 let body: usize = (self.node_endpoints)
75 .iter()
76 .map(|it| it.encoded_len(version))
77 .sum();
78 prefix + body
79 },
80 |b| {
81 {
82 crate::primitives::array::put_array_len(
83 b,
84 (self.node_endpoints).len(),
85 flex,
86 );
87 for it in &self.node_endpoints {
88 it.encode(b, version)?;
89 }
90 };
91 Ok(())
92 },
93 );
94 tagged.add(0, payload);
95 }
96 tagged.write(buf, &self.unknown_tagged_fields);
97 }
98 Ok(())
99 }
100 fn encoded_len(&self, version: i16) -> usize {
101 let flex = is_flexible(version);
102 let mut n: usize = 0;
103 if version >= 1 {
104 n += 4;
105 }
106 if version >= 7 {
107 n += 2;
108 }
109 if version >= 7 {
110 n += 4;
111 }
112 if version >= 0 {
113 n += {
114 let prefix =
115 crate::primitives::array::array_len_prefix_len((self.responses).len(), flex);
116 let body: usize = (self.responses)
117 .iter()
118 .map(|it| it.encoded_len(version))
119 .sum();
120 prefix + body
121 };
122 }
123 if flex {
124 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
125 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
126 known_pairs.push((0, {
127 let prefix = crate::primitives::array::array_len_prefix_len(
128 (self.node_endpoints).len(),
129 flex,
130 );
131 let body: usize = (self.node_endpoints)
132 .iter()
133 .map(|it| it.encoded_len(version))
134 .sum();
135 prefix + body
136 }));
137 }
138 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
139 }
140 n
141 }
142}
143impl Decode<'_> for FetchResponse {
144 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
145 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
146 return Err(ProtocolError::UnsupportedVersion {
147 api_key: API_KEY,
148 version,
149 });
150 }
151 let flex = is_flexible(version);
152 let mut out = Self::default();
153 if version >= 1 {
154 out.throttle_time_ms = get_i32(buf)?;
155 }
156 if version >= 7 {
157 out.error_code = get_i16(buf)?;
158 }
159 if version >= 7 {
160 out.session_id = get_i32(buf)?;
161 }
162 if version >= 0 {
163 out.responses = {
164 let n = crate::primitives::array::get_array_len(buf, flex)?;
165 let mut v = Vec::with_capacity(n);
166 for _ in 0..n {
167 v.push(FetchableTopicResponse::decode(buf, version)?);
168 }
169 v
170 };
171 }
172 if flex {
173 let mut tag_node_endpoints = None;
174 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
175 0 => {
176 tag_node_endpoints = Some({
177 let b: &mut &[u8] = payload;
178 {
179 let n = crate::primitives::array::get_array_len(b, flex)?;
180 let mut v = Vec::with_capacity(n);
181 for _ in 0..n {
182 v.push(NodeEndpoint::decode(b, version)?);
183 }
184 v
185 }
186 });
187 Ok(true)
188 }
189 _ => Ok(false),
190 })?;
191 if let Some(v) = tag_node_endpoints {
192 out.node_endpoints = v;
193 }
194 }
195 Ok(out)
196 }
197}
#[cfg(test)]
impl FetchResponse {
    /// Builds a test fixture in which each field gated in at `version` is set
    /// to a fixed, non-default value; all other fields stay at their default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 1 {
            fixture.throttle_time_ms = 1;
        }
        if version >= 7 {
            fixture.error_code = 1;
            fixture.session_id = 1;
        }
        if version >= 0 {
            fixture.responses = vec![FetchableTopicResponse::populated(version)];
        }
        if version >= 16 {
            fixture.node_endpoints = vec![NodeEndpoint::populated(version)];
        }
        fixture
    }
}
221#[derive(Debug, Clone, PartialEq, Eq, Default)]
222pub struct FetchableTopicResponse {
223 pub topic: String,
224 pub topic_id: crate::primitives::uuid::Uuid,
225 pub partitions: Vec<PartitionData>,
226 pub unknown_tagged_fields: UnknownTaggedFields,
227}
228impl Encode for FetchableTopicResponse {
229 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
230 let flex = version >= 12;
231 if (0..=12).contains(&version) {
232 if flex {
233 put_compact_string(buf, &self.topic);
234 } else {
235 put_string(buf, &self.topic);
236 }
237 }
238 if version >= 13 {
239 crate::primitives::uuid::put_uuid(buf, self.topic_id);
240 }
241 if version >= 0 {
242 {
243 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
244 for it in &self.partitions {
245 it.encode(buf, version)?;
246 }
247 }
248 }
249 if flex {
250 let tagged = WriteTaggedFields::new();
251 tagged.write(buf, &self.unknown_tagged_fields);
252 }
253 Ok(())
254 }
255 fn encoded_len(&self, version: i16) -> usize {
256 let flex = version >= 12;
257 let mut n: usize = 0;
258 if (0..=12).contains(&version) {
259 n += if flex {
260 compact_string_len(&self.topic)
261 } else {
262 string_len(&self.topic)
263 };
264 }
265 if version >= 13 {
266 n += 16;
267 }
268 if version >= 0 {
269 n += {
270 let prefix =
271 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
272 let body: usize = (self.partitions)
273 .iter()
274 .map(|it| it.encoded_len(version))
275 .sum();
276 prefix + body
277 };
278 }
279 if flex {
280 let known_pairs: Vec<(u32, usize)> = Vec::new();
281 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
282 }
283 n
284 }
285}
286impl Decode<'_> for FetchableTopicResponse {
287 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
288 let flex = version >= 12;
289 let mut out = Self::default();
290 if (0..=12).contains(&version) {
291 out.topic = if flex {
292 get_compact_string_owned(buf)?
293 } else {
294 get_string_owned(buf)?
295 };
296 }
297 if version >= 13 {
298 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
299 }
300 if version >= 0 {
301 out.partitions = {
302 let n = crate::primitives::array::get_array_len(buf, flex)?;
303 let mut v = Vec::with_capacity(n);
304 for _ in 0..n {
305 v.push(PartitionData::decode(buf, version)?);
306 }
307 v
308 };
309 }
310 if flex {
311 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
312 }
313 Ok(out)
314 }
315}
#[cfg(test)]
impl FetchableTopicResponse {
    /// Builds a test fixture in which each field gated in at `version` is set
    /// to a fixed, non-default value; all other fields stay at their default.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if (0..=12).contains(&version) {
            fixture.topic = String::from("x");
        }
        if version >= 13 {
            fixture.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
        }
        if version >= 0 {
            fixture.partitions = vec![PartitionData::populated(version)];
        }
        fixture
    }
}
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct PartitionData {
335 pub partition_index: i32,
336 pub error_code: i16,
337 pub high_watermark: i64,
338 pub last_stable_offset: i64,
339 pub log_start_offset: i64,
340 pub aborted_transactions: Option<Vec<AbortedTransaction>>,
341 pub preferred_read_replica: i32,
342 pub records: Option<crate::records::RecordsPayload>,
343 pub diverging_epoch: EpochEndOffset,
344 pub current_leader: LeaderIdAndEpoch,
345 pub snapshot_id: SnapshotId,
346 pub unknown_tagged_fields: UnknownTaggedFields,
347}
348impl Default for PartitionData {
349 fn default() -> Self {
350 Self {
351 partition_index: 0i32,
352 error_code: 0i16,
353 high_watermark: 0i64,
354 last_stable_offset: -1i64,
355 log_start_offset: -1i64,
356 aborted_transactions: None,
357 preferred_read_replica: -1i32,
358 records: None,
359 diverging_epoch: Default::default(),
360 current_leader: Default::default(),
361 snapshot_id: Default::default(),
362 unknown_tagged_fields: Default::default(),
363 }
364 }
365}
366impl Encode for PartitionData {
367 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
368 let flex = version >= 12;
369 if version >= 0 {
370 put_i32(buf, self.partition_index);
371 }
372 if version >= 0 {
373 put_i16(buf, self.error_code);
374 }
375 if version >= 0 {
376 put_i64(buf, self.high_watermark);
377 }
378 if version >= 4 {
379 put_i64(buf, self.last_stable_offset);
380 }
381 if version >= 5 {
382 put_i64(buf, self.log_start_offset);
383 }
384 if version >= 4 {
385 {
386 let len = (self.aborted_transactions).as_ref().map(Vec::len);
387 crate::primitives::array::put_nullable_array_len(buf, len, flex);
388 if let Some(v) = &self.aborted_transactions {
389 for it in v {
390 it.encode(buf, version)?;
391 }
392 }
393 }
394 }
395 if version >= 11 {
396 put_i32(buf, self.preferred_read_replica);
397 }
398 if version >= 0 {
399 match &self.records {
400 None => {
401 if flex {
402 put_compact_nullable_bytes(buf, None);
403 } else {
404 put_nullable_bytes(buf, None);
405 }
406 }
407 Some(__rb) => {
408 let mut __rb_buf = bytes::BytesMut::new();
409 <crate::records::RecordsPayload as crate::Encode>::encode(
410 __rb,
411 &mut __rb_buf,
412 version,
413 )?;
414 if flex {
415 put_compact_bytes(buf, &__rb_buf);
416 } else {
417 put_bytes(buf, &__rb_buf);
418 }
419 }
420 }
421 }
422 if flex {
423 let mut tagged = WriteTaggedFields::new();
424 if !(crate::codegen_helpers::is_default(&self.diverging_epoch)) {
425 let payload = encode_to_bytes(self.diverging_epoch.encoded_len(version), |b| {
426 self.diverging_epoch.encode(b, version)?;
427 Ok(())
428 });
429 tagged.add(0, payload);
430 }
431 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
432 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| {
433 self.current_leader.encode(b, version)?;
434 Ok(())
435 });
436 tagged.add(1, payload);
437 }
438 if !(crate::codegen_helpers::is_default(&self.snapshot_id)) {
439 let payload = encode_to_bytes(self.snapshot_id.encoded_len(version), |b| {
440 self.snapshot_id.encode(b, version)?;
441 Ok(())
442 });
443 tagged.add(2, payload);
444 }
445 tagged.write(buf, &self.unknown_tagged_fields);
446 }
447 Ok(())
448 }
449 fn encoded_len(&self, version: i16) -> usize {
450 let flex = version >= 12;
451 let mut n: usize = 0;
452 if version >= 0 {
453 n += 4;
454 }
455 if version >= 0 {
456 n += 2;
457 }
458 if version >= 0 {
459 n += 8;
460 }
461 if version >= 4 {
462 n += 8;
463 }
464 if version >= 5 {
465 n += 8;
466 }
467 if version >= 4 {
468 n += {
469 let opt: Option<&Vec<_>> = (self.aborted_transactions).as_ref();
470 let prefix = crate::primitives::array::nullable_array_len_prefix_len(
471 opt.map(std::vec::Vec::len),
472 flex,
473 );
474 let body: usize =
475 opt.map_or(0, |v| v.iter().map(|it| it.encoded_len(version)).sum());
476 prefix + body
477 };
478 }
479 if version >= 11 {
480 n += 4;
481 }
482 if version >= 0 {
483 n += match &self.records {
484 None => {
485 if flex {
486 crate::primitives::varint::uvarint_len(0)
487 } else {
488 4
489 }
490 }
491 Some(__rb) => {
492 let __rb_len = <crate::records::RecordsPayload as crate::Encode>::encoded_len(
493 __rb, version,
494 );
495 if flex {
496 crate::primitives::string_bytes::compact_bytes_len_from_size(__rb_len)
497 } else {
498 4 + __rb_len
499 }
500 }
501 };
502 }
503 if flex {
504 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
505 if !(crate::codegen_helpers::is_default(&self.diverging_epoch)) {
506 known_pairs.push((0, self.diverging_epoch.encoded_len(version)));
507 }
508 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
509 known_pairs.push((1, self.current_leader.encoded_len(version)));
510 }
511 if !(crate::codegen_helpers::is_default(&self.snapshot_id)) {
512 known_pairs.push((2, self.snapshot_id.encoded_len(version)));
513 }
514 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
515 }
516 n
517 }
518}
519impl Decode<'_> for PartitionData {
520 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
521 let flex = version >= 12;
522 let mut out = Self::default();
523 if version >= 0 {
524 out.partition_index = get_i32(buf)?;
525 }
526 if version >= 0 {
527 out.error_code = get_i16(buf)?;
528 }
529 if version >= 0 {
530 out.high_watermark = get_i64(buf)?;
531 }
532 if version >= 4 {
533 out.last_stable_offset = get_i64(buf)?;
534 }
535 if version >= 5 {
536 out.log_start_offset = get_i64(buf)?;
537 }
538 if version >= 4 {
539 out.aborted_transactions = {
540 let opt = crate::primitives::array::get_nullable_array_len(buf, flex)?;
541 match opt {
542 None => None,
543 Some(n) => {
544 let mut v = Vec::with_capacity(n);
545 for _ in 0..n {
546 v.push(AbortedTransaction::decode(buf, version)?);
547 }
548 Some(v)
549 }
550 }
551 };
552 }
553 if version >= 11 {
554 out.preferred_read_replica = get_i32(buf)?;
555 }
556 if version >= 0 {
557 out.records = {
558 let __rb_opt = if flex {
559 get_compact_nullable_bytes_owned(buf)?
560 } else {
561 get_nullable_bytes_owned(buf)?
562 };
563 match __rb_opt {
564 None => None,
565 Some(__rb_bytes) => {
566 let mut __rb_cur: &[u8] = &__rb_bytes;
567 Some(crate::records::RecordsPayload::decode_lenient(
568 &mut __rb_cur,
569 version,
570 )?)
571 }
572 }
573 };
574 }
575 if flex {
576 let mut tag_diverging_epoch = None;
577 let mut tag_current_leader = None;
578 let mut tag_snapshot_id = None;
579 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| match tag {
580 0 => {
581 tag_diverging_epoch = Some({
582 let b: &mut &[u8] = payload;
583 EpochEndOffset::decode(b, version)?
584 });
585 Ok(true)
586 }
587 1 => {
588 tag_current_leader = Some({
589 let b: &mut &[u8] = payload;
590 LeaderIdAndEpoch::decode(b, version)?
591 });
592 Ok(true)
593 }
594 2 => {
595 tag_snapshot_id = Some({
596 let b: &mut &[u8] = payload;
597 SnapshotId::decode(b, version)?
598 });
599 Ok(true)
600 }
601 _ => Ok(false),
602 })?;
603 if let Some(v) = tag_diverging_epoch {
604 out.diverging_epoch = v;
605 }
606 if let Some(v) = tag_current_leader {
607 out.current_leader = v;
608 }
609 if let Some(v) = tag_snapshot_id {
610 out.snapshot_id = v;
611 }
612 }
613 Ok(out)
614 }
615}
#[cfg(test)]
impl PartitionData {
    /// Builds a test fixture in which each field gated in at `version` is set
    /// to a fixed value. `records` is never populated and stays `None`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 0 {
            fixture.partition_index = 1;
            fixture.error_code = 1;
            fixture.high_watermark = 1;
        }
        if version >= 4 {
            fixture.last_stable_offset = 1;
            fixture.aborted_transactions = Some(vec![AbortedTransaction::populated(version)]);
        }
        if version >= 5 {
            fixture.log_start_offset = 1;
        }
        if version >= 11 {
            fixture.preferred_read_replica = 1;
        }
        if version >= 12 {
            fixture.diverging_epoch = EpochEndOffset::populated(version);
            fixture.current_leader = LeaderIdAndEpoch::populated(version);
            fixture.snapshot_id = SnapshotId::populated(version);
        }
        fixture
    }
}
654#[derive(Debug, Clone, PartialEq, Eq)]
655pub struct EpochEndOffset {
656 pub epoch: i32,
657 pub end_offset: i64,
658 pub unknown_tagged_fields: UnknownTaggedFields,
659}
660impl Default for EpochEndOffset {
661 fn default() -> Self {
662 Self {
663 epoch: -1i32,
664 end_offset: -1i64,
665 unknown_tagged_fields: Default::default(),
666 }
667 }
668}
669impl Encode for EpochEndOffset {
670 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
671 let flex = version >= 12;
672 if version >= 12 {
673 put_i32(buf, self.epoch);
674 }
675 if version >= 12 {
676 put_i64(buf, self.end_offset);
677 }
678 if flex {
679 let tagged = WriteTaggedFields::new();
680 tagged.write(buf, &self.unknown_tagged_fields);
681 }
682 Ok(())
683 }
684 fn encoded_len(&self, version: i16) -> usize {
685 let flex = version >= 12;
686 let mut n: usize = 0;
687 if version >= 12 {
688 n += 4;
689 }
690 if version >= 12 {
691 n += 8;
692 }
693 if flex {
694 let known_pairs: Vec<(u32, usize)> = Vec::new();
695 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
696 }
697 n
698 }
699}
700impl Decode<'_> for EpochEndOffset {
701 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
702 let flex = version >= 12;
703 let mut out = Self::default();
704 if version >= 12 {
705 out.epoch = get_i32(buf)?;
706 }
707 if version >= 12 {
708 out.end_offset = get_i64(buf)?;
709 }
710 if flex {
711 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
712 }
713 Ok(out)
714 }
715}
#[cfg(test)]
impl EpochEndOffset {
    /// Builds a test fixture: both numeric fields are `1` when `version >= 12`,
    /// otherwise the default value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 12 {
            fixture.epoch = 1;
            fixture.end_offset = 1;
        }
        fixture
    }
}
730#[derive(Debug, Clone, PartialEq, Eq)]
731pub struct LeaderIdAndEpoch {
732 pub leader_id: i32,
733 pub leader_epoch: i32,
734 pub unknown_tagged_fields: UnknownTaggedFields,
735}
736impl Default for LeaderIdAndEpoch {
737 fn default() -> Self {
738 Self {
739 leader_id: -1i32,
740 leader_epoch: -1i32,
741 unknown_tagged_fields: Default::default(),
742 }
743 }
744}
745impl Encode for LeaderIdAndEpoch {
746 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
747 let flex = version >= 12;
748 if version >= 12 {
749 put_i32(buf, self.leader_id);
750 }
751 if version >= 12 {
752 put_i32(buf, self.leader_epoch);
753 }
754 if flex {
755 let tagged = WriteTaggedFields::new();
756 tagged.write(buf, &self.unknown_tagged_fields);
757 }
758 Ok(())
759 }
760 fn encoded_len(&self, version: i16) -> usize {
761 let flex = version >= 12;
762 let mut n: usize = 0;
763 if version >= 12 {
764 n += 4;
765 }
766 if version >= 12 {
767 n += 4;
768 }
769 if flex {
770 let known_pairs: Vec<(u32, usize)> = Vec::new();
771 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
772 }
773 n
774 }
775}
776impl Decode<'_> for LeaderIdAndEpoch {
777 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
778 let flex = version >= 12;
779 let mut out = Self::default();
780 if version >= 12 {
781 out.leader_id = get_i32(buf)?;
782 }
783 if version >= 12 {
784 out.leader_epoch = get_i32(buf)?;
785 }
786 if flex {
787 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
788 }
789 Ok(out)
790 }
791}
#[cfg(test)]
impl LeaderIdAndEpoch {
    /// Builds a test fixture: both numeric fields are `1` when `version >= 12`,
    /// otherwise the default value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 12 {
            fixture.leader_id = 1;
            fixture.leader_epoch = 1;
        }
        fixture
    }
}
806#[derive(Debug, Clone, PartialEq, Eq)]
807pub struct SnapshotId {
808 pub end_offset: i64,
809 pub epoch: i32,
810 pub unknown_tagged_fields: UnknownTaggedFields,
811}
812impl Default for SnapshotId {
813 fn default() -> Self {
814 Self {
815 end_offset: -1i64,
816 epoch: -1i32,
817 unknown_tagged_fields: Default::default(),
818 }
819 }
820}
821impl Encode for SnapshotId {
822 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
823 let flex = version >= 12;
824 if version >= 0 {
825 put_i64(buf, self.end_offset);
826 }
827 if version >= 0 {
828 put_i32(buf, self.epoch);
829 }
830 if flex {
831 let tagged = WriteTaggedFields::new();
832 tagged.write(buf, &self.unknown_tagged_fields);
833 }
834 Ok(())
835 }
836 fn encoded_len(&self, version: i16) -> usize {
837 let flex = version >= 12;
838 let mut n: usize = 0;
839 if version >= 0 {
840 n += 8;
841 }
842 if version >= 0 {
843 n += 4;
844 }
845 if flex {
846 let known_pairs: Vec<(u32, usize)> = Vec::new();
847 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
848 }
849 n
850 }
851}
852impl Decode<'_> for SnapshotId {
853 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
854 let flex = version >= 12;
855 let mut out = Self::default();
856 if version >= 0 {
857 out.end_offset = get_i64(buf)?;
858 }
859 if version >= 0 {
860 out.epoch = get_i32(buf)?;
861 }
862 if flex {
863 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
864 }
865 Ok(out)
866 }
867}
#[cfg(test)]
impl SnapshotId {
    /// Builds a test fixture: both numeric fields are `1` when `version >= 0`,
    /// otherwise the default value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 0 {
            fixture.end_offset = 1;
            fixture.epoch = 1;
        }
        fixture
    }
}
882#[derive(Debug, Clone, PartialEq, Eq, Default)]
883pub struct AbortedTransaction {
884 pub producer_id: i64,
885 pub first_offset: i64,
886 pub unknown_tagged_fields: UnknownTaggedFields,
887}
888impl Encode for AbortedTransaction {
889 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
890 let flex = version >= 12;
891 if version >= 4 {
892 put_i64(buf, self.producer_id);
893 }
894 if version >= 4 {
895 put_i64(buf, self.first_offset);
896 }
897 if flex {
898 let tagged = WriteTaggedFields::new();
899 tagged.write(buf, &self.unknown_tagged_fields);
900 }
901 Ok(())
902 }
903 fn encoded_len(&self, version: i16) -> usize {
904 let flex = version >= 12;
905 let mut n: usize = 0;
906 if version >= 4 {
907 n += 8;
908 }
909 if version >= 4 {
910 n += 8;
911 }
912 if flex {
913 let known_pairs: Vec<(u32, usize)> = Vec::new();
914 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
915 }
916 n
917 }
918}
919impl Decode<'_> for AbortedTransaction {
920 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
921 let flex = version >= 12;
922 let mut out = Self::default();
923 if version >= 4 {
924 out.producer_id = get_i64(buf)?;
925 }
926 if version >= 4 {
927 out.first_offset = get_i64(buf)?;
928 }
929 if flex {
930 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
931 }
932 Ok(out)
933 }
934}
#[cfg(test)]
impl AbortedTransaction {
    /// Builds a test fixture: both numeric fields are `1` when `version >= 4`,
    /// otherwise the default value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 4 {
            fixture.producer_id = 1;
            fixture.first_offset = 1;
        }
        fixture
    }
}
949#[derive(Debug, Clone, PartialEq, Eq, Default)]
950pub struct NodeEndpoint {
951 pub node_id: i32,
952 pub host: String,
953 pub port: i32,
954 pub rack: Option<String>,
955 pub unknown_tagged_fields: UnknownTaggedFields,
956}
957impl Encode for NodeEndpoint {
958 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
959 let flex = version >= 12;
960 if version >= 16 {
961 put_i32(buf, self.node_id);
962 }
963 if version >= 16 {
964 if flex {
965 put_compact_string(buf, &self.host);
966 } else {
967 put_string(buf, &self.host);
968 }
969 }
970 if version >= 16 {
971 put_i32(buf, self.port);
972 }
973 if version >= 16 {
974 if flex {
975 put_compact_nullable_string(buf, self.rack.as_deref());
976 } else {
977 put_nullable_string(buf, self.rack.as_deref());
978 }
979 }
980 if flex {
981 let tagged = WriteTaggedFields::new();
982 tagged.write(buf, &self.unknown_tagged_fields);
983 }
984 Ok(())
985 }
986 fn encoded_len(&self, version: i16) -> usize {
987 let flex = version >= 12;
988 let mut n: usize = 0;
989 if version >= 16 {
990 n += 4;
991 }
992 if version >= 16 {
993 n += if flex {
994 compact_string_len(&self.host)
995 } else {
996 string_len(&self.host)
997 };
998 }
999 if version >= 16 {
1000 n += 4;
1001 }
1002 if version >= 16 {
1003 n += if flex {
1004 compact_nullable_string_len(self.rack.as_deref())
1005 } else {
1006 nullable_string_len(self.rack.as_deref())
1007 };
1008 }
1009 if flex {
1010 let known_pairs: Vec<(u32, usize)> = Vec::new();
1011 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
1012 }
1013 n
1014 }
1015}
1016impl Decode<'_> for NodeEndpoint {
1017 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
1018 let flex = version >= 12;
1019 let mut out = Self::default();
1020 if version >= 16 {
1021 out.node_id = get_i32(buf)?;
1022 }
1023 if version >= 16 {
1024 out.host = if flex {
1025 get_compact_string_owned(buf)?
1026 } else {
1027 get_string_owned(buf)?
1028 };
1029 }
1030 if version >= 16 {
1031 out.port = get_i32(buf)?;
1032 }
1033 if version >= 16 {
1034 out.rack = if flex {
1035 get_compact_nullable_string_owned(buf)?
1036 } else {
1037 get_nullable_string_owned(buf)?
1038 };
1039 }
1040 if flex {
1041 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
1042 }
1043 Ok(out)
1044 }
1045}
#[cfg(test)]
impl NodeEndpoint {
    /// Builds a test fixture: every data field is set to a fixed value when
    /// `version >= 16`, otherwise the default value is returned unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        let mut fixture = Self::default();
        if version >= 16 {
            fixture.node_id = 1;
            fixture.host = String::from("x");
            fixture.port = 1;
            fixture.rack = Some(String::from("x"));
        }
        fixture
    }
}
1066
1067#[must_use]
1070#[allow(unused_comparisons)]
1071pub fn default_json(version: i16) -> ::serde_json::Value {
1072 let mut obj = ::serde_json::Map::new();
1073 if version >= 1 {
1074 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
1075 }
1076 if version >= 7 {
1077 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
1078 }
1079 if version >= 7 {
1080 obj.insert("sessionId".to_string(), ::serde_json::json!(0));
1081 }
1082 obj.insert("responses".to_string(), ::serde_json::Value::Array(vec![]));
1083 if version >= 16 {
1084 obj.insert(
1085 "nodeEndpoints".to_string(),
1086 ::serde_json::Value::Array(vec![]),
1087 );
1088 }
1089 ::serde_json::Value::Object(obj)
1090}