1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionData {
24 pub partition_index: i32,
28
29 pub replica_epoch: i32,
33
34 pub replica_id: super::BrokerId,
38
39 pub replica_directory_id: Uuid,
43
44 pub voter_directory_id: Uuid,
48
49 pub last_offset_epoch: i32,
53
54 pub last_offset: i64,
58
59 pub pre_vote: bool,
63
64 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
66}
67
68impl PartitionData {
69 pub fn with_partition_index(mut self, value: i32) -> Self {
75 self.partition_index = value;
76 self
77 }
78 pub fn with_replica_epoch(mut self, value: i32) -> Self {
84 self.replica_epoch = value;
85 self
86 }
87 pub fn with_replica_id(mut self, value: super::BrokerId) -> Self {
93 self.replica_id = value;
94 self
95 }
96 pub fn with_replica_directory_id(mut self, value: Uuid) -> Self {
102 self.replica_directory_id = value;
103 self
104 }
105 pub fn with_voter_directory_id(mut self, value: Uuid) -> Self {
111 self.voter_directory_id = value;
112 self
113 }
114 pub fn with_last_offset_epoch(mut self, value: i32) -> Self {
120 self.last_offset_epoch = value;
121 self
122 }
123 pub fn with_last_offset(mut self, value: i64) -> Self {
129 self.last_offset = value;
130 self
131 }
132 pub fn with_pre_vote(mut self, value: bool) -> Self {
138 self.pre_vote = value;
139 self
140 }
141 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
143 self.unknown_tagged_fields = value;
144 self
145 }
146 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
148 self.unknown_tagged_fields.insert(key, value);
149 self
150 }
151}
152
#[cfg(feature = "client")]
impl Encodable for PartitionData {
    /// Writes this partition entry to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2`, when `pre_vote` is `true` but
    /// `version` is below 2, when there are more than `u32::MAX` unknown
    /// tagged fields, or when any underlying field encoder fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.replica_epoch)?;
        types::Int32.encode(buf, &self.replica_id)?;

        // Both directory ids appear on the wire starting with version 1.
        if version >= 1 {
            types::Uuid.encode(buf, &self.replica_directory_id)?;
            types::Uuid.encode(buf, &self.voter_directory_id)?;
        }

        types::Int32.encode(buf, &self.last_offset_epoch)?;
        types::Int64.encode(buf, &self.last_offset)?;

        if version >= 2 {
            types::Boolean.encode(buf, &self.pre_vote)?;
        } else if self.pre_vote {
            // The flag cannot be represented before version 2, so refuse to drop it.
            bail!("A field is set that is not available on the selected protocol version");
        }

        // Tagged-field section: a varint count followed by the raw entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject a `version` outside `0..=2`.
    ///
    /// # Errors
    ///
    /// Fails when `pre_vote` is `true` but `version` is below 2, when there
    /// are more than `u32::MAX` unknown tagged fields, or when any underlying
    /// size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::Int32.compute_size(&self.partition_index)?
            + types::Int32.compute_size(&self.replica_epoch)?
            + types::Int32.compute_size(&self.replica_id)?;

        // Both directory ids appear on the wire starting with version 1.
        if version >= 1 {
            size += types::Uuid.compute_size(&self.replica_directory_id)?;
            size += types::Uuid.compute_size(&self.voter_directory_id)?;
        }

        size += types::Int32.compute_size(&self.last_offset_epoch)?;
        size += types::Int64.compute_size(&self.last_offset)?;

        if version >= 2 {
            size += types::Boolean.compute_size(&self.pre_vote)?;
        } else if self.pre_vote {
            bail!("A field is set that is not available on the selected protocol version");
        }

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
222
#[cfg(feature = "broker")]
impl Decodable for PartitionData {
    /// Reads one partition entry from `buf` using the layout of `version`.
    ///
    /// Fields that are absent in the requested version take fixed fallbacks:
    /// the nil UUID for both directory ids below version 1, and `false` for
    /// `pre_vote` below version 2.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2` or when any read from `buf` fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let partition_index = types::Int32.decode(buf)?;
        let replica_epoch = types::Int32.decode(buf)?;
        let replica_id = types::Int32.decode(buf)?;

