1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AlterPartitionRequest {
24 pub broker_id: super::BrokerId,
28
29 pub broker_epoch: i64,
33
34 pub topics: Vec<TopicData>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl AlterPartitionRequest {
44 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
50 self.broker_id = value;
51 self
52 }
53 pub fn with_broker_epoch(mut self, value: i64) -> Self {
59 self.broker_epoch = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
/// Client-side wire serialization for [`AlterPartitionRequest`].
#[cfg(feature = "client")]
impl Encodable for AlterPartitionRequest {
    /// Writes the request into `buf`.
    ///
    /// Layout: `broker_id` (Int32), `broker_epoch` (Int64), `topics` (compact array of
    /// structs encoded at `version`), the tagged-field count, then the unknown tagged
    /// fields.
    ///
    /// # Errors
    /// Propagates any error from the underlying encoders, and fails when more than
    /// `u32::MAX` unknown tagged fields are present.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.broker_id)?;
        types::Int64.encode(buf, &self.broker_epoch)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.broker_id)?;
        total_size += types::Int64.compute_size(&self.broker_epoch)?;
        total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
119
/// Broker-side wire deserialization for [`AlterPartitionRequest`].
#[cfg(feature = "broker")]
impl Decodable for AlterPartitionRequest {
    /// Reads a request from `buf`.
    ///
    /// Fields are consumed in wire order: `broker_id`, `broker_epoch`, `topics`, then
    /// the tagged-field section, all of which is kept as unknown tagged fields.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal field expressions are evaluated top to bottom, which matches
        // the wire order.
        let mut request = Self {
            broker_id: types::Int32.decode(buf)?,
            broker_epoch: types::Int64.decode(buf)?,
            topics: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            request.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(request)
    }
}
142
143impl Default for AlterPartitionRequest {
144 fn default() -> Self {
145 Self {
146 broker_id: (0).into(),
147 broker_epoch: -1,
148 topics: Default::default(),
149 unknown_tagged_fields: BTreeMap::new(),
150 }
151 }
152}
153
154impl Message for AlterPartitionRequest {
155 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
156 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
157}
158
159#[non_exhaustive]
161#[derive(Debug, Clone, PartialEq)]
162pub struct BrokerState {
163 pub broker_id: super::BrokerId,
167
168 pub broker_epoch: i64,
172
173 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
175}
176
177impl BrokerState {
178 pub fn with_broker_id(mut self, value: super::BrokerId) -> Self {
184 self.broker_id = value;
185 self
186 }
187 pub fn with_broker_epoch(mut self, value: i64) -> Self {
193 self.broker_epoch = value;
194 self
195 }
196 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
198 self.unknown_tagged_fields = value;
199 self
200 }
201 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
203 self.unknown_tagged_fields.insert(key, value);
204 self
205 }
206}
207
/// Client-side wire serialization for [`BrokerState`].
#[cfg(feature = "client")]
impl Encodable for BrokerState {
    /// Writes the broker state into `buf`.
    ///
    /// `broker_id` (Int32) and `broker_epoch` (Int64) are written only for
    /// `version >= 3`; the tagged-field section is always written.
    ///
    /// # Errors
    /// Fails when `version < 3` and either field differs from its default (0 / -1),
    /// when more than `u32::MAX` unknown tagged fields are present, or when an
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 3 {
            types::Int32.encode(buf, &self.broker_id)?;
        } else if self.broker_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 3 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        } else if self.broker_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 3 {
            total_size += types::Int32.compute_size(&self.broker_id)?;
        } else if self.broker_id != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 3 {
            total_size += types::Int64.compute_size(&self.broker_epoch)?;
        } else if self.broker_epoch != -1 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
