1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
/// One assignment entry: a list of broker ids. Values of this type are the
/// elements of `CreatePartitionsTopic::assignments`.
///
/// Valid versions: 0-3 (as declared by this type's `Message` impl).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct CreatePartitionsAssignment {
    /// Broker ids of this assignment; serialized as an array of Int32
    /// (compact array when the version is 2 or higher).
    pub broker_ids: Vec<super::BrokerId>,

    /// Tagged fields collected by `decode` and written back by `encode`;
    /// only read or written when the version is 2 or higher.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
32
33impl CreatePartitionsAssignment {
34 pub fn with_broker_ids(mut self, value: Vec<super::BrokerId>) -> Self {
40 self.broker_ids = value;
41 self
42 }
43 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
45 self.unknown_tagged_fields = value;
46 self
47 }
48 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
50 self.unknown_tagged_fields.insert(key, value);
51 self
52 }
53}
54
#[cfg(feature = "client")]
impl Encodable for CreatePartitionsAssignment {
    /// Writes this assignment to `buf` using the layout of `version`.
    ///
    /// Versions 2 and above use the compact array encoding and are followed
    /// by the tagged-field section; earlier versions use the plain array
    /// encoding and carry no tagged fields.
    ///
    /// # Errors
    /// Propagates any encoder error, and fails if the number of tagged
    /// fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let compact = version >= 2;

        if compact {
            types::CompactArray(types::Int32).encode(buf, &self.broker_ids)?;
        } else {
            types::Array(types::Int32).encode(buf, &self.broker_ids)?;
        }

