kafka_api/schemata/
produce_response.rs

1// Copyright 2024 tison <wander4096@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use byteorder::WriteBytesExt;
16
17use crate::codec::*;
18use crate::IoResult;
19
20// Version 1 added the throttle time.
21//
22// Version 2 added the log append time.
23//
24// Version 3 is the same as version 2.
25//
26// Version 4 added KAFKA_STORAGE_ERROR as a possible error code.
27//
28// Version 5 added LogStartOffset to filter out spurious
29// OutOfOrderSequenceExceptions on the client.
30//
31// Version 8 added RecordErrors and ErrorMessage to include information about
32// records that cause the whole batch to be dropped.  See KIP-467 for details.
33//
34// Version 9 enables flexible versions.
35
36#[derive(Debug, Default, Clone)]
37pub struct ProduceResponse {
38    /// Each produce response
39    pub responses: Vec<TopicProduceResponse>,
40    /// The duration in milliseconds for which the request was throttled due to a quota violation,
41    /// or zero if the request did not violate any quota.
42    pub throttle_time_ms: i32,
43    /// Unknown tagged fields.
44    pub unknown_tagged_fields: Vec<RawTaggedField>,
45}
46
47impl Encodable for ProduceResponse {
48    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
49        NullableArray(Struct(version), version >= 9).encode(buf, self.responses.as_slice())?;
50        if version > 1 {
51            Int32.encode(buf, self.throttle_time_ms)?;
52        }
53        if version >= 9 {
54            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
55        }
56        Ok(())
57    }
58
59    fn calculate_size(&self, version: i16) -> usize {
60        let mut res = 0;
61        res +=
62            NullableArray(Struct(version), version >= 9).calculate_size(self.responses.as_slice());
63        if version > 1 {
64            res += Int32::SIZE; // self.throttle_time_ms
65        }
66        if version >= 9 {
67            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
68        }
69        res
70    }
71}
72
73#[derive(Debug, Default, Clone)]
74pub struct TopicProduceResponse {
75    /// The topic name.
76    pub name: String,
77    /// Each partition that we produced to within the topic.
78    pub partition_responses: Vec<PartitionProduceResponse>,
79    /// Unknown tagged fields.
80    pub unknown_tagged_fields: Vec<RawTaggedField>,
81}
82
83impl Encodable for TopicProduceResponse {
84    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
85        NullableString(version >= 9).encode(buf, self.name.as_str())?;
86        NullableArray(Struct(version), version >= 9)
87            .encode(buf, self.partition_responses.as_slice())?;
88        if version >= 9 {
89            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
90        }
91        Ok(())
92    }
93
94    fn calculate_size(&self, version: i16) -> usize {
95        let mut res = 0;
96        res += NullableString(version >= 9).calculate_size(self.name.as_str());
97        res += NullableArray(Struct(version), version >= 9)
98            .calculate_size(self.partition_responses.as_slice());
99        if version >= 9 {
100            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
101        }
102        res
103    }
104}
105
106#[derive(Debug, Default, Clone)]
107pub struct PartitionProduceResponse {
108    /// The partition index.
109    pub index: i32,
110    /// The error code, or 0 if there was no error.
111    pub error_code: i16,
112    /// The base offset.
113    pub base_offset: i64,
114    /// The timestamp returned by broker after appending the messages. If CreateTime is used for
115    /// the topic, the timestamp will be -1.  If LogAppendTime is used for the topic, the timestamp
116    /// will be the broker local time when the messages are appended.
117    pub log_append_time_ms: i64,
118    /// The log start offset.
119    pub log_start_offset: i64,
120    /// The batch indices of records that caused the batch to be dropped.
121    pub record_errors: Vec<BatchIndexAndErrorMessage>,
122    /// The global error message summarizing the common root cause of the records that caused the
123    /// batch to be dropped.
124    pub error_message: Option<String>,
125    /// Unknown tagged fields.
126    pub unknown_tagged_fields: Vec<RawTaggedField>,
127}
128
129impl Encodable for PartitionProduceResponse {
130    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
131        Int32.encode(buf, self.index)?;
132        Int16.encode(buf, self.error_code)?;
133        Int64.encode(buf, self.base_offset)?;
134        if version >= 2 {
135            Int64.encode(buf, self.log_append_time_ms)?;
136        }
137        if version >= 5 {
138            Int64.encode(buf, self.log_start_offset)?;
139        }
140        if version >= 8 {
141            NullableArray(Struct(version), version >= 9)
142                .encode(buf, self.record_errors.as_slice())?;
143        }
144        if version >= 8 {
145            NullableString(version >= 9).encode(buf, self.error_message.as_deref())?;
146        }
147        if version >= 9 {
148            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
149        }
150        Ok(())
151    }
152
153    fn calculate_size(&self, version: i16) -> usize {
154        let mut res = 0;
155        res += Int32::SIZE; // self.index
156        res += Int16::SIZE; // self.error_code
157        res += Int64::SIZE; // self.base_offset
158        if version >= 2 {
159            res += Int64::SIZE; // self.log_append_time_ms
160        }
161        if version >= 5 {
162            res += Int64::SIZE; // self.log_start_offset
163        }
164        if version >= 8 {
165            res += NullableArray(Struct(version), version >= 9)
166                .calculate_size(self.record_errors.as_slice());
167        }
168        if version >= 8 {
169            res += NullableString(version >= 9).calculate_size(self.error_message.as_deref());
170        }
171        if version >= 9 {
172            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
173        }
174        res
175    }
176}
177
178#[derive(Debug, Default, Clone)]
179pub struct BatchIndexAndErrorMessage {
180    /// The batch index of the record that cause the batch to be dropped.
181    pub batch_index: i32,
182    /// The error message of the record that caused the batch to be dropped.
183    pub batch_index_error_message: Option<String>,
184    /// Unknown tagged fields.
185    pub unknown_tagged_fields: Vec<RawTaggedField>,
186}
187
188impl Encodable for BatchIndexAndErrorMessage {
189    fn write<B: WriteBytesExt>(&self, buf: &mut B, version: i16) -> IoResult<()> {
190        if version < 8 {
191            Err(err_encode_message_unsupported(
192                version,
193                "BatchIndexAndErrorMessage",
194            ))?
195        }
196        Int32.encode(buf, self.batch_index)?;
197        NullableString(version >= 9).encode(buf, self.batch_index_error_message.as_deref())?;
198        if version >= 9 {
199            RawTaggedFieldList.encode(buf, &self.unknown_tagged_fields)?;
200        }
201        Ok(())
202    }
203
204    fn calculate_size(&self, version: i16) -> usize {
205        let mut res = 0;
206        res += Int32::SIZE; // self.batch_index
207        res +=
208            NullableString(version >= 9).calculate_size(self.batch_index_error_message.as_deref());
209        if version >= 9 {
210            res += RawTaggedFieldList.calculate_size(&self.unknown_tagged_fields);
211        }
212        res
213    }
214}