1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// Error entry for a single partition.
///
/// Instances are carried in `LeaderAndIsrResponse::partition_errors` and in
/// `LeaderAndIsrTopicError::partition_errors`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrPartitionError {
    /// Topic name. Serialized only for versions 0-4 (compact string at version 4,
    /// classic string below); decoding any later version yields the default value.
    pub topic_name: super::TopicName,

    /// Partition index, serialized as a 32-bit integer at every supported version.
    pub partition_index: i32,

    /// Error code, serialized as a 16-bit integer at every supported version.
    pub error_code: i16,

    /// Tagged fields that decoding did not recognise, keyed by tag.
    /// Only read and written for versions 4 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
42
43impl LeaderAndIsrPartitionError {
44 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
50 self.topic_name = value;
51 self
52 }
53 pub fn with_partition_index(mut self, value: i32) -> Self {
59 self.partition_index = value;
60 self
61 }
62 pub fn with_error_code(mut self, value: i16) -> Self {
68 self.error_code = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
#[cfg(feature = "broker")]
impl Encodable for LeaderAndIsrPartitionError {
    /// Writes this partition error into `buf` using the layout of `version`.
    ///
    /// The topic name is emitted only up to version 4 (compact string at exactly
    /// version 4, classic string below it). The partition index and error code are
    /// always emitted, and the tagged-field section is appended from version 4 on.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when there are more than `u32::MAX`
    /// unknown tagged fields, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        match version {
            4 => {
                types::CompactString.encode(buf, &self.topic_name)?;
            }
            v if v < 4 => {
                types::String.encode(buf, &self.topic_name)?;
            }
            // Later versions do not carry the topic name on this struct.
            _ => {}
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;

        // Versions before 4 have no tagged-field section.
        if version < 4 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or when an
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let name_size = match version {
            4 => types::CompactString.compute_size(&self.topic_name)?,
            v if v < 4 => types::String.compute_size(&self.topic_name)?,
            _ => 0,
        };
        let mut total = name_size
            + types::Int32.compute_size(&self.partition_index)?
            + types::Int16.compute_size(&self.error_code)?;

        if version >= 4 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            total += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total)
    }
}
138
#[cfg(feature = "client")]
impl Decodable for LeaderAndIsrPartitionError {
    /// Reads a partition error from `buf` using the layout of `version`.
    ///
    /// The topic name is read only up to version 4 and is left at its default
    /// otherwise. Unknown tagged fields are collected from version 4 onward.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic_name = match version {
            4 => types::CompactString.decode(buf)?,
            v if v < 4 => types::String.decode(buf)?,
            _ => Default::default(),
        };
        let partition_index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is a varint tag, a varint length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_name,
            partition_index,
            error_code,
            unknown_tagged_fields,
        })
    }
}
174
175impl Default for LeaderAndIsrPartitionError {
176 fn default() -> Self {
177 Self {
178 topic_name: Default::default(),
179 partition_index: 0,
180 error_code: 0,
181 unknown_tagged_fields: BTreeMap::new(),
182 }
183 }
184}
185
// Version metadata for `LeaderAndIsrPartitionError`.
impl Message for LeaderAndIsrPartitionError {
    /// Supported versions: 0 through 7, the same bounds `encode` and `decode` enforce.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
190
/// Top-level LeaderAndIsr response message.
///
/// Partition errors are carried directly in `partition_errors` up to version 4
/// and inside `topics` from version 5 onward.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrResponse {
    /// Error code, serialized as a 16-bit integer at every supported version.
    pub error_code: i16,

    /// Per-partition errors. Serialized only for versions 0-4; encoding fails if
    /// this is non-empty at a later version.
    pub partition_errors: Vec<LeaderAndIsrPartitionError>,

    /// Per-topic errors. Serialized only for versions 5 and above; encoding fails
    /// if this is non-empty at an earlier version.
    pub topics: Vec<LeaderAndIsrTopicError>,

