1use bytes::BufMut;
4
5use crate::primitives::fixed::{get_i16, get_i32, get_i64, put_i16, put_i32, put_i64};
6use crate::primitives::string_bytes::{
7 compact_nullable_string_len, compact_string_len, nullable_string_len,
8 put_compact_nullable_string, put_compact_string, put_nullable_string, put_string, string_len,
9};
10use crate::primitives::string_bytes_borrowed::{
11 get_compact_nullable_string_borrowed, get_compact_string_borrowed,
12 get_nullable_string_borrowed, get_string_borrowed,
13};
14use crate::tagged_fields::{WriteTaggedFields, read_tagged_fields, tagged_fields_len};
15use crate::{DecodeBorrow, Encode, ProtocolError, UnknownTaggedFields};
16
17pub const API_KEY: i16 = 9;
18pub const MIN_VERSION: i16 = 1;
19pub const MAX_VERSION: i16 = 10;
20pub const FLEXIBLE_MIN: i16 = 6;
21
22#[inline]
23fn is_flexible(version: i16) -> bool {
24 version >= FLEXIBLE_MIN
25}
26
27#[derive(Debug, Clone, PartialEq, Eq, Default)]
28pub struct OffsetFetchResponse<'a> {
29 pub throttle_time_ms: i32,
30 pub topics: Vec<OffsetFetchResponseTopic<'a>>,
31 pub error_code: i16,
32 pub groups: Vec<OffsetFetchResponseGroup<'a>>,
33 pub unknown_tagged_fields: UnknownTaggedFields,
34}
35impl OffsetFetchResponse<'_> {
36 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponse {
37 crate::owned::offset_fetch_response::OffsetFetchResponse {
38 throttle_time_ms: (self.throttle_time_ms),
39 topics: (self.topics)
40 .iter()
41 .map(OffsetFetchResponseTopic::to_owned)
42 .collect(),
43 error_code: (self.error_code),
44 groups: (self.groups)
45 .iter()
46 .map(OffsetFetchResponseGroup::to_owned)
47 .collect(),
48 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
49 }
50 }
51}
52impl Encode for OffsetFetchResponse<'_> {
53 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
54 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
55 return Err(ProtocolError::UnsupportedVersion {
56 api_key: API_KEY,
57 version,
58 });
59 }
60 let flex = is_flexible(version);
61 if version >= 3 {
62 put_i32(buf, self.throttle_time_ms);
63 }
64 if (0..=7).contains(&version) {
65 {
66 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
67 for it in &self.topics {
68 it.encode(buf, version)?;
69 }
70 }
71 }
72 if (2..=7).contains(&version) {
73 put_i16(buf, self.error_code);
74 }
75 if version >= 8 {
76 {
77 crate::primitives::array::put_array_len(buf, (self.groups).len(), flex);
78 for it in &self.groups {
79 it.encode(buf, version)?;
80 }
81 }
82 }
83 if flex {
84 let tagged = WriteTaggedFields::new();
85 tagged.write(buf, &self.unknown_tagged_fields);
86 }
87 Ok(())
88 }
89 fn encoded_len(&self, version: i16) -> usize {
90 let flex = is_flexible(version);
91 let mut n: usize = 0;
92 if version >= 3 {
93 n += 4;
94 }
95 if (0..=7).contains(&version) {
96 n += {
97 let prefix =
98 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
99 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
100 prefix + body
101 };
102 }
103 if (2..=7).contains(&version) {
104 n += 2;
105 }
106 if version >= 8 {
107 n += {
108 let prefix =
109 crate::primitives::array::array_len_prefix_len((self.groups).len(), flex);
110 let body: usize = (self.groups).iter().map(|it| it.encoded_len(version)).sum();
111 prefix + body
112 };
113 }
114 if flex {
115 let known_pairs: Vec<(u32, usize)> = Vec::new();
116 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
117 }
118 n
119 }
120}
121impl<'de> DecodeBorrow<'de> for OffsetFetchResponse<'de> {
122 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
123 if !(MIN_VERSION..=MAX_VERSION).contains(&version) {
124 return Err(ProtocolError::UnsupportedVersion {
125 api_key: API_KEY,
126 version,
127 });
128 }
129 let flex = is_flexible(version);
130 let mut out = Self::default();
131 if version >= 3 {
132 out.throttle_time_ms = get_i32(buf)?;
133 }
134 if (0..=7).contains(&version) {
135 out.topics = {
136 let n = crate::primitives::array::get_array_len(buf, flex)?;
137 let mut v = Vec::with_capacity(n);
138 for _ in 0..n {
139 v.push(OffsetFetchResponseTopic::decode_borrow(buf, version)?);
140 }
141 v
142 };
143 }
144 if (2..=7).contains(&version) {
145 out.error_code = get_i16(buf)?;
146 }
147 if version >= 8 {
148 out.groups = {
149 let n = crate::primitives::array::get_array_len(buf, flex)?;
150 let mut v = Vec::with_capacity(n);
151 for _ in 0..n {
152 v.push(OffsetFetchResponseGroup::decode_borrow(buf, version)?);
153 }
154 v
155 };
156 }
157 if flex {
158 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
159 }
160 Ok(out)
161 }
162}
163#[cfg(test)]
164impl OffsetFetchResponse<'_> {
165 #[must_use]
166 pub fn populated(version: i16) -> Self {
167 let mut m = Self::default();
168 if version >= 3 {
169 m.throttle_time_ms = 1i32;
170 }
171 if (0..=7).contains(&version) {
172 m.topics = vec![OffsetFetchResponseTopic::populated(version)];
173 }
174 if (2..=7).contains(&version) {
175 m.error_code = 1i16;
176 }
177 if version >= 8 {
178 m.groups = vec![OffsetFetchResponseGroup::populated(version)];
179 }
180 m
181 }
182}
183#[derive(Debug, Clone, PartialEq, Eq, Default)]
184pub struct OffsetFetchResponseTopic<'a> {
185 pub name: &'a str,
186 pub partitions: Vec<OffsetFetchResponsePartition<'a>>,
187 pub unknown_tagged_fields: UnknownTaggedFields,
188}
189impl OffsetFetchResponseTopic<'_> {
190 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseTopic {
191 crate::owned::offset_fetch_response::OffsetFetchResponseTopic {
192 name: (self.name).to_string(),
193 partitions: (self.partitions)
194 .iter()
195 .map(OffsetFetchResponsePartition::to_owned)
196 .collect(),
197 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
198 }
199 }
200}
201impl Encode for OffsetFetchResponseTopic<'_> {
202 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
203 let flex = version >= 6;
204 if (0..=7).contains(&version) {
205 if flex {
206 put_compact_string(buf, self.name);
207 } else {
208 put_string(buf, self.name);
209 }
210 }
211 if (0..=7).contains(&version) {
212 {
213 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
214 for it in &self.partitions {
215 it.encode(buf, version)?;
216 }
217 }
218 }
219 if flex {
220 let tagged = WriteTaggedFields::new();
221 tagged.write(buf, &self.unknown_tagged_fields);
222 }
223 Ok(())
224 }
225 fn encoded_len(&self, version: i16) -> usize {
226 let flex = version >= 6;
227 let mut n: usize = 0;
228 if (0..=7).contains(&version) {
229 n += if flex {
230 compact_string_len(self.name)
231 } else {
232 string_len(self.name)
233 };
234 }
235 if (0..=7).contains(&version) {
236 n += {
237 let prefix =
238 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
239 let body: usize = (self.partitions)
240 .iter()
241 .map(|it| it.encoded_len(version))
242 .sum();
243 prefix + body
244 };
245 }
246 if flex {
247 let known_pairs: Vec<(u32, usize)> = Vec::new();
248 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
249 }
250 n
251 }
252}
253impl<'de> DecodeBorrow<'de> for OffsetFetchResponseTopic<'de> {
254 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
255 let flex = version >= 6;
256 let mut out = Self::default();
257 if (0..=7).contains(&version) {
258 out.name = if flex {
259 get_compact_string_borrowed(buf)?
260 } else {
261 get_string_borrowed(buf)?
262 };
263 }
264 if (0..=7).contains(&version) {
265 out.partitions = {
266 let n = crate::primitives::array::get_array_len(buf, flex)?;
267 let mut v = Vec::with_capacity(n);
268 for _ in 0..n {
269 v.push(OffsetFetchResponsePartition::decode_borrow(buf, version)?);
270 }
271 v
272 };
273 }
274 if flex {
275 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
276 }
277 Ok(out)
278 }
279}
280#[cfg(test)]
281impl OffsetFetchResponseTopic<'_> {
282 #[must_use]
283 pub fn populated(version: i16) -> Self {
284 let mut m = Self::default();
285 if (0..=7).contains(&version) {
286 m.name = "x";
287 }
288 if (0..=7).contains(&version) {
289 m.partitions = vec![OffsetFetchResponsePartition::populated(version)];
290 }
291 m
292 }
293}
294#[derive(Debug, Clone, PartialEq, Eq)]
295pub struct OffsetFetchResponsePartition<'a> {
296 pub partition_index: i32,
297 pub committed_offset: i64,
298 pub committed_leader_epoch: i32,
299 pub metadata: Option<&'a str>,
300 pub error_code: i16,
301 pub unknown_tagged_fields: UnknownTaggedFields,
302}
303impl Default for OffsetFetchResponsePartition<'_> {
304 fn default() -> Self {
305 Self {
306 partition_index: 0i32,
307 committed_offset: 0i64,
308 committed_leader_epoch: -1i32,
309 metadata: None,
310 error_code: 0i16,
311 unknown_tagged_fields: Default::default(),
312 }
313 }
314}
315impl OffsetFetchResponsePartition<'_> {
316 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponsePartition {
317 crate::owned::offset_fetch_response::OffsetFetchResponsePartition {
318 partition_index: (self.partition_index),
319 committed_offset: (self.committed_offset),
320 committed_leader_epoch: (self.committed_leader_epoch),
321 metadata: (self.metadata).map(std::string::ToString::to_string),
322 error_code: (self.error_code),
323 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
324 }
325 }
326}
327impl Encode for OffsetFetchResponsePartition<'_> {
328 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
329 let flex = version >= 6;
330 if (0..=7).contains(&version) {
331 put_i32(buf, self.partition_index);
332 }
333 if (0..=7).contains(&version) {
334 put_i64(buf, self.committed_offset);
335 }
336 if (5..=7).contains(&version) {
337 put_i32(buf, self.committed_leader_epoch);
338 }
339 if (0..=7).contains(&version) {
340 if flex {
341 put_compact_nullable_string(buf, self.metadata);
342 } else {
343 put_nullable_string(buf, self.metadata);
344 }
345 }
346 if (0..=7).contains(&version) {
347 put_i16(buf, self.error_code);
348 }
349 if flex {
350 let tagged = WriteTaggedFields::new();
351 tagged.write(buf, &self.unknown_tagged_fields);
352 }
353 Ok(())
354 }
355 fn encoded_len(&self, version: i16) -> usize {
356 let flex = version >= 6;
357 let mut n: usize = 0;
358 if (0..=7).contains(&version) {
359 n += 4;
360 }
361 if (0..=7).contains(&version) {
362 n += 8;
363 }
364 if (5..=7).contains(&version) {
365 n += 4;
366 }
367 if (0..=7).contains(&version) {
368 n += if flex {
369 compact_nullable_string_len(self.metadata)
370 } else {
371 nullable_string_len(self.metadata)
372 };
373 }
374 if (0..=7).contains(&version) {
375 n += 2;
376 }
377 if flex {
378 let known_pairs: Vec<(u32, usize)> = Vec::new();
379 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
380 }
381 n
382 }
383}
384impl<'de> DecodeBorrow<'de> for OffsetFetchResponsePartition<'de> {
385 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
386 let flex = version >= 6;
387 let mut out = Self::default();
388 if (0..=7).contains(&version) {
389 out.partition_index = get_i32(buf)?;
390 }
391 if (0..=7).contains(&version) {
392 out.committed_offset = get_i64(buf)?;
393 }
394 if (5..=7).contains(&version) {
395 out.committed_leader_epoch = get_i32(buf)?;
396 }
397 if (0..=7).contains(&version) {
398 out.metadata = if flex {
399 get_compact_nullable_string_borrowed(buf)?
400 } else {
401 get_nullable_string_borrowed(buf)?
402 };
403 }
404 if (0..=7).contains(&version) {
405 out.error_code = get_i16(buf)?;
406 }
407 if flex {
408 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
409 }
410 Ok(out)
411 }
412}
413#[cfg(test)]
414impl OffsetFetchResponsePartition<'_> {
415 #[must_use]
416 pub fn populated(version: i16) -> Self {
417 let mut m = Self::default();
418 if (0..=7).contains(&version) {
419 m.partition_index = 1i32;
420 }
421 if (0..=7).contains(&version) {
422 m.committed_offset = 1i64;
423 }
424 if (5..=7).contains(&version) {
425 m.committed_leader_epoch = 1i32;
426 }
427 if (0..=7).contains(&version) {
428 m.metadata = Some("x");
429 }
430 if (0..=7).contains(&version) {
431 m.error_code = 1i16;
432 }
433 m
434 }
435}
436#[derive(Debug, Clone, PartialEq, Eq, Default)]
437pub struct OffsetFetchResponseGroup<'a> {
438 pub group_id: &'a str,
439 pub topics: Vec<OffsetFetchResponseTopics<'a>>,
440 pub error_code: i16,
441 pub unknown_tagged_fields: UnknownTaggedFields,
442}
443impl OffsetFetchResponseGroup<'_> {
444 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseGroup {
445 crate::owned::offset_fetch_response::OffsetFetchResponseGroup {
446 group_id: (self.group_id).to_string(),
447 topics: (self.topics)
448 .iter()
449 .map(OffsetFetchResponseTopics::to_owned)
450 .collect(),
451 error_code: (self.error_code),
452 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
453 }
454 }
455}
456impl Encode for OffsetFetchResponseGroup<'_> {
457 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
458 let flex = version >= 6;
459 if version >= 8 {
460 if flex {
461 put_compact_string(buf, self.group_id);
462 } else {
463 put_string(buf, self.group_id);
464 }
465 }
466 if version >= 8 {
467 {
468 crate::primitives::array::put_array_len(buf, (self.topics).len(), flex);
469 for it in &self.topics {
470 it.encode(buf, version)?;
471 }
472 }
473 }
474 if version >= 8 {
475 put_i16(buf, self.error_code);
476 }
477 if flex {
478 let tagged = WriteTaggedFields::new();
479 tagged.write(buf, &self.unknown_tagged_fields);
480 }
481 Ok(())
482 }
483 fn encoded_len(&self, version: i16) -> usize {
484 let flex = version >= 6;
485 let mut n: usize = 0;
486 if version >= 8 {
487 n += if flex {
488 compact_string_len(self.group_id)
489 } else {
490 string_len(self.group_id)
491 };
492 }
493 if version >= 8 {
494 n += {
495 let prefix =
496 crate::primitives::array::array_len_prefix_len((self.topics).len(), flex);
497 let body: usize = (self.topics).iter().map(|it| it.encoded_len(version)).sum();
498 prefix + body
499 };
500 }
501 if version >= 8 {
502 n += 2;
503 }
504 if flex {
505 let known_pairs: Vec<(u32, usize)> = Vec::new();
506 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
507 }
508 n
509 }
510}
511impl<'de> DecodeBorrow<'de> for OffsetFetchResponseGroup<'de> {
512 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
513 let flex = version >= 6;
514 let mut out = Self::default();
515 if version >= 8 {
516 out.group_id = if flex {
517 get_compact_string_borrowed(buf)?
518 } else {
519 get_string_borrowed(buf)?
520 };
521 }
522 if version >= 8 {
523 out.topics = {
524 let n = crate::primitives::array::get_array_len(buf, flex)?;
525 let mut v = Vec::with_capacity(n);
526 for _ in 0..n {
527 v.push(OffsetFetchResponseTopics::decode_borrow(buf, version)?);
528 }
529 v
530 };
531 }
532 if version >= 8 {
533 out.error_code = get_i16(buf)?;
534 }
535 if flex {
536 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
537 }
538 Ok(out)
539 }
540}
541#[cfg(test)]
542impl OffsetFetchResponseGroup<'_> {
543 #[must_use]
544 pub fn populated(version: i16) -> Self {
545 let mut m = Self::default();
546 if version >= 8 {
547 m.group_id = "x";
548 }
549 if version >= 8 {
550 m.topics = vec![OffsetFetchResponseTopics::populated(version)];
551 }
552 if version >= 8 {
553 m.error_code = 1i16;
554 }
555 m
556 }
557}
558#[derive(Debug, Clone, PartialEq, Eq, Default)]
559pub struct OffsetFetchResponseTopics<'a> {
560 pub name: &'a str,
561 pub topic_id: crate::primitives::uuid::Uuid,
562 pub partitions: Vec<OffsetFetchResponsePartitions<'a>>,
563 pub unknown_tagged_fields: UnknownTaggedFields,
564}
565impl OffsetFetchResponseTopics<'_> {
566 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponseTopics {
567 crate::owned::offset_fetch_response::OffsetFetchResponseTopics {
568 name: (self.name).to_string(),
569 topic_id: (self.topic_id),
570 partitions: (self.partitions)
571 .iter()
572 .map(OffsetFetchResponsePartitions::to_owned)
573 .collect(),
574 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
575 }
576 }
577}
578impl Encode for OffsetFetchResponseTopics<'_> {
579 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
580 let flex = version >= 6;
581 if (8..=9).contains(&version) {
582 if flex {
583 put_compact_string(buf, self.name);
584 } else {
585 put_string(buf, self.name);
586 }
587 }
588 if version >= 10 {
589 crate::primitives::uuid::put_uuid(buf, self.topic_id);
590 }
591 if version >= 8 {
592 {
593 crate::primitives::array::put_array_len(buf, (self.partitions).len(), flex);
594 for it in &self.partitions {
595 it.encode(buf, version)?;
596 }
597 }
598 }
599 if flex {
600 let tagged = WriteTaggedFields::new();
601 tagged.write(buf, &self.unknown_tagged_fields);
602 }
603 Ok(())
604 }
605 fn encoded_len(&self, version: i16) -> usize {
606 let flex = version >= 6;
607 let mut n: usize = 0;
608 if (8..=9).contains(&version) {
609 n += if flex {
610 compact_string_len(self.name)
611 } else {
612 string_len(self.name)
613 };
614 }
615 if version >= 10 {
616 n += 16;
617 }
618 if version >= 8 {
619 n += {
620 let prefix =
621 crate::primitives::array::array_len_prefix_len((self.partitions).len(), flex);
622 let body: usize = (self.partitions)
623 .iter()
624 .map(|it| it.encoded_len(version))
625 .sum();
626 prefix + body
627 };
628 }
629 if flex {
630 let known_pairs: Vec<(u32, usize)> = Vec::new();
631 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
632 }
633 n
634 }
635}
636impl<'de> DecodeBorrow<'de> for OffsetFetchResponseTopics<'de> {
637 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
638 let flex = version >= 6;
639 let mut out = Self::default();
640 if (8..=9).contains(&version) {
641 out.name = if flex {
642 get_compact_string_borrowed(buf)?
643 } else {
644 get_string_borrowed(buf)?
645 };
646 }
647 if version >= 10 {
648 out.topic_id = crate::primitives::uuid::get_uuid(buf)?;
649 }
650 if version >= 8 {
651 out.partitions = {
652 let n = crate::primitives::array::get_array_len(buf, flex)?;
653 let mut v = Vec::with_capacity(n);
654 for _ in 0..n {
655 v.push(OffsetFetchResponsePartitions::decode_borrow(buf, version)?);
656 }
657 v
658 };
659 }
660 if flex {
661 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
662 }
663 Ok(out)
664 }
665}
666#[cfg(test)]
667impl OffsetFetchResponseTopics<'_> {
668 #[must_use]
669 pub fn populated(version: i16) -> Self {
670 let mut m = Self::default();
671 if (8..=9).contains(&version) {
672 m.name = "x";
673 }
674 if version >= 10 {
675 m.topic_id = crate::primitives::uuid::Uuid([1u8; 16]);
676 }
677 if version >= 8 {
678 m.partitions = vec![OffsetFetchResponsePartitions::populated(version)];
679 }
680 m
681 }
682}
683#[derive(Debug, Clone, PartialEq, Eq)]
684pub struct OffsetFetchResponsePartitions<'a> {
685 pub partition_index: i32,
686 pub committed_offset: i64,
687 pub committed_leader_epoch: i32,
688 pub metadata: Option<&'a str>,
689 pub error_code: i16,
690 pub unknown_tagged_fields: UnknownTaggedFields,
691}
692impl Default for OffsetFetchResponsePartitions<'_> {
693 fn default() -> Self {
694 Self {
695 partition_index: 0i32,
696 committed_offset: 0i64,
697 committed_leader_epoch: -1i32,
698 metadata: None,
699 error_code: 0i16,
700 unknown_tagged_fields: Default::default(),
701 }
702 }
703}
704impl OffsetFetchResponsePartitions<'_> {
705 pub fn to_owned(&self) -> crate::owned::offset_fetch_response::OffsetFetchResponsePartitions {
706 crate::owned::offset_fetch_response::OffsetFetchResponsePartitions {
707 partition_index: (self.partition_index),
708 committed_offset: (self.committed_offset),
709 committed_leader_epoch: (self.committed_leader_epoch),
710 metadata: (self.metadata).map(std::string::ToString::to_string),
711 error_code: (self.error_code),
712 unknown_tagged_fields: self.unknown_tagged_fields.clone(),
713 }
714 }
715}
716impl Encode for OffsetFetchResponsePartitions<'_> {
717 fn encode<B: BufMut>(&self, buf: &mut B, version: i16) -> Result<(), ProtocolError> {
718 let flex = version >= 6;
719 if version >= 8 {
720 put_i32(buf, self.partition_index);
721 }
722 if version >= 8 {
723 put_i64(buf, self.committed_offset);
724 }
725 if version >= 8 {
726 put_i32(buf, self.committed_leader_epoch);
727 }
728 if version >= 8 {
729 if flex {
730 put_compact_nullable_string(buf, self.metadata);
731 } else {
732 put_nullable_string(buf, self.metadata);
733 }
734 }
735 if version >= 8 {
736 put_i16(buf, self.error_code);
737 }
738 if flex {
739 let tagged = WriteTaggedFields::new();
740 tagged.write(buf, &self.unknown_tagged_fields);
741 }
742 Ok(())
743 }
744 fn encoded_len(&self, version: i16) -> usize {
745 let flex = version >= 6;
746 let mut n: usize = 0;
747 if version >= 8 {
748 n += 4;
749 }
750 if version >= 8 {
751 n += 8;
752 }
753 if version >= 8 {
754 n += 4;
755 }
756 if version >= 8 {
757 n += if flex {
758 compact_nullable_string_len(self.metadata)
759 } else {
760 nullable_string_len(self.metadata)
761 };
762 }
763 if version >= 8 {
764 n += 2;
765 }
766 if flex {
767 let known_pairs: Vec<(u32, usize)> = Vec::new();
768 n += tagged_fields_len(&known_pairs, &self.unknown_tagged_fields);
769 }
770 n
771 }
772}
773impl<'de> DecodeBorrow<'de> for OffsetFetchResponsePartitions<'de> {
774 fn decode_borrow(buf: &mut &'de [u8], version: i16) -> Result<Self, ProtocolError> {
775 let flex = version >= 6;
776 let mut out = Self::default();
777 if version >= 8 {
778 out.partition_index = get_i32(buf)?;
779 }
780 if version >= 8 {
781 out.committed_offset = get_i64(buf)?;
782 }
783 if version >= 8 {
784 out.committed_leader_epoch = get_i32(buf)?;
785 }
786 if version >= 8 {
787 out.metadata = if flex {
788 get_compact_nullable_string_borrowed(buf)?
789 } else {
790 get_nullable_string_borrowed(buf)?
791 };
792 }
793 if version >= 8 {
794 out.error_code = get_i16(buf)?;
795 }
796 if flex {
797 out.unknown_tagged_fields = read_tagged_fields(buf, |_tag, _payload| Ok(false))?;
798 }
799 Ok(out)
800 }
801}
802#[cfg(test)]
803impl OffsetFetchResponsePartitions<'_> {
804 #[must_use]
805 pub fn populated(version: i16) -> Self {
806 let mut m = Self::default();
807 if version >= 8 {
808 m.partition_index = 1i32;
809 }
810 if version >= 8 {
811 m.committed_offset = 1i64;
812 }
813 if version >= 8 {
814 m.committed_leader_epoch = 1i32;
815 }
816 if version >= 8 {
817 m.metadata = Some("x");
818 }
819 if version >= 8 {
820 m.error_code = 1i16;
821 }
822 m
823 }
824}