1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string,
10 string_len,
11};
12use crate::tagged_fields::{encode_to_bytes, read_tagged_fields, tagged_fields_len, WriteTaggedFields};
13use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
14
15pub const API_KEY: i16 = 0;
16pub const MIN_VERSION: i16 = 3;
17pub const MAX_VERSION: i16 = 13;
18pub const FLEXIBLE_MIN: i16 = 9;
19
20#[inline]
21fn is_flexible(version: i16) -> bool { version >= FLEXIBLE_MIN }
22
23#[derive(Debug, Clone, PartialEq, Eq, Default)]
24pub struct ProduceResponse {
25 pub responses: Vec<TopicProduceResponse>,
26 pub throttle_time_ms: i32,
27 pub node_endpoints: Vec<NodeEndpoint>,
28 pub unknown_tagged_fields: UnknownTaggedFields,
29}
30
31impl Encode for ProduceResponse {
32 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
33 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
34 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
35 }
36 let flex = is_flexible(version);
37 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.responses).len(), flex); for it in &self.responses { it.encode(buf, version)?; } } }
38 if version >= 1 { put_i32(buf, self.throttle_time_ms) }
39 if flex {
40 let mut tagged = WriteTaggedFields::new();
41 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
42 let payload = encode_to_bytes({ let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }, |b| { { crate::primitives::array::put_array_len(b, (self.node_endpoints).len(), flex); for it in &self.node_endpoints { it.encode(b, version)?; } }; Ok(()) });
43 tagged.add(0, payload);
44 }
45 tagged.write(buf, &self.unknown_tagged_fields);
46 }
47 Ok(())
48 }
49 fn encoded_len(&self, version: i16) -> usize {
50 let flex = is_flexible(version);
51 let mut n: usize = 0;
52 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.responses).len(), flex); let body: usize = (self.responses).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
53 if version >= 1 { n += 4; }
54 if flex {
55 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
56 if !(crate::codegen_helpers::is_default(&self.node_endpoints)) {
57 known_pairs.push((0, { let prefix = crate::primitives::array::array_len_prefix_len((self.node_endpoints).len(), flex); let body: usize = (self.node_endpoints).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }));
58 }
59 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
60 }
61 n
62 }
63}
64
65impl<'de> Decode<'de> for ProduceResponse {
66 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
67 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
68 return Err(ProtocolError::UnsupportedVersion { api_key: API_KEY, version });
69 }
70 let flex = is_flexible(version);
71 let mut out = Self::default();
72 if version >= 0 { out.responses = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(TopicProduceResponse::decode(buf, version)?); } v }; }
73 if version >= 1 { out.throttle_time_ms = get_i32(buf)?; }
74 if flex {
75 let mut tag_node_endpoints = None;
77 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
78 match tag {
79 0 => { tag_node_endpoints = Some({ let b: &mut &[u8] = payload; { let n = crate::primitives::array::get_array_len(b, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(NodeEndpoint::decode(b, version)?); } v } }); Ok(true) }
80 _ => Ok(false),
81 }
82 })?;
83 if let Some(v) = tag_node_endpoints { out.node_endpoints = v; }
84 }
85 Ok(out)
86 }
87}
88
89#[derive(Debug, Clone, PartialEq, Eq, Default)]
90pub struct TopicProduceResponse {
91 pub name: String,
92 pub topic_id: crate::primitives::uuid::Uuid,
93 pub partition_responses: Vec<PartitionProduceResponse>,
94 pub unknown_tagged_fields: UnknownTaggedFields,
95}
96
97impl Encode for TopicProduceResponse {
98 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
99 let flex = version >= 9;
100 if version >= 0 && version <= 12 { if flex { put_compact_string(buf, &self.name) } else { put_string(buf, &self.name) } }
101 if version >= 13 { crate::primitives::uuid::put_uuid(buf, self.topic_id) }
102 if version >= 0 { { crate::primitives::array::put_array_len(buf, (self.partition_responses).len(), flex); for it in &self.partition_responses { it.encode(buf, version)?; } } }
103 if flex {
104 let tagged = WriteTaggedFields::new();
105 tagged.write(buf, &self.unknown_tagged_fields);
106 }
107 Ok(())
108 }
109 fn encoded_len(&self, version: i16) -> usize {
110 let flex = version >= 9;
111 let mut n: usize = 0;
112 if version >= 0 && version <= 12 { n += if flex { compact_string_len(&self.name) } else { string_len(&self.name) }; }
113 if version >= 13 { n += 16; }
114 if version >= 0 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.partition_responses).len(), flex); let body: usize = (self.partition_responses).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
115 if flex {
116 let known_pairs: Vec<(u32, usize)> = Vec::new();
117 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
118 }
119 n
120 }
121}
122
123impl<'de> Decode<'de> for TopicProduceResponse {
124 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
125 let flex = version >= 9;
126 let mut out = Self::default();
127 if version >= 0 && version <= 12 { out.name = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
128 if version >= 13 { out.topic_id = crate::primitives::uuid::get_uuid(buf)?; }
129 if version >= 0 { out.partition_responses = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(PartitionProduceResponse::decode(buf, version)?); } v }; }
130 if flex {
131 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
132 Ok(false)
133 })?;
134 }
135 Ok(out)
136 }
137}
138
139#[derive(Debug, Clone, PartialEq, Eq)]
140pub struct PartitionProduceResponse {
141 pub index: i32,
142 pub error_code: i16,
143 pub base_offset: i64,
144 pub log_append_time_ms: i64,
145 pub log_start_offset: i64,
146 pub record_errors: Vec<BatchIndexAndErrorMessage>,
147 pub error_message: Option<String>,
148 pub current_leader: LeaderIdAndEpoch,
149 pub unknown_tagged_fields: UnknownTaggedFields,
150}
151
152impl Default for PartitionProduceResponse {
153 fn default() -> Self {
154 Self {
155 index: 0i32,
156 error_code: 0i16,
157 base_offset: 0i64,
158 log_append_time_ms: -1i64,
159 log_start_offset: -1i64,
160 record_errors: Vec::new(),
161 error_message: None,
162 current_leader: Default::default(),
163 unknown_tagged_fields: Default::default(),
164 }
165 }
166}
167
168impl Encode for PartitionProduceResponse {
169 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
170 let flex = version >= 9;
171 if version >= 0 { put_i32(buf, self.index) }
172 if version >= 0 { put_i16(buf, self.error_code) }
173 if version >= 0 { put_i64(buf, self.base_offset) }
174 if version >= 2 { put_i64(buf, self.log_append_time_ms) }
175 if version >= 5 { put_i64(buf, self.log_start_offset) }
176 if version >= 8 { { crate::primitives::array::put_array_len(buf, (self.record_errors).len(), flex); for it in &self.record_errors { it.encode(buf, version)?; } } }
177 if version >= 8 { if flex { put_compact_nullable_string(buf, self.error_message.as_deref()) } else { put_nullable_string(buf, self.error_message.as_deref()) } }
178 if flex {
179 let mut tagged = WriteTaggedFields::new();
180 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
181 let payload = encode_to_bytes(self.current_leader.encoded_len(version), |b| { self.current_leader.encode(b, version)?; Ok(()) });
182 tagged.add(0, payload);
183 }
184 tagged.write(buf, &self.unknown_tagged_fields);
185 }
186 Ok(())
187 }
188 fn encoded_len(&self, version: i16) -> usize {
189 let flex = version >= 9;
190 let mut n: usize = 0;
191 if version >= 0 { n += 4; }
192 if version >= 0 { n += 2; }
193 if version >= 0 { n += 8; }
194 if version >= 2 { n += 8; }
195 if version >= 5 { n += 8; }
196 if version >= 8 { n += { let prefix = crate::primitives::array::array_len_prefix_len((self.record_errors).len(), flex); let body: usize = (self.record_errors).iter().map(|it| it.encoded_len(version)).sum(); prefix + body }; }
197 if version >= 8 { n += if flex { compact_nullable_string_len(self.error_message.as_deref()) } else { nullable_string_len(self.error_message.as_deref()) }; }
198 if flex {
199 let mut known_pairs: Vec<(u32, usize)> = Vec::new();
200 if !(crate::codegen_helpers::is_default(&self.current_leader)) {
201 known_pairs.push((0, self.current_leader.encoded_len(version)));
202 }
203 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
204 }
205 n
206 }
207}
208
209impl<'de> Decode<'de> for PartitionProduceResponse {
210 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
211 let flex = version >= 9;
212 let mut out = Self::default();
213 if version >= 0 { out.index = get_i32(buf)?; }
214 if version >= 0 { out.error_code = get_i16(buf)?; }
215 if version >= 0 { out.base_offset = get_i64(buf)?; }
216 if version >= 2 { out.log_append_time_ms = get_i64(buf)?; }
217 if version >= 5 { out.log_start_offset = get_i64(buf)?; }
218 if version >= 8 { out.record_errors = { let n = crate::primitives::array::get_array_len(buf, flex)?; let mut v = Vec::with_capacity(n); for _ in 0..n { v.push(BatchIndexAndErrorMessage::decode(buf, version)?); } v }; }
219 if version >= 8 { out.error_message = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
220 if flex {
221 let mut tag_current_leader = None;
223 out.unknown_tagged_fields = read_tagged_fields(buf, |tag, payload| {
224 match tag {
225 0 => { tag_current_leader = Some({ let b: &mut &[u8] = payload; LeaderIdAndEpoch::decode(b, version)? }); Ok(true) }
226 _ => Ok(false),
227 }
228 })?;
229 if let Some(v) = tag_current_leader { out.current_leader = v; }
230 }
231 Ok(out)
232 }
233}
234
235#[derive(Debug, Clone, PartialEq, Eq, Default)]
236pub struct BatchIndexAndErrorMessage {
237 pub batch_index: i32,
238 pub batch_index_error_message: Option<String>,
239 pub unknown_tagged_fields: UnknownTaggedFields,
240}
241
242impl Encode for BatchIndexAndErrorMessage {
243 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
244 let flex = version >= 9;
245 if version >= 8 { put_i32(buf, self.batch_index) }
246 if version >= 8 { if flex { put_compact_nullable_string(buf, self.batch_index_error_message.as_deref()) } else { put_nullable_string(buf, self.batch_index_error_message.as_deref()) } }
247 if flex {
248 let tagged = WriteTaggedFields::new();
249 tagged.write(buf, &self.unknown_tagged_fields);
250 }
251 Ok(())
252 }
253 fn encoded_len(&self, version: i16) -> usize {
254 let flex = version >= 9;
255 let mut n: usize = 0;
256 if version >= 8 { n += 4; }
257 if version >= 8 { n += if flex { compact_nullable_string_len(self.batch_index_error_message.as_deref()) } else { nullable_string_len(self.batch_index_error_message.as_deref()) }; }
258 if flex {
259 let known_pairs: Vec<(u32, usize)> = Vec::new();
260 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
261 }
262 n
263 }
264}
265
266impl<'de> Decode<'de> for BatchIndexAndErrorMessage {
267 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
268 let flex = version >= 9;
269 let mut out = Self::default();
270 if version >= 8 { out.batch_index = get_i32(buf)?; }
271 if version >= 8 { out.batch_index_error_message = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
272 if flex {
273 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
274 Ok(false)
275 })?;
276 }
277 Ok(out)
278 }
279}
280
281#[derive(Debug, Clone, PartialEq, Eq)]
282pub struct LeaderIdAndEpoch {
283 pub leader_id: i32,
284 pub leader_epoch: i32,
285 pub unknown_tagged_fields: UnknownTaggedFields,
286}
287
288impl Default for LeaderIdAndEpoch {
289 fn default() -> Self {
290 Self {
291 leader_id: -1i32,
292 leader_epoch: -1i32,
293 unknown_tagged_fields: Default::default(),
294 }
295 }
296}
297
298impl Encode for LeaderIdAndEpoch {
299 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
300 let flex = version >= 9;
301 if version >= 10 { put_i32(buf, self.leader_id) }
302 if version >= 10 { put_i32(buf, self.leader_epoch) }
303 if flex {
304 let tagged = WriteTaggedFields::new();
305 tagged.write(buf, &self.unknown_tagged_fields);
306 }
307 Ok(())
308 }
309 fn encoded_len(&self, version: i16) -> usize {
310 let flex = version >= 9;
311 let mut n: usize = 0;
312 if version >= 10 { n += 4; }
313 if version >= 10 { n += 4; }
314 if flex {
315 let known_pairs: Vec<(u32, usize)> = Vec::new();
316 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
317 }
318 n
319 }
320}
321
322impl<'de> Decode<'de> for LeaderIdAndEpoch {
323 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
324 let flex = version >= 9;
325 let mut out = Self::default();
326 if version >= 10 { out.leader_id = get_i32(buf)?; }
327 if version >= 10 { out.leader_epoch = get_i32(buf)?; }
328 if flex {
329 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
330 Ok(false)
331 })?;
332 }
333 Ok(out)
334 }
335}
336
337#[derive(Debug, Clone, PartialEq, Eq, Default)]
338pub struct NodeEndpoint {
339 pub node_id: i32,
340 pub host: String,
341 pub port: i32,
342 pub rack: Option<String>,
343 pub unknown_tagged_fields: UnknownTaggedFields,
344}
345
346impl Encode for NodeEndpoint {
347 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
348 let flex = version >= 9;
349 if version >= 10 { put_i32(buf, self.node_id) }
350 if version >= 10 { if flex { put_compact_string(buf, &self.host) } else { put_string(buf, &self.host) } }
351 if version >= 10 { put_i32(buf, self.port) }
352 if version >= 10 { if flex { put_compact_nullable_string(buf, self.rack.as_deref()) } else { put_nullable_string(buf, self.rack.as_deref()) } }
353 if flex {
354 let tagged = WriteTaggedFields::new();
355 tagged.write(buf, &self.unknown_tagged_fields);
356 }
357 Ok(())
358 }
359 fn encoded_len(&self, version: i16) -> usize {
360 let flex = version >= 9;
361 let mut n: usize = 0;
362 if version >= 10 { n += 4; }
363 if version >= 10 { n += if flex { compact_string_len(&self.host) } else { string_len(&self.host) }; }
364 if version >= 10 { n += 4; }
365 if version >= 10 { n += if flex { compact_nullable_string_len(self.rack.as_deref()) } else { nullable_string_len(self.rack.as_deref()) }; }
366 if flex {
367 let known_pairs: Vec<(u32, usize)> = Vec::new();
368 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
369 }
370 n
371 }
372}
373
374impl<'de> Decode<'de> for NodeEndpoint {
375 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
376 let flex = version >= 9;
377 let mut out = Self::default();
378 if version >= 10 { out.node_id = get_i32(buf)?; }
379 if version >= 10 { out.host = if flex { get_compact_string_owned(buf)? } else { get_string_owned(buf)? }; }
380 if version >= 10 { out.port = get_i32(buf)?; }
381 if version >= 10 { out.rack = if flex { get_compact_nullable_string_owned(buf)? } else { get_nullable_string_owned(buf)? }; }
382 if flex {
383 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| {
384 Ok(false)
385 })?;
386 }
387 Ok(out)
388 }
389}
390
391#[must_use]
394#[allow(unused_comparisons)]
395pub fn default_json(version: i16) -> ::serde_json::Value {
396 let mut obj = ::serde_json::Map::new();
397 obj.insert("responses".to_string(), ::serde_json::Value::Array(vec![]));
398 if version >= 1 {
399 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
400 }
401 if version >= 10 {
402 obj.insert("nodeEndpoints".to_string(), ::serde_json::Value::Array(vec![]));
403 }
404 ::serde_json::Value::Object(obj)
405}