kafka_api/schemata/
produce_request.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::ReadBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
// Versions 1 and 2 are the same as version 0.
//
// Version 3 adds the transactional ID, which is used for authorization when attempting to write
// transactional data. Version 3 also adds support for Kafka Message Format v2.
//
// Version 4 is the same as version 3, but the requester must be prepared to handle a
// KAFKA_STORAGE_ERROR.
//
// Versions 5 and 6 are the same as version 3.
//
// Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
//
// Starting in version 8, the response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
35
36#[derive(Debug, Default, Clone)]
37pub struct ProduceRequest {
38    /// The transactional ID, or null if the producer is not transactional.
39    pub transactional_id: Option<String>,
40    /// The number of acknowledgments the producer requires the leader to have received before
41    /// considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the
42    /// leader and -1 for the full ISR.
43    pub acks: i16,
44    /// The timeout to await a response in milliseconds.
45    pub timeout_ms: i32,
46    /// Each topic to produce to.
47    pub topic_data: Vec<TopicProduceData>,
48    /// Unknown tagged fields.
49    pub unknown_tagged_fields: Vec<RawTaggedField>,
50}
51
52impl Decodable for ProduceRequest {
53    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
54        let mut this = ProduceRequest::default();
55        if version >= 3 {
56            this.transactional_id = NullableString(version >= 9).decode(buf)?;
57        }
58        this.acks = Int16.decode(buf)?;
59        this.timeout_ms = Int32.decode(buf)?;
60        this.topic_data = NullableArray(Struct(version), version >= 9)
61            .decode(buf)?
62            .ok_or_else(|| err_decode_message_null("topic_data"))?;
63        if version >= 9 {
64            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
65        }
66        Ok(this)
67    }
68}
69
70#[derive(Debug, Default, Clone)]
71pub struct TopicProduceData {
72    /// The topic name.
73    pub name: String,
74    /// Each partition to produce to.
75    pub partition_data: Vec<PartitionProduceData>,
76    /// Unknown tagged fields.
77    pub unknown_tagged_fields: Vec<RawTaggedField>,
78}
79
80impl Decodable for TopicProduceData {
81    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
82        if version > 9 {
83            Err(err_decode_message_unsupported(version, "TopicProduceData"))?
84        }
85        let mut this = TopicProduceData {
86            name: NullableString(version >= 9)
87                .decode(buf)?
88                .ok_or_else(|| err_decode_message_null("name"))?,
89            partition_data: NullableArray(Struct(version), version >= 9)
90                .decode(buf)?
91                .ok_or_else(|| err_decode_message_null("partition_data"))?,
92            ..Default::default()
93        };
94        if version >= 9 {
95            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
96        }
97        Ok(this)
98    }
99}
100
101#[derive(Debug, Default, Clone)]
102pub struct PartitionProduceData {
103    /// The partition index.
104    pub index: i32,
105    /// The record data to be produced.
106    pub records: Option<Vec<u8>>,
107    /// Unknown tagged fields.
108    pub unknown_tagged_fields: Vec<RawTaggedField>,
109}
110
111impl Decodable for PartitionProduceData {
112    fn read<B: ReadBytesExt>(buf: &mut B, version: i16) -> IoResult<Self> {
113        if version > 9 {
114            Err(err_decode_message_unsupported(
115                version,
116                "PartitionProduceData",
117            ))?
118        }
119        let mut this = PartitionProduceData {
120            index: Int32.decode(buf)?,
121            records: NullableBytes(version >= 9).decode(buf)?,
122            ..Default::default()
123        };
124        if version >= 9 {
125            this.unknown_tagged_fields = RawTaggedFieldList.decode(buf)?;
126        }
127        Ok(this)
128    }
129}