1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct CreatableTopicConfigs {
24 pub name: StrBytes,
28
29 pub value: Option<StrBytes>,
33
34 pub read_only: bool,
38
39 pub config_source: i8,
43
44 pub is_sensitive: bool,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl CreatableTopicConfigs {
54 pub fn with_name(mut self, value: StrBytes) -> Self {
60 self.name = value;
61 self
62 }
63 pub fn with_value(mut self, value: Option<StrBytes>) -> Self {
69 self.value = value;
70 self
71 }
72 pub fn with_read_only(mut self, value: bool) -> Self {
78 self.read_only = value;
79 self
80 }
81 pub fn with_config_source(mut self, value: i8) -> Self {
87 self.config_source = value;
88 self
89 }
90 pub fn with_is_sensitive(mut self, value: bool) -> Self {
96 self.is_sensitive = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "broker")]
112impl Encodable for CreatableTopicConfigs {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version < 2 || version > 7 {
115 bail!("specified version not supported by this message type");
116 }
117 if version >= 5 {
118 types::CompactString.encode(buf, &self.name)?;
119 } else {
120 if !self.name.is_empty() {
121 bail!("A field is set that is not available on the selected protocol version");
122 }
123 }
124 if version >= 5 {
125 types::CompactString.encode(buf, &self.value)?;
126 } else {
127 if !self
128 .value
129 .as_ref()
130 .map(|x| x.is_empty())
131 .unwrap_or_default()
132 {
133 bail!("A field is set that is not available on the selected protocol version");
134 }
135 }
136 if version >= 5 {
137 types::Boolean.encode(buf, &self.read_only)?;
138 } else {
139 if self.read_only {
140 bail!("A field is set that is not available on the selected protocol version");
141 }
142 }
143 if version >= 5 {
144 types::Int8.encode(buf, &self.config_source)?;
145 }
146 if version >= 5 {
147 types::Boolean.encode(buf, &self.is_sensitive)?;
148 } else {
149 if self.is_sensitive {
150 bail!("A field is set that is not available on the selected protocol version");
151 }
152 }
153 if version >= 5 {
154 let num_tagged_fields = self.unknown_tagged_fields.len();
155 if num_tagged_fields > std::u32::MAX as usize {
156 bail!(
157 "Too many tagged fields to encode ({} fields)",
158 num_tagged_fields
159 );
160 }
161 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
162
163 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
164 }
165 Ok(())
166 }
167 fn compute_size(&self, version: i16) -> Result<usize> {
168 let mut total_size = 0;
169 if version >= 5 {
170 total_size += types::CompactString.compute_size(&self.name)?;
171 } else {
172 if !self.name.is_empty() {
173 bail!("A field is set that is not available on the selected protocol version");
174 }
175 }
176 if version >= 5 {
177 total_size += types::CompactString.compute_size(&self.value)?;
178 } else {
179 if !self
180 .value
181 .as_ref()
182 .map(|x| x.is_empty())
183 .unwrap_or_default()
184 {
185 bail!("A field is set that is not available on the selected protocol version");
186 }
187 }
188 if version >= 5 {
189 total_size += types::Boolean.compute_size(&self.read_only)?;
190 } else {
191 if self.read_only {
192 bail!("A field is set that is not available on the selected protocol version");
193 }
194 }
195 if version >= 5 {
196 total_size += types::Int8.compute_size(&self.config_source)?;
197 }
198 if version >= 5 {
199 total_size += types::Boolean.compute_size(&self.is_sensitive)?;
200 } else {
201 if self.is_sensitive {
202 bail!("A field is set that is not available on the selected protocol version");
203 }
204 }
205 if version >= 5 {
206 let num_tagged_fields = self.unknown_tagged_fields.len();
207 if num_tagged_fields > std::u32::MAX as usize {
208 bail!(
209 "Too many tagged fields to encode ({} fields)",
210 num_tagged_fields
211 );
212 }
213 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
214
215 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
216 }
217 Ok(total_size)
218 }
219}
220
221#[cfg(feature = "client")]
222impl Decodable for CreatableTopicConfigs {
223 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
224 if version < 2 || version > 7 {
225 bail!("specified version not supported by this message type");
226 }
227 let name = if version >= 5 {
228 types::CompactString.decode(buf)?
229 } else {
230 Default::default()
231 };
232 let value = if version >= 5 {
233 types::CompactString.decode(buf)?
234 } else {
235 Some(Default::default())
236 };
237 let read_only = if version >= 5 {
238 types::Boolean.decode(buf)?
239 } else {
240 false
241 };
242 let config_source = if version >= 5 {
243 types::Int8.decode(buf)?
244 } else {
245 -1
246 };
247 let is_sensitive = if version >= 5 {
248 types::Boolean.decode(buf)?
249 } else {
250 false
251 };
252 let mut unknown_tagged_fields = BTreeMap::new();
253 if version >= 5 {
254 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
255 for _ in 0..num_tagged_fields {
256 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
257 let size: u32 = types::UnsignedVarInt.decode(buf)?;
258 let unknown_value = buf.try_get_bytes(size as usize)?;
259 unknown_tagged_fields.insert(tag as i32, unknown_value);
260 }
261 }
262 Ok(Self {
263 name,
264 value,
265 read_only,
266 config_source,
267 is_sensitive,
268 unknown_tagged_fields,
269 })
270 }
271}
272
273impl Default for CreatableTopicConfigs {
274 fn default() -> Self {
275 Self {
276 name: Default::default(),
277 value: Some(Default::default()),
278 read_only: false,
279 config_source: -1,
280 is_sensitive: false,
281 unknown_tagged_fields: BTreeMap::new(),
282 }
283 }
284}
285
286impl Message for CreatableTopicConfigs {
287 const VERSIONS: VersionRange = VersionRange { min: 2, max: 7 };
288 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
289}
290
291#[non_exhaustive]
293#[derive(Debug, Clone, PartialEq)]
294pub struct CreatableTopicResult {
295 pub name: super::TopicName,
299
300 pub topic_id: Uuid,
304
305 pub error_code: i16,
309
310 pub error_message: Option<StrBytes>,
314
315 pub topic_config_error_code: i16,
319
320 pub num_partitions: i32,
324
325 pub replication_factor: i16,
329
330 pub configs: Option<Vec<CreatableTopicConfigs>>,
334
335 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
337}
338
339impl CreatableTopicResult {
340 pub fn with_name(mut self, value: super::TopicName) -> Self {
346 self.name = value;
347 self
348 }
349 pub fn with_topic_id(mut self, value: Uuid) -> Self {
355 self.topic_id = value;
356 self
357 }
358 pub fn with_error_code(mut self, value: i16) -> Self {
364 self.error_code = value;
365 self
366 }
367 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
373 self.error_message = value;
374 self
375 }
376 pub fn with_topic_config_error_code(mut self, value: i16) -> Self {
382 self.topic_config_error_code = value;
383 self
384 }
385 pub fn with_num_partitions(mut self, value: i32) -> Self {
391 self.num_partitions = value;
392 self
393 }
394 pub fn with_replication_factor(mut self, value: i16) -> Self {
400 self.replication_factor = value;
401 self
402 }
403 pub fn with_configs(mut self, value: Option<Vec<CreatableTopicConfigs>>) -> Self {
409 self.configs = value;
410 self
411 }
412 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
414 self.unknown_tagged_fields = value;
415 self
416 }
417 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
419 self.unknown_tagged_fields.insert(key, value);
420 self
421 }
422}
423
424#[cfg(feature = "broker")]
425impl Encodable for CreatableTopicResult {
426 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
427 if version < 2 || version > 7 {
428 bail!("specified version not supported by this message type");
429 }
430 if version >= 5 {
431 types::CompactString.encode(buf, &self.name)?;
432 } else {
433 types::String.encode(buf, &self.name)?;
434 }
435 if version >= 7 {
436 types::Uuid.encode(buf, &self.topic_id)?;
437 }
438 types::Int16.encode(buf, &self.error_code)?;
439 if version >= 5 {
440 types::CompactString.encode(buf, &self.error_message)?;
441 } else {
442 types::String.encode(buf, &self.error_message)?;
443 }
444 if version >= 5 {
445 types::Int32.encode(buf, &self.num_partitions)?;
446 }
447 if version >= 5 {
448 types::Int16.encode(buf, &self.replication_factor)?;
449 }
450 if version >= 5 {
451 types::CompactArray(types::Struct { version }).encode(buf, &self.configs)?;
452 }
453 if version >= 5 {
454 let mut num_tagged_fields = self.unknown_tagged_fields.len();
455 if self.topic_config_error_code != 0 {
456 num_tagged_fields += 1;
457 }
458 if num_tagged_fields > std::u32::MAX as usize {
459 bail!(
460 "Too many tagged fields to encode ({} fields)",
461 num_tagged_fields
462 );
463 }
464 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
465 if self.topic_config_error_code != 0 {
466 let computed_size = types::Int16.compute_size(&self.topic_config_error_code)?;
467 if computed_size > std::u32::MAX as usize {
468 bail!(
469 "Tagged field is too large to encode ({} bytes)",
470 computed_size
471 );
472 }
473 types::UnsignedVarInt.encode(buf, 0)?;
474 types::UnsignedVarInt.encode(buf, computed_size as u32)?;
475 types::Int16.encode(buf, &self.topic_config_error_code)?;
476 }
477
478 write_unknown_tagged_fields(buf, 1.., &self.unknown_tagged_fields)?;
479 }
480 Ok(())
481 }
482 fn compute_size(&self, version: i16) -> Result<usize> {
483 let mut total_size = 0;
484 if version >= 5 {
485 total_size += types::CompactString.compute_size(&self.name)?;
486 } else {
487 total_size += types::String.compute_size(&self.name)?;
488 }
489 if version >= 7 {
490 total_size += types::Uuid.compute_size(&self.topic_id)?;
491 }
492 total_size += types::Int16.compute_size(&self.error_code)?;
493 if version >= 5 {
494 total_size += types::CompactString.compute_size(&self.error_message)?;
495 } else {
496 total_size += types::String.compute_size(&self.error_message)?;
497 }
498 if version >= 5 {
499 total_size += types::Int32.compute_size(&self.num_partitions)?;
500 }
501 if version >= 5 {
502 total_size += types::Int16.compute_size(&self.replication_factor)?;
503 }
504 if version >= 5 {
505 total_size +=
506 types::CompactArray(types::Struct { version }).compute_size(&self.configs)?;
507 }
508 if version >= 5 {
509 let mut num_tagged_fields = self.unknown_tagged_fields.len();
510 if self.topic_config_error_code != 0 {
511 num_tagged_fields += 1;
512 }
513 if num_tagged_fields > std::u32::MAX as usize {
514 bail!(
515 "Too many tagged fields to encode ({} fields)",
516 num_tagged_fields
517 );
518 }
519 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
520 if self.topic_config_error_code != 0 {
521 let computed_size = types::Int16.compute_size(&self.topic_config_error_code)?;
522 if computed_size > std::u32::MAX as usize {
523 bail!(
524 "Tagged field is too large to encode ({} bytes)",
525 computed_size
526 );
527 }
528 total_size += types::UnsignedVarInt.compute_size(0)?;
529 total_size += types::UnsignedVarInt.compute_size(computed_size as u32)?;
530 total_size += computed_size;
531 }
532
533 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
534 }
535 Ok(total_size)
536 }
537}
538
539#[cfg(feature = "client")]
540impl Decodable for CreatableTopicResult {
541 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
542 if version < 2 || version > 7 {
543 bail!("specified version not supported by this message type");
544 }
545 let name = if version >= 5 {
546 types::CompactString.decode(buf)?
547 } else {
548 types::String.decode(buf)?
549 };
550 let topic_id = if version >= 7 {
551 types::Uuid.decode(buf)?
552 } else {
553 Uuid::nil()
554 };
555 let error_code = types::Int16.decode(buf)?;
556 let error_message = if version >= 5 {
557 types::CompactString.decode(buf)?
558 } else {
559 types::String.decode(buf)?
560 };
561 let mut topic_config_error_code = 0;
562 let num_partitions = if version >= 5 {
563 types::Int32.decode(buf)?
564 } else {
565 -1
566 };
567 let replication_factor = if version >= 5 {
568 types::Int16.decode(buf)?
569 } else {
570 -1
571 };
572 let configs = if version >= 5 {
573 types::CompactArray(types::Struct { version }).decode(buf)?
574 } else {
575 Some(Default::default())
576 };
577 let mut unknown_tagged_fields = BTreeMap::new();
578 if version >= 5 {
579 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
580 for _ in 0..num_tagged_fields {
581 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
582 let size: u32 = types::UnsignedVarInt.decode(buf)?;
583 match tag {
584 0 => {
585 topic_config_error_code = types::Int16.decode(buf)?;
586 }
587 _ => {
588 let unknown_value = buf.try_get_bytes(size as usize)?;
589 unknown_tagged_fields.insert(tag as i32, unknown_value);
590 }
591 }
592 }
593 }
594 Ok(Self {
595 name,
596 topic_id,
597 error_code,
598 error_message,
599 topic_config_error_code,
600 num_partitions,
601 replication_factor,
602 configs,
603 unknown_tagged_fields,
604 })
605 }
606}
607
608impl Default for CreatableTopicResult {
609 fn default() -> Self {
610 Self {
611 name: Default::default(),
612 topic_id: Uuid::nil(),
613 error_code: 0,
614 error_message: Some(Default::default()),
615 topic_config_error_code: 0,
616 num_partitions: -1,
617 replication_factor: -1,
618 configs: Some(Default::default()),
619 unknown_tagged_fields: BTreeMap::new(),
620 }
621 }
622}
623
624impl Message for CreatableTopicResult {
625 const VERSIONS: VersionRange = VersionRange { min: 2, max: 7 };
626 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
627}
628
629#[non_exhaustive]
631#[derive(Debug, Clone, PartialEq)]
632pub struct CreateTopicsResponse {
633 pub throttle_time_ms: i32,
637
638 pub topics: Vec<CreatableTopicResult>,
642
643 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
645}
646
647impl CreateTopicsResponse {
648 pub fn with_throttle_time_ms(mut self, value: i32) -> Self {
654 self.throttle_time_ms = value;
655 self
656 }
657 pub fn with_topics(mut self, value: Vec<CreatableTopicResult>) -> Self {
663 self.topics = value;
664 self
665 }
666 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
668 self.unknown_tagged_fields = value;
669 self
670 }
671 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
673 self.unknown_tagged_fields.insert(key, value);
674 self
675 }
676}
677
678#[cfg(feature = "broker")]
679impl Encodable for CreateTopicsResponse {
680 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
681 if version < 2 || version > 7 {
682 bail!("specified version not supported by this message type");
683 }
684 types::Int32.encode(buf, &self.throttle_time_ms)?;
685 if version >= 5 {
686 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
687 } else {
688 types::Array(types::Struct { version }).encode(buf, &self.topics)?;
689 }
690 if version >= 5 {
691 let num_tagged_fields = self.unknown_tagged_fields.len();
692 if num_tagged_fields > std::u32::MAX as usize {
693 bail!(
694 "Too many tagged fields to encode ({} fields)",
695 num_tagged_fields
696 );
697 }
698 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
699
700 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
701 }
702 Ok(())
703 }
704 fn compute_size(&self, version: i16) -> Result<usize> {
705 let mut total_size = 0;
706 total_size += types::Int32.compute_size(&self.throttle_time_ms)?;
707 if version >= 5 {
708 total_size +=
709 types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
710 } else {
711 total_size += types::Array(types::Struct { version }).compute_size(&self.topics)?;
712 }
713 if version >= 5 {
714 let num_tagged_fields = self.unknown_tagged_fields.len();
715 if num_tagged_fields > std::u32::MAX as usize {
716 bail!(
717 "Too many tagged fields to encode ({} fields)",
718 num_tagged_fields
719 );
720 }
721 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
722
723 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
724 }
725 Ok(total_size)
726 }
727}
728
729#[cfg(feature = "client")]
730impl Decodable for CreateTopicsResponse {
731 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
732 if version < 2 || version > 7 {
733 bail!("specified version not supported by this message type");
734 }
735 let throttle_time_ms = types::Int32.decode(buf)?;
736 let topics = if version >= 5 {
737 types::CompactArray(types::Struct { version }).decode(buf)?
738 } else {
739 types::Array(types::Struct { version }).decode(buf)?
740 };
741 let mut unknown_tagged_fields = BTreeMap::new();
742 if version >= 5 {
743 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
744 for _ in 0..num_tagged_fields {
745 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
746 let size: u32 = types::UnsignedVarInt.decode(buf)?;
747 let unknown_value = buf.try_get_bytes(size as usize)?;
748 unknown_tagged_fields.insert(tag as i32, unknown_value);
749 }
750 }
751 Ok(Self {
752 throttle_time_ms,
753 topics,
754 unknown_tagged_fields,
755 })
756 }
757}
758
759impl Default for CreateTopicsResponse {
760 fn default() -> Self {
761 Self {
762 throttle_time_ms: 0,
763 topics: Default::default(),
764 unknown_tagged_fields: BTreeMap::new(),
765 }
766 }
767}
768
769impl Message for CreateTopicsResponse {
770 const VERSIONS: VersionRange = VersionRange { min: 2, max: 7 };
771 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
772}
773
774impl HeaderVersion for CreateTopicsResponse {
775 fn header_version(version: i16) -> i16 {
776 if version >= 5 {
777 1
778 } else {
779 0
780 }
781 }
782}