kafka_protocol/messages/
produce_response.rs

1//! ProduceResponse
2//!
3//! See the schema for this message [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ProduceResponse.json).
4// WARNING: the items of this module are generated and should not be edited directly
5#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15    buf::{ByteBuf, ByteBufMut},
16    compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17    Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20/// Valid versions: 3-13
21#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct BatchIndexAndErrorMessage {
24    /// The batch index of the record that caused the batch to be dropped.
25    ///
26    /// Supported API versions: 8-13
27    pub batch_index: i32,
28
29    /// The error message of the record that caused the batch to be dropped.
30    ///
31    /// Supported API versions: 8-13
32    pub batch_index_error_message: Option<StrBytes>,
33
34    /// Other tagged fields
35    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl BatchIndexAndErrorMessage {
39    /// Sets `batch_index` to the passed value.
40    ///
41    /// The batch index of the record that caused the batch to be dropped.
42    ///
43    /// Supported API versions: 8-13
44    pub fn with_batch_index(mut self, value: i32) -> Self {
45        self.batch_index = value;
46        self
47    }
48    /// Sets `batch_index_error_message` to the passed value.
49    ///
50    /// The error message of the record that caused the batch to be dropped.
51    ///
52    /// Supported API versions: 8-13
53    pub fn with_batch_index_error_message(mut self, value: Option<StrBytes>) -> Self {
54        self.batch_index_error_message = value;
55        self
56    }
57    /// Sets unknown_tagged_fields to the passed value.
58    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59        self.unknown_tagged_fields = value;
60        self
61    }
62    /// Inserts an entry into unknown_tagged_fields.
63    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64        self.unknown_tagged_fields.insert(key, value);
65        self
66    }
67}
68
#[cfg(feature = "broker")]
impl Encodable for BatchIndexAndErrorMessage {
    /// Writes this entry to `buf` using the layout selected by `version`.
    ///
    /// Returns an error when `version` is outside 3-13, when `version` is below 8 and
    /// either field differs from its default, or when one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 8 {
            types::Int32.encode(buf, &self.batch_index)?;
            // The compact string encoding is used from version 9 onwards.
            if version >= 9 {
                types::CompactString.encode(buf, &self.batch_index_error_message)?;
            } else {
                types::String.encode(buf, &self.batch_index_error_message)?;
            }
        } else {
            // Neither field is written before version 8, so both must still be default.
            if self.batch_index != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.batch_index_error_message.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }
        if version >= 9 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the sizes reported by the field encoders that `encode` uses for `version`.
    ///
    /// Unlike `encode`, this does not range-check `version`; it still rejects fields that
    /// are set but unavailable below version 8.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 8 {
            size += types::Int32.compute_size(&self.batch_index)?;
            size += if version >= 9 {
                types::CompactString.compute_size(&self.batch_index_error_message)?
            } else {
                types::String.compute_size(&self.batch_index_error_message)?
            };
        } else {
            if self.batch_index != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.batch_index_error_message.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }
        if version >= 9 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
