kafka_protocol/messages/
fetch_request.rs

1//! FetchRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-17
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchPartition {
24    /// The partition index.
25    ///
26    /// Supported API versions: 0-17
27    pub partition: i32,
28
29    /// The current leader epoch of the partition.
30    ///
31    /// Supported API versions: 9-17
32    pub current_leader_epoch: i32,
33
34    /// The message offset.
35    ///
36    /// Supported API versions: 0-17
37    pub fetch_offset: i64,
38
39    /// The epoch of the last fetched record or -1 if there is none
40    ///
41    /// Supported API versions: 12-17
42    pub last_fetched_epoch: i32,
43
44    /// The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
45    ///
46    /// Supported API versions: 5-17
47    pub log_start_offset: i64,
48
49    /// The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
50    ///
51    /// Supported API versions: 0-17
52    pub partition_max_bytes: i32,
53
54    /// The directory id of the follower fetching
55    ///
56    /// Supported API versions: 17
57    pub replica_directory_id: Uuid,
58
59    /// Other tagged fields
60    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
61}
62
63impl FetchPartition {
64    /// Sets `partition` to the passed value.
65    ///
66    /// The partition index.
67    ///
68    /// Supported API versions: 0-17
69    pub fn with_partition(mut self, value: i32) -> Self {
70        self.partition = value;
71        self
72    }
73    /// Sets `current_leader_epoch` to the passed value.
74    ///
75    /// The current leader epoch of the partition.
76    ///
77    /// Supported API versions: 9-17
78    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
79        self.current_leader_epoch = value;
80        self
81    }
82    /// Sets `fetch_offset` to the passed value.
83    ///
84    /// The message offset.
85    ///
86    /// Supported API versions: 0-17
87    pub fn with_fetch_offset(mut self, value: i64) -> Self {
88        self.fetch_offset = value;
89        self
90    }
91    /// Sets `last_fetched_epoch` to the passed value.
92    ///
93    /// The epoch of the last fetched record or -1 if there is none
94    ///
95    /// Supported API versions: 12-17
96    pub fn with_last_fetched_epoch(mut self, value: i32) -> Self {
97        self.last_fetched_epoch = value;
98        self
99    }
100    /// Sets `log_start_offset` to the passed value.
101    ///
102    /// The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower.
103    ///
104    /// Supported API versions: 5-17
105    pub fn with_log_start_offset(mut self, value: i64) -> Self {
106        self.log_start_offset = value;
107        self
108    }
109    /// Sets `partition_max_bytes` to the passed value.
110    ///
111    /// The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored.
112    ///
113    /// Supported API versions: 0-17
114    pub fn with_partition_max_bytes(mut self, value: i32) -> Self {
115        self.partition_max_bytes = value;
116        self
117    }
118    /// Sets `replica_directory_id` to the passed value.
119    ///
120    /// The directory id of the follower fetching
121    ///
122    /// Supported API versions: 17
123    pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
124        self.replica_directory_id = value;
125        self
126    }
127    /// Sets unknown_tagged_fields to the passed value.
128    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
129        self.unknown_tagged_fields = value;
130        self
131    }
132    /// Inserts an entry into unknown_tagged_fields.
133    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
134        self.unknown_tagged_fields.insert(key, value);
135        self
136    }
137}
138
#[cfg(feature = "client")]
impl Encodable for FetchPartition {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// Fields are emitted in protocol order; each version-gated field is written only when
    /// `version` is at least the version that introduced it. From version 12 onwards a tagged
    /// field section follows the fixed fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17`, when `last_fetched_epoch` is set (not -1) for a
    /// version below 12, when a tagged-field count or size does not fit in a `u32`, or when any
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.partition)?;
        if version >= 9 {
            types::Int32.encode(buf, &self.current_leader_epoch)?;
        }
        types::Int64.encode(buf, &self.fetch_offset)?;
        if version >= 12 {
            types::Int32.encode(buf, &self.last_fetched_epoch)?;
        } else if self.last_fetched_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        types::Int32.encode(buf, &self.partition_max_bytes)?;
        if version >= 12 {
            // Decided once so the announced field count and the fields actually written
            // cannot drift apart. A nil directory id is simply omitted.
            let has_directory_id = version >= 17 && !self.replica_directory_id.is_nil();
            let num_tagged_fields =
                self.unknown_tagged_fields.len() + usize::from(has_directory_id);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the value was just checked against u32::MAX.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            if has_directory_id {
                let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag 0, then the payload size, then the payload.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::Uuid.encode(buf, &self.replica_directory_id)?;
            }
            // Tag 0 is owned by `replica_directory_id`; unknown tags start at 1.
            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }
    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Mirrors `encode` field by field. Unlike `encode`, the supported version range is not
    /// checked here.
    ///
    /// # Errors
    ///
    /// Fails when `last_fetched_epoch` is set (not -1) for a version below 12, when a
    /// tagged-field count or size does not fit in a `u32`, or when any underlying size
    /// computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.partition)?;
        if version >= 9 {
            total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
        }
        total_size += types::Int64.compute_size(&self.fetch_offset)?;
        if version >= 12 {
            total_size += types::Int32.compute_size(&self.last_fetched_epoch)?;
        } else if self.last_fetched_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 5 {
            total_size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        total_size += types::Int32.compute_size(&self.partition_max_bytes)?;
        if version >= 12 {
            // Same presence rule as in `encode`.
            let has_directory_id = version >= 17 && !self.replica_directory_id.is_nil();
            let num_tagged_fields =
                self.unknown_tagged_fields.len() + usize::from(has_directory_id);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the value was just checked against u32::MAX.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            if has_directory_id {
                let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag, payload size, payload.
                total_size += types::UnsignedVarInt.compute_size(0)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
244
#[cfg(feature = "broker")]
impl Decodable for FetchPartition {
    /// Reads one partition entry from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry are filled with fixed fallbacks: -1 for
    /// `current_leader_epoch`, `last_fetched_epoch` and `log_start_offset`, and the nil UUID
    /// for `replica_directory_id`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17`, when tag 0 appears for a version below 17, or
    /// when any underlying decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let partition = types::Int32.decode(buf)?;
        let current_leader_epoch = if version < 9 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let fetch_offset = types::Int64.decode(buf)?;
        let last_fetched_epoch = if version < 12 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let log_start_offset = if version < 5 {
            -1
        } else {
            types::Int64.decode(buf)?
        };
        let partition_max_bytes = types::Int32.decode(buf)?;

        // Tagged section (version 12 and later): a count, then `tag, size, payload` triples.
        let mut replica_directory_id = Uuid::nil();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // NOTE(review): the declared `size` is not compared with the bytes the
                    // UUID decoder consumes.
                    0 if version >= 17 => replica_directory_id = types::Uuid.decode(buf)?,
                    0 => bail!("Tag {} is not valid for version {}", tag, version),
                    _ => {
                        // Unrecognized tag: keep the raw payload.
                        let raw_payload = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, raw_payload);
                    }
                }
            }
        }
        Ok(Self {
            partition,
            current_leader_epoch,
            fetch_offset,
            last_fetched_epoch,
            log_start_offset,
            partition_max_bytes,
            replica_directory_id,
            unknown_tagged_fields,
        })
    }
}
303
304impl Default for FetchPartition {
305    fn default() -> Self {
306        Self {
307            partition: 0,
308            current_leader_epoch: -1,
309            fetch_offset: 0,
310            last_fetched_epoch: -1,
311            log_start_offset: -1,
312            partition_max_bytes: 0,
313            replica_directory_id: Uuid::nil(),
314            unknown_tagged_fields: BTreeMap::new(),
315        }
316    }
317}
318
319impl Message for FetchPartition {
320    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
321    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
322}
323
324/// Valid versions: 0-17
325#[non_exhaustive]
326#[derive(Debug, Clone, PartialEq)]
327pub struct FetchRequest {
328    /// The clusterId if known. This is used to validate metadata fetches prior to broker registration.
329    ///
330    /// Supported API versions: 12-17
331    pub cluster_id: Option<StrBytes>,
332
333    /// The broker ID of the follower, of -1 if this request is from a consumer.
334    ///
335    /// Supported API versions: 0-14
336    pub replica_id: super::BrokerId,
337
338    ///
339    ///
340    /// Supported API versions: 15-17
341    pub replica_state: ReplicaState,
342
343    /// The maximum time in milliseconds to wait for the response.
344    ///
345    /// Supported API versions: 0-17
346    pub max_wait_ms: i32,
347
348    /// The minimum bytes to accumulate in the response.
349    ///
350    /// Supported API versions: 0-17
351    pub min_bytes: i32,
352
353    /// The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
354    ///
355    /// Supported API versions: 3-17
356    pub max_bytes: i32,
357
358    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
359    ///
360    /// Supported API versions: 4-17
361    pub isolation_level: i8,
362
363    /// The fetch session ID.
364    ///
365    /// Supported API versions: 7-17
366    pub session_id: i32,
367
368    /// The fetch session epoch, which is used for ordering requests in a session.
369    ///
370    /// Supported API versions: 7-17
371    pub session_epoch: i32,
372
373    /// The topics to fetch.
374    ///
375    /// Supported API versions: 0-17
376    pub topics: Vec<FetchTopic>,
377
378    /// In an incremental fetch request, the partitions to remove.
379    ///
380    /// Supported API versions: 7-17
381    pub forgotten_topics_data: Vec<ForgottenTopic>,
382
383    /// Rack ID of the consumer making this request
384    ///
385    /// Supported API versions: 11-17
386    pub rack_id: StrBytes,
387
388    /// Other tagged fields
389    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
390}
391
392impl FetchRequest {
393    /// Sets `cluster_id` to the passed value.
394    ///
395    /// The clusterId if known. This is used to validate metadata fetches prior to broker registration.
396    ///
397    /// Supported API versions: 12-17
398    pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
399        self.cluster_id = value;
400        self
401    }
402    /// Sets `replica_id` to the passed value.
403    ///
404    /// The broker ID of the follower, of -1 if this request is from a consumer.
405    ///
406    /// Supported API versions: 0-14
407    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
408        self.replica_id = value;
409        self
410    }
411    /// Sets `replica_state` to the passed value.
412    ///
413    ///
414    ///
415    /// Supported API versions: 15-17
416    pub fn with_replica_state(mut self, value: ReplicaState) -> Self {
417        self.replica_state = value;
418        self
419    }
420    /// Sets `max_wait_ms` to the passed value.
421    ///
422    /// The maximum time in milliseconds to wait for the response.
423    ///
424    /// Supported API versions: 0-17
425    pub fn with_max_wait_ms(mut self, value: i32) -> Self {
426        self.max_wait_ms = value;
427        self
428    }
429    /// Sets `min_bytes` to the passed value.
430    ///
431    /// The minimum bytes to accumulate in the response.
432    ///
433    /// Supported API versions: 0-17
434    pub fn with_min_bytes(mut self, value: i32) -> Self {
435        self.min_bytes = value;
436        self
437    }
438    /// Sets `max_bytes` to the passed value.
439    ///
440    /// The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored.
441    ///
442    /// Supported API versions: 3-17
443    pub fn with_max_bytes(mut self, value: i32) -> Self {
444        self.max_bytes = value;
445        self
446    }
447    /// Sets `isolation_level` to the passed value.
448    ///
449    /// This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records
450    ///
451    /// Supported API versions: 4-17
452    pub fn with_isolation_level(mut self, value: i8) -> Self {
453        self.isolation_level = value;
454        self
455    }
456    /// Sets `session_id` to the passed value.
457    ///
458    /// The fetch session ID.
459    ///
460    /// Supported API versions: 7-17
461    pub fn with_session_id(mut self, value: i32) -> Self {
462        self.session_id = value;
463        self
464    }
465    /// Sets `session_epoch` to the passed value.
466    ///
467    /// The fetch session epoch, which is used for ordering requests in a session.
468    ///
469    /// Supported API versions: 7-17
470    pub fn with_session_epoch(mut self, value: i32) -> Self {
471        self.session_epoch = value;
472        self
473    }
474    /// Sets `topics` to the passed value.
475    ///
476    /// The topics to fetch.
477    ///
478    /// Supported API versions: 0-17
479    pub fn with_topics(mut self, value: Vec<FetchTopic>) -> Self {
480        self.topics = value;
481        self
482    }
483    /// Sets `forgotten_topics_data` to the passed value.
484    ///
485    /// In an incremental fetch request, the partitions to remove.
486    ///
487    /// Supported API versions: 7-17
488    pub fn with_forgotten_topics_data(mut self, value: Vec<ForgottenTopic>) -> Self {
489        self.forgotten_topics_data = value;
490        self
491    }
492    /// Sets `rack_id` to the passed value.
493    ///
494    /// Rack ID of the consumer making this request
495    ///
496    /// Supported API versions: 11-17
497    pub fn with_rack_id(mut self, value: StrBytes) -> Self {
498        self.rack_id = value;
499        self
500    }
501    /// Sets unknown_tagged_fields to the passed value.
502    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
503        self.unknown_tagged_fields = value;
504        self
505    }
506    /// Inserts an entry into unknown_tagged_fields.
507    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
508        self.unknown_tagged_fields.insert(key, value);
509        self
510    }
511}
512
#[cfg(feature = "client")]
impl Encodable for FetchRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// Fields are emitted in protocol order. Arrays and the rack id switch to their compact
    /// encodings from version 12, which is also the first version with a tagged field section.
    /// `cluster_id` (tag 0) and `replica_state` (tag 1, version 15 and later) travel as tagged
    /// fields and are omitted when `None` / equal to the default value respectively.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17`, when `replica_id` is set (not -1) for a version
    /// above 14, when `forgotten_topics_data` is non-empty for a version below 7, when a
    /// tagged-field count or size does not fit in a `u32`, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version <= 14 {
            types::Int32.encode(buf, &self.replica_id)?;
        } else if self.replica_id != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.max_wait_ms)?;
        types::Int32.encode(buf, &self.min_bytes)?;
        if version >= 3 {
            types::Int32.encode(buf, &self.max_bytes)?;
        }
        if version >= 4 {
            types::Int8.encode(buf, &self.isolation_level)?;
        }
        if version >= 7 {
            types::Int32.encode(buf, &self.session_id)?;
            types::Int32.encode(buf, &self.session_epoch)?;
        }
        if version >= 12 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        // Compact from 12, plain array for 7..=11, not representable before 7.
        if version >= 12 {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.forgotten_topics_data)?;
        } else if version >= 7 {
            types::Array(types::Struct { version }).encode(buf, &self.forgotten_topics_data)?;
        } else if !self.forgotten_topics_data.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        // Compact from 12, plain string for 11, silently omitted before 11.
        if version >= 12 {
            types::CompactString.encode(buf, &self.rack_id)?;
        } else if version >= 11 {
            types::String.encode(buf, &self.rack_id)?;
        }
        if version >= 12 {
            // Decided once so the announced field count and the fields actually written
            // cannot drift apart.
            let has_cluster_id = self.cluster_id.is_some();
            let has_replica_state =
                version >= 15 && self.replica_state != ReplicaState::default();
            let num_tagged_fields = self.unknown_tagged_fields.len()
                + usize::from(has_cluster_id)
                + usize::from(has_replica_state);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the value was just checked against u32::MAX.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            if has_cluster_id {
                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag 0, then the payload size, then the payload.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::CompactString.encode(buf, &self.cluster_id)?;
            }
            if has_replica_state {
                let computed_size =
                    types::Struct { version }.compute_size(&self.replica_state)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag 1, then the payload size, then the payload.
                types::UnsignedVarInt.encode(buf, 1)?;
                types::UnsignedVarInt.encode(buf, computed_size as u32)?;
                types::Struct { version }.encode(buf, &self.replica_state)?;
            }
            // Tags 0 and 1 are owned by known fields; unknown tags start at 2.
            write_unknown_tagged_fields(buf, 2.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }
    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Mirrors `encode` field by field. Unlike `encode`, the supported version range is not
    /// checked here.
    ///
    /// # Errors
    ///
    /// Fails when `replica_id` is set (not -1) for a version above 14, when
    /// `forgotten_topics_data` is non-empty for a version below 7, when a tagged-field count
    /// or size does not fit in a `u32`, or when any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version <= 14 {
            total_size += types::Int32.compute_size(&self.replica_id)?;
        } else if self.replica_id != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        total_size += types::Int32.compute_size(&self.max_wait_ms)?;
        total_size += types::Int32.compute_size(&self.min_bytes)?;
        if version >= 3 {
            total_size += types::Int32.compute_size(&self.max_bytes)?;
        }
        if version >= 4 {
            total_size += types::Int8.compute_size(&self.isolation_level)?;
        }
        if version >= 7 {
            total_size += types::Int32.compute_size(&self.session_id)?;
            total_size += types::Int32.compute_size(&self.session_epoch)?;
        }
        if version >= 12 {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
        }
        // Compact from 12, plain array for 7..=11, not representable before 7.
        if version >= 12 {
            total_size += types::CompactArray(types::Struct { version })
                .compute_size(&self.forgotten_topics_data)?;
        } else if version >= 7 {
            total_size += types::Array(types::Struct { version })
                .compute_size(&self.forgotten_topics_data)?;
        } else if !self.forgotten_topics_data.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        // Compact from 12, plain string for 11, absent before 11.
        if version >= 12 {
            total_size += types::CompactString.compute_size(&self.rack_id)?;
        } else if version >= 11 {
            total_size += types::String.compute_size(&self.rack_id)?;
        }
        if version >= 12 {
            // Same presence rules as in `encode`.
            let has_cluster_id = self.cluster_id.is_some();
            let has_replica_state =
                version >= 15 && self.replica_state != ReplicaState::default();
            let num_tagged_fields = self.unknown_tagged_fields.len()
                + usize::from(has_cluster_id)
                + usize::from(has_replica_state);
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // Lossless: the value was just checked against u32::MAX.
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            if has_cluster_id {
                let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag, payload size, payload.
                total_size += types::UnsignedVarInt.compute_size(0)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            if has_replica_state {
                let computed_size =
                    types::Struct { version }.compute_size(&self.replica_state)?;
                if computed_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        computed_size
                    );
                }
                // Tag, payload size, payload.
                total_size += types::UnsignedVarInt.compute_size(1)?;
                total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
                total_size += computed_size;
            }
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
710
#[cfg(feature = "broker")]
impl Decodable for FetchRequest {
    /// Reads a fetch request from `buf` using the wire layout of `version`.
    ///
    /// Fields that `version` does not carry are filled with fixed fallbacks: -1 for
    /// `replica_id` and `session_epoch`, `0x7fffffff` for `max_bytes`, 0 for `isolation_level`
    /// and `session_id`, an empty list for `forgotten_topics_data`, an empty string for
    /// `rack_id`, `None` for `cluster_id` and the default value for `replica_state`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=17`, when tag 1 appears for a version below 15, or
    /// when any underlying decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let replica_id = if version > 14 {
            (-1).into()
        } else {
            types::Int32.decode(buf)?
        };
        let max_wait_ms = types::Int32.decode(buf)?;
        let min_bytes = types::Int32.decode(buf)?;
        let max_bytes = if version < 3 {
            0x7fffffff
        } else {
            types::Int32.decode(buf)?
        };
        let isolation_level = if version < 4 {
            0
        } else {
            types::Int8.decode(buf)?
        };
        let session_id = if version < 7 {
            0
        } else {
            types::Int32.decode(buf)?
        };
        let session_epoch = if version < 7 {
            -1
        } else {
            types::Int32.decode(buf)?
        };
        let topics = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        // Compact from 12, plain array for 7..=11, absent before 7.
        let forgotten_topics_data = if version >= 12 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else if version >= 7 {
            types::Array(types::Struct { version }).decode(buf)?
        } else {
            Default::default()
        };
        // Compact from 12, plain string for 11, absent before 11.
        let rack_id = if version >= 12 {
            types::CompactString.decode(buf)?
        } else if version >= 11 {
            types::String.decode(buf)?
        } else {
            StrBytes::from_static_str("")
        };

        // Tagged section (version 12 and later): a count, then `tag, size, payload` triples.
        let mut cluster_id = None;
        let mut replica_state = ReplicaState::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 12 {
            let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                match tag {
                    // NOTE(review): for the known tags the declared `size` is not compared
                    // with the bytes the field decoder consumes.
                    0 => cluster_id = types::CompactString.decode(buf)?,
                    1 if version >= 15 => {
                        replica_state = types::Struct { version }.decode(buf)?
                    }
                    1 => bail!("Tag {} is not valid for version {}", tag, version),
                    _ => {
                        // Unrecognized tag: keep the raw payload.
                        let raw_payload = buf.try_get_bytes(size as usize)?;
                        unknown_tagged_fields.insert(tag as i32, raw_payload);
                    }
                }
            }
        }
        Ok(Self {
            cluster_id,
            replica_id,
            replica_state,
            max_wait_ms,
            min_bytes,
            max_bytes,
            isolation_level,
            session_id,
            session_epoch,
            topics,
            forgotten_topics_data,
            rack_id,
            unknown_tagged_fields,
        })
    }
}
810
811impl Default for FetchRequest {
812    fn default() -> Self {
813        Self {
814            cluster_id: None,
815            replica_id: (-1).into(),
816            replica_state: Default::default(),
817            max_wait_ms: 0,
818            min_bytes: 0,
819            max_bytes: 0x7fffffff,
820            isolation_level: 0,
821            session_id: 0,
822            session_epoch: -1,
823            topics: Default::default(),
824            forgotten_topics_data: Default::default(),
825            rack_id: StrBytes::from_static_str(""),
826            unknown_tagged_fields: BTreeMap::new(),
827        }
828    }
829}
830
831impl Message for FetchRequest {
832    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
833    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
834}
835
/// One topic entry of a fetch request, together with the partitions to fetch.
///
/// Which identifier goes on the wire depends on the version: the encoder in
/// this file writes `topic` for versions up to 12 and `topic_id` from
/// version 13 onwards.
///
/// Valid versions: 0-17
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchTopic {
    /// The name of the topic to fetch.
    ///
    /// Written as a plain string before version 12 and as a compact string at
    /// version 12; left at its default when decoding later versions.
    ///
    /// Supported API versions: 0-12
    pub topic: super::TopicName,

    /// The unique topic ID
    ///
    /// Decodes to the nil UUID for versions below 13.
    ///
    /// Supported API versions: 13-17
    pub topic_id: Uuid,

    /// The partitions to fetch.
    ///
    /// Encoded as a compact array from version 12 onwards, and as a regular
    /// array before that.
    ///
    /// Supported API versions: 0-17
    pub partitions: Vec<FetchPartition>,

    /// Other tagged fields
    ///
    /// Keyed by tag number; values are the raw, undecoded bytes. Only read
    /// from or written to the wire for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
858
859impl FetchTopic {
860    /// Sets `topic` to the passed value.
861    ///
862    /// The name of the topic to fetch.
863    ///
864    /// Supported API versions: 0-12
865    pub fn with_topic(mut self, value: super::TopicName) -> Self {
866        self.topic = value;
867        self
868    }
869    /// Sets `topic_id` to the passed value.
870    ///
871    /// The unique topic ID
872    ///
873    /// Supported API versions: 13-17
874    pub fn with_topic_id(mut self, value: Uuid) -> Self {
875        self.topic_id = value;
876        self
877    }
878    /// Sets `partitions` to the passed value.
879    ///
880    /// The partitions to fetch.
881    ///
882    /// Supported API versions: 0-17
883    pub fn with_partitions(mut self, value: Vec<FetchPartition>) -> Self {
884        self.partitions = value;
885        self
886    }
887    /// Sets unknown_tagged_fields to the passed value.
888    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
889        self.unknown_tagged_fields = value;
890        self
891    }
892    /// Inserts an entry into unknown_tagged_fields.
893    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
894        self.unknown_tagged_fields.insert(key, value);
895        self
896    }
897}
898
#[cfg(feature = "client")]
impl Encodable for FetchTopic {
    /// Serializes this topic entry into `buf` using the layout of `version`.
    ///
    /// Field order on the wire: topic name (versions up to 12), topic id
    /// (version 13 and later), partitions, then the tagged-field section
    /// (version 12 and later).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-17, when there are more tagged
    /// fields than fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let has_tagged_section = version >= 12;

