1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionData {
24 pub partition_index: i32,
28
29 pub candidate_epoch: i32,
33
34 pub candidate_id: super::BrokerId,
38
39 pub last_offset_epoch: i32,
43
44 pub last_offset: i64,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl PartitionData {
54 pub fn with_partition_index(mut self, value: i32) -> Self {
60 self.partition_index = value;
61 self
62 }
63 pub fn with_candidate_epoch(mut self, value: i32) -> Self {
69 self.candidate_epoch = value;
70 self
71 }
72 pub fn with_candidate_id(mut self, value: super::BrokerId) -> Self {
78 self.candidate_id = value;
79 self
80 }
81 pub fn with_last_offset_epoch(mut self, value: i32) -> Self {
87 self.last_offset_epoch = value;
88 self
89 }
90 pub fn with_last_offset(mut self, value: i64) -> Self {
96 self.last_offset = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
#[cfg(feature = "client")]
impl Encodable for PartitionData {
    /// Writes this partition entry to `buf`.
    ///
    /// Field order on the wire: `partition_index`, `candidate_epoch`, `candidate_id`,
    /// `last_offset_epoch`, `last_offset`, then the tagged-field count followed by the
    /// unknown tagged fields. `version` is not consulted.
    ///
    /// # Errors
    /// Fails if any field encoder fails, or if the number of unknown tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::Int32.encode(buf, &self.partition_index)?;
        types::Int32.encode(buf, &self.candidate_epoch)?;
        types::Int32.encode(buf, &self.candidate_id)?;
        types::Int32.encode(buf, &self.last_offset_epoch)?;
        types::Int64.encode(buf, &self.last_offset)?;

        // The tagged-field count is written as an unsigned varint, so it must fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write, summing the fields in wire order.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let fields_size = types::Int32.compute_size(&self.partition_index)?
            + types::Int32.compute_size(&self.candidate_epoch)?
            + types::Int32.compute_size(&self.candidate_id)?
            + types::Int32.compute_size(&self.last_offset_epoch)?
            + types::Int64.compute_size(&self.last_offset)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + tagged_size)
    }
}
151
#[cfg(feature = "broker")]
impl Decodable for PartitionData {
    /// Reads one partition entry from `buf`.
    ///
    /// Fields are read in the order `partition_index`, `candidate_epoch`, `candidate_id`,
    /// `last_offset_epoch`, `last_offset`, followed by the tagged-field section. Every tagged
    /// field is kept verbatim in `unknown_tagged_fields`. `version` is not consulted.
    ///
    /// # Errors
    /// Fails if any field decoder fails or the buffer cannot supply the announced bytes.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let partition_index = types::Int32.decode(buf)?;
        let candidate_epoch = types::Int32.decode(buf)?;
        let candidate_id = types::Int32.decode(buf)?;
        let last_offset_epoch = types::Int32.decode(buf)?;
        let last_offset = types::Int64.decode(buf)?;

        // Tagged section: a count, then (tag, length, raw bytes) per field.
        let mut tagged = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            partition_index,
            candidate_epoch,
            candidate_id,
            last_offset_epoch,
            last_offset,
            unknown_tagged_fields: tagged,
        })
    }
}
178
179impl Default for PartitionData {
180 fn default() -> Self {
181 Self {
182 partition_index: 0,
183 candidate_epoch: 0,
184 candidate_id: (0).into(),
185 last_offset_epoch: 0,
186 last_offset: 0,
187 unknown_tagged_fields: BTreeMap::new(),
188 }
189 }
190}
191
192impl Message for PartitionData {
193 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
194 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
195}
196
197#[non_exhaustive]
199#[derive(Debug, Clone, PartialEq)]
200pub struct TopicData {
201 pub topic_name: super::TopicName,
205
206 pub partitions: Vec<PartitionData>,
210
211 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
213}
214
215impl TopicData {
216 pub fn with_topic_name(mut self, value: super::TopicName) -> Self {
222 self.topic_name = value;
223 self
224 }
225 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
231 self.partitions = value;
232 self
233 }
234 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
236 self.unknown_tagged_fields = value;
237 self
238 }
239 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
241 self.unknown_tagged_fields.insert(key, value);
242 self
243 }
244}
245
#[cfg(feature = "client")]
impl Encodable for TopicData {
    /// Writes this topic entry to `buf`.
    ///
    /// Field order on the wire: `topic_name` as a compact string, `partitions` as a compact
    /// array of structs encoded at `version`, then the tagged-field count followed by the
    /// unknown tagged fields.
    ///
    /// # Errors
    /// Fails if any field encoder fails, or if the number of unknown tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::CompactString.encode(buf, &self.topic_name)?;
        let partitions_codec = types::CompactArray(types::Struct { version });
        partitions_codec.encode(buf, &self.partitions)?;

