1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AlterPartitionRequest {
24 pub broker_id: super::BrokerId,
28
29 pub broker_epoch: i64,
33
34 pub topics: Vec<TopicData>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl AlterPartitionRequest {
44 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50 self.broker_id = value;
51 self
52 }
53 pub fn with_broker_epoch(mut self, value: i64) -> Self {
59 self.broker_epoch = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for AlterPartitionRequest {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version < 0 || version > 3 {
87 bail!("specified version not supported by this message type");
88 }
89 types::Int32.encode(buf, &self.broker_id)?;
90 types::Int64.encode(buf, &self.broker_epoch)?;
91 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
92 let num_tagged_fields = self.unknown_tagged_fields.len();
93 if num_tagged_fields > std::u32::MAX as usize {
94 bail!(
95 "Too many tagged fields to encode ({} fields)",
96 num_tagged_fields
97 );
98 }
99 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
100
101 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
102 Ok(())
103 }
104 fn compute_size(&self, version: i16) -> Result<usize> {
105 let mut total_size = 0;
106 total_size += types::Int32.compute_size(&self.broker_id)?;
107 total_size += types::Int64.compute_size(&self.broker_epoch)?;
108 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
109 let num_tagged_fields = self.unknown_tagged_fields.len();
110 if num_tagged_fields > std::u32::MAX as usize {
111 bail!(
112 "Too many tagged fields to encode ({} fields)",
113 num_tagged_fields
114 );
115 }
116 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
117
118 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
119 Ok(total_size)
120 }
121}
122
123#[cfg(feature = "broker")]
124impl Decodable for AlterPartitionRequest {
125 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
126 if version < 0 || version > 3 {
127 bail!("specified version not supported by this message type");
128 }
129 let broker_id = types::Int32.decode(buf)?;
130 let broker_epoch = types::Int64.decode(buf)?;
131 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
132 let mut unknown_tagged_fields = BTreeMap::new();
133 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
134 for _ in 0..num_tagged_fields {
135 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
136 let size: u32 = types::UnsignedVarInt.decode(buf)?;
137 let unknown_value = buf.try_get_bytes(size as usize)?;
138 unknown_tagged_fields.insert(tag as i32, unknown_value);
139 }
140 Ok(Self {
141 broker_id,
142 broker_epoch,
143 topics,
144 unknown_tagged_fields,
145 })
146 }
147}
148
149impl Default for AlterPartitionRequest {
150 fn default() -> Self {
151 Self {
152 broker_id: (0).into(),
153 broker_epoch: -1,
154 topics: Default::default(),
155 unknown_tagged_fields: BTreeMap::new(),
156 }
157 }
158}
159
160impl Message for AlterPartitionRequest {
161 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
162 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
163}
164
165#[non_exhaustive]
167#[derive(Debug, Clone, PartialEq)]
168pub struct BrokerState {
169 pub broker_id: super::BrokerId,
173
174 pub broker_epoch: i64,
178
179 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
181}
182
183impl BrokerState {
184 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
190 self.broker_id = value;
191 self
192 }
193 pub fn with_broker_epoch(mut self, value: i64) -> Self {
199 self.broker_epoch = value;
200 self
201 }
202 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
204 self.unknown_tagged_fields = value;
205 self
206 }
207 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
209 self.unknown_tagged_fields.insert(key, value);
210 self
211 }
212}
213
214#[cfg(feature = "client")]
215impl Encodable for BrokerState {
216 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
217 if version < 0 || version > 3 {
218 bail!("specified version not supported by this message type");
219 }
220 if version >= 3 {
221 types::Int32.encode(buf, &self.broker_id)?;
222 } else {
223 if self.broker_id != 0 {
224 bail!("A field is set that is not available on the selected protocol version");
225 }
226 }
227 if version >= 3 {
228 types::Int64.encode(buf, &self.broker_epoch)?;
229 } else {
230 if self.broker_epoch != -1 {
231 bail!("A field is set that is not available on the selected protocol version");
232 }
233 }
234 let num_tagged_fields = self.unknown_tagged_fields.len();
235 if num_tagged_fields > std::u32::MAX as usize {
236 bail!(
237 "Too many tagged fields to encode ({} fields)",
238 num_tagged_fields
239 );
240 }
241 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
242
243 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
244 Ok(())
245 }
246 fn compute_size(&self, version: i16) -> Result<usize> {
247 let mut total_size = 0;
248 if version >= 3 {
249 total_size += types::Int32.compute_size(&self.broker_id)?;
250 } else {
251 if self.broker_id != 0 {
252 bail!("A field is set that is not available on the selected protocol version");
253 }
254 }
255 if version >= 3 {
256 total_size += types::Int64.compute_size(&self.broker_epoch)?;
257 } else {
258 if self.broker_epoch != -1 {
259 bail!("A field is set that is not available on the selected protocol version");
260 }
261 }
262 let num_tagged_fields = self.unknown_tagged_fields.len();
263 if num_tagged_fields > std::u32::MAX as usize {
264 bail!(
265 "Too many tagged fields to encode ({} fields)",
266 num_tagged_fields
267 );
268 }
269 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
270
271 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
272 Ok(total_size)
273 }
274}
275
276#[cfg(feature = "broker")]
277impl Decodable for BrokerState {
278 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
279 if version < 0 || version > 3 {
280 bail!("specified version not supported by this message type");
281 }
282 let broker_id = if version >= 3 {
283 types::Int32.decode(buf)?
284 } else {
285 (0).into()
286 };
287 let broker_epoch = if version >= 3 {
288 types::Int64.decode(buf)?
289 } else {
290 -1
291 };
292 let mut unknown_tagged_fields = BTreeMap::new();
293 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
294 for _ in 0..num_tagged_fields {
295 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
296 let size: u32 = types::UnsignedVarInt.decode(buf)?;
297 let unknown_value = buf.try_get_bytes(size as usize)?;
298 unknown_tagged_fields.insert(tag as i32, unknown_value);
299 }
300 Ok(Self {
301 broker_id,
302 broker_epoch,
303 unknown_tagged_fields,
304 })
305 }
306}
307
308impl Default for BrokerState {
309 fn default() -> Self {
310 Self {
311 broker_id: (0).into(),
312 broker_epoch: -1,
313 unknown_tagged_fields: BTreeMap::new(),
314 }
315 }
316}
317
318impl Message for BrokerState {
319 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
320 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
321}
322
323#[non_exhaustive]
325#[derive(Debug, Clone, PartialEq)]
326pub struct PartitionData {
327 pub partition_index: i32,
331
332 pub leader_epoch: i32,
336
337 pub new_isr: Vec<super::BrokerId>,
341
342 pub new_isr_with_epochs: Vec<BrokerState>,
346
347 pub leader_recovery_state: i8,
351
352 pub partition_epoch: i32,
356
357 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
359}
360
361impl PartitionData {
362 pub fn with_partition_index(mut self, value: i32) -> Self {
368 self.partition_index = value;
369 self
370 }
371 pub fn with_leader_epoch(mut self, value: i32) -> Self {
377 self.leader_epoch = value;
378 self
379 }
380 pub fn with_new_isr(mut self, value: Vec<super::BrokerId>) -> Self {
386 self.new_isr = value;
387 self
388 }
389 pub fn with_new_isr_with_epochs(mut self, value: Vec<BrokerState>) -> Self {
395 self.new_isr_with_epochs = value;
396 self
397 }
398 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
404 self.leader_recovery_state = value;
405 self
406 }
407 pub fn with_partition_epoch(mut self, value: i32) -> Self {
413 self.partition_epoch = value;
414 self
415 }
416 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
418 self.unknown_tagged_fields = value;
419 self
420 }
421 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
423 self.unknown_tagged_fields.insert(key, value);
424 self
425 }
426}
427
428#[cfg(feature = "client")]
429impl Encodable for PartitionData {
430 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
431 if version < 0 || version > 3 {
432 bail!("specified version not supported by this message type");
433 }
434 types::Int32.encode(buf, &self.partition_index)?;
435 types::Int32.encode(buf, &self.leader_epoch)?;
436 if version <= 2 {
437 types::CompactArray(types::Int32).encode(buf, &self.new_isr)?;
438 } else {
439 if !self.new_isr.is_empty() {
440 bail!("A field is set that is not available on the selected protocol version");
441 }
442 }
443 if version >= 3 {
444 types::CompactArray(types::Struct { version })
445 .encode(buf, &self.new_isr_with_epochs)?;
446 } else {
447 if !self.new_isr_with_epochs.is_empty() {
448 bail!("A field is set that is not available on the selected protocol version");
449 }
450 }
451 if version >= 1 {
452 types::Int8.encode(buf, &self.leader_recovery_state)?;
453 } else {
454 if self.leader_recovery_state != 0 {
455 bail!("A field is set that is not available on the selected protocol version");
456 }
457 }
458 types::Int32.encode(buf, &self.partition_epoch)?;
459 let num_tagged_fields = self.unknown_tagged_fields.len();
460 if num_tagged_fields > std::u32::MAX as usize {
461 bail!(
462 "Too many tagged fields to encode ({} fields)",
463 num_tagged_fields
464 );
465 }
466 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
467
468 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
469 Ok(())
470 }
471 fn compute_size(&self, version: i16) -> Result<usize> {
472 let mut total_size = 0;
473 total_size += types::Int32.compute_size(&self.partition_index)?;
474 total_size += types::Int32.compute_size(&self.leader_epoch)?;
475 if version <= 2 {
476 total_size += types::CompactArray(types::Int32).compute_size(&self.new_isr)?;
477 } else {
478 if !self.new_isr.is_empty() {
479 bail!("A field is set that is not available on the selected protocol version");
480 }
481 }
482 if version >= 3 {
483 total_size += types::CompactArray(types::Struct { version })
484 .compute_size(&self.new_isr_with_epochs)?;
485 } else {
486 if !self.new_isr_with_epochs.is_empty() {
487 bail!("A field is set that is not available on the selected protocol version");
488 }
489 }
490 if version >= 1 {
491 total_size += types::Int8.compute_size(&self.leader_recovery_state)?;
492 } else {
493 if self.leader_recovery_state != 0 {
494 bail!("A field is set that is not available on the selected protocol version");
495 }
496 }
497 total_size += types::Int32.compute_size(&self.partition_epoch)?;
498 let num_tagged_fields = self.unknown_tagged_fields.len();
499 if num_tagged_fields > std::u32::MAX as usize {
500 bail!(
501 "Too many tagged fields to encode ({} fields)",
502 num_tagged_fields
503 );
504 }
505 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
506
507 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
508 Ok(total_size)
509 }
510}
511
512#[cfg(feature = "broker")]
513impl Decodable for PartitionData {
514 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
515 if version < 0 || version > 3 {
516 bail!("specified version not supported by this message type");
517 }
518 let partition_index = types::Int32.decode(buf)?;
519 let leader_epoch = types::Int32.decode(buf)?;
520 let new_isr = if version <= 2 {
521 types::CompactArray(types::Int32).decode(buf)?
522 } else {
523 Default::default()
524 };
525 let new_isr_with_epochs = if version >= 3 {
526 types::CompactArray(types::Struct { version }).decode(buf)?
527 } else {
528 Default::default()
529 };
530 let leader_recovery_state = if version >= 1 {
531 types::Int8.decode(buf)?
532 } else {
533 0
534 };
535 let partition_epoch = types::Int32.decode(buf)?;
536 let mut unknown_tagged_fields = BTreeMap::new();
537 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
538 for _ in 0..num_tagged_fields {
539 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
540 let size: u32 = types::UnsignedVarInt.decode(buf)?;
541 let unknown_value = buf.try_get_bytes(size as usize)?;
542 unknown_tagged_fields.insert(tag as i32, unknown_value);
543 }
544 Ok(Self {
545 partition_index,
546 leader_epoch,
547 new_isr,
548 new_isr_with_epochs,
549 leader_recovery_state,
550 partition_epoch,
551 unknown_tagged_fields,
552 })
553 }
554}
555
556impl Default for PartitionData {
557 fn default() -> Self {
558 Self {
559 partition_index: 0,
560 leader_epoch: 0,
561 new_isr: Default::default(),
562 new_isr_with_epochs: Default::default(),
563 leader_recovery_state: 0,
564 partition_epoch: 0,
565 unknown_tagged_fields: BTreeMap::new(),
566 }
567 }
568}
569
570impl Message for PartitionData {
571 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
572 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
573}
574
575#[non_exhaustive]
577#[derive(Debug, Clone, PartialEq)]
578pub struct TopicData {
579 pub topic_name: super::TopicName,
583
584 pub topic_id: Uuid,
588
589 pub partitions: Vec<PartitionData>,
593
594 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
596}
597
598impl TopicData {
599 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
605 self.topic_name = value;
606 self
607 }
608 pub fn with_topic_id(mut self, value: Uuid) -> Self {
614 self.topic_id = value;
615 self
616 }
617 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
623 self.partitions = value;
624 self
625 }
626 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
628 self.unknown_tagged_fields = value;
629 self
630 }
631 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
633 self.unknown_tagged_fields.insert(key, value);
634 self
635 }
636}
637
638#[cfg(feature = "client")]
639impl Encodable for TopicData {
640 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
641 if version < 0 || version > 3 {
642 bail!("specified version not supported by this message type");
643 }
644 if version <= 1 {
645 types::CompactString.encode(buf, &self.topic_name)?;
646 }
647 if version >= 2 {
648 types::Uuid.encode(buf, &self.topic_id)?;
649 }
650 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
651 let num_tagged_fields = self.unknown_tagged_fields.len();
652 if num_tagged_fields > std::u32::MAX as usize {
653 bail!(
654 "Too many tagged fields to encode ({} fields)",
655 num_tagged_fields
656 );
657 }
658 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
659
660 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
661 Ok(())
662 }
663 fn compute_size(&self, version: i16) -> Result<usize> {
664 let mut total_size = 0;
665 if version <= 1 {
666 total_size += types::CompactString.compute_size(&self.topic_name)?;
667 }
668 if version >= 2 {
669 total_size += types::Uuid.compute_size(&self.topic_id)?;
670 }
671 total_size +=
672 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
673 let num_tagged_fields = self.unknown_tagged_fields.len();
674 if num_tagged_fields > std::u32::MAX as usize {
675 bail!(
676 "Too many tagged fields to encode ({} fields)",
677 num_tagged_fields
678 );
679 }
680 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
681
682 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
683 Ok(total_size)
684 }
685}
686
687#[cfg(feature = "broker")]
688impl Decodable for TopicData {
689 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
690 if version < 0 || version > 3 {
691 bail!("specified version not supported by this message type");
692 }
693 let topic_name = if version <= 1 {
694 types::CompactString.decode(buf)?
695 } else {
696 Default::default()
697 };
698 let topic_id = if version >= 2 {
699 types::Uuid.decode(buf)?
700 } else {
701 Uuid::nil()
702 };
703 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
704 let mut unknown_tagged_fields = BTreeMap::new();
705 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
706 for _ in 0..num_tagged_fields {
707 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
708 let size: u32 = types::UnsignedVarInt.decode(buf)?;
709 let unknown_value = buf.try_get_bytes(size as usize)?;
710 unknown_tagged_fields.insert(tag as i32, unknown_value);
711 }
712 Ok(Self {
713 topic_name,
714 topic_id,
715 partitions,
716 unknown_tagged_fields,
717 })
718 }
719}
720
721impl Default for TopicData {
722 fn default() -> Self {
723 Self {
724 topic_name: Default::default(),
725 topic_id: Uuid::nil(),
726 partitions: Default::default(),
727 unknown_tagged_fields: BTreeMap::new(),
728 }
729 }
730}
731
732impl Message for TopicData {
733 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
734 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
735}
736
737impl HeaderVersion for AlterPartitionRequest {
738 fn header_version(version: i16) -> i16 {
739 2
740 }
741}