kafka_protocol/messages/
fetch_response.rs

1//! FetchResponse
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchResponse.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 4-18
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AbortedTransaction {
24    /// The producer id associated with the aborted transaction.
25    ///
26    /// Supported API versions: 4-18
27    pub producer_id: super::ProducerId,
28
29    /// The first offset in the aborted transaction.
30    ///
31    /// Supported API versions: 4-18
32    pub first_offset: i64,
33
34    /// Other tagged fields
35    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AbortedTransaction {
39    /// Sets `producer_id` to the passed value.
40    ///
41    /// The producer id associated with the aborted transaction.
42    ///
43    /// Supported API versions: 4-18
44    pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
45        self.producer_id = value;
46        self
47    }
48    /// Sets `first_offset` to the passed value.
49    ///
50    /// The first offset in the aborted transaction.
51    ///
52    /// Supported API versions: 4-18
53    pub fn with_first_offset(mut self, value: i64) -> Self {
54        self.first_offset = value;
55        self
56    }
57    /// Sets unknown_tagged_fields to the passed value.
58    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59        self.unknown_tagged_fields = value;
60        self
61    }
62    /// Inserts an entry into unknown_tagged_fields.
63    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64        self.unknown_tagged_fields.insert(key, value);
65        self
66    }
67}
68
#[cfg(feature = "broker")]
impl Encodable for AbortedTransaction {
    /// Writes this value into `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. The tagged-field section is only
    /// written for versions 12 and above.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int64.encode(buf, &self.producer_id)?;
        types::Int64.encode(buf, &self.first_offset)?;
        if version < 12 {
            // Versions before 12 carry no tagged-field section.
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the sizes of the fields in the same layout that `encode` uses.
    ///
    /// Unlike `encode`, no check is made that `version` lies within 4-18.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int64.compute_size(&self.producer_id)?;
        size += types::Int64.compute_size(&self.first_offset)?;
        if version < 12 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
110
#[cfg(feature = "client")]
impl Decodable for AbortedTransaction {
    /// Reads an `AbortedTransaction` from `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. For versions 12 and above every
    /// tagged field found is stored in `unknown_tagged_fields`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let producer_id = types::Int64.decode(buf)?;
        let first_offset = types::Int64.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each tagged field is: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            producer_id,
            first_offset,
            unknown_tagged_fields,
        })
    }
}
136
137impl Default for AbortedTransaction {
138    fn default() -> Self {
139        Self {
140            producer_id: (0).into(),
141            first_offset: 0,
142            unknown_tagged_fields: BTreeMap::new(),
143        }
144    }
145}
146
147impl Message for AbortedTransaction {
148    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
149    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
150}
151
152/// Valid versions: 4-18
153#[non_exhaustive]
154#[derive(Debug, Clone, PartialEq)]
155pub struct EpochEndOffset {
156    /// The largest epoch.
157    ///
158    /// Supported API versions: 12-18
159    pub epoch: i32,
160
161    /// The end offset of the epoch.
162    ///
163    /// Supported API versions: 12-18
164    pub end_offset: i64,
165
166    /// Other tagged fields
167    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
168}
169
170impl EpochEndOffset {
171    /// Sets `epoch` to the passed value.
172    ///
173    /// The largest epoch.
174    ///
175    /// Supported API versions: 12-18
176    pub fn with_epoch(mut self, value: i32) -> Self {
177        self.epoch = value;
178        self
179    }
180    /// Sets `end_offset` to the passed value.
181    ///
182    /// The end offset of the epoch.
183    ///
184    /// Supported API versions: 12-18
185    pub fn with_end_offset(mut self, value: i64) -> Self {
186        self.end_offset = value;
187        self
188    }
189    /// Sets unknown_tagged_fields to the passed value.
190    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
191        self.unknown_tagged_fields = value;
192        self
193    }
194    /// Inserts an entry into unknown_tagged_fields.
195    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
196        self.unknown_tagged_fields.insert(key, value);
197        self
198    }
199}
200
#[cfg(feature = "broker")]
impl Encodable for EpochEndOffset {
    /// Writes this value into `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. Below version 12 nothing is
    /// written, and an error is returned if `epoch` or `end_offset` differs
    /// from its default of `-1`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 12 {
            // Neither field exists on the wire yet; both must still hold -1.
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.epoch)?;
        types::Int64.encode(buf, &self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the sizes of the fields in the same layout that `encode` uses.
    ///
    /// Unlike `encode`, no check is made that `version` lies within 4-18.
    /// Below version 12 the result is zero, or an error if `epoch` or
    /// `end_offset` differs from `-1`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 12 {
            if self.epoch != -1 || self.end_offset != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = types::Int32.compute_size(&self.epoch)?;
        size += types::Int64.compute_size(&self.end_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
266
#[cfg(feature = "client")]
impl Decodable for EpochEndOffset {
    /// Reads an `EpochEndOffset` from `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. Below version 12 nothing is read
    /// and both `epoch` and `end_offset` are set to `-1`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 12 {
            return Ok(Self {
                epoch: -1,
                end_offset: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let epoch = types::Int32.decode(buf)?;
        let end_offset = types::Int64.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            // Each tagged field is: tag, payload length, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            epoch,
            end_offset,
            unknown_tagged_fields,
        })
    }
}
300
301impl Default for EpochEndOffset {
302    fn default() -> Self {
303        Self {
304            epoch: -1,
305            end_offset: -1,
306            unknown_tagged_fields: BTreeMap::new(),
307        }
308    }
309}
310
311impl Message for EpochEndOffset {
312    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
313    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
314}
315
316/// Valid versions: 4-18
317#[non_exhaustive]
318#[derive(Debug, Clone, PartialEq)]
319pub struct FetchResponse {
320    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
321    ///
322    /// Supported API versions: 4-18
323    pub throttle_time_ms: i32,
324
325    /// The top level response error code.
326    ///
327    /// Supported API versions: 7-18
328    pub error_code: i16,
329
330    /// The fetch session ID, or 0 if this is not part of a fetch session.
331    ///
332    /// Supported API versions: 7-18
333    pub session_id: i32,
334
335    /// The response topics.
336    ///
337    /// Supported API versions: 4-18
338    pub responses: Vec<FetchableTopicResponse>,
339
340    /// Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.
341    ///
342    /// Supported API versions: 16-18
343    pub node_endpoints: Vec<NodeEndpoint>,
344
345    /// Other tagged fields
346    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
347}
348
349impl FetchResponse {
350    /// Sets `throttle_time_ms` to the passed value.
351    ///
352    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
353    ///
354    /// Supported API versions: 4-18
355    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
356        self.throttle_time_ms = value;
357        self
358    }
359    /// Sets `error_code` to the passed value.
360    ///
361    /// The top level response error code.
362    ///
363    /// Supported API versions: 7-18
364    pub fn with_error_code(mut self, value: i16) -> Self {
365        self.error_code = value;
366        self
367    }
368    /// Sets `session_id` to the passed value.
369    ///
370    /// The fetch session ID, or 0 if this is not part of a fetch session.
371    ///
372    /// Supported API versions: 7-18
373    pub fn with_session_id(mut self, value: i32) -> Self {
374        self.session_id = value;
375        self
376    }
377    /// Sets `responses` to the passed value.
378    ///
379    /// The response topics.
380    ///
381    /// Supported API versions: 4-18
382    pub fn with_responses(mut self, value: Vec<FetchableTopicResponse>) -> Self {
383        self.responses = value;
384        self
385    }
386    /// Sets `node_endpoints` to the passed value.
387    ///
388    /// Endpoints for all current-leaders enumerated in PartitionData, with errors NOT_LEADER_OR_FOLLOWER & FENCED_LEADER_EPOCH.
389    ///
390    /// Supported API versions: 16-18
391    pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
392        self.node_endpoints = value;
393        self
394    }
395    /// Sets unknown_tagged_fields to the passed value.
396    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
397        self.unknown_tagged_fields = value;
398        self
399    }
400    /// Inserts an entry into unknown_tagged_fields.
401    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
402        self.unknown_tagged_fields.insert(key, value);
403        self
404    }
405}
406
#[cfg(feature = "broker")]
impl Encodable for FetchResponse {
    /// Writes this response into `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18, or when `session_id` is non-zero
    /// on a version below 7. `error_code` is silently omitted below version 7.
    /// From version 12 a tagged-field section follows; from version 16 a
    /// non-empty `node_endpoints` is written there under tag 0.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.throttle_time_ms)?;
        if version >= 7 {
            types::Int16.encode(buf, &self.error_code)?;
            types::Int32.encode(buf, &self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            // Older versions use the non-compact array form and have no tagged fields.
            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;

        // `node_endpoints` occupies tag 0, but only when it has content and the version allows it.
        let emit_node_endpoints = version >= 16 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        if emit_node_endpoints {
            let payload_size =
                types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }

        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the sizes of the fields in the same layout that `encode` uses.
    ///
    /// Unlike `encode`, no check is made that `version` lies within 4-18.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.throttle_time_ms)?;
        if version >= 7 {
            size += types::Int16.compute_size(&self.error_code)?;
            size += types::Int32.compute_size(&self.session_id)?;
        } else if self.session_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
            return Ok(size);
        }
        size += types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;

        let emit_node_endpoints = version >= 16 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        if emit_node_endpoints {
            let payload_size =
                types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            // Tag number, payload length prefix, then the payload itself.
            size += types::UnsignedVarInt.compute_size(0)?;
            size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            size += payload_size;
        }

        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
516
#[cfg(feature = "client")]
impl Decodable for FetchResponse {
    /// Reads a `FetchResponse` from `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18, or when tag 0 appears in the
    /// tagged-field section of a version below 16. Below version 7,
    /// `error_code` and `session_id` are not read and default to 0.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let throttle_time_ms = types::Int32.decode(buf)?;
        let (error_code, session_id) = if version >= 7 {
            (types::Int16.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (0, 0)
        };
        let responses = if version < 12 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        let mut node_endpoints = Vec::new();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // Tag 0 holds `node_endpoints`, available from version 16.
                    0 if version >= 16 => {
                        node_endpoints =
                            types::CompactArray(types::Struct { version }).decode(buf)?;
                    }
                    0 => bail!("Tag {} is not valid for version {}", tag, version),
                    // Any other tag is kept as raw bytes.
                    _ => {
                        let payload = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, payload);
                    }
                }
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            session_id,
            responses,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
572
573impl Default for FetchResponse {
574    fn default() -> Self {
575        Self {
576            throttle_time_ms: 0,
577            error_code: 0,
578            session_id: 0,
579            responses: Default::default(),
580            node_endpoints: Default::default(),
581            unknown_tagged_fields: BTreeMap::new(),
582        }
583    }
584}
585
586impl Message for FetchResponse {
587    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
588    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
589}
590
591/// Valid versions: 4-18
592#[non_exhaustive]
593#[derive(Debug, Clone, PartialEq)]
594pub struct FetchableTopicResponse {
595    /// The topic name.
596    ///
597    /// Supported API versions: 4-12
598    pub topic: super::TopicName,
599
600    /// The unique topic ID.
601    ///
602    /// Supported API versions: 13-18
603    pub topic_id: Uuid,
604
605    /// The topic partitions.
606    ///
607    /// Supported API versions: 4-18
608    pub partitions: Vec<PartitionData>,
609
610    /// Other tagged fields
611    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
612}
613
614impl FetchableTopicResponse {
615    /// Sets `topic` to the passed value.
616    ///
617    /// The topic name.
618    ///
619    /// Supported API versions: 4-12
620    pub fn with_topic(mut self, value: super::TopicName) -> Self {
621        self.topic = value;
622        self
623    }
624    /// Sets `topic_id` to the passed value.
625    ///
626    /// The unique topic ID.
627    ///
628    /// Supported API versions: 13-18
629    pub fn with_topic_id(mut self, value: Uuid) -> Self {
630        self.topic_id = value;
631        self
632    }
633    /// Sets `partitions` to the passed value.
634    ///
635    /// The topic partitions.
636    ///
637    /// Supported API versions: 4-18
638    pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
639        self.partitions = value;
640        self
641    }
642    /// Sets unknown_tagged_fields to the passed value.
643    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
644        self.unknown_tagged_fields = value;
645        self
646    }
647    /// Inserts an entry into unknown_tagged_fields.
648    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
649        self.unknown_tagged_fields.insert(key, value);
650        self
651    }
652}
653
#[cfg(feature = "broker")]
impl Encodable for FetchableTopicResponse {
    /// Writes this topic entry into `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. Versions up to 12 write `topic`
    /// (compact string at exactly 12, plain string before); versions from 13
    /// write `topic_id` instead. The field that is not written is not validated.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // The topic is identified by id from version 13, by name before that.
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else {
            types::String.encode(buf, &self.topic)?;
        }

