1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AlterPartitionRequest {
24 pub broker_id: super::BrokerId,
28
29 pub broker_epoch: i64,
33
34 pub topics: Vec<TopicData>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl AlterPartitionRequest {
44 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50 self.broker_id = value;
51 self
52 }
53 pub fn with_broker_epoch(mut self, value: i64) -> Self {
59 self.broker_epoch = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "client")]
impl Encodable for AlterPartitionRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3, when there are more than `u32::MAX`
    /// unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.broker_id)?;
        types::Int64.encode(buf, &self.broker_epoch)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // The tagged-field section starts with the field count as an unsigned varint.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// This method performs no version-range check of its own.
    ///
    /// # Errors
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int32.compute_size(&self.broker_id)?
            + types::Int64.compute_size(&self.broker_epoch)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }

        Ok(fields_size
            + types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?)
    }
}
122
#[cfg(feature = "broker")]
impl Decodable for AlterPartitionRequest {
    /// Reads a request encoded with `version` from `buf`.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let broker_id = types::Int32.decode(buf)?;
        let broker_epoch = types::Int64.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: a count, then (tag, size, raw bytes) triples.
        // Every tag is kept verbatim because none are modelled explicitly.
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            broker_id,
            broker_epoch,
            topics,
            unknown_tagged_fields,
        })
    }
}
148
149impl Default for AlterPartitionRequest {
150 fn default() -> Self {
151 Self {
152 broker_id: (0).into(),
153 broker_epoch: -1,
154 topics: Default::default(),
155 unknown_tagged_fields: BTreeMap::new(),
156 }
157 }
158}
159
160impl Message for AlterPartitionRequest {
161 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
162 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
163}
164
165#[non_exhaustive]
167#[derive(Debug, Clone, PartialEq)]
168pub struct BrokerState {
169 pub broker_id: super::BrokerId,
173
174 pub broker_epoch: i64,
178
179 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
181}
182
183impl BrokerState {
184 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
190 self.broker_id = value;
191 self
192 }
193 pub fn with_broker_epoch(mut self, value: i64) -> Self {
199 self.broker_epoch = value;
200 self
201 }
202 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
204 self.unknown_tagged_fields = value;
205 self
206 }
207 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
209 self.unknown_tagged_fields.insert(key, value);
210 self
211 }
212}
213
#[cfg(feature = "client")]
impl Encodable for BrokerState {
    /// Writes this broker state to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3, when `version < 3` and either field
    /// differs from its default, when there are more than `u32::MAX` unknown tagged
    /// fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 3 {
            types::Int32.encode(buf, &self.broker_id)?;
            types::Int64.encode(buf, &self.broker_epoch)?;
        } else if self.broker_id != 0 || self.broker_epoch != -1 {
            // Neither field exists before version 3, so only default values are accepted.
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// This method performs no version-range check of its own.
    ///
    /// # Errors
    /// Fails when `version < 3` and either field differs from its default, when
    /// there are more than `u32::MAX` unknown tagged fields, or when a nested size
    /// computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 3 {
            size += types::Int32.compute_size(&self.broker_id)?;
            size += types::Int64.compute_size(&self.broker_epoch)?;
        } else if self.broker_id != 0 || self.broker_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
275
#[cfg(feature = "broker")]
impl Decodable for BrokerState {
    /// Reads a broker state encoded with `version` from `buf`.
    ///
    /// For versions below 3 nothing is read for the two fields and they take their
    /// defaults (`0` and `-1`).
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let (broker_id, broker_epoch) = if version >= 3 {
            (types::Int32.decode(buf)?, types::Int64.decode(buf)?)
        } else {
            ((0).into(), -1)
        };

        // Tagged-field section: a count, then (tag, size, raw bytes) triples.
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            broker_id,
            broker_epoch,
            unknown_tagged_fields,
        })
    }
}
307
308impl Default for BrokerState {
309 fn default() -> Self {
310 Self {
311 broker_id: (0).into(),
312 broker_epoch: -1,
313 unknown_tagged_fields: BTreeMap::new(),
314 }
315 }
316}
317
318impl Message for BrokerState {
319 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
320 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
321}
322
323#[non_exhaustive]
325#[derive(Debug, Clone, PartialEq)]
326pub struct PartitionData {
327 pub partition_index: i32,
331
332 pub leader_epoch: i32,
336
337 pub new_isr: Vec<super::BrokerId>,
341
342 pub new_isr_with_epochs: Vec<BrokerState>,
346
347 pub leader_recovery_state: i8,
351
352 pub partition_epoch: i32,
356
357 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
359}
360
361impl PartitionData {
362 pub fn with_partition_index(mut self, value: i32) -> Self {
368 self.partition_index = value;
369 self
370 }
371 pub fn with_leader_epoch(mut self, value: i32) -> Self {
377 self.leader_epoch = value;
378 self
379 }
380 pub fn with_new_isr(mut self, value: Vec<super::BrokerId>) -> Self {
386 self.new_isr = value;
387 self
388 }
389 pub fn with_new_isr_with_epochs(mut self, value: Vec<BrokerState>) -> Self {
395 self.new_isr_with_epochs = value;
396 self
397 }
398 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
404 self.leader_recovery_state = value;
405 self
406 }
407 pub fn with_partition_epoch(mut self, value: i32) -> Self {
413 self.partition_epoch = value;
414 self
415 }
416 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
418 self.unknown_tagged_fields = value;
419 self
420 }
421 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
423 self.unknown_tagged_fields.insert(key, value);
424 self
425 }
426}
427
#[cfg(feature = "client")]
impl Encodable for PartitionData {
    /// Writes this partition entry to `buf` using the wire layout of `version`.
    ///
    /// `new_isr` is written only for version 2 and `new_isr_with_epochs` only for
    /// version 3 and above; whichever one is not written must be empty.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3, when a field unavailable in `version`
    /// is non-empty, when there are more than `u32::MAX` unknown tagged fields, or
    /// when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.leader_epoch)?;

