1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct FetchSnapshotResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub topics: Vec<TopicSnapshot>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl FetchSnapshotResponse {
44 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
50 self.throttle_time_ms = value;
51 self
52 }
53 pub fn with_error_code(mut self, value: i16) -> Self {
59 self.error_code = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<TopicSnapshot>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "broker")]
84impl Encodable for FetchSnapshotResponse {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 types::Int32.encode(buf, &self.throttle_time_ms)?;
87 types::Int16.encode(buf, &self.error_code)?;
88 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
89 let num_tagged_fields = self.unknown_tagged_fields.len();
90 if num_tagged_fields > std::u32::MAX as usize {
91 bail!(
92 "Too many tagged fields to encode ({} fields)",
93 num_tagged_fields
94 );
95 }
96 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
97
98 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
99 Ok(())
100 }
101 fn compute_size(&self, version: i16) -> Result<usize> {
102 let mut total_size = 0;
103 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
104 total_size += types::Int16.compute_size(&self.error_code)?;
105 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
106 let num_tagged_fields = self.unknown_tagged_fields.len();
107 if num_tagged_fields > std::u32::MAX as usize {
108 bail!(
109 "Too many tagged fields to encode ({} fields)",
110 num_tagged_fields
111 );
112 }
113 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
114
115 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
116 Ok(total_size)
117 }
118}
119
120#[cfg(feature = "client")]
121impl Decodable for FetchSnapshotResponse {
122 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
123 let throttle_time_ms = types::Int32.decode(buf)?;
124 let error_code = types::Int16.decode(buf)?;
125 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
126 let mut unknown_tagged_fields = BTreeMap::new();
127 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
128 for _ in 0..num_tagged_fields {
129 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
130 let size: u32 = types::UnsignedVarInt.decode(buf)?;
131 let unknown_value = buf.try_get_bytes(size as usize)?;
132 unknown_tagged_fields.insert(tag as i32, unknown_value);
133 }
134 Ok(Self {
135 throttle_time_ms,
136 error_code,
137 topics,
138 unknown_tagged_fields,
139 })
140 }
141}
142
143impl Default for FetchSnapshotResponse {
144 fn default() -> Self {
145 Self {
146 throttle_time_ms: 0,
147 error_code: 0,
148 topics: Default::default(),
149 unknown_tagged_fields: BTreeMap::new(),
150 }
151 }
152}
153
154impl Message for FetchSnapshotResponse {
155 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
156 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
157}
158
159#[non_exhaustive]
161#[derive(Debug, Clone, PartialEq)]
162pub struct LeaderIdAndEpoch {
163 pub leader_id: super::BrokerId,
167
168 pub leader_epoch: i32,
172
173 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
175}
176
177impl LeaderIdAndEpoch {
178 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
184 self.leader_id = value;
185 self
186 }
187 pub fn with_leader_epoch(mut self, value: i32) -> Self {
193 self.leader_epoch = value;
194 self
195 }
196 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
198 self.unknown_tagged_fields = value;
199 self
200 }
201 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
203 self.unknown_tagged_fields.insert(key, value);
204 self
205 }
206}
207
208#[cfg(feature = "broker")]
209impl Encodable for LeaderIdAndEpoch {
210 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
211 types::Int32.encode(buf, &self.leader_id)?;
212 types::Int32.encode(buf, &self.leader_epoch)?;
213 let num_tagged_fields = self.unknown_tagged_fields.len();
214 if num_tagged_fields > std::u32::MAX as usize {
215 bail!(
216 "Too many tagged fields to encode ({} fields)",
217 num_tagged_fields
218 );
219 }
220 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
221
222 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
223 Ok(())
224 }
225 fn compute_size(&self, version: i16) -> Result<usize> {
226 let mut total_size = 0;
227 total_size += types::Int32.compute_size(&self.leader_id)?;
228 total_size += types::Int32.compute_size(&self.leader_epoch)?;
229 let num_tagged_fields = self.unknown_tagged_fields.len();
230 if num_tagged_fields > std::u32::MAX as usize {
231 bail!(
232 "Too many tagged fields to encode ({} fields)",
233 num_tagged_fields
234 );
235 }
236 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
237
238 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
239 Ok(total_size)
240 }
241}
242
243#[cfg(feature = "client")]
244impl Decodable for LeaderIdAndEpoch {
245 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
246 let leader_id = types::Int32.decode(buf)?;
247 let leader_epoch = types::Int32.decode(buf)?;
248 let mut unknown_tagged_fields = BTreeMap::new();
249 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
250 for _ in 0..num_tagged_fields {
251 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
252 let size: u32 = types::UnsignedVarInt.decode(buf)?;
253 let unknown_value = buf.try_get_bytes(size as usize)?;
254 unknown_tagged_fields.insert(tag as i32, unknown_value);
255 }
256 Ok(Self {
257 leader_id,
258 leader_epoch,
259 unknown_tagged_fields,
260 })
261 }
262}
263
264impl Default for LeaderIdAndEpoch {
265 fn default() -> Self {
266 Self {
267 leader_id: (0).into(),
268 leader_epoch: 0,
269 unknown_tagged_fields: BTreeMap::new(),
270 }
271 }
272}
273
274impl Message for LeaderIdAndEpoch {
275 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
276 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
277}
278
279#[non_exhaustive]
281#[derive(Debug, Clone, PartialEq)]
282pub struct PartitionSnapshot {
283 pub index: i32,
287
288 pub error_code: i16,
292
293 pub snapshot_id: SnapshotId,
297
298 pub current_leader: LeaderIdAndEpoch,
302
303 pub size: i64,
307
308 pub position: i64,
312
313 pub unaligned_records: Bytes,
317
318 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
320}
321
322impl PartitionSnapshot {
323 pub fn with_index(mut self, value: i32) -> Self {
329 self.index = value;
330 self
331 }
332 pub fn with_error_code(mut self, value: i16) -> Self {
338 self.error_code = value;
339 self
340 }
341 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
347 self.snapshot_id = value;
348 self
349 }
350 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
356 self.current_leader = value;
357 self
358 }
359 pub fn with_size(mut self, value: i64) -> Self {
365 self.size = value;
366 self
367 }
368 pub fn with_position(mut self, value: i64) -> Self {
374 self.position = value;
375 self
376 }
377 pub fn with_unaligned_records(mut self, value: Bytes) -> Self {
383 self.unaligned_records = value;
384 self
385 }
386 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
388 self.unknown_tagged_fields = value;
389 self
390 }
391 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
393 self.unknown_tagged_fields.insert(key, value);
394 self
395 }
396}
397
398#[cfg(feature = "broker")]
399impl Encodable for PartitionSnapshot {
400 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
401 types::Int32.encode(buf, &self.index)?;
402 types::Int16.encode(buf, &self.error_code)?;
403 types::Struct { version }.encode(buf, &self.snapshot_id)?;
404 types::Int64.encode(buf, &self.size)?;
405 types::Int64.encode(buf, &self.position)?;
406 types::CompactBytes.encode(buf, &self.unaligned_records)?;
407 let mut num_tagged_fields = self.unknown_tagged_fields.len();
408 if &self.current_leader != &Default::default() {
409 num_tagged_fields += 1;
410 }
411 if num_tagged_fields > std::u32::MAX as usize {
412 bail!(
413 "Too many tagged fields to encode ({} fields)",
414 num_tagged_fields
415 );
416 }
417 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
418 if &self.current_leader != &Default::default() {
419 let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
420 if computed_size > std::u32::MAX as usize {
421 bail!(
422 "Tagged field is too large to encode ({} bytes)",
423 computed_size
424 );
425 }
426 types::UnsignedVarInt.encode(buf, 0)?;
427 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
428 types::Struct { version }.encode(buf, &self.current_leader)?;
429 }
430
431 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
432 Ok(())
433 }
434 fn compute_size(&self, version: i16) -> Result<usize> {
435 let mut total_size = 0;
436 total_size += types::Int32.compute_size(&self.index)?;
437 total_size += types::Int16.compute_size(&self.error_code)?;
438 total_size += types::Struct { version }.compute_size(&self.snapshot_id)?;
439 total_size += types::Int64.compute_size(&self.size)?;
440 total_size += types::Int64.compute_size(&self.position)?;
441 total_size += types::CompactBytes.compute_size(&self.unaligned_records)?;
442 let mut num_tagged_fields = self.unknown_tagged_fields.len();
443 if &self.current_leader != &Default::default() {
444 num_tagged_fields += 1;
445 }
446 if num_tagged_fields > std::u32::MAX as usize {
447 bail!(
448 "Too many tagged fields to encode ({} fields)",
449 num_tagged_fields
450 );
451 }
452 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
453 if &self.current_leader != &Default::default() {
454 let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
455 if computed_size > std::u32::MAX as usize {
456 bail!(
457 "Tagged field is too large to encode ({} bytes)",
458 computed_size
459 );
460 }
461 total_size += types::UnsignedVarInt.compute_size(0)?;
462 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
463 total_size += computed_size;
464 }
465
466 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
467 Ok(total_size)
468 }
469}
470
471#[cfg(feature = "client")]
472impl Decodable for PartitionSnapshot {
473 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
474 let index = types::Int32.decode(buf)?;
475 let error_code = types::Int16.decode(buf)?;
476 let snapshot_id = types::Struct { version }.decode(buf)?;
477 let mut current_leader = Default::default();
478 let size = types::Int64.decode(buf)?;
479 let position = types::Int64.decode(buf)?;
480 let unaligned_records = types::CompactBytes.decode(buf)?;
481 let mut unknown_tagged_fields = BTreeMap::new();
482 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
483 for _ in 0..num_tagged_fields {
484 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
485 let size: u32 = types::UnsignedVarInt.decode(buf)?;
486 match tag {
487 0 => {
488 current_leader = types::Struct { version }.decode(buf)?;
489 }
490 _ => {
491 let unknown_value = buf.try_get_bytes(size as usize)?;
492 unknown_tagged_fields.insert(tag as i32, unknown_value);
493 }
494 }
495 }
496 Ok(Self {
497 index,
498 error_code,
499 snapshot_id,
500 current_leader,
501 size,
502 position,
503 unaligned_records,
504 unknown_tagged_fields,
505 })
506 }
507}
508
509impl Default for PartitionSnapshot {
510 fn default() -> Self {
511 Self {
512 index: 0,
513 error_code: 0,
514 snapshot_id: Default::default(),
515 current_leader: Default::default(),
516 size: 0,
517 position: 0,
518 unaligned_records: Default::default(),
519 unknown_tagged_fields: BTreeMap::new(),
520 }
521 }
522}
523
524impl Message for PartitionSnapshot {
525 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
526 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
527}
528
529#[non_exhaustive]
531#[derive(Debug, Clone, PartialEq)]
532pub struct SnapshotId {
533 pub end_offset: i64,
537
538 pub epoch: i32,
542
543 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
545}
546
547impl SnapshotId {
548 pub fn with_end_offset(mut self, value: i64) -> Self {
554 self.end_offset = value;
555 self
556 }
557 pub fn with_epoch(mut self, value: i32) -> Self {
563 self.epoch = value;
564 self
565 }
566 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
568 self.unknown_tagged_fields = value;
569 self
570 }
571 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
573 self.unknown_tagged_fields.insert(key, value);
574 self
575 }
576}
577
578#[cfg(feature = "broker")]
579impl Encodable for SnapshotId {
580 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
581 types::Int64.encode(buf, &self.end_offset)?;
582 types::Int32.encode(buf, &self.epoch)?;
583 let num_tagged_fields = self.unknown_tagged_fields.len();
584 if num_tagged_fields > std::u32::MAX as usize {
585 bail!(
586 "Too many tagged fields to encode ({} fields)",
587 num_tagged_fields
588 );
589 }
590 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
591
592 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
593 Ok(())
594 }
595 fn compute_size(&self, version: i16) -> Result<usize> {
596 let mut total_size = 0;
597 total_size += types::Int64.compute_size(&self.end_offset)?;
598 total_size += types::Int32.compute_size(&self.epoch)?;
599 let num_tagged_fields = self.unknown_tagged_fields.len();
600 if num_tagged_fields > std::u32::MAX as usize {
601 bail!(
602 "Too many tagged fields to encode ({} fields)",
603 num_tagged_fields
604 );
605 }
606 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
607
608 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
609 Ok(total_size)
610 }
611}
612
613#[cfg(feature = "client")]
614impl Decodable for SnapshotId {
615 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
616 let end_offset = types::Int64.decode(buf)?;
617 let epoch = types::Int32.decode(buf)?;
618 let mut unknown_tagged_fields = BTreeMap::new();
619 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
620 for _ in 0..num_tagged_fields {
621 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
622 let size: u32 = types::UnsignedVarInt.decode(buf)?;
623 let unknown_value = buf.try_get_bytes(size as usize)?;
624 unknown_tagged_fields.insert(tag as i32, unknown_value);
625 }
626 Ok(Self {
627 end_offset,
628 epoch,
629 unknown_tagged_fields,
630 })
631 }
632}
633
634impl Default for SnapshotId {
635 fn default() -> Self {
636 Self {
637 end_offset: 0,
638 epoch: 0,
639 unknown_tagged_fields: BTreeMap::new(),
640 }
641 }
642}
643
644impl Message for SnapshotId {
645 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
646 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
647}
648
649#[non_exhaustive]
651#[derive(Debug, Clone, PartialEq)]
652pub struct TopicSnapshot {
653 pub name: super::TopicName,
657
658 pub partitions: Vec<PartitionSnapshot>,
662
663 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
665}
666
667impl TopicSnapshot {
668 pub fn with_name(mut self, value: super::TopicName) -> Self {
674 self.name = value;
675 self
676 }
677 pub fn with_partitions(mut self, value: Vec<PartitionSnapshot>) -> Self {
683 self.partitions = value;
684 self
685 }
686 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
688 self.unknown_tagged_fields = value;
689 self
690 }
691 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
693 self.unknown_tagged_fields.insert(key, value);
694 self
695 }
696}
697
698#[cfg(feature = "broker")]
699impl Encodable for TopicSnapshot {
700 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
701 types::CompactString.encode(buf, &self.name)?;
702 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
703 let num_tagged_fields = self.unknown_tagged_fields.len();
704 if num_tagged_fields > std::u32::MAX as usize {
705 bail!(
706 "Too many tagged fields to encode ({} fields)",
707 num_tagged_fields
708 );
709 }
710 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
711
712 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
713 Ok(())
714 }
715 fn compute_size(&self, version: i16) -> Result<usize> {
716 let mut total_size = 0;
717 total_size += types::CompactString.compute_size(&self.name)?;
718 total_size +=
719 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
720 let num_tagged_fields = self.unknown_tagged_fields.len();
721 if num_tagged_fields > std::u32::MAX as usize {
722 bail!(
723 "Too many tagged fields to encode ({} fields)",
724 num_tagged_fields
725 );
726 }
727 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
728
729 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
730 Ok(total_size)
731 }
732}
733
734#[cfg(feature = "client")]
735impl Decodable for TopicSnapshot {
736 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
737 let name = types::CompactString.decode(buf)?;
738 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
739 let mut unknown_tagged_fields = BTreeMap::new();
740 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
741 for _ in 0..num_tagged_fields {
742 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
743 let size: u32 = types::UnsignedVarInt.decode(buf)?;
744 let unknown_value = buf.try_get_bytes(size as usize)?;
745 unknown_tagged_fields.insert(tag as i32, unknown_value);
746 }
747 Ok(Self {
748 name,
749 partitions,
750 unknown_tagged_fields,
751 })
752 }
753}
754
755impl Default for TopicSnapshot {
756 fn default() -> Self {
757 Self {
758 name: Default::default(),
759 partitions: Default::default(),
760 unknown_tagged_fields: BTreeMap::new(),
761 }
762 }
763}
764
765impl Message for TopicSnapshot {
766 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
767 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
768}
769
770impl HeaderVersion for FetchSnapshotResponse {
771 fn header_version(version: i16) -> i16 {
772 1
773 }
774}