1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeProducersResponse {
24 pub throttle_time_ms: i32,
28
29 pub topics: Vec<TopicResponse>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl DescribeProducersResponse {
39 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
45 self.throttle_time_ms = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<TopicResponse>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
/// Wire encoding for [`DescribeProducersResponse`]; compiled only with the
/// `broker` feature.
#[cfg(feature = "broker")]
impl Encodable for DescribeProducersResponse {
    /// Writes the message to `buf` for protocol `version`.
    ///
    /// Field order on the wire: `throttle_time_ms`, `topics`, the tagged-field
    /// count, then the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0, if there are more than `u32::MAX` tagged
    /// fields, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.throttle_time_ms)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above makes this narrowing cast lossless.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the encoded sizes of the same fields, in the same order, that
    /// [`Self::encode`] writes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`]. The version check
    /// mirrors `encode` so a size is never reported for a version that cannot
    /// be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
        total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
106
/// Wire decoding for [`DescribeProducersResponse`]; compiled only with the
/// `client` feature.
#[cfg(feature = "client")]
impl Decodable for DescribeProducersResponse {
    /// Reads one message from `buf` for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0 or if any field cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let throttle = types::Int32.decode(buf)?;
        let topic_list = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept as raw bytes; a repeated tag overwrites
        // the earlier entry.
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut tagged = BTreeMap::new();
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            // NOTE(review): `tag as i32` wraps for tag values above `i32::MAX`.
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            throttle_time_ms: throttle,
            topics: topic_list,
            unknown_tagged_fields: tagged,
        })
    }
}
130
131impl Default for DescribeProducersResponse {
132 fn default() -> Self {
133 Self {
134 throttle_time_ms: 0,
135 topics: Default::default(),
136 unknown_tagged_fields: BTreeMap::new(),
137 }
138 }
139}
140
141impl Message for DescribeProducersResponse {
142 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
143 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
144}
145
146#[non_exhaustive]
148#[derive(Debug, Clone, PartialEq)]
149pub struct PartitionResponse {
150 pub partition_index: i32,
154
155 pub error_code: i16,
159
160 pub error_message: Option<StrBytes>,
164
165 pub active_producers: Vec<ProducerState>,
169
170 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
172}
173
174impl PartitionResponse {
175 pub fn with_partition_index(mut self, value: i32) -> Self {
181 self.partition_index = value;
182 self
183 }
184 pub fn with_error_code(mut self, value: i16) -> Self {
190 self.error_code = value;
191 self
192 }
193 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
199 self.error_message = value;
200 self
201 }
202 pub fn with_active_producers(mut self, value: Vec<ProducerState>) -> Self {
208 self.active_producers = value;
209 self
210 }
211 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
213 self.unknown_tagged_fields = value;
214 self
215 }
216 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
218 self.unknown_tagged_fields.insert(key, value);
219 self
220 }
221}
222
/// Wire encoding for [`PartitionResponse`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Encodable for PartitionResponse {
    /// Writes the entry to `buf` for protocol `version`.
    ///
    /// Field order on the wire: `partition_index`, `error_code`,
    /// `error_message`, `active_producers`, the tagged-field count, then the
    /// unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0, if there are more than `u32::MAX` tagged
    /// fields, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::CompactString.encode(buf, &self.error_message)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.active_producers)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above makes this narrowing cast lossless.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the encoded sizes of the same fields, in the same order, that
    /// [`Self::encode`] writes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`]. The version check
    /// mirrors `encode` so a size is never reported for a version that cannot
    /// be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::Int32.compute_size(&self.partition_index)?;
        total_size += types::Int16.compute_size(&self.error_code)?;
        total_size += types::CompactString.compute_size(&self.error_message)?;
        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.active_producers)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
