kafka_protocol/messages/
produce_response.rs

1//! ProduceResponse
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ProduceResponse.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 0-11
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BatchIndexAndErrorMessage {
24    /// The batch index of the record that cause the batch to be dropped
25    ///
26    /// Supported API versions: 8-11
27    pub batch_index: i32,
28
29    /// The error message of the record that caused the batch to be dropped
30    ///
31    /// Supported API versions: 8-11
32    pub batch_index_error_message: Option<StrBytes>,
33
34    /// Other tagged fields
35    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl BatchIndexAndErrorMessage {
39    /// Sets `batch_index` to the passed value.
40    ///
41    /// The batch index of the record that cause the batch to be dropped
42    ///
43    /// Supported API versions: 8-11
44    pub fn with_batch_index(mut self, value: i32) -> Self {
45        self.batch_index = value;
46        self
47    }
48    /// Sets `batch_index_error_message` to the passed value.
49    ///
50    /// The error message of the record that caused the batch to be dropped
51    ///
52    /// Supported API versions: 8-11
53    pub fn with_batch_index_error_message(mut self, value: Option<StrBytes>) -> Self {
54        self.batch_index_error_message = value;
55        self
56    }
57    /// Sets unknown_tagged_fields to the passed value.
58    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59        self.unknown_tagged_fields = value;
60        self
61    }
62    /// Inserts an entry into unknown_tagged_fields.
63    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64        self.unknown_tagged_fields.insert(key, value);
65        self
66    }
67}
68
69#[cfg(feature = "broker")]
70impl Encodable for BatchIndexAndErrorMessage {
71    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72        if version < 0 || version > 11 {
73            bail!("specified version not supported by this message type");
74        }
75        if version >= 8 {
76            types::Int32.encode(buf, &self.batch_index)?;
77        } else {
78            if self.batch_index != 0 {
79                bail!("A field is set that is not available on the selected protocol version");
80            }
81        }
82        if version >= 8 {
83            if version >= 9 {
84                types::CompactString.encode(buf, &self.batch_index_error_message)?;
85            } else {
86                types::String.encode(buf, &self.batch_index_error_message)?;
87            }
88        } else {
89            if !self.batch_index_error_message.is_none() {
90                bail!("A field is set that is not available on the selected protocol version");
91            }
92        }
93        if version >= 9 {
94            let num_tagged_fields = self.unknown_tagged_fields.len();
95            if num_tagged_fields > std::u32::MAX as usize {
96                bail!(
97                    "Too many tagged fields to encode ({} fields)",
98                    num_tagged_fields
99                );
100            }
101            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
102
103            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
104        }
105        Ok(())
106    }
107    fn compute_size(&self, version: i16) -> Result<usize> {
108        let mut total_size = 0;
109        if version >= 8 {
110            total_size += types::Int32.compute_size(&self.batch_index)?;
111        } else {
112            if self.batch_index != 0 {
113                bail!("A field is set that is not available on the selected protocol version");
114            }
115        }
116        if version >= 8 {
117            if version >= 9 {
118                total_size += types::CompactString.compute_size(&self.batch_index_error_message)?;
119            } else {
120                total_size += types::String.compute_size(&self.batch_index_error_message)?;
121            }
122        } else {
123            if !self.batch_index_error_message.is_none() {
124                bail!("A field is set that is not available on the selected protocol version");
125            }
126        }
127        if version >= 9 {
128            let num_tagged_fields = self.unknown_tagged_fields.len();
129            if num_tagged_fields > std::u32::MAX as usize {
130                bail!(
131                    "Too many tagged fields to encode ({} fields)",
132                    num_tagged_fields
133                );
134            }
135            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
136
137            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
138        }
139        Ok(total_size)
140    }
141}
142
143#[cfg(feature = "client")]
144impl Decodable for BatchIndexAndErrorMessage {
145    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
146        if version < 0 || version > 11 {
147            bail!("specified version not supported by this message type");
148        }
149        let batch_index = if version >= 8 {
150            types::Int32.decode(buf)?
151        } else {
152            0
153        };
154        let batch_index_error_message = if version >= 8 {
155            if version >= 9 {
156                types::CompactString.decode(buf)?
157            } else {
158                types::String.decode(buf)?
159            }
160        } else {
161            None
162        };
163        let mut unknown_tagged_fields = BTreeMap::new();
164        if version >= 9 {
165            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
166            for _ in 0..num_tagged_fields {
167                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
168                let size: u32 = types::UnsignedVarInt.decode(buf)?;
169                let unknown_value = buf.try_get_bytes(size as usize)?;
170                unknown_tagged_fields.insert(tag as i32, unknown_value);
171            }
172        }
173        Ok(Self {
174            batch_index,
175            batch_index_error_message,
176            unknown_tagged_fields,
177        })
178    }
179}
180
181impl Default for BatchIndexAndErrorMessage {
182    fn default() -> Self {
183        Self {
184            batch_index: 0,
185            batch_index_error_message: None,
186            unknown_tagged_fields: BTreeMap::new(),
187        }
188    }
189}
190
191impl Message for BatchIndexAndErrorMessage {
192    const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
193    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
194}
195
196/// Valid versions: 0-11
197#[non_exhaustive]
198#[derive(Debug, Clone, PartialEq)]
199pub struct LeaderIdAndEpoch {
200    /// The ID of the current leader or -1 if the leader is unknown.
201    ///
202    /// Supported API versions: 10-11
203    pub leader_id: super::BrokerId,
204
205    /// The latest known leader epoch
206    ///
207    /// Supported API versions: 10-11
208    pub leader_epoch: i32,
209
210    /// Other tagged fields
211    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
212}
213
214impl LeaderIdAndEpoch {
215    /// Sets `leader_id` to the passed value.
216    ///
217    /// The ID of the current leader or -1 if the leader is unknown.
218    ///
219    /// Supported API versions: 10-11
220    pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
221        self.leader_id = value;
222        self
223    }
224    /// Sets `leader_epoch` to the passed value.
225    ///
226    /// The latest known leader epoch
227    ///
228    /// Supported API versions: 10-11
229    pub fn with_leader_epoch(mut self, value: i32) -> Self {
230        self.leader_epoch = value;
231        self
232    }
233    /// Sets unknown_tagged_fields to the passed value.
234    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
235        self.unknown_tagged_fields = value;
236        self
237    }
238    /// Inserts an entry into unknown_tagged_fields.
239    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
240        self.unknown_tagged_fields.insert(key, value);
241        self
242    }
243}
244
245#[cfg(feature = "broker")]
246impl Encodable for LeaderIdAndEpoch {
247    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
248        if version < 0 || version > 11 {
249            bail!("specified version not supported by this message type");
250        }
251        if version >= 10 {
252            types::Int32.encode(buf, &self.leader_id)?;
253        } else {
254            if self.leader_id != -1 {
255                bail!("A field is set that is not available on the selected protocol version");
256            }
257        }
258        if version >= 10 {
259            types::Int32.encode(buf, &self.leader_epoch)?;
260        } else {
261            if self.leader_epoch != -1 {
262                bail!("A field is set that is not available on the selected protocol version");
263            }
264        }
265        if version >= 9 {
266            let num_tagged_fields = self.unknown_tagged_fields.len();
267            if num_tagged_fields > std::u32::MAX as usize {
268                bail!(
269                    "Too many tagged fields to encode ({} fields)",
270                    num_tagged_fields
271                );
272            }
273            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
274
275            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
276        }
277        Ok(())
278    }
279    fn compute_size(&self, version: i16) -> Result<usize> {
280        let mut total_size = 0;
281        if version >= 10 {
282            total_size += types::Int32.compute_size(&self.leader_id)?;
283        } else {
284            if self.leader_id != -1 {
285                bail!("A field is set that is not available on the selected protocol version");
286            }
287        }
288        if version >= 10 {
289            total_size += types::Int32.compute_size(&self.leader_epoch)?;
290        } else {
291            if self.leader_epoch != -1 {
292                bail!("A field is set that is not available on the selected protocol version");
293            }
294        }
295        if version >= 9 {
296            let num_tagged_fields = self.unknown_tagged_fields.len();
297            if num_tagged_fields > std::u32::MAX as usize {
298                bail!(
299                    "Too many tagged fields to encode ({} fields)",
300                    num_tagged_fields
301                );
302            }
303            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
304
305            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
306        }
307        Ok(total_size)
308    }
309}
310
311#[cfg(feature = "client")]
312impl Decodable for LeaderIdAndEpoch {
313    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
314        if version < 0 || version > 11 {
315            bail!("specified version not supported by this message type");
316        }
317        let leader_id = if version >= 10 {
318            types::Int32.decode(buf)?
319        } else {
320            (-1).into()
321        };
322        let leader_epoch = if version >= 10 {
323            types::Int32.decode(buf)?
324        } else {
325            -1
326        };
327        let mut unknown_tagged_fields = BTreeMap::new();
328        if version >= 9 {
329            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
330            for _ in 0..num_tagged_fields {
331                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
332                let size: u32 = types::UnsignedVarInt.decode(buf)?;
333                let unknown_value = buf.try_get_bytes(size as usize)?;
334                unknown_tagged_fields.insert(tag as i32, unknown_value);
335            }
336        }
337        Ok(Self {
338            leader_id,
339            leader_epoch,
340            unknown_tagged_fields,
341        })
342    }
343}
344
345impl Default for LeaderIdAndEpoch {
346    fn default() -> Self {
347        Self {
348            leader_id: (-1).into(),
349            leader_epoch: -1,
350            unknown_tagged_fields: BTreeMap::new(),
351        }
352    }
353}
354
355impl Message for LeaderIdAndEpoch {
356    const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
357    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
358}
359
360/// Valid versions: 0-11
361#[non_exhaustive]
362#[derive(Debug, Clone, PartialEq)]
363pub struct NodeEndpoint {
364    /// The ID of the associated node.
365    ///
366    /// Supported API versions: 10-11
367    pub node_id: super::BrokerId,
368
369    /// The node's hostname.
370    ///
371    /// Supported API versions: 10-11
372    pub host: StrBytes,
373
374    /// The node's port.
375    ///
376    /// Supported API versions: 10-11
377    pub port: i32,
378
379    /// The rack of the node, or null if it has not been assigned to a rack.
380    ///
381    /// Supported API versions: 10-11
382    pub rack: Option<StrBytes>,
383
384    /// Other tagged fields
385    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
386}
387
388impl NodeEndpoint {
389    /// Sets `node_id` to the passed value.
390    ///
391    /// The ID of the associated node.
392    ///
393    /// Supported API versions: 10-11
394    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
395        self.node_id = value;
396        self
397    }
398    /// Sets `host` to the passed value.
399    ///
400    /// The node's hostname.
401    ///
402    /// Supported API versions: 10-11
403    pub fn with_host(mut self, value: StrBytes) -> Self {
404        self.host = value;
405        self
406    }
407    /// Sets `port` to the passed value.
408    ///
409    /// The node's port.
410    ///
411    /// Supported API versions: 10-11
412    pub fn with_port(mut self, value: i32) -> Self {
413        self.port = value;
414        self
415    }
416    /// Sets `rack` to the passed value.
417    ///
418    /// The rack of the node, or null if it has not been assigned to a rack.
419    ///
420    /// Supported API versions: 10-11
421    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
422        self.rack = value;
423        self
424    }
425    /// Sets unknown_tagged_fields to the passed value.
426    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
427        self.unknown_tagged_fields = value;
428        self
429    }
430    /// Inserts an entry into unknown_tagged_fields.
431    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
432        self.unknown_tagged_fields.insert(key, value);
433        self
434    }
435}
436
437#[cfg(feature = "broker")]
438impl Encodable for NodeEndpoint {
439    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
440        if version < 0 || version > 11 {
441            bail!("specified version not supported by this message type");
442        }
443        if version >= 10 {
444            types::Int32.encode(buf, &self.node_id)?;
445        } else {
446            if self.node_id != 0 {
447                bail!("A field is set that is not available on the selected protocol version");
448            }
449        }
450        if version >= 10 {
451            types::CompactString.encode(buf, &self.host)?;
452        } else {
453            if !self.host.is_empty() {
454                bail!("A field is set that is not available on the selected protocol version");
455            }
456        }
457        if version >= 10 {
458            types::Int32.encode(buf, &self.port)?;
459        } else {
460            if self.port != 0 {
461                bail!("A field is set that is not available on the selected protocol version");
462            }
463        }
464        if version >= 10 {
465            types::CompactString.encode(buf, &self.rack)?;
466        } else {
467            if !self.rack.is_none() {
468                bail!("A field is set that is not available on the selected protocol version");
469            }
470        }
471        if version >= 9 {
472            let num_tagged_fields = self.unknown_tagged_fields.len();
473            if num_tagged_fields > std::u32::MAX as usize {
474                bail!(
475                    "Too many tagged fields to encode ({} fields)",
476                    num_tagged_fields
477                );
478            }
479            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
480
481            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
482        }
483        Ok(())
484    }
485    fn compute_size(&self, version: i16) -> Result<usize> {
486        let mut total_size = 0;
487        if version >= 10 {
488            total_size += types::Int32.compute_size(&self.node_id)?;
489        } else {
490            if self.node_id != 0 {
491                bail!("A field is set that is not available on the selected protocol version");
492            }
493        }
494        if version >= 10 {
495            total_size += types::CompactString.compute_size(&self.host)?;
496        } else {
497            if !self.host.is_empty() {
498                bail!("A field is set that is not available on the selected protocol version");
499            }
500        }
501        if version >= 10 {
502            total_size += types::Int32.compute_size(&self.port)?;
503        } else {
504            if self.port != 0 {
505                bail!("A field is set that is not available on the selected protocol version");
506            }
507        }
508        if version >= 10 {
509            total_size += types::CompactString.compute_size(&self.rack)?;
510        } else {
511            if !self.rack.is_none() {
512                bail!("A field is set that is not available on the selected protocol version");
513            }
514        }
515        if version >= 9 {
516            let num_tagged_fields = self.unknown_tagged_fields.len();
517            if num_tagged_fields > std::u32::MAX as usize {
518                bail!(
519                    "Too many tagged fields to encode ({} fields)",
520                    num_tagged_fields
521                );
522            }
523            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
524
525            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
526        }
527        Ok(total_size)
528    }
529}
530
531#[cfg(feature = "client")]
532impl Decodable for NodeEndpoint {
533    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
534        if version < 0 || version > 11 {
535            bail!("specified version not supported by this message type");
536        }
537        let node_id = if version >= 10 {
538            types::Int32.decode(buf)?
539        } else {
540            (0).into()
541        };
542        let host = if version >= 10 {
543            types::CompactString.decode(buf)?
544        } else {
545            Default::default()
546        };
547        let port = if version >= 10 {
548            types::Int32.decode(buf)?
549        } else {
550            0
551        };
552        let rack = if version >= 10 {
553            types::CompactString.decode(buf)?
554        } else {
555            None
556        };
557        let mut unknown_tagged_fields = BTreeMap::new();
558        if version >= 9 {
559            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
560            for _ in 0..num_tagged_fields {
561                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
562                let size: u32 = types::UnsignedVarInt.decode(buf)?;
563                let unknown_value = buf.try_get_bytes(size as usize)?;
564                unknown_tagged_fields.insert(tag as i32, unknown_value);
565            }
566        }
567        Ok(Self {
568            node_id,
569            host,
570            port,
571            rack,
572            unknown_tagged_fields,
573        })
574    }
575}
576
577impl Default for NodeEndpoint {
578    fn default() -> Self {
579        Self {
580            node_id: (0).into(),
581            host: Default::default(),
582            port: 0,
583            rack: None,
584            unknown_tagged_fields: BTreeMap::new(),
585        }
586    }
587}
588
589impl Message for NodeEndpoint {
590    const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
591    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
592}
593
594/// Valid versions: 0-11
595#[non_exhaustive]
596#[derive(Debug, Clone, PartialEq)]
597pub struct PartitionProduceResponse {
598    /// The partition index.
599    ///
600    /// Supported API versions: 0-11
601    pub index: i32,
602
603    /// The error code, or 0 if there was no error.
604    ///
605    /// Supported API versions: 0-11
606    pub error_code: i16,
607
608    /// The base offset.
609    ///
610    /// Supported API versions: 0-11
611    pub base_offset: i64,
612
613    /// The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended.
614    ///
615    /// Supported API versions: 2-11
616    pub log_append_time_ms: i64,
617
618    /// The log start offset.
619    ///
620    /// Supported API versions: 5-11
621    pub log_start_offset: i64,
622
623    /// The batch indices of records that caused the batch to be dropped
624    ///
625    /// Supported API versions: 8-11
626    pub record_errors: Vec<BatchIndexAndErrorMessage>,
627
628    /// The global error message summarizing the common root cause of the records that caused the batch to be dropped
629    ///
630    /// Supported API versions: 8-11
631    pub error_message: Option<StrBytes>,
632
633    ///
634    ///
635    /// Supported API versions: 10-11
636    pub current_leader: LeaderIdAndEpoch,
637
638    /// Other tagged fields
639    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
640}
641
642impl PartitionProduceResponse {
643    /// Sets `index` to the passed value.
644    ///
645    /// The partition index.
646    ///
647    /// Supported API versions: 0-11
648    pub fn with_index(mut self, value: i32) -> Self {
649        self.index = value;
650        self
651    }
652    /// Sets `error_code` to the passed value.
653    ///
654    /// The error code, or 0 if there was no error.
655    ///
656    /// Supported API versions: 0-11
657    pub fn with_error_code(mut self, value: i16) -> Self {
658        self.error_code = value;
659        self
660    }
661    /// Sets `base_offset` to the passed value.
662    ///
663    /// The base offset.
664    ///
665    /// Supported API versions: 0-11
666    pub fn with_base_offset(mut self, value: i64) -> Self {
667        self.base_offset = value;
668        self
669    }
670    /// Sets `log_append_time_ms` to the passed value.
671    ///
672    /// The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended.
673    ///
674    /// Supported API versions: 2-11
675    pub fn with_log_append_time_ms(mut self, value: i64) -> Self {
676        self.log_append_time_ms = value;
677        self
678    }
679    /// Sets `log_start_offset` to the passed value.
680    ///
681    /// The log start offset.
682    ///
683    /// Supported API versions: 5-11
684    pub fn with_log_start_offset(mut self, value: i64) -> Self {
685        self.log_start_offset = value;
686        self
687    }
688    /// Sets `record_errors` to the passed value.
689    ///
690    /// The batch indices of records that caused the batch to be dropped
691    ///
692    /// Supported API versions: 8-11
693    pub fn with_record_errors(mut self, value: Vec<BatchIndexAndErrorMessage>) -> Self {
694        self.record_errors = value;
695        self
696    }
697    /// Sets `error_message` to the passed value.
698    ///
699    /// The global error message summarizing the common root cause of the records that caused the batch to be dropped
700    ///
701    /// Supported API versions: 8-11
702    pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
703        self.error_message = value;
704        self
705    }
706    /// Sets `current_leader` to the passed value.
707    ///
708    ///
709    ///
710    /// Supported API versions: 10-11
711    pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
712        self.current_leader = value;
713        self
714    }
715    /// Sets unknown_tagged_fields to the passed value.
716    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
717        self.unknown_tagged_fields = value;
718        self
719    }
720    /// Inserts an entry into unknown_tagged_fields.
721    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
722        self.unknown_tagged_fields.insert(key, value);
723        self
724    }
725}
726
727#[cfg(feature = "broker")]
728impl Encodable for PartitionProduceResponse {
729    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
730        if version < 0 || version > 11 {
731            bail!("specified version not supported by this message type");
732        }
733        types::Int32.encode(buf, &self.index)?;
734        types::Int16.encode(buf, &self.error_code)?;
735        types::Int64.encode(buf, &self.base_offset)?;
736        if version >= 2 {
737            types::Int64.encode(buf, &self.log_append_time_ms)?;
738        }
739        if version >= 5 {
740            types::Int64.encode(buf, &self.log_start_offset)?;
741        }
742        if version >= 8 {
743            if version >= 9 {
744                types::CompactArray(types::Struct { version }).encode(buf, &self.record_errors)?;
745            } else {
746                types::Array(types::Struct { version }).encode(buf, &self.record_errors)?;
747            }
748        }
749        if version >= 8 {
750            if version >= 9 {
751                types::CompactString.encode(buf, &self.error_message)?;
752            } else {
753                types::String.encode(buf, &self.error_message)?;
754            }
755        }
756        if version >= 9 {
757            let mut num_tagged_fields = self.unknown_tagged_fields.len();
758            if version >= 10 {
759                if &self.current_leader != &Default::default() {
760                    num_tagged_fields += 1;
761                }
762            }
763            if num_tagged_fields > std::u32::MAX as usize {
764                bail!(
765                    "Too many tagged fields to encode ({} fields)",
766                    num_tagged_fields
767                );
768            }
769            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
770            if version >= 10 {
771                if &self.current_leader != &Default::default() {
772                    let computed_size =
773                        types::Struct { version }.compute_size(&self.current_leader)?;
774                    if computed_size > std::u32::MAX as usize {
775                        bail!(
776                            "Tagged field is too large to encode ({} bytes)",
777                            computed_size
778                        );
779                    }
780                    types::UnsignedVarInt.encode(buf, 0)?;
781                    types::UnsignedVarInt.encode(buf, computed_size as u32)?;
782                    types::Struct { version }.encode(buf, &self.current_leader)?;
783                }
784            }
785            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
786        }
787        Ok(())
788    }
789    fn compute_size(&self, version: i16) -> Result<usize> {
790        let mut total_size = 0;
791        total_size += types::Int32.compute_size(&self.index)?;
792        total_size += types::Int16.compute_size(&self.error_code)?;
793        total_size += types::Int64.compute_size(&self.base_offset)?;
794        if version >= 2 {
795            total_size += types::Int64.compute_size(&self.log_append_time_ms)?;
796        }
797        if version >= 5 {
798            total_size += types::Int64.compute_size(&self.log_start_offset)?;
799        }
800        if version >= 8 {
801            if version >= 9 {
802                total_size += types::CompactArray(types::Struct { version })
803                    .compute_size(&self.record_errors)?;
804            } else {
805                total_size +=
806                    types::Array(types::Struct { version }).compute_size(&self.record_errors)?;
807            }
808        }
809        if version >= 8 {
810            if version >= 9 {
811                total_size += types::CompactString.compute_size(&self.error_message)?;
812            } else {
813                total_size += types::String.compute_size(&self.error_message)?;
814            }
815        }
816        if version >= 9 {
817            let mut num_tagged_fields = self.unknown_tagged_fields.len();
818            if version >= 10 {
819                if &self.current_leader != &Default::default() {
820                    num_tagged_fields += 1;
821                }
822            }
823            if num_tagged_fields > std::u32::MAX as usize {
824                bail!(
825                    "Too many tagged fields to encode ({} fields)",
826                    num_tagged_fields
827                );
828            }
829            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
830            if version >= 10 {
831                if &self.current_leader != &Default::default() {
832                    let computed_size =
833                        types::Struct { version }.compute_size(&self.current_leader)?;
834                    if computed_size > std::u32::MAX as usize {
835                        bail!(
836                            "Tagged field is too large to encode ({} bytes)",
837                            computed_size
838                        );
839                    }
840                    total_size += types::UnsignedVarInt.compute_size(0)?;
841                    total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
842                    total_size += computed_size;
843                }
844            }
845            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
846        }
847        Ok(total_size)
848    }
849}
850
851#[cfg(feature = "client")]
852impl Decodable for PartitionProduceResponse {
853    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
854        if version < 0 || version > 11 {
855            bail!("specified version not supported by this message type");
856        }
857        let index = types::Int32.decode(buf)?;
858        let error_code = types::Int16.decode(buf)?;
859        let base_offset = types::Int64.decode(buf)?;
860        let log_append_time_ms = if version >= 2 {
861            types::Int64.decode(buf)?
862        } else {
863            -1
864        };
865        let log_start_offset = if version >= 5 {
866            types::Int64.decode(buf)?
867        } else {
868            -1
869        };
870        let record_errors = if version >= 8 {
871            if version >= 9 {
872                types::CompactArray(types::Struct { version }).decode(buf)?
873            } else {
874                types::Array(types::Struct { version }).decode(buf)?
875            }
876        } else {
877            Default::default()
878        };
879        let error_message = if version >= 8 {
880            if version >= 9 {
881                types::CompactString.decode(buf)?
882            } else {
883                types::String.decode(buf)?
884            }
885        } else {
886            None
887        };
888        let mut current_leader = Default::default();
889        let mut unknown_tagged_fields = BTreeMap::new();
890        if version >= 9 {
891            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
892            for _ in 0..num_tagged_fields {
893                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
894                let size: u32 = types::UnsignedVarInt.decode(buf)?;
895                match tag {
896                    0 => {
897                        if version >= 10 {
898                            current_leader = types::Struct { version }.decode(buf)?;
899                        } else {
900                            bail!("Tag {} is not valid for version {}", tag, version);
901                        }
902                    }
903                    _ => {
904                        let unknown_value = buf.try_get_bytes(size as usize)?;
905                        unknown_tagged_fields.insert(tag as i32, unknown_value);
906                    }
907                }
908            }
909        }
910        Ok(Self {
911            index,
912            error_code,
913            base_offset,
914            log_append_time_ms,
915            log_start_offset,
916            record_errors,
917            error_message,
918            current_leader,
919            unknown_tagged_fields,
920        })
921    }
922}
923
924impl Default for PartitionProduceResponse {
925    fn default() -> Self {
926        Self {
927            index: 0,
928            error_code: 0,
929            base_offset: 0,
930            log_append_time_ms: -1,
931            log_start_offset: -1,
932            record_errors: Default::default(),
933            error_message: None,
934            current_leader: Default::default(),
935            unknown_tagged_fields: BTreeMap::new(),
936        }
937    }
938}
939
940impl Message for PartitionProduceResponse {
941    const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
942    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
943}
944
945/// Valid versions: 0-11
946#[non_exhaustive]
947#[derive(Debug, Clone, PartialEq)]
948pub struct ProduceResponse {
949    /// Each produce response
950    ///
951    /// Supported API versions: 0-11
952    pub responses: Vec<TopicProduceResponse>,
953
954    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
955    ///
956    /// Supported API versions: 1-11
957    pub throttle_time_ms: i32,
958
959    /// Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.
960    ///
961    /// Supported API versions: 10-11
962    pub node_endpoints: Vec<NodeEndpoint>,
963
964    /// Other tagged fields
965    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
966}
967
968impl ProduceResponse {
969    /// Sets `responses` to the passed value.
970    ///
971    /// Each produce response
972    ///
973    /// Supported API versions: 0-11
974    pub fn with_responses(mut self, value: Vec<TopicProduceResponse>) -> Self {
975        self.responses = value;
976        self
977    }
978    /// Sets `throttle_time_ms` to the passed value.
979    ///
980    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
981    ///
982    /// Supported API versions: 1-11
983    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
984        self.throttle_time_ms = value;
985        self
986    }
987    /// Sets `node_endpoints` to the passed value.
988    ///
989    /// Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.
990    ///
991    /// Supported API versions: 10-11
992    pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
993        self.node_endpoints = value;
994        self
995    }
996    /// Sets unknown_tagged_fields to the passed value.
997    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
998        self.unknown_tagged_fields = value;
999        self
1000    }
1001    /// Inserts an entry into unknown_tagged_fields.
1002    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1003        self.unknown_tagged_fields.insert(key, value);
1004        self
1005    }
1006}
1007
1008#[cfg(feature = "broker")]
1009impl Encodable for ProduceResponse {
1010    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1011        if version < 0 || version > 11 {
1012            bail!("specified version not supported by this message type");
1013        }
1014        if version >= 9 {
1015            types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
1016        } else {
1017            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
1018        }
1019        if version >= 1 {
1020            types::Int32.encode(buf, &self.throttle_time_ms)?;
1021        }
1022        if version >= 9 {
1023            let mut num_tagged_fields = self.unknown_tagged_fields.len();
1024            if version >= 10 {
1025                if !self.node_endpoints.is_empty() {
1026                    num_tagged_fields += 1;
1027                }
1028            }
1029            if num_tagged_fields > std::u32::MAX as usize {
1030                bail!(
1031                    "Too many tagged fields to encode ({} fields)",
1032                    num_tagged_fields
1033                );
1034            }
1035            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1036            if version >= 10 {
1037                if !self.node_endpoints.is_empty() {
1038                    let computed_size = types::CompactArray(types::Struct { version })
1039                        .compute_size(&self.node_endpoints)?;
1040                    if computed_size > std::u32::MAX as usize {
1041                        bail!(
1042                            "Tagged field is too large to encode ({} bytes)",
1043                            computed_size
1044                        );
1045                    }
1046                    types::UnsignedVarInt.encode(buf, 0)?;
1047                    types::UnsignedVarInt.encode(buf, computed_size as u32)?;
1048                    types::CompactArray(types::Struct { version })
1049                        .encode(buf, &self.node_endpoints)?;
1050                }
1051            }
1052            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
1053        }
1054        Ok(())
1055    }
1056    fn compute_size(&self, version: i16) -> Result<usize> {
1057        let mut total_size = 0;
1058        if version >= 9 {
1059            total_size +=
1060                types::CompactArray(types::Struct { version }).compute_size(&self.responses)?;
1061        } else {
1062            total_size += types::Array(types::Struct { version }).compute_size(&self.responses)?;
1063        }
1064        if version >= 1 {
1065            total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
1066        }
1067        if version >= 9 {
1068            let mut num_tagged_fields = self.unknown_tagged_fields.len();
1069            if version >= 10 {
1070                if !self.node_endpoints.is_empty() {
1071                    num_tagged_fields += 1;
1072                }
1073            }
1074            if num_tagged_fields > std::u32::MAX as usize {
1075                bail!(
1076                    "Too many tagged fields to encode ({} fields)",
1077                    num_tagged_fields
1078                );
1079            }
1080            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1081            if version >= 10 {
1082                if !self.node_endpoints.is_empty() {
1083                    let computed_size = types::CompactArray(types::Struct { version })
1084                        .compute_size(&self.node_endpoints)?;
1085                    if computed_size > std::u32::MAX as usize {
1086                        bail!(
1087                            "Tagged field is too large to encode ({} bytes)",
1088                            computed_size
1089                        );
1090                    }
1091                    total_size += types::UnsignedVarInt.compute_size(0)?;
1092                    total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
1093                    total_size += computed_size;
1094                }
1095            }
1096            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1097        }
1098        Ok(total_size)
1099    }
1100}
1101
1102#[cfg(feature = "client")]
1103impl Decodable for ProduceResponse {
1104    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1105        if version < 0 || version > 11 {
1106            bail!("specified version not supported by this message type");
1107        }
1108        let responses = if version >= 9 {
1109            types::CompactArray(types::Struct { version }).decode(buf)?
1110        } else {
1111            types::Array(types::Struct { version }).decode(buf)?
1112        };
1113        let throttle_time_ms = if version >= 1 {
1114            types::Int32.decode(buf)?
1115        } else {
1116            0
1117        };
1118        let mut node_endpoints = Default::default();
1119        let mut unknown_tagged_fields = BTreeMap::new();
1120        if version >= 9 {
1121            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1122            for _ in 0..num_tagged_fields {
1123                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1124                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1125                match tag {
1126                    0 => {
1127                        if version >= 10 {
1128                            node_endpoints =
1129                                types::CompactArray(types::Struct { version }).decode(buf)?;
1130                        } else {
1131                            bail!("Tag {} is not valid for version {}", tag, version);
1132                        }
1133                    }
1134                    _ => {
1135                        let unknown_value = buf.try_get_bytes(size as usize)?;
1136                        unknown_tagged_fields.insert(tag as i32, unknown_value);
1137                    }
1138                }
1139            }
1140        }
1141        Ok(Self {
1142            responses,
1143            throttle_time_ms,
1144            node_endpoints,
1145            unknown_tagged_fields,
1146        })
1147    }
1148}
1149
1150impl Default for ProduceResponse {
1151    fn default() -> Self {
1152        Self {
1153            responses: Default::default(),
1154            throttle_time_ms: 0,
1155            node_endpoints: Default::default(),
1156            unknown_tagged_fields: BTreeMap::new(),
1157        }
1158    }
1159}
1160
1161impl Message for ProduceResponse {
1162    const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
1163    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1164}
1165
1166/// Valid versions: 0-11
1167#[non_exhaustive]
1168#[derive(Debug, Clone, PartialEq)]
1169pub struct TopicProduceResponse {
1170    /// The topic name
1171    ///
1172    /// Supported API versions: 0-11
1173    pub name: super::TopicName,
1174
1175    /// Each partition that we produced to within the topic.
1176    ///
1177    /// Supported API versions: 0-11
1178    pub partition_responses: Vec<PartitionProduceResponse>,
1179
1180    /// Other tagged fields
1181    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1182}
1183
1184impl TopicProduceResponse {
1185    /// Sets `name` to the passed value.
1186    ///
1187    /// The topic name
1188    ///
1189    /// Supported API versions: 0-11
1190    pub fn with_name(mut self, value: super::TopicName) -> Self {
1191        self.name = value;
1192        self
1193    }
1194    /// Sets `partition_responses` to the passed value.
1195    ///
1196    /// Each partition that we produced to within the topic.
1197    ///
1198    /// Supported API versions: 0-11
1199    pub fn with_partition_responses(mut self, value: Vec<PartitionProduceResponse>) -> Self {
1200        self.partition_responses = value;
1201        self
1202    }
1203    /// Sets unknown_tagged_fields to the passed value.
1204    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1205        self.unknown_tagged_fields = value;
1206        self
1207    }
1208    /// Inserts an entry into unknown_tagged_fields.
1209    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1210        self.unknown_tagged_fields.insert(key, value);
1211        self
1212    }
1213}
1214
1215#[cfg(feature = "broker")]
1216impl Encodable for TopicProduceResponse {
1217    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
1218        if version < 0 || version > 11 {
1219            bail!("specified version not supported by this message type");
1220        }
1221        if version >= 9 {
1222            types::CompactString.encode(buf, &self.name)?;
1223        } else {
1224            types::String.encode(buf, &self.name)?;
1225        }
1226        if version >= 9 {
1227            types::CompactArray(types::Struct { version })
1228                .encode(buf, &self.partition_responses)?;
1229        } else {
1230            types::Array(types::Struct { version }).encode(buf, &self.partition_responses)?;
1231        }
1232        if version >= 9 {
1233            let num_tagged_fields = self.unknown_tagged_fields.len();
1234            if num_tagged_fields > std::u32::MAX as usize {
1235                bail!(
1236                    "Too many tagged fields to encode ({} fields)",
1237                    num_tagged_fields
1238                );
1239            }
1240            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
1241
1242            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
1243        }
1244        Ok(())
1245    }
1246    fn compute_size(&self, version: i16) -> Result<usize> {
1247        let mut total_size = 0;
1248        if version >= 9 {
1249            total_size += types::CompactString.compute_size(&self.name)?;
1250        } else {
1251            total_size += types::String.compute_size(&self.name)?;
1252        }
1253        if version >= 9 {
1254            total_size += types::CompactArray(types::Struct { version })
1255                .compute_size(&self.partition_responses)?;
1256        } else {
1257            total_size +=
1258                types::Array(types::Struct { version }).compute_size(&self.partition_responses)?;
1259        }
1260        if version >= 9 {
1261            let num_tagged_fields = self.unknown_tagged_fields.len();
1262            if num_tagged_fields > std::u32::MAX as usize {
1263                bail!(
1264                    "Too many tagged fields to encode ({} fields)",
1265                    num_tagged_fields
1266                );
1267            }
1268            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
1269
1270            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
1271        }
1272        Ok(total_size)
1273    }
1274}
1275
1276#[cfg(feature = "client")]
1277impl Decodable for TopicProduceResponse {
1278    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
1279        if version < 0 || version > 11 {
1280            bail!("specified version not supported by this message type");
1281        }
1282        let name = if version >= 9 {
1283            types::CompactString.decode(buf)?
1284        } else {
1285            types::String.decode(buf)?
1286        };
1287        let partition_responses = if version >= 9 {
1288            types::CompactArray(types::Struct { version }).decode(buf)?
1289        } else {
1290            types::Array(types::Struct { version }).decode(buf)?
1291        };
1292        let mut unknown_tagged_fields = BTreeMap::new();
1293        if version >= 9 {
1294            let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
1295            for _ in 0..num_tagged_fields {
1296                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
1297                let size: u32 = types::UnsignedVarInt.decode(buf)?;
1298                let unknown_value = buf.try_get_bytes(size as usize)?;
1299                unknown_tagged_fields.insert(tag as i32, unknown_value);
1300            }
1301        }
1302        Ok(Self {
1303            name,
1304            partition_responses,
1305            unknown_tagged_fields,
1306        })
1307    }
1308}
1309
1310impl Default for TopicProduceResponse {
1311    fn default() -> Self {
1312        Self {
1313            name: Default::default(),
1314            partition_responses: Default::default(),
1315            unknown_tagged_fields: BTreeMap::new(),
1316        }
1317    }
1318}
1319
1320impl Message for TopicProduceResponse {
1321    const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
1322    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1323}
1324
1325impl HeaderVersion for ProduceResponse {
1326    fn header_version(version: i16) -> i16 {
1327        if version >= 9 {
1328            1
1329        } else {
1330            0
1331        }
1332    }
1333}