1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct NodeEndpoint {
24 pub node_id: super::BrokerId,
28
29 pub host: StrBytes,
33
34 pub port: u16,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl NodeEndpoint {
44 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
50 self.node_id = value;
51 self
52 }
53 pub fn with_host(mut self, value: StrBytes) -> Self {
59 self.host = value;
60 self
61 }
62 pub fn with_port(mut self, value: u16) -> Self {
68 self.port = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Serializes this endpoint into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when version 0 is requested
    /// while `node_id`, `host` or `port` holds a non-default value, when there
    /// are more tagged fields than fit in a `u32`, or when any underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 1 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::UInt16.encode(buf, &self.port)?;
        } else {
            // None of the payload fields exist before version 1, so any
            // non-default value cannot be represented.
            let has_versioned_data =
                self.node_id != 0 || !self.host.is_empty() || self.port != 0;
            if has_versioned_data {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, the version range itself is not validated here; any
    /// version below 1 only checks that the payload fields are unset.
    ///
    /// # Errors
    ///
    /// Fails when a payload field is set for a version below 1, when there are
    /// more tagged fields than fit in a `u32`, or when a size helper fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let payload_size = if version >= 1 {
            types::Int32.compute_size(&self.node_id)?
                + types::CompactString.compute_size(&self.host)?
                + types::UInt16.compute_size(&self.port)?
        } else {
            let has_versioned_data =
                self.node_id != 0 || !self.host.is_empty() || self.port != 0;
            if has_versioned_data {
                bail!("A field is set that is not available on the selected protocol version");
            }
            0
        };

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(payload_size + count_size + unknown_size)
    }
}
158
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads one endpoint from `buf` using the layout of `version`.
    ///
    /// For versions below 1 the payload fields are not read and take their
    /// default values. Every tagged field found is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any underlying read
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Tuple elements are evaluated left to right, which keeps the wire
        // order node_id -> host -> port.
        let (node_id, host, port) = if version >= 1 {
            (
                types::Int32.decode(buf)?,
                types::CompactString.decode(buf)?,
                types::UInt16.decode(buf)?,
            )
        } else {
            (0.into(), Default::default(), 0)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            node_id,
            host,
            port,
            unknown_tagged_fields,
        })
    }
}
196
197impl Default for NodeEndpoint {
198 fn default() -> Self {
199 Self {
200 node_id: (0).into(),
201 host: Default::default(),
202 port: 0,
203 unknown_tagged_fields: BTreeMap::new(),
204 }
205 }
206}
207
208impl Message for NodeEndpoint {
209 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
210 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
211}
212
213#[non_exhaustive]
215#[derive(Debug, Clone, PartialEq)]
216pub struct PartitionData {
217 pub partition_index: i32,
221
222 pub error_code: i16,
226
227 pub leader_id: super::BrokerId,
231
232 pub leader_epoch: i32,
236
237 pub vote_granted: bool,
241
242 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
244}
245
246impl PartitionData {
247 pub fn with_partition_index(mut self, value: i32) -> Self {
253 self.partition_index = value;
254 self
255 }
256 pub fn with_error_code(mut self, value: i16) -> Self {
262 self.error_code = value;
263 self
264 }
265 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
271 self.leader_id = value;
272 self
273 }
274 pub fn with_leader_epoch(mut self, value: i32) -> Self {
280 self.leader_epoch = value;
281 self
282 }
283 pub fn with_vote_granted(mut self, value: bool) -> Self {
289 self.vote_granted = value;
290 self
291 }
292 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
294 self.unknown_tagged_fields = value;
295 self
296 }
297 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
299 self.unknown_tagged_fields.insert(key, value);
300 self
301 }
302}
303
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Serializes this partition record into `buf`.
    ///
    /// The field layout is the same for both supported versions; `version` is
    /// only used for the range check.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more tagged
    /// fields than fit in a `u32`, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Fixed fields, in wire order.
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        types::Boolean.encode(buf, &self.vote_granted)?;

        // Tagged-field section: count first, then the entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce.
    ///
    /// `version` is not consulted and is not range-checked here.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than fit in a `u32` or when a
    /// size helper fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?
            + types::Int32.compute_size(&self.leader_id)?
            + types::Int32.compute_size(&self.leader_epoch)?
            + types::Boolean.compute_size(&self.vote_granted)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fixed_size + count_size + unknown_size)
    }
}
347
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads one partition record from `buf`.
    ///
    /// The field layout is the same for both supported versions. Every tagged
    /// field found is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any underlying read
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Fixed fields, in wire order.
        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let vote_granted = types::Boolean.decode(buf)?;

        // Tagged-field section: each entry is (tag, length, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        let decoded = Self {
            partition_index,
            error_code,
            leader_id,
            leader_epoch,
            vote_granted,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
