1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{
6 get_bool, get_i8, get_i32, get_i64, put_bool, put_i8, put_i32, put_i64,
7};
8use crate::primitives::string_bytes::{
9 compact_nullable_string_len, get_compact_nullable_string_owned, get_nullable_string_owned,
10 nullable_string_len, put_compact_nullable_string, put_nullable_string,
11};
12use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
13use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
14
/// Kafka API key identifying the ShareFetch request.
pub const API_KEY: i16 = 78;
/// Lowest version accepted by the top-level encoder and decoder.
pub const MIN_VERSION: i16 = 1;
/// Highest version accepted by the top-level encoder and decoder.
pub const MAX_VERSION: i16 = 2;
/// First version for which `is_flexible` reports the flexible wire format.
pub const FLEXIBLE_MIN: i16 = 0;

/// Reports whether `version` is at or above [`FLEXIBLE_MIN`].
#[inline]
fn is_flexible(version: i16) -> bool {
    FLEXIBLE_MIN <= version
}
24
/// Body of a ShareFetch request (API key 78).
///
/// Fields are declared in the order in which the `Encode` and `Decode`
/// implementations write and read them. The per-field notes below only state
/// the wire type and the version gate used by those implementations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShareFetchRequest {
    /// Nullable string; present in every supported version.
    pub group_id: Option<String>,
    /// Nullable string; present in every supported version.
    pub member_id: Option<String>,
    /// `i32`; present in every supported version.
    pub share_session_epoch: i32,
    /// `i32`; present in every supported version.
    pub max_wait_ms: i32,
    /// `i32`; present in every supported version.
    pub min_bytes: i32,
    /// `i32`; present in every supported version. Defaults to `i32::MAX`.
    pub max_bytes: i32,
    /// `i32`; only on the wire for versions >= 1.
    pub max_records: i32,
    /// `i32`; only on the wire for versions >= 1.
    pub batch_size: i32,
    /// `i8`; only on the wire for versions >= 2.
    pub share_acquire_mode: i8,
    /// `bool`; only on the wire for versions >= 2.
    pub is_renew_ack: bool,
    /// Array of [`FetchTopic`] entries; present in every supported version.
    pub topics: Vec<FetchTopic>,
    /// Array of [`ForgottenTopic`] entries; present in every supported version.
    pub forgotten_topics_data: Vec<ForgottenTopic>,
    /// Tagged fields that the decoder did not recognise; written back verbatim
    /// by the encoder when the version is flexible.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
