1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct CreatableTopicConfigs {
24 pub name: StrBytes,
28
29 pub value: Option<StrBytes>,
33
34 pub read_only: bool,
38
39 pub config_source: i8,
43
44 pub is_sensitive: bool,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl CreatableTopicConfigs {
54 pub fn with_name(mut self, value: StrBytes) -> Self {
60 self.name = value;
61 self
62 }
63 pub fn with_value(mut self, value: Option<StrBytes>) -> Self {
69 self.value = value;
70 self
71 }
72 pub fn with_read_only(mut self, value: bool) -> Self {
78 self.read_only = value;
79 self
80 }
81 pub fn with_config_source(mut self, value: i8) -> Self {
87 self.config_source = value;
88 self
89 }
90 pub fn with_is_sensitive(mut self, value: bool) -> Self {
96 self.is_sensitive = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "broker")]
112impl Encodable for CreatableTopicConfigs {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version < 0 || version > 7 {
115 bail!("specified version not supported by this message type");
116 }
117 if version >= 5 {
118 types::CompactString.encode(buf, &self.name)?;
119 } else {
120 if !self.name.is_empty() {
121 bail!("A field is set that is not available on the selected protocol version");
122 }
123 }
124 if version >= 5 {
125 types::CompactString.encode(buf, &self.value)?;
126 } else {
127 if !self
128 .value
129 .as_ref()
130 .map(|x| x.is_empty())
131 .unwrap_or_default()
132 {
133 bail!("A field is set that is not available on the selected protocol version");
134 }
135 }
136 if version >= 5 {
137 types::Boolean.encode(buf, &self.read_only)?;
138 } else {
139 if self.read_only {
140 bail!("A field is set that is not available on the selected protocol version");
141 }
142 }
143 if version >= 5 {
144 types::Int8.encode(buf, &self.config_source)?;
145 }
146 if version >= 5 {
147 types::Boolean.encode(buf, &self.is_sensitive)?;
148 } else {
149 if self.is_sensitive {
150 bail!("A field is set that is not available on the selected protocol version");
151 }
152 }
153 if version >= 5 {
154 let num_tagged_fields = self.unknown_tagged_fields.len();
155 if num_tagged_fields > std::u32::MAX as usize {
156 bail!(
157 "Too many tagged fields to encode ({} fields)",
158 num_tagged_fields
159 );
160 }
161 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
162
163 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
164 }
165 Ok(())
166 }
167 fn compute_size(&self, version: i16) -> Result<usize> {
168 let mut total_size = 0;
169 if version >= 5 {
170 total_size += types::CompactString.compute_size(&self.name)?;
171 } else {
172 if !self.name.is_empty() {
173 bail!("A field is set that is not available on the selected protocol version");
174 }
175 }
176 if version >= 5 {
177 total_size += types::CompactString.compute_size(&self.value)?;
178 } else {
179 if !self
180 .value
181 .as_ref()
182 .map(|x| x.is_empty())
183 .unwrap_or_default()
184 {
185 bail!("A field is set that is not available on the selected protocol version");
186 }
187 }
188 if version >= 5 {
189 total_size += types::Boolean.compute_size(&self.read_only)?;
190 } else {
191 if self.read_only {
192 bail!("A field is set that is not available on the selected protocol version");
193 }
194 }
195 if version >= 5 {
196 total_size += types::Int8.compute_size(&self.config_source)?;
197 }
198 if version >= 5 {
199 total_size += types::Boolean.compute_size(&self.is_sensitive)?;
200 } else {
201 if self.is_sensitive {
202 bail!("A field is set that is not available on the selected protocol version");
203 }
204 }
205 if version >= 5 {
206 let num_tagged_fields = self.unknown_tagged_fields.len();
207 if num_tagged_fields > std::u32::MAX as usize {
208 bail!(
209 "Too many tagged fields to encode ({} fields)",
210 num_tagged_fields
211 );
212 }
213 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
214
215 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
216 }
217 Ok(total_size)
218 }
219}
220
221#[cfg(feature = "client")]
222impl Decodable for CreatableTopicConfigs {
223 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
224 if version < 0 || version > 7 {
225 bail!("specified version not supported by this message type");
226 }
227 let name = if version >= 5 {
228 types::CompactString.decode(buf)?
229 } else {
230 Default::default()
231 };
232 let value = if version >= 5 {
233 types::CompactString.decode(buf)?
234 } else {
235 Some(Default::default())
236 };
237 let read_only = if version >= 5 {
238 types::Boolean.decode(buf)?
239 } else {
240 false
241 };
242 let config_source = if version >= 5 {
243 types::Int8.decode(buf)?
244 } else {
245 -1
246 };
247 let is_sensitive = if version >= 5 {
248 types::Boolean.decode(buf)?
249 } else {
250 false
251 };
252 let mut unknown_tagged_fields = BTreeMap::new();
253 if version >= 5 {
254 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
255 for _ in 0..num_tagged_fields {
256 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
257 let size: u32 = types::UnsignedVarInt.decode(buf)?;
258 let unknown_value = buf.try_get_bytes(size as usize)?;
259 unknown_tagged_fields.insert(tag as i32, unknown_value);
260 }
261 }
262 Ok(Self {
263 name,
264 value,
265 read_only,
266 config_source,
267 is_sensitive,
268 unknown_tagged_fields,
269 })
270 }
271}
272
273impl Default for CreatableTopicConfigs {
274 fn default() -> Self {
275 Self {
276 name: Default::default(),
277 value: Some(Default::default()),
278 read_only: false,
279 config_source: -1,
280 is_sensitive: false,
281 unknown_tagged_fields: BTreeMap::new(),
282 }
283 }
284}
285
286impl Message for CreatableTopicConfigs {
287 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
288 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
289}
290
291#[non_exhaustive]
293#[derive(Debug, Clone, PartialEq)]
294pub struct CreatableTopicResult {
295 pub name: super::TopicName,
299
300 pub topic_id: Uuid,
304
305 pub error_code: i16,
309
310 pub error_message: Option<StrBytes>,
314
315 pub topic_config_error_code: i16,
319
320 pub num_partitions: i32,
324
325 pub replication_factor: i16,
329
330 pub configs: Option<Vec<CreatableTopicConfigs>>,
334
335 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
337}
338
339impl CreatableTopicResult {
340 pub fn with_name(mut self, value: super::TopicName) -> Self {
346 self.name = value;
347 self
348 }
349 pub fn with_topic_id(mut self, value: Uuid) -> Self {
355 self.topic_id = value;
356 self
357 }
358 pub fn with_error_code(mut self, value: i16) -> Self {
364 self.error_code = value;
365 self
366 }
367 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
373 self.error_message = value;
374 self
375 }
376 pub fn with_topic_config_error_code(mut self, value: i16) -> Self {
382 self.topic_config_error_code = value;
383 self
384 }
385 pub fn with_num_partitions(mut self, value: i32) -> Self {
391 self.num_partitions = value;
392 self
393 }
394 pub fn with_replication_factor(mut self, value: i16) -> Self {
400 self.replication_factor = value;
401 self
402 }
403 pub fn with_configs(mut self, value: Option<Vec<CreatableTopicConfigs>>) -> Self {
409 self.configs = value;
410 self
411 }
412 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
414 self.unknown_tagged_fields = value;
415 self
416 }
417 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
419 self.unknown_tagged_fields.insert(key, value);
420 self
421 }
422}
423
424#[cfg(feature = "broker")]
425impl Encodable for CreatableTopicResult {
426 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
427 if version < 0 || version > 7 {
428 bail!("specified version not supported by this message type");
429 }
430 if version >= 5 {
431 types::CompactString.encode(buf, &self.name)?;
432 } else {
433 types::String.encode(buf, &self.name)?;
434 }
435 if version >= 7 {
436 types::Uuid.encode(buf, &self.topic_id)?;
437 }
438 types::Int16.encode(buf, &self.error_code)?;
439 if version >= 1 {
440 if version >= 5 {
441 types::CompactString.encode(buf, &self.error_message)?;
442 } else {
443 types::String.encode(buf, &self.error_message)?;
444 }
445 }
446 if version >= 5 {
447 types::Int32.encode(buf, &self.num_partitions)?;
448 }
449 if version >= 5 {
450 types::Int16.encode(buf, &self.replication_factor)?;
451 }
452 if version >= 5 {
453 types::CompactArray(types::Struct { version }).encode(buf, &self.configs)?;
454 }
455 if version >= 5 {
456 let mut num_tagged_fields = self.unknown_tagged_fields.len();
457 if self.topic_config_error_code != 0 {
458 num_tagged_fields += 1;
459 }
460 if num_tagged_fields > std::u32::MAX as usize {
461 bail!(
462 "Too many tagged fields to encode ({} fields)",
463 num_tagged_fields
464 );
465 }
466 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
467 if self.topic_config_error_code != 0 {
468 let computed_size = types::Int16.compute_size(&self.topic_config_error_code)?;
469 if computed_size > std::u32::MAX as usize {
470 bail!(
471 "Tagged field is too large to encode ({} bytes)",
472 computed_size
473 );
474 }
475 types::UnsignedVarInt.encode(buf, 0)?;
476 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
477 types::Int16.encode(buf, &self.topic_config_error_code)?;
478 }
479
480 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
481 }
482 Ok(())
483 }
484 fn compute_size(&self, version: i16) -> Result<usize> {
485 let mut total_size = 0;
486 if version >= 5 {
487 total_size += types::CompactString.compute_size(&self.name)?;
488 } else {
489 total_size += types::String.compute_size(&self.name)?;
490 }
491 if version >= 7 {
492 total_size += types::Uuid.compute_size(&self.topic_id)?;
493 }
494 total_size += types::Int16.compute_size(&self.error_code)?;
495 if version >= 1 {
496 if version >= 5 {
497 total_size += types::CompactString.compute_size(&self.error_message)?;
498 } else {
499 total_size += types::String.compute_size(&self.error_message)?;
500 }
501 }
502 if version >= 5 {
503 total_size += types::Int32.compute_size(&self.num_partitions)?;
504 }
505 if version >= 5 {
506 total_size += types::Int16.compute_size(&self.replication_factor)?;
507 }
508 if version >= 5 {
509 total_size +=
510 types::CompactArray(types::Struct { version }).compute_size(&self.configs)?;
511 }
512 if version >= 5 {
513 let mut num_tagged_fields = self.unknown_tagged_fields.len();
514 if self.topic_config_error_code != 0 {
515 num_tagged_fields += 1;
516 }
517 if num_tagged_fields > std::u32::MAX as usize {
518 bail!(
519 "Too many tagged fields to encode ({} fields)",
520 num_tagged_fields
521 );
522 }
523 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
524 if self.topic_config_error_code != 0 {
525 let computed_size = types::Int16.compute_size(&self.topic_config_error_code)?;
526 if computed_size > std::u32::MAX as usize {
527 bail!(
528 "Tagged field is too large to encode ({} bytes)",
529 computed_size
530 );
531 }
532 total_size += types::UnsignedVarInt.compute_size(0)?;
533 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
534 total_size += computed_size;
535 }
536
537 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
538 }
539 Ok(total_size)
540 }
541}
542
543#[cfg(feature = "client")]
544impl Decodable for CreatableTopicResult {
545 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
546 if version < 0 || version > 7 {
547 bail!("specified version not supported by this message type");
548 }
549 let name = if version >= 5 {
550 types::CompactString.decode(buf)?
551 } else {
552 types::String.decode(buf)?
553 };
554 let topic_id = if version >= 7 {
555 types::Uuid.decode(buf)?
556 } else {
557 Uuid::nil()
558 };
559 let error_code = types::Int16.decode(buf)?;
560 let error_message = if version >= 1 {
561 if version >= 5 {
562 types::CompactString.decode(buf)?
563 } else {
564 types::String.decode(buf)?
565 }
566 } else {
567 Some(Default::default())
568 };
569 let mut topic_config_error_code = 0;
570 let num_partitions = if version >= 5 {
571 types::Int32.decode(buf)?
572 } else {
573 -1
574 };
575 let replication_factor = if version >= 5 {
576 types::Int16.decode(buf)?
577 } else {
578 -1
579 };
580 let configs = if version >= 5 {
581 types::CompactArray(types::Struct { version }).decode(buf)?
582 } else {
583 Some(Default::default())
584 };
585 let mut unknown_tagged_fields = BTreeMap::new();
586 if version >= 5 {
587 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
588 for _ in 0..num_tagged_fields {
589 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
590 let size: u32 = types::UnsignedVarInt.decode(buf)?;
591 match tag {
592 0 => {
593 topic_config_error_code = types::Int16.decode(buf)?;
594 }
595 _ => {
596 let unknown_value = buf.try_get_bytes(size as usize)?;
597 unknown_tagged_fields.insert(tag as i32, unknown_value);
598 }
599 }
600 }
601 }
602 Ok(Self {
603 name,
604 topic_id,
605 error_code,
606 error_message,
607 topic_config_error_code,
608 num_partitions,
609 replication_factor,
610 configs,
611 unknown_tagged_fields,
612 })
613 }
614}
615
616impl Default for CreatableTopicResult {
617 fn default() -> Self {
618 Self {
619 name: Default::default(),
620 topic_id: Uuid::nil(),
621 error_code: 0,
622 error_message: Some(Default::default()),
623 topic_config_error_code: 0,
624 num_partitions: -1,
625 replication_factor: -1,
626 configs: Some(Default::default()),
627 unknown_tagged_fields: BTreeMap::new(),
628 }
629 }
630}
631
632impl Message for CreatableTopicResult {
633 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
634 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
635}
636
637#[non_exhaustive]
639#[derive(Debug, Clone, PartialEq)]
640pub struct CreateTopicsResponse {
641 pub throttle_time_ms: i32,
645
646 pub topics: Vec<CreatableTopicResult>,
650
651 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
653}
654
655impl CreateTopicsResponse {
656 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
662 self.throttle_time_ms = value;
663 self
664 }
665 pub fn with_topics(mut self, value: Vec<CreatableTopicResult>) -> Self {
671 self.topics = value;
672 self
673 }
674 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
676 self.unknown_tagged_fields = value;
677 self
678 }
679 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
681 self.unknown_tagged_fields.insert(key, value);
682 self
683 }
684}
685
686#[cfg(feature = "broker")]
687impl Encodable for CreateTopicsResponse {
688 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
689 if version < 0 || version > 7 {
690 bail!("specified version not supported by this message type");
691 }
692 if version >= 2 {
693 types::Int32.encode(buf, &self.throttle_time_ms)?;
694 }
695 if version >= 5 {
696 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
697 } else {
698 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
699 }
700 if version >= 5 {
701 let num_tagged_fields = self.unknown_tagged_fields.len();
702 if num_tagged_fields > std::u32::MAX as usize {
703 bail!(
704 "Too many tagged fields to encode ({} fields)",
705 num_tagged_fields
706 );
707 }
708 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
709
710 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
711 }
712 Ok(())
713 }
714 fn compute_size(&self, version: i16) -> Result<usize> {
715 let mut total_size = 0;
716 if version >= 2 {
717 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
718 }
719 if version >= 5 {
720 total_size +=
721 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
722 } else {
723 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
724 }
725 if version >= 5 {
726 let num_tagged_fields = self.unknown_tagged_fields.len();
727 if num_tagged_fields > std::u32::MAX as usize {
728 bail!(
729 "Too many tagged fields to encode ({} fields)",
730 num_tagged_fields
731 );
732 }
733 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
734
735 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
736 }
737 Ok(total_size)
738 }
739}
740
741#[cfg(feature = "client")]
742impl Decodable for CreateTopicsResponse {
743 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
744 if version < 0 || version > 7 {
745 bail!("specified version not supported by this message type");
746 }
747 let throttle_time_ms = if version >= 2 {
748 types::Int32.decode(buf)?
749 } else {
750 0
751 };
752 let topics = if version >= 5 {
753 types::CompactArray(types::Struct { version }).decode(buf)?
754 } else {
755 types::Array(types::Struct { version }).decode(buf)?
756 };
757 let mut unknown_tagged_fields = BTreeMap::new();
758 if version >= 5 {
759 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
760 for _ in 0..num_tagged_fields {
761 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
762 let size: u32 = types::UnsignedVarInt.decode(buf)?;
763 let unknown_value = buf.try_get_bytes(size as usize)?;
764 unknown_tagged_fields.insert(tag as i32, unknown_value);
765 }
766 }
767 Ok(Self {
768 throttle_time_ms,
769 topics,
770 unknown_tagged_fields,
771 })
772 }
773}
774
775impl Default for CreateTopicsResponse {
776 fn default() -> Self {
777 Self {
778 throttle_time_ms: 0,
779 topics: Default::default(),
780 unknown_tagged_fields: BTreeMap::new(),
781 }
782 }
783}
784
785impl Message for CreateTopicsResponse {
786 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
787 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
788}
789
790impl HeaderVersion for CreateTopicsResponse {
791 fn header_version(version: i16) -> i16 {
792 if version >= 5 {
793 1
794 } else {
795 0
796 }
797 }
798}