1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Top-level body of an `AlterPartition` response.
///
/// The `Message` impl for this type declares versions 0 through 3, and the
/// `HeaderVersion` impl always reports header version 1. Field order here matches
/// the order in which `encode` writes the fields.
///
/// Presumably this mirrors the Kafka `AlterPartition` response schema; the exact
/// meaning of each value should be confirmed against the protocol definition.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct AlterPartitionResponse {
    /// Throttle time in milliseconds; carried on the wire as a 32-bit integer in
    /// every supported version.
    pub throttle_time_ms: i32,

    /// Response-level error code; carried on the wire as a 16-bit integer in every
    /// supported version.
    pub error_code: i16,

    /// Per-topic entries; carried on the wire as a compact array of [`TopicData`].
    pub topics: Vec<TopicData>,

    /// Tagged fields that this struct does not model, keyed by tag number.
    /// `decode` collects them here and `encode` writes them back out.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
42
43impl AlterPartitionResponse {
44 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
50 self.throttle_time_ms = value;
51 self
52 }
53 pub fn with_error_code(mut self, value: i16) -> Self {
59 self.error_code = value;
60 self
61 }
62 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
68 self.topics = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
/// Wire serialization for [`AlterPartitionResponse`]; only compiled when the
/// `broker` feature is enabled.
#[cfg(feature = "broker")]
impl Encodable for AlterPartitionResponse {
    /// Writes this response into `buf` using the layout for `version`.
    ///
    /// Wire order: throttle time (Int32), error code (Int16), topics (compact
    /// array of structs), then the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=3`, when the number of unknown tagged
    /// fields exceeds `u32::MAX`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Regular fields, in wire order.
        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;

        // Tagged-field section: the entry count goes first, then the raw entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Note that, unlike `encode`, this method does not range-check `version`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields exceeds `u32::MAX` or when
    /// any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Regular fields, summed in wire order.
        let mut size = types::Int32.compute_size(&self.throttle_time_ms)?;
        size += types::Int16.compute_size(&self.error_code)?;
        size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;

        // Tagged-field section: the count prefix plus the entries themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
122
/// Wire deserialization for [`AlterPartitionResponse`]; only compiled when the
/// `client` feature is enabled.
#[cfg(feature = "client")]
impl Decodable for AlterPartitionResponse {
    /// Reads a response from `buf` using the layout for `version`.
    ///
    /// Fields are consumed in the same order `encode` writes them. Every tagged
    /// field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=3`, or when any nested decoder fails
    /// (including when a tagged field's payload cannot be read from `buf`).
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Regular fields, in wire order.
        let throttle_time_ms = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: a count followed by (tag, size, payload) triples.
        let unknown_tagged_fields = {
            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag overwrites the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        };

        Ok(Self {
            throttle_time_ms,
            error_code,
            topics,
            unknown_tagged_fields,
        })
    }
}
148
149impl Default for AlterPartitionResponse {
150 fn default() -> Self {
151 Self {
152 throttle_time_ms: 0,
153 error_code: 0,
154 topics: Default::default(),
155 unknown_tagged_fields: BTreeMap::new(),
156 }
157 }
158}
159
/// Version metadata for [`AlterPartitionResponse`].
impl Message for AlterPartitionResponse {
    /// Versions 0 through 3 — the same bounds that `encode` and `decode` accept.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
164
/// Per-partition entry nested inside [`TopicData`].
///
/// The `Message` impl for this type declares versions 0 through 3. Field order
/// here matches the order in which `encode` writes the fields.
///
/// Presumably this mirrors the partition-level part of the Kafka `AlterPartition`
/// response schema; confirm value semantics against the protocol definition.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct PartitionData {
    /// Partition index; carried on the wire as a 32-bit integer.
    pub partition_index: i32,

    /// Partition-level error code; carried on the wire as a 16-bit integer.
    pub error_code: i16,

    /// Broker id of the leader; carried on the wire as a 32-bit integer.
    pub leader_id: super::BrokerId,

    /// Leader epoch; carried on the wire as a 32-bit integer.
    pub leader_epoch: i32,

    /// Broker ids carried on the wire as a compact array of 32-bit integers.
    /// Presumably the in-sync replica set, going by the name — TODO confirm.
    pub isr: Vec<super::BrokerId>,

    /// Leader recovery state; only on the wire for version 1 and later.
    /// `encode` skips it for version 0 and `decode` then yields 0.
    pub leader_recovery_state: i8,

    /// Partition epoch; carried on the wire as a 32-bit integer.
    pub partition_epoch: i32,