41impl Default for ShareFetchRequest {
42 fn default() -> Self {
43 Self {
44 group_id: None,
45 member_id: None,
46 share_session_epoch: 0i32,
47 max_wait_ms: 0i32,
48 min_bytes: 0i32,
49 max_bytes: 2_147_483_647i32,
50 max_records: 0i32,
51 batch_size: 0i32,
52 share_acquire_mode: 0i8,
53 is_renew_ack: false,
54 topics: Vec::new(),
55 forgotten_topics_data: Vec::new(),
56 unknown_tagged_fields: Default::default(),
57 }
58 }
59}
60impl Encode for ShareFetchRequest {
61 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
62 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
63 return Err(ProtocolError::UnsupportedVersion {
64 api_key: API_KEY,
65 version,
66 });
67 }
68 let flex = is_flexible(version);
69 if version >= 0 {
70 if flex {
71 put_compact_nullable_string(buf, self.group_id.as_deref());
72 } else {
73 put_nullable_string(buf, self.group_id.as_deref());
74 }
75 }
76 if version >= 0 {
77 if flex {
78 put_compact_nullable_string(buf, self.member_id.as_deref());
79 } else {
80 put_nullable_string(buf, self.member_id.as_deref());
81 }
82 }
83 if version >= 0 {
84 put_i32(buf, self.share_session_epoch);
85 }
86 if version >= 0 {
87 put_i32(buf, self.max_wait_ms);
88 }
89 if version >= 0 {
90 put_i32(buf, self.min_bytes);
91 }
92 if version >= 0 {
93 put_i32(buf, self.max_bytes);
94 }
95 if version >= 1 {
96 put_i32(buf, self.max_records);
97 }
98 if version >= 1 {
99 put_i32(buf, self.batch_size);
100 }
101 if version >= 2 {
102 put_i8(buf, self.share_acquire_mode);
103 }
104 if version >= 2 {
105 put_bool(buf, self.is_renew_ack);
106 }
107 if version >= 0 {
108 {
109 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
110 for it in &self.topics {
111 it.encode(buf, version)?;
112 }
113 }
114 }
115 if version >= 0 {
116 {
117 crate::primitives::array::put_array_len(
118 buf,
119 (self.forgotten_topics_data).len(),
120 flex,
121 );
122 for it in &self.forgotten_topics_data {
123 it.encode(buf, version)?;
124 }
125 }
126 }
127 if flex {
128 let tagged = WriteTaggedFields::new();
129 tagged.write(buf, &self.unknown_tagged_fields);
130 }
131 Ok(())
132 }
133 fn encoded_len(&self, version: i16) -> usize {
134 let flex = is_flexible(version);
135 let mut n: usize = 0;
136 if version >= 0 {
137 n += if flex {
138 compact_nullable_string_len(self.group_id.as_deref())
139 } else {
140 nullable_string_len(self.group_id.as_deref())
141 };
142 }
143 if version >= 0 {
144 n += if flex {
145 compact_nullable_string_len(self.member_id.as_deref())
146 } else {
147 nullable_string_len(self.member_id.as_deref())
148 };
149 }
150 if version >= 0 {
151 n += 4;
152 }
153 if version >= 0 {
154 n += 4;
155 }
156 if version >= 0 {
157 n += 4;
158 }
159 if version >= 0 {
160 n += 4;
161 }
162 if version >= 1 {
163 n += 4;
164 }
165 if version >= 1 {
166 n += 4;
167 }
168 if version >= 2 {
169 n += 1;
170 }
171 if version >= 2 {
172 n += 1;
173 }
174 if version >= 0 {
175 n += {
176 let prefix =
177 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
178 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
179 prefix + body
180 };
181 }
182 if version >= 0 {
183 n += {
184 let prefix = crate::primitives::array::array_len_prefix_len(
185 (self.forgotten_topics_data).len(),
186 flex,
187 );
188 let body: usize = (self.forgotten_topics_data)
189 .iter()
190 .map(|it| it.encoded_len(version))
191 .sum();
192 prefix + body
193 };
194 }
195 if flex {
196 let known_pairs: Vec<(u32, usize)> = Vec::new();
197 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
198 }
199 n
200 }
201}
202impl Decode<'_> for ShareFetchRequest {
203 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
204 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
205 return Err(ProtocolError::UnsupportedVersion {
206 api_key: API_KEY,
207 version,
208 });
209 }
210 let flex = is_flexible(version);
211 let mut out = Self::default();
212 if version >= 0 {
213 out.group_id = if flex {
214 get_compact_nullable_string_owned(buf)?
215 } else {
216 get_nullable_string_owned(buf)?
217 };
218 }
219 if version >= 0 {
220 out.member_id = if flex {
221 get_compact_nullable_string_owned(buf)?
222 } else {
223 get_nullable_string_owned(buf)?
224 };
225 }
226 if version >= 0 {
227 out.share_session_epoch = get_i32(buf)?;
228 }
229 if version >= 0 {
230 out.max_wait_ms = get_i32(buf)?;
231 }
232 if version >= 0 {
233 out.min_bytes = get_i32(buf)?;
234 }
235 if version >= 0 {
236 out.max_bytes = get_i32(buf)?;
237 }
238 if version >= 1 {
239 out.max_records = get_i32(buf)?;
240 }
241 if version >= 1 {
242 out.batch_size = get_i32(buf)?;
243 }
244 if version >= 2 {
245 out.share_acquire_mode = get_i8(buf)?;
246 }
247 if version >= 2 {
248 out.is_renew_ack = get_bool(buf)?;
249 }
250 if version >= 0 {
251 out.topics = {
252 let n = crate::primitives::array::get_array_len(buf, flex)?;
253 let mut v = Vec::with_capacity(n);
254 for _ in 0..n {
255 v.push(FetchTopic::decode(buf, version)?);
256 }
257 v
258 };
259 }
260 if version >= 0 {
261 out.forgotten_topics_data = {
262 let n = crate::primitives::array::get_array_len(buf, flex)?;
263 let mut v = Vec::with_capacity(n);
264 for _ in 0..n {
265 v.push(ForgottenTopic::decode(buf, version)?);
266 }
267 v
268 };
269 }
270 if flex {
271 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
272 }
273 Ok(out)
274 }
275}
#[cfg(test)]
impl ShareFetchRequest {
    /// Test fixture: a request in which every field that is on the wire for
    /// `version` holds a non-default value.
    ///
    /// Fields gated on a later version keep their defaults, and a negative
    /// version yields `Self::default()` unchanged.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            group_id: Some(String::from("x")),
            member_id: Some(String::from("x")),
            share_session_epoch: 1,
            max_wait_ms: 1,
            min_bytes: 1,
            max_bytes: 1,
            // 1 when the field exists for `version`, otherwise the default 0.
            max_records: i32::from(version >= 1),
            batch_size: i32::from(version >= 1),
            share_acquire_mode: i8::from(version >= 2),
            is_renew_ack: version >= 2,
            topics: vec![FetchTopic::populated(version)],
            forgotten_topics_data: vec![ForgottenTopic::populated(version)],
            ..Self::default()
        }
    }
}
/// One entry of the `topics` array of a [`ShareFetchRequest`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FetchTopic {
    /// Topic UUID; counted as 16 bytes by `encoded_len`.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Array of [`FetchPartition`] entries for this topic.
    pub partitions: Vec<FetchPartition>,
    /// Tagged fields that the decoder did not recognise.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