        if version < 12 {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the sizes of the fields in the same layout that `encode` uses.
    ///
    /// Unlike `encode`, no check is made that `version` lies within 4-18.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = if version >= 13 {
            types::Uuid.compute_size(&self.topic_id)?
        } else if version == 12 {
            types::CompactString.compute_size(&self.topic)?
        } else {
            types::String.compute_size(&self.topic)?
        };

        if version < 12 {
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(size);
        }
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
722
#[cfg(feature = "client")]
impl Decodable for FetchableTopicResponse {
    /// Reads a `FetchableTopicResponse` from `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. Versions up to 12 read `topic`
    /// and leave `topic_id` nil; versions from 13 read `topic_id` and leave
    /// `topic` at its default.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Exactly one of the two identifiers is present on the wire.
        let (topic, topic_id) = if version >= 13 {
            (Default::default(), types::Uuid.decode(buf)?)
        } else if version == 12 {
            (types::CompactString.decode(buf)?, Uuid::nil())
        } else {
            (types::String.decode(buf)?, Uuid::nil())
        };
        let partitions = if version < 12 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each tagged field is: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
766
767impl Default for FetchableTopicResponse {
768    fn default() -> Self {
769        Self {
770            topic: Default::default(),
771            topic_id: Uuid::nil(),
772            partitions: Default::default(),
773            unknown_tagged_fields: BTreeMap::new(),
774        }
775    }
776}
777
778impl Message for FetchableTopicResponse {
779    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
780    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
781}
782
783/// Valid versions: 4-18
784#[non_exhaustive]
785#[derive(Debug, Clone, PartialEq)]
786pub struct LeaderIdAndEpoch {
787    /// The ID of the current leader or -1 if the leader is unknown.
788    ///
789    /// Supported API versions: 12-18
790    pub leader_id: super::BrokerId,
791
792    /// The latest known leader epoch.
793    ///
794    /// Supported API versions: 12-18
795    pub leader_epoch: i32,
796
797    /// Other tagged fields
798    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
799}
800
801impl LeaderIdAndEpoch {
802    /// Sets `leader_id` to the passed value.
803    ///
804    /// The ID of the current leader or -1 if the leader is unknown.
805    ///
806    /// Supported API versions: 12-18
807    pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
808        self.leader_id = value;
809        self
810    }
811    /// Sets `leader_epoch` to the passed value.
812    ///
813    /// The latest known leader epoch.
814    ///
815    /// Supported API versions: 12-18
816    pub fn with_leader_epoch(mut self, value: i32) -> Self {
817        self.leader_epoch = value;
818        self
819    }
820    /// Sets unknown_tagged_fields to the passed value.
821    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
822        self.unknown_tagged_fields = value;
823        self
824    }
825    /// Inserts an entry into unknown_tagged_fields.
826    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
827        self.unknown_tagged_fields.insert(key, value);
828        self
829    }
830}
831
#[cfg(feature = "broker")]
impl Encodable for LeaderIdAndEpoch {
    /// Writes this value into `buf` using the layout for `version`.
    ///
    /// Fails when `version` is outside 4-18. Below version 12 nothing is
    /// written, and an error is returned if `leader_id` or `leader_epoch`
    /// differs from its default of `-1`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 12 {
            // Neither field exists on the wire yet; both must still hold -1.
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the sizes of the fields in the same layout that `encode` uses.
    ///
    /// Unlike `encode`, no check is made that `version` lies within 4-18.
    /// Below version 12 the result is zero, or an error if `leader_id` or
    /// `leader_epoch` differs from `-1`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 12 {
            if self.leader_id != -1 || self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let mut size = types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
897
#[cfg(feature = "client")]
impl Decodable for LeaderIdAndEpoch {
    /// Reads a `LeaderIdAndEpoch` from `buf` using the layout of the given `version`.
    ///
    /// Below version 12 nothing is consumed from `buf` and both fields are `-1`. From version 12
    /// on, `leader_id`, `leader_epoch` and the tagged-field section are read in that order.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4-18 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 12 {
            // Older layouts carry no bytes for this struct: hand back the unset value.
            return Ok(Self {
                leader_id: (-1).into(),
                leader_epoch: -1,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        // Every tagged field of this struct is unknown to us; keep the raw payloads by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
931
932impl Default for LeaderIdAndEpoch {
933    fn default() -> Self {
934        Self {
935            leader_id: (-1).into(),
936            leader_epoch: -1,
937            unknown_tagged_fields: BTreeMap::new(),
938        }
939    }
940}
941
// Version metadata for `LeaderIdAndEpoch`.
impl Message for LeaderIdAndEpoch {
    /// Versions declared for this type: 4 through 18, the same bounds that `encode` and
    /// `decode` check before doing any work.
    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
946
/// Describes one node by id, hostname, port and optional rack.
///
/// The `Encodable` implementation only writes the four data fields for versions 16 and above;
/// on lower versions every one of them has to hold its default value or encoding fails.
///
/// Valid versions: 4-18
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct NodeEndpoint {
    /// The ID of the associated node.
    ///
    /// Defaults to `0`.
    ///
    /// Supported API versions: 16-18
    pub node_id: super::BrokerId,

    /// The node's hostname.
    ///
    /// Defaults to the empty string.
    ///
    /// Supported API versions: 16-18
    pub host: StrBytes,

    /// The node's port.
    ///
    /// Defaults to `0`.
    ///
    /// Supported API versions: 16-18
    pub port: i32,

    /// The rack of the node, or null if it has not been assigned to a rack.
    ///
    /// Defaults to `None`.
    ///
    /// Supported API versions: 16-18
    pub rack: Option<StrBytes>,

    /// Other tagged fields
    ///
    /// Raw payloads keyed by tag number; written and read only for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
974
975impl NodeEndpoint {
976    /// Sets `node_id` to the passed value.
977    ///
978    /// The ID of the associated node.
979    ///
980    /// Supported API versions: 16-18
981    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
982        self.node_id = value;
983        self
984    }
985    /// Sets `host` to the passed value.
986    ///
987    /// The node's hostname.
988    ///
989    /// Supported API versions: 16-18
990    pub fn with_host(mut self, value: StrBytes) -> Self {
991        self.host = value;
992        self
993    }
994    /// Sets `port` to the passed value.
995    ///
996    /// The node's port.
997    ///
998    /// Supported API versions: 16-18
999    pub fn with_port(mut self, value: i32) -> Self {
1000        self.port = value;
1001        self
1002    }
1003    /// Sets `rack` to the passed value.
1004    ///
1005    /// The rack of the node, or null if it has not been assigned to a rack.
1006    ///
1007    /// Supported API versions: 16-18
1008    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
1009        self.rack = value;
1010        self
1011    }
1012    /// Sets unknown_tagged_fields to the passed value.
1013    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1014        self.unknown_tagged_fields = value;
1015        self
1016    }
1017    /// Inserts an entry into unknown_tagged_fields.
1018    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1019        self.unknown_tagged_fields.insert(key, value);
1020        self
1021    }
1022}
1023
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Writes this struct to `buf` using the layout of the given `version`.
    ///
    /// `node_id`, `host`, `port` and `rack` are written (in that order) for versions 16 and
    /// above; the tagged-field section is written for versions 12 and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4-18, when `version` is below 16 and any data field
    /// differs from its default, when there are more than `u32::MAX` tagged fields, or when a
    /// nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 16 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::Int32.encode(buf, &self.port)?;
            types::CompactString.encode(buf, &self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            // None of the data fields can be represented before version 16.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// The version range itself is not validated here; the remaining error conditions are the
    /// same as for `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;

        if version >= 16 {
            total_size += types::Int32.compute_size(&self.node_id)?;
            total_size += types::CompactString.compute_size(&self.host)?;
            total_size += types::Int32.compute_size(&self.port)?;
            total_size += types::CompactString.compute_size(&self.rack)?;
        } else if self.node_id != 0
            || !self.host.is_empty()
            || self.port != 0
            || self.rack.is_some()
        {
            // None of the data fields can be represented before version 16.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 12 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
1117
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads a `NodeEndpoint` from `buf` using the layout of the given `version`.
    ///
    /// The four data fields are read only for versions 16 and above and otherwise keep their
    /// defaults (`0`, empty host, `0`, `None`). The tagged-field section is read for versions 12
    /// and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4-18 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Start from the defaults and overwrite them when the layout carries the fields.
        let mut node_id: super::BrokerId = (0).into();
        let mut host: StrBytes = Default::default();
        let mut port: i32 = 0;
        let mut rack: Option<StrBytes> = None;
        if version >= 16 {
            node_id = types::Int32.decode(buf)?;
            host = types::CompactString.decode(buf)?;
            port = types::Int32.decode(buf)?;
            rack = types::CompactString.decode(buf)?;
        }

        // Every tagged field of this struct is unknown to us; keep the raw payloads by tag.
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
1163
1164impl Default for NodeEndpoint {
1165    fn default() -> Self {
1166        Self {
1167            node_id: (0).into(),
1168            host: Default::default(),
1169            port: 0,
1170            rack: None,
1171            unknown_tagged_fields: BTreeMap::new(),
1172        }
1173    }
1174}
1175
// Version metadata for `NodeEndpoint`.
impl Message for NodeEndpoint {
    /// Versions declared for this type: 4 through 18, the same bounds that `encode` and
    /// `decode` check before doing any work.
    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1180
/// Per-partition payload: offsets, error code, aborted transactions and the record data.
///
/// `diverging_epoch`, `current_leader` and `snapshot_id` are serialized as tagged fields (tags
/// 0, 1 and 2) from version 12 on, and only when they differ from their default values.
///
/// Valid versions: 4-18
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionData {
    /// The partition index.
    ///
    /// Supported API versions: 4-18
    pub partition_index: i32,

    /// The error code, or 0 if there was no fetch error.
    ///
    /// Supported API versions: 4-18
    pub error_code: i16,

    /// The current high water mark.
    ///
    /// Supported API versions: 4-18
    pub high_watermark: i64,

    /// The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED).
    ///
    /// Defaults to `-1`.
    ///
    /// Supported API versions: 4-18
    pub last_stable_offset: i64,

    /// The current log start offset.
    ///
    /// Defaults to `-1`, which is also the value produced when decoding versions below 5.
    ///
    /// Supported API versions: 5-18
    pub log_start_offset: i64,

    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge.
    ///
    /// Encoded as tagged field 0, and only when it differs from its default.
    ///
    /// Supported API versions: 12-18
    pub diverging_epoch: EpochEndOffset,

    /// The current leader of the partition.
    ///
    /// Encoded as tagged field 1, and only when it differs from its default.
    ///
    /// Supported API versions: 12-18
    pub current_leader: LeaderIdAndEpoch,

    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.
    ///
    /// Encoded as tagged field 2, and only when it differs from its default.
    ///
    /// Supported API versions: 12-18
    pub snapshot_id: SnapshotId,

    /// The aborted transactions.
    ///
    /// Defaults to `Some` of an empty list.
    ///
    /// Supported API versions: 4-18
    pub aborted_transactions: Option<Vec<AbortedTransaction>>,

    /// The preferred read replica for the consumer to use on its next fetch request.
    ///
    /// Defaults to `-1`; encoding fails for versions below 11 if any other value is set.
    ///
    /// Supported API versions: 11-18
    pub preferred_read_replica: super::BrokerId,

    /// The record data.
    ///
    /// Defaults to `Some` of an empty byte buffer.
    ///
    /// Supported API versions: 4-18
    pub records: Option<Bytes>,

    /// Other tagged fields
    ///
    /// Raw payloads keyed by tag number; written and read only for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1243
1244impl PartitionData {
1245    /// Sets `partition_index` to the passed value.
1246    ///
1247    /// The partition index.
1248    ///
1249    /// Supported API versions: 4-18
1250    pub fn with_partition_index(mut self, value: i32) -> Self {
1251        self.partition_index = value;
1252        self
1253    }
1254    /// Sets `error_code` to the passed value.
1255    ///
1256    /// The error code, or 0 if there was no fetch error.
1257    ///
1258    /// Supported API versions: 4-18
1259    pub fn with_error_code(mut self, value: i16) -> Self {
1260        self.error_code = value;
1261        self
1262    }
1263    /// Sets `high_watermark` to the passed value.
1264    ///
1265    /// The current high water mark.
1266    ///
1267    /// Supported API versions: 4-18
1268    pub fn with_high_watermark(mut self, value: i64) -> Self {
1269        self.high_watermark = value;
1270        self
1271    }
1272    /// Sets `last_stable_offset` to the passed value.
1273    ///
1274    /// The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED).
1275    ///
1276    /// Supported API versions: 4-18
1277    pub fn with_last_stable_offset(mut self, value: i64) -> Self {
1278        self.last_stable_offset = value;
1279        self
1280    }
1281    /// Sets `log_start_offset` to the passed value.
1282    ///
1283    /// The current log start offset.
1284    ///
1285    /// Supported API versions: 5-18
1286    pub fn with_log_start_offset(mut self, value: i64) -> Self {
1287        self.log_start_offset = value;
1288        self
1289    }
1290    /// Sets `diverging_epoch` to the passed value.
1291    ///
1292    /// In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge.
1293    ///
1294    /// Supported API versions: 12-18
1295    pub fn with_diverging_epoch(mut self, value: EpochEndOffset) -> Self {
1296        self.diverging_epoch = value;
1297        self
1298    }
1299    /// Sets `current_leader` to the passed value.
1300    ///
1301    /// The current leader of the partition.
1302    ///
1303    /// Supported API versions: 12-18
1304    pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
1305        self.current_leader = value;
1306        self
1307    }
1308    /// Sets `snapshot_id` to the passed value.
1309    ///
1310    /// In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.
1311    ///
1312    /// Supported API versions: 12-18
1313    pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
1314        self.snapshot_id = value;
1315        self
1316    }
1317    /// Sets `aborted_transactions` to the passed value.
1318    ///
1319    /// The aborted transactions.
1320    ///
1321    /// Supported API versions: 4-18
1322    pub fn with_aborted_transactions(mut self, value: Option<Vec<AbortedTransaction>>) -> Self {
1323        self.aborted_transactions = value;
1324        self
1325    }
1326    /// Sets `preferred_read_replica` to the passed value.
1327    ///
1328    /// The preferred read replica for the consumer to use on its next fetch request.
1329    ///
1330    /// Supported API versions: 11-18
1331    pub fn with_preferred_read_replica(mut self, value: super::BrokerId) -> Self {
1332        self.preferred_read_replica = value;
1333        self
1334    }
1335    /// Sets `records` to the passed value.
1336    ///
1337    /// The record data.
1338    ///
1339    /// Supported API versions: 4-18
1340    pub fn with_records(mut self, value: Option<Bytes>) -> Self {
1341        self.records = value;
1342        self
1343    }
1344    /// Sets unknown_tagged_fields to the passed value.
1345    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1346        self.unknown_tagged_fields = value;
1347        self
1348    }
1349    /// Inserts an entry into unknown_tagged_fields.
1350    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1351        self.unknown_tagged_fields.insert(key, value);
1352        self
1353    }
1354}
1355
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Writes this partition entry to `buf` using the layout of the given `version`.
    ///
    /// Field order on the wire: partition index, error code, high watermark, last stable offset,
    /// log start offset (version 5+), aborted transactions, preferred read replica (version 11+),
    /// records, and for version 12+ the tagged-field section (tags 0-2 for the non-default
    /// `diverging_epoch`, `current_leader` and `snapshot_id`, then the unknown tagged fields).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4-18, when `preferred_read_replica` is not `-1` on a
    /// version below 11, when the tagged-field count or a tagged payload size exceeds
    /// `u32::MAX`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 12 on the compact encodings and the tagged-field section are used.
        let compact = version >= 12;
        let transactions = &self.aborted_transactions;

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int64.encode(buf, &self.high_watermark)?;
        types::Int64.encode(buf, &self.last_stable_offset)?;
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, transactions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, transactions)?;
        }
        if version >= 11 {
            types::Int32.encode(buf, &self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            types::CompactBytes.encode(buf, &self.records)?;
        } else {
            types::Bytes.encode(buf, &self.records)?;
        }
        if !compact {
            // Layouts before version 12 have no tagged-field section.
            return Ok(());
        }

        // A known tagged field is emitted only when it differs from its default value.
        let send_diverging_epoch = self.diverging_epoch != EpochEndOffset::default();
        let send_current_leader = self.current_leader != LeaderIdAndEpoch::default();
        let send_snapshot_id = self.snapshot_id != SnapshotId::default();

        let tagged_count = self.unknown_tagged_fields.len()
            + usize::from(send_diverging_epoch)
            + usize::from(send_current_leader)
            + usize::from(send_snapshot_id);
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        // Each known tagged field is written as: tag number, payload size, payload.
        if send_diverging_epoch {
            let payload_size = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::Struct { version }.encode(buf, &self.diverging_epoch)?;
        }
        if send_current_leader {
            let payload_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            types::UnsignedVarInt.encode(buf, 1)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::Struct { version }.encode(buf, &self.current_leader)?;
        }
        if send_snapshot_id {
            let payload_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            types::UnsignedVarInt.encode(buf, 2)?;
            types::UnsignedVarInt.encode(buf, payload_size as u32)?;
            types::Struct { version }.encode(buf, &self.snapshot_id)?;
        }

        // Tags 0-2 are taken by the known fields above, so unknown ones start at 3.
        write_unknown_tagged_fields(buf, 3.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// The version range itself is not validated here; the remaining error conditions are the
    /// same as for `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // From version 12 on the compact encodings and the tagged-field section are used.
        let compact = version >= 12;
        let transactions = &self.aborted_transactions;

        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::Int64.compute_size(&self.high_watermark)?;
        total_size += types::Int64.compute_size(&self.last_stable_offset)?;
        if version >= 5 {
            total_size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        if compact {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(transactions)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(transactions)?;
        }
        if version >= 11 {
            total_size += types::Int32.compute_size(&self.preferred_read_replica)?;
        } else if self.preferred_read_replica != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if compact {
            total_size += types::CompactBytes.compute_size(&self.records)?;
        } else {
            total_size += types::Bytes.compute_size(&self.records)?;
        }
        if !compact {
            // Layouts before version 12 have no tagged-field section.
            return Ok(total_size);
        }

        // A known tagged field is counted only when it differs from its default value.
        let send_diverging_epoch = self.diverging_epoch != EpochEndOffset::default();
        let send_current_leader = self.current_leader != LeaderIdAndEpoch::default();
        let send_snapshot_id = self.snapshot_id != SnapshotId::default();

        let tagged_count = self.unknown_tagged_fields.len()
            + usize::from(send_diverging_epoch)
            + usize::from(send_current_leader)
            + usize::from(send_snapshot_id);
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        // Each known tagged field contributes: tag number, payload size, payload.
        if send_diverging_epoch {
            let payload_size = types::Struct { version }.compute_size(&self.diverging_epoch)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            total_size += payload_size;
        }
        if send_current_leader {
            let payload_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            total_size += types::UnsignedVarInt.compute_size(1)?;
            total_size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            total_size += payload_size;
        }
        if send_snapshot_id {
            let payload_size = types::Struct { version }.compute_size(&self.snapshot_id)?;
            if payload_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    payload_size
                );
            }
            total_size += types::UnsignedVarInt.compute_size(2)?;
            total_size += types::UnsignedVarInt.compute_size(payload_size as u32)?;
            total_size += payload_size;
        }

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
1536
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads a `PartitionData` from `buf` using the layout of the given `version`.
    ///
    /// Fields absent from the selected layout fall back to fixed values: `log_start_offset` is
    /// `-1` below version 5, `preferred_read_replica` is `-1` below version 11, and the three
    /// tagged structs keep their defaults unless their tag (0, 1 or 2) appears in the
    /// tagged-field section, which is read for versions 12 and above.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4-18 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // From version 12 on the compact encodings and the tagged-field section are used.
        let compact = version >= 12;

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let high_watermark = types::Int64.decode(buf)?;
        let last_stable_offset = types::Int64.decode(buf)?;

        let mut log_start_offset: i64 = -1;
        if version >= 5 {
            log_start_offset = types::Int64.decode(buf)?;
        }

        let aborted_transactions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut preferred_read_replica: super::BrokerId = (-1).into();
        if version >= 11 {
            preferred_read_replica = types::Int32.decode(buf)?;
        }

        let records = if compact {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        // The tagged structs stay at their defaults unless the tagged section overrides them.
        let mut diverging_epoch = EpochEndOffset::default();
        let mut current_leader = LeaderIdAndEpoch::default();
        let mut snapshot_id = SnapshotId::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                // NOTE(review): `size` is only used for unknown tags. For tags 0-2 the nested
                // struct decoder decides how many bytes are consumed; the declared size is not
                // cross-checked against that.
                match tag {
                    0 => diverging_epoch = types::Struct { version }.decode(buf)?,
                    1 => current_leader = types::Struct { version }.decode(buf)?,
                    2 => snapshot_id = types::Struct { version }.decode(buf)?,
                    _ => {
                        let payload = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, payload);
                    }
                }
            }
        }

        Ok(Self {
            partition_index,
            error_code,
            high_watermark,
            last_stable_offset,
            log_start_offset,
            diverging_epoch,
            current_leader,
            snapshot_id,
            aborted_transactions,
            preferred_read_replica,
            records,
            unknown_tagged_fields,
        })
    }
}
1609
1610impl Default for PartitionData {
1611    fn default() -> Self {
1612        Self {
1613            partition_index: 0,
1614            error_code: 0,
1615            high_watermark: 0,
1616            last_stable_offset: -1,
1617            log_start_offset: -1,
1618            diverging_epoch: Default::default(),
1619            current_leader: Default::default(),
1620            snapshot_id: Default::default(),
1621            aborted_transactions: Some(Default::default()),
1622            preferred_read_replica: (-1).into(),
1623            records: Some(Default::default()),
1624            unknown_tagged_fields: BTreeMap::new(),
1625        }
1626    }
1627}
1628
// Version metadata for `PartitionData`.
impl Message for PartitionData {
    /// Versions declared for this type: 4 through 18, the same bounds that `encode` and
    /// `decode` check before doing any work.
    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1633
/// An end offset paired with an epoch.
///
/// The `Encodable` implementation writes `end_offset` followed by `epoch` for every supported
/// version, plus a tagged-field section for versions 12 and above.
///
/// Valid versions: 4-18
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotId {
    /// The end offset of the epoch.
    ///
    /// Supported API versions: 4-18
    pub end_offset: i64,

    /// The largest epoch.
    ///
    /// Supported API versions: 4-18
    pub epoch: i32,

    /// Other tagged fields
    ///
    /// Raw payloads keyed by tag number; written only for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1651
1652impl SnapshotId {
1653    /// Sets `end_offset` to the passed value.
1654    ///
1655    /// The end offset of the epoch.
1656    ///
1657    /// Supported API versions: 4-18
1658    pub fn with_end_offset(mut self, value: i64) -> Self {
1659        self.end_offset = value;
1660        self
1661    }
1662    /// Sets `epoch` to the passed value.
1663    ///
1664    /// The largest epoch.
1665    ///
1666    /// Supported API versions: 4-18
1667    pub fn with_epoch(mut self, value: i32) -> Self {
1668        self.epoch = value;
1669        self
1670    }
1671    /// Sets unknown_tagged_fields to the passed value.
1672    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1673        self.unknown_tagged_fields = value;
1674        self
1675    }
1676    /// Inserts an entry into unknown_tagged_fields.
1677    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1678        self.unknown_tagged_fields.insert(key, value);
1679        self
1680    }
1681}
1682
#[cfg(feature = "broker")]
impl Encodable for SnapshotId {
    /// Writes this value to `buf` using the wire layout of `version`.
    ///
    /// Layout: `end_offset` (Int64), `epoch` (Int32), and — for version 12 and above —
    /// the count of unknown tagged fields followed by the fields themselves.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4..=18, when there are more than `u32::MAX`
    /// unknown tagged fields, or when any underlying encoder reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;

