1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct DescribeQuorumResponse {
24 pub error_code: i16,
28
29 pub topics: Vec<TopicData>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl DescribeQuorumResponse {
39 pub fn with_error_code(mut self, value: i16) -> Self {
45 self.error_code = value;
46 self
47 }
48 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
54 self.topics = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for DescribeQuorumResponse {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 types::Int16.encode(buf, &self.error_code)?;
73 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
74 let num_tagged_fields = self.unknown_tagged_fields.len();
75 if num_tagged_fields > std::u32::MAX as usize {
76 bail!(
77 "Too many tagged fields to encode ({} fields)",
78 num_tagged_fields
79 );
80 }
81 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
82
83 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
84 Ok(())
85 }
86 fn compute_size(&self, version: i16) -> Result<usize> {
87 let mut total_size = 0;
88 total_size += types::Int16.compute_size(&self.error_code)?;
89 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
90 let num_tagged_fields = self.unknown_tagged_fields.len();
91 if num_tagged_fields > std::u32::MAX as usize {
92 bail!(
93 "Too many tagged fields to encode ({} fields)",
94 num_tagged_fields
95 );
96 }
97 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
98
99 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
100 Ok(total_size)
101 }
102}
103
104#[cfg(feature = "client")]
105impl Decodable for DescribeQuorumResponse {
106 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
107 let error_code = types::Int16.decode(buf)?;
108 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
109 let mut unknown_tagged_fields = BTreeMap::new();
110 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
111 for _ in 0..num_tagged_fields {
112 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
113 let size: u32 = types::UnsignedVarInt.decode(buf)?;
114 let unknown_value = buf.try_get_bytes(size as usize)?;
115 unknown_tagged_fields.insert(tag as i32, unknown_value);
116 }
117 Ok(Self {
118 error_code,
119 topics,
120 unknown_tagged_fields,
121 })
122 }
123}
124
125impl Default for DescribeQuorumResponse {
126 fn default() -> Self {
127 Self {
128 error_code: 0,
129 topics: Default::default(),
130 unknown_tagged_fields: BTreeMap::new(),
131 }
132 }
133}
134
135impl Message for DescribeQuorumResponse {
136 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
137 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
138}
139
140#[non_exhaustive]
142#[derive(Debug, Clone, PartialEq)]
143pub struct PartitionData {
144 pub partition_index: i32,
148
149 pub error_code: i16,
153
154 pub leader_id: super::BrokerId,
158
159 pub leader_epoch: i32,
163
164 pub high_watermark: i64,
168
169 pub current_voters: Vec<ReplicaState>,
173
174 pub observers: Vec<ReplicaState>,
178
179 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
181}
182
183impl PartitionData {
184 pub fn with_partition_index(mut self, value: i32) -> Self {
190 self.partition_index = value;
191 self
192 }
193 pub fn with_error_code(mut self, value: i16) -> Self {
199 self.error_code = value;
200 self
201 }
202 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
208 self.leader_id = value;
209 self
210 }
211 pub fn with_leader_epoch(mut self, value: i32) -> Self {
217 self.leader_epoch = value;
218 self
219 }
220 pub fn with_high_watermark(mut self, value: i64) -> Self {
226 self.high_watermark = value;
227 self
228 }
229 pub fn with_current_voters(mut self, value: Vec<ReplicaState>) -> Self {
235 self.current_voters = value;
236 self
237 }
238 pub fn with_observers(mut self, value: Vec<ReplicaState>) -> Self {
244 self.observers = value;
245 self
246 }
247 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
249 self.unknown_tagged_fields = value;
250 self
251 }
252 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
254 self.unknown_tagged_fields.insert(key, value);
255 self
256 }
257}
258
259#[cfg(feature = "broker")]
260impl Encodable for PartitionData {
261 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
262 types::Int32.encode(buf, &self.partition_index)?;
263 types::Int16.encode(buf, &self.error_code)?;
264 types::Int32.encode(buf, &self.leader_id)?;
265 types::Int32.encode(buf, &self.leader_epoch)?;
266 types::Int64.encode(buf, &self.high_watermark)?;
267 types::CompactArray(types::Struct { version }).encode(buf, &self.current_voters)?;
268 types::CompactArray(types::Struct { version }).encode(buf, &self.observers)?;
269 let num_tagged_fields = self.unknown_tagged_fields.len();
270 if num_tagged_fields > std::u32::MAX as usize {
271 bail!(
272 "Too many tagged fields to encode ({} fields)",
273 num_tagged_fields
274 );
275 }
276 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
277
278 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
279 Ok(())
280 }
281 fn compute_size(&self, version: i16) -> Result<usize> {
282 let mut total_size = 0;
283 total_size += types::Int32.compute_size(&self.partition_index)?;
284 total_size += types::Int16.compute_size(&self.error_code)?;
285 total_size += types::Int32.compute_size(&self.leader_id)?;
286 total_size += types::Int32.compute_size(&self.leader_epoch)?;
287 total_size += types::Int64.compute_size(&self.high_watermark)?;
288 total_size +=
289 types::CompactArray(types::Struct { version }).compute_size(&self.current_voters)?;
290 total_size +=
291 types::CompactArray(types::Struct { version }).compute_size(&self.observers)?;
292 let num_tagged_fields = self.unknown_tagged_fields.len();
293 if num_tagged_fields > std::u32::MAX as usize {
294 bail!(
295 "Too many tagged fields to encode ({} fields)",
296 num_tagged_fields
297 );
298 }
299 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
300
301 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
302 Ok(total_size)
303 }
304}
305
306#[cfg(feature = "client")]
307impl Decodable for PartitionData {
308 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
309 let partition_index = types::Int32.decode(buf)?;
310 let error_code = types::Int16.decode(buf)?;
311 let leader_id = types::Int32.decode(buf)?;
312 let leader_epoch = types::Int32.decode(buf)?;
313 let high_watermark = types::Int64.decode(buf)?;
314 let current_voters = types::CompactArray(types::Struct { version }).decode(buf)?;
315 let observers = types::CompactArray(types::Struct { version }).decode(buf)?;
316 let mut unknown_tagged_fields = BTreeMap::new();
317 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
318 for _ in 0..num_tagged_fields {
319 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
320 let size: u32 = types::UnsignedVarInt.decode(buf)?;
321 let unknown_value = buf.try_get_bytes(size as usize)?;
322 unknown_tagged_fields.insert(tag as i32, unknown_value);
323 }
324 Ok(Self {
325 partition_index,
326 error_code,
327 leader_id,
328 leader_epoch,
329 high_watermark,
330 current_voters,
331 observers,
332 unknown_tagged_fields,
333 })
334 }
335}
336
337impl Default for PartitionData {
338 fn default() -> Self {
339 Self {
340 partition_index: 0,
341 error_code: 0,
342 leader_id: (0).into(),
343 leader_epoch: 0,
344 high_watermark: 0,
345 current_voters: Default::default(),
346 observers: Default::default(),
347 unknown_tagged_fields: BTreeMap::new(),
348 }
349 }
350}
351
352impl Message for PartitionData {
353 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
354 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
355}
356
357#[non_exhaustive]
359#[derive(Debug, Clone, PartialEq)]
360pub struct ReplicaState {
361 pub replica_id: super::BrokerId,
365
366 pub log_end_offset: i64,
370
371 pub last_fetch_timestamp: i64,
375
376 pub last_caught_up_timestamp: i64,
380
381 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
383}
384
385impl ReplicaState {
386 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
392 self.replica_id = value;
393 self
394 }
395 pub fn with_log_end_offset(mut self, value: i64) -> Self {
401 self.log_end_offset = value;
402 self
403 }
404 pub fn with_last_fetch_timestamp(mut self, value: i64) -> Self {
410 self.last_fetch_timestamp = value;
411 self
412 }
413 pub fn with_last_caught_up_timestamp(mut self, value: i64) -> Self {
419 self.last_caught_up_timestamp = value;
420 self
421 }
422 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
424 self.unknown_tagged_fields = value;
425 self
426 }
427 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
429 self.unknown_tagged_fields.insert(key, value);
430 self
431 }
432}
433
434#[cfg(feature = "broker")]
435impl Encodable for ReplicaState {
436 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
437 types::Int32.encode(buf, &self.replica_id)?;
438 types::Int64.encode(buf, &self.log_end_offset)?;
439 if version >= 1 {
440 types::Int64.encode(buf, &self.last_fetch_timestamp)?;
441 }
442 if version >= 1 {
443 types::Int64.encode(buf, &self.last_caught_up_timestamp)?;
444 }
445 let num_tagged_fields = self.unknown_tagged_fields.len();
446 if num_tagged_fields > std::u32::MAX as usize {
447 bail!(
448 "Too many tagged fields to encode ({} fields)",
449 num_tagged_fields
450 );
451 }
452 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
453
454 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
455 Ok(())
456 }
457 fn compute_size(&self, version: i16) -> Result<usize> {
458 let mut total_size = 0;
459 total_size += types::Int32.compute_size(&self.replica_id)?;
460 total_size += types::Int64.compute_size(&self.log_end_offset)?;
461 if version >= 1 {
462 total_size += types::Int64.compute_size(&self.last_fetch_timestamp)?;
463 }
464 if version >= 1 {
465 total_size += types::Int64.compute_size(&self.last_caught_up_timestamp)?;
466 }
467 let num_tagged_fields = self.unknown_tagged_fields.len();
468 if num_tagged_fields > std::u32::MAX as usize {
469 bail!(
470 "Too many tagged fields to encode ({} fields)",
471 num_tagged_fields
472 );
473 }
474 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
475
476 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
477 Ok(total_size)
478 }
479}
480
481#[cfg(feature = "client")]
482impl Decodable for ReplicaState {
483 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
484 let replica_id = types::Int32.decode(buf)?;
485 let log_end_offset = types::Int64.decode(buf)?;
486 let last_fetch_timestamp = if version >= 1 {
487 types::Int64.decode(buf)?
488 } else {
489 -1
490 };
491 let last_caught_up_timestamp = if version >= 1 {
492 types::Int64.decode(buf)?
493 } else {
494 -1
495 };
496 let mut unknown_tagged_fields = BTreeMap::new();
497 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
498 for _ in 0..num_tagged_fields {
499 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
500 let size: u32 = types::UnsignedVarInt.decode(buf)?;
501 let unknown_value = buf.try_get_bytes(size as usize)?;
502 unknown_tagged_fields.insert(tag as i32, unknown_value);
503 }
504 Ok(Self {
505 replica_id,
506 log_end_offset,
507 last_fetch_timestamp,
508 last_caught_up_timestamp,
509 unknown_tagged_fields,
510 })
511 }
512}
513
514impl Default for ReplicaState {
515 fn default() -> Self {
516 Self {
517 replica_id: (0).into(),
518 log_end_offset: 0,
519 last_fetch_timestamp: -1,
520 last_caught_up_timestamp: -1,
521 unknown_tagged_fields: BTreeMap::new(),
522 }
523 }
524}
525
526impl Message for ReplicaState {
527 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
528 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
529}
530
531#[non_exhaustive]
533#[derive(Debug, Clone, PartialEq)]
534pub struct TopicData {
535 pub topic_name: super::TopicName,
539
540 pub partitions: Vec<PartitionData>,
544
545 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
547}
548
549impl TopicData {
550 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
556 self.topic_name = value;
557 self
558 }
559 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
565 self.partitions = value;
566 self
567 }
568 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
570 self.unknown_tagged_fields = value;
571 self
572 }
573 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
575 self.unknown_tagged_fields.insert(key, value);
576 self
577 }
578}
579
580#[cfg(feature = "broker")]
581impl Encodable for TopicData {
582 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
583 types::CompactString.encode(buf, &self.topic_name)?;
584 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
585 let num_tagged_fields = self.unknown_tagged_fields.len();
586 if num_tagged_fields > std::u32::MAX as usize {
587 bail!(
588 "Too many tagged fields to encode ({} fields)",
589 num_tagged_fields
590 );
591 }
592 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
593
594 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
595 Ok(())
596 }
597 fn compute_size(&self, version: i16) -> Result<usize> {
598 let mut total_size = 0;
599 total_size += types::CompactString.compute_size(&self.topic_name)?;
600 total_size +=
601 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
602 let num_tagged_fields = self.unknown_tagged_fields.len();
603 if num_tagged_fields > std::u32::MAX as usize {
604 bail!(
605 "Too many tagged fields to encode ({} fields)",
606 num_tagged_fields
607 );
608 }
609 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
610
611 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
612 Ok(total_size)
613 }
614}
615
616#[cfg(feature = "client")]
617impl Decodable for TopicData {
618 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
619 let topic_name = types::CompactString.decode(buf)?;
620 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
621 let mut unknown_tagged_fields = BTreeMap::new();
622 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
623 for _ in 0..num_tagged_fields {
624 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
625 let size: u32 = types::UnsignedVarInt.decode(buf)?;
626 let unknown_value = buf.try_get_bytes(size as usize)?;
627 unknown_tagged_fields.insert(tag as i32, unknown_value);
628 }
629 Ok(Self {
630 topic_name,
631 partitions,
632 unknown_tagged_fields,
633 })
634 }
635}
636
637impl Default for TopicData {
638 fn default() -> Self {
639 Self {
640 topic_name: Default::default(),
641 partitions: Default::default(),
642 unknown_tagged_fields: BTreeMap::new(),
643 }
644 }
645}
646
647impl Message for TopicData {
648 const VERSIONS: VersionRange = VersionRange { min: 0, max: 1 };
649 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
650}
651
652impl HeaderVersion for DescribeQuorumResponse {
653 fn header_version(version: i16) -> i16 {
654 1
655 }
656}