1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Per-partition entry stored in `StopReplicaTopicState::partition_states`.
///
/// The three payload fields are only written to the wire for versions 3 and
/// above. Encoding for an earlier version fails unless each of them still
/// holds its default value.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct StopReplicaPartitionState {
    /// Partition index. Serialized as `Int32` for versions 3+; defaults to `0`.
    pub partition_index: i32,

    /// Leader epoch. Serialized as `Int32` for versions 3+; defaults to `-1`.
    pub leader_epoch: i32,

    /// Delete-partition flag. Serialized as `Boolean` for versions 3+; defaults to `false`.
    pub delete_partition: bool,

    /// Tagged fields read during decoding (versions 2+) that have no dedicated
    /// struct field, keyed by tag. They are written back out on encode.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
42
43impl StopReplicaPartitionState {
44 pub fn with_partition_index(mut self, value: i32) -> Self {
50 self.partition_index = value;
51 self
52 }
53 pub fn with_leader_epoch(mut self, value: i32) -> Self {
59 self.leader_epoch = value;
60 self
61 }
62 pub fn with_delete_partition(mut self, value: bool) -> Self {
68 self.delete_partition = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "client")]
impl Encodable for StopReplicaPartitionState {
    /// Writes this partition state to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version < 3` and any payload field differs from its default,
    /// when the tagged-field count does not fit in a `u32`, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 3 {
            types::Int32.encode(buf, &self.partition_index)?;
            types::Int32.encode(buf, &self.leader_epoch)?;
            types::Boolean.encode(buf, &self.delete_partition)?;
        } else if self.partition_index != 0 || self.leader_epoch != -1 || self.delete_partition {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // The tagged-field section only exists from version 2 onward.
        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 3 {
            size += types::Int32.compute_size(&self.partition_index)?;
            size += types::Int32.compute_size(&self.leader_epoch)?;
            size += types::Boolean.compute_size(&self.delete_partition)?;
        } else if self.partition_index != 0 || self.leader_epoch != -1 || self.delete_partition {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
159
#[cfg(feature = "broker")]
impl Decodable for StopReplicaPartitionState {
    /// Reads a partition state from `buf` using the layout of `version`.
    ///
    /// For versions below 3 no payload field is read and each one takes its
    /// default. From version 2 onward every tagged field found is stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the underlying decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (partition_index, leader_epoch, delete_partition): (i32, i32, bool) = if version >= 3 {
            (
                types::Int32.decode(buf)?,
                types::Int32.decode(buf)?,
                types::Boolean.decode(buf)?,
            )
        } else {
            (0, -1, false)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 2 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            leader_epoch,
            delete_partition,
            unknown_tagged_fields,
        })
    }
}
196
197impl Default for StopReplicaPartitionState {
198 fn default() -> Self {
199 Self {
200 partition_index: 0,
201 leader_epoch: -1,
202 delete_partition: false,
203 unknown_tagged_fields: BTreeMap::new(),
204 }
205 }
206}
207
208impl Message for StopReplicaPartitionState {
209 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
210 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
211}
212
/// Element type of `StopReplicaRequest::ungrouped_partitions`.
///
/// Both payload fields are written to the wire only for version 0. Encoding
/// for any other version fails unless they still hold their defaults.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct StopReplicaPartitionV0 {
    /// Topic name. Serialized as `String` for version 0 only; defaults to the empty name.
    pub topic_name: super::TopicName,

    /// Partition index. Serialized as `Int32` for version 0 only; defaults to `0`.
    pub partition_index: i32,

    /// Tagged fields read during decoding (versions 2+) that have no dedicated
    /// struct field, keyed by tag. They are written back out on encode.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