142
#[cfg(feature = "client")]
impl Decodable for BatchIndexAndErrorMessage {
    /// Reads an entry from `buf` using the layout selected by `version`.
    ///
    /// Fields that are not present for `version` keep their defaults (`0` / `None`).
    /// Returns an error when `version` is outside 3-13 or when a field decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let (batch_index, batch_index_error_message) = if version >= 8 {
            let index = types::Int32.decode(buf)?;
            // The compact string encoding is used from version 9 onwards.
            let message = if version >= 9 {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            };
            (index, message)
        } else {
            (0, None)
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 9 {
            // Each tagged field is read as: tag, byte length, then that many raw bytes.
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            batch_index,
            batch_index_error_message,
            unknown_tagged_fields,
        })
    }
}
180
181impl Default for BatchIndexAndErrorMessage {
182    fn default() -> Self {
183        Self {
184            batch_index: 0,
185            batch_index_error_message: None,
186            unknown_tagged_fields: BTreeMap::new(),
187        }
188    }
189}
190
191impl Message for BatchIndexAndErrorMessage {
192    const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
193    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
194}
195
196/// Valid versions: 3-13
197#[non_exhaustive]
198#[derive(Debug, Clone, PartialEq)]
199pub struct LeaderIdAndEpoch {
200    /// The ID of the current leader or -1 if the leader is unknown.
201    ///
202    /// Supported API versions: 10-13
203    pub leader_id: super::BrokerId,
204
205    /// The latest known leader epoch.
206    ///
207    /// Supported API versions: 10-13
208    pub leader_epoch: i32,
209
210    /// Other tagged fields
211    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
212}
213
214impl LeaderIdAndEpoch {
215    /// Sets `leader_id` to the passed value.
216    ///
217    /// The ID of the current leader or -1 if the leader is unknown.
218    ///
219    /// Supported API versions: 10-13
220    pub fn with_leader_id(mut self, value: super::BrokerId) -> Self {
221        self.leader_id = value;
222        self
223    }
224    /// Sets `leader_epoch` to the passed value.
225    ///
226    /// The latest known leader epoch.
227    ///
228    /// Supported API versions: 10-13
229    pub fn with_leader_epoch(mut self, value: i32) -> Self {
230        self.leader_epoch = value;
231        self
232    }
233    /// Sets unknown_tagged_fields to the passed value.
234    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
235        self.unknown_tagged_fields = value;
236        self
237    }
238    /// Inserts an entry into unknown_tagged_fields.
239    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
240        self.unknown_tagged_fields.insert(key, value);
241        self
242    }
243}
244
#[cfg(feature = "broker")]
impl Encodable for LeaderIdAndEpoch {
    /// Writes this struct to `buf` using the layout selected by `version`.
    ///
    /// Returns an error when `version` is outside 3-13, when `version` is below 10 and
    /// either field differs from its `-1` default, or when one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 10 {
            types::Int32.encode(buf, &self.leader_id)?;
            types::Int32.encode(buf, &self.leader_epoch)?;
        } else {
            // Neither field is written before version 10, so both must still be -1.
            if self.leader_id != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }
        if version >= 9 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the sizes reported by the field encoders that `encode` uses for `version`.
    ///
    /// Unlike `encode`, this does not range-check `version`; it still rejects fields that
    /// are set but unavailable below version 10.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 10 {
            size += types::Int32.compute_size(&self.leader_id)?;
            size += types::Int32.compute_size(&self.leader_epoch)?;
        } else {
            if self.leader_id != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.leader_epoch != -1 {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }
        if version >= 9 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
310
#[cfg(feature = "client")]
impl Decodable for LeaderIdAndEpoch {
    /// Reads this struct from `buf` using the layout selected by `version`.
    ///
    /// Below version 10 both fields are left at `-1`. Returns an error when `version` is
    /// outside 3-13 or when a field decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let (leader_id, leader_epoch) = if version >= 10 {
            let id = types::Int32.decode(buf)?;
            let epoch = types::Int32.decode(buf)?;
            (id, epoch)
        } else {
            ((-1).into(), -1)
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 9 {
            // Each tagged field is read as: tag, byte length, then that many raw bytes.
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            leader_id,
            leader_epoch,
            unknown_tagged_fields,
        })
    }
}
344
345impl Default for LeaderIdAndEpoch {
346    fn default() -> Self {
347        Self {
348            leader_id: (-1).into(),
349            leader_epoch: -1,
350            unknown_tagged_fields: BTreeMap::new(),
351        }
352    }
353}
354
355impl Message for LeaderIdAndEpoch {
356    const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
357    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
358}
359
360/// Valid versions: 3-13
361#[non_exhaustive]
362#[derive(Debug, Clone, PartialEq)]
363pub struct NodeEndpoint {
364    /// The ID of the associated node.
365    ///
366    /// Supported API versions: 10-13
367    pub node_id: super::BrokerId,
368
369    /// The node's hostname.
370    ///
371    /// Supported API versions: 10-13
372    pub host: StrBytes,
373
374    /// The node's port.
375    ///
376    /// Supported API versions: 10-13
377    pub port: i32,
378
379    /// The rack of the node, or null if it has not been assigned to a rack.
380    ///
381    /// Supported API versions: 10-13
382    pub rack: Option<StrBytes>,
383
384    /// Other tagged fields
385    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
386}
387
388impl NodeEndpoint {
389    /// Sets `node_id` to the passed value.
390    ///
391    /// The ID of the associated node.
392    ///
393    /// Supported API versions: 10-13
394    pub fn with_node_id(mut self, value: super::BrokerId) -> Self {
395        self.node_id = value;
396        self
397    }
398    /// Sets `host` to the passed value.
399    ///
400    /// The node's hostname.
401    ///
402    /// Supported API versions: 10-13
403    pub fn with_host(mut self, value: StrBytes) -> Self {
404        self.host = value;
405        self
406    }
407    /// Sets `port` to the passed value.
408    ///
409    /// The node's port.
410    ///
411    /// Supported API versions: 10-13
412    pub fn with_port(mut self, value: i32) -> Self {
413        self.port = value;
414        self
415    }
416    /// Sets `rack` to the passed value.
417    ///
418    /// The rack of the node, or null if it has not been assigned to a rack.
419    ///
420    /// Supported API versions: 10-13
421    pub fn with_rack(mut self, value: Option<StrBytes>) -> Self {
422        self.rack = value;
423        self
424    }
425    /// Sets unknown_tagged_fields to the passed value.
426    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
427        self.unknown_tagged_fields = value;
428        self
429    }
430    /// Inserts an entry into unknown_tagged_fields.
431    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
432        self.unknown_tagged_fields.insert(key, value);
433        self
434    }
435}
436
#[cfg(feature = "broker")]
impl Encodable for NodeEndpoint {
    /// Writes this endpoint to `buf` using the layout selected by `version`.
    ///
    /// Returns an error when `version` is outside 3-13, when `version` is below 10 and
    /// any field differs from its default, or when one of the field encoders fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        if version >= 10 {
            types::Int32.encode(buf, &self.node_id)?;
            types::CompactString.encode(buf, &self.host)?;
            types::Int32.encode(buf, &self.port)?;
            types::CompactString.encode(buf, &self.rack)?;
        } else {
            // None of the fields are written before version 10; all must still be default.
            if self.node_id != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if !self.host.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.rack.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }
        if version >= 9 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the sizes reported by the field encoders that `encode` uses for `version`.
    ///
    /// Unlike `encode`, this does not range-check `version`; it still rejects fields that
    /// are set but unavailable below version 10.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        if version >= 10 {
            size += types::Int32.compute_size(&self.node_id)?;
            size += types::CompactString.compute_size(&self.host)?;
            size += types::Int32.compute_size(&self.port)?;
            size += types::CompactString.compute_size(&self.rack)?;
        } else {
            if self.node_id != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if !self.host.is_empty() {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.port != 0 {
                bail!("A field is set that is not available on the selected protocol version");
            }
            if self.rack.is_some() {
                bail!("A field is set that is not available on the selected protocol version");
            }
        }
        if version >= 9 {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
530
#[cfg(feature = "client")]
impl Decodable for NodeEndpoint {
    /// Reads an endpoint from `buf` using the layout selected by `version`.
    ///
    /// Below version 10 every field is left at its default. Returns an error when
    /// `version` is outside 3-13 or when a field decoder fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let (node_id, host, port, rack) = if version >= 10 {
            let id = types::Int32.decode(buf)?;
            let hostname = types::CompactString.decode(buf)?;
            let port_number = types::Int32.decode(buf)?;
            let rack_name = types::CompactString.decode(buf)?;
            (id, hostname, port_number, rack_name)
        } else {
            ((0).into(), Default::default(), 0, None)
        };
        let mut unknown_tagged_fields = BTreeMap::new();
        if version >= 9 {
            // Each tagged field is read as: tag, byte length, then that many raw bytes.
            let field_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(Self {
            node_id,
            host,
            port,
            rack,
            unknown_tagged_fields,
        })
    }
}
576
577impl Default for NodeEndpoint {
578    fn default() -> Self {
579        Self {
580            node_id: (0).into(),
581            host: Default::default(),
582            port: 0,
583            rack: None,
584            unknown_tagged_fields: BTreeMap::new(),
585        }
586    }
587}
588
589impl Message for NodeEndpoint {
590    const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
591    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
592}
593
594/// Valid versions: 3-13
595#[non_exhaustive]
596#[derive(Debug, Clone, PartialEq)]
597pub struct PartitionProduceResponse {
598    /// The partition index.
599    ///
600    /// Supported API versions: 3-13
601    pub index: i32,
602
603    /// The error code, or 0 if there was no error.
604    ///
605    /// Supported API versions: 3-13
606    pub error_code: i16,
607
608    /// The base offset.
609    ///
610    /// Supported API versions: 3-13
611    pub base_offset: i64,
612
613    /// The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended.
614    ///
615    /// Supported API versions: 3-13
616    pub log_append_time_ms: i64,
617
618    /// The log start offset.
619    ///
620    /// Supported API versions: 5-13
621    pub log_start_offset: i64,
622
623    /// The batch indices of records that caused the batch to be dropped.
624    ///
625    /// Supported API versions: 8-13
626    pub record_errors: Vec<BatchIndexAndErrorMessage>,
627
628    /// The global error message summarizing the common root cause of the records that caused the batch to be dropped.
629    ///
630    /// Supported API versions: 8-13
631    pub error_message: Option<StrBytes>,
632
633    /// The leader broker that the producer should use for future requests.
634    ///
635    /// Supported API versions: 10-13
636    pub current_leader: LeaderIdAndEpoch,
637
638    /// Other tagged fields
639    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
640}
641
642impl PartitionProduceResponse {
643    /// Sets `index` to the passed value.
644    ///
645    /// The partition index.
646    ///
647    /// Supported API versions: 3-13
648    pub fn with_index(mut self, value: i32) -> Self {
649        self.index = value;
650        self
651    }
652    /// Sets `error_code` to the passed value.
653    ///
654    /// The error code, or 0 if there was no error.
655    ///
656    /// Supported API versions: 3-13
657    pub fn with_error_code(mut self, value: i16) -> Self {
658        self.error_code = value;
659        self
660    }
661    /// Sets `base_offset` to the passed value.
662    ///
663    /// The base offset.
664    ///
665    /// Supported API versions: 3-13
666    pub fn with_base_offset(mut self, value: i64) -> Self {
667        self.base_offset = value;
668        self
669    }
670    /// Sets `log_append_time_ms` to the passed value.
671    ///
672    /// The timestamp returned by broker after appending the messages. If CreateTime is used for the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp will be the broker local time when the messages are appended.
673    ///
674    /// Supported API versions: 3-13
675    pub fn with_log_append_time_ms(mut self, value: i64) -> Self {
676        self.log_append_time_ms = value;
677        self
678    }
679    /// Sets `log_start_offset` to the passed value.
680    ///
681    /// The log start offset.
682    ///
683    /// Supported API versions: 5-13
684    pub fn with_log_start_offset(mut self, value: i64) -> Self {
685        self.log_start_offset = value;
686        self
687    }
688    /// Sets `record_errors` to the passed value.
689    ///
690    /// The batch indices of records that caused the batch to be dropped.
691    ///
692    /// Supported API versions: 8-13
693    pub fn with_record_errors(mut self, value: Vec<BatchIndexAndErrorMessage>) -> Self {
694        self.record_errors = value;
695        self
696    }
697    /// Sets `error_message` to the passed value.
698    ///
699    /// The global error message summarizing the common root cause of the records that caused the batch to be dropped.
700    ///
701    /// Supported API versions: 8-13
702    pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
703        self.error_message = value;
704        self
705    }
706    /// Sets `current_leader` to the passed value.
707    ///
708    /// The leader broker that the producer should use for future requests.
709    ///
710    /// Supported API versions: 10-13
711    pub fn with_current_leader(mut self, value: LeaderIdAndEpoch) -> Self {
712        self.current_leader = value;
713        self
714    }
715    /// Sets unknown_tagged_fields to the passed value.
716    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
717        self.unknown_tagged_fields = value;
718        self
719    }
720    /// Inserts an entry into unknown_tagged_fields.
721    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
722        self.unknown_tagged_fields.insert(key, value);
723        self
724    }
725}
726
#[cfg(feature = "broker")]
impl Encodable for PartitionProduceResponse {
    /// Writes this partition response to `buf` using the layout selected by `version`.
    ///
    /// `log_start_offset` is written from version 5, `record_errors` and `error_message`
    /// from version 8, and the tagged-field section from version 9. From version 10 a
    /// non-default `current_leader` is written as tagged field 0.
    ///
    /// Returns an error when `version` is outside 3-13 or when a field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        types::Int32.encode(buf, &self.index)?;
        types::Int16.encode(buf, &self.error_code)?;
        types::Int64.encode(buf, &self.base_offset)?;
        types::Int64.encode(buf, &self.log_append_time_ms)?;
        if version >= 5 {
            types::Int64.encode(buf, &self.log_start_offset)?;
        }
        // Version 9 onwards uses the compact array/string encodings; version 8 the plain ones.
        if version >= 9 {
            types::CompactArray(types::Struct { version }).encode(buf, &self.record_errors)?;
            types::CompactString.encode(buf, &self.error_message)?;
        } else if version >= 8 {
            types::Array(types::Struct { version }).encode(buf, &self.record_errors)?;
            types::String.encode(buf, &self.error_message)?;
        }
        if version >= 9 {
            // `current_leader` only occupies a tagged slot when it differs from its default.
            let leader_is_tagged = version >= 10 && self.current_leader != Default::default();
            let mut tagged_count = self.unknown_tagged_fields.len();
            if leader_is_tagged {
                tagged_count += 1;
            }
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            if leader_is_tagged {
                let leader_size = types::Struct { version }.compute_size(&self.current_leader)?;
                if leader_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        leader_size
                    );
                }
                // Tag number 0, then the payload length, then the payload itself.
                types::UnsignedVarInt.encode(buf, 0)?;
                types::UnsignedVarInt.encode(buf, leader_size as u32)?;
                types::Struct { version }.encode(buf, &self.current_leader)?;
            }
            write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        }
        Ok(())
    }

    /// Adds up the sizes reported by the field encoders that `encode` uses for `version`.
    ///
    /// Unlike `encode`, this does not range-check `version`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;
        size += types::Int32.compute_size(&self.index)?;
        size += types::Int16.compute_size(&self.error_code)?;
        size += types::Int64.compute_size(&self.base_offset)?;
        size += types::Int64.compute_size(&self.log_append_time_ms)?;
        if version >= 5 {
            size += types::Int64.compute_size(&self.log_start_offset)?;
        }
        if version >= 9 {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.record_errors)?;
            size += types::CompactString.compute_size(&self.error_message)?;
        } else if version >= 8 {
            size += types::Array(types::Struct { version }).compute_size(&self.record_errors)?;
            size += types::String.compute_size(&self.error_message)?;
        }
        if version >= 9 {
            // `current_leader` only occupies a tagged slot when it differs from its default.
            let leader_is_tagged = version >= 10 && self.current_leader != Default::default();
            let mut tagged_count = self.unknown_tagged_fields.len();
            if leader_is_tagged {
                tagged_count += 1;
            }
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            if leader_is_tagged {
                let leader_size = types::Struct { version }.compute_size(&self.current_leader)?;
                if leader_size > u32::MAX as usize {
                    bail!(
                        "Tagged field is too large to encode ({} bytes)",
                        leader_size
                    );
                }
                // Tag number, payload length, then the payload bytes themselves.
                size += types::UnsignedVarInt.compute_size(0)?;
                size += types::UnsignedVarInt.compute_size(leader_size as u32)?;
                size += leader_size;
            }
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
846
#[cfg(feature = "client")]
impl Decodable for PartitionProduceResponse {
    /// Reads one `PartitionProduceResponse` from `buf` using the layout of `version`.
    ///
    /// Fields the requested version does not carry keep their fallback values: `-1` for
    /// `log_start_offset`, `None` for `error_message`, and `Default::default()` for
    /// `record_errors` and `current_leader`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3-13, when any read from `buf` fails, or when
    /// tag 0 is encountered at a version below 10.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 9 and later use the compact encodings and carry a tagged-field section.
        let flexible = version >= 9;

        let index = types::Int32.decode(buf)?;
        let error_code = types::Int16.decode(buf)?;
        let base_offset = types::Int64.decode(buf)?;
        let log_append_time_ms = types::Int64.decode(buf)?;
        let log_start_offset = if version < 5 {
            -1
        } else {
            types::Int64.decode(buf)?
        };
        let record_errors = if version < 8 {
            Default::default()
        } else if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let error_message = if version < 8 {
            None
        } else if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        let mut current_leader = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                if tag == 0 {
                    // Tag 0 is `current_leader`; the declared size is read but the struct
                    // decoder decides how many bytes it actually consumes.
                    if version < 10 {
                        bail!("Tag {} is not valid for version {}", tag, version);
                    }
                    current_leader = types::Struct { version }.decode(buf)?;
                } else {
                    // Unrecognised tags are kept as raw bytes, keyed by tag number.
                    let raw_value = buf.try_get_bytes(size as usize)?;
                    unknown_tagged_fields.insert(tag as i32, raw_value);
                }
            }
        }

        Ok(Self {
            index,
            error_code,
            base_offset,
            log_append_time_ms,
            log_start_offset,
            record_errors,
            error_message,
            current_leader,
            unknown_tagged_fields,
        })
    }
}
915
916impl Default for PartitionProduceResponse {
917    fn default() -> Self {
918        Self {
919            index: 0,
920            error_code: 0,
921            base_offset: 0,
922            log_append_time_ms: -1,
923            log_start_offset: -1,
924            record_errors: Default::default(),
925            error_message: None,
926            current_leader: Default::default(),
927            unknown_tagged_fields: BTreeMap::new(),
928        }
929    }
930}
931
932impl Message for PartitionProduceResponse {
933    const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
934    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
935}
936
937/// Valid versions: 3-13
938#[non_exhaustive]
939#[derive(Debug, Clone, PartialEq)]
940pub struct ProduceResponse {
941    /// Each produce response.
942    ///
943    /// Supported API versions: 3-13
944    pub responses: Vec<TopicProduceResponse>,
945
946    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
947    ///
948    /// Supported API versions: 3-13
949    pub throttle_time_ms: i32,
950
951    /// Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.
952    ///
953    /// Supported API versions: 10-13
954    pub node_endpoints: Vec<NodeEndpoint>,
955
956    /// Other tagged fields
957    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
958}
959
960impl ProduceResponse {
961    /// Sets `responses` to the passed value.
962    ///
963    /// Each produce response.
964    ///
965    /// Supported API versions: 3-13
966    pub fn with_responses(mut self, value: Vec<TopicProduceResponse>) -> Self {
967        self.responses = value;
968        self
969    }
970    /// Sets `throttle_time_ms` to the passed value.
971    ///
972    /// The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
973    ///
974    /// Supported API versions: 3-13
975    pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
976        self.throttle_time_ms = value;
977        self
978    }
979    /// Sets `node_endpoints` to the passed value.
980    ///
981    /// Endpoints for all current-leaders enumerated in PartitionProduceResponses, with errors NOT_LEADER_OR_FOLLOWER.
982    ///
983    /// Supported API versions: 10-13
984    pub fn with_node_endpoints(mut self, value: Vec<NodeEndpoint>) -> Self {
985        self.node_endpoints = value;
986        self
987    }
988    /// Sets unknown_tagged_fields to the passed value.
989    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
990        self.unknown_tagged_fields = value;
991        self
992    }
993    /// Inserts an entry into unknown_tagged_fields.
994    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
995        self.unknown_tagged_fields.insert(key, value);
996        self
997    }
998}
999
#[cfg(feature = "broker")]
impl Encodable for ProduceResponse {
    /// Writes this response to `buf` in the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3-13, when a nested encoder fails, or when the
    /// tagged-field count or the `node_endpoints` payload size exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 9 and later use compact arrays and append a tagged-field section.
        let flexible = version >= 9;

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.responses)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.responses)?;
        }
        types::Int32.encode(buf, &self.throttle_time_ms)?;
        if !flexible {
            return Ok(());
        }

        // `node_endpoints` travels as tagged field 0, from version 10, and only if non-empty.
        let emit_node_endpoints = version >= 10 && !self.node_endpoints.is_empty();
        let num_tagged_fields =
            self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;

        if emit_node_endpoints {
            // The payload size is written ahead of the payload itself.
            let computed_size = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            types::UnsignedVarInt.encode(buf, 0)?;
            types::UnsignedVarInt.encode(buf, computed_size as u32)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.node_endpoints)?;
        }
        write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 3-13.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails, or when the tagged-field count or the
    /// `node_endpoints` payload size exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;

        let responses_size = if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.responses)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.responses)?
        };
        let mut total_size = responses_size;
        total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
        if !flexible {
            return Ok(total_size);
        }

        let emit_node_endpoints = version >= 10 && !self.node_endpoints.is_empty();
        let num_tagged_fields =
            self.unknown_tagged_fields.len() + usize::from(emit_node_endpoints);
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;

        if emit_node_endpoints {
            let computed_size = types::CompactArray(types::Struct { version })
                .compute_size(&self.node_endpoints)?;
            if computed_size > u32::MAX as usize {
                bail!(
                    "Tagged field is too large to encode ({} bytes)",
                    computed_size
                );
            }
            // Tag number, payload length, then the payload itself.
            total_size += types::UnsignedVarInt.compute_size(0)?;
            total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
            total_size += computed_size;
        }
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
1089
#[cfg(feature = "client")]
impl Decodable for ProduceResponse {
    /// Reads a `ProduceResponse` from `buf` using the layout of `version`.
    ///
    /// `node_endpoints` stays at its default unless tagged field 0 is present.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3-13, when any read from `buf` fails, or when
    /// tag 0 is encountered at a version below 10.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 9 and later use compact arrays and carry a tagged-field section.
        let flexible = version >= 9;