377
378impl Default for PartitionData {
379 fn default() -> Self {
380 Self {
381 partition_index: 0,
382 error_code: 0,
383 leader_id: (0).into(),
384 leader_epoch: 0,
385 vote_granted: false,
386 unknown_tagged_fields: BTreeMap::new(),
387 }
388 }
389}
390
391impl Message for PartitionData {
392 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
393 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
394}
395
396#[non_exhaustive]
398#[derive(Debug, Clone, PartialEq)]
399pub struct TopicData {
400 pub topic_name: super::TopicName,
404
405 pub partitions: Vec<PartitionData>,
409
410 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
412}
413
414impl TopicData {
415 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
421 self.topic_name = value;
422 self
423 }
424 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
430 self.partitions = value;
431 self
432 }
433 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
435 self.unknown_tagged_fields = value;
436 self
437 }
438 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
440 self.unknown_tagged_fields.insert(key, value);
441 self
442 }
443}
444
#[cfg(feature = "broker")]
impl Encodable for TopicData {
    /// Serializes this topic record into `buf`.
    ///
    /// `version` is range-checked and forwarded to the nested partition
    /// encoder.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more tagged
    /// fields than fit in a `u32`, or when any underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::CompactString.encode(buf, &self.topic_name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        // Tagged-field section: count first, then the entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// `version` is forwarded to the nested partition sizing but is not
    /// range-checked here.
    ///
    /// # Errors
    ///
    /// Fails when there are more tagged fields than fit in a `u32` or when a
    /// size helper fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let name_size = types::CompactString.compute_size(&self.topic_name)?;
        let partitions_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(name_size + partitions_size + count_size + unknown_size)
    }
}
483
#[cfg(feature = "client")]
impl Decodable for TopicData {
    /// Reads one topic record from `buf`.
    ///
    /// `version` is range-checked and forwarded to the nested partition
    /// decoder. Every tagged field found is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any underlying read
    /// fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic_name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: each entry is (tag, length, raw bytes).
        let mut unknown_tagged_fields = BTreeMap::new();
        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        let decoded = Self {
            topic_name,
            partitions,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
507
508impl Default for TopicData {
509 fn default() -> Self {
510 Self {
511 topic_name: Default::default(),
512 partitions: Default::default(),
513 unknown_tagged_fields: BTreeMap::new(),
514 }
515 }
516}
517
518impl Message for TopicData {
519 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
520 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
521}
522
523#[non_exhaustive]
525#[derive(Debug, Clone, PartialEq)]
526pub struct VoteResponse {
527 pub error_code: i16,
531
532 pub topics: Vec<TopicData>,
536
537 pub node_endpoints: Vec<NodeEndpoint>,
541
542 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
544}
545
546impl VoteResponse {
547 pub fn with_error_code(mut self, value: i16) -> Self {
553 self.error_code = value;
554 self
555 }
556 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
562 self.topics = value;
563 self
564 }
565 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
571 self.node_endpoints = value;
572 self
573 }
574 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
576 self.unknown_tagged_fields = value;
577 self
578 }
579 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
581 self.unknown_tagged_fields.insert(key, value);
582 self
583 }
584}
585
#[cfg(feature = "broker")]
impl Encodable for VoteResponse {
    /// Serializes this response into `buf` using the layout of `version`.
    ///
    /// `node_endpoints` is emitted as tagged field 0, and only when
    /// `version >= 1` and the list is non-empty. Unknown tagged fields are
    /// then written with the range `1..`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when the tagged-field count or
    /// the encoded endpoint list does not fit in a `u32`, or when any
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int16.encode(buf, &self.error_code)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // The endpoint list counts as one extra tagged field when it is sent.
        let emit_endpoints = version >= 1 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        if emit_endpoints {
            let computed_size = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Entry layout: tag (0), byte length, then the payload itself.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }

        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// `version` is not range-checked here; it is forwarded to nested sizing
    /// and decides whether the endpoint tagged field is counted.
    ///
    /// # Errors
    ///
    /// Fails when the tagged-field count or the encoded endpoint list does not
    /// fit in a `u32`, or when a size helper fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::Int16.compute_size(&self.error_code)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let emit_endpoints = version >= 1 && !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(emit_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        let endpoints_size = if emit_endpoints {
            let computed_size = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag varint + length varint + payload.
            types::UnsignedVarInt.compute_size(0)?
                + types::UnsignedVarInt.compute_size(computed_size as u32)?
                + computed_size
        } else {
            0
        };

        let unknown_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(fixed_size + count_size + endpoints_size + unknown_size)
    }
}
661
#[cfg(feature = "client")]
impl Decodable for VoteResponse {
    /// Reads one response from `buf` using the layout of `version`.
    ///
    /// Tagged field 0 is decoded into `node_endpoints` (versions >= 1 only);
    /// every other tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when tag 0 appears in a
    /// version below 1, or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let error_code = types::Int16.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();

        let field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;

            if tag != 0 {
                // Unrecognised tag: keep the raw payload untouched.
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
                continue;
            }

            if version < 1 {
                bail!("Tag {} is not valid for version {}", tag, version);
            }
            // NOTE(review): the declared `size` is consumed from the buffer but
            // is not used to bound or verify the nested decode below.
            node_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;
        }

        let decoded = Self {
            error_code,
            topics,
            node_endpoints,
            unknown_tagged_fields,
        };
        Ok(decoded)
    }
}
699
700impl Default for VoteResponse {
701 fn default() -> Self {
702 Self {
703 error_code: 0,
704 topics: Default::default(),
705 node_endpoints: Default::default(),
706 unknown_tagged_fields: BTreeMap::new(),
707 }
708 }
709}
710
711impl Message for VoteResponse {
712 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
713 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
714}
715
716impl HeaderVersion for VoteResponse {
717 fn header_version(version: i16) -> i16 {
718 1
719 }
720}