1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct CreatableReplicaAssignment {
24 pub partition_index: i32,
28
29 pub broker_ids: Vec<super::BrokerId>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl CreatableReplicaAssignment {
39 pub fn with_partition_index(mut self, value: i32) -> Self {
45 self.partition_index = value;
46 self
47 }
48 pub fn with_broker_ids(mut self, value: Vec<super::BrokerId>) -> Self {
54 self.broker_ids = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
#[cfg(feature = "client")]
impl Encodable for CreatableReplicaAssignment {
    /// Serializes this value into `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when a field encoder fails, or when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;

        // From version 5 the compact array form is used and a tagged-field section follows.
        if version >= 5 {
            types::CompactArray(types::Int32).encode(buf, &self.broker_ids)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.broker_ids)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when a field size computation fails or when the number of unknown tagged
    /// fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?;

        if version >= 5 {
            size += types::CompactArray(types::Int32).compute_size(&self.broker_ids)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::Array(types::Int32).compute_size(&self.broker_ids)?;
        }

        Ok(size)
    }
}
118
#[cfg(feature = "broker")]
impl Decodable for CreatableReplicaAssignment {
    /// Reads one value from `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 5;

        let partition_index = types::Int32.decode(buf)?;
        let broker_ids = if flexible {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };

        // Tagged fields are only present from version 5; each one is kept as raw bytes.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            partition_index,
            broker_ids,
            unknown_tagged_fields,
        })
    }
}
148
149impl Default for CreatableReplicaAssignment {
150 fn default() -> Self {
151 Self {
152 partition_index: 0,
153 broker_ids: Default::default(),
154 unknown_tagged_fields: BTreeMap::new(),
155 }
156 }
157}
158
159impl Message for CreatableReplicaAssignment {
160 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
161 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
162}
163
164#[non_exhaustive]
166#[derive(Debug, Clone, PartialEq)]
167pub struct CreatableTopic {
168 pub name: super::TopicName,
172
173 pub num_partitions: i32,
177
178 pub replication_factor: i16,
182
183 pub assignments: Vec<CreatableReplicaAssignment>,
187
188 pub configs: Vec<CreatableTopicConfig>,
192
193 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
195}
196
197impl CreatableTopic {
198 pub fn with_name(mut self, value: super::TopicName) -> Self {
204 self.name = value;
205 self
206 }
207 pub fn with_num_partitions(mut self, value: i32) -> Self {
213 self.num_partitions = value;
214 self
215 }
216 pub fn with_replication_factor(mut self, value: i16) -> Self {
222 self.replication_factor = value;
223 self
224 }
225 pub fn with_assignments(mut self, value: Vec<CreatableReplicaAssignment>) -> Self {
231 self.assignments = value;
232 self
233 }
234 pub fn with_configs(mut self, value: Vec<CreatableTopicConfig>) -> Self {
240 self.configs = value;
241 self
242 }
243 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
245 self.unknown_tagged_fields = value;
246 self
247 }
248 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
250 self.unknown_tagged_fields.insert(key, value);
251 self
252 }
253}
254
#[cfg(feature = "client")]
impl Encodable for CreatableTopic {
    /// Serializes this topic into `buf` using the layout selected by `version`.
    ///
    /// Fields are written in the order name, partition count, replication factor,
    /// assignments, configs and, from version 5, the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when a field encoder fails, or when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 5;

        if flexible {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        types::Int32.encode(buf, &self.num_partitions)?;
        types::Int16.encode(buf, &self.replication_factor)?;

        // The remaining fields all switch encoding at the same version boundary.
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.assignments)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.configs)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.assignments)?;
            types::Array(types::Struct { version }).encode(buf, &self.configs)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when a field size computation fails or when the number of unknown tagged
    /// fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 5;

        let mut size = if flexible {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        size += types::Int32.compute_size(&self.num_partitions)?;
        size += types::Int16.compute_size(&self.replication_factor)?;

        if flexible {
            size +=
                types::CompactArray(types::Struct { version }).compute_size(&self.assignments)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.configs)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.assignments)?;
            size += types::Array(types::Struct { version }).compute_size(&self.configs)?;
        }

        Ok(size)
    }
}
329
#[cfg(feature = "broker")]
impl Decodable for CreatableTopic {
    /// Reads one topic from `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 5;