        if version == 2 {
            types::CompactArray(types::Int32).encode(buf, &self.new_isr)?;
        } else if !self.new_isr.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 3 {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.new_isr_with_epochs)?;
        } else if !self.new_isr_with_epochs.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        types::Int8.encode(buf, &self.leader_recovery_state)?;
        types::Int32.encode(buf, &self.partition_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// This method performs no version-range check of its own.
    ///
    /// # Errors
    /// Fails when a field unavailable in `version` is non-empty, when there are more
    /// than `u32::MAX` unknown tagged fields, or when a nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?
            + types::Int32.compute_size(&self.leader_epoch)?;

        if version == 2 {
            size += types::CompactArray(types::Int32).compute_size(&self.new_isr)?;
        } else if !self.new_isr.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 3 {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.new_isr_with_epochs)?;
        } else if !self.new_isr_with_epochs.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        size += types::Int8.compute_size(&self.leader_recovery_state)?;
        size += types::Int32.compute_size(&self.partition_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
499
#[cfg(feature = "broker")]
impl Decodable for PartitionData {
    /// Reads a partition entry encoded with `version` from `buf`.
    ///
    /// `new_isr` is read only for version 2 and `new_isr_with_epochs` only for
    /// version 3 and above; the one that is not read is left empty.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;

        let new_isr = if version == 2 {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            Vec::new()
        };
        let new_isr_with_epochs = if version >= 3 {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            Vec::new()
        };

        let leader_recovery_state = types::Int8.decode(buf)?;
        let partition_epoch = types::Int32.decode(buf)?;

        // Tagged-field section: a count, then (tag, size, raw bytes) triples.
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            partition_index,
            leader_epoch,
            new_isr,
            new_isr_with_epochs,
            leader_recovery_state,
            partition_epoch,
            unknown_tagged_fields,
        })
    }
}
539
540impl Default for PartitionData {
541 fn default() -> Self {
542 Self {
543 partition_index: 0,
544 leader_epoch: 0,
545 new_isr: Default::default(),
546 new_isr_with_epochs: Default::default(),
547 leader_recovery_state: 0,
548 partition_epoch: 0,
549 unknown_tagged_fields: BTreeMap::new(),
550 }
551 }
552}
553
554impl Message for PartitionData {
555 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
556 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
557}
558
559#[non_exhaustive]
561#[derive(Debug, Clone, PartialEq)]
562pub struct TopicData {
563 pub topic_id: Uuid,
567
568 pub partitions: Vec<PartitionData>,
572
573 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
575}
576
577impl TopicData {
578 pub fn with_topic_id(mut self, value: Uuid) -> Self {
584 self.topic_id = value;
585 self
586 }
587 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
593 self.partitions = value;
594 self
595 }
596 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
598 self.unknown_tagged_fields = value;
599 self
600 }
601 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
603 self.unknown_tagged_fields.insert(key, value);
604 self
605 }
606}
607
#[cfg(feature = "client")]
impl Encodable for TopicData {
    /// Writes this topic entry to `buf` using the wire layout of `version`.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3, when there are more than `u32::MAX`
    /// unknown tagged fields, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Uuid.encode(buf, &self.topic_id)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// This method performs no version-range check of its own.
    ///
    /// # Errors
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Uuid.compute_size(&self.topic_id)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }

        Ok(fields_size
            + types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?)
    }
}
646
#[cfg(feature = "broker")]
impl Decodable for TopicData {
    /// Reads a topic entry encoded with `version` from `buf`.
    ///
    /// # Errors
    /// Fails when `version` is outside 2..=3 or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic_id = types::Uuid.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: a count, then (tag, size, raw bytes) triples.
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        let mut unknown_tagged_fields = BTreeMap::new();
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            unknown_tagged_fields.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
670
671impl Default for TopicData {
672 fn default() -> Self {
673 Self {
674 topic_id: Uuid::nil(),
675 partitions: Default::default(),
676 unknown_tagged_fields: BTreeMap::new(),
677 }
678 }
679}
680
681impl Message for TopicData {
682 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
683 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
684}
685
686impl HeaderVersion for AlterPartitionRequest {
687 fn header_version(version: i16) -> i16 {
688 2
689 }
690}