326impl Encode for FetchTopic {
327 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
328 let flex = version >= 0;
329 if version >= 0 {
330 crate::primitives::uuid::put_uuid(buf, self.topic_id);
331 }
332 if version >= 0 {
333 {
334 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
335 for it in &self.partitions {
336 it.encode(buf, version)?;
337 }
338 }
339 }
340 if flex {
341 let tagged = WriteTaggedFields::new();
342 tagged.write(buf, &self.unknown_tagged_fields);
343 }
344 Ok(())
345 }
346 fn encoded_len(&self, version: i16) -> usize {
347 let flex = version >= 0;
348 let mut n: usize = 0;
349 if version >= 0 {
350 n += 16;
351 }
352 if version >= 0 {
353 n += {
354 let prefix =
355 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
356 let body: usize = (self.partitions)
357 .iter()
358 .map(|it| it.encoded_len(version))
359 .sum();
360 prefix + body
361 };
362 }
363 if flex {
364 let known_pairs: Vec<(u32, usize)> = Vec::new();
365 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
366 }
367 n
368 }
369}
370impl Decode<'_> for FetchTopic {
371 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
372 let flex = version >= 0;
373 let mut out = Self::default();
374 if version >= 0 {
375 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
376 }
377 if version >= 0 {
378 out.partitions = {
379 let n = crate::primitives::array::get_array_len(buf, flex)?;
380 let mut v = Vec::with_capacity(n);
381 for _ in 0..n {
382 v.push(FetchPartition::decode(buf, version)?);
383 }
384 v
385 };
386 }
387 if flex {
388 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
389 }
390 Ok(out)
391 }
392}
#[cfg(test)]
impl FetchTopic {
    /// Test fixture: a topic with an all-ones UUID and a single populated
    /// partition. A negative version yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            topic_id: crate::primitives::uuid::Uuid([1u8; 16]),
            partitions: vec![FetchPartition::populated(version)],
            ..Self::default()
        }
    }
}
/// One entry of the `partitions` array of a [`FetchTopic`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct FetchPartition {
    /// `i32`; written for every non-negative version.
    pub partition_index: i32,
    /// `i32`; on the wire only when the version is exactly 0. The top-level
    /// `ShareFetchRequest` codec rejects versions below `MIN_VERSION` (1), so
    /// this field is skipped whenever a partition is reached through it.
    pub partition_max_bytes: i32,
    /// Array of [`AcknowledgementBatch`] entries for this partition.
    pub acknowledgement_batches: Vec<AcknowledgementBatch>,
    /// Tagged fields that the decoder did not recognise.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
