kafka_protocol/messages/
fetch_snapshot_request.rs

1//! FetchSnapshotRequest
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/FetchSnapshotRequest.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchSnapshotRequest {
24    /// The clusterId if known, this is used to validate metadata fetches prior to broker registration
25    ///
26    /// Supported API versions: 0
27    pub cluster_id: Option<StrBytes>,
28
29    /// The broker ID of the follower
30    ///
31    /// Supported API versions: 0
32    pub replica_id: super::BrokerId,
33
34    /// The maximum bytes to fetch from all of the snapshots
35    ///
36    /// Supported API versions: 0
37    pub max_bytes: i32,
38
39    /// The topics to fetch
40    ///
41    /// Supported API versions: 0
42    pub topics: Vec<TopicSnapshot>,
43
44    /// Other tagged fields
45    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl FetchSnapshotRequest {
49    /// Sets `cluster_id` to the passed value.
50    ///
51    /// The clusterId if known, this is used to validate metadata fetches prior to broker registration
52    ///
53    /// Supported API versions: 0
54    pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
55        self.cluster_id = value;
56        self
57    }
58    /// Sets `replica_id` to the passed value.
59    ///
60    /// The broker ID of the follower
61    ///
62    /// Supported API versions: 0
63    pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
64        self.replica_id = value;
65        self
66    }
67    /// Sets `max_bytes` to the passed value.
68    ///
69    /// The maximum bytes to fetch from all of the snapshots
70    ///
71    /// Supported API versions: 0
72    pub fn with_max_bytes(mut self, value: i32) -> Self {
73        self.max_bytes = value;
74        self
75    }
76    /// Sets `topics` to the passed value.
77    ///
78    /// The topics to fetch
79    ///
80    /// Supported API versions: 0
81    pub fn with_topics(mut self, value: Vec<TopicSnapshot>) -> Self {
82        self.topics = value;
83        self
84    }
85    /// Sets unknown_tagged_fields to the passed value.
86    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87        self.unknown_tagged_fields = value;
88        self
89    }
90    /// Inserts an entry into unknown_tagged_fields.
91    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92        self.unknown_tagged_fields.insert(key, value);
93        self
94    }
95}
96
#[cfg(feature = "client")]
impl Encodable for FetchSnapshotRequest {
    /// Serializes this request into `buf`.
    ///
    /// Layout, in order: `replica_id` (Int32), `max_bytes` (Int32), `topics` (compact array of
    /// structs encoded with `version`), then the tagged-field section. `cluster_id` is written
    /// as tagged field 0 and only when it is `Some`; unknown tagged fields follow.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying field encoders, and fails when the number of
    /// tagged fields or the encoded size of `cluster_id` does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.replica_id)?;
        types::Int32.encode(buf, &self.max_bytes)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // `cluster_id` contributes one tagged field, but only when it is present.
        let has_cluster_id = self.cluster_id.is_some();
        let num_tagged_fields = self.unknown_tagged_fields.len() + usize::from(has_cluster_id);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Cast cannot truncate: guarded by the bounds check above.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        if has_cluster_id {
            let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tagged field entry: tag number (0), payload size, payload.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::CompactString.encode(buf, &self.cluster_id)?;
        }

        // Tag 0 is taken by `cluster_id`, hence the `1..` range passed for unknown tags.
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes [`encode`](Encodable::encode) would write for `version`.
    ///
    /// Mirrors `encode` step by step, including the same bounds checks.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.replica_id)?;
        total_size += types::Int32.compute_size(&self.max_bytes)?;
        total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        // `cluster_id` contributes one tagged field, but only when it is present.
        let has_cluster_id = self.cluster_id.is_some();
        let num_tagged_fields = self.unknown_tagged_fields.len() + usize::from(has_cluster_id);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        if has_cluster_id {
            let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag number, payload size prefix, then the payload itself.
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
163
#[cfg(feature = "broker")]
impl Decodable for FetchSnapshotRequest {
    /// Parses a `FetchSnapshotRequest` from `buf`.
    ///
    /// Reads `replica_id`, `max_bytes` and `topics` in that order, then walks the tagged-field
    /// section: tag 0 is decoded into `cluster_id`, every other tag is kept verbatim in
    /// `unknown_tagged_fields`. `cluster_id` stays `None` when tag 0 is absent.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying decoders or by `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let replica_id = types::Int32.decode(buf)?;
        let max_bytes = types::Int32.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut cluster_id = None;
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            if tag == 0 {
                // The declared `size` is consumed but not used here; the compact string
                // decoder reads its own length prefix.
                cluster_id = types::CompactString.decode(buf)?;
            } else {
                let raw_value = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw_value);
            }
        }

