1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionProduceData {
24 pub index: i32,
28
29 pub records: Option<Bytes>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl PartitionProduceData {
39 pub fn with_index(mut self, value: i32) -> Self {
45 self.index = value;
46 self
47 }
48 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
54 self.records = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
/// Wire encoding of [`PartitionProduceData`]; compiled only with the
/// `client` feature.
#[cfg(feature = "client")]
impl Encodable for PartitionProduceData {
    /// Writes the partition index, then the record bytes, then (version 9 and
    /// above only) the tagged-field section.
    ///
    /// # Errors
    /// Propagates any error from the field encoders, and fails when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.index)?;
        if version >= 9 {
            // Flexible versions: compact bytes followed by tagged fields.
            types::CompactBytes.encode(buf, &self.records)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast cannot truncate: the bound was checked just above.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Bytes.encode(buf, &self.records)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = types::Int32.compute_size(&self.index)?;
        if version >= 9 {
            total_size += types::CompactBytes.compute_size(&self.records)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            total_size += types::Bytes.compute_size(&self.records)?;
        }
        Ok(total_size)
    }
}
115
/// Wire decoding of [`PartitionProduceData`]; compiled only with the
/// `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for PartitionProduceData {
    /// Reads the partition index, the record bytes and, for version 9 and
    /// above, every tagged field (all of which are kept as unknown).
    ///
    /// # Errors
    /// Propagates any decoder error, including running out of input.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 9;

        let index = types::Int32.decode(buf)?;
        let records = if flexible {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                // A repeated tag replaces the value stored earlier.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            index,
            records,
            unknown_tagged_fields,
        })
    }
}
142
143impl Default for PartitionProduceData {
144 fn default() -> Self {
145 Self {
146 index: 0,
147 records: Some(Default::default()),
148 unknown_tagged_fields: BTreeMap::new(),
149 }
150 }
151}
152
153impl Message for PartitionProduceData {
154 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
155 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 6 });
156}
157
158#[non_exhaustive]
160#[derive(Debug, Clone, PartialEq)]
161pub struct ProduceRequest {
162 pub transactional_id: Option<super::TransactionalId>,
166
167 pub acks: i16,
171
172 pub timeout_ms: i32,
176
177 pub topic_data: Vec<TopicProduceData>,
181
182 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
184}
185
186impl ProduceRequest {
187 pub fn with_transactional_id(mut self, value: Option<super::TransactionalId>) -> Self {
193 self.transactional_id = value;
194 self
195 }
196 pub fn with_acks(mut self, value: i16) -> Self {
202 self.acks = value;
203 self
204 }
205 pub fn with_timeout_ms(mut self, value: i32) -> Self {
211 self.timeout_ms = value;
212 self
213 }
214 pub fn with_topic_data(mut self, value: Vec<TopicProduceData>) -> Self {
220 self.topic_data = value;
221 self
222 }
223 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
225 self.unknown_tagged_fields = value;
226 self
227 }
228 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
230 self.unknown_tagged_fields.insert(key, value);
231 self
232 }
233}
234
/// Wire encoding of [`ProduceRequest`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for ProduceRequest {
    /// Writes, in order: the transactional id (version 3 and above), `acks`,
    /// `timeout_ms`, the topic array and, for version 9 and above, the
    /// tagged-field section.
    ///
    /// # Errors
    /// * `transactional_id` is set but `version` is below 3.
    /// * The number of unknown tagged fields does not fit in a `u32`.
    /// * Any error from the field encoders.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        // Versions 9 and above switch to the compact ("flexible") encodings.
        let flexible = version >= 9;

        if flexible {
            types::CompactString.encode(buf, &self.transactional_id)?;
        } else if version >= 3 {
            types::String.encode(buf, &self.transactional_id)?;
        } else if self.transactional_id.is_some() {
            // The field does not exist before version 3, so a set value
            // cannot be represented.
            bail!("A field is set that is not available on the selected protocol version");
        }

        types::Int16.encode(buf, &self.acks)?;
        types::Int32.encode(buf, &self.timeout_ms)?;

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topic_data)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast cannot truncate: the bound was checked just above.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topic_data)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;
        let mut total_size = 0;

        if flexible {
            total_size += types::CompactString.compute_size(&self.transactional_id)?;
        } else if version >= 3 {
            total_size += types::String.compute_size(&self.transactional_id)?;
        } else if self.transactional_id.is_some() {
            bail!("A field is set that is not available on the selected protocol version");
        }

        total_size += types::Int16.compute_size(&self.acks)?;
        total_size += types::Int32.compute_size(&self.timeout_ms)?;

        if flexible {
            total_size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.topic_data)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topic_data)?;
        }
        Ok(total_size)
    }
}
306
/// Wire decoding of [`ProduceRequest`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for ProduceRequest {
    /// Reads the request fields in wire order. Below version 3 the
    /// transactional id is absent from the wire and is set to `None`.
    ///
    /// # Errors
    /// Propagates any decoder error, including running out of input.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 9;

        let transactional_id = if flexible {
            types::CompactString.decode(buf)?
        } else if version >= 3 {
            types::String.decode(buf)?
        } else {
            None
        };
        let acks = types::Int16.decode(buf)?;
        let timeout_ms = types::Int32.decode(buf)?;
        let topic_data = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                // A repeated tag replaces the value stored earlier.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            transactional_id,
            acks,
            timeout_ms,
            topic_data,
            unknown_tagged_fields,
        })
    }
}
345
346impl Default for ProduceRequest {
347 fn default() -> Self {
348 Self {
349 transactional_id: None,
350 acks: 0,
351 timeout_ms: 0,
352 topic_data: Default::default(),
353 unknown_tagged_fields: BTreeMap::new(),
354 }
355 }
356}
357
358impl Message for ProduceRequest {
359 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
360 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 6 });
361}
362
363#[non_exhaustive]
365#[derive(Debug, Clone, PartialEq)]
366pub struct TopicProduceData {
367 pub name: super::TopicName,
371
372 pub partition_data: Vec<PartitionProduceData>,
376
377 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
379}
380
381impl TopicProduceData {
382 pub fn with_name(mut self, value: super::TopicName) -> Self {
388 self.name = value;
389 self
390 }
391 pub fn with_partition_data(mut self, value: Vec<PartitionProduceData>) -> Self {
397 self.partition_data = value;
398 self
399 }
400 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
402 self.unknown_tagged_fields = value;
403 self
404 }
405 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
407 self.unknown_tagged_fields.insert(key, value);
408 self
409 }
410}
411
/// Wire encoding of [`TopicProduceData`]; compiled only with the `client`
/// feature.
#[cfg(feature = "client")]
impl Encodable for TopicProduceData {
    /// Writes the topic name, then the partition array, then (version 9 and
    /// above only) the tagged-field section.
    ///
    /// # Errors
    /// Propagates any error from the field encoders, and fails when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if version >= 9 {
            // Flexible versions: compact encodings plus tagged fields.
            types::CompactString.encode(buf, &self.name)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.partition_data)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            // The cast cannot truncate: the bound was checked just above.
            types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::Array(types::Struct { version }).encode(buf, &self.partition_data)?;
        }
        Ok(())
    }

    /// Returns the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    /// Same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut total_size = 0;
        if version >= 9 {
            total_size += types::CompactString.compute_size(&self.name)?;
            total_size += types::CompactArray(types::Struct { version })
                .compute_size(&self.partition_data)?;

            let num_tagged_fields = self.unknown_tagged_fields.len();
            if num_tagged_fields > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    num_tagged_fields
                );
            }
            total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            total_size += types::String.compute_size(&self.name)?;
            total_size +=
                types::Array(types::Struct { version }).compute_size(&self.partition_data)?;
        }
        Ok(total_size)
    }
}
468
/// Wire decoding of [`TopicProduceData`]; compiled only with the `broker`
/// feature.
#[cfg(feature = "broker")]
impl Decodable for TopicProduceData {
    /// Reads the topic name, the partition array and, for version 9 and
    /// above, every tagged field (all of which are kept as unknown).
    ///
    /// # Errors
    /// Propagates any decoder error, including running out of input.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 9;

        let name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let partition_data = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let field_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..field_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                // A repeated tag replaces the value stored earlier.
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            partition_data,
            unknown_tagged_fields,
        })
    }
}
499
500impl Default for TopicProduceData {
501 fn default() -> Self {
502 Self {
503 name: Default::default(),
504 partition_data: Default::default(),
505 unknown_tagged_fields: BTreeMap::new(),
506 }
507 }
508}
509
510impl Message for TopicProduceData {
511 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
512 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 6 });
513}
514
515impl HeaderVersion for ProduceRequest {
516 fn header_version(version: i16) -> i16 {
517 if version >= 9 {
518 2
519 } else {
520 1
521 }
522 }
523}