        // The tagged-field count is written as an unsigned varint, so it must fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write at `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let partitions_codec = types::CompactArray(types::Struct { version });
        let fields_size = types::CompactString.compute_size(&self.topic_name)?
            + partitions_codec.compute_size(&self.partitions)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + tagged_size)
    }
}
281
#[cfg(feature = "broker")]
impl Decodable for TopicData {
    /// Reads one topic entry from `buf`.
    ///
    /// Reads `topic_name` as a compact string and `partitions` as a compact array of structs
    /// decoded at `version`, then the tagged-field section. Every tagged field is kept
    /// verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    /// Fails if any field decoder fails or the buffer cannot supply the announced bytes.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let topic_name = types::CompactString.decode(buf)?;
        let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged section: a count, then (tag, length, raw bytes) per field.
        let mut tagged = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            topic_name,
            partitions,
            unknown_tagged_fields: tagged,
        })
    }
}
302
303impl Default for TopicData {
304 fn default() -> Self {
305 Self {
306 topic_name: Default::default(),
307 partitions: Default::default(),
308 unknown_tagged_fields: BTreeMap::new(),
309 }
310 }
311}
312
313impl Message for TopicData {
314 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
315 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
316}
317
318#[non_exhaustive]
320#[derive(Debug, Clone, PartialEq)]
321pub struct VoteRequest {
322 pub cluster_id: Option<StrBytes>,
326
327 pub topics: Vec<TopicData>,
331
332 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
334}
335
336impl VoteRequest {
337 pub fn with_cluster_id(mut self, value: Option<StrBytes>) -> Self {
343 self.cluster_id = value;
344 self
345 }
346 pub fn with_topics(mut self, value: Vec<TopicData>) -> Self {
352 self.topics = value;
353 self
354 }
355 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
357 self.unknown_tagged_fields = value;
358 self
359 }
360 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
362 self.unknown_tagged_fields.insert(key, value);
363 self
364 }
365}
366
#[cfg(feature = "client")]
impl Encodable for VoteRequest {
    /// Writes the request body to `buf`.
    ///
    /// Field order on the wire: `cluster_id` via the compact-string codec, `topics` as a
    /// compact array of structs encoded at `version`, then the tagged-field count followed
    /// by the unknown tagged fields.
    ///
    /// # Errors
    /// Fails if any field encoder fails, or if the number of unknown tagged fields
    /// does not fit in a `u32`.
    fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
        types::CompactString.encode(buf, &self.cluster_id)?;
        let topics_codec = types::CompactArray(types::Struct { version });
        topics_codec.encode(buf, &self.topics)?;

        // The tagged-field count is written as an unsigned varint, so it must fit in a u32.
        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        types::UnsignedVarInt.encode(buf, tagged_count as u32)?;

        write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
        Ok(())
    }

    /// Returns the number of bytes `encode` would write at `version`.
    ///
    /// # Errors
    /// Fails under the same conditions as `encode`.
    fn compute_size(&self, version: i16) -> Result<usize> {
        let topics_codec = types::CompactArray(types::Struct { version });
        let fields_size = types::CompactString.compute_size(&self.cluster_id)?
            + topics_codec.compute_size(&self.topics)?;

        let tagged_count = self.unknown_tagged_fields.len();
        if tagged_count > u32::MAX as usize {
            bail!(
                "Too many tagged fields to encode ({} fields)",
                tagged_count
            );
        }
        let count_size = types::UnsignedVarInt.compute_size(tagged_count as u32)?;
        let tagged_size = compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;

        Ok(fields_size + count_size + tagged_size)
    }
}
401
#[cfg(feature = "broker")]
impl Decodable for VoteRequest {
    /// Reads a request body from `buf`.
    ///
    /// Reads `cluster_id` via the compact-string codec and `topics` as a compact array of
    /// structs decoded at `version`, then the tagged-field section. Every tagged field is
    /// kept verbatim in `unknown_tagged_fields`.
    ///
    /// # Errors
    /// Fails if any field decoder fails or the buffer cannot supply the announced bytes.
    fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
        let cluster_id = types::CompactString.decode(buf)?;
        let topics = types::CompactArray(types::Struct { version }).decode(buf)?;

        // Tagged section: a count, then (tag, length, raw bytes) per field.
        let mut tagged = BTreeMap::new();
        let tagged_count = types::UnsignedVarInt.decode(buf)?;
        for _ in 0..tagged_count {
            let tag: u32 = types::UnsignedVarInt.decode(buf)?;
            let len: u32 = types::UnsignedVarInt.decode(buf)?;
            tagged.insert(tag as i32, buf.try_get_bytes(len as usize)?);
        }

        Ok(Self {
            cluster_id,
            topics,
            unknown_tagged_fields: tagged,
        })
    }
}
422
423impl Default for VoteRequest {
424 fn default() -> Self {
425 Self {
426 cluster_id: None,
427 topics: Default::default(),
428 unknown_tagged_fields: BTreeMap::new(),
429 }
430 }
431}
432
433impl Message for VoteRequest {
434 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
435 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
436}
437
438impl HeaderVersion for VoteRequest {
439 fn header_version(version: i16) -> i16 {
440 2
441 }
442}