        let name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let num_partitions = types::Int32.decode(buf)?;
        let replication_factor = types::Int16.decode(buf)?;
        let assignments = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let configs = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        // Tagged fields are only present from version 5; each one is kept as raw bytes.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            num_partitions,
            replication_factor,
            assignments,
            configs,
            unknown_tagged_fields,
        })
    }
}
373
374impl Default for CreatableTopic {
375 fn default() -> Self {
376 Self {
377 name: Default::default(),
378 num_partitions: 0,
379 replication_factor: 0,
380 assignments: Default::default(),
381 configs: Default::default(),
382 unknown_tagged_fields: BTreeMap::new(),
383 }
384 }
385}
386
387impl Message for CreatableTopic {
388 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
389 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
390}
391
392#[non_exhaustive]
394#[derive(Debug, Clone, PartialEq)]
395pub struct CreatableTopicConfig {
396 pub name: StrBytes,
400
401 pub value: Option<StrBytes>,
405
406 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
408}
409
410impl CreatableTopicConfig {
411 pub fn with_name(mut self, value: StrBytes) -> Self {
417 self.name = value;
418 self
419 }
420 pub fn with_value(mut self, value: Option<StrBytes>) -> Self {
426 self.value = value;
427 self
428 }
429 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
431 self.unknown_tagged_fields = value;
432 self
433 }
434 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
436 self.unknown_tagged_fields.insert(key, value);
437 self
438 }
439}
440
#[cfg(feature = "client")]
impl Encodable for CreatableTopicConfig {
    /// Serializes this config entry into `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when a field encoder fails, or when the
    /// number of unknown tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        // Every field switches encoding at version 5, so one branch covers the whole layout.
        if version >= 5 {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactString.encode(buf, &self.value)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::String.encode(buf, &self.value)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when a field size computation fails or when the number of unknown tagged
    /// fields does not fit in a `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = 0;

        if version >= 5 {
            size += types::CompactString.compute_size(&self.name)?;
            size += types::CompactString.compute_size(&self.value)?;

            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        } else {
            size += types::String.compute_size(&self.name)?;
            size += types::String.compute_size(&self.value)?;
        }

        Ok(size)
    }
}
498
#[cfg(feature = "broker")]
impl Decodable for CreatableTopicConfig {
    /// Reads one config entry from `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 5;

        let name = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let value = if flexible {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };

        // Tagged fields are only present from version 5; each one is kept as raw bytes.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            value,
            unknown_tagged_fields,
        })
    }
}
532
533impl Default for CreatableTopicConfig {
534 fn default() -> Self {
535 Self {
536 name: Default::default(),
537 value: Some(Default::default()),
538 unknown_tagged_fields: BTreeMap::new(),
539 }
540 }
541}
542
543impl Message for CreatableTopicConfig {
544 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
545 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
546}
547
548#[non_exhaustive]
550#[derive(Debug, Clone, PartialEq)]
551pub struct CreateTopicsRequest {
552 pub topics: Vec<CreatableTopic>,
556
557 pub timeout_ms: i32,
561
562 pub validate_only: bool,
566
567 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
569}
570
571impl CreateTopicsRequest {
572 pub fn with_topics(mut self, value: Vec<CreatableTopic>) -> Self {
578 self.topics = value;
579 self
580 }
581 pub fn with_timeout_ms(mut self, value: i32) -> Self {
587 self.timeout_ms = value;
588 self
589 }
590 pub fn with_validate_only(mut self, value: bool) -> Self {
596 self.validate_only = value;
597 self
598 }
599 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
601 self.unknown_tagged_fields = value;
602 self
603 }
604 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
606 self.unknown_tagged_fields.insert(key, value);
607 self
608 }
609}
610
#[cfg(feature = "client")]
impl Encodable for CreateTopicsRequest {
    /// Serializes the request body into `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7`, when `validate_only` is set but the version
    /// is below 1, when a field encoder fails, or when the number of unknown tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 5;

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        types::Int32.encode(buf, &self.timeout_ms)?;

        // `validate_only` has no wire representation before version 1, so a set flag
        // cannot be expressed there and is rejected.
        if version >= 1 {
            types::Boolean.encode(buf, &self.validate_only)?;
        } else if self.validate_only {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
            write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        }

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject versions outside `0..=7`.
    ///
    /// # Errors
    ///
    /// Fails when `validate_only` is set but the version is below 1, when a field size
    /// computation fails, or when the number of unknown tagged fields does not fit in a
    /// `u32`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 5;

        let mut size = if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };
        size += types::Int32.compute_size(&self.timeout_ms)?;

        if version >= 1 {
            size += types::Boolean.compute_size(&self.validate_only)?;
        } else if self.validate_only {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
675
#[cfg(feature = "broker")]
impl Decodable for CreateTopicsRequest {
    /// Reads one request body from `buf` using the layout selected by `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=7` or when any underlying read fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=7).contains(&version) {
            bail!("specified version not supported by this message type");
        }
        let flexible = version >= 5;

        let topics = if flexible {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let timeout_ms = types::Int32.decode(buf)?;
        // Version 0 carries no `validate_only` byte, so it falls back to `false`.
        let validate_only = if version >= 1 {
            types::Boolean.decode(buf)?
        } else {
            false
        };

        // Tagged fields are only present from version 5; each one is kept as raw bytes.
        let mut unknown_tagged_fields = BTreeMap::new();
        if flexible {
            let tagged_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                unknown_tagged_fields.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topics,
            timeout_ms,
            validate_only,
            unknown_tagged_fields,
        })
    }
}
711
712impl Default for CreateTopicsRequest {
713 fn default() -> Self {
714 Self {
715 topics: Default::default(),
716 timeout_ms: 60000,
717 validate_only: false,
718 unknown_tagged_fields: BTreeMap::new(),
719 }
720 }
721}
722
723impl Message for CreateTopicsRequest {
724 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
725 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
726}
727
728impl HeaderVersion for CreateTopicsRequest {
729 fn header_version(version: i16) -> i16 {
730 if version >= 5 {
731 2
732 } else {
733 1
734 }
735 }
736}