230
231impl StopReplicaPartitionV0 {
232 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
238 self.topic_name = value;
239 self
240 }
241 pub fn with_partition_index(mut self, value: i32) -> Self {
247 self.partition_index = value;
248 self
249 }
250 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
252 self.unknown_tagged_fields = value;
253 self
254 }
255 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
257 self.unknown_tagged_fields.insert(key, value);
258 self
259 }
260}
261
#[cfg(feature = "client")]
impl Encodable for StopReplicaPartitionV0 {
    /// Writes this entry to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version != 0` and either payload field differs from its
    /// default, when the tagged-field count does not fit in a `u32`, or when an
    /// underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version == 0 {
            types::String.encode(buf, &self.topic_name)?;
            types::Int32.encode(buf, &self.partition_index)?;
        } else if !self.topic_name.is_empty() || self.partition_index != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // The tagged-field section only exists from version 2 onward.
        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version == 0 {
            size += types::String.compute_size(&self.topic_name)?;
            size += types::Int32.compute_size(&self.partition_index)?;
        } else if !self.topic_name.is_empty() || self.partition_index != 0 {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
324
#[cfg(feature = "broker")]
impl Decodable for StopReplicaPartitionV0 {
    /// Reads an entry from `buf` using the layout of `version`.
    ///
    /// The payload fields are read only for version 0 and take their defaults
    /// otherwise. From version 2 onward every tagged field found is stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the underlying decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (topic_name, partition_index): (super::TopicName, i32) = if version == 0 {
            (types::String.decode(buf)?, types::Int32.decode(buf)?)
        } else {
            (Default::default(), 0)
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 2 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            unknown_tagged_fields,
        })
    }
}
355
356impl Default for StopReplicaPartitionV0 {
357 fn default() -> Self {
358 Self {
359 topic_name: Default::default(),
360 partition_index: 0,
361 unknown_tagged_fields: BTreeMap::new(),
362 }
363 }
364}
365
366impl Message for StopReplicaPartitionV0 {
367 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
368 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
369}
370
/// Top-level StopReplica request message.
///
/// Which fields are serialized depends on the version passed to
/// `encode`/`decode`. Except for `broker_epoch`, encoding fails when a field
/// that the chosen version does not carry holds a non-default value.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct StopReplicaRequest {
    /// Controller id. Serialized as `Int32` in every version; defaults to `0`.
    pub controller_id: super::BrokerId,

    /// KRaft-controller flag. Serialized as `Boolean` for versions 4+; defaults to `false`.
    pub is_k_raft_controller: bool,

    /// Controller epoch. Serialized as `Int32` in every version; defaults to `0`.
    pub controller_epoch: i32,

    /// Broker epoch. Serialized as `Int64` for versions 1+; defaults to `-1`.
    /// For version 0 the value is skipped on encode without raising an error.
    pub broker_epoch: i64,

    /// Request-wide delete flag. Serialized as `Boolean` for versions up to and
    /// including 2; defaults to `false`.
    pub delete_partitions: bool,

    /// Flat partition list. Serialized only for version 0; defaults to empty.
    pub ungrouped_partitions: Vec<StopReplicaPartitionV0>,

    /// Per-topic partition index lists. Serialized only for versions 1 and 2
    /// (compact array in version 2); defaults to empty.
    pub topics: Vec<StopReplicaTopicV1>,

    /// Per-topic partition states. Serialized as a compact array for versions 3+;
    /// defaults to empty.
    pub topic_states: Vec<StopReplicaTopicState>,

    /// Tagged fields read during decoding (versions 2+) that have no dedicated
    /// struct field, keyed by tag. They are written back out on encode.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
