1use bytes::{Buf, BufMut};
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, get_compact_nullable_string_owned,
8 get_compact_string_owned, get_nullable_string_owned, get_string_owned, nullable_string_len,
9 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
10};
11use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
12use crate::{Decode, Encode, ProtocolError, UnknownTaggedFields};
13
14pub const API_KEY: i16 = 9;
15pub const MIN_VERSION: i16 = 1;
16pub const MAX_VERSION: i16 = 10;
17pub const FLEXIBLE_MIN: i16 = 6;
18
19#[inline]
20fn is_flexible(version: i16) -> bool {
21 version >= FLEXIBLE_MIN
22}
23
24#[derive(Debug, Clone, PartialEq, Eq, Default)]
25pub struct OffsetFetchResponse {
26 pub throttle_time_ms: i32,
27 pub topics: Vec<OffsetFetchResponseTopic>,
28 pub error_code: i16,
29 pub groups: Vec<OffsetFetchResponseGroup>,
30 pub unknown_tagged_fields: UnknownTaggedFields,
31}
32impl Encode for OffsetFetchResponse {
33 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
34 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
35 return Err(ProtocolError::UnsupportedVersion {
36 api_key: API_KEY,
37 version,
38 });
39 }
40 let flex = is_flexible(version);
41 if version >= 3 {
42 put_i32(buf, self.throttle_time_ms);
43 }
44 if (0..=7).contains(&version) {
45 {
46 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
47 for it in &self.topics {
48 it.encode(buf, version)?;
49 }
50 }
51 }
52 if (2..=7).contains(&version) {
53 put_i16(buf, self.error_code);
54 }
55 if version >= 8 {
56 {
57 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
58 for it in &self.groups {
59 it.encode(buf, version)?;
60 }
61 }
62 }
63 if flex {
64 let tagged = WriteTaggedFields::new();
65 tagged.write(buf, &self.unknown_tagged_fields);
66 }
67 Ok(())
68 }
69 fn encoded_len(&self, version: i16) -> usize {
70 let flex = is_flexible(version);
71 let mut n: usize = 0;
72 if version >= 3 {
73 n += 4;
74 }
75 if (0..=7).contains(&version) {
76 n += {
77 let prefix =
78 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
79 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
80 prefix + body
81 };
82 }
83 if (2..=7).contains(&version) {
84 n += 2;
85 }
86 if version >= 8 {
87 n += {
88 let prefix =
89 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
90 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
91 prefix + body
92 };
93 }
94 if flex {
95 let known_pairs: Vec<(u32, usize)> = Vec::new();
96 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
97 }
98 n
99 }
100}
101impl Decode<'_> for OffsetFetchResponse {
102 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
103 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
104 return Err(ProtocolError::UnsupportedVersion {
105 api_key: API_KEY,
106 version,
107 });
108 }
109 let flex = is_flexible(version);
110 let mut out = Self::default();
111 if version >= 3 {
112 out.throttle_time_ms = get_i32(buf)?;
113 }
114 if (0..=7).contains(&version) {
115 out.topics = {
116 let n = crate::primitives::array::get_array_len(buf, flex)?;
117 let mut v = Vec::with_capacity(n);
118 for _ in 0..n {
119 v.push(OffsetFetchResponseTopic::decode(buf, version)?);
120 }
121 v
122 };
123 }
124 if (2..=7).contains(&version) {
125 out.error_code = get_i16(buf)?;
126 }
127 if version >= 8 {
128 out.groups = {
129 let n = crate::primitives::array::get_array_len(buf, flex)?;
130 let mut v = Vec::with_capacity(n);
131 for _ in 0..n {
132 v.push(OffsetFetchResponseGroup::decode(buf, version)?);
133 }
134 v
135 };
136 }
137 if flex {
138 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
139 }
140 Ok(out)
141 }
142}
143#[cfg(test)]
144impl OffsetFetchResponse {
145 #[must_use]
146 pub fn populated(version: i16) -> Self {
147 let mut m = Self::default();
148 if version >= 3 {
149 m.throttle_time_ms = 1i32;
150 }
151 if (0..=7).contains(&version) {
152 m.topics = vec![OffsetFetchResponseTopic::populated(version)];
153 }
154 if (2..=7).contains(&version) {
155 m.error_code = 1i16;
156 }
157 if version >= 8 {
158 m.groups = vec![OffsetFetchResponseGroup::populated(version)];
159 }
160 m
161 }
162}
163#[derive(Debug, Clone, PartialEq, Eq, Default)]
164pub struct OffsetFetchResponseTopic {
165 pub name: String,
166 pub partitions: Vec<OffsetFetchResponsePartition>,
167 pub unknown_tagged_fields: UnknownTaggedFields,
168}
169impl Encode for OffsetFetchResponseTopic {
170 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
171 let flex = version >= 6;
172 if (0..=7).contains(&version) {
173 if flex {
174 put_compact_string(buf, &self.name);
175 } else {
176 put_string(buf, &self.name);
177 }
178 }
179 if (0..=7).contains(&version) {
180 {
181 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
182 for it in &self.partitions {
183 it.encode(buf, version)?;
184 }
185 }
186 }
187 if flex {
188 let tagged = WriteTaggedFields::new();
189 tagged.write(buf, &self.unknown_tagged_fields);
190 }
191 Ok(())
192 }
193 fn encoded_len(&self, version: i16) -> usize {
194 let flex = version >= 6;
195 let mut n: usize = 0;
196 if (0..=7).contains(&version) {
197 n += if flex {
198 compact_string_len(&self.name)
199 } else {
200 string_len(&self.name)
201 };
202 }
203 if (0..=7).contains(&version) {
204 n += {
205 let prefix =
206 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
207 let body: usize = (self.partitions)
208 .iter()
209 .map(|it| it.encoded_len(version))
210 .sum();
211 prefix + body
212 };
213 }
214 if flex {
215 let known_pairs: Vec<(u32, usize)> = Vec::new();
216 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
217 }
218 n
219 }
220}
221impl Decode<'_> for OffsetFetchResponseTopic {
222 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
223 let flex = version >= 6;
224 let mut out = Self::default();
225 if (0..=7).contains(&version) {
226 out.name = if flex {
227 get_compact_string_owned(buf)?
228 } else {
229 get_string_owned(buf)?
230 };
231 }
232 if (0..=7).contains(&version) {
233 out.partitions = {
234 let n = crate::primitives::array::get_array_len(buf, flex)?;
235 let mut v = Vec::with_capacity(n);
236 for _ in 0..n {
237 v.push(OffsetFetchResponsePartition::decode(buf, version)?);
238 }
239 v
240 };
241 }
242 if flex {
243 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
244 }
245 Ok(out)
246 }
247}
248#[cfg(test)]
249impl OffsetFetchResponseTopic {
250 #[must_use]
251 pub fn populated(version: i16) -> Self {
252 let mut m = Self::default();
253 if (0..=7).contains(&version) {
254 m.name = "x".to_string();
255 }
256 if (0..=7).contains(&version) {
257 m.partitions = vec![OffsetFetchResponsePartition::populated(version)];
258 }
259 m
260 }
261}
262#[derive(Debug, Clone, PartialEq, Eq)]
263pub struct OffsetFetchResponsePartition {
264 pub partition_index: i32,
265 pub committed_offset: i64,
266 pub committed_leader_epoch: i32,
267 pub metadata: Option<String>,
268 pub error_code: i16,
269 pub unknown_tagged_fields: UnknownTaggedFields,
270}
271impl Default for OffsetFetchResponsePartition {
272 fn default() -> Self {
273 Self {
274 partition_index: 0i32,
275 committed_offset: 0i64,
276 committed_leader_epoch: -1i32,
277 metadata: None,
278 error_code: 0i16,
279 unknown_tagged_fields: Default::default(),
280 }
281 }
282}
283impl Encode for OffsetFetchResponsePartition {
284 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
285 let flex = version >= 6;
286 if (0..=7).contains(&version) {
287 put_i32(buf, self.partition_index);
288 }
289 if (0..=7).contains(&version) {
290 put_i64(buf, self.committed_offset);
291 }
292 if (5..=7).contains(&version) {
293 put_i32(buf, self.committed_leader_epoch);
294 }
295 if (0..=7).contains(&version) {
296 if flex {
297 put_compact_nullable_string(buf, self.metadata.as_deref());
298 } else {
299 put_nullable_string(buf, self.metadata.as_deref());
300 }
301 }
302 if (0..=7).contains(&version) {
303 put_i16(buf, self.error_code);
304 }
305 if flex {
306 let tagged = WriteTaggedFields::new();
307 tagged.write(buf, &self.unknown_tagged_fields);
308 }
309 Ok(())
310 }
311 fn encoded_len(&self, version: i16) -> usize {
312 let flex = version >= 6;
313 let mut n: usize = 0;
314 if (0..=7).contains(&version) {
315 n += 4;
316 }
317 if (0..=7).contains(&version) {
318 n += 8;
319 }
320 if (5..=7).contains(&version) {
321 n += 4;
322 }
323 if (0..=7).contains(&version) {
324 n += if flex {
325 compact_nullable_string_len(self.metadata.as_deref())
326 } else {
327 nullable_string_len(self.metadata.as_deref())
328 };
329 }
330 if (0..=7).contains(&version) {
331 n += 2;
332 }
333 if flex {
334 let known_pairs: Vec<(u32, usize)> = Vec::new();
335 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
336 }
337 n
338 }
339}
340impl Decode<'_> for OffsetFetchResponsePartition {
341 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
342 let flex = version >= 6;
343 let mut out = Self::default();
344 if (0..=7).contains(&version) {
345 out.partition_index = get_i32(buf)?;
346 }
347 if (0..=7).contains(&version) {
348 out.committed_offset = get_i64(buf)?;
349 }
350 if (5..=7).contains(&version) {
351 out.committed_leader_epoch = get_i32(buf)?;
352 }
353 if (0..=7).contains(&version) {
354 out.metadata = if flex {
355 get_compact_nullable_string_owned(buf)?
356 } else {
357 get_nullable_string_owned(buf)?
358 };
359 }
360 if (0..=7).contains(&version) {
361 out.error_code = get_i16(buf)?;
362 }
363 if flex {
364 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
365 }
366 Ok(out)
367 }
368}
369#[cfg(test)]
370impl OffsetFetchResponsePartition {
371 #[must_use]
372 pub fn populated(version: i16) -> Self {
373 let mut m = Self::default();
374 if (0..=7).contains(&version) {
375 m.partition_index = 1i32;
376 }
377 if (0..=7).contains(&version) {
378 m.committed_offset = 1i64;
379 }
380 if (5..=7).contains(&version) {
381 m.committed_leader_epoch = 1i32;
382 }
383 if (0..=7).contains(&version) {
384 m.metadata = Some("x".to_string());
385 }
386 if (0..=7).contains(&version) {
387 m.error_code = 1i16;
388 }
389 m
390 }
391}
392#[derive(Debug, Clone, PartialEq, Eq, Default)]
393pub struct OffsetFetchResponseGroup {
394 pub group_id: String,
395 pub topics: Vec<OffsetFetchResponseTopics>,
396 pub error_code: i16,
397 pub unknown_tagged_fields: UnknownTaggedFields,
398}
399impl Encode for OffsetFetchResponseGroup {
400 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
401 let flex = version >= 6;
402 if version >= 8 {
403 if flex {
404 put_compact_string(buf, &self.group_id);
405 } else {
406 put_string(buf, &self.group_id);
407 }
408 }
409 if version >= 8 {
410 {
411 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
412 for it in &self.topics {
413 it.encode(buf, version)?;
414 }
415 }
416 }
417 if version >= 8 {
418 put_i16(buf, self.error_code);
419 }
420 if flex {
421 let tagged = WriteTaggedFields::new();
422 tagged.write(buf, &self.unknown_tagged_fields);
423 }
424 Ok(())
425 }
426 fn encoded_len(&self, version: i16) -> usize {
427 let flex = version >= 6;
428 let mut n: usize = 0;
429 if version >= 8 {
430 n += if flex {
431 compact_string_len(&self.group_id)
432 } else {
433 string_len(&self.group_id)
434 };
435 }
436 if version >= 8 {
437 n += {
438 let prefix =
439 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
440 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
441 prefix + body
442 };
443 }
444 if version >= 8 {
445 n += 2;
446 }
447 if flex {
448 let known_pairs: Vec<(u32, usize)> = Vec::new();
449 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
450 }
451 n
452 }
453}
454impl Decode<'_> for OffsetFetchResponseGroup {
455 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
456 let flex = version >= 6;
457 let mut out = Self::default();
458 if version >= 8 {
459 out.group_id = if flex {
460 get_compact_string_owned(buf)?
461 } else {
462 get_string_owned(buf)?
463 };
464 }
465 if version >= 8 {
466 out.topics = {
467 let n = crate::primitives::array::get_array_len(buf, flex)?;
468 let mut v = Vec::with_capacity(n);
469 for _ in 0..n {
470 v.push(OffsetFetchResponseTopics::decode(buf, version)?);
471 }
472 v
473 };
474 }
475 if version >= 8 {
476 out.error_code = get_i16(buf)?;
477 }
478 if flex {
479 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
480 }
481 Ok(out)
482 }
483}
484#[cfg(test)]
485impl OffsetFetchResponseGroup {
486 #[must_use]
487 pub fn populated(version: i16) -> Self {
488 let mut m = Self::default();
489 if version >= 8 {
490 m.group_id = "x".to_string();
491 }
492 if version >= 8 {
493 m.topics = vec![OffsetFetchResponseTopics::populated(version)];
494 }
495 if version >= 8 {
496 m.error_code = 1i16;
497 }
498 m
499 }
500}
501#[derive(Debug, Clone, PartialEq, Eq, Default)]
502pub struct OffsetFetchResponseTopics {
503 pub name: String,
504 pub topic_id: crate::primitives::uuid::Uuid,
505 pub partitions: Vec<OffsetFetchResponsePartitions>,
506 pub unknown_tagged_fields: UnknownTaggedFields,
507}
508impl Encode for OffsetFetchResponseTopics {
509 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
510 let flex = version >= 6;
511 if (8..=9).contains(&version) {
512 if flex {
513 put_compact_string(buf, &self.name);
514 } else {
515 put_string(buf, &self.name);
516 }
517 }
518 if version >= 10 {
519 crate::primitives::uuid::put_uuid(buf, self.topic_id);
520 }
521 if version >= 8 {
522 {
523 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
524 for it in &self.partitions {
525 it.encode(buf, version)?;
526 }
527 }
528 }
529 if flex {
530 let tagged = WriteTaggedFields::new();
531 tagged.write(buf, &self.unknown_tagged_fields);
532 }
533 Ok(())
534 }
535 fn encoded_len(&self, version: i16) -> usize {
536 let flex = version >= 6;
537 let mut n: usize = 0;
538 if (8..=9).contains(&version) {
539 n += if flex {
540 compact_string_len(&self.name)
541 } else {
542 string_len(&self.name)
543 };
544 }
545 if version >= 10 {
546 n += 16;
547 }
548 if version >= 8 {
549 n += {
550 let prefix =
551 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
552 let body: usize = (self.partitions)
553 .iter()
554 .map(|it| it.encoded_len(version))
555 .sum();
556 prefix + body
557 };
558 }
559 if flex {
560 let known_pairs: Vec<(u32, usize)> = Vec::new();
561 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
562 }
563 n
564 }
565}
566impl Decode<'_> for OffsetFetchResponseTopics {
567 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
568 let flex = version >= 6;
569 let mut out = Self::default();
570 if (8..=9).contains(&version) {
571 out.name = if flex {
572 get_compact_string_owned(buf)?
573 } else {
574 get_string_owned(buf)?
575 };
576 }
577 if version >= 10 {
578 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
579 }
580 if version >= 8 {
581 out.partitions = {
582 let n = crate::primitives::array::get_array_len(buf, flex)?;
583 let mut v = Vec::with_capacity(n);
584 for _ in 0..n {
585 v.push(OffsetFetchResponsePartitions::decode(buf, version)?);
586 }
587 v
588 };
589 }
590 if flex {
591 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
592 }
593 Ok(out)
594 }
595}
596#[cfg(test)]
597impl OffsetFetchResponseTopics {
598 #[must_use]
599 pub fn populated(version: i16) -> Self {
600 let mut m = Self::default();
601 if (8..=9).contains(&version) {
602 m.name = "x".to_string();
603 }
604 if version >= 10 {
605 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
606 }
607 if version >= 8 {
608 m.partitions = vec![OffsetFetchResponsePartitions::populated(version)];
609 }
610 m
611 }
612}
613#[derive(Debug, Clone, PartialEq, Eq)]
614pub struct OffsetFetchResponsePartitions {
615 pub partition_index: i32,
616 pub committed_offset: i64,
617 pub committed_leader_epoch: i32,
618 pub metadata: Option<String>,
619 pub error_code: i16,
620 pub unknown_tagged_fields: UnknownTaggedFields,
621}
622impl Default for OffsetFetchResponsePartitions {
623 fn default() -> Self {
624 Self {
625 partition_index: 0i32,
626 committed_offset: 0i64,
627 committed_leader_epoch: -1i32,
628 metadata: None,
629 error_code: 0i16,
630 unknown_tagged_fields: Default::default(),
631 }
632 }
633}
634impl Encode for OffsetFetchResponsePartitions {
635 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
636 let flex = version >= 6;
637 if version >= 8 {
638 put_i32(buf, self.partition_index);
639 }
640 if version >= 8 {
641 put_i64(buf, self.committed_offset);
642 }
643 if version >= 8 {
644 put_i32(buf, self.committed_leader_epoch);
645 }
646 if version >= 8 {
647 if flex {
648 put_compact_nullable_string(buf, self.metadata.as_deref());
649 } else {
650 put_nullable_string(buf, self.metadata.as_deref());
651 }
652 }
653 if version >= 8 {
654 put_i16(buf, self.error_code);
655 }
656 if flex {
657 let tagged = WriteTaggedFields::new();
658 tagged.write(buf, &self.unknown_tagged_fields);
659 }
660 Ok(())
661 }
662 fn encoded_len(&self, version: i16) -> usize {
663 let flex = version >= 6;
664 let mut n: usize = 0;
665 if version >= 8 {
666 n += 4;
667 }
668 if version >= 8 {
669 n += 8;
670 }
671 if version >= 8 {
672 n += 4;
673 }
674 if version >= 8 {
675 n += if flex {
676 compact_nullable_string_len(self.metadata.as_deref())
677 } else {
678 nullable_string_len(self.metadata.as_deref())
679 };
680 }
681 if version >= 8 {
682 n += 2;
683 }
684 if flex {
685 let known_pairs: Vec<(u32, usize)> = Vec::new();
686 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
687 }
688 n
689 }
690}
691impl Decode<'_> for OffsetFetchResponsePartitions {
692 fn decode<B: Buf>(buf: &mut B, version: i16) -> Result<Self, ProtocolError> {
693 let flex = version >= 6;
694 let mut out = Self::default();
695 if version >= 8 {
696 out.partition_index = get_i32(buf)?;
697 }
698 if version >= 8 {
699 out.committed_offset = get_i64(buf)?;
700 }
701 if version >= 8 {
702 out.committed_leader_epoch = get_i32(buf)?;
703 }
704 if version >= 8 {
705 out.metadata = if flex {
706 get_compact_nullable_string_owned(buf)?
707 } else {
708 get_nullable_string_owned(buf)?
709 };
710 }
711 if version >= 8 {
712 out.error_code = get_i16(buf)?;
713 }
714 if flex {
715 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
716 }
717 Ok(out)
718 }
719}
720#[cfg(test)]
721impl OffsetFetchResponsePartitions {
722 #[must_use]
723 pub fn populated(version: i16) -> Self {
724 let mut m = Self::default();
725 if version >= 8 {
726 m.partition_index = 1i32;
727 }
728 if version >= 8 {
729 m.committed_offset = 1i64;
730 }
731 if version >= 8 {
732 m.committed_leader_epoch = 1i32;
733 }
734 if version >= 8 {
735 m.metadata = Some("x".to_string());
736 }
737 if version >= 8 {
738 m.error_code = 1i16;
739 }
740 m
741 }
742}
743
744#[must_use]
747#[allow(unused_comparisons)]
748pub fn default_json(version: i16) -> ::serde_json::Value {
749 let mut obj = ::serde_json::Map::new();
750 if version >= 3 {
751 obj.insert("throttleTimeMs".to_string(), ::serde_json::json!(0));
752 }
753 if version <= 7 {
754 obj.insert("topics".to_string(), ::serde_json::Value::Array(vec![]));
755 }
756 if (2..=7).contains(&version) {
757 obj.insert("errorCode".to_string(), ::serde_json::json!(0));
758 }
759 if version >= 8 {
760 obj.insert("groups".to_string(), ::serde_json::Value::Array(vec![]));
761 }
762 ::serde_json::Value::Object(obj)
763}