        Ok(Self {
            cluster_id,
            replica_id,
            max_bytes,
            topics,
            unknown_tagged_fields,
        })
    }
}
195
196impl Default for FetchSnapshotRequest {
197    fn default() -> Self {
198        Self {
199            cluster_id: None,
200            replica_id: (-1).into(),
201            max_bytes: 0x7fffffff,
202            topics: Default::default(),
203            unknown_tagged_fields: BTreeMap::new(),
204        }
205    }
206}
207
208impl Message for FetchSnapshotRequest {
209    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
210    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
211}
212
213/// Valid versions: 0
214#[non_exhaustive]
215#[derive(Debug, Clone, PartialEq)]
216pub struct PartitionSnapshot {
217    /// The partition index
218    ///
219    /// Supported API versions: 0
220    pub partition: i32,
221
222    /// The current leader epoch of the partition, -1 for unknown leader epoch
223    ///
224    /// Supported API versions: 0
225    pub current_leader_epoch: i32,
226
227    /// The snapshot endOffset and epoch to fetch
228    ///
229    /// Supported API versions: 0
230    pub snapshot_id: SnapshotId,
231
232    /// The byte position within the snapshot to start fetching from
233    ///
234    /// Supported API versions: 0
235    pub position: i64,
236
237    /// Other tagged fields
238    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
239}
240
241impl PartitionSnapshot {
242    /// Sets `partition` to the passed value.
243    ///
244    /// The partition index
245    ///
246    /// Supported API versions: 0
247    pub fn with_partition(mut self, value: i32) -> Self {
248        self.partition = value;
249        self
250    }
251    /// Sets `current_leader_epoch` to the passed value.
252    ///
253    /// The current leader epoch of the partition, -1 for unknown leader epoch
254    ///
255    /// Supported API versions: 0
256    pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
257        self.current_leader_epoch = value;
258        self
259    }
260    /// Sets `snapshot_id` to the passed value.
261    ///
262    /// The snapshot endOffset and epoch to fetch
263    ///
264    /// Supported API versions: 0
265    pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
266        self.snapshot_id = value;
267        self
268    }
269    /// Sets `position` to the passed value.
270    ///
271    /// The byte position within the snapshot to start fetching from
272    ///
273    /// Supported API versions: 0
274    pub fn with_position(mut self, value: i64) -> Self {
275        self.position = value;
276        self
277    }
278    /// Sets unknown_tagged_fields to the passed value.
279    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
280        self.unknown_tagged_fields = value;
281        self
282    }
283    /// Inserts an entry into unknown_tagged_fields.
284    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
285        self.unknown_tagged_fields.insert(key, value);
286        self
287    }
288}
289
#[cfg(feature = "client")]
impl Encodable for PartitionSnapshot {
    /// Serializes this partition entry into `buf`.
    ///
    /// Layout, in order: `partition` (Int32), `current_leader_epoch` (Int32), `snapshot_id`
    /// (struct encoded with `version`), `position` (Int64), then the tagged-field section,
    /// which here only ever contains unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Propagates encoder errors and fails when there are more tagged fields than fit in a
    /// `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition)?;
        types::Int32.encode(buf, &self.current_leader_epoch)?;
        types::Struct { version }.encode(buf, &self.snapshot_id)?;
        types::Int64.encode(buf, &self.position)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes [`encode`](Encodable::encode) would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int32.compute_size(&self.partition)?
            + types::Int32.compute_size(&self.current_leader_epoch)?
            + types::Struct { version }.compute_size(&self.snapshot_id)?
            + types::Int64.compute_size(&self.position)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + unknown_size)
    }
}
328
#[cfg(feature = "broker")]
impl Decodable for PartitionSnapshot {
    /// Parses a `PartitionSnapshot` from `buf`.
    ///
    /// Reads `partition`, `current_leader_epoch`, `snapshot_id` and `position` in that order,
    /// then stores every tagged field verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying decoders or by `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition = types::Int32.decode(buf)?;
        let current_leader_epoch = types::Int32.decode(buf)?;
        let snapshot_id = types::Struct { version }.decode(buf)?;
        let position = types::Int64.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            // Each entry is: tag number, payload size, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            partition,
            current_leader_epoch,
            snapshot_id,
            position,
            unknown_tagged_fields,
        })
    }
}
353
354impl Default for PartitionSnapshot {
355    fn default() -> Self {
356        Self {
357            partition: 0,
358            current_leader_epoch: 0,
359            snapshot_id: Default::default(),
360            position: 0,
361            unknown_tagged_fields: BTreeMap::new(),
362        }
363    }
364}
365
366impl Message for PartitionSnapshot {
367    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
368    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
369}
370
371/// Valid versions: 0
372#[non_exhaustive]
373#[derive(Debug, Clone, PartialEq)]
374pub struct SnapshotId {
375    ///
376    ///
377    /// Supported API versions: 0
378    pub end_offset: i64,
379
380    ///
381    ///
382    /// Supported API versions: 0
383    pub epoch: i32,
384
385    /// Other tagged fields
386    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
387}
388
389impl SnapshotId {
390    /// Sets `end_offset` to the passed value.
391    ///
392    ///
393    ///
394    /// Supported API versions: 0
395    pub fn with_end_offset(mut self, value: i64) -> Self {
396        self.end_offset = value;
397        self
398    }
399    /// Sets `epoch` to the passed value.
400    ///
401    ///
402    ///
403    /// Supported API versions: 0
404    pub fn with_epoch(mut self, value: i32) -> Self {
405        self.epoch = value;
406        self
407    }
408    /// Sets unknown_tagged_fields to the passed value.
409    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
410        self.unknown_tagged_fields = value;
411        self
412    }
413    /// Inserts an entry into unknown_tagged_fields.
414    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
415        self.unknown_tagged_fields.insert(key, value);
416        self
417    }
418}
419
#[cfg(feature = "client")]
impl Encodable for SnapshotId {
    /// Serializes this snapshot id into `buf`.
    ///
    /// Layout, in order: `end_offset` (Int64), `epoch` (Int32), then the tagged-field section,
    /// which here only ever contains unknown tagged fields. `version` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates encoder errors and fails when there are more tagged fields than fit in a
    /// `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes [`encode`](Encodable::encode) would write.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int64.compute_size(&self.end_offset)?
            + types::Int32.compute_size(&self.epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + unknown_size)
    }
}
454
#[cfg(feature = "broker")]
impl Decodable for SnapshotId {
    /// Parses a `SnapshotId` from `buf`.
    ///
    /// Reads `end_offset` then `epoch`, and stores every tagged field verbatim in
    /// `unknown_tagged_fields`. `version` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying decoders or by `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let end_offset = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            // Each entry is: tag number, payload size, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            end_offset,
            epoch,
            unknown_tagged_fields,
        })
    }
}
475
476impl Default for SnapshotId {
477    fn default() -> Self {
478        Self {
479            end_offset: 0,
480            epoch: 0,
481            unknown_tagged_fields: BTreeMap::new(),
482        }
483    }
484}
485
486impl Message for SnapshotId {
487    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
488    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
489}
490
491/// Valid versions: 0
492#[non_exhaustive]
493#[derive(Debug, Clone, PartialEq)]
494pub struct TopicSnapshot {
495    /// The name of the topic to fetch
496    ///
497    /// Supported API versions: 0
498    pub name: super::TopicName,
499
500    /// The partitions to fetch
501    ///
502    /// Supported API versions: 0
503    pub partitions: Vec<PartitionSnapshot>,
504
505    /// Other tagged fields
506    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
507}
508
509impl TopicSnapshot {
510    /// Sets `name` to the passed value.
511    ///
512    /// The name of the topic to fetch
513    ///
514    /// Supported API versions: 0
515    pub fn with_name(mut self, value: super::TopicName) -> Self {
516        self.name = value;
517        self
518    }
519    /// Sets `partitions` to the passed value.
520    ///
521    /// The partitions to fetch
522    ///
523    /// Supported API versions: 0
524    pub fn with_partitions(mut self, value: Vec<PartitionSnapshot>) -> Self {
525        self.partitions = value;
526        self
527    }
528    /// Sets unknown_tagged_fields to the passed value.
529    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
530        self.unknown_tagged_fields = value;
531        self
532    }
533    /// Inserts an entry into unknown_tagged_fields.
534    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
535        self.unknown_tagged_fields.insert(key, value);
536        self
537    }
538}
539
#[cfg(feature = "client")]
impl Encodable for TopicSnapshot {
    /// Serializes this topic entry into `buf`.
    ///
    /// Layout, in order: `name` (compact string), `partitions` (compact array of structs
    /// encoded with `version`), then the tagged-field section, which here only ever contains
    /// unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Propagates encoder errors and fails when there are more tagged fields than fit in a
    /// `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes [`encode`](Encodable::encode) would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::CompactString.compute_size(&self.name)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + unknown_size)
    }
}
575
#[cfg(feature = "broker")]
impl Decodable for TopicSnapshot {
    /// Parses a `TopicSnapshot` from `buf`.
    ///
    /// Reads `name` then `partitions` (each decoded with `version`), and stores every tagged
    /// field verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying decoders or by `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            // Each entry is: tag number, payload size, payload bytes.
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let raw_value = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, raw_value);
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
596
597impl Default for TopicSnapshot {
598    fn default() -> Self {
599        Self {
600            name: Default::default(),
601            partitions: Default::default(),
602            unknown_tagged_fields: BTreeMap::new(),
603        }
604    }
605}
606
607impl Message for TopicSnapshot {
608    const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
609    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
610}
611
612impl HeaderVersion for FetchSnapshotRequest {
613    fn header_version(version: i16) -> i16 {
614        2
615    }
616}