1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct ListPartitionReassignmentsResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub error_message: Option<StrBytes>,
38
39 pub topics: Vec<OngoingTopicReassignment>,
43
44 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
46}
47
48impl ListPartitionReassignmentsResponse {
49 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
55 self.throttle_time_ms = value;
56 self
57 }
58 pub fn with_error_code(mut self, value: i16) -> Self {
64 self.error_code = value;
65 self
66 }
67 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
73 self.error_message = value;
74 self
75 }
76 pub fn with_topics(mut self, value: Vec<OngoingTopicReassignment>) -> Self {
82 self.topics = value;
83 self
84 }
85 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
87 self.unknown_tagged_fields = value;
88 self
89 }
90 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
92 self.unknown_tagged_fields.insert(key, value);
93 self
94 }
95}
96
97#[cfg(feature = "broker")]
98impl Encodable for ListPartitionReassignmentsResponse {
99 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
100 if version != 0 {
101 bail!("specified version not supported by this message type");
102 }
103 types::Int32.encode(buf, &self.throttle_time_ms)?;
104 types::Int16.encode(buf, &self.error_code)?;
105 types::CompactString.encode(buf, &self.error_message)?;
106 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
107 let num_tagged_fields = self.unknown_tagged_fields.len();
108 if num_tagged_fields > std::u32::MAX as usize {
109 bail!(
110 "Too many tagged fields to encode ({} fields)",
111 num_tagged_fields
112 );
113 }
114 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
115
116 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
117 Ok(())
118 }
119 fn compute_size(&self, version: i16) -> Result<usize> {
120 let mut total_size = 0;
121 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
122 total_size += types::Int16.compute_size(&self.error_code)?;
123 total_size += types::CompactString.compute_size(&self.error_message)?;
124 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
125 let num_tagged_fields = self.unknown_tagged_fields.len();
126 if num_tagged_fields > std::u32::MAX as usize {
127 bail!(
128 "Too many tagged fields to encode ({} fields)",
129 num_tagged_fields
130 );
131 }
132 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
133
134 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
135 Ok(total_size)
136 }
137}
138
139#[cfg(feature = "client")]
140impl Decodable for ListPartitionReassignmentsResponse {
141 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
142 if version != 0 {
143 bail!("specified version not supported by this message type");
144 }
145 let throttle_time_ms = types::Int32.decode(buf)?;
146 let error_code = types::Int16.decode(buf)?;
147 let error_message = types::CompactString.decode(buf)?;
148 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
149 let mut unknown_tagged_fields = BTreeMap::new();
150 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
151 for _ in 0..num_tagged_fields {
152 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
153 let size: u32 = types::UnsignedVarInt.decode(buf)?;
154 let unknown_value = buf.try_get_bytes(size as usize)?;
155 unknown_tagged_fields.insert(tag as i32, unknown_value);
156 }
157 Ok(Self {
158 throttle_time_ms,
159 error_code,
160 error_message,
161 topics,
162 unknown_tagged_fields,
163 })
164 }
165}
166
167impl Default for ListPartitionReassignmentsResponse {
168 fn default() -> Self {
169 Self {
170 throttle_time_ms: 0,
171 error_code: 0,
172 error_message: Some(Default::default()),
173 topics: Default::default(),
174 unknown_tagged_fields: BTreeMap::new(),
175 }
176 }
177}
178
179impl Message for ListPartitionReassignmentsResponse {
180 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
181 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
182}
183
184#[non_exhaustive]
186#[derive(Debug, Clone, PartialEq)]
187pub struct OngoingPartitionReassignment {
188 pub partition_index: i32,
192
193 pub replicas: Vec<super::BrokerId>,
197
198 pub adding_replicas: Vec<super::BrokerId>,
202
203 pub removing_replicas: Vec<super::BrokerId>,
207
208 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
210}
211
212impl OngoingPartitionReassignment {
213 pub fn with_partition_index(mut self, value: i32) -> Self {
219 self.partition_index = value;
220 self
221 }
222 pub fn with_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
228 self.replicas = value;
229 self
230 }
231 pub fn with_adding_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
237 self.adding_replicas = value;
238 self
239 }
240 pub fn with_removing_replicas(mut self, value: Vec<super::BrokerId>) -> Self {
246 self.removing_replicas = value;
247 self
248 }
249 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
251 self.unknown_tagged_fields = value;
252 self
253 }
254 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
256 self.unknown_tagged_fields.insert(key, value);
257 self
258 }
259}
260
261#[cfg(feature = "broker")]
262impl Encodable for OngoingPartitionReassignment {
263 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
264 if version != 0 {
265 bail!("specified version not supported by this message type");
266 }
267 types::Int32.encode(buf, &self.partition_index)?;
268 types::CompactArray(types::Int32).encode(buf, &self.replicas)?;
269 types::CompactArray(types::Int32).encode(buf, &self.adding_replicas)?;
270 types::CompactArray(types::Int32).encode(buf, &self.removing_replicas)?;
271 let num_tagged_fields = self.unknown_tagged_fields.len();
272 if num_tagged_fields > std::u32::MAX as usize {
273 bail!(
274 "Too many tagged fields to encode ({} fields)",
275 num_tagged_fields
276 );
277 }
278 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
279
280 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
281 Ok(())
282 }
283 fn compute_size(&self, version: i16) -> Result<usize> {
284 let mut total_size = 0;
285 total_size += types::Int32.compute_size(&self.partition_index)?;
286 total_size += types::CompactArray(types::Int32).compute_size(&self.replicas)?;
287 total_size += types::CompactArray(types::Int32).compute_size(&self.adding_replicas)?;
288 total_size += types::CompactArray(types::Int32).compute_size(&self.removing_replicas)?;
289 let num_tagged_fields = self.unknown_tagged_fields.len();
290 if num_tagged_fields > std::u32::MAX as usize {
291 bail!(
292 "Too many tagged fields to encode ({} fields)",
293 num_tagged_fields
294 );
295 }
296 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
297
298 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
299 Ok(total_size)
300 }
301}
302
303#[cfg(feature = "client")]
304impl Decodable for OngoingPartitionReassignment {
305 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
306 if version != 0 {
307 bail!("specified version not supported by this message type");
308 }
309 let partition_index = types::Int32.decode(buf)?;
310 let replicas = types::CompactArray(types::Int32).decode(buf)?;
311 let adding_replicas = types::CompactArray(types::Int32).decode(buf)?;
312 let removing_replicas = types::CompactArray(types::Int32).decode(buf)?;
313 let mut unknown_tagged_fields = BTreeMap::new();
314 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
315 for _ in 0..num_tagged_fields {
316 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
317 let size: u32 = types::UnsignedVarInt.decode(buf)?;
318 let unknown_value = buf.try_get_bytes(size as usize)?;
319 unknown_tagged_fields.insert(tag as i32, unknown_value);
320 }
321 Ok(Self {
322 partition_index,
323 replicas,
324 adding_replicas,
325 removing_replicas,
326 unknown_tagged_fields,
327 })
328 }
329}
330
331impl Default for OngoingPartitionReassignment {
332 fn default() -> Self {
333 Self {
334 partition_index: 0,
335 replicas: Default::default(),
336 adding_replicas: Default::default(),
337 removing_replicas: Default::default(),
338 unknown_tagged_fields: BTreeMap::new(),
339 }
340 }
341}
342
343impl Message for OngoingPartitionReassignment {
344 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
345 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
346}
347
348#[non_exhaustive]
350#[derive(Debug, Clone, PartialEq)]
351pub struct OngoingTopicReassignment {
352 pub name: super::TopicName,
356
357 pub partitions: Vec<OngoingPartitionReassignment>,
361
362 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
364}
365
366impl OngoingTopicReassignment {
367 pub fn with_name(mut self, value: super::TopicName) -> Self {
373 self.name = value;
374 self
375 }
376 pub fn with_partitions(mut self, value: Vec<OngoingPartitionReassignment>) -> Self {
382 self.partitions = value;
383 self
384 }
385 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
387 self.unknown_tagged_fields = value;
388 self
389 }
390 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
392 self.unknown_tagged_fields.insert(key, value);
393 self
394 }
395}
396
397#[cfg(feature = "broker")]
398impl Encodable for OngoingTopicReassignment {
399 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
400 if version != 0 {
401 bail!("specified version not supported by this message type");
402 }
403 types::CompactString.encode(buf, &self.name)?;
404 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
405 let num_tagged_fields = self.unknown_tagged_fields.len();
406 if num_tagged_fields > std::u32::MAX as usize {
407 bail!(
408 "Too many tagged fields to encode ({} fields)",
409 num_tagged_fields
410 );
411 }
412 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
413
414 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
415 Ok(())
416 }
417 fn compute_size(&self, version: i16) -> Result<usize> {
418 let mut total_size = 0;
419 total_size += types::CompactString.compute_size(&self.name)?;
420 total_size +=
421 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
422 let num_tagged_fields = self.unknown_tagged_fields.len();
423 if num_tagged_fields > std::u32::MAX as usize {
424 bail!(
425 "Too many tagged fields to encode ({} fields)",
426 num_tagged_fields
427 );
428 }
429 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
430
431 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
432 Ok(total_size)
433 }
434}
435
436#[cfg(feature = "client")]
437impl Decodable for OngoingTopicReassignment {
438 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
439 if version != 0 {
440 bail!("specified version not supported by this message type");
441 }
442 let name = types::CompactString.decode(buf)?;
443 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
444 let mut unknown_tagged_fields = BTreeMap::new();
445 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
446 for _ in 0..num_tagged_fields {
447 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
448 let size: u32 = types::UnsignedVarInt.decode(buf)?;
449 let unknown_value = buf.try_get_bytes(size as usize)?;
450 unknown_tagged_fields.insert(tag as i32, unknown_value);
451 }
452 Ok(Self {
453 name,
454 partitions,
455 unknown_tagged_fields,
456 })
457 }
458}
459
460impl Default for OngoingTopicReassignment {
461 fn default() -> Self {
462 Self {
463 name: Default::default(),
464 partitions: Default::default(),
465 unknown_tagged_fields: BTreeMap::new(),
466 }
467 }
468}
469
470impl Message for OngoingTopicReassignment {
471 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
472 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
473}
474
475impl HeaderVersion for ListPartitionReassignmentsResponse {
476 fn header_version(version: i16) -> i16 {
477 1
478 }
479}