1use bytes::BufMut;
4
5use crate::primitives::fixed::{
6 get_bool, get_i8, get_i32, get_i64, put_bool, put_i8, put_i32, put_i64,
7};
8use crate::primitives::string_bytes::{
9 compact_nullable_string_len, nullable_string_len, put_compact_nullable_string,
10 put_nullable_string,
11};
12use crate::primitives::string_bytes_borrowed::{
13 get_compact_nullable_string_borrowed, get_nullable_string_borrowed,
14};
15use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
16use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
17
18pub const API_KEY: i16 = 78;
19pub const MIN_VERSION: i16 = 1;
20pub const MAX_VERSION: i16 = 2;
21pub const FLEXIBLE_MIN: i16 = 0;
22
23#[inline]
24fn is_flexible(version: i16) -> bool {
25 version >= FLEXIBLE_MIN
26}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct ShareFetchRequest<'a> {
30 pub group_id: Option<&'a str>,
31 pub member_id: Option<&'a str>,
32 pub share_session_epoch: i32,
33 pub max_wait_ms: i32,
34 pub min_bytes: i32,
35 pub max_bytes: i32,
36 pub max_records: i32,
37 pub batch_size: i32,
38 pub share_acquire_mode: i8,
39 pub is_renew_ack: bool,
40 pub topics: Vec<FetchTopic>,
41 pub forgotten_topics_data: Vec<ForgottenTopic>,
42 pub unknown_tagged_fields: UnknownTaggedFields,
43}
44impl Default for ShareFetchRequest<'_> {
45 fn default() -> Self {
46 Self {
47 group_id: None,
48 member_id: None,
49 share_session_epoch: 0i32,
50 max_wait_ms: 0i32,
51 min_bytes: 0i32,
52 max_bytes: 2_147_483_647i32,
53 max_records: 0i32,
54 batch_size: 0i32,
55 share_acquire_mode: 0i8,
56 is_renew_ack: false,
57 topics: Vec::new(),
58 forgotten_topics_data: Vec::new(),
59 unknown_tagged_fields: Default::default(),
60 }
61 }
62}
63impl ShareFetchRequest<'_> {
64 pub fn to_owned(&self) -> crate::owned::share_fetch_request::ShareFetchRequest {
65 crate::owned::share_fetch_request::ShareFetchRequest {
66 group_id: (self.group_id).map(std::string::ToString::to_string),
67 member_id: (self.member_id).map(std::string::ToString::to_string),
68 share_session_epoch: (self.share_session_epoch),
69 max_wait_ms: (self.max_wait_ms),
70 min_bytes: (self.min_bytes),
71 max_bytes: (self.max_bytes),
72 max_records: (self.max_records),
73 batch_size: (self.batch_size),
74 share_acquire_mode: (self.share_acquire_mode),
75 is_renew_ack: (self.is_renew_ack),
76 topics: (self.topics).iter().map(FetchTopic::to_owned).collect(),
77 forgotten_topics_data: (self.forgotten_topics_data)
78 .iter()
79 .map(ForgottenTopic::to_owned)
80 .collect(),
81 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
82 }
83 }
84}
85impl Encode for ShareFetchRequest<'_> {
86 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
87 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
88 return Err(ProtocolError::UnsupportedVersion {
89 api_key: API_KEY,
90 version,
91 });
92 }
93 let flex = is_flexible(version);
94 if version >= 0 {
95 if flex {
96 put_compact_nullable_string(buf, self.group_id);
97 } else {
98 put_nullable_string(buf, self.group_id);
99 }
100 }
101 if version >= 0 {
102 if flex {
103 put_compact_nullable_string(buf, self.member_id);
104 } else {
105 put_nullable_string(buf, self.member_id);
106 }
107 }
108 if version >= 0 {
109 put_i32(buf, self.share_session_epoch);
110 }
111 if version >= 0 {
112 put_i32(buf, self.max_wait_ms);
113 }
114 if version >= 0 {
115 put_i32(buf, self.min_bytes);
116 }
117 if version >= 0 {
118 put_i32(buf, self.max_bytes);
119 }
120 if version >= 1 {
121 put_i32(buf, self.max_records);
122 }
123 if version >= 1 {
124 put_i32(buf, self.batch_size);
125 }
126 if version >= 2 {
127 put_i8(buf, self.share_acquire_mode);
128 }
129 if version >= 2 {
130 put_bool(buf, self.is_renew_ack);
131 }
132 if version >= 0 {
133 {
134 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
135 for it in &self.topics {
136 it.encode(buf, version)?;
137 }
138 }
139 }
140 if version >= 0 {
141 {
142 crate::primitives::array::put_array_len(
143 buf,
144 (self.forgotten_topics_data).len(),
145 flex,
146 );
147 for it in &self.forgotten_topics_data {
148 it.encode(buf, version)?;
149 }
150 }
151 }
152 if flex {
153 let tagged = WriteTaggedFields::new();
154 tagged.write(buf, &self.unknown_tagged_fields);
155 }
156 Ok(())
157 }
158 fn encoded_len(&self, version: i16) -> usize {
159 let flex = is_flexible(version);
160 let mut n: usize = 0;
161 if version >= 0 {
162 n += if flex {
163 compact_nullable_string_len(self.group_id)
164 } else {
165 nullable_string_len(self.group_id)
166 };
167 }
168 if version >= 0 {
169 n += if flex {
170 compact_nullable_string_len(self.member_id)
171 } else {
172 nullable_string_len(self.member_id)
173 };
174 }
175 if version >= 0 {
176 n += 4;
177 }
178 if version >= 0 {
179 n += 4;
180 }
181 if version >= 0 {
182 n += 4;
183 }
184 if version >= 0 {
185 n += 4;
186 }
187 if version >= 1 {
188 n += 4;
189 }
190 if version >= 1 {
191 n += 4;
192 }
193 if version >= 2 {
194 n += 1;
195 }
196 if version >= 2 {
197 n += 1;
198 }
199 if version >= 0 {
200 n += {
201 let prefix =
202 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
203 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
204 prefix + body
205 };
206 }
207 if version >= 0 {
208 n += {
209 let prefix = crate::primitives::array::array_len_prefix_len(
210 (self.forgotten_topics_data).len(),
211 flex,
212 );
213 let body: usize = (self.forgotten_topics_data)
214 .iter()
215 .map(|it| it.encoded_len(version))
216 .sum();
217 prefix + body
218 };
219 }
220 if flex {
221 let known_pairs: Vec<(u32, usize)> = Vec::new();
222 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
223 }
224 n
225 }
226}
227impl<'de> DecodeBorrow<'de> for ShareFetchRequest<'de> {
228 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
229 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
230 return Err(ProtocolError::UnsupportedVersion {
231 api_key: API_KEY,
232 version,
233 });
234 }
235 let flex = is_flexible(version);
236 let mut out = Self::default();
237 if version >= 0 {
238 out.group_id = if flex {
239 get_compact_nullable_string_borrowed(buf)?
240 } else {
241 get_nullable_string_borrowed(buf)?
242 };
243 }
244 if version >= 0 {
245 out.member_id = if flex {
246 get_compact_nullable_string_borrowed(buf)?
247 } else {
248 get_nullable_string_borrowed(buf)?
249 };
250 }
251 if version >= 0 {
252 out.share_session_epoch = get_i32(buf)?;
253 }
254 if version >= 0 {
255 out.max_wait_ms = get_i32(buf)?;
256 }
257 if version >= 0 {
258 out.min_bytes = get_i32(buf)?;
259 }
260 if version >= 0 {
261 out.max_bytes = get_i32(buf)?;
262 }
263 if version >= 1 {
264 out.max_records = get_i32(buf)?;
265 }
266 if version >= 1 {
267 out.batch_size = get_i32(buf)?;
268 }
269 if version >= 2 {
270 out.share_acquire_mode = get_i8(buf)?;
271 }
272 if version >= 2 {
273 out.is_renew_ack = get_bool(buf)?;
274 }
275 if version >= 0 {
276 out.topics = {
277 let n = crate::primitives::array::get_array_len(buf, flex)?;
278 let mut v = Vec::with_capacity(n);
279 for _ in 0..n {
280 v.push(FetchTopic::decode_borrow(buf, version)?);
281 }
282 v
283 };
284 }
285 if version >= 0 {
286 out.forgotten_topics_data = {
287 let n = crate::primitives::array::get_array_len(buf, flex)?;
288 let mut v = Vec::with_capacity(n);
289 for _ in 0..n {
290 v.push(ForgottenTopic::decode_borrow(buf, version)?);
291 }
292 v
293 };
294 }
295 if flex {
296 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
297 }
298 Ok(out)
299 }
300}
301#[cfg(test)]
302impl ShareFetchRequest<'_> {
303 #[must_use]
304 pub fn populated(version: i16) -> Self {
305 let mut m = Self::default();
306 if version >= 0 {
307 m.group_id = Some("x");
308 }
309 if version >= 0 {
310 m.member_id = Some("x");
311 }
312 if version >= 0 {
313 m.share_session_epoch = 1i32;
314 }
315 if version >= 0 {
316 m.max_wait_ms = 1i32;
317 }
318 if version >= 0 {
319 m.min_bytes = 1i32;
320 }
321 if version >= 0 {
322 m.max_bytes = 1i32;
323 }
324 if version >= 1 {
325 m.max_records = 1i32;
326 }
327 if version >= 1 {
328 m.batch_size = 1i32;
329 }
330 if version >= 2 {
331 m.share_acquire_mode = 1i8;
332 }
333 if version >= 2 {
334 m.is_renew_ack = true;
335 }
336 if version >= 0 {
337 m.topics = vec![FetchTopic::populated(version)];
338 }
339 if version >= 0 {
340 m.forgotten_topics_data = vec![ForgottenTopic::populated(version)];
341 }
342 m
343 }
344}
345#[derive(Debug, Clone, PartialEq, Eq, Default)]
346pub struct FetchTopic {
347 pub topic_id: crate::primitives::uuid::Uuid,
348 pub partitions: Vec<FetchPartition>,
349 pub unknown_tagged_fields: UnknownTaggedFields,
350}
351impl FetchTopic {
352 pub fn to_owned(&self) -> crate::owned::share_fetch_request::FetchTopic {
353 crate::owned::share_fetch_request::FetchTopic {
354 topic_id: (self.topic_id),
355 partitions: (self.partitions)
356 .iter()
357 .map(FetchPartition::to_owned)
358 .collect(),
359 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
360 }
361 }
362}
363impl Encode for FetchTopic {
364 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
365 let flex = version >= 0;
366 if version >= 0 {
367 crate::primitives::uuid::put_uuid(buf, self.topic_id);
368 }
369 if version >= 0 {
370 {
371 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
372 for it in &self.partitions {
373 it.encode(buf, version)?;
374 }
375 }
376 }
377 if flex {
378 let tagged = WriteTaggedFields::new();
379 tagged.write(buf, &self.unknown_tagged_fields);
380 }
381 Ok(())
382 }
383 fn encoded_len(&self, version: i16) -> usize {
384 let flex = version >= 0;
385 let mut n: usize = 0;
386 if version >= 0 {
387 n += 16;
388 }
389 if version >= 0 {
390 n += {
391 let prefix =
392 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
393 let body: usize = (self.partitions)
394 .iter()
395 .map(|it| it.encoded_len(version))
396 .sum();
397 prefix + body
398 };
399 }
400 if flex {
401 let known_pairs: Vec<(u32, usize)> = Vec::new();
402 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
403 }
404 n
405 }
406}
407impl<'de> DecodeBorrow<'de> for FetchTopic {
408 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
409 let flex = version >= 0;
410 let mut out = Self::default();
411 if version >= 0 {
412 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
413 }
414 if version >= 0 {
415 out.partitions = {
416 let n = crate::primitives::array::get_array_len(buf, flex)?;
417 let mut v = Vec::with_capacity(n);
418 for _ in 0..n {
419 v.push(FetchPartition::decode_borrow(buf, version)?);
420 }
421 v
422 };
423 }
424 if flex {
425 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
426 }
427 Ok(out)
428 }
429}
430#[cfg(test)]
431impl FetchTopic {
432 #[must_use]
433 pub fn populated(version: i16) -> Self {
434 let mut m = Self::default();
435 if version >= 0 {
436 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
437 }
438 if version >= 0 {
439 m.partitions = vec![FetchPartition::populated(version)];
440 }
441 m
442 }
443}
444#[derive(Debug, Clone, PartialEq, Eq, Default)]
445pub struct FetchPartition {
446 pub partition_index: i32,
447 pub partition_max_bytes: i32,
448 pub acknowledgement_batches: Vec<AcknowledgementBatch>,
449 pub unknown_tagged_fields: UnknownTaggedFields,
450}
451impl FetchPartition {
452 pub fn to_owned(&self) -> crate::owned::share_fetch_request::FetchPartition {
453 crate::owned::share_fetch_request::FetchPartition {
454 partition_index: (self.partition_index),
455 partition_max_bytes: (self.partition_max_bytes),
456 acknowledgement_batches: (self.acknowledgement_batches)
457 .iter()
458 .map(AcknowledgementBatch::to_owned)
459 .collect(),
460 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
461 }
462 }
463}
464impl Encode for FetchPartition {
465 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
466 let flex = version >= 0;
467 if version >= 0 {
468 put_i32(buf, self.partition_index);
469 }
470 if version == 0 {
471 put_i32(buf, self.partition_max_bytes);
472 }
473 if version >= 0 {
474 {
475 crate::primitives::array::put_array_len(
476 buf,
477 (self.acknowledgement_batches).len(),
478 flex,
479 );
480 for it in &self.acknowledgement_batches {
481 it.encode(buf, version)?;
482 }
483 }
484 }
485 if flex {
486 let tagged = WriteTaggedFields::new();
487 tagged.write(buf, &self.unknown_tagged_fields);
488 }
489 Ok(())
490 }
491 fn encoded_len(&self, version: i16) -> usize {
492 let flex = version >= 0;
493 let mut n: usize = 0;
494 if version >= 0 {
495 n += 4;
496 }
497 if version == 0 {
498 n += 4;
499 }
500 if version >= 0 {
501 n += {
502 let prefix = crate::primitives::array::array_len_prefix_len(
503 (self.acknowledgement_batches).len(),
504 flex,
505 );
506 let body: usize = (self.acknowledgement_batches)
507 .iter()
508 .map(|it| it.encoded_len(version))
509 .sum();
510 prefix + body
511 };
512 }
513 if flex {
514 let known_pairs: Vec<(u32, usize)> = Vec::new();
515 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
516 }
517 n
518 }
519}
520impl<'de> DecodeBorrow<'de> for FetchPartition {
521 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
522 let flex = version >= 0;
523 let mut out = Self::default();
524 if version >= 0 {
525 out.partition_index = get_i32(buf)?;
526 }
527 if version == 0 {
528 out.partition_max_bytes = get_i32(buf)?;
529 }
530 if version >= 0 {
531 out.acknowledgement_batches = {
532 let n = crate::primitives::array::get_array_len(buf, flex)?;
533 let mut v = Vec::with_capacity(n);
534 for _ in 0..n {
535 v.push(AcknowledgementBatch::decode_borrow(buf, version)?);
536 }
537 v
538 };
539 }
540 if flex {
541 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
542 }
543 Ok(out)
544 }
545}
546#[cfg(test)]
547impl FetchPartition {
548 #[must_use]
549 pub fn populated(version: i16) -> Self {
550 let mut m = Self::default();
551 if version >= 0 {
552 m.partition_index = 1i32;
553 }
554 if version == 0 {
555 m.partition_max_bytes = 1i32;
556 }
557 if version >= 0 {
558 m.acknowledgement_batches = vec![AcknowledgementBatch::populated(version)];
559 }
560 m
561 }
562}
563#[derive(Debug, Clone, PartialEq, Eq, Default)]
564pub struct AcknowledgementBatch {
565 pub first_offset: i64,
566 pub last_offset: i64,
567 pub acknowledge_types: Vec<i8>,
568 pub unknown_tagged_fields: UnknownTaggedFields,
569}
570impl AcknowledgementBatch {
571 pub fn to_owned(&self) -> crate::owned::share_fetch_request::AcknowledgementBatch {
572 crate::owned::share_fetch_request::AcknowledgementBatch {
573 first_offset: (self.first_offset),
574 last_offset: (self.last_offset),
575 acknowledge_types: (self.acknowledge_types).clone(),
576 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
577 }
578 }
579}
580impl Encode for AcknowledgementBatch {
581 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
582 let flex = version >= 0;
583 if version >= 0 {
584 put_i64(buf, self.first_offset);
585 }
586 if version >= 0 {
587 put_i64(buf, self.last_offset);
588 }
589 if version >= 0 {
590 {
591 crate::primitives::array::put_array_len(buf, (self.acknowledge_types).len(), flex);
592 for it in &self.acknowledge_types {
593 put_i8(buf, *it);
594 }
595 }
596 }
597 if flex {
598 let tagged = WriteTaggedFields::new();
599 tagged.write(buf, &self.unknown_tagged_fields);
600 }
601 Ok(())
602 }
603 fn encoded_len(&self, version: i16) -> usize {
604 let flex = version >= 0;
605 let mut n: usize = 0;
606 if version >= 0 {
607 n += 8;
608 }
609 if version >= 0 {
610 n += 8;
611 }
612 if version >= 0 {
613 n += {
614 let prefix = crate::primitives::array::array_len_prefix_len(
615 (self.acknowledge_types).len(),
616 flex,
617 );
618 let body: usize = (self.acknowledge_types).iter().map(|_| 1).sum();
619 prefix + body
620 };
621 }
622 if flex {
623 let known_pairs: Vec<(u32, usize)> = Vec::new();
624 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
625 }
626 n
627 }
628}
629impl<'de> DecodeBorrow<'de> for AcknowledgementBatch {
630 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
631 let flex = version >= 0;
632 let mut out = Self::default();
633 if version >= 0 {
634 out.first_offset = get_i64(buf)?;
635 }
636 if version >= 0 {
637 out.last_offset = get_i64(buf)?;
638 }
639 if version >= 0 {
640 out.acknowledge_types = {
641 let n = crate::primitives::array::get_array_len(buf, flex)?;
642 let mut v = Vec::with_capacity(n);
643 for _ in 0..n {
644 v.push(get_i8(buf)?);
645 }
646 v
647 };
648 }
649 if flex {
650 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
651 }
652 Ok(out)
653 }
654}
655#[cfg(test)]
656impl AcknowledgementBatch {
657 #[must_use]
658 pub fn populated(version: i16) -> Self {
659 let mut m = Self::default();
660 if version >= 0 {
661 m.first_offset = 1i64;
662 }
663 if version >= 0 {
664 m.last_offset = 1i64;
665 }
666 if version >= 0 {
667 m.acknowledge_types = vec![1i8];
668 }
669 m
670 }
671}
672#[derive(Debug, Clone, PartialEq, Eq, Default)]
673pub struct ForgottenTopic {
674 pub topic_id: crate::primitives::uuid::Uuid,
675 pub partitions: Vec<i32>,
676 pub unknown_tagged_fields: UnknownTaggedFields,
677}
678impl ForgottenTopic {
679 pub fn to_owned(&self) -> crate::owned::share_fetch_request::ForgottenTopic {
680 crate::owned::share_fetch_request::ForgottenTopic {
681 topic_id: (self.topic_id),
682 partitions: (self.partitions).clone(),
683 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
684 }
685 }
686}
687impl Encode for ForgottenTopic {
688 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
689 let flex = version >= 0;
690 if version >= 0 {
691 crate::primitives::uuid::put_uuid(buf, self.topic_id);
692 }
693 if version >= 0 {
694 {
695 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
696 for it in &self.partitions {
697 put_i32(buf, *it);
698 }
699 }
700 }
701 if flex {
702 let tagged = WriteTaggedFields::new();
703 tagged.write(buf, &self.unknown_tagged_fields);
704 }
705 Ok(())
706 }
707 fn encoded_len(&self, version: i16) -> usize {
708 let flex = version >= 0;
709 let mut n: usize = 0;
710 if version >= 0 {
711 n += 16;
712 }
713 if version >= 0 {
714 n += {
715 let prefix =
716 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
717 let body: usize = (self.partitions).iter().map(|_| 4).sum();
718 prefix + body
719 };
720 }
721 if flex {
722 let known_pairs: Vec<(u32, usize)> = Vec::new();
723 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
724 }
725 n
726 }
727}
728impl<'de> DecodeBorrow<'de> for ForgottenTopic {
729 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
730 let flex = version >= 0;
731 let mut out = Self::default();
732 if version >= 0 {
733 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
734 }
735 if version >= 0 {
736 out.partitions = {
737 let n = crate::primitives::array::get_array_len(buf, flex)?;
738 let mut v = Vec::with_capacity(n);
739 for _ in 0..n {
740 v.push(get_i32(buf)?);
741 }
742 v
743 };
744 }
745 if flex {
746 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
747 }
748 Ok(out)
749 }
750}
751#[cfg(test)]
752impl ForgottenTopic {
753 #[must_use]
754 pub fn populated(version: i16) -> Self {
755 let mut m = Self::default();
756 if version >= 0 {
757 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
758 }
759 if version >= 0 {
760 m.partitions = vec![1i32];
761 }
762 m
763 }
764}