414impl Encode for FetchPartition {
415 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
416 let flex = version >= 0;
417 if version >= 0 {
418 put_i32(buf, self.partition_index);
419 }
420 if version == 0 {
421 put_i32(buf, self.partition_max_bytes);
422 }
423 if version >= 0 {
424 {
425 crate::primitives::array::put_array_len(
426 buf,
427 (self.acknowledgement_batches).len(),
428 flex,
429 );
430 for it in &self.acknowledgement_batches {
431 it.encode(buf, version)?;
432 }
433 }
434 }
435 if flex {
436 let tagged = WriteTaggedFields::new();
437 tagged.write(buf, &self.unknown_tagged_fields);
438 }
439 Ok(())
440 }
441 fn encoded_len(&self, version: i16) -> usize {
442 let flex = version >= 0;
443 let mut n: usize = 0;
444 if version >= 0 {
445 n += 4;
446 }
447 if version == 0 {
448 n += 4;
449 }
450 if version >= 0 {
451 n += {
452 let prefix = crate::primitives::array::array_len_prefix_len(
453 (self.acknowledgement_batches).len(),
454 flex,
455 );
456 let body: usize = (self.acknowledgement_batches)
457 .iter()
458 .map(|it| it.encoded_len(version))
459 .sum();
460 prefix + body
461 };
462 }
463 if flex {
464 let known_pairs: Vec<(u32, usize)> = Vec::new();
465 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
466 }
467 n
468 }
469}
470impl Decode<'_> for FetchPartition {
471 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
472 let flex = version >= 0;
473 let mut out = Self::default();
474 if version >= 0 {
475 out.partition_index = get_i32(buf)?;
476 }
477 if version == 0 {
478 out.partition_max_bytes = get_i32(buf)?;
479 }
480 if version >= 0 {
481 out.acknowledgement_batches = {
482 let n = crate::primitives::array::get_array_len(buf, flex)?;
483 let mut v = Vec::with_capacity(n);
484 for _ in 0..n {
485 v.push(AcknowledgementBatch::decode(buf, version)?);
486 }
487 v
488 };
489 }
490 if flex {
491 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
492 }
493 Ok(out)
494 }
495}
#[cfg(test)]
impl FetchPartition {
    /// Test fixture: a partition with index 1 and one populated
    /// acknowledgement batch. `partition_max_bytes` is set to 1 only for
    /// version 0; a negative version yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            partition_index: 1,
            // 1 when the field exists for `version`, otherwise the default 0.
            partition_max_bytes: i32::from(version == 0),
            acknowledgement_batches: vec![AcknowledgementBatch::populated(version)],
            ..Self::default()
        }
    }
}
/// One entry of the `acknowledgement_batches` array of a [`FetchPartition`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct AcknowledgementBatch {
    /// `i64`; written for every non-negative version.
    pub first_offset: i64,
    /// `i64`; written for every non-negative version.
    pub last_offset: i64,
    /// Array of `i8` values, one byte each on the wire.
    pub acknowledge_types: Vec<i8>,
    /// Tagged fields that the decoder did not recognise.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
