kafka_protocol/messages/
fetch_request.rs

1//! FetchRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 4-18
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchPartition {
24    /// The partition index.
25    ///
26    /// Supported API versions: 4-18
27    pub partition: i32,
28
29    /// The current leader epoch of the partition.
30    ///
31    /// Supported API versions: 9-18
32    pub current_leader_epoch: i32,
33
34    /// The message offset.
35    ///
36    /// Supported API versions: 4-18
37    pub fetch_offset: i64,
38
39    /// The epoch of the last fetched record or -1 if there is none.
40    ///
41    /// Supported API versions: 12-18
42    pub last_fetched_epoch: i32,
43
44    /// The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
45    ///
46    /// Supported API versions: 5-18
47    pub log_start_offset: i64,
48
49    /// The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
50    ///
51    /// Supported API versions: 4-18
52    pub partition_max_bytes: i32,
53
54    /// The directory id of the follower fetching.
55    ///
56    /// Supported API versions: 17-18
57    pub replica_directory_id: Uuid,
58
59    /// The high-watermark known by the replica. -1 if the high-watermark is not known and 9223372036854775807 if the feature is not supported.
60    ///
61    /// Supported API versions: 18
62    pub high_watermark: i64,
63
64    /// Other tagged fields
65    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl FetchPartition {
69    /// Sets `partition` to the passed value.
70    ///
71    /// The partition index.
72    ///
73    /// Supported API versions: 4-18
74    pub fn with_partition(mut self, value: i32) -> Self {
75        self.partition = value;
76        self
77    }
78    /// Sets `current_leader_epoch` to the passed value.
79    ///
80    /// The current leader epoch of the partition.
81    ///
82    /// Supported API versions: 9-18
83    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
84        self.current_leader_epoch = value;
85        self
86    }
87    /// Sets `fetch_offset` to the passed value.
88    ///
89    /// The message offset.
90    ///
91    /// Supported API versions: 4-18
92    pub fn with_fetch_offset(mut self, value: i64) -> Self {
93        self.fetch_offset = value;
94        self
95    }
96    /// Sets `last_fetched_epoch` to the passed value.
97    ///
98    /// The epoch of the last fetched record or -1 if there is none.
99    ///
100    /// Supported API versions: 12-18
101    pub fn with_last_fetched_epoch(mut self, value: i32) -> Self {
102        self.last_fetched_epoch = value;
103        self
104    }
105    /// Sets `log_start_offset` to the passed value.
106    ///
107    /// The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
108    ///
109    /// Supported API versions: 5-18
110    pub fn with_log_start_offset(mut self, value: i64) -> Self {
111        self.log_start_offset = value;
112        self
113    }
114    /// Sets `partition_max_bytes` to the passed value.
115    ///
116    /// The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
117    ///
118    /// Supported API versions: 4-18
119    pub fn with_partition_max_bytes(mut self, value: i32) -> Self {
120        self.partition_max_bytes = value;
121        self
122    }
123    /// Sets `replica_directory_id` to the passed value.
124    ///
125    /// The directory id of the follower fetching.
126    ///
127    /// Supported API versions: 17-18
128    pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
129        self.replica_directory_id = value;
130        self
131    }
132    /// Sets `high_watermark` to the passed value.
133    ///
134    /// The high-watermark known by the replica. -1 if the high-watermark is not known and 9223372036854775807 if the feature is not supported.
135    ///
136    /// Supported API versions: 18
137    pub fn with_high_watermark(mut self, value: i64) -> Self {
138        self.high_watermark = value;
139        self
140    }
141    /// Sets unknown_tagged_fields to the passed value.
142    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143        self.unknown_tagged_fields = value;
144        self
145    }
146    /// Inserts an entry into unknown_tagged_fields.
147    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148        self.unknown_tagged_fields.insert(key, value);
149        self
150    }
151}
152
153#[cfg(feature = "client")]
154impl Encodable for FetchPartition {
155    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
156        if version < 4 || version > 18 {
157            bail!("specified version not supported by this message type");
158        }
159        types::Int32.encode(buf, &self.partition)?;
160        if version >= 9 {
161            types::Int32.encode(buf, &self.current_leader_epoch)?;
162        }
163        types::Int64.encode(buf, &self.fetch_offset)?;
164        if version >= 12 {
165            types::Int32.encode(buf, &self.last_fetched_epoch)?;
166        } else {
167            if self.last_fetched_epoch != -1 {
168                bail!("A field is set that is not available on the selected protocol version");
169            }
170        }
171        if version >= 5 {
172            types::Int64.encode(buf, &self.log_start_offset)?;
173        }
174        types::Int32.encode(buf, &self.partition_max_bytes)?;
175        if version >= 12 {
176            let mut num_tagged_fields = self.unknown_tagged_fields.len();
177            if version >= 17 {
178                if &self.replica_directory_id != &Uuid::nil() {
179                    num_tagged_fields += 1;
180                }
181            }
182            if version >= 18 {
183                if self.high_watermark != 9223372036854775807 {
184                    num_tagged_fields += 1;
185                }
186            }
187            if num_tagged_fields > std::u32::MAX as usize {
188                bail!(
189                    "Too many tagged fields to encode ({} fields)",
190                    num_tagged_fields
191                );
192            }
193            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
194            if version >= 17 {
195                if &self.replica_directory_id != &Uuid::nil() {
196                    let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
197                    if computed_size > std::u32::MAX as usize {
198                        bail!(
199                            "Tagged field is too large to encode ({} bytes)",
200                            computed_size
201                        );
202                    }
203                    types::UnsignedVarInt.encode(buf, 0)?;
204                    types::UnsignedVarInt.encode(buf, computed_size as u32)?;
205                    types::Uuid.encode(buf, &self.replica_directory_id)?;
206                }
207            }
208            if version >= 18 {
209                if self.high_watermark != 9223372036854775807 {
210                    let computed_size = types::Int64.compute_size(&self.high_watermark)?;
211                    if computed_size > std::u32::MAX as usize {
212                        bail!(
213                            "Tagged field is too large to encode ({} bytes)",
214                            computed_size
215                        );
216                    }
217                    types::UnsignedVarInt.encode(buf, 1)?;
218                    types::UnsignedVarInt.encode(buf, computed_size as u32)?;
219                    types::Int64.encode(buf, &self.high_watermark)?;
220                }
221            }
222            write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
223        }
224        Ok(())
225    }
226    fn compute_size(&self, version: i16) -> Result<usize> {
227        let mut total_size = 0;
228        total_size += types::Int32.compute_size(&self.partition)?;
229        if version >= 9 {
230            total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
231        }
232        total_size += types::Int64.compute_size(&self.fetch_offset)?;
233        if version >= 12 {
234            total_size += types::Int32.compute_size(&self.last_fetched_epoch)?;
235        } else {
236            if self.last_fetched_epoch != -1 {
237                bail!("A field is set that is not available on the selected protocol version");
238            }
239        }
240        if version >= 5 {
241            total_size += types::Int64.compute_size(&self.log_start_offset)?;
242        }
243        total_size += types::Int32.compute_size(&self.partition_max_bytes)?;
244        if version >= 12 {
245            let mut num_tagged_fields = self.unknown_tagged_fields.len();
246            if version >= 17 {
247                if &self.replica_directory_id != &Uuid::nil() {
248                    num_tagged_fields += 1;
249                }
250            }
251            if version >= 18 {
252                if self.high_watermark != 9223372036854775807 {
253                    num_tagged_fields += 1;
254                }
255            }
256            if num_tagged_fields > std::u32::MAX as usize {
257                bail!(
258                    "Too many tagged fields to encode ({} fields)",
259                    num_tagged_fields
260                );
261            }
262            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
263            if version >= 17 {
264                if &self.replica_directory_id != &Uuid::nil() {
265                    let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
266                    if computed_size > std::u32::MAX as usize {
267                        bail!(
268                            "Tagged field is too large to encode ({} bytes)",
269                            computed_size
270                        );
271                    }
272                    total_size += types::UnsignedVarInt.compute_size(0)?;
273                    total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
274                    total_size += computed_size;
275                }
276            }
277            if version >= 18 {
278                if self.high_watermark != 9223372036854775807 {
279                    let computed_size = types::Int64.compute_size(&self.high_watermark)?;
280                    if computed_size > std::u32::MAX as usize {
281                        bail!(
282                            "Tagged field is too large to encode ({} bytes)",
283                            computed_size
284                        );
285                    }
286                    total_size += types::UnsignedVarInt.compute_size(1)?;
287                    total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
288                    total_size += computed_size;
289                }
290            }
291            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
292        }
293        Ok(total_size)
294    }
295}
296
297#[cfg(feature = "broker")]
298impl Decodable for FetchPartition {
299    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
300        if version < 4 || version > 18 {
301            bail!("specified version not supported by this message type");
302        }
303        let partition = types::Int32.decode(buf)?;
304        let current_leader_epoch = if version >= 9 {
305            types::Int32.decode(buf)?
306        } else {
307            -1
308        };
309        let fetch_offset = types::Int64.decode(buf)?;
310        let last_fetched_epoch = if version >= 12 {
311            types::Int32.decode(buf)?
312        } else {
313            -1
314        };
315        let log_start_offset = if version >= 5 {
316            types::Int64.decode(buf)?
317        } else {
318            -1
319        };
320        let partition_max_bytes = types::Int32.decode(buf)?;
321        let mut replica_directory_id = Uuid::nil();
322        let mut high_watermark = 9223372036854775807;
323        let mut unknown_tagged_fields = BTreeMap::new();
324        if version >= 12 {
325            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
326            for _ in 0..num_tagged_fields {
327                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
328                let size: u32 = types::UnsignedVarInt.decode(buf)?;
329                match tag {
330                    0 => {
331                        if version >= 17 {
332                            replica_directory_id = types::Uuid.decode(buf)?;
333                        } else {
334                            bail!("Tag {} is not valid for version {}", tag, version);
335                        }
336                    }
337                    1 => {
338                        if version >= 18 {
339                            high_watermark = types::Int64.decode(buf)?;
340                        } else {
341                            bail!("Tag {} is not valid for version {}", tag, version);
342                        }
343                    }
344                    _ => {
345                        let unknown_value = buf.try_get_bytes(size as usize)?;
346                        unknown_tagged_fields.insert(tag as i32, unknown_value);
347                    }
348                }
349            }
350        }
351        Ok(Self {
352            partition,
353            current_leader_epoch,
354            fetch_offset,
355            last_fetched_epoch,
356            log_start_offset,
357            partition_max_bytes,
358            replica_directory_id,
359            high_watermark,
360            unknown_tagged_fields,
361        })
362    }
363}
364
365impl Default for FetchPartition {
366    fn default() -> Self {
367        Self {
368            partition: 0,
369            current_leader_epoch: -1,
370            fetch_offset: 0,
371            last_fetched_epoch: -1,
372            log_start_offset: -1,
373            partition_max_bytes: 0,
374            replica_directory_id: Uuid::nil(),
375            high_watermark: 9223372036854775807,
376            unknown_tagged_fields: BTreeMap::new(),
377        }
378    }
379}
380
381impl Message for FetchPartition {
382    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
383    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
384}
385
386/// Valid versions: 4-18
387#[non_exhaustive]
388#[derive(Debug, Clone, PartialEq)]
389pub struct FetchRequest {
390    /// The clusterId if known. This is used to validate metadata fetches prior to broker registration.
391    ///
392    /// Supported API versions: 12-18
393    pub cluster_id: Option<StrBytes>,
394
395    /// The broker ID of the follower, of -1 if this request is from a consumer.
396    ///
397    /// Supported API versions: 4-14
398    pub replica_id: super::BrokerId,
399
400    /// The state of the replica in the follower.
401    ///
402    /// Supported API versions: 15-18
403    pub replica_state: ReplicaState,
404
405    /// The maximum time in milliseconds to wait for the response.
406    ///
407    /// Supported API versions: 4-18
408    pub max_wait_ms: i32,
409
410    /// The minimum bytes to accumulate in the response.
411    ///
412    /// Supported API versions: 4-18
413    pub min_bytes: i32,
414
415    /// The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
416    ///
417    /// Supported API versions: 4-18
418    pub max_bytes: i32,
419
420    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records.
421    ///
422    /// Supported API versions: 4-18
423    pub isolation_level: i8,
424
425    /// The fetch session ID.
426    ///
427    /// Supported API versions: 7-18
428    pub session_id: i32,
429
430    /// The fetch session epoch, which is used for ordering requests in a session.
431    ///
432    /// Supported API versions: 7-18
433    pub session_epoch: i32,
434
435    /// The topics to fetch.
436    ///
437    /// Supported API versions: 4-18
438    pub topics: Vec<FetchTopic>,
439
440    /// In an incremental fetch request, the partitions to remove.
441    ///
442    /// Supported API versions: 7-18
443    pub forgotten_topics_data: Vec<ForgottenTopic>,
444
445    /// Rack ID of the consumer making this request.
446    ///
447    /// Supported API versions: 11-18
448    pub rack_id: StrBytes,
449
450    /// Other tagged fields
451    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
452}
453
454impl FetchRequest {
455    /// Sets `cluster_id` to the passed value.
456    ///
457    /// The clusterId if known. This is used to validate metadata fetches prior to broker registration.
458    ///
459    /// Supported API versions: 12-18
460    pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
461        self.cluster_id = value;
462        self
463    }
464    /// Sets `replica_id` to the passed value.
465    ///
466    /// The broker ID of the follower, of -1 if this request is from a consumer.
467    ///
468    /// Supported API versions: 4-14
469    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
470        self.replica_id = value;
471        self
472    }
473    /// Sets `replica_state` to the passed value.
474    ///
475    /// The state of the replica in the follower.
476    ///
477    /// Supported API versions: 15-18
478    pub fn with_replica_state(mut self, value: ReplicaState) -> Self {
479        self.replica_state = value;
480        self
481    }
482    /// Sets `max_wait_ms` to the passed value.
483    ///
484    /// The maximum time in milliseconds to wait for the response.
485    ///
486    /// Supported API versions: 4-18
487    pub fn with_max_wait_ms(mut self, value: i32) -> Self {
488        self.max_wait_ms = value;
489        self
490    }
491    /// Sets `min_bytes` to the passed value.
492    ///
493    /// The minimum bytes to accumulate in the response.
494    ///
495    /// Supported API versions: 4-18
496    pub fn with_min_bytes(mut self, value: i32) -> Self {
497        self.min_bytes = value;
498        self
499    }
500    /// Sets `max_bytes` to the passed value.
501    ///
502    /// The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
503    ///
504    /// Supported API versions: 4-18
505    pub fn with_max_bytes(mut self, value: i32) -> Self {
506        self.max_bytes = value;
507        self
508    }
509    /// Sets `isolation_level` to the passed value.
510    ///
511    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records.
512    ///
513    /// Supported API versions: 4-18
514    pub fn with_isolation_level(mut self, value: i8) -> Self {
515        self.isolation_level = value;
516        self
517    }
518    /// Sets `session_id` to the passed value.
519    ///
520    /// The fetch session ID.
521    ///
522    /// Supported API versions: 7-18
523    pub fn with_session_id(mut self, value: i32) -> Self {
524        self.session_id = value;
525        self
526    }
527    /// Sets `session_epoch` to the passed value.
528    ///
529    /// The fetch session epoch, which is used for ordering requests in a session.
530    ///
531    /// Supported API versions: 7-18
532    pub fn with_session_epoch(mut self, value: i32) -> Self {
533        self.session_epoch = value;
534        self
535    }
536    /// Sets `topics` to the passed value.
537    ///
538    /// The topics to fetch.
539    ///
540    /// Supported API versions: 4-18
541    pub fn with_topics(mut self, value: Vec<FetchTopic>) -> Self {
542        self.topics = value;
543        self
544    }
545    /// Sets `forgotten_topics_data` to the passed value.
546    ///
547    /// In an incremental fetch request, the partitions to remove.
548    ///
549    /// Supported API versions: 7-18
550    pub fn with_forgotten_topics_data(mut self, value: Vec<ForgottenTopic>) -> Self {
551        self.forgotten_topics_data = value;
552        self
553    }
554    /// Sets `rack_id` to the passed value.
555    ///
556    /// Rack ID of the consumer making this request.
557    ///
558    /// Supported API versions: 11-18
559    pub fn with_rack_id(mut self, value: StrBytes) -> Self {
560        self.rack_id = value;
561        self
562    }
563    /// Sets unknown_tagged_fields to the passed value.
564    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
565        self.unknown_tagged_fields = value;
566        self
567    }
568    /// Inserts an entry into unknown_tagged_fields.
569    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
570        self.unknown_tagged_fields.insert(key, value);
571        self
572    }
573}
574
575#[cfg(feature = "client")]
576impl Encodable for FetchRequest {
577    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
578        if version < 4 || version > 18 {
579            bail!("specified version not supported by this message type");
580        }
581        if version <= 14 {
582            types::Int32.encode(buf, &self.replica_id)?;
583        } else {
584            if self.replica_id != -1 {
585                bail!("A field is set that is not available on the selected protocol version");
586            }
587        }
588        types::Int32.encode(buf, &self.max_wait_ms)?;
589        types::Int32.encode(buf, &self.min_bytes)?;
590        types::Int32.encode(buf, &self.max_bytes)?;
591        types::Int8.encode(buf, &self.isolation_level)?;
592        if version >= 7 {
593            types::Int32.encode(buf, &self.session_id)?;
594        }
595        if version >= 7 {
596            types::Int32.encode(buf, &self.session_epoch)?;
597        }
598        if version >= 12 {
599            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
600        } else {
601            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
602        }
603        if version >= 7 {
604            if version >= 12 {
605                types::CompactArray(types::Struct { version })
606                    .encode(buf, &self.forgotten_topics_data)?;
607            } else {
608                types::Array(types::Struct { version }).encode(buf, &self.forgotten_topics_data)?;
609            }
610        } else {
611            if !self.forgotten_topics_data.is_empty() {
612                bail!("A field is set that is not available on the selected protocol version");
613            }
614        }
615        if version >= 11 {
616            if version >= 12 {
617                types::CompactString.encode(buf, &self.rack_id)?;
618            } else {
619                types::String.encode(buf, &self.rack_id)?;
620            }
621        }
622        if version >= 12 {
623            let mut num_tagged_fields = self.unknown_tagged_fields.len();
624            if !self.cluster_id.is_none() {
625                num_tagged_fields += 1;
626            }
627            if version >= 15 {
628                if &self.replica_state != &Default::default() {
629                    num_tagged_fields += 1;
630                }
631            }
632            if num_tagged_fields > std::u32::MAX as usize {
633                bail!(
634                    "Too many tagged fields to encode ({} fields)",
635                    num_tagged_fields
636                );
637            }
638            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
639            if !self.cluster_id.is_none() {
640                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
641                if computed_size > std::u32::MAX as usize {
642                    bail!(
643                        "Tagged field is too large to encode ({} bytes)",
644                        computed_size
645                    );
646                }
647                types::UnsignedVarInt.encode(buf, 0)?;
648                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
649                types::CompactString.encode(buf, &self.cluster_id)?;
650            }
651            if version >= 15 {
652                if &self.replica_state != &Default::default() {
653                    let computed_size =
654                        types::Struct { version }.compute_size(&self.replica_state)?;
655                    if computed_size > std::u32::MAX as usize {
656                        bail!(
657                            "Tagged field is too large to encode ({} bytes)",
658                            computed_size
659                        );
660                    }
661                    types::UnsignedVarInt.encode(buf, 1)?;
662                    types::UnsignedVarInt.encode(buf, computed_size as u32)?;
663                    types::Struct { version }.encode(buf, &self.replica_state)?;
664                }
665            }
666            write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
667        }
668        Ok(())
669    }
670    fn compute_size(&self, version: i16) -> Result<usize> {
671        let mut total_size = 0;
672        if version <= 14 {
673            total_size += types::Int32.compute_size(&self.replica_id)?;
674        } else {
675            if self.replica_id != -1 {
676                bail!("A field is set that is not available on the selected protocol version");
677            }
678        }
679        total_size += types::Int32.compute_size(&self.max_wait_ms)?;
680        total_size += types::Int32.compute_size(&self.min_bytes)?;
681        total_size += types::Int32.compute_size(&self.max_bytes)?;
682        total_size += types::Int8.compute_size(&self.isolation_level)?;
683        if version >= 7 {
684            total_size += types::Int32.compute_size(&self.session_id)?;
685        }
686        if version >= 7 {
687            total_size += types::Int32.compute_size(&self.session_epoch)?;
688        }
689        if version >= 12 {
690            total_size +=
691                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
692        } else {
693            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
694        }
695        if version >= 7 {
696            if version >= 12 {
697                total_size += types::CompactArray(types::Struct { version })
698                    .compute_size(&self.forgotten_topics_data)?;
699            } else {
700                total_size += types::Array(types::Struct { version })
701                    .compute_size(&self.forgotten_topics_data)?;
702            }
703        } else {
704            if !self.forgotten_topics_data.is_empty() {
705                bail!("A field is set that is not available on the selected protocol version");
706            }
707        }
708        if version >= 11 {
709            if version >= 12 {
710                total_size += types::CompactString.compute_size(&self.rack_id)?;
711            } else {
712                total_size += types::String.compute_size(&self.rack_id)?;
713            }
714        }
715        if version >= 12 {
716            let mut num_tagged_fields = self.unknown_tagged_fields.len();
717            if !self.cluster_id.is_none() {
718                num_tagged_fields += 1;
719            }
720            if version >= 15 {
721                if &self.replica_state != &Default::default() {
722                    num_tagged_fields += 1;
723                }
724            }
725            if num_tagged_fields > std::u32::MAX as usize {
726                bail!(
727                    "Too many tagged fields to encode ({} fields)",
728                    num_tagged_fields
729                );
730            }
731            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
732            if !self.cluster_id.is_none() {
733                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
734                if computed_size > std::u32::MAX as usize {
735                    bail!(
736                        "Tagged field is too large to encode ({} bytes)",
737                        computed_size
738                    );
739                }
740                total_size += types::UnsignedVarInt.compute_size(0)?;
741                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
742                total_size += computed_size;
743            }
744            if version >= 15 {
745                if &self.replica_state != &Default::default() {
746                    let computed_size =
747                        types::Struct { version }.compute_size(&self.replica_state)?;
748                    if computed_size > std::u32::MAX as usize {
749                        bail!(
750                            "Tagged field is too large to encode ({} bytes)",
751                            computed_size
752                        );
753                    }
754                    total_size += types::UnsignedVarInt.compute_size(1)?;
755                    total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
756                    total_size += computed_size;
757                }
758            }
759            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
760        }
761        Ok(total_size)
762    }
763}
764
765#[cfg(feature = "broker")]
766impl Decodable for FetchRequest {
767    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
768        if version < 4 || version > 18 {
769            bail!("specified version not supported by this message type");
770        }
771        let mut cluster_id = None;
772        let replica_id = if version <= 14 {
773            types::Int32.decode(buf)?
774        } else {
775            (-1).into()
776        };
777        let mut replica_state = Default::default();
778        let max_wait_ms = types::Int32.decode(buf)?;
779        let min_bytes = types::Int32.decode(buf)?;
780        let max_bytes = types::Int32.decode(buf)?;
781        let isolation_level = types::Int8.decode(buf)?;
782        let session_id = if version >= 7 {
783            types::Int32.decode(buf)?
784        } else {
785            0
786        };
787        let session_epoch = if version >= 7 {
788            types::Int32.decode(buf)?
789        } else {
790            -1
791        };
792        let topics = if version >= 12 {
793            types::CompactArray(types::Struct { version }).decode(buf)?
794        } else {
795            types::Array(types::Struct { version }).decode(buf)?
796        };
797        let forgotten_topics_data = if version >= 7 {
798            if version >= 12 {
799                types::CompactArray(types::Struct { version }).decode(buf)?
800            } else {
801                types::Array(types::Struct { version }).decode(buf)?
802            }
803        } else {
804            Default::default()
805        };
806        let rack_id = if version >= 11 {
807            if version >= 12 {
808                types::CompactString.decode(buf)?
809            } else {
810                types::String.decode(buf)?
811            }
812        } else {
813            StrBytes::from_static_str("")
814        };
815        let mut unknown_tagged_fields = BTreeMap::new();
816        if version >= 12 {
817            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
818            for _ in 0..num_tagged_fields {
819                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
820                let size: u32 = types::UnsignedVarInt.decode(buf)?;
821                match tag {
822                    0 => {
823                        cluster_id = types::CompactString.decode(buf)?;
824                    }
825                    1 => {
826                        if version >= 15 {
827                            replica_state = types::Struct { version }.decode(buf)?;
828                        } else {
829                            bail!("Tag {} is not valid for version {}", tag, version);
830                        }
831                    }
832                    _ => {
833                        let unknown_value = buf.try_get_bytes(size as usize)?;
834                        unknown_tagged_fields.insert(tag as i32, unknown_value);
835                    }
836                }
837            }
838        }
839        Ok(Self {
840            cluster_id,
841            replica_id,
842            replica_state,
843            max_wait_ms,
844            min_bytes,
845            max_bytes,
846            isolation_level,
847            session_id,
848            session_epoch,
849            topics,
850            forgotten_topics_data,
851            rack_id,
852            unknown_tagged_fields,
853        })
854    }
855}
856
857impl Default for FetchRequest {
858    fn default() -> Self {
859        Self {
860            cluster_id: None,
861            replica_id: (-1).into(),
862            replica_state: Default::default(),
863            max_wait_ms: 0,
864            min_bytes: 0,
865            max_bytes: 0x7fffffff,
866            isolation_level: 0,
867            session_id: 0,
868            session_epoch: -1,
869            topics: Default::default(),
870            forgotten_topics_data: Default::default(),
871            rack_id: StrBytes::from_static_str(""),
872            unknown_tagged_fields: BTreeMap::new(),
873        }
874    }
875}
876
877impl Message for FetchRequest {
878    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
879    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
880}
881
882/// Valid versions: 4-18
883#[non_exhaustive]
884#[derive(Debug, Clone, PartialEq)]
885pub struct FetchTopic {
886    /// The name of the topic to fetch.
887    ///
888    /// Supported API versions: 4-12
889    pub topic: super::TopicName,
890
891    /// The unique topic ID.
892    ///
893    /// Supported API versions: 13-18
894    pub topic_id: Uuid,
895
896    /// The partitions to fetch.
897    ///
898    /// Supported API versions: 4-18
899    pub partitions: Vec<FetchPartition>,
900
901    /// Other tagged fields
902    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
903}
904
905impl FetchTopic {
906    /// Sets `topic` to the passed value.
907    ///
908    /// The name of the topic to fetch.
909    ///
910    /// Supported API versions: 4-12
911    pub fn with_topic(mut self, value: super::TopicName) -> Self {
912        self.topic = value;
913        self
914    }
915    /// Sets `topic_id` to the passed value.
916    ///
917    /// The unique topic ID.
918    ///
919    /// Supported API versions: 13-18
920    pub fn with_topic_id(mut self, value: Uuid) -> Self {
921        self.topic_id = value;
922        self
923    }
924    /// Sets `partitions` to the passed value.
925    ///
926    /// The partitions to fetch.
927    ///
928    /// Supported API versions: 4-18
929    pub fn with_partitions(mut self, value: Vec<FetchPartition>) -> Self {
930        self.partitions = value;
931        self
932    }
933    /// Sets unknown_tagged_fields to the passed value.
934    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
935        self.unknown_tagged_fields = value;
936        self
937    }
938    /// Inserts an entry into unknown_tagged_fields.
939    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
940        self.unknown_tagged_fields.insert(key, value);
941        self
942    }
943}
944
945#[cfg(feature = "client")]
946impl Encodable for FetchTopic {
947    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
948        if version < 4 || version > 18 {
949            bail!("specified version not supported by this message type");
950        }
951        if version <= 12 {
952            if version >= 12 {
953                types::CompactString.encode(buf, &self.topic)?;
954            } else {
955                types::String.encode(buf, &self.topic)?;
956            }
957        }
958        if version >= 13 {
959            types::Uuid.encode(buf, &self.topic_id)?;
960        }
961        if version >= 12 {
962            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
963        } else {
964            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
965        }
966        if version >= 12 {
967            let num_tagged_fields = self.unknown_tagged_fields.len();
968            if num_tagged_fields > std::u32::MAX as usize {
969                bail!(
970                    "Too many tagged fields to encode ({} fields)",
971                    num_tagged_fields
972                );
973            }
974            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
975
976            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
977        }
978        Ok(())
979    }
980    fn compute_size(&self, version: i16) -> Result<usize> {
981        let mut total_size = 0;
982        if version <= 12 {
983            if version >= 12 {
984                total_size += types::CompactString.compute_size(&self.topic)?;
985            } else {
986                total_size += types::String.compute_size(&self.topic)?;
987            }
988        }
989        if version >= 13 {
990            total_size += types::Uuid.compute_size(&self.topic_id)?;
991        }
992        if version >= 12 {
993            total_size +=
994                types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
995        } else {
996            total_size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
997        }
998        if version >= 12 {
999            let num_tagged_fields = self.unknown_tagged_fields.len();
1000            if num_tagged_fields > std::u32::MAX as usize {
1001                bail!(
1002                    "Too many tagged fields to encode ({} fields)",
1003                    num_tagged_fields
1004                );
1005            }
1006            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1007
1008            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1009        }
1010        Ok(total_size)
1011    }
1012}
1013
1014#[cfg(feature = "broker")]
1015impl Decodable for FetchTopic {
1016    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1017        if version < 4 || version > 18 {
1018            bail!("specified version not supported by this message type");
1019        }
1020        let topic = if version <= 12 {
1021            if version >= 12 {
1022                types::CompactString.decode(buf)?
1023            } else {
1024                types::String.decode(buf)?
1025            }
1026        } else {
1027            Default::default()
1028        };
1029        let topic_id = if version >= 13 {
1030            types::Uuid.decode(buf)?
1031        } else {
1032            Uuid::nil()
1033        };
1034        let partitions = if version >= 12 {
1035            types::CompactArray(types::Struct { version }).decode(buf)?
1036        } else {
1037            types::Array(types::Struct { version }).decode(buf)?
1038        };
1039        let mut unknown_tagged_fields = BTreeMap::new();
1040        if version >= 12 {
1041            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1042            for _ in 0..num_tagged_fields {
1043                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1044                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1045                let unknown_value = buf.try_get_bytes(size as usize)?;
1046                unknown_tagged_fields.insert(tag as i32, unknown_value);
1047            }
1048        }
1049        Ok(Self {
1050            topic,
1051            topic_id,
1052            partitions,
1053            unknown_tagged_fields,
1054        })
1055    }
1056}
1057
1058impl Default for FetchTopic {
1059    fn default() -> Self {
1060        Self {
1061            topic: Default::default(),
1062            topic_id: Uuid::nil(),
1063            partitions: Default::default(),
1064            unknown_tagged_fields: BTreeMap::new(),
1065        }
1066    }
1067}
1068
1069impl Message for FetchTopic {
1070    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1071    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1072}
1073
1074/// Valid versions: 4-18
1075#[non_exhaustive]
1076#[derive(Debug, Clone, PartialEq)]
1077pub struct ForgottenTopic {
1078    /// The topic name.
1079    ///
1080    /// Supported API versions: 7-12
1081    pub topic: super::TopicName,
1082
1083    /// The unique topic ID.
1084    ///
1085    /// Supported API versions: 13-18
1086    pub topic_id: Uuid,
1087
1088    /// The partitions indexes to forget.
1089    ///
1090    /// Supported API versions: 7-18
1091    pub partitions: Vec<i32>,
1092
1093    /// Other tagged fields
1094    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1095}
1096
1097impl ForgottenTopic {
1098    /// Sets `topic` to the passed value.
1099    ///
1100    /// The topic name.
1101    ///
1102    /// Supported API versions: 7-12
1103    pub fn with_topic(mut self, value: super::TopicName) -> Self {
1104        self.topic = value;
1105        self
1106    }
1107    /// Sets `topic_id` to the passed value.
1108    ///
1109    /// The unique topic ID.
1110    ///
1111    /// Supported API versions: 13-18
1112    pub fn with_topic_id(mut self, value: Uuid) -> Self {
1113        self.topic_id = value;
1114        self
1115    }
1116    /// Sets `partitions` to the passed value.
1117    ///
1118    /// The partitions indexes to forget.
1119    ///
1120    /// Supported API versions: 7-18
1121    pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
1122        self.partitions = value;
1123        self
1124    }
1125    /// Sets unknown_tagged_fields to the passed value.
1126    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1127        self.unknown_tagged_fields = value;
1128        self
1129    }
1130    /// Inserts an entry into unknown_tagged_fields.
1131    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1132        self.unknown_tagged_fields.insert(key, value);
1133        self
1134    }
1135}
1136
1137#[cfg(feature = "client")]
1138impl Encodable for ForgottenTopic {
1139    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1140        if version < 4 || version > 18 {
1141            bail!("specified version not supported by this message type");
1142        }
1143        if version >= 7 && version <= 12 {
1144            if version >= 12 {
1145                types::CompactString.encode(buf, &self.topic)?;
1146            } else {
1147                types::String.encode(buf, &self.topic)?;
1148            }
1149        }
1150        if version >= 13 {
1151            types::Uuid.encode(buf, &self.topic_id)?;
1152        }
1153        if version >= 7 {
1154            if version >= 12 {
1155                types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
1156            } else {
1157                types::Array(types::Int32).encode(buf, &self.partitions)?;
1158            }
1159        } else {
1160            if !self.partitions.is_empty() {
1161                bail!("A field is set that is not available on the selected protocol version");
1162            }
1163        }
1164        if version >= 12 {
1165            let num_tagged_fields = self.unknown_tagged_fields.len();
1166            if num_tagged_fields > std::u32::MAX as usize {
1167                bail!(
1168                    "Too many tagged fields to encode ({} fields)",
1169                    num_tagged_fields
1170                );
1171            }
1172            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1173
1174            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1175        }
1176        Ok(())
1177    }
1178    fn compute_size(&self, version: i16) -> Result<usize> {
1179        let mut total_size = 0;
1180        if version >= 7 && version <= 12 {
1181            if version >= 12 {
1182                total_size += types::CompactString.compute_size(&self.topic)?;
1183            } else {
1184                total_size += types::String.compute_size(&self.topic)?;
1185            }
1186        }
1187        if version >= 13 {
1188            total_size += types::Uuid.compute_size(&self.topic_id)?;
1189        }
1190        if version >= 7 {
1191            if version >= 12 {
1192                total_size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
1193            } else {
1194                total_size += types::Array(types::Int32).compute_size(&self.partitions)?;
1195            }
1196        } else {
1197            if !self.partitions.is_empty() {
1198                bail!("A field is set that is not available on the selected protocol version");
1199            }
1200        }
1201        if version >= 12 {
1202            let num_tagged_fields = self.unknown_tagged_fields.len();
1203            if num_tagged_fields > std::u32::MAX as usize {
1204                bail!(
1205                    "Too many tagged fields to encode ({} fields)",
1206                    num_tagged_fields
1207                );
1208            }
1209            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1210
1211            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1212        }
1213        Ok(total_size)
1214    }
1215}
1216
1217#[cfg(feature = "broker")]
1218impl Decodable for ForgottenTopic {
1219    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1220        if version < 4 || version > 18 {
1221            bail!("specified version not supported by this message type");
1222        }
1223        let topic = if version >= 7 && version <= 12 {
1224            if version >= 12 {
1225                types::CompactString.decode(buf)?
1226            } else {
1227                types::String.decode(buf)?
1228            }
1229        } else {
1230            Default::default()
1231        };
1232        let topic_id = if version >= 13 {
1233            types::Uuid.decode(buf)?
1234        } else {
1235            Uuid::nil()
1236        };
1237        let partitions = if version >= 7 {
1238            if version >= 12 {
1239                types::CompactArray(types::Int32).decode(buf)?
1240            } else {
1241                types::Array(types::Int32).decode(buf)?
1242            }
1243        } else {
1244            Default::default()
1245        };
1246        let mut unknown_tagged_fields = BTreeMap::new();
1247        if version >= 12 {
1248            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1249            for _ in 0..num_tagged_fields {
1250                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1251                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1252                let unknown_value = buf.try_get_bytes(size as usize)?;
1253                unknown_tagged_fields.insert(tag as i32, unknown_value);
1254            }
1255        }
1256        Ok(Self {
1257            topic,
1258            topic_id,
1259            partitions,
1260            unknown_tagged_fields,
1261        })
1262    }
1263}
1264
1265impl Default for ForgottenTopic {
1266    fn default() -> Self {
1267        Self {
1268            topic: Default::default(),
1269            topic_id: Uuid::nil(),
1270            partitions: Default::default(),
1271            unknown_tagged_fields: BTreeMap::new(),
1272        }
1273    }
1274}
1275
1276impl Message for ForgottenTopic {
1277    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1278    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1279}
1280
1281/// Valid versions: 4-18
1282#[non_exhaustive]
1283#[derive(Debug, Clone, PartialEq)]
1284pub struct ReplicaState {
1285    /// The replica ID of the follower, or -1 if this request is from a consumer.
1286    ///
1287    /// Supported API versions: 15-18
1288    pub replica_id: super::BrokerId,
1289
1290    /// The epoch of this follower, or -1 if not available.
1291    ///
1292    /// Supported API versions: 15-18
1293    pub replica_epoch: i64,
1294
1295    /// Other tagged fields
1296    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1297}
1298
1299impl ReplicaState {
1300    /// Sets `replica_id` to the passed value.
1301    ///
1302    /// The replica ID of the follower, or -1 if this request is from a consumer.
1303    ///
1304    /// Supported API versions: 15-18
1305    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
1306        self.replica_id = value;
1307        self
1308    }
1309    /// Sets `replica_epoch` to the passed value.
1310    ///
1311    /// The epoch of this follower, or -1 if not available.
1312    ///
1313    /// Supported API versions: 15-18
1314    pub fn with_replica_epoch(mut self, value: i64) -> Self {
1315        self.replica_epoch = value;
1316        self
1317    }
1318    /// Sets unknown_tagged_fields to the passed value.
1319    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1320        self.unknown_tagged_fields = value;
1321        self
1322    }
1323    /// Inserts an entry into unknown_tagged_fields.
1324    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1325        self.unknown_tagged_fields.insert(key, value);
1326        self
1327    }
1328}
1329
1330#[cfg(feature = "client")]
1331impl Encodable for ReplicaState {
1332    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1333        if version < 4 || version > 18 {
1334            bail!("specified version not supported by this message type");
1335        }
1336        if version >= 15 {
1337            types::Int32.encode(buf, &self.replica_id)?;
1338        } else {
1339            if self.replica_id != -1 {
1340                bail!("A field is set that is not available on the selected protocol version");
1341            }
1342        }
1343        if version >= 15 {
1344            types::Int64.encode(buf, &self.replica_epoch)?;
1345        } else {
1346            if self.replica_epoch != -1 {
1347                bail!("A field is set that is not available on the selected protocol version");
1348            }
1349        }
1350        if version >= 12 {
1351            let num_tagged_fields = self.unknown_tagged_fields.len();
1352            if num_tagged_fields > std::u32::MAX as usize {
1353                bail!(
1354                    "Too many tagged fields to encode ({} fields)",
1355                    num_tagged_fields
1356                );
1357            }
1358            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1359
1360            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1361        }
1362        Ok(())
1363    }
1364    fn compute_size(&self, version: i16) -> Result<usize> {
1365        let mut total_size = 0;
1366        if version >= 15 {
1367            total_size += types::Int32.compute_size(&self.replica_id)?;
1368        } else {
1369            if self.replica_id != -1 {
1370                bail!("A field is set that is not available on the selected protocol version");
1371            }
1372        }
1373        if version >= 15 {
1374            total_size += types::Int64.compute_size(&self.replica_epoch)?;
1375        } else {
1376            if self.replica_epoch != -1 {
1377                bail!("A field is set that is not available on the selected protocol version");
1378            }
1379        }
1380        if version >= 12 {
1381            let num_tagged_fields = self.unknown_tagged_fields.len();
1382            if num_tagged_fields > std::u32::MAX as usize {
1383                bail!(
1384                    "Too many tagged fields to encode ({} fields)",
1385                    num_tagged_fields
1386                );
1387            }
1388            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1389
1390            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1391        }
1392        Ok(total_size)
1393    }
1394}
1395
1396#[cfg(feature = "broker")]
1397impl Decodable for ReplicaState {
1398    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1399        if version < 4 || version > 18 {
1400            bail!("specified version not supported by this message type");
1401        }
1402        let replica_id = if version >= 15 {
1403            types::Int32.decode(buf)?
1404        } else {
1405            (-1).into()
1406        };
1407        let replica_epoch = if version >= 15 {
1408            types::Int64.decode(buf)?
1409        } else {
1410            -1
1411        };
1412        let mut unknown_tagged_fields = BTreeMap::new();
1413        if version >= 12 {
1414            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1415            for _ in 0..num_tagged_fields {
1416                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1417                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1418                let unknown_value = buf.try_get_bytes(size as usize)?;
1419                unknown_tagged_fields.insert(tag as i32, unknown_value);
1420            }
1421        }
1422        Ok(Self {
1423            replica_id,
1424            replica_epoch,
1425            unknown_tagged_fields,
1426        })
1427    }
1428}
1429
1430impl Default for ReplicaState {
1431    fn default() -> Self {
1432        Self {
1433            replica_id: (-1).into(),
1434            replica_epoch: -1,
1435            unknown_tagged_fields: BTreeMap::new(),
1436        }
1437    }
1438}
1439
1440impl Message for ReplicaState {
1441    const VERSIONS: VersionRange = VersionRange { min: 4, max: 18 };
1442    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1443}
1444
1445impl HeaderVersion for FetchRequest {
1446    fn header_version(version: i16) -> i16 {
1447        if version >= 12 {
1448            2
1449        } else {
1450            1
1451        }
1452    }
1453}