266
/// Broker-side wire deserialization for [`BrokerState`].
#[cfg(feature = "broker")]
impl Decodable for BrokerState {
    /// Reads a broker state from `buf`.
    ///
    /// For `version < 3` nothing is read for `broker_id` / `broker_epoch`; they take
    /// the values 0 and -1. The tagged-field section is always read and kept as
    /// unknown tagged fields.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal field expressions are evaluated top to bottom, which matches
        // the wire order.
        let mut state = Self {
            broker_id: if version >= 3 {
                types::Int32.decode(buf)?
            } else {
                (0).into()
            },
            broker_epoch: if version >= 3 {
                types::Int64.decode(buf)?
            } else {
                -1
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            state.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(state)
    }
}
295
296impl Default for BrokerState {
297 fn default() -> Self {
298 Self {
299 broker_id: (0).into(),
300 broker_epoch: -1,
301 unknown_tagged_fields: BTreeMap::new(),
302 }
303 }
304}
305
306impl Message for BrokerState {
307 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
308 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
309}
310
311#[non_exhaustive]
313#[derive(Debug, Clone, PartialEq)]
314pub struct PartitionData {
315 pub partition_index: i32,
319
320 pub leader_epoch: i32,
324
325 pub new_isr: Vec<super::BrokerId>,
329
330 pub new_isr_with_epochs: Vec<BrokerState>,
334
335 pub leader_recovery_state: i8,
339
340 pub partition_epoch: i32,
344
345 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
347}
348
349impl PartitionData {
350 pub fn with_partition_index(mut self, value: i32) -> Self {
356 self.partition_index = value;
357 self
358 }
359 pub fn with_leader_epoch(mut self, value: i32) -> Self {
365 self.leader_epoch = value;
366 self
367 }
368 pub fn with_new_isr(mut self, value: Vec<super::BrokerId>) -> Self {
374 self.new_isr = value;
375 self
376 }
377 pub fn with_new_isr_with_epochs(mut self, value: Vec<BrokerState>) -> Self {
383 self.new_isr_with_epochs = value;
384 self
385 }
386 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
392 self.leader_recovery_state = value;
393 self
394 }
395 pub fn with_partition_epoch(mut self, value: i32) -> Self {
401 self.partition_epoch = value;
402 self
403 }
404 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
406 self.unknown_tagged_fields = value;
407 self
408 }
409 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
411 self.unknown_tagged_fields.insert(key, value);
412 self
413 }
414}
415
/// Client-side wire serialization for [`PartitionData`].
#[cfg(feature = "client")]
impl Encodable for PartitionData {
    /// Writes the partition entry into `buf`.
    ///
    /// Layout: `partition_index`, `leader_epoch`, `new_isr` (versions <= 2),
    /// `new_isr_with_epochs` (versions >= 3), `leader_recovery_state` (versions >= 1),
    /// `partition_epoch`, then the tagged-field section.
    ///
    /// # Errors
    /// Fails when a version-gated field holds a non-default value on a version that
    /// does not carry it, when more than `u32::MAX` unknown tagged fields are present,
    /// or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        if version <= 2 {
            types::CompactArray(types::Int32).encode(buf, &self.new_isr)?;
        } else if !self.new_isr.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 3 {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.new_isr_with_epochs)?;
        } else if !self.new_isr_with_epochs.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 1 {
            types::Int8.encode(buf, &self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        types::Int32.encode(buf, &self.partition_epoch)?;

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int32.compute_size(&self.leader_epoch)?;
        if version <= 2 {
            total_size += types::CompactArray(types::Int32).compute_size(&self.new_isr)?;
        } else if !self.new_isr.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 3 {
            total_size += types::CompactArray(types::Struct { version })
                .compute_size(&self.new_isr_with_epochs)?;
        } else if !self.new_isr_with_epochs.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }
        if version >= 1 {
            total_size += types::Int8.compute_size(&self.leader_recovery_state)?;
        } else if self.leader_recovery_state != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }
        total_size += types::Int32.compute_size(&self.partition_epoch)?;

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
496
/// Broker-side wire deserialization for [`PartitionData`].
#[cfg(feature = "broker")]
impl Decodable for PartitionData {
    /// Reads a partition entry from `buf`.
    ///
    /// Version-gated fields that are absent for `version` take their default values
    /// (empty vectors, 0). The tagged-field section is always read and kept as
    /// unknown tagged fields.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal field expressions are evaluated top to bottom, which matches
        // the wire order.
        let mut partition = Self {
            partition_index: types::Int32.decode(buf)?,
            leader_epoch: types::Int32.decode(buf)?,
            new_isr: if version <= 2 {
                types::CompactArray(types::Int32).decode(buf)?
            } else {
                Default::default()
            },
            new_isr_with_epochs: if version >= 3 {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                Default::default()
            },
            leader_recovery_state: if version >= 1 {
                types::Int8.decode(buf)?
            } else {
                0
            },
            partition_epoch: types::Int32.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            partition.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(partition)
    }
}
537
538impl Default for PartitionData {
539 fn default() -> Self {
540 Self {
541 partition_index: 0,
542 leader_epoch: 0,
543 new_isr: Default::default(),
544 new_isr_with_epochs: Default::default(),
545 leader_recovery_state: 0,
546 partition_epoch: 0,
547 unknown_tagged_fields: BTreeMap::new(),
548 }
549 }
550}
551
552impl Message for PartitionData {
553 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
554 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
555}
556
557#[non_exhaustive]
559#[derive(Debug, Clone, PartialEq)]
560pub struct TopicData {
561 pub topic_name: super::TopicName,
565
566 pub topic_id: Uuid,
570
571 pub partitions: Vec<PartitionData>,
575
576 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
578}
579
580impl TopicData {
581 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
587 self.topic_name = value;
588 self
589 }
590 pub fn with_topic_id(mut self, value: Uuid) -> Self {
596 self.topic_id = value;
597 self
598 }
599 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
605 self.partitions = value;
606 self
607 }
608 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
610 self.unknown_tagged_fields = value;
611 self
612 }
613 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
615 self.unknown_tagged_fields.insert(key, value);
616 self
617 }
618}
619
/// Client-side wire serialization for [`TopicData`].
#[cfg(feature = "client")]
impl Encodable for TopicData {
    /// Writes the topic entry into `buf`.
    ///
    /// Layout: `topic_name` (compact string, versions <= 1) or `topic_id` (UUID,
    /// versions >= 2), the `partitions` compact array, then the tagged-field section.
    /// The field that does not apply to `version` is skipped without an error.
    ///
    /// # Errors
    /// Fails when more than `u32::MAX` unknown tagged fields are present or when an
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // `version <= 1` and `version >= 2` are complementary, so exactly one of the
        // two identifiers is written.
        if version <= 1 {
            types::CompactString.encode(buf, &self.topic_name)?;
        } else {
            types::Uuid.encode(buf, &self.topic_id)?;
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        // Mirrors `encode`: exactly one of the two identifiers is counted.
        if version <= 1 {
            total_size += types::CompactString.compute_size(&self.topic_name)?;
        } else {
            total_size += types::Uuid.compute_size(&self.topic_id)?;
        }
        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // Lossless cast: the guard above guarantees the count fits in a u32.
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
665
/// Broker-side wire deserialization for [`TopicData`].
#[cfg(feature = "broker")]
impl Decodable for TopicData {
    /// Reads a topic entry from `buf`.
    ///
    /// `topic_name` is read for versions <= 1 (otherwise it is the default value) and
    /// `topic_id` for versions >= 2 (otherwise it is the nil UUID). The tagged-field
    /// section is always read and kept as unknown tagged fields.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        // Struct-literal field expressions are evaluated top to bottom, which matches
        // the wire order.
        let mut topic = Self {
            topic_name: if version <= 1 {
                types::CompactString.decode(buf)?
            } else {
                Default::default()
            },
            topic_id: if version >= 2 {
                types::Uuid.decode(buf)?
            } else {
                Uuid::nil()
            },
            partitions: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_field_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            topic.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(topic)
    }
}
696
697impl Default for TopicData {
698 fn default() -> Self {
699 Self {
700 topic_name: Default::default(),
701 topic_id: Uuid::nil(),
702 partitions: Default::default(),
703 unknown_tagged_fields: BTreeMap::new(),
704 }
705 }
706}
707
708impl Message for TopicData {
709 const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
710 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
711}
712
713impl HeaderVersion for AlterPartitionRequest {
714 fn header_version(version: i16) -> i16 {
715 2
716 }
717}