        // Older versions end right after the regular fields.
        if !compact {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 2;

        let mut size = if compact {
            types::CompactArray(types::Int32).compute_size(&self.broker_ids)?
        } else {
            types::Array(types::Int32).compute_size(&self.broker_ids)?
        };

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
99
#[cfg(feature = "broker")]
impl Decodable for CreatePartitionsAssignment {
    /// Reads an assignment from `buf` using the layout of `version`.
    ///
    /// Versions 2 and above use the compact array encoding and are followed
    /// by a tagged-field section, every entry of which is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    /// Propagates any decoder error, including running out of bytes.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let compact = version >= 2;

        let broker_ids = if compact {
            types::CompactArray(types::Int32).decode(buf)?
        } else {
            types::Array(types::Int32).decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if compact {
            let entry_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..entry_count {
                // Each entry is: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            broker_ids,
            unknown_tagged_fields: tagged,
        })
    }
}
124
125impl Default for CreatePartitionsAssignment {
126 fn default() -> Self {
127 Self {
128 broker_ids: Default::default(),
129 unknown_tagged_fields: BTreeMap::new(),
130 }
131 }
132}
133
impl Message for CreatePartitionsAssignment {
    // Versions 0 through 3 (inclusive bounds) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
    // No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
138
/// Top-level CreatePartitions request body.
///
/// Valid versions: 0-3 (as declared by this type's `Message` impl).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct CreatePartitionsRequest {
    /// Per-topic entries of the request; serialized as an array of
    /// `CreatePartitionsTopic` (compact array when the version is 2 or higher).
    pub topics: Vec<CreatePartitionsTopic>,

    /// Timeout, in milliseconds judging by the field name; serialized as Int32.
    // NOTE(review): exact timeout semantics are defined by the Kafka protocol
    // spec, not by this file — confirm there.
    pub timeout_ms: i32,

    /// Boolean flag serialized after `timeout_ms`.
    // NOTE(review): presumably asks the broker to validate the request
    // without applying it — confirm against the Kafka protocol spec.
    pub validate_only: bool,

    /// Tagged fields collected by `decode` and written back by `encode`;
    /// only read or written when the version is 2 or higher.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
161
162impl CreatePartitionsRequest {
163 pub fn with_topics(mut self, value: Vec<CreatePartitionsTopic>) -> Self {
169 self.topics = value;
170 self
171 }
172 pub fn with_timeout_ms(mut self, value: i32) -> Self {
178 self.timeout_ms = value;
179 self
180 }
181 pub fn with_validate_only(mut self, value: bool) -> Self {
187 self.validate_only = value;
188 self
189 }
190 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
192 self.unknown_tagged_fields = value;
193 self
194 }
195 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
197 self.unknown_tagged_fields.insert(key, value);
198 self
199 }
200}
201
#[cfg(feature = "client")]
impl Encodable for CreatePartitionsRequest {
    /// Writes the request to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `topics`, `timeout_ms`, `validate_only`.
    /// Versions 2 and above use the compact array encoding for `topics` and
    /// append the tagged-field section.
    ///
    /// # Errors
    /// Propagates any encoder error, and fails if the number of tagged
    /// fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let compact = version >= 2;

        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.topics)?;
        }
        types::Int32.encode(buf, &self.timeout_ms)?;
        types::Boolean.encode(buf, &self.validate_only)?;

        // Older versions end right after the regular fields.
        if !compact {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 2;

        let mut size = if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.topics)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.topics)?
        };
        size += types::Int32.compute_size(&self.timeout_ms)?;
        size += types::Boolean.compute_size(&self.validate_only)?;

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
251
#[cfg(feature = "broker")]
impl Decodable for CreatePartitionsRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// Fields are read in wire order: `topics`, `timeout_ms`,
    /// `validate_only`. For versions 2 and above a tagged-field section
    /// follows, every entry of which is kept verbatim in
    /// `unknown_tagged_fields`.
    ///
    /// # Errors
    /// Propagates any decoder error, including running out of bytes.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let compact = version >= 2;

        let topics = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };
        let timeout_ms = types::Int32.decode(buf)?;
        let validate_only = types::Boolean.decode(buf)?;

        let mut tagged = BTreeMap::new();
        if compact {
            let entry_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..entry_count {
                // Each entry is: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            topics,
            timeout_ms,
            validate_only,
            unknown_tagged_fields: tagged,
        })
    }
}
280
281impl Default for CreatePartitionsRequest {
282 fn default() -> Self {
283 Self {
284 topics: Default::default(),
285 timeout_ms: 0,
286 validate_only: false,
287 unknown_tagged_fields: BTreeMap::new(),
288 }
289 }
290}
291
impl Message for CreatePartitionsRequest {
    // Versions 0 through 3 (inclusive bounds) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
    // No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
296
/// One per-topic entry of a `CreatePartitionsRequest` (element type of its
/// `topics` field).
///
/// Valid versions: 0-3 (as declared by this type's `Message` impl).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub struct CreatePartitionsTopic {
    /// Topic name; serialized as a string (compact string when the version
    /// is 2 or higher).
    pub name: super::TopicName,

    /// Partition count for the topic; serialized as Int32.
    // NOTE(review): presumably the new total partition count rather than an
    // increment — confirm against the Kafka protocol spec.
    pub count: i32,

    /// Optional list of assignments; serialized as an array of
    /// `CreatePartitionsAssignment` (compact array when the version is 2 or
    /// higher). `Default` initializes this to `Some` of an empty list.
    pub assignments: Option<Vec<CreatePartitionsAssignment>>,

    /// Tagged fields collected by `decode` and written back by `encode`;
    /// only read or written when the version is 2 or higher.
    pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
}
319
320impl CreatePartitionsTopic {
321 pub fn with_name(mut self, value: super::TopicName) -> Self {
327 self.name = value;
328 self
329 }
330 pub fn with_count(mut self, value: i32) -> Self {
336 self.count = value;
337 self
338 }
339 pub fn with_assignments(mut self, value: Option<Vec<CreatePartitionsAssignment>>) -> Self {
345 self.assignments = value;
346 self
347 }
348 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
350 self.unknown_tagged_fields = value;
351 self
352 }
353 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
355 self.unknown_tagged_fields.insert(key, value);
356 self
357 }
358}
359
#[cfg(feature = "client")]
impl Encodable for CreatePartitionsTopic {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// Field order on the wire: `name`, `count`, `assignments`. Versions 2
    /// and above use the compact string/array encodings and append the
    /// tagged-field section.
    ///
    /// # Errors
    /// Propagates any encoder error, and fails if the number of tagged
    /// fields does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        let compact = version >= 2;