418
419impl StopReplicaRequest {
420 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
426 self.controller_id = value;
427 self
428 }
429 pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
435 self.is_k_raft_controller = value;
436 self
437 }
438 pub fn with_controller_epoch(mut self, value: i32) -> Self {
444 self.controller_epoch = value;
445 self
446 }
447 pub fn with_broker_epoch(mut self, value: i64) -> Self {
453 self.broker_epoch = value;
454 self
455 }
456 pub fn with_delete_partitions(mut self, value: bool) -> Self {
462 self.delete_partitions = value;
463 self
464 }
465 pub fn with_ungrouped_partitions(mut self, value: Vec<StopReplicaPartitionV0>) -> Self {
471 self.ungrouped_partitions = value;
472 self
473 }
474 pub fn with_topics(mut self, value: Vec<StopReplicaTopicV1>) -> Self {
480 self.topics = value;
481 self
482 }
483 pub fn with_topic_states(mut self, value: Vec<StopReplicaTopicState>) -> Self {
489 self.topic_states = value;
490 self
491 }
492 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
494 self.unknown_tagged_fields = value;
495 self
496 }
497 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
499 self.unknown_tagged_fields.insert(key, value);
500 self
501 }
502}
503
#[cfg(feature = "client")]
impl Encodable for StopReplicaRequest {
    /// Writes the request to `buf` using the layout of `version`.
    ///
    /// Fields are emitted in declaration order, each one only for the versions
    /// that carry it. `broker_epoch` is silently skipped for version 0.
    ///
    /// # Errors
    ///
    /// Fails when a field that `version` does not carry (other than
    /// `broker_epoch`) holds a non-default value, when the tagged-field count
    /// does not fit in a `u32`, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.controller_id)?;

        if version >= 4 {
            types::Boolean.encode(buf, &self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }

        types::Int32.encode(buf, &self.controller_epoch)?;

        // No validation here: the value is simply not written for version 0.
        if version >= 1 {
            types::Int64.encode(buf, &self.broker_epoch)?;
        }

        if version <= 2 {
            types::Boolean.encode(buf, &self.delete_partitions)?;
        } else if self.delete_partitions {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version == 0 {
            types::Array(types::Struct { version }).encode(buf, &self.ungrouped_partitions)?;
        } else if !self.ungrouped_partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        match version {
            1 => types::Array(types::Struct { version }).encode(buf, &self.topics)?,
            2 => types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?,
            _ if !self.topics.is_empty() => {
                bail!("A field is set that is not available on the selected protocol version");
            }
            _ => {}
        }

        if version >= 3 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // The tagged-field section only exists from version 2 onward.
        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        size += types::Int32.compute_size(&self.controller_id)?;

        if version >= 4 {
            size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
        } else if self.is_k_raft_controller {
            bail!("A field is set that is not available on the selected protocol version");
        }

        size += types::Int32.compute_size(&self.controller_epoch)?;

        if version >= 1 {
            size += types::Int64.compute_size(&self.broker_epoch)?;
        }

        if version <= 2 {
            size += types::Boolean.compute_size(&self.delete_partitions)?;
        } else if self.delete_partitions {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version == 0 {
            size +=
                types::Array(types::Struct { version }).compute_size(&self.ungrouped_partitions)?;
        } else if !self.ungrouped_partitions.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        match version {
            1 => size += types::Array(types::Struct { version }).compute_size(&self.topics)?,
            2 => {
                size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
            }
            _ if !self.topics.is_empty() => {
                bail!("A field is set that is not available on the selected protocol version");
            }
            _ => {}
        }

        if version >= 3 {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topic_states)?;
        } else if !self.topic_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
629
#[cfg(feature = "broker")]
impl Decodable for StopReplicaRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// Fields that `version` does not carry are not read and take their
    /// defaults. From version 2 onward every tagged field found is stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the underlying decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let controller_id: super::BrokerId = types::Int32.decode(buf)?;

        let is_k_raft_controller: bool = if version < 4 {
            false
        } else {
            types::Boolean.decode(buf)?
        };

        let controller_epoch: i32 = types::Int32.decode(buf)?;

        let broker_epoch: i64 = if version < 1 {
            -1
        } else {
            types::Int64.decode(buf)?
        };

        let delete_partitions: bool = if version > 2 {
            false
        } else {
            types::Boolean.decode(buf)?
        };

        let ungrouped_partitions: Vec<StopReplicaPartitionV0> = match version {
            0 => types::Array(types::Struct { version }).decode(buf)?,
            _ => Default::default(),
        };

        let topics: Vec<StopReplicaTopicV1> = match version {
            1 => types::Array(types::Struct { version }).decode(buf)?,
            2 => types::CompactArray(types::Struct { version }).decode(buf)?,
            _ => Default::default(),
        };

        let topic_states: Vec<StopReplicaTopicState> = if version < 3 {
            Default::default()
        } else {
            types::CompactArray(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 2 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            controller_id,
            is_k_raft_controller,
            controller_epoch,
            broker_epoch,
            delete_partitions,
            ungrouped_partitions,
            topics,
            topic_states,
            unknown_tagged_fields,
        })
    }
}
692
693impl Default for StopReplicaRequest {
694 fn default() -> Self {
695 Self {
696 controller_id: (0).into(),
697 is_k_raft_controller: false,
698 controller_epoch: 0,
699 broker_epoch: -1,
700 delete_partitions: false,
701 ungrouped_partitions: Default::default(),
702 topics: Default::default(),
703 topic_states: Default::default(),
704 unknown_tagged_fields: BTreeMap::new(),
705 }
706 }
707}
708
709impl Message for StopReplicaRequest {
710 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
711 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
712}
713
/// Element type of `StopReplicaRequest::topic_states`.
///
/// Both payload fields are written to the wire only for versions 3 and above.
/// Encoding for an earlier version fails unless they still hold their defaults.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct StopReplicaTopicState {
    /// Topic name. Serialized as `CompactString` for versions 3+; defaults to the empty name.
    pub topic_name: super::TopicName,