    /// Tagged fields that decoding did not recognise, keyed by tag.
    /// Only read and written for versions 4 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
213
214impl LeaderAndIsrResponse {
215 pub fn with_error_code(mut self, value: i16) -> Self {
221 self.error_code = value;
222 self
223 }
224 pub fn with_partition_errors(mut self, value: Vec<LeaderAndIsrPartitionError>) -> Self {
230 self.partition_errors = value;
231 self
232 }
233 pub fn with_topics(mut self, value: Vec<LeaderAndIsrTopicError>) -> Self {
239 self.topics = value;
240 self
241 }
242 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
244 self.unknown_tagged_fields = value;
245 self
246 }
247 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
249 self.unknown_tagged_fields.insert(key, value);
250 self
251 }
252}
253
#[cfg(feature = "broker")]
impl Encodable for LeaderAndIsrResponse {
    /// Writes this response into `buf` using the layout of `version`.
    ///
    /// The error code is always emitted. `partition_errors` is emitted up to
    /// version 4 (compact array at exactly version 4), `topics` from version 5 on,
    /// and the tagged-field section from version 4 on.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when a field that the selected
    /// version cannot carry is non-empty, when there are more than `u32::MAX`
    /// unknown tagged fields, or when an underlying encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int16.encode(buf, &self.error_code)?;

        match version {
            4 => {
                types::CompactArray(types::Struct { version })
                    .encode(buf, &self.partition_errors)?;
            }
            v if v < 4 => {
                types::Array(types::Struct { version }).encode(buf, &self.partition_errors)?;
            }
            _ if !self.partition_errors.is_empty() => {
                bail!("A field is set that is not available on the selected protocol version");
            }
            _ => {}
        }

        if version >= 5 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else if !self.topics.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions before 4 have no tagged-field section.
        if version < 4 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when a field that the selected version cannot carry is non-empty,
    /// when there are more than `u32::MAX` unknown tagged fields, or when an
    /// underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total = types::Int16.compute_size(&self.error_code)?;

        match version {
            4 => {
                total += types::CompactArray(types::Struct { version })
                    .compute_size(&self.partition_errors)?;
            }
            v if v < 4 => {
                total +=
                    types::Array(types::Struct { version }).compute_size(&self.partition_errors)?;
            }
            _ if !self.partition_errors.is_empty() => {
                bail!("A field is set that is not available on the selected protocol version");
            }
            _ => {}
        }

        if version >= 5 {
            total += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        } else if !self.topics.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 4 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            total += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total)
    }
}
333
#[cfg(feature = "client")]
impl Decodable for LeaderAndIsrResponse {
    /// Reads a response from `buf` using the layout of `version`.
    ///
    /// `partition_errors` is read only up to version 4 and `topics` only from
    /// version 5 on; whichever is absent is left at its default. Unknown tagged
    /// fields are collected from version 4 onward.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let error_code = types::Int16.decode(buf)?;
        let partition_errors = match version {
            4 => types::CompactArray(types::Struct { version }).decode(buf)?,
            v if v < 4 => types::Array(types::Struct { version }).decode(buf)?,
            _ => Default::default(),
        };
        let topics = match version {
            v if v >= 5 => types::CompactArray(types::Struct { version }).decode(buf)?,
            _ => Default::default(),
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is a varint tag, a varint length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            error_code,
            partition_errors,
            topics,
            unknown_tagged_fields,
        })
    }
}
373
374impl Default for LeaderAndIsrResponse {
375 fn default() -> Self {
376 Self {
377 error_code: 0,
378 partition_errors: Default::default(),
379 topics: Default::default(),
380 unknown_tagged_fields: BTreeMap::new(),
381 }
382 }
383}
384
// Version metadata for `LeaderAndIsrResponse`.
impl Message for LeaderAndIsrResponse {
    /// Supported versions: 0 through 7, the same bounds `encode` and `decode` enforce.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
389
/// Per-topic error entry, carried in `LeaderAndIsrResponse::topics`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct LeaderAndIsrTopicError {
    /// Topic id. Serialized only for versions 5 and above; encoding fails if this
    /// is not the nil UUID at an earlier version.
    pub topic_id: Uuid,

