1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchSnapshotRequest {
24 pub cluster_id: Option<StrBytes>,
28
29 pub replica_id: super::BrokerId,
33
34 pub max_bytes: i32,
38
39 pub topics: Vec<TopicSnapshot>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl FetchSnapshotRequest {
49 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
55 self.cluster_id = value;
56 self
57 }
58 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
64 self.replica_id = value;
65 self
66 }
67 pub fn with_max_bytes(mut self, value: i32) -> Self {
73 self.max_bytes = value;
74 self
75 }
76 pub fn with_topics(mut self, value: Vec<TopicSnapshot>) -> Self {
82 self.topics = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "client")]
98impl Encodable for FetchSnapshotRequest {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version < 0 || version > 1 {
101 bail!("specified version not supported by this message type");
102 }
103 types::Int32.encode(buf, &self.replica_id)?;
104 types::Int32.encode(buf, &self.max_bytes)?;
105 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
106 let mut num_tagged_fields = self.unknown_tagged_fields.len();
107 if !self.cluster_id.is_none() {
108 num_tagged_fields += 1;
109 }
110 if num_tagged_fields > std::u32::MAX as usize {
111 bail!(
112 "Too many tagged fields to encode ({} fields)",
113 num_tagged_fields
114 );
115 }
116 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
117 if !self.cluster_id.is_none() {
118 let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
119 if computed_size > std::u32::MAX as usize {
120 bail!(
121 "Tagged field is too large to encode ({} bytes)",
122 computed_size
123 );
124 }
125 types::UnsignedVarInt.encode(buf, 0)?;
126 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
127 types::CompactString.encode(buf, &self.cluster_id)?;
128 }
129
130 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
131 Ok(())
132 }
133 fn compute_size(&self, version: i16) -> Result<usize> {
134 let mut total_size = 0;
135 total_size += types::Int32.compute_size(&self.replica_id)?;
136 total_size += types::Int32.compute_size(&self.max_bytes)?;
137 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
138 let mut num_tagged_fields = self.unknown_tagged_fields.len();
139 if !self.cluster_id.is_none() {
140 num_tagged_fields += 1;
141 }
142 if num_tagged_fields > std::u32::MAX as usize {
143 bail!(
144 "Too many tagged fields to encode ({} fields)",
145 num_tagged_fields
146 );
147 }
148 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
149 if !self.cluster_id.is_none() {
150 let computed_size = types::CompactString.compute_size(&self.cluster_id)?;
151 if computed_size > std::u32::MAX as usize {
152 bail!(
153 "Tagged field is too large to encode ({} bytes)",
154 computed_size
155 );
156 }
157 total_size += types::UnsignedVarInt.compute_size(0)?;
158 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
159 total_size += computed_size;
160 }
161
162 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
163 Ok(total_size)
164 }
165}
166
167#[cfg(feature = "broker")]
168impl Decodable for FetchSnapshotRequest {
169 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
170 if version < 0 || version > 1 {
171 bail!("specified version not supported by this message type");
172 }
173 let mut cluster_id = None;
174 let replica_id = types::Int32.decode(buf)?;
175 let max_bytes = types::Int32.decode(buf)?;
176 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
177 let mut unknown_tagged_fields = BTreeMap::new();
178 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
179 for _ in 0..num_tagged_fields {
180 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
181 let size: u32 = types::UnsignedVarInt.decode(buf)?;
182 match tag {
183 0 => {
184 cluster_id = types::CompactString.decode(buf)?;
185 }
186 _ => {
187 let unknown_value = buf.try_get_bytes(size as usize)?;
188 unknown_tagged_fields.insert(tag as i32, unknown_value);
189 }
190 }
191 }
192 Ok(Self {
193 cluster_id,
194 replica_id,
195 max_bytes,
196 topics,
197 unknown_tagged_fields,
198 })
199 }
200}
201
202impl Default for FetchSnapshotRequest {
203 fn default() -> Self {
204 Self {
205 cluster_id: None,
206 replica_id: (-1).into(),
207 max_bytes: 0x7fffffff,
208 topics: Default::default(),
209 unknown_tagged_fields: BTreeMap::new(),
210 }
211 }
212}
213
214impl Message for FetchSnapshotRequest {
215 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
216 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
217}
218
219#[non_exhaustive]
221#[derive(Debug, Clone, PartialEq)]
222pub struct PartitionSnapshot {
223 pub partition: i32,
227
228 pub current_leader_epoch: i32,
232
233 pub snapshot_id: SnapshotId,
237
238 pub position: i64,
242
243 pub replica_directory_id: Uuid,
247
248 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
250}
251
252impl PartitionSnapshot {
253 pub fn with_partition(mut self, value: i32) -> Self {
259 self.partition = value;
260 self
261 }
262 pub fn with_current_leader_epoch(mut self, value: i32) -> Self {
268 self.current_leader_epoch = value;
269 self
270 }
271 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
277 self.snapshot_id = value;
278 self
279 }
280 pub fn with_position(mut self, value: i64) -> Self {
286 self.position = value;
287 self
288 }
289 pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
295 self.replica_directory_id = value;
296 self
297 }
298 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
300 self.unknown_tagged_fields = value;
301 self
302 }
303 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
305 self.unknown_tagged_fields.insert(key, value);
306 self
307 }
308}
309
310#[cfg(feature = "client")]
311impl Encodable for PartitionSnapshot {
312 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
313 if version < 0 || version > 1 {
314 bail!("specified version not supported by this message type");
315 }
316 types::Int32.encode(buf, &self.partition)?;
317 types::Int32.encode(buf, &self.current_leader_epoch)?;
318 types::Struct { version }.encode(buf, &self.snapshot_id)?;
319 types::Int64.encode(buf, &self.position)?;
320 let mut num_tagged_fields = self.unknown_tagged_fields.len();
321 if version >= 1 {
322 if &self.replica_directory_id != &Uuid::nil() {
323 num_tagged_fields += 1;
324 }
325 }
326 if num_tagged_fields > std::u32::MAX as usize {
327 bail!(
328 "Too many tagged fields to encode ({} fields)",
329 num_tagged_fields
330 );
331 }
332 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
333 if version >= 1 {
334 if &self.replica_directory_id != &Uuid::nil() {
335 let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
336 if computed_size > std::u32::MAX as usize {
337 bail!(
338 "Tagged field is too large to encode ({} bytes)",
339 computed_size
340 );
341 }
342 types::UnsignedVarInt.encode(buf, 0)?;
343 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
344 types::Uuid.encode(buf, &self.replica_directory_id)?;
345 }
346 }
347 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
348 Ok(())
349 }
350 fn compute_size(&self, version: i16) -> Result<usize> {
351 let mut total_size = 0;
352 total_size += types::Int32.compute_size(&self.partition)?;
353 total_size += types::Int32.compute_size(&self.current_leader_epoch)?;
354 total_size += types::Struct { version }.compute_size(&self.snapshot_id)?;
355 total_size += types::Int64.compute_size(&self.position)?;
356 let mut num_tagged_fields = self.unknown_tagged_fields.len();
357 if version >= 1 {
358 if &self.replica_directory_id != &Uuid::nil() {
359 num_tagged_fields += 1;
360 }
361 }
362 if num_tagged_fields > std::u32::MAX as usize {
363 bail!(
364 "Too many tagged fields to encode ({} fields)",
365 num_tagged_fields
366 );
367 }
368 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
369 if version >= 1 {
370 if &self.replica_directory_id != &Uuid::nil() {
371 let computed_size = types::Uuid.compute_size(&self.replica_directory_id)?;
372 if computed_size > std::u32::MAX as usize {
373 bail!(
374 "Tagged field is too large to encode ({} bytes)",
375 computed_size
376 );
377 }
378 total_size += types::UnsignedVarInt.compute_size(0)?;
379 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
380 total_size += computed_size;
381 }
382 }
383 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
384 Ok(total_size)
385 }
386}
387
388#[cfg(feature = "broker")]
389impl Decodable for PartitionSnapshot {
390 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
391 if version < 0 || version > 1 {
392 bail!("specified version not supported by this message type");
393 }
394 let partition = types::Int32.decode(buf)?;
395 let current_leader_epoch = types::Int32.decode(buf)?;
396 let snapshot_id = types::Struct { version }.decode(buf)?;
397 let position = types::Int64.decode(buf)?;
398 let mut replica_directory_id = Uuid::nil();
399 let mut unknown_tagged_fields = BTreeMap::new();
400 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
401 for _ in 0..num_tagged_fields {
402 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
403 let size: u32 = types::UnsignedVarInt.decode(buf)?;
404 match tag {
405 0 => {
406 if version >= 1 {
407 replica_directory_id = types::Uuid.decode(buf)?;
408 } else {
409 bail!("Tag {} is not valid for version {}", tag, version);
410 }
411 }
412 _ => {
413 let unknown_value = buf.try_get_bytes(size as usize)?;
414 unknown_tagged_fields.insert(tag as i32, unknown_value);
415 }
416 }
417 }
418 Ok(Self {
419 partition,
420 current_leader_epoch,
421 snapshot_id,
422 position,
423 replica_directory_id,
424 unknown_tagged_fields,
425 })
426 }
427}
428
429impl Default for PartitionSnapshot {
430 fn default() -> Self {
431 Self {
432 partition: 0,
433 current_leader_epoch: 0,
434 snapshot_id: Default::default(),
435 position: 0,
436 replica_directory_id: Uuid::nil(),
437 unknown_tagged_fields: BTreeMap::new(),
438 }
439 }
440}
441
442impl Message for PartitionSnapshot {
443 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
444 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
445}
446
447#[non_exhaustive]
449#[derive(Debug, Clone, PartialEq)]
450pub struct SnapshotId {
451 pub end_offset: i64,
455
456 pub epoch: i32,
460
461 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
463}
464
465impl SnapshotId {
466 pub fn with_end_offset(mut self, value: i64) -> Self {
472 self.end_offset = value;
473 self
474 }
475 pub fn with_epoch(mut self, value: i32) -> Self {
481 self.epoch = value;
482 self
483 }
484 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
486 self.unknown_tagged_fields = value;
487 self
488 }
489 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
491 self.unknown_tagged_fields.insert(key, value);
492 self
493 }
494}
495
496#[cfg(feature = "client")]
497impl Encodable for SnapshotId {
498 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
499 if version < 0 || version > 1 {
500 bail!("specified version not supported by this message type");
501 }
502 types::Int64.encode(buf, &self.end_offset)?;
503 types::Int32.encode(buf, &self.epoch)?;
504 let num_tagged_fields = self.unknown_tagged_fields.len();
505 if num_tagged_fields > std::u32::MAX as usize {
506 bail!(
507 "Too many tagged fields to encode ({} fields)",
508 num_tagged_fields
509 );
510 }
511 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
512
513 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
514 Ok(())
515 }
516 fn compute_size(&self, version: i16) -> Result<usize> {
517 let mut total_size = 0;
518 total_size += types::Int64.compute_size(&self.end_offset)?;
519 total_size += types::Int32.compute_size(&self.epoch)?;
520 let num_tagged_fields = self.unknown_tagged_fields.len();
521 if num_tagged_fields > std::u32::MAX as usize {
522 bail!(
523 "Too many tagged fields to encode ({} fields)",
524 num_tagged_fields
525 );
526 }
527 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
528
529 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
530 Ok(total_size)
531 }
532}
533
534#[cfg(feature = "broker")]
535impl Decodable for SnapshotId {
536 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
537 if version < 0 || version > 1 {
538 bail!("specified version not supported by this message type");
539 }
540 let end_offset = types::Int64.decode(buf)?;
541 let epoch = types::Int32.decode(buf)?;
542 let mut unknown_tagged_fields = BTreeMap::new();
543 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
544 for _ in 0..num_tagged_fields {
545 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
546 let size: u32 = types::UnsignedVarInt.decode(buf)?;
547 let unknown_value = buf.try_get_bytes(size as usize)?;
548 unknown_tagged_fields.insert(tag as i32, unknown_value);
549 }
550 Ok(Self {
551 end_offset,
552 epoch,
553 unknown_tagged_fields,
554 })
555 }
556}
557
558impl Default for SnapshotId {
559 fn default() -> Self {
560 Self {
561 end_offset: 0,
562 epoch: 0,
563 unknown_tagged_fields: BTreeMap::new(),
564 }
565 }
566}
567
568impl Message for SnapshotId {
569 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
570 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
571}
572
573#[non_exhaustive]
575#[derive(Debug, Clone, PartialEq)]
576pub struct TopicSnapshot {
577 pub name: super::TopicName,
581
582 pub partitions: Vec<PartitionSnapshot>,
586
587 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
589}
590
591impl TopicSnapshot {
592 pub fn with_name(mut self, value: super::TopicName) -> Self {
598 self.name = value;
599 self
600 }
601 pub fn with_partitions(mut self, value: Vec<PartitionSnapshot>) -> Self {
607 self.partitions = value;
608 self
609 }
610 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
612 self.unknown_tagged_fields = value;
613 self
614 }
615 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
617 self.unknown_tagged_fields.insert(key, value);
618 self
619 }
620}
621
622#[cfg(feature = "client")]
623impl Encodable for TopicSnapshot {
624 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
625 if version < 0 || version > 1 {
626 bail!("specified version not supported by this message type");
627 }
628 types::CompactString.encode(buf, &self.name)?;
629 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
630 let num_tagged_fields = self.unknown_tagged_fields.len();
631 if num_tagged_fields > std::u32::MAX as usize {
632 bail!(
633 "Too many tagged fields to encode ({} fields)",
634 num_tagged_fields
635 );
636 }
637 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
638
639 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
640 Ok(())
641 }
642 fn compute_size(&self, version: i16) -> Result<usize> {
643 let mut total_size = 0;
644 total_size += types::CompactString.compute_size(&self.name)?;
645 total_size +=
646 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
647 let num_tagged_fields = self.unknown_tagged_fields.len();
648 if num_tagged_fields > std::u32::MAX as usize {
649 bail!(
650 "Too many tagged fields to encode ({} fields)",
651 num_tagged_fields
652 );
653 }
654 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
655
656 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
657 Ok(total_size)
658 }
659}
660
661#[cfg(feature = "broker")]
662impl Decodable for TopicSnapshot {
663 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
664 if version < 0 || version > 1 {
665 bail!("specified version not supported by this message type");
666 }
667 let name = types::CompactString.decode(buf)?;
668 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
669 let mut unknown_tagged_fields = BTreeMap::new();
670 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
671 for _ in 0..num_tagged_fields {
672 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
673 let size: u32 = types::UnsignedVarInt.decode(buf)?;
674 let unknown_value = buf.try_get_bytes(size as usize)?;
675 unknown_tagged_fields.insert(tag as i32, unknown_value);
676 }
677 Ok(Self {
678 name,
679 partitions,
680 unknown_tagged_fields,
681 })
682 }
683}
684
685impl Default for TopicSnapshot {
686 fn default() -> Self {
687 Self {
688 name: Default::default(),
689 partitions: Default::default(),
690 unknown_tagged_fields: BTreeMap::new(),
691 }
692 }
693}
694
695impl Message for TopicSnapshot {
696 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
697 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
698}
699
700impl HeaderVersion for FetchSnapshotRequest {
701 fn header_version(version: i16) -> i16 {
702 2
703 }
704}