kafka_protocol/messages/
fetch_response.rs

1//! FetchResponse
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchResponse.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-17
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AbortedTransaction {
24    /// The producer id associated with the aborted transaction.
25    ///
26    /// Supported API versions: 4-17
27    pub producer_id: super::ProducerId,
28
29    /// The first offset in the aborted transaction.
30    ///
31    /// Supported API versions: 4-17
32    pub first_offset: i64,
33
34    /// Other tagged fields
35    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AbortedTransaction {
39    /// Sets `producer_id` to the passed value.
40    ///
41    /// The producer id associated with the aborted transaction.
42    ///
43    /// Supported API versions: 4-17
44    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
45        self.producer_id = value;
46        self
47    }
48    /// Sets `first_offset` to the passed value.
49    ///
50    /// The first offset in the aborted transaction.
51    ///
52    /// Supported API versions: 4-17
53    pub fn with_first_offset(mut self, value: i64) -> Self {
54        self.first_offset = value;
55        self
56    }
57    /// Sets unknown_tagged_fields to the passed value.
58    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59        self.unknown_tagged_fields = value;
60        self
61    }
62    /// Inserts an entry into unknown_tagged_fields.
63    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64        self.unknown_tagged_fields.insert(key, value);
65        self
66    }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for AbortedTransaction {
71    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72        if version < 0 || version > 17 {
73            bail!("specified version not supported by this message type");
74        }
75        if version >= 4 {
76            types::Int64.encode(buf, &self.producer_id)?;
77        } else {
78            if self.producer_id != 0 {
79                bail!("A field is set that is not available on the selected protocol version");
80            }
81        }
82        if version >= 4 {
83            types::Int64.encode(buf, &self.first_offset)?;
84        } else {
85            if self.first_offset != 0 {
86                bail!("A field is set that is not available on the selected protocol version");
87            }
88        }
89        if version >= 12 {
90            let num_tagged_fields = self.unknown_tagged_fields.len();
91            if num_tagged_fields > std::u32::MAX as usize {
92                bail!(
93                    "Too many tagged fields to encode ({} fields)",
94                    num_tagged_fields
95                );
96            }
97            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
98
99            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
100        }
101        Ok(())
102    }
103    fn compute_size(&self, version: i16) -> Result<usize> {
104        let mut total_size = 0;
105        if version >= 4 {
106            total_size += types::Int64.compute_size(&self.producer_id)?;
107        } else {
108            if self.producer_id != 0 {
109                bail!("A field is set that is not available on the selected protocol version");
110            }
111        }
112        if version >= 4 {
113            total_size += types::Int64.compute_size(&self.first_offset)?;
114        } else {
115            if self.first_offset != 0 {
116                bail!("A field is set that is not available on the selected protocol version");
117            }
118        }
119        if version >= 12 {
120            let num_tagged_fields = self.unknown_tagged_fields.len();
121            if num_tagged_fields > std::u32::MAX as usize {
122                bail!(
123                    "Too many tagged fields to encode ({} fields)",
124                    num_tagged_fields
125                );
126            }
127            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
128
129            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
130        }
131        Ok(total_size)
132    }
133}
134
135#[cfg(feature = "client")]
136impl Decodable for AbortedTransaction {
137    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
138        if version < 0 || version > 17 {
139            bail!("specified version not supported by this message type");
140        }
141        let producer_id = if version >= 4 {
142            types::Int64.decode(buf)?
143        } else {
144            (0).into()
145        };
146        let first_offset = if version >= 4 {
147            types::Int64.decode(buf)?
148        } else {
149            0
150        };
151        let mut unknown_tagged_fields = BTreeMap::new();
152        if version >= 12 {
153            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
154            for _ in 0..num_tagged_fields {
155                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
156                let size: u32 = types::UnsignedVarInt.decode(buf)?;
157                let unknown_value = buf.try_get_bytes(size as usize)?;
158                unknown_tagged_fields.insert(tag as i32, unknown_value);
159            }
160        }
161        Ok(Self {
162            producer_id,
163            first_offset,
164            unknown_tagged_fields,
165        })
166    }
167}
168
169impl Default for AbortedTransaction {
170    fn default() -> Self {
171        Self {
172            producer_id: (0).into(),
173            first_offset: 0,
174            unknown_tagged_fields: BTreeMap::new(),
175        }
176    }
177}
178
179impl Message for AbortedTransaction {
180    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
181    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
182}
183
184/// Valid versions: 0-17
185#[non_exhaustive]
186#[derive(Debug, Clone, PartialEq)]
187pub struct EpochEndOffset {
188    ///
189    ///
190    /// Supported API versions: 12-17
191    pub epoch: i32,
192
193    ///
194    ///
195    /// Supported API versions: 12-17
196    pub end_offset: i64,
197
198    /// Other tagged fields
199    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
200}
201
202impl EpochEndOffset {
203    /// Sets `epoch` to the passed value.
204    ///
205    ///
206    ///
207    /// Supported API versions: 12-17
208    pub fn with_epoch(mut self, value: i32) -> Self {
209        self.epoch = value;
210        self
211    }
212    /// Sets `end_offset` to the passed value.
213    ///
214    ///
215    ///
216    /// Supported API versions: 12-17
217    pub fn with_end_offset(mut self, value: i64) -> Self {
218        self.end_offset = value;
219        self
220    }
221    /// Sets unknown_tagged_fields to the passed value.
222    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
223        self.unknown_tagged_fields = value;
224        self
225    }
226    /// Inserts an entry into unknown_tagged_fields.
227    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
228        self.unknown_tagged_fields.insert(key, value);
229        self
230    }
231}
232
233#[cfg(feature = "broker")]
234impl Encodable for EpochEndOffset {
235    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
236        if version < 0 || version > 17 {
237            bail!("specified version not supported by this message type");
238        }
239        if version >= 12 {
240            types::Int32.encode(buf, &self.epoch)?;
241        } else {
242            if self.epoch != -1 {
243                bail!("A field is set that is not available on the selected protocol version");
244            }
245        }
246        if version >= 12 {
247            types::Int64.encode(buf, &self.end_offset)?;
248        } else {
249            if self.end_offset != -1 {
250                bail!("A field is set that is not available on the selected protocol version");
251            }
252        }
253        if version >= 12 {
254            let num_tagged_fields = self.unknown_tagged_fields.len();
255            if num_tagged_fields > std::u32::MAX as usize {
256                bail!(
257                    "Too many tagged fields to encode ({} fields)",
258                    num_tagged_fields
259                );
260            }
261            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
262
263            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
264        }
265        Ok(())
266    }
267    fn compute_size(&self, version: i16) -> Result<usize> {
268        let mut total_size = 0;
269        if version >= 12 {
270            total_size += types::Int32.compute_size(&self.epoch)?;
271        } else {
272            if self.epoch != -1 {
273                bail!("A field is set that is not available on the selected protocol version");
274            }
275        }
276        if version >= 12 {
277            total_size += types::Int64.compute_size(&self.end_offset)?;
278        } else {
279            if self.end_offset != -1 {
280                bail!("A field is set that is not available on the selected protocol version");
281            }
282        }
283        if version >= 12 {
284            let num_tagged_fields = self.unknown_tagged_fields.len();
285            if num_tagged_fields > std::u32::MAX as usize {
286                bail!(
287                    "Too many tagged fields to encode ({} fields)",
288                    num_tagged_fields
289                );
290            }
291            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
292
293            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
294        }
295        Ok(total_size)
296    }
297}
298
299#[cfg(feature = "client")]
300impl Decodable for EpochEndOffset {
301    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
302        if version < 0 || version > 17 {
303            bail!("specified version not supported by this message type");
304        }
305        let epoch = if version >= 12 {
306            types::Int32.decode(buf)?
307        } else {
308            -1
309        };
310        let end_offset = if version >= 12 {
311            types::Int64.decode(buf)?
312        } else {
313            -1
314        };
315        let mut unknown_tagged_fields = BTreeMap::new();
316        if version >= 12 {
317            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
318            for _ in 0..num_tagged_fields {
319                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
320                let size: u32 = types::UnsignedVarInt.decode(buf)?;
321                let unknown_value = buf.try_get_bytes(size as usize)?;
322                unknown_tagged_fields.insert(tag as i32, unknown_value);
323            }
324        }
325        Ok(Self {
326            epoch,
327            end_offset,
328            unknown_tagged_fields,
329        })
330    }
331}
332
333impl Default for EpochEndOffset {
334    fn default() -> Self {
335        Self {
336            epoch: -1,
337            end_offset: -1,
338            unknown_tagged_fields: BTreeMap::new(),
339        }
340    }
341}
342
343impl Message for EpochEndOffset {
344    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
345    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
346}
347
348/// Valid versions: 0-17
349#[non_exhaustive]
350#[derive(Debug, Clone, PartialEq)]
351pub struct FetchResponse {
352    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
353    ///
354    /// Supported API versions: 1-17
355    pub throttle_time_ms: i32,
356
357    /// The top level response error code.
358    ///
359    /// Supported API versions: 7-17
360    pub error_code: i16,
361
362    /// The fetch session ID, or 0 if this is not part of a fetch session.
363    ///
364    /// Supported API versions: 7-17
365    pub session_id: i32,
366
367    /// The response topics.
368    ///
369    /// Supported API versions: 0-17
370    pub responses: Vec<FetchableTopicResponse>,
371
372    /// Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.
373    ///
374    /// Supported API versions: 16-17
375    pub node_endpoints: Vec<NodeEndpoint>,
376
377    /// Other tagged fields
378    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
379}
380
381impl FetchResponse {
382    /// Sets `throttle_time_ms` to the passed value.
383    ///
384    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
385    ///
386    /// Supported API versions: 1-17
387    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
388        self.throttle_time_ms = value;
389        self
390    }
391    /// Sets `error_code` to the passed value.
392    ///
393    /// The top level response error code.
394    ///
395    /// Supported API versions: 7-17
396    pub fn with_error_code(mut self, value: i16) -> Self {
397        self.error_code = value;
398        self
399    }
400    /// Sets `session_id` to the passed value.
401    ///
402    /// The fetch session ID, or 0 if this is not part of a fetch session.
403    ///
404    /// Supported API versions: 7-17
405    pub fn with_session_id(mut self, value: i32) -> Self {
406        self.session_id = value;
407        self
408    }
409    /// Sets `responses` to the passed value.
410    ///
411    /// The response topics.
412    ///
413    /// Supported API versions: 0-17
414    pub fn with_responses(mut self, value: Vec<FetchableTopicResponse>) -> Self {
415        self.responses = value;
416        self
417    }
418    /// Sets `node_endpoints` to the passed value.
419    ///
420    /// Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.
421    ///
422    /// Supported API versions: 16-17
423    pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
424        self.node_endpoints = value;
425        self
426    }
427    /// Sets unknown_tagged_fields to the passed value.
428    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
429        self.unknown_tagged_fields = value;
430        self
431    }
432    /// Inserts an entry into unknown_tagged_fields.
433    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
434        self.unknown_tagged_fields.insert(key, value);
435        self
436    }
437}
438
439#[cfg(feature = "broker")]
440impl Encodable for FetchResponse {
441    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
442        if version < 0 || version > 17 {
443            bail!("specified version not supported by this message type");
444        }
445        if version >= 1 {
446            types::Int32.encode(buf, &self.throttle_time_ms)?;
447        }
448        if version >= 7 {
449            types::Int16.encode(buf, &self.error_code)?;
450        }
451        if version >= 7 {
452            types::Int32.encode(buf, &self.session_id)?;
453        } else {
454            if self.session_id != 0 {
455                bail!("A field is set that is not available on the selected protocol version");
456            }
457        }
458        if version >= 12 {
459            types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
460        } else {
461            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
462        }
463        if version >= 12 {
464            let mut num_tagged_fields = self.unknown_tagged_fields.len();
465            if version >= 16 {
466                if !self.node_endpoints.is_empty() {
467                    num_tagged_fields += 1;
468                }
469            }
470            if num_tagged_fields > std::u32::MAX as usize {
471                bail!(
472                    "Too many tagged fields to encode ({} fields)",
473                    num_tagged_fields
474                );
475            }
476            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
477            if version >= 16 {
478                if !self.node_endpoints.is_empty() {
479                    let computed_size = types::CompactArray(types::Struct { version })
480                        .compute_size(&self.node_endpoints)?;
481                    if computed_size > std::u32::MAX as usize {
482                        bail!(
483                            "Tagged field is too large to encode ({} bytes)",
484                            computed_size
485                        );
486                    }
487                    types::UnsignedVarInt.encode(buf, 0)?;
488                    types::UnsignedVarInt.encode(buf, computed_size as u32)?;
489                    types::CompactArray(types::Struct { version })
490                        .encode(buf, &self.node_endpoints)?;
491                }
492            }
493            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
494        }
495        Ok(())
496    }
497    fn compute_size(&self, version: i16) -> Result<usize> {
498        let mut total_size = 0;
499        if version >= 1 {
500            total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
501        }
502        if version >= 7 {
503            total_size += types::Int16.compute_size(&self.error_code)?;
504        }
505        if version >= 7 {
506            total_size += types::Int32.compute_size(&self.session_id)?;
507        } else {
508            if self.session_id != 0 {
509                bail!("A field is set that is not available on the selected protocol version");
510            }
511        }
512        if version >= 12 {
513            total_size +=
514                types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
515        } else {
516            total_size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
517        }
518        if version >= 12 {
519            let mut num_tagged_fields = self.unknown_tagged_fields.len();
520            if version >= 16 {
521                if !self.node_endpoints.is_empty() {
522                    num_tagged_fields += 1;
523                }
524            }
525            if num_tagged_fields > std::u32::MAX as usize {
526                bail!(
527                    "Too many tagged fields to encode ({} fields)",
528                    num_tagged_fields
529                );
530            }
531            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
532            if version >= 16 {
533                if !self.node_endpoints.is_empty() {
534                    let computed_size = types::CompactArray(types::Struct { version })
535                        .compute_size(&self.node_endpoints)?;
536                    if computed_size > std::u32::MAX as usize {
537                        bail!(
538                            "Tagged field is too large to encode ({} bytes)",
539                            computed_size
540                        );
541                    }
542                    total_size += types::UnsignedVarInt.compute_size(0)?;
543                    total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
544                    total_size += computed_size;
545                }
546            }
547            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
548        }
549        Ok(total_size)
550    }
551}
552
553#[cfg(feature = "client")]
554impl Decodable for FetchResponse {
555    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
556        if version < 0 || version > 17 {
557            bail!("specified version not supported by this message type");
558        }
559        let throttle_time_ms = if version >= 1 {
560            types::Int32.decode(buf)?
561        } else {
562            0
563        };
564        let error_code = if version >= 7 {
565            types::Int16.decode(buf)?
566        } else {
567            0
568        };
569        let session_id = if version >= 7 {
570            types::Int32.decode(buf)?
571        } else {
572            0
573        };
574        let responses = if version >= 12 {
575            types::CompactArray(types::Struct { version }).decode(buf)?
576        } else {
577            types::Array(types::Struct { version }).decode(buf)?
578        };
579        let mut node_endpoints = Default::default();
580        let mut unknown_tagged_fields = BTreeMap::new();
581        if version >= 12 {
582            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
583            for _ in 0..num_tagged_fields {
584                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
585                let size: u32 = types::UnsignedVarInt.decode(buf)?;
586                match tag {
587                    0 => {
588                        if version >= 16 {
589                            node_endpoints =
590                                types::CompactArray(types::Struct { version }).decode(buf)?;
591                        } else {
592                            bail!("Tag {} is not valid for version {}", tag, version);
593                        }
594                    }
595                    _ => {
596                        let unknown_value = buf.try_get_bytes(size as usize)?;
597                        unknown_tagged_fields.insert(tag as i32, unknown_value);
598                    }
599                }
600            }
601        }
602        Ok(Self {
603            throttle_time_ms,
604            error_code,
605            session_id,
606            responses,
607            node_endpoints,
608            unknown_tagged_fields,
609        })
610    }
611}
612
613impl Default for FetchResponse {
614    fn default() -> Self {
615        Self {
616            throttle_time_ms: 0,
617            error_code: 0,
618            session_id: 0,
619            responses: Default::default(),
620            node_endpoints: Default::default(),
621            unknown_tagged_fields: BTreeMap::new(),
622        }
623    }
624}
625
626impl Message for FetchResponse {
627    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
628    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
629}
630
631/// Valid versions: 0-17
632#[non_exhaustive]
633#[derive(Debug, Clone, PartialEq)]
634pub struct FetchableTopicResponse {
635    /// The topic name.
636    ///
637    /// Supported API versions: 0-12
638    pub topic: super::TopicName,
639
640    /// The unique topic ID
641    ///
642    /// Supported API versions: 13-17
643    pub topic_id: Uuid,
644
645    /// The topic partitions.
646    ///
647    /// Supported API versions: 0-17
648    pub partitions: Vec<PartitionData>,
649
650    /// Other tagged fields
651    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
652}
653
654impl FetchableTopicResponse {
655    /// Sets `topic` to the passed value.
656    ///
657    /// The topic name.
658    ///
659    /// Supported API versions: 0-12
660    pub fn with_topic(mut self, value: super::TopicName) -> Self {
661        self.topic = value;
662        self
663    }
664    /// Sets `topic_id` to the passed value.
665    ///
666    /// The unique topic ID
667    ///
668    /// Supported API versions: 13-17
669    pub fn with_topic_id(mut self, value: Uuid) -> Self {
670        self.topic_id = value;
671        self
672    }
673    /// Sets `partitions` to the passed value.
674    ///
675    /// The topic partitions.
676    ///
677    /// Supported API versions: 0-17
678    pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
679        self.partitions = value;
680        self
681    }
682    /// Sets unknown_tagged_fields to the passed value.
683    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
684        self.unknown_tagged_fields = value;
685        self
686    }
687    /// Inserts an entry into unknown_tagged_fields.
688    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
689        self.unknown_tagged_fields.insert(key, value);
690        self
691    }
692}
693
694#[cfg(feature = "broker")]
695impl Encodable for FetchableTopicResponse {
696    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
697        if version < 0 || version > 17 {
698            bail!("specified version not supported by this message type");
699        }
700        if version <= 12 {
701            if version >= 12 {
702                types::CompactString.encode(buf, &self.topic)?;
703            } else {
704                types::String.encode(buf, &self.topic)?;
705            }
706        }
707        if version >= 13 {
708            types::Uuid.encode(buf, &self.topic_id)?;
709        }
710        if version >= 12 {
711            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
712        } else {
713            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
714        }
715        if version >= 12 {
716            let num_tagged_fields = self.unknown_tagged_fields.len();
717            if num_tagged_fields > std::u32::MAX as usize {
718                bail!(
719                    "Too many tagged fields to encode ({} fields)",
720                    num_tagged_fields
721                );
722            }
723            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
724
725            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
726        }
727        Ok(())
728    }
729    fn compute_size(&self, version: i16) -> Result<usize> {
730        let mut total_size = 0;
731        if version <= 12 {
732            if version >= 12 {
733                total_size += types::CompactString.compute_size(&self.topic)?;
734            } else {
735                total_size += types::String.compute_size(&self.topic)?;
736            }
737        }
738        if version >= 13 {
739            total_size += types::Uuid.compute_size(&self.topic_id)?;
740        }
741        if version >= 12 {
742            total_size +=
743                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
744        } else {
745            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
746        }
747        if version >= 12 {
748            let num_tagged_fields = self.unknown_tagged_fields.len();
749            if num_tagged_fields > std::u32::MAX as usize {
750                bail!(
751                    "Too many tagged fields to encode ({} fields)",
752                    num_tagged_fields
753                );
754            }
755            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
756
757            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
758        }
759        Ok(total_size)
760    }
761}
762
763#[cfg(feature = "client")]
764impl Decodable for FetchableTopicResponse {
765    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
766        if version < 0 || version > 17 {
767            bail!("specified version not supported by this message type");
768        }
769        let topic = if version <= 12 {
770            if version >= 12 {
771                types::CompactString.decode(buf)?
772            } else {
773                types::String.decode(buf)?
774            }
775        } else {
776            Default::default()
777        };
778        let topic_id = if version >= 13 {
779            types::Uuid.decode(buf)?
780        } else {
781            Uuid::nil()
782        };
783        let partitions = if version >= 12 {
784            types::CompactArray(types::Struct { version }).decode(buf)?
785        } else {
786            types::Array(types::Struct { version }).decode(buf)?
787        };
788        let mut unknown_tagged_fields = BTreeMap::new();
789        if version >= 12 {
790            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
791            for _ in 0..num_tagged_fields {
792                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
793                let size: u32 = types::UnsignedVarInt.decode(buf)?;
794                let unknown_value = buf.try_get_bytes(size as usize)?;
795                unknown_tagged_fields.insert(tag as i32, unknown_value);
796            }
797        }
798        Ok(Self {
799            topic,
800            topic_id,
801            partitions,
802            unknown_tagged_fields,
803        })
804    }
805}
806
807impl Default for FetchableTopicResponse {
808    fn default() -> Self {
809        Self {
810            topic: Default::default(),
811            topic_id: Uuid::nil(),
812            partitions: Default::default(),
813            unknown_tagged_fields: BTreeMap::new(),
814        }
815    }
816}
817
818impl Message for FetchableTopicResponse {
819    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
820    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
821}
822
823/// Valid versions: 0-17
824#[non_exhaustive]
825#[derive(Debug, Clone, PartialEq)]
826pub struct LeaderIdAndEpoch {
827    /// The ID of the current leader or -1 if the leader is unknown.
828    ///
829    /// Supported API versions: 12-17
830    pub leader_id: super::BrokerId,
831
832    /// The latest known leader epoch
833    ///
834    /// Supported API versions: 12-17
835    pub leader_epoch: i32,
836
837    /// Other tagged fields
838    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
839}
840
841impl LeaderIdAndEpoch {
842    /// Sets `leader_id` to the passed value.
843    ///
844    /// The ID of the current leader or -1 if the leader is unknown.
845    ///
846    /// Supported API versions: 12-17
847    pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
848        self.leader_id = value;
849        self
850    }
851    /// Sets `leader_epoch` to the passed value.
852    ///
853    /// The latest known leader epoch
854    ///
855    /// Supported API versions: 12-17
856    pub fn with_leader_epoch(mut self, value: i32) -> Self {
857        self.leader_epoch = value;
858        self
859    }
860    /// Sets unknown_tagged_fields to the passed value.
861    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
862        self.unknown_tagged_fields = value;
863        self
864    }
865    /// Inserts an entry into unknown_tagged_fields.
866    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
867        self.unknown_tagged_fields.insert(key, value);
868        self
869    }
870}
871
872#[cfg(feature = "broker")]
873impl Encodable for LeaderIdAndEpoch {
874    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
875        if version < 0 || version > 17 {
876            bail!("specified version not supported by this message type");
877        }
878        if version >= 12 {
879            types::Int32.encode(buf, &self.leader_id)?;
880        } else {
881            if self.leader_id != -1 {
882                bail!("A field is set that is not available on the selected protocol version");
883            }
884        }
885        if version >= 12 {
886            types::Int32.encode(buf, &self.leader_epoch)?;
887        } else {
888            if self.leader_epoch != -1 {
889                bail!("A field is set that is not available on the selected protocol version");
890            }
891        }
892        if version >= 12 {
893            let num_tagged_fields = self.unknown_tagged_fields.len();
894            if num_tagged_fields > std::u32::MAX as usize {
895                bail!(
896                    "Too many tagged fields to encode ({} fields)",
897                    num_tagged_fields
898                );
899            }
900            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
901
902            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
903        }
904        Ok(())
905    }
906    fn compute_size(&self, version: i16) -> Result<usize> {
907        let mut total_size = 0;
908        if version >= 12 {
909            total_size += types::Int32.compute_size(&self.leader_id)?;
910        } else {
911            if self.leader_id != -1 {
912                bail!("A field is set that is not available on the selected protocol version");
913            }
914        }
915        if version >= 12 {
916            total_size += types::Int32.compute_size(&self.leader_epoch)?;
917        } else {
918            if self.leader_epoch != -1 {
919                bail!("A field is set that is not available on the selected protocol version");
920            }
921        }
922        if version >= 12 {
923            let num_tagged_fields = self.unknown_tagged_fields.len();
924            if num_tagged_fields > std::u32::MAX as usize {
925                bail!(
926                    "Too many tagged fields to encode ({} fields)",
927                    num_tagged_fields
928                );
929            }
930            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
931
932            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
933        }
934        Ok(total_size)
935    }
936}
937
938#[cfg(feature = "client")]
939impl Decodable for LeaderIdAndEpoch {
940    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
941        if version < 0 || version > 17 {
942            bail!("specified version not supported by this message type");
943        }
944        let leader_id = if version >= 12 {
945            types::Int32.decode(buf)?
946        } else {
947            (-1).into()
948        };
949        let leader_epoch = if version >= 12 {
950            types::Int32.decode(buf)?
951        } else {
952            -1
953        };
954        let mut unknown_tagged_fields = BTreeMap::new();
955        if version >= 12 {
956            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
957            for _ in 0..num_tagged_fields {
958                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
959                let size: u32 = types::UnsignedVarInt.decode(buf)?;
960                let unknown_value = buf.try_get_bytes(size as usize)?;
961                unknown_tagged_fields.insert(tag as i32, unknown_value);
962            }
963        }
964        Ok(Self {
965            leader_id,
966            leader_epoch,
967            unknown_tagged_fields,
968        })
969    }
970}
971
972impl Default for LeaderIdAndEpoch {
973    fn default() -> Self {
974        Self {
975            leader_id: (-1).into(),
976            leader_epoch: -1,
977            unknown_tagged_fields: BTreeMap::new(),
978        }
979    }
980}
981
982impl Message for LeaderIdAndEpoch {
983    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
984    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
985}
986
987/// Valid versions: 0-17
988#[non_exhaustive]
989#[derive(Debug, Clone, PartialEq)]
990pub struct NodeEndpoint {
991    /// The ID of the associated node.
992    ///
993    /// Supported API versions: 16-17
994    pub node_id: super::BrokerId,
995
996    /// The node's hostname.
997    ///
998    /// Supported API versions: 16-17
999    pub host: StrBytes,
1000
1001    /// The node's port.
1002    ///
1003    /// Supported API versions: 16-17
1004    pub port: i32,
1005
1006    /// The rack of the node, or null if it has not been assigned to a rack.
1007    ///
1008    /// Supported API versions: 16-17
1009    pub rack: Option<StrBytes>,
1010
1011    /// Other tagged fields
1012    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1013}
1014
1015impl NodeEndpoint {
1016    /// Sets `node_id` to the passed value.
1017    ///
1018    /// The ID of the associated node.
1019    ///
1020    /// Supported API versions: 16-17
1021    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
1022        self.node_id = value;
1023        self
1024    }
1025    /// Sets `host` to the passed value.
1026    ///
1027    /// The node's hostname.
1028    ///
1029    /// Supported API versions: 16-17
1030    pub fn with_host(mut self, value: StrBytes) -> Self {
1031        self.host = value;
1032        self
1033    }
1034    /// Sets `port` to the passed value.
1035    ///
1036    /// The node's port.
1037    ///
1038    /// Supported API versions: 16-17
1039    pub fn with_port(mut self, value: i32) -> Self {
1040        self.port = value;
1041        self
1042    }
1043    /// Sets `rack` to the passed value.
1044    ///
1045    /// The rack of the node, or null if it has not been assigned to a rack.
1046    ///
1047    /// Supported API versions: 16-17
1048    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
1049        self.rack = value;
1050        self
1051    }
1052    /// Sets unknown_tagged_fields to the passed value.
1053    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1054        self.unknown_tagged_fields = value;
1055        self
1056    }
1057    /// Inserts an entry into unknown_tagged_fields.
1058    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1059        self.unknown_tagged_fields.insert(key, value);
1060        self
1061    }
1062}
1063
1064#[cfg(feature = "broker")]
1065impl Encodable for NodeEndpoint {
1066    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1067        if version < 0 || version > 17 {
1068            bail!("specified version not supported by this message type");
1069        }
1070        if version >= 16 {
1071            types::Int32.encode(buf, &self.node_id)?;
1072        } else {
1073            if self.node_id != 0 {
1074                bail!("A field is set that is not available on the selected protocol version");
1075            }
1076        }
1077        if version >= 16 {
1078            types::CompactString.encode(buf, &self.host)?;
1079        } else {
1080            if !self.host.is_empty() {
1081                bail!("A field is set that is not available on the selected protocol version");
1082            }
1083        }
1084        if version >= 16 {
1085            types::Int32.encode(buf, &self.port)?;
1086        } else {
1087            if self.port != 0 {
1088                bail!("A field is set that is not available on the selected protocol version");
1089            }
1090        }
1091        if version >= 16 {
1092            types::CompactString.encode(buf, &self.rack)?;
1093        } else {
1094            if !self.rack.is_none() {
1095                bail!("A field is set that is not available on the selected protocol version");
1096            }
1097        }
1098        if version >= 12 {
1099            let num_tagged_fields = self.unknown_tagged_fields.len();
1100            if num_tagged_fields > std::u32::MAX as usize {
1101                bail!(
1102                    "Too many tagged fields to encode ({} fields)",
1103                    num_tagged_fields
1104                );
1105            }
1106            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1107
1108            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1109        }
1110        Ok(())
1111    }
1112    fn compute_size(&self, version: i16) -> Result<usize> {
1113        let mut total_size = 0;
1114        if version >= 16 {
1115            total_size += types::Int32.compute_size(&self.node_id)?;
1116        } else {
1117            if self.node_id != 0 {
1118                bail!("A field is set that is not available on the selected protocol version");
1119            }
1120        }
1121        if version >= 16 {
1122            total_size += types::CompactString.compute_size(&self.host)?;
1123        } else {
1124            if !self.host.is_empty() {
1125                bail!("A field is set that is not available on the selected protocol version");
1126            }
1127        }
1128        if version >= 16 {
1129            total_size += types::Int32.compute_size(&self.port)?;
1130        } else {
1131            if self.port != 0 {
1132                bail!("A field is set that is not available on the selected protocol version");
1133            }
1134        }
1135        if version >= 16 {
1136            total_size += types::CompactString.compute_size(&self.rack)?;
1137        } else {
1138            if !self.rack.is_none() {
1139                bail!("A field is set that is not available on the selected protocol version");
1140            }
1141        }
1142        if version >= 12 {
1143            let num_tagged_fields = self.unknown_tagged_fields.len();
1144            if num_tagged_fields > std::u32::MAX as usize {
1145                bail!(
1146                    "Too many tagged fields to encode ({} fields)",
1147                    num_tagged_fields
1148                );
1149            }
1150            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1151
1152            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1153        }
1154        Ok(total_size)
1155    }
1156}
1157
1158#[cfg(feature = "client")]
1159impl Decodable for NodeEndpoint {
1160    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1161        if version < 0 || version > 17 {
1162            bail!("specified version not supported by this message type");
1163        }
1164        let node_id = if version >= 16 {
1165            types::Int32.decode(buf)?
1166        } else {
1167            (0).into()
1168        };
1169        let host = if version >= 16 {
1170            types::CompactString.decode(buf)?
1171        } else {
1172            Default::default()
1173        };
1174        let port = if version >= 16 {
1175            types::Int32.decode(buf)?
1176        } else {
1177            0
1178        };
1179        let rack = if version >= 16 {
1180            types::CompactString.decode(buf)?
1181        } else {
1182            None
1183        };
1184        let mut unknown_tagged_fields = BTreeMap::new();
1185        if version >= 12 {
1186            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1187            for _ in 0..num_tagged_fields {
1188                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1189                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1190                let unknown_value = buf.try_get_bytes(size as usize)?;
1191                unknown_tagged_fields.insert(tag as i32, unknown_value);
1192            }
1193        }
1194        Ok(Self {
1195            node_id,
1196            host,
1197            port,
1198            rack,
1199            unknown_tagged_fields,
1200        })
1201    }
1202}
1203
1204impl Default for NodeEndpoint {
1205    fn default() -> Self {
1206        Self {
1207            node_id: (0).into(),
1208            host: Default::default(),
1209            port: 0,
1210            rack: None,
1211            unknown_tagged_fields: BTreeMap::new(),
1212        }
1213    }
1214}
1215
1216impl Message for NodeEndpoint {
1217    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1218    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1219}
1220
1221/// Valid versions: 0-17
1222#[non_exhaustive]
1223#[derive(Debug, Clone, PartialEq)]
1224pub struct PartitionData {
1225    /// The partition index.
1226    ///
1227    /// Supported API versions: 0-17
1228    pub partition_index: i32,
1229
1230    /// The error code, or 0 if there was no fetch error.
1231    ///
1232    /// Supported API versions: 0-17
1233    pub error_code: i16,
1234
1235    /// The current high water mark.
1236    ///
1237    /// Supported API versions: 0-17
1238    pub high_watermark: i64,
1239
1240    /// The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)
1241    ///
1242    /// Supported API versions: 4-17
1243    pub last_stable_offset: i64,
1244
1245    /// The current log start offset.
1246    ///
1247    /// Supported API versions: 5-17
1248    pub log_start_offset: i64,
1249
1250    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge
1251    ///
1252    /// Supported API versions: 12-17
1253    pub diverging_epoch: EpochEndOffset,
1254
1255    ///
1256    ///
1257    /// Supported API versions: 12-17
1258    pub current_leader: LeaderIdAndEpoch,
1259
1260    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.
1261    ///
1262    /// Supported API versions: 12-17
1263    pub snapshot_id: SnapshotId,
1264
1265    /// The aborted transactions.
1266    ///
1267    /// Supported API versions: 4-17
1268    pub aborted_transactions: Option<Vec<AbortedTransaction>>,
1269
1270    /// The preferred read replica for the consumer to use on its next fetch request
1271    ///
1272    /// Supported API versions: 11-17
1273    pub preferred_read_replica: super::BrokerId,
1274
1275    /// The record data.
1276    ///
1277    /// Supported API versions: 0-17
1278    pub records: Option<Bytes>,
1279
1280    /// Other tagged fields
1281    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1282}
1283
1284impl PartitionData {
1285    /// Sets `partition_index` to the passed value.
1286    ///
1287    /// The partition index.
1288    ///
1289    /// Supported API versions: 0-17
1290    pub fn with_partition_index(mut self, value: i32) -> Self {
1291        self.partition_index = value;
1292        self
1293    }
1294    /// Sets `error_code` to the passed value.
1295    ///
1296    /// The error code, or 0 if there was no fetch error.
1297    ///
1298    /// Supported API versions: 0-17
1299    pub fn with_error_code(mut self, value: i16) -> Self {
1300        self.error_code = value;
1301        self
1302    }
1303    /// Sets `high_watermark` to the passed value.
1304    ///
1305    /// The current high water mark.
1306    ///
1307    /// Supported API versions: 0-17
1308    pub fn with_high_watermark(mut self, value: i64) -> Self {
1309        self.high_watermark = value;
1310        self
1311    }
1312    /// Sets `last_stable_offset` to the passed value.
1313    ///
1314    /// The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)
1315    ///
1316    /// Supported API versions: 4-17
1317    pub fn with_last_stable_offset(mut self, value: i64) -> Self {
1318        self.last_stable_offset = value;
1319        self
1320    }
1321    /// Sets `log_start_offset` to the passed value.
1322    ///
1323    /// The current log start offset.
1324    ///
1325    /// Supported API versions: 5-17
1326    pub fn with_log_start_offset(mut self, value: i64) -> Self {
1327        self.log_start_offset = value;
1328        self
1329    }
1330    /// Sets `diverging_epoch` to the passed value.
1331    ///
1332    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge
1333    ///
1334    /// Supported API versions: 12-17
1335    pub fn with_diverging_epoch(mut self, value: EpochEndOffset) -> Self {
1336        self.diverging_epoch = value;
1337        self
1338    }
1339    /// Sets `current_leader` to the passed value.
1340    ///
1341    ///
1342    ///
1343    /// Supported API versions: 12-17
1344    pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
1345        self.current_leader = value;
1346        self
1347    }
1348    /// Sets `snapshot_id` to the passed value.
1349    ///
1350    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.
1351    ///
1352    /// Supported API versions: 12-17
1353    pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
1354        self.snapshot_id = value;
1355        self
1356    }
1357    /// Sets `aborted_transactions` to the passed value.
1358    ///
1359    /// The aborted transactions.
1360    ///
1361    /// Supported API versions: 4-17
1362    pub fn with_aborted_transactions(mut self, value: Option<Vec<AbortedTransaction>>) -> Self {
1363        self.aborted_transactions = value;
1364        self
1365    }
1366    /// Sets `preferred_read_replica` to the passed value.
1367    ///
1368    /// The preferred read replica for the consumer to use on its next fetch request
1369    ///
1370    /// Supported API versions: 11-17
1371    pub fn with_preferred_read_replica(mut self, value: super::BrokerId) -> Self {
1372        self.preferred_read_replica = value;
1373        self
1374    }
1375    /// Sets `records` to the passed value.
1376    ///
1377    /// The record data.
1378    ///
1379    /// Supported API versions: 0-17
1380    pub fn with_records(mut self, value: Option<Bytes>) -> Self {
1381        self.records = value;
1382        self
1383    }
1384    /// Sets unknown_tagged_fields to the passed value.
1385    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1386        self.unknown_tagged_fields = value;
1387        self
1388    }
1389    /// Inserts an entry into unknown_tagged_fields.
1390    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1391        self.unknown_tagged_fields.insert(key, value);
1392        self
1393    }
1394}
1395
1396#[cfg(feature = "broker")]
1397impl Encodable for PartitionData {
1398    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1399        if version < 0 || version > 17 {
1400            bail!("specified version not supported by this message type");
1401        }
1402        types::Int32.encode(buf, &self.partition_index)?;
1403        types::Int16.encode(buf, &self.error_code)?;
1404        types::Int64.encode(buf, &self.high_watermark)?;
1405        if version >= 4 {
1406            types::Int64.encode(buf, &self.last_stable_offset)?;
1407        }
1408        if version >= 5 {
1409            types::Int64.encode(buf, &self.log_start_offset)?;
1410        }
1411        if version >= 4 {
1412            if version >= 12 {
1413                types::CompactArray(types::Struct { version })
1414                    .encode(buf, &self.aborted_transactions)?;
1415            } else {
1416                types::Array(types::Struct { version }).encode(buf, &self.aborted_transactions)?;
1417            }
1418        }
1419        if version >= 11 {
1420            types::Int32.encode(buf, &self.preferred_read_replica)?;
1421        } else {
1422            if self.preferred_read_replica != -1 {
1423                bail!("A field is set that is not available on the selected protocol version");
1424            }
1425        }
1426        if version >= 12 {
1427            types::CompactBytes.encode(buf, &self.records)?;
1428        } else {
1429            types::Bytes.encode(buf, &self.records)?;
1430        }
1431        if version >= 12 {
1432            let mut num_tagged_fields = self.unknown_tagged_fields.len();
1433            if &self.diverging_epoch != &Default::default() {
1434                num_tagged_fields += 1;
1435            }
1436            if &self.current_leader != &Default::default() {
1437                num_tagged_fields += 1;
1438            }
1439            if &self.snapshot_id != &Default::default() {
1440                num_tagged_fields += 1;
1441            }
1442            if num_tagged_fields > std::u32::MAX as usize {
1443                bail!(
1444                    "Too many tagged fields to encode ({} fields)",
1445                    num_tagged_fields
1446                );
1447            }
1448            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1449            if &self.diverging_epoch != &Default::default() {
1450                let computed_size =
1451                    types::Struct { version }.compute_size(&self.diverging_epoch)?;
1452                if computed_size > std::u32::MAX as usize {
1453                    bail!(
1454                        "Tagged field is too large to encode ({} bytes)",
1455                        computed_size
1456                    );
1457                }
1458                types::UnsignedVarInt.encode(buf, 0)?;
1459                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1460                types::Struct { version }.encode(buf, &self.diverging_epoch)?;
1461            }
1462            if &self.current_leader != &Default::default() {
1463                let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
1464                if computed_size > std::u32::MAX as usize {
1465                    bail!(
1466                        "Tagged field is too large to encode ({} bytes)",
1467                        computed_size
1468                    );
1469                }
1470                types::UnsignedVarInt.encode(buf, 1)?;
1471                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1472                types::Struct { version }.encode(buf, &self.current_leader)?;
1473            }
1474            if &self.snapshot_id != &Default::default() {
1475                let computed_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
1476                if computed_size > std::u32::MAX as usize {
1477                    bail!(
1478                        "Tagged field is too large to encode ({} bytes)",
1479                        computed_size
1480                    );
1481                }
1482                types::UnsignedVarInt.encode(buf, 2)?;
1483                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1484                types::Struct { version }.encode(buf, &self.snapshot_id)?;
1485            }
1486
1487            write_unknown_tagged_fields(buf, 3.., &self.unknown_tagged_fields)?;
1488        }
1489        Ok(())
1490    }
1491    fn compute_size(&self, version: i16) -> Result<usize> {
1492        let mut total_size = 0;
1493        total_size += types::Int32.compute_size(&self.partition_index)?;
1494        total_size += types::Int16.compute_size(&self.error_code)?;
1495        total_size += types::Int64.compute_size(&self.high_watermark)?;
1496        if version >= 4 {
1497            total_size += types::Int64.compute_size(&self.last_stable_offset)?;
1498        }
1499        if version >= 5 {
1500            total_size += types::Int64.compute_size(&self.log_start_offset)?;
1501        }
1502        if version >= 4 {
1503            if version >= 12 {
1504                total_size += types::CompactArray(types::Struct { version })
1505                    .compute_size(&self.aborted_transactions)?;
1506            } else {
1507                total_size += types::Array(types::Struct { version })
1508                    .compute_size(&self.aborted_transactions)?;
1509            }
1510        }
1511        if version >= 11 {
1512            total_size += types::Int32.compute_size(&self.preferred_read_replica)?;
1513        } else {
1514            if self.preferred_read_replica != -1 {
1515                bail!("A field is set that is not available on the selected protocol version");
1516            }
1517        }
1518        if version >= 12 {
1519            total_size += types::CompactBytes.compute_size(&self.records)?;
1520        } else {
1521            total_size += types::Bytes.compute_size(&self.records)?;
1522        }
1523        if version >= 12 {
1524            let mut num_tagged_fields = self.unknown_tagged_fields.len();
1525            if &self.diverging_epoch != &Default::default() {
1526                num_tagged_fields += 1;
1527            }
1528            if &self.current_leader != &Default::default() {
1529                num_tagged_fields += 1;
1530            }
1531            if &self.snapshot_id != &Default::default() {
1532                num_tagged_fields += 1;
1533            }
1534            if num_tagged_fields > std::u32::MAX as usize {
1535                bail!(
1536                    "Too many tagged fields to encode ({} fields)",
1537                    num_tagged_fields
1538                );
1539            }
1540            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1541            if &self.diverging_epoch != &Default::default() {
1542                let computed_size =
1543                    types::Struct { version }.compute_size(&self.diverging_epoch)?;
1544                if computed_size > std::u32::MAX as usize {
1545                    bail!(
1546                        "Tagged field is too large to encode ({} bytes)",
1547                        computed_size
1548                    );
1549                }
1550                total_size += types::UnsignedVarInt.compute_size(0)?;
1551                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1552                total_size += computed_size;
1553            }
1554            if &self.current_leader != &Default::default() {
1555                let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
1556                if computed_size > std::u32::MAX as usize {
1557                    bail!(
1558                        "Tagged field is too large to encode ({} bytes)",
1559                        computed_size
1560                    );
1561                }
1562                total_size += types::UnsignedVarInt.compute_size(1)?;
1563                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1564                total_size += computed_size;
1565            }
1566            if &self.snapshot_id != &Default::default() {
1567                let computed_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
1568                if computed_size > std::u32::MAX as usize {
1569                    bail!(
1570                        "Tagged field is too large to encode ({} bytes)",
1571                        computed_size
1572                    );
1573                }
1574                total_size += types::UnsignedVarInt.compute_size(2)?;
1575                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1576                total_size += computed_size;
1577            }
1578
1579            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1580        }
1581        Ok(total_size)
1582    }
1583}
1584
1585#[cfg(feature = "client")]
1586impl Decodable for PartitionData {
1587    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1588        if version < 0 || version > 17 {
1589            bail!("specified version not supported by this message type");
1590        }
1591        let partition_index = types::Int32.decode(buf)?;
1592        let error_code = types::Int16.decode(buf)?;
1593        let high_watermark = types::Int64.decode(buf)?;
1594        let last_stable_offset = if version >= 4 {
1595            types::Int64.decode(buf)?
1596        } else {
1597            -1
1598        };
1599        let log_start_offset = if version >= 5 {
1600            types::Int64.decode(buf)?
1601        } else {
1602            -1
1603        };
1604        let mut diverging_epoch = Default::default();
1605        let mut current_leader = Default::default();
1606        let mut snapshot_id = Default::default();
1607        let aborted_transactions = if version >= 4 {
1608            if version >= 12 {
1609                types::CompactArray(types::Struct { version }).decode(buf)?
1610            } else {
1611                types::Array(types::Struct { version }).decode(buf)?
1612            }
1613        } else {
1614            Some(Default::default())
1615        };
1616        let preferred_read_replica = if version >= 11 {
1617            types::Int32.decode(buf)?
1618        } else {
1619            (-1).into()
1620        };
1621        let records = if version >= 12 {
1622            types::CompactBytes.decode(buf)?
1623        } else {
1624            types::Bytes.decode(buf)?
1625        };
1626        let mut unknown_tagged_fields = BTreeMap::new();
1627        if version >= 12 {
1628            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1629            for _ in 0..num_tagged_fields {
1630                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1631                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1632                match tag {
1633                    0 => {
1634                        diverging_epoch = types::Struct { version }.decode(buf)?;
1635                    }
1636                    1 => {
1637                        current_leader = types::Struct { version }.decode(buf)?;
1638                    }
1639                    2 => {
1640                        snapshot_id = types::Struct { version }.decode(buf)?;
1641                    }
1642                    _ => {
1643                        let unknown_value = buf.try_get_bytes(size as usize)?;
1644                        unknown_tagged_fields.insert(tag as i32, unknown_value);
1645                    }
1646                }
1647            }
1648        }
1649        Ok(Self {
1650            partition_index,
1651            error_code,
1652            high_watermark,
1653            last_stable_offset,
1654            log_start_offset,
1655            diverging_epoch,
1656            current_leader,
1657            snapshot_id,
1658            aborted_transactions,
1659            preferred_read_replica,
1660            records,
1661            unknown_tagged_fields,
1662        })
1663    }
1664}
1665
1666impl Default for PartitionData {
1667    fn default() -> Self {
1668        Self {
1669            partition_index: 0,
1670            error_code: 0,
1671            high_watermark: 0,
1672            last_stable_offset: -1,
1673            log_start_offset: -1,
1674            diverging_epoch: Default::default(),
1675            current_leader: Default::default(),
1676            snapshot_id: Default::default(),
1677            aborted_transactions: Some(Default::default()),
1678            preferred_read_replica: (-1).into(),
1679            records: Some(Default::default()),
1680            unknown_tagged_fields: BTreeMap::new(),
1681        }
1682    }
1683}
1684
1685impl Message for PartitionData {
1686    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1687    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1688}
1689
1690/// Valid versions: 0-17
1691#[non_exhaustive]
1692#[derive(Debug, Clone, PartialEq)]
1693pub struct SnapshotId {
1694    ///
1695    ///
1696    /// Supported API versions: 0-17
1697    pub end_offset: i64,
1698
1699    ///
1700    ///
1701    /// Supported API versions: 0-17
1702    pub epoch: i32,
1703
1704    /// Other tagged fields
1705    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1706}
1707
1708impl SnapshotId {
1709    /// Sets `end_offset` to the passed value.
1710    ///
1711    ///
1712    ///
1713    /// Supported API versions: 0-17
1714    pub fn with_end_offset(mut self, value: i64) -> Self {
1715        self.end_offset = value;
1716        self
1717    }
1718    /// Sets `epoch` to the passed value.
1719    ///
1720    ///
1721    ///
1722    /// Supported API versions: 0-17
1723    pub fn with_epoch(mut self, value: i32) -> Self {
1724        self.epoch = value;
1725        self
1726    }
1727    /// Sets unknown_tagged_fields to the passed value.
1728    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1729        self.unknown_tagged_fields = value;
1730        self
1731    }
1732    /// Inserts an entry into unknown_tagged_fields.
1733    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1734        self.unknown_tagged_fields.insert(key, value);
1735        self
1736    }
1737}
1738
1739#[cfg(feature = "broker")]
1740impl Encodable for SnapshotId {
1741    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1742        if version < 0 || version > 17 {
1743            bail!("specified version not supported by this message type");
1744        }
1745        types::Int64.encode(buf, &self.end_offset)?;
1746        types::Int32.encode(buf, &self.epoch)?;
1747        if version >= 12 {
1748            let num_tagged_fields = self.unknown_tagged_fields.len();
1749            if num_tagged_fields > std::u32::MAX as usize {
1750                bail!(
1751                    "Too many tagged fields to encode ({} fields)",
1752                    num_tagged_fields
1753                );
1754            }
1755            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1756
1757            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1758        }
1759        Ok(())
1760    }
1761    fn compute_size(&self, version: i16) -> Result<usize> {
1762        let mut total_size = 0;
1763        total_size += types::Int64.compute_size(&self.end_offset)?;
1764        total_size += types::Int32.compute_size(&self.epoch)?;
1765        if version >= 12 {
1766            let num_tagged_fields = self.unknown_tagged_fields.len();
1767            if num_tagged_fields > std::u32::MAX as usize {
1768                bail!(
1769                    "Too many tagged fields to encode ({} fields)",
1770                    num_tagged_fields
1771                );
1772            }
1773            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1774
1775            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1776        }
1777        Ok(total_size)
1778    }
1779}
1780
1781#[cfg(feature = "client")]
1782impl Decodable for SnapshotId {
1783    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1784        if version < 0 || version > 17 {
1785            bail!("specified version not supported by this message type");
1786        }
1787        let end_offset = types::Int64.decode(buf)?;
1788        let epoch = types::Int32.decode(buf)?;
1789        let mut unknown_tagged_fields = BTreeMap::new();
1790        if version >= 12 {
1791            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1792            for _ in 0..num_tagged_fields {
1793                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1794                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1795                let unknown_value = buf.try_get_bytes(size as usize)?;
1796                unknown_tagged_fields.insert(tag as i32, unknown_value);
1797            }
1798        }
1799        Ok(Self {
1800            end_offset,
1801            epoch,
1802            unknown_tagged_fields,
1803        })
1804    }
1805}
1806
1807impl Default for SnapshotId {
1808    fn default() -> Self {
1809        Self {
1810            end_offset: -1,
1811            epoch: -1,
1812            unknown_tagged_fields: BTreeMap::new(),
1813        }
1814    }
1815}
1816
1817impl Message for SnapshotId {
1818    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1819    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1820}
1821
1822impl HeaderVersion for FetchResponse {
1823    fn header_version(version: i16) -> i16 {
1824        if version >= 12 {
1825            1
1826        } else {
1827            0
1828        }
1829    }
1830}