        // Tuple elements are evaluated left to right, so the replica directory
        // id is still read before the voter directory id.
        let (replica_directory_id, voter_directory_id) = if version >= 1 {
            (types::Uuid.decode(buf)?, types::Uuid.decode(buf)?)
        } else {
            (Uuid::nil(), Uuid::nil())
        };

        let last_offset_epoch = types::Int32.decode(buf)?;
        let last_offset = types::Int64.decode(buf)?;

        let pre_vote = if version >= 2 {
            types::Boolean.decode(buf)?
        } else {
            false
        };

        // Tagged-field section: a varint count, then (tag, length, bytes) triples.
        // Every tag is preserved verbatim as an unknown field.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let raw_tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(raw_tag as i32, payload);
        }

        Ok(Self {
            partition_index,
            replica_epoch,
            replica_id,
            replica_directory_id,
            voter_directory_id,
            last_offset_epoch,
            last_offset,
            pre_vote,
            unknown_tagged_fields,
        })
    }
}
270
271impl Default for PartitionData {
272 fn default() -> Self {
273 Self {
274 partition_index: 0,
275 replica_epoch: 0,
276 replica_id: (0).into(),
277 replica_directory_id: Uuid::nil(),
278 voter_directory_id: Uuid::nil(),
279 last_offset_epoch: 0,
280 last_offset: 0,
281 pre_vote: false,
282 unknown_tagged_fields: BTreeMap::new(),
283 }
284 }
285}
286
287impl Message for PartitionData {
288 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
289 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
290}
291
292#[non_exhaustive]
294#[derive(Debug, Clone, PartialEq)]
295pub struct TopicData {
296 pub topic_name: super::TopicName,
300
301 pub partitions: Vec<PartitionData>,
305
306 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
308}
309
310impl TopicData {
311 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
317 self.topic_name = value;
318 self
319 }
320 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
326 self.partitions = value;
327 self
328 }
329 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
331 self.unknown_tagged_fields = value;
332 self
333 }
334 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
336 self.unknown_tagged_fields.insert(key, value);
337 self
338 }
339}
340
#[cfg(feature = "client")]
impl Encodable for TopicData {
    /// Writes this topic entry to `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any underlying encoder
    /// (including the nested partition encoder) fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::CompactString.encode(buf, &self.topic_name)?;

        // Nested partitions are encoded with the same message version.
        let partitions_codec = types::CompactArray(types::Struct { version });
        partitions_codec.encode(buf, &self.partitions)?;

        // Tagged-field section: a varint count followed by the raw entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject a `version` outside `0..=2`.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or
    /// when any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let partitions_codec = types::CompactArray(types::Struct { version });

        let mut size = types::CompactString.compute_size(&self.topic_name)?
            + partitions_codec.compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
379
#[cfg(feature = "broker")]
impl Decodable for TopicData {
    /// Reads one topic entry from `buf` using the layout of `version`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2` or when any read from `buf`
    /// (including the nested partition decoder) fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let topic_name = types::CompactString.decode(buf)?;

        // Nested partitions are decoded with the same message version.
        let partitions_codec = types::CompactArray(types::Struct { version });
        let partitions = partitions_codec.decode(buf)?;

        // Tagged-field section: a varint count, then (tag, length, bytes) triples.
        // Every tag is preserved verbatim as an unknown field.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let raw_tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(raw_tag as i32, payload);
        }