265
/// Wire decoding for [`PartitionResponse`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Decodable for PartitionResponse {
    /// Reads one entry from `buf` for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0 or if any field cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let index = types::Int32.decode(buf)?;
        let code = types::Int16.decode(buf)?;
        let message = types::CompactString.decode(buf)?;
        let producers = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept as raw bytes; a repeated tag overwrites
        // the earlier entry.
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut tagged = BTreeMap::new();
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            // NOTE(review): `tag as i32` wraps for tag values above `i32::MAX`.
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            partition_index: index,
            error_code: code,
            error_message: message,
            active_producers: producers,
            unknown_tagged_fields: tagged,
        })
    }
}
293
294impl Default for PartitionResponse {
295 fn default() -> Self {
296 Self {
297 partition_index: 0,
298 error_code: 0,
299 error_message: None,
300 active_producers: Default::default(),
301 unknown_tagged_fields: BTreeMap::new(),
302 }
303 }
304}
305
306impl Message for PartitionResponse {
307 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
308 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
309}
310
311#[non_exhaustive]
313#[derive(Debug, Clone, PartialEq)]
314pub struct ProducerState {
315 pub producer_id: super::ProducerId,
319
320 pub producer_epoch: i32,
324
325 pub last_sequence: i32,
329
330 pub last_timestamp: i64,
334
335 pub coordinator_epoch: i32,
339
340 pub current_txn_start_offset: i64,
344
345 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
347}
348
349impl ProducerState {
350 pub fn with_producer_id(mut self, value: super::ProducerId) -> Self {
356 self.producer_id = value;
357 self
358 }
359 pub fn with_producer_epoch(mut self, value: i32) -> Self {
365 self.producer_epoch = value;
366 self
367 }
368 pub fn with_last_sequence(mut self, value: i32) -> Self {
374 self.last_sequence = value;
375 self
376 }
377 pub fn with_last_timestamp(mut self, value: i64) -> Self {
383 self.last_timestamp = value;
384 self
385 }
386 pub fn with_coordinator_epoch(mut self, value: i32) -> Self {
392 self.coordinator_epoch = value;
393 self
394 }
395 pub fn with_current_txn_start_offset(mut self, value: i64) -> Self {
401 self.current_txn_start_offset = value;
402 self
403 }
404 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
406 self.unknown_tagged_fields = value;
407 self
408 }
409 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
411 self.unknown_tagged_fields.insert(key, value);
412 self
413 }
414}
415
/// Wire encoding for [`ProducerState`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Encodable for ProducerState {
    /// Writes the entry to `buf` for protocol `version`.
    ///
    /// Field order on the wire: `producer_id`, `producer_epoch`,
    /// `last_sequence`, `last_timestamp`, `coordinator_epoch`,
    /// `current_txn_start_offset`, the tagged-field count, then the unknown
    /// tagged fields.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0, if there are more than `u32::MAX` tagged
    /// fields, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        types::Int64.encode(buf, &self.producer_id)?;
        types::Int32.encode(buf, &self.producer_epoch)?;
        types::Int32.encode(buf, &self.last_sequence)?;
        types::Int64.encode(buf, &self.last_timestamp)?;
        types::Int32.encode(buf, &self.coordinator_epoch)?;
        types::Int64.encode(buf, &self.current_txn_start_offset)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above makes this narrowing cast lossless.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the encoded sizes of the same fields, in the same order, that
    /// [`Self::encode`] writes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`]. The version check
    /// mirrors `encode` so a size is never reported for a version that cannot
    /// be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::Int64.compute_size(&self.producer_id)?;
        total_size += types::Int32.compute_size(&self.producer_epoch)?;
        total_size += types::Int32.compute_size(&self.last_sequence)?;
        total_size += types::Int64.compute_size(&self.last_timestamp)?;
        total_size += types::Int32.compute_size(&self.coordinator_epoch)?;
        total_size += types::Int64.compute_size(&self.current_txn_start_offset)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
