1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct EndQuorumEpochResponse {
24 pub error_code: i16,
28
29 pub topics: Vec<TopicData>,
33
34 pub node_endpoints: Vec<NodeEndpoint>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl EndQuorumEpochResponse {
44 pub fn with_error_code(mut self, value: i16) -> Self {
50 self.error_code = value;
51 self
52 }
53 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
59 self.topics = value;
60 self
61 }
62 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
68 self.node_endpoints = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "broker")]
impl Encodable for EndQuorumEpochResponse {
    /// Serializes the message into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when the tagged field count or the
    /// encoded `node_endpoints` size exceeds `u32::MAX`, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int16.encode(buf, &self.error_code)?;
        if version < 1 {
            // Version 0 ends after the plain topic array: there is no tagged field
            // section, so `node_endpoints` is not written.
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
            return Ok(());
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // `node_endpoints` only counts as a tagged field when it has content.
        let has_endpoints = !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(has_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        if has_endpoints {
            let payload_len =
                types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
            if payload_len > u32::MAX as usize {
                bail!("Tagged field is too large to encode ({} bytes)", payload_len);
            }
            // Tag number 0, then the payload length, then the payload itself.
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, payload_len as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }

        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes computed for this message at `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-1; any version
    /// below 1 is sized with the version 0 layout.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = types::Int16.compute_size(&self.error_code)?;
        if version < 1 {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
            return Ok(total_size);
        }
        total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let has_endpoints = !self.node_endpoints.is_empty();
        let tagged_count = self.unknown_tagged_fields.len() + usize::from(has_endpoints);
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;

        if has_endpoints {
            let payload_len =
                types::CompactArray(types::Struct { version }).compute_size(&self.node_endpoints)?;
            if payload_len > u32::MAX as usize {
                bail!("Tagged field is too large to encode ({} bytes)", payload_len);
            }
            // Tag number, payload length prefix, then the payload bytes.
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(payload_len as u32)?;
            total_size += payload_len;
        }

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
165
#[cfg(feature = "client")]
impl Decodable for EndQuorumEpochResponse {
    /// Reads a message from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 1;

        let error_code = types::Int16.decode(buf)?;
        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Both stay at their empty defaults unless the tagged section supplies them.
        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                if tag == 0 {
                    // Tag 0 is `node_endpoints`; the array decoder reads its own
                    // length, so `size` is not consulted on this path.
                    node_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;
                } else {
                    // Any other tag is kept as raw bytes.
                    let payload = buf.try_get_bytes(size as usize)?;
                    unknown_tagged_fields.insert(tag as i32, payload);
                }
            }
        }

        Ok(Self {
            error_code,
            topics,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
205
206impl Default for EndQuorumEpochResponse {
207 fn default() -> Self {
208 Self {
209 error_code: 0,
210 topics: Default::default(),
211 node_endpoints: Default::default(),
212 unknown_tagged_fields: BTreeMap::new(),
213 }
214 }
215}
216
217impl Message for EndQuorumEpochResponse {
218 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
219 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
220}
221
222#[non_exhaustive]
224#[derive(Debug, Clone, PartialEq)]
225pub struct NodeEndpoint {
226 pub node_id: super::BrokerId,
230
231 pub host: StrBytes,
235
236 pub port: u16,
240
241 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
243}
244
245impl NodeEndpoint {
246 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
252 self.node_id = value;
253 self
254 }
255 pub fn with_host(mut self, value: StrBytes) -> Self {
261 self.host = value;
262 self
263 }
264 pub fn with_port(mut self, value: u16) -> Self {
270 self.port = value;
271 self
272 }
273 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
275 self.unknown_tagged_fields = value;
276 self
277 }
278 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
280 self.unknown_tagged_fields.insert(key, value);
281 self
282 }
283}
284
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Serializes the entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when version 0 is requested while
    /// `node_id`, `host` or `port` holds a non-default value, when there are more
    /// than `u32::MAX` tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 1 {
            // Nothing is written at version 0; the fields must be at their defaults.
            if self.node_id != 0 || !self.host.is_empty() || self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(());
        }

        types::Int32.encode(buf, &self.node_id)?;
        types::CompactString.encode(buf, &self.host)?;
        types::UInt16.encode(buf, &self.port)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes computed for this entry at `version`.
    ///
    /// Versions below 1 yield a size of zero, or an error if any field is set.
    /// Unlike `encode`, this does not reject versions outside 0-1.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            if self.node_id != 0 || !self.host.is_empty() || self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            return Ok(0);
        }

        let fields_size = types::Int32.compute_size(&self.node_id)?
            + types::CompactString.compute_size(&self.host)?
            + types::UInt16.compute_size(&self.port)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + tagged_size)
    }
}
364
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// At version 0 nothing is read and every field takes its default value.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 1 {
            return Ok(Self {
                node_id: (0).into(),
                host: Default::default(),
                port: 0,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let node_id = types::Int32.decode(buf)?;
        let host = types::CompactString.decode(buf)?;
        let port = types::UInt16.decode(buf)?;

        // No tags are defined for this struct, so every tagged field is kept raw.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            node_id,
            host,
            port,
            unknown_tagged_fields,
        })
    }
}
404
405impl Default for NodeEndpoint {
406 fn default() -> Self {
407 Self {
408 node_id: (0).into(),
409 host: Default::default(),
410 port: 0,
411 unknown_tagged_fields: BTreeMap::new(),
412 }
413 }
414}
415
416impl Message for NodeEndpoint {
417 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
418 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
419}
420
421#[non_exhaustive]
423#[derive(Debug, Clone, PartialEq)]
424pub struct PartitionData {
425 pub partition_index: i32,
429
430 pub error_code: i16,
434
435 pub leader_id: super::BrokerId,
439
440 pub leader_epoch: i32,
444
445 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
447}
448
449impl PartitionData {
450 pub fn with_partition_index(mut self, value: i32) -> Self {
456 self.partition_index = value;
457 self
458 }
459 pub fn with_error_code(mut self, value: i16) -> Self {
465 self.error_code = value;
466 self
467 }
468 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
474 self.leader_id = value;
475 self
476 }
477 pub fn with_leader_epoch(mut self, value: i32) -> Self {
483 self.leader_epoch = value;
484 self
485 }
486 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
488 self.unknown_tagged_fields = value;
489 self
490 }
491 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
493 self.unknown_tagged_fields.insert(key, value);
494 self
495 }
496}
497
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Serializes the entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when there are more than `u32::MAX`
    /// tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // The fixed fields use the same encoding in both versions.
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        if version < 1 {
            // Version 0 has no tagged field section.
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes computed for this entry at `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-1.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fixed_size = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?
            + types::Int32.compute_size(&self.leader_id)?
            + types::Int32.compute_size(&self.leader_epoch)?;

        if version < 1 {
            return Ok(fixed_size);
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fixed_size + tagged_size)
    }
}
543
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        // Only version 1 carries a tagged field section; all of it is kept raw.
        let unknown_tagged_fields = if version >= 1 {
            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                fields.insert(tag as i32, payload);
            }
            fields
        } else {
            BTreeMap::new()
        };

        Ok(Self {
            partition_index,
            error_code,
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
573
574impl Default for PartitionData {
575 fn default() -> Self {
576 Self {
577 partition_index: 0,
578 error_code: 0,
579 leader_id: (0).into(),
580 leader_epoch: 0,
581 unknown_tagged_fields: BTreeMap::new(),
582 }
583 }
584}
585
586impl Message for PartitionData {
587 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
588 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
589}
590
591#[non_exhaustive]
593#[derive(Debug, Clone, PartialEq)]
594pub struct TopicData {
595 pub topic_name: super::TopicName,
599
600 pub partitions: Vec<PartitionData>,
604
605 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
607}
608
609impl TopicData {
610 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
616 self.topic_name = value;
617 self
618 }
619 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
625 self.partitions = value;
626 self
627 }
628 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
630 self.unknown_tagged_fields = value;
631 self
632 }
633 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
635 self.unknown_tagged_fields.insert(key, value);
636 self
637 }
638}
639
#[cfg(feature = "broker")]
impl Encodable for TopicData {
    /// Serializes the entry into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1, when there are more than `u32::MAX`
    /// tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 1 {
            // Version 0: plain string and array, no tagged field section.
            types::String.encode(buf, &self.topic_name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partitions)?;
            return Ok(());
        }

        types::CompactString.encode(buf, &self.topic_name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes computed for this entry at `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 0-1; any version
    /// below 1 is sized with the version 0 layout.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version < 1 {
            let name_size = types::String.compute_size(&self.topic_name)?;
            let partitions_size =
                types::Array(types::Struct { version }).compute_size(&self.partitions)?;
            return Ok(name_size + partitions_size);
        }

        let name_size = types::CompactString.compute_size(&self.topic_name)?;
        let partitions_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(name_size + partitions_size + tagged_size)
    }
}
698
#[cfg(feature = "client")]
impl Decodable for TopicData {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 0-1 or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version < 1 {
            // Version 0: plain string and array, no tagged field section.
            let topic_name = types::String.decode(buf)?;
            let partitions = types::Array(types::Struct { version }).decode(buf)?;
            return Ok(Self {
                topic_name,
                partitions,
                unknown_tagged_fields: BTreeMap::new(),
            });
        }

        let topic_name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // No tags are defined for this struct, so every tagged field is kept raw.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let size: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(size as usize)?;
            unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(Self {
            topic_name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
732
733impl Default for TopicData {
734 fn default() -> Self {
735 Self {
736 topic_name: Default::default(),
737 partitions: Default::default(),
738 unknown_tagged_fields: BTreeMap::new(),
739 }
740 }
741}
742
743impl Message for TopicData {
744 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
745 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
746}
747
748impl HeaderVersion for EndQuorumEpochResponse {
749 fn header_version(version: i16) -> i16 {
750 if version >= 1 {
751 1
752 } else {
753 0
754 }
755 }
756}