        let responses = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let throttle_time_ms = types::Int32.decode(buf)?;

        let mut node_endpoints = Default::default();
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                if tag == 0 {
                    // Tag 0 is `node_endpoints`; the declared size is read but the array
                    // decoder decides how many bytes it actually consumes.
                    if version < 10 {
                        bail!("Tag {} is not valid for version {}", tag, version);
                    }
                    node_endpoints = types::CompactArray(types::Struct { version }).decode(buf)?;
                } else {
                    // Unrecognised tags are kept as raw bytes, keyed by tag number.
                    let raw_value = buf.try_get_bytes(size as usize)?;
                    unknown_tagged_fields.insert(tag as i32, raw_value);
                }
            }
        }

        Ok(Self {
            responses,
            throttle_time_ms,
            node_endpoints,
            unknown_tagged_fields,
        })
    }
}
1133
1134impl Default for ProduceResponse {
1135    fn default() -> Self {
1136        Self {
1137            responses: Default::default(),
1138            throttle_time_ms: 0,
1139            node_endpoints: Default::default(),
1140            unknown_tagged_fields: BTreeMap::new(),
1141        }
1142    }
1143}
1144
1145impl Message for ProduceResponse {
1146    const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
1147    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1148}
1149
1150/// Valid versions: 3-13
1151#[non_exhaustive]
1152#[derive(Debug, Clone, PartialEq)]
1153pub struct TopicProduceResponse {
1154    /// The topic name.
1155    ///
1156    /// Supported API versions: 3-12
1157    pub name: super::TopicName,
1158
1159    /// The unique topic ID
1160    ///
1161    /// Supported API versions: 13
1162    pub topic_id: Uuid,
1163
1164    /// Each partition that we produced to within the topic.
1165    ///
1166    /// Supported API versions: 3-13
1167    pub partition_responses: Vec<PartitionProduceResponse>,
1168
1169    /// Other tagged fields
1170    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
1171}
1172
1173impl TopicProduceResponse {
1174    /// Sets `name` to the passed value.
1175    ///
1176    /// The topic name.
1177    ///
1178    /// Supported API versions: 3-12
1179    pub fn with_name(mut self, value: super::TopicName) -> Self {
1180        self.name = value;
1181        self
1182    }
1183    /// Sets `topic_id` to the passed value.
1184    ///
1185    /// The unique topic ID
1186    ///
1187    /// Supported API versions: 13
1188    pub fn with_topic_id(mut self, value: Uuid) -> Self {
1189        self.topic_id = value;
1190        self
1191    }
1192    /// Sets `partition_responses` to the passed value.
1193    ///
1194    /// Each partition that we produced to within the topic.
1195    ///
1196    /// Supported API versions: 3-13
1197    pub fn with_partition_responses(mut self, value: Vec<PartitionProduceResponse>) -> Self {
1198        self.partition_responses = value;
1199        self
1200    }
1201    /// Sets unknown_tagged_fields to the passed value.
1202    pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
1203        self.unknown_tagged_fields = value;
1204        self
1205    }
1206    /// Inserts an entry into unknown_tagged_fields.
1207    pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
1208        self.unknown_tagged_fields.insert(key, value);
1209        self
1210    }
1211}
1212
#[cfg(feature = "broker")]
impl Encodable for TopicProduceResponse {
    /// Writes this topic response to `buf` in the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3-13, when a nested encoder fails, or when the
    /// number of unknown tagged fields exceeds `u32::MAX`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 9 and later use compact encodings and append a tagged-field section.
        let flexible = version >= 9;