    /// Per-partition errors for this topic. Serialized only for versions 5 and
    /// above; encoding fails if this is non-empty at an earlier version.
    pub partition_errors: Vec<LeaderAndIsrPartitionError>,

    /// Tagged fields that decoding did not recognise, keyed by tag.
    /// Only read and written for versions 4 and above.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
407
408impl LeaderAndIsrTopicError {
409 pub fn with_topic_id(mut self, value: Uuid) -> Self {
415 self.topic_id = value;
416 self
417 }
418 pub fn with_partition_errors(mut self, value: Vec<LeaderAndIsrPartitionError>) -> Self {
424 self.partition_errors = value;
425 self
426 }
427 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
429 self.unknown_tagged_fields = value;
430 self
431 }
432 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
434 self.unknown_tagged_fields.insert(key, value);
435 self
436 }
437}
438
#[cfg(feature = "broker")]
impl Encodable for LeaderAndIsrTopicError {
    /// Writes this topic error into `buf` using the layout of `version`.
    ///
    /// The topic id and the partition errors are emitted only from version 5 on;
    /// the tagged-field section is emitted from version 4 on.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when `version` is below 5 and
    /// either `topic_id` is not nil or `partition_errors` is non-empty, when there
    /// are more than `u32::MAX` unknown tagged fields, or when an underlying
    /// encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        if version >= 5 {
            types::Uuid.encode(buf, &self.topic_id)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partition_errors)?;
        } else if self.topic_id != Uuid::nil() || !self.partition_errors.is_empty() {
            // Neither field exists before version 5, so both must be at their defaults.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Versions before 4 have no tagged-field section.
        if version < 4 {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is below 5 and either `topic_id` is not nil or
    /// `partition_errors` is non-empty, when there are more than `u32::MAX`
    /// unknown tagged fields, or when an underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total = 0;

        if version >= 5 {
            total += types::Uuid.compute_size(&self.topic_id)?;
            total += types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_errors)?;
        } else if self.topic_id != Uuid::nil() || !self.partition_errors.is_empty() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if version >= 4 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            total += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total)
    }
}
505
#[cfg(feature = "client")]
impl Decodable for LeaderAndIsrTopicError {
    /// Reads a topic error from `buf` using the layout of `version`.
    ///
    /// The topic id and partition errors are read only from version 5 on; below
    /// that they are the nil UUID and the default collection. Unknown tagged
    /// fields are collected from version 4 onward.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let carries_topic_data = version >= 5;
        let topic_id = match carries_topic_data {
            true => types::Uuid.decode(buf)?,
            false => Uuid::nil(),
        };
        let partition_errors = match carries_topic_data {
            true => types::CompactArray(types::Struct { version }).decode(buf)?,
            false => Default::default(),
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 4 {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                // Each entry is a varint tag, a varint length, then that many raw bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topic_id,
            partition_errors,
            unknown_tagged_fields,
        })
    }
}
539
540impl Default for LeaderAndIsrTopicError {
541 fn default() -> Self {
542 Self {
543 topic_id: Uuid::nil(),
544 partition_errors: Default::default(),
545 unknown_tagged_fields: BTreeMap::new(),
546 }
547 }
548}
549
// Version metadata for `LeaderAndIsrTopicError`.
impl Message for LeaderAndIsrTopicError {
    /// Declared versions: 0 through 7, the same bounds `encode` and `decode` enforce.
    /// The struct's own fields are only serialized from version 5 on.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
    /// No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
554
555impl HeaderVersion for LeaderAndIsrResponse {
556 fn header_version(version: i16) -> i16 {
557 if version >= 4 {
558 1
559 } else {
560 0
561 }
562 }
563}