1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Top-level body of a fetch-snapshot response.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct FetchSnapshotResponse {
    /// Throttle time in milliseconds; written as an `Int32` in every supported version.
    pub throttle_time_ms: i32,

    /// Error code of the response as a whole; written as an `Int16` in every supported version.
    pub error_code: i16,

    /// Per-topic snapshot data; written as a compact array in every supported version.
    pub topics: Vec<TopicSnapshot>,

    /// Node endpoints; carried as tagged field 0, and only for version 1 when non-empty.
    pub node_endpoints: Vec<NodeEndpoint>,

    /// Tagged fields not recognized by this type, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
47
48impl FetchSnapshotResponse {
49 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
55 self.throttle_time_ms = value;
56 self
57 }
58 pub fn with_error_code(mut self, value: i16) -> Self {
64 self.error_code = value;
65 self
66 }
67 pub fn with_topics(mut self, value: Vec<TopicSnapshot>) -> Self {
73 self.topics = value;
74 self
75 }
76 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
82 self.node_endpoints = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
#[cfg(feature = "broker")]
impl Encodable for FetchSnapshotResponse {
    /// Serializes this response into `buf` using the layout of `version`.
    ///
    /// Writes `throttle_time_ms` (`Int32`), `error_code` (`Int16`) and `topics` (compact array),
    /// followed by the tagged-field section. `node_endpoints` is written as tagged field 0, and
    /// only when `version >= 1` and the list is non-empty; unknown tagged fields follow.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when more than `u32::MAX` tagged fields would be
    /// written, when the encoded `node_endpoints` exceeds `u32::MAX` bytes, or when any nested
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // Decide once whether tagged field 0 is present; the count and the payload must agree.
        let write_node_endpoints = version >= 1 && !self.node_endpoints.is_empty();
        let num_tagged_fields =
            self.unknown_tagged_fields.len() + usize::from(write_node_endpoints);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        if write_node_endpoints {
            let computed_size = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag number, then payload length, then the payload itself.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=1`.
    ///
    /// # Errors
    ///
    /// Fails when more than `u32::MAX` tagged fields would be written, when the encoded
    /// `node_endpoints` exceeds `u32::MAX` bytes, or when any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let write_node_endpoints = version >= 1 && !self.node_endpoints.is_empty();
        let num_tagged_fields =
            self.unknown_tagged_fields.len() + usize::from(write_node_endpoints);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        if write_node_endpoints {
            let computed_size = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag number + payload length prefix + payload.
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
174
#[cfg(feature = "client")]
impl Decodable for FetchSnapshotResponse {
    /// Reads a response encoded with the layout of `version` from `buf`.
    ///
    /// Tagged field 0 is decoded into `node_endpoints` for version 1 and rejected for version 0.
    /// Any other tag is stored verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when tag 0 appears in a version-0 message, or
    /// when any nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let throttle_time_ms = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            match tag {
                0 if version >= 1 => {
                    node_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;
                }
                0 => bail!("Tag {} is not valid for version {}", tag, version),
                _ => {
                    // Keep unrecognized payloads as raw bytes under their tag.
                    let raw_value = buf.try_get_bytes(size as usize)?;
                    unknown_tagged_fields.insert(tag as i32, raw_value);
                }
            }
        }

        Ok(Self {
            throttle_time_ms,
            error_code,
            topics,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
214
215impl Default for FetchSnapshotResponse {
216 fn default() -> Self {
217 Self {
218 throttle_time_ms: 0,
219 error_code: 0,
220 topics: Default::default(),
221 node_endpoints: Default::default(),
222 unknown_tagged_fields: BTreeMap::new(),
223 }
224 }
225}
226
impl Message for FetchSnapshotResponse {
    /// Protocol versions declared for this type: 0 through 1.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
231
/// Leader id and leader epoch pair, used as `PartitionSnapshot::current_leader`.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderIdAndEpoch {
    /// Broker id of the leader; written as an `Int32` in every supported version.
    pub leader_id: super::BrokerId,

    /// Leader epoch; written as an `Int32` in every supported version.
    pub leader_epoch: i32,

    /// Tagged fields not recognized by this type, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
249
250impl LeaderIdAndEpoch {
251 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
257 self.leader_id = value;
258 self
259 }
260 pub fn with_leader_epoch(mut self, value: i32) -> Self {
266 self.leader_epoch = value;
267 self
268 }
269 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
271 self.unknown_tagged_fields = value;
272 self
273 }
274 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
276 self.unknown_tagged_fields.insert(key, value);
277 self
278 }
279}
280
#[cfg(feature = "broker")]
impl Encodable for LeaderIdAndEpoch {
    /// Writes `leader_id` and `leader_epoch` as `Int32`s, then the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more than `u32::MAX` tagged
    /// fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write; `version` is not validated here.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` tagged fields or a nested computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let leader_id_size = types::Int32.compute_size(&self.leader_id)?;
        let leader_epoch_size = types::Int32.compute_size(&self.leader_epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(leader_id_size + leader_epoch_size + count_size + unknown_size)
    }
}
318
#[cfg(feature = "client")]
impl Decodable for LeaderIdAndEpoch {
    /// Reads `leader_id` and `leader_epoch`, then stores every tagged field as raw bytes.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when a nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
342
343impl Default for LeaderIdAndEpoch {
344 fn default() -> Self {
345 Self {
346 leader_id: (0).into(),
347 leader_epoch: 0,
348 unknown_tagged_fields: BTreeMap::new(),
349 }
350 }
351}
352
impl Message for LeaderIdAndEpoch {
    /// Protocol versions declared for this type: 0 through 1.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
357
/// Network endpoint of a node, used as an element of `FetchSnapshotResponse::node_endpoints`.
///
/// `node_id`, `host` and `port` are only serialized for version 1; encoding a non-default value
/// for any of them with an earlier version is rejected.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct NodeEndpoint {
    /// Id of the node; written as an `Int32` for version 1.
    pub node_id: super::BrokerId,

    /// Host of the node; written as a compact string for version 1.
    pub host: StrBytes,

    /// Port of the node; written as a `UInt16` for version 1.
    pub port: u16,

    /// Tagged fields not recognized by this type, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
380
381impl NodeEndpoint {
382 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
388 self.node_id = value;
389 self
390 }
391 pub fn with_host(mut self, value: StrBytes) -> Self {
397 self.host = value;
398 self
399 }
400 pub fn with_port(mut self, value: u16) -> Self {
406 self.port = value;
407 self
408 }
409 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
411 self.unknown_tagged_fields = value;
412 self
413 }
414 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
416 self.unknown_tagged_fields.insert(key, value);
417 self
418 }
419}
420
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Serializes this endpoint into `buf` using the layout of `version`.
    ///
    /// For version 1 the fields are written as `node_id` (`Int32`), `host` (compact string) and
    /// `port` (`UInt16`). Earlier versions carry none of them, so a non-default value in any of
    /// the three is rejected. The unknown tagged fields are written last in every version.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when a version-1-only field is set for an earlier
    /// version, when there are more than `u32::MAX` tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 1 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::UInt16.encode(buf, &self.port)?;
        } else if self.node_id != 0 || !self.host.is_empty() || self.port != 0 {
            // Checked in field order; all three cases share the same message.
            bail!("A field is set that is not available on the selected protocol version");
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=1`; any version below 1 is
    /// treated as carrying no `node_id`, `host` or `port`.
    ///
    /// # Errors
    ///
    /// Fails when a version-1-only field is set for an earlier version, when there are more than
    /// `u32::MAX` tagged fields, or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 1 {
            total_size += types::Int32.compute_size(&self.node_id)?;
            total_size += types::CompactString.compute_size(&self.host)?;
            total_size += types::UInt16.compute_size(&self.port)?;
        } else if self.node_id != 0 || !self.host.is_empty() || self.port != 0 {
            // Checked in field order; all three cases share the same message.
            bail!("A field is set that is not available on the selected protocol version");
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
496
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads an endpoint encoded with the layout of `version` from `buf`.
    ///
    /// `node_id`, `host` and `port` are read only for version 1; otherwise they take their
    /// default values. Every tagged field is stored as raw bytes.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when a nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Tuple elements are evaluated left to right, matching the wire order.
        let (node_id, host, port) = if version >= 1 {
            (
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
                types::UInt16.decode(buf)?,
            )
        } else {
            (0.into(), Default::default(), 0)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            node_id,
            host,
            port,
            unknown_tagged_fields,
        })
    }
}
534
535impl Default for NodeEndpoint {
536 fn default() -> Self {
537 Self {
538 node_id: (0).into(),
539 host: Default::default(),
540 port: 0,
541 unknown_tagged_fields: BTreeMap::new(),
542 }
543 }
544}
545
impl Message for NodeEndpoint {
    /// Protocol versions declared for this type: 0 through 1.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
550
/// Snapshot data for one partition, used as an element of `TopicSnapshot::partitions`.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionSnapshot {
    /// Partition index; written as an `Int32` in every supported version.
    pub index: i32,

    /// Error code for this partition; written as an `Int16` in every supported version.
    pub error_code: i16,

    /// Snapshot identifier; written as a nested struct in every supported version.
    pub snapshot_id: SnapshotId,

    /// Current leader; carried as tagged field 0, and only written when it differs from the
    /// default value.
    pub current_leader: LeaderIdAndEpoch,

    /// Written as an `Int64` in every supported version.
    /// NOTE(review): presumably the total snapshot size in bytes; confirm against the schema.
    pub size: i64,

    /// Written as an `Int64` in every supported version.
    /// NOTE(review): presumably the byte offset of `unaligned_records`; confirm against the schema.
    pub position: i64,

    /// Raw record bytes; written as compact bytes in every supported version.
    pub unaligned_records: Bytes,

    /// Tagged fields not recognized by this type, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
593
594impl PartitionSnapshot {
595 pub fn with_index(mut self, value: i32) -> Self {
601 self.index = value;
602 self
603 }
604 pub fn with_error_code(mut self, value: i16) -> Self {
610 self.error_code = value;
611 self
612 }
613 pub fn with_snapshot_id(mut self, value: SnapshotId) -> Self {
619 self.snapshot_id = value;
620 self
621 }
622 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
628 self.current_leader = value;
629 self
630 }
631 pub fn with_size(mut self, value: i64) -> Self {
637 self.size = value;
638 self
639 }
640 pub fn with_position(mut self, value: i64) -> Self {
646 self.position = value;
647 self
648 }
649 pub fn with_unaligned_records(mut self, value: Bytes) -> Self {
655 self.unaligned_records = value;
656 self
657 }
658 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
660 self.unknown_tagged_fields = value;
661 self
662 }
663 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
665 self.unknown_tagged_fields.insert(key, value);
666 self
667 }
668}
669
#[cfg(feature = "broker")]
impl Encodable for PartitionSnapshot {
    /// Serializes this partition snapshot into `buf` using the layout of `version`.
    ///
    /// Writes `index` (`Int32`), `error_code` (`Int16`), `snapshot_id` (struct), `size` and
    /// `position` (`Int64`) and `unaligned_records` (compact bytes), followed by the tagged-field
    /// section. `current_leader` is written as tagged field 0, and only when it differs from its
    /// default value; unknown tagged fields follow.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when more than `u32::MAX` tagged fields would be
    /// written, when the encoded `current_leader` exceeds `u32::MAX` bytes, or when any nested
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Struct { version }.encode(buf, &self.snapshot_id)?;
        types::Int64.encode(buf, &self.size)?;
        types::Int64.encode(buf, &self.position)?;
        types::CompactBytes.encode(buf, &self.unaligned_records)?;

        // Compare against the default once; the count and the payload must agree.
        let write_current_leader = self.current_leader != LeaderIdAndEpoch::default();
        let num_tagged_fields =
            self.unknown_tagged_fields.len() + usize::from(write_current_leader);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        if write_current_leader {
            let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag number, then payload length, then the payload itself.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::Struct { version }.encode(buf, &self.current_leader)?;
        }

        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=1`.
    ///
    /// # Errors
    ///
    /// Fails when more than `u32::MAX` tagged fields would be written, when the encoded
    /// `current_leader` exceeds `u32::MAX` bytes, or when any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.index)?;
        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::Struct { version }.compute_size(&self.snapshot_id)?;
        total_size += types::Int64.compute_size(&self.size)?;
        total_size += types::Int64.compute_size(&self.position)?;
        total_size += types::CompactBytes.compute_size(&self.unaligned_records)?;

        let write_current_leader = self.current_leader != LeaderIdAndEpoch::default();
        let num_tagged_fields =
            self.unknown_tagged_fields.len() + usize::from(write_current_leader);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        if write_current_leader {
            let computed_size = types::Struct { version }.compute_size(&self.current_leader)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag number + payload length prefix + payload.
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
745
#[cfg(feature = "client")]
impl Decodable for PartitionSnapshot {
    /// Reads a partition snapshot encoded with the layout of `version` from `buf`.
    ///
    /// Tagged field 0 is decoded into `current_leader`; any other tag is stored verbatim in
    /// `unknown_tagged_fields`. Without tag 0, `current_leader` keeps its default value.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when a nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let snapshot_id = types::Struct { version }.decode(buf)?;
        let size = types::Int64.decode(buf)?;
        let position = types::Int64.decode(buf)?;
        let unaligned_records = types::CompactBytes.decode(buf)?;

        let mut current_leader = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            if tag == 0 {
                current_leader = types::Struct { version }.decode(buf)?;
            } else {
                // Keep unrecognized payloads as raw bytes under their tag.
                let payload = buf.try_get_bytes(payload_len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            index,
            error_code,
            snapshot_id,
            current_leader,
            size,
            position,
            unaligned_records,
            unknown_tagged_fields,
        })
    }
}
786
787impl Default for PartitionSnapshot {
788 fn default() -> Self {
789 Self {
790 index: 0,
791 error_code: 0,
792 snapshot_id: Default::default(),
793 current_leader: Default::default(),
794 size: 0,
795 position: 0,
796 unaligned_records: Default::default(),
797 unknown_tagged_fields: BTreeMap::new(),
798 }
799 }
800}
801
impl Message for PartitionSnapshot {
    /// Protocol versions declared for this type: 0 through 1.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
806
/// Snapshot identifier, used as `PartitionSnapshot::snapshot_id`.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotId {
    /// End offset of the snapshot; written as an `Int64` in every supported version.
    pub end_offset: i64,

    /// Epoch of the snapshot; written as an `Int32` in every supported version.
    pub epoch: i32,

    /// Tagged fields not recognized by this type, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
824
825impl SnapshotId {
826 pub fn with_end_offset(mut self, value: i64) -> Self {
832 self.end_offset = value;
833 self
834 }
835 pub fn with_epoch(mut self, value: i32) -> Self {
841 self.epoch = value;
842 self
843 }
844 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
846 self.unknown_tagged_fields = value;
847 self
848 }
849 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
851 self.unknown_tagged_fields.insert(key, value);
852 self
853 }
854}
855
#[cfg(feature = "broker")]
impl Encodable for SnapshotId {
    /// Writes `end_offset` (`Int64`) and `epoch` (`Int32`), then the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more than `u32::MAX` tagged
    /// fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int64.encode(buf, &self.end_offset)?;
        types::Int32.encode(buf, &self.epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write; `version` is not validated here.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` tagged fields or a nested computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let end_offset_size = types::Int64.compute_size(&self.end_offset)?;
        let epoch_size = types::Int32.compute_size(&self.epoch)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(end_offset_size + epoch_size + count_size + unknown_size)
    }
}
893
#[cfg(feature = "client")]
impl Decodable for SnapshotId {
    /// Reads `end_offset` and `epoch`, then stores every tagged field as raw bytes.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when a nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let end_offset = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            end_offset,
            epoch,
            unknown_tagged_fields,
        })
    }
}
917
918impl Default for SnapshotId {
919 fn default() -> Self {
920 Self {
921 end_offset: 0,
922 epoch: 0,
923 unknown_tagged_fields: BTreeMap::new(),
924 }
925 }
926}
927
impl Message for SnapshotId {
    /// Protocol versions declared for this type: 0 through 1.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
932
/// Snapshot data for one topic, used as an element of `FetchSnapshotResponse::topics`.
///
/// Valid versions: 0-1
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct TopicSnapshot {
    /// Topic name; written as a compact string in every supported version.
    pub name: super::TopicName,

    /// Per-partition snapshot data; written as a compact array in every supported version.
    pub partitions: Vec<PartitionSnapshot>,

    /// Tagged fields not recognized by this type, keyed by tag and kept as raw bytes.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
950
951impl TopicSnapshot {
952 pub fn with_name(mut self, value: super::TopicName) -> Self {
958 self.name = value;
959 self
960 }
961 pub fn with_partitions(mut self, value: Vec<PartitionSnapshot>) -> Self {
967 self.partitions = value;
968 self
969 }
970 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
972 self.unknown_tagged_fields = value;
973 self
974 }
975 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
977 self.unknown_tagged_fields.insert(key, value);
978 self
979 }
980}
981
#[cfg(feature = "broker")]
impl Encodable for TopicSnapshot {
    /// Writes `name` (compact string) and `partitions` (compact array), then the unknown tagged
    /// fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more than `u32::MAX` tagged
    /// fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_field_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write; `version` is not validated here.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` tagged fields or a nested computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let name_size = types::CompactString.compute_size(&self.name)?;
        let partitions_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_field_count = self.unknown_tagged_fields.len();
        if tagged_field_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_field_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_field_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(name_size + partitions_size + count_size + unknown_size)
    }
}
1020
#[cfg(feature = "client")]
impl Decodable for TopicSnapshot {
    /// Reads `name` and `partitions`, then stores every tagged field as raw bytes.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when a nested decoder or buffer read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
1044
1045impl Default for TopicSnapshot {
1046 fn default() -> Self {
1047 Self {
1048 name: Default::default(),
1049 partitions: Default::default(),
1050 unknown_tagged_fields: BTreeMap::new(),
1051 }
1052 }
1053}
1054
impl Message for TopicSnapshot {
    /// Protocol versions declared for this type: 0 through 1.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
    /// No version range is declared as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
1059
1060impl HeaderVersion for FetchSnapshotResponse {
1061 fn header_version(version: i16) -> i16 {
1062 1
1063 }
1064}