1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AlterPartitionResponse {
24 pub throttle_time_ms: i32,
28
29 pub error_code: i16,
33
34 pub topics: Vec<TopicData>,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl AlterPartitionResponse {
44 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
50 self.throttle_time_ms = value;
51 self
52 }
53 pub fn with_error_code(mut self, value: i16) -> Self {
59 self.error_code = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "broker")]
impl Encodable for AlterPartitionResponse {
    /// Writes the message into `buf`.
    ///
    /// Fields are written in this order: `throttle_time_ms`, `error_code`,
    /// `topics` (compact array), the tagged-field count, then the unknown
    /// tagged fields themselves.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=3`, when the tagged-field count
    /// does not fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // The count is written as an unsigned 32-bit varint, so it must fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not validate `version`; it is only
    /// forwarded to the nested struct codec.
    ///
    /// # Errors
    ///
    /// Fails when the tagged-field count does not fit in a `u32` or when any
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int32.compute_size(&self.throttle_time_ms)?
            + types::Int16.compute_size(&self.error_code)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + tagged_size)
    }
}
122
#[cfg(feature = "client")]
impl Decodable for AlterPartitionResponse {
    /// Reads a message from `buf`.
    ///
    /// Fields are read in this order: `throttle_time_ms`, `error_code`,
    /// `topics`, then the tagged-field section. Every tagged field is kept
    /// verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=3` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated top to bottom, which matches wire order.
        let mut message = Self {
            throttle_time_ms: types::Int32.decode(buf)?,
            error_code: types::Int16.decode(buf)?,
            topics: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            // The map key is i32, so tags above i32::MAX wrap under this cast.
            message.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(message)
    }
}
148
149impl Default for AlterPartitionResponse {
150 fn default() -> Self {
151 Self {
152 throttle_time_ms: 0,
153 error_code: 0,
154 topics: Default::default(),
155 unknown_tagged_fields: BTreeMap::new(),
156 }
157 }
158}
159
160impl Message for AlterPartitionResponse {
161 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
162 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
163}
164
165#[non_exhaustive]
167#[derive(Debug, Clone, PartialEq)]
168pub struct PartitionData {
169 pub partition_index: i32,
173
174 pub error_code: i16,
178
179 pub leader_id: super::BrokerId,
183
184 pub leader_epoch: i32,
188
189 pub isr: Vec<super::BrokerId>,
193
194 pub leader_recovery_state: i8,
198
199 pub partition_epoch: i32,
203
204 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
206}
207
208impl PartitionData {
209 pub fn with_partition_index(mut self, value: i32) -> Self {
215 self.partition_index = value;
216 self
217 }
218 pub fn with_error_code(mut self, value: i16) -> Self {
224 self.error_code = value;
225 self
226 }
227 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
233 self.leader_id = value;
234 self
235 }
236 pub fn with_leader_epoch(mut self, value: i32) -> Self {
242 self.leader_epoch = value;
243 self
244 }
245 pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
251 self.isr = value;
252 self
253 }
254 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
260 self.leader_recovery_state = value;
261 self
262 }
263 pub fn with_partition_epoch(mut self, value: i32) -> Self {
269 self.partition_epoch = value;
270 self
271 }
272 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
274 self.unknown_tagged_fields = value;
275 self
276 }
277 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
279 self.unknown_tagged_fields.insert(key, value);
280 self
281 }
282}
283
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Writes the partition entry into `buf`.
    ///
    /// Fields are written in this order: `partition_index`, `error_code`,
    /// `leader_id`, `leader_epoch`, `isr` (compact array),
    /// `leader_recovery_state`, `partition_epoch`, the tagged-field count,
    /// then the unknown tagged fields themselves.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=3`, when the tagged-field count
    /// does not fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        types::Int8.encode(buf, &self.leader_recovery_state)?;
        types::Int32.encode(buf, &self.partition_epoch)?;

        // The count is written as an unsigned 32-bit varint, so it must fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce.
    ///
    /// `version` is neither validated nor otherwise used here.
    ///
    /// # Errors
    ///
    /// Fails when the tagged-field count does not fit in a `u32` or when any
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?
            + types::Int32.compute_size(&self.leader_id)?
            + types::Int32.compute_size(&self.leader_epoch)?
            + types::CompactArray(types::Int32).compute_size(&self.isr)?
            + types::Int8.compute_size(&self.leader_recovery_state)?
            + types::Int32.compute_size(&self.partition_epoch)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + tagged_size)
    }
}
331
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads a partition entry from `buf`.
    ///
    /// Fields are read in this order: `partition_index`, `error_code`,
    /// `leader_id`, `leader_epoch`, `isr`, `leader_recovery_state`,
    /// `partition_epoch`, then the tagged-field section. Every tagged field
    /// is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=3` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated top to bottom, which matches wire order.
        let mut partition = Self {
            partition_index: types::Int32.decode(buf)?,
            error_code: types::Int16.decode(buf)?,
            leader_id: types::Int32.decode(buf)?,
            leader_epoch: types::Int32.decode(buf)?,
            isr: types::CompactArray(types::Int32).decode(buf)?,
            leader_recovery_state: types::Int8.decode(buf)?,
            partition_epoch: types::Int32.decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            // The map key is i32, so tags above i32::MAX wrap under this cast.
            partition.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(partition)
    }
}
365
366impl Default for PartitionData {
367 fn default() -> Self {
368 Self {
369 partition_index: 0,
370 error_code: 0,
371 leader_id: (0).into(),
372 leader_epoch: 0,
373 isr: Default::default(),
374 leader_recovery_state: 0,
375 partition_epoch: 0,
376 unknown_tagged_fields: BTreeMap::new(),
377 }
378 }
379}
380
381impl Message for PartitionData {
382 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
383 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
384}
385
386#[non_exhaustive]
388#[derive(Debug, Clone, PartialEq)]
389pub struct TopicData {
390 pub topic_id: Uuid,
394
395 pub partitions: Vec<PartitionData>,
399
400 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
402}
403
404impl TopicData {
405 pub fn with_topic_id(mut self, value: Uuid) -> Self {
411 self.topic_id = value;
412 self
413 }
414 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
420 self.partitions = value;
421 self
422 }
423 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
425 self.unknown_tagged_fields = value;
426 self
427 }
428 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
430 self.unknown_tagged_fields.insert(key, value);
431 self
432 }
433}
434
#[cfg(feature = "broker")]
impl Encodable for TopicData {
    /// Writes the topic entry into `buf`.
    ///
    /// Fields are written in this order: `topic_id`, `partitions` (compact
    /// array), the tagged-field count, then the unknown tagged fields
    /// themselves.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=3`, when the tagged-field count
    /// does not fit in a `u32`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Uuid.encode(buf, &self.topic_id)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        // The count is written as an unsigned 32-bit varint, so it must fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not validate `version`; it is only
    /// forwarded to the nested struct codec.
    ///
    /// # Errors
    ///
    /// Fails when the tagged-field count does not fit in a `u32` or when any
    /// nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Uuid.compute_size(&self.topic_id)?
            + types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        let tagged_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?
            + compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + tagged_size)
    }
}
473
#[cfg(feature = "client")]
impl Decodable for TopicData {
    /// Reads a topic entry from `buf`.
    ///
    /// Fields are read in this order: `topic_id`, `partitions`, then the
    /// tagged-field section. Every tagged field is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `2..=3` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(2..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Struct-literal fields are evaluated top to bottom, which matches wire order.
        let mut topic = Self {
            topic_id: types::Uuid.decode(buf)?,
            partitions: types::CompactArray(types::Struct { version }).decode(buf)?,
            unknown_tagged_fields: BTreeMap::new(),
        };

        let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            // The map key is i32, so tags above i32::MAX wrap under this cast.
            topic.unknown_tagged_fields.insert(tag as i32, payload);
        }

        Ok(topic)
    }
}
497
498impl Default for TopicData {
499 fn default() -> Self {
500 Self {
501 topic_id: Uuid::nil(),
502 partitions: Default::default(),
503 unknown_tagged_fields: BTreeMap::new(),
504 }
505 }
506}
507
508impl Message for TopicData {
509 const VERSIONS: VersionRange = VersionRange { min: 2, max: 3 };
510 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
511}
512
513impl HeaderVersion for AlterPartitionResponse {
514 fn header_version(version: i16) -> i16 {
515 1
516 }
517}