kafka_protocol/messages/
fetch_request.rs

1//! FetchRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-16
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchPartition {
24    /// The partition index.
25    ///
26    /// Supported API versions: 0-16
27    pub partition: i32,
28
29    /// The current leader epoch of the partition.
30    ///
31    /// Supported API versions: 9-16
32    pub current_leader_epoch: i32,
33
34    /// The message offset.
35    ///
36    /// Supported API versions: 0-16
37    pub fetch_offset: i64,
38
39    /// The epoch of the last fetched record or -1 if there is none
40    ///
41    /// Supported API versions: 12-16
42    pub last_fetched_epoch: i32,
43
44    /// The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
45    ///
46    /// Supported API versions: 5-16
47    pub log_start_offset: i64,
48
49    /// The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
50    ///
51    /// Supported API versions: 0-16
52    pub partition_max_bytes: i32,
53
54    /// Other tagged fields
55    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl FetchPartition {
59    /// Sets `partition` to the passed value.
60    ///
61    /// The partition index.
62    ///
63    /// Supported API versions: 0-16
64    pub fn with_partition(mut self, value: i32) -> Self {
65        self.partition = value;
66        self
67    }
68    /// Sets `current_leader_epoch` to the passed value.
69    ///
70    /// The current leader epoch of the partition.
71    ///
72    /// Supported API versions: 9-16
73    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
74        self.current_leader_epoch = value;
75        self
76    }
77    /// Sets `fetch_offset` to the passed value.
78    ///
79    /// The message offset.
80    ///
81    /// Supported API versions: 0-16
82    pub fn with_fetch_offset(mut self, value: i64) -> Self {
83        self.fetch_offset = value;
84        self
85    }
86    /// Sets `last_fetched_epoch` to the passed value.
87    ///
88    /// The epoch of the last fetched record or -1 if there is none
89    ///
90    /// Supported API versions: 12-16
91    pub fn with_last_fetched_epoch(mut self, value: i32) -> Self {
92        self.last_fetched_epoch = value;
93        self
94    }
95    /// Sets `log_start_offset` to the passed value.
96    ///
97    /// The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
98    ///
99    /// Supported API versions: 5-16
100    pub fn with_log_start_offset(mut self, value: i64) -> Self {
101        self.log_start_offset = value;
102        self
103    }
104    /// Sets `partition_max_bytes` to the passed value.
105    ///
106    /// The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
107    ///
108    /// Supported API versions: 0-16
109    pub fn with_partition_max_bytes(mut self, value: i32) -> Self {
110        self.partition_max_bytes = value;
111        self
112    }
113    /// Sets unknown_tagged_fields to the passed value.
114    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115        self.unknown_tagged_fields = value;
116        self
117    }
118    /// Inserts an entry into unknown_tagged_fields.
119    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120        self.unknown_tagged_fields.insert(key, value);
121        self
122    }
123}
124
#[cfg(feature = "client")]
impl Encodable for FetchPartition {
    /// Writes this partition entry to `buf` in the layout of `version`.
    ///
    /// Fields are emitted in wire order; a field is left out when `version`
    /// is older than the version that introduced it. From version 12 the
    /// entry ends with a tagged-field section holding `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if `last_fetched_epoch` is not -1 while `version` is below 12,
    /// if there are more tagged fields than fit in a `u32`, or if any
    /// underlying encoder fails. Bytes written before the failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition)?;
        if version >= 9 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.fetch_offset)?;
        if version >= 12 {
            types::Int32.encode(buf, &self.last_fetched_epoch)?;
        } else if self.last_fetched_epoch != -1 {
            // A non-default value would be silently lost on this version.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        types::Int32.encode(buf, &self.partition_max_bytes)?;

        // Versions before 12 carry no tagged-field section.
        if version < 12 {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition)?;
        if version >= 9 {
            size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        size += types::Int64.compute_size(&self.fetch_offset)?;
        if version >= 12 {
            size += types::Int32.compute_size(&self.last_fetched_epoch)?;
        } else if self.last_fetched_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 5 {
            size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        size += types::Int32.compute_size(&self.partition_max_bytes)?;

        // Versions before 12 carry no tagged-field section.
        if version < 12 {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
191
#[cfg(feature = "broker")]
impl Decodable for FetchPartition {
    /// Reads one partition entry from `buf` using the layout of `version`.
    ///
    /// Fields that `version` does not carry take their fallback value (-1 for
    /// `current_leader_epoch`, `last_fetched_epoch` and `log_start_offset`).
    /// From version 12 every tagged field is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if any underlying decoder fails or a tagged-field payload cannot
    /// be taken from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Field initialisers run in the order written, which is the wire order.
        Ok(Self {
            partition: types::Int32.decode(buf)?,
            current_leader_epoch: if version >= 9 {
                types::Int32.decode(buf)?
            } else {
                -1
            },
            fetch_offset: types::Int64.decode(buf)?,
            last_fetched_epoch: if version >= 12 {
                types::Int32.decode(buf)?
            } else {
                -1
            },
            log_start_offset: if version >= 5 {
                types::Int64.decode(buf)?
            } else {
                -1
            },
            partition_max_bytes: types::Int32.decode(buf)?,
            unknown_tagged_fields: {
                let mut unknown = BTreeMap::new();
                if version >= 12 {
                    let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
                    for _ in 0..tagged_count {
                        let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                        let len: u32 = types::UnsignedVarInt.decode(buf)?;
                        let payload = buf.try_get_bytes(len as usize)?;
                        unknown.insert(tag as i32, payload);
                    }
                }
                unknown
            },
        })
    }
}
234
235impl Default for FetchPartition {
236    fn default() -> Self {
237        Self {
238            partition: 0,
239            current_leader_epoch: -1,
240            fetch_offset: 0,
241            last_fetched_epoch: -1,
242            log_start_offset: -1,
243            partition_max_bytes: 0,
244            unknown_tagged_fields: BTreeMap::new(),
245        }
246    }
247}
248
249impl Message for FetchPartition {
250    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
251    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
252}
253
254/// Valid versions: 0-16
255#[non_exhaustive]
256#[derive(Debug, Clone, PartialEq)]
257pub struct FetchRequest {
258    /// The clusterId if known. This is used to validate metadata fetches prior to broker registration.
259    ///
260    /// Supported API versions: 12-16
261    pub cluster_id: Option<StrBytes>,
262
263    /// The broker ID of the follower, of -1 if this request is from a consumer.
264    ///
265    /// Supported API versions: 0-14
266    pub replica_id: super::BrokerId,
267
268    ///
269    ///
270    /// Supported API versions: 15-16
271    pub replica_state: ReplicaState,
272
273    /// The maximum time in milliseconds to wait for the response.
274    ///
275    /// Supported API versions: 0-16
276    pub max_wait_ms: i32,
277
278    /// The minimum bytes to accumulate in the response.
279    ///
280    /// Supported API versions: 0-16
281    pub min_bytes: i32,
282
283    /// The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
284    ///
285    /// Supported API versions: 3-16
286    pub max_bytes: i32,
287
288    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
289    ///
290    /// Supported API versions: 4-16
291    pub isolation_level: i8,
292
293    /// The fetch session ID.
294    ///
295    /// Supported API versions: 7-16
296    pub session_id: i32,
297
298    /// The fetch session epoch, which is used for ordering requests in a session.
299    ///
300    /// Supported API versions: 7-16
301    pub session_epoch: i32,
302
303    /// The topics to fetch.
304    ///
305    /// Supported API versions: 0-16
306    pub topics: Vec<FetchTopic>,
307
308    /// In an incremental fetch request, the partitions to remove.
309    ///
310    /// Supported API versions: 7-16
311    pub forgotten_topics_data: Vec<ForgottenTopic>,
312
313    /// Rack ID of the consumer making this request
314    ///
315    /// Supported API versions: 11-16
316    pub rack_id: StrBytes,
317
318    /// Other tagged fields
319    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
320}
321
322impl FetchRequest {
323    /// Sets `cluster_id` to the passed value.
324    ///
325    /// The clusterId if known. This is used to validate metadata fetches prior to broker registration.
326    ///
327    /// Supported API versions: 12-16
328    pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
329        self.cluster_id = value;
330        self
331    }
332    /// Sets `replica_id` to the passed value.
333    ///
334    /// The broker ID of the follower, of -1 if this request is from a consumer.
335    ///
336    /// Supported API versions: 0-14
337    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
338        self.replica_id = value;
339        self
340    }
341    /// Sets `replica_state` to the passed value.
342    ///
343    ///
344    ///
345    /// Supported API versions: 15-16
346    pub fn with_replica_state(mut self, value: ReplicaState) -> Self {
347        self.replica_state = value;
348        self
349    }
350    /// Sets `max_wait_ms` to the passed value.
351    ///
352    /// The maximum time in milliseconds to wait for the response.
353    ///
354    /// Supported API versions: 0-16
355    pub fn with_max_wait_ms(mut self, value: i32) -> Self {
356        self.max_wait_ms = value;
357        self
358    }
359    /// Sets `min_bytes` to the passed value.
360    ///
361    /// The minimum bytes to accumulate in the response.
362    ///
363    /// Supported API versions: 0-16
364    pub fn with_min_bytes(mut self, value: i32) -> Self {
365        self.min_bytes = value;
366        self
367    }
368    /// Sets `max_bytes` to the passed value.
369    ///
370    /// The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
371    ///
372    /// Supported API versions: 3-16
373    pub fn with_max_bytes(mut self, value: i32) -> Self {
374        self.max_bytes = value;
375        self
376    }
377    /// Sets `isolation_level` to the passed value.
378    ///
379    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
380    ///
381    /// Supported API versions: 4-16
382    pub fn with_isolation_level(mut self, value: i8) -> Self {
383        self.isolation_level = value;
384        self
385    }
386    /// Sets `session_id` to the passed value.
387    ///
388    /// The fetch session ID.
389    ///
390    /// Supported API versions: 7-16
391    pub fn with_session_id(mut self, value: i32) -> Self {
392        self.session_id = value;
393        self
394    }
395    /// Sets `session_epoch` to the passed value.
396    ///
397    /// The fetch session epoch, which is used for ordering requests in a session.
398    ///
399    /// Supported API versions: 7-16
400    pub fn with_session_epoch(mut self, value: i32) -> Self {
401        self.session_epoch = value;
402        self
403    }
404    /// Sets `topics` to the passed value.
405    ///
406    /// The topics to fetch.
407    ///
408    /// Supported API versions: 0-16
409    pub fn with_topics(mut self, value: Vec<FetchTopic>) -> Self {
410        self.topics = value;
411        self
412    }
413    /// Sets `forgotten_topics_data` to the passed value.
414    ///
415    /// In an incremental fetch request, the partitions to remove.
416    ///
417    /// Supported API versions: 7-16
418    pub fn with_forgotten_topics_data(mut self, value: Vec<ForgottenTopic>) -> Self {
419        self.forgotten_topics_data = value;
420        self
421    }
422    /// Sets `rack_id` to the passed value.
423    ///
424    /// Rack ID of the consumer making this request
425    ///
426    /// Supported API versions: 11-16
427    pub fn with_rack_id(mut self, value: StrBytes) -> Self {
428        self.rack_id = value;
429        self
430    }
431    /// Sets unknown_tagged_fields to the passed value.
432    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
433        self.unknown_tagged_fields = value;
434        self
435    }
436    /// Inserts an entry into unknown_tagged_fields.
437    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
438        self.unknown_tagged_fields.insert(key, value);
439        self
440    }
441}
442
#[cfg(feature = "client")]
impl Encodable for FetchRequest {
    /// Writes this request to `buf` in the layout of `version`.
    ///
    /// Fixed fields are emitted in wire order, each only when `version`
    /// carries it. From version 12 arrays and strings use their compact
    /// encodings and the message ends with a tagged-field section:
    /// tag 0 holds `cluster_id` (when `Some`), tag 1 holds `replica_state`
    /// (version 15+ and only when it differs from its default), followed by
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if `replica_id` is not -1 while `version` is above 14, if
    /// `forgotten_topics_data` is non-empty while `version` is below 7, if a
    /// tagged-field count or payload size does not fit in a `u32`, or if any
    /// underlying encoder fails. Bytes written before the failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version <= 14 {
            types::Int32.encode(buf, &self.replica_id)?;
        } else if self.replica_id != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.max_wait_ms)?;
        types::Int32.encode(buf, &self.min_bytes)?;
        if version >= 3 {
            types::Int32.encode(buf, &self.max_bytes)?;
        }
        if version >= 4 {
            types::Int8.encode(buf, &self.isolation_level)?;
        }
        if version >= 7 {
            types::Int32.encode(buf, &self.session_id)?;
            types::Int32.encode(buf, &self.session_epoch)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.forgotten_topics_data)?;
        } else if version >= 7 {
            types::Array(types::Struct { version }).encode(buf, &self.forgotten_topics_data)?;
        } else if !self.forgotten_topics_data.is_empty() {
            // The data would be silently dropped on this version.
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            types::CompactString.encode(buf, &self.rack_id)?;
        } else if version >= 11 {
            types::String.encode(buf, &self.rack_id)?;
        }
        if version >= 12 {
            // Decide once which known tagged fields are present so the count
            // written up front always matches the fields written below.
            let has_cluster_id = self.cluster_id.is_some();
            let has_replica_state =
                version >= 15 && self.replica_state != ReplicaState::default();
            let num_tagged_fields = self.unknown_tagged_fields.len()
                + usize::from(has_cluster_id)
                + usize::from(has_replica_state);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            if has_cluster_id {
                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tagged field layout: tag, payload size, payload.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::CompactString.encode(buf, &self.cluster_id)?;
            }
            if has_replica_state {
                let computed_size = types::Struct { version }.compute_size(&self.replica_state)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                types::UnsignedVarInt.encode(buf, 1)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::Struct { version }.encode(buf, &self.replica_state)?;
            }
            write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version <= 14 {
            total_size += types::Int32.compute_size(&self.replica_id)?;
        } else if self.replica_id != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        total_size += types::Int32.compute_size(&self.max_wait_ms)?;
        total_size += types::Int32.compute_size(&self.min_bytes)?;
        if version >= 3 {
            total_size += types::Int32.compute_size(&self.max_bytes)?;
        }
        if version >= 4 {
            total_size += types::Int8.compute_size(&self.isolation_level)?;
        }
        if version >= 7 {
            total_size += types::Int32.compute_size(&self.session_id)?;
            total_size += types::Int32.compute_size(&self.session_epoch)?;
        }
        if version >= 12 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        if version >= 12 {
            total_size += types::CompactArray(types::Struct { version })
                .compute_size(&self.forgotten_topics_data)?;
        } else if version >= 7 {
            total_size += types::Array(types::Struct { version })
                .compute_size(&self.forgotten_topics_data)?;
        } else if !self.forgotten_topics_data.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 12 {
            total_size += types::CompactString.compute_size(&self.rack_id)?;
        } else if version >= 11 {
            total_size += types::String.compute_size(&self.rack_id)?;
        }
        if version >= 12 {
            let has_cluster_id = self.cluster_id.is_some();
            let has_replica_state =
                version >= 15 && self.replica_state != ReplicaState::default();
            let num_tagged_fields = self.unknown_tagged_fields.len()
                + usize::from(has_cluster_id)
                + usize::from(has_replica_state);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            if has_cluster_id {
                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag, payload size, then the payload itself.
                total_size += types::UnsignedVarInt.compute_size(0)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            if has_replica_state {
                let computed_size = types::Struct { version }.compute_size(&self.replica_state)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                total_size += types::UnsignedVarInt.compute_size(1)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
637
#[cfg(feature = "broker")]
impl Decodable for FetchRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// Fields that `version` does not carry take their fallback value. From
    /// version 12 the trailing tagged-field section is read: tag 0 fills
    /// `cluster_id`, tag 1 fills `replica_state`, and every other tag is kept
    /// verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if tag 1 appears while `version` is below 15, if any underlying
    /// decoder fails, or if a tagged-field payload cannot be taken from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Fixed fields, in wire order.
        let replica_id = if version > 14 {
            (-1).into()
        } else {
            types::Int32.decode(buf)?
        };
        let max_wait_ms = types::Int32.decode(buf)?;
        let min_bytes = types::Int32.decode(buf)?;
        let max_bytes = if version < 3 {
            0x7fffffff
        } else {
            types::Int32.decode(buf)?
        };
        let isolation_level = if version < 4 {
            0
        } else {
            types::Int8.decode(buf)?
        };
        let session_id = if version < 7 {
            0
        } else {
            types::Int32.decode(buf)?
        };
        let session_epoch = if version < 7 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let topics = if version < 12 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };
        let forgotten_topics_data = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 7 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        let rack_id = if version >= 12 {
            types::CompactString.decode(buf)?
        } else if version >= 11 {
            types::String.decode(buf)?
        } else {
            StrBytes::from_static_str("")
        };

        // Fields that only ever arrive through the tagged-field section.
        let mut cluster_id = None;
        let mut replica_state = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    0 => cluster_id = types::CompactString.decode(buf)?,
                    1 if version >= 15 => {
                        replica_state = types::Struct { version }.decode(buf)?
                    }
                    1 => bail!("Tag {} is not valid for version {}", tag, version),
                    _ => {
                        let payload = buf.try_get_bytes(len as usize)?;
                        unknown_tagged_fields.insert(tag as i32, payload);
                    }
                }
            }
        }

        Ok(Self {
            cluster_id,
            replica_id,
            replica_state,
            max_wait_ms,
            min_bytes,
            max_bytes,
            isolation_level,
            session_id,
            session_epoch,
            topics,
            forgotten_topics_data,
            rack_id,
            unknown_tagged_fields,
        })
    }
}
734
735impl Default for FetchRequest {
736    fn default() -> Self {
737        Self {
738            cluster_id: None,
739            replica_id: (-1).into(),
740            replica_state: Default::default(),
741            max_wait_ms: 0,
742            min_bytes: 0,
743            max_bytes: 0x7fffffff,
744            isolation_level: 0,
745            session_id: 0,
746            session_epoch: -1,
747            topics: Default::default(),
748            forgotten_topics_data: Default::default(),
749            rack_id: StrBytes::from_static_str(""),
750            unknown_tagged_fields: BTreeMap::new(),
751        }
752    }
753}
754
755impl Message for FetchRequest {
756    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
757    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
758}
759
760/// Valid versions: 0-16
761#[non_exhaustive]
762#[derive(Debug, Clone, PartialEq)]
763pub struct FetchTopic {
764    /// The name of the topic to fetch.
765    ///
766    /// Supported API versions: 0-12
767    pub topic: super::TopicName,
768
769    /// The unique topic ID
770    ///
771    /// Supported API versions: 13-16
772    pub topic_id: Uuid,
773
774    /// The partitions to fetch.
775    ///
776    /// Supported API versions: 0-16
777    pub partitions: Vec<FetchPartition>,
778
779    /// Other tagged fields
780    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
781}
782
783impl FetchTopic {
784    /// Sets `topic` to the passed value.
785    ///
786    /// The name of the topic to fetch.
787    ///
788    /// Supported API versions: 0-12
789    pub fn with_topic(mut self, value: super::TopicName) -> Self {
790        self.topic = value;
791        self
792    }
793    /// Sets `topic_id` to the passed value.
794    ///
795    /// The unique topic ID
796    ///
797    /// Supported API versions: 13-16
798    pub fn with_topic_id(mut self, value: Uuid) -> Self {
799        self.topic_id = value;
800        self
801    }
802    /// Sets `partitions` to the passed value.
803    ///
804    /// The partitions to fetch.
805    ///
806    /// Supported API versions: 0-16
807    pub fn with_partitions(mut self, value: Vec<FetchPartition>) -> Self {
808        self.partitions = value;
809        self
810    }
811    /// Sets unknown_tagged_fields to the passed value.
812    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
813        self.unknown_tagged_fields = value;
814        self
815    }
816    /// Inserts an entry into unknown_tagged_fields.
817    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
818        self.unknown_tagged_fields.insert(key, value);
819        self
820    }
821}
822
#[cfg(feature = "client")]
impl Encodable for FetchTopic {
    /// Writes this topic entry to `buf` in the layout of `version`.
    ///
    /// The topic is identified by `topic_id` from version 13 and by `topic`
    /// (compact string on version 12, plain string before) otherwise. The
    /// partition array follows, and from version 12 a tagged-field section
    /// holding `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails if there are more tagged fields than fit in a `u32` or if any
    /// underlying encoder fails. Bytes written before the failure stay in `buf`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Exactly one of the topic id and the topic name goes on the wire.
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else {
            types::String.encode(buf, &self.topic)?;
        }