    /// Partition states for this topic. Serialized as a compact array for
    /// versions 3+; defaults to empty.
    pub partition_states: Vec<StopReplicaPartitionState>,

    /// Tagged fields read during decoding (versions 2+) that have no dedicated
    /// struct field, keyed by tag. They are written back out on encode.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
731
732impl StopReplicaTopicState {
733 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
739 self.topic_name = value;
740 self
741 }
742 pub fn with_partition_states(mut self, value: Vec<StopReplicaPartitionState>) -> Self {
748 self.partition_states = value;
749 self
750 }
751 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
753 self.unknown_tagged_fields = value;
754 self
755 }
756 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
758 self.unknown_tagged_fields.insert(key, value);
759 self
760 }
761}
762
#[cfg(feature = "client")]
impl Encodable for StopReplicaTopicState {
    /// Writes this topic state to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version < 3` and either payload field is non-empty, when the
    /// tagged-field count does not fit in a `u32`, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 3 {
            types::CompactString.encode(buf, &self.topic_name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partition_states)?;
        } else if !self.topic_name.is_empty() || !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // The tagged-field section only exists from version 2 onward.
        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 3 {
            size += types::CompactString.compute_size(&self.topic_name)?;
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_states)?;
        } else if !self.topic_name.is_empty() || !self.partition_states.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
826
#[cfg(feature = "broker")]
impl Decodable for StopReplicaTopicState {
    /// Reads a topic state from `buf` using the layout of `version`.
    ///
    /// The payload fields are read only for versions 3+ and take their defaults
    /// otherwise. From version 2 onward every tagged field found is stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the underlying decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (topic_name, partition_states): (super::TopicName, Vec<StopReplicaPartitionState>) =
            if version >= 3 {
                (
                    types::CompactString.decode(buf)?,
                    types::CompactArray(types::Struct { version }).decode(buf)?,
                )
            } else {
                (Default::default(), Default::default())
            };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 2 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            partition_states,
            unknown_tagged_fields,
        })
    }
}
857
858impl Default for StopReplicaTopicState {
859 fn default() -> Self {
860 Self {
861 topic_name: Default::default(),
862 partition_states: Default::default(),
863 unknown_tagged_fields: BTreeMap::new(),
864 }
865 }
866}
867
868impl Message for StopReplicaTopicState {
869 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
870 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
871}
872
/// Element type of `StopReplicaRequest::topics`.
///
/// Both payload fields are written to the wire only for versions 1 and 2.
/// Encoding for any other version fails unless they still hold their defaults.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct StopReplicaTopicV1 {
    /// Topic name. Serialized as `String` for version 1 and `CompactString` for
    /// version 2; defaults to the empty name.
    pub name: super::TopicName,

    /// Partition indexes. Serialized as an `Int32` array for version 1 and a
    /// compact `Int32` array for version 2; defaults to empty.
    pub partition_indexes: Vec<i32>,

    /// Tagged fields read during decoding (versions 2+) that have no dedicated
    /// struct field, keyed by tag. They are written back out on encode.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
