kafka_protocol/messages/
fetch_response.rs

1//! FetchResponse
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchResponse.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-16
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AbortedTransaction {
24    /// The producer id associated with the aborted transaction.
25    ///
26    /// Supported API versions: 4-16
27    pub producer_id: super::ProducerId,
28
29    /// The first offset in the aborted transaction.
30    ///
31    /// Supported API versions: 4-16
32    pub first_offset: i64,
33
34    /// Other tagged fields
35    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AbortedTransaction {
39    /// Sets `producer_id` to the passed value.
40    ///
41    /// The producer id associated with the aborted transaction.
42    ///
43    /// Supported API versions: 4-16
44    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
45        self.producer_id = value;
46        self
47    }
48    /// Sets `first_offset` to the passed value.
49    ///
50    /// The first offset in the aborted transaction.
51    ///
52    /// Supported API versions: 4-16
53    pub fn with_first_offset(mut self, value: i64) -> Self {
54        self.first_offset = value;
55        self
56    }
57    /// Sets unknown_tagged_fields to the passed value.
58    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59        self.unknown_tagged_fields = value;
60        self
61    }
62    /// Inserts an entry into unknown_tagged_fields.
63    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64        self.unknown_tagged_fields.insert(key, value);
65        self
66    }
67}
68
#[cfg(feature = "broker")]
impl Encodable for AbortedTransaction {
    /// Writes this entry to `buf` using the layout of `version`.
    ///
    /// Both fields exist from version 4 on; for older versions nothing is written and an
    /// error is returned if either field is non-zero. From version 12 on the tagged-field
    /// section is appended.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 4 {
            types::Int64.encode(buf, &self.producer_id)?;
            types::Int64.encode(buf, &self.first_offset)?;
        } else if self.producer_id != 0 || self.first_offset != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions below 12 have no tagged-field section.
        if version < 12 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Adds up the sizes reported by the same encoders that `encode` uses for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 4 {
            size += types::Int64.compute_size(&self.producer_id)?;
            size += types::Int64.compute_size(&self.first_offset)?;
        } else if self.producer_id != 0 || self.first_offset != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