        // Versions before 12 end after the non-compact partition array.
        if version < 12 {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = if version >= 13 {
            types::Uuid.compute_size(&self.topic_id)?
        } else if version == 12 {
            types::CompactString.compute_size(&self.topic)?
        } else {
            types::String.compute_size(&self.topic)?
        };

        // Versions before 12 end after the non-compact partition array.
        if version < 12 {
            size += types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(size);
        }
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
888
#[cfg(feature = "broker")]
impl Decodable for FetchTopic {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// Fields that are not on the wire for `version` get their default
    /// value: the default topic name from version 13 on, and the nil UUID
    /// before version 13. From version 12 on, trailing tagged fields are
    /// collected into `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the field decoders or by reading a
    /// tagged field's payload from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Versions 12+ switch to compact encodings and carry tagged fields.
        let compact = version >= 12;

        let topic = match version {
            12 => types::CompactString.decode(buf)?,
            _ if version < 12 => types::String.decode(buf)?,
            _ => Default::default(),
        };

        let topic_id = if version < 13 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };

        let partitions = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if compact {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is: tag, payload length, then the raw payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
929
930impl Default for FetchTopic {
931    fn default() -> Self {
932        Self {
933            topic: Default::default(),
934            topic_id: Uuid::nil(),
935            partitions: Default::default(),
936            unknown_tagged_fields: BTreeMap::new(),
937        }
938    }
939}
940
impl Message for FetchTopic {
    /// Version range declared for this structure: 0 through 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    /// Version range declared as deprecated: 0 through 3.
    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
}
945
/// A topic entry listing the partition indexes to forget.
///
/// The entry carries the topic name (versions 7-12) or the topic ID
/// (versions 13-16), as noted on the individual fields.
///
/// Valid versions: 0-16
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ForgottenTopic {
    /// The topic name.
    ///
    /// Supported API versions: 7-12
    pub topic: super::TopicName,

    /// The unique topic ID.
    ///
    /// Supported API versions: 13-16
    pub topic_id: Uuid,

    /// The partition indexes to forget.
    ///
    /// Supported API versions: 7-16
    pub partitions: Vec<i32>,

    /// Tagged fields that were read from the wire but are not modelled by
    /// this struct, keyed by tag.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
968
969impl ForgottenTopic {
970    /// Sets `topic` to the passed value.
971    ///
972    /// The topic name.
973    ///
974    /// Supported API versions: 7-12
975    pub fn with_topic(mut self, value: super::TopicName) -> Self {
976        self.topic = value;
977        self
978    }
979    /// Sets `topic_id` to the passed value.
980    ///
981    /// The unique topic ID
982    ///
983    /// Supported API versions: 13-16
984    pub fn with_topic_id(mut self, value: Uuid) -> Self {
985        self.topic_id = value;
986        self
987    }
988    /// Sets `partitions` to the passed value.
989    ///
990    /// The partitions indexes to forget.
991    ///
992    /// Supported API versions: 7-16
993    pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
994        self.partitions = value;
995        self
996    }
997    /// Sets unknown_tagged_fields to the passed value.
998    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
999        self.unknown_tagged_fields = value;
1000        self
1001    }
1002    /// Inserts an entry into unknown_tagged_fields.
1003    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1004        self.unknown_tagged_fields.insert(key, value);
1005        self
1006    }
1007}
1008
#[cfg(feature = "client")]
impl Encodable for ForgottenTopic {
    /// Writes this entry into `buf` using the layout of `version`.
    ///
    /// The topic name is written for versions 7-12, the topic ID from
    /// version 13 on and the partition list from version 7 on. From version
    /// 12 on the compact encodings are used and the unknown tagged fields
    /// are appended.
    ///
    /// # Errors
    ///
    /// Fails when `partitions` is non-empty for a version below 7, when the
    /// number of unknown tagged fields exceeds `u32::MAX`, or when a field
    /// encoder reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Topic name: plain string for v7-v11, compact string at v12.
        match version {
            7..=11 => {
                types::String.encode(buf, &self.topic)?;
            }
            12 => {
                types::CompactString.encode(buf, &self.topic)?;
            }
            _ => {}
        }

        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        if version >= 12 {
            types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
        } else if version >= 7 {
            types::Array(types::Int32).encode(buf, &self.partitions)?;
        } else if !self.partitions.is_empty() {
            // Partitions cannot be represented before version 7.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged fields exist only from version 12 on.
        if version < 12 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Mirrors `encode` field by field, including both validation checks.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        match version {
            7..=11 => size += types::String.compute_size(&self.topic)?,
            12 => size += types::CompactString.compute_size(&self.topic)?,
            _ => {}
        }

        if version >= 13 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }

