1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionProduceData {
24 pub index: i32,
28
29 pub records: Option<Bytes>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl PartitionProduceData {
39 pub fn with_index(mut self, value: i32) -> Self {
45 self.index = value;
46 self
47 }
48 pub fn with_records(mut self, value: Option<Bytes>) -> Self {
54 self.records = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
/// Wire encoding for [`PartitionProduceData`]; only built with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for PartitionProduceData {
    /// Writes this value into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3..=13, when a field encoder fails, or
    /// when there are more unknown tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to compact encodings and append tagged fields.
        let flexible = version >= 9;

        types::Int32.encode(buf, &self.index)?;
        if flexible {
            types::CompactBytes.encode(buf, &self.records)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Bytes.encode(buf, &self.records)?;
        }
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 3..=13.
    ///
    /// # Errors
    ///
    /// Fails when a field size computation fails or when there are more
    /// unknown tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;

        let mut total_size = types::Int32.compute_size(&self.index)?;
        total_size += if flexible {
            types::CompactBytes.compute_size(&self.records)?
        } else {
            types::Bytes.compute_size(&self.records)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(total_size)
    }
}
118
/// Wire decoding for [`PartitionProduceData`]; only built with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for PartitionProduceData {
    /// Reads one value from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3..=13 or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use compact encodings and carry a tagged-field section.
        let flexible = version >= 9;

        let index = types::Int32.decode(buf)?;
        let records = if flexible {
            types::CompactBytes.decode(buf)?
        } else {
            types::Bytes.decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then the raw payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            index,
            records,
            unknown_tagged_fields,
        })
    }
}
148
149impl Default for PartitionProduceData {
150 fn default() -> Self {
151 Self {
152 index: 0,
153 records: Some(Default::default()),
154 unknown_tagged_fields: BTreeMap::new(),
155 }
156 }
157}
158
159impl Message for PartitionProduceData {
160 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
161 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
162}
163
164#[non_exhaustive]
166#[derive(Debug, Clone, PartialEq)]
167pub struct ProduceRequest {
168 pub transactional_id: Option<super::TransactionalId>,
172
173 pub acks: i16,
177
178 pub timeout_ms: i32,
182
183 pub topic_data: Vec<TopicProduceData>,
187
188 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
190}
191
192impl ProduceRequest {
193 pub fn with_transactional_id(mut self, value: Option<super::TransactionalId>) -> Self {
199 self.transactional_id = value;
200 self
201 }
202 pub fn with_acks(mut self, value: i16) -> Self {
208 self.acks = value;
209 self
210 }
211 pub fn with_timeout_ms(mut self, value: i32) -> Self {
217 self.timeout_ms = value;
218 self
219 }
220 pub fn with_topic_data(mut self, value: Vec<TopicProduceData>) -> Self {
226 self.topic_data = value;
227 self
228 }
229 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
231 self.unknown_tagged_fields = value;
232 self
233 }
234 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
236 self.unknown_tagged_fields.insert(key, value);
237 self
238 }
239}
240
/// Wire encoding for [`ProduceRequest`]; only built with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for ProduceRequest {
    /// Writes this request into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3..=13, when a field encoder fails, or
    /// when there are more unknown tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to compact encodings and append tagged fields.
        let flexible = version >= 9;

        if flexible {
            types::CompactString.encode(buf, &self.transactional_id)?;
        } else {
            types::String.encode(buf, &self.transactional_id)?;
        }
        types::Int16.encode(buf, &self.acks)?;
        types::Int32.encode(buf, &self.timeout_ms)?;

        if !flexible {
            // Pre-v9 layout ends with the plain topic array.
            types::Array(types::Struct { version }).encode(buf, &self.topic_data)?;
            return Ok(());
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.topic_data)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 3..=13.
    ///
    /// # Errors
    ///
    /// Fails when a field size computation fails or when there are more
    /// unknown tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;

        let mut total_size = if flexible {
            types::CompactString.compute_size(&self.transactional_id)?
        } else {
            types::String.compute_size(&self.transactional_id)?
        };
        total_size += types::Int16.compute_size(&self.acks)?;
        total_size += types::Int32.compute_size(&self.timeout_ms)?;

        if !flexible {
            total_size += types::Array(types::Struct { version }).compute_size(&self.topic_data)?;
            return Ok(total_size);
        }

        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.topic_data)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