461
/// Wire decoding for [`ProducerState`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Decodable for ProducerState {
    /// Reads one entry from `buf` for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0 or if any field cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let id = types::Int64.decode(buf)?;
        let epoch = types::Int32.decode(buf)?;
        let sequence = types::Int32.decode(buf)?;
        let timestamp = types::Int64.decode(buf)?;
        let coordinator = types::Int32.decode(buf)?;
        let txn_start = types::Int64.decode(buf)?;

        // Every tagged field is kept as raw bytes; a repeated tag overwrites
        // the earlier entry.
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut tagged = BTreeMap::new();
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            // NOTE(review): `tag as i32` wraps for tag values above `i32::MAX`.
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            producer_id: id,
            producer_epoch: epoch,
            last_sequence: sequence,
            last_timestamp: timestamp,
            coordinator_epoch: coordinator,
            current_txn_start_offset: txn_start,
            unknown_tagged_fields: tagged,
        })
    }
}
493
494impl Default for ProducerState {
495 fn default() -> Self {
496 Self {
497 producer_id: (0).into(),
498 producer_epoch: 0,
499 last_sequence: -1,
500 last_timestamp: -1,
501 coordinator_epoch: 0,
502 current_txn_start_offset: -1,
503 unknown_tagged_fields: BTreeMap::new(),
504 }
505 }
506}
507
508impl Message for ProducerState {
509 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
510 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
511}
512
513#[non_exhaustive]
515#[derive(Debug, Clone, PartialEq)]
516pub struct TopicResponse {
517 pub name: super::TopicName,
521
522 pub partitions: Vec<PartitionResponse>,
526
527 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
529}
530
531impl TopicResponse {
532 pub fn with_name(mut self, value: super::TopicName) -> Self {
538 self.name = value;
539 self
540 }
541 pub fn with_partitions(mut self, value: Vec<PartitionResponse>) -> Self {
547 self.partitions = value;
548 self
549 }
550 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
552 self.unknown_tagged_fields = value;
553 self
554 }
555 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
557 self.unknown_tagged_fields.insert(key, value);
558 self
559 }
560}
561
/// Wire encoding for [`TopicResponse`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Encodable for TopicResponse {
    /// Writes the entry to `buf` for protocol `version`.
    ///
    /// Field order on the wire: `name`, `partitions`, the tagged-field count,
    /// then the unknown tagged fields.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0, if there are more than `u32::MAX` tagged
    /// fields, or if any nested encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        types::CompactString.encode(buf, &self.name)?;
        types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        // The guard above makes this narrowing cast lossless.
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Sums the encoded sizes of the same fields, in the same order, that
    /// [`Self::encode`] writes.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Self::encode`]. The version check
    /// mirrors `encode` so a size is never reported for a version that cannot
    /// be encoded.
    fn compute_size(&self, version: i16) -> Result<usize> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let mut total_size = 0;
        total_size += types::CompactString.compute_size(&self.name)?;
        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
600
/// Wire decoding for [`TopicResponse`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Decodable for TopicResponse {
    /// Reads one entry from `buf` for protocol `version`.
    ///
    /// # Errors
    ///
    /// Fails if `version` is not 0 or if any field cannot be read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if version != 0 {
            bail!("specified version not supported by this message type");
        }
        let topic_name = types::CompactString.decode(buf)?;
        let partition_list = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Every tagged field is kept as raw bytes; a repeated tag overwrites
        // the earlier entry.
        let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
        let mut tagged = BTreeMap::new();
        for _ in 0..field_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            // NOTE(review): `tag as i32` wraps for tag values above `i32::MAX`.
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            name: topic_name,
            partitions: partition_list,
            unknown_tagged_fields: tagged,
        })
    }
}
624
625impl Default for TopicResponse {
626 fn default() -> Self {
627 Self {
628 name: Default::default(),
629 partitions: Default::default(),
630 unknown_tagged_fields: BTreeMap::new(),
631 }
632 }
633}
634
635impl Message for TopicResponse {
636 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
637 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
638}
639
640impl HeaderVersion for DescribeProducersResponse {
641 fn header_version(version: i16) -> i16 {
642 1
643 }
644}