        if version >= 12 {
            size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
        } else if version >= 7 {
            size += types::Array(types::Int32).compute_size(&self.partitions)?;
        } else if !self.partitions.is_empty() {
            // Partitions cannot be represented before version 7.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged fields exist only from version 12 on.
        if version < 12 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1085
#[cfg(feature = "broker")]
impl Decodable for ForgottenTopic {
    /// Reads one entry from `buf` using the layout of `version`.
    ///
    /// Fields that are not on the wire for `version` get their default
    /// value: the default topic name outside versions 7-12, the nil UUID
    /// before version 13 and an empty partition list before version 7.
    /// From version 12 on, trailing tagged fields are collected into
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the field decoders or by reading a
    /// tagged field's payload from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let topic = match version {
            7..=11 => types::String.decode(buf)?,
            12 => types::CompactString.decode(buf)?,
            _ => Default::default(),
        };

        let topic_id = if version < 13 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };

        let partitions = if version >= 12 {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 7 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Default::default()
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is: tag, payload length, then the raw payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1130
1131impl Default for ForgottenTopic {
1132    fn default() -> Self {
1133        Self {
1134            topic: Default::default(),
1135            topic_id: Uuid::nil(),
1136            partitions: Default::default(),
1137            unknown_tagged_fields: BTreeMap::new(),
1138        }
1139    }
1140}
1141
impl Message for ForgottenTopic {
    /// Version range declared for this structure: 0 through 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    /// Version range declared as deprecated: 0 through 3.
    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
}
1146
/// The replica ID and replica epoch sent with a fetch request.
///
/// Both fields use `-1` as their "not set" value, as described on each
/// field.
///
/// Valid versions: 0-16
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicaState {
    /// The replica ID of the follower, or -1 if this request is from a consumer.
    ///
    /// Supported API versions: 15-16
    pub replica_id: super::BrokerId,

    /// The epoch of this follower, or -1 if not available.
    ///
    /// Supported API versions: 15-16
    pub replica_epoch: i64,

    /// Tagged fields that were read from the wire but are not modelled by
    /// this struct, keyed by tag.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1164
1165impl ReplicaState {
1166    /// Sets `replica_id` to the passed value.
1167    ///
1168    /// The replica ID of the follower, or -1 if this request is from a consumer.
1169    ///
1170    /// Supported API versions: 15-16
1171    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
1172        self.replica_id = value;
1173        self
1174    }
1175    /// Sets `replica_epoch` to the passed value.
1176    ///
1177    /// The epoch of this follower, or -1 if not available.
1178    ///
1179    /// Supported API versions: 15-16
1180    pub fn with_replica_epoch(mut self, value: i64) -> Self {
1181        self.replica_epoch = value;
1182        self
1183    }
1184    /// Sets unknown_tagged_fields to the passed value.
1185    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1186        self.unknown_tagged_fields = value;
1187        self
1188    }
1189    /// Inserts an entry into unknown_tagged_fields.
1190    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1191        self.unknown_tagged_fields.insert(key, value);
1192        self
1193    }
1194}
1195
#[cfg(feature = "client")]
impl Encodable for ReplicaState {
    /// Writes the replica state into `buf` using the layout of `version`.
    ///
    /// `replica_id` and `replica_epoch` are written from version 15 on.
    /// Unknown tagged fields are appended from version 12 on.
    ///
    /// # Errors
    ///
    /// Fails for versions below 15 when `replica_id` or `replica_epoch`
    /// differs from `-1`, when the number of unknown tagged fields exceeds
    /// `u32::MAX`, or when a field encoder reports an error.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 15 {
            types::Int32.encode(buf, &self.replica_id)?;
            types::Int64.encode(buf, &self.replica_epoch)?;
        } else if self.replica_id != -1 || self.replica_epoch != -1 {
            // Neither field is on the wire before version 15, so only the
            // -1 defaults can be represented.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged fields exist only from version 12 on.
        if version < 12 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Mirrors `encode` field by field, including both validation checks.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 15 {
            size += types::Int32.compute_size(&self.replica_id)?;
            size += types::Int64.compute_size(&self.replica_epoch)?;
        } else if self.replica_id != -1 || self.replica_epoch != -1 {
            // Neither field is on the wire before version 15, so only the
            // -1 defaults can be represented.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged fields exist only from version 12 on.
        if version < 12 {
            return Ok(size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1258
#[cfg(feature = "broker")]
impl Decodable for ReplicaState {
    /// Reads the replica state from `buf` using the layout of `version`.
    ///
    /// Before version 15 neither `replica_id` nor `replica_epoch` is read
    /// and both are set to `-1`. From version 12 on, trailing tagged fields
    /// are collected into `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the field decoders or by reading a
    /// tagged field's payload from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // The ID is read before the epoch, matching the encode order.
        let (replica_id, replica_epoch) = if version >= 15 {
            (types::Int32.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            ((-1).into(), -1)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is: tag, payload length, then the raw payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            replica_id,
            replica_epoch,
            unknown_tagged_fields,
        })
    }
}
1289
1290impl Default for ReplicaState {
1291    fn default() -> Self {
1292        Self {
1293            replica_id: (-1).into(),
1294            replica_epoch: -1,
1295            unknown_tagged_fields: BTreeMap::new(),
1296        }
1297    }
1298}
1299
impl Message for ReplicaState {
    /// Version range declared for this structure: 0 through 16.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 16 };
    /// Version range declared as deprecated: 0 through 3.
    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
}
1304
1305impl HeaderVersion for FetchRequest {
1306    fn header_version(version: i16) -> i16 {
1307        if version >= 12 {
1308            2
1309        } else {
1310            1
1311        }
1312    }
1313}