1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AcquiredRecords {
24 pub first_offset: i64,
28
29 pub last_offset: i64,
33
34 pub delivery_count: i16,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl AcquiredRecords {
44 pub fn with_first_offset(mut self, value: i64) -> Self {
50 self.first_offset = value;
51 self
52 }
53 pub fn with_last_offset(mut self, value: i64) -> Self {
59 self.last_offset = value;
60 self
61 }
62 pub fn with_delivery_count(mut self, value: i16) -> Self {
68 self.delivery_count = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "broker")]
84impl Encodable for AcquiredRecords {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version != 1 {
87 bail!("specified version not supported by this message type");
88 }
89 types::Int64.encode(buf, &self.first_offset)?;
90 types::Int64.encode(buf, &self.last_offset)?;
91 types::Int16.encode(buf, &self.delivery_count)?;
92 let num_tagged_fields = self.unknown_tagged_fields.len();
93 if num_tagged_fields > std::u32::MAX as usize {
94 bail!(
95 "Too many tagged fields to encode ({} fields)",
96 num_tagged_fields
97 );
98 }
99 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
100
101 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
102 Ok(())
103 }
104 fn compute_size(&self, version: i16) -> Result<usize> {
105 let mut total_size = 0;
106 total_size += types::Int64.compute_size(&self.first_offset)?;
107 total_size += types::Int64.compute_size(&self.last_offset)?;
108 total_size += types::Int16.compute_size(&self.delivery_count)?;
109 let num_tagged_fields = self.unknown_tagged_fields.len();
110 if num_tagged_fields > std::u32::MAX as usize {
111 bail!(
112 "Too many tagged fields to encode ({} fields)",
113 num_tagged_fields
114 );
115 }
116 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
117
118 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
119 Ok(total_size)
120 }
121}
122
123#[cfg(feature = "client")]
124impl Decodable for AcquiredRecords {
125 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
126 if version != 1 {
127 bail!("specified version not supported by this message type");
128 }
129 let first_offset = types::Int64.decode(buf)?;
130 let last_offset = types::Int64.decode(buf)?;
131 let delivery_count = types::Int16.decode(buf)?;
132 let mut unknown_tagged_fields = BTreeMap::new();
133 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
134 for _ in 0..num_tagged_fields {
135 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
136 let size: u32 = types::UnsignedVarInt.decode(buf)?;
137 let unknown_value = buf.try_get_bytes(size as usize)?;
138 unknown_tagged_fields.insert(tag as i32, unknown_value);
139 }
140 Ok(Self {
141 first_offset,
142 last_offset,
143 delivery_count,
144 unknown_tagged_fields,
145 })
146 }
147}
148
149impl Default for AcquiredRecords {
150 fn default() -> Self {
151 Self {
152 first_offset: 0,
153 last_offset: 0,
154 delivery_count: 0,
155 unknown_tagged_fields: BTreeMap::new(),
156 }
157 }
158}
159
160impl Message for AcquiredRecords {
161 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
162 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
163}
164
165#[non_exhaustive]
167#[derive(Debug, Clone, PartialEq)]
168pub struct LeaderIdAndEpoch {
169 pub leader_id: i32,
173
174 pub leader_epoch: i32,
178
179 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
181}
182
183impl LeaderIdAndEpoch {
184 pub fn with_leader_id(mut self, value: i32) -> Self {
190 self.leader_id = value;
191 self
192 }
193 pub fn with_leader_epoch(mut self, value: i32) -> Self {
199 self.leader_epoch = value;
200 self
201 }
202 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
204 self.unknown_tagged_fields = value;
205 self
206 }
207 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
209 self.unknown_tagged_fields.insert(key, value);
210 self
211 }
212}
213
214#[cfg(feature = "broker")]
215impl Encodable for LeaderIdAndEpoch {
216 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
217 if version != 1 {
218 bail!("specified version not supported by this message type");
219 }
220 types::Int32.encode(buf, &self.leader_id)?;
221 types::Int32.encode(buf, &self.leader_epoch)?;
222 let num_tagged_fields = self.unknown_tagged_fields.len();
223 if num_tagged_fields > std::u32::MAX as usize {
224 bail!(
225 "Too many tagged fields to encode ({} fields)",
226 num_tagged_fields
227 );
228 }
229 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
230
231 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
232 Ok(())
233 }
234 fn compute_size(&self, version: i16) -> Result<usize> {
235 let mut total_size = 0;
236 total_size += types::Int32.compute_size(&self.leader_id)?;
237 total_size += types::Int32.compute_size(&self.leader_epoch)?;
238 let num_tagged_fields = self.unknown_tagged_fields.len();
239 if num_tagged_fields > std::u32::MAX as usize {
240 bail!(
241 "Too many tagged fields to encode ({} fields)",
242 num_tagged_fields
243 );
244 }
245 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
246
247 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
248 Ok(total_size)
249 }
250}
251
252#[cfg(feature = "client")]
253impl Decodable for LeaderIdAndEpoch {
254 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
255 if version != 1 {
256 bail!("specified version not supported by this message type");
257 }
258 let leader_id = types::Int32.decode(buf)?;
259 let leader_epoch = types::Int32.decode(buf)?;
260 let mut unknown_tagged_fields = BTreeMap::new();
261 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
262 for _ in 0..num_tagged_fields {
263 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
264 let size: u32 = types::UnsignedVarInt.decode(buf)?;
265 let unknown_value = buf.try_get_bytes(size as usize)?;
266 unknown_tagged_fields.insert(tag as i32, unknown_value);
267 }
268 Ok(Self {
269 leader_id,
270 leader_epoch,
271 unknown_tagged_fields,
272 })
273 }
274}
275
276impl Default for LeaderIdAndEpoch {
277 fn default() -> Self {
278 Self {
279 leader_id: 0,
280 leader_epoch: 0,
281 unknown_tagged_fields: BTreeMap::new(),
282 }
283 }
284}
285
286impl Message for LeaderIdAndEpoch {
287 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
288 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
289}
290
291#[non_exhaustive]
293#[derive(Debug, Clone, PartialEq)]
294pub struct NodeEndpoint {
295 pub node_id: super::BrokerId,
299
300 pub host: StrBytes,
304
305 pub port: i32,
309
310 pub rack: Option<StrBytes>,
314
315 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
317}
318
319impl NodeEndpoint {
320 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
326 self.node_id = value;
327 self
328 }
329 pub fn with_host(mut self, value: StrBytes) -> Self {
335 self.host = value;
336 self
337 }
338 pub fn with_port(mut self, value: i32) -> Self {
344 self.port = value;
345 self
346 }
347 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
353 self.rack = value;
354 self
355 }
356 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
358 self.unknown_tagged_fields = value;
359 self
360 }
361 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
363 self.unknown_tagged_fields.insert(key, value);
364 self
365 }
366}
367
368#[cfg(feature = "broker")]
369impl Encodable for NodeEndpoint {
370 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
371 if version != 1 {
372 bail!("specified version not supported by this message type");
373 }
374 types::Int32.encode(buf, &self.node_id)?;
375 types::CompactString.encode(buf, &self.host)?;
376 types::Int32.encode(buf, &self.port)?;
377 types::CompactString.encode(buf, &self.rack)?;
378 let num_tagged_fields = self.unknown_tagged_fields.len();
379 if num_tagged_fields > std::u32::MAX as usize {
380 bail!(
381 "Too many tagged fields to encode ({} fields)",
382 num_tagged_fields
383 );
384 }
385 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
386
387 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
388 Ok(())
389 }
390 fn compute_size(&self, version: i16) -> Result<usize> {
391 let mut total_size = 0;
392 total_size += types::Int32.compute_size(&self.node_id)?;
393 total_size += types::CompactString.compute_size(&self.host)?;
394 total_size += types::Int32.compute_size(&self.port)?;
395 total_size += types::CompactString.compute_size(&self.rack)?;
396 let num_tagged_fields = self.unknown_tagged_fields.len();
397 if num_tagged_fields > std::u32::MAX as usize {
398 bail!(
399 "Too many tagged fields to encode ({} fields)",
400 num_tagged_fields
401 );
402 }
403 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
404
405 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
406 Ok(total_size)
407 }
408}
409
410#[cfg(feature = "client")]
411impl Decodable for NodeEndpoint {
412 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
413 if version != 1 {
414 bail!("specified version not supported by this message type");
415 }
416 let node_id = types::Int32.decode(buf)?;
417 let host = types::CompactString.decode(buf)?;
418 let port = types::Int32.decode(buf)?;
419 let rack = types::CompactString.decode(buf)?;
420 let mut unknown_tagged_fields = BTreeMap::new();
421 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
422 for _ in 0..num_tagged_fields {
423 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
424 let size: u32 = types::UnsignedVarInt.decode(buf)?;
425 let unknown_value = buf.try_get_bytes(size as usize)?;
426 unknown_tagged_fields.insert(tag as i32, unknown_value);
427 }
428 Ok(Self {
429 node_id,
430 host,
431 port,
432 rack,
433 unknown_tagged_fields,
434 })
435 }
436}
437
438impl Default for NodeEndpoint {
439 fn default() -> Self {
440 Self {
441 node_id: (0).into(),
442 host: Default::default(),
443 port: 0,
444 rack: None,
445 unknown_tagged_fields: BTreeMap::new(),
446 }
447 }
448}
449
450impl Message for NodeEndpoint {
451 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
452 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
453}
454
455#[non_exhaustive]
457#[derive(Debug, Clone, PartialEq)]
458pub struct PartitionData {
459 pub partition_index: i32,
463
464 pub error_code: i16,
468
469 pub error_message: Option<StrBytes>,
473
474 pub acknowledge_error_code: i16,
478
479 pub acknowledge_error_message: Option<StrBytes>,
483
484 pub current_leader: LeaderIdAndEpoch,
488
489 pub records: Option<Bytes>,
493
494 pub acquired_records: Vec<AcquiredRecords>,
498
499 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
501}
502
503impl PartitionData {
504 pub fn with_partition_index(mut self, value: i32) -> Self {
510 self.partition_index = value;
511 self
512 }
513 pub fn with_error_code(mut self, value: i16) -> Self {
519 self.error_code = value;
520 self
521 }
522 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
528 self.error_message = value;
529 self
530 }
531 pub fn with_acknowledge_error_code(mut self, value: i16) -> Self {
537 self.acknowledge_error_code = value;
538 self
539 }
540 pub fn with_acknowledge_error_message(mut self, value: Option<StrBytes>) -> Self {
546 self.acknowledge_error_message = value;
547 self
548 }
549 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
555 self.current_leader = value;
556 self
557 }
558 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
564 self.records = value;
565 self
566 }
567 pub fn with_acquired_records(mut self, value: Vec<AcquiredRecords>) -> Self {
573 self.acquired_records = value;
574 self
575 }
576 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
578 self.unknown_tagged_fields = value;
579 self
580 }
581 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
583 self.unknown_tagged_fields.insert(key, value);
584 self
585 }
586}
587
588#[cfg(feature = "broker")]
589impl Encodable for PartitionData {
590 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
591 if version != 1 {
592 bail!("specified version not supported by this message type");
593 }
594 types::Int32.encode(buf, &self.partition_index)?;
595 types::Int16.encode(buf, &self.error_code)?;
596 types::CompactString.encode(buf, &self.error_message)?;
597 types::Int16.encode(buf, &self.acknowledge_error_code)?;
598 types::CompactString.encode(buf, &self.acknowledge_error_message)?;
599 types::Struct { version }.encode(buf, &self.current_leader)?;
600 types::CompactBytes.encode(buf, &self.records)?;
601 types::CompactArray(types::Struct { version }).encode(buf, &self.acquired_records)?;
602 let num_tagged_fields = self.unknown_tagged_fields.len();
603 if num_tagged_fields > std::u32::MAX as usize {
604 bail!(
605 "Too many tagged fields to encode ({} fields)",
606 num_tagged_fields
607 );
608 }
609 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
610
611 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
612 Ok(())
613 }
614 fn compute_size(&self, version: i16) -> Result<usize> {
615 let mut total_size = 0;
616 total_size += types::Int32.compute_size(&self.partition_index)?;
617 total_size += types::Int16.compute_size(&self.error_code)?;
618 total_size += types::CompactString.compute_size(&self.error_message)?;
619 total_size += types::Int16.compute_size(&self.acknowledge_error_code)?;
620 total_size += types::CompactString.compute_size(&self.acknowledge_error_message)?;
621 total_size += types::Struct { version }.compute_size(&self.current_leader)?;
622 total_size += types::CompactBytes.compute_size(&self.records)?;
623 total_size +=
624 types::CompactArray(types::Struct { version }).compute_size(&self.acquired_records)?;
625 let num_tagged_fields = self.unknown_tagged_fields.len();
626 if num_tagged_fields > std::u32::MAX as usize {
627 bail!(
628 "Too many tagged fields to encode ({} fields)",
629 num_tagged_fields
630 );
631 }
632 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
633
634 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
635 Ok(total_size)
636 }
637}
638
639#[cfg(feature = "client")]
640impl Decodable for PartitionData {
641 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
642 if version != 1 {
643 bail!("specified version not supported by this message type");
644 }
645 let partition_index = types::Int32.decode(buf)?;
646 let error_code = types::Int16.decode(buf)?;
647 let error_message = types::CompactString.decode(buf)?;
648 let acknowledge_error_code = types::Int16.decode(buf)?;
649 let acknowledge_error_message = types::CompactString.decode(buf)?;
650 let current_leader = types::Struct { version }.decode(buf)?;
651 let records = types::CompactBytes.decode(buf)?;
652 let acquired_records = types::CompactArray(types::Struct { version }).decode(buf)?;
653 let mut unknown_tagged_fields = BTreeMap::new();
654 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
655 for _ in 0..num_tagged_fields {
656 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
657 let size: u32 = types::UnsignedVarInt.decode(buf)?;
658 let unknown_value = buf.try_get_bytes(size as usize)?;
659 unknown_tagged_fields.insert(tag as i32, unknown_value);
660 }
661 Ok(Self {
662 partition_index,
663 error_code,
664 error_message,
665 acknowledge_error_code,
666 acknowledge_error_message,
667 current_leader,
668 records,
669 acquired_records,
670 unknown_tagged_fields,
671 })
672 }
673}
674
675impl Default for PartitionData {
676 fn default() -> Self {
677 Self {
678 partition_index: 0,
679 error_code: 0,
680 error_message: None,
681 acknowledge_error_code: 0,
682 acknowledge_error_message: None,
683 current_leader: Default::default(),
684 records: Some(Default::default()),
685 acquired_records: Default::default(),
686 unknown_tagged_fields: BTreeMap::new(),
687 }
688 }
689}
690
691impl Message for PartitionData {
692 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
693 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
694}
695
696#[non_exhaustive]
698#[derive(Debug, Clone, PartialEq)]
699pub struct ShareFetchResponse {
700 pub throttle_time_ms: i32,
704
705 pub error_code: i16,
709
710 pub error_message: Option<StrBytes>,
714
715 pub acquisition_lock_timeout_ms: i32,
719
720 pub responses: Vec<ShareFetchableTopicResponse>,
724
725 pub node_endpoints: Vec<NodeEndpoint>,
729
730 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
732}
733
734impl ShareFetchResponse {
735 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
741 self.throttle_time_ms = value;
742 self
743 }
744 pub fn with_error_code(mut self, value: i16) -> Self {
750 self.error_code = value;
751 self
752 }
753 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
759 self.error_message = value;
760 self
761 }
762 pub fn with_acquisition_lock_timeout_ms(mut self, value: i32) -> Self {
768 self.acquisition_lock_timeout_ms = value;
769 self
770 }
771 pub fn with_responses(mut self, value: Vec<ShareFetchableTopicResponse>) -> Self {
777 self.responses = value;
778 self
779 }
780 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
786 self.node_endpoints = value;
787 self
788 }
789 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
791 self.unknown_tagged_fields = value;
792 self
793 }
794 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
796 self.unknown_tagged_fields.insert(key, value);
797 self
798 }
799}
800
801#[cfg(feature = "broker")]
802impl Encodable for ShareFetchResponse {
803 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
804 if version != 1 {
805 bail!("specified version not supported by this message type");
806 }
807 types::Int32.encode(buf, &self.throttle_time_ms)?;
808 types::Int16.encode(buf, &self.error_code)?;
809 types::CompactString.encode(buf, &self.error_message)?;
810 types::Int32.encode(buf, &self.acquisition_lock_timeout_ms)?;
811 types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
812 types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
813 let num_tagged_fields = self.unknown_tagged_fields.len();
814 if num_tagged_fields > std::u32::MAX as usize {
815 bail!(
816 "Too many tagged fields to encode ({} fields)",
817 num_tagged_fields
818 );
819 }
820 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
821
822 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
823 Ok(())
824 }
825 fn compute_size(&self, version: i16) -> Result<usize> {
826 let mut total_size = 0;
827 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
828 total_size += types::Int16.compute_size(&self.error_code)?;
829 total_size += types::CompactString.compute_size(&self.error_message)?;
830 total_size += types::Int32.compute_size(&self.acquisition_lock_timeout_ms)?;
831 total_size +=
832 types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
833 total_size +=
834 types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
835 let num_tagged_fields = self.unknown_tagged_fields.len();
836 if num_tagged_fields > std::u32::MAX as usize {
837 bail!(
838 "Too many tagged fields to encode ({} fields)",
839 num_tagged_fields
840 );
841 }
842 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
843
844 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
845 Ok(total_size)
846 }
847}
848
849#[cfg(feature = "client")]
850impl Decodable for ShareFetchResponse {
851 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
852 if version != 1 {
853 bail!("specified version not supported by this message type");
854 }
855 let throttle_time_ms = types::Int32.decode(buf)?;
856 let error_code = types::Int16.decode(buf)?;
857 let error_message = types::CompactString.decode(buf)?;
858 let acquisition_lock_timeout_ms = types::Int32.decode(buf)?;
859 let responses = types::CompactArray(types::Struct { version }).decode(buf)?;
860 let node_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;
861 let mut unknown_tagged_fields = BTreeMap::new();
862 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
863 for _ in 0..num_tagged_fields {
864 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
865 let size: u32 = types::UnsignedVarInt.decode(buf)?;
866 let unknown_value = buf.try_get_bytes(size as usize)?;
867 unknown_tagged_fields.insert(tag as i32, unknown_value);
868 }
869 Ok(Self {
870 throttle_time_ms,
871 error_code,
872 error_message,
873 acquisition_lock_timeout_ms,
874 responses,
875 node_endpoints,
876 unknown_tagged_fields,
877 })
878 }
879}
880
881impl Default for ShareFetchResponse {
882 fn default() -> Self {
883 Self {
884 throttle_time_ms: 0,
885 error_code: 0,
886 error_message: None,
887 acquisition_lock_timeout_ms: 0,
888 responses: Default::default(),
889 node_endpoints: Default::default(),
890 unknown_tagged_fields: BTreeMap::new(),
891 }
892 }
893}
894
895impl Message for ShareFetchResponse {
896 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
897 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
898}
899
900#[non_exhaustive]
902#[derive(Debug, Clone, PartialEq)]
903pub struct ShareFetchableTopicResponse {
904 pub topic_id: Uuid,
908
909 pub partitions: Vec<PartitionData>,
913
914 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
916}
917
918impl ShareFetchableTopicResponse {
919 pub fn with_topic_id(mut self, value: Uuid) -> Self {
925 self.topic_id = value;
926 self
927 }
928 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
934 self.partitions = value;
935 self
936 }
937 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
939 self.unknown_tagged_fields = value;
940 self
941 }
942 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
944 self.unknown_tagged_fields.insert(key, value);
945 self
946 }
947}
948
949#[cfg(feature = "broker")]
950impl Encodable for ShareFetchableTopicResponse {
951 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
952 if version != 1 {
953 bail!("specified version not supported by this message type");
954 }
955 types::Uuid.encode(buf, &self.topic_id)?;
956 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
957 let num_tagged_fields = self.unknown_tagged_fields.len();
958 if num_tagged_fields > std::u32::MAX as usize {
959 bail!(
960 "Too many tagged fields to encode ({} fields)",
961 num_tagged_fields
962 );
963 }
964 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
965
966 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
967 Ok(())
968 }
969 fn compute_size(&self, version: i16) -> Result<usize> {
970 let mut total_size = 0;
971 total_size += types::Uuid.compute_size(&self.topic_id)?;
972 total_size +=
973 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
974 let num_tagged_fields = self.unknown_tagged_fields.len();
975 if num_tagged_fields > std::u32::MAX as usize {
976 bail!(
977 "Too many tagged fields to encode ({} fields)",
978 num_tagged_fields
979 );
980 }
981 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
982
983 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
984 Ok(total_size)
985 }
986}
987
988#[cfg(feature = "client")]
989impl Decodable for ShareFetchableTopicResponse {
990 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
991 if version != 1 {
992 bail!("specified version not supported by this message type");
993 }
994 let topic_id = types::Uuid.decode(buf)?;
995 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
996 let mut unknown_tagged_fields = BTreeMap::new();
997 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
998 for _ in 0..num_tagged_fields {
999 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1000 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1001 let unknown_value = buf.try_get_bytes(size as usize)?;
1002 unknown_tagged_fields.insert(tag as i32, unknown_value);
1003 }
1004 Ok(Self {
1005 topic_id,
1006 partitions,
1007 unknown_tagged_fields,
1008 })
1009 }
1010}
1011
1012impl Default for ShareFetchableTopicResponse {
1013 fn default() -> Self {
1014 Self {
1015 topic_id: Uuid::nil(),
1016 partitions: Default::default(),
1017 unknown_tagged_fields: BTreeMap::new(),
1018 }
1019 }
1020}
1021
1022impl Message for ShareFetchableTopicResponse {
1023 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
1024 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1025}
1026
1027impl HeaderVersion for ShareFetchResponse {
1028 fn header_version(version: i16) -> i16 {
1029 1
1030 }
1031}