303
/// Wire decoding for [`ProduceRequest`]; only built with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for ProduceRequest {
    /// Reads one request from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3..=13 or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use compact encodings and carry a tagged-field section.
        let flexible = version >= 9;

        let transactional_id = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let acks = types::Int16.decode(buf)?;
        let timeout_ms = types::Int32.decode(buf)?;
        let topic_data = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then the raw payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            transactional_id,
            acks,
            timeout_ms,
            topic_data,
            unknown_tagged_fields,
        })
    }
}
341
342impl Default for ProduceRequest {
343 fn default() -> Self {
344 Self {
345 transactional_id: None,
346 acks: 0,
347 timeout_ms: 0,
348 topic_data: Default::default(),
349 unknown_tagged_fields: BTreeMap::new(),
350 }
351 }
352}
353
354impl Message for ProduceRequest {
355 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
356 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
357}
358
359#[non_exhaustive]
361#[derive(Debug, Clone, PartialEq)]
362pub struct TopicProduceData {
363 pub name: super::TopicName,
367
368 pub topic_id: Uuid,
372
373 pub partition_data: Vec<PartitionProduceData>,
377
378 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
380}
381
382impl TopicProduceData {
383 pub fn with_name(mut self, value: super::TopicName) -> Self {
389 self.name = value;
390 self
391 }
392 pub fn with_topic_id(mut self, value: Uuid) -> Self {
398 self.topic_id = value;
399 self
400 }
401 pub fn with_partition_data(mut self, value: Vec<PartitionProduceData>) -> Self {
407 self.partition_data = value;
408 self
409 }
410 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
412 self.unknown_tagged_fields = value;
413 self
414 }
415 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
417 self.unknown_tagged_fields.insert(key, value);
418 self
419 }
420}
421
/// Wire encoding for [`TopicProduceData`]; only built with the `client` feature.
#[cfg(feature = "client")]
impl Encodable for TopicProduceData {
    /// Writes this value into `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3..=13, when a field encoder fails, or
    /// when there are more unknown tagged fields than fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ switch to compact encodings and append tagged fields.
        let flexible = version >= 9;

        // The topic is identified by name up to version 12 and by id from 13.
        if version <= 12 {
            if flexible {
                types::CompactString.encode(buf, &self.name)?;
            } else {
                types::String.encode(buf, &self.name)?;
            }
        } else {
            types::Uuid.encode(buf, &self.topic_id)?;
        }

        if !flexible {
            // Pre-v9 layout ends with the plain partition array.
            types::Array(types::Struct { version }).encode(buf, &self.partition_data)?;
            return Ok(());
        }

        types::CompactArray(types::Struct { version }).encode(buf, &self.partition_data)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside 3..=13.
    ///
    /// # Errors
    ///
    /// Fails when a field size computation fails or when there are more
    /// unknown tagged fields than fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 9;

        // Name up to version 12, topic id from version 13.
        let mut total_size = if version <= 12 {
            if flexible {
                types::CompactString.compute_size(&self.name)?
            } else {
                types::String.compute_size(&self.name)?
            }
        } else {
            types::Uuid.compute_size(&self.topic_id)?
        };

        if !flexible {
            total_size +=
                types::Array(types::Struct { version }).compute_size(&self.partition_data)?;
            return Ok(total_size);
        }

        total_size +=
            types::CompactArray(types::Struct { version }).compute_size(&self.partition_data)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        total_size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        Ok(total_size)
    }
}
491
/// Wire decoding for [`TopicProduceData`]; only built with the `broker` feature.
#[cfg(feature = "broker")]
impl Decodable for TopicProduceData {
    /// Reads one value from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside 3..=13 or when any field cannot be
    /// read from `buf`.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(3..=13).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        // Versions 9+ use compact encodings and carry a tagged-field section.
        let flexible = version >= 9;

        // Exactly one of name / topic id is on the wire; the other is defaulted.
        let (name, topic_id) = if version <= 12 {
            let name: super::TopicName = if flexible {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            };
            (name, Uuid::nil())
        } else {
            let topic_id: Uuid = types::Uuid.decode(buf)?;
            (super::TopicName::default(), topic_id)
        };

        let partition_data = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                // Each entry is: tag, payload length, then the raw payload.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let size: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(size as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            topic_id,
            partition_data,
            unknown_tagged_fields,
        })
    }
}
535
536impl Default for TopicProduceData {
537 fn default() -> Self {
538 Self {
539 name: Default::default(),
540 topic_id: Uuid::nil(),
541 partition_data: Default::default(),
542 unknown_tagged_fields: BTreeMap::new(),
543 }
544 }
545}
546
547impl Message for TopicProduceData {
548 const VERSIONS: VersionRange = VersionRange { min: 3, max: 13 };
549 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
550}
551
552impl HeaderVersion for ProduceRequest {
553 fn header_version(version: i16) -> i16 {
554 if version >= 9 {
555 2
556 } else {
557 1
558 }
559 }
560}