        // The topic is identified by ID from version 13 on, and by name before that.
        if version >= 13 {
            types::Uuid.encode(buf, &self.topic_id)?;
        } else if flexible {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }

        if flexible {
            types::CompactArray(types::Struct { version })
                .encode(buf, &self.partition_responses)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.partition_responses)?;
        }
        if !flexible {
            return Ok(());
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 3-13.
    ///
    /// # Errors
    ///
    /// Fails when a nested size computation fails or when the number of unknown tagged
    /// fields exceeds `u32::MAX`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;

        let identifier_size = if version >= 13 {
            types::Uuid.compute_size(&self.topic_id)?
        } else if flexible {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        let partitions_size = if flexible {
            types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_responses)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.partition_responses)?
        };
        let mut total_size = identifier_size + partitions_size;
        if !flexible {
            return Ok(total_size);
        }

        let num_tagged_fields = self.unknown_tagged_fields.len();
        if num_tagged_fields > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                num_tagged_fields
            );
        }
        total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
1283
#[cfg(feature = "client")]
impl Decodable for TopicProduceResponse {
    /// Reads a `TopicProduceResponse` from `buf` using the layout of `version`.
    ///
    /// Up to version 12 the topic name is read and `topic_id` is the nil UUID; at
    /// version 13 the topic ID is read and `name` keeps its default value.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3-13 or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Version 9 and later use compact encodings and carry a tagged-field section.
        let flexible = version >= 9;

        let name = if version >= 13 {
            Default::default()
        } else if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let topic_id = if version < 13 {
            Uuid::nil()
        } else {
            types::Uuid.decode(buf)?
        };
        let partition_responses = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // No tags are recognised here, so every tagged field is kept as raw bytes.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let raw_value = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, raw_value);
            }
        }

        Ok(Self {
            name,
            topic_id,
            partition_responses,
            unknown_tagged_fields,
        })
    }
}
1327
1328impl Default for TopicProduceResponse {
1329    fn default() -> Self {
1330        Self {
1331            name: Default::default(),
1332            topic_id: Uuid::nil(),
1333            partition_responses: Default::default(),
1334            unknown_tagged_fields: BTreeMap::new(),
1335        }
1336    }
1337}
1338
1339impl Message for TopicProduceResponse {
1340    const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
1341    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
1342}
1343
1344impl HeaderVersion for ProduceResponse {
1345    fn header_version(version: i16) -> i16 {
1346        if version >= 9 {
1347            1
1348        } else {
1349            0
1350        }
1351    }
1352}