131
#[cfg(feature = "client")]
impl Decodable for AbortedTransaction {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// For versions below 4 both fields are absent on the wire and are set to zero.
    /// From version 12 on, every tagged field is stored raw in `unknown_tagged_fields`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (producer_id, first_offset): (super::ProducerId, i64) = if version >= 4 {
            (types::Int64.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            ((0).into(), 0)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            producer_id,
            first_offset,
            unknown_tagged_fields,
        })
    }
}
162
163impl Default for AbortedTransaction {
164    fn default() -> Self {
165        Self {
166            producer_id: (0).into(),
167            first_offset: 0,
168            unknown_tagged_fields: BTreeMap::new(),
169        }
170    }
171}
172
173impl Message for AbortedTransaction {
174    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
175    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
176}
177
178/// Valid versions: 0-16
179#[non_exhaustive]
180#[derive(Debug, Clone, PartialEq)]
181pub struct EpochEndOffset {
182    ///
183    ///
184    /// Supported API versions: 12-16
185    pub epoch: i32,
186
187    ///
188    ///
189    /// Supported API versions: 12-16
190    pub end_offset: i64,
191
192    /// Other tagged fields
193    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
194}
195
196impl EpochEndOffset {
197    /// Sets `epoch` to the passed value.
198    ///
199    ///
200    ///
201    /// Supported API versions: 12-16
202    pub fn with_epoch(mut self, value: i32) -> Self {
203        self.epoch = value;
204        self
205    }
206    /// Sets `end_offset` to the passed value.
207    ///
208    ///
209    ///
210    /// Supported API versions: 12-16
211    pub fn with_end_offset(mut self, value: i64) -> Self {
212        self.end_offset = value;
213        self
214    }
215    /// Sets unknown_tagged_fields to the passed value.
216    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
217        self.unknown_tagged_fields = value;
218        self
219    }
220    /// Inserts an entry into unknown_tagged_fields.
221    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
222        self.unknown_tagged_fields.insert(key, value);
223        self
224    }
225}
226
#[cfg(feature = "broker")]
impl Encodable for EpochEndOffset {
    /// Writes this value to `buf` using the layout of `version`.
    ///
    /// Everything here (both fields and the tagged-field section) exists only from
    /// version 12 on. For older versions nothing is written, and an error is returned if
    /// either field differs from `-1`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.epoch)?;
        types::Int64.encode(buf, &self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Adds up the sizes reported by the same encoders that `encode` uses for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = 0;
        size += types::Int32.compute_size(&self.epoch)?;
        size += types::Int64.compute_size(&self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
289
#[cfg(feature = "client")]
impl Decodable for EpochEndOffset {
    /// Reads a value from `buf` using the layout of `version`.
    ///
    /// For versions below 12 nothing is read and both fields are set to `-1`.
    /// From version 12 on, every tagged field is stored raw in `unknown_tagged_fields`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (epoch, end_offset): (i32, i64) = if version >= 12 {
            (types::Int32.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            (-1, -1)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            epoch,
            end_offset,
            unknown_tagged_fields,
        })
    }
}
320
321impl Default for EpochEndOffset {
322    fn default() -> Self {
323        Self {
324            epoch: -1,
325            end_offset: -1,
326            unknown_tagged_fields: BTreeMap::new(),
327        }
328    }
329}
330
331impl Message for EpochEndOffset {
332    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
333    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
334}
335
336/// Valid versions: 0-16
337#[non_exhaustive]
338#[derive(Debug, Clone, PartialEq)]
339pub struct FetchResponse {
340    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
341    ///
342    /// Supported API versions: 1-16
343    pub throttle_time_ms: i32,
344
345    /// The top level response error code.
346    ///
347    /// Supported API versions: 7-16
348    pub error_code: i16,
349
350    /// The fetch session ID, or 0 if this is not part of a fetch session.
351    ///
352    /// Supported API versions: 7-16
353    pub session_id: i32,
354
355    /// The response topics.
356    ///
357    /// Supported API versions: 0-16
358    pub responses: Vec<FetchableTopicResponse>,
359
360    /// Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.
361    ///
362    /// Supported API versions: 16
363    pub node_endpoints: Vec<NodeEndpoint>,
364
365    /// Other tagged fields
366    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
367}
368
369impl FetchResponse {
370    /// Sets `throttle_time_ms` to the passed value.
371    ///
372    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
373    ///
374    /// Supported API versions: 1-16
375    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
376        self.throttle_time_ms = value;
377        self
378    }
379    /// Sets `error_code` to the passed value.
380    ///
381    /// The top level response error code.
382    ///
383    /// Supported API versions: 7-16
384    pub fn with_error_code(mut self, value: i16) -> Self {
385        self.error_code = value;
386        self
387    }
388    /// Sets `session_id` to the passed value.
389    ///
390    /// The fetch session ID, or 0 if this is not part of a fetch session.
391    ///
392    /// Supported API versions: 7-16
393    pub fn with_session_id(mut self, value: i32) -> Self {
394        self.session_id = value;
395        self
396    }
397    /// Sets `responses` to the passed value.
398    ///
399    /// The response topics.
400    ///
401    /// Supported API versions: 0-16
402    pub fn with_responses(mut self, value: Vec<FetchableTopicResponse>) -> Self {
403        self.responses = value;
404        self
405    }
406    /// Sets `node_endpoints` to the passed value.
407    ///
408    /// Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.
409    ///
410    /// Supported API versions: 16
411    pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
412        self.node_endpoints = value;
413        self
414    }
415    /// Sets unknown_tagged_fields to the passed value.
416    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
417        self.unknown_tagged_fields = value;
418        self
419    }
420    /// Inserts an entry into unknown_tagged_fields.
421    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
422        self.unknown_tagged_fields.insert(key, value);
423        self
424    }
425}
426
#[cfg(feature = "broker")]
impl Encodable for FetchResponse {
    /// Writes this response to `buf` using the layout of `version`.
    ///
    /// `throttle_time_ms` and `error_code` are simply skipped for versions that do not carry
    /// them, while a non-zero `session_id` below version 7 is an error. From version 12 on
    /// the compact array encoding is used and a tagged-field section follows; from version
    /// 16 on a non-empty `node_endpoints` is written there under tag 0. A non-empty
    /// `node_endpoints` on an older version is not written and does not raise an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 1 {
            types::Int32.encode(buf, &self.throttle_time_ms)?;
        }
        if version >= 7 {
            types::Int16.encode(buf, &self.error_code)?;
            types::Int32.encode(buf, &self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            // Versions below 12 use the plain array encoding and have no tagged section.
            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;

        // `node_endpoints` takes part in the tagged section only when it is both supported
        // by the version and non-empty.
        let emit_node_endpoints = version >= 16 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        if emit_node_endpoints {
            let payload_size =
                types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            // Tag number, then payload length, then the payload itself.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }

        // Tag 0 is reserved for `node_endpoints`, hence the range starting at 1.
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Adds up the sizes reported by the same encoders that `encode` uses for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 1 {
            size += types::Int32.compute_size(&self.throttle_time_ms)?;
        }
        if version >= 7 {
            size += types::Int16.compute_size(&self.error_code)?;
            size += types::Int32.compute_size(&self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
            return Ok(size);
        }
        size += types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;

        let emit_node_endpoints = version >= 16 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        if emit_node_endpoints {
            let payload_size =
                types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            size += types::UnsignedVarInt.compute_size(0)?;
            size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            size += payload_size;
        }

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
537
#[cfg(feature = "client")]
impl Decodable for FetchResponse {
    /// Reads a response from `buf` using the layout of `version`.
    ///
    /// Fields that the version does not carry are set to zero. From version 12 on the
    /// tagged-field section is read: tag 0 is decoded as `node_endpoints` (and rejected with
    /// an error below version 16), every other tag is stored raw in `unknown_tagged_fields`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let throttle_time_ms = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let (error_code, session_id): (i16, i32) = if version >= 7 {
            (types::Int16.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (0, 0)
        };
        let responses = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // The declared length is read above but the array decoder determines
                    // on its own how many bytes it consumes.
                    0 if version >= 16 => {
                        node_endpoints =
                            types::CompactArray(types::Struct { version }).decode(buf)?;
                    }
                    0 => bail!("Tag {} is not valid for version {}", tag, version),
                    _ => {
                        let payload = buf.try_get_bytes(len as usize)?;
                        unknown_tagged_fields.insert(tag as i32, payload);
                    }
                }
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            session_id,
            responses,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
594
595impl Default for FetchResponse {
596    fn default() -> Self {
597        Self {
598            throttle_time_ms: 0,
599            error_code: 0,
600            session_id: 0,
601            responses: Default::default(),
602            node_endpoints: Default::default(),
603            unknown_tagged_fields: BTreeMap::new(),
604        }
605    }
606}
607
608impl Message for FetchResponse {
609    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
610    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
611}
612
613/// Valid versions: 0-16
614#[non_exhaustive]
615#[derive(Debug, Clone, PartialEq)]
616pub struct FetchableTopicResponse {
617    /// The topic name.
618    ///
619    /// Supported API versions: 0-12
620    pub topic: super::TopicName,
621
622    /// The unique topic ID
623    ///
624    /// Supported API versions: 13-16
625    pub topic_id: Uuid,
626
627    /// The topic partitions.
628    ///
629    /// Supported API versions: 0-16
630    pub partitions: Vec<PartitionData>,
631
632    /// Other tagged fields
633    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
634}
635
636impl FetchableTopicResponse {
637    /// Sets `topic` to the passed value.
638    ///
639    /// The topic name.
640    ///
641    /// Supported API versions: 0-12
642    pub fn with_topic(mut self, value: super::TopicName) -> Self {
643        self.topic = value;
644        self
645    }
646    /// Sets `topic_id` to the passed value.
647    ///
648    /// The unique topic ID
649    ///
650    /// Supported API versions: 13-16
651    pub fn with_topic_id(mut self, value: Uuid) -> Self {
652        self.topic_id = value;
653        self
654    }
655    /// Sets `partitions` to the passed value.
656    ///
657    /// The topic partitions.
658    ///
659    /// Supported API versions: 0-16
660    pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
661        self.partitions = value;
662        self
663    }
664    /// Sets unknown_tagged_fields to the passed value.
665    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
666        self.unknown_tagged_fields = value;
667        self
668    }
669    /// Inserts an entry into unknown_tagged_fields.
670    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
671        self.unknown_tagged_fields.insert(key, value);
672        self
673    }
674}
675
#[cfg(feature = "broker")]
impl Encodable for FetchableTopicResponse {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// The topic is identified by `topic_id` from version 13 on and by `topic` before that
    /// (compact string at version 12, plain string below). The field that does not apply
    /// to the version is skipped without any check. From version 12 on the partitions use
    /// the compact array encoding and a tagged-field section follows.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else {
            types::String.encode(buf, &self.topic)?;
        }

        if version < 12 {
            // Versions below 12 use the plain array encoding and have no tagged section.
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Adds up the sizes reported by the same encoders that `encode` uses for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let identifier_size = if version >= 13 {
            types::Uuid.compute_size(&self.topic_id)?
        } else if version == 12 {
            types::CompactString.compute_size(&self.topic)?
        } else {
            types::String.compute_size(&self.topic)?
        };

        if version < 12 {
            let partitions_size =
                types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(identifier_size + partitions_size);
        }
        let mut size = identifier_size;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
741
#[cfg(feature = "client")]
impl Decodable for FetchableTopicResponse {
    /// Reads a topic entry from `buf` using the layout of `version`.
    ///
    /// From version 13 on only `topic_id` is read and `topic` is left at its default;
    /// before that only `topic` is read and `topic_id` is the nil UUID. From version 12 on,
    /// every tagged field is stored raw in `unknown_tagged_fields`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (topic, topic_id): (super::TopicName, Uuid) = if version >= 13 {
            (Default::default(), types::Uuid.decode(buf)?)
        } else if version == 12 {
            (types::CompactString.decode(buf)?, Uuid::nil())
        } else {
            (types::String.decode(buf)?, Uuid::nil())
        };
        let partitions = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
782
783impl Default for FetchableTopicResponse {
784    fn default() -> Self {
785        Self {
786            topic: Default::default(),
787            topic_id: Uuid::nil(),
788            partitions: Default::default(),
789            unknown_tagged_fields: BTreeMap::new(),
790        }
791    }
792}
793
794impl Message for FetchableTopicResponse {
795    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
796    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
797}
798
799/// Valid versions: 0-16
800#[non_exhaustive]
801#[derive(Debug, Clone, PartialEq)]
802pub struct LeaderIdAndEpoch {
803    /// The ID of the current leader or -1 if the leader is unknown.
804    ///
805    /// Supported API versions: 12-16
806    pub leader_id: super::BrokerId,
807
808    /// The latest known leader epoch
809    ///
810    /// Supported API versions: 12-16
811    pub leader_epoch: i32,
812
813    /// Other tagged fields
814    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
815}
816
817impl LeaderIdAndEpoch {
818    /// Sets `leader_id` to the passed value.
819    ///
820    /// The ID of the current leader or -1 if the leader is unknown.
821    ///
822    /// Supported API versions: 12-16
823    pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
824        self.leader_id = value;
825        self
826    }
827    /// Sets `leader_epoch` to the passed value.
828    ///
829    /// The latest known leader epoch
830    ///
831    /// Supported API versions: 12-16
832    pub fn with_leader_epoch(mut self, value: i32) -> Self {
833        self.leader_epoch = value;
834        self
835    }
836    /// Sets unknown_tagged_fields to the passed value.
837    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
838        self.unknown_tagged_fields = value;
839        self
840    }
841    /// Inserts an entry into unknown_tagged_fields.
842    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
843        self.unknown_tagged_fields.insert(key, value);
844        self
845    }
846}
847
#[cfg(feature = "broker")]
impl Encodable for LeaderIdAndEpoch {
    /// Writes this value to `buf` using the layout of `version`.
    ///
    /// Everything here (both fields and the tagged-field section) exists only from
    /// version 12 on. For older versions nothing is written, and an error is returned if
    /// either field differs from `-1`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Adds up the sizes reported by the same encoders that `encode` uses for `version`.
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = 0;
        size += types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
910
#[cfg(feature = "client")]
impl Decodable for LeaderIdAndEpoch {
    /// Reads a `LeaderIdAndEpoch` from `buf` using the layout of protocol `version`.
    ///
    /// Before version 12 nothing is consumed and both fields take the value `-1`.
    /// From version 12 on, `leader_id`, `leader_epoch` and the tagged-field section
    /// are read; every tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported while reading from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version < 12 {
            return Ok(Self {
                leader_id: (-1).into(),
                leader_epoch: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
941
942impl Default for LeaderIdAndEpoch {
943    fn default() -> Self {
944        Self {
945            leader_id: (-1).into(),
946            leader_epoch: -1,
947            unknown_tagged_fields: BTreeMap::new(),
948        }
949    }
950}
951
952impl Message for LeaderIdAndEpoch {
953    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
954    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
955}
956
957/// Valid versions: 0-16
958#[non_exhaustive]
959#[derive(Debug, Clone, PartialEq)]
960pub struct NodeEndpoint {
961    /// The ID of the associated node.
962    ///
963    /// Supported API versions: 16
964    pub node_id: super::BrokerId,
965
966    /// The node's hostname.
967    ///
968    /// Supported API versions: 16
969    pub host: StrBytes,
970
971    /// The node's port.
972    ///
973    /// Supported API versions: 16
974    pub port: i32,
975
976    /// The rack of the node, or null if it has not been assigned to a rack.
977    ///
978    /// Supported API versions: 16
979    pub rack: Option<StrBytes>,
980
981    /// Other tagged fields
982    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
983}
984
985impl NodeEndpoint {
986    /// Sets `node_id` to the passed value.
987    ///
988    /// The ID of the associated node.
989    ///
990    /// Supported API versions: 16
991    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
992        self.node_id = value;
993        self
994    }
995    /// Sets `host` to the passed value.
996    ///
997    /// The node's hostname.
998    ///
999    /// Supported API versions: 16
1000    pub fn with_host(mut self, value: StrBytes) -> Self {
1001        self.host = value;
1002        self
1003    }
1004    /// Sets `port` to the passed value.
1005    ///
1006    /// The node's port.
1007    ///
1008    /// Supported API versions: 16
1009    pub fn with_port(mut self, value: i32) -> Self {
1010        self.port = value;
1011        self
1012    }
1013    /// Sets `rack` to the passed value.
1014    ///
1015    /// The rack of the node, or null if it has not been assigned to a rack.
1016    ///
1017    /// Supported API versions: 16
1018    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
1019        self.rack = value;
1020        self
1021    }
1022    /// Sets unknown_tagged_fields to the passed value.
1023    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1024        self.unknown_tagged_fields = value;
1025        self
1026    }
1027    /// Inserts an entry into unknown_tagged_fields.
1028    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1029        self.unknown_tagged_fields.insert(key, value);
1030        self
1031    }
1032}
1033
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Writes this struct to `buf` using the layout of protocol `version`.
    ///
    /// The four regular fields (`node_id`, `host`, `port`, `rack`) are written only
    /// for `version >= 16`. The tagged-field section (a varint count followed by
    /// the unknown tagged fields) is written for `version >= 12`.
    ///
    /// # Errors
    ///
    /// Fails when `version < 16` and any regular field holds a non-default value
    /// (`node_id != 0`, non-empty `host`, `port != 0`, or `rack` set), when there
    /// are more than `u32::MAX` tagged fields, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 16 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::Int32.encode(buf, &self.port)?;
            types::CompactString.encode(buf, &self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            // These fields do not exist on the wire before version 16: reject a
            // non-default value instead of silently dropping it.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 12 {
            // The tagged-field count is encoded as an unsigned 32-bit varint.
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes [`Self::encode`] writes for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 16 {
            total_size += types::Int32.compute_size(&self.node_id)?;
            total_size += types::CompactString.compute_size(&self.host)?;
            total_size += types::Int32.compute_size(&self.port)?;
            total_size += types::CompactString.compute_size(&self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 12 {
            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
1124
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads a `NodeEndpoint` from `buf` using the layout of protocol `version`.
    ///
    /// The regular fields are read only for `version >= 16`; otherwise they take
    /// their default values (`0`, empty host, `0`, `None`). The tagged-field
    /// section is read for `version >= 12` and every tagged field is kept verbatim
    /// in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported while reading from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let has_endpoint_fields = version >= 16;

        let node_id = if has_endpoint_fields {
            types::Int32.decode(buf)?
        } else {
            (0).into()
        };
        let host = if has_endpoint_fields {
            types::CompactString.decode(buf)?
        } else {
            Default::default()
        };
        let port = if has_endpoint_fields {
            types::Int32.decode(buf)?
        } else {
            0
        };
        let rack = if has_endpoint_fields {
            types::CompactString.decode(buf)?
        } else {
            None
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw_value = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw_value);
            }
        }

        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
1167
1168impl Default for NodeEndpoint {
1169    fn default() -> Self {
1170        Self {
1171            node_id: (0).into(),
1172            host: Default::default(),
1173            port: 0,
1174            rack: None,
1175            unknown_tagged_fields: BTreeMap::new(),
1176        }
1177    }
1178}
1179
1180impl Message for NodeEndpoint {
1181    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1182    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1183}
1184
1185/// Valid versions: 0-16
1186#[non_exhaustive]
1187#[derive(Debug, Clone, PartialEq)]
1188pub struct PartitionData {
1189    /// The partition index.
1190    ///
1191    /// Supported API versions: 0-16
1192    pub partition_index: i32,
1193
1194    /// The error code, or 0 if there was no fetch error.
1195    ///
1196    /// Supported API versions: 0-16
1197    pub error_code: i16,
1198
1199    /// The current high water mark.
1200    ///
1201    /// Supported API versions: 0-16
1202    pub high_watermark: i64,
1203
1204    /// The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)
1205    ///
1206    /// Supported API versions: 4-16
1207    pub last_stable_offset: i64,
1208
1209    /// The current log start offset.
1210    ///
1211    /// Supported API versions: 5-16
1212    pub log_start_offset: i64,
1213
1214    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge
1215    ///
1216    /// Supported API versions: 12-16
1217    pub diverging_epoch: EpochEndOffset,
1218
1219    ///
1220    ///
1221    /// Supported API versions: 12-16
1222    pub current_leader: LeaderIdAndEpoch,
1223
1224    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.
1225    ///
1226    /// Supported API versions: 12-16
1227    pub snapshot_id: SnapshotId,
1228
1229    /// The aborted transactions.
1230    ///
1231    /// Supported API versions: 4-16
1232    pub aborted_transactions: Option<Vec<AbortedTransaction>>,
1233
1234    /// The preferred read replica for the consumer to use on its next fetch request
1235    ///
1236    /// Supported API versions: 11-16
1237    pub preferred_read_replica: super::BrokerId,
1238
1239    /// The record data.
1240    ///
1241    /// Supported API versions: 0-16
1242    pub records: Option<Bytes>,
1243
1244    /// Other tagged fields
1245    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1246}
1247
1248impl PartitionData {
1249    /// Sets `partition_index` to the passed value.
1250    ///
1251    /// The partition index.
1252    ///
1253    /// Supported API versions: 0-16
1254    pub fn with_partition_index(mut self, value: i32) -> Self {
1255        self.partition_index = value;
1256        self
1257    }
1258    /// Sets `error_code` to the passed value.
1259    ///
1260    /// The error code, or 0 if there was no fetch error.
1261    ///
1262    /// Supported API versions: 0-16
1263    pub fn with_error_code(mut self, value: i16) -> Self {
1264        self.error_code = value;
1265        self
1266    }
1267    /// Sets `high_watermark` to the passed value.
1268    ///
1269    /// The current high water mark.
1270    ///
1271    /// Supported API versions: 0-16
1272    pub fn with_high_watermark(mut self, value: i64) -> Self {
1273        self.high_watermark = value;
1274        self
1275    }
1276    /// Sets `last_stable_offset` to the passed value.
1277    ///
1278    /// The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)
1279    ///
1280    /// Supported API versions: 4-16
1281    pub fn with_last_stable_offset(mut self, value: i64) -> Self {
1282        self.last_stable_offset = value;
1283        self
1284    }
1285    /// Sets `log_start_offset` to the passed value.
1286    ///
1287    /// The current log start offset.
1288    ///
1289    /// Supported API versions: 5-16
1290    pub fn with_log_start_offset(mut self, value: i64) -> Self {
1291        self.log_start_offset = value;
1292        self
1293    }
1294    /// Sets `diverging_epoch` to the passed value.
1295    ///
1296    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge
1297    ///
1298    /// Supported API versions: 12-16
1299    pub fn with_diverging_epoch(mut self, value: EpochEndOffset) -> Self {
1300        self.diverging_epoch = value;
1301        self
1302    }
1303    /// Sets `current_leader` to the passed value.
1304    ///
1305    ///
1306    ///
1307    /// Supported API versions: 12-16
1308    pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
1309        self.current_leader = value;
1310        self
1311    }
1312    /// Sets `snapshot_id` to the passed value.
1313    ///
1314    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.
1315    ///
1316    /// Supported API versions: 12-16
1317    pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
1318        self.snapshot_id = value;
1319        self
1320    }
1321    /// Sets `aborted_transactions` to the passed value.
1322    ///
1323    /// The aborted transactions.
1324    ///
1325    /// Supported API versions: 4-16
1326    pub fn with_aborted_transactions(mut self, value: Option<Vec<AbortedTransaction>>) -> Self {
1327        self.aborted_transactions = value;
1328        self
1329    }
1330    /// Sets `preferred_read_replica` to the passed value.
1331    ///
1332    /// The preferred read replica for the consumer to use on its next fetch request
1333    ///
1334    /// Supported API versions: 11-16
1335    pub fn with_preferred_read_replica(mut self, value: super::BrokerId) -> Self {
1336        self.preferred_read_replica = value;
1337        self
1338    }
1339    /// Sets `records` to the passed value.
1340    ///
1341    /// The record data.
1342    ///
1343    /// Supported API versions: 0-16
1344    pub fn with_records(mut self, value: Option<Bytes>) -> Self {
1345        self.records = value;
1346        self
1347    }
1348    /// Sets unknown_tagged_fields to the passed value.
1349    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1350        self.unknown_tagged_fields = value;
1351        self
1352    }
1353    /// Inserts an entry into unknown_tagged_fields.
1354    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1355        self.unknown_tagged_fields.insert(key, value);
1356        self
1357    }
1358}
1359
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Writes this struct to `buf` using the layout of protocol `version`.
    ///
    /// Regular fields are written in this order, each gated on the version that
    /// introduced it: `partition_index`, `error_code`, `high_watermark`,
    /// `last_stable_offset` (4+), `log_start_offset` (5+), `aborted_transactions`
    /// (4+, compact array from 12), `preferred_read_replica` (11+) and `records`
    /// (compact bytes from 12).
    ///
    /// From version 12 a tagged-field section follows. `diverging_epoch` (tag 0),
    /// `current_leader` (tag 1) and `snapshot_id` (tag 2) are emitted only when
    /// they differ from their default value, followed by the unknown tagged
    /// fields. Before version 12 these three fields are not written at all, and
    /// no error is raised if they are set.
    ///
    /// # Errors
    ///
    /// Fails when `version < 11` and `preferred_read_replica` is not `-1`, when
    /// there are more than `u32::MAX` tagged fields, when a tagged struct is
    /// larger than `u32::MAX` bytes, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int64.encode(buf, &self.high_watermark)?;
        if version >= 4 {
            types::Int64.encode(buf, &self.last_stable_offset)?;
        }
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.aborted_transactions)?;
        } else if version >= 4 {
            types::Array(types::Struct { version }).encode(buf, &self.aborted_transactions)?;
        }
        if version >= 11 {
            types::Int32.encode(buf, &self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version < 12 {
            // Non-flexible versions end with the plain `records` bytes.
            types::Bytes.encode(buf, &self.records)?;
            return Ok(());
        }
        types::CompactBytes.encode(buf, &self.records)?;

        // A known tagged field is only present on the wire when it is non-default.
        let has_diverging_epoch = self.diverging_epoch != EpochEndOffset::default();
        let has_current_leader = self.current_leader != LeaderIdAndEpoch::default();
        let has_snapshot_id = self.snapshot_id != SnapshotId::default();

        let num_tagged_fields = self.unknown_tagged_fields.len()
            + usize::from(has_diverging_epoch)
            + usize::from(has_current_leader)
            + usize::from(has_snapshot_id);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        // Each tagged field is written as: tag, payload size, payload.
        if has_diverging_epoch {
            let computed_size = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::Struct { version }.encode(buf, &self.diverging_epoch)?;
        }
        if has_current_leader {
            let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            types::UnsignedVarInt.encode(buf, 1)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::Struct { version }.encode(buf, &self.current_leader)?;
        }
        if has_snapshot_id {
            let computed_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            types::UnsignedVarInt.encode(buf, 2)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::Struct { version }.encode(buf, &self.snapshot_id)?;
        }

        // Tags 0..=2 are the known fields above; unknown fields start at tag 3.
        write_unknown_tagged_fields(buf, 3.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes [`Self::encode`] writes for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::Int64.compute_size(&self.high_watermark)?;
        if version >= 4 {
            total_size += types::Int64.compute_size(&self.last_stable_offset)?;
        }
        if version >= 5 {
            total_size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        if version >= 12 {
            total_size += types::CompactArray(types::Struct { version })
                .compute_size(&self.aborted_transactions)?;
        } else if version >= 4 {
            total_size +=
                types::Array(types::Struct { version }).compute_size(&self.aborted_transactions)?;
        }
        if version >= 11 {
            total_size += types::Int32.compute_size(&self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version < 12 {
            total_size += types::Bytes.compute_size(&self.records)?;
            return Ok(total_size);
        }
        total_size += types::CompactBytes.compute_size(&self.records)?;

        // A known tagged field only counts when it is non-default.
        let has_diverging_epoch = self.diverging_epoch != EpochEndOffset::default();
        let has_current_leader = self.current_leader != LeaderIdAndEpoch::default();
        let has_snapshot_id = self.snapshot_id != SnapshotId::default();

        let num_tagged_fields = self.unknown_tagged_fields.len()
            + usize::from(has_diverging_epoch)
            + usize::from(has_current_leader)
            + usize::from(has_snapshot_id);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        // Each tagged field contributes: tag, payload size, payload.
        if has_diverging_epoch {
            let computed_size = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }
        if has_current_leader {
            let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            total_size += types::UnsignedVarInt.compute_size(1)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }
        if has_snapshot_id {
            let computed_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            total_size += types::UnsignedVarInt.compute_size(2)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
1545
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads a `PartitionData` from `buf` using the layout of protocol `version`.
    ///
    /// Fields that `version` does not carry keep their default value:
    /// `last_stable_offset` / `log_start_offset` / `preferred_read_replica` are
    /// `-1` and `aborted_transactions` is an empty list. From version 12 the
    /// tagged-field section is read: tags 0, 1 and 2 populate `diverging_epoch`,
    /// `current_leader` and `snapshot_id`; any other tag is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported while reading from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let high_watermark = types::Int64.decode(buf)?;
        let last_stable_offset = if version >= 4 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let log_start_offset = if version >= 5 {
            types::Int64.decode(buf)?
        } else {
            -1
        };
        let aborted_transactions = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 4 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Some(Vec::new())
        };
        let preferred_read_replica = if version >= 11 {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };
        let records = if version >= 12 {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        // Tagged fields start from their defaults and are overwritten if present.
        let mut diverging_epoch = EpochEndOffset::default();
        let mut current_leader = LeaderIdAndEpoch::default();
        let mut snapshot_id = SnapshotId::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                // NOTE(review): for the known tags below, `size` is read but not
                // checked against the number of bytes the struct decoder consumes.
                match tag {
                    0 => diverging_epoch = types::Struct { version }.decode(buf)?,
                    1 => current_leader = types::Struct { version }.decode(buf)?,
                    2 => snapshot_id = types::Struct { version }.decode(buf)?,
                    _ => {
                        let raw_value = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, raw_value);
                    }
                }
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            high_watermark,
            last_stable_offset,
            log_start_offset,
            diverging_epoch,
            current_leader,
            snapshot_id,
            aborted_transactions,
            preferred_read_replica,
            records,
            unknown_tagged_fields,
        })
    }
}
1623
1624impl Default for PartitionData {
1625    fn default() -> Self {
1626        Self {
1627            partition_index: 0,
1628            error_code: 0,
1629            high_watermark: 0,
1630            last_stable_offset: -1,
1631            log_start_offset: -1,
1632            diverging_epoch: Default::default(),
1633            current_leader: Default::default(),
1634            snapshot_id: Default::default(),
1635            aborted_transactions: Some(Default::default()),
1636            preferred_read_replica: (-1).into(),
1637            records: Some(Default::default()),
1638            unknown_tagged_fields: BTreeMap::new(),
1639        }
1640    }
1641}
1642
1643impl Message for PartitionData {
1644    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1645    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1646}
1647
1648/// Valid versions: 0-16
1649#[non_exhaustive]
1650#[derive(Debug, Clone, PartialEq)]
1651pub struct SnapshotId {
1652    ///
1653    ///
1654    /// Supported API versions: 0-16
1655    pub end_offset: i64,
1656
1657    ///
1658    ///
1659    /// Supported API versions: 0-16
1660    pub epoch: i32,
1661
1662    /// Other tagged fields
1663    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1664}
1665
1666impl SnapshotId {
1667    /// Sets `end_offset` to the passed value.
1668    ///
1669    ///
1670    ///
1671    /// Supported API versions: 0-16
1672    pub fn with_end_offset(mut self, value: i64) -> Self {
1673        self.end_offset = value;
1674        self
1675    }
1676    /// Sets `epoch` to the passed value.
1677    ///
1678    ///
1679    ///
1680    /// Supported API versions: 0-16
1681    pub fn with_epoch(mut self, value: i32) -> Self {
1682        self.epoch = value;
1683        self
1684    }
1685    /// Sets unknown_tagged_fields to the passed value.
1686    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1687        self.unknown_tagged_fields = value;
1688        self
1689    }
1690    /// Inserts an entry into unknown_tagged_fields.
1691    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1692        self.unknown_tagged_fields.insert(key, value);
1693        self
1694    }
1695}
1696
#[cfg(feature = "broker")]
impl Encodable for SnapshotId {
    /// Writes this struct to `buf` using the layout of protocol `version`.
    ///
    /// `end_offset` and `epoch` are always written. From version 12 the
    /// tagged-field section (a varint count followed by the unknown tagged
    /// fields) is appended.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` tagged fields or when an
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;
        if version < 12 {
            // Versions before 12 carry no tagged-field section.
            return Ok(());
        }

