1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct PartitionResult {
24 pub partition: i32,
28
29 pub error_code: i16,
33
34 pub error_message: Option<StrBytes>,
38
39 pub state_epoch: i32,
43
44 pub start_offset: i64,
48
49 pub state_batches: Vec<StateBatch>,
53
54 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
56}
57
58impl PartitionResult {
59 pub fn with_partition(mut self, value: i32) -> Self {
65 self.partition = value;
66 self
67 }
68 pub fn with_error_code(mut self, value: i16) -> Self {
74 self.error_code = value;
75 self
76 }
77 pub fn with_error_message(mut self, value: Option<StrBytes>) -> Self {
83 self.error_message = value;
84 self
85 }
86 pub fn with_state_epoch(mut self, value: i32) -> Self {
92 self.state_epoch = value;
93 self
94 }
95 pub fn with_start_offset(mut self, value: i64) -> Self {
101 self.start_offset = value;
102 self
103 }
104 pub fn with_state_batches(mut self, value: Vec<StateBatch>) -> Self {
110 self.state_batches = value;
111 self
112 }
113 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
115 self.unknown_tagged_fields = value;
116 self
117 }
118 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
120 self.unknown_tagged_fields.insert(key, value);
121 self
122 }
123}
124
125#[cfg(feature = "broker")]
126impl Encodable for PartitionResult {
127 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
128 if version != 0 {
129 bail!("specified version not supported by this message type");
130 }
131 types::Int32.encode(buf, &self.partition)?;
132 types::Int16.encode(buf, &self.error_code)?;
133 types::CompactString.encode(buf, &self.error_message)?;
134 types::Int32.encode(buf, &self.state_epoch)?;
135 types::Int64.encode(buf, &self.start_offset)?;
136 types::CompactArray(types::Struct { version }).encode(buf, &self.state_batches)?;
137 let num_tagged_fields = self.unknown_tagged_fields.len();
138 if num_tagged_fields > std::u32::MAX as usize {
139 bail!(
140 "Too many tagged fields to encode ({} fields)",
141 num_tagged_fields
142 );
143 }
144 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
145
146 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
147 Ok(())
148 }
149 fn compute_size(&self, version: i16) -> Result<usize> {
150 let mut total_size = 0;
151 total_size += types::Int32.compute_size(&self.partition)?;
152 total_size += types::Int16.compute_size(&self.error_code)?;
153 total_size += types::CompactString.compute_size(&self.error_message)?;
154 total_size += types::Int32.compute_size(&self.state_epoch)?;
155 total_size += types::Int64.compute_size(&self.start_offset)?;
156 total_size +=
157 types::CompactArray(types::Struct { version }).compute_size(&self.state_batches)?;
158 let num_tagged_fields = self.unknown_tagged_fields.len();
159 if num_tagged_fields > std::u32::MAX as usize {
160 bail!(
161 "Too many tagged fields to encode ({} fields)",
162 num_tagged_fields
163 );
164 }
165 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
166
167 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
168 Ok(total_size)
169 }
170}
171
172#[cfg(feature = "client")]
173impl Decodable for PartitionResult {
174 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
175 if version != 0 {
176 bail!("specified version not supported by this message type");
177 }
178 let partition = types::Int32.decode(buf)?;
179 let error_code = types::Int16.decode(buf)?;
180 let error_message = types::CompactString.decode(buf)?;
181 let state_epoch = types::Int32.decode(buf)?;
182 let start_offset = types::Int64.decode(buf)?;
183 let state_batches = types::CompactArray(types::Struct { version }).decode(buf)?;
184 let mut unknown_tagged_fields = BTreeMap::new();
185 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
186 for _ in 0..num_tagged_fields {
187 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
188 let size: u32 = types::UnsignedVarInt.decode(buf)?;
189 let unknown_value = buf.try_get_bytes(size as usize)?;
190 unknown_tagged_fields.insert(tag as i32, unknown_value);
191 }
192 Ok(Self {
193 partition,
194 error_code,
195 error_message,
196 state_epoch,
197 start_offset,
198 state_batches,
199 unknown_tagged_fields,
200 })
201 }
202}
203
204impl Default for PartitionResult {
205 fn default() -> Self {
206 Self {
207 partition: 0,
208 error_code: 0,
209 error_message: None,
210 state_epoch: 0,
211 start_offset: 0,
212 state_batches: Default::default(),
213 unknown_tagged_fields: BTreeMap::new(),
214 }
215 }
216}
217
218impl Message for PartitionResult {
219 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
220 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
221}
222
223#[non_exhaustive]
225#[derive(Debug, Clone, PartialEq)]
226pub struct ReadShareGroupStateResponse {
227 pub results: Vec<ReadStateResult>,
231
232 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
234}
235
236impl ReadShareGroupStateResponse {
237 pub fn with_results(mut self, value: Vec<ReadStateResult>) -> Self {
243 self.results = value;
244 self
245 }
246 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
248 self.unknown_tagged_fields = value;
249 self
250 }
251 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
253 self.unknown_tagged_fields.insert(key, value);
254 self
255 }
256}
257
258#[cfg(feature = "broker")]
259impl Encodable for ReadShareGroupStateResponse {
260 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
261 if version != 0 {
262 bail!("specified version not supported by this message type");
263 }
264 types::CompactArray(types::Struct { version }).encode(buf, &self.results)?;
265 let num_tagged_fields = self.unknown_tagged_fields.len();
266 if num_tagged_fields > std::u32::MAX as usize {
267 bail!(
268 "Too many tagged fields to encode ({} fields)",
269 num_tagged_fields
270 );
271 }
272 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
273
274 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
275 Ok(())
276 }
277 fn compute_size(&self, version: i16) -> Result<usize> {
278 let mut total_size = 0;
279 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.results)?;
280 let num_tagged_fields = self.unknown_tagged_fields.len();
281 if num_tagged_fields > std::u32::MAX as usize {
282 bail!(
283 "Too many tagged fields to encode ({} fields)",
284 num_tagged_fields
285 );
286 }
287 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
288
289 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
290 Ok(total_size)
291 }
292}
293
294#[cfg(feature = "client")]
295impl Decodable for ReadShareGroupStateResponse {
296 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
297 if version != 0 {
298 bail!("specified version not supported by this message type");
299 }
300 let results = types::CompactArray(types::Struct { version }).decode(buf)?;
301 let mut unknown_tagged_fields = BTreeMap::new();
302 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
303 for _ in 0..num_tagged_fields {
304 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
305 let size: u32 = types::UnsignedVarInt.decode(buf)?;
306 let unknown_value = buf.try_get_bytes(size as usize)?;
307 unknown_tagged_fields.insert(tag as i32, unknown_value);
308 }
309 Ok(Self {
310 results,
311 unknown_tagged_fields,
312 })
313 }
314}
315
316impl Default for ReadShareGroupStateResponse {
317 fn default() -> Self {
318 Self {
319 results: Default::default(),
320 unknown_tagged_fields: BTreeMap::new(),
321 }
322 }
323}
324
325impl Message for ReadShareGroupStateResponse {
326 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
327 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
328}
329
330#[non_exhaustive]
332#[derive(Debug, Clone, PartialEq)]
333pub struct ReadStateResult {
334 pub topic_id: Uuid,
338
339 pub partitions: Vec<PartitionResult>,
343
344 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
346}
347
348impl ReadStateResult {
349 pub fn with_topic_id(mut self, value: Uuid) -> Self {
355 self.topic_id = value;
356 self
357 }
358 pub fn with_partitions(mut self, value: Vec<PartitionResult>) -> Self {
364 self.partitions = value;
365 self
366 }
367 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
369 self.unknown_tagged_fields = value;
370 self
371 }
372 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
374 self.unknown_tagged_fields.insert(key, value);
375 self
376 }
377}
378
379#[cfg(feature = "broker")]
380impl Encodable for ReadStateResult {
381 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
382 if version != 0 {
383 bail!("specified version not supported by this message type");
384 }
385 types::Uuid.encode(buf, &self.topic_id)?;
386 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
387 let num_tagged_fields = self.unknown_tagged_fields.len();
388 if num_tagged_fields > std::u32::MAX as usize {
389 bail!(
390 "Too many tagged fields to encode ({} fields)",
391 num_tagged_fields
392 );
393 }
394 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
395
396 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
397 Ok(())
398 }
399 fn compute_size(&self, version: i16) -> Result<usize> {
400 let mut total_size = 0;
401 total_size += types::Uuid.compute_size(&self.topic_id)?;
402 total_size +=
403 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
404 let num_tagged_fields = self.unknown_tagged_fields.len();
405 if num_tagged_fields > std::u32::MAX as usize {
406 bail!(
407 "Too many tagged fields to encode ({} fields)",
408 num_tagged_fields
409 );
410 }
411 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
412
413 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
414 Ok(total_size)
415 }
416}
417
418#[cfg(feature = "client")]
419impl Decodable for ReadStateResult {
420 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
421 if version != 0 {
422 bail!("specified version not supported by this message type");
423 }
424 let topic_id = types::Uuid.decode(buf)?;
425 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
426 let mut unknown_tagged_fields = BTreeMap::new();
427 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
428 for _ in 0..num_tagged_fields {
429 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
430 let size: u32 = types::UnsignedVarInt.decode(buf)?;
431 let unknown_value = buf.try_get_bytes(size as usize)?;
432 unknown_tagged_fields.insert(tag as i32, unknown_value);
433 }
434 Ok(Self {
435 topic_id,
436 partitions,
437 unknown_tagged_fields,
438 })
439 }
440}
441
442impl Default for ReadStateResult {
443 fn default() -> Self {
444 Self {
445 topic_id: Uuid::nil(),
446 partitions: Default::default(),
447 unknown_tagged_fields: BTreeMap::new(),
448 }
449 }
450}
451
452impl Message for ReadStateResult {
453 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
454 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
455}
456
457#[non_exhaustive]
459#[derive(Debug, Clone, PartialEq)]
460pub struct StateBatch {
461 pub first_offset: i64,
465
466 pub last_offset: i64,
470
471 pub delivery_state: i8,
475
476 pub delivery_count: i16,
480
481 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
483}
484
485impl StateBatch {
486 pub fn with_first_offset(mut self, value: i64) -> Self {
492 self.first_offset = value;
493 self
494 }
495 pub fn with_last_offset(mut self, value: i64) -> Self {
501 self.last_offset = value;
502 self
503 }
504 pub fn with_delivery_state(mut self, value: i8) -> Self {
510 self.delivery_state = value;
511 self
512 }
513 pub fn with_delivery_count(mut self, value: i16) -> Self {
519 self.delivery_count = value;
520 self
521 }
522 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
524 self.unknown_tagged_fields = value;
525 self
526 }
527 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
529 self.unknown_tagged_fields.insert(key, value);
530 self
531 }
532}
533
534#[cfg(feature = "broker")]
535impl Encodable for StateBatch {
536 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
537 if version != 0 {
538 bail!("specified version not supported by this message type");
539 }
540 types::Int64.encode(buf, &self.first_offset)?;
541 types::Int64.encode(buf, &self.last_offset)?;
542 types::Int8.encode(buf, &self.delivery_state)?;
543 types::Int16.encode(buf, &self.delivery_count)?;
544 let num_tagged_fields = self.unknown_tagged_fields.len();
545 if num_tagged_fields > std::u32::MAX as usize {
546 bail!(
547 "Too many tagged fields to encode ({} fields)",
548 num_tagged_fields
549 );
550 }
551 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
552
553 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
554 Ok(())
555 }
556 fn compute_size(&self, version: i16) -> Result<usize> {
557 let mut total_size = 0;
558 total_size += types::Int64.compute_size(&self.first_offset)?;
559 total_size += types::Int64.compute_size(&self.last_offset)?;
560 total_size += types::Int8.compute_size(&self.delivery_state)?;
561 total_size += types::Int16.compute_size(&self.delivery_count)?;
562 let num_tagged_fields = self.unknown_tagged_fields.len();
563 if num_tagged_fields > std::u32::MAX as usize {
564 bail!(
565 "Too many tagged fields to encode ({} fields)",
566 num_tagged_fields
567 );
568 }
569 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
570
571 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
572 Ok(total_size)
573 }
574}
575
576#[cfg(feature = "client")]
577impl Decodable for StateBatch {
578 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
579 if version != 0 {
580 bail!("specified version not supported by this message type");
581 }
582 let first_offset = types::Int64.decode(buf)?;
583 let last_offset = types::Int64.decode(buf)?;
584 let delivery_state = types::Int8.decode(buf)?;
585 let delivery_count = types::Int16.decode(buf)?;
586 let mut unknown_tagged_fields = BTreeMap::new();
587 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
588 for _ in 0..num_tagged_fields {
589 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
590 let size: u32 = types::UnsignedVarInt.decode(buf)?;
591 let unknown_value = buf.try_get_bytes(size as usize)?;
592 unknown_tagged_fields.insert(tag as i32, unknown_value);
593 }
594 Ok(Self {
595 first_offset,
596 last_offset,
597 delivery_state,
598 delivery_count,
599 unknown_tagged_fields,
600 })
601 }
602}
603
604impl Default for StateBatch {
605 fn default() -> Self {
606 Self {
607 first_offset: 0,
608 last_offset: 0,
609 delivery_state: 0,
610 delivery_count: 0,
611 unknown_tagged_fields: BTreeMap::new(),
612 }
613 }
614}
615
616impl Message for StateBatch {
617 const VERSIONS: VersionRange = VersionRange { min: 0, max: 0 };
618 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
619}
620
621impl HeaderVersion for ReadShareGroupStateResponse {
622 fn header_version(version: i16) -> i16 {
623 1
624 }
625}