1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionProduceData {
24 pub index: i32,
28
29 pub records: Option<Bytes>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl PartitionProduceData {
39 pub fn with_index(mut self, value: i32) -> Self {
45 self.index = value;
46 self
47 }
48 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
54 self.records = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "client")]
70impl Encodable for PartitionProduceData {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version < 0 || version > 11 {
73 bail!("specified version not supported by this message type");
74 }
75 types::Int32.encode(buf, &self.index)?;
76 if version >= 9 {
77 types::CompactBytes.encode(buf, &self.records)?;
78 } else {
79 types::Bytes.encode(buf, &self.records)?;
80 }
81 if version >= 9 {
82 let num_tagged_fields = self.unknown_tagged_fields.len();
83 if num_tagged_fields > std::u32::MAX as usize {
84 bail!(
85 "Too many tagged fields to encode ({} fields)",
86 num_tagged_fields
87 );
88 }
89 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
90
91 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
92 }
93 Ok(())
94 }
95 fn compute_size(&self, version: i16) -> Result<usize> {
96 let mut total_size = 0;
97 total_size += types::Int32.compute_size(&self.index)?;
98 if version >= 9 {
99 total_size += types::CompactBytes.compute_size(&self.records)?;
100 } else {
101 total_size += types::Bytes.compute_size(&self.records)?;
102 }
103 if version >= 9 {
104 let num_tagged_fields = self.unknown_tagged_fields.len();
105 if num_tagged_fields > std::u32::MAX as usize {
106 bail!(
107 "Too many tagged fields to encode ({} fields)",
108 num_tagged_fields
109 );
110 }
111 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
112
113 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
114 }
115 Ok(total_size)
116 }
117}
118
119#[cfg(feature = "broker")]
120impl Decodable for PartitionProduceData {
121 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
122 if version < 0 || version > 11 {
123 bail!("specified version not supported by this message type");
124 }
125 let index = types::Int32.decode(buf)?;
126 let records = if version >= 9 {
127 types::CompactBytes.decode(buf)?
128 } else {
129 types::Bytes.decode(buf)?
130 };
131 let mut unknown_tagged_fields = BTreeMap::new();
132 if version >= 9 {
133 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
134 for _ in 0..num_tagged_fields {
135 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
136 let size: u32 = types::UnsignedVarInt.decode(buf)?;
137 let unknown_value = buf.try_get_bytes(size as usize)?;
138 unknown_tagged_fields.insert(tag as i32, unknown_value);
139 }
140 }
141 Ok(Self {
142 index,
143 records,
144 unknown_tagged_fields,
145 })
146 }
147}
148
149impl Default for PartitionProduceData {
150 fn default() -> Self {
151 Self {
152 index: 0,
153 records: Some(Default::default()),
154 unknown_tagged_fields: BTreeMap::new(),
155 }
156 }
157}
158
159impl Message for PartitionProduceData {
160 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
161 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 6 });
162}
163
164#[non_exhaustive]
166#[derive(Debug, Clone, PartialEq)]
167pub struct ProduceRequest {
168 pub transactional_id: Option<super::TransactionalId>,
172
173 pub acks: i16,
177
178 pub timeout_ms: i32,
182
183 pub topic_data: Vec<TopicProduceData>,
187
188 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
190}
191
192impl ProduceRequest {
193 pub fn with_transactional_id(mut self, value: Option<super::TransactionalId>) -> Self {
199 self.transactional_id = value;
200 self
201 }
202 pub fn with_acks(mut self, value: i16) -> Self {
208 self.acks = value;
209 self
210 }
211 pub fn with_timeout_ms(mut self, value: i32) -> Self {
217 self.timeout_ms = value;
218 self
219 }
220 pub fn with_topic_data(mut self, value: Vec<TopicProduceData>) -> Self {
226 self.topic_data = value;
227 self
228 }
229 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
231 self.unknown_tagged_fields = value;
232 self
233 }
234 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
236 self.unknown_tagged_fields.insert(key, value);
237 self
238 }
239}
240
241#[cfg(feature = "client")]
242impl Encodable for ProduceRequest {
243 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
244 if version < 0 || version > 11 {
245 bail!("specified version not supported by this message type");
246 }
247 if version >= 3 {
248 if version >= 9 {
249 types::CompactString.encode(buf, &self.transactional_id)?;
250 } else {
251 types::String.encode(buf, &self.transactional_id)?;
252 }
253 } else {
254 if !self.transactional_id.is_none() {
255 bail!("A field is set that is not available on the selected protocol version");
256 }
257 }
258 types::Int16.encode(buf, &self.acks)?;
259 types::Int32.encode(buf, &self.timeout_ms)?;
260 if version >= 9 {
261 types::CompactArray(types::Struct { version }).encode(buf, &self.topic_data)?;
262 } else {
263 types::Array(types::Struct { version }).encode(buf, &self.topic_data)?;
264 }
265 if version >= 9 {
266 let num_tagged_fields = self.unknown_tagged_fields.len();
267 if num_tagged_fields > std::u32::MAX as usize {
268 bail!(
269 "Too many tagged fields to encode ({} fields)",
270 num_tagged_fields
271 );
272 }
273 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
274
275 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
276 }
277 Ok(())
278 }
279 fn compute_size(&self, version: i16) -> Result<usize> {
280 let mut total_size = 0;
281 if version >= 3 {
282 if version >= 9 {
283 total_size += types::CompactString.compute_size(&self.transactional_id)?;
284 } else {
285 total_size += types::String.compute_size(&self.transactional_id)?;
286 }
287 } else {
288 if !self.transactional_id.is_none() {
289 bail!("A field is set that is not available on the selected protocol version");
290 }
291 }
292 total_size += types::Int16.compute_size(&self.acks)?;
293 total_size += types::Int32.compute_size(&self.timeout_ms)?;
294 if version >= 9 {
295 total_size +=
296 types::CompactArray(types::Struct { version }).compute_size(&self.topic_data)?;
297 } else {
298 total_size += types::Array(types::Struct { version }).compute_size(&self.topic_data)?;
299 }
300 if version >= 9 {
301 let num_tagged_fields = self.unknown_tagged_fields.len();
302 if num_tagged_fields > std::u32::MAX as usize {
303 bail!(
304 "Too many tagged fields to encode ({} fields)",
305 num_tagged_fields
306 );
307 }
308 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
309
310 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
311 }
312 Ok(total_size)
313 }
314}
315
316#[cfg(feature = "broker")]
317impl Decodable for ProduceRequest {
318 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
319 if version < 0 || version > 11 {
320 bail!("specified version not supported by this message type");
321 }
322 let transactional_id = if version >= 3 {
323 if version >= 9 {
324 types::CompactString.decode(buf)?
325 } else {
326 types::String.decode(buf)?
327 }
328 } else {
329 None
330 };
331 let acks = types::Int16.decode(buf)?;
332 let timeout_ms = types::Int32.decode(buf)?;
333 let topic_data = if version >= 9 {
334 types::CompactArray(types::Struct { version }).decode(buf)?
335 } else {
336 types::Array(types::Struct { version }).decode(buf)?
337 };
338 let mut unknown_tagged_fields = BTreeMap::new();
339 if version >= 9 {
340 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
341 for _ in 0..num_tagged_fields {
342 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
343 let size: u32 = types::UnsignedVarInt.decode(buf)?;
344 let unknown_value = buf.try_get_bytes(size as usize)?;
345 unknown_tagged_fields.insert(tag as i32, unknown_value);
346 }
347 }
348 Ok(Self {
349 transactional_id,
350 acks,
351 timeout_ms,
352 topic_data,
353 unknown_tagged_fields,
354 })
355 }
356}
357
358impl Default for ProduceRequest {
359 fn default() -> Self {
360 Self {
361 transactional_id: None,
362 acks: 0,
363 timeout_ms: 0,
364 topic_data: Default::default(),
365 unknown_tagged_fields: BTreeMap::new(),
366 }
367 }
368}
369
370impl Message for ProduceRequest {
371 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
372 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 6 });
373}
374
375#[non_exhaustive]
377#[derive(Debug, Clone, PartialEq)]
378pub struct TopicProduceData {
379 pub name: super::TopicName,
383
384 pub partition_data: Vec<PartitionProduceData>,
388
389 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
391}
392
393impl TopicProduceData {
394 pub fn with_name(mut self, value: super::TopicName) -> Self {
400 self.name = value;
401 self
402 }
403 pub fn with_partition_data(mut self, value: Vec<PartitionProduceData>) -> Self {
409 self.partition_data = value;
410 self
411 }
412 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
414 self.unknown_tagged_fields = value;
415 self
416 }
417 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
419 self.unknown_tagged_fields.insert(key, value);
420 self
421 }
422}
423
424#[cfg(feature = "client")]
425impl Encodable for TopicProduceData {
426 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
427 if version < 0 || version > 11 {
428 bail!("specified version not supported by this message type");
429 }
430 if version >= 9 {
431 types::CompactString.encode(buf, &self.name)?;
432 } else {
433 types::String.encode(buf, &self.name)?;
434 }
435 if version >= 9 {
436 types::CompactArray(types::Struct { version }).encode(buf, &self.partition_data)?;
437 } else {
438 types::Array(types::Struct { version }).encode(buf, &self.partition_data)?;
439 }
440 if version >= 9 {
441 let num_tagged_fields = self.unknown_tagged_fields.len();
442 if num_tagged_fields > std::u32::MAX as usize {
443 bail!(
444 "Too many tagged fields to encode ({} fields)",
445 num_tagged_fields
446 );
447 }
448 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
449
450 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
451 }
452 Ok(())
453 }
454 fn compute_size(&self, version: i16) -> Result<usize> {
455 let mut total_size = 0;
456 if version >= 9 {
457 total_size += types::CompactString.compute_size(&self.name)?;
458 } else {
459 total_size += types::String.compute_size(&self.name)?;
460 }
461 if version >= 9 {
462 total_size += types::CompactArray(types::Struct { version })
463 .compute_size(&self.partition_data)?;
464 } else {
465 total_size +=
466 types::Array(types::Struct { version }).compute_size(&self.partition_data)?;
467 }
468 if version >= 9 {
469 let num_tagged_fields = self.unknown_tagged_fields.len();
470 if num_tagged_fields > std::u32::MAX as usize {
471 bail!(
472 "Too many tagged fields to encode ({} fields)",
473 num_tagged_fields
474 );
475 }
476 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
477
478 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
479 }
480 Ok(total_size)
481 }
482}
483
484#[cfg(feature = "broker")]
485impl Decodable for TopicProduceData {
486 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
487 if version < 0 || version > 11 {
488 bail!("specified version not supported by this message type");
489 }
490 let name = if version >= 9 {
491 types::CompactString.decode(buf)?
492 } else {
493 types::String.decode(buf)?
494 };
495 let partition_data = if version >= 9 {
496 types::CompactArray(types::Struct { version }).decode(buf)?
497 } else {
498 types::Array(types::Struct { version }).decode(buf)?
499 };
500 let mut unknown_tagged_fields = BTreeMap::new();
501 if version >= 9 {
502 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
503 for _ in 0..num_tagged_fields {
504 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
505 let size: u32 = types::UnsignedVarInt.decode(buf)?;
506 let unknown_value = buf.try_get_bytes(size as usize)?;
507 unknown_tagged_fields.insert(tag as i32, unknown_value);
508 }
509 }
510 Ok(Self {
511 name,
512 partition_data,
513 unknown_tagged_fields,
514 })
515 }
516}
517
518impl Default for TopicProduceData {
519 fn default() -> Self {
520 Self {
521 name: Default::default(),
522 partition_data: Default::default(),
523 unknown_tagged_fields: BTreeMap::new(),
524 }
525 }
526}
527
528impl Message for TopicProduceData {
529 const VERSIONS: VersionRange = VersionRange { min: 0, max: 11 };
530 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 6 });
531}
532
533impl HeaderVersion for ProduceRequest {
534 fn header_version(version: i16) -> i16 {
535 if version >= 9 {
536 2
537 } else {
538 1
539 }
540 }
541}