        // The tagged-field count is encoded as an unsigned 32-bit varint.
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes [`Self::encode`] writes for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`].
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = types::Int64.compute_size(&self.end_offset)?;
        total_size += types::Int32.compute_size(&self.epoch)?;
        if version < 12 {
            return Ok(total_size);
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
1735
#[cfg(feature = "client")]
impl Decodable for SnapshotId {
    /// Reads a `SnapshotId` from `buf` using the layout for `version`.
    ///
    /// `end_offset` (`Int64`) and `epoch` (`Int32`) are always read, in that
    /// order. For `version >= 12` a tagged-field section is read afterwards:
    /// an unsigned-varint field count, then for each field an unsigned-varint
    /// tag, an unsigned-varint byte length and that many raw bytes. Every
    /// such field is stored in `unknown_tagged_fields`; if a tag repeats, the
    /// later entry replaces the earlier one.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying decoders or from
    /// `try_get_bytes`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let end_offset = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;

        let unknown_tagged_fields = if version >= 12 {
            let mut collected = BTreeMap::new();
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let byte_len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(byte_len as usize)?;
                collected.insert(tag as i32, payload);
            }
            collected
        } else {
            // No tagged-field section before version 12.
            BTreeMap::new()
        };

        Ok(Self {
            end_offset,
            epoch,
            unknown_tagged_fields,
        })
    }
}
1758
1759impl Default for SnapshotId {
1760    fn default() -> Self {
1761        Self {
1762            end_offset: -1,
1763            epoch: -1,
1764            unknown_tagged_fields: BTreeMap::new(),
1765        }
1766    }
1767}
1768
1769impl Message for SnapshotId {
1770    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
1771    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1772}
1773
1774impl HeaderVersion for FetchResponse {
1775    fn header_version(version: i16) -> i16 {
1776        if version >= 12 {
1777            1
1778        } else {
1779            0
1780        }
1781    }
1782}