1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BatchIndexAndErrorMessage {
24 pub batch_index: i32,
28
29 pub batch_index_error_message: Option<StrBytes>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl BatchIndexAndErrorMessage {
39 pub fn with_batch_index(mut self, value: i32) -> Self {
45 self.batch_index = value;
46 self
47 }
48 pub fn with_batch_index_error_message(mut self, value: Option<StrBytes>) -> Self {
54 self.batch_index_error_message = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for BatchIndexAndErrorMessage {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 0 || version > 11 {
73 bail!("specified version not supported by this message type");
74 }
75 if version >= 8 {
76 types::Int32.encode(buf, &self.batch_index)?;
77 } else {
78 if self.batch_index != 0 {
79 bail!("A field is set that is not available on the selected protocol version");
80 }
81 }
82 if version >= 8 {
83 if version >= 9 {
84 types::CompactString.encode(buf, &self.batch_index_error_message)?;
85 } else {
86 types::String.encode(buf, &self.batch_index_error_message)?;
87 }
88 } else {
89 if !self.batch_index_error_message.is_none() {
90 bail!("A field is set that is not available on the selected protocol version");
91 }
92 }
93 if version >= 9 {
94 let num_tagged_fields = self.unknown_tagged_fields.len();
95 if num_tagged_fields > std::u32::MAX as usize {
96 bail!(
97 "Too many tagged fields to encode ({} fields)",
98 num_tagged_fields
99 );
100 }
101 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
102
103 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
104 }
105 Ok(())
106 }
107 fn compute_size(&self, version: i16) -> Result<usize> {
108 let mut total_size = 0;
109 if version >= 8 {
110 total_size += types::Int32.compute_size(&self.batch_index)?;
111 } else {
112 if self.batch_index != 0 {
113 bail!("A field is set that is not available on the selected protocol version");
114 }
115 }
116 if version >= 8 {
117 if version >= 9 {
118 total_size += types::CompactString.compute_size(&self.batch_index_error_message)?;
119 } else {
120 total_size += types::String.compute_size(&self.batch_index_error_message)?;
121 }
122 } else {
123 if !self.batch_index_error_message.is_none() {
124 bail!("A field is set that is not available on the selected protocol version");
125 }
126 }
127 if version >= 9 {
128 let num_tagged_fields = self.unknown_tagged_fields.len();
129 if num_tagged_fields > std::u32::MAX as usize {
130 bail!(
131 "Too many tagged fields to encode ({} fields)",
132 num_tagged_fields
133 );
134 }
135 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
136
137 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
138 }
139 Ok(total_size)
140 }
141}
142
143#[cfg(feature = "client")]
144impl Decodable for BatchIndexAndErrorMessage {
145 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
146 if version < 0 || version > 11 {
147 bail!("specified version not supported by this message type");
148 }
149 let batch_index = if version >= 8 {
150 types::Int32.decode(buf)?
151 } else {
152 0
153 };
154 let batch_index_error_message = if version >= 8 {
155 if version >= 9 {
156 types::CompactString.decode(buf)?
157 } else {
158 types::String.decode(buf)?
159 }
160 } else {
161 None
162 };
163 let mut unknown_tagged_fields = BTreeMap::new();
164 if version >= 9 {
165 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
166 for _ in 0..num_tagged_fields {
167 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
168 let size: u32 = types::UnsignedVarInt.decode(buf)?;
169 let unknown_value = buf.try_get_bytes(size as usize)?;
170 unknown_tagged_fields.insert(tag as i32, unknown_value);
171 }
172 }
173 Ok(Self {
174 batch_index,
175 batch_index_error_message,
176 unknown_tagged_fields,
177 })
178 }
179}
180
181impl Default for BatchIndexAndErrorMessage {
182 fn default() -> Self {
183 Self {
184 batch_index: 0,
185 batch_index_error_message: None,
186 unknown_tagged_fields: BTreeMap::new(),
187 }
188 }
189}
190
191impl Message for BatchIndexAndErrorMessage {
192 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
193 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
194}
195
196#[non_exhaustive]
198#[derive(Debug, Clone, PartialEq)]
199pub struct LeaderIdAndEpoch {
200 pub leader_id: super::BrokerId,
204
205 pub leader_epoch: i32,
209
210 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
212}
213
214impl LeaderIdAndEpoch {
215 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
221 self.leader_id = value;
222 self
223 }
224 pub fn with_leader_epoch(mut self, value: i32) -> Self {
230 self.leader_epoch = value;
231 self
232 }
233 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
235 self.unknown_tagged_fields = value;
236 self
237 }
238 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
240 self.unknown_tagged_fields.insert(key, value);
241 self
242 }
243}
244
245#[cfg(feature = "broker")]
246impl Encodable for LeaderIdAndEpoch {
247 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
248 if version < 0 || version > 11 {
249 bail!("specified version not supported by this message type");
250 }
251 if version >= 10 {
252 types::Int32.encode(buf, &self.leader_id)?;
253 } else {
254 if self.leader_id != -1 {
255 bail!("A field is set that is not available on the selected protocol version");
256 }
257 }
258 if version >= 10 {
259 types::Int32.encode(buf, &self.leader_epoch)?;
260 } else {
261 if self.leader_epoch != -1 {
262 bail!("A field is set that is not available on the selected protocol version");
263 }
264 }
265 if version >= 9 {
266 let num_tagged_fields = self.unknown_tagged_fields.len();
267 if num_tagged_fields > std::u32::MAX as usize {
268 bail!(
269 "Too many tagged fields to encode ({} fields)",
270 num_tagged_fields
271 );
272 }
273 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
274
275 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
276 }
277 Ok(())
278 }
279 fn compute_size(&self, version: i16) -> Result<usize> {
280 let mut total_size = 0;
281 if version >= 10 {
282 total_size += types::Int32.compute_size(&self.leader_id)?;
283 } else {
284 if self.leader_id != -1 {
285 bail!("A field is set that is not available on the selected protocol version");
286 }
287 }
288 if version >= 10 {
289 total_size += types::Int32.compute_size(&self.leader_epoch)?;
290 } else {
291 if self.leader_epoch != -1 {
292 bail!("A field is set that is not available on the selected protocol version");
293 }
294 }
295 if version >= 9 {
296 let num_tagged_fields = self.unknown_tagged_fields.len();
297 if num_tagged_fields > std::u32::MAX as usize {
298 bail!(
299 "Too many tagged fields to encode ({} fields)",
300 num_tagged_fields
301 );
302 }
303 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
304
305 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
306 }
307 Ok(total_size)
308 }
309}
310
311#[cfg(feature = "client")]
312impl Decodable for LeaderIdAndEpoch {
313 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
314 if version < 0 || version > 11 {
315 bail!("specified version not supported by this message type");
316 }
317 let leader_id = if version >= 10 {
318 types::Int32.decode(buf)?
319 } else {
320 (-1).into()
321 };
322 let leader_epoch = if version >= 10 {
323 types::Int32.decode(buf)?
324 } else {
325 -1
326 };
327 let mut unknown_tagged_fields = BTreeMap::new();
328 if version >= 9 {
329 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
330 for _ in 0..num_tagged_fields {
331 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
332 let size: u32 = types::UnsignedVarInt.decode(buf)?;
333 let unknown_value = buf.try_get_bytes(size as usize)?;
334 unknown_tagged_fields.insert(tag as i32, unknown_value);
335 }
336 }
337 Ok(Self {
338 leader_id,
339 leader_epoch,
340 unknown_tagged_fields,
341 })
342 }
343}
344
345impl Default for LeaderIdAndEpoch {
346 fn default() -> Self {
347 Self {
348 leader_id: (-1).into(),
349 leader_epoch: -1,
350 unknown_tagged_fields: BTreeMap::new(),
351 }
352 }
353}
354
355impl Message for LeaderIdAndEpoch {
356 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
357 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
358}
359
360#[non_exhaustive]
362#[derive(Debug, Clone, PartialEq)]
363pub struct NodeEndpoint {
364 pub node_id: super::BrokerId,
368
369 pub host: StrBytes,
373
374 pub port: i32,
378
379 pub rack: Option<StrBytes>,
383
384 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
386}
387
388impl NodeEndpoint {
389 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
395 self.node_id = value;
396 self
397 }
398 pub fn with_host(mut self, value: StrBytes) -> Self {
404 self.host = value;
405 self
406 }
407 pub fn with_port(mut self, value: i32) -> Self {
413 self.port = value;
414 self
415 }
416 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
422 self.rack = value;
423 self
424 }
425 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
427 self.unknown_tagged_fields = value;
428 self
429 }
430 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
432 self.unknown_tagged_fields.insert(key, value);
433 self
434 }
435}
436
437#[cfg(feature = "broker")]
438impl Encodable for NodeEndpoint {
439 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
440 if version < 0 || version > 11 {
441 bail!("specified version not supported by this message type");
442 }
443 if version >= 10 {
444 types::Int32.encode(buf, &self.node_id)?;
445 } else {
446 if self.node_id != 0 {
447 bail!("A field is set that is not available on the selected protocol version");
448 }
449 }
450 if version >= 10 {
451 types::CompactString.encode(buf, &self.host)?;
452 } else {
453 if !self.host.is_empty() {
454 bail!("A field is set that is not available on the selected protocol version");
455 }
456 }
457 if version >= 10 {
458 types::Int32.encode(buf, &self.port)?;
459 } else {
460 if self.port != 0 {
461 bail!("A field is set that is not available on the selected protocol version");
462 }
463 }
464 if version >= 10 {
465 types::CompactString.encode(buf, &self.rack)?;
466 } else {
467 if !self.rack.is_none() {
468 bail!("A field is set that is not available on the selected protocol version");
469 }
470 }
471 if version >= 9 {
472 let num_tagged_fields = self.unknown_tagged_fields.len();
473 if num_tagged_fields > std::u32::MAX as usize {
474 bail!(
475 "Too many tagged fields to encode ({} fields)",
476 num_tagged_fields
477 );
478 }
479 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
480
481 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
482 }
483 Ok(())
484 }
485 fn compute_size(&self, version: i16) -> Result<usize> {
486 let mut total_size = 0;
487 if version >= 10 {
488 total_size += types::Int32.compute_size(&self.node_id)?;
489 } else {
490 if self.node_id != 0 {
491 bail!("A field is set that is not available on the selected protocol version");
492 }
493 }
494 if version >= 10 {
495 total_size += types::CompactString.compute_size(&self.host)?;
496 } else {
497 if !self.host.is_empty() {
498 bail!("A field is set that is not available on the selected protocol version");
499 }
500 }
501 if version >= 10 {
502 total_size += types::Int32.compute_size(&self.port)?;
503 } else {
504 if self.port != 0 {
505 bail!("A field is set that is not available on the selected protocol version");
506 }
507 }
508 if version >= 10 {
509 total_size += types::CompactString.compute_size(&self.rack)?;
510 } else {
511 if !self.rack.is_none() {
512 bail!("A field is set that is not available on the selected protocol version");
513 }
514 }
515 if version >= 9 {
516 let num_tagged_fields = self.unknown_tagged_fields.len();
517 if num_tagged_fields > std::u32::MAX as usize {
518 bail!(
519 "Too many tagged fields to encode ({} fields)",
520 num_tagged_fields
521 );
522 }
523 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
524
525 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
526 }
527 Ok(total_size)
528 }
529}
530
531#[cfg(feature = "client")]
532impl Decodable for NodeEndpoint {
533 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
534 if version < 0 || version > 11 {
535 bail!("specified version not supported by this message type");
536 }
537 let node_id = if version >= 10 {
538 types::Int32.decode(buf)?
539 } else {
540 (0).into()
541 };
542 let host = if version >= 10 {
543 types::CompactString.decode(buf)?
544 } else {
545 Default::default()
546 };
547 let port = if version >= 10 {
548 types::Int32.decode(buf)?
549 } else {
550 0
551 };
552 let rack = if version >= 10 {
553 types::CompactString.decode(buf)?
554 } else {
555 None
556 };
557 let mut unknown_tagged_fields = BTreeMap::new();
558 if version >= 9 {
559 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
560 for _ in 0..num_tagged_fields {
561 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
562 let size: u32 = types::UnsignedVarInt.decode(buf)?;
563 let unknown_value = buf.try_get_bytes(size as usize)?;
564 unknown_tagged_fields.insert(tag as i32, unknown_value);
565 }
566 }
567 Ok(Self {
568 node_id,
569 host,
570 port,
571 rack,
572 unknown_tagged_fields,
573 })
574 }
575}
576
577impl Default for NodeEndpoint {
578 fn default() -> Self {
579 Self {
580 node_id: (0).into(),
581 host: Default::default(),
582 port: 0,
583 rack: None,
584 unknown_tagged_fields: BTreeMap::new(),
585 }
586 }
587}
588
589impl Message for NodeEndpoint {
590 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
591 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
592}
593
594#[non_exhaustive]
596#[derive(Debug, Clone, PartialEq)]
597pub struct PartitionProduceResponse {
598 pub index: i32,
602
603 pub error_code: i16,
607
608 pub base_offset: i64,
612
613 pub log_append_time_ms: i64,
617
618 pub log_start_offset: i64,
622
623 pub record_errors: Vec<BatchIndexAndErrorMessage>,
627
628 pub error_message: Option<StrBytes>,
632
633 pub current_leader: LeaderIdAndEpoch,
637
638 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
640}
641
642impl PartitionProduceResponse {
643 pub fn with_index(mut self, value: i32) -> Self {
649 self.index = value;
650 self
651 }
652 pub fn with_error_code(mut self, value: i16) -> Self {
658 self.error_code = value;
659 self
660 }
661 pub fn with_base_offset(mut self, value: i64) -> Self {
667 self.base_offset = value;
668 self
669 }
670 pub fn with_log_append_time_ms(mut self, value: i64) -> Self {
676 self.log_append_time_ms = value;
677 self
678 }
679 pub fn with_log_start_offset(mut self, value: i64) -> Self {
685 self.log_start_offset = value;
686 self
687 }
688 pub fn with_record_errors(mut self, value: Vec<BatchIndexAndErrorMessage>) -> Self {
694 self.record_errors = value;
695 self
696 }
697 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
703 self.error_message = value;
704 self
705 }
706 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
712 self.current_leader = value;
713 self
714 }
715 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
717 self.unknown_tagged_fields = value;
718 self
719 }
720 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
722 self.unknown_tagged_fields.insert(key, value);
723 self
724 }
725}
726
727#[cfg(feature = "broker")]
728impl Encodable for PartitionProduceResponse {
729 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
730 if version < 0 || version > 11 {
731 bail!("specified version not supported by this message type");
732 }
733 types::Int32.encode(buf, &self.index)?;
734 types::Int16.encode(buf, &self.error_code)?;
735 types::Int64.encode(buf, &self.base_offset)?;
736 if version >= 2 {
737 types::Int64.encode(buf, &self.log_append_time_ms)?;
738 }
739 if version >= 5 {
740 types::Int64.encode(buf, &self.log_start_offset)?;
741 }
742 if version >= 8 {
743 if version >= 9 {
744 types::CompactArray(types::Struct { version }).encode(buf, &self.record_errors)?;
745 } else {
746 types::Array(types::Struct { version }).encode(buf, &self.record_errors)?;
747 }
748 }
749 if version >= 8 {
750 if version >= 9 {
751 types::CompactString.encode(buf, &self.error_message)?;
752 } else {
753 types::String.encode(buf, &self.error_message)?;
754 }
755 }
756 if version >= 9 {
757 let mut num_tagged_fields = self.unknown_tagged_fields.len();
758 if version >= 10 {
759 if &self.current_leader != &Default::default() {
760 num_tagged_fields += 1;
761 }
762 }
763 if num_tagged_fields > std::u32::MAX as usize {
764 bail!(
765 "Too many tagged fields to encode ({} fields)",
766 num_tagged_fields
767 );
768 }
769 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
770 if version >= 10 {
771 if &self.current_leader != &Default::default() {
772 let computed_size =
773 types::Struct { version }.compute_size(&self.current_leader)?;
774 if computed_size > std::u32::MAX as usize {
775 bail!(
776 "Tagged field is too large to encode ({} bytes)",
777 computed_size
778 );
779 }
780 types::UnsignedVarInt.encode(buf, 0)?;
781 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
782 types::Struct { version }.encode(buf, &self.current_leader)?;
783 }
784 }
785 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
786 }
787 Ok(())
788 }
789 fn compute_size(&self, version: i16) -> Result<usize> {
790 let mut total_size = 0;
791 total_size += types::Int32.compute_size(&self.index)?;
792 total_size += types::Int16.compute_size(&self.error_code)?;
793 total_size += types::Int64.compute_size(&self.base_offset)?;
794 if version >= 2 {
795 total_size += types::Int64.compute_size(&self.log_append_time_ms)?;
796 }
797 if version >= 5 {
798 total_size += types::Int64.compute_size(&self.log_start_offset)?;
799 }
800 if version >= 8 {
801 if version >= 9 {
802 total_size += types::CompactArray(types::Struct { version })
803 .compute_size(&self.record_errors)?;
804 } else {
805 total_size +=
806 types::Array(types::Struct { version }).compute_size(&self.record_errors)?;
807 }
808 }
809 if version >= 8 {
810 if version >= 9 {
811 total_size += types::CompactString.compute_size(&self.error_message)?;
812 } else {
813 total_size += types::String.compute_size(&self.error_message)?;
814 }
815 }
816 if version >= 9 {
817 let mut num_tagged_fields = self.unknown_tagged_fields.len();
818 if version >= 10 {
819 if &self.current_leader != &Default::default() {
820 num_tagged_fields += 1;
821 }
822 }
823 if num_tagged_fields > std::u32::MAX as usize {
824 bail!(
825 "Too many tagged fields to encode ({} fields)",
826 num_tagged_fields
827 );
828 }
829 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
830 if version >= 10 {
831 if &self.current_leader != &Default::default() {
832 let computed_size =
833 types::Struct { version }.compute_size(&self.current_leader)?;
834 if computed_size > std::u32::MAX as usize {
835 bail!(
836 "Tagged field is too large to encode ({} bytes)",
837 computed_size
838 );
839 }
840 total_size += types::UnsignedVarInt.compute_size(0)?;
841 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
842 total_size += computed_size;
843 }
844 }
845 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
846 }
847 Ok(total_size)
848 }
849}
850
851#[cfg(feature = "client")]
852impl Decodable for PartitionProduceResponse {
853 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
854 if version < 0 || version > 11 {
855 bail!("specified version not supported by this message type");
856 }
857 let index = types::Int32.decode(buf)?;
858 let error_code = types::Int16.decode(buf)?;
859 let base_offset = types::Int64.decode(buf)?;
860 let log_append_time_ms = if version >= 2 {
861 types::Int64.decode(buf)?
862 } else {
863 -1
864 };
865 let log_start_offset = if version >= 5 {
866 types::Int64.decode(buf)?
867 } else {
868 -1
869 };
870 let record_errors = if version >= 8 {
871 if version >= 9 {
872 types::CompactArray(types::Struct { version }).decode(buf)?
873 } else {
874 types::Array(types::Struct { version }).decode(buf)?
875 }
876 } else {
877 Default::default()
878 };
879 let error_message = if version >= 8 {
880 if version >= 9 {
881 types::CompactString.decode(buf)?
882 } else {
883 types::String.decode(buf)?
884 }
885 } else {
886 None
887 };
888 let mut current_leader = Default::default();
889 let mut unknown_tagged_fields = BTreeMap::new();
890 if version >= 9 {
891 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
892 for _ in 0..num_tagged_fields {
893 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
894 let size: u32 = types::UnsignedVarInt.decode(buf)?;
895 match tag {
896 0 => {
897 if version >= 10 {
898 current_leader = types::Struct { version }.decode(buf)?;
899 } else {
900 bail!("Tag {} is not valid for version {}", tag, version);
901 }
902 }
903 _ => {
904 let unknown_value = buf.try_get_bytes(size as usize)?;
905 unknown_tagged_fields.insert(tag as i32, unknown_value);
906 }
907 }
908 }
909 }
910 Ok(Self {
911 index,
912 error_code,
913 base_offset,
914 log_append_time_ms,
915 log_start_offset,
916 record_errors,
917 error_message,
918 current_leader,
919 unknown_tagged_fields,
920 })
921 }
922}
923
924impl Default for PartitionProduceResponse {
925 fn default() -> Self {
926 Self {
927 index: 0,
928 error_code: 0,
929 base_offset: 0,
930 log_append_time_ms: -1,
931 log_start_offset: -1,
932 record_errors: Default::default(),
933 error_message: None,
934 current_leader: Default::default(),
935 unknown_tagged_fields: BTreeMap::new(),
936 }
937 }
938}
939
940impl Message for PartitionProduceResponse {
941 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
942 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
943}
944
945#[non_exhaustive]
947#[derive(Debug, Clone, PartialEq)]
948pub struct ProduceResponse {
949 pub responses: Vec<TopicProduceResponse>,
953
954 pub throttle_time_ms: i32,
958
959 pub node_endpoints: Vec<NodeEndpoint>,
963
964 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
966}
967
968impl ProduceResponse {
969 pub fn with_responses(mut self, value: Vec<TopicProduceResponse>) -> Self {
975 self.responses = value;
976 self
977 }
978 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
984 self.throttle_time_ms = value;
985 self
986 }
987 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
993 self.node_endpoints = value;
994 self
995 }
996 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
998 self.unknown_tagged_fields = value;
999 self
1000 }
1001 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1003 self.unknown_tagged_fields.insert(key, value);
1004 self
1005 }
1006}
1007
1008#[cfg(feature = "broker")]
1009impl Encodable for ProduceResponse {
1010 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1011 if version < 0 || version > 11 {
1012 bail!("specified version not supported by this message type");
1013 }
1014 if version >= 9 {
1015 types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
1016 } else {
1017 types::Array(types::Struct { version }).encode(buf, &self.responses)?;
1018 }
1019 if version >= 1 {
1020 types::Int32.encode(buf, &self.throttle_time_ms)?;
1021 }
1022 if version >= 9 {
1023 let mut num_tagged_fields = self.unknown_tagged_fields.len();
1024 if version >= 10 {
1025 if !self.node_endpoints.is_empty() {
1026 num_tagged_fields += 1;
1027 }
1028 }
1029 if num_tagged_fields > std::u32::MAX as usize {
1030 bail!(
1031 "Too many tagged fields to encode ({} fields)",
1032 num_tagged_fields
1033 );
1034 }
1035 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1036 if version >= 10 {
1037 if !self.node_endpoints.is_empty() {
1038 let computed_size = types::CompactArray(types::Struct { version })
1039 .compute_size(&self.node_endpoints)?;
1040 if computed_size > std::u32::MAX as usize {
1041 bail!(
1042 "Tagged field is too large to encode ({} bytes)",
1043 computed_size
1044 );
1045 }
1046 types::UnsignedVarInt.encode(buf, 0)?;
1047 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1048 types::CompactArray(types::Struct { version })
1049 .encode(buf, &self.node_endpoints)?;
1050 }
1051 }
1052 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
1053 }
1054 Ok(())
1055 }
1056 fn compute_size(&self, version: i16) -> Result<usize> {
1057 let mut total_size = 0;
1058 if version >= 9 {
1059 total_size +=
1060 types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
1061 } else {
1062 total_size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
1063 }
1064 if version >= 1 {
1065 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
1066 }
1067 if version >= 9 {
1068 let mut num_tagged_fields = self.unknown_tagged_fields.len();
1069 if version >= 10 {
1070 if !self.node_endpoints.is_empty() {
1071 num_tagged_fields += 1;
1072 }
1073 }
1074 if num_tagged_fields > std::u32::MAX as usize {
1075 bail!(
1076 "Too many tagged fields to encode ({} fields)",
1077 num_tagged_fields
1078 );
1079 }
1080 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1081 if version >= 10 {
1082 if !self.node_endpoints.is_empty() {
1083 let computed_size = types::CompactArray(types::Struct { version })
1084 .compute_size(&self.node_endpoints)?;
1085 if computed_size > std::u32::MAX as usize {
1086 bail!(
1087 "Tagged field is too large to encode ({} bytes)",
1088 computed_size
1089 );
1090 }
1091 total_size += types::UnsignedVarInt.compute_size(0)?;
1092 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1093 total_size += computed_size;
1094 }
1095 }
1096 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1097 }
1098 Ok(total_size)
1099 }
1100}
1101
1102#[cfg(feature = "client")]
1103impl Decodable for ProduceResponse {
1104 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1105 if version < 0 || version > 11 {
1106 bail!("specified version not supported by this message type");
1107 }
1108 let responses = if version >= 9 {
1109 types::CompactArray(types::Struct { version }).decode(buf)?
1110 } else {
1111 types::Array(types::Struct { version }).decode(buf)?
1112 };
1113 let throttle_time_ms = if version >= 1 {
1114 types::Int32.decode(buf)?
1115 } else {
1116 0
1117 };
1118 let mut node_endpoints = Default::default();
1119 let mut unknown_tagged_fields = BTreeMap::new();
1120 if version >= 9 {
1121 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1122 for _ in 0..num_tagged_fields {
1123 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1124 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1125 match tag {
1126 0 => {
1127 if version >= 10 {
1128 node_endpoints =
1129 types::CompactArray(types::Struct { version }).decode(buf)?;
1130 } else {
1131 bail!("Tag {} is not valid for version {}", tag, version);
1132 }
1133 }
1134 _ => {
1135 let unknown_value = buf.try_get_bytes(size as usize)?;
1136 unknown_tagged_fields.insert(tag as i32, unknown_value);
1137 }
1138 }
1139 }
1140 }
1141 Ok(Self {
1142 responses,
1143 throttle_time_ms,
1144 node_endpoints,
1145 unknown_tagged_fields,
1146 })
1147 }
1148}
1149
1150impl Default for ProduceResponse {
1151 fn default() -> Self {
1152 Self {
1153 responses: Default::default(),
1154 throttle_time_ms: 0,
1155 node_endpoints: Default::default(),
1156 unknown_tagged_fields: BTreeMap::new(),
1157 }
1158 }
1159}
1160
1161impl Message for ProduceResponse {
1162 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
1163 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1164}
1165
1166#[non_exhaustive]
1168#[derive(Debug, Clone, PartialEq)]
1169pub struct TopicProduceResponse {
1170 pub name: super::TopicName,
1174
1175 pub partition_responses: Vec<PartitionProduceResponse>,
1179
1180 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1182}
1183
1184impl TopicProduceResponse {
1185 pub fn with_name(mut self, value: super::TopicName) -> Self {
1191 self.name = value;
1192 self
1193 }
1194 pub fn with_partition_responses(mut self, value: Vec<PartitionProduceResponse>) -> Self {
1200 self.partition_responses = value;
1201 self
1202 }
1203 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1205 self.unknown_tagged_fields = value;
1206 self
1207 }
1208 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1210 self.unknown_tagged_fields.insert(key, value);
1211 self
1212 }
1213}
1214
1215#[cfg(feature = "broker")]
1216impl Encodable for TopicProduceResponse {
1217 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1218 if version < 0 || version > 11 {
1219 bail!("specified version not supported by this message type");
1220 }
1221 if version >= 9 {
1222 types::CompactString.encode(buf, &self.name)?;
1223 } else {
1224 types::String.encode(buf, &self.name)?;
1225 }
1226 if version >= 9 {
1227 types::CompactArray(types::Struct { version })
1228 .encode(buf, &self.partition_responses)?;
1229 } else {
1230 types::Array(types::Struct { version }).encode(buf, &self.partition_responses)?;
1231 }
1232 if version >= 9 {
1233 let num_tagged_fields = self.unknown_tagged_fields.len();
1234 if num_tagged_fields > std::u32::MAX as usize {
1235 bail!(
1236 "Too many tagged fields to encode ({} fields)",
1237 num_tagged_fields
1238 );
1239 }
1240 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1241
1242 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1243 }
1244 Ok(())
1245 }
1246 fn compute_size(&self, version: i16) -> Result<usize> {
1247 let mut total_size = 0;
1248 if version >= 9 {
1249 total_size += types::CompactString.compute_size(&self.name)?;
1250 } else {
1251 total_size += types::String.compute_size(&self.name)?;
1252 }
1253 if version >= 9 {
1254 total_size += types::CompactArray(types::Struct { version })
1255 .compute_size(&self.partition_responses)?;
1256 } else {
1257 total_size +=
1258 types::Array(types::Struct { version }).compute_size(&self.partition_responses)?;
1259 }
1260 if version >= 9 {
1261 let num_tagged_fields = self.unknown_tagged_fields.len();
1262 if num_tagged_fields > std::u32::MAX as usize {
1263 bail!(
1264 "Too many tagged fields to encode ({} fields)",
1265 num_tagged_fields
1266 );
1267 }
1268 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1269
1270 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1271 }
1272 Ok(total_size)
1273 }
1274}
1275
1276#[cfg(feature = "client")]
1277impl Decodable for TopicProduceResponse {
1278 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1279 if version < 0 || version > 11 {
1280 bail!("specified version not supported by this message type");
1281 }
1282 let name = if version >= 9 {
1283 types::CompactString.decode(buf)?
1284 } else {
1285 types::String.decode(buf)?
1286 };
1287 let partition_responses = if version >= 9 {
1288 types::CompactArray(types::Struct { version }).decode(buf)?
1289 } else {
1290 types::Array(types::Struct { version }).decode(buf)?
1291 };
1292 let mut unknown_tagged_fields = BTreeMap::new();
1293 if version >= 9 {
1294 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1295 for _ in 0..num_tagged_fields {
1296 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1297 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1298 let unknown_value = buf.try_get_bytes(size as usize)?;
1299 unknown_tagged_fields.insert(tag as i32, unknown_value);
1300 }
1301 }
1302 Ok(Self {
1303 name,
1304 partition_responses,
1305 unknown_tagged_fields,
1306 })
1307 }
1308}
1309
1310impl Default for TopicProduceResponse {
1311 fn default() -> Self {
1312 Self {
1313 name: Default::default(),
1314 partition_responses: Default::default(),
1315 unknown_tagged_fields: BTreeMap::new(),
1316 }
1317 }
1318}
1319
1320impl Message for TopicProduceResponse {
1321 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
1322 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1323}
1324
1325impl HeaderVersion for ProduceResponse {
1326 fn header_version(version: i16) -> i16 {
1327 if version >= 9 {
1328 1
1329 } else {
1330 0
1331 }
1332 }
1333}