    /// Tagged fields that this struct does not model, keyed by tag number.
    /// `decode` collects them here and `encode` writes them back out.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
207
208impl PartitionData {
209 pub fn with_partition_index(mut self, value: i32) -> Self {
215 self.partition_index = value;
216 self
217 }
218 pub fn with_error_code(mut self, value: i16) -> Self {
224 self.error_code = value;
225 self
226 }
227 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
233 self.leader_id = value;
234 self
235 }
236 pub fn with_leader_epoch(mut self, value: i32) -> Self {
242 self.leader_epoch = value;
243 self
244 }
245 pub fn with_isr(mut self, value: Vec<super::BrokerId>) -> Self {
251 self.isr = value;
252 self
253 }
254 pub fn with_leader_recovery_state(mut self, value: i8) -> Self {
260 self.leader_recovery_state = value;
261 self
262 }
263 pub fn with_partition_epoch(mut self, value: i32) -> Self {
269 self.partition_epoch = value;
270 self
271 }
272 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
274 self.unknown_tagged_fields = value;
275 self
276 }
277 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
279 self.unknown_tagged_fields.insert(key, value);
280 self
281 }
282}
283
/// Wire serialization for [`PartitionData`]; only compiled when the `broker`
/// feature is enabled.
#[cfg(feature = "broker")]
impl Encodable for PartitionData {
    /// Writes this partition entry into `buf` using the layout for `version`.
    ///
    /// `leader_recovery_state` is written only for version 1 and later; for
    /// version 0 its value is not put on the wire at all.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=3`, when the number of unknown tagged
    /// fields exceeds `u32::MAX`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Regular fields, in wire order.
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int32.encode(buf, &self.leader_id)?;
        types::Int32.encode(buf, &self.leader_epoch)?;
        types::CompactArray(types::Int32).encode(buf, &self.isr)?;
        if version >= 1 {
            types::Int8.encode(buf, &self.leader_recovery_state)?;
        }
        types::Int32.encode(buf, &self.partition_epoch)?;

        // Tagged-field section: the entry count goes first, then the raw entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Note that, unlike `encode`, this method does not range-check `version`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields exceeds `u32::MAX` or when
    /// any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Regular fields, summed in wire order.
        let mut size = types::Int32.compute_size(&self.partition_index)?;
        size += types::Int16.compute_size(&self.error_code)?;
        size += types::Int32.compute_size(&self.leader_id)?;
        size += types::Int32.compute_size(&self.leader_epoch)?;
        size += types::CompactArray(types::Int32).compute_size(&self.isr)?;
        if version >= 1 {
            size += types::Int8.compute_size(&self.leader_recovery_state)?;
        }
        size += types::Int32.compute_size(&self.partition_epoch)?;

        // Tagged-field section: the count prefix plus the entries themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
335
/// Wire deserialization for [`PartitionData`]; only compiled when the `client`
/// feature is enabled.
#[cfg(feature = "client")]
impl Decodable for PartitionData {
    /// Reads a partition entry from `buf` using the layout for `version`.
    ///
    /// `leader_recovery_state` is read only for version 1 and later; for version 0
    /// it is set to 0. Every tagged field is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=3`, or when any nested decoder fails
    /// (including when a tagged field's payload cannot be read from `buf`).
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Regular fields, in wire order.
        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let leader_id = types::Int32.decode(buf)?;
        let leader_epoch = types::Int32.decode(buf)?;
        let isr = types::CompactArray(types::Int32).decode(buf)?;
        let leader_recovery_state: i8 = if version >= 1 {
            types::Int8.decode(buf)?
        } else {
            // Not on the wire for version 0.
            0
        };
        let partition_epoch = types::Int32.decode(buf)?;

        // Tagged-field section: a count followed by (tag, size, payload) triples.
        let unknown_tagged_fields = {
            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag overwrites the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        };

        Ok(Self {
            partition_index,
            error_code,
            leader_id,
            leader_epoch,
            isr,
            leader_recovery_state,
            partition_epoch,
            unknown_tagged_fields,
        })
    }
}
373
374impl Default for PartitionData {
375 fn default() -> Self {
376 Self {
377 partition_index: 0,
378 error_code: 0,
379 leader_id: (0).into(),
380 leader_epoch: 0,
381 isr: Default::default(),
382 leader_recovery_state: 0,
383 partition_epoch: 0,
384 unknown_tagged_fields: BTreeMap::new(),
385 }
386 }
387}
388
/// Version metadata for [`PartitionData`].
impl Message for PartitionData {
    /// Versions 0 through 3 — the same bounds that `encode` and `decode` accept.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
393
/// Per-topic entry of an [`AlterPartitionResponse`].
///
/// The `Message` impl for this type declares versions 0 through 3. Which of
/// `topic_name` and `topic_id` is carried on the wire depends on the version.
///
/// Presumably this mirrors the topic-level part of the Kafka `AlterPartition`
/// response schema; confirm value semantics against the protocol definition.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct TopicData {
    /// Topic name; only on the wire for versions 0 and 1 (as a compact string).
    /// `decode` leaves it at its default value for version 2 and later.
    pub topic_name: super::TopicName,

