1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct CreatableReplicaAssignment {
24 pub partition_index: i32,
28
29 pub broker_ids: Vec<super::BrokerId>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl CreatableReplicaAssignment {
39 pub fn with_partition_index(mut self, value: i32) -> Self {
45 self.partition_index = value;
46 self
47 }
48 pub fn with_broker_ids(mut self, value: Vec<super::BrokerId>) -> Self {
54 self.broker_ids = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
/// Client-side serialization of [`CreatableReplicaAssignment`].
#[cfg(feature = "client")]
impl Encodable for CreatableReplicaAssignment {
    /// Writes this entry to `buf` using the layout of `version`.
    ///
    /// Versions below 5 use the plain array encoding and stop after the
    /// broker ids; versions 5 and above use the compact array encoding and
    /// append the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if any underlying encoder fails, or if the number of tagged
    /// fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 5;

        types::Int32.encode(buf, &self.partition_index)?;
        if flexible {
            types::CompactArray(types::Int32).encode(buf, &self.broker_ids)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.broker_ids)?;
        }

        // Older versions have no tagged-field section at all.
        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        // The count is written first, then the fields themselves.
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 5;

        let mut size = types::Int32.compute_size(&self.partition_index)?;
        size += if flexible {
            types::CompactArray(types::Int32).compute_size(&self.broker_ids)?
        } else {
            types::Array(types::Int32).compute_size(&self.broker_ids)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
115
/// Broker-side deserialization of [`CreatableReplicaAssignment`].
#[cfg(feature = "broker")]
impl Decodable for CreatableReplicaAssignment {
    /// Reads one entry from `buf` using the layout of `version`.
    ///
    /// Fields are consumed in wire order: partition index, broker ids, and
    /// (for versions 5 and above) the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if any underlying decoder fails or `buf` cannot supply the
    /// announced number of bytes for a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 5;

        // Struct-literal fields are evaluated top to bottom, which matches
        // the order on the wire.
        let mut decoded = Self {
            partition_index: types::Int32.decode(buf)?,
            broker_ids: if flexible {
                types::CompactArray(types::Int32).decode(buf)?
            } else {
                types::Array(types::Int32).decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(decoded)
    }
}
142
143impl Default for CreatableReplicaAssignment {
144 fn default() -> Self {
145 Self {
146 partition_index: 0,
147 broker_ids: Default::default(),
148 unknown_tagged_fields: BTreeMap::new(),
149 }
150 }
151}
152
153impl Message for CreatableReplicaAssignment {
154 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
155 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
156}
157
158#[non_exhaustive]
160#[derive(Debug, Clone, PartialEq)]
161pub struct CreatableTopic {
162 pub name: super::TopicName,
166
167 pub num_partitions: i32,
171
172 pub replication_factor: i16,
176
177 pub assignments: Vec<CreatableReplicaAssignment>,
181
182 pub configs: Vec<CreateableTopicConfig>,
186
187 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
189}
190
191impl CreatableTopic {
192 pub fn with_name(mut self, value: super::TopicName) -> Self {
198 self.name = value;
199 self
200 }
201 pub fn with_num_partitions(mut self, value: i32) -> Self {
207 self.num_partitions = value;
208 self
209 }
210 pub fn with_replication_factor(mut self, value: i16) -> Self {
216 self.replication_factor = value;
217 self
218 }
219 pub fn with_assignments(mut self, value: Vec<CreatableReplicaAssignment>) -> Self {
225 self.assignments = value;
226 self
227 }
228 pub fn with_configs(mut self, value: Vec<CreateableTopicConfig>) -> Self {
234 self.configs = value;
235 self
236 }
237 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
239 self.unknown_tagged_fields = value;
240 self
241 }
242 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
244 self.unknown_tagged_fields.insert(key, value);
245 self
246 }
247}
248
/// Client-side serialization of [`CreatableTopic`].
#[cfg(feature = "client")]
impl Encodable for CreatableTopic {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Versions 5 and above use compact strings and arrays and append the
    /// tagged-field section; earlier versions use the plain encodings and
    /// have no tagged fields.
    ///
    /// # Errors
    ///
    /// Fails if any underlying encoder fails, or if the number of tagged
    /// fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 5;

        if flexible {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        types::Int32.encode(buf, &self.num_partitions)?;
        types::Int16.encode(buf, &self.replication_factor)?;

        // Both nested arrays switch encoding at the same version, so they
        // share one branch; assignments are still written before configs.
        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.assignments)?;
            types::CompactArray(types::Struct { version }).encode(buf, &self.configs)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.assignments)?;
            types::Array(types::Struct { version }).encode(buf, &self.configs)?;
        }

        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 5;

        let mut size = if flexible {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        size += types::Int32.compute_size(&self.num_partitions)?;
        size += types::Int16.compute_size(&self.replication_factor)?;

        if flexible {
            size += types::CompactArray(types::Struct { version })
                .compute_size(&self.assignments)?;
            size += types::CompactArray(types::Struct { version }).compute_size(&self.configs)?;
        } else {
            size += types::Array(types::Struct { version }).compute_size(&self.assignments)?;
            size += types::Array(types::Struct { version }).compute_size(&self.configs)?;
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
320
/// Broker-side deserialization of [`CreatableTopic`].
#[cfg(feature = "broker")]
impl Decodable for CreatableTopic {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// Fields are consumed in wire order: name, partition count, replication
    /// factor, assignments, configs, then (for versions 5 and above) the
    /// tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if any underlying decoder fails or `buf` cannot supply the
    /// announced number of bytes for a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 5;

        // Struct-literal fields are evaluated top to bottom, which matches
        // the order on the wire.
        let mut decoded = Self {
            name: if flexible {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            num_partitions: types::Int32.decode(buf)?,
            replication_factor: types::Int16.decode(buf)?,
            assignments: if flexible {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            configs: if flexible {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(decoded)
    }
}
361
362impl Default for CreatableTopic {
363 fn default() -> Self {
364 Self {
365 name: Default::default(),
366 num_partitions: 0,
367 replication_factor: 0,
368 assignments: Default::default(),
369 configs: Default::default(),
370 unknown_tagged_fields: BTreeMap::new(),
371 }
372 }
373}
374
375impl Message for CreatableTopic {
376 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
377 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
378}
379
380#[non_exhaustive]
382#[derive(Debug, Clone, PartialEq)]
383pub struct CreateTopicsRequest {
384 pub topics: Vec<CreatableTopic>,
388
389 pub timeout_ms: i32,
393
394 pub validate_only: bool,
398
399 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
401}
402
403impl CreateTopicsRequest {
404 pub fn with_topics(mut self, value: Vec<CreatableTopic>) -> Self {
410 self.topics = value;
411 self
412 }
413 pub fn with_timeout_ms(mut self, value: i32) -> Self {
419 self.timeout_ms = value;
420 self
421 }
422 pub fn with_validate_only(mut self, value: bool) -> Self {
428 self.validate_only = value;
429 self
430 }
431 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
433 self.unknown_tagged_fields = value;
434 self
435 }
436 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
438 self.unknown_tagged_fields.insert(key, value);
439 self
440 }
441}
442
/// Client-side serialization of [`CreateTopicsRequest`].
#[cfg(feature = "client")]
impl Encodable for CreateTopicsRequest {
    /// Writes the request body to `buf` using the layout of `version`.
    ///
    /// `validate_only` is written from version 1. Versions 5 and above use
    /// the compact array encoding and append the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if any underlying encoder fails, if `validate_only` is `true`
    /// at version 0 (the field cannot be represented there), or if the
    /// number of tagged fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 5;

        if flexible {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        types::Int32.encode(buf, &self.timeout_ms)?;

        if version >= 1 {
            types::Boolean.encode(buf, &self.validate_only)?;
        } else if self.validate_only {
            // A non-default value would be silently lost, so refuse instead.
            bail!("A field is set that is not available on the selected protocol version");
        }

        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 5;

        let mut size = if flexible {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };
        size += types::Int32.compute_size(&self.timeout_ms)?;

        if version >= 1 {
            size += types::Boolean.compute_size(&self.validate_only)?;
        } else if self.validate_only {
            bail!("A field is set that is not available on the selected protocol version");
        }

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
504
/// Broker-side deserialization of [`CreateTopicsRequest`].
#[cfg(feature = "broker")]
impl Decodable for CreateTopicsRequest {
    /// Reads a request body from `buf` using the layout of `version`.
    ///
    /// `validate_only` is read from version 1 and is `false` at version 0.
    /// The tagged-field section is read for versions 5 and above.
    ///
    /// # Errors
    ///
    /// Fails if any underlying decoder fails or `buf` cannot supply the
    /// announced number of bytes for a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 5;

        // Struct-literal fields are evaluated top to bottom, which matches
        // the order on the wire.
        let mut decoded = Self {
            topics: if flexible {
                types::CompactArray(types::Struct { version }).decode(buf)?
            } else {
                types::Array(types::Struct { version }).decode(buf)?
            },
            timeout_ms: types::Int32.decode(buf)?,
            validate_only: if version >= 1 {
                types::Boolean.decode(buf)?
            } else {
                false
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(decoded)
    }
}
537
538impl Default for CreateTopicsRequest {
539 fn default() -> Self {
540 Self {
541 topics: Default::default(),
542 timeout_ms: 60000,
543 validate_only: false,
544 unknown_tagged_fields: BTreeMap::new(),
545 }
546 }
547}
548
549impl Message for CreateTopicsRequest {
550 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
551 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
552}
553
554#[non_exhaustive]
556#[derive(Debug, Clone, PartialEq)]
557pub struct CreateableTopicConfig {
558 pub name: StrBytes,
562
563 pub value: Option<StrBytes>,
567
568 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
570}
571
572impl CreateableTopicConfig {
573 pub fn with_name(mut self, value: StrBytes) -> Self {
579 self.name = value;
580 self
581 }
582 pub fn with_value(mut self, value: Option<StrBytes>) -> Self {
588 self.value = value;
589 self
590 }
591 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
593 self.unknown_tagged_fields = value;
594 self
595 }
596 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
598 self.unknown_tagged_fields.insert(key, value);
599 self
600 }
601}
602
/// Client-side serialization of [`CreateableTopicConfig`].
#[cfg(feature = "client")]
impl Encodable for CreateableTopicConfig {
    /// Writes this entry to `buf` using the layout of `version`.
    ///
    /// Versions 5 and above use compact strings and append the tagged-field
    /// section; earlier versions use plain strings and have no tagged
    /// fields.
    ///
    /// # Errors
    ///
    /// Fails if any underlying encoder fails, or if the number of tagged
    /// fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let flexible = version >= 5;

        // Name and value switch encoding at the same version, so they share
        // one branch; the name is still written first.
        if flexible {
            types::CompactString.encode(buf, &self.name)?;
            types::CompactString.encode(buf, &self.value)?;
        } else {
            types::String.encode(buf, &self.name)?;
            types::String.encode(buf, &self.value)?;
        }

        if !flexible {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Computes the number of bytes `encode` would write for `version`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let flexible = version >= 5;

        let mut size = if flexible {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        size += if flexible {
            types::CompactString.compute_size(&self.value)?
        } else {
            types::String.compute_size(&self.value)?
        };

        if flexible {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!(
                    "Too many tagged fields to encode ({} fields)",
                    tagged_count
                );
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }
        Ok(size)
    }
}
657
/// Broker-side deserialization of [`CreateableTopicConfig`].
#[cfg(feature = "broker")]
impl Decodable for CreateableTopicConfig {
    /// Reads one entry from `buf` using the layout of `version`.
    ///
    /// Fields are consumed in wire order: name, value, then (for versions 5
    /// and above) the tagged-field section.
    ///
    /// # Errors
    ///
    /// Fails if any underlying decoder fails or `buf` cannot supply the
    /// announced number of bytes for a tagged field.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let flexible = version >= 5;

        // Struct-literal fields are evaluated top to bottom, which matches
        // the order on the wire.
        let mut decoded = Self {
            name: if flexible {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            value: if flexible {
                types::CompactString.decode(buf)?
            } else {
                types::String.decode(buf)?
            },
            unknown_tagged_fields: BTreeMap::new(),
        };

        if flexible {
            let tagged_count: u32 = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..tagged_count {
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                // A repeated tag replaces the earlier payload.
                decoded.unknown_tagged_fields.insert(tag as i32, payload);
            }
        }
        Ok(decoded)
    }
}
688
689impl Default for CreateableTopicConfig {
690 fn default() -> Self {
691 Self {
692 name: Default::default(),
693 value: Some(Default::default()),
694 unknown_tagged_fields: BTreeMap::new(),
695 }
696 }
697}
698
699impl Message for CreateableTopicConfig {
700 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
701 const DEPRECATED_VERSIONS: Option<VersionRange> = Some(VersionRange { min: 0, max: 1 });
702}
703
704impl HeaderVersion for CreateTopicsRequest {
705 fn header_version(version: i16) -> i16 {
706 if version >= 5 {
707 2
708 } else {
709 1
710 }
711 }
712}