1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AlterPartitionReassignmentsRequest {
24 pub timeout_ms: i32,
28
29 pub allow_replication_factor_change: bool,
33
34 pub topics: Vec<ReassignableTopic>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl AlterPartitionReassignmentsRequest {
44 pub fn with_timeout_ms(mut self, value: i32) -> Self {
50 self.timeout_ms = value;
51 self
52 }
53 pub fn with_allow_replication_factor_change(mut self, value: bool) -> Self {
59 self.allow_replication_factor_change = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<ReassignableTopic>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "client")]
impl Encodable for AlterPartitionReassignmentsRequest {
    /// Writes this request to `buf` using the wire layout of `version`.
    ///
    /// Field order: `timeout_ms`, `allow_replication_factor_change` (version 1 and
    /// later only), `topics`, then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when
    /// `allow_replication_factor_change` is `false` but `version` is 0, when there
    /// are more than `u32::MAX` unknown tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.timeout_ms)?;

        if version >= 1 {
            types::Boolean.encode(buf, &self.allow_replication_factor_change)?;
        } else if !self.allow_replication_factor_change {
            // Version 0 has no slot for this flag, so only the default (`true`) is allowed.
            bail!("A field is set that is not available on the selected protocol version");
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes how many bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails when `allow_replication_factor_change` is `false` but `version` is
    /// below 1, when there are more than `u32::MAX` unknown tagged fields, or when
    /// a nested size computation fails. The version range itself is not checked here.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let timeout_size = types::Int32.compute_size(&self.timeout_ms)?;

        let flag_size = if version >= 1 {
            types::Boolean.compute_size(&self.allow_replication_factor_change)?
        } else {
            if !self.allow_replication_factor_change {
                bail!("A field is set that is not available on the selected protocol version");
            }
            0
        };

        let topics_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(timeout_size + flag_size + topics_size + count_size + tagged_size)
    }
}
134
#[cfg(feature = "broker")]
impl Decodable for AlterPartitionReassignmentsRequest {
    /// Reads a request from `buf` using the wire layout of `version`.
    ///
    /// For version 0, which has no `allow_replication_factor_change` slot, the
    /// flag is set to `true`. Every tagged field found is stored in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let timeout = types::Int32.decode(buf)?;
        let allow_rf_change = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            true
        };
        let topic_list = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: a count, then (tag, size, payload) triples.
        let mut tagged = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            tagged.insert(tag as i32, payload);
        }

        Ok(Self {
            timeout_ms: timeout,
            allow_replication_factor_change: allow_rf_change,
            topics: topic_list,
            unknown_tagged_fields: tagged,
        })
    }
}
164
165impl Default for AlterPartitionReassignmentsRequest {
166 fn default() -> Self {
167 Self {
168 timeout_ms: 60000,
169 allow_replication_factor_change: true,
170 topics: Default::default(),
171 unknown_tagged_fields: BTreeMap::new(),
172 }
173 }
174}
175
176impl Message for AlterPartitionReassignmentsRequest {
177 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
178 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
179}
180
181#[non_exhaustive]
183#[derive(Debug, Clone, PartialEq)]
184pub struct ReassignablePartition {
185 pub partition_index: i32,
189
190 pub replicas: Option<Vec<super::BrokerId>>,
194
195 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
197}
198
199impl ReassignablePartition {
200 pub fn with_partition_index(mut self, value: i32) -> Self {
206 self.partition_index = value;
207 self
208 }
209 pub fn with_replicas(mut self, value: Option<Vec<super::BrokerId>>) -> Self {
215 self.replicas = value;
216 self
217 }
218 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
220 self.unknown_tagged_fields = value;
221 self
222 }
223 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
225 self.unknown_tagged_fields.insert(key, value);
226 self
227 }
228}
229
#[cfg(feature = "client")]
impl Encodable for ReassignablePartition {
    /// Writes this partition entry to `buf`.
    ///
    /// Field order: `partition_index`, `replicas`, then the tagged-field section.
    /// The layout is the same for both supported versions.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;
        types::CompactArray(types::Int32).encode(buf, &self.replicas)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes how many bytes `encode` would write.
    ///
    /// `version` does not influence the result and is not range-checked here.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let index_size = types::Int32.compute_size(&self.partition_index)?;
        let replicas_size = types::CompactArray(types::Int32).compute_size(&self.replicas)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(index_size + replicas_size + count_size + tagged_size)
    }
}
267
#[cfg(feature = "broker")]
impl Decodable for ReassignablePartition {
    /// Reads a partition entry from `buf`.
    ///
    /// Every tagged field found is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let index = types::Int32.decode(buf)?;
        let replica_ids = types::CompactArray(types::Int32).decode(buf)?;

        // Tagged-field section: a count, then (tag, size, payload) triples.
        let mut tagged = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            tagged.insert(tag as i32, payload);
        }

        Ok(Self {
            partition_index: index,
            replicas: replica_ids,
            unknown_tagged_fields: tagged,
        })
    }
}
291
292impl Default for ReassignablePartition {
293 fn default() -> Self {
294 Self {
295 partition_index: 0,
296 replicas: None,
297 unknown_tagged_fields: BTreeMap::new(),
298 }
299 }
300}
301
302impl Message for ReassignablePartition {
303 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
304 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
305}
306
307#[non_exhaustive]
309#[derive(Debug, Clone, PartialEq)]
310pub struct ReassignableTopic {
311 pub name: super::TopicName,
315
316 pub partitions: Vec<ReassignablePartition>,
320
321 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
323}
324
325impl ReassignableTopic {
326 pub fn with_name(mut self, value: super::TopicName) -> Self {
332 self.name = value;
333 self
334 }
335 pub fn with_partitions(mut self, value: Vec<ReassignablePartition>) -> Self {
341 self.partitions = value;
342 self
343 }
344 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
346 self.unknown_tagged_fields = value;
347 self
348 }
349 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
351 self.unknown_tagged_fields.insert(key, value);
352 self
353 }
354}
355
#[cfg(feature = "client")]
impl Encodable for ReassignableTopic {
    /// Writes this topic entry to `buf` using the wire layout of `version`.
    ///
    /// Field order: `name`, `partitions`, then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when a nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes how many bytes `encode` would write for `version`.
    ///
    /// The version range is not checked here; `version` is only passed on to the
    /// nested partition structs.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when a
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let name_size = types::CompactString.compute_size(&self.name)?;
        let partitions_size =
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(name_size + partitions_size + count_size + tagged_size)
    }
}
394
#[cfg(feature = "broker")]
impl Decodable for ReassignableTopic {
    /// Reads a topic entry from `buf` using the wire layout of `version`.
    ///
    /// Every tagged field found is stored in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=1` or when any nested decode fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=1).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic_name = types::CompactString.decode(buf)?;
        let partition_list = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: a count, then (tag, size, payload) triples.
        let mut tagged = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(len as usize)?;
            tagged.insert(tag as i32, payload);
        }

        Ok(Self {
            name: topic_name,
            partitions: partition_list,
            unknown_tagged_fields: tagged,
        })
    }
}
418
419impl Default for ReassignableTopic {
420 fn default() -> Self {
421 Self {
422 name: Default::default(),
423 partitions: Default::default(),
424 unknown_tagged_fields: BTreeMap::new(),
425 }
426 }
427}
428
429impl Message for ReassignableTopic {
430 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
431 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
432}
433
434impl HeaderVersion for AlterPartitionReassignmentsRequest {
435 fn header_version(version: i16) -> i16 {
436 2
437 }
438}