1#![allow(unused)]
6
7use std::borrow::Borrow;
8use std::collections::BTreeMap;
9
10use anyhow::{bail, Result};
11use bytes::Bytes;
12use uuid::Uuid;
13
14use crate::protocol::{
15 buf::{ByteBuf, ByteBufMut},
16 compute_unknown_tagged_fields_size, types, write_unknown_tagged_fields, Decodable, Decoder,
17 Encodable, Encoder, HeaderVersion, Message, StrBytes, VersionRange,
18};
19
20#[non_exhaustive]
22#[derive(Debug, Clone, PartialEq)]
23pub struct AcknowledgePartition {
24 pub partition_index: i32,
28
29 pub acknowledgement_batches: Vec<AcknowledgementBatch>,
33
34 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
36}
37
38impl AcknowledgePartition {
39 pub fn with_partition_index(mut self, value: i32) -> Self {
45 self.partition_index = value;
46 self
47 }
48 pub fn with_acknowledgement_batches(mut self, value: Vec<AcknowledgementBatch>) -> Self {
54 self.acknowledgement_batches = value;
55 self
56 }
57 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
59 self.unknown_tagged_fields = value;
60 self
61 }
62 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
64 self.unknown_tagged_fields.insert(key, value);
65 self
66 }
67}
68
69#[cfg(feature = "client")]
70impl Encodable for AcknowledgePartition {
71 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
72 if version != 1 {
73 bail!("specified version not supported by this message type");
74 }
75 types::Int32.encode(buf, &self.partition_index)?;
76 types::CompactArray(types::Struct { version })
77 .encode(buf, &self.acknowledgement_batches)?;
78 let num_tagged_fields = self.unknown_tagged_fields.len();
79 if num_tagged_fields > std::u32::MAX as usize {
80 bail!(
81 "Too many tagged fields to encode ({} fields)",
82 num_tagged_fields
83 );
84 }
85 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
86
87 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
88 Ok(())
89 }
90 fn compute_size(&self, version: i16) -> Result<usize> {
91 let mut total_size = 0;
92 total_size += types::Int32.compute_size(&self.partition_index)?;
93 total_size += types::CompactArray(types::Struct { version })
94 .compute_size(&self.acknowledgement_batches)?;
95 let num_tagged_fields = self.unknown_tagged_fields.len();
96 if num_tagged_fields > std::u32::MAX as usize {
97 bail!(
98 "Too many tagged fields to encode ({} fields)",
99 num_tagged_fields
100 );
101 }
102 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
103
104 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
105 Ok(total_size)
106 }
107}
108
109#[cfg(feature = "broker")]
110impl Decodable for AcknowledgePartition {
111 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
112 if version != 1 {
113 bail!("specified version not supported by this message type");
114 }
115 let partition_index = types::Int32.decode(buf)?;
116 let acknowledgement_batches = types::CompactArray(types::Struct { version }).decode(buf)?;
117 let mut unknown_tagged_fields = BTreeMap::new();
118 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
119 for _ in 0..num_tagged_fields {
120 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
121 let size: u32 = types::UnsignedVarInt.decode(buf)?;
122 let unknown_value = buf.try_get_bytes(size as usize)?;
123 unknown_tagged_fields.insert(tag as i32, unknown_value);
124 }
125 Ok(Self {
126 partition_index,
127 acknowledgement_batches,
128 unknown_tagged_fields,
129 })
130 }
131}
132
133impl Default for AcknowledgePartition {
134 fn default() -> Self {
135 Self {
136 partition_index: 0,
137 acknowledgement_batches: Default::default(),
138 unknown_tagged_fields: BTreeMap::new(),
139 }
140 }
141}
142
143impl Message for AcknowledgePartition {
144 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
145 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
146}
147
148#[non_exhaustive]
150#[derive(Debug, Clone, PartialEq)]
151pub struct AcknowledgeTopic {
152 pub topic_id: Uuid,
156
157 pub partitions: Vec<AcknowledgePartition>,
161
162 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
164}
165
166impl AcknowledgeTopic {
167 pub fn with_topic_id(mut self, value: Uuid) -> Self {
173 self.topic_id = value;
174 self
175 }
176 pub fn with_partitions(mut self, value: Vec<AcknowledgePartition>) -> Self {
182 self.partitions = value;
183 self
184 }
185 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
187 self.unknown_tagged_fields = value;
188 self
189 }
190 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
192 self.unknown_tagged_fields.insert(key, value);
193 self
194 }
195}
196
197#[cfg(feature = "client")]
198impl Encodable for AcknowledgeTopic {
199 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
200 if version != 1 {
201 bail!("specified version not supported by this message type");
202 }
203 types::Uuid.encode(buf, &self.topic_id)?;
204 types::CompactArray(types::Struct { version }).encode(buf, &self.partitions)?;
205 let num_tagged_fields = self.unknown_tagged_fields.len();
206 if num_tagged_fields > std::u32::MAX as usize {
207 bail!(
208 "Too many tagged fields to encode ({} fields)",
209 num_tagged_fields
210 );
211 }
212 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
213
214 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
215 Ok(())
216 }
217 fn compute_size(&self, version: i16) -> Result<usize> {
218 let mut total_size = 0;
219 total_size += types::Uuid.compute_size(&self.topic_id)?;
220 total_size +=
221 types::CompactArray(types::Struct { version }).compute_size(&self.partitions)?;
222 let num_tagged_fields = self.unknown_tagged_fields.len();
223 if num_tagged_fields > std::u32::MAX as usize {
224 bail!(
225 "Too many tagged fields to encode ({} fields)",
226 num_tagged_fields
227 );
228 }
229 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
230
231 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
232 Ok(total_size)
233 }
234}
235
236#[cfg(feature = "broker")]
237impl Decodable for AcknowledgeTopic {
238 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
239 if version != 1 {
240 bail!("specified version not supported by this message type");
241 }
242 let topic_id = types::Uuid.decode(buf)?;
243 let partitions = types::CompactArray(types::Struct { version }).decode(buf)?;
244 let mut unknown_tagged_fields = BTreeMap::new();
245 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
246 for _ in 0..num_tagged_fields {
247 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
248 let size: u32 = types::UnsignedVarInt.decode(buf)?;
249 let unknown_value = buf.try_get_bytes(size as usize)?;
250 unknown_tagged_fields.insert(tag as i32, unknown_value);
251 }
252 Ok(Self {
253 topic_id,
254 partitions,
255 unknown_tagged_fields,
256 })
257 }
258}
259
260impl Default for AcknowledgeTopic {
261 fn default() -> Self {
262 Self {
263 topic_id: Uuid::nil(),
264 partitions: Default::default(),
265 unknown_tagged_fields: BTreeMap::new(),
266 }
267 }
268}
269
270impl Message for AcknowledgeTopic {
271 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
272 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
273}
274
275#[non_exhaustive]
277#[derive(Debug, Clone, PartialEq)]
278pub struct AcknowledgementBatch {
279 pub first_offset: i64,
283
284 pub last_offset: i64,
288
289 pub acknowledge_types: Vec<i8>,
293
294 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
296}
297
298impl AcknowledgementBatch {
299 pub fn with_first_offset(mut self, value: i64) -> Self {
305 self.first_offset = value;
306 self
307 }
308 pub fn with_last_offset(mut self, value: i64) -> Self {
314 self.last_offset = value;
315 self
316 }
317 pub fn with_acknowledge_types(mut self, value: Vec<i8>) -> Self {
323 self.acknowledge_types = value;
324 self
325 }
326 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
328 self.unknown_tagged_fields = value;
329 self
330 }
331 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
333 self.unknown_tagged_fields.insert(key, value);
334 self
335 }
336}
337
338#[cfg(feature = "client")]
339impl Encodable for AcknowledgementBatch {
340 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
341 if version != 1 {
342 bail!("specified version not supported by this message type");
343 }
344 types::Int64.encode(buf, &self.first_offset)?;
345 types::Int64.encode(buf, &self.last_offset)?;
346 types::CompactArray(types::Int8).encode(buf, &self.acknowledge_types)?;
347 let num_tagged_fields = self.unknown_tagged_fields.len();
348 if num_tagged_fields > std::u32::MAX as usize {
349 bail!(
350 "Too many tagged fields to encode ({} fields)",
351 num_tagged_fields
352 );
353 }
354 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
355
356 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
357 Ok(())
358 }
359 fn compute_size(&self, version: i16) -> Result<usize> {
360 let mut total_size = 0;
361 total_size += types::Int64.compute_size(&self.first_offset)?;
362 total_size += types::Int64.compute_size(&self.last_offset)?;
363 total_size += types::CompactArray(types::Int8).compute_size(&self.acknowledge_types)?;
364 let num_tagged_fields = self.unknown_tagged_fields.len();
365 if num_tagged_fields > std::u32::MAX as usize {
366 bail!(
367 "Too many tagged fields to encode ({} fields)",
368 num_tagged_fields
369 );
370 }
371 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
372
373 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
374 Ok(total_size)
375 }
376}
377
378#[cfg(feature = "broker")]
379impl Decodable for AcknowledgementBatch {
380 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
381 if version != 1 {
382 bail!("specified version not supported by this message type");
383 }
384 let first_offset = types::Int64.decode(buf)?;
385 let last_offset = types::Int64.decode(buf)?;
386 let acknowledge_types = types::CompactArray(types::Int8).decode(buf)?;
387 let mut unknown_tagged_fields = BTreeMap::new();
388 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
389 for _ in 0..num_tagged_fields {
390 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
391 let size: u32 = types::UnsignedVarInt.decode(buf)?;
392 let unknown_value = buf.try_get_bytes(size as usize)?;
393 unknown_tagged_fields.insert(tag as i32, unknown_value);
394 }
395 Ok(Self {
396 first_offset,
397 last_offset,
398 acknowledge_types,
399 unknown_tagged_fields,
400 })
401 }
402}
403
404impl Default for AcknowledgementBatch {
405 fn default() -> Self {
406 Self {
407 first_offset: 0,
408 last_offset: 0,
409 acknowledge_types: Default::default(),
410 unknown_tagged_fields: BTreeMap::new(),
411 }
412 }
413}
414
415impl Message for AcknowledgementBatch {
416 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
417 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
418}
419
420#[non_exhaustive]
422#[derive(Debug, Clone, PartialEq)]
423pub struct ShareAcknowledgeRequest {
424 pub group_id: Option<super::GroupId>,
428
429 pub member_id: Option<StrBytes>,
433
434 pub share_session_epoch: i32,
438
439 pub topics: Vec<AcknowledgeTopic>,
443
444 pub unknown_tagged_fields: BTreeMap<i32, Bytes>,
446}
447
448impl ShareAcknowledgeRequest {
449 pub fn with_group_id(mut self, value: Option<super::GroupId>) -> Self {
455 self.group_id = value;
456 self
457 }
458 pub fn with_member_id(mut self, value: Option<StrBytes>) -> Self {
464 self.member_id = value;
465 self
466 }
467 pub fn with_share_session_epoch(mut self, value: i32) -> Self {
473 self.share_session_epoch = value;
474 self
475 }
476 pub fn with_topics(mut self, value: Vec<AcknowledgeTopic>) -> Self {
482 self.topics = value;
483 self
484 }
485 pub fn with_unknown_tagged_fields(mut self, value: BTreeMap<i32, Bytes>) -> Self {
487 self.unknown_tagged_fields = value;
488 self
489 }
490 pub fn with_unknown_tagged_field(mut self, key: i32, value: Bytes) -> Self {
492 self.unknown_tagged_fields.insert(key, value);
493 self
494 }
495}
496
497#[cfg(feature = "client")]
498impl Encodable for ShareAcknowledgeRequest {
499 fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<()> {
500 if version != 1 {
501 bail!("specified version not supported by this message type");
502 }
503 types::CompactString.encode(buf, &self.group_id)?;
504 types::CompactString.encode(buf, &self.member_id)?;
505 types::Int32.encode(buf, &self.share_session_epoch)?;
506 types::CompactArray(types::Struct { version }).encode(buf, &self.topics)?;
507 let num_tagged_fields = self.unknown_tagged_fields.len();
508 if num_tagged_fields > std::u32::MAX as usize {
509 bail!(
510 "Too many tagged fields to encode ({} fields)",
511 num_tagged_fields
512 );
513 }
514 types::UnsignedVarInt.encode(buf, num_tagged_fields as u32)?;
515
516 write_unknown_tagged_fields(buf, 0.., &self.unknown_tagged_fields)?;
517 Ok(())
518 }
519 fn compute_size(&self, version: i16) -> Result<usize> {
520 let mut total_size = 0;
521 total_size += types::CompactString.compute_size(&self.group_id)?;
522 total_size += types::CompactString.compute_size(&self.member_id)?;
523 total_size += types::Int32.compute_size(&self.share_session_epoch)?;
524 total_size += types::CompactArray(types::Struct { version }).compute_size(&self.topics)?;
525 let num_tagged_fields = self.unknown_tagged_fields.len();
526 if num_tagged_fields > std::u32::MAX as usize {
527 bail!(
528 "Too many tagged fields to encode ({} fields)",
529 num_tagged_fields
530 );
531 }
532 total_size += types::UnsignedVarInt.compute_size(num_tagged_fields as u32)?;
533
534 total_size += compute_unknown_tagged_fields_size(&self.unknown_tagged_fields)?;
535 Ok(total_size)
536 }
537}
538
539#[cfg(feature = "broker")]
540impl Decodable for ShareAcknowledgeRequest {
541 fn decode<B: ByteBuf>(buf: &mut B, version: i16) -> Result<Self> {
542 if version != 1 {
543 bail!("specified version not supported by this message type");
544 }
545 let group_id = types::CompactString.decode(buf)?;
546 let member_id = types::CompactString.decode(buf)?;
547 let share_session_epoch = types::Int32.decode(buf)?;
548 let topics = types::CompactArray(types::Struct { version }).decode(buf)?;
549 let mut unknown_tagged_fields = BTreeMap::new();
550 let num_tagged_fields = types::UnsignedVarInt.decode(buf)?;
551 for _ in 0..num_tagged_fields {
552 let tag: u32 = types::UnsignedVarInt.decode(buf)?;
553 let size: u32 = types::UnsignedVarInt.decode(buf)?;
554 let unknown_value = buf.try_get_bytes(size as usize)?;
555 unknown_tagged_fields.insert(tag as i32, unknown_value);
556 }
557 Ok(Self {
558 group_id,
559 member_id,
560 share_session_epoch,
561 topics,
562 unknown_tagged_fields,
563 })
564 }
565}
566
567impl Default for ShareAcknowledgeRequest {
568 fn default() -> Self {
569 Self {
570 group_id: None,
571 member_id: Some(Default::default()),
572 share_session_epoch: 0,
573 topics: Default::default(),
574 unknown_tagged_fields: BTreeMap::new(),
575 }
576 }
577}
578
579impl Message for ShareAcknowledgeRequest {
580 const VERSIONS: VersionRange = VersionRange { min: 1, max: 1 };
581 const DEPRECATED_VERSIONS: Option<VersionRange> = None;
582}
583
584impl HeaderVersion for ShareAcknowledgeRequest {
585 fn header_version(version: i16) -> i16 {
586 2
587 }
588}