    /// Topic id; only on the wire for version 2 and later.
    /// `decode` sets it to the nil UUID for versions 0 and 1.
    pub topic_id: Uuid,

    /// Per-partition entries; carried on the wire as a compact array of
    /// [`PartitionData`].
    pub partitions: Vec<PartitionData>,

    /// Tagged fields that this struct does not model, keyed by tag number.
    /// `decode` collects them here and `encode` writes them back out.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
416
417impl TopicData {
418 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
424 self.topic_name = value;
425 self
426 }
427 pub fn with_topic_id(mut self, value: Uuid) -> Self {
433 self.topic_id = value;
434 self
435 }
436 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
442 self.partitions = value;
443 self
444 }
445 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
447 self.unknown_tagged_fields = value;
448 self
449 }
450 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
452 self.unknown_tagged_fields.insert(key, value);
453 self
454 }
455}
456
/// Wire serialization for [`TopicData`]; only compiled when the `broker` feature
/// is enabled.
#[cfg(feature = "broker")]
impl Encodable for TopicData {
    /// Writes this topic entry into `buf` using the layout for `version`.
    ///
    /// Exactly one topic identifier goes on the wire: `topic_id` for version 2 and
    /// later, `topic_name` for versions 0 and 1. The other field is not written.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=3`, when the number of unknown tagged
    /// fields exceeds `u32::MAX`, or when any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // `version <= 1` and `version >= 2` are complementary, so one branch runs.
        if version >= 2 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else {
            types::CompactString.encode(buf, &self.topic_name)?;
        }
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;

        // Tagged-field section: the entry count goes first, then the raw entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// Note that, unlike `encode`, this method does not range-check `version`.
    ///
    /// # Errors
    ///
    /// Fails when the number of unknown tagged fields exceeds `u32::MAX` or when
    /// any nested size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        // Size of whichever topic identifier this version carries.
        let mut size = if version >= 2 {
            types::Uuid.compute_size(&self.topic_id)?
        } else {
            types::CompactString.compute_size(&self.topic_name)?
        };
        size += types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;

        // Tagged-field section: the count prefix plus the entries themselves.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
505
/// Wire deserialization for [`TopicData`]; only compiled when the `client`
/// feature is enabled.
#[cfg(feature = "client")]
impl Decodable for TopicData {
    /// Reads a topic entry from `buf` using the layout for `version`.
    ///
    /// For version 2 and later the topic id is read and `topic_name` is left at
    /// its default; for versions 0 and 1 the name is read and `topic_id` is the
    /// nil UUID. Every tagged field is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=3`, or when any nested decoder fails
    /// (including when a tagged field's payload cannot be read from `buf`).
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=3).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // `version <= 1` and `version >= 2` are complementary, so exactly one
        // identifier is read from the wire and the other gets its fallback.
        let (topic_name, topic_id) = if version >= 2 {
            (Default::default(), types::Uuid.decode(buf)?)
        } else {
            (types::CompactString.decode(buf)?, Uuid::nil())
        };
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged-field section: a count followed by (tag, size, payload) triples.
        let unknown_tagged_fields = {
            let mut fields = BTreeMap::new();
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag overwrites the earlier payload.
                fields.insert(tag as i32, payload);
            }
            fields
        };

        Ok(Self {
            topic_name,
            topic_id,
            partitions,
            unknown_tagged_fields,
        })
    }
}
539
540impl Default for TopicData {
541 fn default() -> Self {
542 Self {
543 topic_name: Default::default(),
544 topic_id: Uuid::nil(),
545 partitions: Default::default(),
546 unknown_tagged_fields: BTreeMap::new(),
547 }
548 }
549}
550
/// Version metadata for [`TopicData`].
impl Message for TopicData {
    /// Versions 0 through 3 — the same bounds that `encode` and `decode` accept.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
555
/// Header version selection for [`AlterPartitionResponse`].
impl HeaderVersion for AlterPartitionResponse {
    /// Always returns 1; the `version` argument is not consulted.
    fn header_version(version: i16) -> i16 {
        1
    }
}