        // The name uses the compact encoding only at version 12; from 13 on
        // it is not written at all.
        if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else if version < 12 {
            types::String.encode(buf, &self.topic)?;
        }
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        if has_tagged_section {
            types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
        }

        if !has_tagged_section {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-17.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than fit in a `u32`, or when
    /// any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let has_tagged_section = version >= 12;

        let name_size = if version == 12 {
            types::CompactString.compute_size(&self.topic)?
        } else if version < 12 {
            types::String.compute_size(&self.topic)?
        } else {
            0
        };
        let id_size = if version >= 13 {
            types::Uuid.compute_size(&self.topic_id)?
        } else {
            0
        };
        let partitions_size = if has_tagged_section {
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partitions)?
        };

        let mut size = name_size + id_size + partitions_size;
        if !has_tagged_section {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
967
#[cfg(feature = "broker")]
impl Decodable for FetchTopic {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// Fields that are absent from the given version are filled with their
    /// defaults (default topic name, nil topic id). For version 12 and later
    /// every tagged field is kept as raw bytes in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-17 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let has_tagged_section = version >= 12;

        let topic = if version == 12 {
            types::CompactString.decode(buf)?
        } else if version < 12 {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let topic_id = if version < 13 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };
        let partitions = if has_tagged_section {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let unknown_tagged_fields = {
            let mut fields = BTreeMap::new();
            if has_tagged_section {
                let field_count = types::UnsignedVarInt.decode(buf)?;
                for _ in 0..field_count {
                    // Each entry is: tag, payload length, payload bytes.
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    fields.insert(tag as i32, payload);
                }
            }
            fields
        };

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1011
1012impl Default for FetchTopic {
1013    fn default() -> Self {
1014        Self {
1015            topic: Default::default(),
1016            topic_id: Uuid::nil(),
1017            partitions: Default::default(),
1018            unknown_tagged_fields: BTreeMap::new(),
1019        }
1020    }
1021}
1022
1023impl Message for FetchTopic {
1024    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1025    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1026}
1027
/// A topic entry listing partition indexes to forget.
///
/// The encoder in this file writes `topic` for versions 7-12 and `topic_id`
/// from version 13 onwards. Below version 7 no field is written, and encoding
/// fails if `partitions` is non-empty.
///
/// Valid versions: 0-17
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ForgottenTopic {
    /// The topic name.
    ///
    /// Written as a plain string for versions 7-11 and as a compact string at
    /// version 12; left at its default when decoding any other version.
    ///
    /// Supported API versions: 7-12
    pub topic: super::TopicName,

    /// The unique topic ID
    ///
    /// Decodes to the nil UUID for versions below 13.
    ///
    /// Supported API versions: 13-17
    pub topic_id: Uuid,

    /// The partitions indexes to forget.
    ///
    /// Encoded as a compact array from version 12 onwards, and as a regular
    /// array for versions 7-11.
    ///
    /// Supported API versions: 7-17
    pub partitions: Vec<i32>,

    /// Other tagged fields
    ///
    /// Keyed by tag number; values are the raw, undecoded bytes. Only read
    /// from or written to the wire for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1050
1051impl ForgottenTopic {
1052    /// Sets `topic` to the passed value.
1053    ///
1054    /// The topic name.
1055    ///
1056    /// Supported API versions: 7-12
1057    pub fn with_topic(mut self, value: super::TopicName) -> Self {
1058        self.topic = value;
1059        self
1060    }
1061    /// Sets `topic_id` to the passed value.
1062    ///
1063    /// The unique topic ID
1064    ///
1065    /// Supported API versions: 13-17
1066    pub fn with_topic_id(mut self, value: Uuid) -> Self {
1067        self.topic_id = value;
1068        self
1069    }
1070    /// Sets `partitions` to the passed value.
1071    ///
1072    /// The partitions indexes to forget.
1073    ///
1074    /// Supported API versions: 7-17
1075    pub fn with_partitions(mut self, value: Vec<i32>) -> Self {
1076        self.partitions = value;
1077        self
1078    }
1079    /// Sets unknown_tagged_fields to the passed value.
1080    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1081        self.unknown_tagged_fields = value;
1082        self
1083    }
1084    /// Inserts an entry into unknown_tagged_fields.
1085    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1086        self.unknown_tagged_fields.insert(key, value);
1087        self
1088    }
1089}
1090
#[cfg(feature = "client")]
impl Encodable for ForgottenTopic {
    /// Serializes this entry into `buf` using the layout of `version`.
    ///
    /// Field order on the wire: topic name (versions 7-12), topic id
    /// (version 13 and later), partitions (version 7 and later), then the
    /// tagged-field section (version 12 and later).
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-17, when `partitions` is non-empty
    /// for a version below 7, when there are more tagged fields than fit in
    /// a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Topic name: compact form only at version 12, plain form for 7-11.
        if version == 12 {
            types::CompactString.encode(buf, &self.topic)?;
        } else if (7..12).contains(&version) {
            types::String.encode(buf, &self.topic)?;
        }
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        if version >= 12 {
            types::CompactArray(types::Int32).encode(buf, &self.partitions)?;
        } else if version >= 7 {
            types::Array(types::Int32).encode(buf, &self.partitions)?;
        } else if !self.partitions.is_empty() {
            // The field does not exist before version 7, so only its default
            // (empty) value is acceptable.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-17.
    ///
    /// # Errors
    ///
    /// Fails when `partitions` is non-empty for a version below 7, when there
    /// are more tagged fields than fit in a `u32`, or when any nested size
    /// computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version == 12 {
            size += types::CompactString.compute_size(&self.topic)?;
        } else if (7..12).contains(&version) {
            size += types::String.compute_size(&self.topic)?;
        }
        if version >= 13 {
            size += types::Uuid.compute_size(&self.topic_id)?;
        }

        if version >= 12 {
            size += types::CompactArray(types::Int32).compute_size(&self.partitions)?;
        } else if version >= 7 {
            size += types::Array(types::Int32).compute_size(&self.partitions)?;
        } else if !self.partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1170
#[cfg(feature = "broker")]
impl Decodable for ForgottenTopic {
    /// Reads one entry from `buf` using the layout of `version`.
    ///
    /// Fields that are absent from the given version are filled with their
    /// defaults (default topic name, nil topic id, empty partition list).
    /// For version 12 and later every tagged field is kept as raw bytes in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-17 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic = if version == 12 {
            types::CompactString.decode(buf)?
        } else if (7..12).contains(&version) {
            types::String.decode(buf)?
        } else {
            Default::default()
        };
        let topic_id = if version < 13 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };
        let partitions = if version >= 12 {
            types::CompactArray(types::Int32).decode(buf)?
        } else if version >= 7 {
            types::Array(types::Int32).decode(buf)?
        } else {
            Default::default()
        };

        let unknown_tagged_fields = {
            let mut fields = BTreeMap::new();
            if version >= 12 {
                let field_count = types::UnsignedVarInt.decode(buf)?;
                for _ in 0..field_count {
                    // Each entry is: tag, payload length, payload bytes.
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    fields.insert(tag as i32, payload);
                }
            }
            fields
        };

        Ok(Self {
            topic,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1218
1219impl Default for ForgottenTopic {
1220    fn default() -> Self {
1221        Self {
1222            topic: Default::default(),
1223            topic_id: Uuid::nil(),
1224            partitions: Default::default(),
1225            unknown_tagged_fields: BTreeMap::new(),
1226        }
1227    }
1228}
1229
1230impl Message for ForgottenTopic {
1231    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1232    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1233}
1234
/// Replica identity of the requester, as carried by a fetch request.
///
/// Both data fields only exist on the wire from version 15. For earlier
/// versions the encoder in this file refuses any value other than the
/// default `-1`, and the decoder fills in `-1`.
///
/// Valid versions: 0-17
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicaState {
    /// The replica ID of the follower, or -1 if this request is from a consumer.
    ///
    /// Supported API versions: 15-17
    pub replica_id: super::BrokerId,

    /// The epoch of this follower, or -1 if not available.
    ///
    /// Supported API versions: 15-17
    pub replica_epoch: i64,

    /// Other tagged fields
    ///
    /// Keyed by tag number; values are the raw, undecoded bytes. Only read
    /// from or written to the wire for versions 12 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
1252
1253impl ReplicaState {
1254    /// Sets `replica_id` to the passed value.
1255    ///
1256    /// The replica ID of the follower, or -1 if this request is from a consumer.
1257    ///
1258    /// Supported API versions: 15-17
1259    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
1260        self.replica_id = value;
1261        self
1262    }
1263    /// Sets `replica_epoch` to the passed value.
1264    ///
1265    /// The epoch of this follower, or -1 if not available.
1266    ///
1267    /// Supported API versions: 15-17
1268    pub fn with_replica_epoch(mut self, value: i64) -> Self {
1269        self.replica_epoch = value;
1270        self
1271    }
1272    /// Sets unknown_tagged_fields to the passed value.
1273    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1274        self.unknown_tagged_fields = value;
1275        self
1276    }
1277    /// Inserts an entry into unknown_tagged_fields.
1278    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1279        self.unknown_tagged_fields.insert(key, value);
1280        self
1281    }
1282}
1283
#[cfg(feature = "client")]
impl Encodable for ReplicaState {
    /// Serializes the replica state into `buf` using the layout of `version`.
    ///
    /// `replica_id` and `replica_epoch` are written (in that order) only from
    /// version 15; the tagged-field section is written from version 12.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-17, when either field differs from
    /// `-1` for a version below 15, when there are more tagged fields than
    /// fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 15 {
            types::Int32.encode(buf, &self.replica_id)?;
            types::Int64.encode(buf, &self.replica_epoch)?;
        } else if self.replica_id != -1 || self.replica_epoch != -1 {
            // Neither field exists before version 15, so only the default
            // value of -1 is acceptable for both.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(());
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-17.
    ///
    /// # Errors
    ///
    /// Fails when either field differs from `-1` for a version below 15, when
    /// there are more tagged fields than fit in a `u32`, or when any nested
    /// size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 15 {
            size += types::Int32.compute_size(&self.replica_id)?;
            size += types::Int64.compute_size(&self.replica_epoch)?;
        } else if self.replica_id != -1 || self.replica_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version < 12 {
            return Ok(size);
        }
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(size)
    }
}
1349
#[cfg(feature = "broker")]
impl Decodable for ReplicaState {
    /// Reads the replica state from `buf` using the layout of `version`.
    ///
    /// Below version 15 both `replica_id` and `replica_epoch` are set to `-1`
    /// without consuming any input. For version 12 and later every tagged
    /// field is kept as raw bytes in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-17 or when any nested decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=17).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let has_replica_fields = version >= 15;

