1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct CreatableTopicConfigs {
24 pub name: StrBytes,
28
29 pub value: Option<StrBytes>,
33
34 pub read_only: bool,
38
39 pub config_source: i8,
43
44 pub is_sensitive: bool,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl CreatableTopicConfigs {
54 pub fn with_name(mut self, value: StrBytes) -> Self {
60 self.name = value;
61 self
62 }
63 pub fn with_value(mut self, value: Option<StrBytes>) -> Self {
69 self.value = value;
70 self
71 }
72 pub fn with_read_only(mut self, value: bool) -> Self {
78 self.read_only = value;
79 self
80 }
81 pub fn with_config_source(mut self, value: i8) -> Self {
87 self.config_source = value;
88 self
89 }
90 pub fn with_is_sensitive(mut self, value: bool) -> Self {
96 self.is_sensitive = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "broker")]
112impl Encodable for CreatableTopicConfigs {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version >= 5 {
115 types::CompactString.encode(buf, &self.name)?;
116 } else {
117 if !self.name.is_empty() {
118 bail!("A field is set that is not available on the selected protocol version");
119 }
120 }
121 if version >= 5 {
122 types::CompactString.encode(buf, &self.value)?;
123 } else {
124 if !self
125 .value
126 .as_ref()
127 .map(|x| x.is_empty())
128 .unwrap_or_default()
129 {
130 bail!("A field is set that is not available on the selected protocol version");
131 }
132 }
133 if version >= 5 {
134 types::Boolean.encode(buf, &self.read_only)?;
135 } else {
136 if self.read_only {
137 bail!("A field is set that is not available on the selected protocol version");
138 }
139 }
140 if version >= 5 {
141 types::Int8.encode(buf, &self.config_source)?;
142 }
143 if version >= 5 {
144 types::Boolean.encode(buf, &self.is_sensitive)?;
145 } else {
146 if self.is_sensitive {
147 bail!("A field is set that is not available on the selected protocol version");
148 }
149 }
150 if version >= 5 {
151 let num_tagged_fields = self.unknown_tagged_fields.len();
152 if num_tagged_fields > std::u32::MAX as usize {
153 bail!(
154 "Too many tagged fields to encode ({} fields)",
155 num_tagged_fields
156 );
157 }
158 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
159
160 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
161 }
162 Ok(())
163 }
164 fn compute_size(&self, version: i16) -> Result<usize> {
165 let mut total_size = 0;
166 if version >= 5 {
167 total_size += types::CompactString.compute_size(&self.name)?;
168 } else {
169 if !self.name.is_empty() {
170 bail!("A field is set that is not available on the selected protocol version");
171 }
172 }
173 if version >= 5 {
174 total_size += types::CompactString.compute_size(&self.value)?;
175 } else {
176 if !self
177 .value
178 .as_ref()
179 .map(|x| x.is_empty())
180 .unwrap_or_default()
181 {
182 bail!("A field is set that is not available on the selected protocol version");
183 }
184 }
185 if version >= 5 {
186 total_size += types::Boolean.compute_size(&self.read_only)?;
187 } else {
188 if self.read_only {
189 bail!("A field is set that is not available on the selected protocol version");
190 }
191 }
192 if version >= 5 {
193 total_size += types::Int8.compute_size(&self.config_source)?;
194 }
195 if version >= 5 {
196 total_size += types::Boolean.compute_size(&self.is_sensitive)?;
197 } else {
198 if self.is_sensitive {
199 bail!("A field is set that is not available on the selected protocol version");
200 }
201 }
202 if version >= 5 {
203 let num_tagged_fields = self.unknown_tagged_fields.len();
204 if num_tagged_fields > std::u32::MAX as usize {
205 bail!(
206 "Too many tagged fields to encode ({} fields)",
207 num_tagged_fields
208 );
209 }
210 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
211
212 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
213 }
214 Ok(total_size)
215 }
216}
217
218#[cfg(feature = "client")]
219impl Decodable for CreatableTopicConfigs {
220 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
221 let name = if version >= 5 {
222 types::CompactString.decode(buf)?
223 } else {
224 Default::default()
225 };
226 let value = if version >= 5 {
227 types::CompactString.decode(buf)?
228 } else {
229 Some(Default::default())
230 };
231 let read_only = if version >= 5 {
232 types::Boolean.decode(buf)?
233 } else {
234 false
235 };
236 let config_source = if version >= 5 {
237 types::Int8.decode(buf)?
238 } else {
239 -1
240 };
241 let is_sensitive = if version >= 5 {
242 types::Boolean.decode(buf)?
243 } else {
244 false
245 };
246 let mut unknown_tagged_fields = BTreeMap::new();
247 if version >= 5 {
248 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
249 for _ in 0..num_tagged_fields {
250 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
251 let size: u32 = types::UnsignedVarInt.decode(buf)?;
252 let unknown_value = buf.try_get_bytes(size as usize)?;
253 unknown_tagged_fields.insert(tag as i32, unknown_value);
254 }
255 }
256 Ok(Self {
257 name,
258 value,
259 read_only,
260 config_source,
261 is_sensitive,
262 unknown_tagged_fields,
263 })
264 }
265}
266
267impl Default for CreatableTopicConfigs {
268 fn default() -> Self {
269 Self {
270 name: Default::default(),
271 value: Some(Default::default()),
272 read_only: false,
273 config_source: -1,
274 is_sensitive: false,
275 unknown_tagged_fields: BTreeMap::new(),
276 }
277 }
278}
279
280impl Message for CreatableTopicConfigs {
281 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
282 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
283}
284
285#[non_exhaustive]
287#[derive(Debug, Clone, PartialEq)]
288pub struct CreatableTopicResult {
289 pub name: super::TopicName,
293
294 pub topic_id: Uuid,
298
299 pub error_code: i16,
303
304 pub error_message: Option<StrBytes>,
308
309 pub topic_config_error_code: i16,
313
314 pub num_partitions: i32,
318
319 pub replication_factor: i16,
323
324 pub configs: Option<Vec<CreatableTopicConfigs>>,
328
329 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
331}
332
333impl CreatableTopicResult {
334 pub fn with_name(mut self, value: super::TopicName) -> Self {
340 self.name = value;
341 self
342 }
343 pub fn with_topic_id(mut self, value: Uuid) -> Self {
349 self.topic_id = value;
350 self
351 }
352 pub fn with_error_code(mut self, value: i16) -> Self {
358 self.error_code = value;
359 self
360 }
361 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
367 self.error_message = value;
368 self
369 }
370 pub fn with_topic_config_error_code(mut self, value: i16) -> Self {
376 self.topic_config_error_code = value;
377 self
378 }
379 pub fn with_num_partitions(mut self, value: i32) -> Self {
385 self.num_partitions = value;
386 self
387 }
388 pub fn with_replication_factor(mut self, value: i16) -> Self {
394 self.replication_factor = value;
395 self
396 }
397 pub fn with_configs(mut self, value: Option<Vec<CreatableTopicConfigs>>) -> Self {
403 self.configs = value;
404 self
405 }
406 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
408 self.unknown_tagged_fields = value;
409 self
410 }
411 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
413 self.unknown_tagged_fields.insert(key, value);
414 self
415 }
416}
417
418#[cfg(feature = "broker")]
419impl Encodable for CreatableTopicResult {
420 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
421 if version >= 5 {
422 types::CompactString.encode(buf, &self.name)?;
423 } else {
424 types::String.encode(buf, &self.name)?;
425 }
426 if version >= 7 {
427 types::Uuid.encode(buf, &self.topic_id)?;
428 }
429 types::Int16.encode(buf, &self.error_code)?;
430 if version >= 1 {
431 if version >= 5 {
432 types::CompactString.encode(buf, &self.error_message)?;
433 } else {
434 types::String.encode(buf, &self.error_message)?;
435 }
436 }
437 if version >= 5 {
438 types::Int32.encode(buf, &self.num_partitions)?;
439 }
440 if version >= 5 {
441 types::Int16.encode(buf, &self.replication_factor)?;
442 }
443 if version >= 5 {
444 types::CompactArray(types::Struct { version }).encode(buf, &self.configs)?;
445 }
446 if version >= 5 {
447 let mut num_tagged_fields = self.unknown_tagged_fields.len();
448 if self.topic_config_error_code != 0 {
449 num_tagged_fields += 1;
450 }
451 if num_tagged_fields > std::u32::MAX as usize {
452 bail!(
453 "Too many tagged fields to encode ({} fields)",
454 num_tagged_fields
455 );
456 }
457 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
458 if self.topic_config_error_code != 0 {
459 let computed_size = types::Int16.compute_size(&self.topic_config_error_code)?;
460 if computed_size > std::u32::MAX as usize {
461 bail!(
462 "Tagged field is too large to encode ({} bytes)",
463 computed_size
464 );
465 }
466 types::UnsignedVarInt.encode(buf, 0)?;
467 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
468 types::Int16.encode(buf, &self.topic_config_error_code)?;
469 }
470
471 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
472 }
473 Ok(())
474 }
475 fn compute_size(&self, version: i16) -> Result<usize> {
476 let mut total_size = 0;
477 if version >= 5 {
478 total_size += types::CompactString.compute_size(&self.name)?;
479 } else {
480 total_size += types::String.compute_size(&self.name)?;
481 }
482 if version >= 7 {
483 total_size += types::Uuid.compute_size(&self.topic_id)?;
484 }
485 total_size += types::Int16.compute_size(&self.error_code)?;
486 if version >= 1 {
487 if version >= 5 {
488 total_size += types::CompactString.compute_size(&self.error_message)?;
489 } else {
490 total_size += types::String.compute_size(&self.error_message)?;
491 }
492 }
493 if version >= 5 {
494 total_size += types::Int32.compute_size(&self.num_partitions)?;
495 }
496 if version >= 5 {
497 total_size += types::Int16.compute_size(&self.replication_factor)?;
498 }
499 if version >= 5 {
500 total_size +=
501 types::CompactArray(types::Struct { version }).compute_size(&self.configs)?;
502 }
503 if version >= 5 {
504 let mut num_tagged_fields = self.unknown_tagged_fields.len();
505 if self.topic_config_error_code != 0 {
506 num_tagged_fields += 1;
507 }
508 if num_tagged_fields > std::u32::MAX as usize {
509 bail!(
510 "Too many tagged fields to encode ({} fields)",
511 num_tagged_fields
512 );
513 }
514 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
515 if self.topic_config_error_code != 0 {
516 let computed_size = types::Int16.compute_size(&self.topic_config_error_code)?;
517 if computed_size > std::u32::MAX as usize {
518 bail!(
519 "Tagged field is too large to encode ({} bytes)",
520 computed_size
521 );
522 }
523 total_size += types::UnsignedVarInt.compute_size(0)?;
524 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
525 total_size += computed_size;
526 }
527
528 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
529 }
530 Ok(total_size)
531 }
532}
533
534#[cfg(feature = "client")]
535impl Decodable for CreatableTopicResult {
536 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
537 let name = if version >= 5 {
538 types::CompactString.decode(buf)?
539 } else {
540 types::String.decode(buf)?
541 };
542 let topic_id = if version >= 7 {
543 types::Uuid.decode(buf)?
544 } else {
545 Uuid::nil()
546 };
547 let error_code = types::Int16.decode(buf)?;
548 let error_message = if version >= 1 {
549 if version >= 5 {
550 types::CompactString.decode(buf)?
551 } else {
552 types::String.decode(buf)?
553 }
554 } else {
555 Some(Default::default())
556 };
557 let mut topic_config_error_code = 0;
558 let num_partitions = if version >= 5 {
559 types::Int32.decode(buf)?
560 } else {
561 -1
562 };
563 let replication_factor = if version >= 5 {
564 types::Int16.decode(buf)?
565 } else {
566 -1
567 };
568 let configs = if version >= 5 {
569 types::CompactArray(types::Struct { version }).decode(buf)?
570 } else {
571 Some(Default::default())
572 };
573 let mut unknown_tagged_fields = BTreeMap::new();
574 if version >= 5 {
575 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
576 for _ in 0..num_tagged_fields {
577 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
578 let size: u32 = types::UnsignedVarInt.decode(buf)?;
579 match tag {
580 0 => {
581 topic_config_error_code = types::Int16.decode(buf)?;
582 }
583 _ => {
584 let unknown_value = buf.try_get_bytes(size as usize)?;
585 unknown_tagged_fields.insert(tag as i32, unknown_value);
586 }
587 }
588 }
589 }
590 Ok(Self {
591 name,
592 topic_id,
593 error_code,
594 error_message,
595 topic_config_error_code,
596 num_partitions,
597 replication_factor,
598 configs,
599 unknown_tagged_fields,
600 })
601 }
602}
603
604impl Default for CreatableTopicResult {
605 fn default() -> Self {
606 Self {
607 name: Default::default(),
608 topic_id: Uuid::nil(),
609 error_code: 0,
610 error_message: Some(Default::default()),
611 topic_config_error_code: 0,
612 num_partitions: -1,
613 replication_factor: -1,
614 configs: Some(Default::default()),
615 unknown_tagged_fields: BTreeMap::new(),
616 }
617 }
618}
619
620impl Message for CreatableTopicResult {
621 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
622 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
623}
624
625#[non_exhaustive]
627#[derive(Debug, Clone, PartialEq)]
628pub struct CreateTopicsResponse {
629 pub throttle_time_ms: i32,
633
634 pub topics: Vec<CreatableTopicResult>,
638
639 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
641}
642
643impl CreateTopicsResponse {
644 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
650 self.throttle_time_ms = value;
651 self
652 }
653 pub fn with_topics(mut self, value: Vec<CreatableTopicResult>) -> Self {
659 self.topics = value;
660 self
661 }
662 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
664 self.unknown_tagged_fields = value;
665 self
666 }
667 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
669 self.unknown_tagged_fields.insert(key, value);
670 self
671 }
672}
673
674#[cfg(feature = "broker")]
675impl Encodable for CreateTopicsResponse {
676 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
677 if version >= 2 {
678 types::Int32.encode(buf, &self.throttle_time_ms)?;
679 }
680 if version >= 5 {
681 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
682 } else {
683 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
684 }
685 if version >= 5 {
686 let num_tagged_fields = self.unknown_tagged_fields.len();
687 if num_tagged_fields > std::u32::MAX as usize {
688 bail!(
689 "Too many tagged fields to encode ({} fields)",
690 num_tagged_fields
691 );
692 }
693 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
694
695 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
696 }
697 Ok(())
698 }
699 fn compute_size(&self, version: i16) -> Result<usize> {
700 let mut total_size = 0;
701 if version >= 2 {
702 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
703 }
704 if version >= 5 {
705 total_size +=
706 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
707 } else {
708 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
709 }
710 if version >= 5 {
711 let num_tagged_fields = self.unknown_tagged_fields.len();
712 if num_tagged_fields > std::u32::MAX as usize {
713 bail!(
714 "Too many tagged fields to encode ({} fields)",
715 num_tagged_fields
716 );
717 }
718 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
719
720 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
721 }
722 Ok(total_size)
723 }
724}
725
726#[cfg(feature = "client")]
727impl Decodable for CreateTopicsResponse {
728 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
729 let throttle_time_ms = if version >= 2 {
730 types::Int32.decode(buf)?
731 } else {
732 0
733 };
734 let topics = if version >= 5 {
735 types::CompactArray(types::Struct { version }).decode(buf)?
736 } else {
737 types::Array(types::Struct { version }).decode(buf)?
738 };
739 let mut unknown_tagged_fields = BTreeMap::new();
740 if version >= 5 {
741 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
742 for _ in 0..num_tagged_fields {
743 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
744 let size: u32 = types::UnsignedVarInt.decode(buf)?;
745 let unknown_value = buf.try_get_bytes(size as usize)?;
746 unknown_tagged_fields.insert(tag as i32, unknown_value);
747 }
748 }
749 Ok(Self {
750 throttle_time_ms,
751 topics,
752 unknown_tagged_fields,
753 })
754 }
755}
756
757impl Default for CreateTopicsResponse {
758 fn default() -> Self {
759 Self {
760 throttle_time_ms: 0,
761 topics: Default::default(),
762 unknown_tagged_fields: BTreeMap::new(),
763 }
764 }
765}
766
767impl Message for CreateTopicsResponse {
768 const VERSIONS: VersionRange = VersionRange { min: 0, max: 7 };
769 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
770}
771
772impl HeaderVersion for CreateTopicsResponse {
773 fn header_version(version: i16) -> i16 {
774 if version >= 5 {
775 1
776 } else {
777 0
778 }
779 }
780}