890
891impl StopReplicaTopicV1 {
892 pub fn with_name(mut self, value: super::TopicName) -> Self {
898 self.name = value;
899 self
900 }
901 pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
907 self.partition_indexes = value;
908 self
909 }
910 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
912 self.unknown_tagged_fields = value;
913 self
914 }
915 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
917 self.unknown_tagged_fields.insert(key, value);
918 self
919 }
920}
921
#[cfg(feature = "client")]
impl Encodable for StopReplicaTopicV1 {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Version 1 uses the plain string/array encodings, version 2 the compact
    /// ones; no other version carries the payload fields.
    ///
    /// # Errors
    ///
    /// Fails when `version` is neither 1 nor 2 and either payload field is
    /// non-empty, when the tagged-field count does not fit in a `u32`, or when
    /// an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        match version {
            1 => {
                types::String.encode(buf, &self.name)?;
                types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
            }
            2 => {
                types::CompactString.encode(buf, &self.name)?;
                types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
            }
            _ => {
                if !self.name.is_empty() || !self.partition_indexes.is_empty() {
                    bail!("A field is set that is not available on the selected protocol version");
                }
            }
        }

        // The tagged-field section only exists from version 2 onward.
        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        match version {
            1 => {
                size += types::String.compute_size(&self.name)?;
                size += types::Array(types::Int32).compute_size(&self.partition_indexes)?;
            }
            2 => {
                size += types::CompactString.compute_size(&self.name)?;
                size += types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
            }
            _ => {
                if !self.name.is_empty() || !self.partition_indexes.is_empty() {
                    bail!("A field is set that is not available on the selected protocol version");
                }
            }
        }

        if version >= 2 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
1001
#[cfg(feature = "broker")]
impl Decodable for StopReplicaTopicV1 {
    /// Reads a topic entry from `buf` using the layout of `version`.
    ///
    /// The payload fields are read with the plain encodings for version 1 and
    /// the compact encodings for version 2; for any other version they take
    /// their defaults. From version 2 onward every tagged field found is stored
    /// in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Propagates any failure reported by the underlying decoders or by the buffer.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let (name, partition_indexes): (super::TopicName, Vec<i32>) = match version {
            1 => (
                types::String.decode(buf)?,
                types::Array(types::Int32).decode(buf)?,
            ),
            2 => (
                types::CompactString.decode(buf)?,
                types::CompactArray(types::Int32).decode(buf)?,
            ),
            _ => (Default::default(), Default::default()),
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 2 {
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partition_indexes,
            unknown_tagged_fields,
        })
    }
}
1040
1041impl Default for StopReplicaTopicV1 {
1042 fn default() -> Self {
1043 Self {
1044 name: Default::default(),
1045 partition_indexes: Default::default(),
1046 unknown_tagged_fields: BTreeMap::new(),
1047 }
1048 }
1049}
1050
1051impl Message for StopReplicaTopicV1 {
1052 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
1053 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1054}
1055
1056impl HeaderVersion for StopReplicaRequest {
1057 fn header_version(version: i16) -> i16 {
1058 if version >= 2 {
1059 2
1060 } else {
1061 1
1062 }
1063 }
1064}