1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct StopReplicaPartitionState {
24 pub partition_index: i32,
28
29 pub leader_epoch: i32,
33
34 pub delete_partition: bool,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl StopReplicaPartitionState {
44 pub fn with_partition_index(mut self, value: i32) -> Self {
50 self.partition_index = value;
51 self
52 }
53 pub fn with_leader_epoch(mut self, value: i32) -> Self {
59 self.leader_epoch = value;
60 self
61 }
62 pub fn with_delete_partition(mut self, value: bool) -> Self {
68 self.delete_partition = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "client")]
84impl Encodable for StopReplicaPartitionState {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version < 0 || version > 4 {
87 bail!("specified version not supported by this message type");
88 }
89 if version >= 3 {
90 types::Int32.encode(buf, &self.partition_index)?;
91 } else {
92 if self.partition_index != 0 {
93 bail!("A field is set that is not available on the selected protocol version");
94 }
95 }
96 if version >= 3 {
97 types::Int32.encode(buf, &self.leader_epoch)?;
98 } else {
99 if self.leader_epoch != -1 {
100 bail!("A field is set that is not available on the selected protocol version");
101 }
102 }
103 if version >= 3 {
104 types::Boolean.encode(buf, &self.delete_partition)?;
105 } else {
106 if self.delete_partition {
107 bail!("A field is set that is not available on the selected protocol version");
108 }
109 }
110 if version >= 2 {
111 let num_tagged_fields = self.unknown_tagged_fields.len();
112 if num_tagged_fields > std::u32::MAX as usize {
113 bail!(
114 "Too many tagged fields to encode ({} fields)",
115 num_tagged_fields
116 );
117 }
118 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
119
120 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
121 }
122 Ok(())
123 }
124 fn compute_size(&self, version: i16) -> Result<usize> {
125 let mut total_size = 0;
126 if version >= 3 {
127 total_size += types::Int32.compute_size(&self.partition_index)?;
128 } else {
129 if self.partition_index != 0 {
130 bail!("A field is set that is not available on the selected protocol version");
131 }
132 }
133 if version >= 3 {
134 total_size += types::Int32.compute_size(&self.leader_epoch)?;
135 } else {
136 if self.leader_epoch != -1 {
137 bail!("A field is set that is not available on the selected protocol version");
138 }
139 }
140 if version >= 3 {
141 total_size += types::Boolean.compute_size(&self.delete_partition)?;
142 } else {
143 if self.delete_partition {
144 bail!("A field is set that is not available on the selected protocol version");
145 }
146 }
147 if version >= 2 {
148 let num_tagged_fields = self.unknown_tagged_fields.len();
149 if num_tagged_fields > std::u32::MAX as usize {
150 bail!(
151 "Too many tagged fields to encode ({} fields)",
152 num_tagged_fields
153 );
154 }
155 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
156
157 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
158 }
159 Ok(total_size)
160 }
161}
162
163#[cfg(feature = "broker")]
164impl Decodable for StopReplicaPartitionState {
165 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
166 if version < 0 || version > 4 {
167 bail!("specified version not supported by this message type");
168 }
169 let partition_index = if version >= 3 {
170 types::Int32.decode(buf)?
171 } else {
172 0
173 };
174 let leader_epoch = if version >= 3 {
175 types::Int32.decode(buf)?
176 } else {
177 -1
178 };
179 let delete_partition = if version >= 3 {
180 types::Boolean.decode(buf)?
181 } else {
182 false
183 };
184 let mut unknown_tagged_fields = BTreeMap::new();
185 if version >= 2 {
186 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
187 for _ in 0..num_tagged_fields {
188 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
189 let size: u32 = types::UnsignedVarInt.decode(buf)?;
190 let unknown_value = buf.try_get_bytes(size as usize)?;
191 unknown_tagged_fields.insert(tag as i32, unknown_value);
192 }
193 }
194 Ok(Self {
195 partition_index,
196 leader_epoch,
197 delete_partition,
198 unknown_tagged_fields,
199 })
200 }
201}
202
203impl Default for StopReplicaPartitionState {
204 fn default() -> Self {
205 Self {
206 partition_index: 0,
207 leader_epoch: -1,
208 delete_partition: false,
209 unknown_tagged_fields: BTreeMap::new(),
210 }
211 }
212}
213
214impl Message for StopReplicaPartitionState {
215 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
216 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
217}
218
219#[non_exhaustive]
221#[derive(Debug, Clone, PartialEq)]
222pub struct StopReplicaPartitionV0 {
223 pub topic_name: super::TopicName,
227
228 pub partition_index: i32,
232
233 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
235}
236
237impl StopReplicaPartitionV0 {
238 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
244 self.topic_name = value;
245 self
246 }
247 pub fn with_partition_index(mut self, value: i32) -> Self {
253 self.partition_index = value;
254 self
255 }
256 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
258 self.unknown_tagged_fields = value;
259 self
260 }
261 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
263 self.unknown_tagged_fields.insert(key, value);
264 self
265 }
266}
267
268#[cfg(feature = "client")]
269impl Encodable for StopReplicaPartitionV0 {
270 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
271 if version < 0 || version > 4 {
272 bail!("specified version not supported by this message type");
273 }
274 if version == 0 {
275 types::String.encode(buf, &self.topic_name)?;
276 } else {
277 if !self.topic_name.is_empty() {
278 bail!("A field is set that is not available on the selected protocol version");
279 }
280 }
281 if version == 0 {
282 types::Int32.encode(buf, &self.partition_index)?;
283 } else {
284 if self.partition_index != 0 {
285 bail!("A field is set that is not available on the selected protocol version");
286 }
287 }
288 if version >= 2 {
289 let num_tagged_fields = self.unknown_tagged_fields.len();
290 if num_tagged_fields > std::u32::MAX as usize {
291 bail!(
292 "Too many tagged fields to encode ({} fields)",
293 num_tagged_fields
294 );
295 }
296 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
297
298 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
299 }
300 Ok(())
301 }
302 fn compute_size(&self, version: i16) -> Result<usize> {
303 let mut total_size = 0;
304 if version == 0 {
305 total_size += types::String.compute_size(&self.topic_name)?;
306 } else {
307 if !self.topic_name.is_empty() {
308 bail!("A field is set that is not available on the selected protocol version");
309 }
310 }
311 if version == 0 {
312 total_size += types::Int32.compute_size(&self.partition_index)?;
313 } else {
314 if self.partition_index != 0 {
315 bail!("A field is set that is not available on the selected protocol version");
316 }
317 }
318 if version >= 2 {
319 let num_tagged_fields = self.unknown_tagged_fields.len();
320 if num_tagged_fields > std::u32::MAX as usize {
321 bail!(
322 "Too many tagged fields to encode ({} fields)",
323 num_tagged_fields
324 );
325 }
326 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
327
328 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
329 }
330 Ok(total_size)
331 }
332}
333
334#[cfg(feature = "broker")]
335impl Decodable for StopReplicaPartitionV0 {
336 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
337 if version < 0 || version > 4 {
338 bail!("specified version not supported by this message type");
339 }
340 let topic_name = if version == 0 {
341 types::String.decode(buf)?
342 } else {
343 Default::default()
344 };
345 let partition_index = if version == 0 {
346 types::Int32.decode(buf)?
347 } else {
348 0
349 };
350 let mut unknown_tagged_fields = BTreeMap::new();
351 if version >= 2 {
352 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
353 for _ in 0..num_tagged_fields {
354 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
355 let size: u32 = types::UnsignedVarInt.decode(buf)?;
356 let unknown_value = buf.try_get_bytes(size as usize)?;
357 unknown_tagged_fields.insert(tag as i32, unknown_value);
358 }
359 }
360 Ok(Self {
361 topic_name,
362 partition_index,
363 unknown_tagged_fields,
364 })
365 }
366}
367
368impl Default for StopReplicaPartitionV0 {
369 fn default() -> Self {
370 Self {
371 topic_name: Default::default(),
372 partition_index: 0,
373 unknown_tagged_fields: BTreeMap::new(),
374 }
375 }
376}
377
378impl Message for StopReplicaPartitionV0 {
379 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
380 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
381}
382
383#[non_exhaustive]
385#[derive(Debug, Clone, PartialEq)]
386pub struct StopReplicaRequest {
387 pub controller_id: super::BrokerId,
391
392 pub is_k_raft_controller: bool,
396
397 pub controller_epoch: i32,
401
402 pub broker_epoch: i64,
406
407 pub delete_partitions: bool,
411
412 pub ungrouped_partitions: Vec<StopReplicaPartitionV0>,
416
417 pub topics: Vec<StopReplicaTopicV1>,
421
422 pub topic_states: Vec<StopReplicaTopicState>,
426
427 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
429}
430
431impl StopReplicaRequest {
432 pub fn with_controller_id(mut self, value: super::BrokerId) -> Self {
438 self.controller_id = value;
439 self
440 }
441 pub fn with_is_k_raft_controller(mut self, value: bool) -> Self {
447 self.is_k_raft_controller = value;
448 self
449 }
450 pub fn with_controller_epoch(mut self, value: i32) -> Self {
456 self.controller_epoch = value;
457 self
458 }
459 pub fn with_broker_epoch(mut self, value: i64) -> Self {
465 self.broker_epoch = value;
466 self
467 }
468 pub fn with_delete_partitions(mut self, value: bool) -> Self {
474 self.delete_partitions = value;
475 self
476 }
477 pub fn with_ungrouped_partitions(mut self, value: Vec<StopReplicaPartitionV0>) -> Self {
483 self.ungrouped_partitions = value;
484 self
485 }
486 pub fn with_topics(mut self, value: Vec<StopReplicaTopicV1>) -> Self {
492 self.topics = value;
493 self
494 }
495 pub fn with_topic_states(mut self, value: Vec<StopReplicaTopicState>) -> Self {
501 self.topic_states = value;
502 self
503 }
504 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
506 self.unknown_tagged_fields = value;
507 self
508 }
509 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
511 self.unknown_tagged_fields.insert(key, value);
512 self
513 }
514}
515
516#[cfg(feature = "client")]
517impl Encodable for StopReplicaRequest {
518 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
519 if version < 0 || version > 4 {
520 bail!("specified version not supported by this message type");
521 }
522 types::Int32.encode(buf, &self.controller_id)?;
523 if version >= 4 {
524 types::Boolean.encode(buf, &self.is_k_raft_controller)?;
525 } else {
526 if self.is_k_raft_controller {
527 bail!("A field is set that is not available on the selected protocol version");
528 }
529 }
530 types::Int32.encode(buf, &self.controller_epoch)?;
531 if version >= 1 {
532 types::Int64.encode(buf, &self.broker_epoch)?;
533 }
534 if version <= 2 {
535 types::Boolean.encode(buf, &self.delete_partitions)?;
536 } else {
537 if self.delete_partitions {
538 bail!("A field is set that is not available on the selected protocol version");
539 }
540 }
541 if version == 0 {
542 types::Array(types::Struct { version }).encode(buf, &self.ungrouped_partitions)?;
543 } else {
544 if !self.ungrouped_partitions.is_empty() {
545 bail!("A field is set that is not available on the selected protocol version");
546 }
547 }
548 if version >= 1 && version <= 2 {
549 if version >= 2 {
550 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
551 } else {
552 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
553 }
554 } else {
555 if !self.topics.is_empty() {
556 bail!("A field is set that is not available on the selected protocol version");
557 }
558 }
559 if version >= 3 {
560 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_states)?;
561 } else {
562 if !self.topic_states.is_empty() {
563 bail!("A field is set that is not available on the selected protocol version");
564 }
565 }
566 if version >= 2 {
567 let num_tagged_fields = self.unknown_tagged_fields.len();
568 if num_tagged_fields > std::u32::MAX as usize {
569 bail!(
570 "Too many tagged fields to encode ({} fields)",
571 num_tagged_fields
572 );
573 }
574 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
575
576 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
577 }
578 Ok(())
579 }
580 fn compute_size(&self, version: i16) -> Result<usize> {
581 let mut total_size = 0;
582 total_size += types::Int32.compute_size(&self.controller_id)?;
583 if version >= 4 {
584 total_size += types::Boolean.compute_size(&self.is_k_raft_controller)?;
585 } else {
586 if self.is_k_raft_controller {
587 bail!("A field is set that is not available on the selected protocol version");
588 }
589 }
590 total_size += types::Int32.compute_size(&self.controller_epoch)?;
591 if version >= 1 {
592 total_size += types::Int64.compute_size(&self.broker_epoch)?;
593 }
594 if version <= 2 {
595 total_size += types::Boolean.compute_size(&self.delete_partitions)?;
596 } else {
597 if self.delete_partitions {
598 bail!("A field is set that is not available on the selected protocol version");
599 }
600 }
601 if version == 0 {
602 total_size +=
603 types::Array(types::Struct { version }).compute_size(&self.ungrouped_partitions)?;
604 } else {
605 if !self.ungrouped_partitions.is_empty() {
606 bail!("A field is set that is not available on the selected protocol version");
607 }
608 }
609 if version >= 1 && version <= 2 {
610 if version >= 2 {
611 total_size +=
612 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
613 } else {
614 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
615 }
616 } else {
617 if !self.topics.is_empty() {
618 bail!("A field is set that is not available on the selected protocol version");
619 }
620 }
621 if version >= 3 {
622 total_size +=
623 types::CompactArray(types::Struct { version }).compute_size(&self.topic_states)?;
624 } else {
625 if !self.topic_states.is_empty() {
626 bail!("A field is set that is not available on the selected protocol version");
627 }
628 }
629 if version >= 2 {
630 let num_tagged_fields = self.unknown_tagged_fields.len();
631 if num_tagged_fields > std::u32::MAX as usize {
632 bail!(
633 "Too many tagged fields to encode ({} fields)",
634 num_tagged_fields
635 );
636 }
637 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
638
639 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
640 }
641 Ok(total_size)
642 }
643}
644
645#[cfg(feature = "broker")]
646impl Decodable for StopReplicaRequest {
647 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
648 if version < 0 || version > 4 {
649 bail!("specified version not supported by this message type");
650 }
651 let controller_id = types::Int32.decode(buf)?;
652 let is_k_raft_controller = if version >= 4 {
653 types::Boolean.decode(buf)?
654 } else {
655 false
656 };
657 let controller_epoch = types::Int32.decode(buf)?;
658 let broker_epoch = if version >= 1 {
659 types::Int64.decode(buf)?
660 } else {
661 -1
662 };
663 let delete_partitions = if version <= 2 {
664 types::Boolean.decode(buf)?
665 } else {
666 false
667 };
668 let ungrouped_partitions = if version == 0 {
669 types::Array(types::Struct { version }).decode(buf)?
670 } else {
671 Default::default()
672 };
673 let topics = if version >= 1 && version <= 2 {
674 if version >= 2 {
675 types::CompactArray(types::Struct { version }).decode(buf)?
676 } else {
677 types::Array(types::Struct { version }).decode(buf)?
678 }
679 } else {
680 Default::default()
681 };
682 let topic_states = if version >= 3 {
683 types::CompactArray(types::Struct { version }).decode(buf)?
684 } else {
685 Default::default()
686 };
687 let mut unknown_tagged_fields = BTreeMap::new();
688 if version >= 2 {
689 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
690 for _ in 0..num_tagged_fields {
691 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
692 let size: u32 = types::UnsignedVarInt.decode(buf)?;
693 let unknown_value = buf.try_get_bytes(size as usize)?;
694 unknown_tagged_fields.insert(tag as i32, unknown_value);
695 }
696 }
697 Ok(Self {
698 controller_id,
699 is_k_raft_controller,
700 controller_epoch,
701 broker_epoch,
702 delete_partitions,
703 ungrouped_partitions,
704 topics,
705 topic_states,
706 unknown_tagged_fields,
707 })
708 }
709}
710
711impl Default for StopReplicaRequest {
712 fn default() -> Self {
713 Self {
714 controller_id: (0).into(),
715 is_k_raft_controller: false,
716 controller_epoch: 0,
717 broker_epoch: -1,
718 delete_partitions: false,
719 ungrouped_partitions: Default::default(),
720 topics: Default::default(),
721 topic_states: Default::default(),
722 unknown_tagged_fields: BTreeMap::new(),
723 }
724 }
725}
726
727impl Message for StopReplicaRequest {
728 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
729 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
730}
731
732#[non_exhaustive]
734#[derive(Debug, Clone, PartialEq)]
735pub struct StopReplicaTopicState {
736 pub topic_name: super::TopicName,
740
741 pub partition_states: Vec<StopReplicaPartitionState>,
745
746 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
748}
749
750impl StopReplicaTopicState {
751 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
757 self.topic_name = value;
758 self
759 }
760 pub fn with_partition_states(mut self, value: Vec<StopReplicaPartitionState>) -> Self {
766 self.partition_states = value;
767 self
768 }
769 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
771 self.unknown_tagged_fields = value;
772 self
773 }
774 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
776 self.unknown_tagged_fields.insert(key, value);
777 self
778 }
779}
780
781#[cfg(feature = "client")]
782impl Encodable for StopReplicaTopicState {
783 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
784 if version < 0 || version > 4 {
785 bail!("specified version not supported by this message type");
786 }
787 if version >= 3 {
788 types::CompactString.encode(buf, &self.topic_name)?;
789 } else {
790 if !self.topic_name.is_empty() {
791 bail!("A field is set that is not available on the selected protocol version");
792 }
793 }
794 if version >= 3 {
795 types::CompactArray(types::Struct { version }).encode(buf, &self.partition_states)?;
796 } else {
797 if !self.partition_states.is_empty() {
798 bail!("A field is set that is not available on the selected protocol version");
799 }
800 }
801 if version >= 2 {
802 let num_tagged_fields = self.unknown_tagged_fields.len();
803 if num_tagged_fields > std::u32::MAX as usize {
804 bail!(
805 "Too many tagged fields to encode ({} fields)",
806 num_tagged_fields
807 );
808 }
809 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
810
811 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
812 }
813 Ok(())
814 }
815 fn compute_size(&self, version: i16) -> Result<usize> {
816 let mut total_size = 0;
817 if version >= 3 {
818 total_size += types::CompactString.compute_size(&self.topic_name)?;
819 } else {
820 if !self.topic_name.is_empty() {
821 bail!("A field is set that is not available on the selected protocol version");
822 }
823 }
824 if version >= 3 {
825 total_size += types::CompactArray(types::Struct { version })
826 .compute_size(&self.partition_states)?;
827 } else {
828 if !self.partition_states.is_empty() {
829 bail!("A field is set that is not available on the selected protocol version");
830 }
831 }
832 if version >= 2 {
833 let num_tagged_fields = self.unknown_tagged_fields.len();
834 if num_tagged_fields > std::u32::MAX as usize {
835 bail!(
836 "Too many tagged fields to encode ({} fields)",
837 num_tagged_fields
838 );
839 }
840 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
841
842 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
843 }
844 Ok(total_size)
845 }
846}
847
848#[cfg(feature = "broker")]
849impl Decodable for StopReplicaTopicState {
850 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
851 if version < 0 || version > 4 {
852 bail!("specified version not supported by this message type");
853 }
854 let topic_name = if version >= 3 {
855 types::CompactString.decode(buf)?
856 } else {
857 Default::default()
858 };
859 let partition_states = if version >= 3 {
860 types::CompactArray(types::Struct { version }).decode(buf)?
861 } else {
862 Default::default()
863 };
864 let mut unknown_tagged_fields = BTreeMap::new();
865 if version >= 2 {
866 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
867 for _ in 0..num_tagged_fields {
868 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
869 let size: u32 = types::UnsignedVarInt.decode(buf)?;
870 let unknown_value = buf.try_get_bytes(size as usize)?;
871 unknown_tagged_fields.insert(tag as i32, unknown_value);
872 }
873 }
874 Ok(Self {
875 topic_name,
876 partition_states,
877 unknown_tagged_fields,
878 })
879 }
880}
881
882impl Default for StopReplicaTopicState {
883 fn default() -> Self {
884 Self {
885 topic_name: Default::default(),
886 partition_states: Default::default(),
887 unknown_tagged_fields: BTreeMap::new(),
888 }
889 }
890}
891
892impl Message for StopReplicaTopicState {
893 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
894 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
895}
896
897#[non_exhaustive]
899#[derive(Debug, Clone, PartialEq)]
900pub struct StopReplicaTopicV1 {
901 pub name: super::TopicName,
905
906 pub partition_indexes: Vec<i32>,
910
911 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
913}
914
915impl StopReplicaTopicV1 {
916 pub fn with_name(mut self, value: super::TopicName) -> Self {
922 self.name = value;
923 self
924 }
925 pub fn with_partition_indexes(mut self, value: Vec<i32>) -> Self {
931 self.partition_indexes = value;
932 self
933 }
934 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
936 self.unknown_tagged_fields = value;
937 self
938 }
939 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
941 self.unknown_tagged_fields.insert(key, value);
942 self
943 }
944}
945
946#[cfg(feature = "client")]
947impl Encodable for StopReplicaTopicV1 {
948 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
949 if version < 0 || version > 4 {
950 bail!("specified version not supported by this message type");
951 }
952 if version >= 1 && version <= 2 {
953 if version >= 2 {
954 types::CompactString.encode(buf, &self.name)?;
955 } else {
956 types::String.encode(buf, &self.name)?;
957 }
958 } else {
959 if !self.name.is_empty() {
960 bail!("A field is set that is not available on the selected protocol version");
961 }
962 }
963 if version >= 1 && version <= 2 {
964 if version >= 2 {
965 types::CompactArray(types::Int32).encode(buf, &self.partition_indexes)?;
966 } else {
967 types::Array(types::Int32).encode(buf, &self.partition_indexes)?;
968 }
969 } else {
970 if !self.partition_indexes.is_empty() {
971 bail!("A field is set that is not available on the selected protocol version");
972 }
973 }
974 if version >= 2 {
975 let num_tagged_fields = self.unknown_tagged_fields.len();
976 if num_tagged_fields > std::u32::MAX as usize {
977 bail!(
978 "Too many tagged fields to encode ({} fields)",
979 num_tagged_fields
980 );
981 }
982 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
983
984 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
985 }
986 Ok(())
987 }
988 fn compute_size(&self, version: i16) -> Result<usize> {
989 let mut total_size = 0;
990 if version >= 1 && version <= 2 {
991 if version >= 2 {
992 total_size += types::CompactString.compute_size(&self.name)?;
993 } else {
994 total_size += types::String.compute_size(&self.name)?;
995 }
996 } else {
997 if !self.name.is_empty() {
998 bail!("A field is set that is not available on the selected protocol version");
999 }
1000 }
1001 if version >= 1 && version <= 2 {
1002 if version >= 2 {
1003 total_size +=
1004 types::CompactArray(types::Int32).compute_size(&self.partition_indexes)?;
1005 } else {
1006 total_size += types::Array(types::Int32).compute_size(&self.partition_indexes)?;
1007 }
1008 } else {
1009 if !self.partition_indexes.is_empty() {
1010 bail!("A field is set that is not available on the selected protocol version");
1011 }
1012 }
1013 if version >= 2 {
1014 let num_tagged_fields = self.unknown_tagged_fields.len();
1015 if num_tagged_fields > std::u32::MAX as usize {
1016 bail!(
1017 "Too many tagged fields to encode ({} fields)",
1018 num_tagged_fields
1019 );
1020 }
1021 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1022
1023 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1024 }
1025 Ok(total_size)
1026 }
1027}
1028
1029#[cfg(feature = "broker")]
1030impl Decodable for StopReplicaTopicV1 {
1031 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1032 if version < 0 || version > 4 {
1033 bail!("specified version not supported by this message type");
1034 }
1035 let name = if version >= 1 && version <= 2 {
1036 if version >= 2 {
1037 types::CompactString.decode(buf)?
1038 } else {
1039 types::String.decode(buf)?
1040 }
1041 } else {
1042 Default::default()
1043 };
1044 let partition_indexes = if version >= 1 && version <= 2 {
1045 if version >= 2 {
1046 types::CompactArray(types::Int32).decode(buf)?
1047 } else {
1048 types::Array(types::Int32).decode(buf)?
1049 }
1050 } else {
1051 Default::default()
1052 };
1053 let mut unknown_tagged_fields = BTreeMap::new();
1054 if version >= 2 {
1055 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1056 for _ in 0..num_tagged_fields {
1057 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1058 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1059 let unknown_value = buf.try_get_bytes(size as usize)?;
1060 unknown_tagged_fields.insert(tag as i32, unknown_value);
1061 }
1062 }
1063 Ok(Self {
1064 name,
1065 partition_indexes,
1066 unknown_tagged_fields,
1067 })
1068 }
1069}
1070
1071impl Default for StopReplicaTopicV1 {
1072 fn default() -> Self {
1073 Self {
1074 name: Default::default(),
1075 partition_indexes: Default::default(),
1076 unknown_tagged_fields: BTreeMap::new(),
1077 }
1078 }
1079}
1080
1081impl Message for StopReplicaTopicV1 {
1082 const VERSIONS: VersionRange = VersionRange { min: 0, max: 4 };
1083 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1084}
1085
1086impl HeaderVersion for StopReplicaRequest {
1087 fn header_version(version: i16) -> i16 {
1088 if version >= 2 {
1089 2
1090 } else {
1091 1
1092 }
1093 }
1094}