        Ok(Self {
            topic_name,
            partitions,
            unknown_tagged_fields,
        })
    }
}
403
404impl Default for TopicData {
405 fn default() -> Self {
406 Self {
407 topic_name: Default::default(),
408 partitions: Default::default(),
409 unknown_tagged_fields: BTreeMap::new(),
410 }
411 }
412}
413
414impl Message for TopicData {
415 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
416 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
417}
418
419#[non_exhaustive]
421#[derive(Debug, Clone, PartialEq)]
422pub struct VoteRequest {
423 pub cluster_id: Option<StrBytes>,
427
428 pub voter_id: super::BrokerId,
432
433 pub topics: Vec<TopicData>,
437
438 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
440}
441
442impl VoteRequest {
443 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
449 self.cluster_id = value;
450 self
451 }
452 pub fn with_voter_id(mut self, value: super::BrokerId) -> Self {
458 self.voter_id = value;
459 self
460 }
461 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
467 self.topics = value;
468 self
469 }
470 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
472 self.unknown_tagged_fields = value;
473 self
474 }
475 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
477 self.unknown_tagged_fields.insert(key, value);
478 self
479 }
480}
481
#[cfg(feature = "client")]
impl Encodable for VoteRequest {
    /// Writes this request to `buf` using the layout of `version`.
    ///
    /// `voter_id` is written only for version 1 and above; for version 0 it
    /// is skipped without error, whatever its value.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2`, when there are more than
    /// `u32::MAX` unknown tagged fields, or when any underlying encoder
    /// (including the nested topic encoder) fails.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        types::CompactString.encode(buf, &self.cluster_id)?;
        if version >= 1 {
            types::Int32.encode(buf, &self.voter_id)?;
        }

        // Nested topics are encoded with the same message version.
        let topics_codec = types::CompactArray(types::Struct { version });
        topics_codec.encode(buf, &self.topics)?;

        // Tagged-field section: a varint count followed by the raw entries.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;
        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;

        Ok(())
    }

    /// Computes the number of bytes `encode` would produce for `version`.
    ///
    /// Unlike `encode`, this does not reject a `version` outside `0..=2`.
    ///
    /// # Errors
    ///
    /// Fails when there are more than `u32::MAX` unknown tagged fields or
    /// when any underlying size computation fails.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let mut size = types::CompactString.compute_size(&self.cluster_id)?;
        if version >= 1 {
            size += types::Int32.compute_size(&self.voter_id)?;
        }

        let topics_codec = types::CompactArray(types::Struct { version });
        size += topics_codec.compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        size += types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(size)
    }
}
525
#[cfg(feature = "broker")]
impl Decodable for VoteRequest {
    /// Reads a request from `buf` using the layout of `version`.
    ///
    /// For version 0, where `voter_id` is not on the wire, the field is set to `-1`.
    ///
    /// # Errors
    ///
    /// Fails when `version` is outside `0..=2` or when any read from `buf`
    /// (including the nested topic decoder) fails.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        if !(0..=2).contains(&version) {
            bail!("specified version not supported by this message type");
        }

        let cluster_id = types::CompactString.decode(buf)?;

        let voter_id = if version >= 1 {
            types::Int32.decode(buf)?
        } else {
            (-1).into()
        };

        // Nested topics are decoded with the same message version.
        let topics_codec = types::CompactArray(types::Struct { version });
        let topics = topics_codec.decode(buf)?;

        // Tagged-field section: a varint count, then (tag, length, bytes) triples.
        // Every tag is preserved verbatim as an unknown field.
        let mut unknown_tagged_fields = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let raw_tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload_len: u32 = types::UnsignedVarInt.decode(buf)?;
            let payload = buf.try_get_bytes(payload_len as usize)?;
            unknown_tagged_fields.insert(raw_tag as i32, payload);
        }

        Ok(Self {
            cluster_id,
            voter_id,
            topics,
            unknown_tagged_fields,
        })
    }
}
555
556impl Default for VoteRequest {
557 fn default() -> Self {
558 Self {
559 cluster_id: None,
560 voter_id: (-1).into(),
561 topics: Default::default(),
562 unknown_tagged_fields: BTreeMap::new(),
563 }
564 }
565}
566
567impl Message for VoteRequest {
568 const VERSIONS: VersionRange = VersionRange { min: 0, max: 2 };
569 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
570}
571
572impl HeaderVersion for VoteRequest {
573 fn header_version(version: i16) -> i16 {
574 2
575 }
576}