        let replica_id = if has_replica_fields {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };
        let replica_epoch = if has_replica_fields {
            types::Int64.decode(buf)?
        } else {
            -1
        };

        let unknown_tagged_fields = {
            let mut fields = BTreeMap::new();
            if version >= 12 {
                let field_count = types::UnsignedVarInt.decode(buf)?;
                for _ in 0..field_count {
                    // Each entry is: tag, payload length, payload bytes.
                    let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                    let len: u32 = types::UnsignedVarInt.decode(buf)?;
                    let payload = buf.try_get_bytes(len as usize)?;
                    fields.insert(tag as i32, payload);
                }
            }
            fields
        };

        Ok(Self {
            replica_id,
            replica_epoch,
            unknown_tagged_fields,
        })
    }
}
1383
1384impl Default for ReplicaState {
1385    fn default() -> Self {
1386        Self {
1387            replica_id: (-1).into(),
1388            replica_epoch: -1,
1389            unknown_tagged_fields: BTreeMap::new(),
1390        }
1391    }
1392}
1393
1394impl Message for ReplicaState {
1395    const VERSIONS: VersionRange = VersionRange { min: 0, max: 17 };
1396    const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 3 });
1397}
1398
1399impl HeaderVersion for FetchRequest {
1400    fn header_version(version: i16) -> i16 {
1401        if version >= 12 {
1402            2
1403        } else {
1404            1
1405        }
1406    }
1407}