1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaderIdAndEpoch {
24 pub leader_id: i32,
28
29 pub leader_epoch: i32,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl LeaderIdAndEpoch {
39 pub fn with_leader_id(mut self, value: i32) -> Self {
45 self.leader_id = value;
46 self
47 }
48 pub fn with_leader_epoch(mut self, value: i32) -> Self {
54 self.leader_epoch = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for LeaderIdAndEpoch {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version != 1 {
73 bail!("specified version not supported by this message type");
74 }
75 types::Int32.encode(buf, &self.leader_id)?;
76 types::Int32.encode(buf, &self.leader_epoch)?;
77 let num_tagged_fields = self.unknown_tagged_fields.len();
78 if num_tagged_fields > std::u32::MAX as usize {
79 bail!(
80 "Too many tagged fields to encode ({} fields)",
81 num_tagged_fields
82 );
83 }
84 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
85
86 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
87 Ok(())
88 }
89 fn compute_size(&self, version: i16) -> Result<usize> {
90 let mut total_size = 0;
91 total_size += types::Int32.compute_size(&self.leader_id)?;
92 total_size += types::Int32.compute_size(&self.leader_epoch)?;
93 let num_tagged_fields = self.unknown_tagged_fields.len();
94 if num_tagged_fields > std::u32::MAX as usize {
95 bail!(
96 "Too many tagged fields to encode ({} fields)",
97 num_tagged_fields
98 );
99 }
100 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
101
102 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
103 Ok(total_size)
104 }
105}
106
107#[cfg(feature = "client")]
108impl Decodable for LeaderIdAndEpoch {
109 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
110 if version != 1 {
111 bail!("specified version not supported by this message type");
112 }
113 let leader_id = types::Int32.decode(buf)?;
114 let leader_epoch = types::Int32.decode(buf)?;
115 let mut unknown_tagged_fields = BTreeMap::new();
116 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
117 for _ in 0..num_tagged_fields {
118 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
119 let size: u32 = types::UnsignedVarInt.decode(buf)?;
120 let unknown_value = buf.try_get_bytes(size as usize)?;
121 unknown_tagged_fields.insert(tag as i32, unknown_value);
122 }
123 Ok(Self {
124 leader_id,
125 leader_epoch,
126 unknown_tagged_fields,
127 })
128 }
129}
130
131impl Default for LeaderIdAndEpoch {
132 fn default() -> Self {
133 Self {
134 leader_id: 0,
135 leader_epoch: 0,
136 unknown_tagged_fields: BTreeMap::new(),
137 }
138 }
139}
140
141impl Message for LeaderIdAndEpoch {
142 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
143 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
144}
145
146#[non_exhaustive]
148#[derive(Debug, Clone, PartialEq)]
149pub struct NodeEndpoint {
150 pub node_id: super::BrokerId,
154
155 pub host: StrBytes,
159
160 pub port: i32,
164
165 pub rack: Option<StrBytes>,
169
170 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
172}
173
174impl NodeEndpoint {
175 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
181 self.node_id = value;
182 self
183 }
184 pub fn with_host(mut self, value: StrBytes) -> Self {
190 self.host = value;
191 self
192 }
193 pub fn with_port(mut self, value: i32) -> Self {
199 self.port = value;
200 self
201 }
202 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
208 self.rack = value;
209 self
210 }
211 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
213 self.unknown_tagged_fields = value;
214 self
215 }
216 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
218 self.unknown_tagged_fields.insert(key, value);
219 self
220 }
221}
222
223#[cfg(feature = "broker")]
224impl Encodable for NodeEndpoint {
225 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
226 if version != 1 {
227 bail!("specified version not supported by this message type");
228 }
229 types::Int32.encode(buf, &self.node_id)?;
230 types::CompactString.encode(buf, &self.host)?;
231 types::Int32.encode(buf, &self.port)?;
232 types::CompactString.encode(buf, &self.rack)?;
233 let num_tagged_fields = self.unknown_tagged_fields.len();
234 if num_tagged_fields > std::u32::MAX as usize {
235 bail!(
236 "Too many tagged fields to encode ({} fields)",
237 num_tagged_fields
238 );
239 }
240 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
241
242 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
243 Ok(())
244 }
245 fn compute_size(&self, version: i16) -> Result<usize> {
246 let mut total_size = 0;
247 total_size += types::Int32.compute_size(&self.node_id)?;
248 total_size += types::CompactString.compute_size(&self.host)?;
249 total_size += types::Int32.compute_size(&self.port)?;
250 total_size += types::CompactString.compute_size(&self.rack)?;
251 let num_tagged_fields = self.unknown_tagged_fields.len();
252 if num_tagged_fields > std::u32::MAX as usize {
253 bail!(
254 "Too many tagged fields to encode ({} fields)",
255 num_tagged_fields
256 );
257 }
258 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
259
260 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
261 Ok(total_size)
262 }
263}
264
265#[cfg(feature = "client")]
266impl Decodable for NodeEndpoint {
267 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
268 if version != 1 {
269 bail!("specified version not supported by this message type");
270 }
271 let node_id = types::Int32.decode(buf)?;
272 let host = types::CompactString.decode(buf)?;
273 let port = types::Int32.decode(buf)?;
274 let rack = types::CompactString.decode(buf)?;
275 let mut unknown_tagged_fields = BTreeMap::new();
276 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
277 for _ in 0..num_tagged_fields {
278 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
279 let size: u32 = types::UnsignedVarInt.decode(buf)?;
280 let unknown_value = buf.try_get_bytes(size as usize)?;
281 unknown_tagged_fields.insert(tag as i32, unknown_value);
282 }
283 Ok(Self {
284 node_id,
285 host,
286 port,
287 rack,
288 unknown_tagged_fields,
289 })
290 }
291}
292
293impl Default for NodeEndpoint {
294 fn default() -> Self {
295 Self {
296 node_id: (0).into(),
297 host: Default::default(),
298 port: 0,
299 rack: None,
300 unknown_tagged_fields: BTreeMap::new(),
301 }
302 }
303}
304
305impl Message for NodeEndpoint {
306 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
307 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
308}
309
310#[non_exhaustive]
312#[derive(Debug, Clone, PartialEq)]
313pub struct PartitionData {
314 pub partition_index: i32,
318
319 pub error_code: i16,
323
324 pub error_message: Option<StrBytes>,
328
329 pub current_leader: LeaderIdAndEpoch,
333
334 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
336}
337
338impl PartitionData {
339 pub fn with_partition_index(mut self, value: i32) -> Self {
345 self.partition_index = value;
346 self
347 }
348 pub fn with_error_code(mut self, value: i16) -> Self {
354 self.error_code = value;
355 self
356 }
357 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
363 self.error_message = value;
364 self
365 }
366 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
372 self.current_leader = value;
373 self
374 }
375 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
377 self.unknown_tagged_fields = value;
378 self
379 }
380 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
382 self.unknown_tagged_fields.insert(key, value);
383 self
384 }
385}
386
387#[cfg(feature = "broker")]
388impl Encodable for PartitionData {
389 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
390 if version != 1 {
391 bail!("specified version not supported by this message type");
392 }
393 types::Int32.encode(buf, &self.partition_index)?;
394 types::Int16.encode(buf, &self.error_code)?;
395 types::CompactString.encode(buf, &self.error_message)?;
396 types::Struct { version }.encode(buf, &self.current_leader)?;
397 let num_tagged_fields = self.unknown_tagged_fields.len();
398 if num_tagged_fields > std::u32::MAX as usize {
399 bail!(
400 "Too many tagged fields to encode ({} fields)",
401 num_tagged_fields
402 );
403 }
404 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
405
406 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
407 Ok(())
408 }
409 fn compute_size(&self, version: i16) -> Result<usize> {
410 let mut total_size = 0;
411 total_size += types::Int32.compute_size(&self.partition_index)?;
412 total_size += types::Int16.compute_size(&self.error_code)?;
413 total_size += types::CompactString.compute_size(&self.error_message)?;
414 total_size += types::Struct { version }.compute_size(&self.current_leader)?;
415 let num_tagged_fields = self.unknown_tagged_fields.len();
416 if num_tagged_fields > std::u32::MAX as usize {
417 bail!(
418 "Too many tagged fields to encode ({} fields)",
419 num_tagged_fields
420 );
421 }
422 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
423
424 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
425 Ok(total_size)
426 }
427}
428
429#[cfg(feature = "client")]
430impl Decodable for PartitionData {
431 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
432 if version != 1 {
433 bail!("specified version not supported by this message type");
434 }
435 let partition_index = types::Int32.decode(buf)?;
436 let error_code = types::Int16.decode(buf)?;
437 let error_message = types::CompactString.decode(buf)?;
438 let current_leader = types::Struct { version }.decode(buf)?;
439 let mut unknown_tagged_fields = BTreeMap::new();
440 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
441 for _ in 0..num_tagged_fields {
442 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
443 let size: u32 = types::UnsignedVarInt.decode(buf)?;
444 let unknown_value = buf.try_get_bytes(size as usize)?;
445 unknown_tagged_fields.insert(tag as i32, unknown_value);
446 }
447 Ok(Self {
448 partition_index,
449 error_code,
450 error_message,
451 current_leader,
452 unknown_tagged_fields,
453 })
454 }
455}
456
457impl Default for PartitionData {
458 fn default() -> Self {
459 Self {
460 partition_index: 0,
461 error_code: 0,
462 error_message: None,
463 current_leader: Default::default(),
464 unknown_tagged_fields: BTreeMap::new(),
465 }
466 }
467}
468
469impl Message for PartitionData {
470 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
471 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
472}
473
474#[non_exhaustive]
476#[derive(Debug, Clone, PartialEq)]
477pub struct ShareAcknowledgeResponse {
478 pub throttle_time_ms: i32,
482
483 pub error_code: i16,
487
488 pub error_message: Option<StrBytes>,
492
493 pub responses: Vec<ShareAcknowledgeTopicResponse>,
497
498 pub node_endpoints: Vec<NodeEndpoint>,
502
503 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
505}
506
507impl ShareAcknowledgeResponse {
508 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
514 self.throttle_time_ms = value;
515 self
516 }
517 pub fn with_error_code(mut self, value: i16) -> Self {
523 self.error_code = value;
524 self
525 }
526 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
532 self.error_message = value;
533 self
534 }
535 pub fn with_responses(mut self, value: Vec<ShareAcknowledgeTopicResponse>) -> Self {
541 self.responses = value;
542 self
543 }
544 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
550 self.node_endpoints = value;
551 self
552 }
553 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
555 self.unknown_tagged_fields = value;
556 self
557 }
558 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
560 self.unknown_tagged_fields.insert(key, value);
561 self
562 }
563}
564
565#[cfg(feature = "broker")]
566impl Encodable for ShareAcknowledgeResponse {
567 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
568 if version != 1 {
569 bail!("specified version not supported by this message type");
570 }
571 types::Int32.encode(buf, &self.throttle_time_ms)?;
572 types::Int16.encode(buf, &self.error_code)?;
573 types::CompactString.encode(buf, &self.error_message)?;
574 types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
575 types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
576 let num_tagged_fields = self.unknown_tagged_fields.len();
577 if num_tagged_fields > std::u32::MAX as usize {
578 bail!(
579 "Too many tagged fields to encode ({} fields)",
580 num_tagged_fields
581 );
582 }
583 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
584
585 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
586 Ok(())
587 }
588 fn compute_size(&self, version: i16) -> Result<usize> {
589 let mut total_size = 0;
590 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
591 total_size += types::Int16.compute_size(&self.error_code)?;
592 total_size += types::CompactString.compute_size(&self.error_message)?;
593 total_size +=
594 types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
595 total_size +=
596 types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
597 let num_tagged_fields = self.unknown_tagged_fields.len();
598 if num_tagged_fields > std::u32::MAX as usize {
599 bail!(
600 "Too many tagged fields to encode ({} fields)",
601 num_tagged_fields
602 );
603 }
604 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
605
606 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
607 Ok(total_size)
608 }
609}
610
611#[cfg(feature = "client")]
612impl Decodable for ShareAcknowledgeResponse {
613 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
614 if version != 1 {
615 bail!("specified version not supported by this message type");
616 }
617 let throttle_time_ms = types::Int32.decode(buf)?;
618 let error_code = types::Int16.decode(buf)?;
619 let error_message = types::CompactString.decode(buf)?;
620 let responses = types::CompactArray(types::Struct { version }).decode(buf)?;
621 let node_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;
622 let mut unknown_tagged_fields = BTreeMap::new();
623 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
624 for _ in 0..num_tagged_fields {
625 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
626 let size: u32 = types::UnsignedVarInt.decode(buf)?;
627 let unknown_value = buf.try_get_bytes(size as usize)?;
628 unknown_tagged_fields.insert(tag as i32, unknown_value);
629 }
630 Ok(Self {
631 throttle_time_ms,
632 error_code,
633 error_message,
634 responses,
635 node_endpoints,
636 unknown_tagged_fields,
637 })
638 }
639}
640
641impl Default for ShareAcknowledgeResponse {
642 fn default() -> Self {
643 Self {
644 throttle_time_ms: 0,
645 error_code: 0,
646 error_message: None,
647 responses: Default::default(),
648 node_endpoints: Default::default(),
649 unknown_tagged_fields: BTreeMap::new(),
650 }
651 }
652}
653
654impl Message for ShareAcknowledgeResponse {
655 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
656 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
657}
658
659#[non_exhaustive]
661#[derive(Debug, Clone, PartialEq)]
662pub struct ShareAcknowledgeTopicResponse {
663 pub topic_id: Uuid,
667
668 pub partitions: Vec<PartitionData>,
672
673 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
675}
676
677impl ShareAcknowledgeTopicResponse {
678 pub fn with_topic_id(mut self, value: Uuid) -> Self {
684 self.topic_id = value;
685 self
686 }
687 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
693 self.partitions = value;
694 self
695 }
696 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
698 self.unknown_tagged_fields = value;
699 self
700 }
701 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
703 self.unknown_tagged_fields.insert(key, value);
704 self
705 }
706}
707
708#[cfg(feature = "broker")]
709impl Encodable for ShareAcknowledgeTopicResponse {
710 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
711 if version != 1 {
712 bail!("specified version not supported by this message type");
713 }
714 types::Uuid.encode(buf, &self.topic_id)?;
715 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
716 let num_tagged_fields = self.unknown_tagged_fields.len();
717 if num_tagged_fields > std::u32::MAX as usize {
718 bail!(
719 "Too many tagged fields to encode ({} fields)",
720 num_tagged_fields
721 );
722 }
723 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
724
725 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
726 Ok(())
727 }
728 fn compute_size(&self, version: i16) -> Result<usize> {
729 let mut total_size = 0;
730 total_size += types::Uuid.compute_size(&self.topic_id)?;
731 total_size +=
732 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
733 let num_tagged_fields = self.unknown_tagged_fields.len();
734 if num_tagged_fields > std::u32::MAX as usize {
735 bail!(
736 "Too many tagged fields to encode ({} fields)",
737 num_tagged_fields
738 );
739 }
740 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
741
742 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
743 Ok(total_size)
744 }
745}
746
747#[cfg(feature = "client")]
748impl Decodable for ShareAcknowledgeTopicResponse {
749 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
750 if version != 1 {
751 bail!("specified version not supported by this message type");
752 }
753 let topic_id = types::Uuid.decode(buf)?;
754 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
755 let mut unknown_tagged_fields = BTreeMap::new();
756 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
757 for _ in 0..num_tagged_fields {
758 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
759 let size: u32 = types::UnsignedVarInt.decode(buf)?;
760 let unknown_value = buf.try_get_bytes(size as usize)?;
761 unknown_tagged_fields.insert(tag as i32, unknown_value);
762 }
763 Ok(Self {
764 topic_id,
765 partitions,
766 unknown_tagged_fields,
767 })
768 }
769}
770
771impl Default for ShareAcknowledgeTopicResponse {
772 fn default() -> Self {
773 Self {
774 topic_id: Uuid::nil(),
775 partitions: Default::default(),
776 unknown_tagged_fields: BTreeMap::new(),
777 }
778 }
779}
780
781impl Message for ShareAcknowledgeTopicResponse {
782 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
783 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
784}
785
786impl HeaderVersion for ShareAcknowledgeResponse {
787 fn header_version(version: i16) -> i16 {
788 1
789 }
790}