        if compact {
            types::CompactString.encode(buf, &self.name)?;
        } else {
            types::String.encode(buf, &self.name)?;
        }
        types::Int32.encode(buf, &self.count)?;
        if compact {
            types::CompactArray(types::Struct { version }).encode(buf, &self.assignments)?;
        } else {
            types::Array(types::Struct { version }).encode(buf, &self.assignments)?;
        }

        // Older versions end right after the regular fields.
        if !compact {
            return Ok(());
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!("Too many tagged fields to encode ({} fields)", tagged_count);
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would produce for `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let compact = version >= 2;

        let mut size = if compact {
            types::CompactString.compute_size(&self.name)?
        } else {
            types::String.compute_size(&self.name)?
        };
        size += types::Int32.compute_size(&self.count)?;
        size += if compact {
            types::CompactArray(types::Struct { version }).compute_size(&self.assignments)?
        } else {
            types::Array(types::Struct { version }).compute_size(&self.assignments)?
        };

        if compact {
            let tagged_count = self.unknown_tagged_fields.len();
            if tagged_count > u32::MAX as usize {
                bail!("Too many tagged fields to encode ({} fields)", tagged_count);
            }
            size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
            size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
        }

        Ok(size)
    }
}
418
#[cfg(feature = "broker")]
impl Decodable for CreatePartitionsTopic {
    /// Reads a topic entry from `buf` using the layout of `version`.
    ///
    /// Fields are read in wire order: `name`, `count`, `assignments`. For
    /// versions 2 and above a tagged-field section follows, every entry of
    /// which is kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    /// Propagates any decoder error, including running out of bytes.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let compact = version >= 2;

        let name = if compact {
            types::CompactString.decode(buf)?
        } else {
            types::String.decode(buf)?
        };
        let count = types::Int32.decode(buf)?;
        let assignments = if compact {
            types::CompactArray(types::Struct { version }).decode(buf)?
        } else {
            types::Array(types::Struct { version }).decode(buf)?
        };

        let mut tagged = BTreeMap::new();
        if compact {
            let entry_count = types::UnsignedVarInt.decode(buf)?;
            for _ in 0..entry_count {
                // Each entry is: tag, payload length, payload bytes.
                let tag: u32 = types::UnsignedVarInt.decode(buf)?;
                let len: u32 = types::UnsignedVarInt.decode(buf)?;
                let payload = buf.try_get_bytes(len as usize)?;
                tagged.insert(tag as i32, payload);
            }
        }

        Ok(Self {
            name,
            count,
            assignments,
            unknown_tagged_fields: tagged,
        })
    }
}
451
452impl Default for CreatePartitionsTopic {
453 fn default() -> Self {
454 Self {
455 name: Default::default(),
456 count: 0,
457 assignments: Some(Default::default()),
458 unknown_tagged_fields: BTreeMap::new(),
459 }
460 }
461}
462
impl Message for CreatePartitionsTopic {
    // Versions 0 through 3 (inclusive bounds) are declared for this type.
    const VERSIONS: VersionRange = VersionRange { min: 0, max: 3 };
    // No version range is marked as deprecated.
    const DEPRECATED_VERSIONS: Option<VersionRange> = None;
}
467
468impl HeaderVersion for CreatePartitionsRequest {
469 fn header_version(version: i16) -> i16 {
470 if version >= 2 {
471 2
472 } else {
473 1
474 }
475 }
476}