1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionData {
24 pub partition: i32,
28
29 pub state_epoch: i32,
33
34 pub leader_epoch: i32,
38
39 pub start_offset: i64,
43
44 pub state_batches: Vec<StateBatch>,
48
49 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
51}
52
53impl PartitionData {
54 pub fn with_partition(mut self, value: i32) -> Self {
60 self.partition = value;
61 self
62 }
63 pub fn with_state_epoch(mut self, value: i32) -> Self {
69 self.state_epoch = value;
70 self
71 }
72 pub fn with_leader_epoch(mut self, value: i32) -> Self {
78 self.leader_epoch = value;
79 self
80 }
81 pub fn with_start_offset(mut self, value: i64) -> Self {
87 self.start_offset = value;
88 self
89 }
90 pub fn with_state_batches(mut self, value: Vec<StateBatch>) -> Self {
96 self.state_batches = value;
97 self
98 }
99 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
101 self.unknown_tagged_fields = value;
102 self
103 }
104 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
106 self.unknown_tagged_fields.insert(key, value);
107 self
108 }
109}
110
111#[cfg(feature = "client")]
112impl Encodable for PartitionData {
113 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
114 if version != 0 {
115 bail!("specified version not supported by this message type");
116 }
117 types::Int32.encode(buf, &self.partition)?;
118 types::Int32.encode(buf, &self.state_epoch)?;
119 types::Int32.encode(buf, &self.leader_epoch)?;
120 types::Int64.encode(buf, &self.start_offset)?;
121 types::CompactArray(types::Struct { version }).encode(buf, &self.state_batches)?;
122 let num_tagged_fields = self.unknown_tagged_fields.len();
123 if num_tagged_fields > std::u32::MAX as usize {
124 bail!(
125 "Too many tagged fields to encode ({} fields)",
126 num_tagged_fields
127 );
128 }
129 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
130
131 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
132 Ok(())
133 }
134 fn compute_size(&self, version: i16) -> Result<usize> {
135 let mut total_size = 0;
136 total_size += types::Int32.compute_size(&self.partition)?;
137 total_size += types::Int32.compute_size(&self.state_epoch)?;
138 total_size += types::Int32.compute_size(&self.leader_epoch)?;
139 total_size += types::Int64.compute_size(&self.start_offset)?;
140 total_size +=
141 types::CompactArray(types::Struct { version }).compute_size(&self.state_batches)?;
142 let num_tagged_fields = self.unknown_tagged_fields.len();
143 if num_tagged_fields > std::u32::MAX as usize {
144 bail!(
145 "Too many tagged fields to encode ({} fields)",
146 num_tagged_fields
147 );
148 }
149 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
150
151 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
152 Ok(total_size)
153 }
154}
155
156#[cfg(feature = "broker")]
157impl Decodable for PartitionData {
158 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
159 if version != 0 {
160 bail!("specified version not supported by this message type");
161 }
162 let partition = types::Int32.decode(buf)?;
163 let state_epoch = types::Int32.decode(buf)?;
164 let leader_epoch = types::Int32.decode(buf)?;
165 let start_offset = types::Int64.decode(buf)?;
166 let state_batches = types::CompactArray(types::Struct { version }).decode(buf)?;
167 let mut unknown_tagged_fields = BTreeMap::new();
168 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
169 for _ in 0..num_tagged_fields {
170 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
171 let size: u32 = types::UnsignedVarInt.decode(buf)?;
172 let unknown_value = buf.try_get_bytes(size as usize)?;
173 unknown_tagged_fields.insert(tag as i32, unknown_value);
174 }
175 Ok(Self {
176 partition,
177 state_epoch,
178 leader_epoch,
179 start_offset,
180 state_batches,
181 unknown_tagged_fields,
182 })
183 }
184}
185
186impl Default for PartitionData {
187 fn default() -> Self {
188 Self {
189 partition: 0,
190 state_epoch: 0,
191 leader_epoch: 0,
192 start_offset: 0,
193 state_batches: Default::default(),
194 unknown_tagged_fields: BTreeMap::new(),
195 }
196 }
197}
198
199impl Message for PartitionData {
200 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
201 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
202}
203
204#[non_exhaustive]
206#[derive(Debug, Clone, PartialEq)]
207pub struct StateBatch {
208 pub first_offset: i64,
212
213 pub last_offset: i64,
217
218 pub delivery_state: i8,
222
223 pub delivery_count: i16,
227
228 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
230}
231
232impl StateBatch {
233 pub fn with_first_offset(mut self, value: i64) -> Self {
239 self.first_offset = value;
240 self
241 }
242 pub fn with_last_offset(mut self, value: i64) -> Self {
248 self.last_offset = value;
249 self
250 }
251 pub fn with_delivery_state(mut self, value: i8) -> Self {
257 self.delivery_state = value;
258 self
259 }
260 pub fn with_delivery_count(mut self, value: i16) -> Self {
266 self.delivery_count = value;
267 self
268 }
269 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
271 self.unknown_tagged_fields = value;
272 self
273 }
274 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
276 self.unknown_tagged_fields.insert(key, value);
277 self
278 }
279}
280
281#[cfg(feature = "client")]
282impl Encodable for StateBatch {
283 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
284 if version != 0 {
285 bail!("specified version not supported by this message type");
286 }
287 types::Int64.encode(buf, &self.first_offset)?;
288 types::Int64.encode(buf, &self.last_offset)?;
289 types::Int8.encode(buf, &self.delivery_state)?;
290 types::Int16.encode(buf, &self.delivery_count)?;
291 let num_tagged_fields = self.unknown_tagged_fields.len();
292 if num_tagged_fields > std::u32::MAX as usize {
293 bail!(
294 "Too many tagged fields to encode ({} fields)",
295 num_tagged_fields
296 );
297 }
298 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
299
300 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
301 Ok(())
302 }
303 fn compute_size(&self, version: i16) -> Result<usize> {
304 let mut total_size = 0;
305 total_size += types::Int64.compute_size(&self.first_offset)?;
306 total_size += types::Int64.compute_size(&self.last_offset)?;
307 total_size += types::Int8.compute_size(&self.delivery_state)?;
308 total_size += types::Int16.compute_size(&self.delivery_count)?;
309 let num_tagged_fields = self.unknown_tagged_fields.len();
310 if num_tagged_fields > std::u32::MAX as usize {
311 bail!(
312 "Too many tagged fields to encode ({} fields)",
313 num_tagged_fields
314 );
315 }
316 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
317
318 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
319 Ok(total_size)
320 }
321}
322
323#[cfg(feature = "broker")]
324impl Decodable for StateBatch {
325 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
326 if version != 0 {
327 bail!("specified version not supported by this message type");
328 }
329 let first_offset = types::Int64.decode(buf)?;
330 let last_offset = types::Int64.decode(buf)?;
331 let delivery_state = types::Int8.decode(buf)?;
332 let delivery_count = types::Int16.decode(buf)?;
333 let mut unknown_tagged_fields = BTreeMap::new();
334 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
335 for _ in 0..num_tagged_fields {
336 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
337 let size: u32 = types::UnsignedVarInt.decode(buf)?;
338 let unknown_value = buf.try_get_bytes(size as usize)?;
339 unknown_tagged_fields.insert(tag as i32, unknown_value);
340 }
341 Ok(Self {
342 first_offset,
343 last_offset,
344 delivery_state,
345 delivery_count,
346 unknown_tagged_fields,
347 })
348 }
349}
350
351impl Default for StateBatch {
352 fn default() -> Self {
353 Self {
354 first_offset: 0,
355 last_offset: 0,
356 delivery_state: 0,
357 delivery_count: 0,
358 unknown_tagged_fields: BTreeMap::new(),
359 }
360 }
361}
362
363impl Message for StateBatch {
364 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
365 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
366}
367
368#[non_exhaustive]
370#[derive(Debug, Clone, PartialEq)]
371pub struct WriteShareGroupStateRequest {
372 pub group_id: StrBytes,
376
377 pub topics: Vec<WriteStateData>,
381
382 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
384}
385
386impl WriteShareGroupStateRequest {
387 pub fn with_group_id(mut self, value: StrBytes) -> Self {
393 self.group_id = value;
394 self
395 }
396 pub fn with_topics(mut self, value: Vec<WriteStateData>) -> Self {
402 self.topics = value;
403 self
404 }
405 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
407 self.unknown_tagged_fields = value;
408 self
409 }
410 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
412 self.unknown_tagged_fields.insert(key, value);
413 self
414 }
415}
416
417#[cfg(feature = "client")]
418impl Encodable for WriteShareGroupStateRequest {
419 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
420 if version != 0 {
421 bail!("specified version not supported by this message type");
422 }
423 types::CompactString.encode(buf, &self.group_id)?;
424 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
425 let num_tagged_fields = self.unknown_tagged_fields.len();
426 if num_tagged_fields > std::u32::MAX as usize {
427 bail!(
428 "Too many tagged fields to encode ({} fields)",
429 num_tagged_fields
430 );
431 }
432 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
433
434 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
435 Ok(())
436 }
437 fn compute_size(&self, version: i16) -> Result<usize> {
438 let mut total_size = 0;
439 total_size += types::CompactString.compute_size(&self.group_id)?;
440 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
441 let num_tagged_fields = self.unknown_tagged_fields.len();
442 if num_tagged_fields > std::u32::MAX as usize {
443 bail!(
444 "Too many tagged fields to encode ({} fields)",
445 num_tagged_fields
446 );
447 }
448 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
449
450 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
451 Ok(total_size)
452 }
453}
454
455#[cfg(feature = "broker")]
456impl Decodable for WriteShareGroupStateRequest {
457 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
458 if version != 0 {
459 bail!("specified version not supported by this message type");
460 }
461 let group_id = types::CompactString.decode(buf)?;
462 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
463 let mut unknown_tagged_fields = BTreeMap::new();
464 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
465 for _ in 0..num_tagged_fields {
466 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
467 let size: u32 = types::UnsignedVarInt.decode(buf)?;
468 let unknown_value = buf.try_get_bytes(size as usize)?;
469 unknown_tagged_fields.insert(tag as i32, unknown_value);
470 }
471 Ok(Self {
472 group_id,
473 topics,
474 unknown_tagged_fields,
475 })
476 }
477}
478
479impl Default for WriteShareGroupStateRequest {
480 fn default() -> Self {
481 Self {
482 group_id: Default::default(),
483 topics: Default::default(),
484 unknown_tagged_fields: BTreeMap::new(),
485 }
486 }
487}
488
489impl Message for WriteShareGroupStateRequest {
490 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
491 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
492}
493
494#[non_exhaustive]
496#[derive(Debug, Clone, PartialEq)]
497pub struct WriteStateData {
498 pub topic_id: Uuid,
502
503 pub partitions: Vec<PartitionData>,
507
508 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
510}
511
512impl WriteStateData {
513 pub fn with_topic_id(mut self, value: Uuid) -> Self {
519 self.topic_id = value;
520 self
521 }
522 pub fn with_partitions(mut self, value: Vec<PartitionData>) -> Self {
528 self.partitions = value;
529 self
530 }
531 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
533 self.unknown_tagged_fields = value;
534 self
535 }
536 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
538 self.unknown_tagged_fields.insert(key, value);
539 self
540 }
541}
542
543#[cfg(feature = "client")]
544impl Encodable for WriteStateData {
545 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
546 if version != 0 {
547 bail!("specified version not supported by this message type");
548 }
549 types::Uuid.encode(buf, &self.topic_id)?;
550 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
551 let num_tagged_fields = self.unknown_tagged_fields.len();
552 if num_tagged_fields > std::u32::MAX as usize {
553 bail!(
554 "Too many tagged fields to encode ({} fields)",
555 num_tagged_fields
556 );
557 }
558 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
559
560 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
561 Ok(())
562 }
563 fn compute_size(&self, version: i16) -> Result<usize> {
564 let mut total_size = 0;
565 total_size += types::Uuid.compute_size(&self.topic_id)?;
566 total_size +=
567 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
568 let num_tagged_fields = self.unknown_tagged_fields.len();
569 if num_tagged_fields > std::u32::MAX as usize {
570 bail!(
571 "Too many tagged fields to encode ({} fields)",
572 num_tagged_fields
573 );
574 }
575 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
576
577 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
578 Ok(total_size)
579 }
580}
581
582#[cfg(feature = "broker")]
583impl Decodable for WriteStateData {
584 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
585 if version != 0 {
586 bail!("specified version not supported by this message type");
587 }
588 let topic_id = types::Uuid.decode(buf)?;
589 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
590 let mut unknown_tagged_fields = BTreeMap::new();
591 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
592 for _ in 0..num_tagged_fields {
593 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
594 let size: u32 = types::UnsignedVarInt.decode(buf)?;
595 let unknown_value = buf.try_get_bytes(size as usize)?;
596 unknown_tagged_fields.insert(tag as i32, unknown_value);
597 }
598 Ok(Self {
599 topic_id,
600 partitions,
601 unknown_tagged_fields,
602 })
603 }
604}
605
606impl Default for WriteStateData {
607 fn default() -> Self {
608 Self {
609 topic_id: Uuid::nil(),
610 partitions: Default::default(),
611 unknown_tagged_fields: BTreeMap::new(),
612 }
613 }
614}
615
616impl Message for WriteStateData {
617 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
618 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
619}
620
621impl HeaderVersion for WriteShareGroupStateRequest {
622 fn header_version(version: i16) -> i16 {
623 2
624 }
625}