        // Versions before 12 carry no tagged-field section at all.
        if version < 12 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        // The bound check above guarantees this cast is lossless.
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not validate that `version` lies in 4..=18.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields (version 12 and
    /// above only) or when any underlying size computation reports an error.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let end_offset_size = types::Int64.compute_size(&self.end_offset)?;
        let epoch_size = types::Int32.compute_size(&self.epoch)?;
        let fixed_size = end_offset_size + epoch_size;

        // Versions before 12 carry no tagged-field section at all.
        if version < 12 {
            return Ok(fixed_size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let fields_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fixed_size + count_size + fields_size)
    }
}
1724
#[cfg(feature = "client")]
impl Decodable for SnapshotId {
    /// Reads a `SnapshotId` from `buf` using the wire layout of `version`.
    ///
    /// Reads `end_offset` (Int64) then `epoch` (Int32). For version 12 and above it also
    /// reads the tagged-field section, storing every entry in `unknown_tagged_fields`;
    /// a repeated tag overwrites the earlier entry.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 4..=18 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(4..=18).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Field initializers run in written order, which matches the wire order.
        let mut decoded = Self {
            end_offset: types::Int64.decode(buf)?,
            epoch: types::Int32.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        // Versions before 12 carry no tagged-field section at all.
        if version < 12 {
            return Ok(decoded);
        }

        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            decoded.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(decoded)
    }
}
1750
1751impl Default for SnapshotId {
1752    fn default() -> Self {
1753        Self {
1754            end_offset: -1,
1755            epoch: -1,
1756            unknown_tagged_fields: BTreeMap::new(),
1757        }
1758    }
1759}
1760
1761impl Message for SnapshotId {
1762    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1763    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1764}
1765
1766impl HeaderVersion for FetchResponse {
1767    fn header_version(version: i16) -> i16 {
1768        if version >= 12 {
1769            1
1770        } else {
1771            0
1772        }
1773    }
1774}