1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct LeaderAndIsrPartitionError {
24 pub topic_name: super::TopicName,
28
29 pub partition_index: i32,
33
34 pub error_code: i16,
38
39 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
41}
42
43impl LeaderAndIsrPartitionError {
44 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
50 self.topic_name = value;
51 self
52 }
53 pub fn with_partition_index(mut self, value: i32) -> Self {
59 self.partition_index = value;
60 self
61 }
62 pub fn with_error_code(mut self, value: i16) -> Self {
68 self.error_code = value;
69 self
70 }
71 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
73 self.unknown_tagged_fields = value;
74 self
75 }
76 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
78 self.unknown_tagged_fields.insert(key, value);
79 self
80 }
81}
82
83#[cfg(feature = "broker")]
84impl Encodable for LeaderAndIsrPartitionError {
85 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
86 if version <= 4 {
87 if version >= 4 {
88 types::CompactString.encode(buf, &self.topic_name)?;
89 } else {
90 types::String.encode(buf, &self.topic_name)?;
91 }
92 }
93 types::Int32.encode(buf, &self.partition_index)?;
94 types::Int16.encode(buf, &self.error_code)?;
95 if version >= 4 {
96 let num_tagged_fields = self.unknown_tagged_fields.len();
97 if num_tagged_fields > std::u32::MAX as usize {
98 bail!(
99 "Too many tagged fields to encode ({} fields)",
100 num_tagged_fields
101 );
102 }
103 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
104
105 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
106 }
107 Ok(())
108 }
109 fn compute_size(&self, version: i16) -> Result<usize> {
110 let mut total_size = 0;
111 if version <= 4 {
112 if version >= 4 {
113 total_size += types::CompactString.compute_size(&self.topic_name)?;
114 } else {
115 total_size += types::String.compute_size(&self.topic_name)?;
116 }
117 }
118 total_size += types::Int32.compute_size(&self.partition_index)?;
119 total_size += types::Int16.compute_size(&self.error_code)?;
120 if version >= 4 {
121 let num_tagged_fields = self.unknown_tagged_fields.len();
122 if num_tagged_fields > std::u32::MAX as usize {
123 bail!(
124 "Too many tagged fields to encode ({} fields)",
125 num_tagged_fields
126 );
127 }
128 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
129
130 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
131 }
132 Ok(total_size)
133 }
134}
135
136#[cfg(feature = "client")]
137impl Decodable for LeaderAndIsrPartitionError {
138 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
139 let topic_name = if version <= 4 {
140 if version >= 4 {
141 types::CompactString.decode(buf)?
142 } else {
143 types::String.decode(buf)?
144 }
145 } else {
146 Default::default()
147 };
148 let partition_index = types::Int32.decode(buf)?;
149 let error_code = types::Int16.decode(buf)?;
150 let mut unknown_tagged_fields = BTreeMap::new();
151 if version >= 4 {
152 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
153 for _ in 0..num_tagged_fields {
154 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
155 let size: u32 = types::UnsignedVarInt.decode(buf)?;
156 let unknown_value = buf.try_get_bytes(size as usize)?;
157 unknown_tagged_fields.insert(tag as i32, unknown_value);
158 }
159 }
160 Ok(Self {
161 topic_name,
162 partition_index,
163 error_code,
164 unknown_tagged_fields,
165 })
166 }
167}
168
169impl Default for LeaderAndIsrPartitionError {
170 fn default() -> Self {
171 Self {
172 topic_name: Default::default(),
173 partition_index: 0,
174 error_code: 0,
175 unknown_tagged_fields: BTreeMap::new(),
176 }
177 }
178}
179
180impl Message for LeaderAndIsrPartitionError {
181 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
182 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
183}
184
185#[non_exhaustive]
187#[derive(Debug, Clone, PartialEq)]
188pub struct LeaderAndIsrResponse {
189 pub error_code: i16,
193
194 pub partition_errors: Vec<LeaderAndIsrPartitionError>,
198
199 pub topics: Vec<LeaderAndIsrTopicError>,
203
204 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
206}
207
208impl LeaderAndIsrResponse {
209 pub fn with_error_code(mut self, value: i16) -> Self {
215 self.error_code = value;
216 self
217 }
218 pub fn with_partition_errors(mut self, value: Vec<LeaderAndIsrPartitionError>) -> Self {
224 self.partition_errors = value;
225 self
226 }
227 pub fn with_topics(mut self, value: Vec<LeaderAndIsrTopicError>) -> Self {
233 self.topics = value;
234 self
235 }
236 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
238 self.unknown_tagged_fields = value;
239 self
240 }
241 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
243 self.unknown_tagged_fields.insert(key, value);
244 self
245 }
246}
247
248#[cfg(feature = "broker")]
249impl Encodable for LeaderAndIsrResponse {
250 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
251 types::Int16.encode(buf, &self.error_code)?;
252 if version <= 4 {
253 if version >= 4 {
254 types::CompactArray(types::Struct { version })
255 .encode(buf, &self.partition_errors)?;
256 } else {
257 types::Array(types::Struct { version }).encode(buf, &self.partition_errors)?;
258 }
259 } else {
260 if !self.partition_errors.is_empty() {
261 bail!("A field is set that is not available on the selected protocol version");
262 }
263 }
264 if version >= 5 {
265 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
266 } else {
267 if !self.topics.is_empty() {
268 bail!("A field is set that is not available on the selected protocol version");
269 }
270 }
271 if version >= 4 {
272 let num_tagged_fields = self.unknown_tagged_fields.len();
273 if num_tagged_fields > std::u32::MAX as usize {
274 bail!(
275 "Too many tagged fields to encode ({} fields)",
276 num_tagged_fields
277 );
278 }
279 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
280
281 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
282 }
283 Ok(())
284 }
285 fn compute_size(&self, version: i16) -> Result<usize> {
286 let mut total_size = 0;
287 total_size += types::Int16.compute_size(&self.error_code)?;
288 if version <= 4 {
289 if version >= 4 {
290 total_size += types::CompactArray(types::Struct { version })
291 .compute_size(&self.partition_errors)?;
292 } else {
293 total_size +=
294 types::Array(types::Struct { version }).compute_size(&self.partition_errors)?;
295 }
296 } else {
297 if !self.partition_errors.is_empty() {
298 bail!("A field is set that is not available on the selected protocol version");
299 }
300 }
301 if version >= 5 {
302 total_size +=
303 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
304 } else {
305 if !self.topics.is_empty() {
306 bail!("A field is set that is not available on the selected protocol version");
307 }
308 }
309 if version >= 4 {
310 let num_tagged_fields = self.unknown_tagged_fields.len();
311 if num_tagged_fields > std::u32::MAX as usize {
312 bail!(
313 "Too many tagged fields to encode ({} fields)",
314 num_tagged_fields
315 );
316 }
317 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
318
319 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
320 }
321 Ok(total_size)
322 }
323}
324
325#[cfg(feature = "client")]
326impl Decodable for LeaderAndIsrResponse {
327 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
328 let error_code = types::Int16.decode(buf)?;
329 let partition_errors = if version <= 4 {
330 if version >= 4 {
331 types::CompactArray(types::Struct { version }).decode(buf)?
332 } else {
333 types::Array(types::Struct { version }).decode(buf)?
334 }
335 } else {
336 Default::default()
337 };
338 let topics = if version >= 5 {
339 types::CompactArray(types::Struct { version }).decode(buf)?
340 } else {
341 Default::default()
342 };
343 let mut unknown_tagged_fields = BTreeMap::new();
344 if version >= 4 {
345 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
346 for _ in 0..num_tagged_fields {
347 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
348 let size: u32 = types::UnsignedVarInt.decode(buf)?;
349 let unknown_value = buf.try_get_bytes(size as usize)?;
350 unknown_tagged_fields.insert(tag as i32, unknown_value);
351 }
352 }
353 Ok(Self {
354 error_code,
355 partition_errors,
356 topics,
357 unknown_tagged_fields,
358 })
359 }
360}
361
362impl Default for LeaderAndIsrResponse {
363 fn default() -> Self {
364 Self {
365 error_code: 0,
366 partition_errors: Default::default(),
367 topics: Default::default(),
368 unknown_tagged_fields: BTreeMap::new(),
369 }
370 }
371}
372
373impl Message for LeaderAndIsrResponse {
374 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
375 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
376}
377
378#[non_exhaustive]
380#[derive(Debug, Clone, PartialEq)]
381pub struct LeaderAndIsrTopicError {
382 pub topic_id: Uuid,
386
387 pub partition_errors: Vec<LeaderAndIsrPartitionError>,
391
392 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
394}
395
396impl LeaderAndIsrTopicError {
397 pub fn with_topic_id(mut self, value: Uuid) -> Self {
403 self.topic_id = value;
404 self
405 }
406 pub fn with_partition_errors(mut self, value: Vec<LeaderAndIsrPartitionError>) -> Self {
412 self.partition_errors = value;
413 self
414 }
415 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
417 self.unknown_tagged_fields = value;
418 self
419 }
420 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
422 self.unknown_tagged_fields.insert(key, value);
423 self
424 }
425}
426
427#[cfg(feature = "broker")]
428impl Encodable for LeaderAndIsrTopicError {
429 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
430 if version >= 5 {
431 types::Uuid.encode(buf, &self.topic_id)?;
432 } else {
433 if &self.topic_id != &Uuid::nil() {
434 bail!("A field is set that is not available on the selected protocol version");
435 }
436 }
437 if version >= 5 {
438 types::CompactArray(types::Struct { version }).encode(buf, &self.partition_errors)?;
439 } else {
440 if !self.partition_errors.is_empty() {
441 bail!("A field is set that is not available on the selected protocol version");
442 }
443 }
444 if version >= 4 {
445 let num_tagged_fields = self.unknown_tagged_fields.len();
446 if num_tagged_fields > std::u32::MAX as usize {
447 bail!(
448 "Too many tagged fields to encode ({} fields)",
449 num_tagged_fields
450 );
451 }
452 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
453
454 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
455 }
456 Ok(())
457 }
458 fn compute_size(&self, version: i16) -> Result<usize> {
459 let mut total_size = 0;
460 if version >= 5 {
461 total_size += types::Uuid.compute_size(&self.topic_id)?;
462 } else {
463 if &self.topic_id != &Uuid::nil() {
464 bail!("A field is set that is not available on the selected protocol version");
465 }
466 }
467 if version >= 5 {
468 total_size += types::CompactArray(types::Struct { version })
469 .compute_size(&self.partition_errors)?;
470 } else {
471 if !self.partition_errors.is_empty() {
472 bail!("A field is set that is not available on the selected protocol version");
473 }
474 }
475 if version >= 4 {
476 let num_tagged_fields = self.unknown_tagged_fields.len();
477 if num_tagged_fields > std::u32::MAX as usize {
478 bail!(
479 "Too many tagged fields to encode ({} fields)",
480 num_tagged_fields
481 );
482 }
483 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
484
485 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
486 }
487 Ok(total_size)
488 }
489}
490
491#[cfg(feature = "client")]
492impl Decodable for LeaderAndIsrTopicError {
493 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
494 let topic_id = if version >= 5 {
495 types::Uuid.decode(buf)?
496 } else {
497 Uuid::nil()
498 };
499 let partition_errors = if version >= 5 {
500 types::CompactArray(types::Struct { version }).decode(buf)?
501 } else {
502 Default::default()
503 };
504 let mut unknown_tagged_fields = BTreeMap::new();
505 if version >= 4 {
506 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
507 for _ in 0..num_tagged_fields {
508 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
509 let size: u32 = types::UnsignedVarInt.decode(buf)?;
510 let unknown_value = buf.try_get_bytes(size as usize)?;
511 unknown_tagged_fields.insert(tag as i32, unknown_value);
512 }
513 }
514 Ok(Self {
515 topic_id,
516 partition_errors,
517 unknown_tagged_fields,
518 })
519 }
520}
521
522impl Default for LeaderAndIsrTopicError {
523 fn default() -> Self {
524 Self {
525 topic_id: Uuid::nil(),
526 partition_errors: Default::default(),
527 unknown_tagged_fields: BTreeMap::new(),
528 }
529 }
530}
531
532impl Message for LeaderAndIsrTopicError {
533 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
534 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
535}
536
537impl HeaderVersion for LeaderAndIsrResponse {
538 fn header_version(version: i16) -> i16 {
539 if version >= 4 {
540 1
541 } else {
542 0
543 }
544 }
545}