1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BatchIndexAndErrorMessage {
24 pub batch_index: i32,
28
29 pub batch_index_error_message: Option<StrBytes>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl BatchIndexAndErrorMessage {
39 pub fn with_batch_index(mut self, value: i32) -> Self {
45 self.batch_index = value;
46 self
47 }
48 pub fn with_batch_index_error_message(mut self, value: Option<StrBytes>) -> Self {
54 self.batch_index_error_message = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for BatchIndexAndErrorMessage {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 3 || version > 13 {
73 bail!("specified version not supported by this message type");
74 }
75 if version >= 8 {
76 types::Int32.encode(buf, &self.batch_index)?;
77 } else {
78 if self.batch_index != 0 {
79 bail!("A field is set that is not available on the selected protocol version");
80 }
81 }
82 if version >= 8 {
83 if version >= 9 {
84 types::CompactString.encode(buf, &self.batch_index_error_message)?;
85 } else {
86 types::String.encode(buf, &self.batch_index_error_message)?;
87 }
88 } else {
89 if !self.batch_index_error_message.is_none() {
90 bail!("A field is set that is not available on the selected protocol version");
91 }
92 }
93 if version >= 9 {
94 let num_tagged_fields = self.unknown_tagged_fields.len();
95 if num_tagged_fields > std::u32::MAX as usize {
96 bail!(
97 "Too many tagged fields to encode ({} fields)",
98 num_tagged_fields
99 );
100 }
101 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
102
103 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
104 }
105 Ok(())
106 }
107 fn compute_size(&self, version: i16) -> Result<usize> {
108 let mut total_size = 0;
109 if version >= 8 {
110 total_size += types::Int32.compute_size(&self.batch_index)?;
111 } else {
112 if self.batch_index != 0 {
113 bail!("A field is set that is not available on the selected protocol version");
114 }
115 }
116 if version >= 8 {
117 if version >= 9 {
118 total_size += types::CompactString.compute_size(&self.batch_index_error_message)?;
119 } else {
120 total_size += types::String.compute_size(&self.batch_index_error_message)?;
121 }
122 } else {
123 if !self.batch_index_error_message.is_none() {
124 bail!("A field is set that is not available on the selected protocol version");
125 }
126 }
127 if version >= 9 {
128 let num_tagged_fields = self.unknown_tagged_fields.len();
129 if num_tagged_fields > std::u32::MAX as usize {
130 bail!(
131 "Too many tagged fields to encode ({} fields)",
132 num_tagged_fields
133 );
134 }
135 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
136
137 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
138 }
139 Ok(total_size)
140 }
141}
142
143#[cfg(feature = "client")]
144impl Decodable for BatchIndexAndErrorMessage {
145 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
146 if version < 3 || version > 13 {
147 bail!("specified version not supported by this message type");
148 }
149 let batch_index = if version >= 8 {
150 types::Int32.decode(buf)?
151 } else {
152 0
153 };
154 let batch_index_error_message = if version >= 8 {
155 if version >= 9 {
156 types::CompactString.decode(buf)?
157 } else {
158 types::String.decode(buf)?
159 }
160 } else {
161 None
162 };
163 let mut unknown_tagged_fields = BTreeMap::new();
164 if version >= 9 {
165 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
166 for _ in 0..num_tagged_fields {
167 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
168 let size: u32 = types::UnsignedVarInt.decode(buf)?;
169 let unknown_value = buf.try_get_bytes(size as usize)?;
170 unknown_tagged_fields.insert(tag as i32, unknown_value);
171 }
172 }
173 Ok(Self {
174 batch_index,
175 batch_index_error_message,
176 unknown_tagged_fields,
177 })
178 }
179}
180
181impl Default for BatchIndexAndErrorMessage {
182 fn default() -> Self {
183 Self {
184 batch_index: 0,
185 batch_index_error_message: None,
186 unknown_tagged_fields: BTreeMap::new(),
187 }
188 }
189}
190
191impl Message for BatchIndexAndErrorMessage {
192 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
193 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
194}
195
196#[non_exhaustive]
198#[derive(Debug, Clone, PartialEq)]
199pub struct LeaderIdAndEpoch {
200 pub leader_id: super::BrokerId,
204
205 pub leader_epoch: i32,
209
210 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
212}
213
214impl LeaderIdAndEpoch {
215 pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
221 self.leader_id = value;
222 self
223 }
224 pub fn with_leader_epoch(mut self, value: i32) -> Self {
230 self.leader_epoch = value;
231 self
232 }
233 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
235 self.unknown_tagged_fields = value;
236 self
237 }
238 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
240 self.unknown_tagged_fields.insert(key, value);
241 self
242 }
243}
244
245#[cfg(feature = "broker")]
246impl Encodable for LeaderIdAndEpoch {
247 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
248 if version < 3 || version > 13 {
249 bail!("specified version not supported by this message type");
250 }
251 if version >= 10 {
252 types::Int32.encode(buf, &self.leader_id)?;
253 } else {
254 if self.leader_id != -1 {
255 bail!("A field is set that is not available on the selected protocol version");
256 }
257 }
258 if version >= 10 {
259 types::Int32.encode(buf, &self.leader_epoch)?;
260 } else {
261 if self.leader_epoch != -1 {
262 bail!("A field is set that is not available on the selected protocol version");
263 }
264 }
265 if version >= 9 {
266 let num_tagged_fields = self.unknown_tagged_fields.len();
267 if num_tagged_fields > std::u32::MAX as usize {
268 bail!(
269 "Too many tagged fields to encode ({} fields)",
270 num_tagged_fields
271 );
272 }
273 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
274
275 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
276 }
277 Ok(())
278 }
279 fn compute_size(&self, version: i16) -> Result<usize> {
280 let mut total_size = 0;
281 if version >= 10 {
282 total_size += types::Int32.compute_size(&self.leader_id)?;
283 } else {
284 if self.leader_id != -1 {
285 bail!("A field is set that is not available on the selected protocol version");
286 }
287 }
288 if version >= 10 {
289 total_size += types::Int32.compute_size(&self.leader_epoch)?;
290 } else {
291 if self.leader_epoch != -1 {
292 bail!("A field is set that is not available on the selected protocol version");
293 }
294 }
295 if version >= 9 {
296 let num_tagged_fields = self.unknown_tagged_fields.len();
297 if num_tagged_fields > std::u32::MAX as usize {
298 bail!(
299 "Too many tagged fields to encode ({} fields)",
300 num_tagged_fields
301 );
302 }
303 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
304
305 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
306 }
307 Ok(total_size)
308 }
309}
310
311#[cfg(feature = "client")]
312impl Decodable for LeaderIdAndEpoch {
313 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
314 if version < 3 || version > 13 {
315 bail!("specified version not supported by this message type");
316 }
317 let leader_id = if version >= 10 {
318 types::Int32.decode(buf)?
319 } else {
320 (-1).into()
321 };
322 let leader_epoch = if version >= 10 {
323 types::Int32.decode(buf)?
324 } else {
325 -1
326 };
327 let mut unknown_tagged_fields = BTreeMap::new();
328 if version >= 9 {
329 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
330 for _ in 0..num_tagged_fields {
331 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
332 let size: u32 = types::UnsignedVarInt.decode(buf)?;
333 let unknown_value = buf.try_get_bytes(size as usize)?;
334 unknown_tagged_fields.insert(tag as i32, unknown_value);
335 }
336 }
337 Ok(Self {
338 leader_id,
339 leader_epoch,
340 unknown_tagged_fields,
341 })
342 }
343}
344
345impl Default for LeaderIdAndEpoch {
346 fn default() -> Self {
347 Self {
348 leader_id: (-1).into(),
349 leader_epoch: -1,
350 unknown_tagged_fields: BTreeMap::new(),
351 }
352 }
353}
354
355impl Message for LeaderIdAndEpoch {
356 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
357 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
358}
359
360#[non_exhaustive]
362#[derive(Debug, Clone, PartialEq)]
363pub struct NodeEndpoint {
364 pub node_id: super::BrokerId,
368
369 pub host: StrBytes,
373
374 pub port: i32,
378
379 pub rack: Option<StrBytes>,
383
384 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
386}
387
388impl NodeEndpoint {
389 pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
395 self.node_id = value;
396 self
397 }
398 pub fn with_host(mut self, value: StrBytes) -> Self {
404 self.host = value;
405 self
406 }
407 pub fn with_port(mut self, value: i32) -> Self {
413 self.port = value;
414 self
415 }
416 pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
422 self.rack = value;
423 self
424 }
425 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
427 self.unknown_tagged_fields = value;
428 self
429 }
430 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
432 self.unknown_tagged_fields.insert(key, value);
433 self
434 }
435}
436
437#[cfg(feature = "broker")]
438impl Encodable for NodeEndpoint {
439 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
440 if version < 3 || version > 13 {
441 bail!("specified version not supported by this message type");
442 }
443 if version >= 10 {
444 types::Int32.encode(buf, &self.node_id)?;
445 } else {
446 if self.node_id != 0 {
447 bail!("A field is set that is not available on the selected protocol version");
448 }
449 }
450 if version >= 10 {
451 types::CompactString.encode(buf, &self.host)?;
452 } else {
453 if !self.host.is_empty() {
454 bail!("A field is set that is not available on the selected protocol version");
455 }
456 }
457 if version >= 10 {
458 types::Int32.encode(buf, &self.port)?;
459 } else {
460 if self.port != 0 {
461 bail!("A field is set that is not available on the selected protocol version");
462 }
463 }
464 if version >= 10 {
465 types::CompactString.encode(buf, &self.rack)?;
466 } else {
467 if !self.rack.is_none() {
468 bail!("A field is set that is not available on the selected protocol version");
469 }
470 }
471 if version >= 9 {
472 let num_tagged_fields = self.unknown_tagged_fields.len();
473 if num_tagged_fields > std::u32::MAX as usize {
474 bail!(
475 "Too many tagged fields to encode ({} fields)",
476 num_tagged_fields
477 );
478 }
479 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
480
481 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
482 }
483 Ok(())
484 }
485 fn compute_size(&self, version: i16) -> Result<usize> {
486 let mut total_size = 0;
487 if version >= 10 {
488 total_size += types::Int32.compute_size(&self.node_id)?;
489 } else {
490 if self.node_id != 0 {
491 bail!("A field is set that is not available on the selected protocol version");
492 }
493 }
494 if version >= 10 {
495 total_size += types::CompactString.compute_size(&self.host)?;
496 } else {
497 if !self.host.is_empty() {
498 bail!("A field is set that is not available on the selected protocol version");
499 }
500 }
501 if version >= 10 {
502 total_size += types::Int32.compute_size(&self.port)?;
503 } else {
504 if self.port != 0 {
505 bail!("A field is set that is not available on the selected protocol version");
506 }
507 }
508 if version >= 10 {
509 total_size += types::CompactString.compute_size(&self.rack)?;
510 } else {
511 if !self.rack.is_none() {
512 bail!("A field is set that is not available on the selected protocol version");
513 }
514 }
515 if version >= 9 {
516 let num_tagged_fields = self.unknown_tagged_fields.len();
517 if num_tagged_fields > std::u32::MAX as usize {
518 bail!(
519 "Too many tagged fields to encode ({} fields)",
520 num_tagged_fields
521 );
522 }
523 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
524
525 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
526 }
527 Ok(total_size)
528 }
529}
530
531#[cfg(feature = "client")]
532impl Decodable for NodeEndpoint {
533 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
534 if version < 3 || version > 13 {
535 bail!("specified version not supported by this message type");
536 }
537 let node_id = if version >= 10 {
538 types::Int32.decode(buf)?
539 } else {
540 (0).into()
541 };
542 let host = if version >= 10 {
543 types::CompactString.decode(buf)?
544 } else {
545 Default::default()
546 };
547 let port = if version >= 10 {
548 types::Int32.decode(buf)?
549 } else {
550 0
551 };
552 let rack = if version >= 10 {
553 types::CompactString.decode(buf)?
554 } else {
555 None
556 };
557 let mut unknown_tagged_fields = BTreeMap::new();
558 if version >= 9 {
559 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
560 for _ in 0..num_tagged_fields {
561 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
562 let size: u32 = types::UnsignedVarInt.decode(buf)?;
563 let unknown_value = buf.try_get_bytes(size as usize)?;
564 unknown_tagged_fields.insert(tag as i32, unknown_value);
565 }
566 }
567 Ok(Self {
568 node_id,
569 host,
570 port,
571 rack,
572 unknown_tagged_fields,
573 })
574 }
575}
576
577impl Default for NodeEndpoint {
578 fn default() -> Self {
579 Self {
580 node_id: (0).into(),
581 host: Default::default(),
582 port: 0,
583 rack: None,
584 unknown_tagged_fields: BTreeMap::new(),
585 }
586 }
587}
588
589impl Message for NodeEndpoint {
590 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
591 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
592}
593
594#[non_exhaustive]
596#[derive(Debug, Clone, PartialEq)]
597pub struct PartitionProduceResponse {
598 pub index: i32,
602
603 pub error_code: i16,
607
608 pub base_offset: i64,
612
613 pub log_append_time_ms: i64,
617
618 pub log_start_offset: i64,
622
623 pub record_errors: Vec<BatchIndexAndErrorMessage>,
627
628 pub error_message: Option<StrBytes>,
632
633 pub current_leader: LeaderIdAndEpoch,
637
638 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
640}
641
642impl PartitionProduceResponse {
643 pub fn with_index(mut self, value: i32) -> Self {
649 self.index = value;
650 self
651 }
652 pub fn with_error_code(mut self, value: i16) -> Self {
658 self.error_code = value;
659 self
660 }
661 pub fn with_base_offset(mut self, value: i64) -> Self {
667 self.base_offset = value;
668 self
669 }
670 pub fn with_log_append_time_ms(mut self, value: i64) -> Self {
676 self.log_append_time_ms = value;
677 self
678 }
679 pub fn with_log_start_offset(mut self, value: i64) -> Self {
685 self.log_start_offset = value;
686 self
687 }
688 pub fn with_record_errors(mut self, value: Vec<BatchIndexAndErrorMessage>) -> Self {
694 self.record_errors = value;
695 self
696 }
697 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
703 self.error_message = value;
704 self
705 }
706 pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
712 self.current_leader = value;
713 self
714 }
715 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
717 self.unknown_tagged_fields = value;
718 self
719 }
720 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
722 self.unknown_tagged_fields.insert(key, value);
723 self
724 }
725}
726
727#[cfg(feature = "broker")]
728impl Encodable for PartitionProduceResponse {
729 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
730 if version < 3 || version > 13 {
731 bail!("specified version not supported by this message type");
732 }
733 types::Int32.encode(buf, &self.index)?;
734 types::Int16.encode(buf, &self.error_code)?;
735 types::Int64.encode(buf, &self.base_offset)?;
736 types::Int64.encode(buf, &self.log_append_time_ms)?;
737 if version >= 5 {
738 types::Int64.encode(buf, &self.log_start_offset)?;
739 }
740 if version >= 8 {
741 if version >= 9 {
742 types::CompactArray(types::Struct { version }).encode(buf, &self.record_errors)?;
743 } else {
744 types::Array(types::Struct { version }).encode(buf, &self.record_errors)?;
745 }
746 }
747 if version >= 8 {
748 if version >= 9 {
749 types::CompactString.encode(buf, &self.error_message)?;
750 } else {
751 types::String.encode(buf, &self.error_message)?;
752 }
753 }
754 if version >= 9 {
755 let mut num_tagged_fields = self.unknown_tagged_fields.len();
756 if version >= 10 {
757 if &self.current_leader != &Default::default() {
758 num_tagged_fields += 1;
759 }
760 }
761 if num_tagged_fields > std::u32::MAX as usize {
762 bail!(
763 "Too many tagged fields to encode ({} fields)",
764 num_tagged_fields
765 );
766 }
767 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
768 if version >= 10 {
769 if &self.current_leader != &Default::default() {
770 let computed_size =
771 types::Struct { version }.compute_size(&self.current_leader)?;
772 if computed_size > std::u32::MAX as usize {
773 bail!(
774 "Tagged field is too large to encode ({} bytes)",
775 computed_size
776 );
777 }
778 types::UnsignedVarInt.encode(buf, 0)?;
779 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
780 types::Struct { version }.encode(buf, &self.current_leader)?;
781 }
782 }
783 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
784 }
785 Ok(())
786 }
787 fn compute_size(&self, version: i16) -> Result<usize> {
788 let mut total_size = 0;
789 total_size += types::Int32.compute_size(&self.index)?;
790 total_size += types::Int16.compute_size(&self.error_code)?;
791 total_size += types::Int64.compute_size(&self.base_offset)?;
792 total_size += types::Int64.compute_size(&self.log_append_time_ms)?;
793 if version >= 5 {
794 total_size += types::Int64.compute_size(&self.log_start_offset)?;
795 }
796 if version >= 8 {
797 if version >= 9 {
798 total_size += types::CompactArray(types::Struct { version })
799 .compute_size(&self.record_errors)?;
800 } else {
801 total_size +=
802 types::Array(types::Struct { version }).compute_size(&self.record_errors)?;
803 }
804 }
805 if version >= 8 {
806 if version >= 9 {
807 total_size += types::CompactString.compute_size(&self.error_message)?;
808 } else {
809 total_size += types::String.compute_size(&self.error_message)?;
810 }
811 }
812 if version >= 9 {
813 let mut num_tagged_fields = self.unknown_tagged_fields.len();
814 if version >= 10 {
815 if &self.current_leader != &Default::default() {
816 num_tagged_fields += 1;
817 }
818 }
819 if num_tagged_fields > std::u32::MAX as usize {
820 bail!(
821 "Too many tagged fields to encode ({} fields)",
822 num_tagged_fields
823 );
824 }
825 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
826 if version >= 10 {
827 if &self.current_leader != &Default::default() {
828 let computed_size =
829 types::Struct { version }.compute_size(&self.current_leader)?;
830 if computed_size > std::u32::MAX as usize {
831 bail!(
832 "Tagged field is too large to encode ({} bytes)",
833 computed_size
834 );
835 }
836 total_size += types::UnsignedVarInt.compute_size(0)?;
837 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
838 total_size += computed_size;
839 }
840 }
841 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
842 }
843 Ok(total_size)
844 }
845}
846
847#[cfg(feature = "client")]
848impl Decodable for PartitionProduceResponse {
849 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
850 if version < 3 || version > 13 {
851 bail!("specified version not supported by this message type");
852 }
853 let index = types::Int32.decode(buf)?;
854 let error_code = types::Int16.decode(buf)?;
855 let base_offset = types::Int64.decode(buf)?;
856 let log_append_time_ms = types::Int64.decode(buf)?;
857 let log_start_offset = if version >= 5 {
858 types::Int64.decode(buf)?
859 } else {
860 -1
861 };
862 let record_errors = if version >= 8 {
863 if version >= 9 {
864 types::CompactArray(types::Struct { version }).decode(buf)?
865 } else {
866 types::Array(types::Struct { version }).decode(buf)?
867 }
868 } else {
869 Default::default()
870 };
871 let error_message = if version >= 8 {
872 if version >= 9 {
873 types::CompactString.decode(buf)?
874 } else {
875 types::String.decode(buf)?
876 }
877 } else {
878 None
879 };
880 let mut current_leader = Default::default();
881 let mut unknown_tagged_fields = BTreeMap::new();
882 if version >= 9 {
883 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
884 for _ in 0..num_tagged_fields {
885 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
886 let size: u32 = types::UnsignedVarInt.decode(buf)?;
887 match tag {
888 0 => {
889 if version >= 10 {
890 current_leader = types::Struct { version }.decode(buf)?;
891 } else {
892 bail!("Tag {} is not valid for version {}", tag, version);
893 }
894 }
895 _ => {
896 let unknown_value = buf.try_get_bytes(size as usize)?;
897 unknown_tagged_fields.insert(tag as i32, unknown_value);
898 }
899 }
900 }
901 }
902 Ok(Self {
903 index,
904 error_code,
905 base_offset,
906 log_append_time_ms,
907 log_start_offset,
908 record_errors,
909 error_message,
910 current_leader,
911 unknown_tagged_fields,
912 })
913 }
914}
915
916impl Default for PartitionProduceResponse {
917 fn default() -> Self {
918 Self {
919 index: 0,
920 error_code: 0,
921 base_offset: 0,
922 log_append_time_ms: -1,
923 log_start_offset: -1,
924 record_errors: Default::default(),
925 error_message: None,
926 current_leader: Default::default(),
927 unknown_tagged_fields: BTreeMap::new(),
928 }
929 }
930}
931
932impl Message for PartitionProduceResponse {
933 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
934 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
935}
936
937#[non_exhaustive]
939#[derive(Debug, Clone, PartialEq)]
940pub struct ProduceResponse {
941 pub responses: Vec<TopicProduceResponse>,
945
946 pub throttle_time_ms: i32,
950
951 pub node_endpoints: Vec<NodeEndpoint>,
955
956 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
958}
959
960impl ProduceResponse {
961 pub fn with_responses(mut self, value: Vec<TopicProduceResponse>) -> Self {
967 self.responses = value;
968 self
969 }
970 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
976 self.throttle_time_ms = value;
977 self
978 }
979 pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
985 self.node_endpoints = value;
986 self
987 }
988 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
990 self.unknown_tagged_fields = value;
991 self
992 }
993 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
995 self.unknown_tagged_fields.insert(key, value);
996 self
997 }
998}
999
1000#[cfg(feature = "broker")]
1001impl Encodable for ProduceResponse {
1002 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1003 if version < 3 || version > 13 {
1004 bail!("specified version not supported by this message type");
1005 }
1006 if version >= 9 {
1007 types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
1008 } else {
1009 types::Array(types::Struct { version }).encode(buf, &self.responses)?;
1010 }
1011 types::Int32.encode(buf, &self.throttle_time_ms)?;
1012 if version >= 9 {
1013 let mut num_tagged_fields = self.unknown_tagged_fields.len();
1014 if version >= 10 {
1015 if !self.node_endpoints.is_empty() {
1016 num_tagged_fields += 1;
1017 }
1018 }
1019 if num_tagged_fields > std::u32::MAX as usize {
1020 bail!(
1021 "Too many tagged fields to encode ({} fields)",
1022 num_tagged_fields
1023 );
1024 }
1025 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1026 if version >= 10 {
1027 if !self.node_endpoints.is_empty() {
1028 let computed_size = types::CompactArray(types::Struct { version })
1029 .compute_size(&self.node_endpoints)?;
1030 if computed_size > std::u32::MAX as usize {
1031 bail!(
1032 "Tagged field is too large to encode ({} bytes)",
1033 computed_size
1034 );
1035 }
1036 types::UnsignedVarInt.encode(buf, 0)?;
1037 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1038 types::CompactArray(types::Struct { version })
1039 .encode(buf, &self.node_endpoints)?;
1040 }
1041 }
1042 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
1043 }
1044 Ok(())
1045 }
1046 fn compute_size(&self, version: i16) -> Result<usize> {
1047 let mut total_size = 0;
1048 if version >= 9 {
1049 total_size +=
1050 types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
1051 } else {
1052 total_size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
1053 }
1054 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
1055 if version >= 9 {
1056 let mut num_tagged_fields = self.unknown_tagged_fields.len();
1057 if version >= 10 {
1058 if !self.node_endpoints.is_empty() {
1059 num_tagged_fields += 1;
1060 }
1061 }
1062 if num_tagged_fields > std::u32::MAX as usize {
1063 bail!(
1064 "Too many tagged fields to encode ({} fields)",
1065 num_tagged_fields
1066 );
1067 }
1068 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1069 if version >= 10 {
1070 if !self.node_endpoints.is_empty() {
1071 let computed_size = types::CompactArray(types::Struct { version })
1072 .compute_size(&self.node_endpoints)?;
1073 if computed_size > std::u32::MAX as usize {
1074 bail!(
1075 "Tagged field is too large to encode ({} bytes)",
1076 computed_size
1077 );
1078 }
1079 total_size += types::UnsignedVarInt.compute_size(0)?;
1080 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1081 total_size += computed_size;
1082 }
1083 }
1084 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1085 }
1086 Ok(total_size)
1087 }
1088}
1089
1090#[cfg(feature = "client")]
1091impl Decodable for ProduceResponse {
1092 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1093 if version < 3 || version > 13 {
1094 bail!("specified version not supported by this message type");
1095 }
1096 let responses = if version >= 9 {
1097 types::CompactArray(types::Struct { version }).decode(buf)?
1098 } else {
1099 types::Array(types::Struct { version }).decode(buf)?
1100 };
1101 let throttle_time_ms = types::Int32.decode(buf)?;
1102 let mut node_endpoints = Default::default();
1103 let mut unknown_tagged_fields = BTreeMap::new();
1104 if version >= 9 {
1105 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1106 for _ in 0..num_tagged_fields {
1107 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1108 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1109 match tag {
1110 0 => {
1111 if version >= 10 {
1112 node_endpoints =
1113 types::CompactArray(types::Struct { version }).decode(buf)?;
1114 } else {
1115 bail!("Tag {} is not valid for version {}", tag, version);
1116 }
1117 }
1118 _ => {
1119 let unknown_value = buf.try_get_bytes(size as usize)?;
1120 unknown_tagged_fields.insert(tag as i32, unknown_value);
1121 }
1122 }
1123 }
1124 }
1125 Ok(Self {
1126 responses,
1127 throttle_time_ms,
1128 node_endpoints,
1129 unknown_tagged_fields,
1130 })
1131 }
1132}
1133
1134impl Default for ProduceResponse {
1135 fn default() -> Self {
1136 Self {
1137 responses: Default::default(),
1138 throttle_time_ms: 0,
1139 node_endpoints: Default::default(),
1140 unknown_tagged_fields: BTreeMap::new(),
1141 }
1142 }
1143}
1144
1145impl Message for ProduceResponse {
1146 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
1147 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1148}
1149
1150#[non_exhaustive]
1152#[derive(Debug, Clone, PartialEq)]
1153pub struct TopicProduceResponse {
1154 pub name: super::TopicName,
1158
1159 pub topic_id: Uuid,
1163
1164 pub partition_responses: Vec<PartitionProduceResponse>,
1168
1169 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1171}
1172
1173impl TopicProduceResponse {
1174 pub fn with_name(mut self, value: super::TopicName) -> Self {
1180 self.name = value;
1181 self
1182 }
1183 pub fn with_topic_id(mut self, value: Uuid) -> Self {
1189 self.topic_id = value;
1190 self
1191 }
1192 pub fn with_partition_responses(mut self, value: Vec<PartitionProduceResponse>) -> Self {
1198 self.partition_responses = value;
1199 self
1200 }
1201 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1203 self.unknown_tagged_fields = value;
1204 self
1205 }
1206 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1208 self.unknown_tagged_fields.insert(key, value);
1209 self
1210 }
1211}
1212
1213#[cfg(feature = "broker")]
1214impl Encodable for TopicProduceResponse {
1215 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1216 if version < 3 || version > 13 {
1217 bail!("specified version not supported by this message type");
1218 }
1219 if version <= 12 {
1220 if version >= 9 {
1221 types::CompactString.encode(buf, &self.name)?;
1222 } else {
1223 types::String.encode(buf, &self.name)?;
1224 }
1225 }
1226 if version >= 13 {
1227 types::Uuid.encode(buf, &self.topic_id)?;
1228 }
1229 if version >= 9 {
1230 types::CompactArray(types::Struct { version })
1231 .encode(buf, &self.partition_responses)?;
1232 } else {
1233 types::Array(types::Struct { version }).encode(buf, &self.partition_responses)?;
1234 }
1235 if version >= 9 {
1236 let num_tagged_fields = self.unknown_tagged_fields.len();
1237 if num_tagged_fields > std::u32::MAX as usize {
1238 bail!(
1239 "Too many tagged fields to encode ({} fields)",
1240 num_tagged_fields
1241 );
1242 }
1243 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1244
1245 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1246 }
1247 Ok(())
1248 }
1249 fn compute_size(&self, version: i16) -> Result<usize> {
1250 let mut total_size = 0;
1251 if version <= 12 {
1252 if version >= 9 {
1253 total_size += types::CompactString.compute_size(&self.name)?;
1254 } else {
1255 total_size += types::String.compute_size(&self.name)?;
1256 }
1257 }
1258 if version >= 13 {
1259 total_size += types::Uuid.compute_size(&self.topic_id)?;
1260 }
1261 if version >= 9 {
1262 total_size += types::CompactArray(types::Struct { version })
1263 .compute_size(&self.partition_responses)?;
1264 } else {
1265 total_size +=
1266 types::Array(types::Struct { version }).compute_size(&self.partition_responses)?;
1267 }
1268 if version >= 9 {
1269 let num_tagged_fields = self.unknown_tagged_fields.len();
1270 if num_tagged_fields > std::u32::MAX as usize {
1271 bail!(
1272 "Too many tagged fields to encode ({} fields)",
1273 num_tagged_fields
1274 );
1275 }
1276 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1277
1278 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1279 }
1280 Ok(total_size)
1281 }
1282}
1283
1284#[cfg(feature = "client")]
1285impl Decodable for TopicProduceResponse {
1286 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1287 if version < 3 || version > 13 {
1288 bail!("specified version not supported by this message type");
1289 }
1290 let name = if version <= 12 {
1291 if version >= 9 {
1292 types::CompactString.decode(buf)?
1293 } else {
1294 types::String.decode(buf)?
1295 }
1296 } else {
1297 Default::default()
1298 };
1299 let topic_id = if version >= 13 {
1300 types::Uuid.decode(buf)?
1301 } else {
1302 Uuid::nil()
1303 };
1304 let partition_responses = if version >= 9 {
1305 types::CompactArray(types::Struct { version }).decode(buf)?
1306 } else {
1307 types::Array(types::Struct { version }).decode(buf)?
1308 };
1309 let mut unknown_tagged_fields = BTreeMap::new();
1310 if version >= 9 {
1311 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1312 for _ in 0..num_tagged_fields {
1313 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1314 let size: u32 = types::UnsignedVarInt.decode(buf)?;
1315 let unknown_value = buf.try_get_bytes(size as usize)?;
1316 unknown_tagged_fields.insert(tag as i32, unknown_value);
1317 }
1318 }
1319 Ok(Self {
1320 name,
1321 topic_id,
1322 partition_responses,
1323 unknown_tagged_fields,
1324 })
1325 }
1326}
1327
1328impl Default for TopicProduceResponse {
1329 fn default() -> Self {
1330 Self {
1331 name: Default::default(),
1332 topic_id: Uuid::nil(),
1333 partition_responses: Default::default(),
1334 unknown_tagged_fields: BTreeMap::new(),
1335 }
1336 }
1337}
1338
1339impl Message for TopicProduceResponse {
1340 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
1341 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1342}
1343
1344impl HeaderVersion for ProduceResponse {
1345 fn header_version(version: i16) -> i16 {
1346 if version >= 9 {
1347 1
1348 } else {
1349 0
1350 }
1351 }
1352}