520impl Encode for AcknowledgementBatch {
521 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
522 let flex = version >= 0;
523 if version >= 0 {
524 put_i64(buf, self.first_offset);
525 }
526 if version >= 0 {
527 put_i64(buf, self.last_offset);
528 }
529 if version >= 0 {
530 {
531 crate::primitives::array::put_array_len(buf, (self.acknowledge_types).len(), flex);
532 for it in &self.acknowledge_types {
533 put_i8(buf, *it);
534 }
535 }
536 }
537 if flex {
538 let tagged = WriteTaggedFields::new();
539 tagged.write(buf, &self.unknown_tagged_fields);
540 }
541 Ok(())
542 }
543 fn encoded_len(&self, version: i16) -> usize {
544 let flex = version >= 0;
545 let mut n: usize = 0;
546 if version >= 0 {
547 n += 8;
548 }
549 if version >= 0 {
550 n += 8;
551 }
552 if version >= 0 {
553 n += {
554 let prefix = crate::primitives::array::array_len_prefix_len(
555 (self.acknowledge_types).len(),
556 flex,
557 );
558 let body: usize = (self.acknowledge_types).iter().map(|_| 1).sum();
559 prefix + body
560 };
561 }
562 if flex {
563 let known_pairs: Vec<(u32, usize)> = Vec::new();
564 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
565 }
566 n
567 }
568}
569impl Decode<'_> for AcknowledgementBatch {
570 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
571 let flex = version >= 0;
572 let mut out = Self::default();
573 if version >= 0 {
574 out.first_offset = get_i64(buf)?;
575 }
576 if version >= 0 {
577 out.last_offset = get_i64(buf)?;
578 }
579 if version >= 0 {
580 out.acknowledge_types = {
581 let n = crate::primitives::array::get_array_len(buf, flex)?;
582 let mut v = Vec::with_capacity(n);
583 for _ in 0..n {
584 v.push(get_i8(buf)?);
585 }
586 v
587 };
588 }
589 if flex {
590 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
591 }
592 Ok(out)
593 }
594}
#[cfg(test)]
impl AcknowledgementBatch {
    /// Test fixture: a batch with both offsets set to 1 and a single
    /// acknowledge type of 1. A negative version yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            first_offset: 1,
            last_offset: 1,
            acknowledge_types: vec![1],
            ..Self::default()
        }
    }
}
/// One entry of the `forgotten_topics_data` array of a [`ShareFetchRequest`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ForgottenTopic {
    /// Topic UUID; counted as 16 bytes by `encoded_len`.
    pub topic_id: crate::primitives::uuid::Uuid,
    /// Array of `i32` values, four bytes each on the wire.
    pub partitions: Vec<i32>,
    /// Tagged fields that the decoder did not recognise.
    pub unknown_tagged_fields: UnknownTaggedFields,
}
618impl Encode for ForgottenTopic {
619 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
620 let flex = version >= 0;
621 if version >= 0 {
622 crate::primitives::uuid::put_uuid(buf, self.topic_id);
623 }
624 if version >= 0 {
625 {
626 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
627 for it in &self.partitions {
628 put_i32(buf, *it);
629 }
630 }
631 }
632 if flex {
633 let tagged = WriteTaggedFields::new();
634 tagged.write(buf, &self.unknown_tagged_fields);
635 }
636 Ok(())
637 }
638 fn encoded_len(&self, version: i16) -> usize {
639 let flex = version >= 0;
640 let mut n: usize = 0;
641 if version >= 0 {
642 n += 16;
643 }
644 if version >= 0 {
645 n += {
646 let prefix =
647 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
648 let body: usize = (self.partitions).iter().map(|_| 4).sum();
649 prefix + body
650 };
651 }
652 if flex {
653 let known_pairs: Vec<(u32, usize)> = Vec::new();
654 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
655 }
656 n
657 }
658}
659impl Decode<'_> for ForgottenTopic {
660 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
661 let flex = version >= 0;
662 let mut out = Self::default();
663 if version >= 0 {
664 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
665 }
666 if version >= 0 {
667 out.partitions = {
668 let n = crate::primitives::array::get_array_len(buf, flex)?;
669 let mut v = Vec::with_capacity(n);
670 for _ in 0..n {
671 v.push(get_i32(buf)?);
672 }
673 v
674 };
675 }
676 if flex {
677 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
678 }
679 Ok(out)
680 }
681}
#[cfg(test)]
impl ForgottenTopic {
    /// Test fixture: a forgotten topic with an all-ones UUID and the single
    /// partition `1`. A negative version yields `Self::default()`.
    #[must_use]
    pub fn populated(version: i16) -> Self {
        if version < 0 {
            return Self::default();
        }
        Self {
            topic_id: crate::primitives::uuid::Uuid([1u8; 16]),
            partitions: vec![1],
            ..Self::default()
        }
    }
}
696
697#[must_use]
700#[allow(unused_comparisons)]
701pub fn default_json(version: i16) -> ::serde_json::Value {
702 let mut obj = ::serde_json::Map::new();
703 obj.insert("groupId".to_string(), ::serde_json::Value::Null);
704 obj.insert("memberId".to_string(), ::serde_json::Value::Null);
705 obj.insert("shareSessionEpoch".to_string(), ::serde_json::json!(0));
706 obj.insert("maxWaitMs".to_string(), ::serde_json::json!(0));
707 obj.insert("minBytes".to_string(), ::serde_json::json!(0));
708 obj.insert("maxBytes".to_string(), ::serde_json::json!(2147483647));
709 if version >= 1 {
710 obj.insert("maxRecords".to_string(), ::serde_json::json!(0));
711 }
712 if version >= 1 {
713 obj.insert("batchSize".to_string(), ::serde_json::json!(0));
714 }
715 if version >= 2 {
716 obj.insert("shareAcquireMode".to_string(), ::serde_json::json!(0));
717 }
718 if version >= 2 {
719 obj.insert("isRenewAck".to_string(), ::serde_json::Value::Bool(false));
720 }
721 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
722 obj.insert(
723 "forgottenTopicsData".to_string(),
724 ::serde_json::Value::Array(vec![]),
725 );
726 ::serde_json::Value::Object(obj)
727}
728
729impl crate::ProtocolRequest for ShareFetchRequest {
730 const API_KEY: i16 = API_KEY;
731 const MIN_VERSION: i16 = MIN_VERSION;
732 const MAX_VERSION: i16 = MAX_VERSION;
733 const FLEXIBLE_MIN: i16 = FLEXIBLE_MIN;
734 type Response = super::share_fetch_response::ShareFetchResponse;
735}